Skip to content

Commit 9a2c80c

Browse files
srothhsl0thentr0py
andauthored
Integrate async transport with SDK (#4615)
Integrate the async transport with the rest of the SDK. Provide a new experimental option `transport_async` that enables the async transport if an event loop is running. Otherwise, fall back to the sync transport. Furthermore, adapt the client to work with the async transport. To this end, flush and close were changed to be non blocking and awaitable in an async context to avoid deadlocks, however close enforces a completed flush before shutdown. As there are to my knowledge no background threads running flush/close, these methods are currently not thread-safe/loop-aware for async, which can be changed if necessary. Atexit issue: The atexit integration used by the SDK runs after the event loop has already closed if asyncio.run() is used. This makes it impossible for the async flush to happen, as atexit calls client.close(), but a loop is no longer present. I attempted to apply [this fix](https://discuss.python.org/t/atexit-for-asyncio/13911) by patching the loop close in the asyncio integration, but I am unsure if I did it correctly/put it in the correct spot, or if this is a good idea. From my SDK test however, it seems to fix the flush issue. Note also that this will apparently be patched in Python 3.14, as per the discussion in the linked thread. As a final note, I added event loop checking. Whenever the event loop is used, the transport/client catch RuntimeErrors, which would be thrown in case the event loop was already shut down. I am not sure if this is a case we need to consider, but I added it for now because I did not want the transport to potentially throw RuntimeError if the event loop is shutdown during a program. If this should be left out currently for simplicity, I can remove it again. I added the [httpcore[asyncio] ](https://www.encode.io/httpcore/async/) dependency to requirements-testing, as it is needed for the async httpcore functionality. GH-4601 --------- Co-authored-by: Neel Shah <[email protected]>
1 parent 35d7078 commit 9a2c80c

File tree

12 files changed

+891
-158
lines changed

12 files changed

+891
-158
lines changed

requirements-testing.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ asttokens
1111
responses
1212
pysocks
1313
socksio
14-
httpcore[http2]
14+
httpcore[http2,asyncio]
1515
setuptools
1616
freezegun
1717
Brotli

scripts/populate_tox/config.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@
9696
"pytest-asyncio",
9797
"python-multipart",
9898
"requests",
99-
"anyio<4",
99+
"anyio>=3,<5",
100100
],
101101
# There's an incompatibility between FastAPI's TestClient, which is
102102
# actually Starlette's TestClient, which is actually httpx's Client.
@@ -106,6 +106,7 @@
106106
# FastAPI versions we use older httpx which still supports the
107107
# deprecated argument.
108108
"<0.110.1": ["httpx<0.28.0"],
109+
"<0.80": ["anyio<4"],
109110
"py3.6": ["aiocontextvars"],
110111
},
111112
},

scripts/populate_tox/tox.jinja

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ deps =
207207
httpx-v0.25: pytest-httpx==0.25.0
208208
httpx: pytest-httpx
209209
# anyio is a dep of httpx
210-
httpx: anyio<4.0.0
210+
httpx: anyio>=3,<5
211211
httpx-v0.16: httpx~=0.16.0
212212
httpx-v0.18: httpx~=0.18.0
213213
httpx-v0.20: httpx~=0.20.0

sentry_sdk/api.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,14 @@ def flush(
226226
return get_client().flush(timeout=timeout, callback=callback)
227227

228228

229+
@clientmethod
230+
async def flush_async(
231+
timeout: Optional[float] = None,
232+
callback: Optional[Callable[[int, float], None]] = None,
233+
) -> None:
234+
return await get_client().flush_async(timeout=timeout, callback=callback)
235+
236+
229237
def start_span(**kwargs: Any) -> Span:
230238
"""
231239
Start and return a span.

sentry_sdk/client.py

Lines changed: 89 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
)
2626
from sentry_sdk.serializer import serialize
2727
from sentry_sdk.tracing import trace
28-
from sentry_sdk.transport import HttpTransportCore, make_transport
28+
from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport
2929
from sentry_sdk.consts import (
3030
SPANDATA,
3131
DEFAULT_MAX_VALUE_LENGTH,
@@ -214,6 +214,12 @@ def close(self, *args: Any, **kwargs: Any) -> None:
214214
def flush(self, *args: Any, **kwargs: Any) -> None:
215215
return None
216216

217+
async def close_async(self, *args: Any, **kwargs: Any) -> None:
218+
return None
219+
220+
async def flush_async(self, *args: Any, **kwargs: Any) -> None:
221+
return None
222+
217223
def __enter__(self) -> BaseClient:
218224
return self
219225

@@ -917,6 +923,14 @@ def get_integration(
917923

918924
return self.integrations.get(integration_name)
919925

926+
def _close_components(self) -> None:
927+
"""Kill all client components in the correct order."""
928+
self.session_flusher.kill()
929+
if self.log_batcher is not None:
930+
self.log_batcher.kill()
931+
if self.monitor:
932+
self.monitor.kill()
933+
920934
def close(
921935
self,
922936
timeout: Optional[float] = None,
@@ -927,19 +941,43 @@ def close(
927941
semantics as :py:meth:`Client.flush`.
928942
"""
929943
if self.transport is not None:
944+
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
945+
self.transport, "loop"
946+
):
947+
logger.debug(
948+
"close() used with AsyncHttpTransport, aborting. Please use close_async() instead."
949+
)
950+
return
930951
self.flush(timeout=timeout, callback=callback)
931-
932-
self.session_flusher.kill()
933-
934-
if self.log_batcher is not None:
935-
self.log_batcher.kill()
936-
937-
if self.monitor:
938-
self.monitor.kill()
939-
952+
self._close_components()
940953
self.transport.kill()
941954
self.transport = None
942955

