Skip to content

Commit e547fd6

Browse files
committed
Support different karafka configurations per topic
1 parent 9e7fa0c commit e547fd6

File tree

3 files changed

+57
-26
lines changed

3 files changed

+57
-26
lines changed

lib/datadog/tracing/contrib/karafka/integration.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ def new_configuration
3838
def patcher
3939
Patcher
4040
end
41+
42+
def resolver
43+
@resolver ||= Contrib::Configuration::Resolvers::PatternResolver.new
44+
end
4145
end
4246
end
4347
end

lib/datadog/tracing/contrib/karafka/patcher.rb

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,6 @@ module Contrib
1010
module Karafka
1111
# Patch to add tracing to Karafka::Messages::Messages
1212
module MessagesPatch
13-
def configuration
14-
Datadog.configuration.tracing[:karafka]
15-
end
16-
17-
def propagation
18-
@propagation ||= Contrib::Karafka::Distributed::Propagation.new
19-
end
20-
2113
# `each` is the most popular access point to Karafka messages,
2214
# but not the only one
2315
# Other access patterns do not have a straightforward tracing avenue
@@ -28,6 +20,7 @@ def each(&block)
2820
parent_trace_digest = Datadog::Tracing.active_trace&.to_digest
2921

3022
@messages_array.each do |message|
23+
configuration = datadog_configuration(message.topic)
3124
trace_digest = if configuration[:distributed_tracing]
3225
headers = if message.metadata.respond_to?(:raw_headers)
3326
message.metadata.raw_headers
@@ -61,6 +54,12 @@ def each(&block)
6154
end
6255
end
6356
end
57+
58+
private
59+
60+
def datadog_configuration(topic)
61+
Datadog.configuration.tracing[:karafka, topic]
62+
end
6463
end
6564

6665
module AppPatch

spec/datadog/tracing/contrib/karafka/patcher_spec.rb

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
before do
1818
Datadog.configure do |c|
1919
c.tracing.instrument :karafka, configuration_options
20+
c.tracing.instrument :karafka, describes: /special_/, distributed_tracing: false
2021
end
2122
end
2223

@@ -31,16 +32,12 @@
3132
let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_MESSAGE_CONSUME }
3233

3334
it 'is expected to send a span' do
34-
metadata = ::Karafka::Messages::Metadata.new
35-
metadata['offset'] = 412
35+
metadata = ::Karafka::Messages::Metadata.new(offset: 412, timestamp: Time.now, topic: 'topic_a')
3636
raw_payload = rand.to_s
3737

3838
message = ::Karafka::Messages::Message.new(raw_payload, metadata)
39-
allow(message).to receive(:timestamp).and_return(Time.now)
40-
allow(message).to receive(:topic).and_return('topic_a')
41-
42-
topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
4339

40+
topic = ::Karafka::Routing::Topic.new(message.topic, double(id: 0))
4441
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
4542

4643
expect(messages).to all(be_a(::Karafka::Messages::Message))
@@ -55,6 +52,7 @@
5552
end
5653

5754
context 'when the message has tracing headers' do
55+
let(:topic_name) { "topic_a" }
5856
let(:message) do
5957
headers = {}
6058
producer_trace = nil
@@ -64,15 +62,15 @@
6462
producer_trace = trace
6563
Datadog::Tracing::Contrib::Karafka.inject(trace.to_digest, headers)
6664
end
67-
metadata = ::Karafka::Messages::Metadata.new
68-
metadata['offset'] = 412
69-
metadata[headers_accessor] = headers
65+
metadata = ::Karafka::Messages::Metadata.new(
66+
offset: 412,
67+
headers_accessor => headers,
68+
topic: topic_name,
69+
timestamp: Time.now
70+
)
7071
raw_payload = rand.to_s
7172

72-
message = ::Karafka::Messages::Message.new(raw_payload, metadata)
73-
allow(message).to receive(:timestamp).and_return(Time.now)
74-
allow(message).to receive(:topic).and_return('topic_a')
75-
message
73+
::Karafka::Messages::Message.new(raw_payload, metadata)
7674
end
7775
let(:headers_accessor) do
7876
::Karafka::Messages::Metadata.members.include?(:raw_headers) ? 'raw_headers' : 'headers'
@@ -89,7 +87,7 @@
8987
consumer_span = Datadog::Tracing.active_span
9088
consumer_trace = Datadog::Tracing.active_trace
9189

92-
topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
90+
topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
9391
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
9492
expect(messages).to all(be_a(::Karafka::Messages::Message))
9593

@@ -113,6 +111,37 @@
113111
end
114112
end
115113

114+
context "when distributed tracing is disabled for the topic in particular" do
115+
let(:topic_name) { "special_topic" }
116+
117+
it 'does not continue the span that produced the message' do
118+
consumer_span = nil
119+
consumer_trace = nil
120+
121+
Datadog::Tracing.trace('consumer') do
122+
consumer_span = Datadog::Tracing.active_span
123+
consumer_trace = Datadog::Tracing.active_trace
124+
125+
topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
126+
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
127+
expect(messages).to all(be_a(::Karafka::Messages::Message))
128+
129+
# assert that the current trace re-set to the original trace after iterating the messages
130+
expect(Datadog::Tracing.active_trace).to eq(consumer_trace)
131+
expect(Datadog::Tracing.active_span).to eq(consumer_span)
132+
end
133+
134+
expect(spans).to have(3).items
135+
136+
# assert that the message span is not continuation of the producer span
137+
expect(span.parent_id).to eq(consumer_span.id)
138+
expect(span.trace_id).to eq(consumer_trace.id)
139+
140+
expect(span.links).to be_empty
141+
expect(consumer_span.links).to be_empty
142+
end
143+
end
144+
116145
context 'when distributed tracing is not enabled' do
117146
let(:configuration_options) { { distributed_tracing: false } }
118147

@@ -124,7 +153,7 @@
124153
consumer_span = Datadog::Tracing.active_span
125154
consumer_trace = Datadog::Tracing.active_trace
126155

127-
topic = ::Karafka::Routing::Topic.new('topic_a', double(id: 0))
156+
topic = ::Karafka::Routing::Topic.new(topic_name, double(id: 0))
128157
messages = ::Karafka::Messages::Builders::Messages.call([message], topic, 0, Time.now)
129158
expect(messages).to all(be_a(::Karafka::Messages::Message))
130159

@@ -150,12 +179,11 @@
150179
let(:span_name) { Datadog::Tracing::Contrib::Karafka::Ext::SPAN_WORKER_PROCESS }
151180

152181
it 'is expected to send a span' do
153-
metadata = ::Karafka::Messages::Metadata.new
154-
metadata['offset'] = 412
182+
metadata = ::Karafka::Messages::Metadata.new(offset: 412, topic: 'topic_a')
155183
raw_payload = rand.to_s
156184

157185
message = ::Karafka::Messages::Message.new(raw_payload, metadata)
158-
job = double(executor: double(topic: double(name: 'topic_a', consumer: 'ABC'), partition: 0), messages: [message])
186+
job = double(executor: double(topic: double(name: message.topic, consumer: 'ABC'), partition: 0), messages: [message])
159187

160188
Karafka.monitor.instrument('worker.processed', { job: job }) do
161189
# Noop

0 commit comments

Comments
 (0)