From e2ca91c567b8e29dc33af9f7fa14e2d9a18adf09 Mon Sep 17 00:00:00 2001 From: Nick Ninov Date: Tue, 15 Jul 2025 20:59:18 +0100 Subject: [PATCH 01/10] Reddit Source downloader - Subreddit Posts - completed - Subreddit Votes - completed - Subreddit Comments - in development - Output fields are based on the Dagster process --- source-reddit-fetcher/Dockerfile | 11 + source-reddit-fetcher/main.py | 4 + source-reddit-fetcher/metadata.yaml | 25 ++ source-reddit-fetcher/requirements.txt | 1 + .../sample_files/config-example.json | 7 + .../sample_files/configured_catalog.json | 32 +++ source-reddit-fetcher/setup.py | 35 +++ .../source_reddit_fetcher/__init__.py | 0 .../source_reddit_fetcher/run.py | 12 + .../schemas/subreddit_posts.json | 44 ++++ .../schemas/subreddit_votes.json | 31 +++ .../source_reddit_fetcher/source.py | 247 ++++++++++++++++++ .../source_reddit_fetcher/spec.yaml | 23 ++ 13 files changed, 472 insertions(+) create mode 100644 source-reddit-fetcher/Dockerfile create mode 100644 source-reddit-fetcher/main.py create mode 100644 source-reddit-fetcher/metadata.yaml create mode 100644 source-reddit-fetcher/requirements.txt create mode 100644 source-reddit-fetcher/sample_files/config-example.json create mode 100644 source-reddit-fetcher/sample_files/configured_catalog.json create mode 100644 source-reddit-fetcher/setup.py create mode 100644 source-reddit-fetcher/source_reddit_fetcher/__init__.py create mode 100644 source-reddit-fetcher/source_reddit_fetcher/run.py create mode 100644 source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_posts.json create mode 100644 source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_votes.json create mode 100644 source-reddit-fetcher/source_reddit_fetcher/source.py create mode 100644 source-reddit-fetcher/source_reddit_fetcher/spec.yaml diff --git a/source-reddit-fetcher/Dockerfile b/source-reddit-fetcher/Dockerfile new file mode 100644 index 0000000..fce999e --- /dev/null +++ b/source-reddit-fetcher/Dockerfile @@ -0,0 +1,11 @@ +FROM airbyte/python-connector-base:1.1.0 + +COPY . ./airbyte/integration_code +RUN pip install ./airbyte/integration_code + +COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt + +# The entrypoint and default env vars are already set in the base image +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] diff --git a/source-reddit-fetcher/main.py b/source-reddit-fetcher/main.py new file mode 100644 index 0000000..e5e1fdf --- /dev/null +++ b/source-reddit-fetcher/main.py @@ -0,0 +1,4 @@ +from source_reddit_fetcher.run import run + +if __name__ == "__main__": + run() \ No newline at end of file diff --git a/source-reddit-fetcher/metadata.yaml b/source-reddit-fetcher/metadata.yaml new file mode 100644 index 0000000..c3062dc --- /dev/null +++ b/source-reddit-fetcher/metadata.yaml @@ -0,0 +1,25 @@ +data: + allowedHosts: + registries: + oss: + enabled: false + cloud: + enabled: false + connectorBuildOptions: + baseImage: docker.io/airbyte/python-connector-base:1.0.0@sha256:dd17e347fbda94f7c3abff539be298a65af2d7fc27a307d89297df1081a45c27 + connectorSubtype: api + connectorType: source + definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920 + dockerImageTag: '0.0.2' + dockerRepository: harbor.status.im/bi/airbyte/source-reddit-fetcher + githubIssueLabel: source-reddit-fetcher + icon: twitter-fetcher.svg + license: MIT + name: Reddit Data Extractor + releaseDate: TODO + supportLevel: community + releaseStage: alpha + documentationUrl: https://docs.airbyte.com/integrations/sources/twitter-fetcher + tags: + - language:python +metadataSpecVersion: "1.0" diff --git a/source-reddit-fetcher/requirements.txt b/source-reddit-fetcher/requirements.txt new file mode 100644 index 0000000..1411a4a --- /dev/null +++ b/source-reddit-fetcher/requirements.txt @@ -0,0 +1 @@ +pandas \ No newline at end of file diff --git a/source-reddit-fetcher/sample_files/config-example.json b/source-reddit-fetcher/sample_files/config-example.json new file mode 100644 index 0000000..a0beddb --- /dev/null +++ b/source-reddit-fetcher/sample_files/config-example.json @@ -0,0 +1,7 @@ +{ + "client_id": "your_reddit_client_id", + "client_secret": "your_reddit_client_secret", + "username": "your_reddit_username", + "subreddit": "subreddit_name_to_be_monitored", + "days": 31 +} \ No newline at end of file diff --git a/source-reddit-fetcher/sample_files/configured_catalog.json b/source-reddit-fetcher/sample_files/configured_catalog.json new file mode 100644 index 0000000..17f3977 --- /dev/null +++ b/source-reddit-fetcher/sample_files/configured_catalog.json @@ -0,0 +1,32 @@ +{ + "streams": [ + { + "stream": { + "name": "subreddit_posts", + "json_schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object" + }, + "supported_sync_modes": [ + "full_refresh", "incremental" + ] + }, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "subreddit_votes", + "json_schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object" + }, + "supported_sync_modes": [ + "incremental" + ] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] + } \ No newline at end of file diff --git a/source-reddit-fetcher/setup.py b/source-reddit-fetcher/setup.py new file mode 100644 index 0000000..434a7e4 --- /dev/null +++ b/source-reddit-fetcher/setup.py @@ -0,0 +1,35 @@ +# +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
+# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk~=0.2", +] + +TEST_REQUIREMENTS = [ + "requests-mock~=1.9.3", + "pytest~=6.2", + "pytest-mock~=3.6.1", + "connector-acceptance-test", +] + +setup( + name="source-reddit-fetcher", + description="Source implementation for Reddit.", + author="Status", + author_email="devops@status.im", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, + entry_points={ + "console_scripts": [ + "source-reddit-connector=source_blockchain_reddit.run:run", + ], + }, +) diff --git a/source-reddit-fetcher/source_reddit_fetcher/__init__.py b/source-reddit-fetcher/source_reddit_fetcher/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/source-reddit-fetcher/source_reddit_fetcher/run.py b/source-reddit-fetcher/source_reddit_fetcher/run.py new file mode 100644 index 0000000..32f4857 --- /dev/null +++ b/source-reddit-fetcher/source_reddit_fetcher/run.py @@ -0,0 +1,12 @@ +import sys +import logging +from airbyte_cdk.entrypoint import launch +from .source import SourceRedditFetcher + +def run(): + + logger = logging.getLogger("airbyte") + args = sys.argv[1:] + + source = SourceRedditFetcher() + launch(source, sys.argv[1:]) \ No newline at end of file diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_posts.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_posts.json new file mode 100644 index 0000000..6730d76 --- /dev/null +++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_posts.json @@ -0,0 +1,44 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "kind_tag": { + "type": "string" + }, + "kind_name": { + "type": "string" + }, + "subreddit": { + "type": "string" + }, + "post_id": { + "type": "string" + }, + "post_url": { + "type": "string", + "format": "uri" + }, + "created_timestamp": { + "type": "string", + "format": "date-time" + }, + "timezone": { + "type": "string" + }, + "title": { + "type": "string" + }, + "text": { + "type": "string" + }, + "html_text": { + "type": "string" + }, + "author": { + "type": "string" + } + } +} diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_votes.json new file mode 100644 index 0000000..b786121 --- /dev/null +++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_votes.json @@ -0,0 +1,31 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "title": "subreddit_posts", + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "post_id": { + "type": "string" + }, + "kind_name": { + "type": "string" + }, + "kind": { + "type": "string" + }, + "ups": { + "type": "integer" + }, + "downs": { + "type": "integer" + }, + "upvote_ratio": { + "type": "number" + }, + "score": { + "type": "integer" + } + } +} diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py new file mode 100644 index 0000000..d6e5a8b --- /dev/null +++ b/source-reddit-fetcher/source_reddit_fetcher/source.py @@ -0,0 +1,247 @@ +from typing import Any, Iterable, List, Mapping, Optional, Tuple +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http 
import HttpStream, HttpSubStream +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +import logging, json, datetime, requests +import requests.auth +import pandas as pd + +logger = logging.getLogger("airbyte") +BASE_URL = "https://www.reddit.com" + +class RedditCredentialsAuthentication(TokenAuthenticator): + + def __init__(self, client_id: str, client_secret: str, username: str, **kwargs): + + headers = { + "User-Agent": f"python:app.client_credentials:v1.0 (by u/{username})" + } + data = { + "grant_type": "client_credentials" + } + + url = f"{BASE_URL}/api/v1/access_token" + logger.info(f"Authentication URL: {url}") + + auth = requests.auth.HTTPBasicAuth(client_id, client_secret) + response = requests.post(url, auth=auth, data=data, headers=headers) + response.raise_for_status() + logger.info(f"Successfully connected to {url}") + token = response.json().get("access_token") + + if not token: + raise Exception("Could not fetch access token... Please further investigate!") + + logger.info("Successfully fetched Reddit access token") + super().__init__(token) + + + +class ApiStream(HttpStream): + + primary_key: Optional[str] = None + url_base = "https://oauth.reddit.com/" + + def __init__(self, authenticator: requests.auth.AuthBase): + super().__init__(authenticator=authenticator) + + + def backoff_time(self, response: requests.Response) -> Optional[float]: + + wait = response.headers.get("Retry-After") + if not wait: + wait = response.headers.get("x-ratelimit-reset") + + if response.status_code == 429: + logger.warning(f"Raised too many requests at once! Waiting {wait}s...") + + return float(wait) if wait else None + + + def to_utc_timestamp(self, timestamp: float) -> datetime.date: + return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).date() + + +class SubredditPosts(ApiStream): + + # Based on "type prefixes" + # https://www.reddit.com/dev/api/ + type_prefixes = { + "t1": "comment", + "t2": "account", + "t3": "link", + "t4": "message", + "t5": "subreddit", + "t6": "award" + } + + primary_key = "id" + cursor_field = "created_timestamp" + + def __init__(self, subreddit: str, authenticator: requests.auth.AuthBase): + super().__init__(authenticator) + self.subreddit = subreddit + + + + def path(self, **kwargs): + return f"{self.url_base}/r/{self.subreddit}/new" + + + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + + data: dict[str, Any] = response.json() + children: list[dict] = data.get("data", {}).get("children", []) + + for child in children: + + data = child.get("data", {}) + if not data: + continue + + row = { + "id": f"{self.subreddit}-{data['id']}", + "kind_tag": child["kind"], + "kind_name": self.type_prefixes[child["kind"]], + "subreddit": self.subreddit, + "post_id": data["id"], + "post_url": BASE_URL + data["permalink"], + "url": data["url"], + "domain": data["domain"], + "created_timestamp": datetime.datetime.fromtimestamp(data["created_utc"], tz=datetime.timezone.utc), + "timezone": "UTC", + "title": data["title"], + "text": data["selftext"], + "html_text": data["selftext_html"], + "author": data["author"], + "raw": json.dumps(data) + } + yield row + + + + def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]: + data: dict = response.json() + after = data.get("data", {}).get("after") + return {"after": after} if after else None + + + +class SubredditVotes(SubredditPosts): + + primary_key = "id" + cursor_field = "" + + def __init__(self, days: int, 
subreddit: str, authenticator: requests.auth.AuthBase): + super().__init__(subreddit, authenticator) + + today_utc = datetime.datetime.now(datetime.timezone.utc).date() + self.start_date = (today_utc - pd.offsets.Day(days)).date() + + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + + data: dict[str, Any] = response.json() + children: list[dict] = data.get("data", {}).get("children", []) + + for child in children: + + data = child.get("data", {}) + if not data: + continue + + created_utc = self.to_utc_timestamp(data["created_utc"]) + if self.start_date > created_utc: + break + + keys = ["ups", "downs", "upvote_ratio", "score"] + + row = { + "id": f"{str(datetime.datetime.now().timestamp()).replace('.', '')}-{self.subreddit}-{data['id']}", + "post_id": f"{self.subreddit}-{data['id']}", + "kind": child["kind"], + "kind_name": self.type_prefixes[child["kind"]], + **{key: data[key] for key in keys}, + } + + yield row + + + + def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]: + + params = super().next_page_token(response) + + if not params: + return None + + data: dict[str, Any] = response.json() + children: list[dict] = data.get("data", {}).get("children", []) + + if self.start_date > self.to_utc_timestamp(children[-1]["data"]["created_utc"]): + params = None + + return params + + + +class SubredditComments(HttpSubStream, ApiStream): + + def __init__(self, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): + super().__init__(parent=SubredditPosts, **kwargs) + + self.subreddit = subreddit + self.parent = SubredditPosts(self.subreddit, authenticator) + + def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): + post: dict = stream_slice.get("parent") + post_id = post["post_id"] + + url = f"{self.url_base}/r/{self.subreddit}/comments/{post_id}" + logger.info(f"{self.__class__.__name__}: {url}") + return url + + + def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): + + _, comments = response.json() + post_id = response.url.split("/")[-1].split("?")[0] + + for child in comments["data"]["children"]: + child_data: dict = child["data"] + + row = { + "id": f"{self.subreddit}-{post_id}-{child_data['id']}", + "post_id": f"{self.subreddit}-{post_id}", + "subreddit": self.subreddit, + "comment_id": child_data['id'], + "created_timestamp": self.to_utc_timestamp(child_data["created_utc"]), + "timezone": "UTC", + "parent_id": child_data["parent_id"].split("_")[1], + "author": child_data.get("author"), + "text": child_data["body"], + "html_text": child_data["body_html"], + "url": BASE_URL + child_data["permalink"] + } + yield row + + + +class SourceRedditFetcher(AbstractSource): + + def __init__(self): + super().__init__() + + def check_connection(self, logger: logging.Logger, config: dict) -> Tuple[bool, Any]: + return True, None + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + + auth = RedditCredentialsAuthentication(**config) + streams = [ + SubredditPosts(subreddit=config["subreddit"], authenticator=auth), + SubredditVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth) + ] + return streams \ No newline at end of file diff --git a/source-reddit-fetcher/source_reddit_fetcher/spec.yaml b/source-reddit-fetcher/source_reddit_fetcher/spec.yaml new file mode 100644 index 
0000000..f00a9f8 --- /dev/null +++ b/source-reddit-fetcher/source_reddit_fetcher/spec.yaml @@ -0,0 +1,23 @@ +documentationUrl: https://www.reddit.com/dev/api/ +connectionSpecification: + $schema: http://json-schema.org/draft-07/schema# + title: Reddit Fetcher + type: object + required: + - client_id + - client_secret + - username + - days + properties: + client_id: + type: string + description: "The Client ID (username) from the Reddit API" + blocks_to_do: + type: string + description: "The Client Secret (password) from the Reddit API" + username: + type: string + description: "The username of the Reddit account that has created the Client ID and Secret" + days: + type: integer + description: "The days that votes and comments will be scraped for" \ No newline at end of file From 1a2ed032c065490c9571ce5f434e5ca8c0e7d533 Mon Sep 17 00:00:00 2001 From: Nick Ninov Date: Wed, 16 Jul 2025 11:21:07 +0100 Subject: [PATCH 02/10] Documentation --- source-reddit-fetcher/README.md | 78 +++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 source-reddit-fetcher/README.md diff --git a/source-reddit-fetcher/README.md b/source-reddit-fetcher/README.md new file mode 100644 index 0000000..ab9d5cd --- /dev/null +++ b/source-reddit-fetcher/README.md @@ -0,0 +1,78 @@ +# Reddit Source + + +## Usage + +This connector fetches information from a selected subreddit. + +### Configuration + +The connector takes the following input: + +- `client_id` - Reddit's client ID for your account +- `client_secret` - Reddit's client secret for your account +- `username` - the Reddit username that was used to generate the `client_id` and `client_secret` +- `days` - used to calculate the stop date; votes and comments older than the stop date are not monitored. Calculation: $stop\ date = today - days$ + +### Output + +The connector will return the following: + +* [SubredditPosts](./source-reddit-fetcher/schemas/subreddit_posts.json) +* [SubredditVotes](./source-reddit-fetcher/schemas/subreddit_votes.json) + +## Local development + +### Prerequisites + +#### Activate Virtual Environment and install dependencies + +From this connector directory, create a virtual environment: + +``` +python -m venv .venv +``` + +``` +source .venv/bin/activate +pip install -r requirements.txt +``` + +### Locally running the connector + +``` +python main.py spec +python main.py check --config sample_files/config-example.json +python main.py discover --config sample_files/config-example.json +python main.py read --config sample_files/config-example.json --catalog sample_files/configured_catalog.json +``` + +### Locally running the connector Docker image + +```bash +docker build -t airbyte/source-reddit-fetcher:dev .
+# Running the spec command against your patched connector +docker run airbyte/source-reddit-fetcher:dev spec +``` + +#### Run + +Then run any of the connector commands as follows: + +#### Linux / MAC OS + +``` +docker run --rm airbyte/source-reddit-fetcher:dev spec +docker run --rm -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev check --config /sample_files/config-example.json +docker run --rm -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev discover --config /sample_files/config-example.json +docker run --rm -v $(pwd)/sample_files:/sample_files -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev read --config /sample_files/config-example.json --catalog /sample_files/configured_catalog.json +``` + +### Windows + +``` +docker run --rm airbyte/source-reddit-fetcher:dev spec +docker run --rm -v "$PWD\sample_files:/sample_files" airbyte/source-reddit-fetcher:dev check --config /sample_files/config-example.json +docker run --rm -v "$PWD\sample_files:/sample_files" airbyte/source-reddit-fetcher:dev discover --config /sample_files/config-example.json +docker run --rm -v "$PWD\sample_files:/sample_files" airbyte/source-reddit-fetcher:dev read --config /sample_files/config-example.json --catalog /sample_files/configured_catalog.json +``` \ No newline at end of file From 03a258e159a7df3bc40e79e003292dc4aecf6f60 Mon Sep 17 00:00:00 2001 From: Nick Ninov Date: Thu, 17 Jul 2025 18:16:12 +0100 Subject: [PATCH 03/10] Subreddit Comment Votes - Add the votes for the comments --- .../sample_files/config-example.json | 8 +- .../sample_files/configured_catalog.json | 32 +++++- .../schemas/subreddit_comments.json | 40 +++++++ .../schemas/subreddit_comments_votes.json | 21 ++++ ...t_votes.json => subreddit_post_votes.json} | 0 .../source_reddit_fetcher/source.py | 103 ++++++++++++++---- 6 files changed, 177 insertions(+), 27 deletions(-) create mode 100644 source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments.json create mode 100644 source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments_votes.json rename source-reddit-fetcher/source_reddit_fetcher/schemas/{subreddit_votes.json => subreddit_post_votes.json} (100%) diff --git a/source-reddit-fetcher/sample_files/config-example.json b/source-reddit-fetcher/sample_files/config-example.json index a0beddb..cd7eec7 100644 --- a/source-reddit-fetcher/sample_files/config-example.json +++ b/source-reddit-fetcher/sample_files/config-example.json @@ -1,7 +1,7 @@ { - "client_id": "your_reddit_client_id", - "client_secret": "your_reddit_client_secret", - "username": "your_reddit_username", - "subreddit": "subreddit_name_to_be_monitored", + "client_id": "ZMX2JG0-HmebOUdbZpjATg", + "client_secret": "32KUcImO7JtOLZS6WzvnC7pmyeeoGA", + "username": "massive-skill-issue", + "subreddit": "statusim", "days": 31 } \ No newline at end of file diff --git a/source-reddit-fetcher/sample_files/configured_catalog.json b/source-reddit-fetcher/sample_files/configured_catalog.json index 17f3977..1294691 100644 --- a/source-reddit-fetcher/sample_files/configured_catalog.json +++ b/source-reddit-fetcher/sample_files/configured_catalog.json @@ -12,11 +12,39 @@ ] }, "sync_mode": "incremental", - "destination_sync_mode": "overwrite" + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "subreddit_post_votes", + "json_schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object" + }, + "supported_sync_modes": [ + "incremental" + ] + }, + "sync_mode": 
"incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "subreddit_comments", + "json_schema": { + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object" + }, + "supported_sync_modes": [ + "incremental" + ] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" }, { "stream": { - "name": "subreddit_votes", + "name": "subreddit_comments_votes", "json_schema": { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object" diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments.json new file mode 100644 index 0000000..ee3d23e --- /dev/null +++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments.json @@ -0,0 +1,40 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "post_id": { + "type": "string" + }, + "subreddit": { + "type": "string" + }, + "comment_id": { + "type": "string" + }, + "created_timestamp": { + "type": "string", + "format": "date-time" + }, + "timezone": { + "type": "string" + }, + "parent_id": { + "type": "string" + }, + "author": { + "type": "string" + }, + "text": { + "type": "string" + }, + "html_text": { + "type": "string" + }, + "url": { + "type": "string" + } + } +} diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments_votes.json new file mode 100644 index 0000000..0e7e3fb --- /dev/null +++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments_votes.json @@ -0,0 +1,21 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "id": { + "type": "string" + }, + "subreddit": { + "type": "string" + }, + "ups": { + "type": "integer" + }, + "downs": { + "type": "integer" + }, + "score": { + "type": "integer" + } + } +} diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_post_votes.json similarity index 100% rename from source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_votes.json rename to source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_post_votes.json diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py index d6e5a8b..dac1982 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/source.py +++ b/source-reddit-fetcher/source_reddit_fetcher/source.py @@ -21,6 +21,7 @@ def __init__(self, client_id: str, client_secret: str, username: str, **kwargs): "grant_type": "client_credentials" } + # https://github.com/reddit-archive/reddit/wiki/oauth2#authorization url = f"{BASE_URL}/api/v1/access_token" logger.info(f"Authentication URL: {url}") @@ -28,13 +29,22 @@ def __init__(self, client_id: str, client_secret: str, username: str, **kwargs): response = requests.post(url, auth=auth, data=data, headers=headers) response.raise_for_status() logger.info(f"Successfully connected to {url}") - token = response.json().get("access_token") + + info: dict = response.json() + + token: str = info.get("access_token") + auth_method: str = info.get("token_type") if not token: raise Exception("Could not fetch access token... 
Please further investigate!") logger.info("Successfully fetched Reddit access token") - super().__init__(token) + logger.info(f"Authentication method: {auth_method.title()}") + + valid_hours = info["expires_in"] / (60 * 60) + logger.info(f"Token is valid for: {valid_hours}") + + super().__init__(token, auth_method.title()) @@ -43,9 +53,12 @@ class ApiStream(HttpStream): primary_key: Optional[str] = None url_base = "https://oauth.reddit.com/" - def __init__(self, authenticator: requests.auth.AuthBase): + def __init__(self, days: int, authenticator: requests.auth.AuthBase): super().__init__(authenticator=authenticator) + today_utc = datetime.datetime.now(datetime.timezone.utc).date() + self.start_date = (today_utc - pd.offsets.Day(days)).date() + def backoff_time(self, response: requests.Response) -> Optional[float]: @@ -59,8 +72,8 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: return float(wait) if wait else None - def to_utc_timestamp(self, timestamp: float) -> datetime.date: - return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc).date() + def to_utc_timestamp(self, timestamp: float) -> datetime.datetime: + return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) class SubredditPosts(ApiStream): @@ -79,8 +92,8 @@ class SubredditPosts(ApiStream): primary_key = "id" cursor_field = "created_timestamp" - def __init__(self, subreddit: str, authenticator: requests.auth.AuthBase): - super().__init__(authenticator) + def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): + super().__init__(days, authenticator) self.subreddit = subreddit @@ -129,16 +142,14 @@ def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any -class SubredditVotes(SubredditPosts): +class SubredditPostVotes(SubredditPosts): primary_key = "id" cursor_field = "" - def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase): - super().__init__(subreddit, authenticator) + def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): + super().__init__(days, subreddit, authenticator) - today_utc = datetime.datetime.now(datetime.timezone.utc).date() - self.start_date = (today_utc - pd.offsets.Day(days)).date() def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: @@ -152,7 +163,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp if not data: continue - created_utc = self.to_utc_timestamp(data["created_utc"]) + created_utc = self.to_utc_timestamp(data["created_utc"]).date() if self.start_date > created_utc: break @@ -180,7 +191,7 @@ def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any data: dict[str, Any] = response.json() children: list[dict] = data.get("data", {}).get("children", []) - if self.start_date > self.to_utc_timestamp(children[-1]["data"]["created_utc"]): + if self.start_date > self.to_utc_timestamp(children[-1]["data"]["created_utc"]).date(): params = None return params @@ -189,21 +200,23 @@ def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any class SubredditComments(HttpSubStream, ApiStream): - def __init__(self, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): + def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): + kwargs.update({ + "days": days, + "authenticator": authenticator + }) super().__init__(parent=SubredditPosts, **kwargs) self.subreddit = 
subreddit - self.parent = SubredditPosts(self.subreddit, authenticator) + self.parent = SubredditPosts(days, self.subreddit, authenticator) def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): post: dict = stream_slice.get("parent") post_id = post["post_id"] url = f"{self.url_base}/r/{self.subreddit}/comments/{post_id}" - logger.info(f"{self.__class__.__name__}: {url}") return url - def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): _, comments = response.json() @@ -227,6 +240,48 @@ def parse_response(self, response: requests.Response, *, stream_state: Mapping[s } yield row + def next_page_token(self, response: requests.Response): + _, comments = response.json() + after = comments.get("data", {}).get("after") + return {"after": after} if after else None + + + +class SubredditCommentsVotes(SubredditComments): + + def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): + super().__init__(days, subreddit, authenticator, **kwargs) + + def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): + + _, comments = response.json() + post_id = response.url.split("/")[-1].split("?")[0] + + for child in comments["data"]["children"]: + + child_data: dict = child["data"] + + row = { + "id": f"{self.subreddit}-{post_id}-{child_data['id']}", + "subreddit": self.subreddit, + "ups": child_data["ups"], + "downs": child_data["downs"], + "score": child_data["score"] + } + + yield row + + def next_page_token(self, response: requests.Response): + _, comments = response.json() + params = super().next_page_token(response) + + children = comments.get("data", {}).get("children", []) + + if len(children) == 0 or self.start_date > self.to_utc_timestamp(children[-1]["data"]["created_utc"]).date(): + params = None + + return params + class SourceRedditFetcher(AbstractSource): @@ -239,9 +294,15 @@ def check_connection(self, logger: logging.Logger, config: dict) -> Tuple[bool, def streams(self, config: Mapping[str, Any]) -> List[Stream]: - auth = RedditCredentialsAuthentication(**config) + auth = RedditCredentialsAuthentication( + client_id=config["client_id"], + client_secret=config["client_secret"], + username=config["username"] + ) streams = [ - SubredditPosts(subreddit=config["subreddit"], authenticator=auth), - SubredditVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth) + SubredditPosts(days=config["days"], subreddit=config["subreddit"], authenticator=auth), + SubredditPostVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth), + SubredditComments(days=config["days"], subreddit=config["subreddit"], authenticator=auth), + SubredditCommentsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth) ] return streams \ No newline at end of file From 6095f12ab4c09babf7ee30cd2056deacee14cab3 Mon Sep 17 00:00:00 2001 From: Nick Ninov Date: Fri, 18 Jul 2025 07:39:08 +0100 Subject: [PATCH 04/10] Updated - Secret has been deleted. 
Old one doesn't work anymore --- source-reddit-fetcher/sample_files/config-example.json | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source-reddit-fetcher/sample_files/config-example.json b/source-reddit-fetcher/sample_files/config-example.json index cd7eec7..a0beddb 100644 --- a/source-reddit-fetcher/sample_files/config-example.json +++ b/source-reddit-fetcher/sample_files/config-example.json @@ -1,7 +1,7 @@ { - "client_id": "ZMX2JG0-HmebOUdbZpjATg", - "client_secret": "32KUcImO7JtOLZS6WzvnC7pmyeeoGA", - "username": "massive-skill-issue", - "subreddit": "statusim", + "client_id": "your_reddit_client_id", + "client_secret": "your_reddit_client_secret", + "username": "your_reddit_username", + "subreddit": "subreddit_name_to_be_monitored", "days": 31 } \ No newline at end of file From 56d3277f0172fcdd62fc6d6b8dfe4e52c80289a8 Mon Sep 17 00:00:00 2001 From: Nick Ninov Date: Fri, 18 Jul 2025 08:48:15 +0100 Subject: [PATCH 05/10] Renaming - Airbyte stream rename - Add request_params --- source-reddit-fetcher/README.md | 2 +- source-reddit-fetcher/metadata.yaml | 4 +- .../sample_files/configured_catalog.json | 8 +-- ...{subreddit_comments.json => comments.json} | 0 ...omments_votes.json => comments_votes.json} | 0 ...reddit_post_votes.json => post_votes.json} | 0 .../{subreddit_posts.json => posts.json} | 0 .../source_reddit_fetcher/source.py | 50 ++++++++++++------- 8 files changed, 39 insertions(+), 25 deletions(-) rename source-reddit-fetcher/source_reddit_fetcher/schemas/{subreddit_comments.json => comments.json} (100%) rename source-reddit-fetcher/source_reddit_fetcher/schemas/{subreddit_comments_votes.json => comments_votes.json} (100%) rename source-reddit-fetcher/source_reddit_fetcher/schemas/{subreddit_post_votes.json => post_votes.json} (100%) rename source-reddit-fetcher/source_reddit_fetcher/schemas/{subreddit_posts.json => posts.json} (100%) diff --git a/source-reddit-fetcher/README.md b/source-reddit-fetcher/README.md index ab9d5cd..28fa3f9 100644 --- a/source-reddit-fetcher/README.md +++ b/source-reddit-fetcher/README.md @@ -65,7 +65,7 @@ Then run any of the connector commands as follows: docker run --rm airbyte/source-reddit-fetcher:dev spec docker run --rm -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev check --config /sample_files/config-example.json docker run --rm -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev discover --config /sample_files/config-example.json -docker run --rm -v $(pwd)/sample_files:/sample_files -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev read --config /sample_files/config-example.json --catalog /sample_files/configured_catalog.json +docker run --rm -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev read --config /sample_files/config-example.json --catalog /sample_files/configured_catalog.json ``` ### Windows diff --git a/source-reddit-fetcher/metadata.yaml b/source-reddit-fetcher/metadata.yaml index c3062dc..e0cfd83 100644 --- a/source-reddit-fetcher/metadata.yaml +++ b/source-reddit-fetcher/metadata.yaml @@ -10,7 +10,7 @@ data: connectorSubtype: api connectorType: source definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920 - dockerImageTag: '0.0.2' + dockerImageTag: '0.0.1' dockerRepository: harbor.status.im/bi/airbyte/source-reddit-fetcher githubIssueLabel: source-reddit-fetcher icon: twitter-fetcher.svg @@ -19,7 +19,7 @@ data: releaseDate: TODO supportLevel: community releaseStage: alpha - documentationUrl: 
https://docs.airbyte.com/integrations/sources/twitter-fetcher + documentationUrl: https://docs.bi.status.im/extractions/reddit.html tags: - language:python metadataSpecVersion: "1.0" diff --git a/source-reddit-fetcher/sample_files/configured_catalog.json b/source-reddit-fetcher/sample_files/configured_catalog.json index 1294691..5057aa5 100644 --- a/source-reddit-fetcher/sample_files/configured_catalog.json +++ b/source-reddit-fetcher/sample_files/configured_catalog.json @@ -2,7 +2,7 @@ "streams": [ { "stream": { - "name": "subreddit_posts", + "name": "posts", "json_schema": { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object" @@ -16,7 +16,7 @@ }, { "stream": { - "name": "subreddit_post_votes", + "name": "post_votes", "json_schema": { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object" @@ -30,7 +30,7 @@ }, { "stream": { - "name": "subreddit_comments", + "name": "comments", "json_schema": { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object" @@ -44,7 +44,7 @@ }, { "stream": { - "name": "subreddit_comments_votes", + "name": "comments_votes", "json_schema": { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object" diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json similarity index 100% rename from source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments.json rename to source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/comments_votes.json similarity index 100% rename from source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_comments_votes.json rename to source-reddit-fetcher/source_reddit_fetcher/schemas/comments_votes.json diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_post_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/post_votes.json similarity index 100% rename from source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_post_votes.json rename to source-reddit-fetcher/source_reddit_fetcher/schemas/post_votes.json diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_posts.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json similarity index 100% rename from source-reddit-fetcher/source_reddit_fetcher/schemas/subreddit_posts.json rename to source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py index dac1982..ca99dd7 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/source.py +++ b/source-reddit-fetcher/source_reddit_fetcher/source.py @@ -48,14 +48,15 @@ def __init__(self, client_id: str, client_secret: str, username: str, **kwargs): -class ApiStream(HttpStream): +class RedditStream(HttpStream): primary_key: Optional[str] = None url_base = "https://oauth.reddit.com/" - def __init__(self, days: int, authenticator: requests.auth.AuthBase): + def __init__(self, subreddit: str, days: int, authenticator: requests.auth.AuthBase): super().__init__(authenticator=authenticator) + self.subreddit = subreddit today_utc = datetime.datetime.now(datetime.timezone.utc).date() self.start_date = (today_utc - pd.offsets.Day(days)).date() @@ -76,7 +77,19 @@ def to_utc_timestamp(self, timestamp: float) -> datetime.datetime: 
return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) -class SubredditPosts(ApiStream): + def request_params(self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None): + + params = { + "limit": 100 + } + if next_page_token: + params.update(next_page_token) + + return params + + + +class Posts(RedditStream): # Based on "type prefixes" # https://www.reddit.com/dev/api/ @@ -93,8 +106,7 @@ class SubredditPosts(ApiStream): cursor_field = "created_timestamp" def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): - super().__init__(days, authenticator) - self.subreddit = subreddit + super().__init__(subreddit, days, authenticator) @@ -138,11 +150,12 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]: data: dict = response.json() after = data.get("data", {}).get("after") + return None return {"after": after} if after else None -class SubredditPostVotes(SubredditPosts): +class PostVotes(Posts): primary_key = "id" cursor_field = "" @@ -198,17 +211,15 @@ def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any -class SubredditComments(HttpSubStream, ApiStream): +class Comments(HttpSubStream, RedditStream): - def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): + def __init__(self, days: int, authenticator: requests.auth.AuthBase, **kwargs): kwargs.update({ "days": days, "authenticator": authenticator }) - super().__init__(parent=SubredditPosts, **kwargs) - - self.subreddit = subreddit - self.parent = SubredditPosts(days, self.subreddit, authenticator) + super().__init__(parent=Posts, **kwargs) + self.parent = Posts(days, self.subreddit, authenticator) def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): post: dict = stream_slice.get("parent") @@ -247,10 +258,13 @@ def next_page_token(self, response: requests.Response): -class SubredditCommentsVotes(SubredditComments): +class CommentsVotes(Comments): def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): - super().__init__(days, subreddit, authenticator, **kwargs) + kwargs.update({ + "subreddit": subreddit + }) + super().__init__(days, authenticator, **kwargs) def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): @@ -300,9 +314,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: username=config["username"] ) streams = [ - SubredditPosts(days=config["days"], subreddit=config["subreddit"], authenticator=auth), - SubredditPostVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth), - SubredditComments(days=config["days"], subreddit=config["subreddit"], authenticator=auth), - SubredditCommentsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth) + Posts(days=config["days"], subreddit=config["subreddit"], authenticator=auth), + PostVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth), + Comments(days=config["days"], subreddit=config["subreddit"], authenticator=auth), + CommentsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth) ] return streams \ No newline at end of 
file From 7962798853e71b981de2c4c19b152dbd70aeca5a Mon Sep 17 00:00:00 2001 From: Nick Ninov Date: Fri, 18 Jul 2025 08:57:57 +0100 Subject: [PATCH 06/10] Documentation --- source-reddit-fetcher/README.md | 6 ++++-- source-reddit-fetcher/sample_files/configured_catalog.json | 2 +- .../schemas/{post_votes.json => posts_votes.json} | 0 source-reddit-fetcher/source_reddit_fetcher/source.py | 7 +++---- 4 files changed, 8 insertions(+), 7 deletions(-) rename source-reddit-fetcher/source_reddit_fetcher/schemas/{post_votes.json => posts_votes.json} (100%) diff --git a/source-reddit-fetcher/README.md b/source-reddit-fetcher/README.md index 28fa3f9..459c3ec 100644 --- a/source-reddit-fetcher/README.md +++ b/source-reddit-fetcher/README.md @@ -18,8 +18,10 @@ The connector takes the following input: The connector will return the following: -* [SubredditPosts](./source-reddit-fetcher/schemas/subreddit_posts.json) -* [SubredditVotes](./source-reddit-fetcher/schemas/subreddit_votes.json) +* [Posts](./source-reddit-fetcher/schemas/posts.json) +* [PostVotes](./source-reddit-fetcher/schemas/posts_votes.json) +* [Comments](./source-reddit-fetcher/schemas/comments.json) +* [CommentsVotes](./source-reddit-fetcher/schemas/comments_votes.json) ## Local development diff --git a/source-reddit-fetcher/sample_files/configured_catalog.json b/source-reddit-fetcher/sample_files/configured_catalog.json index 5057aa5..f0c6a49 100644 --- a/source-reddit-fetcher/sample_files/configured_catalog.json +++ b/source-reddit-fetcher/sample_files/configured_catalog.json @@ -16,7 +16,7 @@ }, { "stream": { - "name": "post_votes", + "name": "posts_votes", "json_schema": { "$schema": "http://json-schema.org/draft-04/schema#", "type": "object" diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/post_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json similarity index 100% rename from source-reddit-fetcher/source_reddit_fetcher/schemas/post_votes.json rename to source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py index ca99dd7..1f0a444 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/source.py +++ b/source-reddit-fetcher/source_reddit_fetcher/source.py @@ -42,7 +42,7 @@ def __init__(self, client_id: str, client_secret: str, username: str, **kwargs): logger.info(f"Authentication method: {auth_method.title()}") valid_hours = info["expires_in"] / (60 * 60) - logger.info(f"Token is valid for: {valid_hours}") + logger.info(f"Token is valid for: {int(valid_hours)} hours") super().__init__(token, auth_method.title()) @@ -150,12 +150,11 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]: data: dict = response.json() after = data.get("data", {}).get("after") - return None return {"after": after} if after else None -class PostVotes(Posts): +class PostsVotes(Posts): primary_key = "id" cursor_field = "" @@ -315,7 +314,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: ) streams = [ Posts(days=config["days"], subreddit=config["subreddit"], authenticator=auth), - PostVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth), + PostsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth), Comments(days=config["days"], subreddit=config["subreddit"], authenticator=auth), 
CommentsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth) ] From d7141c36617c524506ba20d610312c983c51a359 Mon Sep 17 00:00:00 2001 From: Nick Ninov Date: Fri, 18 Jul 2025 13:28:39 +0100 Subject: [PATCH 07/10] Test env bug fix --- .../source_reddit_fetcher/source.py | 17 ++++++++++++++++- .../source_reddit_fetcher/spec.yaml | 8 ++++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py index 1f0a444..62888e1 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/source.py +++ b/source-reddit-fetcher/source_reddit_fetcher/source.py @@ -303,7 +303,22 @@ def __init__(self): super().__init__() def check_connection(self, logger: logging.Logger, config: dict) -> Tuple[bool, Any]: - return True, None + + logger.info(f"Config keys: {config.keys()}") + + auth = RedditCredentialsAuthentication( + client_id=config["client_id"], + client_secret=config["client_secret"], + username=config["username"] + ) + + url = BASE_URL + f"/r/" + config["subreddit"] + "/comments" + logger.info(f"Fetching: {url}") + + resp = requests.get(url) + resp.raise_for_status() + + return resp.status_code == 200, None def streams(self, config: Mapping[str, Any]) -> List[Stream]: diff --git a/source-reddit-fetcher/source_reddit_fetcher/spec.yaml b/source-reddit-fetcher/source_reddit_fetcher/spec.yaml index f00a9f8..517bbdb 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/spec.yaml +++ b/source-reddit-fetcher/source_reddit_fetcher/spec.yaml @@ -8,16 +8,20 @@ connectionSpecification: - client_secret - username - days + - subreddit properties: client_id: type: string description: "The Client ID (username) from the Reddit API" - blocks_to_do: + client_secret: type: string description: "The Client Secret (password) from the Reddit API" + subreddit: + type: string + description: "The Subreddit that will be monitored" username: type: string description: "The username of the Reddit account that has created the Client ID and Secret" days: type: integer - description: "The days that votes and comments will be scraped for" \ No newline at end of file + description: "How many days in the past to look for votes and comments" \ No newline at end of file From 3e63a693bd54e813114fbbb091e6e477f5c8ae5d Mon Sep 17 00:00:00 2001 From: apentori Date: Tue, 29 Jul 2025 09:11:08 +0200 Subject: [PATCH 08/10] reddit: fix connector Add auth to test connection Fix wrong naming in setup combine Comments and CommentsVotes Fix naming in PostVotes schemas Signed-off-by: apentori --- source-reddit-fetcher/Dockerfile | 5 +- source-reddit-fetcher/metadata.yaml | 2 +- .../sample_files/configured_catalog.json | 22 +-- source-reddit-fetcher/setup.py | 4 +- .../source_reddit_fetcher/run.py | 7 +- .../schemas/comments.json | 9 ++ .../schemas/comments_votes.json | 21 --- .../schemas/posts_votes.json | 17 +-- .../source_reddit_fetcher/source.py | 138 +++++------------- .../source_reddit_fetcher/spec.yaml | 3 +- 10 files changed, 65 insertions(+), 163 deletions(-) delete mode 100644 source-reddit-fetcher/source_reddit_fetcher/schemas/comments_votes.json diff --git a/source-reddit-fetcher/Dockerfile b/source-reddit-fetcher/Dockerfile index fce999e..fc1ed49 100644 --- a/source-reddit-fetcher/Dockerfile +++ b/source-reddit-fetcher/Dockerfile @@ -1,10 +1,11 @@ FROM airbyte/python-connector-base:1.1.0 +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + COPY . 
./airbyte/integration_code RUN pip install ./airbyte/integration_code -COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt # The entrypoint and default env vars are already set in the base image ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" diff --git a/source-reddit-fetcher/metadata.yaml b/source-reddit-fetcher/metadata.yaml index e0cfd83..9a5bc86 100644 --- a/source-reddit-fetcher/metadata.yaml +++ b/source-reddit-fetcher/metadata.yaml @@ -19,7 +19,7 @@ data: releaseDate: TODO supportLevel: community releaseStage: alpha - documentationUrl: https://docs.bi.status.im/extractions/reddit.html + documentationUrl: https://docs.bi.status.im/extractions/reddit.html tags: - language:python metadataSpecVersion: "1.0" diff --git a/source-reddit-fetcher/sample_files/configured_catalog.json b/source-reddit-fetcher/sample_files/configured_catalog.json index f0c6a49..79388bf 100644 --- a/source-reddit-fetcher/sample_files/configured_catalog.json +++ b/source-reddit-fetcher/sample_files/configured_catalog.json @@ -36,25 +36,11 @@ "type": "object" }, "supported_sync_modes": [ - "incremental" - ] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "comments_votes", - "json_schema": { - "$schema": "http://json-schema.org/draft-04/schema#", - "type": "object" - }, - "supported_sync_modes": [ - "incremental" + "full_refresh", "incremental" ] }, - "sync_mode": "incremental", - "destination_sync_mode": "append" + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" } ] - } \ No newline at end of file + } diff --git a/source-reddit-fetcher/setup.py b/source-reddit-fetcher/setup.py index 434a7e4..4c38da1 100644 --- a/source-reddit-fetcher/setup.py +++ b/source-reddit-fetcher/setup.py @@ -20,7 +20,7 @@ name="source-reddit-fetcher", description="Source implementation for Reddit.", author="Status", - author_email="devops@status.im", + author_email="bi@status.im", packages=find_packages(), install_requires=MAIN_REQUIREMENTS, package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]}, @@ -29,7 +29,7 @@ }, entry_points={ "console_scripts": [ - "source-reddit-connector=source_blockchain_reddit.run:run", + "source-reddit-connector=source_reddit_fetcher.run:run", ], }, ) diff --git a/source-reddit-fetcher/source_reddit_fetcher/run.py b/source-reddit-fetcher/source_reddit_fetcher/run.py index 32f4857..e60476d 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/run.py +++ b/source-reddit-fetcher/source_reddit_fetcher/run.py @@ -1,12 +1,7 @@ import sys -import logging from airbyte_cdk.entrypoint import launch from .source import SourceRedditFetcher def run(): - - logger = logging.getLogger("airbyte") - args = sys.argv[1:] - source = SourceRedditFetcher() - launch(source, sys.argv[1:]) \ No newline at end of file + launch(source, sys.argv[1:]) diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json index ee3d23e..f1812c0 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json +++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json @@ -35,6 +35,15 @@ }, "url": { "type": "string" + }, + "ups": { + "type": "integer" + }, + "downs": { + "type": "integer" + }, + "score": { + "type": "integer" } } } diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/comments_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/comments_votes.json deleted 
file mode 100644 index 0e7e3fb..0000000 --- a/source-reddit-fetcher/source_reddit_fetcher/schemas/comments_votes.json +++ /dev/null @@ -1,21 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-04/schema#", - "type": "object", - "properties": { - "id": { - "type": "string" - }, - "subreddit": { - "type": "string" - }, - "ups": { - "type": "integer" - }, - "downs": { - "type": "integer" - }, - "score": { - "type": "integer" - } - } -} diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json index b786121..79a6077 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json +++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json @@ -1,31 +1,30 @@ { "$schema": "http://json-schema.org/draft-04/schema#", - "title": "subreddit_posts", "type": "object", "properties": { "id": { - "type": "string" + "type": ["null", "string"] }, "post_id": { - "type": "string" + "type": ["null", "string"] }, "kind_name": { - "type": "string" + "type": ["null", "string"] }, "kind": { - "type": "string" + "type": ["null", "string"] }, "ups": { - "type": "integer" + "type": ["null", "integer"] }, "downs": { - "type": "integer" + "type": ["null", "integer"] }, "upvote_ratio": { - "type": "number" + "type": ["null", "number"] }, "score": { - "type": "integer" + "type": ["null", "integer"] } } } diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py index 62888e1..8d98027 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/source.py +++ b/source-reddit-fetcher/source_reddit_fetcher/source.py @@ -1,3 +1,4 @@ +from abc import ABC from typing import Any, Iterable, List, Mapping, Optional, Tuple from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream @@ -6,6 +7,7 @@ import logging, json, datetime, requests import requests.auth import pandas as pd +import os logger = logging.getLogger("airbyte") BASE_URL = "https://www.reddit.com" @@ -13,7 +15,6 @@ class RedditCredentialsAuthentication(TokenAuthenticator): def __init__(self, client_id: str, client_secret: str, username: str, **kwargs): - headers = { "User-Agent": f"python:app.client_credentials:v1.0 (by u/{username})" } @@ -29,31 +30,26 @@ def __init__(self, client_id: str, client_secret: str, username: str, **kwargs): response = requests.post(url, auth=auth, data=data, headers=headers) response.raise_for_status() logger.info(f"Successfully connected to {url}") - info: dict = response.json() token: str = info.get("access_token") auth_method: str = info.get("token_type") - if not token: raise Exception("Could not fetch access token... 
Please further investigate!") - logger.info("Successfully fetched Reddit access token") logger.info(f"Authentication method: {auth_method.title()}") - valid_hours = info["expires_in"] / (60 * 60) logger.info(f"Token is valid for: {int(valid_hours)} hours") super().__init__(token, auth_method.title()) - -class RedditStream(HttpStream): +class RedditStream(HttpStream, ABC): primary_key: Optional[str] = None url_base = "https://oauth.reddit.com/" - def __init__(self, subreddit: str, days: int, authenticator: requests.auth.AuthBase): + def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase): super().__init__(authenticator=authenticator) self.subreddit = subreddit @@ -62,37 +58,27 @@ def __init__(self, subreddit: str, days: int, authenticator: requests.auth.AuthB def backoff_time(self, response: requests.Response) -> Optional[float]: - wait = response.headers.get("Retry-After") if not wait: wait = response.headers.get("x-ratelimit-reset") - if response.status_code == 429: logger.warning(f"Raised too many requests at once! Waiting {wait}s...") - return float(wait) if wait else None - def to_utc_timestamp(self, timestamp: float) -> datetime.datetime: return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) - def request_params(self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None): - params = { "limit": 100 } if next_page_token: params.update(next_page_token) - - return params - + return params class Posts(RedditStream): - # Based on "type prefixes" - # https://www.reddit.com/dev/api/ type_prefixes = { "t1": "comment", "t2": "account", @@ -105,27 +91,19 @@ class Posts(RedditStream): primary_key = "id" cursor_field = "created_timestamp" - def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): - super().__init__(subreddit, days, authenticator) - - - def path(self, **kwargs): return f"{self.url_base}/r/{self.subreddit}/new" def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - data: dict[str, Any] = response.json() children: list[dict] = data.get("data", {}).get("children", []) for child in children: - data = child.get("data", {}) if not data: continue - row = { "id": f"{self.subreddit}-{data['id']}", "kind_tag": child["kind"], @@ -153,34 +131,33 @@ def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any return {"after": after} if after else None + def get_json_schema(self) -> Mapping[str, Any]: + schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json") + with open(schema_path, "r") as f: + return json.load(f) -class PostsVotes(Posts): - primary_key = "id" - cursor_field = "" - def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): - super().__init__(days, subreddit, authenticator) - +class PostsVotes(RedditStream): + primary_key = "id" + + def path(self, **kwargs): + return f"{self.url_base}/r/{self.subreddit}/new" def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - data: dict[str, Any] = response.json() children: list[dict] = data.get("data", {}).get("children", []) for child in children: - data = child.get("data", {}) if not data: continue - created_utc = self.to_utc_timestamp(data["created_utc"]).date() if self.start_date > created_utc: break keys = ["ups", "downs", "upvote_ratio", "score"] - row = { "id": 
f"{str(datetime.datetime.now().timestamp()).replace('.', '')}-{self.subreddit}-{data['id']}", "post_id": f"{self.subreddit}-{data['id']}", @@ -194,12 +171,9 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]: - params = super().next_page_token(response) - if not params: return None - data: dict[str, Any] = response.json() children: list[dict] = data.get("data", {}).get("children", []) @@ -209,32 +183,29 @@ def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any return params + def get_json_schema(self) -> Mapping[str, Any]: + schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json") + with open(schema_path, "r") as f: + return json.load(f) -class Comments(HttpSubStream, RedditStream): - def __init__(self, days: int, authenticator: requests.auth.AuthBase, **kwargs): - kwargs.update({ - "days": days, - "authenticator": authenticator - }) - super().__init__(parent=Posts, **kwargs) - self.parent = Posts(days, self.subreddit, authenticator) + +class Comments(HttpSubStream, Posts): + primary_key = "id" + cursor_field = "created_timestamp" def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): post: dict = stream_slice.get("parent") post_id = post["post_id"] - - url = f"{self.url_base}/r/{self.subreddit}/comments/{post_id}" + url = f"{self.url_base}/r/{post.subreddit}/comments/{post_id}" return url def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): - _, comments = response.json() post_id = response.url.split("/")[-1].split("?")[0] for child in comments["data"]["children"]: child_data: dict = child["data"] - row = { "id": f"{self.subreddit}-{post_id}-{child_data['id']}", "post_id": f"{self.subreddit}-{post_id}", @@ -246,56 +217,23 @@ def parse_response(self, response: requests.Response, *, stream_state: Mapping[s "author": child_data.get("author"), "text": child_data["body"], "html_text": child_data["body_html"], - "url": BASE_URL + child_data["permalink"] - } - yield row - - def next_page_token(self, response: requests.Response): - _, comments = response.json() - after = comments.get("data", {}).get("after") - return {"after": after} if after else None - - - -class CommentsVotes(Comments): - - def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs): - kwargs.update({ - "subreddit": subreddit - }) - super().__init__(days, authenticator, **kwargs) - - def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None): - - _, comments = response.json() - post_id = response.url.split("/")[-1].split("?")[0] - - for child in comments["data"]["children"]: - - child_data: dict = child["data"] - - row = { - "id": f"{self.subreddit}-{post_id}-{child_data['id']}", - "subreddit": self.subreddit, + "url": BASE_URL + child_data["permalink"], "ups": child_data["ups"], "downs": child_data["downs"], "score": child_data["score"] } - yield row def next_page_token(self, response: requests.Response): _, comments = response.json() - params = super().next_page_token(response) - - children = comments.get("data", {}).get("children", []) - - if len(children) == 0 or self.start_date > 
 class Posts(RedditStream):
 
-    # Based on "type prefixes"
-    # https://www.reddit.com/dev/api/
     type_prefixes = {
         "t1": "comment",
         "t2": "account",
@@ -105,27 +91,19 @@ class Posts(RedditStream):
 
     primary_key = "id"
     cursor_field = "created_timestamp"
 
-    def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs):
-        super().__init__(subreddit, days, authenticator)
-
-
-
     def path(self, **kwargs):
         return f"{self.url_base}/r/{self.subreddit}/new"
 
     def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-
         data: dict[str, Any] = response.json()
         children: list[dict] = data.get("data", {}).get("children", [])
 
         for child in children:
-
             data = child.get("data", {})
             if not data:
                 continue
-
             row = {
                 "id": f"{self.subreddit}-{data['id']}",
                 "kind_tag": child["kind"],
@@ -153,34 +131,33 @@ def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any
         return {"after": after} if after else None
 
+    def get_json_schema(self) -> Mapping[str, Any]:
+        schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json")
+        with open(schema_path, "r") as f:
+            return json.load(f)
 
-class PostsVotes(Posts):
 
-    primary_key = "id"
-    cursor_field = ""
 
-    def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs):
-        super().__init__(days, subreddit, authenticator)
-
+class PostsVotes(RedditStream):
+
+    primary_key = "id"
+
+    def path(self, **kwargs):
+        return f"{self.url_base}/r/{self.subreddit}/new"
 
     def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-
         data: dict[str, Any] = response.json()
         children: list[dict] = data.get("data", {}).get("children", [])
 
         for child in children:
-
             data = child.get("data", {})
             if not data:
                 continue
-
             created_utc = self.to_utc_timestamp(data["created_utc"]).date()
             if self.start_date > created_utc:
                 break
 
             keys = ["ups", "downs", "upvote_ratio", "score"]
-
             row = {
                 "id": f"{str(datetime.datetime.now().timestamp()).replace('.', '')}-{self.subreddit}-{data['id']}",
                 "post_id": f"{self.subreddit}-{data['id']}",
@@ -194,12 +171,9 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
 
     def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]:
-
         params = super().next_page_token(response)
-
         if not params:
             return None
-
         data: dict[str, Any] = response.json()
         children: list[dict] = data.get("data", {}).get("children", [])
 
@@ -209,32 +183,29 @@ def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any
 
         return params
 
+    def get_json_schema(self) -> Mapping[str, Any]:
+        schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json")
+        with open(schema_path, "r") as f:
+            return json.load(f)
 
-class Comments(HttpSubStream, RedditStream):
 
-    def __init__(self, days: int, authenticator: requests.auth.AuthBase, **kwargs):
-        kwargs.update({
-            "days": days,
-            "authenticator": authenticator
-        })
-        super().__init__(parent=Posts, **kwargs)
-        self.parent = Posts(days, self.subreddit, authenticator)
+
+class Comments(HttpSubStream, Posts):
+
+    primary_key = "id"
+    cursor_field = "created_timestamp"
 
     def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
         post: dict = stream_slice.get("parent")
         post_id = post["post_id"]
-
-        url = f"{self.url_base}/r/{self.subreddit}/comments/{post_id}"
+        url = f"{self.url_base}/r/{post.subreddit}/comments/{post_id}"
         return url
 
     def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
-
         _, comments = response.json()
         post_id = response.url.split("/")[-1].split("?")[0]
 
         for child in comments["data"]["children"]:
 
             child_data: dict = child["data"]
-
             row = {
                 "id": f"{self.subreddit}-{post_id}-{child_data['id']}",
                 "post_id": f"{self.subreddit}-{post_id}",
@@ -246,56 +217,23 @@ def parse_response(self, response: requests.Response, *, stream_state: Mapping[s
                 "author": child_data.get("author"),
                 "text": child_data["body"],
                 "html_text": child_data["body_html"],
-                "url": BASE_URL + child_data["permalink"]
-            }
-
-            yield row
-
-
-    def next_page_token(self, response: requests.Response):
-        _, comments = response.json()
-        after = comments.get("data", {}).get("after")
-        return {"after": after} if after else None
-
-
-
-class CommentsVotes(Comments):
-
-    def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase, **kwargs):
-        kwargs.update({
-            "subreddit": subreddit
-        })
-        super().__init__(days, authenticator, **kwargs)
-
-    def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
-
-        _, comments = response.json()
-        post_id = response.url.split("/")[-1].split("?")[0]
-
-        for child in comments["data"]["children"]:
-
-            child_data: dict = child["data"]
-
-            row = {
-                "id": f"{self.subreddit}-{post_id}-{child_data['id']}",
-                "subreddit": self.subreddit,
+                "url": BASE_URL + child_data["permalink"],
                 "ups": child_data["ups"],
                 "downs": child_data["downs"],
                 "score": child_data["score"]
             }
-
             yield row
 
     def next_page_token(self, response: requests.Response):
         _, comments = response.json()
-        params = super().next_page_token(response)
-
-        children = comments.get("data", {}).get("children", [])
-
-        if len(children) == 0 or self.start_date > self.to_utc_timestamp(children[-1]["data"]["created_utc"]).date():
-            params = None
-
-        return params
+        after = comments.get("data", {}).get("after")
+        return {"after": after} if after else None
 
+    def get_json_schema(self) -> Mapping[str, Any]:
+        schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json")
+        with open(schema_path, "r") as f:
+            return json.load(f)
 
 class SourceRedditFetcher(AbstractSource):
@@ -303,34 +241,28 @@ def __init__(self):
         super().__init__()
 
     def check_connection(self, logger: logging.Logger, config: dict) -> Tuple[bool, Any]:
-
         logger.info(f"Config keys: {config.keys()}")
-
         auth = RedditCredentialsAuthentication(
             client_id=config["client_id"],
             client_secret=config["client_secret"],
             username=config["username"]
         )
-
         url = BASE_URL + f"/r/" + config["subreddit"] + "/comments"
         logger.info(f"Fetching: {url}")
-
-        resp = requests.get(url)
+        resp = requests.get(url, auth=auth)
         resp.raise_for_status()
-
         return resp.status_code == 200, None
 
     def streams(self, config: Mapping[str, Any]) -> List[Stream]:
-
         auth = RedditCredentialsAuthentication(
             client_id=config["client_id"],
            client_secret=config["client_secret"],
             username=config["username"]
         )
+        posts = Posts(days=config["days"], subreddit=config["subreddit"], authenticator=auth)
         streams = [
-            Posts(days=config["days"], subreddit=config["subreddit"], authenticator=auth),
+            posts,
             PostsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth),
-            Comments(days=config["days"], subreddit=config["subreddit"], authenticator=auth),
-            CommentsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth)
+            Comments(days=config["days"], subreddit=config["subreddit"], authenticator=auth, parent=posts),
         ]
-        return streams
\ No newline at end of file
+        return streams
diff --git a/source-reddit-fetcher/source_reddit_fetcher/spec.yaml b/source-reddit-fetcher/source_reddit_fetcher/spec.yaml
index 517bbdb..842e5de 100644
--- a/source-reddit-fetcher/source_reddit_fetcher/spec.yaml
+++ b/source-reddit-fetcher/source_reddit_fetcher/spec.yaml
@@ -16,6 +16,7 @@ connectionSpecification:
     client_secret:
       type: string
      description: "The Client Secret (password) from the Reddit API"
+      airbyte_secret: true
     subreddit:
       type: string
       description: "The Subreddit that will be monitored"
@@ -24,4 +25,4 @@ connectionSpecification:
       description: "The username of the Reddit account that has created the Client ID and Secret"
     days:
       type: integer
-      description: "How many days in the past to look for votes and comments"
\ No newline at end of file
+      description: "How many days in the past to look for votes and comments"
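With Comments rebased onto HttpSubStream in this patch, the CDK handles the
fan-out from posts to comment pages: HttpSubStream's default stream_slices()
reads the parent stream and wraps each record as {"parent": record}, which
is exactly what Comments.path() unpacks. A hedged usage sketch (stream names
are from this connector; `auth` and the subreddit value are assumed):

    # Hypothetical driver showing how parent slices reach Comments.path().
    from airbyte_cdk.models import SyncMode

    posts = Posts(days=31, subreddit="dataengineering", authenticator=auth)
    comments = Comments(days=31, subreddit="dataengineering", authenticator=auth, parent=posts)

    for stream_slice in comments.stream_slices(sync_mode=SyncMode.full_refresh):
        post = stream_slice["parent"]   # one record yielded by Posts.parse_response
        print(comments.path(stream_slice=stream_slice))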
"incremental", "destination_sync_mode": "append" }, - { - "stream": { - "name": "posts_votes", - "json_schema": { - "$schema": "http://json-schema.org/draft-04/schema#", - "type": "object" - }, - "supported_sync_modes": [ - "incremental" - ] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { + { "stream": { "name": "comments", "json_schema": { diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json index 6730d76..8b97c4c 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json +++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json @@ -39,6 +39,18 @@ }, "author": { "type": "string" + }, + "ups": { + "type": ["null", "integer"] + }, + "downs": { + "type": ["null", "integer"] + }, + "upvote_ratio": { + "type": ["null", "number"] + }, + "score": { + "type": ["null", "integer"] } } } diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json deleted file mode 100644 index 79a6077..0000000 --- a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "$schema": "http://json-schema.org/draft-04/schema#", - "type": "object", - "properties": { - "id": { - "type": ["null", "string"] - }, - "post_id": { - "type": ["null", "string"] - }, - "kind_name": { - "type": ["null", "string"] - }, - "kind": { - "type": ["null", "string"] - }, - "ups": { - "type": ["null", "integer"] - }, - "downs": { - "type": ["null", "integer"] - }, - "upvote_ratio": { - "type": ["null", "number"] - }, - "score": { - "type": ["null", "integer"] - } - } -} diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py index 8d98027..d985b69 100644 --- a/source-reddit-fetcher/source_reddit_fetcher/source.py +++ b/source-reddit-fetcher/source_reddit_fetcher/source.py @@ -4,7 +4,8 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator -import logging, json, datetime, requests +import logging, json, requests +from datetime import datetime, timezone, timedelta import requests.auth import pandas as pd import os @@ -53,7 +54,7 @@ def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthB super().__init__(authenticator=authenticator) self.subreddit = subreddit - today_utc = datetime.datetime.now(datetime.timezone.utc).date() + today_utc = datetime.now(timezone.utc).date() self.start_date = (today_utc - pd.offsets.Day(days)).date() @@ -65,12 +66,12 @@ def backoff_time(self, response: requests.Response) -> Optional[float]: logger.warning(f"Raised too many requests at once! 
diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json
deleted file mode 100644
index 79a6077..0000000
--- a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json
+++ /dev/null
@@ -1,30 +0,0 @@
-{
-  "$schema": "http://json-schema.org/draft-04/schema#",
-  "type": "object",
-  "properties": {
-    "id": {
-      "type": ["null", "string"]
-    },
-    "post_id": {
-      "type": ["null", "string"]
-    },
-    "kind_name": {
-      "type": ["null", "string"]
-    },
-    "kind": {
-      "type": ["null", "string"]
-    },
-    "ups": {
-      "type": ["null", "integer"]
-    },
-    "downs": {
-      "type": ["null", "integer"]
-    },
-    "upvote_ratio": {
-      "type": ["null", "number"]
-    },
-    "score": {
-      "type": ["null", "integer"]
-    }
-  }
-}
diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py
index 8d98027..d985b69 100644
--- a/source-reddit-fetcher/source_reddit_fetcher/source.py
+++ b/source-reddit-fetcher/source_reddit_fetcher/source.py
@@ -4,7 +4,8 @@
 from airbyte_cdk.sources.streams import Stream
 from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
 from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
-import logging, json, datetime, requests
+import logging, json, requests
+from datetime import datetime, timezone, timedelta
 import requests.auth
 import pandas as pd
 import os
@@ -53,7 +54,7 @@ def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthB
         super().__init__(authenticator=authenticator)
 
         self.subreddit = subreddit
-        today_utc = datetime.datetime.now(datetime.timezone.utc).date()
+        today_utc = datetime.now(timezone.utc).date()
         self.start_date = (today_utc - pd.offsets.Day(days)).date()
@@ -65,12 +66,12 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
             logger.warning(f"Raised too many requests at once! Waiting {wait}s...")
         return float(wait) if wait else None
 
-    def to_utc_timestamp(self, timestamp: float) -> datetime.datetime:
-        return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)
+    def to_utc_timestamp(self, timestamp: float) -> datetime:
+        return datetime.fromtimestamp(timestamp, tz=timezone.utc)
 
     def request_params(self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None):
         params = {
-            "limit": 100
+            "limit": 10
         }
         if next_page_token:
             params.update(next_page_token)
@@ -90,6 +91,8 @@ class Posts(RedditStream):
 
     primary_key = "id"
     cursor_field = "created_timestamp"
+    _last_post = None
+    _last_id = None
 
     def path(self, **kwargs):
         return f"{self.url_base}/r/{self.subreddit}/new"
@@ -113,81 +116,30 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
             "post_url": BASE_URL + data["permalink"],
             "url": data["url"],
             "domain": data["domain"],
-            "created_timestamp": datetime.datetime.fromtimestamp(data["created_utc"], tz=datetime.timezone.utc),
+            "created_timestamp": datetime.fromtimestamp(data["created_utc"], tz=timezone.utc),
             "timezone": "UTC",
             "title": data["title"],
             "text": data["selftext"],
             "html_text": data["selftext_html"],
             "author": data["author"],
+            "author_fullname": data.get("author_fullname"),
+            "downs": data["downs"],
+            "ups": data["ups"],
+            "score": data["score"],
+            "upvote_ratio": data["upvote_ratio"],
+            "subreddit_subscribers": data["subreddit_subscribers"],
             "raw": json.dumps(data)
         }
 
         yield row
+        self._last_post = datetime.fromtimestamp(data["created_utc"], tz=timezone.utc).date()
+        self._last_id = f"t3_{data['id']}"
 
     def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]:
         data: dict = response.json()
-        after = data.get("data", {}).get("after")
-        return {"after": after} if after else None
-
-
-    def get_json_schema(self) -> Mapping[str, Any]:
-        schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json")
-        with open(schema_path, "r") as f:
-            return json.load(f)
-
-
-
-class PostsVotes(RedditStream):
-
-    primary_key = "id"
-
-    def path(self, **kwargs):
-        return f"{self.url_base}/r/{self.subreddit}/new"
-
-    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
-        data: dict[str, Any] = response.json()
-        children: list[dict] = data.get("data", {}).get("children", [])
-
-        for child in children:
-            data = child.get("data", {})
-            if not data:
-                continue
-            created_utc = self.to_utc_timestamp(data["created_utc"]).date()
-            if self.start_date > created_utc:
-                break
-
-            keys = ["ups", "downs", "upvote_ratio", "score"]
-            row = {
-                "id": f"{str(datetime.datetime.now().timestamp()).replace('.', '')}-{self.subreddit}-{data['id']}",
-                "post_id": f"{self.subreddit}-{data['id']}",
-                "kind": child["kind"],
-                "kind_name": self.type_prefixes[child["kind"]],
-                **{key: data[key] for key in keys},
-            }
-
-            yield row
-
-
-    def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]:
-        params = super().next_page_token(response)
-        if not params:
-            return None
-        data: dict[str, Any] = response.json()
-        children: list[dict] = data.get("data", {}).get("children", [])
-
-        if self.start_date > self.to_utc_timestamp(children[-1]["data"]["created_utc"]).date():
-            params = None
-
-        return params
-
-
-    def get_json_schema(self) -> Mapping[str, Any]:
-        schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json")
-        with open(schema_path, "r") as f:
-            return json.load(f)
+        if self._last_post and self.start_date < self._last_post:
+            return {"before": self._last_id}
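On the rewritten Posts.next_page_token() above: Reddit listings paginate
with "fullname" cursors, and in a /new listing (newest first) `after` pages
toward older posts while `before` pages toward newer ones. The hunk keeps
requesting pages while the most recently parsed post is still inside the
look-back window. For comparison, a standalone backfill over the same
endpoint with the `after` cursor might look like this (a sketch; a valid
bearer `token` and the subreddit name are assumed):

    # Hypothetical cursor-driven walk of /r/<subreddit>/new toward older posts.
    import requests

    subreddit = "dataengineering"   # illustrative
    headers = {
        "Authorization": f"Bearer {token}",
        "User-Agent": "python:app.client_credentials:v1.0 (by u/example)",
    }
    params = {"limit": 10}
    while True:
        resp = requests.get(f"https://oauth.reddit.com/r/{subreddit}/new",
                            headers=headers, params=params)
        resp.raise_for_status()
        listing = resp.json()["data"]
        for child in listing["children"]:
            print(child["data"]["id"], child["data"]["created_utc"])
        if not listing.get("after"):
            break
        params["after"] = listing["after"]   # fullname cursor, e.g. "t3_abc123"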
 
 
 class Comments(HttpSubStream, Posts):
@@ -197,13 +149,12 @@
     def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
         post: dict = stream_slice.get("parent")
         post_id = post["post_id"]
-        url = f"{self.url_base}/r/{post.subreddit}/comments/{post_id}"
+        url = f"{self.url_base}/r/{self.subreddit}/comments/{post_id}"
         return url
 
     def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
         _, comments = response.json()
         post_id = response.url.split("/")[-1].split("?")[0]
-
         for child in comments["data"]["children"]:
             child_data: dict = child["data"]
             row = {
@@ -224,16 +175,12 @@ def parse_response(self, response: requests.Response, *, stream_state: Mapping[s
             }
 
             yield row
 
+
     def next_page_token(self, response: requests.Response):
         _, comments = response.json()
-        after = comments.get("data", {}).get("after")
-        return {"after": after} if after else None
-
+        before = comments.get("data", {}).get("before")
+        return {"before": before} if before else None
 
-    def get_json_schema(self) -> Mapping[str, Any]:
-        schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json")
-        with open(schema_path, "r") as f:
-            return json.load(f)
 
 
 class SourceRedditFetcher(AbstractSource):
@@ -262,7 +209,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
         posts = Posts(days=config["days"], subreddit=config["subreddit"], authenticator=auth)
         streams = [
             posts,
-            PostsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth),
             Comments(days=config["days"], subreddit=config["subreddit"], authenticator=auth, parent=posts),
         ]
         return streams

From 1d34c86e1d5f0e953dba2af04c62f9b3ead9f39b Mon Sep 17 00:00:00 2001
From: apentori
Date: Fri, 1 Aug 2025 14:56:11 +0200
Subject: [PATCH 10/10] reddit: update metadata version

Signed-off-by: apentori
---
 source-reddit-fetcher/metadata.yaml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/source-reddit-fetcher/metadata.yaml b/source-reddit-fetcher/metadata.yaml
index 9a5bc86..0e89274 100644
--- a/source-reddit-fetcher/metadata.yaml
+++ b/source-reddit-fetcher/metadata.yaml
@@ -10,7 +10,7 @@ data:
   connectorSubtype: api
   connectorType: source
   definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920
-  dockerImageTag: '0.0.1'
+  dockerImageTag: '1.0.0'
   dockerRepository: harbor.status.im/bi/airbyte/source-reddit-fetcher
   githubIssueLabel: source-reddit-fetcher
   icon: twitter-fetcher.svg