diff --git a/source-twitter-fetcher/metadata.yaml b/source-twitter-fetcher/metadata.yaml index 49cfed3..d60dfe2 100644 --- a/source-twitter-fetcher/metadata.yaml +++ b/source-twitter-fetcher/metadata.yaml @@ -10,8 +10,8 @@ data: connectorSubtype: api connectorType: source definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920 - dockerImageTag: '1.0.0' - dockerRepository: harbor.status.im/bi/airbyte/source-twitter-fetcher + dockerImageTag: '2.0.0' + dockerRepository: harbor.status.im/bi/airbyte/source-twitter-fetcher githubIssueLabel: source-twitter-fetcher icon: twitter-fetcher.svg license: MIT diff --git a/source-twitter-fetcher/sample_files/config-example.json b/source-twitter-fetcher/sample_files/config-example.json index f98a0d2..9656266 100644 --- a/source-twitter-fetcher/sample_files/config-example.json +++ b/source-twitter-fetcher/sample_files/config-example.json @@ -1,24 +1,12 @@ { "credentials":{ - "client_id": "some-id", - "client_secret": "some-secret", - "access_token": "some-access-token", - "refresh_token": "some-refresh-token", - "token_expiry_date": "" + "client_id": "your_client_id", + "client_secret": "your_client_secret", + "access_token": "your_access_token", + "refresh_token": "your_refresh_token", + "token_expiry_date": "2024-12-31T00:00:00Z" }, - "account_id": "123456789", + "account_id": "your_account_id", "start_time": "2024-01-01T00:00:00Z", - "comment_days_limit": 2, - "filtered_author_ids": [ - "1417373828544487426", - "1527270456658632706", - "1573349900905054212", - "1636287274961829888", - "1101033576454340608", - "1151831284110385152", - "1083104775825252353", - "774689518767181828", - "1783824207631077376", - "18904639" - ] + "tags": ["@IFT", "@status", "@Airbyte"] } diff --git a/source-twitter-fetcher/sample_files/configured_catalog.json b/source-twitter-fetcher/sample_files/configured_catalog.json index 39cd7ff..394c83b 100644 --- a/source-twitter-fetcher/sample_files/configured_catalog.json +++ b/source-twitter-fetcher/sample_files/configured_catalog.json @@ -125,6 +125,17 @@ }, "sync_mode": "incremental", "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "tags", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "default_cursor_field": [] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" } ] } diff --git a/source-twitter-fetcher/source_twitter_fetcher/schemas/tags_stream.json b/source-twitter-fetcher/source_twitter_fetcher/schemas/tags_stream.json new file mode 100644 index 0000000..0f0a3dc --- /dev/null +++ b/source-twitter-fetcher/source_twitter_fetcher/schemas/tags_stream.json @@ -0,0 +1,77 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "id": { + "type": ["null", "string"] + }, + "text": { + "type": ["null", "string"] + }, + "created_at": { + "type": ["null", "string"] + }, + "author_id": { + "type": ["null", "string"] + }, + "author_username": { + "type": ["null", "string"], + "description": "The Twitter handle/username of the tweet author (e.g., 'john_doe')" + }, + "author_name": { + "type": ["null", "string"], + "description": "The display name of the tweet author (e.g., 'John Doe')" + }, + "author_verified": { + "type": ["null", "boolean"], + "description": "Whether the tweet author is verified" + }, + "conversation_id": { + "type": ["null", "string"] + }, + "reply_settings": { + "type": ["null", "string"] + }, + "matched_tag": { + "type": ["null", "string"], + "description": "The tag that matched this tweet (e.g., '@IFT' or '@status')" + }, + "referenced_tweets": { + "type": ["null", "array"], + "items": { + "type": ["object"], + "properties":{ + "type": { + "type": ["null", "string"] + }, + "id": { + "type": ["null", "string"] + } + } + } + }, + "public_metrics": { + "type": ["null", "object"], + "properties": { + "retweet_count": { + "type": ["null", "number"] + }, + "reply_count": { + "type": ["null", "number"] + }, + "like_count": { + "type": ["null", "number"] + }, + "quote_count": { + "type": ["null", "number"] + }, + "impression_count": { + "type": ["null", "number"] + }, + "bookmark_count": { + "type": ["null", "number"] + } + } + } + } +} \ No newline at end of file diff --git a/source-twitter-fetcher/source_twitter_fetcher/source.py b/source-twitter-fetcher/source_twitter_fetcher/source.py index a174866..1333304 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/source.py +++ b/source-twitter-fetcher/source_twitter_fetcher/source.py @@ -5,9 +5,10 @@ from airbyte_cdk.sources.streams import Stream from .tweets_stream import Account, Tweet, TweetMetrics, TweetPromoted +from .tweets_comments_stream import TweetComments from .ads_stream import PromotedTweetActive, PromotedTweetBilling, PromotedTweetEngagement from .spaces_stream import Space -from .tweets_comments_stream import TweetComments +from .tags_stream import TagsStream from .auth import TwitterOAuth DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ" @@ -22,11 +23,35 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: token_refresh_endpoint="https://api.x.com/2/oauth2/token" ) - tweet = Tweet( - authenticator=auth, - account_id=config["account_id"], - start_time=datetime.strptime(config['start_time'], DATE_FORMAT), - ) + # Parse start_time if provided, otherwise streams will use their defaults + start_time = None + if "start_time" in config: + start_time = datetime.strptime(config['start_time'], DATE_FORMAT) + + tweet_kwargs = { + "authenticator": auth, + "account_id": config["account_id"] + } + if start_time: + tweet_kwargs["start_time"] = start_time + + tweet = Tweet(**tweet_kwargs) + + tags_kwargs = { + "authenticator": auth, + "account_id": config["account_id"], + "tags": config["tags"] + } + + # Add start_time only if provided in config + if start_time: + tags_kwargs["start_time"] = start_time + + # Add tags_frequent_extractions if provided in config + if "tags_frequent_extractions" in config: + tags_kwargs["tags_frequent_extractions"] = config["tags_frequent_extractions"] + + tags = TagsStream(**tags_kwargs) tweet_metrics = TweetMetrics( authenticator=auth, @@ -40,19 +65,24 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: parent=tweet ) - tweet_comments = TweetComments( - authenticator=auth, - account_id=config['account_id'], - parent=tweet, - comment_days_limit=config.get('comment_days_limit', 2), - filtered_author_ids=config.get('filtered_author_ids', []) - ) + tweet_comments_kwargs = { + "authenticator": auth, + "account_id": config['account_id'], + "parent": tweet + } + if start_time: + tweet_comments_kwargs["start_time"] = start_time + + tweet_comments = TweetComments(**tweet_comments_kwargs) - promoted_tweet_active = PromotedTweetActive( - authenticator=auth, - account_id=config['account_id'], - start_time=datetime.strptime(config['start_time'], DATE_FORMAT), - ) + promoted_tweet_active_kwargs = { + "authenticator": auth, + "account_id": config['account_id'] + } + if start_time: + promoted_tweet_active_kwargs["start_time"] = start_time + + promoted_tweet_active = PromotedTweetActive(**promoted_tweet_active_kwargs) promoted_tweet_billing = PromotedTweetBilling( authenticator=auth, @@ -66,11 +96,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: parent=promoted_tweet_active ) - space = Space( - authenticator=auth, - account_id=config['account_id'], - start_time=datetime.strptime(config['start_time'], DATE_FORMAT) - ) + space_kwargs = { + "authenticator": auth, + "account_id": config['account_id'] + } + if start_time: + space_kwargs["start_time"] = start_time + + space = Space(**space_kwargs) return [ Account(authenticator=auth, account_id=config["account_id"]), @@ -81,5 +114,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: promoted_tweet_active, promoted_tweet_billing, promoted_tweet_engagement, - space + space, + tags ] diff --git a/source-twitter-fetcher/source_twitter_fetcher/spec.yaml b/source-twitter-fetcher/source_twitter_fetcher/spec.yaml index f0f2f6d..af0e826 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/spec.yaml +++ b/source-twitter-fetcher/source_twitter_fetcher/spec.yaml @@ -6,7 +6,7 @@ connectionSpecification: required: - credentials - account_id - - start_time + - tags properties: credentials: title: Twitter Dev account Credentials @@ -42,19 +42,17 @@ connectionSpecification: description: "Id of the Twitter Account to fetch info from" start_time: type: string - description: "Start date of fetching data" + description: "Start date of fetching data. If not provided, defaults to 5 days before current time." format: datetime - comment_days_limit: - type: integer - title: "Comment Days Limit" - description: "Number of days to look back for comments on tweets (default: 2)" - default: 2 - minimum: 1 - maximum: 7 - filtered_author_ids: - type: array - title: "Filtered Author IDs" - description: "List of Twitter author IDs to filter out from comments (e.g., your own organization's account IDs)" + tags: + type: array + title: "Tags to Monitor" + description: "List of Twitter handles to monitor (e.g., ['@IFT', '@Airbyte'])" items: - type: string - default: [] + type: string + minItems: 1 + tags_frequent_extractions: + type: boolean + title: "Tags Frequent Extractions" + description: "If true, defaults start_time to 1 hour 15 minutes before current time for more frequent extractions. If false, defaults to 5 days before current time." + default: false diff --git a/source-twitter-fetcher/source_twitter_fetcher/tags_stream.py b/source-twitter-fetcher/source_twitter_fetcher/tags_stream.py new file mode 100644 index 0000000..92bf27f --- /dev/null +++ b/source-twitter-fetcher/source_twitter_fetcher/tags_stream.py @@ -0,0 +1,94 @@ +from typing import Any, Iterable, Mapping, MutableMapping, Optional, List +import logging +import requests +import time +from datetime import datetime, timedelta +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream + +from .tweets_stream import TwitterStream + +logger = logging.getLogger("airbyte") + +class TagsStream(TwitterStream): + primary_key = "id" + + def __init__(self, start_time: str = None, account_id: str = None, tags: List[str] = None, tags_frequent_extractions: bool = False, **kwargs): + super().__init__(start_time=start_time, account_id=account_id, **kwargs) + + if not self.start_time: + if tags_frequent_extractions: + # Default to 1 hour 15 minutes before current time + self.start_time = datetime.utcnow() - timedelta(hours=1, minutes=15) + else: + # Default to 5 days before current time + self.start_time = datetime.utcnow() - timedelta(days=5) + + self.tags = tags or [] + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + for tag in self.tags: + yield {"tag": tag} + + def path( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None + ) -> str: + return "tweets/search/recent" # this endpoint fetches data from the last 7 days + + def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]: + if 'meta' in response.json() and 'next_token' in response.json()['meta'] and response.json()['meta']['result_count'] > 0: + logger.debug('DBG-NT: %s', response.json()['meta']['next_token']) + return {"next_token": response.json()['meta']['next_token']} + + def request_params( + self, + next_page_token: Optional[Mapping[str, Any]] = None, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + tag = stream_slice["tag"] + params = { + "query": tag, + "tweet.fields": "text,public_metrics,author_id,referenced_tweets,created_at", + "expansions": "author_id", + "user.fields": "username,name,verified,public_metrics", + "max_results": 100 + } + params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")}) + if next_page_token: + params.update(**next_page_token) + return params + + def parse_response( + self, + response: requests.Response, + stream_slice: Mapping[str, Any] = None, + **kwargs + ) -> Iterable[Mapping]: + logger.debug("Full response %s", response.json()) + response_data = response.json() + + # Create a mapping of user_id to user info for quick lookup because user data is returned separately in the includes.users array, you need to manually join them using the author_id as the key + users_map = {} + if 'includes' in response_data and 'users' in response_data['includes']: + for user in response_data['includes']['users']: + users_map[user['id']] = user + + if 'data' in response_data: + data = response_data['data'] + for t in data: + t["matched_tag"] = stream_slice["tag"] + + if t.get('author_id') and t['author_id'] in users_map: + user_info = users_map[t['author_id']] + t["author_username"] = user_info.get('username') + t["author_name"] = user_info.get('name') + t["author_verified"] = user_info.get('verified') + + yield t + self._apply_rate_limiting() + + \ No newline at end of file diff --git a/source-twitter-fetcher/source_twitter_fetcher/tweets_comments_stream.py b/source-twitter-fetcher/source_twitter_fetcher/tweets_comments_stream.py index 049788a..cb70488 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/tweets_comments_stream.py +++ b/source-twitter-fetcher/source_twitter_fetcher/tweets_comments_stream.py @@ -14,8 +14,8 @@ class TweetComments(HttpSubStream, Tweet): primary_key = "id" cursor_field = "created_at" - def __init__(self, comment_days_limit: int = 2, filtered_author_ids: List[str] = None, **kwargs): - super().__init__(**kwargs) + def __init__(self, start_time: str = None, comment_days_limit: int = 2, filtered_author_ids: List[str] = None, **kwargs): + super().__init__(start_time=start_time, **kwargs) self.comment_days_limit = comment_days_limit self.limit_date = datetime.now() - timedelta(days=self.comment_days_limit) # Use provided filtered_author_ids or default to empty list diff --git a/source-twitter-fetcher/source_twitter_fetcher/tweets_stream.py b/source-twitter-fetcher/source_twitter_fetcher/tweets_stream.py index fe436b6..7f4d713 100644 --- a/source-twitter-fetcher/source_twitter_fetcher/tweets_stream.py +++ b/source-twitter-fetcher/source_twitter_fetcher/tweets_stream.py @@ -55,6 +55,13 @@ def parse_response( class Tweet(TwitterStream): primary_key = "id" + def __init__(self, start_time: str = None, account_id: str = None, **kwargs): + super().__init__(start_time=start_time, account_id=account_id, **kwargs) + + # Set default start_time if not provided (5 days before current time) + if not self.start_time: + self.start_time = datetime.utcnow() - timedelta(days=5) + @property def use_cache(self) -> bool: return True