|
12 | 12 | from opentelemetry.trace.span import format_span_id |
13 | 13 |
|
14 | 14 | from instana.configurator import config |
15 | | -from instana.options import StandardOptions |
16 | | -from instana.singletons import agent, tracer |
17 | | -from instana.util.config import parse_ignored_endpoints_from_yaml |
18 | | -from tests.helpers import get_first_span_by_filter, testenv |
19 | | - |
20 | 15 | from instana.instrumentation.kafka import kafka_python |
21 | 16 | from instana.instrumentation.kafka.kafka_python import ( |
22 | 17 | clear_context, |
23 | | - save_consumer_span_into_context, |
24 | 18 | close_consumer_span, |
25 | 19 | consumer_span, |
| 20 | + save_consumer_span_into_context, |
26 | 21 | ) |
| 22 | +from instana.options import StandardOptions |
| 23 | +from instana.singletons import agent, tracer |
27 | 24 | from instana.span.span import InstanaSpan |
| 25 | +from instana.util.config import parse_ignored_endpoints_from_yaml |
| 26 | +from tests.helpers import get_first_span_by_filter, testenv |
28 | 27 |
|
29 | 28 |
|
30 | 29 | class TestKafkaPython: |
@@ -122,6 +121,70 @@ def test_trace_kafka_python_send(self) -> None: |
122 | 121 | assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] |
123 | 122 | assert kafka_span.data["kafka"]["access"] == "send" |
124 | 123 |
|
| 124 | + def test_trace_kafka_python_send_with_keyword_topic(self) -> None: |
| 125 | + """Test that tracing works when topic is passed as a keyword argument.""" |
| 126 | + with tracer.start_as_current_span("test"): |
| 127 | + # Pass topic as a keyword argument |
| 128 | + future = self.producer.send( |
| 129 | + topic=testenv["kafka_topic"], value=b"raw_bytes" |
| 130 | + ) |
| 131 | + |
| 132 | + _ = future.get(timeout=10) # noqa: F841 |
| 133 | + |
| 134 | + spans = self.recorder.queued_spans() |
| 135 | + assert len(spans) == 2 |
| 136 | + |
| 137 | + kafka_span = spans[0] |
| 138 | + test_span = spans[1] |
| 139 | + |
| 140 | + # Same traceId |
| 141 | + assert test_span.t == kafka_span.t |
| 142 | + |
| 143 | + # Parent relationships |
| 144 | + assert kafka_span.p == test_span.s |
| 145 | + |
| 146 | + # Error logging |
| 147 | + assert not test_span.ec |
| 148 | + assert not kafka_span.ec |
| 149 | + |
| 150 | + assert kafka_span.n == "kafka" |
| 151 | + assert kafka_span.k == SpanKind.CLIENT |
| 152 | + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] |
| 153 | + assert kafka_span.data["kafka"]["access"] == "send" |
| 154 | + |
| 155 | + def test_trace_kafka_python_send_with_keyword_args(self) -> None: |
| 156 | + """Test that tracing works when both topic and headers are passed as keyword arguments.""" |
| 157 | + with tracer.start_as_current_span("test"): |
| 158 | + # Pass both topic and headers as keyword arguments |
| 159 | + future = self.producer.send( |
| 160 | + topic=testenv["kafka_topic"], |
| 161 | + value=b"raw_bytes", |
| 162 | + headers=[("custom-header", b"header-value")], |
| 163 | + ) |
| 164 | + |
| 165 | + _ = future.get(timeout=10) # noqa: F841 |
| 166 | + |
| 167 | + spans = self.recorder.queued_spans() |
| 168 | + assert len(spans) == 2 |
| 169 | + |
| 170 | + kafka_span = spans[0] |
| 171 | + test_span = spans[1] |
| 172 | + |
| 173 | + # Same traceId |
| 174 | + assert test_span.t == kafka_span.t |
| 175 | + |
| 176 | + # Parent relationships |
| 177 | + assert kafka_span.p == test_span.s |
| 178 | + |
| 179 | + # Error logging |
| 180 | + assert not test_span.ec |
| 181 | + assert not kafka_span.ec |
| 182 | + |
| 183 | + assert kafka_span.n == "kafka" |
| 184 | + assert kafka_span.k == SpanKind.CLIENT |
| 185 | + assert kafka_span.data["kafka"]["service"] == testenv["kafka_topic"] |
| 186 | + assert kafka_span.data["kafka"]["access"] == "send" |
| 187 | + |
125 | 188 | def test_trace_kafka_python_consume(self) -> None: |
126 | 189 | # Produce some events |
127 | 190 | self.producer.send(testenv["kafka_topic"], b"raw_bytes1") |
|
0 commit comments