Skip to content

Commit 24f3814

Browse files
authored
[feat][monitor] Add publish latency histogram as OTel metrics (#24810)
1 parent 7d35d7c commit 24f3814

File tree

5 files changed

+77
-11
lines changed

5 files changed

+77
-11
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@
6767
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
6868
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
6969
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
70-
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
7170
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
7271
import org.apache.pulsar.common.naming.NamespaceName;
7372
import org.apache.pulsar.common.naming.TopicName;
@@ -899,7 +898,7 @@ private CompletableFuture<Optional<Long>> handleTopicEpochForExclusiveProducer(P
899898
/**
 * Records one add latency sample for this topic.
 *
 * <p>The sample is converted to microseconds and added to {@code addEntryLatencyStatsUsec},
 * and is also reported as a publish latency.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void recordAddLatency(long latency, TimeUnit unit) {
900899
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
901900

902-
PUBLISH_LATENCY.observe(latency, unit);
901+
// Replaces the removed direct PUBLISH_LATENCY.observe(...) call above: the sample is now
// handed to the PulsarStats instance obtained from brokerService.
brokerService.getPulsarStats().recordPublishLatency(latency, unit);
903902
}
904903

905904
@Override
@@ -908,15 +907,6 @@ public long increasePublishLimitedTimes() {
908907
return RATE_LIMITED_UPDATER.incrementAndGet(this);
909908
}
910909

911-
private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
912-
.quantile(0.0)
913-
.quantile(0.50)
914-
.quantile(0.95)
915-
.quantile(0.99)
916-
.quantile(0.999)
917-
.quantile(0.9999)
918-
.quantile(1.0)
919-
.register();
920910

921911
@Override
922912
public void incrementPublishCount(Producer producer, int numOfMessages, long msgSizeInBytes) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarStats.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.locks.ReentrantReadWriteLock;
3031
import java.util.function.Consumer;
3132
import lombok.Getter;
@@ -280,4 +281,8 @@ public void recordConnectionCreateSuccess() {
280281
public void recordConnectionCreateFail() {
281282
brokerOperabilityMetrics.recordConnectionCreateFail();
282283
}
284+
285+
/**
 * Forwards a publish latency sample, unchanged, to {@code brokerOperabilityMetrics}.
 *
 * @param latency the measured publish latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void recordPublishLatency(long latency, TimeUnit unit) {
286+
brokerOperabilityMetrics.recordPublishLatency(latency, unit);
287+
}
283288
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/BrokerOperabilityMetrics.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,22 @@
1818
*/
1919
package org.apache.pulsar.broker.stats;
2020

21+
import io.opentelemetry.api.metrics.DoubleHistogram;
2122
import io.opentelemetry.api.metrics.ObservableLongCounter;
2223
import io.prometheus.client.Counter;
2324
import java.util.ArrayList;
25+
import java.util.Arrays;
2426
import java.util.HashMap;
2527
import java.util.List;
2628
import java.util.Map;
2729
import java.util.concurrent.TimeUnit;
2830
import java.util.concurrent.atomic.LongAdder;
2931
import org.apache.pulsar.broker.PulsarService;
32+
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
3033
import org.apache.pulsar.common.stats.Metrics;
3134
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
3235
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
36+
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;
3337

3438
/**
3539
*/
@@ -54,6 +58,19 @@ public class BrokerOperabilityMetrics implements AutoCloseable {
5458
"pulsar.broker.connection.create.operation.count";
5559
private final ObservableLongCounter connectionCreateCounter;
5660

61+
public static final String TOPIC_PUBLISH_LATENCY_METRIC_NAME = "pulsar.broker.topic.publish.latency";
62+
private final DoubleHistogram topicPublishLatencyHistogram;
63+
@PulsarDeprecatedMetric(newMetricName = TOPIC_PUBLISH_LATENCY_METRIC_NAME)
64+
private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
65+
.quantile(0.0)
66+
.quantile(0.50)
67+
.quantile(0.95)
68+
.quantile(0.99)
69+
.quantile(0.999)
70+
.quantile(0.9999)
71+
.quantile(1.0)
72+
.register();
73+
5774
public BrokerOperabilityMetrics(PulsarService pulsar) {
5875
this.metricsList = new ArrayList<>();
5976
this.localCluster = pulsar.getConfiguration().getClusterName();
@@ -87,6 +104,14 @@ public BrokerOperabilityMetrics(PulsarService pulsar) {
87104
measurement.record(connectionCreateSuccessCount.sum(), ConnectionCreateStatus.SUCCESS.attributes);
88105
measurement.record(connectionCreateFailCount.sum(), ConnectionCreateStatus.FAILURE.attributes);
89106
});
107+
108+
this.topicPublishLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
109+
.histogramBuilder(TOPIC_PUBLISH_LATENCY_METRIC_NAME)
110+
.setUnit("s")
111+
.setDescription("The latency in seconds for publishing messages")
112+
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
113+
0.2, 0.5, 1.0, 5.0, 30.0))
114+
.build();
90115
}
91116

