Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"type": "feature",
"description": "Added support for `standard` retry mode."
}
6 changes: 3 additions & 3 deletions packages/smithy-core/src/smithy_core/aio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](
return await self._handle_attempt(call, request_context, request_future)

retry_strategy = call.retry_strategy
retry_token = retry_strategy.acquire_initial_retry_token(
retry_token = await retry_strategy.acquire_initial_retry_token(
token_scope=call.retry_scope
)

Expand All @@ -349,7 +349,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](

if isinstance(output_context.response, Exception):
try:
retry_strategy.refresh_retry_token_for_retry(
retry_token = await retry_strategy.refresh_retry_token_for_retry(
token_to_renew=retry_token,
error=output_context.response,
)
Expand All @@ -364,7 +364,7 @@ async def _retry[I: SerializeableShape, O: DeserializeableShape](

await seek(request_context.transport_request.body, 0)
else:
retry_strategy.record_success(token=retry_token)
await retry_strategy.record_success(token=retry_token)
return output_context

async def _handle_attempt[I: SerializeableShape, O: DeserializeableShape](
Expand Down
6 changes: 3 additions & 3 deletions packages/smithy-core/src/smithy_core/interfaces/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class RetryStrategy(Protocol):
max_attempts: int
"""Upper limit on total attempt count (initial attempt plus retries)."""

def acquire_initial_retry_token(
async def acquire_initial_retry_token(
self, *, token_scope: str | None = None
) -> RetryToken:
"""Called before any retries (for the first attempt at the operation).
Expand All @@ -74,7 +74,7 @@ def acquire_initial_retry_token(
"""
...

def refresh_retry_token_for_retry(
async def refresh_retry_token_for_retry(
self, *, token_to_renew: RetryToken, error: Exception
) -> RetryToken:
"""Replace an existing retry token from a failed attempt with a new token.
Expand All @@ -91,7 +91,7 @@ def refresh_retry_token_for_retry(
"""
...

def record_success(self, *, token: RetryToken) -> None:
async def record_success(self, *, token: RetryToken) -> None:
"""Return token after successful completion of an operation.

Upon successful completion of the operation, a user calls this function to
Expand Down
180 changes: 177 additions & 3 deletions packages/smithy-core/src/smithy_core/retries.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import asyncio
import random
from collections.abc import Callable
from dataclasses import dataclass
Expand Down Expand Up @@ -204,7 +205,7 @@ def __init__(
self.backoff_strategy = backoff_strategy or ExponentialRetryBackoffStrategy()
self.max_attempts = max_attempts

def acquire_initial_retry_token(
async def acquire_initial_retry_token(
self, *, token_scope: str | None = None
) -> SimpleRetryToken:
"""Called before any retries (for the first attempt at the operation).
Expand All @@ -214,7 +215,7 @@ def acquire_initial_retry_token(
retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
return SimpleRetryToken(retry_count=0, retry_delay=retry_delay)

def refresh_retry_token_for_retry(
async def refresh_retry_token_for_retry(
self,
*,
token_to_renew: retries_interface.RetryToken,
Expand All @@ -240,5 +241,178 @@ def refresh_retry_token_for_retry(
else:
raise RetryError(f"Error is not retryable: {error}") from error

def record_success(self, *, token: retries_interface.RetryToken) -> None:
async def record_success(self, *, token: retries_interface.RetryToken) -> None:
"""Not used by this retry strategy."""


@dataclass(kw_only=True)
class StandardRetryToken:
retry_count: int
"""Retry count is the total number of attempts minus the initial attempt."""

retry_delay: float
"""Delay in seconds to wait before the retry attempt."""

quota_consumed: int = 0
"""The total amount of quota consumed."""

last_quota_acquired: int = 0
"""The amount of last quota acquired."""


class StandardRetryStrategy(retries_interface.RetryStrategy):
def __init__(self, *, max_attempts: int = 3):
"""Standard retry strategy using truncated binary exponential backoff with full
jitter.

:param max_attempts: Upper limit on total number of attempts made, including
initial attempt and retries.
"""
if max_attempts < 1:
raise ValueError(
f"max_attempts must be a positive integer, got {max_attempts}"
)

self.backoff_strategy = ExponentialRetryBackoffStrategy(
backoff_scale_value=1,
max_backoff=20,
jitter_type=ExponentialBackoffJitterType.FULL,
)
self.max_attempts = max_attempts
self._retry_quota = StandardRetryQuota()

async def acquire_initial_retry_token(
self, *, token_scope: str | None = None
) -> StandardRetryToken:
"""Called before any retries (for the first attempt at the operation).

:param token_scope: This argument is ignored by this retry strategy.
"""
retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
return StandardRetryToken(retry_count=0, retry_delay=retry_delay)

async def refresh_retry_token_for_retry(
self,
*,
token_to_renew: retries_interface.RetryToken,
error: Exception,
) -> StandardRetryToken:
"""Replace an existing retry token from a failed attempt with a new token.

This retry strategy always returns a token until the attempt count stored in
the new token exceeds the ``max_attempts`` value.

:param token_to_renew: The token used for the previous failed attempt.
:param error: The error that triggered the need for a retry.
:raises RetryError: If no further retry attempts are allowed.
"""
if not isinstance(token_to_renew, StandardRetryToken):
raise TypeError(
f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}"
)

if isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe:
retry_count = token_to_renew.retry_count + 1
if retry_count >= self.max_attempts:
raise RetryError(
f"Reached maximum number of allowed attempts: {self.max_attempts}"
) from error

# Acquire additional quota for this retry attempt
# (may raise a RetryError if none is available)
quota_acquired = await self._retry_quota.acquire(error=error)
total_quota: int = token_to_renew.quota_consumed + quota_acquired

if error.retry_after is not None:
retry_delay = error.retry_after
else:
retry_delay = self.backoff_strategy.compute_next_backoff_delay(
retry_count
)

return StandardRetryToken(
retry_count=retry_count,
retry_delay=retry_delay,
quota_consumed=total_quota,
last_quota_acquired=quota_acquired,
)
else:
raise RetryError(f"Error is not retryable: {error}") from error

async def record_success(self, *, token: retries_interface.RetryToken) -> None:
"""Return token after successful completion of an operation.

Releases retry tokens back to the retry quota based on the previous amount
consumed.

:param token: The token used for the previous successful attempt.
"""
if not isinstance(token, StandardRetryToken):
raise TypeError(
f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}"
)
await self._retry_quota.release(release_amount=token.last_quota_acquired)


class StandardRetryQuota:
"""Retry quota used by :py:class:`StandardRetryStrategy`."""

INITIAL_RETRY_TOKENS: int = 500
RETRY_COST: int = 5
NO_RETRY_INCREMENT: int = 1
TIMEOUT_RETRY_COST: int = 10

def __init__(self):
self._max_capacity = self.INITIAL_RETRY_TOKENS
self._available_capacity = self.INITIAL_RETRY_TOKENS
self._lock = asyncio.Lock()

async def acquire(self, *, error: Exception) -> int:
"""Attempt to acquire a certain amount of capacity.

If there's no sufficient amount of capacity available, raise an exception.
Otherwise, we return the amount of capacity successfully allocated.
"""
# TODO: update `is_timeout` when `is_timeout_error` is implemented
is_timeout = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have is_timeout_error done, should we merge that before we add this? We should try to avoid adding non-functioning code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I chatted with Sam about this and he mentioned he would update this TODO in the timeout error PR #590 since we're merging to a feature branch. Should I remove the timeout logic entirely for now and let him add it, or leave the TODO as-is since it'll be addressed soon?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed timeout TODO in 21cde20.

capacity_amount = self.TIMEOUT_RETRY_COST if is_timeout else self.RETRY_COST

async with self._lock:
if capacity_amount > self._available_capacity:
raise RetryError("Retry quota exceeded")
self._available_capacity -= capacity_amount
return capacity_amount

async def release(self, *, release_amount: int) -> None:
"""Release capacity back to the retry quota.

The capacity being released will be truncated if necessary to ensure the max
capacity is never exceeded.
"""
increment = self.NO_RETRY_INCREMENT if release_amount == 0 else release_amount

if self._available_capacity == self._max_capacity:
return

async with self._lock:
self._available_capacity = min(
self._available_capacity + increment, self._max_capacity
)

@property
def available_capacity(self) -> int:
"""Return the amount of capacity available."""
return self._available_capacity


class RetryStrategyMode(Enum):
"""Enumeration of available retry strategies."""

SIMPLE = "simple"
STANDARD = "standard"


RETRY_MODE_MAP = {
RetryStrategyMode.SIMPLE: SimpleRetryStrategy,
RetryStrategyMode.STANDARD: StandardRetryStrategy,
}
Loading