|
1 | 1 | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. |
2 | 2 | # SPDX-License-Identifier: Apache-2.0 |
3 | 3 | import random |
| 4 | +import threading |
4 | 5 | from collections.abc import Callable |
5 | 6 | from dataclasses import dataclass |
6 | 7 | from enum import Enum |
@@ -207,7 +208,7 @@ def __init__( |
207 | 208 | def acquire_initial_retry_token( |
208 | 209 | self, *, token_scope: str | None = None |
209 | 210 | ) -> SimpleRetryToken: |
210 | | - """Called before any retries (for the first attempt at the operation). |
| 211 | + """Create a base retry token for the start of a request. |
211 | 212 |
|
212 | 213 | :param token_scope: This argument is ignored by this retry strategy. |
213 | 214 | """ |
@@ -242,3 +243,169 @@ def refresh_retry_token_for_retry( |
242 | 243 |
|
    def record_success(self, *, token: retries_interface.RetryToken) -> None:
        """Not used by this retry strategy.

        The method body is empty, so calling it has no effect.

        :param token: Accepted but ignored.
        """
| 246 | + |
| 247 | + |
class StandardRetryQuota:
    """Pool of retry capacity used by :py:class:`StandardRetryStrategy`.

    Capacity is withdrawn with :py:meth:`acquire` and handed back with
    :py:meth:`release`. Updates to the available capacity are made while
    holding an internal lock.
    """

    # Default starting (and maximum) capacity of a quota.
    INITIAL_RETRY_TOKENS: int = 500
    # Capacity withdrawn by each call to ``acquire``.
    RETRY_COST: int = 5
    # Capacity returned by ``release`` when it is asked to release 0.
    NO_RETRY_INCREMENT: int = 1
    # Not referenced anywhere within this class.
    TIMEOUT_RETRY_COST: int = 10

    def __init__(self, initial_capacity: int = INITIAL_RETRY_TOKENS):
        """Create a quota that starts full.

        :param initial_capacity: Starting capacity, which is also the ceiling
            that released capacity can never push the quota above.
        """
        self._max_capacity = initial_capacity
        self._available_capacity = initial_capacity
        self._lock = threading.Lock()

    def acquire(self, *, error: Exception) -> int:
        """Withdraw the capacity needed for one retry attempt.

        :param error: The error that triggered the retry. It is not inspected;
            every acquisition costs ``RETRY_COST``.
        :returns: The amount of capacity that was withdrawn.
        :raises RetryError: If less than ``RETRY_COST`` capacity is available.
        """
        cost = self.RETRY_COST

        with self._lock:
            remaining = self._available_capacity - cost
            if remaining < 0:
                raise RetryError("Retry quota exceeded")
            self._available_capacity = remaining

        return cost

    def release(self, *, release_amount: int) -> None:
        """Hand capacity back to the quota.

        The refund is clipped so the available capacity never rises above the
        maximum capacity.

        :param release_amount: Capacity to give back. A value of ``0`` gives
            back ``NO_RETRY_INCREMENT`` instead.
        """
        # Fast path taken without the lock: a full quota cannot grow further.
        if self._available_capacity == self._max_capacity:
            return

        if release_amount == 0:
            refund = self.NO_RETRY_INCREMENT
        else:
            refund = release_amount

        with self._lock:
            replenished = self._available_capacity + refund
            self._available_capacity = min(replenished, self._max_capacity)

    @property
    def available_capacity(self) -> int:
        """The capacity currently available for retries."""
        return self._available_capacity
| 299 | + |
| 300 | + |
| 301 | +@dataclass(kw_only=True) |
| 302 | +class StandardRetryToken: |
| 303 | + retry_count: int |
| 304 | + """Retry count is the total number of attempts minus the initial attempt.""" |
| 305 | + |
| 306 | + retry_delay: float |
| 307 | + """Delay in seconds to wait before the retry attempt.""" |
| 308 | + |
| 309 | + quota_acquired: int = 0 |
| 310 | + """The amount of quota acquired for this retry attempt.""" |
| 311 | + |
| 312 | + |
class StandardRetryStrategy(retries_interface.RetryStrategy):
    """Retry strategy bounded by an attempt limit and a retry quota.

    Each retry must fit within ``max_attempts`` and must successfully acquire
    capacity from the configured :py:class:`StandardRetryQuota`. Capacity is
    released back to the quota when a success is recorded.
    """

    def __init__(
        self,
        *,
        backoff_strategy: retries_interface.RetryBackoffStrategy | None = None,
        max_attempts: int = 3,
        retry_quota: StandardRetryQuota | None = None,
    ):
        """Standard retry strategy using truncated binary exponential backoff with full
        jitter.

        :param backoff_strategy: The backoff strategy used by returned tokens to compute
            the retry delay. Defaults to :py:class:`ExponentialRetryBackoffStrategy`.

        :param max_attempts: Upper limit on total number of attempts made, including
            initial attempt and retries.

        :param retry_quota: The retry quota to use for managing retry capacity. Defaults
            to a new :py:class:`StandardRetryQuota` instance.

        :raises ValueError: If ``max_attempts`` is negative.
        """
        if max_attempts < 0:
            raise ValueError(
                f"max_attempts must be a non-negative integer, got {max_attempts}"
            )

        # Compare against None explicitly: a caller-supplied object that happens
        # to be falsy must not be silently swapped for the default.
        if backoff_strategy is None:
            backoff_strategy = ExponentialRetryBackoffStrategy(
                backoff_scale_value=1,
                max_backoff=20,
                jitter_type=ExponentialBackoffJitterType.FULL,
            )
        if retry_quota is None:
            retry_quota = StandardRetryQuota()

        self.backoff_strategy = backoff_strategy
        self.max_attempts = max_attempts
        self._retry_quota = retry_quota

    def acquire_initial_retry_token(
        self, *, token_scope: str | None = None
    ) -> StandardRetryToken:
        """Create a base retry token for the start of a request.

        :param token_scope: This argument is ignored by this retry strategy.
        :returns: A token with a retry count of 0 and the delay the backoff
            strategy computes for attempt 0. No quota is acquired.
        """
        retry_delay = self.backoff_strategy.compute_next_backoff_delay(0)
        return StandardRetryToken(retry_count=0, retry_delay=retry_delay)

    def refresh_retry_token_for_retry(
        self,
        *,
        token_to_renew: retries_interface.RetryToken,
        error: Exception,
    ) -> StandardRetryToken:
        """Replace an existing retry token from a failed attempt with a new token.

        A new token is returned only when the error is marked as safe to retry,
        the incremented retry count is still below ``max_attempts``, and the
        retry quota has enough capacity for another attempt.

        :param token_to_renew: The token used for the previous failed attempt.
        :param error: The error that triggered the need for a retry.
        :returns: A token carrying the incremented retry count, the delay to
            wait before retrying, and the quota acquired for the retry.
        :raises TypeError: If ``token_to_renew`` is not a
            :py:class:`StandardRetryToken`.
        :raises RetryError: If no further retry attempts are allowed.
        """
        if not isinstance(token_to_renew, StandardRetryToken):
            raise TypeError(
                f"StandardRetryStrategy requires StandardRetryToken, got {type(token_to_renew).__name__}"
            )

        if not (
            isinstance(error, retries_interface.ErrorRetryInfo) and error.is_retry_safe
        ):
            raise RetryError(f"Error is not retryable: {error}") from error

        retry_count = token_to_renew.retry_count + 1
        if retry_count >= self.max_attempts:
            raise RetryError(
                f"Reached maximum number of allowed attempts: {self.max_attempts}"
            ) from error

        # Acquire additional quota for this retry attempt
        # (may raise a RetryError if none is available)
        quota_acquired = self._retry_quota.acquire(error=error)

        # A delay supplied by the error takes precedence over computed backoff.
        if error.retry_after is not None:
            retry_delay = error.retry_after
        else:
            retry_delay = self.backoff_strategy.compute_next_backoff_delay(retry_count)

        return StandardRetryToken(
            retry_count=retry_count,
            retry_delay=retry_delay,
            quota_acquired=quota_acquired,
        )

    def record_success(self, *, token: retries_interface.RetryToken) -> None:
        """Release retry quota back based on the amount consumed by the last retry.

        :param token: The token used for the previous successful attempt.
        :raises TypeError: If ``token`` is not a :py:class:`StandardRetryToken`.
        """
        if not isinstance(token, StandardRetryToken):
            raise TypeError(
                f"StandardRetryStrategy requires StandardRetryToken, got {type(token).__name__}"
            )
        self._retry_quota.release(release_amount=token.quota_acquired)
0 commit comments