diff --git a/source-reddit-fetcher/Dockerfile b/source-reddit-fetcher/Dockerfile
new file mode 100644
index 0000000..fc1ed49
--- /dev/null
+++ b/source-reddit-fetcher/Dockerfile
@@ -0,0 +1,12 @@
+FROM airbyte/python-connector-base:1.1.0
+
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY . ./airbyte/integration_code
+RUN pip install ./airbyte/integration_code
+
+
+# The entrypoint and default env vars are already set in the base image
+ENV AIRBYTE_ENTRYPOINT="python /airbyte/integration_code/main.py"
+ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
diff --git a/source-reddit-fetcher/README.md b/source-reddit-fetcher/README.md
new file mode 100644
index 0000000..459c3ec
--- /dev/null
+++ b/source-reddit-fetcher/README.md
@@ -0,0 +1,80 @@
+# Reddit Source
+
+
+## Usage
+
+This connector fetches data from a selected subreddit.
+
+### Configuration
+
+The connector takes the following input:
+
+- `client_id` - the Reddit client ID for your account
+- `client_secret` - the Reddit client secret for your account
+- `username` - the Reddit username that was used to generate the `client_id` and `client_secret`
+- `subreddit` - the subreddit to monitor
+- `days` - used to calculate the stop date; votes and comments created before that date are not monitored. Calculation: $stop\ date = today - days$
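+
+For reference, `sample_files/config-example.json` in this connector shows the expected shape of the configuration:
+
+```json
+{
+  "client_id": "your_reddit_client_id",
+  "client_secret": "your_reddit_client_secret",
+  "username": "your_reddit_username",
+  "subreddit": "subreddit_name_to_be_monitored",
+  "days": 31
+}
+```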
+
+### Output
+
+The connector will return the following streams:
+
+* [Posts](./source_reddit_fetcher/schemas/posts.json)
+* [PostsVotes](./source_reddit_fetcher/schemas/posts_votes.json)
+* [Comments](./source_reddit_fetcher/schemas/comments.json)
+
+## Local development
+
+### Prerequisites
+
+#### Activate Virtual Environment and install dependencies
+
+From this connector directory, create a virtual environment:
+
+```
+python -m venv .venv
+```
+
+Then activate it and install the connector package; installing the package pulls in `airbyte-cdk` and the other dependencies declared in `setup.py`:
+
+```
+source .venv/bin/activate
+pip install -r requirements.txt
+pip install -e .
+```
+
+### Locally running the connector
+
+```
+python main.py spec
+python main.py check --config sample_files/config-example.json
+python main.py discover --config sample_files/config-example.json
+python main.py read --config sample_files/config-example.json --catalog sample_files/configured_catalog.json
+```
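+
+The `read` command emits Airbyte protocol messages as JSON lines on stdout. If `jq` is installed (an optional convenience, not a requirement of the connector), you can filter out just the record payloads:
+
+```
+python main.py read --config sample_files/config-example.json --catalog sample_files/configured_catalog.json | jq -c 'select(.type == "RECORD") | .record.data'
+```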
+
+### Locally running the connector docker image
+
+```bash
+docker build -t airbyte/source-reddit-fetcher:dev .
+
+# Running the spec command against your patched connector
+docker run airbyte/source-reddit-fetcher:dev spec
+```
+
+Then run any of the connector commands as follows:
+
+#### Linux / macOS
+
+```
+docker run --rm airbyte/source-reddit-fetcher:dev spec
+docker run --rm -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev check --config /sample_files/config-example.json
+docker run --rm -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev discover --config /sample_files/config-example.json
+docker run --rm -v $(pwd)/sample_files:/sample_files airbyte/source-reddit-fetcher:dev read --config /sample_files/config-example.json --catalog /sample_files/configured_catalog.json
+```
+
+#### Windows
+
+```
+docker run --rm airbyte/source-reddit-fetcher:dev spec
+docker run --rm -v "$PWD\sample_files:/sample_files" airbyte/source-reddit-fetcher:dev check --config /sample_files/config-example.json
+docker run --rm -v "$PWD\sample_files:/sample_files" airbyte/source-reddit-fetcher:dev discover --config /sample_files/config-example.json
+docker run --rm -v "$PWD\sample_files:/sample_files" airbyte/source-reddit-fetcher:dev read --config /sample_files/config-example.json --catalog /sample_files/configured_catalog.json
+```
\ No newline at end of file
diff --git a/source-reddit-fetcher/main.py b/source-reddit-fetcher/main.py
new file mode 100644
index 0000000..e5e1fdf
--- /dev/null
+++ b/source-reddit-fetcher/main.py
@@ -0,0 +1,4 @@
+from source_reddit_fetcher.run import run
+
+if __name__ == "__main__":
+    run()
\ No newline at end of file
diff --git a/source-reddit-fetcher/metadata.yaml b/source-reddit-fetcher/metadata.yaml
new file mode 100644
index 0000000..9a5bc86
--- /dev/null
+++ b/source-reddit-fetcher/metadata.yaml
@@ -0,0 +1,25 @@
+data:
+  allowedHosts:
+  registries:
+    oss:
+      enabled: false
+    cloud:
+      enabled: false
+  connectorBuildOptions:
+    baseImage: docker.io/airbyte/python-connector-base:1.0.0@sha256:dd17e347fbda94f7c3abff539be298a65af2d7fc27a307d89297df1081a45c27
+  connectorSubtype: api
+  connectorType: source
+  definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920
+  dockerImageTag: '0.0.1'
+  dockerRepository: harbor.status.im/bi/airbyte/source-reddit-fetcher
+  githubIssueLabel: source-reddit-fetcher
+  icon: twitter-fetcher.svg
+  license: MIT
+  name: Reddit Data Extractor
+  releaseDate: TODO
+  supportLevel: community
+  releaseStage: alpha
+  documentationUrl: https://docs.bi.status.im/extractions/reddit.html
+  tags:
+    - language:python
+metadataSpecVersion: "1.0"
diff --git a/source-reddit-fetcher/requirements.txt b/source-reddit-fetcher/requirements.txt
new file mode 100644
index 0000000..1411a4a
--- /dev/null
+++ b/source-reddit-fetcher/requirements.txt
@@ -0,0 +1 @@
+pandas
\ No newline at end of file
diff --git a/source-reddit-fetcher/sample_files/config-example.json b/source-reddit-fetcher/sample_files/config-example.json
new file mode 100644
index 0000000..a0beddb
--- /dev/null
+++ b/source-reddit-fetcher/sample_files/config-example.json
@@ -0,0 +1,7 @@
+{
+  "client_id": "your_reddit_client_id",
+  "client_secret": "your_reddit_client_secret",
+  "username": "your_reddit_username",
+  "subreddit": "subreddit_name_to_be_monitored",
+  "days": 31
+}
\ No newline at end of file
diff --git a/source-reddit-fetcher/sample_files/configured_catalog.json b/source-reddit-fetcher/sample_files/configured_catalog.json
new file mode 100644
index 0000000..79388bf
--- /dev/null
+++ b/source-reddit-fetcher/sample_files/configured_catalog.json
@@ -0,0 +1,46 @@
+{
+  "streams": [
+    {
+      "stream": {
+        "name": "posts",
+        "json_schema": {
+          "$schema": "http://json-schema.org/draft-04/schema#",
+          "type": "object"
+        },
+        "supported_sync_modes": [
+          "full_refresh", "incremental"
+        ]
+      },
+      "sync_mode": "incremental",
+      "destination_sync_mode": "append"
+    },
+    {
+      "stream": {
+        "name": "posts_votes",
+        "json_schema": {
+          "$schema": "http://json-schema.org/draft-04/schema#",
+          "type": "object"
+        },
+        "supported_sync_modes": [
+          "incremental"
+        ]
+      },
+      "sync_mode": "incremental",
+      "destination_sync_mode": "append"
+    },
+    {
+      "stream": {
+        "name": "comments",
+        "json_schema": {
+          "$schema": "http://json-schema.org/draft-04/schema#",
+          "type": "object"
+        },
+        "supported_sync_modes": [
+          "full_refresh", "incremental"
+        ]
+      },
+      "sync_mode": "full_refresh",
+      "destination_sync_mode": "overwrite"
+    }
+  ]
+}
diff --git a/source-reddit-fetcher/setup.py b/source-reddit-fetcher/setup.py
new file mode 100644
index 0000000..4c38da1
--- /dev/null
+++ b/source-reddit-fetcher/setup.py
@@ -0,0 +1,35 @@
+#
+# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
+#
+
+
+from setuptools import find_packages, setup
+
+MAIN_REQUIREMENTS = [
+    "airbyte-cdk~=0.2",
+    "pandas",
+]
+
+TEST_REQUIREMENTS = [
+    "requests-mock~=1.9.3",
+    "pytest~=6.2",
+    "pytest-mock~=3.6.1",
+    "connector-acceptance-test",
+]
+
+setup(
+    name="source-reddit-fetcher",
+    description="Source implementation for Reddit.",
+    author="Status",
+    author_email="bi@status.im",
+    packages=find_packages(),
+    install_requires=MAIN_REQUIREMENTS,
+    package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]},
+    extras_require={
+        "tests": TEST_REQUIREMENTS,
+    },
+    entry_points={
+        "console_scripts": [
+            "source-reddit-fetcher=source_reddit_fetcher.run:run",
+        ],
+    },
+)
diff --git a/source-reddit-fetcher/source_reddit_fetcher/__init__.py b/source-reddit-fetcher/source_reddit_fetcher/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/source-reddit-fetcher/source_reddit_fetcher/run.py b/source-reddit-fetcher/source_reddit_fetcher/run.py
new file mode 100644
index 0000000..e60476d
--- /dev/null
+++ b/source-reddit-fetcher/source_reddit_fetcher/run.py
@@ -0,0 +1,7 @@
+import sys
+from airbyte_cdk.entrypoint import launch
+from .source import SourceRedditFetcher
+
+def run():
+    source = SourceRedditFetcher()
+    launch(source, sys.argv[1:])
diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json
new file mode 100644
index 0000000..f1812c0
--- /dev/null
+++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/comments.json
@@ -0,0 +1,49 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "id": {
+      "type": "string"
+    },
+    "post_id": {
+      "type": "string"
+    },
+    "subreddit": {
+      "type": "string"
+    },
+    "comment_id": {
+      "type": "string"
+    },
+    "created_timestamp": {
+      "type": "string",
+      "format": "date-time"
+    },
+    "timezone": {
+      "type": "string"
+    },
+    "parent_id": {
+      "type": "string"
+    },
+    "author": {
+      "type": "string"
+    },
+    "text": {
+      "type": "string"
+    },
+    "html_text": {
+      "type": "string"
+    },
+    "url": {
+      "type": "string"
+    },
+    "ups": {
+      "type": "integer"
+    },
+    "downs": {
+      "type": "integer"
+    },
+    "score": {
+      "type": "integer"
+    }
+  }
+}
diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json
new file mode 100644
index 0000000..6730d76
--- /dev/null
+++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts.json
@@ -0,0 +1,44 @@
+{
+  "$schema": "http://json-schema.org/draft-07/schema#",
+  "type": "object",
+  "properties": {
+    "id": {
+      "type": "string"
+    },
+    "kind_tag": {
+      "type": "string"
+    },
+    "kind_name": {
+      "type": "string"
+    },
+    "subreddit": {
+      "type": "string"
+    },
+    "post_id": {
+      "type": "string"
+    },
+    "post_url": {
+      "type": "string",
+      "format": "uri"
+    },
+    "url": {
+      "type": "string"
+    },
+    "domain": {
+      "type": "string"
+    },
+    "created_timestamp": {
+      "type": "string",
+      "format": "date-time"
+    },
+    "timezone": {
+      "type": "string"
+    },
+    "title": {
+      "type": "string"
+    },
+    "text": {
+      "type": "string"
+    },
+    "html_text": {
+      "type": "string"
+    },
+    "author": {
+      "type": "string"
+    },
+    "raw": {
+      "type": "string"
+    }
+  }
+}
diff --git a/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json
new file mode 100644
index 0000000..79a6077
--- /dev/null
+++ b/source-reddit-fetcher/source_reddit_fetcher/schemas/posts_votes.json
@@ -0,0 +1,30 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "id": {
+      "type": ["null", "string"]
+    },
+    "post_id": {
+      "type": ["null", "string"]
+    },
+    "kind_name": {
+      "type": ["null", "string"]
+    },
+    "kind": {
+      "type": ["null", "string"]
+    },
+    "ups": {
+      "type": ["null", "integer"]
+    },
+    "downs": {
+      "type": ["null", "integer"]
+    },
+    "upvote_ratio": {
+      "type": ["null", "number"]
+    },
+    "score": {
+      "type": ["null", "integer"]
+    }
+  }
+}
diff --git a/source-reddit-fetcher/source_reddit_fetcher/source.py b/source-reddit-fetcher/source_reddit_fetcher/source.py
new file mode 100644
index 0000000..8d98027
--- /dev/null
+++ b/source-reddit-fetcher/source_reddit_fetcher/source.py
@@ -0,0 +1,268 @@
+from abc import ABC
+from typing import Any, Iterable, List, Mapping, Optional, Tuple
+from airbyte_cdk.sources import AbstractSource
+from airbyte_cdk.sources.streams import Stream
+from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream
+from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
+import logging, json, datetime, requests
+import requests.auth
+import pandas as pd
+import os
+
+logger = logging.getLogger("airbyte")
+BASE_URL = "https://www.reddit.com"
+
+class RedditCredentialsAuthentication(TokenAuthenticator):
+
+    def __init__(self, client_id: str, client_secret: str, username: str, **kwargs):
+        headers = {
+            "User-Agent": f"python:app.client_credentials:v1.0 (by u/{username})"
+        }
+        data = {
+            "grant_type": "client_credentials"
+        }
+
+        # https://github.com/reddit-archive/reddit/wiki/oauth2#authorization
+        url = f"{BASE_URL}/api/v1/access_token"
+        logger.info(f"Authentication URL: {url}")
+
+        auth = requests.auth.HTTPBasicAuth(client_id, client_secret)
+        response = requests.post(url, auth=auth, data=data, headers=headers)
+        response.raise_for_status()
+        logger.info(f"Successfully connected to {url}")
+        info: dict = response.json()
+
+        token: str = info.get("access_token")
+        auth_method: str = info.get("token_type")
+        if not token:
+            raise Exception("Could not fetch access token; check the client credentials.")
+        logger.info("Successfully fetched Reddit access token")
+        logger.info(f"Authentication method: {auth_method.title()}")
+        valid_hours = info["expires_in"] / (60 * 60)
+        logger.info(f"Token is valid for: {int(valid_hours)} hours")
+
+        super().__init__(token, auth_method.title())
+
+
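+# Shared base for all streams below: requests go to the OAuth API host, pages are
+# chained with the listing "after" cursor, and 429 responses back off using the
+# Retry-After / x-ratelimit-reset headers. The `days` config becomes self.start_date,
+# the cutoff PostsVotes uses to stop paginating into older posts.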
Please further investigate!") + logger.info("Successfully fetched Reddit access token") + logger.info(f"Authentication method: {auth_method.title()}") + valid_hours = info["expires_in"] / (60 * 60) + logger.info(f"Token is valid for: {int(valid_hours)} hours") + + super().__init__(token, auth_method.title()) + + +class RedditStream(HttpStream, ABC): + + primary_key: Optional[str] = None + url_base = "https://oauth.reddit.com/" + + def __init__(self, days: int, subreddit: str, authenticator: requests.auth.AuthBase): + super().__init__(authenticator=authenticator) + + self.subreddit = subreddit + today_utc = datetime.datetime.now(datetime.timezone.utc).date() + self.start_date = (today_utc - pd.offsets.Day(days)).date() + + + def backoff_time(self, response: requests.Response) -> Optional[float]: + wait = response.headers.get("Retry-After") + if not wait: + wait = response.headers.get("x-ratelimit-reset") + if response.status_code == 429: + logger.warning(f"Raised too many requests at once! Waiting {wait}s...") + return float(wait) if wait else None + + def to_utc_timestamp(self, timestamp: float) -> datetime.datetime: + return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc) + + def request_params(self, stream_state: Optional[Mapping[str, Any]], stream_slice: Optional[Mapping[str, Any]] = None, next_page_token: Optional[Mapping[str, Any]] = None): + params = { + "limit": 100 + } + if next_page_token: + params.update(next_page_token) + + return params + +class Posts(RedditStream): + # Based on "type prefixes" + type_prefixes = { + "t1": "comment", + "t2": "account", + "t3": "link", + "t4": "message", + "t5": "subreddit", + "t6": "award" + } + + primary_key = "id" + cursor_field = "created_timestamp" + + def path(self, **kwargs): + return f"{self.url_base}/r/{self.subreddit}/new" + + + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + data: dict[str, Any] = response.json() + children: list[dict] = data.get("data", {}).get("children", []) + + for child in children: + data = child.get("data", {}) + if not data: + continue + row = { + "id": f"{self.subreddit}-{data['id']}", + "kind_tag": child["kind"], + "kind_name": self.type_prefixes[child["kind"]], + "subreddit": self.subreddit, + "post_id": data["id"], + "post_url": BASE_URL + data["permalink"], + "url": data["url"], + "domain": data["domain"], + "created_timestamp": datetime.datetime.fromtimestamp(data["created_utc"], tz=datetime.timezone.utc), + "timezone": "UTC", + "title": data["title"], + "text": data["selftext"], + "html_text": data["selftext_html"], + "author": data["author"], + "raw": json.dumps(data) + } + yield row + + + + def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]: + data: dict = response.json() + after = data.get("data", {}).get("after") + return {"after": after} if after else None + + + def get_json_schema(self) -> Mapping[str, Any]: + schema_path = os.path.join(os.path.dirname(__file__), "schemas", f"{self.name}.json") + with open(schema_path, "r") as f: + return json.load(f) + + + +class PostsVotes(RedditStream): + + primary_key = "id" + + def path(self, **kwargs): + return f"{self.url_base}/r/{self.subreddit}/new" + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + data: dict[str, Any] = response.json() + children: list[dict] = data.get("data", {}).get("children", []) + + for child in children: + data = child.get("data", {}) + if not data: + continue + created_utc = 
+class PostsVotes(RedditStream):
+
+    primary_key = "id"
+
+    def path(self, **kwargs):
+        return f"r/{self.subreddit}/new"
+
+    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
+        data: dict[str, Any] = response.json()
+        children: list[dict] = data.get("data", {}).get("children", [])
+
+        for child in children:
+            post = child.get("data", {})
+            if not post:
+                continue
+            created_utc = self.to_utc_timestamp(post["created_utc"]).date()
+            if self.start_date > created_utc:
+                # /new is sorted newest-first, so everything after this is older still.
+                break
+
+            keys = ["ups", "downs", "upvote_ratio", "score"]
+            row = {
+                "id": f"{str(datetime.datetime.now().timestamp()).replace('.', '')}-{self.subreddit}-{post['id']}",
+                "post_id": f"{self.subreddit}-{post['id']}",
+                "kind": child["kind"],
+                "kind_name": self.type_prefixes[child["kind"]],
+                **{key: post[key] for key in keys},
+            }
+
+            yield row
+
+    def next_page_token(self, response: requests.Response) -> Optional[dict[str, Any]]:
+        params = super().next_page_token(response)
+        if not params:
+            return None
+        children: list[dict] = response.json().get("data", {}).get("children", [])
+
+        # Stop paginating once the oldest post on this page predates the stop date.
+        if children and self.start_date > self.to_utc_timestamp(children[-1]["data"]["created_utc"]).date():
+            params = None
+
+        return params
+
+
+class Comments(HttpSubStream, Posts):
+    primary_key = "id"
+    cursor_field = "created_timestamp"
+
+    def path(self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
+        post: dict = stream_slice.get("parent")
+        post_id = post["post_id"]
+        return f"r/{post['subreddit']}/comments/{post_id}"
+
+    def parse_response(self, response: requests.Response, *, stream_state: Mapping[str, Any], stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None):
+        # The comments endpoint returns a two-element listing: [post, comments].
+        _, comments = response.json()
+        post_id = response.url.split("/")[-1].split("?")[0]
+
+        for child in comments["data"]["children"]:
+            child_data: dict = child["data"]
+            row = {
+                "id": f"{self.subreddit}-{post_id}-{child_data['id']}",
+                "post_id": f"{self.subreddit}-{post_id}",
+                "subreddit": self.subreddit,
+                "comment_id": child_data['id'],
+                "created_timestamp": self.to_utc_timestamp(child_data["created_utc"]),
+                "timezone": "UTC",
+                "parent_id": child_data["parent_id"].split("_")[1],
+                "author": child_data.get("author"),
+                "text": child_data["body"],
+                "html_text": child_data["body_html"],
+                "url": BASE_URL + child_data["permalink"],
+                "ups": child_data["ups"],
+                "downs": child_data["downs"],
+                "score": child_data["score"]
+            }
+            yield row
+
+    def next_page_token(self, response: requests.Response):
+        _, comments = response.json()
+        after = comments.get("data", {}).get("after")
+        return {"after": after} if after else None
+
+
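+# The source wires the three streams together. Comments extends HttpSubStream, so
+# the CDK slices it over every record Posts emits (one comments request per post).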
Posts(days=config["days"], subreddit=config["subreddit"], authenticator=auth) + streams = [ + posts, + PostsVotes(days=config["days"], subreddit=config["subreddit"], authenticator=auth), + Comments(days=config["days"], subreddit=config["subreddit"], authenticator=auth, parent=posts), + ] + return streams diff --git a/source-reddit-fetcher/source_reddit_fetcher/spec.yaml b/source-reddit-fetcher/source_reddit_fetcher/spec.yaml new file mode 100644 index 0000000..842e5de --- /dev/null +++ b/source-reddit-fetcher/source_reddit_fetcher/spec.yaml @@ -0,0 +1,28 @@ +documentationUrl: https://www.reddit.com/dev/api/ +connectionSpecification: + $schema: http://json-schema.org/draft-07/schema# + title: Reddit Fetcher + type: object + required: + - client_id + - client_secret + - username + - days + - subreddit + properties: + client_id: + type: string + description: "The Client ID (username) from the Reddit API" + client_secret: + type: string + description: "The Client Secret (password) from the Reddit API" + airbyte_secret: true + subreddit: + type: string + description: "The Subreddit that will be monitored" + username: + type: string + description: "The username of the Reddit account that has created the Client ID and Secret" + days: + type: integer + description: "How many days in the past to look for votes and comments"