1
+ from typing import Any , Iterable , Mapping , MutableMapping , Optional , List
2
+ import logging
3
+ import requests
4
+ import time
5
+ from datetime import datetime
6
+ from airbyte_cdk .sources .streams import Stream
7
+ from airbyte_cdk .sources .streams .http import HttpStream
8
+
9
# Module-level logger, obtained under the "airbyte" logger name.
+ logger = logging .getLogger ("airbyte" )
10
+
11
class TagsStream(HttpStream):
    """Stream of recent tweets matching each configured tag.

    One stream slice is produced per tag. Every slice queries the
    ``tweets/search/recent`` endpoint under ``url_base`` using the tag as the
    search query, and follows ``meta.next_token`` for pagination. Each
    emitted record is annotated with the tag that produced it.
    """

    url_base = "https://api.x.com/2/"
    primary_key = "id"

    # Seconds to pause after each parsed response (crude rate-limit guard).
    _RATE_LIMIT_PAUSE_SECONDS = 2

    def __init__(
        self,
        start_time: Any = None,
        account_id: Optional[str] = None,
        tags: Optional[List[str]] = None,
        **kwargs,
    ):
        """Store the stream configuration.

        Args:
            start_time: Lower bound for the search. Either a ``datetime``
                (formatted as ``%Y-%m-%dT%H:%M:%SZ`` when sent), a string that
                is sent as-is, or ``None`` to omit the bound.
            account_id: Stored on the instance; not used by this class itself.
            tags: Search queries, one stream slice each. ``None`` is treated
                as an empty list.
            **kwargs: Forwarded unchanged to ``HttpStream.__init__``.
        """
        super().__init__(**kwargs)
        self.start_time = start_time
        self.account_id = account_id
        self.tags = tags or []

    def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
        """Yield one ``{"tag": <tag>}`` slice per configured tag."""
        for tag in self.tags:
            yield {"tag": tag}

    def path(
        self,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
        next_page_token: Mapping[str, Any] = None,
    ) -> str:
        """Return the endpoint path, relative to ``url_base``."""
        # This endpoint fetches data from the last 7 days.
        return "tweets/search/recent"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        """Extract the pagination token from a response.

        Returns:
            ``{"next_token": <token>}`` when the response's ``meta`` object
            carries a non-empty ``next_token`` and a positive
            ``result_count``; otherwise ``None`` (no further page).
        """
        # Parse the body once; Response.json() re-decodes on every call.
        meta = response.json().get("meta") or {}
        token = meta.get("next_token")
        if token and (meta.get("result_count") or 0) > 0:
            logger.debug('DBG-NT: %s', token)
            return {"next_token": token}
        return None

    def request_params(
        self,
        next_page_token: Optional[Mapping[str, Any]] = None,
        stream_state: Mapping[str, Any] = None,
        stream_slice: Mapping[str, Any] = None,
    ) -> MutableMapping[str, Any]:
        """Build the query parameters for one request.

        Args:
            next_page_token: Mapping returned by ``next_page_token``; merged
                into the parameters when truthy.
            stream_state: Unused.
            stream_slice: Must contain the ``"tag"`` key; its value becomes
                the search query.

        Returns:
            The request's query parameters. ``start_time`` is included only
            when one was configured.
        """
        params: MutableMapping[str, Any] = {
            "query": stream_slice["tag"],
            "tweet.fields": "text,public_metrics,author_id,referenced_tweets,created_at",
            "max_results": 100,
        }
        start_time = self._format_start_time()
        if start_time is not None:
            params["start_time"] = start_time
        if next_page_token:
            params.update(next_page_token)
        return params

    def parse_response(
        self,
        response: requests.Response,
        stream_slice: Mapping[str, Any] = None,
        **kwargs,
    ) -> Iterable[Mapping]:
        """Yield the records found under ``data`` in the response.

        Each record is mutated in place to add ``matched_tag``, the tag of the
        slice that produced it (``None`` when no slice is supplied). After the
        records of a response have been consumed, the generator pauses for
        ``_RATE_LIMIT_PAUSE_SECONDS`` seconds.
        """
        payload = response.json()
        logger.debug("Full response %s", payload)
        matched_tag = (stream_slice or {}).get("tag")
        for tweet in payload.get("data") or []:
            # Add the tag that matched this tweet.
            tweet["matched_tag"] = matched_tag
            yield tweet
        time.sleep(self._RATE_LIMIT_PAUSE_SECONDS)  # Rate limiting protection

    def _format_start_time(self) -> Optional[str]:
        """Return ``self.start_time`` as a request-ready string, or ``None``."""
        start = self.start_time
        if start is None or start == "":
            return None
        if isinstance(start, datetime):
            offset = start.utcoffset()
            if offset is not None:
                # The format below hard-codes "Z", so shift aware values to
                # their UTC wall-clock time before formatting.
                start = start.replace(tzinfo=None) - offset
            return start.strftime("%Y-%m-%dT%H:%M:%SZ")
        # Anything else (typically a pre-formatted string) is sent as given.
        return str(start)
0 commit comments