Skip to content

twitter : new stream to extract tweets tagging us #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions source-twitter-fetcher/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ data:
connectorSubtype: api
connectorType: source
definitionId: 1c448bfb-8950-478c-9ae0-f03aaaf4e920
dockerImageTag: '1.0.0'
dockerRepository: harbor.status.im/bi/airbyte/source-twitter-fetcher
dockerImageTag: '2.0.0'
dockerRepository: harbor.status.im/bi/airbyte/source-twitter-fetcher
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you add whitespace at the end?

githubIssueLabel: source-twitter-fetcher
icon: twitter-fetcher.svg
license: MIT
Expand Down
26 changes: 7 additions & 19 deletions source-twitter-fetcher/sample_files/config-example.json
Original file line number Diff line number Diff line change
@@ -1,24 +1,12 @@
{
"credentials":{
"client_id": "some-id",
"client_secret": "some-secret",
"access_token": "some-access-token",
"refresh_token": "some-refresh-token",
"token_expiry_date": ""
"client_id": "your_client_id",
"client_secret": "your_client_secret",
"access_token": "your_access_token",
"refresh_token": "your_refresh_token",
"token_expiry_date": "2024-12-31T00:00:00Z"
},
"account_id": "123456789",
"account_id": "your_account_id",
"start_time": "2024-01-01T00:00:00Z",
"comment_days_limit": 2,
"filtered_author_ids": [
"1417373828544487426",
"1527270456658632706",
"1573349900905054212",
"1636287274961829888",
"1101033576454340608",
"1151831284110385152",
"1083104775825252353",
"774689518767181828",
"1783824207631077376",
"18904639"
]
"tags": ["@IFT", "@status", "@Airbyte"]
}
11 changes: 11 additions & 0 deletions source-twitter-fetcher/sample_files/configured_catalog.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@
},
"sync_mode": "incremental",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "tags",
"json_schema": {},
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false,
"default_cursor_field": []
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"id": {
"type": ["null", "string"]
},
"text": {
"type": ["null", "string"]
},
"created_at": {
"type": ["null", "string"]
},
"author_id": {
"type": ["null", "string"]
},
"author_username": {
"type": ["null", "string"],
"description": "The Twitter handle/username of the tweet author (e.g., 'john_doe')"
},
"author_name": {
"type": ["null", "string"],
"description": "The display name of the tweet author (e.g., 'John Doe')"
},
"author_verified": {
"type": ["null", "boolean"],
"description": "Whether the tweet author is verified"
},
"conversation_id": {
"type": ["null", "string"]
},
"reply_settings": {
"type": ["null", "string"]
},
"matched_tag": {
"type": ["null", "string"],
"description": "The tag that matched this tweet (e.g., '@IFT' or '@status')"
},
"referenced_tweets": {
"type": ["null", "array"],
"items": {
"type": ["object"],
"properties":{
"type": {
"type": ["null", "string"]
},
"id": {
"type": ["null", "string"]
}
}
}
},
"public_metrics": {
"type": ["null", "object"],
"properties": {
"retweet_count": {
"type": ["null", "number"]
},
"reply_count": {
"type": ["null", "number"]
},
"like_count": {
"type": ["null", "number"]
},
"quote_count": {
"type": ["null", "number"]
},
"impression_count": {
"type": ["null", "number"]
},
"bookmark_count": {
"type": ["null", "number"]
}
}
}
}
}
82 changes: 58 additions & 24 deletions source-twitter-fetcher/source_twitter_fetcher/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
from airbyte_cdk.sources.streams import Stream

from .tweets_stream import Account, Tweet, TweetMetrics, TweetPromoted
from .tweets_comments_stream import TweetComments
from .ads_stream import PromotedTweetActive, PromotedTweetBilling, PromotedTweetEngagement
from .spaces_stream import Space
from .tweets_comments_stream import TweetComments
from .tags_stream import TagsStream
from .auth import TwitterOAuth

