From fe6832eb59296f9c01aca852ee01a208573a70ec Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 12 Aug 2025 18:42:11 +0200 Subject: [PATCH 01/23] initial streaming support --- UnleashClient/__init__.py | 80 ++++++++++++----- UnleashClient/constants.py | 4 +- UnleashClient/streaming/__init__.py | 2 + UnleashClient/streaming/manager.py | 132 ++++++++++++++++++++++++++++ pyproject.toml | 1 + 5 files changed, 198 insertions(+), 21 deletions(-) create mode 100644 UnleashClient/streaming/__init__.py create mode 100644 UnleashClient/streaming/manager.py diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 221d24d1..9e8f2078 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -23,6 +23,7 @@ REQUEST_TIMEOUT, SDK_NAME, SDK_VERSION, + APPLICATION_HEADERS, ) from UnleashClient.events import ( BaseEvent, @@ -35,6 +36,7 @@ aggregate_and_send_metrics, fetch_and_load_features, ) +from UnleashClient.streaming.manager import StreamingManager from .cache import BaseCache, FileCache from .utils import LOGGER, InstanceAllowType, InstanceCounter @@ -136,6 +138,8 @@ def __init__( scheduler_executor: Optional[str] = None, multiple_instance_mode: InstanceAllowType = InstanceAllowType.WARN, event_callback: Optional[Callable[[BaseEvent], None]] = None, + experimental_mode: Optional[dict] = None, + sse_client_factory: Optional[Callable] = None, ) -> None: custom_headers = custom_headers or {} custom_options = custom_options or {} @@ -169,6 +173,9 @@ def __init__( self.unleash_verbose_log_level = verbose_log_level self.unleash_event_callback = event_callback self._ready_callback = build_ready_callback(event_callback) + self.experimental_mode = experimental_mode + self._stream_manager: Optional[StreamingManager] = None + self._sse_client_factory = sse_client_factory self._do_instance_check(multiple_instance_mode) @@ -267,8 +274,10 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: try: base_headers = { **self.unleash_custom_headers, + **APPLICATION_HEADERS, "unleash-connection-id": self.connection_id, "unleash-appname": self.unleash_app_name, + "unleash-instanceid": self.unleash_instance_id, "unleash-sdk": f"{SDK_NAME}:{SDK_VERSION}", } @@ -277,7 +286,6 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "unleash-interval": self.unleash_metrics_interval_str_millis, } - # Setup metrics_args = { "url": self.unleash_url, "app_name": self.unleash_app_name, @@ -303,12 +311,16 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: self.unleash_request_timeout, ) - if fetch_toggles: + # Decide mode + mode = (self.experimental_mode or {}).get("type") if self.experimental_mode else None + format_mode = (self.experimental_mode or {}).get("format") if self.experimental_mode else None + + if fetch_toggles and (mode is None or (mode == "polling" and (format_mode in (None, "full")))): + # Default/full polling fetch_headers = { **base_headers, "unleash-interval": self.unleash_refresh_interval_str_millis, } - job_args = { "url": self.unleash_url, "app_name": self.unleash_app_name, @@ -324,26 +336,46 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "ready_callback": self._ready_callback, } job_func: Callable = fetch_and_load_features - else: + + job_func(**job_args) # initial fetch + self.unleash_scheduler.start() + self.fl_job = self.unleash_scheduler.add_job( + job_func, + trigger=IntervalTrigger( + seconds=int(self.unleash_refresh_interval), + jitter=self.unleash_refresh_jitter, + ), + executor=self.unleash_executor_name, + kwargs=job_args, + ) + elif fetch_toggles and mode == "streaming": # Streaming mode + + stream_headers = { + **base_headers, + "unleash-interval": self.unleash_refresh_interval_str_millis, + } + self._stream_manager = StreamingManager( + url=self.unleash_url, + headers=stream_headers, + request_timeout=self.unleash_request_timeout, + engine=self.engine, + on_ready=self._ready_callback, + sse_client_factory=self._sse_client_factory, + custom_options=self.unleash_custom_options, + ) + self._stream_manager.start() + + # Start metrics job only + self.unleash_scheduler.start() + else: # No fetching - only load from cache job_args = { "cache": self.cache, "engine": self.engine, "ready_callback": self._ready_callback, } - job_func = load_features - - job_func(**job_args) # type: ignore - # Start periodic jobs - self.unleash_scheduler.start() - self.fl_job = self.unleash_scheduler.add_job( - job_func, - trigger=IntervalTrigger( - seconds=int(self.unleash_refresh_interval), - jitter=self.unleash_refresh_jitter, - ), - executor=self.unleash_executor_name, - kwargs=job_args, - ) + load_features(**job_args) + self.unleash_scheduler.start() + if not self.unleash_disable_metrics: self.metric_job = self.unleash_scheduler.add_job( aggregate_and_send_metrics, @@ -396,7 +428,11 @@ def destroy(self) -> None: You shouldn't need this too much! """ - self.fl_job.remove() + try: + if self.fl_job: + self.fl_job.remove() + except Exception: # best-effort + pass if self.metric_job: self.metric_job.remove() @@ -411,7 +447,11 @@ def destroy(self) -> None: request_timeout=self.unleash_request_timeout, engine=self.engine, ) - + if self._stream_manager: + try: + self._stream_manager.stop() + except Exception: + pass self.unleash_scheduler.shutdown() self.cache.destroy() diff --git a/UnleashClient/constants.py b/UnleashClient/constants.py index f6ba034e..e3eac58e 100644 --- a/UnleashClient/constants.py +++ b/UnleashClient/constants.py @@ -6,7 +6,7 @@ REQUEST_TIMEOUT = 30 REQUEST_RETRIES = 3 METRIC_LAST_SENT_TIME = "mlst" -CLIENT_SPEC_VERSION = "5.1.9" +CLIENT_SPEC_VERSION = "5.2.0" # =Unleash= APPLICATION_HEADERS = { @@ -19,6 +19,8 @@ REGISTER_URL = "/client/register" FEATURES_URL = "/client/features" METRICS_URL = "/client/metrics" +DELTA_URL = "/client/delta" +STREAMING_URL = "/client/streaming" # Cache keys FAILED_STRATEGIES = "failed_strategies" diff --git a/UnleashClient/streaming/__init__.py b/UnleashClient/streaming/__init__.py new file mode 100644 index 00000000..8ea075cd --- /dev/null +++ b/UnleashClient/streaming/__init__.py @@ -0,0 +1,2 @@ +# ruff: noqa: F401 +# Streaming package diff --git a/UnleashClient/streaming/manager.py b/UnleashClient/streaming/manager.py new file mode 100644 index 00000000..1db92ebd --- /dev/null +++ b/UnleashClient/streaming/manager.py @@ -0,0 +1,132 @@ +import json +import threading +import time +from typing import Callable, Optional + +from ld_eventsource import SSEClient +from ld_eventsource.config import ConnectStrategy + +from UnleashClient.constants import STREAMING_URL +from UnleashClient.utils import LOGGER +from yggdrasil_engine.engine import UnleashEngine + + +class StreamingManager: + """ + Simple SSE streaming manager with reconnect and backoff. + Applies deltas to the engine by calling engine.take_state(raw_json). + """ + + def __init__( + self, + url: str, + headers: dict, + request_timeout: int, + engine: UnleashEngine, + on_ready: Optional[Callable[[], None]] = None, + sse_client_factory: Optional[Callable[[str, dict, int], SSEClient]] = None, + heartbeat_timeout: int = 60, + backoff_initial: float = 2.0, + backoff_max: float = 30.0, + backoff_jitter: float = 0.5, + custom_options: Optional[dict] = None, + ) -> None: + self._base_url = url.rstrip("/") + STREAMING_URL + self._headers = {**headers, "Accept": "text/event-stream"} + self._timeout = request_timeout + self._engine = engine + self._on_ready = on_ready + self._sse_factory = sse_client_factory + self._hb_timeout = heartbeat_timeout + self._backoff_initial = backoff_initial + self._backoff_max = backoff_max + self._backoff_jitter = backoff_jitter + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + self._lock = threading.Lock() + self._hydrated = False + # Ensure urllib3 timeout is honored by ld_eventsource + base_options = custom_options or {} + if self._timeout is not None and "timeout" not in base_options: + base_options = {"timeout": self._timeout, **base_options} + self._custom_options = base_options + + def start(self): + if self._thread and self._thread.is_alive(): + return + self._stop.clear() + self._thread = threading.Thread(target=self._run, name="UnleashStreaming", daemon=True) + self._thread.start() + + def stop(self): + self._stop.set() + if self._thread: + self._thread.join(timeout=5) + + def _run(self): + try: + LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url) + + # Use LaunchDarkly EventSource client + if self._sse_factory: + client = self._sse_factory(self._base_url, self._headers, self._timeout) + else: + connect_strategy = ConnectStrategy.http( + self._base_url, + headers=self._headers, + urllib3_request_options=self._custom_options, + ) + client = SSEClient( + connect=connect_strategy, + initial_retry_delay=self._backoff_initial, + logger=LOGGER, + ) + + last_event_time = time.time() + + # Iterate over events; SSEClient handles reconnects with internal backoff/jitter + for event in client.events: + if self._stop.is_set(): + client.close() + break + if not event.event: + continue + + last_event_time = time.time() + + if event.event in ("unleash-connected", "unleash-updated"): + try: + data = event.data + if not data: + continue + # Apply under lock + with self._lock: + self._engine.take_state(data) + if event.event == "unleash-connected" and not self._hydrated: + self._hydrated = True + if self._on_ready: + try: + self._on_ready() + except Exception as cb_exc: + LOGGER.debug("Ready callback error: %s", cb_exc) + except Exception as exc: + LOGGER.warning("Error processing SSE event: %s", exc) + else: + LOGGER.debug("Ignoring SSE event type: %s", event.event) + + # Heartbeat timeout: trigger reconnect via SSEClient interrupt (uses internal retry) + if self._hb_timeout and (time.time() - last_event_time > self._hb_timeout): + LOGGER.warning("Heartbeat timeout exceeded; reconnecting") + try: + client.interrupt() + except Exception as _: + # If interrupt fails, close will end the loop; SSEClient will not retry when closed + client.close() + break + except Exception as exc: + LOGGER.warning("Streaming error: %s", exc) + finally: + try: + client.close() + except Exception: + pass diff --git a/pyproject.toml b/pyproject.toml index d09f1194..8930fe15 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,7 @@ dependencies=[ "python-dateutil", "semver < 4.0.0", "yggdrasil-engine", + "launchdarkly-eventsource", ] [project.urls] From db0a83777a0880cbe09da0ef4194a262cef6be0f Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 12 Aug 2025 19:03:49 +0200 Subject: [PATCH 02/23] streaming example --- examples/README.md | 13 ++++++++++ examples/requirements.txt | 2 ++ examples/streaming.py | 51 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) create mode 100644 examples/README.md create mode 100644 examples/requirements.txt create mode 100644 examples/streaming.py diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 00000000..a410fc1c --- /dev/null +++ b/examples/README.md @@ -0,0 +1,13 @@ +How to run the streaming example + +1) Create a virtualenv and install deps + + python -m venv .venv + source .venv/bin/activate + pip install -r examples/requirements.txt + +2) Run the example + + python examples/streaming.py + +You should see periodic prints of the target flag's state. diff --git a/examples/requirements.txt b/examples/requirements.txt new file mode 100644 index 00000000..ec414f1a --- /dev/null +++ b/examples/requirements.txt @@ -0,0 +1,2 @@ +UnleashClient +launchdarkly-eventsource diff --git a/examples/streaming.py b/examples/streaming.py new file mode 100644 index 00000000..bc956e73 --- /dev/null +++ b/examples/streaming.py @@ -0,0 +1,51 @@ +import os +import re +import time + +from UnleashClient import UnleashClient +from UnleashClient.events import UnleashEventType + +URL = "https://sandbox.getunleash.io/enterprise/api" +TOKEN = "hammer-1:development.4819f784fbe351f8a74982d43a68f53dcdbf74cdde554a5d0a81d997" +FLAG = "flag-page-hh" + +headers = {"Authorization": TOKEN} + +ready = False + +def event_callback(event): + global ready + if event.event_type == UnleashEventType.READY: + ready = True + +client = UnleashClient( + url=URL, + app_name="python-streaming-example", + instance_id="example-instance", + custom_headers=headers, + experimental_mode={"type": "streaming"}, + event_callback=event_callback, +) + +client.initialize_client() + +print("Waiting for hydration via streaming... (Ctrl+C to exit)") + +try: + while True: + if not client.is_initialized: + print("Client not initialized yet, waiting...") + time.sleep(0.2) + continue + if not ready: + # FIXME: check if for other modes this works in the same way + print("Waiting for hydration via streaming... (Ctrl+C to exit)") + time.sleep(0.2) + continue + enabled = client.is_enabled(FLAG, {"userId": "example"}) + print(f"{FLAG} enabled? {enabled}") + time.sleep(1) +except KeyboardInterrupt: + pass +finally: + client.destroy() From 86dd667b959a60e82a07dab2f3b27dcb0592a91a Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 12 Aug 2025 19:04:29 +0200 Subject: [PATCH 03/23] delta polling --- UnleashClient/__init__.py | 34 +++++++++ UnleashClient/api/delta.py | 69 +++++++++++++++++++ UnleashClient/periodic_tasks/__init__.py | 1 + .../periodic_tasks/fetch_and_apply_delta.py | 62 +++++++++++++++++ 4 files changed, 166 insertions(+) create mode 100644 UnleashClient/api/delta.py create mode 100644 UnleashClient/periodic_tasks/fetch_and_apply_delta.py diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 9e8f2078..5f027370 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -35,6 +35,7 @@ from UnleashClient.periodic_tasks import ( aggregate_and_send_metrics, fetch_and_load_features, + fetch_and_apply_delta, ) from UnleashClient.streaming.manager import StreamingManager @@ -348,6 +349,39 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: executor=self.unleash_executor_name, kwargs=job_args, ) + elif fetch_toggles and (mode == "polling" and format_mode == "delta"): + # Delta polling + fetch_headers = { + **base_headers, + "unleash-interval": self.unleash_refresh_interval_str_millis, + } + job_args = { + "url": self.unleash_url, + "app_name": self.unleash_app_name, + "instance_id": self.unleash_instance_id, + "headers": fetch_headers, + "custom_options": self.unleash_custom_options, + "cache": self.cache, + "engine": self.engine, + "request_timeout": self.unleash_request_timeout, + "request_retries": self.unleash_request_retries, + "event_callback": self.unleash_event_callback, + "ready_callback": self._ready_callback, + } + job_func: Callable = fetch_and_apply_delta + + # Eager run once + job_func(**job_args) + self.unleash_scheduler.start() + self.fl_job = self.unleash_scheduler.add_job( + job_func, + trigger=IntervalTrigger( + seconds=int(self.unleash_refresh_interval), + jitter=self.unleash_refresh_jitter, + ), + executor=self.unleash_executor_name, + kwargs=job_args, + ) elif fetch_toggles and mode == "streaming": # Streaming mode stream_headers = { diff --git a/UnleashClient/api/delta.py b/UnleashClient/api/delta.py new file mode 100644 index 00000000..9353a1b5 --- /dev/null +++ b/UnleashClient/api/delta.py @@ -0,0 +1,69 @@ +from typing import Optional, Tuple + +import requests +from requests.adapters import HTTPAdapter +from urllib3 import Retry + +from UnleashClient.constants import DELTA_URL +from UnleashClient.utils import LOGGER, log_resp_info + +# pylint: disable=broad-except +def get_feature_deltas( + url: str, + app_name: str, + instance_id: str, + headers: dict, + custom_options: dict, + request_timeout: int, + request_retries: int, + cached_etag: str = "", +) -> Tuple[Optional[str], str]: + """ + Retrieves feature deltas from the Unleash server. + + Returns a tuple of (raw_json_string_or_None, etag). + """ + try: + LOGGER.info("Getting feature deltas.") + + request_specific_headers = { + "UNLEASH-APPNAME": app_name, + "UNLEASH-INSTANCEID": instance_id, + } + + if cached_etag: + request_specific_headers["If-None-Match"] = cached_etag + + base_url = f"{url}{DELTA_URL}" + + adapter = HTTPAdapter( + max_retries=Retry(total=request_retries, status_forcelist=[500, 502, 504]) + ) + with requests.Session() as session: + session.mount("https://", adapter) + session.mount("http://", adapter) + resp = session.get( + base_url, + headers={**headers, **request_specific_headers}, + timeout=request_timeout, + **custom_options, + ) + + if resp.status_code not in [200, 304]: + log_resp_info(resp) + LOGGER.warning( + "Unleash Client delta fetch failed due to unexpected HTTP status code: %s", + resp.status_code, + ) + raise Exception("Unleash Client delta fetch failed!") + + etag = resp.headers.get("etag", "") + + if resp.status_code == 304: + return None, etag + + return resp.text, etag + except Exception as exc: + LOGGER.exception("Unleash Client delta fetch failed due to exception: %s", exc) + + return None, "" diff --git a/UnleashClient/periodic_tasks/__init__.py b/UnleashClient/periodic_tasks/__init__.py index a4391c50..10b479d9 100644 --- a/UnleashClient/periodic_tasks/__init__.py +++ b/UnleashClient/periodic_tasks/__init__.py @@ -1,3 +1,4 @@ # ruff: noqa: F401 from .fetch_and_load import fetch_and_load_features from .send_metrics import aggregate_and_send_metrics +from .fetch_and_apply_delta import fetch_and_apply_delta diff --git a/UnleashClient/periodic_tasks/fetch_and_apply_delta.py b/UnleashClient/periodic_tasks/fetch_and_apply_delta.py new file mode 100644 index 00000000..78a054ec --- /dev/null +++ b/UnleashClient/periodic_tasks/fetch_and_apply_delta.py @@ -0,0 +1,62 @@ +import uuid +from typing import Callable, Optional + +from yggdrasil_engine.engine import UnleashEngine + +from UnleashClient.api.delta import get_feature_deltas +from UnleashClient.cache import BaseCache +from UnleashClient.constants import DELTA_URL, ETAG +from UnleashClient.events import UnleashEventType, UnleashFetchedEvent +from UnleashClient.utils import LOGGER + +def fetch_and_apply_delta( + url: str, + app_name: str, + instance_id: str, + headers: dict, + custom_options: dict, + cache: BaseCache, + request_timeout: int, + request_retries: int, + engine: UnleashEngine, + event_callback: Optional[Callable] = None, + ready_callback: Optional[Callable] = None, +) -> None: + """ + Fetch delta payload and apply to engine with engine.take_state(raw_json). + Fires READY on first hydration (hydration event inside delta stream) and FETCHED on each successful delta. + """ + (delta_payload, etag) = get_feature_deltas( + url, + app_name, + instance_id, + headers, + custom_options, + request_timeout, + request_retries, + cache.get(ETAG), + ) + + if etag: + cache.set(ETAG, etag) + + if not delta_payload: + LOGGER.debug("No delta returned from server, nothing to apply.") + return + + try: + engine.take_state(delta_payload) + + if event_callback: + event = UnleashFetchedEvent( + event_type=UnleashEventType.FETCHED, + event_id=uuid.uuid4(), + raw_features=delta_payload, + ) + event_callback(event) + + # First hydration event as ready signal + if ready_callback and '"type":"hydration"' in delta_payload: + ready_callback() + except Exception as exc: + LOGGER.warning("Failed to apply delta: %s", exc) From f2eab4fb889ae207678adcacb13e2280ba0ebcf8 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 12 Aug 2025 19:09:12 +0200 Subject: [PATCH 04/23] formatting --- UnleashClient/__init__.py | 29 +++++++++++++------ UnleashClient/api/delta.py | 1 + UnleashClient/periodic_tasks/__init__.py | 2 +- .../periodic_tasks/fetch_and_apply_delta.py | 3 +- UnleashClient/streaming/manager.py | 27 ++++++++++------- examples/streaming.py | 2 ++ 6 files changed, 42 insertions(+), 22 deletions(-) diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 5f027370..e6a3415e 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -16,6 +16,7 @@ from UnleashClient.api import register_client from UnleashClient.constants import ( + APPLICATION_HEADERS, DISABLED_VARIATION, ETAG, METRIC_LAST_SENT_TIME, @@ -23,7 +24,6 @@ REQUEST_TIMEOUT, SDK_NAME, SDK_VERSION, - APPLICATION_HEADERS, ) from UnleashClient.events import ( BaseEvent, @@ -34,8 +34,8 @@ from UnleashClient.loader import load_features from UnleashClient.periodic_tasks import ( aggregate_and_send_metrics, - fetch_and_load_features, fetch_and_apply_delta, + fetch_and_load_features, ) from UnleashClient.streaming.manager import StreamingManager @@ -114,7 +114,7 @@ class UnleashClient: :param event_callback: Function to call if impression events are enabled. WARNING: Depending on your event library, this may have performance implications! """ - def __init__( + def __init__( # noqa: PLR0915 self, url: str, app_name: str, @@ -313,10 +313,21 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: ) # Decide mode - mode = (self.experimental_mode or {}).get("type") if self.experimental_mode else None - format_mode = (self.experimental_mode or {}).get("format") if self.experimental_mode else None + mode = ( + (self.experimental_mode or {}).get("type") + if self.experimental_mode + else None + ) + format_mode = ( + (self.experimental_mode or {}).get("format") + if self.experimental_mode + else None + ) - if fetch_toggles and (mode is None or (mode == "polling" and (format_mode in (None, "full")))): + if fetch_toggles and ( + mode is None + or (mode == "polling" and (format_mode in (None, "full"))) + ): # Default/full polling fetch_headers = { **base_headers, @@ -382,8 +393,8 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: executor=self.unleash_executor_name, kwargs=job_args, ) - elif fetch_toggles and mode == "streaming": # Streaming mode - + elif fetch_toggles and mode == "streaming": # Streaming mode + stream_headers = { **base_headers, "unleash-interval": self.unleash_refresh_interval_str_millis, @@ -401,7 +412,7 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: # Start metrics job only self.unleash_scheduler.start() - else: # No fetching - only load from cache + else: # No fetching - only load from cache job_args = { "cache": self.cache, "engine": self.engine, diff --git a/UnleashClient/api/delta.py b/UnleashClient/api/delta.py index 9353a1b5..d4683305 100644 --- a/UnleashClient/api/delta.py +++ b/UnleashClient/api/delta.py @@ -7,6 +7,7 @@ from UnleashClient.constants import DELTA_URL from UnleashClient.utils import LOGGER, log_resp_info + # pylint: disable=broad-except def get_feature_deltas( url: str, diff --git a/UnleashClient/periodic_tasks/__init__.py b/UnleashClient/periodic_tasks/__init__.py index 10b479d9..24a97d35 100644 --- a/UnleashClient/periodic_tasks/__init__.py +++ b/UnleashClient/periodic_tasks/__init__.py @@ -1,4 +1,4 @@ # ruff: noqa: F401 +from .fetch_and_apply_delta import fetch_and_apply_delta from .fetch_and_load import fetch_and_load_features from .send_metrics import aggregate_and_send_metrics -from .fetch_and_apply_delta import fetch_and_apply_delta diff --git a/UnleashClient/periodic_tasks/fetch_and_apply_delta.py b/UnleashClient/periodic_tasks/fetch_and_apply_delta.py index 78a054ec..f2171430 100644 --- a/UnleashClient/periodic_tasks/fetch_and_apply_delta.py +++ b/UnleashClient/periodic_tasks/fetch_and_apply_delta.py @@ -5,10 +5,11 @@ from UnleashClient.api.delta import get_feature_deltas from UnleashClient.cache import BaseCache -from UnleashClient.constants import DELTA_URL, ETAG +from UnleashClient.constants import ETAG from UnleashClient.events import UnleashEventType, UnleashFetchedEvent from UnleashClient.utils import LOGGER + def fetch_and_apply_delta( url: str, app_name: str, diff --git a/UnleashClient/streaming/manager.py b/UnleashClient/streaming/manager.py index 1db92ebd..c3cf6588 100644 --- a/UnleashClient/streaming/manager.py +++ b/UnleashClient/streaming/manager.py @@ -1,14 +1,13 @@ -import json import threading import time from typing import Callable, Optional from ld_eventsource import SSEClient from ld_eventsource.config import ConnectStrategy +from yggdrasil_engine.engine import UnleashEngine from UnleashClient.constants import STREAMING_URL from UnleashClient.utils import LOGGER -from yggdrasil_engine.engine import UnleashEngine class StreamingManager: @@ -55,7 +54,9 @@ def start(self): if self._thread and self._thread.is_alive(): return self._stop.clear() - self._thread = threading.Thread(target=self._run, name="UnleashStreaming", daemon=True) + self._thread = threading.Thread( + target=self._run, name="UnleashStreaming", daemon=True + ) self._thread.start() def stop(self): @@ -63,7 +64,8 @@ def stop(self): if self._thread: self._thread.join(timeout=5) - def _run(self): + def _run(self): # noqa: PLR0912 + client: Optional[SSEClient] = None try: LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url) @@ -107,26 +109,29 @@ def _run(self): if self._on_ready: try: self._on_ready() - except Exception as cb_exc: + except Exception as cb_exc: # noqa: BLE001 LOGGER.debug("Ready callback error: %s", cb_exc) - except Exception as exc: + except Exception as exc: # noqa: BLE001 LOGGER.warning("Error processing SSE event: %s", exc) else: LOGGER.debug("Ignoring SSE event type: %s", event.event) # Heartbeat timeout: trigger reconnect via SSEClient interrupt (uses internal retry) - if self._hb_timeout and (time.time() - last_event_time > self._hb_timeout): + if self._hb_timeout and ( + time.time() - last_event_time > self._hb_timeout + ): LOGGER.warning("Heartbeat timeout exceeded; reconnecting") try: client.interrupt() - except Exception as _: + except Exception: # noqa: BLE001 # If interrupt fails, close will end the loop; SSEClient will not retry when closed client.close() break - except Exception as exc: + except Exception as exc: # noqa: BLE001 LOGGER.warning("Streaming error: %s", exc) finally: try: - client.close() - except Exception: + if client is not None: + client.close() + except Exception: # noqa: BLE001 pass diff --git a/examples/streaming.py b/examples/streaming.py index bc956e73..841bb15a 100644 --- a/examples/streaming.py +++ b/examples/streaming.py @@ -13,11 +13,13 @@ ready = False + def event_callback(event): global ready if event.event_type == UnleashEventType.READY: ready = True + client = UnleashClient( url=URL, app_name="python-streaming-example", From b25e8202518979ef82afa53c3b91b484e0787656 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 12 Aug 2025 19:11:34 +0200 Subject: [PATCH 05/23] update example --- UnleashClient/__init__.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index e6a3415e..2066d370 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -328,7 +328,8 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: mode is None or (mode == "polling" and (format_mode in (None, "full"))) ): - # Default/full polling + # MODE: default - full polling + fetch_headers = { **base_headers, "unleash-interval": self.unleash_refresh_interval_str_millis, @@ -361,7 +362,8 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: kwargs=job_args, ) elif fetch_toggles and (mode == "polling" and format_mode == "delta"): - # Delta polling + # MODE: delta polling + fetch_headers = { **base_headers, "unleash-interval": self.unleash_refresh_interval_str_millis, @@ -380,8 +382,6 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "ready_callback": self._ready_callback, } job_func: Callable = fetch_and_apply_delta - - # Eager run once job_func(**job_args) self.unleash_scheduler.start() self.fl_job = self.unleash_scheduler.add_job( @@ -393,7 +393,8 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: executor=self.unleash_executor_name, kwargs=job_args, ) - elif fetch_toggles and mode == "streaming": # Streaming mode + elif fetch_toggles and mode == "streaming": + # MODE: streaming stream_headers = { **base_headers, From 5c5e5429987a7c3059f907aaf2e857702bdf20a1 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 19 Aug 2025 11:00:30 +0200 Subject: [PATCH 06/23] refactor: consolidate job arguments --- UnleashClient/__init__.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 2066d370..97da6504 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -298,6 +298,12 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "engine": self.engine, } + base_job_args = { + "cache": self.cache, + "engine": self.engine, + "ready_callback": self._ready_callback, + } + # Register app if not self.unleash_disable_registration: register_client( @@ -335,18 +341,16 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "unleash-interval": self.unleash_refresh_interval_str_millis, } job_args = { + **base_job_args, "url": self.unleash_url, "app_name": self.unleash_app_name, "instance_id": self.unleash_instance_id, "headers": fetch_headers, "custom_options": self.unleash_custom_options, - "cache": self.cache, - "engine": self.engine, "request_timeout": self.unleash_request_timeout, "request_retries": self.unleash_request_retries, "project": self.unleash_project_name, "event_callback": self.unleash_event_callback, - "ready_callback": self._ready_callback, } job_func: Callable = fetch_and_load_features @@ -369,17 +373,15 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "unleash-interval": self.unleash_refresh_interval_str_millis, } job_args = { + **base_job_args, "url": self.unleash_url, "app_name": self.unleash_app_name, "instance_id": self.unleash_instance_id, "headers": fetch_headers, "custom_options": self.unleash_custom_options, - "cache": self.cache, - "engine": self.engine, "request_timeout": self.unleash_request_timeout, "request_retries": self.unleash_request_retries, "event_callback": self.unleash_event_callback, - "ready_callback": self._ready_callback, } job_func: Callable = fetch_and_apply_delta job_func(**job_args) @@ -414,12 +416,7 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: # Start metrics job only self.unleash_scheduler.start() else: # No fetching - only load from cache - job_args = { - "cache": self.cache, - "engine": self.engine, - "ready_callback": self._ready_callback, - } - load_features(**job_args) + load_features(**base_job_args) self.unleash_scheduler.start() if not self.unleash_disable_metrics: From f7596b71e16da1f5704f409cfb1f627e80318680 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 19 Aug 2025 15:54:28 +0200 Subject: [PATCH 07/23] refactor: simplify mode selection --- UnleashClient/__init__.py | 59 +++++++----------------------- UnleashClient/streaming/manager.py | 4 +- 2 files changed, 15 insertions(+), 48 deletions(-) diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 97da6504..36436bed 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -318,7 +318,7 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: self.unleash_request_timeout, ) - # Decide mode + # Decide upstream connection mode mode = ( (self.experimental_mode or {}).get("type") if self.experimental_mode @@ -333,27 +333,30 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: if fetch_toggles and ( mode is None or (mode == "polling" and (format_mode in (None, "full"))) + or (mode == "polling" and format_mode == "delta") ): - # MODE: default - full polling + # MODE: polling (full fetching or delta API) + + if mode == "polling" and format_mode == "delta": + job_func: Callable = fetch_and_apply_delta + else: + job_func: Callable = fetch_and_load_features - fetch_headers = { - **base_headers, - "unleash-interval": self.unleash_refresh_interval_str_millis, - } job_args = { **base_job_args, "url": self.unleash_url, "app_name": self.unleash_app_name, "instance_id": self.unleash_instance_id, - "headers": fetch_headers, + "headers": { + **base_headers, + "unleash-interval": self.unleash_refresh_interval_str_millis, + }, "custom_options": self.unleash_custom_options, "request_timeout": self.unleash_request_timeout, "request_retries": self.unleash_request_retries, "project": self.unleash_project_name, "event_callback": self.unleash_event_callback, } - job_func: Callable = fetch_and_load_features - job_func(**job_args) # initial fetch self.unleash_scheduler.start() self.fl_job = self.unleash_scheduler.add_job( @@ -365,46 +368,12 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: executor=self.unleash_executor_name, kwargs=job_args, ) - elif fetch_toggles and (mode == "polling" and format_mode == "delta"): - # MODE: delta polling - - fetch_headers = { - **base_headers, - "unleash-interval": self.unleash_refresh_interval_str_millis, - } - job_args = { - **base_job_args, - "url": self.unleash_url, - "app_name": self.unleash_app_name, - "instance_id": self.unleash_instance_id, - "headers": fetch_headers, - "custom_options": self.unleash_custom_options, - "request_timeout": self.unleash_request_timeout, - "request_retries": self.unleash_request_retries, - "event_callback": self.unleash_event_callback, - } - job_func: Callable = fetch_and_apply_delta - job_func(**job_args) - self.unleash_scheduler.start() - self.fl_job = self.unleash_scheduler.add_job( - job_func, - trigger=IntervalTrigger( - seconds=int(self.unleash_refresh_interval), - jitter=self.unleash_refresh_jitter, - ), - executor=self.unleash_executor_name, - kwargs=job_args, - ) elif fetch_toggles and mode == "streaming": # MODE: streaming - stream_headers = { - **base_headers, - "unleash-interval": self.unleash_refresh_interval_str_millis, - } self._stream_manager = StreamingManager( url=self.unleash_url, - headers=stream_headers, + headers=base_headers, request_timeout=self.unleash_request_timeout, engine=self.engine, on_ready=self._ready_callback, @@ -413,7 +382,7 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: ) self._stream_manager.start() - # Start metrics job only + # Start for metrics job only self.unleash_scheduler.start() else: # No fetching - only load from cache load_features(**base_job_args) diff --git a/UnleashClient/streaming/manager.py b/UnleashClient/streaming/manager.py index c3cf6588..160208c3 100644 --- a/UnleashClient/streaming/manager.py +++ b/UnleashClient/streaming/manager.py @@ -69,7 +69,6 @@ def _run(self): # noqa: PLR0912 try: LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url) - # Use LaunchDarkly EventSource client if self._sse_factory: client = self._sse_factory(self._base_url, self._headers, self._timeout) else: @@ -101,8 +100,7 @@ def _run(self): # noqa: PLR0912 data = event.data if not data: continue - # Apply under lock - with self._lock: + with self._lock: # Apply under lock self._engine.take_state(data) if event.event == "unleash-connected" and not self._hydrated: self._hydrated = True From 394ac5f6979b3dc689b5fe428ba1d1eb8d080451 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 19 Aug 2025 16:16:42 +0200 Subject: [PATCH 08/23] refactor: update mode selection logic --- UnleashClient/__init__.py | 84 +++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 44 deletions(-) diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 36436bed..421fbba9 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -330,47 +330,8 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: else None ) - if fetch_toggles and ( - mode is None - or (mode == "polling" and (format_mode in (None, "full"))) - or (mode == "polling" and format_mode == "delta") - ): - # MODE: polling (full fetching or delta API) - - if mode == "polling" and format_mode == "delta": - job_func: Callable = fetch_and_apply_delta - else: - job_func: Callable = fetch_and_load_features - - job_args = { - **base_job_args, - "url": self.unleash_url, - "app_name": self.unleash_app_name, - "instance_id": self.unleash_instance_id, - "headers": { - **base_headers, - "unleash-interval": self.unleash_refresh_interval_str_millis, - }, - "custom_options": self.unleash_custom_options, - "request_timeout": self.unleash_request_timeout, - "request_retries": self.unleash_request_retries, - "project": self.unleash_project_name, - "event_callback": self.unleash_event_callback, - } - job_func(**job_args) # initial fetch - self.unleash_scheduler.start() - self.fl_job = self.unleash_scheduler.add_job( - job_func, - trigger=IntervalTrigger( - seconds=int(self.unleash_refresh_interval), - jitter=self.unleash_refresh_jitter, - ), - executor=self.unleash_executor_name, - kwargs=job_args, - ) - elif fetch_toggles and mode == "streaming": + if fetch_toggles and mode == "streaming": # MODE: streaming - self._stream_manager = StreamingManager( url=self.unleash_url, headers=base_headers, @@ -381,12 +342,47 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: custom_options=self.unleash_custom_options, ) self._stream_manager.start() + else: + if fetch_toggles: + # MODE: polling + + job_args = { + **base_job_args, + "url": self.unleash_url, + "app_name": self.unleash_app_name, + "instance_id": self.unleash_instance_id, + "headers": { + **base_headers, + "unleash-interval": self.unleash_refresh_interval_str_millis, + }, + "custom_options": self.unleash_custom_options, + "request_timeout": self.unleash_request_timeout, + "request_retries": self.unleash_request_retries, + "project": self.unleash_project_name, + "event_callback": self.unleash_event_callback, + } + + if format_mode == "delta": + job_func: Callable = fetch_and_apply_delta + else: + job_func: Callable = fetch_and_load_features + else: + # MODE: offline - # Start for metrics job only - self.unleash_scheduler.start() - else: # No fetching - only load from cache - load_features(**base_job_args) + job_args = base_job_args + job_func = load_features + + job_func(**job_args) # initial fetch self.unleash_scheduler.start() + self.fl_job = self.unleash_scheduler.add_job( + job_func, + trigger=IntervalTrigger( + seconds=int(self.unleash_refresh_interval), + jitter=self.unleash_refresh_jitter, + ), + executor=self.unleash_executor_name, + kwargs=job_args, + ) if not self.unleash_disable_metrics: self.metric_job = self.unleash_scheduler.add_job( From 6bbc1622423b808ee6ae1ae9934b10129e8e7e20 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 19 Aug 2025 17:01:45 +0200 Subject: [PATCH 09/23] refactor: enhance job scheduling logic for streaming and polling modes --- UnleashClient/__init__.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 421fbba9..2f9abd5b 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -318,6 +318,9 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: self.unleash_request_timeout, ) + # Start periodic jobs + self.unleash_scheduler.start() + # Decide upstream connection mode mode = ( (self.experimental_mode or {}).get("type") @@ -330,19 +333,7 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: else None ) - if fetch_toggles and mode == "streaming": - # MODE: streaming - self._stream_manager = StreamingManager( - url=self.unleash_url, - headers=base_headers, - request_timeout=self.unleash_request_timeout, - engine=self.engine, - on_ready=self._ready_callback, - sse_client_factory=self._sse_client_factory, - custom_options=self.unleash_custom_options, - ) - self._stream_manager.start() - else: + if mode != "streaming": if fetch_toggles: # MODE: polling @@ -372,8 +363,7 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: job_args = base_job_args job_func = load_features - job_func(**job_args) # initial fetch - self.unleash_scheduler.start() + job_func(**job_args) # type: ignore self.fl_job = self.unleash_scheduler.add_job( job_func, trigger=IntervalTrigger( @@ -383,6 +373,18 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: executor=self.unleash_executor_name, kwargs=job_args, ) + else: + # MODE: streaming + self._stream_manager = StreamingManager( + url=self.unleash_url, + headers=base_headers, + request_timeout=self.unleash_request_timeout, + engine=self.engine, + on_ready=self._ready_callback, + sse_client_factory=self._sse_client_factory, + custom_options=self.unleash_custom_options, + ) + self._stream_manager.start() if not self.unleash_disable_metrics: self.metric_job = self.unleash_scheduler.add_job( From fecb283224cee516a253b466dd1b0502775605bf Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 19 Aug 2025 17:04:52 +0200 Subject: [PATCH 10/23] fix: lint errors --- UnleashClient/__init__.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 2f9abd5b..97b843c7 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -353,10 +353,11 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "event_callback": self.unleash_event_callback, } + job_func: Callable if format_mode == "delta": - job_func: Callable = fetch_and_apply_delta + job_func = fetch_and_apply_delta else: - job_func: Callable = fetch_and_load_features + job_func = fetch_and_load_features else: # MODE: offline @@ -563,9 +564,9 @@ def get_variant(self, feature_name: str, context: Optional[dict] = None) -> dict event_type=UnleashEventType.VARIANT, event_id=uuid.uuid4(), context=context, - enabled=variant["enabled"], + enabled=bool(variant["enabled"]), feature_name=feature_name, - variant=variant["name"], + variant=str(variant["name"]), ) self.unleash_event_callback(event) From 9ffc01aafe5c9bef48f43750620c705763778cd4 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 19 Aug 2025 19:05:15 +0200 Subject: [PATCH 11/23] fix: update client specification version to 5.2.2 --- UnleashClient/constants.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/UnleashClient/constants.py b/UnleashClient/constants.py index e3eac58e..c34bfb4f 100644 --- a/UnleashClient/constants.py +++ b/UnleashClient/constants.py @@ -6,7 +6,7 @@ REQUEST_TIMEOUT = 30 REQUEST_RETRIES = 3 METRIC_LAST_SENT_TIME = "mlst" -CLIENT_SPEC_VERSION = "5.2.0" +CLIENT_SPEC_VERSION = "5.2.2" # =Unleash= APPLICATION_HEADERS = { From c4ffef3c8ba4e0be55b117f1bd6064583aae1b93 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Tue, 19 Aug 2025 19:07:25 +0200 Subject: [PATCH 12/23] chore: remove streaming example and its dependencies --- examples/README.md | 13 ---------- examples/requirements.txt | 2 -- examples/streaming.py | 53 --------------------------------------- 3 files changed, 68 deletions(-) delete mode 100644 examples/README.md delete mode 100644 examples/requirements.txt delete mode 100644 examples/streaming.py diff --git a/examples/README.md b/examples/README.md deleted file mode 100644 index a410fc1c..00000000 --- a/examples/README.md +++ /dev/null @@ -1,13 +0,0 @@ -How to run the streaming example - -1) Create a virtualenv and install deps - - python -m venv .venv - source .venv/bin/activate - pip install -r examples/requirements.txt - -2) Run the example - - python examples/streaming.py - -You should see periodic prints of the target flag's state. diff --git a/examples/requirements.txt b/examples/requirements.txt deleted file mode 100644 index ec414f1a..00000000 --- a/examples/requirements.txt +++ /dev/null @@ -1,2 +0,0 @@ -UnleashClient -launchdarkly-eventsource diff --git a/examples/streaming.py b/examples/streaming.py deleted file mode 100644 index 841bb15a..00000000 --- a/examples/streaming.py +++ /dev/null @@ -1,53 +0,0 @@ -import os -import re -import time - -from UnleashClient import UnleashClient -from UnleashClient.events import UnleashEventType - -URL = "https://sandbox.getunleash.io/enterprise/api" -TOKEN = "hammer-1:development.4819f784fbe351f8a74982d43a68f53dcdbf74cdde554a5d0a81d997" -FLAG = "flag-page-hh" - -headers = {"Authorization": TOKEN} - -ready = False - - -def event_callback(event): - global ready - if event.event_type == UnleashEventType.READY: - ready = True - - -client = UnleashClient( - url=URL, - app_name="python-streaming-example", - instance_id="example-instance", - custom_headers=headers, - experimental_mode={"type": "streaming"}, - event_callback=event_callback, -) - -client.initialize_client() - -print("Waiting for hydration via streaming... (Ctrl+C to exit)") - -try: - while True: - if not client.is_initialized: - print("Client not initialized yet, waiting...") - time.sleep(0.2) - continue - if not ready: - # FIXME: check if for other modes this works in the same way - print("Waiting for hydration via streaming... (Ctrl+C to exit)") - time.sleep(0.2) - continue - enabled = client.is_enabled(FLAG, {"userId": "example"}) - print(f"{FLAG} enabled? {enabled}") - time.sleep(1) -except KeyboardInterrupt: - pass -finally: - client.destroy() From 2cafba57b1fd637be0fecbce665b5b95a92ba071 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Wed, 20 Aug 2025 12:06:39 +0200 Subject: [PATCH 13/23] update yggdrasil --- pyproject.toml | 2 +- requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8930fe15..bfee8565 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ dependencies=[ "importlib_metadata", "python-dateutil", "semver < 4.0.0", - "yggdrasil-engine", + "yggdrasil-engine >= 1.0.0", "launchdarkly-eventsource", ] diff --git a/requirements.txt b/requirements.txt index b2bd7c15..75ec0b96 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ mmhash3 python-dateutil requests semver -yggdrasil-engine +yggdrasil-engine>=1.0.0 # Development packages # - Testing From ac8ce42d82e4c5857cd4c1e98dd4e3e6b0cf99b1 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Wed, 20 Aug 2025 14:04:59 +0200 Subject: [PATCH 14/23] fix: polling --- UnleashClient/__init__.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 97b843c7..437e0719 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -337,7 +337,7 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: if fetch_toggles: # MODE: polling - job_args = { + base_polling_args = { **base_job_args, "url": self.unleash_url, "app_name": self.unleash_app_name, @@ -349,14 +349,18 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "custom_options": self.unleash_custom_options, "request_timeout": self.unleash_request_timeout, "request_retries": self.unleash_request_retries, - "project": self.unleash_project_name, "event_callback": self.unleash_event_callback, } job_func: Callable if format_mode == "delta": + job_args = base_polling_args job_func = fetch_and_apply_delta else: + job_args = { + **base_polling_args, + "project": self.unleash_project_name, + } job_func = fetch_and_load_features else: # MODE: offline From 23be09e59d83452034f03f993670c8dcd97e3cda Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Wed, 20 Aug 2025 16:33:55 +0200 Subject: [PATCH 15/23] remove delta-polling --- UnleashClient/__init__.py | 19 +---- UnleashClient/api/delta.py | 70 ------------------- UnleashClient/constants.py | 1 - UnleashClient/periodic_tasks/__init__.py | 1 - .../periodic_tasks/fetch_and_apply_delta.py | 63 ----------------- 5 files changed, 3 insertions(+), 151 deletions(-) delete mode 100644 UnleashClient/api/delta.py delete mode 100644 UnleashClient/periodic_tasks/fetch_and_apply_delta.py diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index 437e0719..a854530b 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -34,7 +34,6 @@ from UnleashClient.loader import load_features from UnleashClient.periodic_tasks import ( aggregate_and_send_metrics, - fetch_and_apply_delta, fetch_and_load_features, ) from UnleashClient.streaming.manager import StreamingManager @@ -327,17 +326,12 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: if self.experimental_mode else None ) - format_mode = ( - (self.experimental_mode or {}).get("format") - if self.experimental_mode - else None - ) if mode != "streaming": if fetch_toggles: # MODE: polling - base_polling_args = { + job_args = { **base_job_args, "url": self.unleash_url, "app_name": self.unleash_app_name, @@ -350,18 +344,11 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: "request_timeout": self.unleash_request_timeout, "request_retries": self.unleash_request_retries, "event_callback": self.unleash_event_callback, + "project": self.unleash_project_name, } job_func: Callable - if format_mode == "delta": - job_args = base_polling_args - job_func = fetch_and_apply_delta - else: - job_args = { - **base_polling_args, - "project": self.unleash_project_name, - } - job_func = fetch_and_load_features + job_func = fetch_and_load_features else: # MODE: offline diff --git a/UnleashClient/api/delta.py b/UnleashClient/api/delta.py deleted file mode 100644 index d4683305..00000000 --- a/UnleashClient/api/delta.py +++ /dev/null @@ -1,70 +0,0 @@ -from typing import Optional, Tuple - -import requests -from requests.adapters import HTTPAdapter -from urllib3 import Retry - -from UnleashClient.constants import DELTA_URL -from UnleashClient.utils import LOGGER, log_resp_info - - -# pylint: disable=broad-except -def get_feature_deltas( - url: str, - app_name: str, - instance_id: str, - headers: dict, - custom_options: dict, - request_timeout: int, - request_retries: int, - cached_etag: str = "", -) -> Tuple[Optional[str], str]: - """ - Retrieves feature deltas from the Unleash server. - - Returns a tuple of (raw_json_string_or_None, etag). - """ - try: - LOGGER.info("Getting feature deltas.") - - request_specific_headers = { - "UNLEASH-APPNAME": app_name, - "UNLEASH-INSTANCEID": instance_id, - } - - if cached_etag: - request_specific_headers["If-None-Match"] = cached_etag - - base_url = f"{url}{DELTA_URL}" - - adapter = HTTPAdapter( - max_retries=Retry(total=request_retries, status_forcelist=[500, 502, 504]) - ) - with requests.Session() as session: - session.mount("https://", adapter) - session.mount("http://", adapter) - resp = session.get( - base_url, - headers={**headers, **request_specific_headers}, - timeout=request_timeout, - **custom_options, - ) - - if resp.status_code not in [200, 304]: - log_resp_info(resp) - LOGGER.warning( - "Unleash Client delta fetch failed due to unexpected HTTP status code: %s", - resp.status_code, - ) - raise Exception("Unleash Client delta fetch failed!") - - etag = resp.headers.get("etag", "") - - if resp.status_code == 304: - return None, etag - - return resp.text, etag - except Exception as exc: - LOGGER.exception("Unleash Client delta fetch failed due to exception: %s", exc) - - return None, "" diff --git a/UnleashClient/constants.py b/UnleashClient/constants.py index c34bfb4f..c9c06e6d 100644 --- a/UnleashClient/constants.py +++ b/UnleashClient/constants.py @@ -19,7 +19,6 @@ REGISTER_URL = "/client/register" FEATURES_URL = "/client/features" METRICS_URL = "/client/metrics" -DELTA_URL = "/client/delta" STREAMING_URL = "/client/streaming" # Cache keys diff --git a/UnleashClient/periodic_tasks/__init__.py b/UnleashClient/periodic_tasks/__init__.py index 24a97d35..a4391c50 100644 --- a/UnleashClient/periodic_tasks/__init__.py +++ b/UnleashClient/periodic_tasks/__init__.py @@ -1,4 +1,3 @@ # ruff: noqa: F401 -from .fetch_and_apply_delta import fetch_and_apply_delta from .fetch_and_load import fetch_and_load_features from .send_metrics import aggregate_and_send_metrics diff --git a/UnleashClient/periodic_tasks/fetch_and_apply_delta.py b/UnleashClient/periodic_tasks/fetch_and_apply_delta.py deleted file mode 100644 index f2171430..00000000 --- a/UnleashClient/periodic_tasks/fetch_and_apply_delta.py +++ /dev/null @@ -1,63 +0,0 @@ -import uuid -from typing import Callable, Optional - -from yggdrasil_engine.engine import UnleashEngine - -from UnleashClient.api.delta import get_feature_deltas -from UnleashClient.cache import BaseCache -from UnleashClient.constants import ETAG -from UnleashClient.events import UnleashEventType, UnleashFetchedEvent -from UnleashClient.utils import LOGGER - - -def fetch_and_apply_delta( - url: str, - app_name: str, - instance_id: str, - headers: dict, - custom_options: dict, - cache: BaseCache, - request_timeout: int, - request_retries: int, - engine: UnleashEngine, - event_callback: Optional[Callable] = None, - ready_callback: Optional[Callable] = None, -) -> None: - """ - Fetch delta payload and apply to engine with engine.take_state(raw_json). - Fires READY on first hydration (hydration event inside delta stream) and FETCHED on each successful delta. - """ - (delta_payload, etag) = get_feature_deltas( - url, - app_name, - instance_id, - headers, - custom_options, - request_timeout, - request_retries, - cache.get(ETAG), - ) - - if etag: - cache.set(ETAG, etag) - - if not delta_payload: - LOGGER.debug("No delta returned from server, nothing to apply.") - return - - try: - engine.take_state(delta_payload) - - if event_callback: - event = UnleashFetchedEvent( - event_type=UnleashEventType.FETCHED, - event_id=uuid.uuid4(), - raw_features=delta_payload, - ) - event_callback(event) - - # First hydration event as ready signal - if ready_callback and '"type":"hydration"' in delta_payload: - ready_callback() - except Exception as exc: - LOGGER.warning("Failed to apply delta: %s", exc) From 2072ab43030ab01ce159f981e559cbbe72ec78c8 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Thu, 21 Aug 2025 10:21:26 +0200 Subject: [PATCH 16/23] refactor: replace StreamingManager with StreamingConnector and StreamingEventProcessor --- UnleashClient/__init__.py | 15 +-- UnleashClient/streaming/__init__.py | 3 + UnleashClient/streaming/connector.py | 130 ++++++++++++++++++++ UnleashClient/streaming/event_processor.py | 61 ++++++++++ UnleashClient/streaming/manager.py | 135 --------------------- 5 files changed, 202 insertions(+), 142 deletions(-) create mode 100644 UnleashClient/streaming/connector.py create mode 100644 UnleashClient/streaming/event_processor.py delete mode 100644 UnleashClient/streaming/manager.py diff --git a/UnleashClient/__init__.py b/UnleashClient/__init__.py index a854530b..80573c47 100644 --- a/UnleashClient/__init__.py +++ b/UnleashClient/__init__.py @@ -36,7 +36,7 @@ aggregate_and_send_metrics, fetch_and_load_features, ) -from UnleashClient.streaming.manager import StreamingManager +from UnleashClient.streaming import StreamingConnector, StreamingEventProcessor from .cache import BaseCache, FileCache from .utils import LOGGER, InstanceAllowType, InstanceCounter @@ -174,7 +174,7 @@ def __init__( # noqa: PLR0915 self.unleash_event_callback = event_callback self._ready_callback = build_ready_callback(event_callback) self.experimental_mode = experimental_mode - self._stream_manager: Optional[StreamingManager] = None + self._streaming_connector: Optional[StreamingConnector] = None self._sse_client_factory = sse_client_factory self._do_instance_check(multiple_instance_mode) @@ -367,16 +367,17 @@ def initialize_client(self, fetch_toggles: bool = True) -> None: ) else: # MODE: streaming - self._stream_manager = StreamingManager( + processor = StreamingEventProcessor(self.engine) + self._streaming_connector = StreamingConnector( url=self.unleash_url, headers=base_headers, request_timeout=self.unleash_request_timeout, - engine=self.engine, + event_processor=processor, on_ready=self._ready_callback, sse_client_factory=self._sse_client_factory, custom_options=self.unleash_custom_options, ) - self._stream_manager.start() + self._streaming_connector.start() if not self.unleash_disable_metrics: self.metric_job = self.unleash_scheduler.add_job( @@ -449,9 +450,9 @@ def destroy(self) -> None: request_timeout=self.unleash_request_timeout, engine=self.engine, ) - if self._stream_manager: + if self._streaming_connector: try: - self._stream_manager.stop() + self._streaming_connector.stop() except Exception: pass self.unleash_scheduler.shutdown() diff --git a/UnleashClient/streaming/__init__.py b/UnleashClient/streaming/__init__.py index 8ea075cd..ef464852 100644 --- a/UnleashClient/streaming/__init__.py +++ b/UnleashClient/streaming/__init__.py @@ -1,2 +1,5 @@ # ruff: noqa: F401 # Streaming package + +from .connector import StreamingConnector +from .event_processor import StreamingEventProcessor diff --git a/UnleashClient/streaming/connector.py b/UnleashClient/streaming/connector.py new file mode 100644 index 00000000..d7c6512a --- /dev/null +++ b/UnleashClient/streaming/connector.py @@ -0,0 +1,130 @@ +import threading +import time +from typing import Callable, Optional + +from ld_eventsource import SSEClient +from ld_eventsource.config import ConnectStrategy + +from UnleashClient.constants import STREAMING_URL +from UnleashClient.utils import LOGGER + +from .event_processor import StreamingEventProcessor + + +class StreamingConnector: + """ + Manages the SSE connection lifecycle with reconnect/backoff and delegates + event handling to an injected StreamingEventProcessor. + """ + + def __init__( + self, + url: str, + headers: dict, + request_timeout: int, + event_processor: StreamingEventProcessor, + on_ready: Optional[Callable[[], None]] = None, + sse_client_factory: Optional[Callable[[str, dict, int], SSEClient]] = None, + heartbeat_timeout: int = 60, + backoff_initial: float = 2.0, + custom_options: Optional[dict] = None, + ) -> None: + self._base_url = url.rstrip("/") + STREAMING_URL + self._headers = {**headers, "Accept": "text/event-stream"} + self._timeout = request_timeout + self._on_ready = on_ready + self._sse_factory = sse_client_factory + self._hb_timeout = heartbeat_timeout + self._backoff_initial = backoff_initial + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + self._lock = threading.Lock() + self._processor = event_processor + base_options = custom_options or {} + if self._timeout is not None and "timeout" not in base_options: + base_options = {"timeout": self._timeout, **base_options} + self._custom_options = base_options + + def start(self): + if self._thread and self._thread.is_alive(): + return + self._stop.clear() + self._thread = threading.Thread( + target=self._run, name="UnleashStreaming", daemon=True + ) + self._thread.start() + + def stop(self): + self._stop.set() + if self._thread: + self._thread.join(timeout=5) + + def _run(self): # noqa: PLR0912 + while not self._stop.is_set(): + client: Optional[SSEClient] = None + try: + LOGGER.info( + "Connecting to Unleash streaming endpoint: %s", self._base_url + ) + + if self._sse_factory: + client = self._sse_factory( + self._base_url, self._headers, self._timeout + ) + else: + connect_strategy = ConnectStrategy.http( + self._base_url, + headers=self._headers, + urllib3_request_options=self._custom_options, + ) + client = SSEClient( + connect=connect_strategy, + initial_retry_delay=self._backoff_initial, + logger=LOGGER, + ) + + last_event_time = time.time() + + for event in client.events: + if self._stop.is_set(): + client.close() + return + if not event.event: + continue + + last_event_time = time.time() + + # Delegate event processing + self._processor.process(event) + if event.event == "unleash-connected" and self._processor.hydrated: + if self._on_ready: + try: + self._on_ready() + except Exception as cb_exc: # noqa: BLE001 + LOGGER.debug("Ready callback error: %s", cb_exc) + + # Heartbeat timeout - reconnect manually + if self._hb_timeout and ( + time.time() - last_event_time > self._hb_timeout + ): + LOGGER.warning("Heartbeat timeout exceeded; reconnecting") + try: + client.interrupt() + except Exception: # noqa: BLE001 + # If interrupt fails, close will end the loop + client.close() + break + except Exception as exc: # noqa: BLE001 + LOGGER.warning("Streaming error (will retry): %s", exc) + finally: + try: + if client is not None: + client.close() + except Exception: # noqa: BLE001 + pass + + if self._stop.is_set(): + break + + # On catastrophic failure - delay before attempting to recreate SSEClient + time.sleep(1.0) diff --git a/UnleashClient/streaming/event_processor.py b/UnleashClient/streaming/event_processor.py new file mode 100644 index 00000000..29bb4cf0 --- /dev/null +++ b/UnleashClient/streaming/event_processor.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from threading import Lock +from typing import Any + +from yggdrasil_engine.engine import UnleashEngine + +from UnleashClient.utils import LOGGER + + +class StreamingEventProcessor: + """ + Processes SSE events from the Unleash streaming endpoint and applies + resulting deltas/state to the provided engine in a thread-safe manner. + + This class is deliberately unaware of connection/reconnect concerns; it + only deals with event semantics. + """ + + def __init__(self, engine: UnleashEngine) -> None: + self._engine = engine + self._lock = Lock() + self._hydrated = False + + @property + def hydrated(self) -> bool: + return self._hydrated + + def process(self, event: Any) -> None: + """ + Handle a single SSE event object. The object is expected to have + attributes `event` (type) and `data` (payload string or dict). + """ + try: + etype = getattr(event, "event", None) + if not etype: + return + + if etype == "unleash-connected": + self._handle_connected(event) + elif etype == "unleash-updated": + self._handle_updated(event) + else: + LOGGER.debug("Ignoring SSE event type: %s", etype) + except Exception as exc: # noqa: BLE001 + LOGGER.warning("Error processing SSE event: %s", exc) + + def _apply_delta(self, event_data: Any) -> None: + if not event_data: + return + with self._lock: + self._engine.take_state(event_data) + + def _handle_connected(self, event: Any) -> None: + LOGGER.debug("Processing initial hydration data") + self._apply_delta(getattr(event, "data", None)) + if not self._hydrated: + self._hydrated = True + + def _handle_updated(self, event: Any) -> None: + self._apply_delta(getattr(event, "data", None)) diff --git a/UnleashClient/streaming/manager.py b/UnleashClient/streaming/manager.py deleted file mode 100644 index 160208c3..00000000 --- a/UnleashClient/streaming/manager.py +++ /dev/null @@ -1,135 +0,0 @@ -import threading -import time -from typing import Callable, Optional - -from ld_eventsource import SSEClient -from ld_eventsource.config import ConnectStrategy -from yggdrasil_engine.engine import UnleashEngine - -from UnleashClient.constants import STREAMING_URL -from UnleashClient.utils import LOGGER - - -class StreamingManager: - """ - Simple SSE streaming manager with reconnect and backoff. - Applies deltas to the engine by calling engine.take_state(raw_json). - """ - - def __init__( - self, - url: str, - headers: dict, - request_timeout: int, - engine: UnleashEngine, - on_ready: Optional[Callable[[], None]] = None, - sse_client_factory: Optional[Callable[[str, dict, int], SSEClient]] = None, - heartbeat_timeout: int = 60, - backoff_initial: float = 2.0, - backoff_max: float = 30.0, - backoff_jitter: float = 0.5, - custom_options: Optional[dict] = None, - ) -> None: - self._base_url = url.rstrip("/") + STREAMING_URL - self._headers = {**headers, "Accept": "text/event-stream"} - self._timeout = request_timeout - self._engine = engine - self._on_ready = on_ready - self._sse_factory = sse_client_factory - self._hb_timeout = heartbeat_timeout - self._backoff_initial = backoff_initial - self._backoff_max = backoff_max - self._backoff_jitter = backoff_jitter - self._stop = threading.Event() - self._thread: Optional[threading.Thread] = None - self._lock = threading.Lock() - self._hydrated = False - # Ensure urllib3 timeout is honored by ld_eventsource - base_options = custom_options or {} - if self._timeout is not None and "timeout" not in base_options: - base_options = {"timeout": self._timeout, **base_options} - self._custom_options = base_options - - def start(self): - if self._thread and self._thread.is_alive(): - return - self._stop.clear() - self._thread = threading.Thread( - target=self._run, name="UnleashStreaming", daemon=True - ) - self._thread.start() - - def stop(self): - self._stop.set() - if self._thread: - self._thread.join(timeout=5) - - def _run(self): # noqa: PLR0912 - client: Optional[SSEClient] = None - try: - LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url) - - if self._sse_factory: - client = self._sse_factory(self._base_url, self._headers, self._timeout) - else: - connect_strategy = ConnectStrategy.http( - self._base_url, - headers=self._headers, - urllib3_request_options=self._custom_options, - ) - client = SSEClient( - connect=connect_strategy, - initial_retry_delay=self._backoff_initial, - logger=LOGGER, - ) - - last_event_time = time.time() - - # Iterate over events; SSEClient handles reconnects with internal backoff/jitter - for event in client.events: - if self._stop.is_set(): - client.close() - break - if not event.event: - continue - - last_event_time = time.time() - - if event.event in ("unleash-connected", "unleash-updated"): - try: - data = event.data - if not data: - continue - with self._lock: # Apply under lock - self._engine.take_state(data) - if event.event == "unleash-connected" and not self._hydrated: - self._hydrated = True - if self._on_ready: - try: - self._on_ready() - except Exception as cb_exc: # noqa: BLE001 - LOGGER.debug("Ready callback error: %s", cb_exc) - except Exception as exc: # noqa: BLE001 - LOGGER.warning("Error processing SSE event: %s", exc) - else: - LOGGER.debug("Ignoring SSE event type: %s", event.event) - - # Heartbeat timeout: trigger reconnect via SSEClient interrupt (uses internal retry) - if self._hb_timeout and ( - time.time() - last_event_time > self._hb_timeout - ): - LOGGER.warning("Heartbeat timeout exceeded; reconnecting") - try: - client.interrupt() - except Exception: # noqa: BLE001 - # If interrupt fails, close will end the loop; SSEClient will not retry when closed - client.close() - break - except Exception as exc: # noqa: BLE001 - LOGGER.warning("Streaming error: %s", exc) - finally: - try: - if client is not None: - client.close() - except Exception: # noqa: BLE001 - pass From d6b02932b16cd9402812a645d77e8c78dc8573ed Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Thu, 21 Aug 2025 11:14:55 +0200 Subject: [PATCH 17/23] fix: add retry strategies and error handling --- UnleashClient/streaming/connector.py | 139 +++++++++++++++------------ 1 file changed, 76 insertions(+), 63 deletions(-) diff --git a/UnleashClient/streaming/connector.py b/UnleashClient/streaming/connector.py index d7c6512a..7618f22c 100644 --- a/UnleashClient/streaming/connector.py +++ b/UnleashClient/streaming/connector.py @@ -3,7 +3,7 @@ from typing import Callable, Optional from ld_eventsource import SSEClient -from ld_eventsource.config import ConnectStrategy +from ld_eventsource.config import ConnectStrategy, RetryDelayStrategy, ErrorStrategy from UnleashClient.constants import STREAMING_URL from UnleashClient.utils import LOGGER @@ -27,6 +27,9 @@ def __init__( sse_client_factory: Optional[Callable[[str, dict, int], SSEClient]] = None, heartbeat_timeout: int = 60, backoff_initial: float = 2.0, + backoff_max: float = 30.0, + backoff_multiplier: float = 2.0, + backoff_jitter: Optional[float] = 0.5, custom_options: Optional[dict] = None, ) -> None: self._base_url = url.rstrip("/") + STREAMING_URL @@ -36,6 +39,9 @@ def __init__( self._sse_factory = sse_client_factory self._hb_timeout = heartbeat_timeout self._backoff_initial = backoff_initial + self._backoff_max = backoff_max + self._backoff_multiplier = backoff_multiplier + self._backoff_jitter = backoff_jitter self._stop = threading.Event() self._thread: Optional[threading.Thread] = None self._lock = threading.Lock() @@ -60,71 +66,78 @@ def stop(self): self._thread.join(timeout=5) def _run(self): # noqa: PLR0912 - while not self._stop.is_set(): - client: Optional[SSEClient] = None - try: - LOGGER.info( - "Connecting to Unleash streaming endpoint: %s", self._base_url + """ + Main streaming loop. Creates SSEClient once and lets it handle retries internally. + Only recreates client if there's a catastrophic failure. + """ + client: Optional[SSEClient] = None + + try: + LOGGER.info( + "Connecting to Unleash streaming endpoint: %s", self._base_url + ) + + if self._sse_factory: + client = self._sse_factory( + self._base_url, self._headers, self._timeout + ) + else: + connect_strategy = ConnectStrategy.http( + self._base_url, + headers=self._headers, + urllib3_request_options=self._custom_options, ) - if self._sse_factory: - client = self._sse_factory( - self._base_url, self._headers, self._timeout - ) - else: - connect_strategy = ConnectStrategy.http( - self._base_url, - headers=self._headers, - urllib3_request_options=self._custom_options, - ) - client = SSEClient( - connect=connect_strategy, - initial_retry_delay=self._backoff_initial, - logger=LOGGER, - ) + retry_strategy = RetryDelayStrategy.default( + max_delay=self._backoff_max, + backoff_multiplier=self._backoff_multiplier, + jitter_multiplier=self._backoff_jitter, + ) + + client = SSEClient( + connect=connect_strategy, + initial_retry_delay=self._backoff_initial, + retry_delay_strategy=retry_strategy, + retry_delay_reset_threshold=60.0, + error_strategy=ErrorStrategy.always_continue(), + logger=LOGGER, + ) + + last_event_time = time.time() + + for event in client.events: + if self._stop.is_set(): + break + if not event.event: + continue last_event_time = time.time() - for event in client.events: - if self._stop.is_set(): - client.close() - return - if not event.event: - continue - - last_event_time = time.time() - - # Delegate event processing - self._processor.process(event) - if event.event == "unleash-connected" and self._processor.hydrated: - if self._on_ready: - try: - self._on_ready() - except Exception as cb_exc: # noqa: BLE001 - LOGGER.debug("Ready callback error: %s", cb_exc) - - # Heartbeat timeout - reconnect manually - if self._hb_timeout and ( - time.time() - last_event_time > self._hb_timeout - ): - LOGGER.warning("Heartbeat timeout exceeded; reconnecting") + self._processor.process(event) + if event.event == "unleash-connected" and self._processor.hydrated: + if self._on_ready: try: - client.interrupt() - except Exception: # noqa: BLE001 - # If interrupt fails, close will end the loop - client.close() - break - except Exception as exc: # noqa: BLE001 - LOGGER.warning("Streaming error (will retry): %s", exc) - finally: - try: - if client is not None: - client.close() - except Exception: # noqa: BLE001 - pass - - if self._stop.is_set(): - break - - # On catastrophic failure - delay before attempting to recreate SSEClient - time.sleep(1.0) + self._on_ready() + except Exception as cb_exc: # noqa: BLE001 + LOGGER.debug("Ready callback error: %s", cb_exc) + + if self._hb_timeout and ( + time.time() - last_event_time > self._hb_timeout + ): + LOGGER.warning("Heartbeat timeout exceeded; reconnecting") + try: + client.interrupt() # Don't break, rely on SSE client retry + except Exception: # noqa: BLE001 + break + + LOGGER.debug("SSE stream ended") + + except Exception as exc: # noqa: BLE001 + LOGGER.warning("Streaming connection failed: %s", exc) + + finally: + try: + if client is not None: + client.close() + except Exception: # noqa: BLE001 + pass From 2beb5954936b6651a101f30cb4a735680f64ec2c Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 21 Aug 2025 09:17:03 +0000 Subject: [PATCH 18/23] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- UnleashClient/streaming/connector.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/UnleashClient/streaming/connector.py b/UnleashClient/streaming/connector.py index 7618f22c..7eae65cb 100644 --- a/UnleashClient/streaming/connector.py +++ b/UnleashClient/streaming/connector.py @@ -3,7 +3,7 @@ from typing import Callable, Optional from ld_eventsource import SSEClient -from ld_eventsource.config import ConnectStrategy, RetryDelayStrategy, ErrorStrategy +from ld_eventsource.config import ConnectStrategy, ErrorStrategy, RetryDelayStrategy from UnleashClient.constants import STREAMING_URL from UnleashClient.utils import LOGGER @@ -71,16 +71,12 @@ def _run(self): # noqa: PLR0912 Only recreates client if there's a catastrophic failure. """ client: Optional[SSEClient] = None - + try: - LOGGER.info( - "Connecting to Unleash streaming endpoint: %s", self._base_url - ) + LOGGER.info("Connecting to Unleash streaming endpoint: %s", self._base_url) if self._sse_factory: - client = self._sse_factory( - self._base_url, self._headers, self._timeout - ) + client = self._sse_factory(self._base_url, self._headers, self._timeout) else: connect_strategy = ConnectStrategy.http( self._base_url, @@ -93,7 +89,7 @@ def _run(self): # noqa: PLR0912 backoff_multiplier=self._backoff_multiplier, jitter_multiplier=self._backoff_jitter, ) - + client = SSEClient( connect=connect_strategy, initial_retry_delay=self._backoff_initial, @@ -134,7 +130,7 @@ def _run(self): # noqa: PLR0912 except Exception as exc: # noqa: BLE001 LOGGER.warning("Streaming connection failed: %s", exc) - + finally: try: if client is not None: From 3e4500b254fdacd820d2a55ac1a4582105d07aad Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Thu, 21 Aug 2025 14:50:04 +0200 Subject: [PATCH 19/23] add tests --- .../integration_tests/integration_unleash.py | 35 +++++ .../integration_unleashheroku.py | 29 ----- .../streaming/test_event_processor.py | 59 +++++++++ .../streaming/test_streaming_connector.py | 123 ++++++++++++++++++ 4 files changed, 217 insertions(+), 29 deletions(-) create mode 100644 tests/integration_tests/integration_unleash.py delete mode 100644 tests/integration_tests/integration_unleashheroku.py create mode 100644 tests/unit_tests/streaming/test_event_processor.py create mode 100644 tests/unit_tests/streaming/test_streaming_connector.py diff --git a/tests/integration_tests/integration_unleash.py b/tests/integration_tests/integration_unleash.py new file mode 100644 index 00000000..ada75eda --- /dev/null +++ b/tests/integration_tests/integration_unleash.py @@ -0,0 +1,35 @@ +# --- +import os +import logging +import sys +import time + +from UnleashClient import UnleashClient + +root = logging.getLogger() +root.setLevel(logging.DEBUG) + +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") +handler.setFormatter(formatter) +root.addHandler(handler) +# --- + +api_url = os.getenv('UNLEASH_API_URL', 'https://app.unleash-hosted.com/demo/api') +api_token = os.getenv('UNLEASH_API_TOKEN', 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5') +flag = "example-flag" +use_streaming = os.getenv("USE_STREAMING", "true").lower() == "true" + +client = UnleashClient( + url=api_url, + app_name="integration-python", + custom_headers={'Authorization': api_token}, + experimental_mode={"type": "streaming"} if use_streaming else None, + metrics_interval=1) + +client.initialize_client() + +while True: + print(f"'{flag}' is enabled: {client.is_enabled(flag)}") + time.sleep(3) diff --git a/tests/integration_tests/integration_unleashheroku.py b/tests/integration_tests/integration_unleashheroku.py deleted file mode 100644 index 8c68827b..00000000 --- a/tests/integration_tests/integration_unleashheroku.py +++ /dev/null @@ -1,29 +0,0 @@ -# --- -import logging -import sys -import time - -from UnleashClient import UnleashClient - -root = logging.getLogger() -root.setLevel(logging.DEBUG) - -handler = logging.StreamHandler(sys.stdout) -handler.setLevel(logging.DEBUG) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -handler.setFormatter(formatter) -root.addHandler(handler) -# --- - -my_client = UnleashClient( - url="https://unleash.herokuapp.com/api", - environment="staging", - app_name="pyIvan", -) - -my_client.initialize_client() - -while True: - time.sleep(10) - context = {"userId": "1", "sound": "woof"} - print(f"ivantest: {my_client.is_enabled('ivantest', context)}") diff --git a/tests/unit_tests/streaming/test_event_processor.py b/tests/unit_tests/streaming/test_event_processor.py new file mode 100644 index 00000000..24622a5a --- /dev/null +++ b/tests/unit_tests/streaming/test_event_processor.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import json + +import pytest + +from UnleashClient.streaming.event_processor import StreamingEventProcessor + + +class FakeEngine: + def __init__(self): + self.states = [] + + def take_state(self, state): + self.states.append(state) + + +class FakeEvent: + def __init__(self, event: str, data): + self.event = event + self.data = data + + +def test_processor_hydrates_on_connected(): + engine = FakeEngine() + processor = StreamingEventProcessor(engine) + + assert processor.hydrated is False + + payload = {"version": 1, "features": [], "segments": []} + processor.process(FakeEvent("unleash-connected", payload)) + + assert processor.hydrated is True + assert engine.states == [payload] + + +def test_processor_applies_updates(): + engine = FakeEngine() + processor = StreamingEventProcessor(engine) + + connected_payload = {"version": 1, "features": ["f1"], "segments": []} + update_payload = {"version": 2, "features": ["f1", "f2"], "segments": []} + + processor.process(FakeEvent("unleash-connected", connected_payload)) + processor.process(FakeEvent("unleash-updated", update_payload)) + + assert processor.hydrated is True + assert engine.states == [connected_payload, update_payload] + + +def test_processor_ignores_unknown_event_types(): + engine = FakeEngine() + processor = StreamingEventProcessor(engine) + + processor.process(FakeEvent("heartbeat", {})) + processor.process(FakeEvent("message", {})) + + # No states should be applied + assert engine.states == [] diff --git a/tests/unit_tests/streaming/test_streaming_connector.py b/tests/unit_tests/streaming/test_streaming_connector.py new file mode 100644 index 00000000..17fb326f --- /dev/null +++ b/tests/unit_tests/streaming/test_streaming_connector.py @@ -0,0 +1,123 @@ +from __future__ import annotations + +import json +import threading +import time +from typing import Iterable +from unittest.mock import MagicMock, patch + +import pytest + +from UnleashClient.streaming import StreamingConnector +from UnleashClient.streaming.event_processor import StreamingEventProcessor + + +class FakeEngine: + def __init__(self): + self.states = [] + + def take_state(self, state): + self.states.append(state) + + +class FakeEvent: + def __init__(self, event: str, data): + self.event = event + self.data = data + + +class FiniteSSEClient: + """SSE client that yields given events then stops.""" + + def __init__(self, events: Iterable[FakeEvent]): + self._events = list(events) + self.closed = False + + @property + def events(self): + for e in self._events: + if self.closed: + break + yield e + + def close(self): + self.closed = True + + def interrupt(self): + self.close() + + +class FailingSSEClient: + """SSE client that fails immediately when iterating events.""" + + def __init__(self): + self.closed = False + + @property + def events(self): + raise ConnectionError("Simulated connection failure") + + def close(self): + self.closed = True + + def interrupt(self): + self.close() + +def test_successful_connection_calls_ready(): + engine = FakeEngine() + processor = StreamingEventProcessor(engine) + + ready_calls = {"n": 0} + + def on_ready(): + ready_calls["n"] += 1 + + events = [ + FakeEvent("unleash-connected", {"version": 1, "features": [], "segments": []}), + FakeEvent("unleash-updated", {"version": 2, "features": [], "segments": []}), + ] + + controller = StreamingConnector( + url="http://unleash.example", + headers={}, + request_timeout=5, + event_processor=processor, + on_ready=on_ready, + sse_client_factory=lambda url, headers, timeout: FiniteSSEClient(events), + ) + + th = threading.Thread(target=controller._run, daemon=True) + th.start() + time.sleep(0.05) + controller.stop() + th.join(timeout=1) + + assert engine.states # at least one state applied + assert ready_calls["n"] == 1 + + +def test_connection_failures_trigger_retries(): + engine = FakeEngine() + processor = StreamingEventProcessor(engine) + + attempts = [] + + def factory(url, headers, timeout): + attempts.append(time.time()) + return FailingSSEClient() + + controller = StreamingConnector( + url="http://unleash.example", + headers={}, + request_timeout=5, + event_processor=processor, + sse_client_factory=factory, + ) + + th = threading.Thread(target=controller._run, daemon=True) + th.start() + time.sleep(1.5) + controller.stop() + th.join(timeout=1) + + assert len(attempts) >= 1 From a76d913a26f6acf9a95dfe49da942c4a12c88f90 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Thu, 21 Aug 2025 14:55:24 +0200 Subject: [PATCH 20/23] lint --- tests/integration_tests/integration_unleash.py | 14 +++++++++----- tests/unit_tests/streaming/test_event_processor.py | 4 ---- .../streaming/test_streaming_connector.py | 5 +---- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/tests/integration_tests/integration_unleash.py b/tests/integration_tests/integration_unleash.py index ada75eda..48d5da36 100644 --- a/tests/integration_tests/integration_unleash.py +++ b/tests/integration_tests/integration_unleash.py @@ -1,6 +1,6 @@ # --- -import os import logging +import os import sys import time @@ -16,17 +16,21 @@ root.addHandler(handler) # --- -api_url = os.getenv('UNLEASH_API_URL', 'https://app.unleash-hosted.com/demo/api') -api_token = os.getenv('UNLEASH_API_TOKEN', 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5') +api_url = os.getenv("UNLEASH_API_URL", "https://app.unleash-hosted.com/demo/api") +api_token = os.getenv( + "UNLEASH_API_TOKEN", + "demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5", +) flag = "example-flag" use_streaming = os.getenv("USE_STREAMING", "true").lower() == "true" client = UnleashClient( url=api_url, app_name="integration-python", - custom_headers={'Authorization': api_token}, + custom_headers={"Authorization": api_token}, experimental_mode={"type": "streaming"} if use_streaming else None, - metrics_interval=1) + metrics_interval=1, +) client.initialize_client() diff --git a/tests/unit_tests/streaming/test_event_processor.py b/tests/unit_tests/streaming/test_event_processor.py index 24622a5a..f0eca6ba 100644 --- a/tests/unit_tests/streaming/test_event_processor.py +++ b/tests/unit_tests/streaming/test_event_processor.py @@ -1,9 +1,5 @@ from __future__ import annotations -import json - -import pytest - from UnleashClient.streaming.event_processor import StreamingEventProcessor diff --git a/tests/unit_tests/streaming/test_streaming_connector.py b/tests/unit_tests/streaming/test_streaming_connector.py index 17fb326f..6646a7cd 100644 --- a/tests/unit_tests/streaming/test_streaming_connector.py +++ b/tests/unit_tests/streaming/test_streaming_connector.py @@ -1,12 +1,8 @@ from __future__ import annotations -import json import threading import time from typing import Iterable -from unittest.mock import MagicMock, patch - -import pytest from UnleashClient.streaming import StreamingConnector from UnleashClient.streaming.event_processor import StreamingEventProcessor @@ -63,6 +59,7 @@ def close(self): def interrupt(self): self.close() + def test_successful_connection_calls_ready(): engine = FakeEngine() processor = StreamingEventProcessor(engine) From 3cf022293f5ecd5634e02711f106a5b44d0f995d Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Thu, 21 Aug 2025 16:24:08 +0200 Subject: [PATCH 21/23] refactor tests --- .../streaming/test_client_streaming.py | 122 ++++++++++++++++++ .../streaming/test_streaming_connector.py | 120 ----------------- 2 files changed, 122 insertions(+), 120 deletions(-) create mode 100644 tests/unit_tests/streaming/test_client_streaming.py delete mode 100644 tests/unit_tests/streaming/test_streaming_connector.py diff --git a/tests/unit_tests/streaming/test_client_streaming.py b/tests/unit_tests/streaming/test_client_streaming.py new file mode 100644 index 00000000..7b718cdc --- /dev/null +++ b/tests/unit_tests/streaming/test_client_streaming.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import json +import time + +import pytest + +from UnleashClient import INSTANCES, UnleashClient + + +@pytest.fixture(autouse=True) +def reset_instances(): + INSTANCES._reset() + + +class FakeEvent: + def __init__(self, event: str, data): + self.event = event + self.data = data + + +class FiniteSSEClient: + """SSE client that yields given events then stops.""" + + def __init__(self, events): + self._events = list(events) + self.closed = False + + @property + def events(self): + for e in self._events: + if self.closed: + break + yield e + + def close(self): + self.closed = True + + def interrupt(self): + self.close() + + +def _state_with_feature(name: str) -> str: + payload = { + "version": 1, + "features": [ + {"name": name, "enabled": True, "strategies": [{"name": "default"}]} + ], + "segments": [], + } + return json.dumps(payload) + + +def test_streaming_processes_unleash_connected_event(): + captured = {} + + def factory(url, headers, timeout): + captured["url"] = url + captured["headers"] = headers + return FiniteSSEClient( + [FakeEvent("unleash-connected", _state_with_feature("test-feature"))] + ) + + client = UnleashClient( + url="http://unleash.example", + app_name="my-test-app", + instance_id="rspec/test", + disable_metrics=True, + disable_registration=True, + custom_headers={"X-API-KEY": "123"}, + experimental_mode={"type": "streaming"}, + sse_client_factory=factory, + ) + + try: + client.initialize_client() + time.sleep(0.05) + + assert captured["url"].endswith("/client/streaming") + assert captured["headers"]["X-API-KEY"] == "123" + + assert client.is_enabled("test-feature") is True + finally: + client.destroy() + + +def test_streaming_processes_unleash_updated_event(): + captured = {} + + def factory(url, headers, timeout): + captured["url"] = url + captured["headers"] = headers + empty_state = json.dumps({"version": 1, "features": [], "segments": []}) + update_state = _state_with_feature("test-feature") + return FiniteSSEClient( + [ + FakeEvent("unleash-connected", empty_state), + FakeEvent("unleash-updated", update_state), + ] + ) + + client = UnleashClient( + url="http://unleash.example", + app_name="my-test-app", + instance_id="rspec/test", + disable_metrics=True, + disable_registration=True, + custom_headers={"X-API-KEY": "123"}, + experimental_mode={"type": "streaming"}, + sse_client_factory=factory, + ) + + try: + client.initialize_client() + time.sleep(0.05) + + assert captured["url"].endswith("/client/streaming") + assert captured["headers"]["X-API-KEY"] == "123" + + assert client.is_enabled("test-feature") is True + finally: + client.destroy() diff --git a/tests/unit_tests/streaming/test_streaming_connector.py b/tests/unit_tests/streaming/test_streaming_connector.py deleted file mode 100644 index 6646a7cd..00000000 --- a/tests/unit_tests/streaming/test_streaming_connector.py +++ /dev/null @@ -1,120 +0,0 @@ -from __future__ import annotations - -import threading -import time -from typing import Iterable - -from UnleashClient.streaming import StreamingConnector -from UnleashClient.streaming.event_processor import StreamingEventProcessor - - -class FakeEngine: - def __init__(self): - self.states = [] - - def take_state(self, state): - self.states.append(state) - - -class FakeEvent: - def __init__(self, event: str, data): - self.event = event - self.data = data - - -class FiniteSSEClient: - """SSE client that yields given events then stops.""" - - def __init__(self, events: Iterable[FakeEvent]): - self._events = list(events) - self.closed = False - - @property - def events(self): - for e in self._events: - if self.closed: - break - yield e - - def close(self): - self.closed = True - - def interrupt(self): - self.close() - - -class FailingSSEClient: - """SSE client that fails immediately when iterating events.""" - - def __init__(self): - self.closed = False - - @property - def events(self): - raise ConnectionError("Simulated connection failure") - - def close(self): - self.closed = True - - def interrupt(self): - self.close() - - -def test_successful_connection_calls_ready(): - engine = FakeEngine() - processor = StreamingEventProcessor(engine) - - ready_calls = {"n": 0} - - def on_ready(): - ready_calls["n"] += 1 - - events = [ - FakeEvent("unleash-connected", {"version": 1, "features": [], "segments": []}), - FakeEvent("unleash-updated", {"version": 2, "features": [], "segments": []}), - ] - - controller = StreamingConnector( - url="http://unleash.example", - headers={}, - request_timeout=5, - event_processor=processor, - on_ready=on_ready, - sse_client_factory=lambda url, headers, timeout: FiniteSSEClient(events), - ) - - th = threading.Thread(target=controller._run, daemon=True) - th.start() - time.sleep(0.05) - controller.stop() - th.join(timeout=1) - - assert engine.states # at least one state applied - assert ready_calls["n"] == 1 - - -def test_connection_failures_trigger_retries(): - engine = FakeEngine() - processor = StreamingEventProcessor(engine) - - attempts = [] - - def factory(url, headers, timeout): - attempts.append(time.time()) - return FailingSSEClient() - - controller = StreamingConnector( - url="http://unleash.example", - headers={}, - request_timeout=5, - event_processor=processor, - sse_client_factory=factory, - ) - - th = threading.Thread(target=controller._run, daemon=True) - th.start() - time.sleep(1.5) - controller.stop() - th.join(timeout=1) - - assert len(attempts) >= 1 From f626b579d855809284d354968b698630f8fb5e67 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Fri, 22 Aug 2025 11:38:21 +0200 Subject: [PATCH 22/23] remove heartbeat --- UnleashClient/streaming/connector.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/UnleashClient/streaming/connector.py b/UnleashClient/streaming/connector.py index 7eae65cb..eb200de9 100644 --- a/UnleashClient/streaming/connector.py +++ b/UnleashClient/streaming/connector.py @@ -1,5 +1,4 @@ import threading -import time from typing import Callable, Optional from ld_eventsource import SSEClient @@ -25,7 +24,6 @@ def __init__( event_processor: StreamingEventProcessor, on_ready: Optional[Callable[[], None]] = None, sse_client_factory: Optional[Callable[[str, dict, int], SSEClient]] = None, - heartbeat_timeout: int = 60, backoff_initial: float = 2.0, backoff_max: float = 30.0, backoff_multiplier: float = 2.0, @@ -37,7 +35,6 @@ def __init__( self._timeout = request_timeout self._on_ready = on_ready self._sse_factory = sse_client_factory - self._hb_timeout = heartbeat_timeout self._backoff_initial = backoff_initial self._backoff_max = backoff_max self._backoff_multiplier = backoff_multiplier @@ -99,16 +96,12 @@ def _run(self): # noqa: PLR0912 logger=LOGGER, ) - last_event_time = time.time() - for event in client.events: if self._stop.is_set(): break if not event.event: continue - last_event_time = time.time() - self._processor.process(event) if event.event == "unleash-connected" and self._processor.hydrated: if self._on_ready: @@ -117,15 +110,6 @@ def _run(self): # noqa: PLR0912 except Exception as cb_exc: # noqa: BLE001 LOGGER.debug("Ready callback error: %s", cb_exc) - if self._hb_timeout and ( - time.time() - last_event_time > self._hb_timeout - ): - LOGGER.warning("Heartbeat timeout exceeded; reconnecting") - try: - client.interrupt() # Don't break, rely on SSE client retry - except Exception: # noqa: BLE001 - break - LOGGER.debug("SSE stream ended") except Exception as exc: # noqa: BLE001 From 4fcfce9c9a0133c50a8f01ec9aed91d709583307 Mon Sep 17 00:00:00 2001 From: Tymoteusz Czech <2625371+Tymek@users.noreply.github.com> Date: Mon, 25 Aug 2025 12:07:41 +0200 Subject: [PATCH 23/23] add TODO for backup file in event processing --- UnleashClient/streaming/event_processor.py | 1 + 1 file changed, 1 insertion(+) diff --git a/UnleashClient/streaming/event_processor.py b/UnleashClient/streaming/event_processor.py index 29bb4cf0..197d1435 100644 --- a/UnleashClient/streaming/event_processor.py +++ b/UnleashClient/streaming/event_processor.py @@ -50,6 +50,7 @@ def _apply_delta(self, event_data: Any) -> None: return with self._lock: self._engine.take_state(event_data) + # TODO: backup file def _handle_connected(self, event: Any) -> None: LOGGER.debug("Processing initial hydration data")