Skip to content

Commit 91121d3

Browse files
authored
[feat][monitor] Add ML write latency histogram and entry size histogram as OTel metrics (#24815)
1 parent 7b6f9fc commit 91121d3

File tree

5 files changed

+114
-0
lines changed

5 files changed

+114
-0
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerAttributes.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
public class ManagedLedgerAttributes {
2929

3030
private final Attributes attributes;
31+
private final Attributes attributesOnlyNamespace;
3132
private final Attributes attributesOperationSucceed;
3233
private final Attributes attributesOperationFailure;
3334

@@ -37,6 +38,9 @@ public ManagedLedgerAttributes(ManagedLedger ml) {
3738
OpenTelemetryAttributes.ML_NAME, mlName,
3839
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
3940
);
41+
attributesOnlyNamespace = Attributes.of(
42+
OpenTelemetryAttributes.PULSAR_NAMESPACE, getNamespace(mlName)
43+
);
4044
attributesOperationSucceed = Attributes.builder()
4145
.putAll(attributes)
4246
.putAll(ManagedLedgerOperationStatus.SUCCESS.attributes)

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
138138
private final MetadataStore metadataStore;
139139

140140
private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
141+
@Getter
141142
private final OpenTelemetryManagedLedgerStats openTelemetryManagedLedgerStats;
142143
private final OpenTelemetryManagedCursorStats openTelemetryManagedCursorStats;
143144

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerMBeanImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ public void refreshStats(long period, TimeUnit unit) {
8787
public void addAddEntrySample(long size) {
8888
addEntryOps.recordEvent(size);
8989
entryStats.addValue(size);
90+
managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
91+
.recordEntrySize(size, managedLedger);
9092
addEntryWithReplicasOps.recordEvent(size * managedLedger.getConfig().getWriteQuorumSize());
9193
}
9294

@@ -108,14 +110,20 @@ public void recordReadEntriesOpsCacheMisses(int count, long totalSize) {
108110

109111
public void addAddEntryLatencySample(long latency, TimeUnit unit) {
110112
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
113+
managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
114+
.recordAddEntryLatency(latency, unit, managedLedger);
111115
}
112116

113117
public void addLedgerAddEntryLatencySample(long latency, TimeUnit unit) {
114118
ledgerAddEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
119+
managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
120+
.recordLedgerAddEntryLatency(latency, unit, managedLedger);
115121
}
116122

117123
public void addLedgerSwitchLatencySample(long latency, TimeUnit unit) {
118124
ledgerSwitchLatencyStatsUsec.addValue(unit.toMicros(latency));
125+
managedLedger.getFactory().getOpenTelemetryManagedLedgerStats()
126+
.recordLedgerSwitchLatency(latency, unit, managedLedger);
119127
}
120128

121129
public void addReadEntriesSample(int count, long totalSize) {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpenTelemetryManagedLedgerStats.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,18 @@
2020

2121
import io.opentelemetry.api.OpenTelemetry;
2222
import io.opentelemetry.api.metrics.BatchCallback;
23+
import io.opentelemetry.api.metrics.DoubleHistogram;
24+
import io.opentelemetry.api.metrics.LongHistogram;
2325
import io.opentelemetry.api.metrics.ObservableLongMeasurement;
26+
import java.util.Arrays;
27+
import java.util.concurrent.TimeUnit;
2428
import org.apache.bookkeeper.mledger.ManagedLedger;
2529
import org.apache.pulsar.opentelemetry.Constants;
2630

2731
public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
2832

33+
// ml-level metrics
34+
2935
// Replaces pulsar_ml_AddEntryMessagesRate
3036
public static final String ADD_ENTRY_COUNTER = "pulsar.broker.managed_ledger.message.outgoing.count";
3137
private final ObservableLongMeasurement addEntryCounter;
@@ -62,6 +68,34 @@ public class OpenTelemetryManagedLedgerStats implements AutoCloseable {
6268

6369
private final BatchCallback batchCallback;
6470

71+
// namespace-level metrics
72+
73+
// Histograms support only synchronous mode, so record measurements directly.
74+
// Synchronous histograms currently do not support delete operations.
75+
// Therefore, use only namespace-level attributes to avoid leaking high-cardinality attributes (e.g. topic name).
76+
// See: https://github.com/apache/pulsar/blob/master/pip/pip-264.md
77+
78+
// Replaces ['pulsar_ml_AddEntryLatencyBuckets', 'pulsar_ml_AddEntryLatencyBuckets_OVERFLOW',
79+
// 'pulsar_storage_write_latency_*']
80+
public static final String ADD_ENTRY_LATENCY_HISTOGRAM = "pulsar.broker.managed_ledger.message.outgoing.latency";
81+
private final DoubleHistogram addEntryLatencyHistogram;
82+
83+
// Replaces ['pulsar_ml_LedgerAddEntryLatencyBuckets', 'pulsar_ml_LedgerAddEntryLatencyBuckets_OVERFLOW',
84+
// 'pulsar_storage_ledger_write_latency_*']
85+
public static final String LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM =
86+
"pulsar.broker.managed_ledger.message.outgoing.ledger.latency";
87+
private final DoubleHistogram ledgerAddEntryLatencyHistogram;
88+
89+
// Replaces ['pulsar_ml_LedgerSwitchLatencyBuckets', 'pulsar_ml_LedgerSwitchLatencyBuckets_OVERFLOW']
90+
public static final String LEDGER_SWITCH_LATENCY_HISTOGRAM =
91+
"pulsar.broker.managed_ledger.ledger.switch.latency";
92+
private final DoubleHistogram ledgerSwitchLatencyHistogram;
93+
94+
// Replaces ['pulsar_ml_EntrySizeBuckets', 'pulsar_ml_EntrySizeBuckets_OVERFLOW',
95+
// 'pulsar_entry_size_*']
96+
public static final String ENTRY_SIZE_HISTOGRAM = "pulsar.broker.managed_ledger.entry.size";
97+
private final LongHistogram entrySizeHistogram;
98+
6599
public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
66100
var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
67101

@@ -124,6 +158,39 @@ public OpenTelemetryManagedLedgerStats(OpenTelemetry openTelemetry, ManagedLedge
124158
bytesInCounter,
125159
readEntryCacheMissCounter,
126160
markDeleteCounter);
161+
162+
addEntryLatencyHistogram = meter
163+
.histogramBuilder(ADD_ENTRY_LATENCY_HISTOGRAM)
164+
.setDescription("End-to-end write latency, including time spent in the executor queue.")
165+
.setUnit("s")
166+
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
167+
0.2, 0.5, 1.0, 5.0, 30.0))
168+
.build();
169+
170+
ledgerAddEntryLatencyHistogram = meter
171+
.histogramBuilder(LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM)
172+
.setDescription("End-to end write latency.")
173+
.setUnit("s")
174+
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
175+
0.2, 0.5, 1.0, 5.0, 30.0))
176+
.build();
177+
178+
ledgerSwitchLatencyHistogram = meter
179+
.histogramBuilder(LEDGER_SWITCH_LATENCY_HISTOGRAM)
180+
.setDescription("Time taken to switch to a new ledger.")
181+
.setUnit("s")
182+
.setExplicitBucketBoundariesAdvice(Arrays.asList(0.001, 0.005, 0.01, 0.02, 0.05, 0.1,
183+
0.2, 0.5, 1.0, 5.0, 30.0))
184+
.build();
185+
186+
entrySizeHistogram = meter
187+
.histogramBuilder(ENTRY_SIZE_HISTOGRAM)
188+
.ofLongs()
189+
.setDescription("Size of entries written to the ledger.")
190+
.setUnit("By")
191+
.setExplicitBucketBoundariesAdvice(Arrays.asList(128L, 512L, 1024L, 2048L, 4096L, 16_384L,
192+
102_400L, 1_048_576L))
193+
.build();
127194
}
128195

129196
@Override
@@ -151,4 +218,24 @@ private void recordMetrics(ManagedLedger ml) {
151218
markDeleteCounter.record(stats.getMarkDeleteTotal(), attributes);
152219
readEntryCacheMissCounter.record(stats.getReadEntriesOpsCacheMissesTotal(), attributes);
153220
}
221+
222+
void recordAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
223+
final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
224+
this.addEntryLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
225+
}
226+
227+
void recordLedgerAddEntryLatency(long latency, TimeUnit unit, ManagedLedger ml) {
228+
final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
229+
this.ledgerAddEntryLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
230+
}
231+
232+
void recordLedgerSwitchLatency(long latency, TimeUnit unit, ManagedLedger ml) {
233+
final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
234+
this.ledgerSwitchLatencyHistogram.record(unit.toMillis(latency) / 1000.0, attributes);
235+
}
236+
237+
void recordEntrySize(long entrySize, ManagedLedger ml) {
238+
final var attributes = ml.getManagedLedgerAttributes().getAttributesOnlyNamespace();
239+
this.entrySizeHistogram.record(entrySize, attributes);
240+
}
154241
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/ManagedLedgerMetricsTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.stats;
2020

