Skip to content

Commit 9f3358b

Browse files
committed
Add tracing support using Micrometer
1 parent 141ee1e commit 9f3358b

File tree

23 files changed

+529
-30
lines changed

23 files changed

+529
-30
lines changed

driver-core/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ dependencies {
5454

5555
optionalImplementation(libs.snappy.java)
5656
optionalImplementation(libs.zstd.jni)
57+
optionalImplementation(libs.micrometer)
5758

5859
testImplementation(project(path = ":bson", configuration = "testArtifacts"))
5960
testImplementation(libs.reflections)

driver-core/src/main/com/mongodb/MongoClientSettings.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,11 @@
3030
import com.mongodb.connection.SslSettings;
3131
import com.mongodb.connection.TransportSettings;
3232
import com.mongodb.event.CommandListener;
33+
import com.mongodb.internal.tracing.TracingManager;
3334
import com.mongodb.lang.Nullable;
3435
import com.mongodb.spi.dns.DnsClient;
3536
import com.mongodb.spi.dns.InetAddressResolver;
37+
import com.mongodb.tracing.Tracer;
3638
import org.bson.UuidRepresentation;
3739
import org.bson.codecs.BsonCodecProvider;
3840
import org.bson.codecs.BsonValueCodecProvider;
@@ -118,6 +120,7 @@ public final class MongoClientSettings {
118120
private final InetAddressResolver inetAddressResolver;
119121
@Nullable
120122
private final Long timeoutMS;
123+
private final TracingManager tracingManager;
121124

122125
/**
123126
* Gets the default codec registry. It includes the following providers:
@@ -238,6 +241,7 @@ public static final class Builder {
238241
private ContextProvider contextProvider;
239242
private DnsClient dnsClient;
240243
private InetAddressResolver inetAddressResolver;
244+
private TracingManager tracingManager;
241245

242246
private Builder() {
243247
}
@@ -275,6 +279,7 @@ private Builder(final MongoClientSettings settings) {
275279
if (settings.heartbeatSocketTimeoutSetExplicitly) {
276280
heartbeatSocketTimeoutMS = settings.heartbeatSocketSettings.getReadTimeout(MILLISECONDS);
277281
}
282+
tracingManager = settings.tracingManager;
278283
}
279284

280285
/**
@@ -723,6 +728,20 @@ Builder heartbeatSocketTimeoutMS(final int heartbeatSocketTimeoutMS) {
723728
return this;
724729
}
725730

731+
/**
732+
* Sets the tracer to use for creating Spans for operations and commands.
733+
*
734+
* @param tracer the tracer
735+
* @see com.mongodb.tracing.MicrometerTracer
736+
* @return this
737+
* @since 5.5
738+
*/
739+
@Alpha(Reason.CLIENT)
740+
public Builder tracer(final Tracer tracer) {
741+
this.tracingManager = new TracingManager(tracer);
742+
return this;
743+
}
744+
726745
/**
727746
* Build an instance of {@code MongoClientSettings}.
728747
*
@@ -1040,6 +1059,17 @@ public ContextProvider getContextProvider() {
10401059
return contextProvider;
10411060
}
10421061

1062+
/**
1063+
* Get the tracer to create Spans for operations and commands.
1064+
*
1065+
* @return this
1066+
* @since 5.5
1067+
*/
1068+
@Alpha(Reason.CLIENT)
1069+
public TracingManager getTracingManager() {
1070+
return tracingManager;
1071+
}
1072+
10431073
@Override
10441074
public boolean equals(final Object o) {
10451075
if (this == o) {
@@ -1156,5 +1186,6 @@ private MongoClientSettings(final Builder builder) {
11561186
heartbeatConnectTimeoutSetExplicitly = builder.heartbeatConnectTimeoutMS != 0;
11571187
contextProvider = builder.contextProvider;
11581188
timeoutMS = builder.timeoutMS;
1189+
tracingManager = builder.tracingManager;
11591190
}
11601191
}

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,10 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) {
186186
}
187187
}
188188

189+
BsonDocument getCommand() {
190+
return command;
191+
}
192+
189193
/**
190194
* Get the field name from a buffer positioned at the start of the document sequence identifier of an OP_MSG Section of type
191195
* `PAYLOAD_TYPE_1_DOCUMENT_SEQUENCE`.

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@
5151
import com.mongodb.internal.logging.StructuredLogger;
5252
import com.mongodb.internal.session.SessionContext;
5353
import com.mongodb.internal.time.Timeout;
54+
import com.mongodb.internal.tracing.Span;
55+
import com.mongodb.internal.tracing.TraceContext;
56+
import com.mongodb.internal.tracing.TracingManager;
5457
import com.mongodb.lang.Nullable;
5558
import org.bson.BsonBinaryReader;
5659
import org.bson.BsonDocument;
@@ -374,13 +377,24 @@ public boolean isClosed() {
374377
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final OperationContext operationContext) {
375378
Supplier<T> sendAndReceiveInternal = () -> sendAndReceiveInternal(
376379
message, decoder, operationContext);
380+
381+
Span tracingSpan = createTracingSpan(message, operationContext);
382+
377383
try {
378384
return sendAndReceiveInternal.get();
379385
} catch (MongoCommandException e) {
386+
if (tracingSpan != null) {
387+
tracingSpan.error(e);
388+
}
389+
380390
if (reauthenticationIsTriggered(e)) {
381391
return reauthenticateAndRetry(sendAndReceiveInternal, operationContext);
382392
}
383393
throw e;
394+
} finally {
395+
if (tracingSpan != null) {
396+
tracingSpan.end();
397+
}
384398
}
385399
}
386400

@@ -391,6 +405,7 @@ public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<
391405

392406
AsyncSupplier<T> sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal(
393407
message, decoder, operationContext, c);
408+
394409
beginAsync().<T>thenSupply(c -> {
395410
sendAndReceiveAsyncInternal.getAsync(c);
396411
}).onErrorIf(e -> reauthenticationIsTriggered(e), (t, c) -> {
@@ -872,6 +887,42 @@ public ByteBuf getBuffer(final int size) {
872887
return stream.getBuffer(size);
873888
}
874889

890+
@Nullable
891+
private Span createTracingSpan(final CommandMessage message, final OperationContext operationContext) {
892+
TracingManager tracingManager = operationContext.getTracingManager();
893+
Span span;
894+
if (tracingManager.isEnabled()) {
895+
BsonDocument command = message.getCommand();
896+
TraceContext parentContext = null;
897+
long cursorId = -1;
898+
if (command.containsKey("getMore")) {
899+
cursorId = command.getInt64("getMore").longValue();
900+
parentContext = tracingManager.getCursorParentContext(cursorId);
901+
} else {
902+
parentContext = tracingManager.getParentContext(operationContext.getId());
903+
}
904+
905+
span = tracingManager.addSpan("Command " + command.getFirstKey(), parentContext);
906+
span.tag("db.system", "mongodb");
907+
span.tag("db.namespace", message.getNamespace().getFullName());
908+
span.tag("db.query.summary", command.getFirstKey());
909+
span.tag("db.query.opcode", String.valueOf(message.getOpCode()));
910+
span.tag("db.query.text", command.toString());
911+
if (cursorId != -1) {
912+
span.tag("db.mongodb.cursor_id", String.valueOf(cursorId));
913+
}
914+
span.tag("server.address", serverId.getAddress().getHost());
915+
span.tag("server.port", String.valueOf(serverId.getAddress().getPort()));
916+
span.tag("server.type", message.getSettings().getServerType().name());
917+
918+
span.tag("db.mongodb.server_connection_id", this.description.getConnectionId().toString());
919+
} else {
920+
span = null;
921+
}
922+
923+
return span;
924+
}
925+
875926
private class MessageHeaderCallback implements SingleResultCallback<ByteBuf> {
876927
private final OperationContext operationContext;
877928
private final SingleResultCallback<ResponseBuffers> callback;

driver-core/src/main/com/mongodb/internal/connection/OperationContext.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.mongodb.internal.TimeoutSettings;
2828
import com.mongodb.internal.VisibleForTesting;
2929
import com.mongodb.internal.session.SessionContext;
30+
import com.mongodb.internal.tracing.TracingManager;
3031
import com.mongodb.lang.Nullable;
3132
import com.mongodb.selector.ServerSelector;
3233

@@ -51,15 +52,17 @@ public class OperationContext {
5152
private final ServerApi serverApi;
5253
@Nullable
5354
private final String operationName;
55+
private final TracingManager tracingManager;
5456

5557
public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext,
5658
@Nullable final ServerApi serverApi) {
57-
this(requestContext, sessionContext, timeoutContext, serverApi, null);
59+
this(requestContext, sessionContext, timeoutContext, serverApi, null, TracingManager.NO_OP);
5860
}
5961

6062
public OperationContext(final RequestContext requestContext, final SessionContext sessionContext, final TimeoutContext timeoutContext,
61-
@Nullable final ServerApi serverApi, @Nullable final String operationName) {
62-
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi, operationName);
63+
@Nullable final ServerApi serverApi, @Nullable final String operationName, final TracingManager tracingManager) {
64+
this(NEXT_ID.incrementAndGet(), requestContext, sessionContext, timeoutContext, new ServerDeprioritization(), serverApi,
65+
operationName, tracingManager);
6366
}
6467

6568
public static OperationContext simpleOperationContext(
@@ -69,7 +72,8 @@ public static OperationContext simpleOperationContext(
6972
NoOpSessionContext.INSTANCE,
7073
new TimeoutContext(timeoutSettings),
7174
serverApi,
72-
null);
75+
null,
76+
TracingManager.NO_OP);
7377
}
7478

7579
public static OperationContext simpleOperationContext(final TimeoutContext timeoutContext) {
@@ -78,25 +82,30 @@ public static OperationContext simpleOperationContext(final TimeoutContext timeo
7882
NoOpSessionContext.INSTANCE,
7983
timeoutContext,
8084
null,
81-
null);
85+
null,
86+
TracingManager.NO_OP);
8287
}
8388

8489
public OperationContext withSessionContext(final SessionContext sessionContext) {
85-
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName);
90+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName, tracingManager);
8691
}
8792

8893
public OperationContext withTimeoutContext(final TimeoutContext timeoutContext) {
89-
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName);
94+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName, tracingManager);
9095
}
9196

9297
public OperationContext withOperationName(final String operationName) {
93-
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName);
98+
return new OperationContext(id, requestContext, sessionContext, timeoutContext, serverDeprioritization, serverApi, operationName, tracingManager);
9499
}
95100

96101
public long getId() {
97102
return id;
98103
}
99104

105+
public TracingManager getTracingManager() {
106+
return tracingManager;
107+
}
108+
100109
public SessionContext getSessionContext() {
101110
return sessionContext;
102111
}
@@ -126,14 +135,16 @@ public OperationContext(final long id,
126135
final TimeoutContext timeoutContext,
127136
final ServerDeprioritization serverDeprioritization,
128137
@Nullable final ServerApi serverApi,
129-
@Nullable final String operationName) {
138+
@Nullable final String operationName,
139+
final TracingManager tracingManager) {
130140
this.id = id;
131141
this.serverDeprioritization = serverDeprioritization;
132142
this.requestContext = requestContext;
133143
this.sessionContext = sessionContext;
134144
this.timeoutContext = timeoutContext;
135145
this.serverApi = serverApi;
136146
this.operationName = operationName;
147+
this.tracingManager = tracingManager;
137148
}
138149

139150
@VisibleForTesting(otherwise = VisibleForTesting.AccessModifier.PRIVATE)
@@ -142,14 +153,16 @@ public OperationContext(final long id,
142153
final SessionContext sessionContext,
143154
final TimeoutContext timeoutContext,
144155
@Nullable final ServerApi serverApi,
145-
@Nullable final String operationName) {
156+
@Nullable final String operationName,
157+
final TracingManager tracingManager) {
146158
this.id = id;
147159
this.serverDeprioritization = new ServerDeprioritization();
148160
this.requestContext = requestContext;
149161
this.sessionContext = sessionContext;
150162
this.timeoutContext = timeoutContext;
151163
this.serverApi = serverApi;
152164
this.operationName = operationName;
165+
this.tracingManager = tracingManager;
153166
}
154167

155168

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
7575
@Nullable
7676
private List<T> nextBatch;
7777
private boolean resetTimeoutWhenClosing;
78+
private final long cursorId;
7879

7980
CommandBatchCursor(
8081
final TimeoutMode timeoutMode,
@@ -95,10 +96,13 @@ class CommandBatchCursor<T> implements AggregateResponseBatchCursor<T> {
9596
operationContext = connectionSource.getOperationContext();
9697
this.timeoutMode = timeoutMode;
9798

99+
ServerCursor serverCursor = commandCursorResult.getServerCursor();
100+
this.cursorId = serverCursor != null ? serverCursor.getId() : -1;
101+
98102
operationContext.getTimeoutContext().setMaxTimeOverride(maxTimeMS);
99103

100104
Connection connectionToPin = connectionSource.getServerDescription().getType() == ServerType.LOAD_BALANCER ? connection : null;
101-
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, commandCursorResult.getServerCursor());
105+
resourceManager = new ResourceManager(namespace, connectionSource, connectionToPin, serverCursor);
102106
resetTimeoutWhenClosing = true;
103107
}
104108

@@ -169,6 +173,7 @@ public void remove() {
169173

170174
@Override
171175
public void close() {
176+
operationContext.getTracingManager().removeCursorParentContext(cursorId);
172177
resourceManager.close();
173178
}
174179

driver-core/src/main/com/mongodb/internal/operation/FindOperation.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import com.mongodb.internal.binding.AsyncReadBinding;
3131
import com.mongodb.internal.binding.ReadBinding;
3232
import com.mongodb.internal.connection.OperationContext;
33+
import com.mongodb.internal.tracing.Span;
34+
import com.mongodb.internal.tracing.TracingManager;
3335
import com.mongodb.lang.Nullable;
3436
import org.bson.BsonBoolean;
3537
import org.bson.BsonDocument;
@@ -296,21 +298,35 @@ public BatchCursor<T> execute(final ReadBinding binding) {
296298
if (invalidTimeoutModeException != null) {
297299
throw invalidTimeoutModeException;
298300
}
301+
OperationContext operationContext = binding.getOperationContext();
299302

300-
RetryState retryState = initialRetryState(retryReads, binding.getOperationContext().getTimeoutContext());
301-
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, binding.getOperationContext(), () ->
303+
// Adds a Tracing Span for 'find' operation
304+
TracingManager tracingManager = operationContext.getTracingManager();
305+
Span tracingSpan = tracingManager.addSpan("find", operationContext.getId());
306+
307+
RetryState retryState = initialRetryState(retryReads, operationContext.getTimeoutContext());
308+
Supplier<BatchCursor<T>> read = decorateReadWithRetries(retryState, operationContext, () ->
302309
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
303-
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getOperationContext()));
310+
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), operationContext));
304311
try {
305-
return createReadCommandAndExecute(retryState, binding.getOperationContext(), source, namespace.getDatabaseName(),
312+
return createReadCommandAndExecute(retryState, operationContext, source, namespace.getDatabaseName(),
306313
getCommandCreator(), CommandResultDocumentCodec.create(decoder, FIRST_BATCH),
307314
transformer(), connection);
308315
} catch (MongoCommandException e) {
309316
throw new MongoQueryException(e.getResponse(), e.getServerAddress());
310317
}
311318
})
312319
);
313-
return read.get();
320+
try {
321+
return read.get();
322+
} catch (MongoQueryException e) {
323+
tracingSpan.error(e);
324+
throw e;
325+
} finally {
326+
tracingSpan.end();
327+
// Clean up the tracing span after the operation is complete
328+
tracingManager.cleanContexts(operationContext.getId());
329+
}
314330
}
315331

316332
@Override
@@ -473,9 +489,17 @@ private TimeoutMode getTimeoutMode() {
473489
}
474490

475491
private CommandReadTransformer<BsonDocument, CommandBatchCursor<T>> transformer() {
476-
return (result, source, connection) ->
477-
new CommandBatchCursor<>(getTimeoutMode(), result, batchSize, getMaxTimeForCursor(source.getOperationContext()), decoder,
478-
comment, source, connection);
492+
return (result, source, connection) -> {
493+
OperationContext operationContext = source.getOperationContext();
494+
495+
// register cursor id with the operation context, so 'getMore' commands can be folded under the 'find' operation
496+
long cursorId = result.getDocument("cursor").getInt64("id").longValue();
497+
TracingManager tracingManager = operationContext.getTracingManager();
498+
tracingManager.addCursorParentContext(cursorId, operationContext.getId());
499+
500+
return new CommandBatchCursor<>(getTimeoutMode(), result, batchSize, getMaxTimeForCursor(operationContext), decoder,
501+
comment, source, connection);
502+
};
479503
}
480504

481505
private CommandReadTransformerAsync<BsonDocument, AsyncBatchCursor<T>> asyncTransformer() {

0 commit comments

Comments
 (0)