From da4b118b5c00761d9f369dac3ecf4da3ee3c8fdb Mon Sep 17 00:00:00 2001 From: Minje Park Date: Fri, 18 Jul 2025 09:51:48 +0900 Subject: [PATCH 1/5] add capture header to producer and consumer --- .../kafkaclients/v2_6/TracingConsumerInterceptor.java | 4 ++++ .../kafkaclients/v2_6/TracingProducerInterceptor.java | 9 ++++++++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java index 368767c1022a..62c0173c4bff 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java @@ -12,6 +12,7 @@ import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; +import java.util.ArrayList; import java.util.Map; import java.util.Objects; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -32,6 +33,9 @@ public class TracingConsumerInterceptor implements ConsumerInterceptor())) .build(); private String consumerGroup; diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java index c64dae0496cb..ddd9949b45a8 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java @@ -7,6 +7,8 @@ import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil; +import java.util.ArrayList; import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; @@ -22,7 +24,12 @@ */ public class TracingProducerInterceptor implements ProducerInterceptor { - private static final KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get()); + private static final KafkaTelemetry telemetry = + KafkaTelemetry.builder(GlobalOpenTelemetry.get()) + .setCapturedHeaders( + ConfigPropertiesUtil.getList( + "otel.instrumentation.messaging.experimental.capture-headers", new ArrayList<>())) + .build(); @Nullable private String clientId; From aef51194cee562599b371ad5def3291b86cc7a97 Mon Sep 17 00:00:00 2001 From: Minje Park Date: Sun, 20 Jul 2025 17:00:39 +0900 Subject: [PATCH 2/5] add capture headers to InterceptorsTest --- .../kafka-clients-2.6/library/build.gradle.kts | 1 + .../kafkaclients/v2_6/InterceptorsTest.java | 17 +++++++++++++++++ .../kafka-clients-2.6/metadata.yaml | 11 ++++++++++- 3 files changed, 28 insertions(+), 1 deletion(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts index 17dadd3bc579..eb5a22e48c86 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts @@ -36,6 +36,7 @@ tasks { excludeTestsMatching("WrapperSuppressReceiveSpansTest") } jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") + jvmArgs("-Dotel.instrumentation.messaging.experimental.capture-headers=baggage") } check { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index 8f4f65dc2262..83aa97c5fd59 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -18,10 +18,12 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; import static org.assertj.core.api.Assertions.assertThat; +import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.trace.data.LinkData; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.AbstractStringAssert; @@ -42,6 +44,11 @@ void assertTraces() { .hasKind(SpanKind.PRODUCER) .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( + equalTo( + AttributeKey.stringArrayKey("messaging.header.baggage"), + Arrays.asList( + "test-baggage-key-1=test-baggage-value-1", + "test-baggage-key-2=test-baggage-value-2")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "publish"), @@ -64,6 +71,11 @@ void assertTraces() { .hasNoParent() .hasLinksSatisfying(links -> assertThat(links).isEmpty()) .hasAttributesSatisfyingExactly( + equalTo( + AttributeKey.stringArrayKey("messaging.header.baggage"), + Arrays.asList( + "test-baggage-key-1=test-baggage-value-1", + "test-baggage-key-2=test-baggage-value-2")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "receive"), @@ -78,6 +90,11 @@ void assertTraces() { .hasParent(trace.getSpan(0)) .hasLinks(LinkData.create(producerSpanContext.get())) .hasAttributesSatisfyingExactly( + equalTo( + AttributeKey.stringArrayKey("messaging.header.baggage"), + Arrays.asList( + "test-baggage-key-1=test-baggage-value-1", + "test-baggage-key-2=test-baggage-value-2")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "process"), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml index 5898d2e01872..f2f26ee3e407 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml @@ -1,2 +1,11 @@ description: > - This instrumentation provides a library integeration that enables messaging spans and metrics for Apache Kafka 2.6+ clients. + This instrumentation provides both library and wrapper integrations that enable messaging spans and metrics for Apache Kafka 2.6+ clients. +configurations: + - name: otel.instrumentation.messaging.experimental.capture-headers + description: A comma-separated list of header names to capture as span attributes. + type: list + default: '' + - name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled + description: Enables experimental receive telemetry for Kafka instrumentation. + type: boolean + default: false From a474d874f1bff230934eec518e0d370d46579512 Mon Sep 17 00:00:00 2001 From: Minje Park Date: Mon, 21 Jul 2025 09:07:14 +0900 Subject: [PATCH 3/5] use stringArrayKey and asList from static import --- .../kafkaclients/v2_6/TracingConsumerInterceptor.java | 5 +++-- .../kafkaclients/v2_6/TracingProducerInterceptor.java | 5 +++-- .../instrumentation/kafkaclients/v2_6/InterceptorsTest.java | 6 ++++-- .../kafka/kafka-clients/kafka-clients-2.6/metadata.yaml | 4 +++- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java index 62c0173c4bff..8f60b07e6a88 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingConsumerInterceptor.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6; +import static java.util.Collections.emptyList; + import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.context.Context; @@ -12,7 +14,6 @@ import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext; import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil; -import java.util.ArrayList; import java.util.Map; import java.util.Objects; import org.apache.kafka.clients.consumer.ConsumerConfig; @@ -35,7 +36,7 @@ public class TracingConsumerInterceptor implements ConsumerInterceptor())) + "otel.instrumentation.messaging.experimental.capture-headers", emptyList())) .build(); private String consumerGroup; diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java index ddd9949b45a8..833b0785d181 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/main/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/TracingProducerInterceptor.java @@ -5,10 +5,11 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6; +import static java.util.Collections.emptyList; + import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil; -import java.util.ArrayList; import java.util.Map; import java.util.Objects; import javax.annotation.Nullable; @@ -28,7 +29,7 @@ public class TracingProducerInterceptor implements ProducerInterceptor())) + "otel.instrumentation.messaging.experimental.capture-headers", emptyList())) .build(); @Nullable private String clientId; diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index 83aa97c5fd59..85dbb2cfe60c 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -5,6 +5,7 @@ package io.opentelemetry.instrumentation.kafkaclients.v2_6; +import static io.opentelemetry.api.common.AttributeKey.stringArrayKey; import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; @@ -16,6 +17,7 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; +import static java.util.Arrays.asList; import static org.assertj.core.api.Assertions.assertThat; import io.opentelemetry.api.common.AttributeKey; @@ -45,8 +47,8 @@ void assertTraces() { .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( equalTo( - AttributeKey.stringArrayKey("messaging.header.baggage"), - Arrays.asList( + stringArrayKey("messaging.header.baggage"), + asList( "test-baggage-key-1=test-baggage-value-1", "test-baggage-key-2=test-baggage-value-2")), equalTo(MESSAGING_SYSTEM, "kafka"), diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml index f2f26ee3e407..5544765ac893 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml @@ -6,6 +6,8 @@ configurations: type: list default: '' - name: otel.instrumentation.messaging.experimental.receive-telemetry.enabled - description: Enables experimental receive telemetry for Kafka instrumentation. + description: > + Enables experimental receive telemetry, which will cause consumers to start a new trace, with + only a span link connecting it to the producer trace. type: boolean default: false From e482dd563a31d9de841864be1aaa1d75d9d79c99 Mon Sep 17 00:00:00 2001 From: Minje Park Date: Tue, 22 Jul 2025 18:01:43 +0900 Subject: [PATCH 4/5] wrapper word is removed in metadata.yaml --- .../kafka/kafka-clients/kafka-clients-2.6/metadata.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml index 5544765ac893..652cea4da3f3 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/metadata.yaml @@ -1,5 +1,5 @@ description: > - This instrumentation provides both library and wrapper integrations that enable messaging spans and metrics for Apache Kafka 2.6+ clients. + This instrumentation provides a library integrations that enables messaging spans and metrics for Apache Kafka 2.6+ clients. configurations: - name: otel.instrumentation.messaging.experimental.capture-headers description: A comma-separated list of header names to capture as span attributes. From c973fae1c9d436b1e8aa45135a848e7a2795e140 Mon Sep 17 00:00:00 2001 From: Lauri Tulmin Date: Wed, 27 Aug 2025 09:48:53 +0300 Subject: [PATCH 5/5] use the same test header as other tests --- .../library/build.gradle.kts | 2 +- .../v2_6/AbstractInterceptorsTest.java | 2 ++ .../kafkaclients/v2_6/InterceptorsTest.java | 22 ++++++------------- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts index eb5a22e48c86..e9bd32628c83 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/build.gradle.kts @@ -36,7 +36,7 @@ tasks { excludeTestsMatching("WrapperSuppressReceiveSpansTest") } jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true") - jvmArgs("-Dotel.instrumentation.messaging.experimental.capture-headers=baggage") + systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header") } check { diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java index 73d62592a267..0314c504c71f 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/AbstractInterceptorsTest.java @@ -53,6 +53,8 @@ void testInterceptors() throws InterruptedException { new ProducerRecord<>(SHARED_TOPIC, greeting); producerRecord .headers() + // add header to test capturing header value as span attribute + .add("test-message-header", "test".getBytes(StandardCharsets.UTF_8)) // adding baggage header in w3c baggage format .add( "baggage", diff --git a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java index 85dbb2cfe60c..2d6a41149525 100644 --- a/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java +++ b/instrumentation/kafka/kafka-clients/kafka-clients-2.6/library/src/test/java/io/opentelemetry/instrumentation/kafkaclients/v2_6/InterceptorsTest.java @@ -17,15 +17,13 @@ import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION; import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM; -import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; -import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.trace.SpanContext; import io.opentelemetry.api.trace.SpanKind; import io.opentelemetry.sdk.trace.data.LinkData; import java.nio.charset.StandardCharsets; -import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; import org.assertj.core.api.AbstractLongAssert; import org.assertj.core.api.AbstractStringAssert; @@ -47,10 +45,8 @@ void assertTraces() { .hasParent(trace.getSpan(0)) .hasAttributesSatisfyingExactly( equalTo( - stringArrayKey("messaging.header.baggage"), - asList( - "test-baggage-key-1=test-baggage-value-1", - "test-baggage-key-2=test-baggage-value-2")), + stringArrayKey("messaging.header.test_message_header"), + singletonList("test")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "publish"), @@ -74,10 +70,8 @@ void assertTraces() { .hasLinksSatisfying(links -> assertThat(links).isEmpty()) .hasAttributesSatisfyingExactly( equalTo( - AttributeKey.stringArrayKey("messaging.header.baggage"), - Arrays.asList( - "test-baggage-key-1=test-baggage-value-1", - "test-baggage-key-2=test-baggage-value-2")), + stringArrayKey("messaging.header.test_message_header"), + singletonList("test")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "receive"), @@ -93,10 +87,8 @@ void assertTraces() { .hasLinks(LinkData.create(producerSpanContext.get())) .hasAttributesSatisfyingExactly( equalTo( - AttributeKey.stringArrayKey("messaging.header.baggage"), - Arrays.asList( - "test-baggage-key-1=test-baggage-value-1", - "test-baggage-key-2=test-baggage-value-2")), + stringArrayKey("messaging.header.test_message_header"), + singletonList("test")), equalTo(MESSAGING_SYSTEM, "kafka"), equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC), equalTo(MESSAGING_OPERATION, "process"),