Skip to content

Commit 9e7fa0c

Browse files
committed
Fix missing original karafka trace after iterating
1 parent 8d1022e commit 9e7fa0c

File tree

2 files changed

+110
-4
lines changed

2 files changed

+110
-4
lines changed

lib/datadog/tracing/contrib/karafka/patcher.rb

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,24 +24,39 @@ def propagation
2424
# (e.g. `my_batch_operation messages.payloads`)
2525
# @see https://github.com/karafka/karafka/blob/b06d1f7c17818e1605f80c2bb573454a33376b40/README.md?plain=1#L29-L35
2626
def each(&block)
27+
parent_span = Datadog::Tracing.active_span
28+
parent_trace_digest = Datadog::Tracing.active_trace&.to_digest
29+
2730
@messages_array.each do |message|
28-
if configuration[:distributed_tracing]
31+
trace_digest = if configuration[:distributed_tracing]
2932
headers = if message.metadata.respond_to?(:raw_headers)
3033
message.metadata.raw_headers
3134
else
3235
message.metadata.headers
3336
end
34-
trace_digest = Karafka.extract(headers)
35-
Datadog::Tracing.continue_trace!(trace_digest) if trace_digest
37+
Karafka.extract(headers)
3638
end
3739

38-
Tracing.trace(Ext::SPAN_MESSAGE_CONSUME) do |span|
40+
Tracing.trace(Ext::SPAN_MESSAGE_CONSUME, continue_from: trace_digest) do |span, trace|
3941
span.set_tag(Ext::TAG_OFFSET, message.metadata.offset)
4042
span.set_tag(Contrib::Ext::Messaging::TAG_DESTINATION, message.topic)
4143
span.set_tag(Contrib::Ext::Messaging::TAG_SYSTEM, Ext::TAG_SYSTEM)
4244

4345
span.resource = message.topic
4446

47+
# link the outer trace (where the messages batch was consumed)
48+
# with the individual message's processing trace, so they're easier to
49+
# correlate in the Datadog UI
50+
if parent_span && span.parent_id != parent_span.id
51+
# add a link from the parent trace to the message span
52+
span_link = Tracing::SpanLink.new(parent_trace_digest)
53+
span.links << span_link
54+
55+
# add a link from the current trace to the parent span
56+
span_link = Tracing::SpanLink.new(trace.to_digest)
57+
parent_span.links << span_link
58+
end
59+
4560
yield message
4661
end
4762
end

spec/datadog/tracing/contrib/karafka/patcher_spec.rb

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,97 @@
5353
expect(span).to_not have_error
5454
expect(span.resource).to eq 'topic_a'
5555
end
56+
57+
context 'when the message has tracing headers' do
58+
let(:message) do
59+
headers = {}
60+
producer_trace = nil
61+
producer_span = nil
62+
Datadog::Tracing.trace('producer') do |span, trace|
63+
producer_span = span
64+
producer_trace = trace
65+
Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers)
66+
end
67+
metadata = ::Karafka::Messages::Metadata.new
68+
metadata['offset'] = 412
69+
metadata[headers_accessor] = headers
70+
raw_payload = rand.to_s
71+
72+
message = ::Karafka::Messages::Message.new(raw_payload, metadata)
73+
allow(message).to receive(:timestamp).and_return(Time.now)
74+
allow(message).to receive(:topic).and_return('topic_a')
75+
message
76+
end
77+
let(:headers_accessor) do
78+
::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers'
79+
end
80+
81+
context 'when distributed tracing is enabled' do
82+
it 'continues the span that produced the message' do
83+
producer_trace_digest = Datadog::Tracing::Contrib::Karafka.extract(message.metadata[headers_accessor])
84+
85+
consumer_span = nil
86+
consumer_trace = nil
87+
88+
Datadog::Tracing.trace('consumer') do
89+
consumer_span = Datadog::Tracing.active_span
90+
consumer_trace = Datadog::Tracing.active_trace
91+
92+
topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
93+
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
94+
expect(messages).to all(be_a(::Karafka::Messages::Message))
95+
96+
# assert that the current trace re-set to the original trace after iterating the messages
97+
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
98+
expect(Datadog::Tracing.active_span).to eq(consumer_span)
99+
end
100+
101+
expect(spans).to have(3).items
102+
103+
# assert that the message span is a continuation of the producer span
104+
expect(span.parent_id).to eq producer_trace_digest.span_id
105+
expect(span.trace_id).to eq producer_trace_digest.trace_id
106+
107+
expect(span.links.map { |l| [l.trace_id, l.span_id] }).to contain_exactly(
108+
[consumer_trace.id, consumer_span.id]
109+
)
110+
expect(consumer_span.links.map { |l| [l.trace_id, l.span_id] }).to contain_exactly(
111+
[span.trace_id, span.id]
112+
)
113+
end
114+
end
115+
116+
context 'when distributed tracing is not enabled' do
117+
let(:configuration_options) { { distributed_tracing: false } }
118+
119+
it 'does not continue the span that produced the message' do
120+
consumer_span = nil
121+
consumer_trace = nil
122+
123+
Datadog::Tracing.trace('consumer') do
124+
consumer_span = Datadog::Tracing.active_span
125+
consumer_trace = Datadog::Tracing.active_trace
126+
127+
topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
128+
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
129+
expect(messages).to all(be_a(::Karafka::Messages::Message))
130+
131+
# assert that the current trace re-set to the original trace after iterating the messages
132+
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
133+
expect(Datadog::Tracing.active_span).to eq(consumer_span)
134+
end
135+
136+
expect(spans).to have(3).items
137+
138+
# assert that the message span is not continuation of the producer span
139+
expect(span.parent_id).to eq(consumer_span.id)
140+
expect(span.trace_id).to eq(consumer_trace.id)
141+
142+
expect(span.links).to be_empty
143+
expect(consumer_span.links).to be_empty
144+
end
145+
end
146+
end
56147
end
57148

58149
describe 'worker.processed' do

0 commit comments

Comments
 (0)