Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions ldclient/impl/datasourcev2/polling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import urllib3

from ldclient.config import Config
from ldclient.impl.datasystem import BasisResult, Update
from ldclient.impl.datasystem import BasisResult, SelectorStore, Update
from ldclient.impl.datasystem.protocolv2 import (
Basis,
ChangeSet,
Expand Down Expand Up @@ -96,13 +96,13 @@ def name(self) -> str:
"""Returns the name of the initializer."""
return "PollingDataSourceV2"

def fetch(self) -> BasisResult:
def fetch(self, ss: SelectorStore) -> BasisResult:
"""
Fetch returns a Basis, or an error if the Basis could not be retrieved.
"""
return self._poll()
return self._poll(ss)

def sync(self) -> Generator[Update, None, None]:
def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
"""
sync begins the synchronization process for the data source, yielding
Update objects until the connection is closed or an unrecoverable error
Expand All @@ -111,7 +111,7 @@ def sync(self) -> Generator[Update, None, None]:
log.info("Starting PollingDataSourceV2 synchronizer")
self._stop.clear()
while self._stop.is_set() is False:
result = self._requester.fetch(None)
result = self._requester.fetch(ss.selector())
if isinstance(result, _Fail):
if isinstance(result.exception, UnsuccessfulResponseException):
error_info = DataSourceErrorInfo(
Expand Down Expand Up @@ -170,10 +170,9 @@ def stop(self):
self._task.stop()
self._stop.set()

def _poll(self) -> BasisResult:
def _poll(self, ss: SelectorStore) -> BasisResult:
try:
# TODO(fdv2): Need to pass the selector through
result = self._requester.fetch(None)
result = self._requester.fetch(ss.selector())

if isinstance(result, _Fail):
if isinstance(result.exception, UnsuccessfulResponseException):
Expand Down
17 changes: 10 additions & 7 deletions ldclient/impl/datasourcev2/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from ld_eventsource.errors import HTTPStatusError

from ldclient.config import Config
from ldclient.impl.datasystem import Synchronizer, Update
from ldclient.impl.datasystem import SelectorStore, Synchronizer, Update
from ldclient.impl.datasystem.protocolv2 import (
ChangeSetBuilder,
DeleteObject,
Expand Down Expand Up @@ -54,12 +54,10 @@
STREAMING_ENDPOINT = "/sdk/stream"


SseClientBuilder = Callable[[Config], SSEClient]
SseClientBuilder = Callable[[Config, SelectorStore], SSEClient]


# TODO(sdk-1391): Pass a selector-retrieving function through so it can
# re-connect with the last known status.
def create_sse_client(config: Config) -> SSEClient:
def create_sse_client(config: Config, ss: SelectorStore) -> SSEClient:
""" "
create_sse_client creates an SSEClient instance configured to connect
to the LaunchDarkly streaming endpoint.
Expand All @@ -76,12 +74,17 @@ def create_sse_client(config: Config) -> SSEClient:
override_read_timeout=STREAM_READ_TIMEOUT,
)

def query_params() -> dict[str, str]:
selector = ss.selector()
return {"basis": selector.state} if selector.is_defined() else {}