956+
async def close_async(
957+
self,
958+
timeout: Optional[float] = None,
959+
callback: Optional[Callable[[int, float], None]] = None,
960+
) -> None:
961+
"""
962+
Asynchronously close the client and shut down the transport. Arguments have the same
963+
semantics as :py:meth:`Client.flush_async`.
964+
"""
965+
if self.transport is not None:
966+
if not (
967+
isinstance(self.transport, AsyncHttpTransport)
968+
and hasattr(self.transport, "loop")
969+
):
970+
logger.debug(
971+
"close_async() used with non-async transport, aborting. Please use close() instead."
972+
)
973+
return
974+
await self.flush_async(timeout=timeout, callback=callback)
975+
self._close_components()
976+
kill_task = self.transport.kill() # type: ignore
977+
if kill_task is not None:
978+
await kill_task
979+
self.transport = None
980+
943981
def flush(
944982
self,
945983
timeout: Optional[float] = None,
@@ -953,15 +991,52 @@ def flush(
953991
:param callback: Is invoked with the number of pending events and the configured timeout.
954992
"""
955993
if self.transport is not None:
994+
if isinstance(self.transport, AsyncHttpTransport) and hasattr(
995+
self.transport, "loop"
996+
):
997+
logger.debug(
998+
"flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead."
999+
)
1000+
return
9561001
if timeout is None:
9571002
timeout = self.options["shutdown_timeout"]
958-
self.session_flusher.flush()
959-
960-
if self.log_batcher is not None:
961-
self.log_batcher.flush()
1003+
self._flush_components()
9621004

9631005
self.transport.flush(timeout=timeout, callback=callback)
9641006

1007+
async def flush_async(
1008+
self,
1009+
timeout: Optional[float] = None,
1010+
callback: Optional[Callable[[int, float], None]] = None,
1011+
) -> None:
1012+
"""
1013+
Asynchronously wait for the current events to be sent.
1014+
1015+
:param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used.
1016+
1017+
:param callback: Is invoked with the number of pending events and the configured timeout.
1018+
"""
1019+
if self.transport is not None:
1020+
if not (
1021+
isinstance(self.transport, AsyncHttpTransport)
1022+
and hasattr(self.transport, "loop")
1023+
):
1024+
logger.debug(
1025+
"flush_async() used with non-async transport, aborting. Please use flush() instead."
1026+
)
1027+
return
1028+
if timeout is None:
1029+
timeout = self.options["shutdown_timeout"]
1030+
self._flush_components()
1031+
flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore
1032+
if flush_task is not None:
1033+
await flush_task
1034+
1035+
def _flush_components(self) -> None:
1036+
self.session_flusher.flush()
1037+
if self.log_batcher is not None:
1038+
self.log_batcher.flush()
1039+
9651040
def __enter__(self) -> _Client:
9661041
return self
9671042

sentry_sdk/consts.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class CompressionAlgo(Enum):
7575
"transport_compression_algo": Optional[CompressionAlgo],
7676
"transport_num_pools": Optional[int],
7777
"transport_http2": Optional[bool],
78+
"transport_async": Optional[bool],
7879
"enable_logs": Optional[bool],
7980
"before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]],
8081
},

sentry_sdk/integrations/asyncio.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from sentry_sdk.consts import OP
66
from sentry_sdk.integrations import Integration, DidNotEnable
77
from sentry_sdk.utils import event_from_exception, logger, reraise
8+
from sentry_sdk.transport import AsyncHttpTransport
89

910
try:
1011
import asyncio
@@ -29,6 +30,45 @@ def get_name(coro: Any) -> str:
2930
)
3031

3132

33+
def patch_loop_close() -> None:
34+
"""Patch loop.close to flush pending events before shutdown."""
35+
# Atexit shutdown hook happens after the event loop is closed.
36+
# Therefore, it is necessary to patch the loop.close method to ensure
37+
# that pending events are flushed before the interpreter shuts down.
38+
try:
39+
loop = asyncio.get_running_loop()
40+
except RuntimeError:
41+
# No running loop → cannot patch now
42+
return
43+
44+
if getattr(loop, "_sentry_flush_patched", False):
45+
return
46+
47+
async def _flush() -> None:
48+
client = sentry_sdk.get_client()
49+
if not client:
50+
return
51+
52+
try:
53+
if not isinstance(client.transport, AsyncHttpTransport):
54+
return
55+
56+
await client.close_async()
57+
except Exception:
58+
logger.warning("Sentry flush failed during loop shutdown", exc_info=True)
59+
60+
orig_close = loop.close
61+
62+
def _patched_close() -> None:
63+
try:
64+
loop.run_until_complete(_flush())
65+
finally:
66+
orig_close()
67+
68+
loop.close = _patched_close # type: ignore
69+
loop._sentry_flush_patched = True # type: ignore
70+
71+
3272
def patch_asyncio() -> None:
3373
orig_task_factory = None
3474
try:
@@ -124,3 +164,4 @@ class AsyncioIntegration(Integration):
124164
@staticmethod
125165
def setup_once() -> None:
126166
patch_asyncio()
167+
patch_loop_close()

0 commit comments

Comments
 (0)