Skip to content

Commit de23d90

Browse files
committed
- Adding tracing at a central location instead of spreading it at operation level
1 parent 7144126 commit de23d90

File tree

12 files changed

+179
-75
lines changed

12 files changed

+179
-75
lines changed

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@
100100
import static com.mongodb.internal.tracing.Tags.CLIENT_CONNECTION_ID;
101101
import static com.mongodb.internal.tracing.Tags.CURSOR_ID;
102102
import static com.mongodb.internal.tracing.Tags.NAMESPACE;
103-
import static com.mongodb.internal.tracing.Tags.QUERY_OPCODE;
104103
import static com.mongodb.internal.tracing.Tags.QUERY_SUMMARY;
105104
import static com.mongodb.internal.tracing.Tags.QUERY_TEXT;
106105
import static com.mongodb.internal.tracing.Tags.SERVER_ADDRESS;
@@ -1038,8 +1037,7 @@ private Span createTracingSpan(final CommandMessage message, final OperationCont
10381037
.addSpan("Command " + commandName, parentContext)
10391038
.tag(SYSTEM, "mongodb")
10401039
.tag(NAMESPACE, message.getNamespace().getDatabaseName())
1041-
.tag(QUERY_SUMMARY, command.toString())
1042-
.tag(QUERY_OPCODE, String.valueOf(message.getOpCode()));
1040+
.tag(QUERY_SUMMARY, command.toString());
10431041

10441042
if (cursorId != -1) {
10451043
span.tag(CURSOR_ID, cursorId);

driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.mongodb.internal.binding.ReadBinding;
2929
import com.mongodb.internal.client.model.AggregationLevel;
3030
import com.mongodb.internal.connection.OperationContext;
31+
import com.mongodb.internal.tracing.TracingManager;
3132
import com.mongodb.lang.Nullable;
3233
import org.bson.BsonArray;
3334
import org.bson.BsonBoolean;
@@ -242,9 +243,14 @@ BsonDocument getCommand(final OperationContext operationContext, final int maxWi
242243
}
243244

244245
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
245-
return (result, source, connection) ->
246-
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
247-
getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection);
246+
return (result, source, connection) -> {
247+
OperationContext operationContext = source.getOperationContext();
248+
249+
// register cursor id with the operation context, so 'getMore' commands can be folded under the 'find' operation
250+
TracingManager.linkCursorWithOperation(result, operationContext);
251+
return new CommandBatchCursor<>(getTimeoutMode(), result, batchSize != null ? batchSize : 0,
252+
getMaxTimeForCursor(source.getOperationContext().getTimeoutContext()), decoder, comment, source, connection);
253+
};
248254
}
249255

