Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.service.schema.exceptions.SchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -899,7 +898,7 @@ private CompletableFuture<Optional<Long>> handleTopicEpochForExclusiveProducer(P
public void recordAddLatency(long latency, TimeUnit unit) {
    // The add-entry latency stats are fed in microseconds, so normalize the sample first.
    final long latencyMicros = unit.toMicros(latency);
    addEntryLatencyStatsUsec.addValue(latencyMicros);

    // The summary and the broker-level stats both receive the raw value together with its unit.
    // NOTE(review): recordPublishLatency on the broker stats appears to observe a summary with the
    // same metric name further down the call chain - confirm the sample is not counted twice.
    PUBLISH_LATENCY.observe(latency, unit);
    brokerService.getPulsarStats().recordPublishLatency(latency, unit);
}

@Override
Expand All @@ -908,15 +907,6 @@ public long increasePublishLimitedTimes() {
return RATE_LIMITED_UPDATER.incrementAndGet(this);
}

// Publish-latency summary registered under the name "pulsar_broker_publish_latency" when this class
// is initialized. It tracks the minimum (0.0), median, 95th, 99th, 99.9th and 99.99th percentiles,
// and the maximum (1.0).
// NOTE(review): another summary with the same metric name is built elsewhere in this change set -
// confirm only one of them is meant to stay registered.
private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
.quantile(0.95)
.quantile(0.99)
.quantile(0.999)
.quantile(0.9999)
.quantile(1.0)
.register();

@Override
public void incrementPublishCount(Producer producer, int numOfMessages, long msgSizeInBytes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import lombok.Getter;
Expand Down Expand Up @@ -280,4 +281,8 @@ public void recordConnectionCreateSuccess() {
/**
 * Forwards a failed connection-creation event to the broker operability metrics.
 */
public void recordConnectionCreateFail() {
    this.brokerOperabilityMetrics.recordConnectionCreateFail();
}

/**
 * Forwards a publish latency sample to the broker operability metrics.
 *
 * @param latency the measured latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void recordPublishLatency(long latency, TimeUnit unit) {
    this.brokerOperabilityMetrics.recordPublishLatency(latency, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,22 @@
*/
package org.apache.pulsar.broker.stats;

import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.ObservableLongCounter;
import io.prometheus.client.Counter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionCreateStatus;
import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.ConnectionStatus;
import org.apache.pulsar.opentelemetry.annotations.PulsarDeprecatedMetric;

/**
*/
Expand All @@ -54,6 +58,19 @@ public class BrokerOperabilityMetrics implements AutoCloseable {
"pulsar.broker.connection.create.operation.count";
private final ObservableLongCounter connectionCreateCounter;

// Name of the OpenTelemetry instrument that receives publish latency samples.
public static final String TOPIC_PUBLISH_LATENCY_METRIC_NAME = "pulsar.broker.topic.publish.latency";
// Histogram registered under TOPIC_PUBLISH_LATENCY_METRIC_NAME; it is built in the constructor
// and fed by recordPublishLatency.
private final DoubleHistogram topicPublishLatencyHistogram;
// Summary registered under "pulsar_broker_publish_latency" when this class is initialized. It is
// marked deprecated in favor of the histogram metric named above, and tracks the minimum (0.0),
// median, 95th, 99th, 99.9th and 99.99th percentiles, and the maximum (1.0).
@PulsarDeprecatedMetric(newMetricName = TOPIC_PUBLISH_LATENCY_METRIC_NAME)
private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
.quantile(0.95)
.quantile(0.99)
.quantile(0.999)
.quantile(0.9999)
.quantile(1.0)
.register();

public BrokerOperabilityMetrics(PulsarService pulsar) {
this.metricsList = new ArrayList<>();
this.localCluster = pulsar.getConfiguration().getClusterName();
Expand Down Expand Up @@ -87,6 +104,14 @@ public BrokerOperabilityMetrics(PulsarService pulsar) {
measurement.record(connectionCreateSuccessCount.sum(), ConnectionCreateStatus.SUCCESS.attributes);
measurement.record(connectionCreateFailCount.sum(), ConnectionCreateStatus.FAILURE.attributes);
});

this.topicPublishLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
.histogramBuilder(TOPIC_PUBLISH_LATENCY_METRIC_NAME)
.setUnit("s")
.setDescription("The latency in seconds for publishing messages")
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
0.2, 0.5, 1.0, 5.0, 30.0))
.build();
}

@Override
Expand Down Expand Up @@ -195,4 +220,9 @@ public void recordHealthCheckStatusSuccess() {
/**
 * Marks the health check as failed by setting the health-check status value to {@code 0}.
 */
public void recordHealthCheckStatusFail() {
    healthCheckStatus = 0;
}

/**
 * Records one publish latency sample in both the histogram (expressed in seconds) and the
 * {@code PUBLISH_LATENCY} summary (raw value plus unit).
 *
 * @param latency the measured publish latency, expressed in {@code unit}
 * @param unit the time unit of {@code latency}
 */
public void recordPublishLatency(long latency, TimeUnit unit) {
    // Convert through nanoseconds rather than milliseconds: TimeUnit conversions to a coarser unit
    // truncate, so a sub-millisecond latency would otherwise be recorded as exactly 0.0 seconds.
    final double latencySeconds = unit.toNanos(latency) / 1_000_000_000.0;
    this.topicPublishLatencyHistogram.record(latencySeconds);
    PUBLISH_LATENCY.observe(latency, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,19 @@ public static void assertMetricDoubleGaugeValue(Collection<MetricData> metrics,
valueConsumer.accept(point.getValue());
}))));
}

