Skip to content

Commit 94390c6

Browse files
committed
Add tracing support using Micrometer
1 parent 58b51e3 commit 94390c6

File tree

23 files changed

+536
-35
lines changed

23 files changed

+536
-35
lines changed

driver-core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ dependencies {
5454

5555
optionalImplementation(libs.snappy.java)
5656
optionalImplementation(libs.zstd.jni)
57+
optionalImplementation(libs.micrometer)
5758

5859
testImplementation(project(path = ":bson", configuration = "testArtifacts"))
5960
testImplementation(libs.reflections)

driver-core/src/main/com/mongodb/MongoClientSettings.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import com.mongodb.connection.SslSettings;
3131
import com.mongodb.connection.TransportSettings;
3232
import com.mongodb.event.CommandListener;
33+
import com.mongodb.internal.tracing.TracingManager;
3334
import com.mongodb.lang.Nullable;
3435
import com.mongodb.spi.dns.DnsClient;
3536
import com.mongodb.spi.dns.InetAddressResolver;
37+
import com.mongodb.tracing.Tracer;
3638
import org.bson.UuidRepresentation;
3739
import org.bson.codecs.BsonCodecProvider;
3840
import org.bson.codecs.BsonValueCodecProvider;
@@ -118,6 +120,7 @@ public final class MongoClientSettings {
118120
private final InetAddressResolver inetAddressResolver;
119121
@Nullable
120122
private final Long timeoutMS;
123+
private final TracingManager tracingManager;
121124

122125
/**
123126
* Gets the default codec registry. It includes the following providers:
@@ -238,6 +241,7 @@ public static final class Builder {
238241
private ContextProvider contextProvider;
239242
private DnsClient dnsClient;
240243
private InetAddressResolver inetAddressResolver;
244+
private TracingManager tracingManager;
241245

242246
private Builder() {
243247
}
@@ -275,6 +279,7 @@ private Builder(final MongoClientSettings settings) {
275279
if (settings.heartbeatSocketTimeoutSetExplicitly) {
276280
heartbeatSocketTimeoutMS = settings.heartbeatSocketSettings.getReadTimeout(MILLISECONDS);
277281
}
282+
tracingManager = settings.tracingManager;
278283
}
279284

280285
/**
@@ -723,6 +728,20 @@ Builder heartbeatSocketTimeoutMS(final int heartbeatSocketTimeoutMS) {
723728
return this;
724729
}
725730

731+
/**
732+
* Sets the tracer to use for creating Spans for operations and commands.
733+
*
734+
* @param tracer the tracer
735+
* @see com.mongodb.tracing.MicrometerTracer
736+
* @return this
737+
* @since 5.5
738+
*/
739+
@Alpha(Reason.CLIENT)
740+
public Builder tracer(final Tracer tracer) {
741+
this.tracingManager = new TracingManager(tracer);
742+
return this;
743+
}
744+
726745
/**
727746
* Build an instance of {@code MongoClientSettings}.
728747
*
@@ -1040,6 +1059,17 @@ public ContextProvider getContextProvider() {
10401059
return contextProvider;
10411060
}
10421061

1062+
/**
1063+
* Get the tracer to create Spans for operations and commands.
1064+
*
1065+
* @return this
1066+
* @since 5.5
1067+
*/
1068+
@Alpha(Reason.CLIENT)
1069+
public TracingManager getTracingManager() {
1070+
return tracingManager;
1071+
}
1072+
10431073
@Override
10441074
public boolean equals(final Object o) {
10451075
if (this == o) {
@@ -1156,5 +1186,6 @@ private MongoClientSettings(final Builder builder) {
11561186
heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0;
11571187
contextProvider = builder.contextProvider;
11581188
timeoutMS = builder.timeoutMS;
1189+
tracingManager = builder.tracingManager;
11591190
}
11601191
}

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
186186
}
187187
}
188188

189+
BsonDocument getCommand() {
190+
return command;
191+
}
192+
189193
/**
190194
* Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type
191195
* `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE`.

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@
5151
import com.mongodb.internal.logging.StructuredLogger;
5252
import com.mongodb.internal.session.SessionContext;
5353
import com.mongodb.internal.time.Timeout;
54+
import com.mongodb.internal.tracing.Span;
55+
import com.mongodb.internal.tracing.TraceContext;
56+
import com.mongodb.internal.tracing.TracingManager;
5457
import com.mongodb.lang.Nullable;
5558
import org.bson.BsonBinaryReader;
5659
import org.bson.BsonDocument;
@@ -75,8 +78,8 @@
7578
import static com.mongodb.assertions.Assertions.assertNull;
7679
import static com.mongodb.assertions.Assertions.isTrue;
7780
import static com.mongodb.assertions.Assertions.notNull;
78-
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
7981
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
82+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
8083
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
8184
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
8285
import static com.mongodb.internal.connection.CommandHelper.HELLO;
@@ -374,13 +377,24 @@ public boolean isClosed() {
374377
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final OperationContext operationContext) {
375378
Supplier<T> sendAndReceiveInternal = () -> sendAndReceiveInternal(
376379
message, decoder, operationContext);
380+
381+
Span tracingSpan = createTracingSpan(message, operationContext);
382+
377383
try {
378384
return sendAndReceiveInternal.get();
379385
} catch (MongoCommandException e) {
386+
if (tracingSpan != null) {
387+
tracingSpan.error(e);
388+
}
389+
380390
if (reauthenticationIsTriggered(e)) {
381391
return reauthenticateAndRetry(sendAndReceiveInternal, operationContext);
382392
}
383393
throw e;
394+
} finally {
395+
if (tracingSpan != null) {
396+
tracingSpan.end();
397+
}
384398
}
385399
}
386400

@@ -391,6 +405,7 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
391405

392406
AsyncSupplier<T> sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal(
393407
message, decoder, operationContext, c);
408+
394409
beginAsync().<T>thenSupply(c -> {
395410
sendAndReceiveAsyncInternal.getAsync(c);
396411
}).onErrorIf(e -> reauthenticationIsTriggered(e), (t, c) -> {
@@ -872,6 +887,42 @@ public ByteBuf getBuffer(final int size) {
872887
return stream.getBuffer(size);
873888
}
874889

890+
@Nullable
891+
private Span createTracingSpan(final CommandMessage message, final OperationContext operationContext) {
892+
TracingManager tracingManager = operationContext.getTracingManager();
893+
Span span;
894+
if (tracingManager.isEnabled()) {
895+
BsonDocument command = message.getCommand();
896+
TraceContext parentContext = null;
897+
long cursorId = -1;
898+
if (command.containsKey("getMore")) {
899+
cursorId = command.getInt64("getMore").longValue();
900+
parentContext = tracingManager.getCursorParentContext(cursorId);
901+
} else {
902+
parentContext = tracingManager.getParentContext(operationContext.getId());
903+
}
904+
905+
span = tracingManager.addSpan("Command " + command.getFirstKey(), parentContext);
906+
span.tag("db.system", "mongodb");
907+
span.tag("db.namespace", message.getNamespace().getFullName());
908+
span.tag("db.query.summary", command.getFirstKey());
909+
span.tag("db.query.opcode", String.valueOf(message.getOpCode()));
910+
span.tag("db.query.text", command.toString());
911+
if (cursorId != -1) {
912+
span.tag("db.mongodb.cursor_id", String.valueOf(cursorId));
913+
}
914+
span.tag("server.address", serverId.getAddress().getHost());
915+
span.tag("server.port", String.valueOf(serverId.getAddress().getPort()));
916+
span.tag("server.type", message.getSettings().getServerType().name());
917+
918+
span.tag("db.mongodb.server_connection_id", this.description.getConnectionId().toString());
919+
} else {
920+
span = null;
921+
}
922+
923+
return span;
924+
}
925+
875926
private class MessageHeaderCallback implements SingleResultCallback<ByteBuf> {
876927
private final OperationContext operationContext;
877928
private final SingleResultCallback<ResponseBuffers> callback;

driver-core/src/main/com/mongodb/internal/connection/OperationContext.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.mongodb.internal.TimeoutSettings;
2828
import com.mongodb.internal.VisibleForTesting;
2929
import com.mongodb.internal.session.SessionContext;
30+
import com.mongodb.internal.tracing.TracingManager;
3031
import com.mongodb.lang.Nullable;
3132
import com.mongodb.selector.ServerSelector;
3233

@@ -49,10 +50,11 @@ public class OperationContext {
4950
private final TimeoutContext timeoutContext;
5051
@Nullable
5152
private final ServerApi serverApi;
53+
private final TracingManager tracingManager;
5254

5355
public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext,
54-
@Nullable final ServerApi serverApi) {
55-
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi);
56+
@Nullable final ServerApi serverApi, final TracingManager tracingManager) {
57+
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, tracingManager);
5658
}
5759

5860
public static OperationContext simpleOperationContext(
@@ -61,29 +63,35 @@ public static OperationContext simpleOperationContext(
6163
IgnorableRequestContext.INSTANCE,
6264
NoOpSessionContext.INSTANCE,
6365
new TimeoutContext(timeoutSettings),
64-
serverApi);
66+
serverApi,
67+
TracingManager.NO_OP);
6568
}
6669

6770
public static OperationContext simpleOperationContext(final TimeoutContext timeoutContext) {
6871
return new OperationContext(
6972
IgnorableRequestContext.INSTANCE,
7073
NoOpSessionContext.INSTANCE,
7174
timeoutContext,
72-
null);
75+
null,
76+
TracingManager.NO_OP);
7377
}
7478

7579
public OperationContext withSessionContext(final SessionContext sessionContext) {
76-
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi);
80+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, tracingManager);
7781
}
7882

7983
public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) {
80-
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi);
84+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, tracingManager);
8185
}
8286

8387
public long getId() {
8488
return id;
8589
}
8690

91+
public TracingManager getTracingManager() {
92+
return tracingManager;
93+
}
94+
8795
public SessionContext getSessionContext() {
8896
return sessionContext;
8997
}
@@ -107,27 +115,31 @@ public OperationContext(final long id,
107115
final SessionContext sessionContext,
108116
final TimeoutContext timeoutContext,
109117
final ServerDeprioritization serverDeprioritization,
110-
@Nullable final ServerApi serverApi) {
118+
@Nullable final ServerApi serverApi,
119+
final TracingManager tracingManager) {
111120
this.id = id;
112121
this.serverDeprioritization = serverDeprioritization;
113122
this.requestContext = requestContext;
114123
this.sessionContext = sessionContext;
115124
this.timeoutContext = timeoutContext;
116125
this.serverApi = serverApi;
126+
this.tracingManager = tracingManager;
117127
}
118128

119129
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
120130
public OperationContext(final long id,
121131
final RequestContext requestContext,
122132
final SessionContext sessionContext,
123133
final TimeoutContext timeoutContext,
124-
@Nullable final ServerApi serverApi) {
134+
@Nullable final ServerApi serverApi,
135+
final TracingManager tracingManager) {
125136
this.id = id;
126137
this.serverDeprioritization = new ServerDeprioritization();
127138
this.requestContext = requestContext;
128139
this.sessionContext = sessionContext;
129140
this.timeoutContext = timeoutContext;
130141
this.serverApi = serverApi;
142+
this.tracingManager = tracingManager;
131143
}
132144

133145

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
7575
@Nullable
7676
private List<T> nextBatch;
7777
private boolean resetTimeoutWhenClosing;
78+
private final long cursorId;
7879

7980
CommandBatchCursor(
8081
final TimeoutMode timeoutMode,
@@ -95,10 +96,13 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
9596
operationContext = connectionSource.getOperationContext();
9697
this.timeoutMode = timeoutMode;
9798

99+
ServerCursor serverCursor = commandCursorResult.getServerCursor();
100+
this.cursorId = serverCursor != null ? serverCursor.getId() : -1;
101+
98102
operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);
99103

100104
Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null;
101-
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
105+
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, serverCursor);
102106
resetTimeoutWhenClosing = true;
103107
}
104108

@@ -169,6 +173,7 @@ public void remove() {
169173

170174
@Override
171175
public void close() {
176+
operationContext.getTracingManager().removeCursorParentContext(cursorId);
172177
resourceManager.close();
173178
}
174179

driver-core/src/main/com/mongodb/internal/operation/FindOperation.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import com.mongodb.internal.binding.AsyncReadBinding;
3131
import com.mongodb.internal.binding.ReadBinding;
3232
import com.mongodb.internal.connection.OperationContext;
33+
import com.mongodb.internal.tracing.Span;
34+
import com.mongodb.internal.tracing.TracingManager;
3335
import com.mongodb.lang.Nullable;
3436
import org.bson.BsonBoolean;
3537
import org.bson.BsonDocument;
@@ -290,21 +292,35 @@ public BatchCursor<T> execute(final ReadBinding binding) {
290292
if (invalidTimeoutModeException != null) {
291293
throw invalidTimeoutModeException;
292294
}
295+
OperationContext operationContext = binding.getOperationContext();
293296

294-
RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext());
295-
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
297+
// Adds a Tracing Span for 'find' operation
298+
TracingManager tracingManager = operationContext.getTracingManager();
299+
Span tracingSpan = tracingManager.addSpan("find", operationContext.getId());
300+
301+
RetryState retryState = initialRetryState(retryReads, operationContext.getTimeoutContext());
302+
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, operationContext, () ->
296303
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
297-
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getOperationContext()));
304+
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), operationContext));
298305
try {
299-
return createReadCommandAndExecute(retryState, binding.getOperationContext(), source, namespace.getDatabaseName(),
306+
return createReadCommandAndExecute(retryState, operationContext, source, namespace.getDatabaseName(),
300307
getCommandCreator(), CommandResultDocumentCodec.create(decoder, FIRST_BATCH),
301308
transformer(), connection);
302309
} catch (MongoCommandException e) {
303310
throw new MongoQueryException(e.getResponse(), e.getServerAddress());
304311
}
305312
})
306313
);
307-
return read.get();
314+
try {
315+
return read.get();
316+
} catch (MongoQueryException e) {
317+
tracingSpan.error(e);
318+
throw e;
319+
} finally {
320+
tracingSpan.end();
321+
// Clean up the tracing span after the operation is complete
322+
tracingManager.cleanContexts(operationContext.getId());
323+
}
308324
}
309325

310326
@Override
@@ -469,9 +485,17 @@ private TimeoutMode getTimeoutMode() {
469485
}
470486

471487
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
472-
return (result, source, connection) ->
473-
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize, getMaxTimeForCursor(source.getOperationContext()), decoder,
474-
comment, source, connection);
488+
return (result, source, connection) -> {
489+
OperationContext operationContext = source.getOperationContext();
490+
491+
// register cursor id with the operation context, so 'getMore' commands can be folded under the 'find' operation
492+
long cursorId = result.getDocument("cursor").getInt64("id").longValue();
493+
TracingManager tracingManager = operationContext.getTracingManager();
494+
tracingManager.addCursorParentContext(cursorId, operationContext.getId());
495+
496+
return new CommandBatchCursor<>(getTimeoutMode(), result, batchSize, getMaxTimeForCursor(operationContext), decoder,
497+
comment, source, connection);
498+
};
475499
}
476500

477501
private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {

0 commit comments

Comments
 (0)