250256
private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ public void remove() {
173173

174174
@Override
175175
public void close() {
176-
operationContext.getTracingManager().removeCursorParentContext(cursorId);
176+
if (cursorId != -1) {
177+
operationContext.getTracingManager().removeCursorParentContext(cursorId);
178+
}
177179
resourceManager.close();
178180
}
179181

driver-core/src/main/com/mongodb/internal/operation/FindOperation.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import static com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute;
6363
import static com.mongodb.internal.operation.SyncOperationHelper.decorateReadWithRetries;
6464
import static com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection;
65-
import static com.mongodb.internal.tracing.TracingManager.runWithTracing;
6665

6766
/**
6867
* An operation that queries a collection using the provided criteria.
@@ -298,22 +297,21 @@ public BatchCursor<T> execute(final ReadBinding binding) {
298297
if (invalidTimeoutModeException != null) {
299298
throw invalidTimeoutModeException;
300299
}
301-
OperationContext operationContext = binding.getOperationContext();
302300

303-
RetryState retryState = initialRetryState(retryReads, operationContext.getTimeoutContext());
304-
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, operationContext, () ->
301+
RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext());
302+
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
305303
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
306-
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), operationContext));
304+
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getOperationContext()));
307305
try {
308-
return createReadCommandAndExecute(retryState, operationContext, source, namespace.getDatabaseName(),
306+
return createReadCommandAndExecute(retryState, binding.getOperationContext(), source, namespace.getDatabaseName(),
309307
getCommandCreator(), CommandResultDocumentCodec.create(decoder, FIRST_BATCH),
310308
transformer(), connection);
311309
} catch (MongoCommandException e) {
312310
throw new MongoQueryException(e.getResponse(), e.getServerAddress());
313311
}
314312
})
315313
);
316-
return runWithTracing(read, operationContext, "find", namespace);
314+
return read.get();
317315
}
318316

319317
@Override

driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@
7272
import static com.mongodb.internal.operation.OperationHelper.validateWriteRequests;
7373
import static com.mongodb.internal.operation.OperationHelper.validateWriteRequestsAndCompleteIfInvalid;
7474
import static com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection;
75-
import static com.mongodb.internal.tracing.TracingManager.runWithTracing;
7675

7776
/**
7877
* An operation to execute a series of write operations in bulk.
@@ -188,7 +187,6 @@ public String getCommandName() {
188187
@Override
189188
public BulkWriteResult execute(final WriteBinding binding) {
190189
TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext();
191-
OperationContext operationContext = binding.getOperationContext();
192190
/* We cannot use the tracking of attempts built in the `RetryState` class because conceptually we have to maintain multiple attempt
193191
* counters while executing a single bulk write operation:
194192
* - a counter that limits attempts to select server and checkout a connection before we created a batch;
@@ -198,12 +196,12 @@ public BulkWriteResult execute(final WriteBinding binding) {
198196
* and the code related to the attempt tracking in `BulkWriteTracker` will be removed. */
199197
RetryState retryState = new RetryState(timeoutContext);
200198
BulkWriteTracker.attachNew(retryState, retryWrites, timeoutContext);
201-
Supplier<BulkWriteResult> retryingBulkWrite = decorateWriteWithRetries(retryState, operationContext, () ->
199+
Supplier<BulkWriteResult> retryingBulkWrite = decorateWriteWithRetries(retryState, binding.getOperationContext(), () ->
202200
withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> {
203201
ConnectionDescription connectionDescription = connection.getDescription();
204202
// attach `maxWireVersion` ASAP because it is used to check whether we can retry
205203
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
206-
SessionContext sessionContext = operationContext.getSessionContext();
204+
SessionContext sessionContext = binding.getOperationContext().getSessionContext();
207205
WriteConcern writeConcern = validateAndGetEffectiveWriteConcern(this.writeConcern, sessionContext);
208206
if (!isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext)) {
209207
handleMongoWriteConcernWithResponseException(retryState, true, timeoutContext);
@@ -212,13 +210,13 @@ public BulkWriteResult execute(final WriteBinding binding) {
212210
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
213211
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
214212
connectionDescription, ordered, writeConcern,
215-
bypassDocumentValidation, retryWrites, writeRequests, operationContext, comment, variables), timeoutContext);
213+
bypassDocumentValidation, retryWrites, writeRequests, binding.getOperationContext(), comment, variables), timeoutContext);
216214
}
217215
return executeBulkWriteBatch(retryState, writeConcern, binding, connection);
218216
})
219217
);
220218
try {
221-
return runWithTracing(retryingBulkWrite, operationContext, "MixedBulkWriteOperation", namespace);
219+
return retryingBulkWrite.get();
222220
} catch (MongoException e) {
223221
throw transformWriteException(e);
224222
}

driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
3737
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableWrite;
3838
import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer;
39-
import static com.mongodb.internal.tracing.TracingManager.runWithTracing;
4039