/**
 * Asserts that {@code metrics} contains a histogram metric named {@code metricName} with at least
 * one data point that satisfies all of the following: its attributes equal {@code attributes},
 * its count is accepted by {@code countConsumer}, and its sum is accepted by {@code sumConsumer}.
 *
 * @param metrics the collected metrics to search
 * @param metricName the expected metric name
 * @param attributes the exact attribute set the matching data point must carry
 * @param countConsumer assertion callback invoked with the data point's count
 * @param sumConsumer assertion callback invoked with the data point's sum
 */
public static void assertMetricHistogramValue(Collection<MetricData> metrics, String metricName,
                                              Attributes attributes, Consumer<Long> countConsumer,
                                              Consumer<Double> sumConsumer) {
    final Map<AttributeKey<?>, Object> expectedAttributes = attributes.asMap();
    assertThat(metrics).anySatisfy(candidate ->
            assertThat(candidate)
                    .hasName(metricName)
                    .hasHistogramSatisfying(histogramAssert -> histogramAssert.satisfies(data ->
                            assertThat(data.getPoints()).anySatisfy(dataPoint -> {
                                // The attribute check runs first, so the callbacks only see matching points.
                                assertThat(dataPoint.getAttributes().asMap()).isEqualTo(expectedAttributes);
                                countConsumer.accept(dataPoint.getCount());
                                sumConsumer.accept(dataPoint.getSum());
                            }))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
*/
package org.apache.pulsar.broker.stats;

import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.broker.BrokerTestUtil;
Expand Down Expand Up @@ -101,4 +104,27 @@ public void testBrokerConnection() throws Exception {
assertMetricLongSumValue(metrics, BrokerOperabilityMetrics.CONNECTION_CREATE_COUNTER_METRIC_NAME,
ConnectionCreateStatus.FAILURE.attributes, 1);
}

@Test
public void testPublishLatency() throws Exception {
    // Publishes one message, then nine more, and checks after each phase that the publish latency
    // histogram (with no attributes) reports the expected sample count and a positive sum.
    final String metricName = BrokerOperabilityMetrics.TOPIC_PUBLISH_LATENCY_METRIC_NAME;
    final var topic = BrokerTestUtil.newUniqueName("persistent://my-namespace/use/my-ns/testPublishLatency");
    @Cleanup
    final var producer = pulsarClient.newProducer().topic(topic).create();

    producer.send("msg".getBytes());

    var collected = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
    assertMetricHistogramValue(collected, metricName, Attributes.empty(),
            count -> assertThat(count).isEqualTo(1L),
            sum -> assertThat(sum).isGreaterThan(0.0));

    int index = 0;
    while (index < 9) {
        producer.send(("msg-" + index).getBytes());
        index++;
    }

    collected = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
    assertMetricHistogramValue(collected, metricName, Attributes.empty(),
            count -> assertThat(count).isEqualTo(10L),
            sum -> assertThat(sum).isGreaterThan(0.0));
}
}
Loading