Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion optimizely/cmab/cmab_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from optimizely.exceptions import CmabFetchError, CmabInvalidResponseError

# Default constants for CMAB requests
DEFAULT_MAX_RETRIES = 3
DEFAULT_MAX_RETRIES = 1
DEFAULT_INITIAL_BACKOFF = 0.1 # in seconds (100 ms)
DEFAULT_MAX_BACKOFF = 10 # in seconds
DEFAULT_BACKOFF_MULTIPLIER = 2.0
Expand Down
51 changes: 45 additions & 6 deletions optimizely/cmab/cmab_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,18 @@
import hashlib
import threading

from typing import Optional, List, TypedDict
from typing import Optional, List, TypedDict, Tuple
from optimizely.cmab.cmab_client import DefaultCmabClient
from optimizely.odp.lru_cache import LRUCache
from optimizely.optimizely_user_context import OptimizelyUserContext, UserAttributes
from optimizely.project_config import ProjectConfig
from optimizely.decision.optimizely_decide_option import OptimizelyDecideOption
from optimizely import logger as _logging
from optimizely.lib import pymmh3 as mmh3

NUM_LOCK_STRIPES = 1000
DEFAULT_CMAB_CACHE_TIMEOUT = 30 * 60 # 30 minutes
DEFAULT_CMAB_CACHE_SIZE = 1000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this needs to be 10000.
Just like in ODP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IN odp we have 10000 here:

DEFAULT_CAPACITY: Final = 10_000



class CmabDecision(TypedDict):
Expand Down Expand Up @@ -65,26 +68,40 @@ def _get_lock_index(self, user_id: str, rule_id: str) -> int:
return hash_value % NUM_LOCK_STRIPES

def get_decision(self, project_config: ProjectConfig, user_context: OptimizelyUserContext,
rule_id: str, options: List[str]) -> CmabDecision:
rule_id: str, options: List[str]) -> Tuple[CmabDecision, List[str]]:

lock_index = self._get_lock_index(user_context.user_id, rule_id)
with self.locks[lock_index]:
return self._get_decision(project_config, user_context, rule_id, options)

def _get_decision(self, project_config: ProjectConfig, user_context: OptimizelyUserContext,
rule_id: str, options: List[str]) -> CmabDecision:
rule_id: str, options: List[str]) -> Tuple[CmabDecision, List[str]]:

filtered_attributes = self._filter_attributes(project_config, user_context, rule_id)
reasons = []

if OptimizelyDecideOption.IGNORE_CMAB_CACHE in options:
return self._fetch_decision(rule_id, user_context.user_id, filtered_attributes)
reason = f"Ignoring CMAB cache for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
cmab_decision = self._fetch_decision(rule_id, user_context.user_id, filtered_attributes)
return cmab_decision, reasons

if OptimizelyDecideOption.RESET_CMAB_CACHE in options:
reason = f"Resetting CMAB cache for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
self.cmab_cache.reset()

cache_key = self._get_cache_key(user_context.user_id, rule_id)

if OptimizelyDecideOption.INVALIDATE_USER_CMAB_CACHE in options:
reason = f"Invalidating CMAB cache for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
self.cmab_cache.remove(cache_key)

cached_value = self.cmab_cache.lookup(cache_key)
Expand All @@ -93,17 +110,39 @@ def _get_decision(self, project_config: ProjectConfig, user_context: OptimizelyU

if cached_value:
if cached_value['attributes_hash'] == attributes_hash:
return CmabDecision(variation_id=cached_value['variation_id'], cmab_uuid=cached_value['cmab_uuid'])
reason = f"CMAB cache hit for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
return CmabDecision(variation_id=cached_value['variation_id'],
cmab_uuid=cached_value['cmab_uuid']), reasons
else:
reason = (
f"CMAB cache attributes mismatch for user '{user_context.user_id}' "
f"and rule '{rule_id}', fetching new decision."
)
if self.logger:
self.logger.debug(reason)
reasons.append(reason)
self.cmab_cache.remove(cache_key)
else:
reason = f"CMAB cache miss for user '{user_context.user_id}' and rule '{rule_id}'"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)

