Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/instrumentation-list.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1933,7 +1933,7 @@ libraries:
type: boolean
default: true
- name: otel.instrumentation.kafka.experimental-span-attributes
description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms"
description: Enables the capture of the experimental consumer attributes "kafka.record.queue_time_ms" and "messaging.kafka.bootstrap.servers"
type: boolean
default: false
- name: kafka-clients-2.6
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.consumerReceiveInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
Expand All @@ -18,14 +19,19 @@
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

Expand All @@ -38,6 +44,10 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class).or(takesArgument(0, Properties.class))),
this.getClass().getName() + "$ConstructorAdvice");

transformer.applyAdviceToMethod(
named("poll")
.and(isPublic())
Expand All @@ -47,6 +57,26 @@ public void transform(TypeTransformer transformer) {
this.getClass().getName() + "$PollAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This Consumer<?, ?> consumer, @Advice.Argument(0) Object configs) {

Object bootstrapServersConfig = null;
if (configs instanceof Map) {
bootstrapServersConfig = ((Map<?, ?>) configs).get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
}

if (bootstrapServersConfig != null
&& KafkaSingletons.CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(consumer) == null) {
List<String> bootstrapServers = KafkaUtil.parseBootstrapServers(bootstrapServersConfig);
KafkaSingletons.CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.set(consumer, bootstrapServers);
}
}
}

@SuppressWarnings("unused")
public static class PollAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11;

import static io.opentelemetry.javaagent.instrumentation.kafkaclients.v0_11.KafkaSingletons.producerInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
Expand All @@ -15,14 +16,20 @@
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaPropagation;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
import io.opentelemetry.javaagent.bootstrap.Java8BytecodeBridge;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerInstrumentation implements TypeInstrumentation {
Expand All @@ -34,6 +41,10 @@ public ElementMatcher<TypeDescription> typeMatcher() {

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor().and(takesArgument(0, Map.class).or(takesArgument(0, Properties.class))),
this.getClass().getName() + "$ConstructorAdvice");

transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
Expand All @@ -43,19 +54,43 @@ public void transform(TypeTransformer transformer) {
KafkaProducerInstrumentation.class.getName() + "$SendAdvice");
}

@SuppressWarnings("unused")
public static class ConstructorAdvice {

@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This Producer<?, ?> producer, @Advice.Argument(0) Object configs) {

Object bootstrapServersConfig = null;
if (configs instanceof Map) {
bootstrapServersConfig = ((Map<?, ?>) configs).get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
}

if (bootstrapServersConfig != null
&& KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(producer) == null) {
List<String> bootstrapServers = KafkaUtil.parseBootstrapServers(bootstrapServersConfig);
KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.set(producer, bootstrapServers);
}
}
}

