Skip to content

Commit f65a473

Browse files
committed
feat: add ml write latency histogram and entry size histogram as otel metrics
1 parent 771ce41 commit f65a473

File tree

6 files changed

+124
-0
lines changed

6 files changed

+124
-0
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
public class ManagedLedgerAttributes {
2929

3030
private final Attributes attributes;
31+
private final Attributes attributesOnlyNamespace;
3132
private final Attributes attributesOperationSucceed;
3233
private final Attributes attributesOperationFailure;
3334

@@ -37,6 +38,9 @@ public ManagedLedgerAttributes(ManagedLedger ml) {
3738
OpenTelemetryAttributes.ML_NAME, mlName,
3839
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
3940
);
41+
attributesOnlyNamespace = Attributes.of(
42+
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
43+
);
4044
attributesOperationSucceed = Attributes.builder()
4145
.putAll(attributes)
4246
.putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
138138
private final MetadataStore metadataStore;
139139

140140
private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
141+
@Getter
141142
private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats;
142143
private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats;
143144

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public void refreshStats(long period, TimeUnit unit) {
8787
public void addAddEntrySample(long size) {
8888
addEntryOps.recordEvent(size);
8989
entryStats.addValue(size);
90+
managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
91+
.recordEntrySize(size, managedLedger);
9092
addEntryWithReplicasOps.recordEvent(size * managedLedger.getConfig().getWriteQuorumSize());
9193
}
9294

@@ -108,14 +110,20 @@ public void recordReadEntriesOpsCacheMisses(int count, long totalSize) {
108110

109111
public void addAddEntryLatencySample(long latency, TimeUnit unit) {
110112
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
113+
managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
114+
.recordAddEntryLatency(latency, unit, managedLedger);
111115
}
112116

113117
public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
114118
ledgerAddEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
119+
managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
120+
.recordLedgerAddEntryLatency(latency, unit, managedLedger);
115121
}
116122

117123
public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
118124
ledgerSwitchLatencyStatsUsec.addValue(unit.toMicros(latency));
125+
managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
126+
.recordLedgerSwitchLatency(latency, unit, managedLedger);
119127
}
120128

121129
public void addReadEntriesSample(int count, long totalSize) {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,18 @@
2020

2121
import io.opentelemetry.api.OpenTelemetry;
2222
import io.opentelemetry.api.metrics.BatchCallback;
23+
import io.opentelemetry.api.metrics.DoubleHistogram;
24+
import io.opentelemetry.api.metrics.LongHistogram;
2325
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
26+
import java.util.Arrays;
27+
import java.util.concurrent.TimeUnit;
2428
import org.apache.bookkeeper.mledger.ManagedLedger;
2529
import org.apache.pulsar.opentelemetry.Constants;
2630

2731
public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
2832

33+
// ml-level metrics
34+
2935
// Replaces pulsar_ml_AddEntryMessagesRate
3036
public static final String ADD_ENTRY_COUNTER = "pulsar.broker.managed_ledger.message.outgoing.count";
3137
private final ObservableLongMeasurement addEntryCounter;
@@ -62,6 +68,29 @@ public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
6268

6369
private final BatchCallback batchCallback;
6470

71+
// namespace-level metrics
72+
73+
// Replaces ['pulsar_ml_AddEntryLatencyBuckets', 'pulsar_ml_AddEntryLatencyBuckets_OVERFLOW',
74+
// 'pulsar_storage_write_latency_*']
75+
public static final String ADD_ENTRY_LATENCY_HISTOGRAM = "pulsar.broker.managed_ledger.message.outgoing.latency";
76+
private final DoubleHistogram addEntryLatencyHistogram;
77+
78+
// Replaces ['pulsar_ml_LedgerAddEntryLatencyBuckets', 'pulsar_ml_LedgerAddEntryLatencyBuckets_OVERFLOW',
79+
// 'pulsar_storage_ledger_write_latency_*']
80+
public static final String LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM =
81+
"pulsar.broker.managed_ledger.message.outgoing.ledger.latency";
82+
private final DoubleHistogram ledgerAddEntryLatencyHistogram;
83+
84+
// Replaces ['pulsar_ml_LedgerSwitchLatencyBuckets', 'pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW']
85+
public static final String LEDGER_SWITCH_LATENCY_HISTOGRAM =
86+
"pulsar.broker.managed_ledger.ledger.switch.latency";
87+
private final DoubleHistogram ledgerSwitchLatencyHistogram;
88+
89+
// Replaces ['pulsar_ml_EntrySizeBuckets', 'pulsar_ml_EntrySizeBuckets_OVERFLOW',
90+
// 'pulsar_entry_size_*']
91+
public static final String ENTRY_SIZE_HISTOGRAM = "pulsar.broker.managed_ledger.entry.size";
92+
private final LongHistogram entrySizeHistogram;
93+
6594
public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
6695
var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
6796

@@ -124,6 +153,39 @@ public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedge
124153
bytesInCounter,
125154
readEntryCacheMissCounter,
126155
markDeleteCounter);
156+
157+
addEntryLatencyHistogram = meter
158+
.histogramBuilder(ADD_ENTRY_LATENCY_HISTOGRAM)
159+
.setDescription("End-to-end write latency, including time spent in the executor queue.")
160+
.setUnit("s")
161+
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
162+
0.2, 0.5, 1.0, 5.0, 30.0))
163+
.build();
164+
165+
ledgerAddEntryLatencyHistogram = meter
166+
.histogramBuilder(LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM)
167+
.setDescription("End-to end write latency.")
168+
.setUnit("s")
169+
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
170+
0.2, 0.5, 1.0, 5.0, 30.0))
171+
.build();
172+
173+
ledgerSwitchLatencyHistogram = meter
174+
.histogramBuilder(LEDGER_SWITCH_LATENCY_HISTOGRAM)
175+
.setDescription("Time taken to switch to a new ledger.")
176+
.setUnit("s")
177+
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
178+
0.2, 0.5, 1.0, 5.0, 30.0))
179+
.build();
180+
181+
entrySizeHistogram = meter
182+
.histogramBuilder(ENTRY_SIZE_HISTOGRAM)
183+
.ofLongs()
184+
.setDescription("Size of entries written to the ledger.")
185+
.setUnit("By")
186+
.setExplicitBucketBoundariesAdvice(Arrays.asList(128L, 512L, 1024L, 2048L, 4096L, 16_384L,
187+
102_400L, 1_048_576L))
188+
.build();
127189
}
128190

