Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/sentry/workflow_engine/tasks/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
build_workflow_event_data_from_event,
)
from sentry.workflow_engine.types import WorkflowEventData
from sentry.workflow_engine.utils import log_context
from sentry.workflow_engine.utils import log_context, timeout_grouping_context

logger = log_context.get_logger(__name__)

Expand Down Expand Up @@ -137,7 +137,10 @@ def trigger_action(
)

if should_trigger_actions:
action.trigger(event_data, detector)
# Set up a timeout grouping context because we want to make sure any Sentry timeout reporting
# in this scope is grouped properly.
with timeout_grouping_context(action.type):
action.trigger(event_data, detector)
else:
logger.info(
"workflow_engine.triggered_actions.dry-run",
Expand Down
2 changes: 2 additions & 0 deletions src/sentry/workflow_engine/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
__all__ = [
"metrics_incr",
"MetricTags",
"timeout_grouping_context",
]

from .metrics import MetricTags, metrics_incr
from .timeout_grouping import timeout_grouping_context
47 changes: 47 additions & 0 deletions src/sentry/workflow_engine/utils/timeout_grouping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import logging
from collections.abc import Generator
from contextlib import contextmanager
from typing import Any

import sentry_sdk

from sentry.taskworker.state import current_task
from sentry.taskworker.workerchild import ProcessingDeadlineExceeded

logger = logging.getLogger(__name__)


# sentry_sdk doesn't export these.
_Event = Any
_ExcInfo = Any


@contextmanager
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i just went on a deep dive learning about these, neat!

def timeout_grouping_context(*refinements: str) -> Generator[None]:
"""
Context manager that ensures that ProcessingDeadlineExceeded errors are grouped together on the task level.
Grouping based on specific stacktrace is usually inappropriate because once we've past the deadline, any
subsequent line of code executed may be running when it is raised.
Defaulting to grouping by task is more accurate, and where there's a need to subdivide that, we
offer the ability to refine.
"""
task_state = current_task()
if task_state:

def process_error(event: _Event, exc_info: _ExcInfo) -> _Event | None:
exc = exc_info[1]
if isinstance(exc, ProcessingDeadlineExceeded):
event["fingerprint"] = [
"task.processing_deadline_exceeded",
task_state.namespace,
task_state.taskname,
*refinements,
]
return event

with sentry_sdk.new_scope() as scope:
scope.add_error_processor(process_error)
yield
else:
logger.info("No task state found in timeout_grouping_context")
yield
121 changes: 121 additions & 0 deletions tests/sentry/workflow_engine/utils/test_timeout_grouping.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from typing import Any
from unittest.mock import Mock, patch

from sentry.taskworker.state import CurrentTaskState
from sentry.taskworker.workerchild import ProcessingDeadlineExceeded
from sentry.workflow_engine.utils.timeout_grouping import timeout_grouping_context


class TestTimeoutGroupingContext:
def test_with_task_state_and_processing_deadline_exceeded(self) -> None:
mock_task_state = CurrentTaskState(
id="test_id",
namespace="test_namespace",
taskname="test_task",
attempt=1,
processing_deadline_duration=30,
retries_remaining=True,
)

with patch(
"sentry.workflow_engine.utils.timeout_grouping.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
mock_scope_instance = Mock()
mock_scope.return_value.__enter__ = Mock(return_value=mock_scope_instance)
mock_scope.return_value.__exit__ = Mock(return_value=None)

# Capture the error processor function
captured_processor = None

def capture_processor(processor: Any) -> None:
nonlocal captured_processor
captured_processor = processor

mock_scope_instance.add_error_processor = capture_processor

with timeout_grouping_context("refinement1", "refinement2"):
pass

# Test the processor function
assert captured_processor is not None

# Create a mock event and exception info
event: Any = {}
exc = ProcessingDeadlineExceeded("Test timeout")
exc_info = (ProcessingDeadlineExceeded, exc, None)

# Process the event
result = captured_processor(event, exc_info)

# Verify the fingerprint was set correctly
assert result["fingerprint"] == [
"task.processing_deadline_exceeded",
"test_namespace",
"test_task",
"refinement1",
"refinement2",
]

def test_with_task_state_and_non_processing_deadline_exceeded(self) -> None:
mock_task_state = CurrentTaskState(
id="test_id",
namespace="test_namespace",
taskname="test_task",
attempt=1,
processing_deadline_duration=30,
retries_remaining=True,
)

with patch(
"sentry.workflow_engine.utils.timeout_grouping.current_task",
return_value=mock_task_state,
):
with patch("sentry_sdk.new_scope") as mock_scope:
mock_scope_instance = Mock()
mock_scope.return_value.__enter__ = Mock(return_value=mock_scope_instance)
mock_scope.return_value.__exit__ = Mock(return_value=None)

captured_processor = None

def capture_processor(processor: Any) -> None:
nonlocal captured_processor
captured_processor = processor

mock_scope_instance.add_error_processor = capture_processor

with timeout_grouping_context():
pass

assert captured_processor is not None

# Test the processor function with a different exception
event = {"original": "data"}
exc = ValueError("Some other error")
exc_info = (ValueError, exc, None)

result = captured_processor(event, exc_info)

# Event should be unchanged
assert result == {"original": "data"}
assert "fingerprint" not in result

def test_without_task_state(self) -> None:
with patch("sentry.workflow_engine.utils.timeout_grouping.current_task", return_value=None):
with patch("sentry.workflow_engine.utils.timeout_grouping.logger") as mock_logger:
with timeout_grouping_context():
pass

# Should log that no task state was found
mock_logger.info.assert_called_once_with(
"No task state found in timeout_grouping_context"
)

def test_context_manager_yields_correctly(self) -> None:
executed = False
with patch("sentry.workflow_engine.utils.timeout_grouping.current_task", return_value=None):
with timeout_grouping_context():
executed = True

assert executed is True
Loading