From 498fe16d81d0ea9247ba984104ee8d3aab897480 Mon Sep 17 00:00:00 2001 From: Yuri Mizushima Date: Thu, 2 Oct 2025 12:17:42 +0900 Subject: [PATCH] feat: add publish latency histogram as otel metrics --- .../pulsar/broker/service/AbstractTopic.java | 12 +------- .../pulsar/broker/service/PulsarStats.java | 5 ++++ .../stats/BrokerOperabilityMetrics.java | 30 +++++++++++++++++++ .../stats/BrokerOpenTelemetryTestUtil.java | 15 ++++++++++ ...enTelemetryBrokerOperabilityStatsTest.java | 26 ++++++++++++++++ 5 files changed, 77 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index e8253771eded4..3ec6f5a0cd5e6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -67,7 +67,6 @@ import org.apache.pulsar.broker.service.schema.SchemaRegistryService; import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; import org.apache.pulsar.broker.service.schema.exceptions.SchemaException; -import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -899,7 +898,7 @@ private CompletableFuture> handleTopicEpochForExclusiveProducer(P public void recordAddLatency(long latency, TimeUnit unit) { addEntryLatencyStatsUsec.addValue(unit.toMicros(latency)); - PUBLISH_LATENCY.observe(latency, unit); + brokerService.getPulsarStats().recordPublishLatency(latency, unit); } @Override @@ -908,15 +907,6 @@ public long increasePublishLimitedTimes() { return RATE_LIMITED_UPDATER.incrementAndGet(this); } - private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-") - .quantile(0.0) 
- .quantile(0.50) - .quantile(0.95) - .quantile(0.99) - .quantile(0.999) - .quantile(0.9999) - .quantile(1.0) - .register(); @Override public void incrementPublishCount(Producer producer, int numOfMessages, long msgSizeInBytes) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java index b96e00a8909d6..45a0e8b42f0cb 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; import lombok.Getter; @@ -280,4 +281,8 @@ public void recordConnectionCreateSuccess() { public void recordConnectionCreateFail() { brokerOperabilityMetrics.recordConnectionCreateFail(); } + + public void recordPublishLatency(long latency, TimeUnit unit) { + brokerOperabilityMetrics.recordPublishLatency(latency, unit); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java index 1855e1798b465..310c14d4afa33 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java @@ -18,18 +18,22 @@ */ package org.apache.pulsar.broker.stats; +import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.ObservableLongCounter; import io.prometheus.client.Counter; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.atomic.LongAdder; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; import org.apache.pulsar.common.stats.Metrics; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus; import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus; +import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric; /** */ @@ -54,6 +58,19 @@ public class BrokerOperabilityMetrics implements AutoCloseable { "pulsar.broker.connection.create.operation.count"; private final ObservableLongCounter connectionCreateCounter; + public static final String TOPIC_PUBLISH_LATENCY_METRIC_NAME = "pulsar.broker.topic.publish.latency"; + private final DoubleHistogram topicPublishLatencyHistogram; + @PulsarDeprecatedMetric(newMetricName = TOPIC_PUBLISH_LATENCY_METRIC_NAME) + private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-") + .quantile(0.0) + .quantile(0.50) + .quantile(0.95) + .quantile(0.99) + .quantile(0.999) + .quantile(0.9999) + .quantile(1.0) + .register(); + public BrokerOperabilityMetrics(PulsarService pulsar) { this.metricsList = new ArrayList<>(); this.localCluster = pulsar.getConfiguration().getClusterName(); @@ -87,6 +104,14 @@ public BrokerOperabilityMetrics(PulsarService pulsar) { measurement.record(connectionCreateSuccessCount.sum(), ConnectionCreateStatus.SUCCESS.attributes); measurement.record(connectionCreateFailCount.sum(), ConnectionCreateStatus.FAILURE.attributes); }); + + this.topicPublishLatencyHistogram = pulsar.getOpenTelemetry().getMeter() + .histogramBuilder(TOPIC_PUBLISH_LATENCY_METRIC_NAME) + .setUnit("s") + .setDescription("The latency in seconds for publishing messages") + .setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1, + 0.2, 0.5, 1.0, 5.0, 30.0)) + .build(); } @Override @@ -195,4 +220,9 @@ public void recordHealthCheckStatusSuccess() { public 
void recordHealthCheckStatusFail() { this.healthCheckStatus = 0; } + + public void recordPublishLatency(long latency, TimeUnit unit) { + this.topicPublishLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0); + PUBLISH_LATENCY.observe(latency, unit); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java index 3bfbf2064e156..76cad804e1ed3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java @@ -121,4 +121,19 @@ public static void assertMetricDoubleGaugeValue(Collection metrics, valueConsumer.accept(point.getValue()); })))); } + + public static void assertMetricHistogramValue(Collection metrics, String metricName, + Attributes attributes, Consumer countConsumer, + Consumer sumConsumer) { + final Map, Object> attributesMap = attributes.asMap(); + assertThat(metrics).anySatisfy(metric -> assertThat(metric) + .hasName(metricName) + .hasHistogramSatisfying(histogram -> histogram.satisfies( + histoData -> assertThat(histoData.getPoints()).anySatisfy( + point -> { + assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap); + countConsumer.accept(point.getCount()); + sumConsumer.accept(point.getSum()); + })))); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java index 4378e6b05b3ee..e197f3bc62192 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java @@ -18,8 +18,11 @@ */ package org.apache.pulsar.broker.stats; +import
static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat; +import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue; import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import io.opentelemetry.api.common.Attributes; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import org.apache.pulsar.broker.BrokerTestUtil; @@ -101,4 +104,27 @@ public void testBrokerConnection() throws Exception { assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME, ConnectionCreateStatus.FAILURE.attributes, 1); } + + @Test + public void testPublishLatency() throws Exception { + final var topicName = BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testPublishLatency"); + @Cleanup + final var producer = pulsarClient.newProducer().topic(topicName).create(); + + producer.send(("msg").getBytes()); + + var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricHistogramValue(metrics, BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME, + Attributes.empty(), count -> assertThat(count).isEqualTo(1L), + sum -> assertThat(sum).isGreaterThan(0.0)); + + for (int i = 0; i < 9; i++) { + producer.send(("msg-" + i).getBytes()); + } + + metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics(); + assertMetricHistogramValue(metrics, BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME, + Attributes.empty(), count -> assertThat(count).isEqualTo(10L), + sum -> assertThat(sum).isGreaterThan(0.0)); + } }