DATE_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
Expand All @@ -22,11 +23,35 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
token_refresh_endpoint="https://api.x.com/2/oauth2/token"
)

tweet = Tweet(
authenticator=auth,
account_id=config["account_id"],
start_time=datetime.strptime(config['start_time'], DATE_FORMAT),
)
# Parse start_time if provided, otherwise streams will use their defaults
start_time = None
if "start_time" in config:
start_time = datetime.strptime(config['start_time'], DATE_FORMAT)

tweet_kwargs = {
"authenticator": auth,
"account_id": config["account_id"]
}
if start_time:
tweet_kwargs["start_time"] = start_time

tweet = Tweet(**tweet_kwargs)

tags_kwargs = {
"authenticator": auth,
"account_id": config["account_id"],
"tags": config["tags"]
}

# Add start_time only if provided in config
if start_time:
tags_kwargs["start_time"] = start_time

# Add tags_frequent_extractions if provided in config
if "tags_frequent_extractions" in config:
tags_kwargs["tags_frequent_extractions"] = config["tags_frequent_extractions"]

tags = TagsStream(**tags_kwargs)

tweet_metrics = TweetMetrics(
authenticator=auth,
Expand All @@ -40,19 +65,24 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
parent=tweet
)

tweet_comments = TweetComments(
authenticator=auth,
account_id=config['account_id'],
parent=tweet,
comment_days_limit=config.get('comment_days_limit', 2),
filtered_author_ids=config.get('filtered_author_ids', [])
)
tweet_comments_kwargs = {
"authenticator": auth,
"account_id": config['account_id'],
"parent": tweet
}
if start_time:
tweet_comments_kwargs["start_time"] = start_time

tweet_comments = TweetComments(**tweet_comments_kwargs)

promoted_tweet_active = PromotedTweetActive(
authenticator=auth,
account_id=config['account_id'],
start_time=datetime.strptime(config['start_time'], DATE_FORMAT),
)
promoted_tweet_active_kwargs = {
"authenticator": auth,
"account_id": config['account_id']
}
if start_time:
promoted_tweet_active_kwargs["start_time"] = start_time

promoted_tweet_active = PromotedTweetActive(**promoted_tweet_active_kwargs)

promoted_tweet_billing = PromotedTweetBilling(
authenticator=auth,
Expand All @@ -66,11 +96,14 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
parent=promoted_tweet_active
)

space = Space(
authenticator=auth,
account_id=config['account_id'],
start_time=datetime.strptime(config['start_time'], DATE_FORMAT)
)
space_kwargs = {
"authenticator": auth,
"account_id": config['account_id']
}
if start_time:
space_kwargs["start_time"] = start_time

space = Space(**space_kwargs)

return [
Account(authenticator=auth, account_id=config["account_id"]),
Expand All @@ -81,5 +114,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
promoted_tweet_active,
promoted_tweet_billing,
promoted_tweet_engagement,
space
space,
tags
]
28 changes: 13 additions & 15 deletions source-twitter-fetcher/source_twitter_fetcher/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ connectionSpecification:
required:
- credentials
- account_id
- start_time
- tags
properties:
credentials:
title: Twitter Dev account Credentials
Expand Down Expand Up @@ -42,19 +42,17 @@ connectionSpecification:
description: "Id of the Twitter Account to fetch info from"
start_time:
type: string
description: "Start date of fetching data"
description: "Start date of fetching data. If not provided, defaults to 5 days before current time."
format: datetime
comment_days_limit:
type: integer
title: "Comment Days Limit"
description: "Number of days to look back for comments on tweets (default: 2)"
default: 2
minimum: 1
maximum: 7
filtered_author_ids:
type: array
title: "Filtered Author IDs"
description: "List of Twitter author IDs to filter out from comments (e.g., your own organization's account IDs)"
tags:
type: array
title: "Tags to Monitor"
description: "List of Twitter handles to monitor (e.g., ['@IFT', '@Airbyte'])"
items:
type: string
default: []
type: string
minItems: 1
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why a minimum of 1 item?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guess I will never know :'(

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is no tag, the connector will not work: the "for tag in self.tags" loop never executes. Also, the goal of the connector is to look for tags, so it doesn't make sense to leave it empty — that's why it is a mandatory parameter.

