97
97
import static com .mongodb .internal .connection .ProtocolHelper .isCommandOk ;
98
98
import static com .mongodb .internal .logging .LogMessage .Level .DEBUG ;
99
99
import static com .mongodb .internal .thread .InterruptionUtil .translateInterruptedException ;
100
+ import static com .mongodb .internal .tracing .Tags .CLIENT_CONNECTION_ID ;
101
+ import static com .mongodb .internal .tracing .Tags .CURSOR_ID ;
102
+ import static com .mongodb .internal .tracing .Tags .NAMESPACE ;
103
+ import static com .mongodb .internal .tracing .Tags .QUERY_OPCODE ;
104
+ import static com .mongodb .internal .tracing .Tags .QUERY_SUMMARY ;
105
+ import static com .mongodb .internal .tracing .Tags .QUERY_TEXT ;
106
+ import static com .mongodb .internal .tracing .Tags .SERVER_ADDRESS ;
107
+ import static com .mongodb .internal .tracing .Tags .SERVER_CONNECTION_ID ;
108
+ import static com .mongodb .internal .tracing .Tags .SERVER_PORT ;
109
+ import static com .mongodb .internal .tracing .Tags .SERVER_TYPE ;
110
+ import static com .mongodb .internal .tracing .Tags .SESSION_ID ;
111
+ import static com .mongodb .internal .tracing .Tags .SYSTEM ;
112
+ import static com .mongodb .internal .tracing .Tags .TRANSACTION_NUMBER ;
100
113
import static java .util .Arrays .asList ;
101
114
102
115
/**
@@ -377,24 +390,13 @@ public boolean isClosed() {
377
390
public <T > T sendAndReceive (final CommandMessage message , final Decoder <T > decoder , final OperationContext operationContext ) {
378
391
Supplier <T > sendAndReceiveInternal = () -> sendAndReceiveInternal (
379
392
message , decoder , operationContext );
380
-
381
- Span tracingSpan = createTracingSpan (message , operationContext );
382
-
383
393
try {
384
394
return sendAndReceiveInternal .get ();
385
- } catch (MongoCommandException e ) {
386
- if (tracingSpan != null ) {
387
- tracingSpan .error (e );
388
- }
389
-
395
+ } catch (Throwable e ) {
390
396
if (reauthenticationIsTriggered (e )) {
391
397
return reauthenticateAndRetry (sendAndReceiveInternal , operationContext );
392
398
}
393
399
throw e ;
394
- } finally {
395
- if (tracingSpan != null ) {
396
- tracingSpan .end ();
397
- }
398
400
}
399
401
}
400
402
@@ -406,9 +408,7 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
406
408
AsyncSupplier <T > sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal (
407
409
message , decoder , operationContext , c );
408
410
409
- beginAsync ().<T >thenSupply (c -> {
410
- sendAndReceiveAsyncInternal .getAsync (c );
411
- }).onErrorIf (e -> reauthenticationIsTriggered (e ), (t , c ) -> {
411
+ beginAsync ().thenSupply (sendAndReceiveAsyncInternal ::getAsync ).onErrorIf (this ::reauthenticationIsTriggered , (t , c ) -> {
412
412
reauthenticateAndRetryAsync (sendAndReceiveAsyncInternal , operationContext , c );
413
413
}).finish (callback );
414
414
}
@@ -447,15 +447,31 @@ public boolean reauthenticationIsTriggered(@Nullable final Throwable t) {
447
447
private <T > T sendAndReceiveInternal (final CommandMessage message , final Decoder <T > decoder ,
448
448
final OperationContext operationContext ) {
449
449
CommandEventSender commandEventSender ;
450
+ Span tracingSpan = createTracingSpan (message , operationContext );
451
+
450
452
try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput (this )) {
451
453
message .encode (bsonOutput , operationContext );
452
- commandEventSender = createCommandEventSender (message , bsonOutput , operationContext );
454
+ BsonDocument commandDocument = message .getCommandDocument (bsonOutput );
455
+
456
+ commandEventSender = createCommandEventSender (message , commandDocument , operationContext );
453
457
commandEventSender .sendStartedEvent ();
458
+
459
+ if (tracingSpan != null && operationContext .getTracingManager ().isCommandPayloadEnabled ()) {
460
+ tracingSpan .tag (QUERY_TEXT , commandDocument .toJson ());
461
+ }
462
+
454
463
try {
455
464
sendCommandMessage (message , bsonOutput , operationContext );
456
465
} catch (Exception e ) {
466
+ if (tracingSpan != null ) {
467
+ tracingSpan .error (e );
468
+ }
457
469
commandEventSender .sendFailedEvent (e );
458
470
throw e ;
471
+ } finally {
472
+ if (tracingSpan != null ) {
473
+ tracingSpan .end ();
474
+ }
459
475
}
460
476
}
461
477
@@ -568,7 +584,8 @@ private <T> void sendAndReceiveAsyncInternal(final CommandMessage message, final
568
584
569
585
try {
570
586
message .encode (bsonOutput , operationContext );
571
- CommandEventSender commandEventSender = createCommandEventSender (message , bsonOutput , operationContext );
587
+ BsonDocument commandDocument = message .getCommandDocument (bsonOutput );
588
+ CommandEventSender commandEventSender = createCommandEventSender (message , commandDocument , operationContext );
572
589
commandEventSender .sendStartedEvent ();
573
590
Compressor localSendCompressor = sendCompressor ;
574
591
if (localSendCompressor == null || SECURITY_SENSITIVE_COMMANDS .contains (message .getCommandDocument (bsonOutput ).getFirstKey ())) {
@@ -887,42 +904,6 @@ public ByteBuf getBuffer(final int size) {
887
904
return stream .getBuffer (size );
888
905
}
889
906
890
- @ Nullable
891
- private Span createTracingSpan (final CommandMessage message , final OperationContext operationContext ) {
892
- TracingManager tracingManager = operationContext .getTracingManager ();
893
- Span span ;
894
- if (tracingManager .isEnabled ()) {
895
- BsonDocument command = message .getCommand ();
896
- TraceContext parentContext = null ;
897
- long cursorId = -1 ;
898
- if (command .containsKey ("getMore" )) {
899
- cursorId = command .getInt64 ("getMore" ).longValue ();
900
- parentContext = tracingManager .getCursorParentContext (cursorId );
901
- } else {
902
- parentContext = tracingManager .getParentContext (operationContext .getId ());
903
- }
904
-
905
- span = tracingManager .addSpan ("Command " + command .getFirstKey (), parentContext );
906
- span .tag ("db.system" , "mongodb" );
907
- span .tag ("db.namespace" , message .getNamespace ().getFullName ());
908
- span .tag ("db.query.summary" , command .getFirstKey ());
909
- span .tag ("db.query.opcode" , String .valueOf (message .getOpCode ()));
910
- span .tag ("db.query.text" , command .toString ());
911
- if (cursorId != -1 ) {
912
- span .tag ("db.mongodb.cursor_id" , String .valueOf (cursorId ));
913
- }
914
- span .tag ("server.address" , serverId .getAddress ().getHost ());
915
- span .tag ("server.port" , String .valueOf (serverId .getAddress ().getPort ()));
916
- span .tag ("server.type" , message .getSettings ().getServerType ().name ());
917
-
918
- span .tag ("db.mongodb.server_connection_id" , this .description .getConnectionId ().toString ());
919
- } else {
920
- span = null ;
921
- }
922
-
923
- return span ;
924
- }
925
-
926
907
private class MessageHeaderCallback implements SingleResultCallback <ByteBuf > {
927
908
private final OperationContext operationContext ;
928
909
private final SingleResultCallback <ResponseBuffers > callback ;
@@ -1003,19 +984,88 @@ public void onResult(@Nullable final ByteBuf result, @Nullable final Throwable t
1003
984
1004
985
private static final StructuredLogger COMMAND_PROTOCOL_LOGGER = new StructuredLogger ("protocol.command" );
1005
986
1006
- private CommandEventSender createCommandEventSender (final CommandMessage message , final ByteBufferBsonOutput bsonOutput ,
987
+ private CommandEventSender createCommandEventSender (final CommandMessage message , final BsonDocument commandDocument ,
1007
988
final OperationContext operationContext ) {
1008
989
boolean listensOrLogs = commandListener != null || COMMAND_PROTOCOL_LOGGER .isRequired (DEBUG , getClusterId ());
1009
990
if (!recordEverything && (isMonitoringConnection || !opened () || !authenticated .get () || !listensOrLogs )) {
1010
991
return new NoOpCommandEventSender ();
1011
992
}
1012
993
return new LoggingCommandEventSender (
1013
994
SECURITY_SENSITIVE_COMMANDS , SECURITY_SENSITIVE_HELLO_COMMANDS , description , commandListener ,
1014
- operationContext , message , bsonOutput ,
995
+ operationContext , message , commandDocument ,
1015
996
COMMAND_PROTOCOL_LOGGER , loggerSettings );
1016
997
}
1017
998
1018
999
private ClusterId getClusterId () {
1019
1000
return description .getConnectionId ().getServerId ().getClusterId ();
1020
1001
}
1002
+
1003
+ /**
1004
+ * Creates a tracing span for the given command message.
1005
+ * <p>
1006
+ * The span is only created if tracing is enabled and the command is not security-sensitive.
1007
+ * It attaches various tags to the span, such as database system, namespace, query summary, opcode,
1008
+ * server address, port, server type, client and server connection IDs, and, if applicable,
1009
+ * transaction number and session ID. For cursor fetching commands, the parent context is retrieved using the cursor ID.
1010
+ * If command payload tracing is enabled, the command document is also attached as a tag.
1011
+ *
1012
+ * @param message the command message to trace
1013
+ * @param operationContext the operation context containing tracing and session information
1014
+ * @return the created {@link Span}, or {@code null} if tracing is not enabled or the command is security-sensitive
1015
+ */
1016
+ @ Nullable
1017
+ private Span createTracingSpan (final CommandMessage message , final OperationContext operationContext ) {
1018
+ TracingManager tracingManager = operationContext .getTracingManager ();
1019
+ BsonDocument command = message .getCommand ();
1020
+ String commandName = command .getFirstKey ();
1021
+ if (!tracingManager .isEnabled ()
1022
+ || SECURITY_SENSITIVE_COMMANDS .contains (commandName )
1023
+ || SECURITY_SENSITIVE_HELLO_COMMANDS .contains (commandName )) {
1024
+ return null ;
1025
+ }
1026
+
1027
+ // Retrieving the appropriate parent context for the span.
1028
+ TraceContext parentContext ;
1029
+ long cursorId = -1 ;
1030
+ if (command .containsKey ("getMore" )) {
1031
+ cursorId = command .getInt64 ("getMore" ).longValue ();
1032
+ parentContext = tracingManager .getCursorParentContext (cursorId );
1033
+ } else {
1034
+ parentContext = tracingManager .getParentContext (operationContext .getId ());
1035
+ }
1036
+
1037
+ Span span = tracingManager
1038
+ .addSpan ("Command " + commandName , parentContext )
1039
+ .tag (SYSTEM , "mongodb" )
1040
+ .tag (NAMESPACE , message .getNamespace ().getDatabaseName ())
1041
+ .tag (QUERY_SUMMARY , command .toString ())
1042
+ .tag (QUERY_OPCODE , String .valueOf (message .getOpCode ()));
1043
+
1044
+ if (cursorId != -1 ) {
1045
+ span .tag (CURSOR_ID , cursorId );
1046
+ }
1047
+
1048
+ tagServerAndConnectionInfo (span , message );
1049
+ tagSessionAndTransactionInfo (span , operationContext );
1050
+
1051
+ return span ;
1052
+ }
1053
+
1054
+ private void tagServerAndConnectionInfo (final Span span , final CommandMessage message ) {
1055
+ span .tag (SERVER_ADDRESS , serverId .getAddress ().getHost ())
1056
+ .tag (SERVER_PORT , String .valueOf (serverId .getAddress ().getPort ()))
1057
+ .tag (SERVER_TYPE , message .getSettings ().getServerType ().name ())
1058
+ .tag (CLIENT_CONNECTION_ID , this .description .getConnectionId ().toString ())
1059
+ .tag (SERVER_CONNECTION_ID , String .valueOf (this .description .getConnectionId ().getServerValue ()));
1060
+ }
1061
+
1062
+ private void tagSessionAndTransactionInfo (final Span span , final OperationContext operationContext ) {
1063
+ SessionContext sessionContext = operationContext .getSessionContext ();
1064
+ if (sessionContext .hasSession () && !sessionContext .isImplicitSession ()) {
1065
+ span .tag (TRANSACTION_NUMBER , String .valueOf (sessionContext .getTransactionNumber ()))
1066
+ .tag (SESSION_ID , String .valueOf (sessionContext .getSessionId ()
1067
+ .get (sessionContext .getSessionId ().getFirstKey ())
1068
+ .asBinary ().asUuid ()));
1069
+ }
1070
+ }
1021
1071
}
0 commit comments