Skip to content

Commit 69a94f8

Browse files
authored
refactor: connectors abstraction (#367)
1 parent dc141f0 commit 69a94f8

File tree

13 files changed

+581
-296
lines changed

13 files changed

+581
-296
lines changed

UnleashClient/__init__.py

Lines changed: 87 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,17 @@
1010
from apscheduler.executors.pool import ThreadPoolExecutor
1111
from apscheduler.job import Job
1212
from apscheduler.schedulers.background import BackgroundScheduler
13-
from apscheduler.schedulers.base import BaseScheduler
13+
from apscheduler.schedulers.base import STATE_RUNNING, BaseScheduler
1414
from apscheduler.triggers.interval import IntervalTrigger
1515
from yggdrasil_engine.engine import UnleashEngine
1616

1717
from UnleashClient.api import register_client
18+
from UnleashClient.connectors import (
19+
BaseConnector,
20+
BootstrapConnector,
21+
OfflineConnector,
22+
PollingConnector,
23+
)
1824
from UnleashClient.constants import (
1925
DISABLED_VARIATION,
2026
ETAG,
@@ -30,10 +36,8 @@
3036
UnleashEventType,
3137
UnleashReadyEvent,
3238
)
33-
from UnleashClient.loader import load_features
3439
from UnleashClient.periodic_tasks import (
3540
aggregate_and_send_metrics,
36-
fetch_and_load_features,
3741
)
3842

3943
from .cache import BaseCache, FileCache
@@ -185,7 +189,31 @@ def __init__(
185189

186190
self.metrics_headers: dict = {}
187191

188-
# Scheduler bootstrapping
192+
self._init_scheduler(scheduler, scheduler_executor)
193+
194+
if custom_strategies:
195+
self.engine.register_custom_strategies(custom_strategies)
196+
197+
self.strategy_mapping = {**custom_strategies}
198+
199+
# Client status
200+
self.is_initialized = False
201+
202+
# Bootstrapping
203+
if self.unleash_bootstrapped:
204+
BootstrapConnector(
205+
engine=self.engine,
206+
cache=self.cache,
207+
).start()
208+
209+
self.connector: BaseConnector = None
210+
211+
def _init_scheduler(
212+
self, scheduler: Optional[BaseScheduler], scheduler_executor: Optional[str]
213+
) -> None:
214+
"""
215+
Scheduler bootstrapping
216+
"""
189217
# - Figure out the Unleash executor name.
190218
if scheduler and scheduler_executor:
191219
self.unleash_executor_name = scheduler_executor
@@ -208,25 +236,6 @@ def __init__(
208236
executors = {self.unleash_executor_name: ThreadPoolExecutor()}
209237
self.unleash_scheduler = BackgroundScheduler(executors=executors)
210238

211-
if custom_strategies:
212-
self.engine.register_custom_strategies(custom_strategies)
213-
214-
self.strategy_mapping = {**custom_strategies}
215-
216-
# Client status
217-
self.is_initialized = False
218-
219-
# Bootstrapping
220-
if self.unleash_bootstrapped:
221-
load_features(
222-
cache=self.cache,
223-
engine=self.engine,
224-
)
225-
226-
@property
227-
def unleash_refresh_interval_str_millis(self) -> str:
228-
return str(self.unleash_refresh_interval * 1000)
229-
230239
@property
231240
def unleash_metrics_interval_str_millis(self) -> str:
232241
return str(self.unleash_metrics_interval * 1000)
@@ -265,30 +274,14 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
265274
if not self.is_initialized:
266275
# pylint: disable=no-else-raise
267276
try:
277+
start_scheduler = False
268278
base_headers = {
269279
**self.unleash_custom_headers,
270280
"unleash-connection-id": self.connection_id,
271281
"unleash-appname": self.unleash_app_name,
272282
"unleash-sdk": f"{SDK_NAME}:{SDK_VERSION}",
273283
}
274284

275-
self.metrics_headers = {
276-
**base_headers,
277-
"unleash-interval": self.unleash_metrics_interval_str_millis,
278-
}
279-
280-
# Setup
281-
metrics_args = {
282-
"url": self.unleash_url,
283-
"app_name": self.unleash_app_name,
284-
"connection_id": self.connection_id,
285-
"instance_id": self.unleash_instance_id,
286-
"headers": self.metrics_headers,
287-
"custom_options": self.unleash_custom_options,
288-
"request_timeout": self.unleash_request_timeout,
289-
"engine": self.engine,
290-
}
291-
292285
# Register app
293286
if not self.unleash_disable_registration:
294287
register_client(
@@ -304,47 +297,58 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
304297
)
305298

306299
if fetch_toggles:
307-
fetch_headers = {
300+
start_scheduler = True
301+
self.connector = PollingConnector(
302+
engine=self.engine,
303+
cache=self.cache,
304+
scheduler=self.unleash_scheduler,
305+
url=self.unleash_url,
306+
app_name=self.unleash_app_name,
307+
instance_id=self.unleash_instance_id,
308+
headers=base_headers,
309+
custom_options=self.unleash_custom_options,
310+
request_timeout=self.unleash_request_timeout,
311+
request_retries=self.unleash_request_retries,
312+
project=self.unleash_project_name,
313+
scheduler_executor=self.unleash_executor_name,
314+
refresh_interval=self.unleash_refresh_interval,
315+
event_callback=self.unleash_event_callback,
316+
ready_callback=self._ready_callback,
317+
)
318+
else:
319+
start_scheduler = True
320+
self.connector = OfflineConnector(
321+
engine=self.engine,
322+
cache=self.cache,
323+
scheduler=self.unleash_scheduler,
324+
scheduler_executor=self.unleash_executor_name,
325+
refresh_interval=self.unleash_refresh_interval,
326+
refresh_jitter=self.unleash_refresh_jitter,
327+
ready_callback=self._ready_callback,
328+
)
329+
330+
self.connector.start()
331+
332+
if not self.unleash_disable_metrics:
333+
if getattr(self.unleash_scheduler, "state", None) != STATE_RUNNING:
334+
start_scheduler = True
335+
336+
self.metrics_headers = {
308337
**base_headers,
309-
"unleash-interval": self.unleash_refresh_interval_str_millis,
338+
"unleash-interval": self.unleash_metrics_interval_str_millis,
310339
}
311340

312-
job_args = {
341+
metrics_args = {
313342
"url": self.unleash_url,
314343
"app_name": self.unleash_app_name,
344+
"connection_id": self.connection_id,
315345
"instance_id": self.unleash_instance_id,
316-
"headers": fetch_headers,
346+
"headers": self.metrics_headers,
317347
"custom_options": self.unleash_custom_options,
318-
"cache": self.cache,
319-
"engine": self.engine,
320348
"request_timeout": self.unleash_request_timeout,
321-
"request_retries": self.unleash_request_retries,
322-
"project": self.unleash_project_name,
323-
"event_callback": self.unleash_event_callback,
324-
"ready_callback": self._ready_callback,
325-
}
326-
job_func: Callable = fetch_and_load_features
327-
else:
328-
job_args = {
329-
"cache": self.cache,
330349
"engine": self.engine,
331-
"ready_callback": self._ready_callback,
332350
}
333-
job_func = load_features
334-
335-
job_func(**job_args) # type: ignore
336-
# Start periodic jobs
337-
self.unleash_scheduler.start()
338-
self.fl_job = self.unleash_scheduler.add_job(
339-
job_func,
340-
trigger=IntervalTrigger(
341-
seconds=int(self.unleash_refresh_interval),
342-
jitter=self.unleash_refresh_jitter,
343-
),
344-
executor=self.unleash_executor_name,
345-
kwargs=job_args,
346-
)
347-
if not self.unleash_disable_metrics:
351+
348352
self.metric_job = self.unleash_scheduler.add_job(
349353
aggregate_and_send_metrics,
350354
trigger=IntervalTrigger(
@@ -354,6 +358,10 @@ def initialize_client(self, fetch_toggles: bool = True) -> None:
354358
executor=self.unleash_executor_name,
355359
kwargs=metrics_args,
356360
)
361+
362+
if start_scheduler:
363+
self.unleash_scheduler.start()
364+
357365
except Exception as excep:
358366
# Log exceptions during initialization. is_initialized will remain false.
359367
LOGGER.warning(
@@ -396,7 +404,8 @@ def destroy(self) -> None:
396404
397405
You shouldn't need this too much!
398406
"""
399-
self.fl_job.remove()
407+
if self.connector:
408+
self.connector.stop()
400409
if self.metric_job:
401410
self.metric_job.remove()
402411

@@ -412,7 +421,10 @@ def destroy(self) -> None:
412421
engine=self.engine,
413422
)
414423

415-
self.unleash_scheduler.shutdown()
424+
try:
425+
self.unleash_scheduler.shutdown()
426+
except Exception as exc:
427+
LOGGER.warning("Exception during scheduler shutdown: %s", exc)
416428
self.cache.destroy()
417429

418430
@staticmethod
@@ -513,9 +525,9 @@ def get_variant(self, feature_name: str, context: Optional[dict] = None) -> dict
513525
event_type=UnleashEventType.VARIANT,
514526
event_id=uuid.uuid4(),
515527
context=context,
516-
enabled=variant["enabled"],
528+
enabled=bool(variant["enabled"]),
517529
feature_name=feature_name,
518-
variant=variant["name"],
530+
variant=str(variant["name"]),
519531
)
520532

521533
self.unleash_event_callback(event)
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
from .base_connector import BaseConnector
2+
from .bootstrap_connector import BootstrapConnector
3+
from .offline_connector import OfflineConnector
4+
from .polling_connector import PollingConnector
5+
6+
__all__ = [
7+
"BaseConnector",
8+
"BootstrapConnector",
9+
"OfflineConnector",
10+
"PollingConnector",
11+
]
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Callable, Optional
3+
4+
from yggdrasil_engine.engine import UnleashEngine
5+
6+
from UnleashClient.cache import BaseCache
7+
from UnleashClient.constants import FEATURES_URL
8+
from UnleashClient.utils import LOGGER
9+
10+
11+
class BaseConnector(ABC):
12+
def __init__(
13+
self,
14+
engine: UnleashEngine,
15+
cache: BaseCache,
16+
ready_callback: Optional[Callable] = None,
17+
):
18+
"""
19+
:param engine: Feature evaluation engine instance (UnleashEngine).
20+
:param cache: Should be the cache class variable from UnleashClient
21+
:param ready_callback: Optional function to call when features are successfully loaded.
22+
"""
23+
self.engine = engine
24+
self.cache = cache
25+
self.ready_callback = ready_callback
26+
27+
@abstractmethod
28+
def start(self):
29+
pass
30+
31+
@abstractmethod
32+
def stop(self):
33+
pass
34+
35+
def load_features(self):
36+
feature_provisioning = self.cache.get(FEATURES_URL)
37+
if not feature_provisioning:
38+
LOGGER.warning(
39+
"Unleash client does not have cached features. "
40+
"Please make sure client can communicate with Unleash server!"
41+
)
42+
return
43+
44+
try:
45+
warnings = self.engine.take_state(feature_provisioning)
46+
if self.ready_callback:
47+
self.ready_callback()
48+
if warnings:
49+
LOGGER.warning(
50+
"Some features were not able to be parsed correctly, they may not evaluate as expected"
51+
)
52+
LOGGER.warning(warnings)
53+
except Exception as e:
54+
LOGGER.error(f"Error loading features: {e}")
55+
LOGGER.debug(
56+
f"Full feature response body from server: {feature_provisioning}"
57+
)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
from yggdrasil_engine.engine import UnleashEngine
2+
3+
from UnleashClient.cache import BaseCache
4+
5+
from .base_connector import BaseConnector
6+
7+
8+
class BootstrapConnector(BaseConnector):
9+
def __init__(
10+
self,
11+
engine: UnleashEngine,
12+
cache: BaseCache,
13+
):
14+
self.engine = engine
15+
self.cache = cache
16+
self.job = None
17+
18+
def start(self):
19+
self.load_features()
20+
21+
def stop(self):
22+
pass
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from typing import Callable
2+
3+
from apscheduler.schedulers.background import BackgroundScheduler
4+
from apscheduler.triggers.interval import IntervalTrigger
5+
from yggdrasil_engine.engine import UnleashEngine
6+
7+
from UnleashClient.cache import BaseCache
8+
9+
from .base_connector import BaseConnector
10+
11+
12+
class OfflineConnector(BaseConnector):
13+
def __init__(
14+
self,
15+
engine: UnleashEngine,
16+
cache: BaseCache,
17+
scheduler: BackgroundScheduler,
18+
scheduler_executor: str = "default",
19+
refresh_interval: int = 15,
20+
refresh_jitter: int = None,
21+
ready_callback: Callable = None,
22+
):
23+
self.engine = engine
24+
self.cache = cache
25+
self.ready_callback = ready_callback
26+
self.scheduler = scheduler
27+
self.scheduler_executor = scheduler_executor
28+
self.refresh_interval = refresh_interval
29+
self.refresh_jitter = refresh_jitter
30+
self.job = None
31+
32+
def start(self):
33+
self.load_features()
34+
35+
self.job = self.scheduler.add_job(
36+
self.load_features,
37+
trigger=IntervalTrigger(
38+
seconds=self.refresh_interval, jitter=self.refresh_jitter
39+
),
40+
executor=self.scheduler_executor,
41+
)
42+
43+
if self.ready_callback:
44+
self.ready_callback()
45+
46+
def stop(self):
47+
if self.job:
48+
self.job.remove()
49+
self.job = None

0 commit comments

Comments
 (0)