Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ There are three ways in which you can authorize the `SecretServer` and `SecretSe

#### Password Authorization

If using traditional `username` and `password` authentication to log in to your Secret Server either directly or through Platform, you can pass the `PasswordGrantAuthorizer` into the `SecretServer` class at instantiation. The `PasswordGrantAuthorizer` requires a `base_url`, `username`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token` or `/identity/api/oauth2/token/xpmplatform`, depending on whether a secret server or platform is used for authentication.
If using traditional `username` and `password` authentication to log in to your Secret Server either directly or through Platform, you can pass the `PasswordGrantAuthorizer` into the `SecretServer` class at instantiation. The `PasswordGrantAuthorizer` requires a `base_url`, `username`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token` or `/identity/api/oauth2/token/xpmplatform`, depending on whether a secret server or platform is used for authentication. It also optionally takes a `server_type` (`"secret_server"` or `"platform"`) to skip automatic server-type detection — see [Server-Type Detection](#server-type-detection).

##### With Secret Server
```python
Expand All @@ -50,7 +50,7 @@ authorizer = PasswordGrantAuthorizer("https://platform.delinea.app", os.getenv("

#### Domain Authorization

To use a domain credential, use the `DomainPasswordGrantAuthorizer`. It requires a `base_url`, `username`, `domain`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`. It is applicable only when authentication is done using a secret server.
To use a domain credential, use the `DomainPasswordGrantAuthorizer`. It requires a `base_url`, `username`, `domain`, and `password`. It optionally takes a `token_path_uri`, but defaults to `/oauth2/token`, and a `server_type` (see [Server-Type Detection](#server-type-detection)). It is applicable only when authentication is done using a secret server.

```python
from delinea.secrets.server import DomainPasswordGrantAuthorizer
Expand All @@ -60,7 +60,7 @@ authorizer = DomainPasswordGrantAuthorizer("https://hostname/SecretServer", os.g

#### Access Token Authorization

If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. The `AccessTokenAuthorizer` requires a `access_token` and `base_url`.
If you already have an `access_token` of Secret Server or Platform user, you can pass directly via the `AccessTokenAuthorizer`. The `AccessTokenAuthorizer` requires a `access_token` and `base_url`. It optionally takes a `server_type` (see [Server-Type Detection](#server-type-detection)).

##### With Secret Server
```python
Expand All @@ -77,6 +77,25 @@ from delinea.secrets.server import AccessTokenAuthorizer
authorizer = AccessTokenAuthorizer("AgJ1slfZsEng9bKsssB-tic0Kh8I...", "https://platform.delinea.app")
```

#### Server-Type Detection

By default every authorizer automatically detects whether the `base_url` points at a Secret Server or a Platform instance by probing its health-check endpoints (`/api/v1/healthcheck` then `/health`). The result is cached per `base_url` for the lifetime of the process, so the probe pair fires only once per `base_url`.

You can skip detection entirely by passing an explicit `server_type` of either `"secret_server"` or `"platform"`. When supplied, no health-check probe is issued. This is recommended for callers that run each lookup in a fresh, short-lived process (for example, some Ansible lookup-plugin runtimes), where a fresh process cannot benefit from the in-process cache and the repeated unauthenticated probes can be rate-limited to `403` by the Delinea Platform WAF.

```python
from delinea.secrets.server import AccessTokenAuthorizer

# No health-check probe is issued; the type is used directly.
authorizer = AccessTokenAuthorizer(
"AgJ1slfZsEng9bKsssB-tic0Kh8I...",
"https://platform.delinea.app",
server_type="platform",
)
```

An explicit `server_type` applies only to the instance that supplies it and is never written to the shared cache, so it cannot affect auto-detection for other authorizers. If a `base_url` is ever re-provisioned to a different server type while a long-lived process is running, call `Authorizer.clear_server_type_cache()` to force re-detection.

## Secret Server Cloud

The SDK API requires an `Authorizer` and either a `tenant` or a `base_url`. In the case of plaform authentication, only a `base_url` is supported.
Expand Down
172 changes: 157 additions & 15 deletions delinea/secrets/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
import json
import re
from abc import ABC, abstractmethod
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
from threading import Lock

import requests

Expand Down Expand Up @@ -164,6 +166,78 @@ class SecretServerServiceError(SecretServerError):
class Authorizer(ABC):
"""Main abstract base class for all Authorizer access methods."""

# Accepted values for an explicit ``server_type`` override and for cached
# detections.
VALID_SERVER_TYPES = ("secret_server", "platform")

# Process-scoped, bounded LRU cache mapping a normalized base_url to its
# detected server type ("secret_server" | "platform"). Shared across all
# Authorizer subclasses so the health-check probe pair fires once per
# base_url per process. Bounded to ``_SERVER_TYPE_CACHE_MAXSIZE`` entries so
# a long-lived process that constructs authorizers against many distinct
# URLs cannot grow it without bound; the least-recently-used entry is
# evicted on overflow. Guarded by ``_server_type_cache_lock``.
#
# NOTE: This cache is process-scoped. It deduplicates probes only within a
# single Python process. Callers that run each lookup in a fresh process
# (e.g. some Ansible lookup-plugin runtimes) start with an empty cache and
# will re-probe. To eliminate the probe entirely in that case, pass an
# explicit ``server_type`` to the authorizer (see ``_perform_server_detection``).
_SERVER_TYPE_CACHE_MAXSIZE = 128
_server_type_cache = OrderedDict()
_server_type_cache_lock = Lock()

@classmethod
def _normalize_server_type(cls, server_type):
"""Validate and normalize an explicit ``server_type`` value.

:raise :class:`SecretServerError` when ``server_type`` is not one of
``VALID_SERVER_TYPES``.
"""
normalized = str(server_type).strip().lower()
if normalized not in cls.VALID_SERVER_TYPES:
raise SecretServerError(
f"Invalid server_type {server_type!r}; expected one of "
f"{cls.VALID_SERVER_TYPES}."
)
return normalized

@classmethod
def _get_cached_server_type(cls, key):
"""Return the cached server type for ``key`` (marking it most-recently
used) or ``None`` if absent."""
with Authorizer._server_type_cache_lock:
if key in Authorizer._server_type_cache:
Authorizer._server_type_cache.move_to_end(key)
return Authorizer._server_type_cache[key]
return None

@classmethod
def _cache_server_type(cls, key, server_type):
"""Cache ``server_type`` for ``key``, evicting the least-recently-used
entry if the cache is over capacity."""
with Authorizer._server_type_cache_lock:
Authorizer._server_type_cache[key] = server_type
Authorizer._server_type_cache.move_to_end(key)
while len(Authorizer._server_type_cache) > cls._SERVER_TYPE_CACHE_MAXSIZE:
Authorizer._server_type_cache.popitem(last=False)

@classmethod
def clear_server_type_cache(cls):
"""Clear the process-scoped server-detection cache.

Detection results are cached for the lifetime of the process with no
TTL, because a server's type at a given ``base_url`` is effectively
immutable in practice. Use this escape hatch to force re-detection if a
``base_url`` is ever re-provisioned to a different server type while a
long-lived process is running.
"""
with Authorizer._server_type_cache_lock:
Authorizer._server_type_cache.clear()

# Backwards-compatible alias retained for existing callers/tests.
_clear_server_type_cache = clear_server_type_cache

@staticmethod
def add_bearer_token_authorization_header(bearer_token, existing_headers={}):
"""Adds an HTTP `Authorization` header containing the `Bearer` token
Expand All @@ -179,20 +253,61 @@ def add_bearer_token_authorization_header(bearer_token, existing_headers={}):
**existing_headers,
}

def _perform_server_detection(self, base_url):
"""Detects if the server is Secret Server or Platform by health check endpoints."""
secret_server_endpoint = base_url.rstrip("/") + "/api/v1/healthcheck"
platform_endpoint = base_url.rstrip("/") + "/health"
def _perform_server_detection(self, base_url, server_type=None):
"""Resolve whether the server is Secret Server or Platform.

When an explicit ``server_type`` is supplied the value is validated
and used directly for THIS instance only -- NO health-check probe is
issued. This is the recommended path for callers that run each lookup
in a fresh process (e.g. some Ansible lookup-plugin runtimes) where the
process-scoped cache cannot help: skipping detection eliminates the
unauthenticated ``/api/v1/healthcheck`` + ``/health`` probe burst that
the Delinea Platform WAF rate-limits to 403.

An explicit override is deliberately NOT written to the shared
process-scoped cache: the override is unverified, so seeding the cache
would let a wrong/typo'd value silently poison auto-detection for
unrelated callers using the same ``base_url`` in the same process. Only
verified probe detections populate the shared cache.

Otherwise the type is detected via the health-check endpoints, using a
process-scoped cache. The detected type is cached per normalized
``base_url`` on the ``Authorizer`` base class and shared across all
subclasses, so the probe pair fires only once per ``base_url`` per
process. The cache is read/written under ``_server_type_cache_lock``
for thread safety, but the network probe itself runs OUTSIDE the lock;
detection is idempotent, so a rare double-probe under a race is
harmless. Only successful detections are cached -- failures re-probe on
the next construction.

On every path the per-instance ``_server_type`` attribute is set,
because callers (``SecretServer.ensure_vault_url`` and
``PasswordGrantAuthorizer._refresh``) read ``self._server_type``.
"""
key = base_url.rstrip("/")

if self._validate_health_endpoint(secret_server_endpoint):
self._server_type = "secret_server"
if server_type is not None:
# Per-instance only; intentionally NOT seeded into the shared cache
# so an unverified override cannot poison auto-detection for others.
self._server_type = self._normalize_server_type(server_type)
return
if self._validate_health_endpoint(platform_endpoint):
self._server_type = "platform"

cached = self._get_cached_server_type(key)
if cached is not None:
self._server_type = cached
return
raise SecretServerError(
"Unable to detect server type via health check endpoints."
)

if self._validate_health_endpoint(key + "/api/v1/healthcheck"):
detected = "secret_server"
elif self._validate_health_endpoint(key + "/health"):
detected = "platform"
else:
raise SecretServerError(
"Unable to detect server type via health check endpoints."
)

self._server_type = detected
self._cache_server_type(key, detected)

def _validate_health_endpoint(self, url):
"""Validates if an endpoint returns healthy status."""
Expand Down Expand Up @@ -231,10 +346,14 @@ class AccessTokenAuthorizer(Authorizer):
def get_access_token(self):
return self.access_token

def __init__(self, access_token, base_url):
def __init__(self, access_token, base_url, server_type=None):
"""
:param server_type: optionally ``"secret_server"`` or ``"platform"`` to
skip health-check detection entirely (no probe is issued).
"""
self.access_token = access_token
self.base_url = base_url.rstrip("/")
self._perform_server_detection(self.base_url)
self._perform_server_detection(self.base_url, server_type=server_type)


class PasswordGrantAuthorizer(Authorizer):
Expand Down Expand Up @@ -316,14 +435,31 @@ def _refresh(self, seconds_of_drift=300):
else:
raise SecretServerError("Unknown server type for token request.")

def __init__(self, base_url, username, password, token_path_uri=None, domain=None):
def __init__(
self,
base_url,
username,
password,
token_path_uri=None,
domain=None,
server_type=None,
):
"""
:param server_type: optionally ``"secret_server"`` or ``"platform"`` to
skip health-check detection entirely (no probe is issued); the
matching token endpoint is selected without probing.
"""
self.base_url = base_url.rstrip("/")
self.username = username
self.password = password
self.domain = domain
self.token_path_uri = token_path_uri # May be None, will decide in _refresh
self.token_url = None
self.grant_request = None
# When an explicit type is given, resolve it now (no network) so the
# lazy detection in _refresh is skipped and no probe is ever issued.
if server_type is not None:
self._perform_server_detection(self.base_url, server_type=server_type)

def get_access_token(self):
self._refresh()
Expand All @@ -340,9 +476,15 @@ def __init__(
domain,
password,
token_path_uri=None,
server_type=None,
):
super().__init__(
base_url, username, password, token_path_uri=token_path_uri, domain=domain
base_url,
username,
password,
token_path_uri=token_path_uri,
domain=domain,
server_type=server_type,
)


Expand Down
6 changes: 2 additions & 4 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
try:
secret = secret_server_cloud.get_secret(os.getenv("TSS_SECRET_ID"))
serverSecret = ServerSecret(**secret)
print(
f"""username: {serverSecret.fields['username'].value}
print(f"""username: {serverSecret.fields['username'].value}
password: {serverSecret.fields['password'].value}
template: {serverSecret.secret_template_name}"""
)
template: {serverSecret.secret_template_name}""")
except SecretServerError as error:
print(error.response.text)
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
requests==2.32.4
requests==2.34.2 # pinned to address CVE-2026-25645 (2.33.0 was never published)
tox
pytest
python-dotenv
flit
black
urllib3==2.6.3 # not directly required, pinned by Snyk to avoid a vulnerability
urllib3==2.7.0 # not directly required, pinned by Snyk to avoid a vulnerability
zipp==3.23.0 # not directly required, pinned by Snyk to avoid a vulnerability
Loading