|
29 | 29 |
|
30 | 30 | from sentry_sdk.consts import EndpointType
|
31 | 31 | from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions
|
32 |
| -from sentry_sdk.worker import BackgroundWorker, Worker |
| 32 | +from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker |
33 | 33 | from sentry_sdk.envelope import Envelope, Item, PayloadRef
|
34 | 34 |
|
35 | 35 | from typing import TYPE_CHECKING
|
@@ -225,9 +225,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None:
|
225 | 225 | elif self._compression_algo == "br":
|
226 | 226 | self._compression_level = 4
|
227 | 227 |
|
228 |
| - def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: |
229 |
| - # For now, we only support the threaded sync background worker. |
230 |
| - return BackgroundWorker(queue_size=options["transport_queue_size"]) |
| 228 | + def _create_worker(self, options: dict[str, Any]) -> Worker: |
| 229 | + async_enabled = options.get("_experiments", {}).get("transport_async", False) |
| 230 | + worker_cls = AsyncWorker if async_enabled else BackgroundWorker |
| 231 | + return worker_cls(queue_size=options["transport_queue_size"]) |
231 | 232 |
|
232 | 233 | def record_lost_event(
|
233 | 234 | self: Self,
|
@@ -642,18 +643,26 @@ async def send_envelope_wrapper() -> None:
|
642 | 643 |
|
643 | 644 | def capture_envelope(self: Self, envelope: Envelope) -> None:
|
644 | 645 | # Synchronous entry point
|
645 |
| - if asyncio.get_running_loop() is not None: |
| 646 | + try: |
| 647 | + asyncio.get_running_loop() |
646 | 648 | # We are on the main thread running the event loop
|
647 | 649 | task = asyncio.create_task(self._capture_envelope(envelope))
|
648 | 650 | self.background_tasks.add(task)
|
649 | 651 | task.add_done_callback(self.background_tasks.discard)
|
650 |
| - else: |
| 652 | + except RuntimeError: |
651 | 653 | # We are in a background thread, not running an event loop,
|
652 | 654 | # have to launch the task on the loop in a threadsafe way.
|
653 |
| - asyncio.run_coroutine_threadsafe( |
654 |
| - self._capture_envelope(envelope), |
655 |
| - self._loop, |
656 |
| - ) |
| 655 | + if self._loop and self._loop.is_running(): |
| 656 | + asyncio.run_coroutine_threadsafe( |
| 657 | + self._capture_envelope(envelope), |
| 658 | + self._loop, |
| 659 | + ) |
| 660 | + else: |
| 661 | + # The event loop is no longer running |
| 662 | + logger.warning("Async Transport is not running in an event loop.") |
| 663 | + self.on_dropped_event("no_async_context") |
| 664 | + for item in envelope.items: |
| 665 | + self.record_lost_event("no_async_context", item=item) |
657 | 666 |
|
658 | 667 | async def flush_async(
|
659 | 668 | self: Self,
|
@@ -993,11 +1002,13 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
|
993 | 1002 | ref_transport = options["transport"]
|
994 | 1003 |
|
995 | 1004 | use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
|
996 |
| - |
| 1005 | + use_async_transport = options.get("_experiments", {}).get("transport_async", False) |
997 | 1006 | # By default, we use the http transport class
|
998 |
| - transport_cls: Type[Transport] = ( |
999 |
| - Http2Transport if use_http2_transport else HttpTransport |
1000 |
| - ) |
| 1007 | + if use_async_transport and asyncio.get_running_loop() is not None: |
| 1008 | + transport_cls: Type[Transport] = AsyncHttpTransport |
| 1009 | + else: |
| 1010 | + use_http2 = use_http2_transport |
| 1011 | + transport_cls = Http2Transport if use_http2 else HttpTransport |
1001 | 1012 |
|
1002 | 1013 | if isinstance(ref_transport, Transport):
|
1003 | 1014 | return ref_transport
|
|
0 commit comments