21+
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricHistogramValue;
2122
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
2223
import static org.apache.pulsar.transaction.coordinator.impl.DisabledTxnLogBufferedWriterMetricsStats.DISABLED_BUFFERED_WRITER_METRICS;
2324
import static org.assertj.core.api.Assertions.assertThat;
@@ -135,6 +136,9 @@ public void testManagedLedgerMetrics() throws Exception {
135136
OpenTelemetryAttributes.ML_NAME, mlName,
136137
OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
137138
);
139+
final var attribOnlyNamespace = Attributes.of(
140+
OpenTelemetryAttributes.PULSAR_NAMESPACE, topicNameObj.getNamespace()
141+
);
138142
var metricReader = pulsarTestContext.getOpenTelemetryMetricReader();
139143

140144
Awaitility.await().untilAsserted(() -> {
@@ -189,6 +193,16 @@ public void testManagedLedgerMetrics() throws Exception {
189193
value -> assertThat(value).isGreaterThanOrEqualTo(0));
190194
assertMetricLongSumValue(otelMetrics, OpenTelemetryManagedLedgerStats.READ_ENTRY_CACHE_MISS_COUNTER,
191195
attribCommon, value -> assertThat(value).isGreaterThanOrEqualTo(0));
196+
197+
assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ADD_ENTRY_LATENCY_HISTOGRAM,
198+
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
199+
sum -> assertThat(sum).isGreaterThan(0.0));
200+
assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.LEDGER_ADD_ENTRY_LATENCY_HISTOGRAM,
201+
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
202+
sum -> assertThat(sum).isGreaterThan(0.0));
203+
assertMetricHistogramValue(otelMetrics, OpenTelemetryManagedLedgerStats.ENTRY_SIZE_HISTOGRAM,
204+
attribOnlyNamespace, count -> assertThat(count).isEqualTo(15L),
205+
sum -> assertThat(sum).isGreaterThan(0.0));
192206
});
193207
}
194208

0 commit comments

Comments
 (0)