@SuppressWarnings("unused")
public static class SendAdvice {

@Advice.OnMethodEnter(suppress = Throwable.class)
public static KafkaProducerRequest onEnter(
@Advice.This Producer<?, ?> producer,
@Advice.FieldValue("apiVersions") ApiVersions apiVersions,
@Advice.FieldValue("clientId") String clientId,
@Advice.Argument(value = 0, readOnly = false) ProducerRecord<?, ?> record,
@Advice.Argument(value = 1, readOnly = false) Callback callback,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
List<String> bootstrapServers =
KafkaSingletons.PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD.get(producer);
KafkaProducerRequest request =
KafkaProducerRequest.create(record, clientId, bootstrapServers);
Context parentContext = Java8BytecodeBridge.currentContext();
if (!producerInstrumenter().shouldStart(parentContext, request)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaInstrumenterFactory;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProcessRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaProducerRequest;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaReceiveRequest;
import io.opentelemetry.javaagent.bootstrap.internal.AgentInstrumentationConfig;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;

public final class KafkaSingletons {
Expand All @@ -26,6 +30,11 @@ public final class KafkaSingletons {
private static final Instrumenter<KafkaReceiveRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER;
private static final Instrumenter<KafkaProcessRequest, Void> CONSUMER_PROCESS_INSTRUMENTER;

public static final VirtualField<Consumer<?, ?>, List<String>>
CONSUMER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD = VirtualField.find(Consumer.class, List.class);
public static final VirtualField<Producer<?, ?>, List<String>>
PRODUCER_BOOTSTRAP_SERVERS_VIRTUAL_FIELD = VirtualField.find(Producer.class, List.class);

static {
KafkaInstrumenterFactory instrumenterFactory =
new KafkaInstrumenterFactory(GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ configurations:
type: boolean
default: true
- name: otel.instrumentation.kafka.experimental-span-attributes
description: Enables the capture of the experimental consumer attribute "kafka.record.queue_time_ms"
description: Enables the capture of the experimental consumer attributes "kafka.record.queue_time_ms" and "messaging.kafka.bootstrap.servers"
type: boolean
default: false
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static org.assertj.core.api.Assertions.assertThat;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
Expand Down Expand Up @@ -66,6 +67,20 @@ public abstract class KafkaClientBaseTest {
protected static final String SHARED_TOPIC = "shared.topic";
protected static final AttributeKey<String> MESSAGING_CLIENT_ID =
AttributeKey.stringKey("messaging.client_id");
protected static final AttributeKey<List<String>> MESSAGING_KAFKA_BOOTSTRAP_SERVERS =
AttributeKey.stringArrayKey("messaging.kafka.bootstrap.servers");

protected static AttributeAssertion bootstrapServersAssertion() {
return satisfies(
MESSAGING_KAFKA_BOOTSTRAP_SERVERS,
listAssert -> {
if (Boolean.getBoolean("otel.instrumentation.kafka.experimental-span-attributes")) {
listAssert.isNotEmpty().allSatisfy(server -> assertThat(server).isNotEmpty());
} else {
listAssert.isNullOrEmpty();
}
});
}

private KafkaContainer kafka;
protected Producer<Integer, String> producer;
Expand Down Expand Up @@ -177,6 +192,7 @@ protected static List<AttributeAssertion> sendAttributes(
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "publish"),
bootstrapServersAssertion(),
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("producer")),
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative)));
Expand All @@ -203,6 +219,7 @@ protected static List<AttributeAssertion> receiveAttributes(boolean testHeaders)
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "receive"),
bootstrapServersAssertion(),
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isPositive)));
// consumer group is not available in version 0.11
Expand All @@ -227,6 +244,7 @@ protected static List<AttributeAssertion> processAttributes(
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "process"),
bootstrapServersAssertion(),
satisfies(MESSAGING_CLIENT_ID, stringAssert -> stringAssert.startsWith("consumer")),
satisfies(MESSAGING_DESTINATION_PARTITION_ID, AbstractStringAssert::isNotEmpty),
satisfies(MESSAGING_KAFKA_MESSAGE_OFFSET, AbstractLongAssert::isNotNegative),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,11 @@ <K, V> ConsumerRecords<K, V> addTracing(
*
* @param record the producer record to inject span info.
*/
<K, V> void buildAndInjectSpan(ProducerRecord<K, V> record, String clientId) {
<K, V> void buildAndInjectSpan(
ProducerRecord<K, V> record, String clientId, List<String> bootstrapServers) {
Context parentContext = Context.current();

KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId);
KafkaProducerRequest request = KafkaProducerRequest.create(record, clientId, bootstrapServers);
if (!producerInstrumenter.shouldStart(parentContext, request)) {
return;
}
Expand Down Expand Up @@ -262,16 +263,25 @@ <K, V> Future<RecordMetadata> buildAndInjectSpan(
private <K, V> Context buildAndFinishSpan(
ConsumerRecords<K, V> records, Consumer<K, V> consumer, Timer timer) {
return buildAndFinishSpan(
records, KafkaUtil.getConsumerGroup(consumer), KafkaUtil.getClientId(consumer), timer);
records,
KafkaUtil.getConsumerGroup(consumer),
KafkaUtil.getClientId(consumer),
KafkaUtil.getBootstrapServers(consumer),
timer);
}

<K, V> Context buildAndFinishSpan(
ConsumerRecords<K, V> records, String consumerGroup, String clientId, Timer timer) {
ConsumerRecords<K, V> records,
String consumerGroup,
String clientId,
List<String> bootstrapServers,
Timer timer) {
if (records.isEmpty()) {
return null;
}
Context parentContext = Context.current();
KafkaReceiveRequest request = KafkaReceiveRequest.create(records, consumerGroup, clientId);
KafkaReceiveRequest request =
KafkaReceiveRequest.create(records, consumerGroup, clientId, bootstrapServers);
Context context = null;
if (consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
context =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContext;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaConsumerContextUtil;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.ConsumerConfig;
Expand All @@ -34,6 +36,7 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.build();

private List<String> bootstrapServers;
private String consumerGroup;
private String clientId;

Expand All @@ -42,12 +45,13 @@ public class TracingConsumerInterceptor<K, V> implements ConsumerInterceptor<K,
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
// timer should be started before fetching ConsumerRecords, but there is no callback for that
Timer timer = Timer.start();
Context receiveContext = telemetry.buildAndFinishSpan(records, consumerGroup, clientId, timer);
Context receiveContext =
telemetry.buildAndFinishSpan(records, consumerGroup, clientId, bootstrapServers, timer);
if (receiveContext == null) {
receiveContext = Context.current();
}
KafkaConsumerContext consumerContext =
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId);
KafkaConsumerContextUtil.create(receiveContext, consumerGroup, clientId, bootstrapServers);
return telemetry.addTracing(records, consumerContext);
}

Expand All @@ -59,6 +63,8 @@ public void close() {}

@Override
public void configure(Map<String, ?> configs) {
bootstrapServers =
KafkaUtil.parseBootstrapServers(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
consumerGroup = Objects.toString(configs.get(ConsumerConfig.GROUP_ID_CONFIG), null);
clientId = Objects.toString(configs.get(ConsumerConfig.CLIENT_ID_CONFIG), null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.kafkaclients.common.v0_11.internal.KafkaUtil;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
Expand All @@ -24,12 +26,14 @@ public class TracingProducerInterceptor<K, V> implements ProducerInterceptor<K,

private static final KafkaTelemetry telemetry = KafkaTelemetry.create(GlobalOpenTelemetry.get());

@Nullable private List<String> bootstrapServers;

@Nullable private String clientId;

@Override
@CanIgnoreReturnValue
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> producerRecord) {
telemetry.buildAndInjectSpan(producerRecord, clientId);
telemetry.buildAndInjectSpan(producerRecord, clientId, bootstrapServers);
return producerRecord;
}

Expand All @@ -42,6 +46,8 @@ public void close() {}
@Override
public void configure(Map<String, ?> map) {
clientId = Objects.toString(map.get(ProducerConfig.CLIENT_ID_CONFIG), null);
bootstrapServers =
KafkaUtil.parseBootstrapServers(map.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));

// TODO: support experimental attributes config
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ void testWrappers(boolean testHeaders) throws InterruptedException {
.setCapturedHeaders(singletonList("test-message-header"))
// TODO run tests both with and without experimental span attributes
.setCaptureExperimentalSpanAttributes(true);
System.setProperty(
"otel.instrumentation.kafka.experimental-span-attributes", String.valueOf(true));
configure(telemetryBuilder);
KafkaTelemetry telemetry = telemetryBuilder.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void assertTraces() {
equalTo(MESSAGING_SYSTEM, "kafka"),
equalTo(MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(MESSAGING_OPERATION, "publish"),
bootstrapServersAssertion(),
satisfies(
MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer"))),
Expand All @@ -52,6 +53,7 @@ void assertTraces() {
equalTo(
MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
bootstrapServersAssertion(),
satisfies(
MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
Expand Down
Loading
Loading