diff --git a/src/sentry/spans/consumers/process_segments/factory.py b/src/sentry/spans/consumers/process_segments/factory.py index fcbebaed23c377..76443a4444b55a 100644 --- a/src/sentry/spans/consumers/process_segments/factory.py +++ b/src/sentry/spans/consumers/process_segments/factory.py @@ -1,6 +1,7 @@ import logging from collections.abc import Mapping from datetime import datetime +from functools import partial import orjson from arroyo import Topic as ArroyoTopic @@ -90,7 +91,7 @@ def create_with_partitions( unfold_step = Unfold(generator=_unfold_segment, next_step=produce_step) return run_task_with_multiprocessing( - function=_process_message, + function=partial(_process_message, skip_produce=self.skip_produce), next_step=unfold_step, max_batch_size=self.max_batch_size, max_batch_time=self.max_batch_time, @@ -103,7 +104,9 @@ def shutdown(self) -> None: self.pool.close() -def _process_message(message: Message[KafkaPayload]) -> list[Value[KafkaPayload]]: +def _process_message( + message: Message[KafkaPayload], skip_produce: bool = False +) -> list[Value[KafkaPayload]]: if not options.get("spans.process-segments.consumer.enable"): return [] @@ -112,7 +115,7 @@ def _process_message(message: Message[KafkaPayload]) -> list[Value[KafkaPayload] try: value = message.payload.value segment = orjson.loads(value) - processed = process_segment(segment["spans"]) + processed = process_segment(segment["spans"], skip_produce=skip_produce) return [_serialize_payload(span, message.timestamp) for span in processed] except Exception: logger.exception("segments.invalid-message") diff --git a/src/sentry/spans/consumers/process_segments/message.py b/src/sentry/spans/consumers/process_segments/message.py index 12013794bae9ab..0605b12a5854ef 100644 --- a/src/sentry/spans/consumers/process_segments/message.py +++ b/src/sentry/spans/consumers/process_segments/message.py @@ -39,7 +39,7 @@ @metrics.wraps("spans.consumers.process_segments.process_segment") -def process_segment(unprocessed_spans: list[SegmentSpan]) -> list[Span]: +def process_segment(unprocessed_spans: list[SegmentSpan], skip_produce: bool = False) -> list[Span]: segment_span, spans = _enrich_spans(unprocessed_spans) if segment_span is None: return spans @@ -65,7 +65,10 @@ def process_segment(unprocessed_spans: list[SegmentSpan]) -> list[Span]: _create_models(segment_span, project) _detect_performance_problems(segment_span, spans, project) _record_signals(segment_span, spans, project) - _track_outcomes(segment_span, spans) + + # Only track outcomes if we're actually producing the spans + if not skip_produce: + _track_outcomes(segment_span, spans) return spans diff --git a/tests/sentry/spans/consumers/process_segments/test_factory.py b/tests/sentry/spans/consumers/process_segments/test_factory.py index 130df37b7ff268..986c57148e2088 100644 --- a/tests/sentry/spans/consumers/process_segments/test_factory.py +++ b/tests/sentry/spans/consumers/process_segments/test_factory.py @@ -25,7 +25,8 @@ def build_mock_message(data, topic=None): @override_options({"spans.process-segments.consumer.enable": True}) @mock.patch( - "sentry.spans.consumers.process_segments.factory.process_segment", side_effect=lambda x: x + "sentry.spans.consumers.process_segments.factory.process_segment", + side_effect=lambda x, **kwargs: x, ) def test_segment_deserialized_correctly(mock_process_segment): topic = ArroyoTopic(get_topic_definition(Topic.BUFFERED_SEGMENTS)["real_topic_name"]) diff --git a/tests/sentry/spans/consumers/process_segments/test_message.py b/tests/sentry/spans/consumers/process_segments/test_message.py index 51cea67f706897..fac64f6be835ae 100644 --- a/tests/sentry/spans/consumers/process_segments/test_message.py +++ b/tests/sentry/spans/consumers/process_segments/test_message.py @@ -199,3 +199,20 @@ def repeating_span(): ).hexdigest() ] assert performance_problem.type == PerformanceStreamedSpansGroupTypeExperimental + + @mock.patch("sentry.spans.consumers.process_segments.message.track_outcome") + def test_skip_produce_does_not_track_outcomes(self, mock_track_outcome): + """Test that outcomes are not tracked when skip_produce=True""" + spans = self.generate_basic_spans() + + # Process with skip_produce=True + process_segment(spans, skip_produce=True) + + # Verify track_outcome was not called + mock_track_outcome.assert_not_called() + + # Process with skip_produce=False (default) + process_segment(spans, skip_produce=False) + + # Verify track_outcome was called once + mock_track_outcome.assert_called_once()