4140
/**
4241
* A base class for transaction-related operations
@@ -58,10 +57,9 @@ public WriteConcern getWriteConcern() {
5857
public Void execute(final WriteBinding binding) {
5958
isTrue("in transaction", binding.getOperationContext().getSessionContext().hasActiveTransaction());
6059
TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext();
61-
// Add a span for 'commit' or 'abort' operation if tracing is enabled
62-
return runWithTracing(() -> executeRetryableWrite(binding, "admin", null, NoOpFieldNameValidator.INSTANCE,
63-
new BsonDocumentCodec(), getCommandCreator(),
64-
writeConcernErrorTransformer(timeoutContext), getRetryCommandModifier(timeoutContext)), binding.getOperationContext(), getCommandName(), null);
60+
return executeRetryableWrite(binding, "admin", null, NoOpFieldNameValidator.INSTANCE,
61+
new BsonDocumentCodec(), getCommandCreator(),
62+
writeConcernErrorTransformer(timeoutContext), getRetryCommandModifier(timeoutContext));
6563
}
6664

6765
@Override

driver-core/src/main/com/mongodb/internal/tracing/Tags.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ private Tags() {
3030
public static final String NAMESPACE = "db.namespace";
3131
public static final String COLLECTION = "db.collection.name";
3232
public static final String QUERY_SUMMARY = "db.query.summary";
33-
public static final String QUERY_OPCODE = "db.query.opcode";
3433
public static final String QUERY_TEXT = "db.query.text";
3534
public static final String CURSOR_ID = "db.mongodb.cursor_id";
3635
public static final String SERVER_ADDRESS = "server.address";

driver-core/src/main/com/mongodb/internal/tracing/TracingManager.java

Lines changed: 17 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.mongodb.internal.tracing;
1818

19-
import com.mongodb.MongoNamespace;
2019
import com.mongodb.internal.connection.OperationContext;
2120
import com.mongodb.lang.Nullable;
2221
import com.mongodb.session.ServerSession;
@@ -26,10 +25,10 @@
2625
import java.util.concurrent.ConcurrentHashMap;
2726
import java.util.function.Supplier;
2827

29-
import static com.mongodb.internal.tracing.Tags.COLLECTION;
28+
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
3029
import static com.mongodb.internal.tracing.Tags.CURSOR_ID;
31-
import static com.mongodb.internal.tracing.Tags.NAMESPACE;
3230
import static com.mongodb.internal.tracing.Tags.SYSTEM;
31+
import static java.lang.String.format;
3332

3433
/**
3534
* Manages tracing spans for MongoDB driver activities.
@@ -106,6 +105,7 @@ public Span addSpan(final String name, final Long operationId) {
106105
*/
107106
public Span addTransactionSpan(final ServerSession session) {
108107
Span span = tracer.nextSpan("transaction", parentContext);
108+
span.tag(SYSTEM, "mongodb");
109109
transactions.put(getTransactionId(session), span.context());
110110
return span;
111111
}
@@ -176,16 +176,14 @@ public boolean isEnabled() {
176176
*
177177
* @param supplier The supplier to execute.
178178
* @param operationContext The operation context.
179-
* @param opName The name of the operation.
180-
* @param namespace The MongoDB namespace, or null if none exists.
181179
* @param <T> The type of the result.
182180
* @return The result of the supplier.
183181
*/
184-
public static <T> T runWithTracing(final Supplier<T> supplier, final OperationContext operationContext, final String opName,
185-
@Nullable final MongoNamespace namespace) {
182+
public static <T> T runWithTracing(final Supplier<T> supplier, final OperationContext operationContext) {
186183
TracingManager tracingManager = operationContext.getTracingManager();
187-
Span tracingSpan = tracingManager.addOperationSpan(buildSpanName(opName, namespace), operationContext);
188-
addNamespaceTags(tracingSpan, namespace);
184+
String opName = operationContext.getOperationName();
185+
Span tracingSpan = tracingManager.addOperationSpan(opName == null ? "operation" : opName, operationContext);
186+
tracingSpan.tag(SYSTEM, "mongodb");
189187

190188
try {
191189
return supplier.get();
@@ -207,7 +205,9 @@ public static <T> T runWithTracing(final Supplier<T> supplier, final OperationCo
207205
*/
208206
public static void linkCursorWithOperation(final BsonDocument queryResult, final OperationContext operationContext) {
209207
long cursorId = queryResult.getDocument("cursor").getInt64("id").longValue();
210-
operationContext.getTracingManager().addCursorParentContext(cursorId, operationContext.getId());
208+
if (cursorId != 0) { // cursorId == 0 indicates no cursor is needed, all results are in the same first batch
209+
operationContext.getTracingManager().addCursorParentContext(cursorId, operationContext.getId());
210+
}
211211
}
212212

213213
/**
@@ -244,36 +244,13 @@ private Span addOperationSpan(final String name, final OperationContext operatio
244244
* @param operationId The ID of the operation.
245245
*/
246246
private void addCursorParentContext(final long cursorId, final long operationId) {
247-
if (!operations.containsKey(operationId)) {
248-
throw new IllegalArgumentException("Operation ID " + operationId + " does not exist.");
249-
}
250-
Span operationSpan = operations.get(operationId);
251-
operationSpan.tag(CURSOR_ID, cursorId);
252-
cursors.put(cursorId, operationSpan.context());
253-
}
254-
255-
/**
256-
* Builds a span name based on the operation name and namespace.
257-
*
258-
* @param opName The name of the operation.
259-
* @param namespace The MongoDB namespace, or null if none exists.
260-
* @return The span name.
261-
*/
262-
private static String buildSpanName(final String opName, @Nullable final MongoNamespace namespace) {
263-
return namespace != null ? opName + " " + namespace.getFullName() : opName;
264-
}
265-
266-
/**
267-
* Adds namespace-related tags to the specified span.
268-
*
269-
* @param span The span to add tags to.
270-
* @param namespace The MongoDB namespace, or null if none exists.
271-
*/
272-
private static void addNamespaceTags(final Span span, @Nullable final MongoNamespace namespace) {
273-
span.tag(SYSTEM, "mongodb");
274-
if (namespace != null) {
275-
span.tag(NAMESPACE, namespace.getDatabaseName());
276-
span.tag(COLLECTION, namespace.getCollectionName());
247+
if (operations.containsKey(operationId)) {
248+
Span operationSpan = operations.get(operationId);
249+
operationSpan.tag(CURSOR_ID, cursorId);
250+
cursors.put(cursorId, operationSpan.context());
251+
} else {
252+
// log warning message
253+
LOGGER.warn(format("Trying to link cursor %s with non-existent operation %s", cursorId, operationId));
277254
}
278255
}
279256

driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra
264264
MongoException exceptionToHandle = OperationHelper.unwrap((MongoException) e);
265265
if (exceptionToHandle.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)
266266
&& ClientSessionClock.INSTANCE.now() - startTime < MAX_RETRY_TIME_LIMIT_MS) {
267-
spanFinalizing();
267+
spanFinalizing(false);
268268
continue;
269269
}
270270
}
@@ -285,7 +285,7 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra
285285
&& e.hasErrorLabel(UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL)) {
286286
continue;
287287
} else if (e.hasErrorLabel(TRANSIENT_TRANSACTION_ERROR_LABEL)) {
288-
spanFinalizing();
288+
spanFinalizing(true);
289289
continue outer;
290290
}
291291
}
@@ -296,7 +296,7 @@ public <T> T withTransaction(final TransactionBody<T> transactionBody, final Tra
296296
return retVal;
297297
}
298298
} finally {
299-
spanFinalizing();
299+
spanFinalizing(true);
300300
}
301301
}
302302