cmab_decision = self._fetch_decision(rule_id, user_context.user_id, filtered_attributes)
reason = f"CMAB decision is {cmab_decision}"
if self.logger:
self.logger.debug(reason)
reasons.append(reason)

self.cmab_cache.save(cache_key, {
'attributes_hash': attributes_hash,
'variation_id': cmab_decision['variation_id'],
'cmab_uuid': cmab_decision['cmab_uuid'],
})
return cmab_decision
return cmab_decision, reasons

def _fetch_decision(self, rule_id: str, user_id: str, attributes: UserAttributes) -> CmabDecision:
cmab_uuid = str(uuid.uuid4())
Expand Down
3 changes: 2 additions & 1 deletion optimizely/decision_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,10 @@ def _get_decision_for_cmab_experiment(
# User is in CMAB allocation, proceed to CMAB decision
try:
options_list = list(options) if options is not None else []
cmab_decision = self.cmab_service.get_decision(
cmab_decision, cmab_reasons = self.cmab_service.get_decision(
project_config, user_context, experiment.id, options_list
)
decide_reasons.extend(cmab_reasons)
return {
"error": False,
"result": cmab_decision,
Expand Down
2 changes: 1 addition & 1 deletion optimizely/event/event_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class Signal:

def __init__(
self,
event_dispatcher: Optional[type[EventDispatcher] | CustomEventDispatcher] = None,
event_dispatcher: Optional[EventDispatcher | CustomEventDispatcher] = None,
logger: Optional[_logging.Logger] = None,
start_on_init: bool = False,
event_queue: Optional[queue.Queue[UserEvent | Signal]] = None,
Expand Down
31 changes: 16 additions & 15 deletions optimizely/optimizely.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,13 @@
from .optimizely_user_context import OptimizelyUserContext, UserAttributes
from .project_config import ProjectConfig
from .cmab.cmab_client import DefaultCmabClient, CmabRetryConfig
from .cmab.cmab_service import DefaultCmabService, CmabCacheValue
from .cmab.cmab_service import DefaultCmabService, CmabCacheValue, DEFAULT_CMAB_CACHE_SIZE, DEFAULT_CMAB_CACHE_TIMEOUT

if TYPE_CHECKING:
# prevent circular dependency by skipping import at runtime
from .user_profile import UserProfileService
from .helpers.event_tag_utils import EventTags

# Default constants for CMAB cache
DEFAULT_CMAB_CACHE_TIMEOUT = 30 * 60 * 1000 # 30 minutes in milliseconds
DEFAULT_CMAB_CACHE_SIZE = 1000


class Optimizely:
""" Class encapsulating all SDK functionality. """
Expand All @@ -77,6 +73,7 @@ def __init__(
default_decide_options: Optional[list[str]] = None,
event_processor_options: Optional[dict[str, Any]] = None,
settings: Optional[OptimizelySdkSettings] = None,
cmab_service: Optional[DefaultCmabService] = None,
) -> None:
""" Optimizely init method for managing Custom projects.

Expand Down Expand Up @@ -178,16 +175,20 @@ def __init__(
self.event_builder = event_builder.EventBuilder()

# Initialize CMAB components
self.cmab_client = DefaultCmabClient(
retry_config=CmabRetryConfig(),
logger=self.logger
)
self.cmab_cache: LRUCache[str, CmabCacheValue] = LRUCache(DEFAULT_CMAB_CACHE_SIZE, DEFAULT_CMAB_CACHE_TIMEOUT)
self.cmab_service = DefaultCmabService(
cmab_cache=self.cmab_cache,
cmab_client=self.cmab_client,
logger=self.logger
)
if cmab_service:
self.cmab_service = cmab_service
else:
self.cmab_client = DefaultCmabClient(
retry_config=CmabRetryConfig(),
logger=self.logger
)
self.cmab_cache: LRUCache[str, CmabCacheValue] = LRUCache(DEFAULT_CMAB_CACHE_SIZE,
DEFAULT_CMAB_CACHE_TIMEOUT)
self.cmab_service = DefaultCmabService(
cmab_cache=self.cmab_cache,
cmab_client=self.cmab_client,
logger=self.logger
)
self.decision_service = decision_service.DecisionService(self.logger, user_profile_service, self.cmab_service)
self.user_profile_service = user_profile_service

Expand Down
72 changes: 70 additions & 2 deletions optimizely/optimizely_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
from .event_dispatcher import EventDispatcher, CustomEventDispatcher
from .notification_center import NotificationCenter
from .optimizely import Optimizely
from .odp.lru_cache import LRUCache
from .cmab.cmab_client import DefaultCmabClient, CmabRetryConfig
from .cmab.cmab_service import DefaultCmabService, CmabCacheValue, DEFAULT_CMAB_CACHE_TIMEOUT, DEFAULT_CMAB_CACHE_SIZE

if TYPE_CHECKING:
# prevent circular dependenacy by skipping import at runtime
Expand All @@ -36,6 +39,9 @@ class OptimizelyFactory:
max_event_flush_interval: Optional[int] = None
polling_interval: Optional[float] = None
blocking_timeout: Optional[int] = None
cmab_cache_size: Optional[int] = None
cmab_cache_ttl: Optional[int] = None
cmab_custom_cache: Optional[LRUCache[str, CmabCacheValue]] = None

@staticmethod
def set_batch_size(batch_size: int) -> int:
Expand Down Expand Up @@ -75,6 +81,51 @@ def set_blocking_timeout(blocking_timeout: int) -> int:
OptimizelyFactory.blocking_timeout = blocking_timeout
return OptimizelyFactory.blocking_timeout

@staticmethod
def set_cmab_cache_size(cache_size: int, logger: Optional[optimizely_logger.Logger] = None) -> Optional[int]:
""" Convenience method for setting the maximum number of items in CMAB cache.
Args:
cache_size: Maximum number of items in CMAB cache.
logger: Optional logger for logging messages.
"""
logger = logger or optimizely_logger.NoOpLogger()

if not isinstance(cache_size, int) or cache_size <= 0:
logger.error(
f"CMAB cache size is invalid, setting to default size {DEFAULT_CMAB_CACHE_SIZE}."
)
return None

OptimizelyFactory.cmab_cache_size = cache_size
return OptimizelyFactory.cmab_cache_size

@staticmethod
def set_cmab_cache_ttl(cache_ttl: int, logger: Optional[optimizely_logger.Logger] = None) -> Optional[int]:
""" Convenience method for setting CMAB cache TTL.
Args:
cache_ttl: Time in seconds for cache entries to live.
logger: Optional logger for logging messages.
"""
logger = logger or optimizely_logger.NoOpLogger()

if not isinstance(cache_ttl, (int, float)) or cache_ttl <= 0:
logger.error(
f"CMAB cache TTL is invalid, setting to default TTL {DEFAULT_CMAB_CACHE_TIMEOUT}."
)
return None

OptimizelyFactory.cmab_cache_ttl = int(cache_ttl)
return OptimizelyFactory.cmab_cache_ttl

@staticmethod
def set_cmab_custom_cache(custom_cache: LRUCache[str, CmabCacheValue]) -> LRUCache[str, CmabCacheValue]:
""" Convenience method for setting custom CMAB cache.
Args:
custom_cache: Cache implementation with lookup, save, remove, and reset methods.
"""
OptimizelyFactory.cmab_custom_cache = custom_cache
return OptimizelyFactory.cmab_custom_cache

@staticmethod
def default_instance(sdk_key: str, datafile: Optional[str] = None) -> Optimizely:
""" Returns a new optimizely instance..
Expand Down Expand Up @@ -104,9 +155,17 @@ def default_instance(sdk_key: str, datafile: Optional[str] = None) -> Optimizely
notification_center=notification_center,
)

# Initialize CMAB components
cmab_client = DefaultCmabClient(retry_config=CmabRetryConfig(), logger=logger)
cmab_cache = OptimizelyFactory.cmab_custom_cache or LRUCache(
OptimizelyFactory.cmab_cache_size or DEFAULT_CMAB_CACHE_SIZE,
OptimizelyFactory.cmab_cache_ttl or DEFAULT_CMAB_CACHE_TIMEOUT
)
cmab_service = DefaultCmabService(cmab_cache, cmab_client, logger)

optimizely = Optimizely(
datafile, None, logger, error_handler, None, None, sdk_key, config_manager, notification_center,
event_processor
event_processor, cmab_service=cmab_service
)
return optimizely

Expand Down Expand Up @@ -174,7 +233,16 @@ def custom_instance(
notification_center=notification_center,
)

# Initialize CMAB components
cmab_client = DefaultCmabClient(retry_config=CmabRetryConfig(), logger=logger)
cmab_cache = OptimizelyFactory.cmab_custom_cache or LRUCache(
OptimizelyFactory.cmab_cache_size or DEFAULT_CMAB_CACHE_SIZE,
OptimizelyFactory.cmab_cache_ttl or DEFAULT_CMAB_CACHE_TIMEOUT
)
cmab_service = DefaultCmabService(cmab_cache, cmab_client, logger)

return Optimizely(
datafile, event_dispatcher, logger, error_handler, skip_json_validation, user_profile_service,
sdk_key, config_manager, notification_center, event_processor, settings=settings
sdk_key, config_manager, notification_center, event_processor, settings=settings,
cmab_service=cmab_service
)
10 changes: 5 additions & 5 deletions tests/test_cmab_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_returns_decision_from_cache_when_valid(self):
"cmab_uuid": "uuid-123"
}

decision = self.cmab_service.get_decision(
decision, _ = self.cmab_service.get_decision(
self.mock_project_config, self.mock_user_context, "exp1", []
)

Expand All @@ -72,7 +72,7 @@ def test_ignores_cache_when_option_given(self):
self.mock_cmab_client.fetch_decision.return_value = "varB"
expected_attributes = {"age": 25, "location": "USA"}

decision = self.cmab_service.get_decision(
decision, _ = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
Expand Down Expand Up @@ -105,7 +105,7 @@ def test_invalidates_user_cache_when_option_given(self):
def test_resets_cache_when_option_given(self):
self.mock_cmab_client.fetch_decision.return_value = "varD"

decision = self.cmab_service.get_decision(
decision, _ = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
Expand All @@ -128,7 +128,7 @@ def test_new_decision_when_hash_changes(self):
expected_hash = self.cmab_service._hash_attributes(expected_attribute)
expected_key = self.cmab_service._get_cache_key("user123", "exp1")

decision = self.cmab_service.get_decision(self.mock_project_config, self.mock_user_context, "exp1", [])
decision, _ = self.cmab_service.get_decision(self.mock_project_config, self.mock_user_context, "exp1", [])
self.mock_cmab_cache.remove.assert_called_once_with(expected_key)
self.mock_cmab_cache.save.assert_called_once_with(
expected_key,
Expand Down Expand Up @@ -171,7 +171,7 @@ def test_only_cmab_attributes_passed_to_client(self):
}
self.mock_cmab_client.fetch_decision.return_value = "varF"

decision = self.cmab_service.get_decision(
decision, _ = self.cmab_service.get_decision(
self.mock_project_config,
self.mock_user_context,
"exp1",
Expand Down
11 changes: 7 additions & 4 deletions tests/test_decision_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,10 +792,13 @@ def test_get_variation_cmab_experiment_user_in_traffic_allocation(self):
'logger') as mock_logger:

# Configure CMAB service to return a decision
mock_cmab_service.get_decision.return_value = {
'variation_id': '111151',
'cmab_uuid': 'test-cmab-uuid-123'
}
mock_cmab_service.get_decision.return_value = (
{
'variation_id': '111151',
'cmab_uuid': 'test-cmab-uuid-123'
},
[] # reasons list
)

# Call get_variation with the CMAB experiment
variation_result = self.decision_service.get_variation(
Expand Down
Loading