Skip to content

fix(buffered_segments): Skip producing outcomes if --skip-produce is set #95922

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions src/sentry/spans/consumers/process_segments/factory.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from collections.abc import Mapping
from datetime import datetime
from functools import partial

import orjson
from arroyo import Topic as ArroyoTopic
Expand Down Expand Up @@ -90,7 +91,7 @@ def create_with_partitions(
unfold_step = Unfold(generator=_unfold_segment, next_step=produce_step)

return run_task_with_multiprocessing(
function=_process_message,
function=partial(_process_message, skip_produce=self.skip_produce),
next_step=unfold_step,
max_batch_size=self.max_batch_size,
max_batch_time=self.max_batch_time,
Expand All @@ -103,7 +104,9 @@ def shutdown(self) -> None:
self.pool.close()


def _process_message(message: Message[KafkaPayload]) -> list[Value[KafkaPayload]]:
def _process_message(
message: Message[KafkaPayload], skip_produce: bool = False
) -> list[Value[KafkaPayload]]:
if not options.get("spans.process-segments.consumer.enable"):
return []

Expand All @@ -112,7 +115,7 @@ def _process_message(message: Message[KafkaPayload]) -> list[Value[KafkaPayload]
try:
value = message.payload.value
segment = orjson.loads(value)
processed = process_segment(segment["spans"])
processed = process_segment(segment["spans"], skip_produce=skip_produce)
return [_serialize_payload(span, message.timestamp) for span in processed]
except Exception:
logger.exception("segments.invalid-message")
Expand Down
7 changes: 5 additions & 2 deletions src/sentry/spans/consumers/process_segments/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@


@metrics.wraps("spans.consumers.process_segments.process_segment")
def process_segment(unprocessed_spans: list[SegmentSpan]) -> list[Span]:
def process_segment(unprocessed_spans: list[SegmentSpan], skip_produce: bool = False) -> list[Span]:
segment_span, spans = _enrich_spans(unprocessed_spans)
if segment_span is None:
return spans
Expand All @@ -65,7 +65,10 @@ def process_segment(unprocessed_spans: list[SegmentSpan]) -> list[Span]:
_create_models(segment_span, project)
_detect_performance_problems(segment_span, spans, project)
_record_signals(segment_span, spans, project)
_track_outcomes(segment_span, spans)

# Only track outcomes if we're actually producing the spans
if not skip_produce:
_track_outcomes(segment_span, spans)

return spans

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def build_mock_message(data, topic=None):

@override_options({"spans.process-segments.consumer.enable": True})
@mock.patch(
"sentry.spans.consumers.process_segments.factory.process_segment", side_effect=lambda x: x
"sentry.spans.consumers.process_segments.factory.process_segment",
side_effect=lambda x, **kwargs: x,
)
def test_segment_deserialized_correctly(mock_process_segment):
topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"])
Expand Down
17 changes: 17 additions & 0 deletions tests/sentry/spans/consumers/process_segments/test_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,20 @@ def repeating_span():
).hexdigest()
]
assert performance_problem.type == PerformanceStreamedSpansGroupTypeExperimental

@mock.patch("sentry.spans.consumers.process_segments.message.track_outcome")
def test_skip_produce_does_not_track_outcomes(self, mock_track_outcome):
"""Test that outcomes are not tracked when skip_produce=True"""
spans = self.generate_basic_spans()

# Process with skip_produce=True
process_segment(spans, skip_produce=True)

# Verify track_outcome was not called
mock_track_outcome.assert_not_called()

# Process with skip_produce=False (default)
process_segment(spans, skip_produce=False)

# Verify track_outcome was called once
mock_track_outcome.assert_called_once()
Loading