@@ -351,13 +351,13 @@ private void handleTransactionSpanError(final Throwable e) {
351351
} else {
352352
// report error as Span error
353353
transactionSpan.error(e);
354+
tracingManager.cleanupTransactionContext(this.getServerSession());
354355
}
355356

356357
if (!isConvenientTransaction) {
357358
transactionSpan.end();
358359
transactionSpan = null;
359360
}
360-
tracingManager.cleanupTransactionContext(this.getServerSession());
361361
}
362362
}
363363

@@ -374,15 +374,19 @@ private void finalizeTransactionSpan(final String status) {
374374
}
375375
}
376376

377-
private void spanFinalizing() {
377+
private void spanFinalizing(boolean cleanupTransactionContext) {
378378
if (transactionSpan != null) {
379379
if (lastSpanEvent != null) {
380380
transactionSpan.error(lastSpanEvent);
381381
}
382382
transactionSpan.end();
383383
}
384-
isConvenientTransaction = false;
385384
transactionSpan = null;
386385
lastSpanEvent = null;
386+
// Don't clean-up transaction context if we're still retrying (we want the retries to fold under the original transaction span)
387+
if (cleanupTransactionContext) {
388+
tracingManager.cleanupTransactionContext(this.getServerSession());
389+
isConvenientTransaction = false;
390+
}
387391
}
388392
}

0 commit comments

Comments
 (0)