Skip to content

Commit d906bc2

Browse files
committed
netty-4.0's fix to correct context prop
1 parent 6c4a011 commit d906bc2

File tree

4 files changed

+166
-35
lines changed

4 files changed

+166
-35
lines changed

instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/NettyChannelPipelineInstrumentation.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientRequestTracingHandler;
3737
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientResponseTracingHandler;
3838
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.HttpClientTracingHandler;
39+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.OtelHttpClientRequestTracingHandler;
3940
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerBlockingRequestHandler;
4041
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerRequestTracingHandler;
4142
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.server.HttpServerResponseTracingHandler;
@@ -135,14 +136,13 @@ public static void addHandler(
135136
HttpClientTracingHandler.class.getName(),
136137
new HttpClientTracingHandler());
137138

138-
// add OTEL request handler to start spans
139+
// add our custom request handler to start spans with proper context propagation
139140
pipeline.addAfter(
140141
HttpClientTracingHandler.class.getName(),
141142
io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
142143
.HttpClientRequestTracingHandler.class
143144
.getName(),
144-
new io.opentelemetry.javaagent.instrumentation.netty.v4_0.client
145-
.HttpClientRequestTracingHandler());
145+
new OtelHttpClientRequestTracingHandler());
146146
} else if (handler instanceof HttpRequestEncoder) {
147147
pipeline.addLast(
148148
HttpClientRequestTracingHandler.class.getName(),
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Copyright The Hypertrace Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client;
18+
19+
import io.netty.channel.ChannelHandlerContext;
20+
import io.netty.channel.ChannelPromise;
21+
import io.netty.handler.codec.http.HttpRequest;
22+
import io.netty.handler.codec.http.HttpResponse;
23+
import io.opentelemetry.api.trace.Span;
24+
import io.opentelemetry.api.trace.SpanContext;
25+
import io.opentelemetry.context.Context;
26+
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
27+
import io.opentelemetry.instrumentation.netty.v4.common.HttpRequestAndChannel;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.AttributeKeys;
30+
import io.opentelemetry.javaagent.instrumentation.netty.v4_0.client.HttpClientRequestTracingHandler;
31+
32+
/**
33+
* Custom extension of OpenTelemetry's HttpClientRequestTracingHandler that ensures proper context
34+
* propagation by using Context.current() as the parent context.
35+
*/
36+
public class OtelHttpClientRequestTracingHandler extends HttpClientRequestTracingHandler {
37+
38+
// Store the server context for each thread
39+
private static final ThreadLocal<Context> SERVER_CONTEXT = new ThreadLocal<>();
40+
41+
// Store the mapping from thread ID to server span context (for cross-thread scenarios)
42+
private static final ConcurrentHashMap<Long, SpanContext> THREAD_TO_SPAN_CONTEXT =
43+
new ConcurrentHashMap<>();
44+
45+
// Maximum size for the thread map before triggering cleanup
46+
private static final int MAX_THREAD_MAP_SIZE = 1000;
47+
48+
// Cleanup flag to avoid excessive synchronized blocks
49+
private static volatile boolean cleanupNeeded = false;
50+
51+
public OtelHttpClientRequestTracingHandler() {
52+
super();
53+
}
54+
55+
/**
56+
* Stores the current context as the server context for this thread. This should be called from
57+
* the server handler.
58+
*/
59+
public static void storeServerContext(Context context) {
60+
SERVER_CONTEXT.set(context);
61+
62+
// Also store the span context by thread ID for cross-thread lookup
63+
Span span = Span.fromContext(context);
64+
if (span != null && span.getSpanContext().isValid()) {
65+
THREAD_TO_SPAN_CONTEXT.put(Thread.currentThread().getId(), span.getSpanContext());
66+
67+
// Check if we need to clean up the map
68+
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
69+
cleanupNeeded = true;
70+
}
71+
}
72+
}
73+
74+
/**
75+
* Perform cleanup of the thread map if it has grown too large. This is done in a synchronized
76+
* block to prevent concurrent modification issues.
77+
*/
78+
private static void cleanupThreadMapIfNeeded() {
79+
if (cleanupNeeded) {
80+
synchronized (THREAD_TO_SPAN_CONTEXT) {
81+
if (THREAD_TO_SPAN_CONTEXT.size() > MAX_THREAD_MAP_SIZE) {
82+
THREAD_TO_SPAN_CONTEXT.clear();
83+
cleanupNeeded = false;
84+
}
85+
}
86+
}
87+
}
88+
89+
@Override
90+
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise prm) {
91+
try {
92+
if (!(msg instanceof HttpRequest)) {
93+
super.write(ctx, msg, prm);
94+
return;
95+
}
96+
97+
Context parentContext = SERVER_CONTEXT.get();
98+
99+
// Fallback -> If no context in thread local, try Context.current()
100+
if (parentContext == null) {
101+
parentContext = Context.current();
102+
}
103+
104+
// Store the parent context in the channel attributes
105+
// This is used by the Opentelemetry's HttpClientRequestTracingHandler in propagating correct
106+
// context.
107+
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_CONTEXT).set(parentContext);
108+
109+
// Call the parent implementation which will use our stored parent context
110+
super.write(ctx, msg, prm);
111+
112+
// Clean up after use to prevent memory leaks
113+
SERVER_CONTEXT.remove();
114+
THREAD_TO_SPAN_CONTEXT.remove(Thread.currentThread().getId());
115+
cleanupThreadMapIfNeeded();
116+
117+
} catch (Exception ignored) {
118+
}
119+
}
120+
}

