From 03e860b5f258d619a8c139aacb8f02b078d54a5b Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 7 Jul 2025 10:29:13 +0100 Subject: [PATCH 1/4] Added operation (aka command name) to structured logging JAVA-5197 --- .../internal/connection/BaseCluster.java | 66 +++++++------------ .../connection/LoadBalancedCluster.java | 16 ++--- .../internal/connection/OperationContext.java | 30 +++++++-- .../mongodb/internal/logging/LogMessage.java | 3 - .../operation/AbortTransactionOperation.java | 5 +- .../operation/AggregateOperation.java | 10 ++- .../operation/AggregateOperationImpl.java | 8 ++- .../AggregateToCollectionOperation.java | 10 ++- .../operation/AsyncReadOperation.java | 5 ++ .../operation/AsyncWriteOperation.java | 5 ++ .../operation/BaseFindAndModifyOperation.java | 9 ++- .../operation/ChangeStreamOperation.java | 5 ++ .../operation/ClientBulkWriteOperation.java | 7 +- .../operation/CommandReadOperation.java | 14 +++- .../operation/CommitTransactionOperation.java | 6 +- .../operation/CountDocumentsOperation.java | 6 ++ .../internal/operation/CountOperation.java | 8 ++- .../operation/CreateCollectionOperation.java | 5 ++ .../operation/CreateIndexesOperation.java | 8 ++- .../CreateSearchIndexesOperation.java | 7 +- .../operation/CreateViewOperation.java | 5 ++ .../internal/operation/DistinctOperation.java | 8 ++- .../operation/DropCollectionOperation.java | 5 ++ .../operation/DropDatabaseOperation.java | 5 ++ .../operation/DropIndexOperation.java | 8 ++- .../operation/DropSearchIndexOperation.java | 7 +- .../EstimatedDocumentCountOperation.java | 8 ++- .../operation/FindAndDeleteOperation.java | 1 - .../internal/operation/FindOperation.java | 14 ++-- .../operation/ListCollectionsOperation.java | 8 ++- .../operation/ListDatabasesOperation.java | 8 ++- .../operation/ListIndexesOperation.java | 10 ++- .../operation/ListSearchIndexesOperation.java | 6 ++ .../MapReduceToCollectionOperation.java | 8 ++- .../MapReduceWithInlineResultsOperation.java | 14 ++-- .../operation/MixedBulkWriteOperation.java | 15 ++++- .../internal/operation/ReadOperation.java | 5 ++ .../operation/RenameCollectionOperation.java | 8 ++- .../operation/TransactionOperation.java | 9 +-- .../UpdateSearchIndexesOperation.java | 7 +- .../internal/operation/WriteOperation.java | 5 ++ .../com/mongodb/ClusterFixture.java | 6 +- .../LegacyMixedBulkWriteOperation.java | 25 ++++--- .../internal/MapReducePublisherImpl.java | 10 +++ .../internal/OperationExecutorImpl.java | 13 ++-- ...dReadOperationThenCursorReadOperation.java | 5 ++ ...WriteOperationThenCursorReadOperation.java | 5 ++ .../internal/MapReduceIterableImpl.java | 5 ++ .../client/internal/MongoClusterImpl.java | 25 ++++--- .../mongodb/client/unified/UnifiedTest.java | 5 +- .../unified/UnifiedTestModifications.java | 12 ++-- 51 files changed, 352 insertions(+), 156 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index fb840d9ad08..f4ec77a3af3 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -145,7 +145,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera ServerDeprioritization serverDeprioritization = operationContext.getServerDeprioritization(); boolean selectionWaitingLogged = false; Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout(); - logServerSelectionStarted(clusterId, operationContext.getId(), serverSelector, description); + logServerSelectionStarted(operationContext, clusterId, serverSelector, description); while (true) { CountDownLatch currentPhaseLatch = phase.get(); ClusterDescription currentDescription = description; @@ -154,16 +154,11 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera computedServerSelectionTimeout, operationContext.getTimeoutContext()); if (!currentDescription.isCompatibleWithDriver()) { - logAndThrowIncompatibleException(operationContext.getId(), serverSelector, currentDescription); + logAndThrowIncompatibleException(operationContext, serverSelector, currentDescription); } if (serverTuple != null) { ServerAddress serverAddress = serverTuple.getServerDescription().getAddress(); - logServerSelectionSucceeded( - clusterId, - operationContext.getId(), - serverAddress, - serverSelector, - currentDescription); + logServerSelectionSucceeded(operationContext, clusterId, serverAddress, serverSelector, currentDescription); serverDeprioritization.updateCandidate(serverAddress); return serverTuple; } @@ -171,7 +166,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera logAndThrowTimeoutException(operationContext, serverSelector, currentDescription)); if (!selectionWaitingLogged) { - logServerSelectionWaiting(clusterId, operationContext.getId(), computedServerSelectionTimeout, serverSelector, currentDescription); + logServerSelectionWaiting(operationContext, clusterId, computedServerSelectionTimeout, serverSelector, currentDescription); selectionWaitingLogged = true; } connect(); @@ -197,11 +192,7 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati CountDownLatch currentPhase = phase.get(); ClusterDescription currentDescription = description; - logServerSelectionStarted( - clusterId, - operationContext.getId(), - serverSelector, - currentDescription); + logServerSelectionStarted(operationContext, clusterId, serverSelector, currentDescription); if (!handleServerSelectionRequest(request, currentPhase, currentDescription)) { notifyWaitQueueHandler(request); @@ -290,12 +281,11 @@ private boolean handleServerSelectionRequest( try { OperationContext operationContext = request.getOperationContext(); - long operationId = operationContext.getId(); if (currentPhase != request.phase) { CountDownLatch prevPhase = request.phase; request.phase = currentPhase; if (!description.isCompatibleWithDriver()) { - logAndThrowIncompatibleException(operationId, request.originalSelector, description); + logAndThrowIncompatibleException(operationContext, request.originalSelector, description); } @@ -309,23 +299,13 @@ private boolean handleServerSelectionRequest( if (serverTuple != null) { ServerAddress serverAddress = serverTuple.getServerDescription().getAddress(); - logServerSelectionSucceeded( - clusterId, - operationId, - serverAddress, - request.originalSelector, - description); + logServerSelectionSucceeded(operationContext, clusterId, serverAddress, request.originalSelector, description); serverDeprioritization.updateCandidate(serverAddress); request.onResult(serverTuple, null); return true; } if (prevPhase == null) { - logServerSelectionWaiting( - clusterId, - operationId, - request.getTimeout(), - request.originalSelector, - description); + logServerSelectionWaiting(operationContext, clusterId, request.getTimeout(), request.originalSelector, description); } } @@ -410,11 +390,11 @@ protected ClusterableServer createServer(final ServerAddress serverAddress) { } private void logAndThrowIncompatibleException( - final long operationId, + final OperationContext operationContext, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { MongoIncompatibleDriverException exception = createIncompatibleException(clusterDescription); - logServerSelectionFailed(clusterId, operationId, exception, serverSelector, clusterDescription); + logServerSelectionFailed(operationContext, clusterId, exception, serverSelector, clusterDescription); throw exception; } @@ -448,7 +428,7 @@ private void logAndThrowTimeoutException( MongoTimeoutException exception = operationContext.getTimeoutContext().hasTimeoutMS() ? new MongoOperationTimeoutException(message) : new MongoTimeoutException(message); - logServerSelectionFailed(clusterId, operationContext.getId(), exception, serverSelector, clusterDescription); + logServerSelectionFailed(operationContext, clusterId, exception, serverSelector, clusterDescription); throw exception; } @@ -557,16 +537,16 @@ public void run() { } static void logServerSelectionStarted( + final OperationContext operationContext, final ClusterId clusterId, - final long operationId, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { if (STRUCTURED_LOGGER.isRequired(DEBUG, clusterId)) { STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection started", clusterId, asList( - new Entry(OPERATION, null), - new Entry(OPERATION_ID, operationId), + new Entry(OPERATION, operationContext.getCommandName()), + new Entry(OPERATION_ID, operationContext.getId()), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), "Server selection started for operation[ {}] with ID {}. Selector: {}, topology description: {}")); @@ -574,8 +554,8 @@ static void logServerSelectionStarted( } private static void logServerSelectionWaiting( + final OperationContext operationContext, final ClusterId clusterId, - final long operationId, final Timeout timeout, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { @@ -583,8 +563,8 @@ private static void logServerSelectionWaiting( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, INFO, "Waiting for suitable server to become available", clusterId, asList( - new Entry(OPERATION, null), - new Entry(OPERATION_ID, operationId), + new Entry(OPERATION, operationContext.getCommandName()), + new Entry(OPERATION_ID, operationContext.getId()), timeout.call(MILLISECONDS, () -> new Entry(REMAINING_TIME_MS, "infinite"), (ms) -> new Entry(REMAINING_TIME_MS, ms), @@ -597,8 +577,8 @@ private static void logServerSelectionWaiting( } private static void logServerSelectionFailed( + final OperationContext operationContext, final ClusterId clusterId, - final long operationId, final MongoException failure, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { @@ -612,8 +592,8 @@ private static void logServerSelectionFailed( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection failed", clusterId, asList( - new Entry(OPERATION, null), - new Entry(OPERATION_ID, operationId), + new Entry(OPERATION, operationContext.getCommandName()), + new Entry(OPERATION_ID, operationContext.getId()), new Entry(FAILURE, failureDescription), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), @@ -622,8 +602,8 @@ private static void logServerSelectionFailed( } static void logServerSelectionSucceeded( + final OperationContext operationContext, final ClusterId clusterId, - final long operationId, final ServerAddress serverAddress, final ServerSelector serverSelector, final ClusterDescription clusterDescription) { @@ -631,8 +611,8 @@ static void logServerSelectionSucceeded( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection succeeded", clusterId, asList( - new Entry(OPERATION, null), - new Entry(OPERATION_ID, operationId), + new Entry(OPERATION, operationContext.getCommandName()), + new Entry(OPERATION_ID, operationContext.getId()), new Entry(SERVER_HOST, serverAddress.getHost()), new Entry(SERVER_PORT, serverAddress instanceof UnixServerAddress ? null : serverAddress.getPort()), new Entry(SELECTOR, serverSelector.toString()), diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index 588bd9f6092..2129f28ca33 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -222,9 +222,9 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera throw createResolvedToMultipleHostsException(); } ClusterDescription curDescription = description; - logServerSelectionStarted(clusterId, operationContext.getId(), serverSelector, curDescription); + logServerSelectionStarted(operationContext, clusterId, serverSelector, curDescription); ServerTuple serverTuple = new ServerTuple(assertNotNull(server), curDescription.getServerDescriptions().get(0)); - logServerSelectionSucceeded(clusterId, operationContext.getId(), serverTuple.getServerDescription().getAddress(), + logServerSelectionSucceeded(operationContext, clusterId, serverTuple.getServerDescription().getAddress(), serverSelector, curDescription); return serverTuple; } @@ -254,8 +254,8 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati return; } Timeout computedServerSelectionTimeout = operationContext.getTimeoutContext().computeServerSelectionTimeout(); - ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(operationContext.getId(), serverSelector, - operationContext, computedServerSelectionTimeout, callback); + ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(serverSelector, operationContext, + computedServerSelectionTimeout, callback); if (initializationCompleted) { handleServerSelectionRequest(serverSelectionRequest); } else { @@ -309,9 +309,9 @@ private void handleServerSelectionRequest(final ServerSelectionRequest serverSel } else { ClusterDescription curDescription = description; logServerSelectionStarted( - clusterId, serverSelectionRequest.operationId, serverSelectionRequest.serverSelector, curDescription); + serverSelectionRequest.operationContext, clusterId, serverSelectionRequest.serverSelector, curDescription); ServerTuple serverTuple = new ServerTuple(assertNotNull(server), curDescription.getServerDescriptions().get(0)); - logServerSelectionSucceeded(clusterId, serverSelectionRequest.operationId, + logServerSelectionSucceeded(serverSelectionRequest.operationContext, clusterId, serverTuple.getServerDescription().getAddress(), serverSelectionRequest.serverSelector, curDescription); serverSelectionRequest.onSuccess(serverTuple); } @@ -416,15 +416,13 @@ public void run() { } private static final class ServerSelectionRequest { - private final long operationId; private final ServerSelector serverSelector; private final SingleResultCallback callback; private final Timeout timeout; private final OperationContext operationContext; - private ServerSelectionRequest(final long operationId, final ServerSelector serverSelector, final OperationContext operationContext, + private ServerSelectionRequest(final ServerSelector serverSelector, final OperationContext operationContext, final Timeout timeout, final SingleResultCallback callback) { - this.operationId = operationId; this.serverSelector = serverSelector; this.timeout = timeout; this.operationContext = operationContext; diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java index bf29ebc051b..cbeff997ad6 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java +++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java @@ -49,10 +49,17 @@ public class OperationContext { private final TimeoutContext timeoutContext; @Nullable private final ServerApi serverApi; + @Nullable + private final String commandName; public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, @Nullable final ServerApi serverApi) { - this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi); + this(requestContext, sessionContext, timeoutContext, serverApi, null); + } + + public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, + @Nullable final ServerApi serverApi, @Nullable final String commandName) { + this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, commandName); } public static OperationContext simpleOperationContext( @@ -61,7 +68,8 @@ public static OperationContext simpleOperationContext( IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, new TimeoutContext(timeoutSettings), - serverApi); + serverApi, + null); } public static OperationContext simpleOperationContext(final TimeoutContext timeoutContext) { @@ -69,15 +77,16 @@ public static OperationContext simpleOperationContext(final TimeoutContext timeo IgnorableRequestContext.INSTANCE, NoOpSessionContext.INSTANCE, timeoutContext, + null, null); } public OperationContext withSessionContext(final SessionContext sessionContext) { - return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi); + return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, commandName); } public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) { - return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi); + return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, commandName); } public long getId() { @@ -101,19 +110,26 @@ public ServerApi getServerApi() { return serverApi; } + @Nullable + public String getCommandName() { + return commandName; + } + @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) public OperationContext(final long id, final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, final ServerDeprioritization serverDeprioritization, - @Nullable final ServerApi serverApi) { + @Nullable final ServerApi serverApi, + @Nullable final String commandName) { this.id = id; this.serverDeprioritization = serverDeprioritization; this.requestContext = requestContext; this.sessionContext = sessionContext; this.timeoutContext = timeoutContext; this.serverApi = serverApi; + this.commandName = commandName; } @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) @@ -121,13 +137,15 @@ public OperationContext(final long id, final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, - @Nullable final ServerApi serverApi) { + @Nullable final ServerApi serverApi, + @Nullable final String commandName) { this.id = id; this.serverDeprioritization = new ServerDeprioritization(); this.requestContext = requestContext; this.sessionContext = sessionContext; this.timeoutContext = timeoutContext; this.serverApi = serverApi; + this.commandName = commandName; } diff --git a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java index ec769e4f7a6..eef275faf7f 100644 --- a/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java +++ b/driver-core/src/main/com/mongodb/internal/logging/LogMessage.java @@ -104,9 +104,6 @@ public enum Name { COMMAND_NAME("commandName"), REQUEST_ID("requestId"), OPERATION_ID("operationId"), - /** - * Not supported. - */ OPERATION("operation"), AWAITED("awaited"), SERVICE_ID("serviceId"), diff --git a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java index bbd7ce7300e..bc7e6655bc7 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java @@ -31,6 +31,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class AbortTransactionOperation extends TransactionOperation { + private static final String COMMAND_NAME = "abortTransaction"; private BsonDocument recoveryToken; public AbortTransactionOperation(final WriteConcern writeConcern) { @@ -43,8 +44,8 @@ public AbortTransactionOperation recoveryToken(@Nullable final BsonDocument reco } @Override - protected String getCommandName() { - return "abortTransaction"; + public String getCommandName() { + return COMMAND_NAME; } @Override diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java index 1f25bc87bf9..f9f25cd5fe1 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java @@ -135,6 +135,11 @@ public AggregateOperation timeoutMode(@Nullable final TimeoutMode timeoutMode return this; } + @Override + public String getCommandName() { + return wrapped.getCommandName(); + } + @Override public BatchCursor execute(final ReadBinding binding) { return wrapped.execute(binding); @@ -145,17 +150,19 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb wrapped.executeAsync(binding, callback); } + @Override public ReadOperation asExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { return createExplainableOperation(verbosity, resultDecoder); } + @Override public AsyncReadOperation asAsyncExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { return createExplainableOperation(verbosity, resultDecoder); } CommandReadOperation createExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { - return new CommandReadOperation<>(getNamespace().getDatabaseName(), + return new CommandReadOperation<>(getNamespace().getDatabaseName(), wrapped.getCommandName(), (operationContext, serverDescription, connectionDescription) -> { BsonDocument command = wrapped.getCommand(operationContext, UNKNOWN_WIRE_VERSION); applyMaxTimeMS(operationContext.getTimeoutContext(), command); @@ -166,5 +173,4 @@ CommandReadOperation createExplainableOperation(@Nullable final ExplainVe MongoNamespace getNamespace() { return wrapped.getNamespace(); } - } diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java index 7ba2c56b874..3a650fc2f9b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java @@ -53,6 +53,7 @@ import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; class AggregateOperationImpl implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "aggregate"; private static final String RESULT = "result"; private static final String CURSOR = "cursor"; private static final String FIRST_BATCH = "firstBatch"; @@ -185,6 +186,11 @@ AggregateOperationImpl hint(@Nullable final BsonValue hint) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { return executeRetryableRead(binding, namespace.getDatabaseName(), @@ -207,7 +213,7 @@ private CommandCreator getCommandCreator() { } BsonDocument getCommand(final OperationContext operationContext, final int maxWireVersion) { - BsonDocument commandDocument = new BsonDocument("aggregate", aggregateTarget.create()); + BsonDocument commandDocument = new BsonDocument(getCommandName(), aggregateTarget.create()); appendReadConcernToCommand(operationContext.getSessionContext(), maxWireVersion, commandDocument); commandDocument.put("pipeline", pipelineCreator.create()); setNonTailableCursorMaxTimeSupplier(timeoutMode, operationContext); diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java index 904f85042ac..022c00383bc 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java @@ -54,7 +54,8 @@ * *

This class is not part of the public API and may be removed or changed at any time

*/ -public class AggregateToCollectionOperation implements AsyncReadOperation, ReadOperation { +public class AggregateToCollectionOperation implements ReadOperation, AsyncReadOperation { + private static final String COMMAND_NAME = "aggregate"; private final MongoNamespace namespace; private final List pipeline; private final WriteConcern writeConcern; @@ -151,6 +152,11 @@ public AggregateToCollectionOperation timeoutMode(@Nullable final TimeoutMode ti return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Void execute(final ReadBinding binding) { return executeRetryableRead(binding, @@ -183,7 +189,7 @@ private CommandOperationHelper.CommandCreator getCommandCreator() { BsonValue aggregationTarget = (aggregationLevel == AggregationLevel.DATABASE) ? new BsonInt32(1) : new BsonString(namespace.getCollectionName()); - BsonDocument commandDocument = new BsonDocument("aggregate", aggregationTarget); + BsonDocument commandDocument = new BsonDocument(getCommandName(), aggregationTarget); commandDocument.put("pipeline", new BsonArray(pipeline)); if (allowDiskUse != null) { commandDocument.put("allowDiskUse", BsonBoolean.valueOf(allowDiskUse)); diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncReadOperation.java index 75b18f5cb00..3c9cf2117ed 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncReadOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncReadOperation.java @@ -28,6 +28,11 @@ */ public interface AsyncReadOperation { + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + /** * General execute which can return anything of type T * diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteOperation.java index 334c3bde8ac..ca6f5f910a5 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncWriteOperation.java @@ -28,6 +28,11 @@ */ public interface AsyncWriteOperation { + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + /** * General execute which can return anything of type T * diff --git a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java index e523ee3f389..c1fc6adc9f3 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java @@ -46,6 +46,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public abstract class BaseFindAndModifyOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "findAndModify"; private final MongoNamespace namespace; private final WriteConcern writeConcern; private final boolean retryWrites; @@ -68,6 +69,12 @@ protected BaseFindAndModifyOperation(final MongoNamespace namespace, final Write this.decoder = notNull("decoder", decoder); } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + + @Override public T execute(final WriteBinding binding) { return executeRetryableWrite(binding, getDatabaseName(), null, getFieldNameValidator(), @@ -184,7 +191,7 @@ private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { SessionContext sessionContext = operationContext.getSessionContext(); - BsonDocument commandDocument = new BsonDocument("findAndModify", new BsonString(getNamespace().getCollectionName())); + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(getNamespace().getCollectionName())); putIfNotNull(commandDocument, "query", getFilter()); putIfNotNull(commandDocument, "fields", getProjection()); putIfNotNull(commandDocument, "sort", getSort()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 4ef28c796cb..84d5513dd69 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -192,6 +192,11 @@ private AggregateOperationImpl getAggregateOperation(final Time return wrapped; } + @Override + public String getCommandName() { + return wrapped.getCommandName(); + } + @Override public BatchCursor execute(final ReadBinding binding) { TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java index f6ff7632c8f..b2a3c93e4d5 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ClientBulkWriteOperation.java @@ -146,7 +146,7 @@ /** * This class is not part of the public API and may be removed or changed at any time. */ -public final class ClientBulkWriteOperation implements WriteOperation, AsyncWriteOperation { +public final class ClientBulkWriteOperation implements AsyncWriteOperation, WriteOperation { private static final ConcreteClientBulkWriteOptions EMPTY_OPTIONS = new ConcreteClientBulkWriteOptions(); private static final String BULK_WRITE_COMMAND_NAME = "bulkWrite"; private static final EncoderContext DEFAULT_ENCODER_CONTEXT = EncoderContext.builder().build(); @@ -177,6 +177,11 @@ public ClientBulkWriteOperation( this.codecRegistry = codecRegistry; } + @Override + public String getCommandName() { + return "bulkWrite"; + } + @Override public ClientBulkWriteResult execute(final WriteBinding binding) throws ClientBulkWriteException { WriteConcern effectiveWriteConcern = validateAndGetEffectiveWriteConcern(binding.getOperationContext().getSessionContext()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java index ea89dfb303e..1e395315c24 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java @@ -33,20 +33,28 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CommandReadOperation implements AsyncReadOperation, ReadOperation { + private final String commandName; private final String databaseName; private final CommandCreator commandCreator; private final Decoder decoder; - public CommandReadOperation(final String databaseName, final BsonDocument command, final Decoder decoder) { - this(databaseName, (operationContext, serverDescription, connectionDescription) -> command, decoder); + public CommandReadOperation(final String databaseName, final BsonDocument command, final Decoder decoder) { + this(databaseName, command.getFirstKey(), (operationContext, serverDescription, connectionDescription) -> command, decoder); } - public CommandReadOperation(final String databaseName, final CommandCreator commandCreator, final Decoder decoder) { + public CommandReadOperation(final String databaseName, final String commandName, final CommandCreator commandCreator, + final Decoder decoder) { + this.commandName = notNull("commandName", commandName); this.databaseName = notNull("databaseName", databaseName); this.commandCreator = notNull("commandCreator", commandCreator); this.decoder = notNull("decoder", decoder); } + @Override + public String getCommandName() { + return commandName; + } + @Override public T execute(final ReadBinding binding) { return executeRetryableRead(binding, databaseName, commandCreator, decoder, diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java index 6c2338d47de..998a002f348 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java @@ -46,6 +46,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CommitTransactionOperation extends TransactionOperation { + private static final String COMMAND_NAME = "commitTransaction"; private final boolean alreadyCommitted; private BsonDocument recoveryToken; @@ -110,10 +111,9 @@ private static boolean shouldAddUnknownTransactionCommitResultLabel(final MongoE return false; } - @Override - protected String getCommandName() { - return "commitTransaction"; + public String getCommandName() { + return COMMAND_NAME; } @Override diff --git a/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java index 1095dd44508..6789adb093c 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java @@ -38,6 +38,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CountDocumentsOperation implements AsyncReadOperation, ReadOperation { + private static final String COMMAND_NAME = "aggregate"; private static final Decoder DECODER = new BsonDocumentCodec(); private final MongoNamespace namespace; private boolean retryReads; @@ -119,6 +120,11 @@ public CountDocumentsOperation comment(@Nullable final BsonValue comment) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Long execute(final ReadBinding binding) { try (BatchCursor cursor = getAggregateOperation().execute(binding)) { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java index f9aa0a8eaa2..23dfe4b52e8 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java @@ -42,6 +42,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CountOperation implements AsyncReadOperation, ReadOperation { + private static final String COMMAND_NAME = "count"; private static final Decoder DECODER = new BsonDocumentCodec(); private final MongoNamespace namespace; private boolean retryReads; @@ -109,6 +110,11 @@ public CountOperation collation(@Nullable final Collation collation) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Long execute(final ReadBinding binding) { return executeRetryableRead(binding, namespace.getDatabaseName(), @@ -131,7 +137,7 @@ private CommandReadTransformerAsync asyncTransformer() { private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument document = new BsonDocument("count", new BsonString(namespace.getCollectionName())); + BsonDocument document = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); appendReadConcernToCommand(operationContext.getSessionContext(), connectionDescription.getMaxWireVersion(), document); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java index d9a11d20287..582a622d21b 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java @@ -231,6 +231,11 @@ public CreateCollectionOperation encryptedFields(@Nullable final BsonDocument en return this; } + @Override + public String getCommandName() { + return "createCollection"; + } + @Override public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java index 76de0757ff1..34eaea3713d 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java @@ -58,6 +58,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class CreateIndexesOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "createIndexes"; private final MongoNamespace namespace; private final List requests; private final WriteConcern writeConcern; @@ -99,6 +100,11 @@ public CreateIndexesOperation commitQuorum(@Nullable final CreateIndexCommitQuor return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Void execute(final WriteBinding binding) { try { @@ -189,7 +195,7 @@ private BsonDocument getIndex(final IndexRequest request) { private CommandOperationHelper.CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument command = new BsonDocument("createIndexes", new BsonString(namespace.getCollectionName())); + BsonDocument command = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); List values = new ArrayList<>(); for (IndexRequest request : requests) { values.add(getIndex(request)); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateSearchIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateSearchIndexesOperation.java index a57087e9217..bf75ee88b0d 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateSearchIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateSearchIndexesOperation.java @@ -41,6 +41,11 @@ public CreateSearchIndexesOperation(final MongoNamespace namespace, final List requests) { return requests.stream() .map(CreateSearchIndexesOperation::convert) @@ -63,7 +68,7 @@ private static BsonDocument convert(final SearchIndexRequest request) { @Override BsonDocument buildCommand() { - return new BsonDocument(COMMAND_NAME, new BsonString(getNamespace().getCollectionName())) + return new BsonDocument(getCommandName(), new BsonString(getNamespace().getCollectionName())) .append("indexes", convert(indexRequests)); } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java index 3636db08593..26ece818ec6 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java @@ -123,6 +123,11 @@ public CreateViewOperation collation(@Nullable final Collation collation) { return this; } + @Override + public String getCommandName() { + return "createView"; + } + @Override public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> { diff --git a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java index 547e5f0dfc1..6fe02f7ac08 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java @@ -46,6 +46,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class DistinctOperation implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "distinct"; private static final String VALUES = "values"; private final MongoNamespace namespace; private final String fieldName; @@ -107,6 +108,11 @@ public DistinctOperation hint(@Nullable final BsonValue hint) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { return executeRetryableRead(binding, namespace.getDatabaseName(), getCommandCreator(), createCommandDecoder(), @@ -126,7 +132,7 @@ private Codec createCommandDecoder() { private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("distinct", new BsonString(namespace.getCollectionName())); + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); appendReadConcernToCommand(operationContext.getSessionContext(), connectionDescription.getMaxWireVersion(), commandDocument); commandDocument.put("key", new BsonString(fieldName)); putIfNotNull(commandDocument, "query", filter); diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java index d879f83e542..bf9ac326376 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java @@ -86,6 +86,11 @@ public DropCollectionOperation autoEncryptedFields(final boolean autoEncryptedFi return this; } + @Override + public String getCommandName() { + return "dropCollection"; + } + @Override public Void execute(final WriteBinding binding) { BsonDocument localEncryptedFields = getEncryptedFields((ReadWriteBinding) binding); diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java index 9dd942cb726..8900d112bb8 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java @@ -55,6 +55,11 @@ public WriteConcern getWriteConcern() { return writeConcern; } + @Override + public String getCommandName() { + return "dropDatabase"; + } + @Override public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> { diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java index e66a4e10bbf..81fcf5129e7 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java @@ -41,6 +41,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class DropIndexOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "dropIndexes"; private final MongoNamespace namespace; private final String indexName; private final BsonDocument indexKeys; @@ -64,6 +65,11 @@ public WriteConcern getWriteConcern() { return writeConcern; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Void execute(final WriteBinding binding) { try { @@ -90,7 +96,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall private CommandOperationHelper.CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument command = new BsonDocument("dropIndexes", new BsonString(namespace.getCollectionName())); + BsonDocument command = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); if (indexName != null) { command.put("index", new BsonString(indexName)); } else { diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropSearchIndexOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropSearchIndexOperation.java index 657dedca942..a440dbd0e7e 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropSearchIndexOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropSearchIndexOperation.java @@ -37,6 +37,11 @@ final class DropSearchIndexOperation extends AbstractWriteSearchIndexOperation { this.indexName = indexName; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override void swallowOrThrow(@Nullable final E mongoExecutionException) throws E { if (mongoExecutionException != null && !isNamespaceError(mongoExecutionException)) { @@ -46,7 +51,7 @@ void swallowOrThrow(@Nullable final E mongoExecutionExcept @Override BsonDocument buildCommand() { - return new BsonDocument(COMMAND_NAME, new BsonString(getNamespace().getCollectionName())) + return new BsonDocument(getCommandName(), new BsonString(getNamespace().getCollectionName())) .append("name", new BsonString(indexName)); } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java b/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java index 17f7e617405..1d8ddd429ea 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java @@ -45,6 +45,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class EstimatedDocumentCountOperation implements AsyncReadOperation, ReadOperation { + private static final String COMMAND_NAME = "count"; private static final Decoder DECODER = new BsonDocumentCodec(); private final MongoNamespace namespace; private boolean retryReads; @@ -69,6 +70,11 @@ public EstimatedDocumentCountOperation comment(@Nullable final BsonValue comment return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Long execute(final ReadBinding binding) { try { @@ -108,7 +114,7 @@ private long transformResult(final BsonDocument result, final ConnectionDescript private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument document = new BsonDocument("count", new BsonString(namespace.getCollectionName())); + BsonDocument document = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); appendReadConcernToCommand(operationContext.getSessionContext(), connectionDescription.getMaxWireVersion(), document); if (comment != null) { document.put("comment", comment); diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java index 373b17949dc..df200916152 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java @@ -95,5 +95,4 @@ protected FieldNameValidator getFieldNameValidator() { protected void specializeCommand(final BsonDocument commandDocument, final ConnectionDescription connectionDescription) { commandDocument.put("remove", BsonBoolean.TRUE); } - } diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java index 4f834bee349..ab37613db13 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java @@ -68,6 +68,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class FindOperation implements AsyncExplainableReadOperation>, ExplainableReadOperation> { + private static final String COMMAND_NAME = "find"; private static final String FIRST_BATCH = "firstBatch"; private final MongoNamespace namespace; @@ -284,6 +285,11 @@ public FindOperation allowDiskUse(@Nullable final Boolean allowDiskUse) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { IllegalStateException invalidTimeoutModeException = invalidTimeoutModeException(); @@ -352,11 +358,9 @@ private static SingleResultCallback exceptionTransformingCallback(final S } @Override - public ReadOperation asExplainableOperation(@Nullable final ExplainVerbosity verbosity, - final Decoder resultDecoder) { + public CommandReadOperation asExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { return createExplainableOperation(verbosity, resultDecoder); } - @Override public AsyncReadOperation asAsyncExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { @@ -364,7 +368,7 @@ public AsyncReadOperation asAsyncExplainableOperation(@Nullable final Exp } CommandReadOperation createExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) { - return new CommandReadOperation<>(getNamespace().getDatabaseName(), + return new CommandReadOperation<>(getNamespace().getDatabaseName(), getCommandName(), (operationContext, serverDescription, connectionDescription) -> { BsonDocument command = getCommand(operationContext, UNKNOWN_WIRE_VERSION); applyMaxTimeMS(operationContext.getTimeoutContext(), command); @@ -373,7 +377,7 @@ CommandReadOperation createExplainableOperation(@Nullable final ExplainVe } private BsonDocument getCommand(final OperationContext operationContext, final int maxWireVersion) { - BsonDocument commandDocument = new BsonDocument("find", new BsonString(namespace.getCollectionName())); + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())); appendReadConcernToCommand(operationContext.getSessionContext(), maxWireVersion, commandDocument); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java index 73abe905aea..cb20bbf897f 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java @@ -70,6 +70,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class ListCollectionsOperation implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "listCollections"; private final String databaseName; private final Decoder decoder; private boolean retryReads; @@ -157,6 +158,11 @@ public ListCollectionsOperation timeoutMode(@Nullable final TimeoutMode timeo return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext()); @@ -215,7 +221,7 @@ private CommandReadTransformerAsync> asyncTran private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("listCollections", new BsonInt32(1)) + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonInt32(1)) .append("cursor", getCursorDocumentFromBatchSize(batchSize == 0 ? null : batchSize)); putIfNotNull(commandDocument, "filter", filter); putIfTrue(commandDocument, "nameOnly", nameOnly); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java index 5f61c9192dd..ae05eb245b2 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java @@ -43,6 +43,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class ListDatabasesOperation implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "listDatabases"; private static final String DATABASES = "databases"; private final Decoder decoder; private boolean retryReads; @@ -101,6 +102,11 @@ public ListDatabasesOperation comment(@Nullable final BsonValue comment) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { return executeRetryableRead(binding, "admin", getCommandCreator(), CommandResultDocumentCodec.create(decoder, DATABASES), @@ -115,7 +121,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("listDatabases", new BsonInt32(1)); + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonInt32(1)); putIfNotNull(commandDocument, "filter", filter); putIfNotNull(commandDocument, "nameOnly", nameOnly); putIfNotNull(commandDocument, "authorizedDatabases", authorizedDatabasesOnly); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java index e540f752dbc..d52021b2dca 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java @@ -49,8 +49,8 @@ import static com.mongodb.internal.operation.CursorHelper.getCursorDocumentFromBatchSize; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.setNonTailableCursorMaxTimeSupplier; import static com.mongodb.internal.operation.OperationHelper.canRetryRead; +import static com.mongodb.internal.operation.OperationHelper.setNonTailableCursorMaxTimeSupplier; import static com.mongodb.internal.operation.SingleBatchCursor.createEmptySingleBatchCursor; import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; import static com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute; @@ -65,6 +65,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class ListIndexesOperation implements AsyncReadOperation>, ReadOperation> { + private static final String COMMAND_NAME = "listIndexes"; private final MongoNamespace namespace; private final Decoder decoder; private boolean retryReads; @@ -116,6 +117,11 @@ public ListIndexesOperation timeoutMode(@Nullable final TimeoutMode timeoutMo return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext()); @@ -165,7 +171,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("listIndexes", new BsonString(namespace.getCollectionName())) + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())) .append("cursor", getCursorDocumentFromBatchSize(batchSize == 0 ? null : batchSize)); setNonTailableCursorMaxTimeSupplier(timeoutMode, operationContext); putIfNotNull(commandDocument, "comment", comment); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java index 3dfde30511d..dd28e5f3e25 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java @@ -44,6 +44,7 @@ */ public final class ListSearchIndexesOperation implements AsyncExplainableReadOperation>, ExplainableReadOperation> { + private static final String COMMAND_NAME = "aggregate"; private static final String STAGE_LIST_SEARCH_INDEXES = "$listSearchIndexes"; private final MongoNamespace namespace; private final Decoder decoder; @@ -73,6 +74,11 @@ public ListSearchIndexesOperation(final MongoNamespace namespace, final Decoder< this.retryReads = retryReads; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public BatchCursor execute(final ReadBinding binding) { try { diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java index 327aa5e5fa7..79151c0fb6e 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java @@ -59,6 +59,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class MapReduceToCollectionOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "mapReduce"; private final MongoNamespace namespace; private final BsonJavaScript mapFunction; private final BsonJavaScript reduceFunction; @@ -208,6 +209,11 @@ public MapReduceToCollectionOperation collation(@Nullable final Collation collat return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public MapReduceStatistics execute(final WriteBinding binding) { return executeCommand(binding, namespace.getDatabaseName(), getCommandCreator(), transformer(binding @@ -243,7 +249,7 @@ public AsyncReadOperation asExplainableOperationAsync(final Explai } private CommandReadOperation createExplainableOperation(final ExplainVerbosity explainVerbosity) { - return new CommandReadOperation<>(getNamespace().getDatabaseName(), + return new CommandReadOperation<>(getNamespace().getDatabaseName(), getCommandName(), (operationContext, serverDescription, connectionDescription) -> { BsonDocument command = getCommandCreator().create(operationContext, serverDescription, connectionDescription); applyMaxTimeMS(operationContext.getTimeoutContext(), command); diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java index 273d8595ec8..76f3e674308 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java @@ -53,8 +53,9 @@ * *

This class is not part of the public API and may be removed or changed at any time

*/ -public class MapReduceWithInlineResultsOperation implements AsyncReadOperation>, - ReadOperation> { +public class MapReduceWithInlineResultsOperation implements ReadOperation>, + AsyncReadOperation> { + private static final String COMMAND_NAME = "mapReduce"; private final MongoNamespace namespace; private final BsonJavaScript mapFunction; private final BsonJavaScript reduceFunction; @@ -164,6 +165,11 @@ public MapReduceWithInlineResultsOperation collation(@Nullable final Collatio return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public MapReduceBatchCursor execute(final ReadBinding binding) { return executeRetryableRead(binding, namespace.getDatabaseName(), @@ -188,7 +194,7 @@ public AsyncReadOperation asExplainableOperationAsync(final Explai } private CommandReadOperation createExplainableOperation(final ExplainVerbosity explainVerbosity) { - return new CommandReadOperation<>(namespace.getDatabaseName(), + return new CommandReadOperation<>(namespace.getDatabaseName(), getCommandName(), (operationContext, serverDescription, connectionDescription) -> { BsonDocument command = getCommandCreator().create(operationContext, serverDescription, connectionDescription); applyMaxTimeMS(operationContext.getTimeoutContext(), command); @@ -214,7 +220,7 @@ private CommandReadTransformerAsync> private CommandCreator getCommandCreator() { return (operationContext, serverDescription, connectionDescription) -> { - BsonDocument commandDocument = new BsonDocument("mapReduce", new BsonString(namespace.getCollectionName())) + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(namespace.getCollectionName())) .append("map", getMapFunction()) .append("reduce", getReduceFunction()) .append("out", new BsonDocument("inline", new BsonInt32(1))); diff --git a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java index 06d392bceb2..f496ffa9505 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java @@ -49,6 +49,7 @@ import org.bson.BsonValue; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.Set; import java.util.function.Supplier; @@ -84,17 +85,20 @@ public class MixedBulkWriteOperation implements AsyncWriteOperation writeRequests, final boolean ordered, final WriteConcern writeConcern, final boolean retryWrites) { + notNull("writeRequests", writeRequests); + isTrueArgument("writeRequests is not an empty list", !writeRequests.isEmpty()); + this.commandName = notNull("commandName", writeRequests.get(0).getType().toString().toLowerCase(Locale.ROOT)); this.namespace = notNull("namespace", namespace); - this.writeRequests = notNull("writes", writeRequests); + this.writeRequests = writeRequests; this.ordered = ordered; this.writeConcern = notNull("writeConcern", writeConcern); this.retryWrites = retryWrites; - isTrueArgument("writes is not an empty list", !writeRequests.isEmpty()); } public MongoNamespace getNamespace() { @@ -175,6 +179,11 @@ private boolean shouldAttemptToRetryWrite(final RetryState retryState, final Thr return decision; } + @Override + public String getCommandName() { + return commandName; + } + @Override public BulkWriteResult execute(final WriteBinding binding) { TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); @@ -420,6 +429,7 @@ private BsonDocument executeCommand( final OperationContext operationContext, final Connection connection, final BulkWriteBatch batch) { + commandName = batch.getCommand().getFirstKey(); return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload()); } @@ -430,6 +440,7 @@ private void executeCommandAsync( final AsyncConnection connection, final BulkWriteBatch batch, final SingleResultCallback callback) { + commandName = batch.getCommand().getFirstKey(); connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload(), callback); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java index aa5d2e7d451..a60e60f58f5 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ReadOperation.java @@ -25,6 +25,11 @@ */ public interface ReadOperation { + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + /** * General execute which can return anything of type T * diff --git a/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java index fd727f2fd81..1ca81e215b5 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java @@ -48,6 +48,7 @@ *

This class is not part of the public API and may be removed or changed at any time

*/ public class RenameCollectionOperation implements AsyncWriteOperation, WriteOperation { + private static final String COMMAND_NAME = "renameCollection"; private final MongoNamespace originalNamespace; private final MongoNamespace newNamespace; private final WriteConcern writeConcern; @@ -73,6 +74,11 @@ public RenameCollectionOperation dropTarget(final boolean dropTarget) { return this; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> executeCommand(binding, "admin", getCommand(), connection, @@ -94,7 +100,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall } private BsonDocument getCommand() { - BsonDocument commandDocument = new BsonDocument("renameCollection", new BsonString(originalNamespace.getFullName())) + BsonDocument commandDocument = new BsonDocument(getCommandName(), new BsonString(originalNamespace.getFullName())) .append("to", new BsonString(newNamespace.getFullName())) .append("dropTarget", BsonBoolean.valueOf(dropTarget)); appendWriteConcernToCommand(writeConcern, commandDocument); diff --git a/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java index 8bf7ee76d25..e344cfb2b69 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java @@ -42,7 +42,7 @@ * *

This class is not part of the public API and may be removed or changed at any time

*/ -public abstract class TransactionOperation implements WriteOperation, AsyncWriteOperation { +public abstract class TransactionOperation implements AsyncWriteOperation, WriteOperation { private final WriteConcern writeConcern; TransactionOperation(final WriteConcern writeConcern) { @@ -82,12 +82,5 @@ CommandCreator getCommandCreator() { }; } - /** - * Gets the command name. - * - * @return the command name - */ - protected abstract String getCommandName(); - protected abstract Function getRetryCommandModifier(TimeoutContext timeoutContext); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/UpdateSearchIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/UpdateSearchIndexesOperation.java index 7bd33730680..ca23fd8e502 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/UpdateSearchIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/UpdateSearchIndexesOperation.java @@ -34,9 +34,14 @@ final class UpdateSearchIndexesOperation extends AbstractWriteSearchIndexOperati this.request = request; } + @Override + public String getCommandName() { + return COMMAND_NAME; + } + @Override BsonDocument buildCommand() { - return new BsonDocument(COMMAND_NAME, new BsonString(getNamespace().getCollectionName())) + return new BsonDocument(getCommandName(), new BsonString(getNamespace().getCollectionName())) .append("name", new BsonString(request.getIndexName())) .append("definition", request.getDefinition()); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java index 1a4fee36e1c..e7e606bd01a 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/WriteOperation.java @@ -25,6 +25,11 @@ */ public interface WriteOperation { + /** + * @return the command name of the operation, e.g. "insert", "update", "delete", "bulkWrite", etc. + */ + String getCommandName(); + /** * General execute which can return anything of type T * diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java index 09976e363d6..30792bf0487 100644 --- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java +++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java @@ -379,11 +379,7 @@ public static ReadWriteBinding getBinding(final ReadPreference readPreference) { } public static OperationContext createNewOperationContext(final TimeoutSettings timeoutSettings) { - return new OperationContext(OPERATION_CONTEXT.getId(), - OPERATION_CONTEXT.getRequestContext(), - OPERATION_CONTEXT.getSessionContext(), - new TimeoutContext(timeoutSettings), - OPERATION_CONTEXT.getServerApi()); + return OPERATION_CONTEXT.withTimeoutContext(new TimeoutContext(timeoutSettings)); } private static ReadWriteBinding getBinding(final Cluster cluster, diff --git a/driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java b/driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java index 4d8eb22cb7a..acd9c3d606e 100644 --- a/driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java +++ b/driver-legacy/src/main/com/mongodb/LegacyMixedBulkWriteOperation.java @@ -47,12 +47,8 @@ * Operation for bulk writes for the legacy API. */ final class LegacyMixedBulkWriteOperation implements WriteOperation { - private final WriteConcern writeConcern; - private final MongoNamespace namespace; - private final List writeRequests; + private final MixedBulkWriteOperation wrappedOperation; private final WriteRequest.Type type; - private final boolean ordered; - private final boolean retryWrites; private Boolean bypassDocumentValidation; static LegacyMixedBulkWriteOperation createBulkWriteOperationForInsert(final MongoNamespace namespace, final boolean ordered, @@ -79,17 +75,14 @@ static LegacyMixedBulkWriteOperation createBulkWriteOperationForDelete(final Mon private LegacyMixedBulkWriteOperation(final MongoNamespace namespace, final boolean ordered, final WriteConcern writeConcern, final boolean retryWrites, final List writeRequests, final WriteRequest.Type type) { - isTrueArgument("writeRequests not empty", !writeRequests.isEmpty()); - this.writeRequests = notNull("writeRequests", writeRequests); + notNull("writeRequests", writeRequests); + isTrueArgument("writeRequests is not an empty list", !writeRequests.isEmpty()); this.type = type; - this.ordered = ordered; - this.namespace = notNull("namespace", namespace); - this.writeConcern = notNull("writeConcern", writeConcern); - this.retryWrites = retryWrites; + this.wrappedOperation = new MixedBulkWriteOperation(namespace, writeRequests, ordered, writeConcern, retryWrites); } List getWriteRequests() { - return writeRequests; + return wrappedOperation.getWriteRequests(); } LegacyMixedBulkWriteOperation bypassDocumentValidation(@Nullable final Boolean bypassDocumentValidation) { @@ -97,11 +90,15 @@ LegacyMixedBulkWriteOperation bypassDocumentValidation(@Nullable final Boolean b return this; } + @Override + public String getCommandName() { + return wrappedOperation.getCommandName(); + } + @Override public WriteConcernResult execute(final WriteBinding binding) { try { - BulkWriteResult result = new MixedBulkWriteOperation(namespace, writeRequests, ordered, writeConcern, retryWrites) - .bypassDocumentValidation(bypassDocumentValidation).execute(binding); + BulkWriteResult result = wrappedOperation.bypassDocumentValidation(bypassDocumentValidation).execute(binding); if (result.wasAcknowledged()) { return translateBulkWriteResult(result); } else { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java index f8371c8afb6..a01dc7e3eae 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MapReducePublisherImpl.java @@ -232,6 +232,11 @@ AsyncReadOperation> getOperation() { return operation; } + @Override + public String getCommandName() { + return operation.getCommandName(); + } + @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { operation.executeAsync(binding, callback::onResult); @@ -249,6 +254,11 @@ AsyncWriteOperation getOperation() { return operation; } + @Override + public String getCommandName() { + return operation.getCommandName(); + } + @Override public void executeAsync(final AsyncWriteBinding binding, final SingleResultCallback callback) { operation.executeAsync(binding, (result, t) -> callback.onResult(null, t)); diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java index 0a4b0318d1c..4cbe9804777 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/OperationExecutorImpl.java @@ -84,7 +84,7 @@ public Mono execute(final AsyncReadOperation operation, final ReadPref return Mono.from(subscriber -> clientSessionHelper.withClientSession(session, this) .map(clientSession -> getReadWriteBinding(getContext(subscriber), - readPreference, readConcern, clientSession, session == null)) + readPreference, readConcern, clientSession, session == null, operation.getCommandName())) .flatMap(binding -> { if (session != null && session.hasActiveTransaction() && !binding.getReadPreference().equals(primary())) { binding.release(); @@ -119,7 +119,7 @@ public Mono execute(final AsyncWriteOperation operation, final ReadCon return Mono.from(subscriber -> clientSessionHelper.withClientSession(session, this) .map(clientSession -> getReadWriteBinding(getContext(subscriber), - primary(), readConcern, clientSession, session == null)) + primary(), readConcern, clientSession, session == null, operation.getCommandName())) .flatMap(binding -> Mono.create(sink -> operation.executeAsync(binding, (result, t) -> { try { @@ -176,11 +176,11 @@ private void unpinServerAddressOnTransientTransactionError(@Nullable final Clien private AsyncReadWriteBinding getReadWriteBinding(final RequestContext requestContext, final ReadPreference readPreference, final ReadConcern readConcern, final ClientSession session, - final boolean ownsSession) { + final boolean ownsSession, final String commandName) { notNull("readPreference", readPreference); AsyncClusterAwareReadWriteBinding readWriteBinding = new AsyncClusterBinding(mongoClient.getCluster(), getReadPreferenceForBinding(readPreference, session), readConcern, - getOperationContext(requestContext, session, readConcern)); + getOperationContext(requestContext, session, readConcern, commandName)); Crypt crypt = mongoClient.getCrypt(); if (crypt != null) { @@ -196,12 +196,13 @@ private AsyncReadWriteBinding getReadWriteBinding(final RequestContext requestCo } private OperationContext getOperationContext(final RequestContext requestContext, final ClientSession session, - final ReadConcern readConcern) { + final ReadConcern readConcern, final String commandName) { return new OperationContext( requestContext, new ReadConcernAwareNoOpSessionContext(readConcern), createTimeoutContext(session, timeoutSettings), - mongoClient.getSettings().getServerApi()); + mongoClient.getSettings().getServerApi(), + commandName); } private ReadPreference getReadPreferenceForBinding(final ReadPreference readPreference, @Nullable final ClientSession session) { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java index 17a54c345a5..8352b5fe225 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidReadOperationThenCursorReadOperation.java @@ -39,6 +39,11 @@ public AsyncReadOperation> getCursorReadOperation() { return cursorReadOperation; } + @Override + public String getCommandName() { + return readOperation.getCommandName(); + } + @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { readOperation.executeAsync(binding, (result, t) -> { diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java index bde5811a713..e0f812f0579 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/VoidWriteOperationThenCursorReadOperation.java @@ -33,6 +33,11 @@ class VoidWriteOperationThenCursorReadOperation implements AsyncReadOperation this.cursorReadOperation = cursorReadOperation; } + @Override + public String getCommandName() { + return writeOperation.getCommandName(); + } + @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { writeOperation.executeAsync((AsyncWriteBinding) binding, (result, t) -> { diff --git a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java index 8a0107aafeb..541bd9d3518 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MapReduceIterableImpl.java @@ -232,6 +232,11 @@ ReadOperation> getOperation() { this.operation = operation; } + @Override + public String getCommandName() { + return operation.getCommandName(); + } + @Override public BatchCursor execute(final ReadBinding binding) { return operation.execute(binding); diff --git a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java index 9c0033e42a7..0430d9407c1 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/MongoClusterImpl.java @@ -415,7 +415,8 @@ public T execute(final ReadOperation operation, final ReadPreference read } ClientSession actualClientSession = getClientSession(session); - ReadBinding binding = getReadBinding(readPreference, readConcern, actualClientSession, session == null); + ReadBinding binding = getReadBinding(readPreference, readConcern, actualClientSession, session == null, + operation.getCommandName()); try { if (actualClientSession.hasActiveTransaction() && !binding.getReadPreference().equals(primary())) { @@ -440,7 +441,7 @@ public T execute(final WriteOperation operation, final ReadConcern readCo } ClientSession actualClientSession = getClientSession(session); - WriteBinding binding = getWriteBinding(readConcern, actualClientSession, session == null); + WriteBinding binding = getWriteBinding(readConcern, actualClientSession, session == null, operation.getCommandName()); try { return operation.execute(binding); @@ -467,20 +468,23 @@ public TimeoutSettings getTimeoutSettings() { return executorTimeoutSettings; } - WriteBinding getWriteBinding(final ReadConcern readConcern, final ClientSession session, final boolean ownsSession) { - return getReadWriteBinding(primary(), readConcern, session, ownsSession); + WriteBinding getWriteBinding(final ReadConcern readConcern, final ClientSession session, final boolean ownsSession, + final String commandName) { + return getReadWriteBinding(primary(), readConcern, session, ownsSession, commandName); } ReadBinding getReadBinding(final ReadPreference readPreference, final ReadConcern readConcern, final ClientSession session, - final boolean ownsSession) { - return getReadWriteBinding(readPreference, readConcern, session, ownsSession); + final boolean ownsSession, final String commandName) { + return getReadWriteBinding(readPreference, readConcern, session, ownsSession, commandName); } ReadWriteBinding getReadWriteBinding(final ReadPreference readPreference, - final ReadConcern readConcern, final ClientSession session, final boolean ownsSession) { + final ReadConcern readConcern, final ClientSession session, final boolean ownsSession, + final String commandName) { ClusterAwareReadWriteBinding readWriteBinding = new ClusterBinding(cluster, - getReadPreferenceForBinding(readPreference, session), readConcern, getOperationContext(session, readConcern)); + getReadPreferenceForBinding(readPreference, session), readConcern, + getOperationContext(session, readConcern, commandName)); if (crypt != null) { readWriteBinding = new CryptBinding(readWriteBinding, crypt); @@ -489,12 +493,13 @@ ReadWriteBinding getReadWriteBinding(final ReadPreference readPreference, return new ClientSessionBinding(session, ownsSession, readWriteBinding); } - private OperationContext getOperationContext(final ClientSession session, final ReadConcern readConcern) { + private OperationContext getOperationContext(final ClientSession session, final ReadConcern readConcern, final String commandName) { return new OperationContext( getRequestContext(), new ReadConcernAwareNoOpSessionContext(readConcern), createTimeoutContext(session, executorTimeoutSettings), - serverApi); + serverApi, + commandName); } private RequestContext getRequestContext() { diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java index 0be87ee3415..e067e36d993 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTest.java @@ -91,7 +91,6 @@ import static com.mongodb.client.unified.UnifiedTestModifications.testDef; import static java.lang.String.format; import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -374,9 +373,7 @@ public void shouldPassAllOutcomes( } if (definition.containsKey("expectLogMessages")) { - ArrayList tweaks = new ArrayList<>(singletonList( - // `LogMessage.Entry.Name.OPERATION` is not supported, therefore we skip matching its value - LogMatcher.Tweak.skip(LogMessage.Entry.Name.OPERATION))); + ArrayList tweaks = new ArrayList<>(); if (getMongoClientSettings().getClusterSettings() .getHosts().stream().anyMatch(serverAddress -> serverAddress instanceof UnixServerAddress)) { tweaks.add(LogMatcher.Tweak.skip(LogMessage.Entry.Name.SERVER_PORT)); diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java index 8f43b58b7d0..3cf703d6645 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedTestModifications.java @@ -104,14 +104,12 @@ public static void applyCustomizations(final TestDef def) { .test("atlas-data-lake-testing", "getMore", "A successful find event with getMore"); // connection-monitoring-and-pooling - - // TODO-JAVA-5711 reason, jira - // added as part of https://jira.mongodb.org/browse/JAVA-4976 , but unknown Jira to complete - // The implementation of the functionality related to clearing the connection pool before closing the connection - // will be carried out once the specification is finalized and ready. - def.skipUnknownReason("") + def.skipNoncompliant("According to the test, we should clear the pool then close the connection. Our implementation" + + "immediately closes the failed connection, then clears the pool.") .test("connection-monitoring-and-pooling/tests/logging", "connection-logging", "Connection checkout fails due to error establishing connection"); - def.skipUnknownReason("") + + + def.skipNoncompliant("Driver does not support waitQueueSize or waitQueueMultiple options") .test("connection-monitoring-and-pooling/tests/logging", "connection-pool-options", "waitQueueSize should be included in connection pool created message when specified") .test("connection-monitoring-and-pooling/tests/logging", "connection-pool-options", "waitQueueMultiple should be included in connection pool created message when specified"); From 7db12f4fa076dd1a0af2693063e287e694e8fed2 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 7 Jul 2025 14:25:31 +0100 Subject: [PATCH 2/4] Add overrides --- .../com/mongodb/internal/operation/FindAndDeleteOperation.java | 2 ++ .../com/mongodb/internal/operation/FindAndReplaceOperation.java | 2 ++ .../com/mongodb/internal/operation/FindAndUpdateOperation.java | 2 ++ 3 files changed, 6 insertions(+) diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java index df200916152..db9d61b1dd4 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java @@ -88,10 +88,12 @@ public FindAndDeleteOperation let(@Nullable final BsonDocument variables) { return this; } + @Override protected FieldNameValidator getFieldNameValidator() { return NoOpFieldNameValidator.INSTANCE; } + @Override protected void specializeCommand(final BsonDocument commandDocument, final ConnectionDescription connectionDescription) { commandDocument.put("remove", BsonBoolean.TRUE); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java index 59362cc667d..7073260a4c7 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java @@ -130,12 +130,14 @@ public FindAndReplaceOperation let(@Nullable final BsonDocument variables) { return this; } + @Override protected FieldNameValidator getFieldNameValidator() { return new MappedFieldNameValidator( NoOpFieldNameValidator.INSTANCE, singletonMap("update", ReplacingDocumentFieldNameValidator.INSTANCE)); } + @Override protected void specializeCommand(final BsonDocument commandDocument, final ConnectionDescription connectionDescription) { commandDocument.put("new", new BsonBoolean(!isReturnOriginal())); putIfTrue(commandDocument, "upsert", isUpsert()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java index bba62d62628..e83deba30f3 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java @@ -159,10 +159,12 @@ public FindAndUpdateOperation let(@Nullable final BsonDocument variables) { return this; } + @Override protected FieldNameValidator getFieldNameValidator() { return new MappedFieldNameValidator(NoOpFieldNameValidator.INSTANCE, singletonMap("update", new UpdateFieldNameValidator())); } + @Override protected void specializeCommand(final BsonDocument commandDocument, final ConnectionDescription connectionDescription) { commandDocument.put("new", new BsonBoolean(!isReturnOriginal())); putIfTrue(commandDocument, "upsert", isUpsert()); From 1de5a192f0da69c0fd4eb88a0f4cce92b515b11d Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 7 Jul 2025 14:33:46 +0100 Subject: [PATCH 3/4] Renamed commandName to operationName in operation context --- .../internal/connection/BaseCluster.java | 8 +++---- .../internal/connection/OperationContext.java | 22 +++++++++---------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index f4ec77a3af3..745f41c2891 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -545,7 +545,7 @@ static void logServerSelectionStarted( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection started", clusterId, asList( - new Entry(OPERATION, operationContext.getCommandName()), + new Entry(OPERATION, operationContext.getOperationName()), new Entry(OPERATION_ID, operationContext.getId()), new Entry(SELECTOR, serverSelector.toString()), new Entry(TOPOLOGY_DESCRIPTION, clusterDescription.getShortDescription())), @@ -563,7 +563,7 @@ private static void logServerSelectionWaiting( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, INFO, "Waiting for suitable server to become available", clusterId, asList( - new Entry(OPERATION, operationContext.getCommandName()), + new Entry(OPERATION, operationContext.getOperationName()), new Entry(OPERATION_ID, operationContext.getId()), timeout.call(MILLISECONDS, () -> new Entry(REMAINING_TIME_MS, "infinite"), @@ -592,7 +592,7 @@ private static void logServerSelectionFailed( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection failed", clusterId, asList( - new Entry(OPERATION, operationContext.getCommandName()), + new Entry(OPERATION, operationContext.getOperationName()), new Entry(OPERATION_ID, operationContext.getId()), new Entry(FAILURE, failureDescription), new Entry(SELECTOR, serverSelector.toString()), @@ -611,7 +611,7 @@ static void logServerSelectionSucceeded( STRUCTURED_LOGGER.log(new LogMessage( SERVER_SELECTION, DEBUG, "Server selection succeeded", clusterId, asList( - new Entry(OPERATION, operationContext.getCommandName()), + new Entry(OPERATION, operationContext.getOperationName()), new Entry(OPERATION_ID, operationContext.getId()), new Entry(SERVER_HOST, serverAddress.getHost()), new Entry(SERVER_PORT, serverAddress instanceof UnixServerAddress ? null : serverAddress.getPort()), diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java index cbeff997ad6..9d8ad0a432a 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java +++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java @@ -50,7 +50,7 @@ public class OperationContext { @Nullable private final ServerApi serverApi; @Nullable - private final String commandName; + private final String operationName; public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, @Nullable final ServerApi serverApi) { @@ -58,8 +58,8 @@ public OperationContext(final RequestContext requestContext, final SessionContex } public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext, - @Nullable final ServerApi serverApi, @Nullable final String commandName) { - this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, commandName); + @Nullable final ServerApi serverApi, @Nullable final String operationName) { + this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, operationName); } public static OperationContext simpleOperationContext( @@ -82,11 +82,11 @@ public static OperationContext simpleOperationContext(final TimeoutContext timeo } public OperationContext withSessionContext(final SessionContext sessionContext) { - return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, commandName); + return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName); } public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) { - return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, commandName); + return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName); } public long getId() { @@ -111,8 +111,8 @@ public ServerApi getServerApi() { } @Nullable - public String getCommandName() { - return commandName; + public String getOperationName() { + return operationName; } @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) @@ -122,14 +122,14 @@ public OperationContext(final long id, final TimeoutContext timeoutContext, final ServerDeprioritization serverDeprioritization, @Nullable final ServerApi serverApi, - @Nullable final String commandName) { + @Nullable final String operationName) { this.id = id; this.serverDeprioritization = serverDeprioritization; this.requestContext = requestContext; this.sessionContext = sessionContext; this.timeoutContext = timeoutContext; this.serverApi = serverApi; - this.commandName = commandName; + this.operationName = operationName; } @VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE) @@ -138,14 +138,14 @@ public OperationContext(final long id, final SessionContext sessionContext, final TimeoutContext timeoutContext, @Nullable final ServerApi serverApi, - @Nullable final String commandName) { + @Nullable final String operationName) { this.id = id; this.serverDeprioritization = new ServerDeprioritization(); this.requestContext = requestContext; this.sessionContext = sessionContext; this.timeoutContext = timeoutContext; this.serverApi = serverApi; - this.commandName = commandName; + this.operationName = operationName; } From cd608410f5ef626c7ff1330195510939f95eec83 Mon Sep 17 00:00:00 2001 From: Ross Lawley Date: Mon, 7 Jul 2025 14:37:48 +0100 Subject: [PATCH 4/4] Ensure bulk operations use and operationContext with the batches commandName --- .../com/mongodb/internal/connection/OperationContext.java | 4 ++++ .../mongodb/internal/operation/MixedBulkWriteOperation.java | 5 +++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java index 9d8ad0a432a..7e0de92da1d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java +++ b/driver-core/src/main/com/mongodb/internal/connection/OperationContext.java @@ -89,6 +89,10 @@ public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName); } + public OperationContext withOperationName(final String operationName) { + return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName); + } + public long getId() { return id; } diff --git a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java index f496ffa9505..9bc947f0450 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java @@ -431,7 +431,7 @@ private BsonDocument executeCommand( final BulkWriteBatch batch) { commandName = batch.getCommand().getFirstKey(); return connection.command(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), - operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload()); + operationContext.withOperationName(commandName), shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload()); } private void executeCommandAsync( @@ -442,7 +442,8 @@ private void executeCommandAsync( final SingleResultCallback callback) { commandName = batch.getCommand().getFirstKey(); connection.commandAsync(namespace.getDatabaseName(), batch.getCommand(), NoOpFieldNameValidator.INSTANCE, null, batch.getDecoder(), - operationContext, shouldExpectResponse(batch, effectiveWriteConcern), batch.getPayload(), callback); + operationContext.withOperationName(commandName), shouldExpectResponse(batch, effectiveWriteConcern), + batch.getPayload(), callback); } private boolean shouldExpectResponse(final BulkWriteBatch batch, final WriteConcern effectiveWriteConcern) {