diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index d612028250..f8328cac12 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -28,7 +28,7 @@ from sentry_sdk.consts import EndpointType from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions -from sentry_sdk.worker import BackgroundWorker +from sentry_sdk.worker import BackgroundWorker, Worker from sentry_sdk.envelope import Envelope, Item, PayloadRef from typing import TYPE_CHECKING @@ -173,7 +173,7 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: Transport.__init__(self, options) assert self.parsed_dsn is not None self.options: Dict[str, Any] = options - self._worker = BackgroundWorker(queue_size=options["transport_queue_size"]) + self._worker = self._create_worker(options) self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until: Dict[Optional[str], datetime] = {} # We only use this Retry() class for the `get_retry_after` method it exposes @@ -224,6 +224,10 @@ def __init__(self: Self, options: Dict[str, Any]) -> None: elif self._compression_algo == "br": self._compression_level = 4 + def _create_worker(self: Self, options: Dict[str, Any]) -> Worker: + # For now, we only support the threaded sync background worker. + return BackgroundWorker(queue_size=options["transport_queue_size"]) + def record_lost_event( self: Self, reason: str, diff --git a/sentry_sdk/worker.py b/sentry_sdk/worker.py index d911e15623..555539dc3a 100644 --- a/sentry_sdk/worker.py +++ b/sentry_sdk/worker.py @@ -1,4 +1,5 @@ from __future__ import annotations +from abc import ABC, abstractmethod import os import threading @@ -16,7 +17,65 @@ _TERMINATOR = object() -class BackgroundWorker: +class Worker(ABC): + """ + Base class for all workers. + + A worker is used to process events in the background and send them to Sentry. + """ + + @property + @abstractmethod + def is_alive(self) -> bool: + """ + Checks whether the worker is alive and running. + + Returns True if the worker is alive, False otherwise. + """ + pass + + @abstractmethod + def kill(self) -> None: + """ + Kills the worker. + + This method is used to kill the worker. The queue will be drained up to the point where the worker is killed. + The worker will not be able to process any more events. + """ + pass + + def flush( + self, timeout: float, callback: Optional[Callable[[int, float], Any]] = None + ) -> None: + """ + Flush the worker. + + This method blocks until the worker has flushed all events or the specified timeout is reached. + Default implementation is a no-op, since this method may only be relevant to some workers. + Subclasses should override this method if necessary. + """ + return None + + @abstractmethod + def full(self) -> bool: + """ + Checks whether the worker's queue is full. + + Returns True if the queue is full, False otherwise. + """ + pass + + @abstractmethod + def submit(self, callback: Callable[[], Any]) -> bool: + """ + Schedule a callback to be executed by the worker. + + Returns True if the callback was scheduled, False if the queue is full. + """ + pass + + +class BackgroundWorker(Worker): def __init__(self, queue_size: int = DEFAULT_QUEUE_SIZE) -> None: self._queue: Queue = Queue(queue_size) self._lock = threading.Lock() @@ -106,7 +165,7 @@ def _wait_flush(self, timeout: float, callback: Optional[Any]) -> None: pending = self._queue.qsize() + 1 logger.error("flush timed out, dropped %s events", pending) - def submit(self, callback: Callable[[], None]) -> bool: + def submit(self, callback: Callable[[], Any]) -> bool: self._ensure_thread() try: self._queue.put_nowait(callback)