diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index fb840d9ad0..745f41c289 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -145,7 +145,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera ServerDeprioritization serverDeprioritization = operationContext.getServerDeprioritization(); boolean selectionWaitingLogged = false; Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout(); - logServerSelectionStarted(clusterId, operationContext.getId(), serverSelector, description); + logServerSelectionStarted(operationContext, clusterId, serverSelector, description); while (true) { CountDownLatch currentPhaseLatch = phase.get(); ClusterDescription currentDescription = description; @@ -154,16 +154,11 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera computedServerSelectionTimeout, operationContext.getTimeoutContext()); if (!currentDescription.isCompatibleWithDriver()) { - logAndThrowIncompatibleException(operationContext.getId(), serverSelector, currentDescription); + logAndThrowIncompatibleException(operationContext, serverSelector, currentDescription); } if (serverTuple != null) { ServerAddress serverAddress = serverTuple.getServerDescription().getAddress(); - logServerSelectionSucceeded( - clusterId, - operationContext.getId(), - serverAddress, - serverSelector, - currentDescription); + logServerSelectionSucceeded(operationContext, clusterId, serverAddress, serverSelector, currentDescription); serverDeprioritization.updateCandidate(serverAddress); return serverTuple; } @@ -171,7 +166,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera logAndThrowTimeoutException(operationContext, serverSelector, currentDescription)); if (!selectionWaitingLogged) { - logServerSelectionWaiting(clusterId, operationContext.getId(), computedServerSelectionTimeout, serverSelector, currentDescription); + logServerSelectionWaiting(operationContext, clusterId, computedServerSelectionTimeout, serverSelector, currentDescription); selectionWaitingLogged = true; } connect(); @@ -197,11 +192,7 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati CountDownLatch currentPhase = phase.get(); ClusterDescription currentDescription = description; - logServerSelectionStarted( - clusterId, - operationContext.getId(), - serverSelector, - currentDescription); + logServerSelectionStarted(operationContext, clusterId, serverSelector, currentDescription); if (!handleServerSelectionRequest(request, currentPhase, currentDescription)) { notifyWaitQueueHandler(request); @@ -290,12 +281,11 @@ private boolean handleServerSelectionRequest( try { OperationContext operationContext = request.getOperationContext(); - long operationId = operationContext.getId(); if (currentPhase != request.phase) { CountDownLatch prevPhase = request.phase; request.phase = currentPhase; if (!description.isCompatibleWithDriver()) { - logAndThrowIncompatibleException(operationId, request.originalSelector, description); + logAndThrowIncompatibleException(operationContext, request.originalSelector, description); } @@ -309,23 +299,13 @@ private boolean handleServerSelectionRequest( if (serverTuple != null) { ServerAddress serverAddress = serverTuple.getServerDescription().getAddress(); - logServerSelectionSucceeded( - clusterId, - operationId, - serverAddress, - request.originalSelector, - description); + logServerSelectionSucceeded(operationContext, clusterId, serverAddress, request.originalSelector, description); serverDeprioritization.updateCandidate(serverAddress); request.onResult(serverTuple, null); return true; } if (prevPhase == null) { - logServerSelectionWaiting( - clusterId, - operationId, - request.getTimeout(), - request.originalSelector, - description); + logServerSelectionWaiting(operationContext, clusterId, request.getTimeout(), request.originalSelector, description); } } @@ -410,11 +390,11 @@ protected ClusterableServer createServer(final ServerAddress serverAddress) { } private void logAndThrowIncompatibleException( - final long operationId, + final OperationContext operationContext, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { MongoIncompatibleDriverException exception = createIncompatibleException(clusterDescription); - logServerSelectionFailed(clusterId, operationId, exception, serverSelector, clusterDescription); + logServerSelectionFailed(operationContext, clusterId, exception, serverSelector, clusterDescription); throw exception; } @@ -448,7 +428,7 @@ private void logAndThrowTimeoutException( MongoTimeoutException exception = operationContext.getTimeoutContext().hasTimeoutMS() ? new MongoOperationTimeoutException(message) : new MongoTimeoutException(message); - logServerSelectionFailed(clusterId, operationContext.getId(), exception, serverSelector, clusterDescription); + logServerSelectionFailed(operationContext, clusterId, exception, serverSelector, clusterDescription); throw exception; } @@ -557,16 +537,16 @@ public void run() { } static void logServerSelectionStarted( + final OperationContext operationContext, final ClusterId clusterId, - final long operationId, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection started", clusterId, asList( - new Entry(OPERATION, null), - new Entry(OPERATION_ID, operationId), + new Entry(OPERATION, operationContext.getOperationName()), + new Entry(OPERATION_ID, operationContext.getId()), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), "Server selection started for operation[ {}] with ID {}. Selector: {}, topology description: {}")); @@ -574,8 +554,8 @@ static void logServerSelectionStarted( } private static void logServerSelectionWaiting( + final OperationContext operationContext, final ClusterId clusterId, - final long operationId, final Timeout timeout, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { @@ -583,8 +563,8 @@ private static void logServerSelectionWaiting( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, INFO, "Waiting for suitable server to become available", clusterId, asList( - new Entry(OPERATION, null), - new Entry(OPERATION_ID, operationId), + new Entry(OPERATION, operationContext.getOperationName()), + new Entry(OPERATION_ID, operationContext.getId()), timeout.call(MILLISECONDS, () -> new Entry(REMAINING_TIME_MS, "infinite"), (ms) -> new Entry(REMAINING_TIME_MS, ms), @@ -597,8 +577,8 @@ private static void logServerSelectionWaiting( } private static void logServerSelectionFailed( + final OperationContext operationContext, final ClusterId clusterId, - final long operationId, final MongoException failure, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { @@ -612,8 +592,8 @@ private static void logServerSelectionFailed( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection failed", clusterId, asList( - new Entry(OPERATION, null), - new Entry(OPERATION_ID, operationId), + new Entry(OPERATION, operationContext.getOperationName()), + new Entry(OPERATION_ID, operationContext.getId()), new Entry(FAILURE, failureDescription), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), @@ -622,8 +602,8 @@ private static void logServerSelectionFailed( } static void logServerSelectionSucceeded( + final OperationContext operationContext, final ClusterId clusterId, - final long operationId, final ServerAddress serverAddress, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { @@ -631,8 +611,8 @@ static void logServerSelectionSucceeded( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection succeeded", clusterId, asList( - new Entry(OPERATION, null), - new Entry(OPERATION_ID, operationId), + new Entry(OPERATION, operationContext.getOperationName()), + new Entry(OPERATION_ID, operationContext.getId()), new Entry(SERVER_HOST, serverAddress.getHost()), new Entry(SERVER_PORT, serverAddress instanceof UnixServerAddress ? null : serverAddress.getPort()), new Entry(SELECTOR, serverSelector.toString()), diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index 588bd9f609..2129f28ca3 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -222,9 +222,9 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera throw createResolvedToMultipleHostsException(); } ClusterDescription curDescription = description; - logServerSelectionStarted(clusterId, operationContext.getId(), serverSelector, curDescription); + logServerSelectionStarted(operationContext, clusterId, serverSelector, curDescription); ServerTuple serverTuple = new ServerTuple(assertNotNull(server), curDescription.getServerDescriptions().get(0)); - logServerSelectionSucceeded(clusterId, operationContext.getId(), serverTuple.getServerDescription().getAddress(), + logServerSelectionSucceeded(operationContext, clusterId, serverTuple.getServerDescription().getAddress(), serverSelector, curDescription); return serverTuple; } @@ -254,8 +254,8 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati return; } Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout(); - ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(operationContext.getId(), serverSelector, - operationContext, computedServerSelectionTimeout, callback); + ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(serverSelector, operationContext, + computedServerSelectionTimeout, callback); if (initializationCompleted) { handleServerSelectionRequest(serverSelectionRequest); } else { @@ -309,9 +309,9 @@ private void handleServerSelectionRequest(final ServerSelectionRequest serverSel } else { ClusterDescription curDescription = description; logServerSelectionStarted( - clusterId, serverSelectionRequest.operationId, serverSelectionRequest.serverSelector, curDescription); + serverSelectionRequest.operationContext, clusterId, serverSelectionRequest.serverSelector, curDescription); ServerTuple serverTuple = new ServerTuple(assertNotNull(server), curDescription.getServerDescriptions().get(0)); - logServerSelectionSucceeded(clusterId, serverSelectionRequest.operationId, + logServerSelectionSucceeded(serverSelectionRequest.operationContext, clusterId, serverTuple.getServerDescription().getAddress(), serverSelectionRequest.serverSelector, curDescription); serverSelectionRequest.onSuccess(serverTuple); } @@ -416,15 +416,13 @@ public void run() { } private static final class ServerSelectionRequest { - private final long operationId; private final ServerSelector serverSelector; private final SingleResultCallback callback; private final Timeout timeout; private final OperationContext operationContext; - private ServerSelectionRequest(final long operationId, final ServerSelector serverSelector, final OperationContext operationContext, + private ServerSelectionRequest(final ServerSelector serverSelector, final OperationContext operationContext, final Timeout timeout, final SingleResultCallback callback) { - this.operationId = operationId; this.serverSelector = serverSelector; this.timeout = timeout; this.operationContext = operationContext; diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java index bf29ebc051..7e0de92da1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java +++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java @@ -49,10 +49,17 @@ public class OperationContext { private final TimeoutContext timeoutContext; @Nullable private final ServerApi serverApi; + @Nullable + private final String operationName; public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, @Nullable final ServerApi serverApi) { - this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi); + this(requestContext, sessionContext, timeoutContext, serverApi, null); + } + + public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, + @Nullable final ServerApi serverApi, @Nullable final String operationName) { + this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, operationName); } public static OperationContext simpleOperationContext( @@ -61,7 +68,8 @@ public static OperationContext simpleOperationContext( IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, new TimeoutContext(timeoutSettings), - serverApi); + serverApi, + null); } public static OperationContext simpleOperationContext(final TimeoutContext timeoutContext) { @@ -69,15 +77,20 @@ public static OperationContext simpleOperationContext(final TimeoutContext timeo IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, timeoutContext, + null, null); } public OperationContext withSessionContext(final SessionContext sessionContext) { - return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi); + return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName); } public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) { - return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi); + return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName); + } + + public OperationContext withOperationName(final String operationName) { + return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName); } public long getId() { @@ -101,19 +114,26 @@ public ServerApi getServerApi() { return serverApi; } + @Nullable + public String getOperationName() { + return operationName; + } + @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) public OperationContext(final long id, final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, final ServerDeprioritization serverDeprioritization, - @Nullable final ServerApi serverApi) { + @Nullable final ServerApi serverApi, + @Nullable final String operationName) { this.id = id; this.serverDeprioritization = serverDeprioritization; this.requestContext = requestContext; this.sessionContext = sessionContext; this.timeoutContext = timeoutContext; this.serverApi = serverApi; + this.operationName = operationName; } @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) @@ -121,13 +141,15 @@ public OperationContext(final long id, final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, - @Nullable final ServerApi serverApi) { + @Nullable final ServerApi serverApi, + @Nullable final String operationName) { this.id = id; this.serverDeprioritization = new ServerDeprioritization(); this.requestContext = requestContext; this.sessionContext = sessionContext; this.timeoutContext = timeoutContext; this.serverApi = serverApi; + this.operationName = operationName; } diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index ec769e4f7a..eef275faf7 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -104,9 +104,6 @@ public enum Name { COMMAND_NAME("commandName"), REQUEST_ID("requestId"), OPERATION_ID("operationId"), - /** - * Not supported. - */ OPERATION("operation"), AWAITED("awaited"), SERVICE_ID("serviceId"), diff --git a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java index bbd7ce7300..bc7e6655bc 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java @@ -31,6 +31,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class AbortTransactionOperation extends TransactionOperation { + private static final String COMMAND_NAME = "abortTransaction"; private BsonDocument recoveryToken; public AbortTransactionOperation(final WriteConcern writeConcern) { @@ -43,8 +44,8 @@ public AbortTransactionOperation recoveryToken(@Nullable final BsonDocument reco } @Override - protected String getCommandName() { - return "abortTransaction"; + public String getCommandName() { + return COMMAND_NAME; } @Override diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java index 1f25bc87bf..f9f25cd5fe 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java @@ -135,6 +135,11 @@ public AggregateOperation timeoutMode(@Nullable final TimeoutMode timeoutMode return this; } + @Override + public String getCommandName() { + return wrapped.getCommandName(); + } + @Override public BatchCursor execute(final ReadBinding binding) { return wrapped.execute(binding); @@ -145,17 +150,19 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb wrapped.executeAsync(binding, callback); } + @Override public ReadOperation asExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { return createExplainableOperation(verbosity, resultDecoder); } + @Override public AsyncReadOperation asAsyncExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { return createExplainableOperation(verbosity, resultDecoder); } CommandReadOperation createExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { - return new CommandReadOperation<>(getNamespace().getDatabaseName(), + return new CommandReadOperation<>(getNamespace().getDatabaseName(), wrapped.getCommandName(), (operationContext, serverDescription, connectionDescription) -> { BsonDocument command = wrapped.getCommand(operationContext, UNKNOWN_WIRE_VERSION); applyMaxTimeMS(operationContext.getTimeoutContext(), command); @@ -166,5 +173,4 @@ CommandReadOperation createExplainableOperation(@Nullable final ExplainVe MongoNamespace getNamespace() { return wrapped.getNamespace(); } - } diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java index 7ba2c56b87..3a650fc2f9 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java @@ -53,6 +53,7 @@ import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; class AggregateOperationImpl implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "aggregate"; private static final String RESULT = "result"; private static final String CURSOR = "cursor"; private static final String FIRST_BATCH = "firstBatch"; @@ -185,6 +186,11 @@ AggregateOperationImpl hint(@Nullable final BsonValue hint) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { return executeRetryableRead(binding, namespace.getDatabaseName(), @@ -207,7 +213,7 @@ private CommandCreator getCommandCreator() { } BsonDocument getCommand(final OperationContext operationContext, final int maxWireVersion) { - BsonDocument commandDocument = new BsonDocument("aggregate", aggregateTarget.create()); + BsonDocument commandDocument = new BsonDocument(getCommandName(), aggregateTarget.create()); appendReadConcernToCommand(operationContext.getSessionContext(), maxWireVersion, commandDocument); commandDocument.put("pipeline", pipelineCreator.create()); setNonTailableCursorMaxTimeSupplier(timeoutMode, operationContext); diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java index 904f85042a..022c00383b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java @@ -54,7 +54,8 @@ * *

This class is not part of the public API and may be removed or changed at any time

*/ -public class AggregateToCollectionOperation implements AsyncReadOperation, ReadOperation { +public class AggregateToCollectionOperation implements ReadOperation, AsyncReadOperation { + private static final String COMMAND_NAME = "aggregate"; private final MongoNamespace namespace; private final List pipeline; private final WriteConcern writeConcern; @@ -151,6 +152,11 @@ public AggregateToCollectionOperation timeoutMode(@Nullable final TimeoutMode ti return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Void execute(final ReadBinding binding) { return executeRetryableRead(binding, @@ -183,7 +189,7 @@ private CommandOperationHelper.CommandCreator getCommandCreator() { BsonValue aggregationTarget = (aggregationLevel == AggregationLevel.DATABASE) ? new BsonInt32(1) : new BsonString(namespace.getCollectionName()); - BsonDocument commandDocument = new BsonDocument("aggregate", aggregationTarget); + BsonDocument commandDocument = new BsonDocument(getCommandName(), aggregationTarget); commandDocument.put("pipeline", new BsonArray(pipeline)); if (allowDiskUse != null) { commandDocument.put("allowDiskUse", BsonBoolean.valueOf(allowDiskUse)); diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncReadOperation.java index 75b18f5cb0..3c9cf2117e 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncReadOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncReadOperation.java @@ -28,6 +28,11 @@ */ public interface AsyncReadOperation { + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + /** * General execute which can return anything of type T * diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteOperation.java index 334c3bde8a..ca6f5f910a 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteOperation.java @@ -28,6 +28,11 @@ */ public interface AsyncWriteOperation { + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + /** * General execute which can return anything of type T * diff --git a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java index e523ee3f38..c1fc6adc9f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java @@ -46,6 +46,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public abstract class BaseFindAndModifyOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "findAndModify"; private final MongoNamespace namespace; private final WriteConcern writeConcern; private final boolean retryWrites; @@ -68,6 +69,12 @@ protected BaseFindAndModifyOperation(final MongoNamespace namespace, final Write this.decoder = notNull("decoder", decoder); } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override public T execute(final WriteBinding binding) { return executeRetryableWrite(binding, getDatabaseName(), null, getFieldNameValidator(), @@ -184,7 +191,7 @@ private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { SessionContext sessionContext = operationContext.getSessionContext(); - BsonDocument commandDocument = new BsonDocument("findAndModify", new BsonString(getNamespace().getCollectionName())); + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(getNamespace().getCollectionName())); putIfNotNull(commandDocument, "query", getFilter()); putIfNotNull(commandDocument, "fields", getProjection()); putIfNotNull(commandDocument, "sort", getSort()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 4ef28c796c..84d5513dd6 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -192,6 +192,11 @@ private AggregateOperationImpl getAggregateOperation(final Time return wrapped; } + @Override + public String getCommandName() { + return wrapped.getCommandName(); + } + @Override public BatchCursor execute(final ReadBinding binding) { TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java index f6ff7632c8..b2a3c93e4d 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java @@ -146,7 +146,7 @@ /** * This class is not part of the public API and may be removed or changed at any time. */ -public final class ClientBulkWriteOperation implements WriteOperation, AsyncWriteOperation { +public final class ClientBulkWriteOperation implements AsyncWriteOperation, WriteOperation { private static final ConcreteClientBulkWriteOptions EMPTY_OPTIONS = new ConcreteClientBulkWriteOptions(); private static final String BULK_WRITE_COMMAND_NAME = "bulkWrite"; private static final EncoderContext DEFAULT_ENCODER_CONTEXT = EncoderContext.builder().build(); @@ -177,6 +177,11 @@ public ClientBulkWriteOperation( this.codecRegistry = codecRegistry; } + @Override + public String getCommandName() { + return "bulkWrite"; + } + @Override public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBulkWriteException { WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java index ea89dfb303..1e395315c2 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java @@ -33,20 +33,28 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CommandReadOperation implements AsyncReadOperation, ReadOperation { + private final String commandName; private final String databaseName; private final CommandCreator commandCreator; private final Decoder decoder; - public CommandReadOperation(final String databaseName, final BsonDocument command, final Decoder decoder) { - this(databaseName, (operationContext, serverDescription, connectionDescription) -> command, decoder); + public CommandReadOperation(final String databaseName, final BsonDocument command, final Decoder decoder) { + this(databaseName, command.getFirstKey(), (operationContext, serverDescription, connectionDescription) -> command, decoder); } - public CommandReadOperation(final String databaseName, final CommandCreator commandCreator, final Decoder decoder) { + public CommandReadOperation(final String databaseName, final String commandName, final CommandCreator commandCreator, + final Decoder decoder) { + this.commandName = notNull("commandName", commandName); this.databaseName = notNull("databaseName", databaseName); this.commandCreator = notNull("commandCreator", commandCreator); this.decoder = notNull("decoder", decoder); } + @Override + public String getCommandName() { + return commandName; + } + @Override public T execute(final ReadBinding binding) { return executeRetryableRead(binding, databaseName, commandCreator, decoder, diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java index 6c2338d47d..998a002f34 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java @@ -46,6 +46,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CommitTransactionOperation extends TransactionOperation { + private static final String COMMAND_NAME = "commitTransaction"; private final boolean alreadyCommitted; private BsonDocument recoveryToken; @@ -110,10 +111,9 @@ private static boolean shouldAddUnknownTransactionCommitResultLabel(final MongoE return false; } - @Override - protected String getCommandName() { - return "commitTransaction"; + public String getCommandName() { + return COMMAND_NAME; } @Override diff --git a/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java index 1095dd4450..6789adb093 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java @@ -38,6 +38,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CountDocumentsOperation implements AsyncReadOperation, ReadOperation { + private static final String COMMAND_NAME = "aggregate"; private static final Decoder DECODER = new BsonDocumentCodec(); private final MongoNamespace namespace; private boolean retryReads; @@ -119,6 +120,11 @@ public CountDocumentsOperation comment(@Nullable final BsonValue comment) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Long execute(final ReadBinding binding) { try (BatchCursor cursor = getAggregateOperation().execute(binding)) { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java index f9aa0a8eaa..23dfe4b52e 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java @@ -42,6 +42,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CountOperation implements AsyncReadOperation, ReadOperation { + private static final String COMMAND_NAME = "count"; private static final Decoder DECODER = new BsonDocumentCodec(); private final MongoNamespace namespace; private boolean retryReads; @@ -109,6 +110,11 @@ public CountOperation collation(@Nullable final Collation collation) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Long execute(final ReadBinding binding) { return executeRetryableRead(binding, namespace.getDatabaseName(), @@ -131,7 +137,7 @@ private CommandReadTransformerAsync asyncTransformer() { private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument document = new BsonDocument("count", new BsonString(namespace.getCollectionName())); + BsonDocument document = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); appendReadConcernToCommand(operationContext.getSessionContext(), connectionDescription.getMaxWireVersion(), document); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java index d9a11d2028..582a622d21 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java @@ -231,6 +231,11 @@ public CreateCollectionOperation encryptedFields(@Nullable final BsonDocument en return this; } + @Override + public String getCommandName() { + return "createCollection"; + } + @Override public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java index 76de0757ff..34eaea3713 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java @@ -58,6 +58,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CreateIndexesOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "createIndexes"; private final MongoNamespace namespace; private final List requests; private final WriteConcern writeConcern; @@ -99,6 +100,11 @@ public CreateIndexesOperation commitQuorum(@Nullable final CreateIndexCommitQuor return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Void execute(final WriteBinding binding) { try { @@ -189,7 +195,7 @@ private BsonDocument getIndex(final IndexRequest request) { private CommandOperationHelper.CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument command = new BsonDocument("createIndexes", new BsonString(namespace.getCollectionName())); + BsonDocument command = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); List values = new ArrayList<>(); for (IndexRequest request : requests) { values.add(getIndex(request)); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateSearchIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateSearchIndexesOperation.java index a57087e921..bf75ee88b0 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateSearchIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateSearchIndexesOperation.java @@ -41,6 +41,11 @@ public CreateSearchIndexesOperation(final MongoNamespace namespace, final List requests) { return requests.stream() .map(CreateSearchIndexesOperation::convert) @@ -63,7 +68,7 @@ private static BsonDocument convert(final SearchIndexRequest request) { @Override BsonDocument buildCommand() { - return new BsonDocument(COMMAND_NAME, new BsonString(getNamespace().getCollectionName())) + return new BsonDocument(getCommandName(), new BsonString(getNamespace().getCollectionName())) .append("indexes", convert(indexRequests)); } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java index 3636db0859..26ece818ec 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java @@ -123,6 +123,11 @@ public CreateViewOperation collation(@Nullable final Collation collation) { return this; } + @Override + public String getCommandName() { + return "createView"; + } + @Override public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> { diff --git a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java index 547e5f0dfc..6fe02f7ac0 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java @@ -46,6 +46,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class DistinctOperation implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "distinct"; private static final String VALUES = "values"; private final MongoNamespace namespace; private final String fieldName; @@ -107,6 +108,11 @@ public DistinctOperation hint(@Nullable final BsonValue hint) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { return executeRetryableRead(binding, namespace.getDatabaseName(), getCommandCreator(), createCommandDecoder(), @@ -126,7 +132,7 @@ private Codec createCommandDecoder() { private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("distinct", new BsonString(namespace.getCollectionName())); + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); appendReadConcernToCommand(operationContext.getSessionContext(), connectionDescription.getMaxWireVersion(), commandDocument); commandDocument.put("key", new BsonString(fieldName)); putIfNotNull(commandDocument, "query", filter); diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java index d879f83e54..bf9ac32637 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java @@ -86,6 +86,11 @@ public DropCollectionOperation autoEncryptedFields(final boolean autoEncryptedFi return this; } + @Override + public String getCommandName() { + return "dropCollection"; + } + @Override public Void execute(final WriteBinding binding) { BsonDocument localEncryptedFields = getEncryptedFields((ReadWriteBinding) binding); diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java index 9dd942cb72..8900d112bb 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java @@ -55,6 +55,11 @@ public WriteConcern getWriteConcern() { return writeConcern; } + @Override + public String getCommandName() { + return "dropDatabase"; + } + @Override public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> { diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java index e66a4e10bb..81fcf5129e 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java @@ -41,6 +41,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class DropIndexOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "dropIndexes"; private final MongoNamespace namespace; private final String indexName; private final BsonDocument indexKeys; @@ -64,6 +65,11 @@ public WriteConcern getWriteConcern() { return writeConcern; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Void execute(final WriteBinding binding) { try { @@ -90,7 +96,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall private CommandOperationHelper.CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument command = new BsonDocument("dropIndexes", new BsonString(namespace.getCollectionName())); + BsonDocument command = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); if (indexName != null) { command.put("index", new BsonString(indexName)); } else { diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropSearchIndexOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropSearchIndexOperation.java index 657dedca94..a440dbd0e7 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropSearchIndexOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropSearchIndexOperation.java @@ -37,6 +37,11 @@ final class DropSearchIndexOperation extends AbstractWriteSearchIndexOperation { this.indexName = indexName; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override void swallowOrThrow(@Nullable final E mongoExecutionException) throws E { if (mongoExecutionException != null && !isNamespaceError(mongoExecutionException)) { @@ -46,7 +51,7 @@ void swallowOrThrow(@Nullable final E mongoExecutionExcept @Override BsonDocument buildCommand() { - return new BsonDocument(COMMAND_NAME, new BsonString(getNamespace().getCollectionName())) + return new BsonDocument(getCommandName(), new BsonString(getNamespace().getCollectionName())) .append("name", new BsonString(indexName)); } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java b/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java index 17f7e61740..1d8ddd429e 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java @@ -45,6 +45,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class EstimatedDocumentCountOperation implements AsyncReadOperation, ReadOperation { + private static final String COMMAND_NAME = "count"; private static final Decoder DECODER = new BsonDocumentCodec(); private final MongoNamespace namespace; private boolean retryReads; @@ -69,6 +70,11 @@ public EstimatedDocumentCountOperation comment(@Nullable final BsonValue comment return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Long execute(final ReadBinding binding) { try { @@ -108,7 +114,7 @@ private long transformResult(final BsonDocument result, final ConnectionDescript private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument document = new BsonDocument("count", new BsonString(namespace.getCollectionName())); + BsonDocument document = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); appendReadConcernToCommand(operationContext.getSessionContext(), connectionDescription.getMaxWireVersion(), document); if (comment != null) { document.put("comment", comment); diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java index 373b17949d..db9d61b1dd 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java @@ -88,12 +88,13 @@ public FindAndDeleteOperation let(@Nullable final BsonDocument variables) { return this; } + @Override protected FieldNameValidator getFieldNameValidator() { return NoOpFieldNameValidator.INSTANCE; } + @Override protected void specializeCommand(final BsonDocument commandDocument, final ConnectionDescription connectionDescription) { commandDocument.put("remove", BsonBoolean.TRUE); } - } diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java index 59362cc667..7073260a4c 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java @@ -130,12 +130,14 @@ public FindAndReplaceOperation let(@Nullable final BsonDocument variables) { return this; } + @Override protected FieldNameValidator getFieldNameValidator() { return new MappedFieldNameValidator( NoOpFieldNameValidator.INSTANCE, singletonMap("update", ReplacingDocumentFieldNameValidator.INSTANCE)); } + @Override protected void specializeCommand(final BsonDocument commandDocument, final ConnectionDescription connectionDescription) { commandDocument.put("new", new BsonBoolean(!isReturnOriginal())); putIfTrue(commandDocument, "upsert", isUpsert()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java index bba62d6262..e83deba30f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java @@ -159,10 +159,12 @@ public FindAndUpdateOperation let(@Nullable final BsonDocument variables) { return this; } + @Override protected FieldNameValidator getFieldNameValidator() { return new MappedFieldNameValidator(NoOpFieldNameValidator.INSTANCE, singletonMap("update", new UpdateFieldNameValidator())); } + @Override protected void specializeCommand(final BsonDocument commandDocument, final ConnectionDescription connectionDescription) { commandDocument.put("new", new BsonBoolean(!isReturnOriginal())); putIfTrue(commandDocument, "upsert", isUpsert()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java index 4f834bee34..ab37613db1 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java @@ -68,6 +68,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class FindOperation implements AsyncExplainableReadOperation>, ExplainableReadOperation> { + private static final String COMMAND_NAME = "find"; private static final String FIRST_BATCH = "firstBatch"; private final MongoNamespace namespace; @@ -284,6 +285,11 @@ public FindOperation allowDiskUse(@Nullable final Boolean allowDiskUse) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { IllegalStateException invalidTimeoutModeException = invalidTimeoutModeException(); @@ -352,11 +358,9 @@ private static SingleResultCallback exceptionTransformingCallback(final S } @Override - public ReadOperation asExplainableOperation(@Nullable final ExplainVerbosity verbosity, - final Decoder resultDecoder) { + public CommandReadOperation asExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { return createExplainableOperation(verbosity, resultDecoder); } - @Override public AsyncReadOperation asAsyncExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { @@ -364,7 +368,7 @@ public AsyncReadOperation asAsyncExplainableOperation(@Nullable final Exp } CommandReadOperation createExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { - return new CommandReadOperation<>(getNamespace().getDatabaseName(), + return new CommandReadOperation<>(getNamespace().getDatabaseName(), getCommandName(), (operationContext, serverDescription, connectionDescription) -> { BsonDocument command = getCommand(operationContext, UNKNOWN_WIRE_VERSION); applyMaxTimeMS(operationContext.getTimeoutContext(), command); @@ -373,7 +377,7 @@ CommandReadOperation createExplainableOperation(@Nullable final ExplainVe } private BsonDocument getCommand(final OperationContext operationContext, final int maxWireVersion) { - BsonDocument commandDocument = new BsonDocument("find", new BsonString(namespace.getCollectionName())); + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); appendReadConcernToCommand(operationContext.getSessionContext(), maxWireVersion, commandDocument); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java index 73abe905ae..cb20bbf897 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java @@ -70,6 +70,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class ListCollectionsOperation implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "listCollections"; private final String databaseName; private final Decoder decoder; private boolean retryReads; @@ -157,6 +158,11 @@ public ListCollectionsOperation timeoutMode(@Nullable final TimeoutMode timeo return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext()); @@ -215,7 +221,7 @@ private CommandReadTransformerAsync> asyncTran private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("listCollections", new BsonInt32(1)) + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonInt32(1)) .append("cursor", getCursorDocumentFromBatchSize(batchSize == 0 ? null : batchSize)); putIfNotNull(commandDocument, "filter", filter); putIfTrue(commandDocument, "nameOnly", nameOnly); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java index 5f61c9192d..ae05eb245b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java @@ -43,6 +43,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class ListDatabasesOperation implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "listDatabases"; private static final String DATABASES = "databases"; private final Decoder decoder; private boolean retryReads; @@ -101,6 +102,11 @@ public ListDatabasesOperation comment(@Nullable final BsonValue comment) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { return executeRetryableRead(binding, "admin", getCommandCreator(), CommandResultDocumentCodec.create(decoder, DATABASES), @@ -115,7 +121,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("listDatabases", new BsonInt32(1)); + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonInt32(1)); putIfNotNull(commandDocument, "filter", filter); putIfNotNull(commandDocument, "nameOnly", nameOnly); putIfNotNull(commandDocument, "authorizedDatabases", authorizedDatabasesOnly); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java index e540f752db..d52021b2dc 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java @@ -49,8 +49,8 @@ import static com.mongodb.internal.operation.CursorHelper.getCursorDocumentFromBatchSize; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.setNonTailableCursorMaxTimeSupplier; import static com.mongodb.internal.operation.OperationHelper.canRetryRead; +import static com.mongodb.internal.operation.OperationHelper.setNonTailableCursorMaxTimeSupplier; import static com.mongodb.internal.operation.SingleBatchCursor.createEmptySingleBatchCursor; import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; import static com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute; @@ -65,6 +65,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class ListIndexesOperation implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "listIndexes"; private final MongoNamespace namespace; private final Decoder decoder; private boolean retryReads; @@ -116,6 +117,11 @@ public ListIndexesOperation timeoutMode(@Nullable final TimeoutMode timeoutMo return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext()); @@ -165,7 +171,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("listIndexes", new BsonString(namespace.getCollectionName())) + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())) .append("cursor", getCursorDocumentFromBatchSize(batchSize == 0 ? null : batchSize)); setNonTailableCursorMaxTimeSupplier(timeoutMode, operationContext); putIfNotNull(commandDocument, "comment", comment); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java index 3dfde30511..dd28e5f3e2 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java @@ -44,6 +44,7 @@ */ public final class ListSearchIndexesOperation implements AsyncExplainableReadOperation>, ExplainableReadOperation> { + private static final String COMMAND_NAME = "aggregate"; private static final String STAGE_LIST_SEARCH_INDEXES = "$listSearchIndexes"; private final MongoNamespace namespace; private final Decoder decoder; @@ -73,6 +74,11 @@ public ListSearchIndexesOperation(final MongoNamespace namespace, final Decoder< this.retryReads = retryReads; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { try { diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java index 327aa5e5fa..79151c0fb6 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java @@ -59,6 +59,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class MapReduceToCollectionOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "mapReduce"; private final MongoNamespace namespace; private final BsonJavaScript mapFunction; private final BsonJavaScript reduceFunction; @@ -208,6 +209,11 @@ public MapReduceToCollectionOperation collation(@Nullable final Collation collat return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public MapReduceStatistics execute(final WriteBinding binding) { return executeCommand(binding, namespace.getDatabaseName(), getCommandCreator(), transformer(binding @@ -243,7 +249,7 @@ public AsyncReadOperation asExplainableOperationAsync(final Explai } private CommandReadOperation createExplainableOperation(final ExplainVerbosity explainVerbosity) { - return new CommandReadOperation<>(getNamespace().getDatabaseName(), + return new CommandReadOperation<>(getNamespace().getDatabaseName(), getCommandName(), (operationContext, serverDescription, connectionDescription) -> { BsonDocument command = getCommandCreator().create(operationContext, serverDescription, connectionDescription); applyMaxTimeMS(operationContext.getTimeoutContext(), command); diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java index 273d8595ec..76f3e67430 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java @@ -53,8 +53,9 @@ * *

This class is not part of the public API and may be removed or changed at any time

*/ -public class MapReduceWithInlineResultsOperation implements AsyncReadOperation>, - ReadOperation> { +public class MapReduceWithInlineResultsOperation implements ReadOperation>, + AsyncReadOperation> { + private static final String COMMAND_NAME = "mapReduce"; private final MongoNamespace namespace; private final BsonJavaScript mapFunction; private final BsonJavaScript reduceFunction; @@ -164,6 +165,11 @@ public MapReduceWithInlineResultsOperation collation(@Nullable final Collatio return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public MapReduceBatchCursor execute(final ReadBinding binding) { return executeRetryableRead(binding, namespace.getDatabaseName(), @@ -188,7 +194,7 @@ public AsyncReadOperation asExplainableOperationAsync(final Explai } private CommandReadOperation createExplainableOperation(final ExplainVerbosity explainVerbosity) { - return new CommandReadOperation<>(namespace.getDatabaseName(), + return new CommandReadOperation<>(namespace.getDatabaseName(), getCommandName(), (operationContext, serverDescription, connectionDescription) -> { BsonDocument command = getCommandCreator().create(operationContext, serverDescription, connectionDescription); applyMaxTimeMS(operationContext.getTimeoutContext(), command); @@ -214,7 +220,7 @@ private CommandReadTransformerAsync> private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("mapReduce", new BsonString(namespace.getCollectionName())) + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())) .append("map", getMapFunction()) .append("reduce", getReduceFunction()) .append("out", new BsonDocument("inline", new BsonInt32(1))); diff --git a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java index 06d392bceb..9bc947f045 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java @@ -49,6 +49,7 @@ import org.bson.BsonValue; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; @@ -84,17 +85,20 @@ public class MixedBulkWriteOperation implements AsyncWriteOperation writeRequests, final boolean ordered, final WriteConcern writeConcern, final boolean retryWrites) { + notNull("writeRequests", writeRequests); + isTrueArgument("writeRequests is not an empty list", !writeRequests.isEmpty()); + this.commandName = notNull("commandName", writeRequests.get(0).getType().toString().toLowerCase(Locale.ROOT)); this.namespace = notNull("namespace", namespace); - this.writeRequests = notNull("writes", writeRequests); + this.writeRequests = writeRequests; this.ordered = ordered; this.writeConcern = notNull("writeConcern", writeConcern); this.retryWrites = retryWrites; - isTrueArgument("writes is not an empty list", !writeRequests.isEmpty()); } public MongoNamespace getNamespace() { @@ -175,6 +179,11 @@ private boolean shouldAttemptToRetryWrite(final RetryState retryState, final Thr return decision; } + @Override + public String getCommandName() { + return commandName; + } + @Override public BulkWriteResult execute(final WriteBinding binding) { TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); @@ -420,8 +429,9 @@ private BsonDocument executeCommand( final OperationContext operationContext, final Connection connection, final BulkWriteBatch batch) { + commandName = batch.getCommand().getFirstKey(); return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), - operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload()); + operationContext.withOperationName(commandName), shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload()); } private void executeCommandAsync( @@ -430,8 +440,10 @@ private void executeCommandAsync( final AsyncConnection connection, final BulkWriteBatch batch, final SingleResultCallback callback) { + commandName = batch.getCommand().getFirstKey(); connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), - operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload(), callback); + operationContext.withOperationName(commandName), shouldExpectResponse(batch, effectiveWriteConcern), + batch.getPayload(), callback); } private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) { diff --git a/driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java index aa5d2e7d45..a60e60f58f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java @@ -25,6 +25,11 @@ */ public interface ReadOperation { + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + /** * General execute which can return anything of type T * diff --git a/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java index fd727f2fd8..1ca81e215b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java @@ -48,6 +48,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class RenameCollectionOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "renameCollection"; private final MongoNamespace originalNamespace; private final MongoNamespace newNamespace; private final WriteConcern writeConcern; @@ -73,6 +74,11 @@ public RenameCollectionOperation dropTarget(final boolean dropTarget) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> executeCommand(binding, "admin", getCommand(), connection, @@ -94,7 +100,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall } private BsonDocument getCommand() { - BsonDocument commandDocument = new BsonDocument("renameCollection", new BsonString(originalNamespace.getFullName())) + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(originalNamespace.getFullName())) .append("to", new BsonString(newNamespace.getFullName())) .append("dropTarget", BsonBoolean.valueOf(dropTarget)); appendWriteConcernToCommand(writeConcern, commandDocument); diff --git a/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java index 8bf7ee76d2..e344cfb2b6 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java @@ -42,7 +42,7 @@ * *

This class is not part of the public API and may be removed or changed at any time

*/ -public abstract class TransactionOperation implements WriteOperation, AsyncWriteOperation { +public abstract class TransactionOperation implements AsyncWriteOperation, WriteOperation { private final WriteConcern writeConcern; TransactionOperation(final WriteConcern writeConcern) { @@ -82,12 +82,5 @@ CommandCreator getCommandCreator() { }; } - /** - * Gets the command name. - * - * @return the command name - */ - protected abstract String getCommandName(); - protected abstract Function getRetryCommandModifier(TimeoutContext timeoutContext); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/UpdateSearchIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/UpdateSearchIndexesOperation.java index 7bd3373068..ca23fd8e50 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/UpdateSearchIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/UpdateSearchIndexesOperation.java @@ -34,9 +34,14 @@ final class UpdateSearchIndexesOperation extends AbstractWriteSearchIndexOperati this.request = request; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override BsonDocument buildCommand() { - return new BsonDocument(COMMAND_NAME, new BsonString(getNamespace().getCollectionName())) + return new BsonDocument(getCommandName(), new BsonString(getNamespace().getCollectionName())) .append("name", new BsonString(request.getIndexName())) .append("definition", request.getDefinition()); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java index 1a4fee36e1..e7e606bd01 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java @@ -25,6 +25,11 @@ */ public interface WriteOperation { + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + /** * General execute which can return anything of type T * diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java index 09976e363d..30792bf048 100644 --- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java +++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java @@ -379,11 +379,7 @@ public static ReadWriteBinding getBinding(final ReadPreference readPreference) { } public static OperationContext createNewOperationContext(final TimeoutSettings timeoutSettings) { - return new OperationContext(OPERATION_CONTEXT.getId(), - OPERATION_CONTEXT.getRequestContext(), - OPERATION_CONTEXT.getSessionContext(), - new TimeoutContext(timeoutSettings), - OPERATION_CONTEXT.getServerApi()); + return OPERATION_CONTEXT.withTimeoutContext(new TimeoutContext(timeoutSettings)); } private static ReadWriteBinding getBinding(final Cluster cluster, diff --git a/driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java b/driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java index 4d8eb22cb7..acd9c3d606 100644 --- a/driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java +++ b/driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java @@ -47,12 +47,8 @@ * Operation for bulk writes for the legacy API. */ final class LegacyMixedBulkWriteOperation implements WriteOperation { - private final WriteConcern writeConcern; - private final MongoNamespace namespace; - private final List writeRequests; + private final MixedBulkWriteOperation wrappedOperation; private final WriteRequest.Type type; - private final boolean ordered; - private final boolean retryWrites; private Boolean bypassDocumentValidation; static LegacyMixedBulkWriteOperation createBulkWriteOperationForInsert(final MongoNamespace namespace, final boolean ordered, @@ -79,17 +75,14 @@ static LegacyMixedBulkWriteOperation createBulkWriteOperationForDelete(final Mon private LegacyMixedBulkWriteOperation(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern, final boolean retryWrites, final List writeRequests, final WriteRequest.Type type) { - isTrueArgument("writeRequests not empty", !writeRequests.isEmpty()); - this.writeRequests = notNull("writeRequests", writeRequests); + notNull("writeRequests", writeRequests); + isTrueArgument("writeRequests is not an empty list", !writeRequests.isEmpty()); this.type = type; - this.ordered = ordered; - this.namespace = notNull("namespace", namespace); - this.writeConcern = notNull("writeConcern", writeConcern); - this.retryWrites = retryWrites; + this.wrappedOperation = new MixedBulkWriteOperation(namespace, writeRequests, ordered, writeConcern, retryWrites); } List getWriteRequests() { - return writeRequests; + return wrappedOperation.getWriteRequests(); } LegacyMixedBulkWriteOperation bypassDocumentValidation(@Nullable final Boolean bypassDocumentValidation) { @@ -97,11 +90,15 @@ LegacyMixedBulkWriteOperation bypassDocumentValidation(@Nullable final Boolean b return this; } + @Override + public String getCommandName() { + return wrappedOperation.getCommandName(); + } + @Override public WriteConcernResult execute(final WriteBinding binding) { try { - BulkWriteResult result = new MixedBulkWriteOperation(namespace, writeRequests, ordered, writeConcern, retryWrites) - .bypassDocumentValidation(bypassDocumentValidation).execute(binding); + BulkWriteResult result = wrappedOperation.bypassDocumentValidation(bypassDocumentValidation).execute(binding); if (result.wasAcknowledged()) { return translateBulkWriteResult(result); } else { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index f8371c8afb..a01dc7e3ea 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -232,6 +232,11 @@ AsyncReadOperation> getOperation() { return operation; } + @Override + public String getCommandName() { + return operation.getCommandName(); + } + @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { operation.executeAsync(binding, callback::onResult); @@ -249,6 +254,11 @@ AsyncWriteOperation getOperation() { return operation; } + @Override + public String getCommandName() { + return operation.getCommandName(); + } + @Override public void executeAsync(final AsyncWriteBinding binding, final SingleResultCallback callback) { operation.executeAsync(binding, (result, t) -> callback.onResult(null, t)); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index 0a4b0318d1..4cbe980477 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -84,7 +84,7 @@ public Mono execute(final AsyncReadOperation operation, final ReadPref return Mono.from(subscriber -> clientSessionHelper.withClientSession(session, this) .map(clientSession -> getReadWriteBinding(getContext(subscriber), - readPreference, readConcern, clientSession, session == null)) + readPreference, readConcern, clientSession, session == null, operation.getCommandName())) .flatMap(binding -> { if (session != null && session.hasActiveTransaction() && !binding.getReadPreference().equals(primary())) { binding.release(); @@ -119,7 +119,7 @@ public Mono execute(final AsyncWriteOperation operation, final ReadCon return Mono.from(subscriber -> clientSessionHelper.withClientSession(session, this) .map(clientSession -> getReadWriteBinding(getContext(subscriber), - primary(), readConcern, clientSession, session == null)) + primary(), readConcern, clientSession, session == null, operation.getCommandName())) .flatMap(binding -> Mono.create(sink -> operation.executeAsync(binding, (result, t) -> { try { @@ -176,11 +176,11 @@ private void unpinServerAddressOnTransientTransactionError(@Nullable final Clien private AsyncReadWriteBinding getReadWriteBinding(final RequestContext requestContext, final ReadPreference readPreference, final ReadConcern readConcern, final ClientSession session, - final boolean ownsSession) { + final boolean ownsSession, final String commandName) { notNull("readPreference", readPreference); AsyncClusterAwareReadWriteBinding readWriteBinding = new AsyncClusterBinding(mongoClient.getCluster(), getReadPreferenceForBinding(readPreference, session), readConcern, - getOperationContext(requestContext, session, readConcern)); + getOperationContext(requestContext, session, readConcern, commandName)); Crypt crypt = mongoClient.getCrypt(); if (crypt != null) { @@ -196,12 +196,13 @@ private AsyncReadWriteBinding getReadWriteBinding(final RequestContext requestCo } private OperationContext getOperationContext(final RequestContext requestContext, final ClientSession session, - final ReadConcern readConcern) { + final ReadConcern readConcern, final String commandName) { return new OperationContext( requestContext, new ReadConcernAwareNoOpSessionContext(readConcern), createTimeoutContext(session, timeoutSettings), - mongoClient.getSettings().getServerApi()); + mongoClient.getSettings().getServerApi(), + commandName); } private ReadPreference getReadPreferenceForBinding(final ReadPreference readPreference, @Nullable final ClientSession session) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java index 17a54c345a..8352b5fe22 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java @@ -39,6 +39,11 @@ public AsyncReadOperation> getCursorReadOperation() { return cursorReadOperation; } + @Override + public String getCommandName() { + return readOperation.getCommandName(); + } + @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { readOperation.executeAsync(binding, (result, t) -> { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java index bde5811a71..e0f812f057 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java @@ -33,6 +33,11 @@ class VoidWriteOperationThenCursorReadOperation implements AsyncReadOperation this.cursorReadOperation = cursorReadOperation; } + @Override + public String getCommandName() { + return writeOperation.getCommandName(); + } + @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { writeOperation.executeAsync((AsyncWriteBinding) binding, (result, t) -> { diff --git a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java index 8a0107aafe..541bd9d351 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java @@ -232,6 +232,11 @@ ReadOperation> getOperation() { this.operation = operation; } + @Override + public String getCommandName() { + return operation.getCommandName(); + } + @Override public BatchCursor execute(final ReadBinding binding) { return operation.execute(binding); diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java index 9c0033e42a..0430d9407c 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java @@ -415,7 +415,8 @@ public T execute(final ReadOperation operation, final ReadPreference read } ClientSession actualClientSession = getClientSession(session); - ReadBinding binding = getReadBinding(readPreference, readConcern, actualClientSession, session == null); + ReadBinding binding = getReadBinding(readPreference, readConcern, actualClientSession, session == null, + operation.getCommandName()); try { if (actualClientSession.hasActiveTransaction() && !binding.getReadPreference().equals(primary())) { @@ -440,7 +441,7 @@ public T execute(final WriteOperation operation, final ReadConcern readCo } ClientSession actualClientSession = getClientSession(session); - WriteBinding binding = getWriteBinding(readConcern, actualClientSession, session == null); + WriteBinding binding = getWriteBinding(readConcern, actualClientSession, session == null, operation.getCommandName()); try { return operation.execute(binding); @@ -467,20 +468,23 @@ public TimeoutSettings getTimeoutSettings() { return executorTimeoutSettings; } - WriteBinding getWriteBinding(final ReadConcern readConcern, final ClientSession session, final boolean ownsSession) { - return getReadWriteBinding(primary(), readConcern, session, ownsSession); + WriteBinding getWriteBinding(final ReadConcern readConcern, final ClientSession session, final boolean ownsSession, + final String commandName) { + return getReadWriteBinding(primary(), readConcern, session, ownsSession, commandName); } ReadBinding getReadBinding(final ReadPreference readPreference, final ReadConcern readConcern, final ClientSession session, - final boolean ownsSession) { - return getReadWriteBinding(readPreference, readConcern, session, ownsSession); + final boolean ownsSession, final String commandName) { + return getReadWriteBinding(readPreference, readConcern, session, ownsSession, commandName); } ReadWriteBinding getReadWriteBinding(final ReadPreference readPreference, - final ReadConcern readConcern, final ClientSession session, final boolean ownsSession) { + final ReadConcern readConcern, final ClientSession session, final boolean ownsSession, + final String commandName) { ClusterAwareReadWriteBinding readWriteBinding = new ClusterBinding(cluster, - getReadPreferenceForBinding(readPreference, session), readConcern, getOperationContext(session, readConcern)); + getReadPreferenceForBinding(readPreference, session), readConcern, + getOperationContext(session, readConcern, commandName)); if (crypt != null) { readWriteBinding = new CryptBinding(readWriteBinding, crypt); @@ -489,12 +493,13 @@ ReadWriteBinding getReadWriteBinding(final ReadPreference readPreference, return new ClientSessionBinding(session, ownsSession, readWriteBinding); } - private OperationContext getOperationContext(final ClientSession session, final ReadConcern readConcern) { + private OperationContext getOperationContext(final ClientSession session, final ReadConcern readConcern, final String commandName) { return new OperationContext( getRequestContext(), new ReadConcernAwareNoOpSessionContext(readConcern), createTimeoutContext(session, executorTimeoutSettings), - serverApi); + serverApi, + commandName); } private RequestContext getRequestContext() { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 0be87ee341..e067e36d99 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -91,7 +91,6 @@ import static com.mongodb.client.unified.UnifiedTestModifications.testDef; import static java.lang.String.format; import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -374,9 +373,7 @@ public void shouldPassAllOutcomes( } if (definition.containsKey("expectLogMessages")) { - ArrayList tweaks = new ArrayList<>(singletonList( - // `LogMessage.Entry.Name.OPERATION` is not supported, therefore we skip matching its value - LogMatcher.Tweak.skip(LogMessage.Entry.Name.OPERATION))); + ArrayList tweaks = new ArrayList<>(); if (getMongoClientSettings().getClusterSettings() .getHosts().stream().anyMatch(serverAddress -> serverAddress instanceof UnixServerAddress)) { tweaks.add(LogMatcher.Tweak.skip(LogMessage.Entry.Name.SERVER_PORT)); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 8f43b58b7d..3cf703d664 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -104,14 +104,12 @@ public static void applyCustomizations(final TestDef def) { .test("atlas-data-lake-testing", "getMore", "A successful find event with getMore"); // connection-monitoring-and-pooling - - // TODO-JAVA-5711 reason, jira - // added as part of https://jira.mongodb.org/browse/JAVA-4976 , but unknown Jira to complete - // The implementation of the functionality related to clearing the connection pool before closing the connection - // will be carried out once the specification is finalized and ready. - def.skipUnknownReason("") + def.skipNoncompliant("According to the test, we should clear the pool then close the connection. Our implementation" + + "immediately closes the failed connection, then clears the pool.") .test("connection-monitoring-and-pooling/tests/logging", "connection-logging", "Connection checkout fails due to error establishing connection"); - def.skipUnknownReason("") + + + def.skipNoncompliant("Driver does not support waitQueueSize or waitQueueMultiple options") .test("connection-monitoring-and-pooling/tests/logging", "connection-pool-options", "waitQueueSize should be included in connection pool created message when specified") .test("connection-monitoring-and-pooling/tests/logging", "connection-pool-options", "waitQueueMultiple should be included in connection pool created message when specified");