-
Notifications
You must be signed in to change notification settings - Fork 556
Add async transport #4614
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: srothh/async-task-worker
Are you sure you want to change the base?
Add async transport #4614
Changes from all commits
4a58ce7
cbecde7
c8bb55a
05a7de7
38246d0
8b226cb
823215e
4eed4fd
afd494d
fcc7ac3
f659514
8c542ce
30dde67
3392e0e
6c85500
ae5a864
9c537e6
f7554b2
6cb72ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,7 @@ | |
import socket | ||
import ssl | ||
import time | ||
import asyncio | ||
from datetime import datetime, timedelta, timezone | ||
from collections import defaultdict | ||
from urllib.request import getproxies | ||
|
@@ -17,18 +18,27 @@ | |
|
||
try: | ||
import httpcore | ||
except ImportError: | ||
httpcore = None # type: ignore | ||
|
||
try: | ||
import h2 # noqa: F401 | ||
|
||
HTTP2_ENABLED = True | ||
HTTP2_ENABLED = httpcore is not None | ||
except ImportError: | ||
HTTP2_ENABLED = False | ||
|
||
try: | ||
ASYNC_TRANSPORT_ENABLED = httpcore is not None | ||
except ImportError: | ||
ASYNC_TRANSPORT_ENABLED = False | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Unnecessary ImportError HandlingThe Locations (1) |
||
|
||
import urllib3 | ||
import certifi | ||
|
||
from sentry_sdk.consts import EndpointType | ||
from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions | ||
from sentry_sdk.worker import BackgroundWorker, Worker | ||
from sentry_sdk.worker import BackgroundWorker, Worker, AsyncWorker | ||
from sentry_sdk.envelope import Envelope, Item, PayloadRef | ||
|
||
from typing import TYPE_CHECKING | ||
|
@@ -224,9 +234,18 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: | |
elif self._compression_algo == "br": | ||
self._compression_level = 4 | ||
|
||
def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: | ||
# For now, we only support the threaded sync background worker. | ||
return BackgroundWorker(queue_size=options["transport_queue_size"]) | ||
def _create_worker(self, options: dict[str, Any]) -> Worker: | ||
async_enabled = options.get("_experiments", {}).get("transport_async", False) | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
try: | ||
asyncio.get_running_loop() | ||
worker_cls = ( | ||
AsyncWorker | ||
if async_enabled and ASYNC_TRANSPORT_ENABLED | ||
else BackgroundWorker | ||
) | ||
except RuntimeError: | ||
worker_cls = BackgroundWorker | ||
return worker_cls(queue_size=options["transport_queue_size"]) | ||
cursor[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def record_lost_event( | ||
self: Self, | ||
|
@@ -571,6 +590,240 @@ def flush( | |
self._worker.flush(timeout, callback) | ||
|
||
|
||
if not ASYNC_TRANSPORT_ENABLED: | ||
# Sorry, no AsyncHttpTransport for you | ||
class AsyncHttpTransport(BaseHttpTransport): | ||
def __init__(self: Self, options: Dict[str, Any]) -> None: | ||
super().__init__(options) | ||
logger.warning( | ||
"You tried to use AsyncHttpTransport but don't have httpcore[asyncio] installed. Falling back to sync transport." | ||
) | ||
|
||
else: | ||
|
||
class AsyncHttpTransport(HttpTransportCore): # type: ignore | ||
def __init__(self: Self, options: Dict[str, Any]) -> None: | ||
super().__init__(options) | ||
# Requires event loop at init time | ||
self.loop = asyncio.get_running_loop() | ||
self.background_tasks: set[asyncio.Task[None]] = set() | ||
|
||
def _get_header_value(self: Self, response: Any, header: str) -> Optional[str]: | ||
return next( | ||
( | ||
val.decode("ascii") | ||
for key, val in response.headers | ||
if key.decode("ascii").lower() == header | ||
), | ||
None, | ||
) | ||
|
||
async def _send_envelope(self: Self, envelope: Envelope) -> None: | ||
_prepared_envelope = self._prepare_envelope(envelope) | ||
if _prepared_envelope is not None: | ||
envelope, body, headers = _prepared_envelope | ||
await self._send_request( | ||
body.getvalue(), | ||
headers=headers, | ||
endpoint_type=EndpointType.ENVELOPE, | ||
envelope=envelope, | ||
) | ||
return None | ||
|
||
async def _send_request( | ||
self: Self, | ||
body: bytes, | ||
headers: Dict[str, str], | ||
endpoint_type: EndpointType, | ||
envelope: Optional[Envelope], | ||
) -> None: | ||
self._update_headers(headers) | ||
try: | ||
response = await self._request( | ||
"POST", | ||
endpoint_type, | ||
body, | ||
headers, | ||
) | ||
except Exception: | ||
self._handle_request_error(envelope=envelope, loss_reason="network") | ||
raise | ||
try: | ||
self._handle_response(response=response, envelope=envelope) | ||
finally: | ||
await response.aclose() | ||
|
||
async def _request( # type: ignore[override] | ||
self: Self, | ||
method: str, | ||
endpoint_type: EndpointType, | ||
body: Any, | ||
headers: Mapping[str, str], | ||
) -> httpcore.Response: | ||
return await self._pool.request( | ||
method, | ||
self._auth.get_api_url(endpoint_type), | ||
content=body, | ||
headers=headers, # type: ignore | ||
extensions={ | ||
"timeout": { | ||
"pool": self.TIMEOUT, | ||
"connect": self.TIMEOUT, | ||
"write": self.TIMEOUT, | ||
"read": self.TIMEOUT, | ||
} | ||
}, | ||
) | ||
|
||
async def _flush_client_reports(self: Self, force: bool = False) -> None: | ||
client_report = self._fetch_pending_client_report(force=force, interval=60) | ||
if client_report is not None: | ||
self.capture_envelope(Envelope(items=[client_report])) | ||
|
||
async def _capture_envelope(self: Self, envelope: Envelope) -> None: | ||
async def send_envelope_wrapper() -> None: | ||
with capture_internal_exceptions(): | ||
await self._send_envelope(envelope) | ||
await self._flush_client_reports() | ||
|
||
if not self._worker.submit(send_envelope_wrapper): | ||
self.on_dropped_event("full_queue") | ||
for item in envelope.items: | ||
self.record_lost_event("queue_overflow", item=item) | ||
|
||
def capture_envelope(self: Self, envelope: Envelope) -> None: | ||
# Synchronous entry point | ||
try: | ||
asyncio.get_running_loop() | ||
# We are on the main thread running the event loop | ||
task = asyncio.create_task(self._capture_envelope(envelope)) | ||
self.background_tasks.add(task) | ||
task.add_done_callback(self.background_tasks.discard) | ||
except RuntimeError: | ||
# We are in a background thread, not running an event loop, | ||
# have to launch the task on the loop in a threadsafe way. | ||
if self.loop and self.loop.is_running(): | ||
asyncio.run_coroutine_threadsafe( | ||
self._capture_envelope(envelope), | ||
self.loop, | ||
) | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
# The event loop is no longer running | ||
logger.warning("Async Transport is not running in an event loop.") | ||
self.on_dropped_event("internal_sdk_error") | ||
for item in envelope.items: | ||
self.record_lost_event("internal_sdk_error", item=item) | ||
|
||
def flush( # type: ignore[override] | ||
self: Self, | ||
timeout: float, | ||
callback: Optional[Callable[[int, float], None]] = None, | ||
) -> Optional[asyncio.Task[None]]: | ||
logger.debug("Flushing HTTP transport") | ||
|
||
if timeout > 0: | ||
self._worker.submit(lambda: self._flush_client_reports(force=True)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: Async Method Not Awaited in Synchronous ContextThe Locations (1) |
||
return self._worker.flush(timeout, callback) # type: ignore[func-returns-value] | ||
return None | ||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
def _get_pool_options(self: Self) -> Dict[str, Any]: | ||
options: Dict[str, Any] = { | ||
"http2": False, # no HTTP2 for now | ||
"retries": 3, | ||
} | ||
|
||
socket_options = ( | ||
self.options["socket_options"] | ||
if self.options["socket_options"] is not None | ||
else [] | ||
) | ||
|
||
used_options = {(o[0], o[1]) for o in socket_options} | ||
for default_option in KEEP_ALIVE_SOCKET_OPTIONS: | ||
if (default_option[0], default_option[1]) not in used_options: | ||
socket_options.append(default_option) | ||
|
||
options["socket_options"] = socket_options | ||
|
||
ssl_context = ssl.create_default_context() | ||
ssl_context.load_verify_locations( | ||
self.options["ca_certs"] # User-provided bundle from the SDK init | ||
or os.environ.get("SSL_CERT_FILE") | ||
or os.environ.get("REQUESTS_CA_BUNDLE") | ||
or certifi.where() | ||
) | ||
cert_file = self.options["cert_file"] or os.environ.get("CLIENT_CERT_FILE") | ||
key_file = self.options["key_file"] or os.environ.get("CLIENT_KEY_FILE") | ||
if cert_file is not None: | ||
ssl_context.load_cert_chain(cert_file, key_file) | ||
|
||
options["ssl_context"] = ssl_context | ||
|
||
return options | ||
|
||
def _make_pool( | ||
self: Self, | ||
) -> Union[ | ||
httpcore.AsyncSOCKSProxy, | ||
httpcore.AsyncHTTPProxy, | ||
httpcore.AsyncConnectionPool, | ||
]: | ||
if self.parsed_dsn is None: | ||
raise ValueError("Cannot create HTTP-based transport without valid DSN") | ||
proxy = None | ||
no_proxy = self._in_no_proxy(self.parsed_dsn) | ||
|
||
# try HTTPS first | ||
https_proxy = self.options["https_proxy"] | ||
if self.parsed_dsn.scheme == "https" and (https_proxy != ""): | ||
proxy = https_proxy or (not no_proxy and getproxies().get("https")) | ||
|
||
# maybe fallback to HTTP proxy | ||
http_proxy = self.options["http_proxy"] | ||
if not proxy and (http_proxy != ""): | ||
proxy = http_proxy or (not no_proxy and getproxies().get("http")) | ||
|
||
opts = self._get_pool_options() | ||
|
||
if proxy: | ||
proxy_headers = self.options["proxy_headers"] | ||
if proxy_headers: | ||
opts["proxy_headers"] = proxy_headers | ||
|
||
if proxy.startswith("socks"): | ||
try: | ||
if "socket_options" in opts: | ||
socket_options = opts.pop("socket_options") | ||
if socket_options: | ||
logger.warning( | ||
"You have defined socket_options but using a SOCKS proxy which doesn't support these. We'll ignore socket_options." | ||
) | ||
return httpcore.AsyncSOCKSProxy(proxy_url=proxy, **opts) | ||
except RuntimeError: | ||
logger.warning( | ||
"You have configured a SOCKS proxy (%s) but support for SOCKS proxies is not installed. Disabling proxy support.", | ||
proxy, | ||
) | ||
else: | ||
return httpcore.AsyncHTTPProxy(proxy_url=proxy, **opts) | ||
|
||
return httpcore.AsyncConnectionPool(**opts) | ||
|
||
def kill(self: Self) -> Optional[asyncio.Task[None]]: # type: ignore | ||
|
||
logger.debug("Killing HTTP transport") | ||
self._worker.kill() | ||
for task in self.background_tasks: | ||
task.cancel() | ||
self.background_tasks.clear() | ||
try: | ||
# Return the pool cleanup task so caller can await it if needed | ||
return self.loop.create_task(self._pool.aclose()) # type: ignore | ||
except RuntimeError: | ||
logger.warning("Event loop not running, aborting kill.") | ||
return None | ||
|
||
antonpirker marked this conversation as resolved.
Show resolved
Hide resolved
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Bug: AsyncHttpTransport Initialization and Method IssuesThe
Locations (1) |
||
|
||
class HttpTransport(BaseHttpTransport): | ||
if TYPE_CHECKING: | ||
_pool: Union[PoolManager, ProxyManager] | ||
|
@@ -816,11 +1069,18 @@ def make_transport(options: Dict[str, Any]) -> Optional[Transport]: | |
ref_transport = options["transport"] | ||
|
||
use_http2_transport = options.get("_experiments", {}).get("transport_http2", False) | ||
|
||
use_async_transport = options.get("_experiments", {}).get("transport_async", False) | ||
# By default, we use the http transport class | ||
transport_cls: Type[Transport] = ( | ||
Http2Transport if use_http2_transport else HttpTransport | ||
) | ||
if use_async_transport: | ||
try: | ||
asyncio.get_running_loop() | ||
transport_cls: Type[Transport] = AsyncHttpTransport | ||
except RuntimeError: | ||
# No event loop running, fall back to sync transport | ||
logger.warning("No event loop running, falling back to sync transport.") | ||
transport_cls = Http2Transport if use_http2_transport else HttpTransport | ||
else: | ||
transport_cls = Http2Transport if use_http2_transport else HttpTransport | ||
|
||
if isinstance(ref_transport, Transport): | ||
return ref_transport | ||
|
Uh oh!
There was an error while loading. Please reload this page.