diff --git a/faktory/__init__.py b/faktory/__init__.py index b646bb5..b659096 100644 --- a/faktory/__init__.py +++ b/faktory/__init__.py @@ -1,8 +1,10 @@ from contextlib import contextmanager -from .worker import Worker +from faktory.configuration import config + from .client import Client from .exceptions import * +from .worker import Worker __version__ = "0.4.0" __url__ = "https://github.com/cdrx/faktory_worker_python" diff --git a/faktory/config.toml b/faktory/config.toml new file mode 100644 index 0000000..811afa7 --- /dev/null +++ b/faktory/config.toml @@ -0,0 +1,25 @@ + +# Client configuration placeholder. Useful if we plan +# to further extend the client's capabilities +[client] + + + + +# Worker based configuration. Migrating defaults here +# so users can have smart defaults on a by-worker basis. +[worker] +heartbeat_seconds = 15 +concurrency = 1 +use_threads = false +labels = ["python"] +queues = ["default"] + + # Worker disconnect timeouts in seconds due to different reasons + [worker.disconnect] + # Timeout for control + C + keyboard = 15 + # Timeout due to a worker pool failure + pool_failure = 15 + # Timeout when asked to shut down by Faktory + server_requested = 25 diff --git a/faktory/configuration.py b/faktory/configuration.py new file mode 100644 index 0000000..cb832b5 --- /dev/null +++ b/faktory/configuration.py @@ -0,0 +1,282 @@ +import datetime +import os +import re +from typing import Optional, Union, cast + +import toml +from box import Box + +from faktory.utilities import collections +from faktory.utilities.collections import DotDict + +DEFAULT_CONFIG = os.path.join(os.path.dirname(__file__), "config.toml") +# User config, for specifying either private defaults or ones that +# need to be guaranteed to overwrite the defaults we provide. +USER_CONFIG = os.getenv("FAKTORY_USER_CONFIG_PATH", "~/.faktory/config.toml") +# Environment variables leading with FAKTORY are parsed and loaded, +# all others are ignored. 
# Keys in the environment are expected to be in the format
# `FAKTORY__{SECTION}__[...{SUBSECTIONS}]__{KEY}`
ENV_VAR_PREFIX = "FAKTORY"

# Matches a `${...}` reference to another config key; group 1 captures the
# text between the braces.
INTERPOLATION_REGEX = re.compile(r"\${(.[^${}]*)}")


class Config(Box):
    """
    A config is a Box subclass
    """

    def copy(self) -> "Config":
        """
        Create a recursive copy of the config. Each level of the Config is a new Config object,
        so modifying keys won't affect the original Config object. However, values are not
        deep-copied, and mutations can affect the original.

        Returns:
            - Config: a new Config whose nested Config sections are also new objects
        """
        new_config = Config()
        for key, value in self.items():
            # only Config sections are copied; every other value is shared by reference
            if isinstance(value, Config):
                value = value.copy()
            new_config[key] = value
        return new_config


def string_to_type(val: str) -> Union[bool, int, float, str]:
    """
    Helper function for transforming string env var values into typed values.

    Maps:
        - "true" (any capitalization) to `True`
        - "false" (any capitalization) to `False`
        - integers to `int`
        - floats to `float`

    A numeric cast is only accepted when it round-trips exactly (`str(cast) == val`), so
    strings such as "1e3", "007" or " 1" are returned unchanged.

    Args:
        - val (str): the string value of an environment variable

    Returns:
        Union[bool, int, float, str]: the type-cast env var value
    """
    # bool
    upper = val.upper()
    if upper == "TRUE":
        return True
    if upper == "FALSE":
        return False

    # int first, then float; `int()` / `float()` signal an unparsable string with ValueError
    for caster in (int, float):
        try:
            candidate = caster(val)
        except ValueError:
            continue
        if str(candidate) == val:
            return candidate

    # return string value
    return val


def interpolate_env_vars(env_var: str) -> Optional[Union[bool, int, float, str]]:
    """
    Expands (potentially nested) env vars by repeatedly applying
    `expandvars` and `expanduser` until interpolation stops having
    any effect.

    Args:
        - env_var (str): the value to expand; empty or non-string values are returned as-is

    Returns:
        - the expanded value, or `None` if the value was still changing after 10 passes
    """
    if not env_var or not isinstance(env_var, str):
        return env_var

    for passes in range(10):
        interpolated = os.path.expanduser(os.path.expandvars(str(env_var)))
        if interpolated == env_var:
            # Values that were already stable are left alone so that TOML type-casting is
            # not overridden. Only values that needed more than one expansion pass are cast.
            # NOTE(review): a value that needed exactly one pass (e.g. "$PORT") is returned
            # as a plain string -- confirm whether `passes > 0` was intended here.
            if passes > 1:
                return string_to_type(interpolated)
            return interpolated
        env_var = interpolated

    return None


def create_user_config(dest_path: str, source_path: str = DEFAULT_CONFIG) -> None:
    """
    Copies the default configuration to a user-customizable file at `dest_path`

    Args:
        - dest_path (str): where to write the file; `~` and env vars are expanded
        - source_path (str): the config file to copy; defaults to the packaged config

    Raises:
        - ValueError: if `dest_path` cannot be resolved or a file already exists there
    """
    resolved = interpolate_env_vars(dest_path)
    if resolved is None:
        raise ValueError("Could not resolve path: {}".format(dest_path))
    # the interpolated value may have been type-cast; a path is always a string
    dest_path = str(resolved)

    if os.path.isfile(dest_path):
        raise ValueError("File already exists: {}".format(dest_path))

    # a bare filename has no directory component and `os.makedirs("")` raises
    dest_dir = os.path.dirname(dest_path)
    if dest_dir:
        os.makedirs(dest_dir, exist_ok=True)

    # open the source first so an unreadable source never leaves an empty destination behind
    with open(source_path, "r") as source, open(dest_path, "w") as dest:
        dest.write(source.read())