instrumentation/netty/netty-4.0/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_0/server/HttpServerRequestTracingHandler.java

Lines changed: 42 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,14 @@
2727
import io.opentelemetry.api.common.AttributeKey;
2828
import io.opentelemetry.api.trace.Span;
2929
import io.opentelemetry.context.Context;
30+
import io.opentelemetry.context.Scope;
3031
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.AttributeKeys;
3132
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.DataCaptureUtils;
3233
import java.nio.charset.Charset;
3334
import java.util.HashMap;
3435
import java.util.Map;
36+
37+
import io.opentelemetry.javaagent.instrumentation.hypertrace.netty.v4_0.client.OtelHttpClientRequestTracingHandler;
3538
import org.hypertrace.agent.core.config.InstrumentationConfig;
3639
import org.hypertrace.agent.core.instrumentation.HypertraceSemanticAttributes;
3740
import org.hypertrace.agent.core.instrumentation.buffer.BoundedBuffersFactory;
@@ -57,47 +60,55 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
5760
ctx.fireChannelRead(msg);
5861
return;
5962
}
60-
Span span = Span.fromContext(context);
6163

62-
if (msg instanceof HttpRequest) {
63-
HttpRequest httpRequest = (HttpRequest) msg;
6464

65-
Map<String, String> headersMap = headersToMap(httpRequest);
66-
if (instrumentationConfig.httpHeaders().request()) {
67-
headersMap.forEach(span::setAttribute);
68-
}
69-
// used by blocking handler
70-
channel.attr(AttributeKeys.REQUEST_HEADERS).set(headersMap);
65+
// Store the server context in our ThreadLocal for later use by client handlers
66+
// This is CRITICAL for proper context propagation to client spans
67+
OtelHttpClientRequestTracingHandler.storeServerContext(context);
68+
69+
try (Scope ignored = context.makeCurrent()) {
70+
Span span = Span.fromContext(context);
7171

72-
CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
73-
if (instrumentationConfig.httpBody().request()
74-
&& contentType != null
75-
&& ContentTypeUtils.shouldCapture(contentType.toString())) {
72+
if (msg instanceof HttpRequest) {
73+
HttpRequest httpRequest = (HttpRequest) msg;
7674

77-
CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
78-
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);
75+
Map<String, String> headersMap = headersToMap(httpRequest);
76+
if (instrumentationConfig.httpHeaders().request()) {
77+
headersMap.forEach(span::setAttribute);
78+
}
79+
// used by blocking handler
80+
channel.attr(AttributeKeys.REQUEST_HEADERS).set(headersMap);
7981

80-
String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
81-
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);
82+
CharSequence contentType = DataCaptureUtils.getContentType(httpRequest);
83+
if (instrumentationConfig.httpBody().request()
84+
&& contentType != null
85+
&& ContentTypeUtils.shouldCapture(contentType.toString())) {
8286

83-
// set the buffer to capture response body
84-
// the buffer is used byt captureBody method
85-
Attribute<BoundedByteArrayOutputStream> bufferAttr =
86-
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
87-
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));
87+
CharSequence contentLengthHeader = DataCaptureUtils.getContentLength(httpRequest);
88+
int contentLength = ContentLengthUtils.parseLength(contentLengthHeader);
8889

89-
channel.attr(AttributeKeys.CHARSET).set(charset);
90+
String charsetString = ContentTypeUtils.parseCharset(contentType.toString());
91+
Charset charset = ContentTypeCharsetUtils.toCharset(charsetString);
92+
93+
// set the buffer to capture response body
94+
// the buffer is used byt captureBody method
95+
Attribute<BoundedByteArrayOutputStream> bufferAttr =
96+
ctx.channel().attr(AttributeKeys.REQUEST_BODY_BUFFER);
97+
bufferAttr.set(BoundedBuffersFactory.createStream(contentLength, charset));
98+
99+
channel.attr(AttributeKeys.CHARSET).set(charset);
100+
}
90101
}
91-
}
92102

93-
if ((msg instanceof HttpContent || msg instanceof ByteBuf)
94-
&& instrumentationConfig.httpBody().request()) {
95-
Charset charset = channel.attr(AttributeKeys.CHARSET).get();
96-
if (charset == null) {
97-
charset = ContentTypeCharsetUtils.getDefaultCharset();
103+
if ((msg instanceof HttpContent || msg instanceof ByteBuf)
104+
&& instrumentationConfig.httpBody().request()) {
105+
Charset charset = channel.attr(AttributeKeys.CHARSET).get();
106+
if (charset == null) {
107+
charset = ContentTypeCharsetUtils.getDefaultCharset();
108+
}
109+
DataCaptureUtils.captureBody(
110+
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
98111
}
99-
DataCaptureUtils.captureBody(
100-
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
101112
}
102113

103114
ctx.fireChannelRead(msg);

instrumentation/netty/netty-4.1/src/main/java/io/opentelemetry/javaagent/instrumentation/hypertrace/netty/v4_1/server/HttpServerRequestTracingHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,8 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
111111
span, channel, AttributeKeys.REQUEST_BODY_BUFFER, msg, null, charset);
112112
}
113113

114-
ctx.fireChannelRead(msg);
115114
}
115+
ctx.fireChannelRead(msg);
116116
}
117117

118118
private static Map<String, String> headersToMap(HttpMessage httpMessage) {

0 commit comments

Comments
 (0)