|
29 | 29 | _LOGGER = logging.getLogger(__name__) |
30 | 30 |
|
31 | 31 |
|
32 | | -# The reason this is necessary is because it lets the user have control on |
33 | | -# when they would want to send requests proto messages instead of sending all |
34 | | -# of them initially. |
35 | | -# |
36 | | -# This is achieved via asynchronous queue (asyncio.Queue), |
37 | | -# gRPC awaits until there's a message in the queue. |
38 | | -# |
39 | | -# Finally, it allows for retrying without swapping queues because if it does |
40 | | -# pull an item off the queue when the RPC is inactive, it'll immediately put |
41 | | -# it back and then exit. This is necessary because yielding the item in this |
42 | | -# case will cause gRPC to discard it. In practice, this means that the order |
43 | | -# of messages is not guaranteed. If preserving order is necessary it would be |
44 | | -# easy to use a priority queue. |
| 32 | + |
45 | 33 | class _AsyncRequestQueueGenerator: |
46 | 34 | """_AsyncRequestQueueGenerator is a helper class for sending asynchronous |
47 | 35 | requests to a gRPC stream from a Queue. |
@@ -92,6 +80,20 @@ def _is_active(self) -> bool: |
92 | 80 | return self.call is None or not self.call.done() |
93 | 81 |
|
94 | 82 | async def __aiter__(self): |
| 83 | + # The reason this is necessary is because it lets the user have |
| 84 | + # control on when they would want to send requests proto messages |
| 85 | + # instead of sending all of them initially. |
| 86 | + # |
| 87 | + # This is achieved via asynchronous queue (asyncio.Queue), |
| 88 | + # gRPC awaits until there's a message in the queue. |
| 89 | + # |
| 90 | + # Finally, it allows for retrying without swapping queues because if |
| 91 | + # it does pull an item off the queue when the RPC is inactive, it'll |
| 92 | + # immediately put it back and then exit. This is necessary because |
| 93 | + # yielding the item in this case will cause gRPC to discard it. In |
| 94 | + # practice, this means that the order of messages is not guaranteed. |
| 95 | + # If preserving order is necessary it would be easy to use a priority |
| 96 | + # queue. |
95 | 97 | if self._initial_request is not None: |
96 | 98 | if callable(self._initial_request): |
97 | 99 | yield self._initial_request() |
|
0 commit comments