129191
@Override
@@ -151,4 +213,24 @@ private void recordMetrics(ManagedLedger ml) {
151213
markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
152214
readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), attributes);
153215
}
216+
217+
void recordAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
218+
final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
219+
this.addEntryLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
220+
}
221+
222+
void recordLedgerAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
223+
final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
224+
this.ledgerAddEntryLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
225+
}
226+
227+
void recordLedgerSwitchLatency(long latency, TimeUnit unit, ManagedLedger ml) {
228+
final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
229+
this.ledgerSwitchLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
230+
}
231+
232+
void recordEntrySize(long entrySize, ManagedLedger ml) {
233+
final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
234+
this.entrySizeHistogram.record(entrySize, attributes);
235+
}
154236
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/BrokerOpenTelemetryTestUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,19 @@ public static void assertMetricDoubleGaugeValue(Collection<MetricData> metrics,
121121
valueConsumer.accept(point.getValue());
122122
}))));
123123
}
124+
125+
public static void assertMetricHistogramValue(Collection<MetricData> metrics, String metricName,
126+
Attributes attributes, Consumer<Long> countConsumer,
127+
Consumer<Double> sumConsumer) {
128+
final Map<AttributeKey<?>, Object> attributesMap = attributes.asMap();
129+
assertThat(metrics).anySatisfy(metric -> assertThat(metric)
130+
.hasName(metricName)
131+
.hasHistogramSatisfying(histogram -> histogram.satisfies(
132+
histoData -> assertThat(histoData.getPoints()).anySatisfy(
133+
point -> {
134+
assertThat(point.getAttributes().asMap()).isEqualTo(attributesMap);
135+
countConsumer.accept(point.getCount());
136+
sumConsumer.accept(point.getSum());
137+
}))));
138+
}
124139
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.stats;
2020

21+
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
2122
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
2223
import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
2324
import static org.assertj.core.api.Assertions.assertThat;
@@ -135,6 +136,9 @@ public void testManagedLedgerMetrics() throws Exception {
135136
OpenTelemetryAttributes.ML_NAME, mlName,
136137
OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
137138
);
139+
final var attribOnlyNamespace = Attributes.of(
140+
OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
141+
);
138142
var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
139143

140144
Awaitility.await().untilAsserted(() -> {
@@ -189,6 +193,16 @@ public void testManagedLedgerMetrics() throws Exception {
189193
value -> assertThat(value).isGreaterThanOrEqualTo(0));
190194
assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
191195
attribCommon, value -> assertThat(value).isGreaterThanOrEqualTo(0));
196+
197+
assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ADD_ENTRY_LATENCY_HISTOGRAM,
198+
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
199+
sum -> assertThat(sum).isGreaterThan(0.0));
200+
assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM,
201+
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
202+
sum -> assertThat(sum).isGreaterThan(0.0));
203+
assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ENTRY_SIZE_HISTOGRAM,
204+
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
205+
sum -> assertThat(sum).isGreaterThan(0.0));
192206
});
193207
}
194208

0 commit comments

Comments
 (0)