# Streaming support #365
Changes to the `UnleashClient` client class:

```diff
@@ -16,6 +16,7 @@
 from UnleashClient.api import register_client
 from UnleashClient.constants import (
     APPLICATION_HEADERS,
     DISABLED_VARIATION,
     ETAG,
     METRIC_LAST_SENT_TIME,
```
```diff
@@ -33,8 +34,10 @@
 from UnleashClient.loader import load_features
 from UnleashClient.periodic_tasks import (
     aggregate_and_send_metrics,
+    fetch_and_apply_delta,
     fetch_and_load_features,
 )
+from UnleashClient.streaming.manager import StreamingManager

 from .cache import BaseCache, FileCache
 from .utils import LOGGER, InstanceAllowType, InstanceCounter
```
```diff
@@ -111,7 +114,7 @@ class UnleashClient:
     :param event_callback: Function to call if impression events are enabled. WARNING: Depending on your event library, this may have performance implications!
     """

-    def __init__(
+    def __init__(  # noqa: PLR0915
         self,
         url: str,
         app_name: str,
```
```diff
@@ -136,6 +139,8 @@ def __init__(
         scheduler_executor: Optional[str] = None,
```

> **Reviewer comment:** In Ruby we managed to re-use `fetcher_scheduled_executor`, which runs either the polling toggle fetcher or streaming. Check the usage in https://github.com/Unleash/unleash-ruby-sdk/blob/main/lib/unleash/client.rb#L14 for details. @sighphyre started a great discussion about it here: https://github.com/Unleash/unleash-ruby-sdk/pull/248/files#r2262867667. Since Python is similar to Ruby, it should be doable here too. In Java it was too difficult. In Node I will try to migrate towards this approach in a subsequent PR.
>
> **Reply:** Agreed! This SDK is a bit more complex than Ruby, but the abstractions are more or less sane, so I don't think this should be massively challenging.

```diff
         multiple_instance_mode: InstanceAllowType = InstanceAllowType.WARN,
         event_callback: Optional[Callable[[BaseEvent], None]] = None,
+        experimental_mode: Optional[dict] = None,
```

> **Reviewer comment:** It's really unclear to me how to use this. I think it wants a proper type on it so that end users can leverage their type checker.

```diff
+        sse_client_factory: Optional[Callable] = None,
     ) -> None:
         custom_headers = custom_headers or {}
         custom_options = custom_options or {}
```
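Picking up the reviewer's request for a proper type: a typed shape for `experimental_mode` could look roughly like the sketch below. The `type` and `format` keys mirror what `initialize_client` reads later in this diff; the exact set of accepted literal values is an assumption.

```python
from typing import Literal, TypedDict


class ExperimentalMode(TypedDict, total=False):
    """Hypothetical typed shape for the experimental_mode argument (not part of this PR)."""

    type: Literal["streaming", "polling"]  # "streaming" switches the client to the SSE manager
    format: Literal["delta", "full"]       # "delta" selects fetch_and_apply_delta when polling


# The constructor could then be annotated as:
#     experimental_mode: Optional[ExperimentalMode] = None
```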
```diff
@@ -169,6 +174,9 @@ def __init__(
         self.unleash_verbose_log_level = verbose_log_level
         self.unleash_event_callback = event_callback
         self._ready_callback = build_ready_callback(event_callback)
+        self.experimental_mode = experimental_mode
```

> **Reviewer comment:** I don't like the fact that we have an explosion of fields that work for either polling or streaming. In a clean OO design we'd have a swappable mechanism for one fetching strategy at a time. I know in Java and .NET we also mix, but I plan to fix this in Node. Maybe worth investigating this option in Python?

```diff
+        self._stream_manager: Optional[StreamingManager] = None
+        self._sse_client_factory = sse_client_factory

         self._do_instance_check(multiple_instance_mode)
```
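A minimal sketch of the swappable mechanism the reviewer describes, assuming both polling and streaming can be reduced to a start/stop lifecycle. The names `FetchStrategy` and `StreamingFetcher` are hypothetical and not part of this PR.

```python
from typing import Protocol


class FetchStrategy(Protocol):
    """One object owns how toggles reach the engine: polling, streaming, or offline."""

    def start(self) -> None: ...
    def stop(self) -> None: ...


class StreamingFetcher:
    """Thin wrapper so a streaming manager satisfies the strategy interface."""

    def __init__(self, manager) -> None:
        self._manager = manager

    def start(self) -> None:
        self._manager.start()

    def stop(self) -> None:
        self._manager.stop()


# The client would then hold a single field such as self._fetcher: FetchStrategy
# instead of separate _stream_manager / fl_job attributes.
```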
```diff
@@ -267,8 +275,10 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
         try:
             base_headers = {
                 **self.unleash_custom_headers,
                 **APPLICATION_HEADERS,
```

> **Reviewer comment:** Why these changes here? What does this fix? What test breaks without it, and what test ensures that it now works correctly?

```diff
                 "unleash-connection-id": self.connection_id,
                 "unleash-appname": self.unleash_app_name,
                 "unleash-instanceid": self.unleash_instance_id,
                 "unleash-sdk": f"{SDK_NAME}:{SDK_VERSION}",
             }
```
```diff
@@ -277,7 +287,6 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
                 "unleash-interval": self.unleash_metrics_interval_str_millis,
             }

             # Setup
             metrics_args = {
                 "url": self.unleash_url,
                 "app_name": self.unleash_app_name,
```
```diff
@@ -289,6 +298,12 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
                 "engine": self.engine,
             }

+            base_job_args = {
+                "cache": self.cache,
+                "engine": self.engine,
+                "ready_callback": self._ready_callback,
+            }
+
             # Register app
             if not self.unleash_disable_registration:
                 register_client(
```
```diff
@@ -303,47 +318,72 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
                     self.unleash_request_timeout,
                 )

-            if fetch_toggles:
-                fetch_headers = {
-                    **base_headers,
-                    "unleash-interval": self.unleash_refresh_interval_str_millis,
-                }
-
-                job_args = {
-                    "url": self.unleash_url,
-                    "app_name": self.unleash_app_name,
-                    "instance_id": self.unleash_instance_id,
-                    "headers": fetch_headers,
-                    "custom_options": self.unleash_custom_options,
-                    "cache": self.cache,
-                    "engine": self.engine,
-                    "request_timeout": self.unleash_request_timeout,
-                    "request_retries": self.unleash_request_retries,
-                    "project": self.unleash_project_name,
-                    "event_callback": self.unleash_event_callback,
-                    "ready_callback": self._ready_callback,
-                }
-                job_func: Callable = fetch_and_load_features
-            else:
-                job_args = {
-                    "cache": self.cache,
-                    "engine": self.engine,
-                    "ready_callback": self._ready_callback,
-                }
-                job_func = load_features
-
-            job_func(**job_args)  # type: ignore
-            # Start periodic jobs
-            self.unleash_scheduler.start()
-            self.fl_job = self.unleash_scheduler.add_job(
-                job_func,
-                trigger=IntervalTrigger(
-                    seconds=int(self.unleash_refresh_interval),
-                    jitter=self.unleash_refresh_jitter,
-                ),
-                executor=self.unleash_executor_name,
-                kwargs=job_args,
-            )
+            # Decide upstream connection mode
+            mode = (
+                (self.experimental_mode or {}).get("type")
+                if self.experimental_mode
+                else None
+            )
+            format_mode = (
+                (self.experimental_mode or {}).get("format")
+                if self.experimental_mode
+                else None
+            )
```

> **Reviewer comment:** It's not a major issue, but I think this belongs somewhere else. It would be cool to see a function that returns an enum of POLLING | OFFLINE | STREAMING, or something like that.
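A sketch of that suggestion: a small helper collapsing `fetch_toggles` and `experimental_mode` into a single connection mode. The enum and function names are hypothetical and not part of this PR; the logic mirrors the inline decision above.

```python
from enum import Enum
from typing import Optional


class ConnectionMode(Enum):
    POLLING = "polling"
    STREAMING = "streaming"
    OFFLINE = "offline"


def resolve_connection_mode(
    fetch_toggles: bool, experimental_mode: Optional[dict]
) -> ConnectionMode:
    """Return the single mode the client should run in (testable in isolation)."""
    if not fetch_toggles:
        return ConnectionMode.OFFLINE
    if (experimental_mode or {}).get("type") == "streaming":
        return ConnectionMode.STREAMING
    return ConnectionMode.POLLING
```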
```diff
+            if fetch_toggles and mode == "streaming":
+                # MODE: streaming
+                self._stream_manager = StreamingManager(
+                    url=self.unleash_url,
+                    headers=base_headers,
+                    request_timeout=self.unleash_request_timeout,
+                    engine=self.engine,
+                    on_ready=self._ready_callback,
+                    sse_client_factory=self._sse_client_factory,
+                    custom_options=self.unleash_custom_options,
+                )
+                self._stream_manager.start()
+            else:
+                if fetch_toggles:
+                    # MODE: polling
+                    job_args = {
+                        **base_job_args,
+                        "url": self.unleash_url,
+                        "app_name": self.unleash_app_name,
+                        "instance_id": self.unleash_instance_id,
+                        "headers": {
+                            **base_headers,
+                            "unleash-interval": self.unleash_refresh_interval_str_millis,
+                        },
+                        "custom_options": self.unleash_custom_options,
+                        "request_timeout": self.unleash_request_timeout,
+                        "request_retries": self.unleash_request_retries,
+                        "project": self.unleash_project_name,
+                        "event_callback": self.unleash_event_callback,
+                    }
+
+                    if format_mode == "delta":
+                        job_func: Callable = fetch_and_apply_delta
+                    else:
+                        job_func: Callable = fetch_and_load_features
+                else:
+                    # MODE: offline
+                    job_args = base_job_args
+                    job_func = load_features
```

> **Reviewer comment (on lines +352 to +356):** The Python SDK sets up a periodic update on bootstrapped toggles. I don't think this is consistent across SDKs.
>
> **Reply:** Yep, it's a Python-specific thing. It works around the incompatibility between the threading model used in this SDK and server tech like gunicorn, which uses a forked process model.
```diff
+
+            job_func(**job_args)  # initial fetch
+            self.unleash_scheduler.start()
+            self.fl_job = self.unleash_scheduler.add_job(
+                job_func,
+                trigger=IntervalTrigger(
+                    seconds=int(self.unleash_refresh_interval),
+                    jitter=self.unleash_refresh_jitter,
+                ),
+                executor=self.unleash_executor_name,
+                kwargs=job_args,
+            )

             if not self.unleash_disable_metrics:
                 self.metric_job = self.unleash_scheduler.add_job(
                     aggregate_and_send_metrics,
```
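For orientation, this is roughly how the new modes are selected from user code. Only `url`, `app_name`, `experimental_mode`, and `initialize_client()` appear in this diff; the concrete values below are illustrative.

```python
from UnleashClient import UnleashClient

# Streaming mode: toggles arrive over SSE via the new StreamingManager.
streaming_client = UnleashClient(
    url="https://unleash.example.com/api",  # hypothetical server URL
    app_name="my-app",
    experimental_mode={"type": "streaming"},
)
streaming_client.initialize_client()

# Delta polling mode: the scheduled job becomes fetch_and_apply_delta
# instead of fetch_and_load_features.
delta_client = UnleashClient(
    url="https://unleash.example.com/api",
    app_name="my-app",
    experimental_mode={"format": "delta"},
)
delta_client.initialize_client()
```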
```diff
@@ -396,7 +436,11 @@ def destroy(self) -> None:
        You shouldn't need this too much!
        """
-        self.fl_job.remove()
+        try:
+            if self.fl_job:
+                self.fl_job.remove()
+        except Exception:  # best-effort
+            pass
         if self.metric_job:
             self.metric_job.remove()
```

> **Reviewer comment:** Why the new try/except? It seems to only be needed for polling, and if the job was never added, streaming won't raise an exception here.
```diff
@@ -411,7 +455,11 @@ def destroy(self) -> None:
             request_timeout=self.unleash_request_timeout,
             engine=self.engine,
         )

+        if self._stream_manager:
+            try:
+                self._stream_manager.stop()
+            except Exception:
+                pass
         self.unleash_scheduler.shutdown()
         self.cache.destroy()
```
New file, imported elsewhere in this PR as `UnleashClient.api.delta` (70 added lines):

```python
from typing import Optional, Tuple

import requests
from requests.adapters import HTTPAdapter
from urllib3 import Retry

from UnleashClient.constants import DELTA_URL
from UnleashClient.utils import LOGGER, log_resp_info


# pylint: disable=broad-except
def get_feature_deltas(
    url: str,
    app_name: str,
    instance_id: str,
    headers: dict,
    custom_options: dict,
    request_timeout: int,
    request_retries: int,
    cached_etag: str = "",
) -> Tuple[Optional[str], str]:
    """
    Retrieves feature deltas from the Unleash server.

    Returns a tuple of (raw_json_string_or_None, etag).
    """
    try:
        LOGGER.info("Getting feature deltas.")

        request_specific_headers = {
            "UNLEASH-APPNAME": app_name,
            "UNLEASH-INSTANCEID": instance_id,
        }

        if cached_etag:
            request_specific_headers["If-None-Match"] = cached_etag

        base_url = f"{url}{DELTA_URL}"

        adapter = HTTPAdapter(
            max_retries=Retry(total=request_retries, status_forcelist=[500, 502, 504])
        )
        with requests.Session() as session:
            session.mount("https://", adapter)
            session.mount("http://", adapter)
            resp = session.get(
                base_url,
                headers={**headers, **request_specific_headers},
                timeout=request_timeout,
                **custom_options,
            )

        if resp.status_code not in [200, 304]:
            log_resp_info(resp)
            LOGGER.warning(
                "Unleash Client delta fetch failed due to unexpected HTTP status code: %s",
                resp.status_code,
            )
            raise Exception("Unleash Client delta fetch failed!")

        etag = resp.headers.get("etag", "")

        if resp.status_code == 304:
            return None, etag

        return resp.text, etag
    except Exception as exc:
        LOGGER.exception("Unleash Client delta fetch failed due to exception: %s", exc)

        return None, ""
```
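A sketch of the conditional-request flow this function enables. In the PR itself the caller is `fetch_and_apply_delta`, which stores the ETag in the client cache; the literal values below are hypothetical.

```python
# First call: no cached ETag, so the server returns a payload plus an ETag.
payload, etag = get_feature_deltas(
    url="https://unleash.example.com/api",  # hypothetical
    app_name="my-app",
    instance_id="instance-1",
    headers={},
    custom_options={},
    request_timeout=30,
    request_retries=3,
)

# Later calls send If-None-Match; a 304 response comes back as (None, etag),
# meaning there is nothing new to apply.
payload, etag = get_feature_deltas(
    url="https://unleash.example.com/api",
    app_name="my-app",
    instance_id="instance-1",
    headers={},
    custom_options={},
    request_timeout=30,
    request_retries=3,
    cached_etag=etag,
)
if payload is None:
    print("Feature state is up to date.")
```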
The `UnleashClient.periodic_tasks` package init re-exports the new task:

```diff
@@ -1,3 +1,4 @@
 # ruff: noqa: F401
+from .fetch_and_apply_delta import fetch_and_apply_delta
 from .fetch_and_load import fetch_and_load_features
 from .send_metrics import aggregate_and_send_metrics
```
New file `fetch_and_apply_delta.py` in the `periodic_tasks` package (63 added lines):

```python
import uuid
from typing import Callable, Optional

from yggdrasil_engine.engine import UnleashEngine

from UnleashClient.api.delta import get_feature_deltas
from UnleashClient.cache import BaseCache
from UnleashClient.constants import ETAG
from UnleashClient.events import UnleashEventType, UnleashFetchedEvent
from UnleashClient.utils import LOGGER


def fetch_and_apply_delta(
    url: str,
    app_name: str,
    instance_id: str,
    headers: dict,
    custom_options: dict,
    cache: BaseCache,
    request_timeout: int,
    request_retries: int,
    engine: UnleashEngine,
    event_callback: Optional[Callable] = None,
    ready_callback: Optional[Callable] = None,
) -> None:
    """
    Fetch delta payload and apply to engine with engine.take_state(raw_json).
    Fires READY on first hydration (hydration event inside delta stream) and FETCHED on each successful delta.
    """
    (delta_payload, etag) = get_feature_deltas(
        url,
        app_name,
        instance_id,
        headers,
        custom_options,
        request_timeout,
        request_retries,
        cache.get(ETAG),
    )

    if etag:
        cache.set(ETAG, etag)

    if not delta_payload:
        LOGGER.debug("No delta returned from server, nothing to apply.")
        return

    try:
        engine.take_state(delta_payload)

        if event_callback:
            event = UnleashFetchedEvent(
                event_type=UnleashEventType.FETCHED,
                event_id=uuid.uuid4(),
                raw_features=delta_payload,
            )
            event_callback(event)

        # First hydration event as ready signal
        if ready_callback and '"type":"hydration"' in delta_payload:
            ready_callback()
    except Exception as exc:
        LOGGER.warning("Failed to apply delta: %s", exc)
```
The new `UnleashClient.streaming` package gets a minimal `__init__.py`:

```python
# ruff: noqa: F401
# Streaming package
```
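`StreamingManager` itself is not part of this excerpt. Judging only from how `initialize_client` constructs it and how `destroy` tears it down, its surface looks roughly like the stub below; this is an inference from the call sites, not the actual implementation.

```python
from typing import Callable, Optional

from yggdrasil_engine.engine import UnleashEngine


class StreamingManager:  # interface sketch, as used by UnleashClient
    def __init__(
        self,
        url: str,
        headers: dict,
        request_timeout: int,
        engine: UnleashEngine,
        on_ready: Optional[Callable] = None,
        sse_client_factory: Optional[Callable] = None,
        custom_options: Optional[dict] = None,
    ) -> None:
        ...

    def start(self) -> None:
        """Open the SSE connection and begin applying events to the engine."""

    def stop(self) -> None:
        """Close the SSE connection; called from UnleashClient.destroy()."""
```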
> **Reviewer comment (on the `# noqa: PLR0915` added to `__init__`):** Why was it added?
>
> **Reply:** It's the "too many statements" linting exception.

> **Reviewer comment:** 26 is... well... a LOT of parameters. I'm open to being told this isn't the right time, but gosh, this feels like something we should fix at this point.
>
> **Reply:** As the person most responsible for this, I would tend to agree, but any change would be (kind of by definition) backwards incompatible. I would be on board with updating it, but I think it would be a major version bump. (Unless you want a v2 client object... but that way lies maintenance headaches.)
>
> If I did this nowadays (with the benefit of roughly 10 years more experience), I would probably split between required and pseudo-required arguments (`url` and `headers`, respectively) and put some of the less important/optional configuration (jitter) in an options dataclass or similar.
>
> **Reply:** I like this. Something to keep in mind for the next major, maybe.
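A sketch of that split: required and pseudo-required arguments stay on the constructor, while the long tail of optional configuration moves into a dataclass. The class name, the selection of fields, and the defaults are illustrative, not a proposal from this PR.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class UnleashClientOptions:
    """Optional configuration, grouped so the constructor keeps only a handful of arguments."""

    refresh_interval: int = 15
    refresh_jitter: Optional[int] = None
    metrics_interval: int = 60
    disable_metrics: bool = False
    disable_registration: bool = False
    project_name: Optional[str] = None
    experimental_mode: Optional[dict] = None
    custom_options: dict = field(default_factory=dict)


# Hypothetical future shape:
#     UnleashClient(url, app_name, custom_headers={...}, options=UnleashClientOptions(...))
```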