feat: retry and retry_async support streaming rpcs #495
Changes from 120 commits
@@ -66,6 +66,7 @@ def check_if_exists():

 from google.api_core import datetime_helpers
 from google.api_core import exceptions
+from google.api_core.retry_streaming import retry_target_stream
 from google.auth import exceptions as auth_exceptions

 _LOGGER = logging.getLogger(__name__)
@@ -154,7 +155,7 @@ def retry_target(
     higher-level retry helper :class:`Retry`.

     Args:
-        target(Callable): The function to call and retry. This must be a
+        target(Callable[[], Any]): The function to call and retry. This must be a
             nullary function - apply arguments with `functools.partial`.
         predicate (Callable[Exception]): A callable used to determine if an
             exception raised by the target should be considered retryable.
@@ -301,6 +302,18 @@ class Retry(object):
     maximum (float): The maximum amount of time to delay in seconds.
     multiplier (float): The multiplier applied to the delay.
     timeout (float): How long to keep retrying, in seconds.
     on_error (Callable[Exception]): A function to call while processing
         a retryable exception. Any error raised by this function will
         *not* be caught.
+    is_stream (bool): Indicates whether the input function
+        should be treated as a stream function (i.e. a Generator,
+        or function that returns an Iterable). If True, the iterable
+        will be wrapped with retry logic, and any failed outputs will
+        restart the stream. If False, only the input function call itself
+        will be retried. Defaults to False.
+        To avoid duplicate values, retryable streams should typically be
+        wrapped in additional filter logic before use. For more details, see
+        ``google.api_core.retry_streaming.retry_target_stream``.
     deadline (float): DEPRECATED: use `timeout` instead. For backward
         compatibility, if specified it will override the ``timeout`` parameter.
     """
@@ -313,7 +326,8 @@ def __init__(
     multiplier=_DEFAULT_DELAY_MULTIPLIER,
     timeout=_DEFAULT_DEADLINE,
     on_error=None,
-    **kwargs
+    is_stream=False,
+    **kwargs,
 ):
     self._predicate = predicate
     self._initial = initial
@@ -322,6 +336,7 @@ def __init__(
     self._timeout = kwargs.get("deadline", timeout)
     self._deadline = self._timeout
     self._on_error = on_error
+    self._is_stream = is_stream

 def __call__(self, func, on_error=None):
     """Wrap a callable with retry behavior.
@@ -346,7 +361,8 @@ def retry_wrapped_func(*args, **kwargs):
     sleep_generator = exponential_sleep_generator(
         self._initial, self._maximum, multiplier=self._multiplier
     )
-    return retry_target(
+    retry_func = retry_target_stream if self._is_stream else retry_target
+    return retry_func(
         target,
         self._predicate,
         sleep_generator,
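The dispatch above routes stream targets to `retry_target_stream`, whose key behavior is restarting the whole generator on a retryable mid-stream error. A self-contained sketch of those semantics (the `retry_stream` and `flaky_numbers` helpers here are toy stand-ins, not the library's implementation):

```python
def retry_stream(target, predicate, max_attempts=3):
    """Re-create and re-consume the target generator on retryable errors."""
    attempts = 0
    while True:
        try:
            yield from target()
            return  # stream finished cleanly
        except Exception as exc:
            attempts += 1
            if not predicate(exc) or attempts >= max_attempts:
                raise

state = {"fail_once": True}

def flaky_numbers():
    yield 1
    yield 2
    if state["fail_once"]:
        state["fail_once"] = False
        raise ConnectionError("dropped mid-stream")
    yield 3

items = list(retry_stream(flaky_numbers, lambda e: isinstance(e, ConnectionError)))
# items == [1, 2, 1, 2, 3] -- note the duplicated prefix from the restart
```

This duplicated-prefix behavior is exactly why the docstring recommends wrapping retryable streams in filter logic.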
@@ -61,6 +61,7 @@ async def check_if_exists():
 from google.api_core.retry import exponential_sleep_generator
 from google.api_core.retry import if_exception_type  # noqa: F401
 from google.api_core.retry import if_transient_error
+from google.api_core.retry_streaming_async import retry_target_stream


 _LOGGER = logging.getLogger(__name__)
@@ -74,13 +75,13 @@ async def check_if_exists():
 async def retry_target(
     target, predicate, sleep_generator, timeout=None, on_error=None, **kwargs
 ):
-    """Call a function and retry if it fails.
+    """Await a coroutine and retry if it fails.

     This is the lowest-level retry helper. Generally, you'll use the
     higher-level retry helper :class:`Retry`.

     Args:
-        target(Callable): The function to call and retry. This must be a
+        target(Callable[[], Any]): The function to call and retry. This must be a
             nullary function - apply arguments with `functools.partial`.
         predicate (Callable[Exception]): A callable used to determine if an
             exception raised by the target should be considered retryable.

Review comment: IIUC, the sleep-delay logic should be the same between all four versions of this function: sync vs async, unary vs streaming. I would suggest: [...]

Reply: I was trying to avoid making changes to the existing retries, but I suppose with the new base classes, it makes sense to generalize them a bit. I added a shared [...]. Note though that I had to make a (potentially breaking) change to the async unary retry logic to make it consistent. Previously async_unary would attempt to cancel early when the deadline is reached, while the other retries only check the deadline after the request terminates. The old behavior was very slow (using asyncio.wait_for), and caused race conditions. I'd consider it a bug, but fixing the behavior may count as a breaking change (even though gapic functions shouldn't be affected).
@@ -156,7 +157,7 @@ async def retry_target(


 class AsyncRetry:
-    """Exponential retry decorator for async functions.
+    """Exponential retry decorator for async coroutines.

     This class is a decorator used to add exponential back-off retry behavior
     to an RPC call.

@@ -175,6 +176,15 @@ class AsyncRetry:
     on_error (Callable[Exception]): A function to call while processing
         a retryable exception. Any error raised by this function will
         *not* be caught.
+    is_stream (bool): Indicates whether the input function
+        should be treated as a stream function (i.e. an AsyncGenerator,
+        or function or coroutine that returns an AsyncIterable).
+        If True, the iterable will be wrapped with retry logic, and any
+        failed outputs will restart the stream. If False, only the input
+        function call itself will be retried. Defaults to False.
+        To avoid duplicate values, retryable streams should typically be
+        wrapped in additional filter logic before use. For more details, see
+        ``google.api_core.retry_streaming_async.retry_target_stream``.
     deadline (float): DEPRECATED use ``timeout`` instead. If set it will
         override ``timeout`` parameter.
     """
@@ -187,6 +197,7 @@ def __init__(
     multiplier=_DEFAULT_DELAY_MULTIPLIER,
     timeout=_DEFAULT_TIMEOUT,
     on_error=None,
+    is_stream=False,
     **kwargs
 ):
     self._predicate = predicate
@@ -196,12 +207,13 @@ def __init__(
     self._timeout = kwargs.get("deadline", timeout)
     self._deadline = self._timeout
     self._on_error = on_error
+    self._is_stream = is_stream

 def __call__(self, func, on_error=None):
     """Wrap a callable with retry behavior.

     Args:
-        func (Callable): The callable to add retry behavior to.
+        func (Callable): The callable or stream to add retry behavior to.
         on_error (Callable[Exception]): A function to call while processing
             a retryable exception. Any error raised by this function will
             *not* be caught.
@@ -224,11 +236,29 @@ async def retry_wrapped_func(*args, **kwargs):
         target,
         self._predicate,
         sleep_generator,
-        self._timeout,
+        timeout=self._timeout,
         on_error=on_error,
     )

+    @functools.wraps(func)
+    def retry_wrapped_stream(*args, **kwargs):
+        """A wrapper that iterates over target stream with retry."""
+        target = functools.partial(func, *args, **kwargs)
+        sleep_generator = exponential_sleep_generator(
+            self._initial, self._maximum, multiplier=self._multiplier
+        )
+        return retry_target_stream(
+            target,
+            self._predicate,
+            sleep_generator,
+            timeout=self._timeout,
+            on_error=on_error,
+        )

-    return retry_wrapped_func
+    if self._is_stream:
+        return retry_wrapped_stream
+    else:
+        return retry_wrapped_func

 def _replace(
     self,
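The async path mirrors the sync one: on a retryable error the whole async generator is re-created and re-consumed. A self-contained sketch of that behavior (toy names, plain asyncio, no `google.api_core` dependency):

```python
import asyncio

async def retry_stream_async(target, predicate, max_attempts=3):
    """Re-create and re-consume the target async generator on retryable errors."""
    attempts = 0
    while True:
        try:
            async for item in target():
                yield item
            return  # stream finished cleanly
        except Exception as exc:
            attempts += 1
            if not predicate(exc) or attempts >= max_attempts:
                raise

state = {"fail_once": True}

async def flaky_stream():
    yield "a"
    if state["fail_once"]:
        state["fail_once"] = False
        raise ConnectionError("reset")
    yield "b"

async def main():
    wrapped = retry_stream_async(flaky_stream, lambda e: isinstance(e, ConnectionError))
    return [x async for x in wrapped]

items = asyncio.run(main())
# items == ["a", "a", "b"] -- the restart re-yields the prefix
```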
@@ -0,0 +1,221 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Generator wrapper for retryable streaming RPCs.

This function will be used when initializing a retry with
``Retry(is_stream=True)``.

When ``is_stream=False``, the target is treated as a callable,
and will retry when the callable returns an error. When ``is_stream=True``,
the target will be treated as a callable that returns an iterable. Instead
of just wrapping the initial call in retry logic, the entire iterable is
wrapped, with each yield passing through the retryable generator. If any yield
in the stream raises a retryable exception, the entire stream will be
retried.

Important Note: when a stream encounters a retryable error, it will
silently construct a fresh iterator instance in the background
and continue yielding (likely duplicate) values as if no error occurred.
This is the most general way to retry a stream, but it often is not the
desired behavior. Example: iter([1, 2, 1/0]) -> [1, 2, 1, 2, ...]

There are two ways to build more advanced retry logic for streams:

1. Wrap the target

    Use a ``target`` that maintains state between retries, and creates a
    different generator on each retry call. For example, you can wrap a
    network call in a function that modifies the request based on what has
    already been returned:

    ```
    def attempt_with_modified_request(target, request, seen_items=[]):
        # remove seen items from request on each attempt
        new_request = modify_request(request, seen_items)
        new_generator = target(new_request)
        for item in new_generator:
            yield item
            seen_items.append(item)

    retry_wrapped = Retry(is_stream=True)(attempt_with_modified_request, target, request, [])
    ```
2. Wrap the retry generator

    Alternatively, you can wrap the retryable generator itself before
    passing it to the end-user to add a filter on the stream. For
    example, you can keep track of the items that were successfully yielded
    in previous retry attempts, and only yield new items when the
    new attempt surpasses the previous ones:

    ```
    def retryable_with_filter(target):
        stream_idx = 0
        # reset stream_idx when the stream is retried
        def on_error(e):
            nonlocal stream_idx
            stream_idx = 0
        # build retryable
        retryable_gen = Retry(is_stream=True, on_error=on_error, ...)(target)
        # keep track of what has been yielded out of filter
        yielded_items = []
        for item in retryable_gen:
            if stream_idx >= len(yielded_items):
                yield item
                yielded_items.append(item)
            elif item != yielded_items[stream_idx]:
                raise ValueError("Stream differs from last attempt")
            stream_idx += 1

    filter_retry_wrapped = retryable_with_filter(target)
    ```
"""
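The filter pattern sketched in the docstring can be exercised end-to-end. The version below is a runnable illustration with a toy retry loop standing in for `Retry(is_stream=True)`; all helper names (`toy_stream_retry`, `flaky`) are hypothetical:

```python
def toy_stream_retry(target, on_error, max_attempts=3):
    """Toy stand-in for Retry(is_stream=True): restart the stream on error."""
    attempts = 0
    while True:
        try:
            yield from target()
            return
        except ConnectionError as exc:
            on_error(exc)
            attempts += 1
            if attempts >= max_attempts:
                raise

def retryable_with_filter(target):
    stream_idx = 0
    # reset stream_idx when the stream is retried
    def on_error(e):
        nonlocal stream_idx
        stream_idx = 0
    yielded_items = []
    for item in toy_stream_retry(target, on_error):
        if stream_idx >= len(yielded_items):
            # new ground: pass the item through
            yield item
            yielded_items.append(item)
        elif item != yielded_items[stream_idx]:
            raise ValueError("Stream differs from last attempt")
        stream_idx += 1

state = {"fail_once": True}

def flaky():
    yield 10
    yield 20
    if state["fail_once"]:
        state["fail_once"] = False
        raise ConnectionError("drop")
    yield 30

deduped = list(retryable_with_filter(flaky))
# deduped == [10, 20, 30] -- duplicates from the restart are filtered out
```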

from typing import (
    Callable,
    Optional,
    List,
    Tuple,
    Iterable,
    Generator,
    TypeVar,
    Any,
    Union,
    cast,
)

import logging
import time
from functools import partial

from google.api_core import exceptions

_LOGGER = logging.getLogger(__name__)

T = TypeVar("T")

def _build_timeout_error(
    exc_list: List[Exception], is_timeout: bool, timeout_val: float
) -> Tuple[Exception, Optional[Exception]]:
    """
    Default exception_factory implementation. Builds an exception after the retry fails

    Args:
      - exc_list (list[Exception]): list of exceptions that occurred during the retry
      - is_timeout (bool): whether the failure is due to the timeout value being exceeded,
          or due to a non-retryable exception
      - timeout_val (float): the original timeout value for the retry, for use in the exception message

    Returns:
      - tuple[Exception, Exception|None]: a tuple of the exception to be raised, and the cause exception if any
    """
    src_exc = exc_list[-1] if exc_list else None
    if is_timeout:
        return (
            exceptions.RetryError(
                "Timeout of {:.1f}s exceeded".format(timeout_val),
                src_exc,
            ),
            src_exc,
        )
    else:
        return exc_list[-1], None
Suggested change:
-        return exc_list[-1], None
+        return src_exc, None

Reply: I think the problem was with mypy, since src_exc can be None but we want to always return an exception. I made some changes to fall back to a default error if no exceptions are passed though. Let me know what you think.
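The mypy concern above is concrete: `src_exc` is `Optional[Exception]`, while the non-timeout branch must return a real exception. A minimal re-implementation of the factory's shape (using a plain `RuntimeError` in place of `exceptions.RetryError`, so it runs without `google.api_core`):

```python
from typing import List, Optional, Tuple

def build_timeout_error(
    exc_list: List[Exception], is_timeout: bool, timeout_val: float
) -> Tuple[Exception, Optional[Exception]]:
    """Sketch of the default exception_factory: (error to raise, cause)."""
    src_exc = exc_list[-1] if exc_list else None
    if is_timeout:
        err = RuntimeError("Timeout of {:.1f}s exceeded".format(timeout_val))
        err.__cause__ = src_exc  # chain the last failure, if any
        return err, src_exc
    # non-timeout path: exc_list is expected to be non-empty here,
    # which is exactly the Optional-vs-Exception tension mypy flags
    return exc_list[-1], None

cause = ValueError("last failure")
err, src = build_timeout_error([cause], is_timeout=True, timeout_val=10.0)
# str(err) == "Timeout of 10.0s exceeded", src is cause
```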
Review comment: I'm unclear: it seems (but I haven't found many references) like the attribute is True when the generator is already closed, so why do we need to close it again? Do you have links to info about this attribute? Also, am I correct in understanding that the purpose of this is to ensure the generator is closed when we process an exception above? My understanding is that the finally clause will also fire when the function returns from the try block, so does it make sense to move the contents of the finally clause into the except clause? It might make your intent clearer and be more robust against accidental changes.

Reply: Good catch. I believe this is left-over from an earlier attempt, where I was trying to implement more of the generator protocol manually instead of relying on yield from (I originally wanted to implement timeouts in between yields as well). I think it should be safe to remove this extra check, and the unit tests seem to back that up.