99from typing import Callable , Generator , Iterable , Optional , Protocol , Tuple
1010from urllib import parse
1111
12- from ld_eventsource import SSEClient as SSEClientImpl
12+ from ld_eventsource import SSEClient
1313from ld_eventsource .actions import Action , Event , Fault
1414from ld_eventsource .config import (
1515 ConnectStrategy ,
5454STREAMING_ENDPOINT = "/sdk/stream"
5555
5656
57- class SSEClient (Protocol ): # pylint: disable=too-few-public-methods
58- """
59- SSEClient is a protocol that defines the interface for a client that can
60- connect to a Server-Sent Events (SSE) stream and provide an iterable of
61- actions received from that stream.
62- """
63-
64- @property
65- @abstractmethod
66- def all (self ) -> Iterable [Action ]:
67- """
68- Returns an iterable of all actions received from the SSE stream.
69- """
70- raise NotImplementedError
71-
72-
7357SseClientBuilder = Callable [[Config ], SSEClient ]
7458
7559
7660# TODO(sdk-1391): Pass a selector-retrieving function through so it can
7761# re-connect with the last known status.
78- def create_sse_client (config : Config ) -> SSEClientImpl :
62+ def create_sse_client (config : Config ) -> SSEClient :
7963 """ "
80- create_sse_client creates an SSEClientImpl instance configured to connect
64+ create_sse_client creates an SSEClient instance configured to connect
8165 to the LaunchDarkly streaming endpoint.
8266 """
8367 uri = config .stream_base_uri + STREAMING_ENDPOINT
68+ if config .payload_filter_key is not None :
69+ uri += "?%s" % parse .urlencode ({"filter" : config .payload_filter_key })
8470
8571 # We don't want the stream to use the same read timeout as the rest of the SDK.
8672 http_factory = _http_factory (config )
@@ -90,7 +76,7 @@ def create_sse_client(config: Config) -> SSEClientImpl:
9076 override_read_timeout = STREAM_READ_TIMEOUT ,
9177 )
9278
93- return SSEClientImpl (
79+ return SSEClient (
9480 connect = ConnectStrategy .http (
9581 url = uri ,
9682 headers = http_factory .base_headers ,
@@ -119,15 +105,11 @@ class StreamingDataSource(Synchronizer):
119105 from the streaming data source.
120106 """
121107
122- def __init__ (
123- self , config : Config , sse_client_builder : SseClientBuilder = create_sse_client
124- ):
125- self ._sse_client_builder = sse_client_builder
126- self ._uri = config .stream_base_uri + STREAMING_ENDPOINT
127- if config .payload_filter_key is not None :
128- self ._uri += "?%s" % parse .urlencode ({"filter" : config .payload_filter_key })
108+ def __init__ (self , config : Config ):
109+ self ._sse_client_builder = create_sse_client
129110 self ._config = config
130111 self ._sse : Optional [SSEClient ] = None
112+ self ._running = False
131113
132114 @property
133115 def name (self ) -> str :
@@ -142,13 +124,13 @@ def sync(self) -> Generator[Update, None, None]:
142124 Update objects until the connection is closed or an unrecoverable error
143125 occurs.
144126 """
145- log .info ("Starting StreamingUpdateProcessor connecting to uri: %s" , self ._uri )
146127 self ._sse = self ._sse_client_builder (self ._config )
147128 if self ._sse is None :
148129 log .error ("Failed to create SSE client for streaming updates." )
149130 return
150131
151132 change_set_builder = ChangeSetBuilder ()
133+ self ._running = True
152134
153135 for action in self ._sse .all :
154136 if isinstance (action , Fault ):
@@ -177,8 +159,7 @@ def sync(self) -> Generator[Update, None, None]:
177159 log .info (
178160 "Error while handling stream event; will restart stream: %s" , e
179161 )
180- # TODO(sdk-1409)
181- # self._sse.interrupt()
162+ self ._sse .interrupt ()
182163
183164 (update , should_continue ) = self ._handle_error (e )
184165 if update is not None :
@@ -189,8 +170,7 @@ def sync(self) -> Generator[Update, None, None]:
189170 log .info (
190171 "Error while handling stream event; will restart stream: %s" , e
191172 )
192- # TODO(sdk-1409)
193- # self._sse.interrupt()
173+ self ._sse .interrupt ()
194174
195175 yield Update (
196176 state = DataSourceState .INTERRUPTED ,
@@ -210,27 +190,16 @@ def sync(self) -> Generator[Update, None, None]:
210190 # DataSourceState.VALID, None
211191 # )
212192
213- # if not self._ready.is_set():
214- # log.info("StreamingUpdateProcessor initialized ok.")
215- # self._ready.set()
216-
217- # TODO(sdk-1409)
218- # self._sse.close()
219-
220- # TODO(sdk-1409)
221- # def stop(self):
222- # self.__stop_with_error_info(None)
223- #
224- # def __stop_with_error_info(self, error: Optional[DataSourceErrorInfo]):
225- # log.info("Stopping StreamingUpdateProcessor")
226- # self._running = False
227- # if self._sse:
228- # self._sse.close()
229- #
230- # if self._data_source_update_sink is None:
231- # return
232- #
233- # self._data_source_update_sink.update_status(DataSourceState.OFF, error)
193+ self ._sse .close ()
194+
195+ def stop (self ):
196+ """
197+ Stops the streaming synchronizer, closing any open connections.
198+ """
199+ log .info ("Stopping StreamingUpdateProcessor" )
200+ self ._running = False
201+ if self ._sse :
202+ self ._sse .close ()
234203
235204 # pylint: disable=too-many-return-statements
236205 def _process_message (
@@ -317,8 +286,8 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
317286 If an update is provided, it should be forward upstream, regardless of
318287 whether or not we are going to retry this failure.
319288 """
320- # if not self._running:
321- # return (False, None ) # don't retry if we've been deliberately stopped
289+ if not self ._running :
290+ return (None , False ) # don't retry if we've been deliberately stopped
322291
323292 update : Optional [Update ] = None
324293
@@ -362,10 +331,7 @@ def _handle_error(self, error: Exception) -> Tuple[Optional[Update], bool]:
362331
363332 if not is_recoverable :
364333 log .error (http_error_message_result )
365- # TODO(sdk-1409)
366- # self._ready.set() # if client is initializing, make it stop waiting; has no effect if already inited
367- # self.__stop_with_error_info(error_info)
368- # self.stop()
334+ self .stop ()
369335 return (update , False )
370336
371337 log .warning (http_error_message_result )
@@ -391,8 +357,7 @@ def __enter__(self):
391357 return self
392358
393359 def __exit__ (self , type , value , traceback ):
394- # self.stop()
395- pass
360+ self .stop ()
396361
397362
398363class StreamingDataSourceBuilder : # disable: pylint: disable=too-few-public-methods
0 commit comments