From 1d96c6b22c63ee26e7a0d79738bcf9783e249c29 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Wed, 10 Sep 2025 14:17:53 +0200 Subject: [PATCH 1/3] Fixing the counter for indexing pipelines It was stuck to 0 because I forgot to increment the guard upon creation. --- quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index b41eba19c79..0ba8fd4952e 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -159,7 +159,9 @@ impl IndexingPipeline { let indexing_pipelines_gauge = crate::metrics::INDEXER_METRICS .indexing_pipelines .with_label_values([¶ms.pipeline_id.index_uid.index_id]); - let indexing_pipelines_gauge_guard = OwnedGaugeGuard::from_gauge(indexing_pipelines_gauge); + let mut indexing_pipelines_gauge_guard = + OwnedGaugeGuard::from_gauge(indexing_pipelines_gauge); + indexing_pipelines_gauge_guard.add(1); let params_fingerprint = params.params_fingerprint; IndexingPipeline { params, From ec1add6e8401b7a77f4cf70f13a596636454e6c4 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Fri, 12 Sep 2025 12:03:26 +0200 Subject: [PATCH 2/3] Fixing GaugeGuard's API --- quickwit/quickwit-actors/src/mailbox.rs | 4 +--- quickwit/quickwit-common/src/metrics.rs | 15 +++++++++++---- quickwit/quickwit-common/src/stream_utils.rs | 5 ++--- quickwit/quickwit-common/src/thread_pool.rs | 9 ++++----- quickwit/quickwit-indexing/src/actors/indexer.rs | 10 ++++++---- .../src/actors/indexing_pipeline.rs | 5 ++--- .../quickwit-indexing/src/models/processed_doc.rs | 6 ++++-- .../quickwit-indexing/src/models/raw_doc_batch.rs | 12 ++++++++---- quickwit/quickwit-indexing/src/source/mod.rs | 2 +- .../quickwit-ingest/src/ingest_v2/ingester.rs | 6 ++++-- .../quickwit-ingest/src/ingest_v2/replication.rs | 6 ++++-- quickwit/quickwit-ingest/src/ingest_v2/router.rs | 6 ++++-- quickwit/quickwit-search/src/scroll_context.rs | 7 ++++--- .../quickwit-search/src/search_permit_provider.rs | 4 ++-- quickwit/quickwit-serve/src/decompression.rs | 6 ++++-- quickwit/quickwit-serve/src/load_shield.rs | 8 ++++---- quickwit/quickwit-storage/src/metrics.rs | 11 ++++++----- 17 files changed, 71 insertions(+), 51 deletions(-) diff --git a/quickwit/quickwit-actors/src/mailbox.rs b/quickwit/quickwit-actors/src/mailbox.rs index 899e289182a..8883af2f134 100644 --- a/quickwit/quickwit-actors/src/mailbox.rs +++ b/quickwit/quickwit-actors/src/mailbox.rs @@ -395,9 +395,7 @@ fn get_actor_inboxes_count_gauge_guard() -> GaugeGuard<'static> { &[], ) }); - let mut gauge_guard = GaugeGuard::from_gauge(gauge); - gauge_guard.add(1); - gauge_guard + GaugeGuard::from_gauge_with_initial_value(gauge, 1) } pub(crate) fn create_mailbox( diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index c59bf953937..352624f093c 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -219,8 +219,13 @@ impl std::fmt::Debug for GaugeGuard<'_> { } impl<'a> GaugeGuard<'a> { - pub fn from_gauge(gauge: &'a IntGauge) -> Self { - Self { gauge, delta: 0i64 } + pub fn from_gauge_with_initial_value(gauge: &'a IntGauge, initial_value: i64) -> Self { + let mut gauge = Self { + gauge, + delta: initial_value, + }; + gauge.add(initial_value); + gauge } pub fn get(&self) -> i64 { @@ -256,8 +261,10 @@ impl std::fmt::Debug for OwnedGaugeGuard { } impl OwnedGaugeGuard { - pub fn from_gauge(gauge: IntGauge) -> Self { - Self { gauge, delta: 0i64 } + pub fn from_gauge_with_initial_value(gauge: IntGauge, initial_value: i64) -> Self { + let mut gauge = Self { gauge, delta: 0i64 }; + gauge.add(initial_value); + gauge } pub fn get(&self) -> i64 { diff --git a/quickwit/quickwit-common/src/stream_utils.rs b/quickwit/quickwit-common/src/stream_utils.rs index e0fc126b465..c9769558916 100644 --- a/quickwit/quickwit-common/src/stream_utils.rs +++ b/quickwit/quickwit-common/src/stream_utils.rs @@ -240,9 +240,8 @@ where T: fmt::Debug impl InFlightValue { pub fn new(value: T, value_size: ByteSize, gauge: &'static IntGauge) -> Self { - let mut gauge_guard = GaugeGuard::from_gauge(gauge); - gauge_guard.add(value_size.as_u64() as i64); - + let gauge_guard = + GaugeGuard::from_gauge_with_initial_value(gauge, value_size.as_u64() as i64); Self(value, gauge_guard) } diff --git a/quickwit/quickwit-common/src/thread_pool.rs b/quickwit/quickwit-common/src/thread_pool.rs index 18201196cf9..00aaa52bcdc 100644 --- a/quickwit/quickwit-common/src/thread_pool.rs +++ b/quickwit/quickwit-common/src/thread_pool.rs @@ -85,9 +85,8 @@ impl ThreadPool { { let span = tracing::Span::current(); let ongoing_tasks = self.ongoing_tasks.clone(); - let mut pending_tasks_guard: OwnedGaugeGuard = - OwnedGaugeGuard::from_gauge(self.pending_tasks.clone()); - pending_tasks_guard.add(1i64); + let pending_tasks_guard: OwnedGaugeGuard = + OwnedGaugeGuard::from_gauge_with_initial_value(self.pending_tasks.clone(), 1i64); let (tx, rx) = oneshot::channel(); self.thread_pool.spawn(move || { drop(pending_tasks_guard); @@ -95,8 +94,8 @@ impl ThreadPool { return; } let _guard = span.enter(); - let mut ongoing_task_guard = GaugeGuard::from_gauge(&ongoing_tasks); - ongoing_task_guard.add(1i64); + let _ongoing_task_guard = + GaugeGuard::from_gauge_with_initial_value(&ongoing_tasks, 1i64); let result = cpu_intensive_fn(); let _ = tx.send(result); }); diff --git a/quickwit/quickwit-indexing/src/actors/indexer.rs b/quickwit/quickwit-indexing/src/actors/indexer.rs index 84ba3987f4a..142501381e3 100644 --- a/quickwit/quickwit-indexing/src/actors/indexer.rs +++ b/quickwit/quickwit-indexing/src/actors/indexer.rs @@ -219,9 +219,10 @@ impl IndexerState { let publish_lock = self.publish_lock.clone(); let publish_token_opt = self.publish_token_opt.clone(); - let mut split_builders_guard = - GaugeGuard::from_gauge(&crate::metrics::INDEXER_METRICS.split_builders); - split_builders_guard.add(1); + let split_builders_guard = GaugeGuard::from_gauge_with_initial_value( + &crate::metrics::INDEXER_METRICS.split_builders, + 1, + ); let workbench = IndexingWorkbench { workbench_id, @@ -233,10 +234,11 @@ impl IndexerState { publish_lock, publish_token_opt, last_delete_opstamp, - memory_usage: GaugeGuard::from_gauge( + memory_usage: GaugeGuard::from_gauge_with_initial_value( &quickwit_common::metrics::MEMORY_METRICS .in_flight .index_writer, + 0i64, ), cooperative_indexing_period, split_builders_guard, diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 0ba8fd4952e..ab7390e2ad7 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -159,9 +159,8 @@ impl IndexingPipeline { let indexing_pipelines_gauge = crate::metrics::INDEXER_METRICS .indexing_pipelines .with_label_values([¶ms.pipeline_id.index_uid.index_id]); - let mut indexing_pipelines_gauge_guard = - OwnedGaugeGuard::from_gauge(indexing_pipelines_gauge); - indexing_pipelines_gauge_guard.add(1); + let indexing_pipelines_gauge_guard = + OwnedGaugeGuard::from_gauge_with_initial_value(indexing_pipelines_gauge, 1); let params_fingerprint = params.params_fingerprint; IndexingPipeline { params, diff --git a/quickwit/quickwit-indexing/src/models/processed_doc.rs b/quickwit/quickwit-indexing/src/models/processed_doc.rs index bed695aa1d4..6aa8ab82a97 100644 --- a/quickwit/quickwit-indexing/src/models/processed_doc.rs +++ b/quickwit/quickwit-indexing/src/models/processed_doc.rs @@ -51,8 +51,10 @@ impl ProcessedDocBatch { force_commit: bool, ) -> Self { let delta = docs.iter().map(|doc| doc.num_bytes as i64).sum::(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.indexer_mailbox); - gauge_guard.add(delta); + let gauge_guard = GaugeGuard::from_gauge_with_initial_value( + &MEMORY_METRICS.in_flight.indexer_mailbox, + delta, + ); Self { docs, checkpoint_delta, diff --git a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs index f88d9fcac2b..7f255a0eaa3 100644 --- a/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs +++ b/quickwit/quickwit-indexing/src/models/raw_doc_batch.rs @@ -34,9 +34,10 @@ impl RawDocBatch { force_commit: bool, ) -> Self { let delta = docs.iter().map(|doc| doc.len() as i64).sum::(); - let mut gauge_guard = - GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.doc_processor_mailbox); - gauge_guard.add(delta); + let gauge_guard = GaugeGuard::from_gauge_with_initial_value( + &MEMORY_METRICS.in_flight.doc_processor_mailbox, + delta, + ); Self { docs, @@ -67,7 +68,10 @@ impl fmt::Debug for RawDocBatch { impl Default for RawDocBatch { fn default() -> Self { - let _gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.doc_processor_mailbox); + let _gauge_guard = GaugeGuard::from_gauge_with_initial_value( + &MEMORY_METRICS.in_flight.doc_processor_mailbox, + 0i64, + ); Self { docs: Vec::new(), checkpoint_delta: SourceCheckpointDelta::default(), diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index 5601e31618d..19f9f916461 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -532,7 +532,7 @@ impl BatchBuilder { SourceType::Pulsar => MEMORY_METRICS.in_flight.pulsar(), _ => MEMORY_METRICS.in_flight.other(), }; - let gauge_guard = GaugeGuard::from_gauge(gauge); + let gauge_guard = GaugeGuard::from_gauge_with_initial_value(gauge, 0i64); Self { docs: Vec::with_capacity(capacity), diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index c2683af12d1..aaae7ca4b11 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -1115,8 +1115,10 @@ impl IngesterService for Ingester { _ => None, }) .sum::(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_persist); - gauge_guard.add(request_size_bytes as i64); + let _gauge_guard = GaugeGuard::from_gauge_with_initial_value( + &MEMORY_METRICS.in_flight.ingester_persist, + request_size_bytes as i64, + ); self.persist_inner(persist_request).await } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs index 5e286ec5b84..f727d012868 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/replication.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/replication.rs @@ -504,8 +504,10 @@ impl ReplicationTask { ))); } let request_size_bytes = replicate_request.num_bytes(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingester_replicate); - gauge_guard.add(request_size_bytes as i64); + let _gauge_guard = GaugeGuard::from_gauge_with_initial_value( + &MEMORY_METRICS.in_flight.ingester_replicate, + request_size_bytes as i64, + ); self.current_replication_seqno += 1; diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index e48cd647c04..b022d2ffd4b 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -602,8 +602,10 @@ impl IngestRouterService for IngestRouter { async fn ingest(&self, ingest_request: IngestRequestV2) -> IngestV2Result { let request_size_bytes = ingest_request.num_bytes(); - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.ingest_router); - gauge_guard.add(request_size_bytes as i64); + let _gauge_guard = GaugeGuard::from_gauge_with_initial_value( + &MEMORY_METRICS.in_flight.ingest_router, + request_size_bytes as i64, + ); let num_subrequests = ingest_request.subrequests.len(); let _permit = self diff --git a/quickwit/quickwit-search/src/scroll_context.rs b/quickwit/quickwit-search/src/scroll_context.rs index a4a31a856b5..9f10135fd54 100644 --- a/quickwit/quickwit-search/src/scroll_context.rs +++ b/quickwit/quickwit-search/src/scroll_context.rs @@ -148,9 +148,10 @@ impl Default for MiniKV { impl MiniKV { pub async fn put(&self, key: Vec, payload: Vec, ttl: Duration) { - let mut metric_guard = - GaugeGuard::from_gauge(&crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes); - metric_guard.add(payload.len() as i64); + let metric_guard = GaugeGuard::from_gauge_with_initial_value( + &crate::SEARCH_METRICS.searcher_local_kv_store_size_bytes, + payload.len() as i64, + ); let mut cache_lock = self.ttl_with_cache.write().await; cache_lock.insert( key, diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index c320f5cc79b..68221a3aade 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -212,10 +212,10 @@ impl SearchPermitActor { while let Some((permit_requester_tx, next_permit_size)) = self.pop_next_request_if_serviceable() { - let mut ongoing_gauge_guard = GaugeGuard::from_gauge( + let ongoing_gauge_guard = GaugeGuard::from_gauge_with_initial_value( &crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing, + 1, ); - ongoing_gauge_guard.add(1); self.total_memory_allocated += next_permit_size; self.num_warmup_slots_available -= 1; permit_requester_tx diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs index d65df7d3bea..f6ac63cddfe 100644 --- a/quickwit/quickwit-serve/src/decompression.rs +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -114,8 +114,10 @@ pub(crate) struct Body { impl Body { pub fn new(content: Bytes, load_shield_permit: LoadShieldPermit) -> Body { - let mut gauge_guard = GaugeGuard::from_gauge(&MEMORY_METRICS.in_flight.rest_server); - gauge_guard.add(content.len() as i64); + let gauge_guard = GaugeGuard::from_gauge_with_initial_value( + &MEMORY_METRICS.in_flight.rest_server, + content.len() as i64, + ); Body { content, _gauge_guard: gauge_guard, diff --git a/quickwit/quickwit-serve/src/load_shield.rs b/quickwit/quickwit-serve/src/load_shield.rs index 477c6e73d79..2f607aad746 100644 --- a/quickwit/quickwit-serve/src/load_shield.rs +++ b/quickwit/quickwit-serve/src/load_shield.rs @@ -78,13 +78,13 @@ impl LoadShield { } pub async fn acquire_permit(&'static self) -> Result { - let mut pending_gauge_guard = GaugeGuard::from_gauge(&self.pending_gauge); - pending_gauge_guard.add(1); + let pending_gauge_guard = + GaugeGuard::from_gauge_with_initial_value(&self.pending_gauge, 1i64); let in_flight_permit_opt = self.acquire_in_flight_permit().await?; let concurrency_permit_opt = self.acquire_concurrency_permit().await; drop(pending_gauge_guard); - let mut ongoing_gauge_guard = GaugeGuard::from_gauge(&self.ongoing_gauge); - ongoing_gauge_guard.add(1); + let ongoing_gauge_guard = + GaugeGuard::from_gauge_with_initial_value(&self.ongoing_gauge, 1i64); Ok(LoadShieldPermit { _in_flight_permit_opt: in_flight_permit_opt, _concurrency_permit_opt: concurrency_permit_opt, diff --git a/quickwit/quickwit-storage/src/metrics.rs b/quickwit/quickwit-storage/src/metrics.rs index 43ef588e192..21d540a40dd 100644 --- a/quickwit/quickwit-storage/src/metrics.rs +++ b/quickwit/quickwit-storage/src/metrics.rs @@ -232,12 +232,13 @@ pub static CACHE_METRICS_FOR_TESTS: Lazy = pub fn object_storage_get_slice_in_flight_guards( get_request_size: usize, ) -> (GaugeGuard<'static>, GaugeGuard<'static>) { - let mut bytes_guard = GaugeGuard::from_gauge( + let bytes_guard = GaugeGuard::from_gauge_with_initial_value( &crate::STORAGE_METRICS.object_storage_get_slice_in_flight_num_bytes, + get_request_size as i64, + ); + let count_guard = GaugeGuard::from_gauge_with_initial_value( + &crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count, + 1, ); - bytes_guard.add(get_request_size as i64); - let mut count_guard = - GaugeGuard::from_gauge(&crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count); - count_guard.add(1); (bytes_guard, count_guard) } From 570f538c8edf5403e6d67eb8fa5ca2b0652bf6bc Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Fri, 12 Sep 2025 12:10:25 +0200 Subject: [PATCH 3/3] using index_label method on pipeline counter --- quickwit/quickwit-common/src/metrics.rs | 5 +---- quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs | 4 +++- quickwit/quickwit-metastore/src/metastore/postgres/pool.rs | 5 ++--- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index 352624f093c..ae13a351cd0 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -220,10 +220,7 @@ impl std::fmt::Debug for GaugeGuard<'_> { impl<'a> GaugeGuard<'a> { pub fn from_gauge_with_initial_value(gauge: &'a IntGauge, initial_value: i64) -> Self { - let mut gauge = Self { - gauge, - delta: initial_value, - }; + let mut gauge = Self { gauge, delta: 0i64 }; gauge.add(initial_value); gauge } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index ab7390e2ad7..4ac454c2cd3 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -158,7 +158,9 @@ impl IndexingPipeline { pub fn new(params: IndexingPipelineParams) -> Self { let indexing_pipelines_gauge = crate::metrics::INDEXER_METRICS .indexing_pipelines - .with_label_values([¶ms.pipeline_id.index_uid.index_id]); + .with_label_values([&quickwit_common::metrics::index_label( + ¶ms.pipeline_id.index_uid.index_id, + )]); let indexing_pipelines_gauge_guard = OwnedGaugeGuard::from_gauge_with_initial_value(indexing_pipelines_gauge, 1); let params_fingerprint = params.params_fingerprint; diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs b/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs index f0437300095..879dad2a66c 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/pool.rs @@ -59,9 +59,8 @@ impl<'a, DB: Database> Acquire<'a> for &TrackedPool { .set(self.inner_pool.num_idle() as i64); Box::pin(async move { - let mut gauge_guard = GaugeGuard::from_gauge(&POSTGRES_METRICS.acquire_connections); - gauge_guard.add(1); - + let _gauge_guard = + GaugeGuard::from_gauge_with_initial_value(&POSTGRES_METRICS.acquire_connections, 1); let conn = acquire_conn_fut.await?; Ok(conn) })