return SSEClient(
connect=ConnectStrategy.http(
url=uri,
headers=http_factory.base_headers,
pool=stream_http_factory.create_pool_manager(1, uri),
urllib3_request_options={"timeout": stream_http_factory.timeout},
query_params=query_params
),
# we'll make error-handling decisions when we see a Fault
error_strategy=ErrorStrategy.always_continue(),
Expand Down Expand Up @@ -118,13 +121,13 @@ def name(self) -> str:
"""
return "streaming"

def sync(self) -> Generator[Update, None, None]:
def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
"""
sync should begin the synchronization process for the data source, yielding
Update objects until the connection is closed or an unrecoverable error
occurs.
"""
self._sse = self._sse_client_builder(self._config)
self._sse = self._sse_client_builder(self._config, ss)
if self._sse is None:
log.error("Failed to create SSE client for streaming updates.")
return
Expand Down
27 changes: 23 additions & 4 deletions ldclient/impl/datasystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from dataclasses import dataclass
from enum import Enum
from threading import Event
from typing import Generator, Optional, Protocol
from typing import Callable, Generator, Optional, Protocol

from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet
from ldclient.impl.datasystem.protocolv2 import Basis, ChangeSet, Selector
from ldclient.impl.util import _Result
from ldclient.interfaces import (
DataSourceErrorInfo,
Expand Down Expand Up @@ -142,6 +142,21 @@ def target_availability(self) -> DataAvailability:
raise NotImplementedError


class SelectorStore(Protocol):
"""
SelectorStore represents a component capable of providing Selectors
for data retrieval.
"""

@abstractmethod
def selector(self) -> Selector:
"""
get_selector should return a Selector object that defines the criteria
for data retrieval.
"""
raise NotImplementedError


BasisResult = _Result[Basis, str]


Expand All @@ -165,10 +180,12 @@ def name(self) -> str:
raise NotImplementedError

@abstractmethod
def fetch(self) -> BasisResult:
def fetch(self, ss: SelectorStore) -> BasisResult:
"""
fetch should retrieve the initial data set for the data source, returning
a Basis object on success, or an error message on failure.

:param ss: A SelectorStore that provides the Selector to use as a basis for data retrieval.
"""
raise NotImplementedError

Expand Down Expand Up @@ -205,11 +222,13 @@ def name(self) -> str:
raise NotImplementedError

@abstractmethod
def sync(self) -> Generator[Update, None, None]:
def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
"""
sync should begin the synchronization process for the data source, yielding
Update objects until the connection is closed or an unrecoverable error
occurs.

:param ss: A SelectorStore that provides the Selector to use as a basis for data retrieval.
"""
raise NotImplementedError

Expand Down
4 changes: 2 additions & 2 deletions ldclient/impl/datasystem/fdv2.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def _run_initializers(self, set_on_ready: Event):
initializer = initializer_builder(self._config)
log.info("Attempting to initialize via %s", initializer.name)

basis_result = initializer.fetch()
basis_result = initializer.fetch(self._store)

if isinstance(basis_result, _Fail):
log.warning("Initializer %s failed: %s", initializer.name, basis_result.error)
Expand Down Expand Up @@ -426,7 +426,7 @@ def _consume_synchronizer_results(
:return: Tuple of (should_remove_sync, fallback_to_fdv1)
"""
try:
for update in synchronizer.sync():
for update in synchronizer.sync(self._store):
log.info("Synchronizer %s update: %s", synchronizer.name, update.state)
if self._stop_event.is_set():
return False, False
Expand Down
13 changes: 7 additions & 6 deletions ldclient/impl/integrations/test_datav2/test_data_sourcev2.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from queue import Empty, Queue
from typing import Generator

from ldclient.impl.datasystem import BasisResult, Update
from ldclient.impl.datasystem import BasisResult, SelectorStore, Update
from ldclient.impl.datasystem.protocolv2 import (
Basis,
ChangeSetBuilder,
Expand All @@ -16,6 +16,7 @@
DataSourceErrorKind,
DataSourceState
)
from ldclient.testing.mock_components import MockSelectorStore


class _TestDataSourceV2:
Expand Down Expand Up @@ -47,7 +48,7 @@ def name(self) -> str:
"""Return the name of this data source."""
return "TestDataV2"

def fetch(self) -> BasisResult:
def fetch(self, ss: SelectorStore) -> BasisResult:
"""
Implementation of the Initializer.fetch method.

Expand Down Expand Up @@ -90,15 +91,15 @@ def fetch(self) -> BasisResult:
except Exception as e:
return _Fail(f"Error fetching test data: {str(e)}")

def sync(self) -> Generator[Update, None, None]:
def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
"""
Implementation of the Synchronizer.sync method.

Yields updates as test data changes occur.
"""

# First yield initial data
initial_result = self.fetch()
initial_result = self.fetch(ss)
if isinstance(initial_result, _Fail):
yield Update(
state=DataSourceState.OFF,
Expand Down Expand Up @@ -143,8 +144,8 @@ def sync(self) -> Generator[Update, None, None]:
)
break

def close(self):
"""Close the data source and clean up resources."""
def stop(self):
"""Stop the data source and clean up resources"""
with self._lock:
if self._closed:
return
Expand Down
15 changes: 8 additions & 7 deletions ldclient/testing/impl/datasourcev2/test_polling_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
)
from ldclient.impl.datasystem.protocolv2 import ChangeSetBuilder, IntentCode
from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success
from ldclient.testing.mock_components import MockSelectorStore


class MockExceptionThrowingPollingRequester: # pylint: disable=too-few-public-methods
Expand All @@ -37,7 +38,7 @@ def test_error_is_returned_on_failure():
mock_requester = MockPollingRequester(_Fail(error="failure message"))
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)

result = ds.fetch()
result = ds.fetch(MockSelectorStore(Selector.no_selector()))

assert isinstance(result, _Fail)
assert result.error == "failure message"
Expand All @@ -50,7 +51,7 @@ def test_error_is_recoverable():
)
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)

result = ds.fetch()
result = ds.fetch(MockSelectorStore(Selector.no_selector()))

assert isinstance(result, _Fail)
assert result.error is not None
Expand All @@ -64,7 +65,7 @@ def test_error_is_unrecoverable():
)
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)

result = ds.fetch()
result = ds.fetch(MockSelectorStore(Selector.no_selector()))

assert isinstance(result, _Fail)
assert result.error is not None
Expand All @@ -78,7 +79,7 @@ def test_handles_transfer_none():
)
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)

result = ds.fetch()
result = ds.fetch(MockSelectorStore(Selector.no_selector()))

assert isinstance(result, _Success)
assert result.value is not None
Expand All @@ -92,7 +93,7 @@ def test_handles_uncaught_exception():
mock_requester = MockExceptionThrowingPollingRequester()
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)

result = ds.fetch()
result = ds.fetch(MockSelectorStore(Selector.no_selector()))

assert isinstance(result, _Fail)
assert result.error is not None
Expand All @@ -111,7 +112,7 @@ def test_handles_transfer_full():
mock_requester = MockPollingRequester(_Success(value=(change_set_result.value, {})))
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)

result = ds.fetch()
result = ds.fetch(MockSelectorStore(Selector.no_selector()))

assert isinstance(result, _Success)
assert result.value is not None
Expand All @@ -129,7 +130,7 @@ def test_handles_transfer_changes():
mock_requester = MockPollingRequester(_Success(value=(change_set_result.value, {})))
ds = PollingDataSource(poll_interval=1.0, requester=mock_requester)

result = ds.fetch()
result = ds.fetch(MockSelectorStore(Selector.no_selector()))

assert isinstance(result, _Success)
assert result.value is not None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)
from ldclient.impl.util import UnsuccessfulResponseException, _Fail, _Success
from ldclient.interfaces import DataSourceErrorKind, DataSourceState
from ldclient.testing.mock_components import MockSelectorStore


class ListBasedRequester:
Expand Down Expand Up @@ -103,7 +104,7 @@ def test_handles_no_changes():
poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result]))
)

valid = next(synchronizer.sync())
valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector())))

assert valid.state == DataSourceState.VALID
assert valid.error is None
Expand All @@ -124,7 +125,7 @@ def test_handles_empty_changeset():
synchronizer = PollingDataSource(
poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result]))
)
valid = next(synchronizer.sync())
valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector())))

assert valid.state == DataSourceState.VALID
assert valid.error is None
Expand Down Expand Up @@ -152,7 +153,7 @@ def test_handles_put_objects():
synchronizer = PollingDataSource(
poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result]))
)
valid = next(synchronizer.sync())
valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector())))

assert valid.state == DataSourceState.VALID
assert valid.error is None
Expand Down Expand Up @@ -183,7 +184,7 @@ def test_handles_delete_objects():
synchronizer = PollingDataSource(
poll_interval=0.01, requester=ListBasedRequester(results=iter([polling_result]))
)
valid = next(synchronizer.sync())
valid = next(synchronizer.sync(MockSelectorStore(Selector.no_selector())))

assert valid.state == DataSourceState.VALID
assert valid.error is None
Expand Down Expand Up @@ -216,7 +217,7 @@ def test_generic_error_interrupts_and_recovers():
results=iter([_Fail(error="error for test"), polling_result])
),
)
sync = synchronizer.sync()
sync = synchronizer.sync(MockSelectorStore(Selector.no_selector()))
interrupted = next(sync)
valid = next(sync)

Expand Down Expand Up @@ -250,7 +251,7 @@ def test_recoverable_error_continues():
poll_interval=0.01,
requester=ListBasedRequester(results=iter([_failure, polling_result])),
)
sync = synchronizer.sync()
sync = synchronizer.sync(MockSelectorStore(Selector.no_selector()))
interrupted = next(sync)
valid = next(sync)

Expand Down Expand Up @@ -288,7 +289,7 @@ def test_unrecoverable_error_shuts_down():
poll_interval=0.01,
requester=ListBasedRequester(results=iter([_failure, polling_result])),
)
sync = synchronizer.sync()
sync = synchronizer.sync(MockSelectorStore(Selector.no_selector()))
off = next(sync)
assert off.state == DataSourceState.OFF
assert off.error is not None
Expand Down
Loading