From 99aeb2d5677e5dcd7d3143e04a2aa88be6370e55 Mon Sep 17 00:00:00 2001
From: Kyle Consalus
Date: Fri, 5 Sep 2025 16:19:20 -0700
Subject: [PATCH 1/2] fix(aci): Ensure timeouts reported in actions are grouped

---
 src/sentry/workflow_engine/tasks/actions.py   |   7 +-
 src/sentry/workflow_engine/utils/__init__.py  |   2 +
 .../workflow_engine/utils/timeout_grouping.py |  41 ++++++
 .../utils/test_timeout_grouping.py            | 118 ++++++++++++++++++
 4 files changed, 166 insertions(+), 2 deletions(-)
 create mode 100644 src/sentry/workflow_engine/utils/timeout_grouping.py
 create mode 100644 tests/sentry/workflow_engine/utils/test_timeout_grouping.py

diff --git a/src/sentry/workflow_engine/tasks/actions.py b/src/sentry/workflow_engine/tasks/actions.py
index 6bfdc3fc353dd1..6155737fb67363 100644
--- a/src/sentry/workflow_engine/tasks/actions.py
+++ b/src/sentry/workflow_engine/tasks/actions.py
@@ -16,7 +16,7 @@
     build_workflow_event_data_from_event,
 )
 from sentry.workflow_engine.types import WorkflowEventData
-from sentry.workflow_engine.utils import log_context
+from sentry.workflow_engine.utils import log_context, timeout_grouping_context
 
 logger = log_context.get_logger(__name__)
 
@@ -137,7 +137,10 @@ def trigger_action(
     )
 
     if should_trigger_actions:
-        action.trigger(event_data, detector)
+        # Set up a timeout grouping context because we want to make sure any Sentry timeout reporting
+        # in this scope is grouped properly.
+        with timeout_grouping_context(action.type):
+            action.trigger(event_data, detector)
     else:
         logger.info(
             "workflow_engine.triggered_actions.dry-run",
diff --git a/src/sentry/workflow_engine/utils/__init__.py b/src/sentry/workflow_engine/utils/__init__.py
index 53625d8a162d31..9b41fafa1b76ce 100644
--- a/src/sentry/workflow_engine/utils/__init__.py
+++ b/src/sentry/workflow_engine/utils/__init__.py
@@ -1,6 +1,8 @@
 __all__ = [
     "metrics_incr",
     "MetricTags",
+    "timeout_grouping_context",
 ]
 
 from .metrics import MetricTags, metrics_incr
+from .timeout_grouping import timeout_grouping_context
diff --git a/src/sentry/workflow_engine/utils/timeout_grouping.py b/src/sentry/workflow_engine/utils/timeout_grouping.py
new file mode 100644
index 00000000000000..f9aac55ae6b455
--- /dev/null
+++ b/src/sentry/workflow_engine/utils/timeout_grouping.py
@@ -0,0 +1,41 @@
+import logging
+from collections.abc import Generator
+from contextlib import contextmanager
+
+import sentry_sdk
+
+from sentry.taskworker.state import current_task
+from sentry.taskworker.workerchild import ProcessingDeadlineExceeded
+
+logger = logging.getLogger(__name__)
+
+
+@contextmanager
+def timeout_grouping_context(*refinements: str) -> Generator[None]:
+    """
+    Context manager that ensures that ProcessingDeadlineExceeded errors are grouped together at the task level.
+    Grouping based on the specific stacktrace is usually inappropriate because once we've passed the deadline,
+    any subsequent line of code may be the one executing when the exception is raised.
+    Defaulting to grouping by task is more accurate, and where there's a need to subdivide that, we
+    offer the ability to refine.
+    """
+    task_state = current_task()
+    if task_state:
+
+        def process_error(event, exc_info):
+            exc = exc_info[1]
+            if isinstance(exc, ProcessingDeadlineExceeded):
+                event["fingerprint"] = [
+                    "task.processing_deadline_exceeded",
+                    task_state.namespace,
+                    task_state.taskname,
+                    *refinements,
+                ]
+            return event
+
+        with sentry_sdk.new_scope() as scope:
+            scope.add_error_processor(process_error)
+            yield
+    else:
+        logger.info("No task state found in timeout_grouping_context")
+        yield
diff --git a/tests/sentry/workflow_engine/utils/test_timeout_grouping.py b/tests/sentry/workflow_engine/utils/test_timeout_grouping.py
new file mode 100644
index 00000000000000..523fd54947bbdc
--- /dev/null
+++ b/tests/sentry/workflow_engine/utils/test_timeout_grouping.py
@@ -0,0 +1,118 @@
+from unittest.mock import Mock, patch
+
+from sentry.taskworker.state import CurrentTaskState
+from sentry.taskworker.workerchild import ProcessingDeadlineExceeded
+from sentry.workflow_engine.utils.timeout_grouping import timeout_grouping_context
+
+
+class TestTimeoutGroupingContext:
+    def test_with_task_state_and_processing_deadline_exceeded(self):
+        mock_task_state = CurrentTaskState(
+            id="test_id",
+            namespace="test_namespace",
+            taskname="test_task",
+            attempt=1,
+            processing_deadline_duration=30,
+            retries_remaining=True,
+        )
+
+        with patch(
+            "sentry.workflow_engine.utils.timeout_grouping.current_task",
+            return_value=mock_task_state,
+        ):
+            with patch("sentry_sdk.new_scope") as mock_scope:
+                mock_scope_instance = Mock()
+                mock_scope.return_value.__enter__ = Mock(return_value=mock_scope_instance)
+                mock_scope.return_value.__exit__ = Mock(return_value=None)
+
+                # Capture the error processor function
+                captured_processor = None
+
+                def capture_processor(processor):
+                    nonlocal captured_processor
+                    captured_processor = processor
+
+                mock_scope_instance.add_error_processor = capture_processor
+
+                with timeout_grouping_context("refinement1", "refinement2"):
+                    pass
+
+                # Test the processor function
+                assert captured_processor is not None
+
+                # Create a mock event and exception info
+                event = {}
+                exc = ProcessingDeadlineExceeded("Test timeout")
+                exc_info = (ProcessingDeadlineExceeded, exc, None)
+
+                # Process the event
+                result = captured_processor(event, exc_info)
+
+                # Verify the fingerprint was set correctly
+                assert result["fingerprint"] == [
+                    "task.processing_deadline_exceeded",
+                    "test_namespace",
+                    "test_task",
+                    "refinement1",
+                    "refinement2",
+                ]
+
+    def test_with_task_state_and_non_processing_deadline_exceeded(self):
+        mock_task_state = CurrentTaskState(
+            id="test_id",
+            namespace="test_namespace",
+            taskname="test_task",
+            attempt=1,
+            processing_deadline_duration=30,
+            retries_remaining=True,
+        )
+
+        with patch(
+            "sentry.workflow_engine.utils.timeout_grouping.current_task",
+            return_value=mock_task_state,
+        ):
+            with patch("sentry_sdk.new_scope") as mock_scope:
+                mock_scope_instance = Mock()
+                mock_scope.return_value.__enter__ = Mock(return_value=mock_scope_instance)
+                mock_scope.return_value.__exit__ = Mock(return_value=None)
+
+                captured_processor = None
+
+                def capture_processor(processor):
+                    nonlocal captured_processor
+                    captured_processor = processor
+
+                mock_scope_instance.add_error_processor = capture_processor
+
+                with timeout_grouping_context():
+                    pass
+
+                # Test the processor function with a different exception
+                event = {"original": "data"}
+                exc = ValueError("Some other error")
+                exc_info = (ValueError, exc, None)
+
+                result = captured_processor(event, exc_info)
+
+                # Event should be unchanged
+                assert result == {"original": "data"}
+                assert "fingerprint" not in result
+
+    def test_without_task_state(self):
+        with patch("sentry.workflow_engine.utils.timeout_grouping.current_task", return_value=None):
+            with patch("sentry.workflow_engine.utils.timeout_grouping.logger") as mock_logger:
+                with timeout_grouping_context():
+                    pass
+
+                # Should log that no task state was found
+                mock_logger.info.assert_called_once_with(
+                    "No task state found in timeout_grouping_context"
+                )
+
+    def test_context_manager_yields_correctly(self):
+        executed = False
+        with patch("sentry.workflow_engine.utils.timeout_grouping.current_task", return_value=None):
+            with timeout_grouping_context():
+                executed = True
+
+        assert executed is True

From 88e199f8ded49cb063850cf0a4343cc56c9b80dc Mon Sep 17 00:00:00 2001
From: Kyle Consalus
Date: Thu, 11 Sep 2025 09:08:59 -0700
Subject: [PATCH 2/2] Appease mypy

---
 .../workflow_engine/utils/timeout_grouping.py |  8 +++++++-
 .../utils/test_timeout_grouping.py            | 17 ++++++++++-------
 2 files changed, 17 insertions(+), 8 deletions(-)

diff --git a/src/sentry/workflow_engine/utils/timeout_grouping.py b/src/sentry/workflow_engine/utils/timeout_grouping.py
index f9aac55ae6b455..5ca434d438e41d 100644
--- a/src/sentry/workflow_engine/utils/timeout_grouping.py
+++ b/src/sentry/workflow_engine/utils/timeout_grouping.py
@@ -1,6 +1,7 @@
 import logging
 from collections.abc import Generator
 from contextlib import contextmanager
+from typing import Any
 
 import sentry_sdk
 
@@ -10,6 +11,11 @@
 logger = logging.getLogger(__name__)
 
 
+# sentry_sdk doesn't export these.
+_Event = Any
+_ExcInfo = Any
+
+
 @contextmanager
 def timeout_grouping_context(*refinements: str) -> Generator[None]:
     """
@@ -22,7 +28,7 @@ def timeout_grouping_context(*refinements: str) -> Generator[None]:
     task_state = current_task()
     if task_state:
 
-        def process_error(event, exc_info):
+        def process_error(event: _Event, exc_info: _ExcInfo) -> _Event | None:
             exc = exc_info[1]
             if isinstance(exc, ProcessingDeadlineExceeded):
                 event["fingerprint"] = [
diff --git a/tests/sentry/workflow_engine/utils/test_timeout_grouping.py b/tests/sentry/workflow_engine/utils/test_timeout_grouping.py
index 523fd54947bbdc..3ed16893806cc5 100644
--- a/tests/sentry/workflow_engine/utils/test_timeout_grouping.py
+++ b/tests/sentry/workflow_engine/utils/test_timeout_grouping.py
@@ -1,3 +1,4 @@
+from typing import Any
 from unittest.mock import Mock, patch
 
 from sentry.taskworker.state import CurrentTaskState
@@ -6,7 +7,7 @@
 
 
 class TestTimeoutGroupingContext:
-    def test_with_task_state_and_processing_deadline_exceeded(self):
+    def test_with_task_state_and_processing_deadline_exceeded(self) -> None:
         mock_task_state = CurrentTaskState(
             id="test_id",
             namespace="test_namespace",
@@ -28,7 +29,7 @@ def test_with_task_state_and_processing_deadline_exceeded(self):
                 # Capture the error processor function
                 captured_processor = None
 
-                def capture_processor(processor):
+                def capture_processor(processor: Any) -> None:
                     nonlocal captured_processor
                     captured_processor = processor
 
@@ -41,7 +42,7 @@ def capture_processor(processor):
                 assert captured_processor is not None
 
                 # Create a mock event and exception info
-                event = {}
+                event: Any = {}
                 exc = ProcessingDeadlineExceeded("Test timeout")
                 exc_info = (ProcessingDeadlineExceeded, exc, None)
 
@@ -57,7 +58,7 @@ def capture_processor(processor):
                     "refinement2",
                 ]
 
-    def test_with_task_state_and_non_processing_deadline_exceeded(self):
+    def test_with_task_state_and_non_processing_deadline_exceeded(self) -> None:
         mock_task_state = CurrentTaskState(
             id="test_id",
             namespace="test_namespace",
@@ -78,7 +79,7 @@ def test_with_task_state_and_non_processing_deadline_exceeded(self):
 
                 captured_processor = None
 
-                def capture_processor(processor):
+                def capture_processor(processor: Any) -> None:
                     nonlocal captured_processor
                     captured_processor = processor
 
@@ -87,6 +88,8 @@ def capture_processor(processor):
                 with timeout_grouping_context():
                     pass
 
+                assert captured_processor is not None
+
                 # Test the processor function with a different exception
                 event = {"original": "data"}
                 exc = ValueError("Some other error")
@@ -98,7 +101,7 @@ def capture_processor(processor):
                 assert result == {"original": "data"}
                 assert "fingerprint" not in result
 
-    def test_without_task_state(self):
+    def test_without_task_state(self) -> None:
         with patch("sentry.workflow_engine.utils.timeout_grouping.current_task", return_value=None):
             with patch("sentry.workflow_engine.utils.timeout_grouping.logger") as mock_logger:
                 with timeout_grouping_context():
@@ -109,7 +112,7 @@ def test_without_task_state(self):
                     "No task state found in timeout_grouping_context"
                 )
 
-    def test_context_manager_yields_correctly(self):
+    def test_context_manager_yields_correctly(self) -> None:
         executed = False
         with patch("sentry.workflow_engine.utils.timeout_grouping.current_task", return_value=None):
             with timeout_grouping_context():
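
A minimal usage sketch of the context manager introduced above, assuming the caller runs inside a taskworker task so that current_task() returns a populated CurrentTaskState; send_notifications and its slow delivery body are hypothetical placeholders standing in for the action.trigger(...) call that trigger_action wraps:

    import time

    from sentry.workflow_engine.utils import timeout_grouping_context


    def send_notifications(event_data, detector) -> None:
        # Hypothetical action handler, used only to illustrate the API.
        with timeout_grouping_context("slack"):
            # If the worker's processing deadline fires anywhere inside this block, the
            # resulting ProcessingDeadlineExceeded is fingerprinted as
            # ["task.processing_deadline_exceeded", <namespace>, <taskname>, "slack"]
            # instead of being grouped by whichever line happened to be executing.
            time.sleep(30)  # placeholder for delivery work that may outlive the deadline

Because the fingerprint is applied by an error processor on a fresh sentry_sdk scope, it only affects events captured while the block is active; errors captured elsewhere keep their default stacktrace-based grouping, and outside of a task (current_task() returns None) the context manager simply logs and yields.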