tags_frequent_extractions:
type: boolean
title: "Tags Frequent Extractions"
description: "If true, defaults start_time to 1 hour 15 minutes before current time for more frequent extractions. If false, defaults to 5 days before current time."
default: false
94 changes: 94 additions & 0 deletions source-twitter-fetcher/source_twitter_fetcher/tags_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from typing import Any, Iterable, Mapping, MutableMapping, Optional, List
import logging
import requests
import time
from datetime import datetime, timedelta
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http import HttpStream

from .tweets_stream import TwitterStream

logger = logging.getLogger("airbyte")

class TagsStream(TwitterStream):
primary_key = "id"

def __init__(self, start_time: str = None, account_id: str = None, tags: List[str] = None, tags_frequent_extractions: bool = False, **kwargs):
super().__init__(start_time=start_time, account_id=account_id, **kwargs)

if not self.start_time:
if tags_frequent_extractions:
# Default to 1 hour 15 minutes before current time
self.start_time = datetime.utcnow() - timedelta(hours=1, minutes=15)
else:
# Default to 5 days before current time
self.start_time = datetime.utcnow() - timedelta(days=5)

self.tags = tags or []

def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
for tag in self.tags:
yield {"tag": tag}

def path(
self,
stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None,
next_page_token: Mapping[str, Any] = None
) -> str:
return "tweets/search/recent" # this endpoint fetches data from the last 7 days

def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
if 'meta' in response.json() and 'next_token' in response.json()['meta'] and response.json()['meta']['result_count'] > 0:
logger.debug('DBG-NT: %s', response.json()['meta']['next_token'])
return {"next_token": response.json()['meta']['next_token']}
Comment on lines +41 to +44
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably be mutualized in TwitterStream

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree !

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the end it can't be mutualized here because:

  • TagsStream uses /tweets/search/recent
    Pagination parameter: next_token --> Used for searching tweets by query

  • TweetStream uses /users/{id}/tweets
    Pagination parameter: pagination_token --> used for getting user's timeline


def request_params(
self,
next_page_token: Optional[Mapping[str, Any]] = None,
stream_state: Mapping[str, Any] = None,
stream_slice: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
tag = stream_slice["tag"]
params = {
"query": tag,
"tweet.fields": "text,public_metrics,author_id,referenced_tweets,created_at",
"expansions": "author_id",
"user.fields": "username,name,verified,public_metrics",
"max_results": 100
}
params.update({"start_time": self.start_time.strftime("%Y-%m-%dT%H:%M:%SZ")})
if next_page_token:
params.update(**next_page_token)
return params

def parse_response(
self,
response: requests.Response,
stream_slice: Mapping[str, Any] = None,
**kwargs
) -> Iterable[Mapping]:
logger.debug("Full response %s", response.json())
response_data = response.json()

# Create a mapping of user_id to user info for quick lookup because user data is returned separately in the includes.users array, you need to manually join them using the author_id as the key
users_map = {}
if 'includes' in response_data and 'users' in response_data['includes']:
for user in response_data['includes']['users']:
users_map[user['id']] = user

if 'data' in response_data:
data = response_data['data']
for t in data:
t["matched_tag"] = stream_slice["tag"]

if t.get('author_id') and t['author_id'] in users_map:
user_info = users_map[t['author_id']]
t["author_username"] = user_info.get('username')
t["author_name"] = user_info.get('name')
t["author_verified"] = user_info.get('verified')

yield t
self._apply_rate_limiting()


Loading