92117
@Override
@@ -195,4 +220,9 @@ public void recordHealthCheckStatusSuccess() {
195220
public void recordHealthCheckStatusFail() {
196221
this.healthCheckStatus = 0;
197222
}
223+
224+
/**
 * Records a single publish latency sample in both the OpenTelemetry histogram
 * ({@link #TOPIC_PUBLISH_LATENCY_METRIC_NAME}, reported in seconds) and the deprecated
 * {@code pulsar_broker_publish_latency} summary.
 *
 * @param latency the measured publish latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void recordPublishLatency(long latency, TimeUnit unit) {
225+
// Convert through nanoseconds instead of milliseconds: TimeUnit.toMillis truncates, so a
// sub-millisecond sample would be recorded as 0.0 s and every sample would lose its
// fractional millisecond, biasing the histogram sum low.
this.topicPublishLatencyHistogram.record(unit.toNanos(latency) / 1_000_000_000.0);
226+
PUBLISH_LATENCY.observe(latency, unit);
227+
}
198228
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,19 @@ public static void assertMetricDoubleGaugeValue(Collection<MetricData> metrics,
121121
valueConsumer.accept(point.getValue());
122122
}))));
123123
}
124+
125+
/**
 * Asserts that {@code metrics} contains a histogram metric named {@code metricName} with at
 * least one data point whose attributes equal {@code attributes}, and hands that point's
 * count and sum to the supplied consumers.
 *
 * <p>The consumers run inside the {@code anySatisfy} lambda, so they are expected to perform
 * their own assertions on the values they receive.
 *
 * @param metrics the collected metrics to search
 * @param metricName the metric name that must match
 * @param attributes the attributes the data point must carry (compared as a map)
 * @param countConsumer receives {@code point.getCount()}
 * @param sumConsumer receives {@code point.getSum()}
 */
public static void assertMetricHistogramValue(Collection<MetricData> metrics, String metricName,
126+
Attributes attributes, Consumer<Long> countConsumer,
127+
Consumer<Double> sumConsumer) {
128+
// Compare attributes as plain maps.
final Map<AttributeKey<?>, Object> attributesMap = attributes.asMap();
129+
assertThat(metrics).anySatisfy(metric -> assertThat(metric)
130+
.hasName(metricName)
131+
.hasHistogramSatisfying(histogram -> histogram.satisfies(
132+
histoData -> assertThat(histoData.getPoints()).anySatisfy(
133+
point -> {
134+
// The attribute check comes first, so the consumers only see a point that matched.
assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap);
135+
countConsumer.accept(point.getCount());
136+
sumConsumer.accept(point.getSum());
137+
}))));
138+
}
124139
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryBrokerOperabilityStatsTest.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818
*/
1919
package org.apache.pulsar.broker.stats;
2020

21+
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
22+
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
2123
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
2224
import static org.assertj.core.api.Assertions.assertThatThrownBy;
25+
import io.opentelemetry.api.common.Attributes;
2326
import java.util.concurrent.TimeUnit;
2427
import lombok.Cleanup;
2528
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -101,4 +104,27 @@ public void testBrokerConnection() throws Exception {
101104
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
102105
ConnectionCreateStatus.FAILURE.attributes, 1);
103106
}
107+
108+
/**
 * Checks that publishing messages is reflected in the
 * {@code BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME} histogram: after one
 * send the point count is 1, and after nine more sends it is 10. Both checks use empty
 * attributes and require a strictly positive sum.
 */
@Test
109+
public void testPublishLatency() throws Exception {
110+
final var topicName = BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testPublishLatency");
111+
@Cleanup
112+
final var producer = pulsarClient.newProducer().topic(topicName).create();
113+
114+
// First sample: a single synchronous send.
producer.send(("msg").getBytes());
115+
116+
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
117+
assertMetricHistogramValue(metrics, BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME,
118+
Attributes.empty(), count -> assertThat(count).isEqualTo(1L),
119+
// NOTE(review): a strictly positive sum assumes the recorded latency never rounds down
// to 0.0 seconds - confirm against the conversion done in recordPublishLatency.
sum -> assertThat(sum).isGreaterThan(0.0));
120+
121+
// Nine more sends bring the expected total to ten samples.
for (int i = 0; i < 9; i++) {
122+
producer.send(("msg-" + i).getBytes());
123+
}
124+
125+
metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
126+
assertMetricHistogramValue(metrics, BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME,
127+
Attributes.empty(), count -> assertThat(count).isEqualTo(10L),
128+
sum -> assertThat(sum).isGreaterThan(0.0));
129+
}
104130
}

0 commit comments

Comments
 (0)