Skip to content

Commit 9f31644

Browse files
authored
fix(buffered_segments): Skip producing outcomes if --skip-produce is set (#95922)
Relay currently still produces these outcomes for as long as it writes to snuba-items. Whoever writes to snuba-items should also write to outcomes.
1 parent c90dbd3 commit 9f31644

File tree

4 files changed

+30
-6
lines changed

4 files changed

+30
-6
lines changed

src/sentry/spans/consumers/process_segments/factory.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
from collections.abc import Mapping
33
from datetime import datetime
4+
from functools import partial
45

56
import orjson
67
from arroyo import Topic as ArroyoTopic
@@ -90,7 +91,7 @@ def create_with_partitions(
9091
unfold_step = Unfold(generator=_unfold_segment, next_step=produce_step)
9192

9293
return run_task_with_multiprocessing(
93-
function=_process_message,
94+
function=partial(_process_message, skip_produce=self.skip_produce),
9495
next_step=unfold_step,
9596
max_batch_size=self.max_batch_size,
9697
max_batch_time=self.max_batch_time,
@@ -103,7 +104,9 @@ def shutdown(self) -> None:
103104
self.pool.close()
104105

105106

106-
def _process_message(message: Message[KafkaPayload]) -> list[Value[KafkaPayload]]:
107+
def _process_message(
108+
message: Message[KafkaPayload], skip_produce: bool = False
109+
) -> list[Value[KafkaPayload]]:
107110
if not options.get("spans.process-segments.consumer.enable"):
108111
return []
109112

@@ -112,7 +115,7 @@ def _process_message(message: Message[KafkaPayload]) -> list[Value[KafkaPayload]
112115
try:
113116
value = message.payload.value
114117
segment = orjson.loads(value)
115-
processed = process_segment(segment["spans"])
118+
processed = process_segment(segment["spans"], skip_produce=skip_produce)
116119
return [_serialize_payload(span, message.timestamp) for span in processed]
117120
except Exception:
118121
logger.exception("segments.invalid-message")

src/sentry/spans/consumers/process_segments/message.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939

4040

4141
@metrics.wraps("spans.consumers.process_segments.process_segment")
42-
def process_segment(unprocessed_spans: list[SegmentSpan]) -> list[Span]:
42+
def process_segment(unprocessed_spans: list[SegmentSpan], skip_produce: bool = False) -> list[Span]:
4343
segment_span, spans = _enrich_spans(unprocessed_spans)
4444
if segment_span is None:
4545
return spans
@@ -65,7 +65,10 @@ def process_segment(unprocessed_spans: list[SegmentSpan]) -> list[Span]:
6565
_create_models(segment_span, project)
6666
_detect_performance_problems(segment_span, spans, project)
6767
_record_signals(segment_span, spans, project)
68-
_track_outcomes(segment_span, spans)
68+
69+
# Only track outcomes if we're actually producing the spans
70+
if not skip_produce:
71+
_track_outcomes(segment_span, spans)
6972

7073
return spans
7174

tests/sentry/spans/consumers/process_segments/test_factory.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ def build_mock_message(data, topic=None):
2525

2626
@override_options({"spans.process-segments.consumer.enable": True})
2727
@mock.patch(
28-
"sentry.spans.consumers.process_segments.factory.process_segment", side_effect=lambda x: x
28+
"sentry.spans.consumers.process_segments.factory.process_segment",
29+
side_effect=lambda x, **kwargs: x,
2930
)
3031
def test_segment_deserialized_correctly(mock_process_segment):
3132
topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"])

tests/sentry/spans/consumers/process_segments/test_message.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,3 +199,20 @@ def repeating_span():
199199
).hexdigest()
200200
]
201201
assert performance_problem.type == PerformanceStreamedSpansGroupTypeExperimental
202+
203+
@mock.patch("sentry.spans.consumers.process_segments.message.track_outcome")
204+
def test_skip_produce_does_not_track_outcomes(self, mock_track_outcome):
205+
"""Test that outcomes are not tracked when skip_produce=True"""
206+
spans = self.generate_basic_spans()
207+
208+
# Process with skip_produce=True
209+
process_segment(spans, skip_produce=True)
210+
211+
# Verify track_outcome was not called
212+
mock_track_outcome.assert_not_called()
213+
214+
# Process with skip_produce=False (default)
215+
process_segment(spans, skip_produce=False)
216+
217+
# Verify track_outcome was called once
218+
mock_track_outcome.assert_called_once()

0 commit comments

Comments
 (0)