Conversation

Member

@Tymek Tymek commented Aug 12, 2025

Description

Added streaming support.

Fixes internal issue CTO-249

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)
  • This change requires a documentation update

How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

  • Unit tests
  • Spec Tests
  • Integration tests / Manual Tests

Checklist:

  • My code follows the style guidelines of this project
  • I have performed a self-review of my own code
  • I have commented my code, particularly in hard-to-understand areas
  • I have made corresponding changes to the documentation
  • My changes generate no new warnings
  • I have added tests that prove my fix is effective or that my feature works
  • New and existing unit tests pass locally with my changes
  • Any dependent changes have been merged and published in downstream modules

@gastonfournier gastonfournier moved this from New to In Progress in Issues and PRs Aug 13, 2025
Comment on lines +369 to +373
else:
# MODE: offline

job_args = base_job_args
job_func = load_features
Member Author

Python SDK sets up periodic update on bootstrapped toggles. I don't think this is consistent across SDKs

Member

Yep, it's a Python-specific thing. It works around the incompatibility between the threading model used in this SDK and server tech like gunicorn, which uses a forked process model.
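For context, a minimal sketch of what that periodic refresh can look like on an APScheduler BackgroundScheduler (the scheduler this SDK already uses); the load_features signature, cache shape, and interval below are illustrative stand-ins, not the SDK's actual code.

```python
# Rough, illustrative sketch only: load_features here is a stand-in with a
# made-up signature, not the SDK's real function, and the interval is arbitrary.
from apscheduler.schedulers.background import BackgroundScheduler


def load_features(cache: dict, feature_toggles: dict, ready_callback=None) -> None:
    """Re-read bootstrapped toggles from the cache into the in-memory state."""
    feature_toggles.update(cache.get("bootstrapped_toggles", {}))
    if ready_callback:
        ready_callback()


cache = {"bootstrapped_toggles": {"my-toggle": True}}
toggles: dict = {}

scheduler = BackgroundScheduler()
scheduler.add_job(
    load_features,
    trigger="interval",
    seconds=15,
    kwargs={"cache": cache, "feature_toggles": toggles, "ready_callback": None},
)
scheduler.start()
```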

@coveralls

coveralls commented Aug 19, 2025

Pull Request Test Coverage Report for Build 17217536564

Details

  • 112 of 134 (83.58%) changed or added relevant lines in 5 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage decreased (-2.6%) to 92.397%

Changes Missing Coverage                      Covered Lines   Changed/Added Lines   %
UnleashClient/__init__.py                     24              28                    85.71%
UnleashClient/streaming/event_processor.py    33              37                    89.19%
UnleashClient/streaming/connector.py          51              65                    78.46%

Totals:
  • Change from base Build 16809197061: -2.6%
  • Covered Lines: 559
  • Relevant Lines: 605

💛 - Coveralls

@Tymek Tymek requested a review from kwasniew August 19, 2025 17:37
@Tymek Tymek marked this pull request as ready for review August 20, 2025 07:47
Contributor

@kwasniew kwasniew left a comment

do we have any unit or integration tests showing how streaming works?

@Tymek Tymek requested a review from kwasniew August 21, 2025 09:29
Contributor

@kwasniew kwasniew left a comment

I still don't see any tests

@Tymek Tymek force-pushed the feat/streaming branch 2 times, most recently from 9f44f35 to 364a973 August 21, 2025 14:43
@Tymek Tymek requested a review from kwasniew August 21, 2025 14:44
@@ -111,7 +113,7 @@ class UnleashClient:
:param event_callback: Function to call if impression events are enabled. WARNING: Depending on your event library, this may have performance implications!
"""

def __init__(
def __init__( # noqa: PLR0915
Contributor

why was it added?

Member Author

"too many statements" linting exception

Member

26 is... well... a LOT of parameters. I'm open to being told this isn't the right time but gosh this feels like we should fix this at this point

Collaborator

@ivanklee86 ivanklee86 Aug 26, 2025

As the person most responsible for this, I would tend to agree but any change would be (kind of by definition) backwards incompatible. I would be on board with updating that but I think it would be a major version bump. (Unless you want a v2 client object...but that way lies maintenance headaches).

If I did this nowadays (with the benefit of like 10 years more experience), I would probably split between required and pseudo-required arguments (url and headers, respectively) and put some of the less important/optional configuration (jitter) in an options dataclass or similar.
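For illustration, a rough sketch of that split; UnleashClientOptions, UnleashClientV2, and the chosen fields are hypothetical examples, not an agreed-on API or the client's full parameter list.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class UnleashClientOptions:
    """Less important / optional configuration, grouped so the constructor
    only takes the truly required and pseudo-required arguments."""
    refresh_interval: int = 15
    refresh_jitter: Optional[int] = None
    metrics_interval: int = 60
    disable_metrics: bool = False
    custom_headers: Dict[str, str] = field(default_factory=dict)


class UnleashClientV2:  # hypothetical; shown only to illustrate the split
    def __init__(
        self,
        url: str,                                   # required
        app_name: str,                              # required
        headers: Optional[Dict[str, str]] = None,   # pseudo-required
        options: Optional[UnleashClientOptions] = None,
    ) -> None:
        self.url = url
        self.app_name = app_name
        self.headers = headers or {}
        self.options = options or UnleashClientOptions()
```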

@@ -136,6 +138,8 @@ def __init__(
scheduler_executor: Optional[str] = None,
Contributor

In Ruby we managed to re-use fetcher_scheduled_executor, which runs either the polling toggle fetcher or streaming. Check the usage of https://github.com/Unleash/unleash-ruby-sdk/blob/main/lib/unleash/client.rb#L14 for details. @sighphyre started a great discussion about it here: https://github.com/Unleash/unleash-ruby-sdk/pull/248/files#r2262867667. Since Python is similar to Ruby, it should be doable too. In Java it was too difficult. In Node I will try to migrate towards this approach in a subsequent PR.

Member

Agreed! This SDK is a bit more complex than Ruby but the abstractions are more or less sane so I don't think this should be massively challenging

@@ -169,6 +173,9 @@ def __init__(
self.unleash_verbose_log_level = verbose_log_level
self.unleash_event_callback = event_callback
self._ready_callback = build_ready_callback(event_callback)
self.experimental_mode = experimental_mode
Contributor

I don't like the fact that we have an explosion of fields that work for either polling or streaming. In clear OO design we'd have a swappable mechanism for one fetching strategy at a time. I know in Java and .NET we also mix, but I plan to fix this in Node. Maybe worth investigating this option in Python?
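A sketch of the kind of swappable mechanism being suggested, with exactly one strategy active at a time; the class and method names are illustrative, not existing SDK types, and the scheduler/connector objects are assumed to look roughly like the ones touched by this PR.

```python
from typing import Protocol


class FetchStrategy(Protocol):
    """One active strategy decides how toggle state reaches the engine."""
    def start(self) -> None: ...
    def stop(self) -> None: ...


class PollingFetcher:
    def __init__(self, scheduler, job_func, job_args: dict, interval: int) -> None:
        self._scheduler = scheduler      # e.g. an APScheduler BackgroundScheduler
        self._job_func = job_func
        self._job_args = job_args
        self._interval = interval
        self._job = None

    def start(self) -> None:
        self._job = self._scheduler.add_job(
            self._job_func, "interval", seconds=self._interval, kwargs=self._job_args
        )

    def stop(self) -> None:
        if self._job:
            self._job.remove()


class StreamingFetcher:
    def __init__(self, connector) -> None:
        self._connector = connector      # e.g. the streaming connector from this PR

    def start(self) -> None:
        self._connector.start()

    def stop(self) -> None:
        self._connector.stop()


# The client then holds exactly one strategy instead of mode-specific fields:
# self._fetcher: FetchStrategy = PollingFetcher(...) or StreamingFetcher(...)
```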

self._headers = {**headers, "Accept": "text/event-stream"}
self._timeout = request_timeout
self._on_ready = on_ready
self._sse_factory = sse_client_factory
Contributor

Creating a testability/swapping option is only valuable if you're going to use it. From my review it looks like we exposed this option (increased complexity) but never exercised it. I think in the production implementation of connector.py we only need to wrap the LD client. More detail on why too much dependency inversion can be harmful: https://dannorth.net/blog/cupid-the-back-story/#dependency-inversion-principle

Member

+1 for this. Too much injection is how you end up with a need for IoC containers shudder. No one wants that
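To make "only wrap the LD client" concrete, a minimal sketch under the assumption that launchdarkly-eventsource exposes an SSEClient/ConnectStrategy API roughly as shown; the imports, attribute names, and the class itself are assumptions for illustration, not this PR's actual connector.py.

```python
import threading
from typing import Callable, Optional

# Assumed API of the launchdarkly-eventsource package (ld_eventsource);
# verify the exact import paths and call shapes against the library.
from ld_eventsource import SSEClient
from ld_eventsource.config import ConnectStrategy


class StreamingConnector:
    """Sketch: the connector builds its own SSE client instead of taking a factory."""

    def __init__(self, url: str, headers: dict, on_event: Callable, request_timeout: int = 30) -> None:
        self._url = url
        self._headers = {**headers, "Accept": "text/event-stream"}
        self._on_event = on_event
        self._timeout = request_timeout
        self._thread: Optional[threading.Thread] = None
        self._stopping = threading.Event()

    def start(self) -> None:
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def stop(self) -> None:
        self._stopping.set()
        if self._thread:
            self._thread.join(timeout=5)

    def _run(self) -> None:
        # The connector owns the client; tests exercise it over the network
        # (e.g. a local test server) rather than injecting a fake factory.
        client = SSEClient(connect=ConnectStrategy.http(self._url, headers=self._headers))
        for event in client.events:  # assumed: an iterator of parsed SSE events
            if self._stopping.is_set():
                break
            self._on_event(event)
```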

self.data = data


class FiniteSSEClient:
Contributor

The current approach with FiniteSSEClient and the factory pattern is:

  • More isolated (unit test style)
  • Faster but less realistic
  • Requires production code changes (factory parameter)

Contributor

Network-level testing (check Java and Ruby; Node is not doing this) would be more realistic because:

  1. Tests the actual SSE client library behavior
  2. Verifies correct handling of SSE format
  3. Catches integration issues
  4. More confidence in production behavior

Contributor

This code uses the responses library for polling-mode tests.
It could theoretically mock SSE responses too.
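As a feasibility sketch only: the URL, event names, and payloads below are made up, and the inline parsing stands in for whatever the real streaming connector would do; the point is just that responses can serve an SSE-formatted body over a mocked GET.

```python
import requests
import responses

SSE_BODY = (
    "event: unleash-connected\n"
    'data: {"events": []}\n'
    "\n"
    "event: unleash-updated\n"
    'data: {"events": [{"type": "feature-updated"}]}\n'
    "\n"
)


@responses.activate
def test_streaming_endpoint_can_be_mocked() -> None:
    responses.add(
        responses.GET,
        "http://unleash.example.com/api/client/streaming",
        body=SSE_BODY,
        content_type="text/event-stream",
        status=200,
    )

    resp = requests.get("http://unleash.example.com/api/client/streaming", stream=True)

    # Stand-in for the real connector: split the mocked stream into SSE events
    # on blank lines.
    events, current = [], []
    for line in resp.iter_lines():
        if line:
            current.append(line)
        elif current:
            events.append(current)
            current = []

    assert len(events) == 2
    assert events[0][0] == b"event: unleash-connected"
```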

Member

Agree with this; this is too complex and too delicate a feature to only lean on unit tests. These don't give me enough confidence that this feature works correctly and that we're not going to completely break it by accident in the future.

Comment on lines +334 to +351
job_args = {
**base_job_args,
"url": self.unleash_url,
"app_name": self.unleash_app_name,
"instance_id": self.unleash_instance_id,
"headers": {
**base_headers,
"unleash-interval": self.unleash_refresh_interval_str_millis,
},
"custom_options": self.unleash_custom_options,
"request_timeout": self.unleash_request_timeout,
"request_retries": self.unleash_request_retries,
"event_callback": self.unleash_event_callback,
"project": self.unleash_project_name,
}

job_func: Callable
job_func = fetch_and_load_features
Member Author

My goal here was to avoid a larger refactor. I'd like to extract 'mode' (fetching strategy / connector) into a separate layer, but in this SDK fetching runs on the same 'scheduler' as metrics. Decoupling this should be done in another PR, in my opinion.

Member

I agree it belongs in a different PR, but I think that PR should come before this one. This function is now 150 lines; in the Python world that's a monster of a function. I think we're starting to see that show itself in the form of comments like "MODE: Polling" which are trying to add order to the chaos here. IMO this is now too complex and needs to be split up. I want to say the Poller vs Streamer abstraction is probably what we want here.

@Tymek Tymek requested a review from sighphyre August 25, 2025 18:35
Member

@sighphyre sighphyre left a comment

Okay, so I've left some initial thoughts. I think we should consider refactoring before we do this PR so we can get a reasonable abstraction that swaps out a Poller for an EventStreamer. There are also 7 lint disables in this PR; the SDK currently has 6, most of which are in tests, so we're more than doubling the linter disables, which is a smell for me. I strongly feel we should fix that before merging. I'm not aware of any massive urgency around this feature, so we should be able to spend a little more time on this so that future work isn't difficult.

if self._thread:
self._thread.join(timeout=5)

def _run(self): # noqa: PLR0912
Member

I think Ruff has a point; there's a lot of branching here. I think we should fix that and break this up rather than disable linting.

@@ -6,7 +6,7 @@ mmhash3
python-dateutil
requests
semver
yggdrasil-engine
yggdrasil-engine>=1.0.0

Member

Need a launchdarkly-eventsource here as well

@@ -136,6 +138,8 @@ def __init__(
scheduler_executor: Optional[str] = None,
multiple_instance_mode: InstanceAllowType = InstanceAllowType.WARN,
event_callback: Optional[Callable[[BaseEvent], None]] = None,
experimental_mode: Optional[dict] = None,
Member

It's really unclear to me how to use this. I think it wants a proper type on it so that end users can leverage their type checker
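One possible shape, hedged: the keys and allowed values below are guesses based on how experimental_mode appears in this PR, not a confirmed schema, and resolve_mode is a hypothetical helper shown only to illustrate what a type checker would then catch.

```python
from typing import Literal, Optional, TypedDict


class ExperimentalMode(TypedDict, total=False):
    # Keys and values are assumptions for illustration, not a confirmed schema.
    type: Literal["polling", "streaming"]
    format: Literal["full", "delta"]


def resolve_mode(experimental_mode: Optional[ExperimentalMode]) -> str:
    """Hypothetical helper: with a real type, typos in the mode name are caught
    by mypy/pyright instead of failing silently at runtime."""
    if experimental_mode is None:
        return "polling"
    return experimental_mode.get("type", "polling")


mode = resolve_mode({"type": "streaming"})
# A type checker would flag {"type": "streming"} as an invalid Literal value.
```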

@@ -267,8 +274,10 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
try:
base_headers = {
**self.unleash_custom_headers,
**APPLICATION_HEADERS,
Member

Why these changes here? What does this fix? What test breaks without it and what test ensures that it now correctly works?

kwargs=job_args,

# Decide upstream connection mode
mode = (
Member

It's not a major issue, but I think this belongs somewhere else. It'd be cool to see a function that returns an Enum of POLLING | OFFLINE | STREAMING or something like that.
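Something like the following hedged sketch, presumably; resolve_connection_mode is hypothetical, and the decision inputs (fetch_toggles, experimental_mode) are read off the surrounding diff rather than confirmed signatures.

```python
from enum import Enum, auto
from typing import Optional


class ConnectionMode(Enum):
    POLLING = auto()
    STREAMING = auto()
    OFFLINE = auto()


def resolve_connection_mode(
    fetch_toggles: bool, experimental_mode: Optional[dict]
) -> ConnectionMode:
    """Hypothetical helper mirroring the inline 'mode = (...)' expression."""
    if not fetch_toggles:
        return ConnectionMode.OFFLINE
    if experimental_mode and experimental_mode.get("type") == "streaming":
        return ConnectionMode.STREAMING
    return ConnectionMode.POLLING
```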

@Tymek Tymek marked this pull request as draft August 26, 2025 08:10