diff --git a/requirements-testing.txt b/requirements-testing.txt index 8e7bc47be0..e9a972680c 100644 --- a/requirements-testing.txt +++ b/requirements-testing.txt @@ -11,7 +11,7 @@ asttokens responses pysocks socksio -httpcore[http2] +httpcore[http2,asyncio] setuptools freezegun Brotli diff --git a/scripts/populate_tox/config.py b/scripts/populate_tox/config.py index 78bed91475..06eac2aa83 100644 --- a/scripts/populate_tox/config.py +++ b/scripts/populate_tox/config.py @@ -96,7 +96,7 @@ "pytest-asyncio", "python-multipart", "requests", - "anyio<4", + "anyio>=3,<5", ], # There's an incompatibility between FastAPI's TestClient, which is # actually Starlette's TestClient, which is actually httpx's Client. @@ -106,6 +106,7 @@ # FastAPI versions we use older httpx which still supports the # deprecated argument. "<0.110.1": ["httpx<0.28.0"], + "<0.80": ["anyio<4"], "py3.6": ["aiocontextvars"], }, }, diff --git a/scripts/populate_tox/tox.jinja b/scripts/populate_tox/tox.jinja index 66b1d7885a..514566ea46 100644 --- a/scripts/populate_tox/tox.jinja +++ b/scripts/populate_tox/tox.jinja @@ -207,7 +207,7 @@ deps = httpx-v0.25: pytest-httpx==0.25.0 httpx: pytest-httpx # anyio is a dep of httpx - httpx: anyio<4.0.0 + httpx: anyio>=3,<5 httpx-v0.16: httpx~=0.16.0 httpx-v0.18: httpx~=0.18.0 httpx-v0.20: httpx~=0.20.0 diff --git a/sentry_sdk/api.py b/sentry_sdk/api.py index 3aefc57f69..3252622746 100644 --- a/sentry_sdk/api.py +++ b/sentry_sdk/api.py @@ -226,6 +226,14 @@ def flush( return get_client().flush(timeout=timeout, callback=callback) +@clientmethod +async def flush_async( + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, +) -> None: + return await get_client().flush_async(timeout=timeout, callback=callback) + + def start_span(**kwargs: Any) -> Span: """ Start and return a span. diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index 917701f39c..b9d07e1402 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -25,7 +25,7 @@ ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace -from sentry_sdk.transport import HttpTransportCore, make_transport +from sentry_sdk.transport import HttpTransportCore, make_transport, AsyncHttpTransport from sentry_sdk.consts import ( SPANDATA, DEFAULT_MAX_VALUE_LENGTH, @@ -214,6 +214,12 @@ def close(self, *args: Any, **kwargs: Any) -> None: def flush(self, *args: Any, **kwargs: Any) -> None: return None + async def close_async(self, *args: Any, **kwargs: Any) -> None: + return None + + async def flush_async(self, *args: Any, **kwargs: Any) -> None: + return None + def __enter__(self) -> BaseClient: return self @@ -917,6 +923,14 @@ def get_integration( return self.integrations.get(integration_name) + def _close_components(self) -> None: + """Kill all client components in the correct order.""" + self.session_flusher.kill() + if self.log_batcher is not None: + self.log_batcher.kill() + if self.monitor: + self.monitor.kill() + def close( self, timeout: Optional[float] = None, @@ -927,19 +941,43 @@ def close( semantics as :py:meth:`Client.flush`. """ if self.transport is not None: + if isinstance(self.transport, AsyncHttpTransport) and hasattr( + self.transport, "loop" + ): + logger.debug( + "close() used with AsyncHttpTransport, aborting. Please use close_async() instead." 
+ ) + return self.flush(timeout=timeout, callback=callback) - - self.session_flusher.kill() - - if self.log_batcher is not None: - self.log_batcher.kill() - - if self.monitor: - self.monitor.kill() - + self._close_components() self.transport.kill() self.transport = None + async def close_async( + self, + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, + ) -> None: + """ + Asynchronously close the client and shut down the transport. Arguments have the same + semantics as :py:meth:`Client.flush_async`. + """ + if self.transport is not None: + if not ( + isinstance(self.transport, AsyncHttpTransport) + and hasattr(self.transport, "loop") + ): + logger.debug( + "close_async() used with non-async transport, aborting. Please use close() instead." + ) + return + await self.flush_async(timeout=timeout, callback=callback) + self._close_components() + kill_task = self.transport.kill() # type: ignore + if kill_task is not None: + await kill_task + self.transport = None + def flush( self, timeout: Optional[float] = None, @@ -953,15 +991,52 @@ def flush( :param callback: Is invoked with the number of pending events and the configured timeout. """ if self.transport is not None: + if isinstance(self.transport, AsyncHttpTransport) and hasattr( + self.transport, "loop" + ): + logger.debug( + "flush() used with AsyncHttpTransport, aborting. Please use flush_async() instead." + ) + return if timeout is None: timeout = self.options["shutdown_timeout"] - self.session_flusher.flush() - - if self.log_batcher is not None: - self.log_batcher.flush() + self._flush_components() self.transport.flush(timeout=timeout, callback=callback) + async def flush_async( + self, + timeout: Optional[float] = None, + callback: Optional[Callable[[int, float], None]] = None, + ) -> None: + """ + Asynchronously wait for the current events to be sent. + + :param timeout: Wait for at most `timeout` seconds. If no `timeout` is provided, the `shutdown_timeout` option value is used. + + :param callback: Is invoked with the number of pending events and the configured timeout. + """ + if self.transport is not None: + if not ( + isinstance(self.transport, AsyncHttpTransport) + and hasattr(self.transport, "loop") + ): + logger.debug( + "flush_async() used with non-async transport, aborting. Please use flush() instead." 
+ ) + return + if timeout is None: + timeout = self.options["shutdown_timeout"] + self._flush_components() + flush_task = self.transport.flush(timeout=timeout, callback=callback) # type: ignore + if flush_task is not None: + await flush_task + + def _flush_components(self) -> None: + self.session_flusher.flush() + if self.log_batcher is not None: + self.log_batcher.flush() + def __enter__(self) -> _Client: return self diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 2b81fc4a2b..643cbb871a 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -75,6 +75,7 @@ class CompressionAlgo(Enum): "transport_compression_algo": Optional[CompressionAlgo], "transport_num_pools": Optional[int], "transport_http2": Optional[bool], + "transport_async": Optional[bool], "enable_logs": Optional[bool], "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]], }, diff --git a/sentry_sdk/integrations/asyncio.py b/sentry_sdk/integrations/asyncio.py index 719cbba1a8..b515345f1a 100644 --- a/sentry_sdk/integrations/asyncio.py +++ b/sentry_sdk/integrations/asyncio.py @@ -5,6 +5,7 @@ from sentry_sdk.consts import OP from sentry_sdk.integrations import Integration, DidNotEnable from sentry_sdk.utils import event_from_exception, logger, reraise +from sentry_sdk.transport import AsyncHttpTransport try: import asyncio @@ -29,6 +30,45 @@ def get_name(coro: Any) -> str: ) +def patch_loop_close() -> None: + """Patch loop.close to flush pending events before shutdown.""" + # Atexit shutdown hook happens after the event loop is closed. + # Therefore, it is necessary to patch the loop.close method to ensure + # that pending events are flushed before the interpreter shuts down. + try: + loop = asyncio.get_running_loop() + except RuntimeError: + # No running loop → cannot patch now + return + + if getattr(loop, "_sentry_flush_patched", False): + return + + async def _flush() -> None: + client = sentry_sdk.get_client() + if not client: + return + + try: + if not isinstance(client.transport, AsyncHttpTransport): + return + + await client.close_async() + except Exception: + logger.warning("Sentry flush failed during loop shutdown", exc_info=True) + + orig_close = loop.close + + def _patched_close() -> None: + try: + loop.run_until_complete(_flush()) + finally: + orig_close() + + loop.close = _patched_close # type: ignore + loop._sentry_flush_patched = True # type: ignore + + def patch_asyncio() -> None: orig_task_factory = None try: @@ -124,3 +164,4 @@ class AsyncioIntegration(Integration): @staticmethod def setup_once() -> None: patch_asyncio() + patch_loop_close() diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index eec4025048..5c2c864198 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -29,6 +29,8 @@ HTTP2_ENABLED = False try: + import anyio # noqa: F401 + ASYNC_TRANSPORT_ENABLED = httpcore is not None except ImportError: ASYNC_TRANSPORT_ENABLED = False @@ -583,9 +585,115 @@ def flush( self._worker.flush(timeout, callback) +class HttpTransport(BaseHttpTransport): + if TYPE_CHECKING: + _pool: Union[PoolManager, ProxyManager] + + def _get_pool_options(self: Self) -> Dict[str, Any]: + + num_pools = self.options.get("_experiments", {}).get("transport_num_pools") + options = { + "num_pools": 2 if num_pools is None else int(num_pools), + "cert_reqs": "CERT_REQUIRED", + "timeout": urllib3.Timeout(total=self.TIMEOUT), + } + + socket_options: Optional[List[Tuple[int, int, int | bytes]]] = None + + if self.options["socket_options"] is not None: + socket_options = 
self.options["socket_options"] + + if self.options["keep_alive"]: + if socket_options is None: + socket_options = [] + + used_options = {(o[0], o[1]) for o in socket_options} + for default_option in KEEP_ALIVE_SOCKET_OPTIONS: + if (default_option[0], default_option[1]) not in used_options: + socket_options.append(default_option) + + if socket_options is not None: + options["socket_options"] = socket_options + + options["ca_certs"] = ( + self.options["ca_certs"] # User-provided bundle from the SDK init + or os.environ.get("SSL_CERT_FILE") + or os.environ.get("REQUESTS_CA_BUNDLE") + or certifi.where() + ) + + options["cert_file"] = self.options["cert_file"] or os.environ.get( + "CLIENT_CERT_FILE" + ) + options["key_file"] = self.options["key_file"] or os.environ.get( + "CLIENT_KEY_FILE" + ) + + return options + + def _make_pool(self: Self) -> Union[PoolManager, ProxyManager]: + if self.parsed_dsn is None: + raise ValueError("Cannot create HTTP-based transport without valid DSN") + + proxy = None + no_proxy = self._in_no_proxy(self.parsed_dsn) + + # try HTTPS first + https_proxy = self.options["https_proxy"] + if self.parsed_dsn.scheme == "https" and (https_proxy != ""): + proxy = https_proxy or (not no_proxy and getproxies().get("https")) + + # maybe fallback to HTTP proxy + http_proxy = self.options["http_proxy"] + if not proxy and (http_proxy != ""): + proxy = http_proxy or (not no_proxy and getproxies().get("http")) + + opts = self._get_pool_options() + + if proxy: + proxy_headers = self.options["proxy_headers"] + if proxy_headers: + opts["proxy_headers"] = proxy_headers + + if proxy.startswith("socks"): + use_socks_proxy = True + try: + # Check if PySocks dependency is available + from urllib3.contrib.socks import SOCKSProxyManager + except ImportError: + use_socks_proxy = False + logger.warning( + "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. 
Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.", + proxy, + ) + + if use_socks_proxy: + return SOCKSProxyManager(proxy, **opts) + else: + return urllib3.PoolManager(**opts) + else: + return urllib3.ProxyManager(proxy, **opts) + else: + return urllib3.PoolManager(**opts) + + def _request( + self: Self, + method: str, + endpoint_type: EndpointType, + body: Any, + headers: Mapping[str, str], + ) -> urllib3.BaseHTTPResponse: + return self._pool.request( + method, + self._auth.get_api_url(endpoint_type), + body=body, + headers=headers, + ) + + if not ASYNC_TRANSPORT_ENABLED: # Sorry, no AsyncHttpTransport for you - AsyncHttpTransport = BaseHttpTransport + AsyncHttpTransport = HttpTransport else: @@ -799,112 +907,6 @@ def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore return None -class HttpTransport(BaseHttpTransport): - if TYPE_CHECKING: - _pool: Union[PoolManager, ProxyManager] - - def _get_pool_options(self: Self) -> Dict[str, Any]: - - num_pools = self.options.get("_experiments", {}).get("transport_num_pools") - options = { - "num_pools": 2 if num_pools is None else int(num_pools), - "cert_reqs": "CERT_REQUIRED", - "timeout": urllib3.Timeout(total=self.TIMEOUT), - } - - socket_options: Optional[List[Tuple[int, int, int | bytes]]] = None - - if self.options["socket_options"] is not None: - socket_options = self.options["socket_options"] - - if self.options["keep_alive"]: - if socket_options is None: - socket_options = [] - - used_options = {(o[0], o[1]) for o in socket_options} - for default_option in KEEP_ALIVE_SOCKET_OPTIONS: - if (default_option[0], default_option[1]) not in used_options: - socket_options.append(default_option) - - if socket_options is not None: - options["socket_options"] = socket_options - - options["ca_certs"] = ( - self.options["ca_certs"] # User-provided bundle from the SDK init - or os.environ.get("SSL_CERT_FILE") - or os.environ.get("REQUESTS_CA_BUNDLE") - or certifi.where() - ) - - options["cert_file"] = self.options["cert_file"] or os.environ.get( - "CLIENT_CERT_FILE" - ) - options["key_file"] = self.options["key_file"] or os.environ.get( - "CLIENT_KEY_FILE" - ) - - return options - - def _make_pool(self: Self) -> Union[PoolManager, ProxyManager]: - if self.parsed_dsn is None: - raise ValueError("Cannot create HTTP-based transport without valid DSN") - - proxy = None - no_proxy = self._in_no_proxy(self.parsed_dsn) - - # try HTTPS first - https_proxy = self.options["https_proxy"] - if self.parsed_dsn.scheme == "https" and (https_proxy != ""): - proxy = https_proxy or (not no_proxy and getproxies().get("https")) - - # maybe fallback to HTTP proxy - http_proxy = self.options["http_proxy"] - if not proxy and (http_proxy != ""): - proxy = http_proxy or (not no_proxy and getproxies().get("http")) - - opts = self._get_pool_options() - - if proxy: - proxy_headers = self.options["proxy_headers"] - if proxy_headers: - opts["proxy_headers"] = proxy_headers - - if proxy.startswith("socks"): - use_socks_proxy = True - try: - # Check if PySocks dependency is available - from urllib3.contrib.socks import SOCKSProxyManager - except ImportError: - use_socks_proxy = False - logger.warning( - "You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support. 
Please add `PySocks` (or `urllib3` with the `[socks]` extra) to your dependencies.",
-                        proxy,
-                    )
-
-                if use_socks_proxy:
-                    return SOCKSProxyManager(proxy, **opts)
-                else:
-                    return urllib3.PoolManager(**opts)
-            else:
-                return urllib3.ProxyManager(proxy, **opts)
-        else:
-            return urllib3.PoolManager(**opts)
-
-    def _request(
-        self: Self,
-        method: str,
-        endpoint_type: EndpointType,
-        body: Any,
-        headers: Mapping[str, str],
-    ) -> urllib3.BaseHTTPResponse:
-        return self._pool.request(
-            method,
-            self._auth.get_api_url(endpoint_type),
-            body=body,
-            headers=headers,
-        )
-
-
 if not HTTP2_ENABLED:
     # Sorry, no Http2Transport for you
     class Http2Transport(HttpTransport):
@@ -1045,6 +1047,11 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
     use_http2_transport = options.get("_experiments", {}).get("transport_http2", False)
     use_async_transport = options.get("_experiments", {}).get("transport_async", False)
 
+    async_integration = any(
+        integration.__class__.__name__ == "AsyncioIntegration"
+        for integration in options.get("integrations") or []
+    )
+
     # By default, we use the http transport class
     transport_cls: Type[Transport] = (
         Http2Transport if use_http2_transport else HttpTransport
@@ -1052,7 +1059,12 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]:
     if use_async_transport and ASYNC_TRANSPORT_ENABLED:
         try:
             asyncio.get_running_loop()
-            transport_cls = AsyncHttpTransport
+            if async_integration:
+                transport_cls = AsyncHttpTransport
+            else:
+                logger.warning(
+                    "You tried to use AsyncHttpTransport but the AsyncioIntegration is not enabled. Falling back to sync transport."
+                )
         except RuntimeError:
             # No event loop running, fall back to sync transport
             logger.warning("No event loop running, falling back to sync transport.")
diff --git a/tests/integrations/asyncio/test_asyncio.py b/tests/integrations/asyncio/test_asyncio.py
index 2ae71f8f43..5c329f8185 100644
--- a/tests/integrations/asyncio/test_asyncio.py
+++ b/tests/integrations/asyncio/test_asyncio.py
@@ -377,3 +377,52 @@ async def test_span_origin(
 
     assert event["contexts"]["trace"]["origin"] == "manual"
     assert event["spans"][0]["origin"] == "auto.function.asyncio"
+
+
+@minimum_python_38
+def test_loop_close_patching(sentry_init):
+    sentry_init(integrations=[AsyncioIntegration()])
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+
+    try:
+        with patch("asyncio.get_running_loop", return_value=loop):
+            assert not hasattr(loop, "_sentry_flush_patched")
+            AsyncioIntegration.setup_once()
+            assert hasattr(loop, "_sentry_flush_patched")
+            assert loop._sentry_flush_patched is True
+
+    finally:
+        if not loop.is_closed():
+            loop.close()
+
+
+@minimum_python_38
+def test_loop_close_flushes_async_transport(sentry_init):
+    from sentry_sdk.transport import AsyncHttpTransport
+    from unittest.mock import Mock, AsyncMock
+
+    sentry_init(integrations=[AsyncioIntegration()])
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+
+    try:
+        with patch("asyncio.get_running_loop", return_value=loop):
+            AsyncioIntegration.setup_once()
+
+            mock_client = Mock()
+            mock_transport = Mock(spec=AsyncHttpTransport)
+            mock_client.transport = mock_transport
+            mock_client.close_async = AsyncMock(return_value=None)
+
+            with patch("sentry_sdk.get_client", return_value=mock_client):
+                loop.close()
+
+            mock_client.close_async.assert_called_once()
+            mock_client.close_async.assert_awaited_once()
+
+    finally:
+        if not loop.is_closed():
+            loop.close()
diff --git a/tests/test_client.py b/tests/test_client.py
index 
8290c8e575..25a3a8ab00 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -23,9 +23,12 @@ from sentry_sdk.spotlight import DEFAULT_SPOTLIGHT_URL from sentry_sdk.utils import capture_internal_exception from sentry_sdk.integrations.executing import ExecutingIntegration -from sentry_sdk.transport import Transport +from sentry_sdk.integrations.asyncio import AsyncioIntegration + +from sentry_sdk.transport import Transport, AsyncHttpTransport from sentry_sdk.serializer import MAX_DATABAG_BREADTH from sentry_sdk.consts import DEFAULT_MAX_BREADCRUMBS, DEFAULT_MAX_VALUE_LENGTH +from sentry_sdk._compat import PY38 from typing import TYPE_CHECKING @@ -1498,3 +1501,323 @@ def test_keep_alive(env_value, arg_value, expected_value): ) assert transport_cls.options["keep_alive"] is expected_value + + +@pytest.mark.parametrize( + "testcase", + [ + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "https://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": "", + "arg_https_proxy": "", + "expected_proxy_scheme": None, + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + 
"arg_https_proxy": "", + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": "", + "arg_https_proxy": None, + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": "", + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": "https://localhost/123", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + # NO_PROXY testcases + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": "http://localhost/123", + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": "https://localhost/123", + "env_no_proxy": "example.com,sentry.io", + "arg_http_proxy": None, + "arg_https_proxy": None, + "expected_proxy_scheme": None, + }, + { + "dsn": "http://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "expected_proxy_scheme": "http", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + }, + { + "dsn": "https://foo@sentry.io/123", + "env_http_proxy": None, + "env_https_proxy": None, + "env_no_proxy": "sentry.io,example.com", + "arg_http_proxy": None, + "arg_https_proxy": "https://localhost/123", + "expected_proxy_scheme": "https", + "arg_proxy_headers": {"Test-Header": "foo-bar"}, + }, + ], +) +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_proxy(monkeypatch, testcase): + # These are just the same tests as the sync ones, but they need to be run in an event loop + # and respect the shutdown behavior of the async transport + if testcase["env_http_proxy"] is not None: + monkeypatch.setenv("HTTP_PROXY", testcase["env_http_proxy"]) + if testcase["env_https_proxy"] is not None: + monkeypatch.setenv("HTTPS_PROXY", testcase["env_https_proxy"]) + if testcase.get("env_no_proxy") is not None: + monkeypatch.setenv("NO_PROXY", testcase["env_no_proxy"]) + + kwargs = { + "_experiments": {"transport_async": True}, + "integrations": [AsyncioIntegration()], + } + + if testcase["arg_http_proxy"] is not None: + kwargs["http_proxy"] = testcase["arg_http_proxy"] + if testcase["arg_https_proxy"] is not None: + kwargs["https_proxy"] = testcase["arg_https_proxy"] + if testcase.get("arg_proxy_headers") is not None: + kwargs["proxy_headers"] = testcase["arg_proxy_headers"] + + client = Client(testcase["dsn"], **kwargs) + assert isinstance(client.transport, AsyncHttpTransport) + + proxy = getattr( + client.transport._pool, + "proxy", + getattr(client.transport._pool, "_proxy_url", None), + ) + if testcase["expected_proxy_scheme"] is None: + assert proxy is None + else: + scheme = ( + proxy.scheme.decode("ascii") + if isinstance(proxy.scheme, bytes) + else proxy.scheme + ) + assert scheme == testcase["expected_proxy_scheme"] + + if 
testcase.get("arg_proxy_headers") is not None: + proxy_headers = dict( + (k.decode("ascii"), v.decode("ascii")) + for k, v in client.transport._pool._proxy_headers + ) + assert proxy_headers == testcase["arg_proxy_headers"] + + await client.close_async() + + +@pytest.mark.parametrize( + "testcase", + [ + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "http://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": False, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks4a://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks4://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks5h://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": "socks5://localhost/123", + "arg_https_proxy": None, + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks4a://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks4://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks5h://localhost/123", + "should_be_socks_proxy": True, + }, + { + "dsn": "https://foo@sentry.io/123", + "arg_http_proxy": None, + "arg_https_proxy": "socks5://localhost/123", + "should_be_socks_proxy": True, + }, + ], +) +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_socks_proxy(testcase): + # These are just the same tests as the sync ones, but they need to be run in an event loop + # and respect the shutdown behavior of the async transport + + kwargs = { + "_experiments": {"transport_async": True}, + "integrations": [AsyncioIntegration()], + } + + if testcase["arg_http_proxy"] is not None: + kwargs["http_proxy"] = testcase["arg_http_proxy"] + if testcase["arg_https_proxy"] is not None: + kwargs["https_proxy"] = testcase["arg_https_proxy"] + + client = Client(testcase["dsn"], **kwargs) + assert isinstance(client.transport, AsyncHttpTransport) + + assert ("socks" in str(type(client.transport._pool)).lower()) == testcase[ + "should_be_socks_proxy" + ], ( + f"Expected {kwargs} to result in SOCKS == {testcase['should_be_socks_proxy']}" + f"but got {str(type(client.transport._pool))}" + ) + + await client.close_async() diff --git a/tests/test_transport.py b/tests/test_transport.py index e612bfcaa5..53426795c6 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -3,6 +3,8 @@ import os import socket import sys +import asyncio +import threading from collections import defaultdict from datetime import datetime, timedelta, timezone from unittest import mock @@ -28,8 +30,10 @@ from sentry_sdk.transport import ( KEEP_ALIVE_SOCKET_OPTIONS, _parse_rate_limits, + AsyncHttpTransport, ) from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger +from sentry_sdk.integrations.asyncio import AsyncioIntegration server = None @@ -146,6 +150,89 @@ def test_transport_works( assert any("Sending envelope" in record.msg for record in caplog.records) == debug +@pytest.mark.asyncio +@pytest.mark.parametrize("debug", (True, False)) 
+@pytest.mark.parametrize("client_flush_method", ["close", "flush"]) +@pytest.mark.parametrize("use_pickle", (True, False)) +@pytest.mark.parametrize("compression_level", (0, 9, None)) +@pytest.mark.parametrize("compression_algo", ("gzip", "br", "", None)) +@pytest.mark.skipif(not PY38, reason="Async transport only supported in Python 3.8+") +async def test_transport_works_async( + capturing_server, + request, + capsys, + caplog, + debug, + make_client, + client_flush_method, + use_pickle, + compression_level, + compression_algo, +): + caplog.set_level(logging.DEBUG) + + experiments = {} + if compression_level is not None: + experiments["transport_compression_level"] = compression_level + + if compression_algo is not None: + experiments["transport_compression_algo"] = compression_algo + + # Enable async transport + experiments["transport_async"] = True + + client = make_client( + debug=debug, + _experiments=experiments, + integrations=[AsyncioIntegration()], + ) + + if use_pickle: + client = pickle.loads(pickle.dumps(client)) + + # Verify we're using async transport + assert isinstance( + client.transport, AsyncHttpTransport + ), "Expected AsyncHttpTransport" + + sentry_sdk.get_global_scope().set_client(client) + request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None)) + + add_breadcrumb( + level="info", message="i like bread", timestamp=datetime.now(timezone.utc) + ) + capture_message("löl") + + if client_flush_method == "close": + await client.close_async(timeout=2.0) + if client_flush_method == "flush": + await client.flush_async(timeout=2.0) + + out, err = capsys.readouterr() + assert not err and not out + assert capturing_server.captured + should_compress = ( + # default is to compress with brotli if available, gzip otherwise + (compression_level is None) + or ( + # setting compression level to 0 means don't compress + compression_level + > 0 + ) + ) and ( + # if we couldn't resolve to a known algo, we don't compress + compression_algo + != "" + ) + + assert capturing_server.captured[0].compressed == should_compress + # After flush, the worker task is still running, but the end of the test will shut down the event loop + # Therefore, we need to explicitly close the client to clean up the worker task + assert any("Sending envelope" in record.msg for record in caplog.records) == debug + if client_flush_method == "flush": + await client.close_async(timeout=2.0) + + @pytest.mark.parametrize( "num_pools,expected_num_pools", ( @@ -717,3 +804,138 @@ def mock_record_lost_event(reason, data_category=None, item=None): assert calls[0] == ("on_dropped_event", "connection_error") assert calls[1][0:2] == ("record_lost_event", "network_error") assert calls[2][0:2] == ("record_lost_event", "network_error") + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_background_thread_capture( + capturing_server, make_client, caplog +): + """Test capture_envelope from background threads uses run_coroutine_threadsafe""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + captured_from_thread = [] + exception_from_thread = [] + + def background_thread_work(): + try: + # This should use run_coroutine_threadsafe path + capture_message("from background thread") + captured_from_thread.append(True) 
+ except Exception as e: + exception_from_thread.append(e) + + thread = threading.Thread(target=background_thread_work) + thread.start() + thread.join() + assert not exception_from_thread + assert captured_from_thread + await client.close_async(timeout=2.0) + assert capturing_server.captured + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_event_loop_closed_scenario( + capturing_server, make_client, caplog +): + """Test behavior when trying to capture after event loop context ends""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + sentry_sdk.get_global_scope().set_client(client) + original_loop = client.transport.loop + + with mock.patch("asyncio.get_running_loop", side_effect=RuntimeError("no loop")): + with mock.patch.object(client.transport.loop, "is_running", return_value=False): + with mock.patch("sentry_sdk.transport.logger") as mock_logger: + # This should trigger the "no_async_context" path + capture_message("after loop closed") + + mock_logger.warning.assert_called_with( + "Async Transport is not running in an event loop." + ) + + client.transport.loop = original_loop + await client.close_async(timeout=2.0) + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_concurrent_requests( + capturing_server, make_client, caplog +): + """Test multiple simultaneous envelope submissions""" + caplog.set_level(logging.DEBUG) + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + + num_messages = 15 + + async def send_message(i): + capture_message(f"concurrent message {i}") + + tasks = [send_message(i) for i in range(num_messages)] + await asyncio.gather(*tasks) + await client.close_async(timeout=2.0) + assert len(capturing_server.captured) == num_messages + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def test_async_transport_rate_limiting_with_concurrency( + capturing_server, make_client, request +): + """Test async transport rate limiting with concurrent requests""" + experiments = {"transport_async": True} + client = make_client(_experiments=experiments, integrations=[AsyncioIntegration()]) + + assert isinstance(client.transport, AsyncHttpTransport) + sentry_sdk.get_global_scope().set_client(client) + request.addfinalizer(lambda: sentry_sdk.get_global_scope().set_client(None)) + capturing_server.respond_with( + code=429, headers={"X-Sentry-Rate-Limits": "60:error:organization"} + ) + + # Send one request first to trigger rate limiting + capture_message("initial message") + await asyncio.sleep(0.1) # Wait for request to execute + assert client.transport._check_disabled("error") is True + capturing_server.clear_captured() + + async def send_message(i): + capture_message(f"message {i}") + await asyncio.sleep(0.01) + + await asyncio.gather(*[send_message(i) for i in range(5)]) + await asyncio.sleep(0.1) + # New request should be dropped due to rate limiting + assert len(capturing_server.captured) == 0 + await client.close_async(timeout=2.0) + + +@pytest.mark.asyncio +@pytest.mark.skipif(not PY38, reason="Async transport requires Python 3.8+") +async def 
test_async_two_way_ssl_authentication(): + current_dir = os.path.dirname(__file__) + cert_file = f"{current_dir}/test.pem" + key_file = f"{current_dir}/test.key" + + client = Client( + "https://foo@sentry.io/123", + cert_file=cert_file, + key_file=key_file, + _experiments={"transport_async": True}, + integrations=[AsyncioIntegration()], + ) + assert isinstance(client.transport, AsyncHttpTransport) + + options = client.transport._get_pool_options() + assert options["ssl_context"] is not None + + await client.close_async() diff --git a/tox.ini b/tox.ini index fd52035fac..d6f5e173eb 100644 --- a/tox.ini +++ b/tox.ini @@ -10,7 +10,7 @@ # The file (and all resulting CI YAMLs) then need to be regenerated via # "scripts/generate-test-files.sh". # -# Last generated: 2025-07-23T07:24:30.467173+00:00 +# Last generated: 2025-07-30T13:59:12.959550+00:00 [tox] requires = @@ -125,16 +125,16 @@ envlist = # ~~~ Common ~~~ {py3.7,py3.8,py3.9}-common-v1.4.1 - {py3.7,py3.8,py3.9,py3.10,py3.11}-common-v1.14.0 - {py3.8,py3.9,py3.10,py3.11}-common-v1.24.0 - {py3.9,py3.10,py3.11,py3.12,py3.13}-common-v1.35.0 + {py3.7,py3.8,py3.9,py3.10,py3.11}-common-v1.15.0 + {py3.8,py3.9,py3.10,py3.11,py3.12}-common-v1.26.0 + {py3.9,py3.10,py3.11,py3.12,py3.13}-common-v1.36.0 # ~~~ AI ~~~ {py3.8,py3.11,py3.12}-anthropic-v0.16.0 - {py3.8,py3.11,py3.12}-anthropic-v0.30.1 - {py3.8,py3.11,py3.12}-anthropic-v0.44.0 - {py3.8,py3.12,py3.13}-anthropic-v0.58.2 + {py3.8,py3.11,py3.12}-anthropic-v0.31.2 + {py3.8,py3.11,py3.12}-anthropic-v0.46.0 + {py3.8,py3.12,py3.13}-anthropic-v0.60.0 {py3.9,py3.10,py3.11}-cohere-v5.4.0 {py3.9,py3.11,py3.12}-cohere-v5.9.4 @@ -143,12 +143,13 @@ envlist = {py3.10,py3.11,py3.12}-openai_agents-v0.0.19 {py3.10,py3.12,py3.13}-openai_agents-v0.1.0 - {py3.10,py3.12,py3.13}-openai_agents-v0.2.3 + {py3.10,py3.12,py3.13}-openai_agents-v0.2.4 {py3.8,py3.10,py3.11}-huggingface_hub-v0.22.2 {py3.8,py3.11,py3.12}-huggingface_hub-v0.26.5 {py3.8,py3.12,py3.13}-huggingface_hub-v0.30.2 - {py3.8,py3.12,py3.13}-huggingface_hub-v0.33.4 + {py3.8,py3.12,py3.13}-huggingface_hub-v0.34.3 + {py3.8,py3.12,py3.13}-huggingface_hub-v0.35.0rc0 # ~~~ DBs ~~~ @@ -164,7 +165,7 @@ envlist = {py3.7,py3.8,py3.9}-sqlalchemy-v1.3.24 {py3.7,py3.11,py3.12}-sqlalchemy-v1.4.54 - {py3.7,py3.12,py3.13}-sqlalchemy-v2.0.41 + {py3.7,py3.12,py3.13}-sqlalchemy-v2.0.42 # ~~~ Flags ~~~ @@ -179,7 +180,7 @@ envlist = {py3.7,py3.12,py3.13}-statsig-v0.55.3 {py3.7,py3.12,py3.13}-statsig-v0.57.3 {py3.7,py3.12,py3.13}-statsig-v0.59.1 - {py3.7,py3.12,py3.13}-statsig-v0.60.0 + {py3.7,py3.12,py3.13}-statsig-v0.61.0 {py3.8,py3.12,py3.13}-unleash-v6.0.1 {py3.8,py3.12,py3.13}-unleash-v6.1.0 @@ -210,8 +211,7 @@ envlist = {py3.7,py3.8}-grpc-v1.32.0 {py3.7,py3.9,py3.10}-grpc-v1.46.5 {py3.7,py3.11,py3.12}-grpc-v1.60.2 - {py3.9,py3.12,py3.13}-grpc-v1.73.1 - {py3.9,py3.12,py3.13}-grpc-v1.74.0rc1 + {py3.9,py3.12,py3.13}-grpc-v1.74.0 # ~~~ Tasks ~~~ @@ -262,7 +262,7 @@ envlist = {py3.7}-aiohttp-v3.4.4 {py3.7,py3.8,py3.9}-aiohttp-v3.7.4 {py3.8,py3.12,py3.13}-aiohttp-v3.10.11 - {py3.9,py3.12,py3.13}-aiohttp-v3.12.14 + {py3.9,py3.12,py3.13}-aiohttp-v3.12.15 {py3.7}-bottle-v0.12.25 {py3.8,py3.12,py3.13}-bottle-v0.13.4 @@ -378,7 +378,7 @@ deps = httpx-v0.25: pytest-httpx==0.25.0 httpx: pytest-httpx # anyio is a dep of httpx - httpx: anyio<4.0.0 + httpx: anyio>=3,<5 httpx-v0.16: httpx~=0.16.0 httpx-v0.18: httpx~=0.18.0 httpx-v0.20: httpx~=0.20.0 @@ -484,9 +484,9 @@ deps = # ~~~ Common ~~~ common-v1.4.1: opentelemetry-sdk==1.4.1 - common-v1.14.0: opentelemetry-sdk==1.14.0 - 
common-v1.24.0: opentelemetry-sdk==1.24.0 - common-v1.35.0: opentelemetry-sdk==1.35.0 + common-v1.15.0: opentelemetry-sdk==1.15.0 + common-v1.26.0: opentelemetry-sdk==1.26.0 + common-v1.36.0: opentelemetry-sdk==1.36.0 common: pytest common: pytest-asyncio py3.7-common: pytest<7.0.0 @@ -495,13 +495,13 @@ deps = # ~~~ AI ~~~ anthropic-v0.16.0: anthropic==0.16.0 - anthropic-v0.30.1: anthropic==0.30.1 - anthropic-v0.44.0: anthropic==0.44.0 - anthropic-v0.58.2: anthropic==0.58.2 + anthropic-v0.31.2: anthropic==0.31.2 + anthropic-v0.46.0: anthropic==0.46.0 + anthropic-v0.60.0: anthropic==0.60.0 anthropic: pytest-asyncio anthropic-v0.16.0: httpx<0.28.0 - anthropic-v0.30.1: httpx<0.28.0 - anthropic-v0.44.0: httpx<0.28.0 + anthropic-v0.31.2: httpx<0.28.0 + anthropic-v0.46.0: httpx<0.28.0 cohere-v5.4.0: cohere==5.4.0 cohere-v5.9.4: cohere==5.9.4 @@ -510,13 +510,14 @@ deps = openai_agents-v0.0.19: openai-agents==0.0.19 openai_agents-v0.1.0: openai-agents==0.1.0 - openai_agents-v0.2.3: openai-agents==0.2.3 + openai_agents-v0.2.4: openai-agents==0.2.4 openai_agents: pytest-asyncio huggingface_hub-v0.22.2: huggingface_hub==0.22.2 huggingface_hub-v0.26.5: huggingface_hub==0.26.5 huggingface_hub-v0.30.2: huggingface_hub==0.30.2 - huggingface_hub-v0.33.4: huggingface_hub==0.33.4 + huggingface_hub-v0.34.3: huggingface_hub==0.34.3 + huggingface_hub-v0.35.0rc0: huggingface_hub==0.35.0rc0 # ~~~ DBs ~~~ @@ -533,7 +534,7 @@ deps = sqlalchemy-v1.3.24: sqlalchemy==1.3.24 sqlalchemy-v1.4.54: sqlalchemy==1.4.54 - sqlalchemy-v2.0.41: sqlalchemy==2.0.41 + sqlalchemy-v2.0.42: sqlalchemy==2.0.42 # ~~~ Flags ~~~ @@ -548,7 +549,7 @@ deps = statsig-v0.55.3: statsig==0.55.3 statsig-v0.57.3: statsig==0.57.3 statsig-v0.59.1: statsig==0.59.1 - statsig-v0.60.0: statsig==0.60.0 + statsig-v0.61.0: statsig==0.61.0 statsig: typing_extensions unleash-v6.0.1: UnleashClient==6.0.1 @@ -592,8 +593,7 @@ deps = grpc-v1.32.0: grpcio==1.32.0 grpc-v1.46.5: grpcio==1.46.5 grpc-v1.60.2: grpcio==1.60.2 - grpc-v1.73.1: grpcio==1.73.1 - grpc-v1.74.0rc1: grpcio==1.74.0rc1 + grpc-v1.74.0: grpcio==1.74.0 grpc: protobuf grpc: mypy-protobuf grpc: types-protobuf @@ -686,10 +686,11 @@ deps = fastapi: pytest-asyncio fastapi: python-multipart fastapi: requests - fastapi: anyio<4 + fastapi: anyio>=3,<5 fastapi-v0.79.1: httpx<0.28.0 fastapi-v0.91.0: httpx<0.28.0 fastapi-v0.103.2: httpx<0.28.0 + fastapi-v0.79.1: anyio<4 py3.6-fastapi: aiocontextvars @@ -697,10 +698,10 @@ deps = aiohttp-v3.4.4: aiohttp==3.4.4 aiohttp-v3.7.4: aiohttp==3.7.4 aiohttp-v3.10.11: aiohttp==3.10.11 - aiohttp-v3.12.14: aiohttp==3.12.14 + aiohttp-v3.12.15: aiohttp==3.12.15 aiohttp: pytest-aiohttp aiohttp-v3.10.11: pytest-asyncio - aiohttp-v3.12.14: pytest-asyncio + aiohttp-v3.12.15: pytest-asyncio bottle-v0.12.25: bottle==0.12.25 bottle-v0.13.4: bottle==0.13.4
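
Taken together, the changes above gate the experimental async transport behind three conditions checked in `make_transport()`: the `transport_async` experiment flag, an enabled `AsyncioIntegration`, and an event loop running at `init()` time. A minimal usage sketch, assuming only the APIs visible in this patch (`_experiments["transport_async"]`, `Client.flush_async()`, `Client.close_async()`) and a placeholder DSN:

```python
import asyncio

import sentry_sdk
from sentry_sdk.integrations.asyncio import AsyncioIntegration


async def main() -> None:
    # init() must run inside a running event loop; otherwise make_transport()
    # logs a warning and falls back to the sync transport.
    sentry_sdk.init(
        dsn="https://foo@sentry.io/123",  # placeholder DSN
        integrations=[AsyncioIntegration()],  # required, see make_transport()
        _experiments={"transport_async": True},
    )

    sentry_sdk.capture_message("hello from the async transport")

    client = sentry_sdk.get_client()
    # Await delivery instead of blocking the loop with flush()/close(),
    # which abort with a debug log when the transport is async.
    await client.flush_async(timeout=2.0)
    await client.close_async(timeout=2.0)


asyncio.run(main())
```

Because `patch_loop_close()` wraps `loop.close()`, events still pending when `asyncio.run()` tears the loop down are flushed via `close_async()` even without the explicit calls above.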
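The background-thread test in `tests/test_transport.py` says that `capture_envelope` from a foreign thread goes through `asyncio.run_coroutine_threadsafe`. The transport internals are not part of this diff, so the following is a generic sketch of that hand-off pattern under those assumptions (`deliver` stands in for the transport's send coroutine), not the SDK's actual implementation:

```python
import asyncio
import threading


async def deliver(envelope: str) -> None:
    # Stand-in for the transport's actual HTTP send coroutine.
    await asyncio.sleep(0)
    print(f"sent {envelope}")


def capture_from_thread(loop: asyncio.AbstractEventLoop) -> None:
    # A non-loop thread schedules the coroutine onto the running loop and
    # blocks on the returned concurrent.futures.Future.
    future = asyncio.run_coroutine_threadsafe(deliver("envelope-1"), loop)
    future.result(timeout=2.0)


async def main() -> None:
    loop = asyncio.get_running_loop()
    worker = threading.Thread(target=capture_from_thread, args=(loop,))
    worker.start()
    # Join in an executor so the loop stays free to run deliver().
    await loop.run_in_executor(None, worker.join)


asyncio.run(main())
```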