diff --git a/packages/smithy-core/.changes/next-release/smithy-core-breaking-20251106184528.json b/packages/smithy-core/.changes/next-release/smithy-core-breaking-20251106184528.json new file mode 100644 index 000000000..6fa68f5d8 --- /dev/null +++ b/packages/smithy-core/.changes/next-release/smithy-core-breaking-20251106184528.json @@ -0,0 +1,4 @@ +{ + "type": "feature", + "description": "Added support for `standard` retry mode." +} \ No newline at end of file diff --git a/packages/smithy-core/src/smithy_core/aio/client.py b/packages/smithy-core/src/smithy_core/aio/client.py index bf27c440c..01357cd65 100644 --- a/packages/smithy-core/src/smithy_core/aio/client.py +++ b/packages/smithy-core/src/smithy_core/aio/client.py @@ -349,7 +349,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape]( if isinstance(output_context.response, Exception): try: - retry_strategy.refresh_retry_token_for_retry( + retry_token = retry_strategy.refresh_retry_token_for_retry( token_to_renew=retry_token, error=output_context.response, ) diff --git a/packages/smithy-core/src/smithy_core/interfaces/retries.py b/packages/smithy-core/src/smithy_core/interfaces/retries.py index a5c9d428b..f100dca32 100644 --- a/packages/smithy-core/src/smithy_core/interfaces/retries.py +++ b/packages/smithy-core/src/smithy_core/interfaces/retries.py @@ -64,7 +64,7 @@ class RetryStrategy(Protocol): def acquire_initial_retry_token( self, *, token_scope: str | None = None ) -> RetryToken: - """Called before any retries (for the first attempt at the operation). + """Create a base retry token for the start of a request. :param token_scope: An arbitrary string accepted by the retry strategy to separate tokens into scopes. 
diff --git a/packages/smithy-core/src/smithy_core/retries.py b/packages/smithy-core/src/smithy_core/retries.py
index 06bf6f988..b56cc87c2 100644
--- a/packages/smithy-core/src/smithy_core/retries.py
+++ b/packages/smithy-core/src/smithy_core/retries.py
@@ -1,6 +1,7 @@
 # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 # SPDX-License-Identifier: Apache-2.0
 import random
+import threading
 from collections.abc import Callable
 from dataclasses import dataclass
 from enum import Enum
@@ -207,7 +208,7 @@ def __init__(
     def acquire_initial_retry_token(
         self, *, token_scope: str | None = None
     ) -> SimpleRetryToken:
-        """Called before any retries (for the first attempt at the operation).
+        """Create a base retry token for the start of a request.
 
         :param token_scope: This argument is ignored by this retry strategy.
         """
@@ -242,3 +243,194 @@ def refresh_retry_token_for_retry(
 
     def record_success(self, *, token: retries_interface.RetryToken) -> None:
         """Not used by this retry strategy."""
+
+
+class StandardRetryQuota:
+    """Retry quota used by :py:class:`StandardRetryStrategy`.
+
+    Capacity is taken with :py:meth:`acquire` and returned with
+    :py:meth:`release`; both update the available capacity under a lock.
+    """
+
+    INITIAL_RETRY_TOKENS: int = 500
+    RETRY_COST: int = 5
+    NO_RETRY_INCREMENT: int = 1
+    # Not applied by ``acquire`` yet; every retry currently costs ``RETRY_COST``.
+    TIMEOUT_RETRY_COST: int = 10
+
+    def __init__(self, initial_capacity: int = INITIAL_RETRY_TOKENS):
+        """Initialize retry quota with configurable capacity.
+
+        :param initial_capacity: The initial and maximum capacity for the retry quota.
+        """
+        self._max_capacity = initial_capacity
+        self._available_capacity = initial_capacity
+        self._lock = threading.Lock()
+
+    def acquire(self, *, error: Exception) -> int:
+        """Attempt to acquire capacity for a retry attempt.
+
+        :param error: The error that triggered the retry. Currently unused: every
+            retry costs ``RETRY_COST``.
+        :returns: The amount of capacity that was allocated.
+        :raises RetryError: If there is insufficient capacity available.
+        """
+        capacity_amount = self.RETRY_COST
+
+        with self._lock:
+            if capacity_amount > self._available_capacity:
+                raise RetryError("Retry quota exceeded")
+            self._available_capacity -= capacity_amount
+            return capacity_amount
+
+    def release(self, *, release_amount: int) -> None:
+        """Release capacity back to the retry quota.
+
+        The capacity being released will be truncated if necessary to ensure the max
+        capacity is never exceeded.
+
+        :param release_amount: The amount of capacity to return. A value of ``0``
+            returns ``NO_RETRY_INCREMENT`` instead.
+        """
+        increment = self.NO_RETRY_INCREMENT if release_amount == 0 else release_amount
+
+        # Check and update under the same lock so that a concurrent ``acquire``
+        # cannot make this release act on a stale capacity reading.
+        with self._lock:
+            if self._available_capacity == self._max_capacity:
+                return
+            self._available_capacity = min(
+                self._available_capacity + increment, self._max_capacity
+            )
+
+    @property
+    def available_capacity(self) -> int:
+        """Return the amount of capacity available."""
+        return self._available_capacity
+
+
+@dataclass(kw_only=True)
+class StandardRetryToken:
+    """Retry token issued by :py:class:`StandardRetryStrategy`."""
+
+    retry_count: int
+    """Retry count is the total number of attempts minus the initial attempt."""
+
+    retry_delay: float
+    """Delay in seconds to wait before the retry attempt."""
+
+    quota_consumed: int = 0
+    """The total amount of quota consumed."""
+
+    last_quota_acquired: int = 0
+    """The amount of quota acquired for the most recent retry attempt."""
+
+
+class StandardRetryStrategy(retries_interface.RetryStrategy):
+    def __init__(
+        self,
+        *,
+        backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
+        max_attempts: int = 3,
+        retry_quota: StandardRetryQuota | None = None,
+    ):
+        """Standard retry strategy using truncated binary exponential backoff with full
+        jitter.
+
+        :param backoff_strategy: The backoff strategy used by returned tokens to compute
+        the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`.
+
+        :param max_attempts: Upper limit on total number of attempts made, including
+        initial attempt and retries.
+
+        :param retry_quota: The retry quota to use for managing retry capacity. Defaults
+        to a new :py:class:`StandardRetryQuota` instance.
+
+        :raises ValueError: If ``max_attempts`` is negative.
+        """
+        if max_attempts < 0:
+            raise ValueError(
+                f"max_attempts must be a non-negative integer, got {max_attempts}"
+            )
+
+        self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy(
+            backoff_scale_value=1,
+            max_backoff=20,
+            jitter_type=ExponentialBackoffJitterType.FULL,
+        )
+        self.max_attempts = max_attempts
+        self._retry_quota = retry_quota or StandardRetryQuota()
+
+    def acquire_initial_retry_token(
+        self, *, token_scope: str | None = None
+    ) -> StandardRetryToken:
+        """Create a base retry token for the start of a request.
+
+        :param token_scope: This argument is ignored by this retry strategy.
+        """
+        retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
+        return StandardRetryToken(retry_count=0, retry_delay=retry_delay)
+
+    def refresh_retry_token_for_retry(
+        self,
+        *,
+        token_to_renew: retries_interface.RetryToken,
+        error: Exception,
+    ) -> StandardRetryToken:
+        """Replace an existing retry token from a failed attempt with a new token.
+
+        A new token is returned only if the error is marked as safe to retry, the
+        ``max_attempts`` limit has not been reached, and retry quota is available.
+        The new token's delay is the error's ``retry_after`` value when it is set,
+        otherwise the delay computed by the backoff strategy.
+
+        :param token_to_renew: The token used for the previous failed attempt.
+        :param error: The error that triggered the need for a retry.
+        :raises TypeError: If ``token_to_renew`` is not a
+            :py:class:`StandardRetryToken`.
+        :raises RetryError: If no further retry attempts are allowed.
+        """
+        if not isinstance(token_to_renew, StandardRetryToken):
+            raise TypeError(
+                f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}"
+            )
+
+        if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe:
+            retry_count = token_to_renew.retry_count + 1
+            if retry_count >= self.max_attempts:
+                raise RetryError(
+                    f"Reached maximum number of allowed attempts: {self.max_attempts}"
+                ) from error
+
+            # Acquire additional quota for this retry attempt
+            # (may raise a RetryError if none is available)
+            quota_acquired = self._retry_quota.acquire(error=error)
+            total_quota: int = token_to_renew.quota_consumed + quota_acquired
+
+            if error.retry_after is not None:
+                retry_delay = error.retry_after
+            else:
+                retry_delay = self.backoff_strategy.compute_next_backoff_delay(
+                    retry_count
+                )
+
+            return StandardRetryToken(
+                retry_count=retry_count,
+                retry_delay=retry_delay,
+                quota_consumed=total_quota,
+                last_quota_acquired=quota_acquired,
+            )
+        else:
+            raise RetryError(f"Error is not retryable: {error}") from error
+
+    def record_success(self, *, token: retries_interface.RetryToken) -> None:
+        """Release retry quota back based on the amount consumed by the last retry.
+
+        :param token: The token used for the previous successful attempt.
+        :raises TypeError: If ``token`` is not a :py:class:`StandardRetryToken`.
+        """
+        if not isinstance(token, StandardRetryToken):
+            raise TypeError(
+                f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}"
+            )
+        self._retry_quota.release(release_amount=token.last_quota_acquired)
diff --git a/packages/smithy-core/tests/unit/test_retries.py b/packages/smithy-core/tests/unit/test_retries.py
index 0b3c23be4..63b257050 100644
--- a/packages/smithy-core/tests/unit/test_retries.py
+++ b/packages/smithy-core/tests/unit/test_retries.py
@@ -4,7 +4,12 @@
 import pytest
 from smithy_core.exceptions import CallError, RetryError
 from smithy_core.retries import ExponentialBackoffJitterType as EBJT
-from smithy_core.retries import ExponentialRetryBackoffStrategy, SimpleRetryStrategy
+from smithy_core.retries import (
+    ExponentialRetryBackoffStrategy,
+    SimpleRetryStrategy,
+    StandardRetryQuota,
+    StandardRetryStrategy,
+)
 
 
 @pytest.mark.parametrize(
@@ -100,3 +105,132 @@ def test_simple_retry_does_not_retry_unsafe() -> None:
     token = strategy.acquire_initial_retry_token()
     with pytest.raises(RetryError):
         strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error)
+
+
+@pytest.mark.parametrize("max_attempts", [2, 3, 10])
+def test_standard_retry_strategy(max_attempts: int) -> None:
+    strategy = StandardRetryStrategy(max_attempts=max_attempts)
+    error = CallError(is_retry_safe=True)
+    token = strategy.acquire_initial_retry_token()
+    for _ in range(max_attempts - 1):
+        token = strategy.refresh_retry_token_for_retry(
+            token_to_renew=token, error=error
+        )
+    with pytest.raises(RetryError):
+        strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error)
+
+
+@pytest.mark.parametrize(
+    "error",
+    [
+        Exception(),
+        CallError(is_retry_safe=None),
+        CallError(fault="client", is_retry_safe=False),
+    ],
+    ids=[
+        "unclassified_error",
+        "safety_unknown_error",
+        "unsafe_error",
+    ],
+)
+def test_standard_retry_does_not_retry(error: Exception | CallError) -> None:
+    strategy = StandardRetryStrategy()
+    token = strategy.acquire_initial_retry_token()
+    with pytest.raises(RetryError):
+        strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error)
+
+
+def test_standard_retry_after_overrides_backoff() -> None:
+    strategy = StandardRetryStrategy()
+    error = CallError(is_retry_safe=True, retry_after=5.5)
+    token = strategy.acquire_initial_retry_token()
+    token = strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error)
+    assert token.retry_delay == 5.5
+
+
+def test_standard_retry_quota_consumed_accumulates() -> None:
+    strategy = StandardRetryStrategy()
+    error = CallError(is_retry_safe=True)
+    token = strategy.acquire_initial_retry_token()
+
+    token = strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error)
+    first_consumed = token.quota_consumed
+    assert first_consumed == StandardRetryQuota.RETRY_COST
+
+    token = strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error)
+    assert token.quota_consumed == first_consumed + StandardRetryQuota.RETRY_COST
+
+
+def test_standard_retry_record_success_releases_quota() -> None:
+    quota = StandardRetryQuota(initial_capacity=10)
+    strategy = StandardRetryStrategy(retry_quota=quota)
+    error = CallError(is_retry_safe=True)
+    token = strategy.acquire_initial_retry_token()
+
+    token = strategy.refresh_retry_token_for_retry(token_to_renew=token, error=error)
+    assert quota.available_capacity == 10 - StandardRetryQuota.RETRY_COST
+
+    strategy.record_success(token=token)
+    assert quota.available_capacity == 10
+
+
+def test_standard_retry_invalid_max_attempts() -> None:
+    with pytest.raises(ValueError):
+        StandardRetryStrategy(max_attempts=-1)
+
+
+@pytest.fixture
+def retry_quota() -> StandardRetryQuota:
+    return StandardRetryQuota(initial_capacity=10)
+
+
+def test_retry_quota_initial_state(
+    retry_quota: StandardRetryQuota,
+) -> None:
+    assert retry_quota.available_capacity == 10
+
+
+def test_retry_quota_acquire_success(
+    retry_quota: StandardRetryQuota,
+) -> None:
+    acquired = retry_quota.acquire(error=Exception())
+    assert retry_quota.available_capacity == 10 - acquired
+
+
+def test_retry_quota_acquire_when_exhausted(
+    retry_quota: StandardRetryQuota,
+) -> None:
+    # Drain capacity until insufficient for next acquire
+    retry_quota.acquire(error=Exception())
+    retry_quota.acquire(error=Exception())
+
+    # Not enough capacity for another retry (need 5, only 0 left)
+    with pytest.raises(RetryError, match="Retry quota exceeded"):
+        retry_quota.acquire(error=Exception())
+
+
+def test_retry_quota_release_restores_capacity(
+    retry_quota: StandardRetryQuota,
+) -> None:
+    acquired = retry_quota.acquire(error=Exception())
+    retry_quota.release(release_amount=acquired)
+    assert retry_quota.available_capacity == 10
+
+
+def test_retry_quota_release_zero_adds_increment(
+    retry_quota: StandardRetryQuota,
+) -> None:
+    retry_quota.acquire(error=Exception())
+    assert retry_quota.available_capacity == 5
+    retry_quota.release(release_amount=0)
+    assert retry_quota.available_capacity == 6
+
+
+def test_retry_quota_release_caps_at_max(
+    retry_quota: StandardRetryQuota,
+) -> None:
+    # Drain some capacity
+    retry_quota.acquire(error=Exception())
+    # Release more than we acquired. Should cap at initial capacity.
+    retry_quota.release(release_amount=50)
+    assert retry_quota.available_capacity == 10