# Validation ------------------------------------------------------------------


def validate_config(config: Config) -> None:
    """
    Validates that the configuration file is valid.
        - keys do not shadow Config methods

    Note that this is performed when the config is first loaded, but not after.

    Args:
        - config (Config): the configuration to validate

    Raises:
        - ValueError: if any key, at any nesting level, is also an attribute of `Config`
    """
    # computed once rather than on every recursive call
    reserved_keys = frozenset(dir(Config))

    def check_valid_keys(section: Config) -> None:
        """
        Recursively check that keys do not shadow methods of the Config object
        """
        for key, value in section.items():
            if key in reserved_keys:
                raise ValueError('Invalid config key: "{}"'.format(key))
            if isinstance(value, Config):
                check_valid_keys(value)

    check_valid_keys(config)


# Load configuration ----------------------------------------------------------


def load_toml(path: str) -> dict:
    """
    Loads a config dictionary from TOML

    Args:
        - path (str): the path to the TOML file; `~` and env vars are expanded so that
            paths such as `~/.faktory/config.toml` resolve to the same file that
            `os.path.isfile` checks on the expanded path

    Returns:
        - dict: the parsed TOML document
    """
    if isinstance(path, str):
        expanded = interpolate_env_vars(path)
        # `None` means the expansion never settled; fall back to the path as given
        if expanded is not None:
            path = str(expanded)
    return toml.load(path)


def interpolate_config(config: dict, env_var_prefix: Optional[str] = None) -> Config:
    """
    Processes a config dictionary, such as the one loaded from `load_toml`.

    Three steps are applied in order:
        1. env vars named `{env_var_prefix}__{section}__[...]__{key}` are added to the config
        2. env var / `~` references inside every value are expanded
        3. `${section.key}` references to other config keys are resolved

    Args:
        - config (dict): the (possibly nested) config dictionary
        - env_var_prefix (str, optional): if provided, env vars starting with this prefix
            followed by `__` are used to set config values

    Returns:
        - Config: the interpolated config
    """

    # toml supports nested dicts, so we work with a flattened representation to do any
    # requested interpolation
    flat_config = collections.dict_to_flatdict(config)

    # --------------------- Interpolate env vars -----------------------
    # check if any env var sets a configuration value with the format:
    #     [ENV_VAR_PREFIX]__[Section]__[Optional Sub-Sections...]__[Key] = Value
    # and if it does, add it to the config file.

    if env_var_prefix:
        prefix = env_var_prefix + "__"

        for env_var, env_var_value in os.environ.items():
            if not env_var.startswith(prefix):
                continue

            # strip the prefix off the env var
            env_var_option = env_var[len(prefix) :]

            # make sure the resulting option has at least one delimited section and key;
            # this must look at the stripped name, since the full name always contains
            # the prefix delimiter
            if "__" not in env_var_option:
                continue

            # env vars with escaped characters are interpreted as literal "\", which
            # Python helpfully escapes with a second "\". This step makes sure that
            # escaped characters are properly interpreted. Encoding as latin-1 with
            # "backslashreplace" keeps non-ASCII text intact through "unicode_escape".
            try:
                value = env_var_value.encode("latin-1", "backslashreplace").decode(
                    "unicode_escape"
                )
            except UnicodeDecodeError:
                # malformed escape (e.g. a Windows path like "C:\users"): keep the raw text
                value = env_var_value

            config_option = collections.CompoundKey(
                env_var_option.lower().split("__")
            )

            # interpolation may already have produced a typed value (or None); only
            # strings can be passed through `string_to_type`
            resolved = interpolate_env_vars(value)
            if isinstance(resolved, str):
                resolved = string_to_type(resolved)
            flat_config[config_option] = resolved

    # interpolate any env vars referenced
    for k, v in list(flat_config.items()):
        flat_config[k] = interpolate_env_vars(v)

    # --------------------- Interpolate other config keys -----------------
    # TOML doesn't support references to other keys... but we do!
    # This has the potential to lead to nasty recursions, so we check at most 10 times.
    # we use a set called "keys_to_check" to track only the ones of interest, so we aren't
    # checking every key every time.

    keys_to_check = set(flat_config.keys())

    for _ in range(10):
        # nothing left that could contain a reference
        if not keys_to_check:
            break

        # iterate over every key and value to check if the value uses interpolation
        for k in list(keys_to_check):
            current = flat_config[k]

            # if the value isn't a string, it can't be a reference, so we exit
            if not isinstance(current, str):
                keys_to_check.discard(k)
                continue

            # see if the ${...} syntax was used in the value and exit if it wasn't
            match = INTERPOLATION_REGEX.search(current)
            if not match:
                keys_to_check.discard(k)
                continue

            # the matched_string includes "${}"; the matched_key is just the inner value
            matched_string = match.group(0)
            matched_key = match.group(1)

            # get the referenced key from the config value
            ref_key = collections.CompoundKey(matched_key.split("."))
            # get the value corresponding to the referenced key ("" if it doesn't exist)
            ref_value = flat_config.get(ref_key, "")

            # if the matched was the entire value, replace it with the interpolated value
            # (this preserves the referenced value's type)
            if current == matched_string:
                flat_config[k] = ref_value
            # if it was a partial match, then drop the interpolated value into the string
            else:
                flat_config[k] = current.replace(matched_string, str(ref_value), 1)

    return cast(Config, collections.flatdict_to_dict(flat_config, dct_class=Config))
