diff --git a/README.md b/README.md index a64c152..d8af401 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ There are three ways in which you can authorize the `SecretServer` and `SecretSe #### Password Authorization -If using traditional `username` and `password` authentication to log in to your Secret Server either directly or through Platform, you can pass the `PasswordGrantAuthorizer` into the `SecretServer` class at instantiation. The `PasswordGrantAuthorizer` requires a `base_url`, `username`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token` or `/identity/api/oauth2/token/xpmplatform`, depending on whether a secret server or platform is used for authentication. +If using traditional `username` and `password` authentication to log in to your Secret Server either directly or through Platform, you can pass the `PasswordGrantAuthorizer` into the `SecretServer` class at instantiation. The `PasswordGrantAuthorizer` requires a `base_url`, `username`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token` or `/identity/api/oauth2/token/xpmplatform`, depending on whether a secret server or platform is used for authentication. It also optionally takes a `server_type` (`"secret_server"` or `"platform"`) to skip automatic server-type detection — see [Server-Type Detection](#server-type-detection). ##### With Secret Server ```python @@ -50,7 +50,7 @@ authorizer = PasswordGrantAuthorizer("https://platform.delinea.app", os.getenv(" #### Domain Authorization -To use a domain credential, use the `DomainPasswordGrantAuthorizer`. It requires a `base_url`, `username`, `domain`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`. It is applicable only when authentication is done using a secret server. +To use a domain credential, use the `DomainPasswordGrantAuthorizer`. It requires a `base_url`, `username`, `domain`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`, and a `server_type` (see [Server-Type Detection](#server-type-detection)). It is applicable only when authentication is done using a secret server. ```python from delinea.secrets.server import DomainPasswordGrantAuthorizer @@ -60,7 +60,7 @@ authorizer = DomainPasswordGrantAuthorizer("https://hostname/SecretServer", os.g #### Access Token Authorization -If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. The `AccessTokenAuthorizer` requires a `access_token` and `base_url`. +If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. The `AccessTokenAuthorizer` requires a `access_token` and `base_url`. It optionally takes a `server_type` (see [Server-Type Detection](#server-type-detection)). ##### With Secret Server ```python @@ -77,6 +77,25 @@ from delinea.secrets.server import AccessTokenAuthorizer authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://platform.delinea.app") ``` +#### Server-Type Detection + +By default every authorizer automatically detects whether the `base_url` points at a Secret Server or a Platform instance by probing its health-check endpoints (`/api/v1/healthcheck` then `/health`). The result is cached per `base_url` for the lifetime of the process, so the probe pair fires only once per `base_url`. + +You can skip detection entirely by passing an explicit `server_type` of either `"secret_server"` or `"platform"`. When supplied, no health-check probe is issued. This is recommended for callers that run each lookup in a fresh, short-lived process (for example, some Ansible lookup-plugin runtimes), where a fresh process cannot benefit from the in-process cache and the repeated unauthenticated probes can be rate-limited to `403` by the Delinea Platform WAF. + +```python +from delinea.secrets.server import AccessTokenAuthorizer + +# No health-check probe is issued; the type is used directly. +authorizer = AccessTokenAuthorizer( + "AgJ1slfZsEng9bKsssB-tic0Kh8I...", + "https://platform.delinea.app", + server_type="platform", +) +``` + +An explicit `server_type` applies only to the instance that supplies it and is never written to the shared cache, so it cannot affect auto-detection for other authorizers. If a `base_url` is ever re-provisioned to a different server type while a long-lived process is running, call `Authorizer.clear_server_type_cache()` to force re-detection. + ## Secret Server Cloud The SDK API requires an `Authorizer` and either a `tenant` or a `base_url`. In the case of plaform authentication, only a `base_url` is supported. diff --git a/delinea/secrets/server.py b/delinea/secrets/server.py index 0f26d4a..d0b0deb 100644 --- a/delinea/secrets/server.py +++ b/delinea/secrets/server.py @@ -17,8 +17,10 @@ import json import re from abc import ABC, abstractmethod +from collections import OrderedDict from dataclasses import dataclass from datetime import datetime, timedelta +from threading import Lock import requests @@ -164,6 +166,78 @@ class SecretServerServiceError(SecretServerError): class Authorizer(ABC): """Main abstract base class for all Authorizer access methods.""" + # Accepted values for an explicit ``server_type`` override and for cached + # detections. + VALID_SERVER_TYPES = ("secret_server", "platform") + + # Process-scoped, bounded LRU cache mapping a normalized base_url to its + # detected server type ("secret_server" | "platform"). Shared across all + # Authorizer subclasses so the health-check probe pair fires once per + # base_url per process. Bounded to ``_SERVER_TYPE_CACHE_MAXSIZE`` entries so + # a long-lived process that constructs authorizers against many distinct + # URLs cannot grow it without bound; the least-recently-used entry is + # evicted on overflow. Guarded by ``_server_type_cache_lock``. + # + # NOTE: This cache is process-scoped. It deduplicates probes only within a + # single Python process. Callers that run each lookup in a fresh process + # (e.g. some Ansible lookup-plugin runtimes) start with an empty cache and + # will re-probe. To eliminate the probe entirely in that case, pass an + # explicit ``server_type`` to the authorizer (see ``_perform_server_detection``). + _SERVER_TYPE_CACHE_MAXSIZE = 128 + _server_type_cache = OrderedDict() + _server_type_cache_lock = Lock() + + @classmethod + def _normalize_server_type(cls, server_type): + """Validate and normalize an explicit ``server_type`` value. + + :raise :class:`SecretServerError` when ``server_type`` is not one of + ``VALID_SERVER_TYPES``. + """ + normalized = str(server_type).strip().lower() + if normalized not in cls.VALID_SERVER_TYPES: + raise SecretServerError( + f"Invalid server_type {server_type!r}; expected one of " + f"{cls.VALID_SERVER_TYPES}." + ) + return normalized + + @classmethod + def _get_cached_server_type(cls, key): + """Return the cached server type for ``key`` (marking it most-recently + used) or ``None`` if absent.""" + with Authorizer._server_type_cache_lock: + if key in Authorizer._server_type_cache: + Authorizer._server_type_cache.move_to_end(key) + return Authorizer._server_type_cache[key] + return None + + @classmethod + def _cache_server_type(cls, key, server_type): + """Cache ``server_type`` for ``key``, evicting the least-recently-used + entry if the cache is over capacity.""" + with Authorizer._server_type_cache_lock: + Authorizer._server_type_cache[key] = server_type + Authorizer._server_type_cache.move_to_end(key) + while len(Authorizer._server_type_cache) > cls._SERVER_TYPE_CACHE_MAXSIZE: + Authorizer._server_type_cache.popitem(last=False) + + @classmethod + def clear_server_type_cache(cls): + """Clear the process-scoped server-detection cache. + + Detection results are cached for the lifetime of the process with no + TTL, because a server's type at a given ``base_url`` is effectively + immutable in practice. Use this escape hatch to force re-detection if a + ``base_url`` is ever re-provisioned to a different server type while a + long-lived process is running. + """ + with Authorizer._server_type_cache_lock: + Authorizer._server_type_cache.clear() + + # Backwards-compatible alias retained for existing callers/tests. + _clear_server_type_cache = clear_server_type_cache + @staticmethod def add_bearer_token_authorization_header(bearer_token, existing_headers={}): """Adds an HTTP `Authorization` header containing the `Bearer` token @@ -179,20 +253,61 @@ def add_bearer_token_authorization_header(bearer_token, existing_headers={}): **existing_headers, } - def _perform_server_detection(self, base_url): - """Detects if the server is Secret Server or Platform by health check endpoints.""" - secret_server_endpoint = base_url.rstrip("/") + "/api/v1/healthcheck" - platform_endpoint = base_url.rstrip("/") + "/health" + def _perform_server_detection(self, base_url, server_type=None): + """Resolve whether the server is Secret Server or Platform. + + When an explicit ``server_type`` is supplied the value is validated + and used directly for THIS instance only -- NO health-check probe is + issued. This is the recommended path for callers that run each lookup + in a fresh process (e.g. some Ansible lookup-plugin runtimes) where the + process-scoped cache cannot help: skipping detection eliminates the + unauthenticated ``/api/v1/healthcheck`` + ``/health`` probe burst that + the Delinea Platform WAF rate-limits to 403. + + An explicit override is deliberately NOT written to the shared + process-scoped cache: the override is unverified, so seeding the cache + would let a wrong/typo'd value silently poison auto-detection for + unrelated callers using the same ``base_url`` in the same process. Only + verified probe detections populate the shared cache. + + Otherwise the type is detected via the health-check endpoints, using a + process-scoped cache. The detected type is cached per normalized + ``base_url`` on the ``Authorizer`` base class and shared across all + subclasses, so the probe pair fires only once per ``base_url`` per + process. The cache is read/written under ``_server_type_cache_lock`` + for thread safety, but the network probe itself runs OUTSIDE the lock; + detection is idempotent, so a rare double-probe under a race is + harmless. Only successful detections are cached -- failures re-probe on + the next construction. + + On every path the per-instance ``_server_type`` attribute is set, + because callers (``SecretServer.ensure_vault_url`` and + ``PasswordGrantAuthorizer._refresh``) read ``self._server_type``. + """ + key = base_url.rstrip("/") - if self._validate_health_endpoint(secret_server_endpoint): - self._server_type = "secret_server" + if server_type is not None: + # Per-instance only; intentionally NOT seeded into the shared cache + # so an unverified override cannot poison auto-detection for others. + self._server_type = self._normalize_server_type(server_type) return - if self._validate_health_endpoint(platform_endpoint): - self._server_type = "platform" + + cached = self._get_cached_server_type(key) + if cached is not None: + self._server_type = cached return - raise SecretServerError( - "Unable to detect server type via health check endpoints." - ) + + if self._validate_health_endpoint(key + "/api/v1/healthcheck"): + detected = "secret_server" + elif self._validate_health_endpoint(key + "/health"): + detected = "platform" + else: + raise SecretServerError( + "Unable to detect server type via health check endpoints." + ) + + self._server_type = detected + self._cache_server_type(key, detected) def _validate_health_endpoint(self, url): """Validates if an endpoint returns healthy status.""" @@ -231,10 +346,14 @@ class AccessTokenAuthorizer(Authorizer): def get_access_token(self): return self.access_token - def __init__(self, access_token, base_url): + def __init__(self, access_token, base_url, server_type=None): + """ + :param server_type: optionally ``"secret_server"`` or ``"platform"`` to + skip health-check detection entirely (no probe is issued). + """ self.access_token = access_token self.base_url = base_url.rstrip("/") - self._perform_server_detection(self.base_url) + self._perform_server_detection(self.base_url, server_type=server_type) class PasswordGrantAuthorizer(Authorizer): @@ -316,7 +435,20 @@ def _refresh(self, seconds_of_drift=300): else: raise SecretServerError("Unknown server type for token request.") - def __init__(self, base_url, username, password, token_path_uri=None, domain=None): + def __init__( + self, + base_url, + username, + password, + token_path_uri=None, + domain=None, + server_type=None, + ): + """ + :param server_type: optionally ``"secret_server"`` or ``"platform"`` to + skip health-check detection entirely (no probe is issued); the + matching token endpoint is selected without probing. + """ self.base_url = base_url.rstrip("/") self.username = username self.password = password @@ -324,6 +456,10 @@ def __init__(self, base_url, username, password, token_path_uri=None, domain=Non self.token_path_uri = token_path_uri # May be None, will decide in _refresh self.token_url = None self.grant_request = None + # When an explicit type is given, resolve it now (no network) so the + # lazy detection in _refresh is skipped and no probe is ever issued. + if server_type is not None: + self._perform_server_detection(self.base_url, server_type=server_type) def get_access_token(self): self._refresh() @@ -340,9 +476,15 @@ def __init__( domain, password, token_path_uri=None, + server_type=None, ): super().__init__( - base_url, username, password, token_path_uri=token_path_uri, domain=domain + base_url, + username, + password, + token_path_uri=token_path_uri, + domain=domain, + server_type=server_type, ) diff --git a/example.py b/example.py index d37d7cc..9e3da94 100644 --- a/example.py +++ b/example.py @@ -23,10 +23,8 @@ try: secret = secret_server_cloud.get_secret(os.getenv("TSS_SECRET_ID")) serverSecret = ServerSecret(**secret) - print( - f"""username: {serverSecret.fields['username'].value} + print(f"""username: {serverSecret.fields['username'].value} password: {serverSecret.fields['password'].value} - template: {serverSecret.secret_template_name}""" - ) + template: {serverSecret.secret_template_name}""") except SecretServerError as error: print(error.response.text) diff --git a/requirements.txt b/requirements.txt index 3f6a39b..46bae68 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,8 +1,8 @@ -requests==2.32.4 +requests==2.34.2 # pinned to address CVE-2026-25645 (2.33.0 was never published) tox pytest python-dotenv flit black -urllib3==2.6.3 # not directly required, pinned by Snyk to avoid a vulnerability +urllib3==2.7.0 # not directly required, pinned by Snyk to avoid a vulnerability zipp==3.23.0 # not directly required, pinned by Snyk to avoid a vulnerability diff --git a/tests/test_server_detection_cache.py b/tests/test_server_detection_cache.py new file mode 100644 index 0000000..6c6555f --- /dev/null +++ b/tests/test_server_detection_cache.py @@ -0,0 +1,338 @@ +"""Offline unit tests for the process-scoped server-detection cache on the +``Authorizer`` base class. + +These tests are fully OFFLINE: the network is mocked by patching +``delinea.secrets.server.requests.get`` (the symbol the SDK actually calls +inside ``_validate_health_endpoint``). Unlike ``tests/test_server.py`` these +do NOT require live credentials. + +The cache is process-global, so each test clears it via the +``Authorizer._clear_server_type_cache()`` hook (see the autouse fixture). +""" + +import threading + +import pytest + +from delinea.secrets.server import ( + AccessTokenAuthorizer, + Authorizer, + PasswordGrantAuthorizer, + SecretServerError, +) + +SECRET_SERVER_HEALTH = "/api/v1/healthcheck" +PLATFORM_HEALTH = "/health" + + +class FakeResponse: + """Minimal stand-in for a ``requests.Response`` as consumed by + ``_validate_health_endpoint`` (reads ``.content`` and ``.json()``).""" + + def __init__(self, healthy): + self._healthy = healthy + self.content = b'{"Healthy": true}' if healthy else b"{}" + + def json(self): + return {"Healthy": self._healthy} + + +def make_probe_counter(healthy_endpoints): + """Return a (fake_get, counter) pair. + + ``fake_get`` replaces ``requests.get``. It returns a healthy + ``FakeResponse`` only when the requested URL ends with one of + ``healthy_endpoints`` (e.g. ``/health``); every other health probe gets an + unhealthy response. ``counter`` is a mutable dict tracking how many times + each health endpoint suffix was probed plus a total. + """ + + # "rounds" counts how many times a full detection probe sequence began, + # i.e. how many times the FIRST endpoint of the pair (the secret_server + # healthcheck) was hit. A platform detection issues two raw GETs per round + # (healthcheck=unhealthy, then health=healthy); a cache hit issues zero, so + # "rounds" is the meaningful "probe pair fired N times" metric. + counter = {"total": 0, "rounds": 0, SECRET_SERVER_HEALTH: 0, PLATFORM_HEALTH: 0} + + def fake_get(url, *args, **kwargs): + for suffix in (SECRET_SERVER_HEALTH, PLATFORM_HEALTH): + if url.endswith(suffix): + counter["total"] += 1 + counter[suffix] += 1 + if suffix == SECRET_SERVER_HEALTH: + counter["rounds"] += 1 + return FakeResponse(suffix in healthy_endpoints) + # Any other GET (e.g. vault lookups) is not a health probe. + return FakeResponse(False) + + return fake_get, counter + + +@pytest.fixture(autouse=True) +def clear_detection_cache(): + """The detection cache is process-global; clear before and after each test + so cached entries cannot leak between tests.""" + Authorizer._clear_server_type_cache() + yield + Authorizer._clear_server_type_cache() + + +# Behavior 1: repeated construction with the same base_url probes once total. +def test_repeated_construction_probes_once(monkeypatch): + base_url = "https://platform.example.com" + fake_get, counter = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + instances = [AccessTokenAuthorizer("tok", base_url) for _ in range(20)] + + assert all(inst._server_type == "platform" for inst in instances) + # The probe pair fires exactly once total across all 20 constructions. + assert counter["rounds"] == 1 + assert counter[PLATFORM_HEALTH] == 1 + assert counter[SECRET_SERVER_HEALTH] == 1 + + +# Behavior 2: cache is shared across different authorizer subclasses. +def test_cache_shared_across_subclasses(monkeypatch): + base_url = "https://platform.example.com" + fake_get, counter = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + AccessTokenAuthorizer("tok", base_url) + grant = PasswordGrantAuthorizer(base_url, "user", "pass") + try: + # Triggers lazy detection in _refresh; the grant POST will fail offline + # but we only care that detection used the cache. + grant.get_access_token() + except Exception: + pass + + assert grant._server_type == "platform" + # Detection probes fire once total across both authorizers. + assert counter["rounds"] == 1 + + +# Behavior 3: a cache hit still sets the per-instance _server_type attribute. +def test_cache_hit_sets_instance_attr(monkeypatch): + base_url = "https://platform.example.com" + fake_get, counter = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + AccessTokenAuthorizer("tok", base_url) # populates the cache + assert counter["rounds"] == 1 + probes_after_first = counter["total"] + + second = AccessTokenAuthorizer("tok", base_url) # cache hit, no new probe + assert second._server_type == "platform" + assert counter["rounds"] == 1 + assert counter["total"] == probes_after_first + + +# Behavior 4: two distinct base_urls get independent, correct cache entries. +def test_two_distinct_base_urls(monkeypatch): + ss_url = "https://secretserver.example.com" + platform_url = "https://platform.example.com" + + def fake_get(url, *args, **kwargs): + if url.startswith(ss_url) and url.endswith(SECRET_SERVER_HEALTH): + return FakeResponse(True) + if url.startswith(platform_url) and url.endswith(PLATFORM_HEALTH): + return FakeResponse(True) + return FakeResponse(False) + + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + ss_auth = AccessTokenAuthorizer("tok", ss_url) + platform_auth = AccessTokenAuthorizer("tok", platform_url) + + assert ss_auth._server_type == "secret_server" + assert platform_auth._server_type == "platform" + + cache = Authorizer._server_type_cache + assert cache[ss_url] == "secret_server" + assert cache[platform_url] == "platform" + assert len(cache) == 2 + + +# Behavior 5: detection failure is NOT cached; a later healthy probe succeeds. +def test_failure_is_not_cached(monkeypatch): + base_url = "https://unknown.example.com" + + # First: both probes unhealthy -> detection raises. + unhealthy_get, _ = make_probe_counter(set()) + monkeypatch.setattr("delinea.secrets.server.requests.get", unhealthy_get) + with pytest.raises(SecretServerError): + AccessTokenAuthorizer("tok", base_url) + + assert base_url not in Authorizer._server_type_cache + + # Then: probes become healthy -> re-probe succeeds (failure was not cached). + healthy_get, counter = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", healthy_get) + instance = AccessTokenAuthorizer("tok", base_url) + + assert instance._server_type == "platform" + assert counter["total"] >= 1 + + +# Behavior 6: concurrent construction is thread-safe and probes few times. +def test_concurrent_construction_thread_safe(monkeypatch): + base_url = "https://platform.example.com" + fake_get, counter = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + results = [] + errors = [] + start = threading.Event() + + def worker(): + start.wait() + try: + inst = AccessTokenAuthorizer("tok", base_url) + results.append(inst._server_type) + except Exception as exc: # pragma: no cover - failure path + errors.append(exc) + + threads = [threading.Thread(target=worker) for _ in range(20)] + for t in threads: + t.start() + start.set() + for t in threads: + t.join() + + assert errors == [] + assert len(results) == 20 + assert all(r == "platform" for r in results) + # Probe count is a small constant: the probe pair fires at least once, and + # is bounded by the number of threads even under a detection race (commonly + # exactly 1). + assert counter["rounds"] >= 1 + assert counter["rounds"] <= 20 + + +# Behavior 7: an explicit server_type override skips detection entirely (no probe) +# and is per-instance only -- it must NOT seed the shared process cache. +@pytest.mark.parametrize("server_type", ["platform", "secret_server"]) +def test_explicit_server_type_skips_probe(monkeypatch, server_type): + base_url = "https://anything.example.com" + # Every health endpoint is unhealthy: if any probe fired, detection would + # raise. It must not, because the override bypasses probing. + fake_get, counter = make_probe_counter(set()) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + inst = AccessTokenAuthorizer("tok", base_url, server_type=server_type) + + assert inst._server_type == server_type + assert counter["total"] == 0 # zero probes -> no WAF burst + # The unverified override must NOT be written to the shared cache (otherwise + # it could poison auto-detection for other callers using the same base_url). + assert base_url not in Authorizer._server_type_cache + + +# Behavior 8: the override is normalized (case/whitespace-insensitive). +def test_explicit_server_type_is_normalized(monkeypatch): + fake_get, counter = make_probe_counter(set()) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + inst = AccessTokenAuthorizer( + "tok", "https://x.example.com", server_type=" Platform " + ) + + assert inst._server_type == "platform" + assert counter["total"] == 0 + + +# Behavior 9: an invalid override raises and issues no probe. +def test_invalid_server_type_raises(monkeypatch): + fake_get, counter = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + with pytest.raises(SecretServerError): + AccessTokenAuthorizer("tok", "https://x.example.com", server_type="bogus") + + assert counter["total"] == 0 + + +# Behavior 10: PasswordGrantAuthorizer with an override never probes in _refresh. +def test_password_grant_override_skips_detection(monkeypatch): + base_url = "https://platform.example.com" + fake_get, counter = make_probe_counter(set()) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + grant = PasswordGrantAuthorizer(base_url, "user", "pass", server_type="platform") + assert grant._server_type == "platform" + + try: + # The grant POST will fail offline, but detection must not have probed. + grant.get_access_token() + except Exception: + pass + + assert counter["total"] == 0 + # Platform token endpoint was selected without any health probe. + assert grant.token_path_uri == PasswordGrantAuthorizer.PLATFORM_TOKEN_PATH_URI + + +# Behavior 11: the cache is bounded; the least-recently-used entry is evicted. +def test_cache_is_bounded_lru(monkeypatch): + # Every base_url detects as platform (healthy /health) so each distinct URL + # seeds one verified cache entry. Only verified detections populate the + # shared cache, so the cache must be filled via detection (not overrides). + fake_get, _ = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + maxsize = Authorizer._SERVER_TYPE_CACHE_MAXSIZE + + # Fill exactly to capacity via auto-detection. + for i in range(maxsize): + AccessTokenAuthorizer("tok", f"https://host-{i}.example.com") + assert len(Authorizer._server_type_cache) == maxsize + + first_key = "https://host-0.example.com" + # Touch host-0 so it becomes most-recently-used and survives the next insert. + Authorizer._get_cached_server_type(first_key) + + # One more distinct URL overflows the cache by one entry. + AccessTokenAuthorizer("tok", "https://overflow.example.com") + + assert len(Authorizer._server_type_cache) == maxsize + assert first_key in Authorizer._server_type_cache # survived (recently used) + assert "https://host-1.example.com" not in Authorizer._server_type_cache # evicted + + +# Behavior 13: an unverified override must not poison auto-detection for a later +# caller that relies on probing for the same base_url. +def test_override_does_not_poison_autodetect(monkeypatch): + base_url = "https://platform.example.com" + # The server is really a platform (healthy /health); probing would detect it. + fake_get, counter = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + # First caller supplies a WRONG override and issues no probe. + poisoner = AccessTokenAuthorizer("tok", base_url, server_type="secret_server") + assert poisoner._server_type == "secret_server" + assert counter["total"] == 0 + assert base_url not in Authorizer._server_type_cache # not seeded + + # Second caller relies on auto-detection -> must probe and get the real type, + # NOT the poisoned override value. + detected = AccessTokenAuthorizer("tok", base_url) + assert detected._server_type == "platform" + assert counter["rounds"] == 1 # a real probe fired + assert Authorizer._server_type_cache[base_url] == "platform" + + +# Behavior 12: the public clear-cache method forces re-detection. +def test_public_clear_cache(monkeypatch): + base_url = "https://platform.example.com" + fake_get, counter = make_probe_counter({PLATFORM_HEALTH}) + monkeypatch.setattr("delinea.secrets.server.requests.get", fake_get) + + AccessTokenAuthorizer("tok", base_url) + assert counter["rounds"] == 1 + + Authorizer.clear_server_type_cache() + assert base_url not in Authorizer._server_type_cache + + AccessTokenAuthorizer("tok", base_url) # cache empty -> probes again + assert counter["rounds"] == 2