+def load_configuration( + path: str, user_config_path: str = None, env_var_prefix: str = None, +) -> Config: + """ + Loads a configuration from a known location. + Args: + - path (str): the path to the TOML configuration file + - user_config_path (str): an optional path to a user config file. If a user config + is provided, it will be used to update the main config prior to interpolation + - env_var_prefix (str): any env vars matching this prefix will be used to create + configuration values + Returns: + - Config + """ + + # load default config + default_config = load_toml(path) + + # load user config + if user_config_path and os.path.isfile(str(interpolate_env_vars(user_config_path))): + user_config = load_toml(user_config_path) + # merge user config into default config + default_config = cast( + dict, collections.merge_dicts(default_config, user_config) + ) + + # interpolate after user config has already been merged + config = interpolate_config(default_config, env_var_prefix=env_var_prefix) + + validate_config(config) + return config + + +# load prefect configuration +config = load_configuration( + path=DEFAULT_CONFIG, user_config_path=USER_CONFIG, env_var_prefix=ENV_VAR_PREFIX, +) diff --git a/faktory/utilities/__init__.py b/faktory/utilities/__init__.py new file mode 100644 index 0000000..8b642de --- /dev/null +++ b/faktory/utilities/__init__.py @@ -0,0 +1 @@ +import faktory.utilities.collections diff --git a/faktory/utilities/collections.py b/faktory/utilities/collections.py new file mode 100644 index 0000000..084bd35 --- /dev/null +++ b/faktory/utilities/collections.py @@ -0,0 +1,211 @@ +from collections.abc import MutableMapping +from typing import Any, Dict, Iterable, Iterator, Optional, Union, cast + +DictLike = Union[Dict, "DotDict"] + + +class CompoundKey(tuple): + pass + + +class DotDict(MutableMapping): + """ + A `dict` type object that also supports attribute ("dot") access. 
This + should be considered an extension to a standard `dict`, and is used + in place of a third party library (like `Box`) to avoid dependencies. + + Only items that you'd expect to be able to access on a class + (valid identifiers) will be accessible via dot notation. This + means strings that start with numbers, special characters, + and double underscores will not be accessible via dot notation. + + Args: + - init_dict (Optional[Dict]): Dictionary to initialize the + `DotDict` with. If `None`, initializes an empty container. + - **kwargs (Optional[Any]): Key value pairs to initialize the + `DotDict`. + + Example: + ```python + dd = DotDict({"a": 34}, b=56, c=set()) + dd.a # 34 + dd['b'] # 56 + dd.c # set() + ``` + """ + + def __init__(self, init_dict: Optional[DictLike] = None, **kwargs: Any): + # a DotDict could have a key that is the same as "update" + if init_dict: + super().update(init_dict) + super().update(kwargs) + + def get(self, key: str, default: Any = None) -> Any: + """ + This method is specifically designed with MyPy integration in mind, + as it complains that the `.get` is inherited incorrectly. 
+ + Args: + - key (str): The key whose value we want to retrieve + - default (Any): A default value to return if not found + + Returns: + - Any: Value of they key if found, otherwise default + """ + + return super().get(key, default) + + def __getitem__(self, key: str) -> Any: + return self.__dict__[key] + + def __setitem__(self, key: str, value: Any) -> None: + self.__dict__[key] = value + + def __setattr__(self, attr: str, value: Any) -> None: + self[attr] = value + + def __iter__(self) -> Iterator[str]: + return iter(self.__dict__.keys()) + + def __delitem__(self, key: str) -> None: + del self.__dict__[key] + + def __len__(self) -> int: + return len(self.__dict__) + + def __repr__(self) -> str: + if len(self) == 0: + return "<{}>".format(type(self).__name__) + + return "<{}: {}>".format( + type(self).__name__, ", ".join(sorted(repr(k) for k in self.keys())) + ) + + def copy(self) -> "DotDict": + """ + Creates a shallow copy of the current `DotDict`. + """ + return type(self)(self.__dict__.copy()) + + def to_dict(self) -> Dict: + """ + Converts the current `DotDict` (and any nested `DotDict` items) + to an appropriately nested dictionary + """ + # Mypy cast + return cast(dict, as_nested_dict(self, dct_class=dict)) + + +def as_nested_dict( + obj: Union[DictLike, Iterable[DictLike]], dct_class: type = DotDict +) -> Union[DictLike, Iterable[DictLike]]: + """ + Given an object formatted as a dictionary, transforms it (and any nested dictionaries) + into the provided `dct_class`. + + Args: + - obj (Union[DictLike, Iterable[DictLike]]): An object that's formatted as a dict or + iterable of dicts. + - dct_class (type): The target dict class to use (defaults to DotDict) + + Returns: + - Union[DictLike, Iterable[DictLike]]: obj formatted as `dct_class`, or an + iterable of `dct_class`. 
+ """ + + # Iterable of dict to apply to each element + if isinstance(obj, (list, tuple, set)): + return type(obj)([as_nested_dict(d, dct_class) for d in obj]) + + elif isinstance(obj, (dict, DotDict)): + # DoctDicts could have keys that shadow `update` and `items`, so we + # peek into `__dict__` to avoid uses where those are overriden. + + return dct_class( + { + k: as_nested_dict(v, dct_class) + for k, v in getattr(obj, "__dict__", obj).items() + } + ) + + # Handle cases where the object isn't actually a dictlike or iterable + # of dictlike, just to make this safe to call recursively on entire dicts. + return obj + + +def merge_dicts(d1: DictLike, d2: DictLike) -> DictLike: + """ + Updates `d1` from `d2` by replacing each `(k, v1)` pair in `d1` + with the corresponding `(k, v2)` pair in `d2`. + + If the value of each pair is itself a dict, then that value + is updated recursively. + + Args: + - d1 (MutableMapping): A dictionary to be updated + - d2 (MutableMapping): A dictionary used for replacement + + Returns: + - A `MutableMapping` with the two dict's content merged + """ + + new_mapping = d1.copy() + for k, v in d2.items(): + if isinstance(new_mapping.get(k), MutableMapping) and isinstance( + v, MutableMapping + ): + new_mapping[k] = merge_dicts(new_mapping[k], d2[k]) + else: + new_mapping[k] = d2[k] + + return new_mapping + + +def dict_to_flatdict(dct: dict, parent: CompoundKey = None) -> dict: + """Converts a (nested) dictionary to a flattened representation. + Each key of the flat dict will be a CompoundKey tuple containing the "chain of keys" + for the corresponding value. + Args: + - dct (dict): The dictionary to flatten + - parent (CompoundKey, optional): Defaults to `None`. 
The parent key + (you shouldn't need to set this) + Returns: + - dict: A flattened dict + """ + + items = [] # type: list + parent = parent or CompoundKey() + for k, v in dct.items(): + k_parent = CompoundKey(parent + (k,)) + if isinstance(v, dict): + items.extend(dict_to_flatdict(v, parent=k_parent).items()) + else: + items.append((k_parent, v)) + return dict(items) + + +def flatdict_to_dict(dct: dict, dct_class: type = None) -> MutableMapping: + """ + Converts a flattened dictionary back to a nested dictionary. + + Args: + - dct (dict): The dictionary to be nested. Each key should be a + `CompoundKey`, as generated by `dict_to_flatdict()` + - dct_class (type, optional): the type of the result; defaults to `dict` + Returns: + - MutableMapping: A `MutableMapping` used to represent a nested dictionary + """ + + result = (dct_class or dict)() # type: MutableMapping + for k, v in dct.items(): + if isinstance(k, CompoundKey): + current_dict = result + for ki in k[:-1]: + current_dict = current_dict.setdefault( # type: ignore + ki, (dct_class or dict)() + ) + current_dict[k[-1]] = v + else: + result[k] = v + + return result diff --git a/faktory/utilities/configuration.py b/faktory/utilities/configuration.py new file mode 100644 index 0000000..ecb7520 --- /dev/null +++ b/faktory/utilities/configuration.py @@ -0,0 +1,41 @@ +""" +Utilities for interacting with Faktory configuration. These are only intended +to be used for testing. +""" +from contextlib import contextmanager +from typing import Iterator + +import faktory +from faktory.configuration import Config + + +@contextmanager +def set_temporary_config(temp_config: dict) -> Iterator: + """ + Temporarily sets configuration values for the duration of the context manager. + Args: + - temp_config (dict): a dictionary containing (possibly nested) configuration keys and values. + Nested configuration keys should be supplied as `.`-delimited strings. 
+ Example: + ```python + with set_temporary_config({'setting': 1, 'nested.setting': 2}): + assert faktory.config.setting == 1 + assert faktory.config.nested.setting == 2 + ``` + """ + try: + old_config = faktory.config.copy() + + for key, value in temp_config.items(): + # the `key` might be a dot-delimited string, so we split on "." and set the value + cfg = faktory.config + subkeys = key.split(".") + for subkey in subkeys[:-1]: + cfg = cfg.setdefault(subkey, Config()) + cfg[subkeys[-1]] = value + + yield faktory.config + + finally: + faktory.config.clear() + faktory.config.update(old_config) diff --git a/faktory/worker.py b/faktory/worker.py index 41b8ab6..8d46a2e 100644 --- a/faktory/worker.py +++ b/faktory/worker.py @@ -10,13 +10,15 @@ from datetime import datetime, timedelta from typing import Callable, Iterable +from faktory.configuration import config + from ._proto import Connection Task = namedtuple("Task", ["name", "func", "bind"]) class Worker: - send_heartbeat_every = 15 # seconds + send_heartbeat_every = config.worker.heartbeat_seconds is_quiet = False is_disconnecting = False @@ -53,15 +55,14 @@ def __init__(self, *args, **kwargs): :param executor: Set the class of the process executor that will be used. By default concurrenct.futures.ProcessPoolExecutor is used. 
:type executor: class """ - self.concurrency = kwargs.pop("concurrency", 1) + self.concurrency = kwargs.pop("concurrency", config.worker.concurrency) self.log = kwargs.pop("log", logging.getLogger("faktory.worker")) - self._queues = kwargs.pop("queues", ["default",]) - self._executor_class = kwargs.pop( - "executor", + self._queues = kwargs.pop("queues", config.worker.queues) + self._executor_class = ( ThreadPoolExecutor - if kwargs.pop("use_threads", False) - else ProcessPoolExecutor, + if kwargs.pop("use_threads", config.worker.use_threads) + else ProcessPoolExecutor ) self._last_heartbeat = None self._tasks = dict() @@ -72,7 +73,7 @@ def __init__(self, *args, **kwargs): signal.signal(signal.SIGTERM, self.handle_sigterm) if "labels" not in kwargs: - kwargs["labels"] = ["python"] + kwargs["labels"] = config.worker.labels self.labels = kwargs["labels"] if "worker_id" not in kwargs: @@ -165,10 +166,10 @@ def run(self): self.log.info( "Shutdown: waiting up to 15 seconds for workers to finish current tasks" ) - self.disconnect(wait=15) + self.disconnect(wait=config.worker.disconnect.keyboard) except (BrokenProcessPool, BrokenThreadPool): self.log.info("Shutting down due to pool failure") - self.disconnect(force=True, wait=15) + self.disconnect(force=True, wait=config.worker.disconnect.pool_failure) break if self.faktory.is_connected: @@ -340,7 +341,7 @@ def heartbeat(self) -> None: self.log.warning( "Faktory has asked this worker to shutdown, will cancel any pending tasks still running 25s time" ) - self.disconnect(wait=25) + self.disconnect(wait=config.worker.disconnect.server_requested) self._last_heartbeat = datetime.now() @property diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..2f1fe11 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +python-box >= 3.4.4, < 5.0 diff --git a/setup.py b/setup.py index 3394611..4d6cada 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,8 @@ import sys + from setuptools import setup 
+install_requires = open("requirements.txt").read().strip().split("\n") dev_requires = open("dev-requirements.txt").read().strip().split("\n") test_requires = open("test-requirements.txt").read().strip().split("\n") @@ -15,6 +17,7 @@ name="faktory", version="0.4.0", description="Python worker for the Faktory project", + install_requires=install_requires, extras_require=extras, classifiers=[ "Development Status :: 3 - Alpha", diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_configuration.py b/tests/test_configuration.py new file mode 100644 index 0000000..eb74e87 --- /dev/null +++ b/tests/test_configuration.py @@ -0,0 +1,330 @@ +import datetime +import os +import shlex +import subprocess +import sys +import tempfile +import uuid +from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor +from typing import List + +import pytest + +import faktory +from faktory import Worker, configuration +from faktory.configuration import Config +from faktory.utilities.configuration import set_temporary_config + +template = b""" + debug = false + [general] + x = 1 + y = "hi" + [general.nested] + x = "${general.x}" + x_interpolated = "${general.x} + 1" + y = "${general.y} or bye" + [interpolation] + key = "x" + value = "${general.nested.${interpolation.key}}" + bad_value = "${general.bad_key}" + [env_vars] + interpolated_path = "$PATH" + interpolated_from_non_string_key_bool = "${env_vars.true}" + interpolated_from_non_string_key_string = "${env_vars.true} string" + not_interpolated_path = "xxx$PATHxxx" + [logging] + format = "log-format" + [secrets] + password = "1234" + very_private = "000" + [worker] + heartbeat_seconds = 5 + concurrency = 4 + use_threads = false + labels = ["python", "testing"] + queues = ["testing"] + [worker.disconnect] + keyboard = 20 + pool_failure = 20 + server_requested = 30 + """ + + +@pytest.fixture +def test_config_file_path(): + with 
tempfile.TemporaryDirectory() as test_config_dir: + test_config_loc = os.path.join(test_config_dir, "test_config.toml") + with open(test_config_loc, "wb") as test_config: + test_config.write(template) + yield test_config_loc + + +@pytest.fixture +def config(test_config_file_path, monkeypatch): + + monkeypatch.setenv("FAKTORY_TEST__ENV_VARS__NEW_KEY", "TEST") + monkeypatch.setenv("FAKTORY_TEST__ENV_VARS__TWICE__NESTED__NEW_KEY", "TEST") + monkeypatch.setenv("FAKTORY_TEST__ENV_VARS__TRUE", "true") + monkeypatch.setenv("FAKTORY_TEST__ENV_VARS__FALSE", "false") + monkeypatch.setenv("FAKTORY_TEST__ENV_VARS__INT", "10") + monkeypatch.setenv("FAKTORY_TEST__ENV_VARS__NEGATIVE_INT", "-10") + monkeypatch.setenv("FAKTORY_TEST__ENV_VARS__FLOAT", "7.5") + monkeypatch.setenv("FAKTORY_TEST__ENV_VARS__NEGATIVE_FLOAT", "-7.5") + monkeypatch.setenv("PATH", "1/2/3") + monkeypatch.setenv( + "FAKTORY_TEST__ENV_VARS__ESCAPED_CHARACTERS", r"line 1\nline 2\rand 3\tand 4" + ) + + yield configuration.load_configuration( + test_config_file_path, env_var_prefix="FAKTORY_TEST" + ) + + +def test_keys(config): + assert "debug" in config + assert "general" in config + assert "nested" in config.general + assert "x" not in config + + +def test_getattr_missing(config): + with pytest.raises(AttributeError, match="object has no attribute"): + config.hello + + +def test_debug(config): + assert config.debug is False + + +def test_general(config): + assert config.general.x == 1 + assert config.general.y == "hi" + + +def test_general_nested(config): + assert config.general.nested.x == config.general.x == 1 + assert config.general.nested.x_interpolated == "1 + 1" + assert config.general.nested.y == "hi or bye" + + +def test_interpolation(config): + assert config.interpolation.value == config.general.nested.x == 1 + + +def test_env_var_interpolation(config): + assert config.env_vars.interpolated_path == os.environ.get("PATH") + + +def test_string_to_type_function(): + + assert 
configuration.string_to_type("true") is True + assert configuration.string_to_type("True") is True + assert configuration.string_to_type("TRUE") is True + assert configuration.string_to_type("trUe") is True + assert configuration.string_to_type("false") is False + assert configuration.string_to_type("False") is False + assert configuration.string_to_type("FALSE") is False + assert configuration.string_to_type("falSe") is False + + assert configuration.string_to_type("1") == 1 + assert configuration.string_to_type("1.5") == 1.5 + + assert configuration.string_to_type("-1") == -1 + assert configuration.string_to_type("-1.5") == -1.5 + + assert configuration.string_to_type("x") == "x" + + +def test_env_var_interpolation_with_type_assignment(config): + assert config.env_vars.true is True + assert config.env_vars.false is False + assert config.env_vars.int == 10 + assert config.env_vars.negative_int == -10 + assert config.env_vars.float == 7.5 + assert config.env_vars.negative_float == -7.5 + + +def test_env_var_interpolation_with_type_interpolation(config): + assert config.env_vars.interpolated_from_non_string_key_bool is True + assert config.env_vars.interpolated_from_non_string_key_string == "True string" + + +def test_env_var_interpolation_doesnt_match_internal_dollar_sign(config): + assert config.env_vars.not_interpolated_path == "xxx$PATHxxx" + + +def test_env_var_interpolation_with_nonexistant_key(config): + assert config.interpolation.bad_value == "" + + +def test_env_var_overrides_new_key(config): + assert config.env_vars.new_key == "TEST" + + +def test_env_var_creates_nested_keys(config): + assert config.env_vars.twice.nested.new_key == "TEST" + + +def test_env_var_escaped(config): + assert config.env_vars.escaped_characters == "line 1\nline 2\rand 3\tand 4" + + +def test_copy_leaves_values_mutable(config): + + config = Config(config, default_box=True) + config.x.y.z = [1] + new = config.copy() + assert new.x.y.z == [1] + new.x.y.z.append(2) + assert 
config.x.y.z == [1, 2] + + +def test_copy_doesnt_make_keys_mutable(config): + + new = config.copy() + new.general.z = 1 + assert "z" not in config.general + + +class TestUserConfig: + def test_load_user_config(self, test_config_file_path): + + with tempfile.TemporaryDirectory() as user_config_dir: + user_config_loc = os.path.join(user_config_dir, "test_config.toml") + with open(user_config_loc, "wb") as user_config: + user_config.write( + b""" + [general] + x = 2 + [user] + foo = "bar" + """ + ) + config = configuration.load_configuration( + path=test_config_file_path, user_config_path=user_config_loc + ) + + # check that user values are loaded + assert config.general.x == 2 + assert config.user.foo == "bar" + + # check that default values are preserved + assert config.general.y == "hi" + + # check that interpolation takes place after user config is loaded + assert config.general.nested.x == 2 + + +class TestConfigValidation: + def test_invalid_keys_raise_error(self): + + with tempfile.TemporaryDirectory() as test_config_dir: + test_config_loc = os.path.join(test_config_dir, "test_config.toml") + with open(test_config_loc, "wb") as test_config: + test_config.write( + b""" + [outer] + x = 1 + [outer.keys] + a = "b" + """ + ) + + with pytest.raises(ValueError): + configuration.load_configuration(test_config_loc) + + def test_invalid_env_var_raises_error(self, monkeypatch): + monkeypatch.setenv("FAKTORY_TEST__X__Y__KEYS__Z", "TEST") + + with tempfile.TemporaryDirectory() as test_config_dir: + test_config_loc = os.path.join(test_config_dir, "test_config.toml") + with open(test_config_loc, "wb") as test_config: + test_config.write(b"") + with pytest.raises(ValueError): + configuration.load_configuration( + test_config_loc, env_var_prefix="FAKTORY_TEST" + ) + + def test_mixed_case_keys_are_ok(self): + with tempfile.TemporaryDirectory() as test_config_dir: + test_config_loc = os.path.join(test_config_dir, "test_config.toml") + with open(test_config_loc, "wb") as 
test_config: + test_config.write( + b""" + [SeCtIoN] + KeY = 1 + """ + ) + + config = configuration.load_configuration(test_config_loc) + + assert "KeY" in config.SeCtIoN + assert config.SeCtIoN.KeY == 1 + + def test_env_vars_are_interpolated_as_lower_case(self, monkeypatch): + + monkeypatch.setenv("FAKTORY_TEST__SECTION__KEY", "2") + + with tempfile.TemporaryDirectory() as test_config_dir: + test_config_loc = os.path.join(test_config_dir, "test_config.toml") + with open(test_config_loc, "wb") as test_config: + test_config.write( + b""" + [SeCtIoN] + KeY = 1 + """ + ) + + config = configuration.load_configuration( + test_config_loc, env_var_prefix="FAKTORY_TEST" + ) + + assert "KeY" in config.SeCtIoN + assert config.SeCtIoN.KeY == 1 + assert config.section.key == 2 + + +class TestWorkerUsesConfig: + @pytest.mark.parametrize( + "flag,expected", [(False, ProcessPoolExecutor), (True, ThreadPoolExecutor)] + ) + def test_uses_executor(self, flag: bool, expected: Executor): + with set_temporary_config({"worker.use_threads": flag}): + + worker = Worker() + assert worker._executor_class == expected + + @pytest.mark.parametrize( + "labels", [["python"], ["python", "testing"], ["python", "testing", "random"]] + ) + def test_uses_labels(self, labels: List[str]): + with set_temporary_config({"worker.labels": labels}): + worker = Worker() + assert worker.labels == labels + + @pytest.mark.parametrize( + "queues", [["default"], ["default", "testing"], ["testing", "urgent"]] + ) + def test_uses_queues(self, queues: List[str]): + with set_temporary_config({"worker.queues": queues}): + worker = Worker() + assert worker._queues == queues + + @pytest.mark.parametrize("concurrency", [1, 2, 3, 4]) + def test_uses_concurrency(self, concurrency: int): + with set_temporary_config({"worker.concurrency": concurrency}): + worker = Worker() + assert worker.concurrency == concurrency + + @pytest.mark.parametrize("kwargs", [{"concurrency": 5, "queues": ["tests"]}]) + def 
test_uses_provided_over_config(self, kwargs): + with set_temporary_config( + { + "worker.concurrency": kwargs.get("concurrency"), + "worker.queues": kwargs.get("queues"), + } + ): + worker = Worker(**kwargs) + assert worker.concurrency == kwargs.get("concurrency") + assert worker._queues == kwargs.get("queues") diff --git a/tests/utilities/test_collections.py b/tests/utilities/test_collections.py new file mode 100644 index 0000000..78f8e9f --- /dev/null +++ b/tests/utilities/test_collections.py @@ -0,0 +1,281 @@ +import json +import types + +import pytest + +from faktory.utilities import collections +from faktory.utilities.collections import DotDict, as_nested_dict, merge_dicts + + +@pytest.fixture +def nested_dict(): + return {1: 2, 2: {1: 2, 3: 4}, 3: {1: 2, 3: {4: 5, 6: {7: 8}}}} + + +@pytest.fixture(params=[dict(another=500), DotDict(another=500)]) +def mutable_mapping(request): + "MutableMapping objects to test with" + return request.param + + +class TestDotDict: + def test_initialization_with_kwargs(self): + d = DotDict(car=10, attr="string", other=lambda x: {}) + assert "another" not in d + assert "car" in d + assert "attr" in d + assert "other" in d + + def test_initialization_with_mutable_mapping(self, mutable_mapping): + d = DotDict(mutable_mapping) + assert "car" not in d + assert "another" in d + + def test_update_with_kwargs(self): + d = DotDict(car=10, attr="string", other=lambda x: {}) + assert "another" not in d + d.update(another=500) + assert "another" in d + assert d["another"] == 500 + + def test_update_with_mutable_mapping(self, mutable_mapping): + d = DotDict({"car": 10, "attr": "string", "other": lambda x: {}}) + assert "another" not in d + d.update(mutable_mapping) + assert "another" in d + + def test_len(self): + d = DotDict({"car": 10, "attr": "string", "other": lambda x: {}}) + assert len(d) == 3 + a = DotDict() + assert len(a) == 0 + a.update(new=4) + assert len(a) == 1 + del d["car"] + assert len(d) == 2 + + def 
test_attr_updates_and_key_updates_agree(self): + d = DotDict(data=5) + d.data += 1 + assert d["data"] == 6 + d["new"] = "value" + assert d.new == "value" + d.another_key = "another_value" + assert d["another_key"] == "another_value" + + def test_del_with_getitem(self): + d = DotDict(data=5) + del d["data"] + assert "data" not in d + assert len(d) == 0 + + def test_del_with_attr(self): + d = DotDict(data=5) + del d.data + assert "data" not in d + assert len(d) == 0 + + def test_get(self): + d = DotDict(data=5) + assert d.get("data") == 5 + assert d.get("no_data") is None + assert d.get("no_data", "fallback") == "fallback" + + def test_setitem(self): + d = DotDict() + d["a"] = 1 + assert d["a"] == 1 + d["a"] = 2 + assert d["a"] == 2 + + def test_setitem_nonstring_key(self): + d = DotDict() + d[1] = 1 + assert d[1] == 1 + d[1] = 2 + assert d[1] == 2 + + def test_initialize_from_nonstring_keys(self): + d = DotDict({1: 1, "a": 2}) + assert d[1] == 1 and d["a"] == 2 + + def test_repr_sorts_mixed_keys(self): + d = DotDict() + assert repr(d) == "" + d["a"] = 1 + d[1] = 1 + d["b"] = 1 + assert repr(d) == "" + + def test_copy(self): + d = DotDict(a=1, b=2, c=3) + assert d == d.copy() + + def test_eq_empty(self): + assert DotDict() == DotDict() + + def test_eq_empty_dict(self): + assert DotDict() == {} + + def test_eq_complex(self): + x = dict(x=1, y=dict(z=[3, 4, dict(a=5)])) + assert as_nested_dict(x) == as_nested_dict(x) + + def test_eq_complex_dict(self): + x = dict(x=1, y=dict(z=[3, 4, dict(a=5)])) + assert as_nested_dict(x) == x + + def test_keyerror_is_thrown_when_accessing_nonexistent_key(self): + d = DotDict(data=5) + with pytest.raises(KeyError): + d["nothing"] + + def test_attributeerror_is_thrown_when_accessing_nonexistent_attr(self): + d = DotDict(data=5) + with pytest.raises(AttributeError): + d.nothing + + def test_iter(self): + d = DotDict(data=5, car="best") + res = set() + for item in d: + res.add(item) + assert res == {"data", "car"} + + def 
test_setdefault_works(self): + d = DotDict(car="best") + d.setdefault("data", 5) + assert d["data"] == 5 + assert d["car"] == "best" + + def test_items(self): + d = DotDict(data=5, car="best") + res = set() + for k, v in d.items(): + res.add((k, v)) + assert res == {("data", 5), ("car", "best")} + + def test_clear_clears_keys_and_attrs(self): + d = DotDict(data=5, car="best") + assert "data" in d + d.clear() + assert "data" not in d + assert len(d) == 0 + d.new_key = 63 + assert "new_key" in d + d.clear() + assert len(d) == 0 + assert "new_key" not in d + + def test_dotdict_splats(self): + d = DotDict(data=5) + identity = lambda **kwargs: kwargs + assert identity(**d) == {"data": 5} + + def test_dotdict_is_not_json_serializable_with_default_encoder(self): + + with pytest.raises(TypeError): + json.dumps(DotDict(x=1)) + + def test_dotdict_to_dict(self): + d = DotDict(x=5, y=DotDict(z="zzz", qq=DotDict())) + assert d.to_dict() == {"x": 5, "y": {"z": "zzz", "qq": {}}} + + def test_dotdict_to_dict_with_lists_of_dicts(self): + d = DotDict(x=5, y=DotDict(z=[DotDict(abc=10, qq=DotDict())])) + assert d.to_dict() == {"x": 5, "y": {"z": [{"abc": 10, "qq": {}}]}} + + +def test_as_nested_dict_defaults_dotdict(): + orig_d = dict(a=1, b=[2, dict(c=3)], d=dict(e=[dict(f=4)])) + dotdict = as_nested_dict(orig_d) + assert isinstance(dotdict, DotDict) + assert dotdict.a == 1 + assert dotdict.b[1].c == 3 + assert dotdict.d.e[0].f == 4 + + +def test_as_nested_dict_dct_class(): + orig_d = dict(a=1, b=[2, dict(c=3)], d=dict(e=[dict(f=4)])) + dot_dict_d = as_nested_dict(orig_d, DotDict) + dict_d = as_nested_dict(dot_dict_d, dict) + assert type(dict_d) is dict + assert type(dict_d["d"]["e"][0]) is dict + + +@pytest.mark.parametrize("dct_class", [dict, DotDict]) +def test_merge_simple_dicts(dct_class): + a = dct_class(x=1, y=2, z=3) + b = dct_class(z=100, a=101) + + # merge b into a + assert merge_dicts(a, b) == dct_class(x=1, y=2, z=100, a=101) + + # merge a into b + assert merge_dicts(b, 
a) == dct_class(x=1, y=2, z=3, a=101) + + +@pytest.mark.parametrize("dct_class", [dict, DotDict]) +def test_merge_nested_dicts_reverse_order(dct_class): + a = dct_class(x=dct_class(one=1, two=2), y=dct_class(three=3, four=4), z=0) + b = dct_class(x=dct_class(one=1, two=20), y=dct_class(four=40, five=5)) + # merge b into a + assert merge_dicts(a, b) == dct_class( + x=dct_class(one=1, two=20), y=dct_class(three=3, four=40, five=5), z=0 + ) + + assert merge_dicts(b, a) == dct_class( + x=dct_class(one=1, two=2), y=dct_class(three=3, four=4, five=5), z=0 + ) + + +@pytest.mark.parametrize("dct_class", [dict, DotDict]) +def test_merge_nested_dicts_with_empty_section(dct_class): + a = dct_class(x=dct_class(one=1, two=2), y=dct_class(three=3, four=4)) + b = dct_class(x=dct_class(one=1, two=2), y=dct_class()) + # merge b into a + assert merge_dicts(a, b) == a + # merge a into b + assert merge_dicts(b, a) == a + + +def test_as_nested_dict_works_when_critical_keys_shadowed(): + x = dict(update=1, items=2) + y = as_nested_dict(x, DotDict) + assert y.update == 1 + assert y.items == 2 + + +def test_flatten_dict(nested_dict): + flat = collections.dict_to_flatdict(nested_dict) + assert flat == { + collections.CompoundKey([1]): 2, + collections.CompoundKey([2, 1]): 2, + collections.CompoundKey([2, 3]): 4, + collections.CompoundKey([3, 1]): 2, + collections.CompoundKey([3, 3, 4]): 5, + collections.CompoundKey([3, 3, 6, 7]): 8, + } + + +def test_restore_flattened_dict(nested_dict): + flat = collections.dict_to_flatdict(nested_dict) + restored = collections.flatdict_to_dict(flat) + assert restored == nested_dict + + +def test_call_flatdict_to_dict_on_normal_dict(nested_dict): + restored = collections.flatdict_to_dict({"a": "b"}) + assert restored == {"a": "b"} + + +def test_restore_flattened_dict_with_dict_class(): + nested_dict = DotDict(a=DotDict(x=1), b=DotDict(y=2)) + flat = collections.dict_to_flatdict(nested_dict) + restored = collections.flatdict_to_dict(flat) + assert 
isinstance(restored, dict) + + restored_dotdict = collections.flatdict_to_dict(flat, dct_class=DotDict) + assert isinstance(restored_dotdict, DotDict) + assert isinstance(restored_dotdict.a, DotDict) + assert restored_dotdict.a == nested_dict.a diff --git a/tests/utilities/test_configuration.py b/tests/utilities/test_configuration.py new file mode 100644 index 0000000..834849b --- /dev/null +++ b/tests/utilities/test_configuration.py @@ -0,0 +1,32 @@ +from datetime import timedelta + +import pytest + +import faktory +from faktory.utilities.configuration import set_temporary_config + + +def test_set_temporary_config_is_temporary(): + # each nested block overrides worker.concurrency; leaving a block must restore + # the value set by the enclosing block + with set_temporary_config({"worker.concurrency": 3}): + with set_temporary_config({"worker.concurrency": 5}): + with set_temporary_config({"worker.concurrency": 1}): + assert faktory.config.worker.concurrency == 1 + + assert faktory.config.worker.concurrency == 5 + assert faktory.config.worker.concurrency == 3 + + +def test_set_temporary_config_can_invent_new_settings(): + with set_temporary_config({"worker.nested.nested_again.val": "5"}): + assert faktory.config.worker.nested.nested_again.val == "5" + + with pytest.raises(AttributeError): + assert faktory.config.worker.nested.nested_again.val == "5" + + +def test_set_temporary_config_with_multiple_keys(): + with set_temporary_config({"x.y.z": 1, "a.b.c": 2}): + assert faktory.config.x.y.z == 1 + assert faktory.config.a.b.c == 2