From 872a905ee196fc568bafa776c77ff96aae05cd98 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Tue, 2 Sep 2025 13:19:56 -0700 Subject: [PATCH 1/5] [ENH]: consolidate retries --- Cargo.lock | 4 +- Cargo.toml | 2 +- decode_log.py | 26 ++ deserialize_version_file.py | 61 +++ rust/frontend/src/config.rs | 5 +- rust/frontend/src/executor/distributed.rs | 49 ++- rust/frontend/src/executor/local.rs | 14 +- rust/frontend/src/executor/mod.rs | 19 +- .../get_collection_with_segments_provider.rs | 40 -- .../src/impls/service_based_frontend.rs | 399 ++++++++++++------ rust/log-service/src/lib.rs | 3 + rust/log/src/grpc_log.rs | 4 +- rust/python_bindings/src/bindings.rs | 9 +- 13 files changed, 440 insertions(+), 195 deletions(-) create mode 100644 decode_log.py create mode 100644 deserialize_version_file.py diff --git a/Cargo.lock b/Cargo.lock index d2160e9bfb4..82f2b674454 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1085,9 +1085,9 @@ dependencies = [ [[package]] name = "backon" -version = "1.3.0" +version = "1.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7" +checksum = "592277618714fbcecda9a02ba7a8781f319d26532a88553bbacc77ba5d2b3a8d" dependencies = [ "fastrand", "gloo-timers", diff --git a/Cargo.toml b/Cargo.toml index a78f0808408..0bc5e4b6c63 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,7 +46,7 @@ tokio-util = "0.7.12" tonic = "0.12" tonic-health = "0.12.3" tower = { version = "0.4.13", features = ["discover"] } -backon = "1.3.0" +backon = "1.5.2" tracing = { version = "0.1" } tracing-bunyan-formatter = "0.3" tracing-opentelemetry = "0.28.0" diff --git a/decode_log.py b/decode_log.py new file mode 100644 index 00000000000..a60022e90ae --- /dev/null +++ b/decode_log.py @@ -0,0 +1,26 @@ +# import pandas as pd +# import chromadb.proto.chroma_pb2 as chroma_pb2 + +# path = '/Users/sanketkedia/Downloads/FragmentSeqNo=0000000000000001.parquet' +# df = pd.read_parquet(path) +# body = df['body'].iloc[0] +# log_record = chroma_pb2.OperationRecord() +# err = log_record.ParseFromString(body) +# print(log_record) + + + +import sys +from chromadb.proto.chroma_pb2 import Operation, OperationRecord + +hex_line = "0a046665667712001a1b0a190a0f6368726f6d613a646f63756d656e7412060a0465667765" + +def decode_file(): + data = bytes.fromhex(hex_line) + # Decode data to OperationRecord and print + record = OperationRecord.FromString(data) + print(record) + print(record.operation) + +if __name__ == "__main__": + decode_file() \ No newline at end of file diff --git a/deserialize_version_file.py b/deserialize_version_file.py new file mode 100644 index 00000000000..b74685bdf27 --- /dev/null +++ b/deserialize_version_file.py @@ -0,0 +1,61 @@ +import chromadb.proto.coordinator_pb2 as coordinator_pb2 + +# Read the serialized file +with open('/Users/sanketkedia/Downloads/000038_5415a1ef-4efc-4503-8a24-ef144a9d1911_flush', 'rb') as f: + serialized_data = f.read() + +# Deserialize the data +collection_version_file = coordinator_pb2.CollectionVersionFile() +collection_version_file.ParseFromString(serialized_data) + +if not collection_version_file.HasField('version_history'): + print("No version history found in the file.") + exit(0) + +versions = collection_version_file.version_history.versions +print(f"Found {len(versions)} versions in the file.") +print(collection_version_file) + +exit(0) + +output_file_path = "/Users/sanketkedia/Documents/000310_166f7650-8cd8-452d-904b-6f92ca5fd62c_flush" + + # Track the last non-empty 
segment_info we've seen +last_non_empty_segment_info = None +last_non_empty_version = None + +def has_non_empty_segment_info(version_info): + """Check if a version has non-empty segment_info.""" + if not version_info.HasField('segment_info'): + return False + + segment_info = version_info.segment_info + return len(segment_info.segment_compaction_info) > 0 + +# Process each version +for i, version in enumerate(versions): + version_num = version.version + + if has_non_empty_segment_info(version): + # This version has non-empty segment_info, update our tracker + last_non_empty_segment_info = version.segment_info + last_non_empty_version = version_num + print(f"Version {version_num}: Has non-empty segment_info with {len(version.segment_info.segment_compaction_info)} segments") + else: + # This version has empty segment_info + if last_non_empty_segment_info is not None: + # We have a previous non-empty segment_info to use + print(f"Version {version_num}: Patching with segment_info from version {last_non_empty_version}") + + # Clear any existing segment_info and copy from the last non-empty one + version.segment_info.CopyFrom(last_non_empty_segment_info) + else: + print(f"Version {version_num}: No previous non-empty segment_info found, keeping empty") + +print(f"\nWriting patched file to: {output_file_path}") +serialized_output = collection_version_file.SerializeToString() + +with open(output_file_path, 'wb') as f: + f.write(serialized_output) + +print("Patching complete!") \ No newline at end of file diff --git a/rust/frontend/src/config.rs b/rust/frontend/src/config.rs index 4a01d4e0e47..9de8fe22395 100644 --- a/rust/frontend/src/config.rs +++ b/rust/frontend/src/config.rs @@ -1,5 +1,5 @@ use crate::{ - executor::config::{ExecutorConfig, LocalExecutorConfig}, + executor::config::{ExecutorConfig, LocalExecutorConfig, RetryConfig}, CollectionsWithSegmentsProviderConfig, }; use chroma_log::config::LogConfig; @@ -69,6 +69,8 @@ pub struct FrontendConfig { pub tenants_to_migrate_immediately: Vec, #[serde(default = "Default::default")] pub tenants_to_migrate_immediately_threshold: Option, + #[serde(default)] + pub retry: RetryConfig, } impl FrontendConfig { @@ -87,6 +89,7 @@ impl FrontendConfig { default_knn_index: default_default_knn_index(), tenants_to_migrate_immediately: vec![], tenants_to_migrate_immediately_threshold: None, + retry: RetryConfig::default(), } } } diff --git a/rust/frontend/src/executor/distributed.rs b/rust/frontend/src/executor/distributed.rs index b7786577b24..2d3e19c30c9 100644 --- a/rust/frontend/src/executor/distributed.rs +++ b/rust/frontend/src/executor/distributed.rs @@ -1,3 +1,5 @@ +use std::future::Future; + use super::config; use async_trait::async_trait; use backon::ExponentialBuilder; @@ -22,6 +24,7 @@ use chroma_types::{ use rand::distributions::Distribution; use tonic::Request; +use tonic::Status; // Convenience type alias for the gRPC query client used by the DistributedExecutor type QueryClient = @@ -148,7 +151,15 @@ impl DistributedExecutor { Ok(res.into_inner().into()) } - pub async fn get(&mut self, plan: Get) -> Result { + pub async fn get( + &mut self, + plan: Get, + replan_closure: F, + ) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: Future>>, + { let clients = self .client_assigner .clients( @@ -161,6 +172,7 @@ impl DistributedExecutor { ) .map_err(|e| ExecutorError::Internal(e.boxed()))?; let attempt_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let last_error = std::sync::Arc::new(parking_lot::Mutex::new(tonic::Code::Ok)); 
let config = self.client_selection_config.clone(); let res = { let attempt_count = attempt_count.clone(); @@ -168,12 +180,31 @@ impl DistributedExecutor { let current_attempt = attempt_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let is_retry = current_attempt > 0; + if is_retry { + let last_error_code = *last_error.lock(); + let replan_get = + replan_closure(last_error_code) + .await + .map_err(|e| -> tonic::Status { + Status::new( + e.code().into(), + format!("Failed to replan get {:?}", e), + ) + })?; + return choose_query_client_weighted(&clients, &config, is_retry)? + .get(Request::new(replan_get.clone().try_into()?)) + .await; + } choose_query_client_weighted(&clients, &config, is_retry)? .get(Request::new(plan.clone().try_into()?)) .await }) .retry(self.backoff) .when(is_retryable_error) + .notify(|e, _| { + let mut last_error = last_error.lock(); + *last_error = e.code(); + }) .await? }; Ok(res.into_inner().try_into()?) @@ -199,12 +230,22 @@ impl DistributedExecutor { let current_attempt = attempt_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let is_retry = current_attempt > 0; - choose_query_client_weighted(&clients, &config, is_retry)? + let r = choose_query_client_weighted(&clients, &config, is_retry)? .knn(Request::new(plan.clone().try_into()?)) - .await + .await; + if r.is_err() { + println!("(Sanket-temp) Knn query failed with error {:?}", r); + } + r }) .retry(self.backoff) .when(is_retryable_error) + .notify(|e, dur| { + println!( + "(Sanket-temp) KNN query failed with error: {}, retrying in {:?}", + e, dur + ) + }) .await? }; Ok(res.into_inner().try_into()?) @@ -313,6 +354,8 @@ fn is_retryable_error(e: &tonic::Status) -> bool { || e.code() == tonic::Code::DeadlineExceeded || e.code() == tonic::Code::Aborted || e.code() == tonic::Code::ResourceExhausted + || e.code() == tonic::Code::Unknown + || e.code() == tonic::Code::NotFound } fn no_clients_found_status() -> tonic::Status { diff --git a/rust/frontend/src/executor/local.rs b/rust/frontend/src/executor/local.rs index ded0f4808f9..a02db5f8aab 100644 --- a/rust/frontend/src/executor/local.rs +++ b/rust/frontend/src/executor/local.rs @@ -95,7 +95,11 @@ impl LocalExecutor { Ok(()) } - pub async fn get(&mut self, plan: Get) -> Result { + pub async fn get(&mut self, plan: Get, _: F) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: std::future::Future>>, + { let collection_and_segments = plan.scan.collection_and_segments.clone(); self.try_backfill_collection(&collection_and_segments) .await?; @@ -169,7 +173,7 @@ impl LocalExecutor { }; let allowed_uids = self - .get(filter_plan) + .get(filter_plan.clone(), |_| async { Ok(filter_plan.clone()) }) .await? 
.result .records @@ -278,7 +282,11 @@ impl LocalExecutor { }, }; - let hydrated_records = self.get(projection_plan).await?; + let hydrated_records = self + .get(projection_plan.clone(), |_| async { + Ok(projection_plan.clone()) + }) + .await?; let mut user_id_to_document = HashMap::new(); let mut user_id_to_metadata = HashMap::new(); for ProjectionRecord { diff --git a/rust/frontend/src/executor/mod.rs b/rust/frontend/src/executor/mod.rs index 94fd19c5fee..0485cf26571 100644 --- a/rust/frontend/src/executor/mod.rs +++ b/rust/frontend/src/executor/mod.rs @@ -1,3 +1,6 @@ +use std::future::Future; + +use chroma_error::ChromaError; use chroma_types::{ operator::{CountResult, GetResult, KnnBatchResult, SearchResult}, plan::{Count, Get, Knn, Search}, @@ -26,10 +29,20 @@ impl Executor { Executor::Local(local_executor) => local_executor.count(plan).await, } } - pub async fn get(&mut self, plan: Get) -> Result { + pub async fn get( + &mut self, + plan: Get, + replan_closure: F, + ) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: Future>>, + { match self { - Executor::Distributed(distributed_executor) => distributed_executor.get(plan).await, - Executor::Local(local_executor) => local_executor.get(plan).await, + Executor::Distributed(distributed_executor) => { + distributed_executor.get(plan, replan_closure).await + } + Executor::Local(local_executor) => local_executor.get(plan, replan_closure).await, } } pub async fn knn(&mut self, plan: Knn) -> Result { diff --git a/rust/frontend/src/get_collection_with_segments_provider.rs b/rust/frontend/src/get_collection_with_segments_provider.rs index 20575522ab5..0ecef6f1c78 100644 --- a/rust/frontend/src/get_collection_with_segments_provider.rs +++ b/rust/frontend/src/get_collection_with_segments_provider.rs @@ -1,4 +1,3 @@ -use backon::ConstantBuilder; use chroma_cache::{AysncPartitionedMutex, Cache, CacheError, Weighted}; use chroma_config::Configurable; use chroma_error::{ChromaError, ErrorCodes}; @@ -11,37 +10,11 @@ use std::{ }; use thiserror::Error; -#[derive(Deserialize, Serialize, Clone, Debug)] -pub struct CacheInvalidationRetryConfig { - pub delay_ms: u32, - pub max_retries: u32, -} - -impl CacheInvalidationRetryConfig { - pub fn new(delay_ms: u32, max_retries: u32) -> Self { - Self { - delay_ms, - max_retries, - } - } -} - -impl Default for CacheInvalidationRetryConfig { - fn default() -> Self { - Self { - delay_ms: 0, - max_retries: 3, - } - } -} - #[derive(Deserialize, Clone, Serialize, Debug)] pub struct CollectionsWithSegmentsProviderConfig { pub cache: chroma_cache::CacheConfig, pub cache_ttl_secs: u32, pub permitted_parallelism: u32, - #[serde(default = "CacheInvalidationRetryConfig::default")] - pub cache_invalidation_retry_policy: CacheInvalidationRetryConfig, } impl Default for CollectionsWithSegmentsProviderConfig { @@ -50,7 +23,6 @@ impl Default for CollectionsWithSegmentsProviderConfig { cache: chroma_cache::CacheConfig::Nop, cache_ttl_secs: 60, permitted_parallelism: 100, - cache_invalidation_retry_policy: CacheInvalidationRetryConfig::default(), } } } @@ -69,12 +41,6 @@ impl Configurable for CollectionsWithSegm let sysdb_rpc_lock = AysncPartitionedMutex::with_parallelism(config.permitted_parallelism as usize, ()); - let retry_backoff = ConstantBuilder::default() - .with_delay(Duration::from_millis( - config.cache_invalidation_retry_policy.delay_ms as u64, - )) - .with_max_times(config.cache_invalidation_retry_policy.max_retries as usize); - let sysdb = registry .get::() .map_err(|e| Box::new(e) as Box)?; @@ -84,7 +50,6 @@ impl 
Configurable for CollectionsWithSegm collections_with_segments_cache: collections_with_segments_cache.into(), cache_ttl_secs: config.cache_ttl_secs, sysdb_rpc_lock, - retry_backoff, }) } } @@ -109,7 +74,6 @@ pub struct CollectionsWithSegmentsProvider { Arc>, pub(crate) cache_ttl_secs: u32, pub(crate) sysdb_rpc_lock: chroma_cache::AysncPartitionedMutex, - pub(crate) retry_backoff: ConstantBuilder, } #[derive(Debug, Error)] @@ -130,10 +94,6 @@ impl ChromaError for CollectionsWithSegmentsProviderError { } impl CollectionsWithSegmentsProvider { - pub(crate) fn get_retry_backoff(&self) -> ConstantBuilder { - self.retry_backoff - } - pub(crate) async fn get_collection_with_segments( &mut self, collection_id: CollectionUuid, diff --git a/rust/frontend/src/impls/service_based_frontend.rs b/rust/frontend/src/impls/service_based_frontend.rs index 99095cef6d0..16aab185a6c 100644 --- a/rust/frontend/src/impls/service_based_frontend.rs +++ b/rust/frontend/src/impls/service_based_frontend.rs @@ -77,7 +77,7 @@ pub struct ServiceBasedFrontend { max_batch_size: u32, metrics: Arc, default_knn_index: KnnIndex, - retries_builder: ExponentialBuilder, + retry_policy: ExponentialBuilder, } impl ServiceBasedFrontend { @@ -90,6 +90,7 @@ impl ServiceBasedFrontend { executor: Executor, max_batch_size: u32, default_knn_index: KnnIndex, + retry_policy: ExponentialBuilder, ) -> Self { let meter = global::meter("chroma"); let fork_retries_counter = meter.u64_counter("fork_retries").build(); @@ -120,18 +121,6 @@ impl ServiceBasedFrontend { metering_write_counter, metering_external_read_counter, }); - // factor: 2.0, - // min_delay_ms: 100, - // max_delay_ms: 5000, - // max_attempts: 5, - // jitter: true, - // TODO(Sanket): Ideally config for this. - let retries_builder = ExponentialBuilder::default() - .with_max_times(5) - .with_factor(2.0) - .with_max_delay(Duration::from_millis(5000)) - .with_min_delay(Duration::from_millis(100)) - .with_jitter(); ServiceBasedFrontend { allow_reset, executor, @@ -141,7 +130,7 @@ impl ServiceBasedFrontend { max_batch_size, metrics, default_knn_index, - retries_builder, + retry_policy, } } @@ -699,12 +688,12 @@ impl ServiceBasedFrontend { }; let res = fork_to_retry - .retry(self.collections_with_segments_provider.get_retry_backoff()) + .retry(self.retry_policy) // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried .when(|e| { matches!( e.code(), - ErrorCodes::FailedPrecondition | ErrorCodes::Unknown + ErrorCodes::FailedPrecondition | ErrorCodes::Unknown | ErrorCodes::Unavailable ) }) .notify(|_, _| { @@ -773,9 +762,10 @@ impl ServiceBasedFrontend { .await } }; + // Any error other than a backoff is not safe to retry. let res = add_to_retry - .retry(self.retries_builder) - .when(|e| matches!(e.code(), ErrorCodes::AlreadyExists)) + .retry(self.retry_policy) + .when(|e| matches!(e.code(), ErrorCodes::Unavailable)) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -812,7 +802,7 @@ impl ServiceBasedFrontend { Ok(AddCollectionRecordsResponse {}) } Err(e) => { - if e.code() == ErrorCodes::AlreadyExists { + if e.code() == ErrorCodes::Unavailable { Err(AddCollectionRecordsError::Backoff) } else { Err(AddCollectionRecordsError::Other(Box::new(e) as _)) @@ -861,9 +851,10 @@ impl ServiceBasedFrontend { .await } }; + // Any error other than a backoff is not safe to retry. 
let res = add_to_retry - .retry(self.retries_builder) - .when(|e| matches!(e.code(), ErrorCodes::AlreadyExists)) + .retry(self.retry_policy) + .when(|e| matches!(e.code(), ErrorCodes::Unavailable)) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -900,7 +891,7 @@ impl ServiceBasedFrontend { Ok(UpdateCollectionRecordsResponse {}) } Err(e) => { - if e.code() == ErrorCodes::AlreadyExists { + if e.code() == ErrorCodes::Unavailable { Err(UpdateCollectionRecordsError::Backoff) } else { Err(UpdateCollectionRecordsError::Other(Box::new(e) as _)) @@ -954,9 +945,10 @@ impl ServiceBasedFrontend { .await } }; + // Any error other than a backoff is not safe to retry. let res = add_to_retry - .retry(self.retries_builder) - .when(|e| matches!(e.code(), ErrorCodes::AlreadyExists)) + .retry(self.retry_policy) + .when(|e| matches!(e.code(), ErrorCodes::Unavailable)) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -993,7 +985,7 @@ impl ServiceBasedFrontend { Ok(UpsertCollectionRecordsResponse {}) } Err(e) => { - if e.code() == ErrorCodes::AlreadyExists { + if e.code() == ErrorCodes::Unavailable { Err(UpsertCollectionRecordsError::Backoff) } else { Err(UpsertCollectionRecordsError::Other(Box::new(e) as _)) @@ -1002,17 +994,81 @@ impl ServiceBasedFrontend { } } - pub async fn retryable_delete( + pub async fn retryable_push_delete_logs( + &mut self, + tenant_id: String, + database_name: String, + collection_id: CollectionUuid, + records: Vec, + ) -> Result { + let collection_write_context_container = + chroma_metering::create::(CollectionWriteContext::new( + tenant_id.clone(), + database_name.clone(), + collection_id.0.to_string(), + WriteAction::Delete, + )); + + // Closure for write context operations + (async { + if records.is_empty() { + tracing::debug!("Bailing because no records were found"); + return Ok::<_, DeleteCollectionRecordsError>(DeleteCollectionRecordsResponse {}); + } + + let log_size_bytes = records.iter().map(OperationRecord::size_bytes).sum(); + + self.log_client + .push_logs(&tenant_id, collection_id, records) + .await + .map_err(|err| { + if err.code() == ErrorCodes::Unavailable { + DeleteCollectionRecordsError::Backoff + } else { + DeleteCollectionRecordsError::Internal(Box::new(err) as _) + } + })?; + + // Attach metadata to the write context + chroma_metering::with_current(|context| { + context.log_size_bytes(log_size_bytes); + }); + + Ok(DeleteCollectionRecordsResponse {}) + }) + .meter(collection_write_context_container.clone()) + .await?; + + // Need to re-enter the write context before attempting to close + collection_write_context_container.enter(); + + // TODO: Submit event after the response is sent + match chroma_metering::close::() { + Ok(collection_write_context) => { + if let Ok(()) = MeterEvent::CollectionWrite(collection_write_context) + .submit() + .await + { + self.metrics.metering_write_counter.add(1, &[]); + } + } + Err(e) => { + tracing::error!("Failed to submit metering event to receiver: {:?}", e) + } + } + + Ok(DeleteCollectionRecordsResponse {}) + } + + pub async fn retryable_get_records_to_delete( &mut self, DeleteCollectionRecordsRequest { - tenant_id, - database_name, collection_id, ids, r#where, .. 
}: DeleteCollectionRecordsRequest, - ) -> Result { + ) -> Result, DeleteCollectionRecordsError> { let mut records = Vec::new(); let read_event = if let Some(where_clause) = r#where { @@ -1032,22 +1088,43 @@ impl ServiceBasedFrontend { where_clause: Some(where_clause), }; + let get_plan = Get { + scan: Scan { + collection_and_segments, + }, + filter, + limit: Limit { + offset: 0, + limit: None, + }, + proj: Projection { + document: false, + embedding: false, + metadata: false, + }, + }; + let get_result = self .executor - .get(Get { - scan: Scan { - collection_and_segments, - }, - filter, - limit: Limit { - offset: 0, - limit: None, - }, - proj: Projection { - document: false, - embedding: false, - metadata: false, - }, + .get(get_plan.clone(), |code: tonic::Code| { + let mut provider = self.collections_with_segments_provider.clone(); + let mut get_plan2 = get_plan.clone(); + async move { + if code == tonic::Code::NotFound { + provider + .collections_with_segments_cache + .remove(&collection_id) + .await; + } + let collection_and_segments = provider + .get_collection_with_segments(collection_id) + .await + .map_err(|err| Box::new(err) as Box)?; + get_plan2.scan = Scan { + collection_and_segments, + }; + Ok(get_plan2) + } }) .await?; @@ -1098,68 +1175,10 @@ impl ServiceBasedFrontend { }; if let Some(event) = read_event { - if let Ok(()) = event.submit().await { - self.metrics.metering_read_counter.add(1, &[]); - } - } - - let collection_write_context_container = - chroma_metering::create::(CollectionWriteContext::new( - tenant_id.clone(), - database_name.clone(), - collection_id.0.to_string(), - WriteAction::Delete, - )); - - // Closure for write context operations - (async { - if records.is_empty() { - tracing::debug!("Bailing because no records were found"); - return Ok::<_, DeleteCollectionRecordsError>(DeleteCollectionRecordsResponse {}); - } - - let log_size_bytes = records.iter().map(OperationRecord::size_bytes).sum(); - - self.log_client - .push_logs(&tenant_id, collection_id, records) - .await - .map_err(|err| { - if err.code() == ErrorCodes::Unavailable { - DeleteCollectionRecordsError::Backoff - } else { - DeleteCollectionRecordsError::Internal(Box::new(err) as _) - } - })?; - - // Attach metadata to the write context - chroma_metering::with_current(|context| { - context.log_size_bytes(log_size_bytes); - }); - - Ok(DeleteCollectionRecordsResponse {}) - }) - .meter(collection_write_context_container.clone()) - .await?; - - // Need to re-enter the write context before attempting to close - collection_write_context_container.enter(); - - // TODO: Submit event after the response is sent - match chroma_metering::close::() { - Ok(collection_write_context) => { - if let Ok(()) = MeterEvent::CollectionWrite(collection_write_context) - .submit() - .await - { - self.metrics.metering_write_counter.add(1, &[]); - } - } - Err(e) => { - tracing::error!("Failed to submit metering event to receiver: {:?}", e) - } + event.submit().await; } - Ok(DeleteCollectionRecordsResponse {}) + Ok(records) } pub async fn delete( @@ -1167,7 +1186,7 @@ impl ServiceBasedFrontend { request: DeleteCollectionRecordsRequest, ) -> Result { let retries = Arc::new(AtomicUsize::new(0)); - let delete_to_retry = || { + let retryable_get_records_to_delete = || { let mut self_clone = self.clone(); let request_clone = request.clone(); let cache_clone = self @@ -1175,7 +1194,9 @@ impl ServiceBasedFrontend { .collections_with_segments_cache .clone(); async move { - let res = self_clone.retryable_delete(request_clone).await; + 
let res = self_clone + .retryable_get_records_to_delete(request_clone) + .await; match res { Ok(res) => Ok(res), Err(e) => { @@ -1191,24 +1212,70 @@ impl ServiceBasedFrontend { } } }; - let res = delete_to_retry - .retry(self.collections_with_segments_provider.get_retry_backoff()) + let records_to_delete = retryable_get_records_to_delete + .retry(self.retry_policy) // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| matches!(e.code(), ErrorCodes::NotFound | ErrorCodes::Unknown)) + .when(|e| { + matches!( + e.code(), + ErrorCodes::NotFound | ErrorCodes::Unknown | ErrorCodes::Unavailable + ) + }) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { tracing::info!( - "Retrying delete() request for collection {}", + "Retrying get records to delete request for collection {}", request.collection_id ); } }) - .await; + .adjust(|e, d| { + if e.code() == ErrorCodes::NotFound { + // Retry immediately for cache invalidation + Some(Duration::from_micros(0)) + } else { + d + } + }) + .await?; + + let retries = Arc::new(AtomicUsize::new(0)); + let retryable_push_delete_logs = || { + let mut self_clone = self.clone(); + let tenant = request.tenant_id.clone(); + let database_name = request.database_name.clone(); + let records_to_delete = records_to_delete.clone(); + async move { + self_clone + .retryable_push_delete_logs( + tenant, + database_name, + request.collection_id, + records_to_delete, + ) + .await + } + }; + let res = retryable_push_delete_logs + .retry(self.retry_policy) + // NOTE: It's not safe to retry adds to the log except when it is unavailable. + .when(|e| matches!(e.code(), ErrorCodes::Unavailable)) + .notify(|_, _| { + let retried = retries.fetch_add(1, Ordering::Relaxed); + if retried > 0 { + tracing::info!( + "Retrying push delete logs request for collection {}", + request.collection_id + ); + } + }) + .await?; + self.metrics .delete_retries_counter .add(retries.load(Ordering::Relaxed) as u64, &[]); - res + Ok(res) } pub async fn retryable_count( @@ -1299,9 +1366,14 @@ impl ServiceBasedFrontend { } }; let res = count_to_retry - .retry(self.collections_with_segments_provider.get_retry_backoff()) + .retry(self.retry_policy) // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| matches!(e.code(), ErrorCodes::NotFound | ErrorCodes::Unknown)) + .when(|e| { + matches!( + e.code(), + ErrorCodes::NotFound | ErrorCodes::Unknown | ErrorCodes::Unavailable + ) + }) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -1311,6 +1383,14 @@ impl ServiceBasedFrontend { ); } }) + .adjust(|e, d| { + if e.code() == ErrorCodes::NotFound { + // Retry immediately for cache invalidation + Some(Duration::from_micros(0)) + } else { + d + } + }) .await; self.metrics .count_retries_counter @@ -1346,24 +1426,44 @@ impl ServiceBasedFrontend { .as_ref() .map(Where::fts_query_length) .unwrap_or_default(); + let get_plan = Get { + scan: Scan { + collection_and_segments, + }, + filter: Filter { + query_ids: ids, + where_clause: r#where, + }, + limit: Limit { offset, limit }, + proj: Projection { + document: include.0.contains(&Include::Document), + embedding: include.0.contains(&Include::Embedding), + // If URI is requested, metadata is also requested so we can extract the URI. 
+ metadata: (include.0.contains(&Include::Metadata) + || include.0.contains(&Include::Uri)), + }, + }; let get_result = self .executor - .get(Get { - scan: Scan { - collection_and_segments, - }, - filter: Filter { - query_ids: ids, - where_clause: r#where, - }, - limit: Limit { offset, limit }, - proj: Projection { - document: include.0.contains(&Include::Document), - embedding: include.0.contains(&Include::Embedding), - // If URI is requested, metadata is also requested so we can extract the URI. - metadata: (include.0.contains(&Include::Metadata) - || include.0.contains(&Include::Uri)), - }, + .get(get_plan.clone(), |code: tonic::Code| { + let mut provider = self.collections_with_segments_provider.clone(); + let mut get_plan2 = get_plan.clone(); + async move { + if code == tonic::Code::NotFound { + provider + .collections_with_segments_cache + .remove(&collection_id) + .await; + } + let collection_and_segments = provider + .get_collection_with_segments(collection_id) + .await + .map_err(|err| Box::new(err) as Box)?; + get_plan2.scan = Scan { + collection_and_segments, + }; + Ok(get_plan2) + } }) .await?; let return_bytes = get_result.size_bytes(); @@ -1435,9 +1535,14 @@ impl ServiceBasedFrontend { } }; let res = get_to_retry - .retry(self.collections_with_segments_provider.get_retry_backoff()) + .retry(self.retry_policy) // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| matches!(e.code(), ErrorCodes::NotFound | ErrorCodes::Unknown)) + .when(|e| { + matches!( + e.code(), + ErrorCodes::NotFound | ErrorCodes::Unknown | ErrorCodes::Unavailable + ) + }) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -1447,6 +1552,14 @@ impl ServiceBasedFrontend { ); } }) + .adjust(|e, d| { + if e.code() == ErrorCodes::NotFound { + // Retry immediately for cache invalidation + Some(Duration::from_micros(0)) + } else { + d + } + }) .await; self.metrics .get_retries_counter @@ -1587,9 +1700,14 @@ impl ServiceBasedFrontend { } }; let res = query_to_retry - .retry(self.collections_with_segments_provider.get_retry_backoff()) + .retry(self.retry_policy) // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| matches!(e.code(), ErrorCodes::NotFound | ErrorCodes::Unknown)) + .when(|e| { + matches!( + e.code(), + ErrorCodes::NotFound | ErrorCodes::Unknown | ErrorCodes::Unavailable + ) + }) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -1599,6 +1717,14 @@ impl ServiceBasedFrontend { ); } }) + .adjust(|e, d| { + if e.code() == ErrorCodes::NotFound { + // Retry immediately for cache invalidation + Some(Duration::from_micros(0)) + } else { + d + } + }) .await; self.metrics .query_retries_counter @@ -1797,6 +1923,7 @@ impl Configurable<(FrontendConfig, System)> for ServiceBasedFrontend { let executor = Executor::try_from_config(&(config.executor.clone(), system.clone()), registry).await?; + let retry_config = &config.retry; Ok(ServiceBasedFrontend::new( config.allow_reset, sysdb, @@ -1805,6 +1932,7 @@ impl Configurable<(FrontendConfig, System)> for ServiceBasedFrontend { executor, max_batch_size, config.default_knn_index, + retry_config.into(), )) } } @@ -1816,7 +1944,10 @@ mod tests { use chroma_types::Collection; use uuid::Uuid; - use crate::server::CreateCollectionPayload; + use crate::{ + executor::config::{DistributedExecutorConfig, ExecutorConfig, RetryConfig}, + server::CreateCollectionPayload, + }; use super::*; diff 
--git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index fa91c122997..5c6698391e9 100644 --- a/rust/log-service/src/lib.rs +++ b/rust/log-service/src/lib.rs @@ -1175,6 +1175,9 @@ impl LogServer { &self, request: Request, ) -> Result, Status> { + println!("(Sanket-temp) Sleeping for 30 secs"); + tokio::time::sleep(std::time::Duration::from_secs(30)).await; + println!("(Sanket-temp) Slept for 30 secs"); let scout_logs = request.into_inner(); let collection_id = Uuid::parse_str(&scout_logs.collection_id) .map(CollectionUuid) diff --git a/rust/log/src/grpc_log.rs b/rust/log/src/grpc_log.rs index 10251faf618..b3c795c4be2 100644 --- a/rust/log/src/grpc_log.rs +++ b/rust/log/src/grpc_log.rs @@ -77,8 +77,8 @@ pub enum GrpcPushLogsError { impl ChromaError for GrpcPushLogsError { fn code(&self) -> ErrorCodes { match self { - GrpcPushLogsError::Backoff => ErrorCodes::AlreadyExists, - GrpcPushLogsError::BackoffCompaction => ErrorCodes::AlreadyExists, + GrpcPushLogsError::Backoff => ErrorCodes::Unavailable, + GrpcPushLogsError::BackoffCompaction => ErrorCodes::Unavailable, GrpcPushLogsError::FailedToPushLogs(_) => ErrorCodes::Internal, GrpcPushLogsError::ConversionError(_) => ErrorCodes::Internal, GrpcPushLogsError::Sealed => ErrorCodes::FailedPrecondition, diff --git a/rust/python_bindings/src/bindings.rs b/rust/python_bindings/src/bindings.rs index a9183ed9d40..0c56534b6f5 100644 --- a/rust/python_bindings/src/bindings.rs +++ b/rust/python_bindings/src/bindings.rs @@ -3,10 +3,8 @@ use chroma_cache::FoyerCacheConfig; use chroma_cli::chroma_cli; use chroma_config::{registry::Registry, Configurable}; use chroma_frontend::{ - executor::config::{ExecutorConfig, LocalExecutorConfig}, - get_collection_with_segments_provider::{ - CacheInvalidationRetryConfig, CollectionsWithSegmentsProviderConfig, - }, + executor::config::{ExecutorConfig, LocalExecutorConfig, RetryConfig}, + get_collection_with_segments_provider::CollectionsWithSegmentsProviderConfig, Frontend, FrontendConfig, }; use chroma_log::config::{LogConfig, SqliteLogConfig}; @@ -102,8 +100,6 @@ impl Bindings { }); let collection_cache_config = CollectionsWithSegmentsProviderConfig { - // No retry to sysdb on local chroma - cache_invalidation_retry_policy: CacheInvalidationRetryConfig::new(0, 0), permitted_parallelism: 32, cache: chroma_cache::CacheConfig::Nop, cache_ttl_secs: 60, @@ -124,6 +120,7 @@ impl Bindings { default_knn_index: knn_index, tenants_to_migrate_immediately: vec![], tenants_to_migrate_immediately_threshold: None, + retry: RetryConfig::default(), }; let frontend = runtime.block_on(async { From f940c0caeeeaceee6278de550db720eeba7cc167 Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Thu, 4 Sep 2025 15:40:21 -0700 Subject: [PATCH 2/5] Retries everywhere --- rust/frontend/src/executor/distributed.rs | 176 ++- rust/frontend/src/executor/local.rs | 13 +- rust/frontend/src/executor/mod.rs | 32 +- .../src/impls/service_based_frontend.rs | 1065 ++++++++++------- rust/types/src/execution/operator.rs | 10 + rust/types/src/execution/plan.rs | 9 + 6 files changed, 877 insertions(+), 428 deletions(-) diff --git a/rust/frontend/src/executor/distributed.rs b/rust/frontend/src/executor/distributed.rs index 2d3e19c30c9..55cf4e2894b 100644 --- a/rust/frontend/src/executor/distributed.rs +++ b/rust/frontend/src/executor/distributed.rs @@ -1,4 +1,7 @@ use std::future::Future; +use std::sync::atomic::AtomicUsize; +use std::sync::Arc; +use std::time::Duration; use super::config; use async_trait::async_trait; @@ -15,6 +18,7 @@ use 
chroma_memberlist::{ }; use chroma_system::System; use chroma_types::chroma_proto::query_executor_client::QueryExecutorClient; +use chroma_types::plan::PlanToProtoError; use chroma_types::SegmentType; use chroma_types::{ operator::{CountResult, GetResult, KnnBatchResult, SearchResult}, @@ -22,7 +26,10 @@ use chroma_types::{ ExecutorError, }; +use opentelemetry::global; +use opentelemetry::metrics::Counter; use rand::distributions::Distribution; +use tonic::Code; use tonic::Request; use tonic::Status; @@ -46,6 +53,19 @@ pub struct DistributedExecutor { replication_factor: usize, backoff: ExponentialBuilder, client_selection_config: ClientSelectionConfig, + metrics: Arc, +} + +#[derive(Clone, Debug)] +struct Metrics { + fork_retries_counter: Counter, + delete_retries_counter: Counter, + count_retries_counter: Counter, + query_retries_counter: Counter, + get_retries_counter: Counter, + add_retries_counter: Counter, + update_retries_counter: Counter, + upsert_retries_counter: Counter, } #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] @@ -97,11 +117,31 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx let backoff = retry_config.into(); let client_selection_config = config.client_selection_config.clone(); + let meter = global::meter("chroma.executor"); + let fork_retries_counter = meter.u64_counter("fork_retries").build(); + let delete_retries_counter = meter.u64_counter("delete_retries").build(); + let count_retries_counter = meter.u64_counter("count_retries").build(); + let query_retries_counter = meter.u64_counter("query_retries").build(); + let get_retries_counter = meter.u64_counter("get_retries").build(); + let add_retries_counter = meter.u64_counter("add_retries").build(); + let update_retries_counter = meter.u64_counter("update_retries").build(); + let upsert_retries_counter = meter.u64_counter("upsert_retries").build(); + let metrics = Arc::new(Metrics { + fork_retries_counter, + delete_retries_counter, + count_retries_counter, + query_retries_counter, + get_retries_counter, + add_retries_counter, + update_retries_counter, + upsert_retries_counter, + }); Ok(Self { client_assigner, replication_factor: config.replication_factor, backoff, client_selection_config, + metrics, }) } } @@ -119,7 +159,16 @@ impl DistributedExecutor { impl DistributedExecutor { ///////////////////////// Plan Operations ///////////////////////// - pub async fn count(&mut self, plan: Count) -> Result { + pub async fn count( + &mut self, + plan: Count, + replan_closure: F, + ) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: Future>>, + { + let retry_count = Arc::new(AtomicUsize::new(0)); let clients = self .client_assigner .clients( @@ -131,8 +180,9 @@ impl DistributedExecutor { .to_string(), ) .map_err(|e| ExecutorError::Internal(e.boxed()))?; - let plan: chroma_types::chroma_proto::CountPlan = plan.clone().try_into()?; + let plan_proto: chroma_types::chroma_proto::CountPlan = plan.clone().try_into()?; let attempt_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let last_error = std::sync::Arc::new(parking_lot::Mutex::new(tonic::Code::Ok)); let config = self.client_selection_config.clone(); let res = { let attempt_count = attempt_count.clone(); @@ -140,14 +190,56 @@ impl DistributedExecutor { let current_attempt = attempt_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let is_retry = current_attempt > 0; + if is_retry { + let last_error_code = *last_error.lock(); + let replan_count = + replan_closure(last_error_code) + .await + 
.map_err(|e| -> tonic::Status { + Status::new( + e.code().into(), + format!("Failed to replan count {:?}", e), + ) + })?; + let replan_count_proto = + replan_count.try_into().map_err(|e: PlanToProtoError| { + tonic::Status::new( + e.code().into(), + format!("Failed to convert count plan to proto {:?}", e), + ) + })?; + return choose_query_client_weighted(&clients, &config, is_retry)? + .count(Request::new(replan_count_proto)) + .await; + } choose_query_client_weighted(&clients, &config, is_retry)? - .count(Request::new(plan.clone())) + .count(Request::new(plan_proto.clone())) .await }) .retry(self.backoff) .when(is_retryable_error) + .notify(|e, _| { + let mut last_error = last_error.lock(); + *last_error = e.code(); + tracing::info!( + "Retrying count for collection {}, error {:?}", + plan.scan.collection_and_segments.collection.collection_id, + e + ); + retry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }) + .adjust(|e, d| { + if e.code() == Code::NotFound { + return Some(Duration::from_micros(0)); + } + d + }) .await? }; + self.metrics.count_retries_counter.add( + retry_count.load(std::sync::atomic::Ordering::Relaxed) as u64, + &[], + ); Ok(res.into_inner().into()) } @@ -160,6 +252,7 @@ impl DistributedExecutor { F: Fn(tonic::Code) -> Fut, Fut: Future>>, { + let retry_count = Arc::new(AtomicUsize::new(0)); let clients = self .client_assigner .clients( @@ -192,7 +285,7 @@ impl DistributedExecutor { ) })?; return choose_query_client_weighted(&clients, &config, is_retry)? - .get(Request::new(replan_get.clone().try_into()?)) + .get(Request::new(replan_get.try_into()?)) .await; } choose_query_client_weighted(&clients, &config, is_retry)? @@ -204,13 +297,38 @@ impl DistributedExecutor { .notify(|e, _| { let mut last_error = last_error.lock(); *last_error = e.code(); + tracing::info!( + "Retrying get for collection {}, error {:?}", + plan.scan.collection_and_segments.collection.collection_id, + e + ); + retry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }) + .adjust(|e, d| { + if e.code() == Code::NotFound { + return Some(Duration::from_micros(0)); + } + d }) .await? }; + self.metrics.get_retries_counter.add( + retry_count.load(std::sync::atomic::Ordering::Relaxed) as u64, + &[], + ); Ok(res.into_inner().try_into()?) } - pub async fn knn(&mut self, plan: Knn) -> Result { + pub async fn knn( + &mut self, + plan: Knn, + replan_closure: F, + ) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: Future>>, + { + let retry_count = Arc::new(AtomicUsize::new(0)); let clients = self .client_assigner .clients( @@ -223,6 +341,7 @@ impl DistributedExecutor { ) .map_err(|e| ExecutorError::Internal(e.boxed()))?; let attempt_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let last_error = std::sync::Arc::new(parking_lot::Mutex::new(tonic::Code::Ok)); let config = self.client_selection_config.clone(); let res = { let attempt_count = attempt_count.clone(); @@ -230,24 +349,49 @@ impl DistributedExecutor { let current_attempt = attempt_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let is_retry = current_attempt > 0; - let r = choose_query_client_weighted(&clients, &config, is_retry)? 
- .knn(Request::new(plan.clone().try_into()?)) - .await; - if r.is_err() { - println!("(Sanket-temp) Knn query failed with error {:?}", r); + if is_retry { + let last_error_code = *last_error.lock(); + let replan_knn = + replan_closure(last_error_code) + .await + .map_err(|e| -> tonic::Status { + Status::new( + e.code().into(), + format!("Failed to replan knn {:?}", e), + ) + })?; + return choose_query_client_weighted(&clients, &config, is_retry)? + .knn(Request::new(replan_knn.try_into()?)) + .await; } - r + choose_query_client_weighted(&clients, &config, is_retry)? + .knn(Request::new(plan.clone().try_into()?)) + .await }) .retry(self.backoff) .when(is_retryable_error) - .notify(|e, dur| { - println!( - "(Sanket-temp) KNN query failed with error: {}, retrying in {:?}", - e, dur - ) + .notify(|e, _| { + let mut last_error = last_error.lock(); + *last_error = e.code(); + tracing::info!( + "Retrying knn for collection {}, error {:?}", + plan.scan.collection_and_segments.collection.collection_id, + e + ); + retry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }) + .adjust(|e, d| { + if e.code() == Code::NotFound { + return Some(Duration::from_micros(0)); + } + d }) .await? }; + self.metrics.query_retries_counter.add( + retry_count.load(std::sync::atomic::Ordering::Relaxed) as u64, + &[], + ); Ok(res.into_inner().try_into()?) } diff --git a/rust/frontend/src/executor/local.rs b/rust/frontend/src/executor/local.rs index a02db5f8aab..fe6e11c220c 100644 --- a/rust/frontend/src/executor/local.rs +++ b/rust/frontend/src/executor/local.rs @@ -19,6 +19,7 @@ use chroma_types::{ }; use std::{ collections::{HashMap, HashSet}, + future::Future, sync::Arc, }; @@ -54,7 +55,11 @@ impl LocalExecutor { } impl LocalExecutor { - pub async fn count(&mut self, plan: Count) -> Result { + pub async fn count(&mut self, plan: Count, _: F) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: Future>>, + { self.try_backfill_collection(&plan.scan.collection_and_segments) .await?; self.metadata_reader @@ -133,7 +138,11 @@ impl LocalExecutor { Ok(result) } - pub async fn knn(&mut self, plan: Knn) -> Result { + pub async fn knn(&mut self, plan: Knn, _: F) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: std::future::Future>>, + { let collection_and_segments = plan.scan.collection_and_segments.clone(); self.try_backfill_collection(&collection_and_segments) .await?; diff --git a/rust/frontend/src/executor/mod.rs b/rust/frontend/src/executor/mod.rs index 0485cf26571..16f95bf8db3 100644 --- a/rust/frontend/src/executor/mod.rs +++ b/rust/frontend/src/executor/mod.rs @@ -23,10 +23,20 @@ pub enum Executor { } impl Executor { - pub async fn count(&mut self, plan: Count) -> Result { + pub async fn count( + &mut self, + plan: Count, + replan_closure: F, + ) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: Future>>, + { match self { - Executor::Distributed(distributed_executor) => distributed_executor.count(plan).await, - Executor::Local(local_executor) => local_executor.count(plan).await, + Executor::Distributed(distributed_executor) => { + distributed_executor.count(plan, replan_closure).await + } + Executor::Local(local_executor) => local_executor.count(plan, replan_closure).await, } } pub async fn get( @@ -45,10 +55,20 @@ impl Executor { Executor::Local(local_executor) => local_executor.get(plan, replan_closure).await, } } - pub async fn knn(&mut self, plan: Knn) -> Result { + pub async fn knn( + &mut self, + plan: Knn, + replan_closure: F, + ) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: 
Future>>, + { match self { - Executor::Distributed(distributed_executor) => distributed_executor.knn(plan).await, - Executor::Local(local_executor) => local_executor.knn(plan).await, + Executor::Distributed(distributed_executor) => { + distributed_executor.knn(plan, replan_closure).await + } + Executor::Local(local_executor) => local_executor.knn(plan, replan_closure).await, } } pub async fn search(&mut self, plan: Search) -> Result { diff --git a/rust/frontend/src/impls/service_based_frontend.rs b/rust/frontend/src/impls/service_based_frontend.rs index 16aab185a6c..33cdb7480e7 100644 --- a/rust/frontend/src/impls/service_based_frontend.rs +++ b/rust/frontend/src/impls/service_based_frontend.rs @@ -20,33 +20,36 @@ use chroma_types::{ operator::{Filter, KnnBatch, KnnProjection, Limit, Projection, Scan}, plan::{Count, Get, Knn, Search}, AddCollectionRecordsError, AddCollectionRecordsRequest, AddCollectionRecordsResponse, - Collection, CollectionUuid, CountCollectionsError, CountCollectionsRequest, - CountCollectionsResponse, CountRequest, CountResponse, CreateCollectionError, - CreateCollectionRequest, CreateCollectionResponse, CreateDatabaseError, CreateDatabaseRequest, - CreateDatabaseResponse, CreateTenantError, CreateTenantRequest, CreateTenantResponse, - DeleteCollectionError, DeleteCollectionRecordsError, DeleteCollectionRecordsRequest, - DeleteCollectionRecordsResponse, DeleteCollectionRequest, DeleteDatabaseError, - DeleteDatabaseRequest, DeleteDatabaseResponse, ForkCollectionError, ForkCollectionRequest, - ForkCollectionResponse, GetCollectionByCrnError, GetCollectionByCrnRequest, - GetCollectionByCrnResponse, GetCollectionError, GetCollectionRequest, GetCollectionResponse, - GetCollectionsError, GetDatabaseError, GetDatabaseRequest, GetDatabaseResponse, GetRequest, - GetResponse, GetTenantError, GetTenantRequest, GetTenantResponse, HealthCheckResponse, - HeartbeatError, HeartbeatResponse, Include, KnnIndex, ListCollectionsRequest, - ListCollectionsResponse, ListDatabasesError, ListDatabasesRequest, ListDatabasesResponse, - Operation, OperationRecord, QueryError, QueryRequest, QueryResponse, ResetError, ResetResponse, - SearchRequest, SearchResponse, Segment, SegmentScope, SegmentType, SegmentUuid, - UpdateCollectionError, UpdateCollectionRecordsError, UpdateCollectionRecordsRequest, - UpdateCollectionRecordsResponse, UpdateCollectionRequest, UpdateCollectionResponse, - UpdateTenantError, UpdateTenantRequest, UpdateTenantResponse, UpsertCollectionRecordsError, - UpsertCollectionRecordsRequest, UpsertCollectionRecordsResponse, VectorIndexConfiguration, - Where, + Collection, CollectionAndSegments, CollectionUuid, CountCollectionsError, + CountCollectionsRequest, CountCollectionsResponse, CountRequest, CountResponse, + CreateCollectionError, CreateCollectionRequest, CreateCollectionResponse, CreateDatabaseError, + CreateDatabaseRequest, CreateDatabaseResponse, CreateTenantError, CreateTenantRequest, + CreateTenantResponse, DeleteCollectionError, DeleteCollectionRecordsError, + DeleteCollectionRecordsRequest, DeleteCollectionRecordsResponse, DeleteCollectionRequest, + DeleteDatabaseError, DeleteDatabaseRequest, DeleteDatabaseResponse, ForkCollectionError, + ForkCollectionRequest, ForkCollectionResponse, GetCollectionByCrnError, + GetCollectionByCrnRequest, GetCollectionByCrnResponse, GetCollectionError, + GetCollectionRequest, GetCollectionResponse, GetCollectionsError, GetDatabaseError, + GetDatabaseRequest, GetDatabaseResponse, GetRequest, GetResponse, GetTenantError, + 
GetTenantRequest, GetTenantResponse, HealthCheckResponse, HeartbeatError, HeartbeatResponse, + Include, KnnIndex, ListCollectionsRequest, ListCollectionsResponse, ListDatabasesError, + ListDatabasesRequest, ListDatabasesResponse, Operation, OperationRecord, QueryError, + QueryRequest, QueryResponse, ResetError, ResetResponse, SearchRequest, SearchResponse, Segment, + SegmentScope, SegmentType, SegmentUuid, UpdateCollectionError, UpdateCollectionRecordsError, + UpdateCollectionRecordsRequest, UpdateCollectionRecordsResponse, UpdateCollectionRequest, + UpdateCollectionResponse, UpdateTenantError, UpdateTenantRequest, UpdateTenantResponse, + UpsertCollectionRecordsError, UpsertCollectionRecordsRequest, UpsertCollectionRecordsResponse, + VectorIndexConfiguration, Where, }; use opentelemetry::global; use opentelemetry::metrics::Counter; use std::collections::HashSet; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + time::Instant, +}; use super::utils::to_records; @@ -54,12 +57,24 @@ use super::utils::to_records; struct Metrics { fork_retries_counter: Counter, delete_retries_counter: Counter, - count_retries_counter: Counter, - query_retries_counter: Counter, - get_retries_counter: Counter, add_retries_counter: Counter, update_retries_counter: Counter, upsert_retries_counter: Counter, + list_db_retries_counter: Counter, + create_db_retries_counter: Counter, + get_db_retries_counter: Counter, + delete_db_retries_counter: Counter, + list_collections_retries_counter: Counter, + count_collections_retries_counter: Counter, + get_collection_retries_counter: Counter, + get_collection_by_crn_retries_counter: Counter, + create_collection_retries_counter: Counter, + update_collection_retries_counter: Counter, + delete_collection_retries_counter: Counter, + get_tenant_retries_counter: Counter, + create_tenant_retries_counter: Counter, + update_tenant_retries_counter: Counter, + get_collection_with_segments_counter: Counter, search_retries_counter: Counter, metering_fork_counter: Counter, metering_read_counter: Counter, @@ -95,9 +110,6 @@ impl ServiceBasedFrontend { let meter = global::meter("chroma"); let fork_retries_counter = meter.u64_counter("fork_retries").build(); let delete_retries_counter = meter.u64_counter("delete_retries").build(); - let count_retries_counter = meter.u64_counter("count_retries").build(); - let query_retries_counter = meter.u64_counter("query_retries").build(); - let get_retries_counter = meter.u64_counter("get_retries").build(); let add_retries_counter = meter.u64_counter("add_retries").build(); let update_retries_counter = meter.u64_counter("update_retries").build(); let upsert_retries_counter = meter.u64_counter("upsert_retries").build(); @@ -106,15 +118,50 @@ impl ServiceBasedFrontend { let metering_read_counter = meter.u64_counter("metering_events_sent.read").with_description("The number of read metering events sent by the frontend to the metering event receiver.").build(); let metering_write_counter = meter.u64_counter("metering_events_sent.write").with_description("The number of write metering events sent by the frontend to the metering event receiver.").build(); let metering_external_read_counter = meter.u64_counter("metering_events_sent.external_read").with_description("The number of external read metering events sent by the frontend to the metering event receiver.").build(); + let 
list_db_retries_counter = meter.u64_counter("list_database_retries").build(); + let create_db_retries_counter = meter.u64_counter("create_database_retries").build(); + let get_db_retries_counter = meter.u64_counter("get_database_retries").build(); + let delete_db_retries_counter = meter.u64_counter("delete_database_retries").build(); + let list_collections_retries_counter = + meter.u64_counter("list_collections_retries").build(); + let count_collections_retries_counter = + meter.u64_counter("count_collections_retries").build(); + let get_collection_retries_counter = meter.u64_counter("get_collection_retries").build(); + let get_collection_by_crn_retries_counter = + meter.u64_counter("get_collection_by_crn_retries").build(); + let get_tenant_retries_counter = meter.u64_counter("get_tenant_retries").build(); + let create_collection_retries_counter = + meter.u64_counter("create_collection_retries").build(); + let update_collection_retries_counter = + meter.u64_counter("update_collection_retries").build(); + let delete_collection_retries_counter = + meter.u64_counter("delete_collection_retries").build(); + let create_tenant_retries_counter = meter.u64_counter("create_tenant_retries").build(); + let update_tenant_retries_counter = meter.u64_counter("update_tenant_retries").build(); + let get_collection_with_segments_counter = meter + .u64_counter("get_collection_with_segments_retries") + .build(); let metrics = Arc::new(Metrics { fork_retries_counter, delete_retries_counter, - count_retries_counter, - query_retries_counter, - get_retries_counter, add_retries_counter, update_retries_counter, upsert_retries_counter, + get_collection_with_segments_counter, + list_db_retries_counter, + get_db_retries_counter, + list_collections_retries_counter, + count_collections_retries_counter, + get_collection_retries_counter, + get_collection_by_crn_retries_counter, + get_tenant_retries_counter, + create_collection_retries_counter, + update_collection_retries_counter, + create_tenant_retries_counter, + update_tenant_retries_counter, + create_db_retries_counter, + delete_db_retries_counter, + delete_collection_retries_counter, search_retries_counter, metering_fork_counter, metering_read_counter, @@ -262,14 +309,56 @@ impl ServiceBasedFrontend { &mut self, CreateTenantRequest { name, .. }: CreateTenantRequest, ) -> Result { - self.sysdb_client.create_tenant(name).await + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_create_tenant = || { + let mut self_clone = self.clone(); + let tenant_clone = name.clone(); + async move { self_clone.sysdb_client.create_tenant(tenant_clone).await } + }; + let res = retryable_create_tenant + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "create tenant failed with error {:?} for tenant {}. Retrying", + e, + name, + ); + }) + .await?; + self.metrics + .create_tenant_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + Ok(res) } pub async fn get_tenant( &mut self, GetTenantRequest { name, .. 
}: GetTenantRequest, ) -> Result { - self.sysdb_client.get_tenant(name).await + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_get_tenant = || { + let mut self_clone = self.clone(); + let tenant_clone = name.clone(); + async move { self_clone.sysdb_client.get_tenant(tenant_clone).await } + }; + let res = retryable_get_tenant + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "get tenant failed with error {:?} for tenant {}. Retrying", + e, + name, + ); + }) + .await?; + self.metrics + .get_tenant_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + Ok(res) } pub async fn update_tenant( @@ -280,9 +369,35 @@ impl ServiceBasedFrontend { .. }: UpdateTenantRequest, ) -> Result { - self.sysdb_client - .update_tenant(tenant_id, resource_name) - .await + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_update_tenant = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let resource_clone = resource_name.clone(); + async move { + self_clone + .sysdb_client + .update_tenant(tenant_clone, resource_clone) + .await + } + }; + let res = retryable_update_tenant + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "update tenant failed with error {:?} for tenant {} for resource {}. Retrying", + e, + tenant_id, + resource_name + ); + }) + .await?; + self.metrics + .update_tenant_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + Ok(res) } pub async fn create_database( @@ -294,9 +409,36 @@ impl ServiceBasedFrontend { .. }: CreateDatabaseRequest, ) -> Result { - self.sysdb_client - .create_database(database_id, database_name, tenant_id) - .await + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_create_db = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let db_name_clone = database_name.clone(); + async move { + self_clone + .sysdb_client + .create_database(database_id, db_name_clone, tenant_clone) + .await + } + }; + let res = retryable_create_db + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "create database failed with error {:?} for tenant {}, database name {}, database id {}. Retrying", + e, + tenant_id, + database_id, + database_name + ); + }) + .await; + self.metrics + .create_db_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + res } pub async fn list_databases( @@ -308,9 +450,33 @@ impl ServiceBasedFrontend { .. }: ListDatabasesRequest, ) -> Result { - self.sysdb_client - .list_databases(tenant_id, limit, offset) - .await + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_list_db = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + async move { + self_clone + .sysdb_client + .list_databases(tenant_clone, limit, offset) + .await + } + }; + let res = retryable_list_db + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "list databases failed with error {:?} for tenant {}. 
Retrying", + e, + tenant_id + ); + }) + .await; + self.metrics + .list_db_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + res } pub async fn get_database( @@ -321,9 +487,35 @@ impl ServiceBasedFrontend { .. }: GetDatabaseRequest, ) -> Result { - self.sysdb_client - .get_database(database_name, tenant_id) - .await + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_get_db = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let db_name_clone = database_name.clone(); + async move { + self_clone + .sysdb_client + .get_database(db_name_clone, tenant_clone) + .await + } + }; + let res = retryable_get_db + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "get database failed with error {:?} for tenant {} database name {}. Retrying", + e, + tenant_id, + database_name + ); + }) + .await; + self.metrics + .get_db_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + res } pub async fn delete_database( @@ -334,9 +526,35 @@ impl ServiceBasedFrontend { .. }: DeleteDatabaseRequest, ) -> Result { - self.sysdb_client - .delete_database(database_name, tenant_id) - .await + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_delete_db = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let db_name_clone = database_name.clone(); + async move { + self_clone + .sysdb_client + .delete_database(db_name_clone, tenant_clone) + .await + } + }; + let res = retryable_delete_db + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "delete database failed with error {:?} for tenant {} database name {}. Retrying", + e, + tenant_id, + database_name + ); + }) + .await; + self.metrics + .delete_db_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + res } pub async fn list_collections( @@ -349,15 +567,41 @@ impl ServiceBasedFrontend { .. }: ListCollectionsRequest, ) -> Result { - self.sysdb_client - .get_collections(GetCollectionsOptions { - tenant: Some(tenant_id.clone()), - database: Some(database_name.clone()), - limit, - offset, - ..Default::default() + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_list_collection = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let db_name_clone = database_name.clone(); + async move { + self_clone + .sysdb_client + .get_collections(GetCollectionsOptions { + tenant: Some(tenant_clone), + database: Some(db_name_clone), + limit, + offset, + ..Default::default() + }) + .await + } + }; + let res = retryable_list_collection + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "list collections failed with error {:?} for tenant {} database name {}. Retrying", + e, + tenant_id, + database_name + ); }) - .await + .await; + self.metrics + .list_collections_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + res } pub async fn count_collections( @@ -368,10 +612,36 @@ impl ServiceBasedFrontend { .. 
}: CountCollectionsRequest, ) -> Result { - self.sysdb_client - .count_collections(tenant_id, Some(database_name)) - .await - .map(|count| count as u32) + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_count_collection = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let db_name_clone = database_name.clone(); + async move { + self_clone + .sysdb_client + .count_collections(tenant_clone, Some(db_name_clone)) + .await + .map(|count| count as u32) + } + }; + let res = retryable_count_collection + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "count collections failed with error {:?} for tenant {} database name {}. Retrying", + e, + tenant_id, + database_name + ); + }) + .await; + self.metrics + .count_collections_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + res } pub async fn get_collection( @@ -383,37 +653,86 @@ impl ServiceBasedFrontend { .. }: GetCollectionRequest, ) -> Result { - let mut collections = self - .sysdb_client - .get_collections(GetCollectionsOptions { - name: Some(collection_name.clone()), - tenant: Some(tenant_id.clone()), - database: Some(database_name.clone()), - limit: Some(1), - ..Default::default() + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_get_collection = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let db_name_clone = database_name.clone(); + let collection_name_clone = collection_name.clone(); + async move { + let mut collections = self_clone + .sysdb_client + .get_collections(GetCollectionsOptions { + name: Some(collection_name_clone.clone()), + tenant: Some(tenant_clone), + database: Some(db_name_clone), + limit: Some(1), + ..Default::default() + }) + .await + .map_err(|err| Box::new(err) as Box)?; + collections + .pop() + .ok_or(GetCollectionError::NotFound(collection_name_clone)) + } + }; + let res = retryable_get_collection + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "get collection failed with error {:?} for tenant {} database name {}, collection name {}. Retrying", + e, + tenant_id, + database_name, + collection_name, + ); }) - .await - .map_err(|err| Box::new(err) as Box)?; - collections - .pop() - .ok_or(GetCollectionError::NotFound(collection_name)) + .await; + self.metrics + .get_collection_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); + res } pub async fn get_collection_by_crn( &mut self, GetCollectionByCrnRequest { parsed_crn, .. 
}: GetCollectionByCrnRequest, ) -> Result { - let collection = self - .sysdb_client - .get_collection_by_crn( - parsed_crn.tenant_resource_name.clone(), - parsed_crn.database_name.clone(), - parsed_crn.collection_name.clone(), - ) - .await - .map_err(|err| Box::new(err) as Box)?; + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_get_by_crn = || { + let mut self_clone = self.clone(); + let tenant_clone = parsed_crn.tenant_resource_name.clone(); + let db_name_clone = parsed_crn.database_name.clone(); + let coll_name_clone = parsed_crn.collection_name.clone(); + async move { + self_clone + .sysdb_client + .get_collection_by_crn(tenant_clone, db_name_clone, coll_name_clone) + .await + .map_err(|err| Box::new(err) as Box) + } + }; + let res = retryable_get_by_crn + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "get collection by crn failed with error {:?} for tenant {} database name {}, collection name {}. Retrying", + e, + parsed_crn.tenant_resource_name, + parsed_crn.database_name, + parsed_crn.collection_name, + ); + }) + .await?; + self.metrics + .get_collection_by_crn_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); - Ok(collection) + Ok(res) } pub async fn create_collection( @@ -526,26 +845,52 @@ impl ServiceBasedFrontend { } }; - let collection = self - .sysdb_client - .create_collection( - tenant_id.clone(), - database_name, - collection_id, - name, - segments, - configuration, - metadata, - None, - get_or_create, - ) - .await - .map_err(|err| Box::new(err) as Box)?; + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_create_collection = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let db_name_clone = database_name.clone(); + let coll_name_clone = name.clone(); + let segments_clone = segments.clone(); + let config_clone = configuration.clone(); + let metadata_clone = metadata.clone(); + async move { + self_clone + .sysdb_client + .create_collection( + tenant_clone, + db_name_clone, + collection_id, + coll_name_clone, + segments_clone, + config_clone, + metadata_clone, + None, + get_or_create, + ) + .await + .map_err(|err| Box::new(err) as Box) + } + }; + let res = retryable_create_collection + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "create collection failed with error {:?} for tenant {} database name {}, collection name {}. Retrying", + e, + tenant_id, + database_name, + name, + ); + }) + .await?; self.collections_with_segments_provider .collections_with_segments_cache .remove(&collection_id) .await; - Ok(collection) + Ok(res) } pub async fn update_collection( @@ -558,34 +903,55 @@ impl ServiceBasedFrontend { .. 
}: UpdateCollectionRequest, ) -> Result { - self.sysdb_client - .update_collection( - collection_id, - new_name, - new_metadata, - None, - new_configuration, - ) - .await - .map_err(|err| Box::new(err) as Box)?; + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_update_collection = || { + let mut self_clone = self.clone(); + let new_name_clone = new_name.clone(); + let new_metadata_clone = new_metadata.clone(); + let new_config_clone = new_configuration.clone(); + async move { + self_clone + .sysdb_client + .update_collection( + collection_id, + new_name_clone, + new_metadata_clone, + None, + new_config_clone, + ) + .await + .map_err(|err| Box::new(err) as Box) + } + }; + retryable_update_collection + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "Update collection failed with error {:?} for collection id {}. Retrying", + e, + collection_id, + ); + }) + .await?; // Invalidate the cache. self.collections_with_segments_provider .collections_with_segments_cache .remove(&collection_id) .await; - + self.metrics + .update_collection_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); Ok(UpdateCollectionResponse {}) } - pub async fn delete_collection( + pub async fn retryable_delete_collection( &mut self, - DeleteCollectionRequest { - tenant_id, - database_name, - collection_name, - .. - }: DeleteCollectionRequest, - ) -> Result { + tenant_id: String, + database_name: String, + collection_name: String, + ) -> Result { let collection = self .get_collection( GetCollectionRequest::try_new( @@ -612,12 +978,52 @@ impl ServiceBasedFrontend { ) .await .map_err(|err| Box::new(err) as Box)?; + + Ok(collection.collection_id) + } + + pub async fn delete_collection( + &mut self, + DeleteCollectionRequest { + tenant_id, + database_name, + collection_name, + .. + }: DeleteCollectionRequest, + ) -> Result { + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_delete_collection = || { + let mut self_clone = self.clone(); + let tenant_clone = tenant_id.clone(); + let db_name_clone = database_name.clone(); + let coll_name_clone = collection_name.clone(); + async move { + self_clone + .retryable_delete_collection(tenant_clone, db_name_clone, coll_name_clone) + .await + } + }; + let id = retryable_delete_collection + .retry(self.retry_policy) + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "delete collection failed with error {:?} for collection name {}, database name {}. Retrying", + e, + collection_name, + database_name, + ); + }) + .await?; // Invalidate the cache. self.collections_with_segments_provider .collections_with_segments_cache - .remove(&collection.collection_id) + .remove(&id) .await; - + self.metrics + .delete_collection_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); Ok(DeleteCollectionRecordsResponse {}) } @@ -697,13 +1103,11 @@ impl ServiceBasedFrontend { ) }) .notify(|_, _| { - let retried = retries.fetch_add(1, Ordering::Relaxed); - if retried > 0 { - tracing::info!( - "Retrying fork() request for collection {}", - request.source_collection_id - ); - } + retries.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "Retrying fork() request for collection {}", + request.source_collection_id + ); }) .await; self.metrics @@ -765,7 +1169,7 @@ impl ServiceBasedFrontend { // Any error other than a backoff is not safe to retry. 
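        // The consolidated `Self::is_retryable` helper also accepts `Unknown`, since
        // transport-level failures surface as unknown status codes and should be retried as well.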
let res = add_to_retry .retry(self.retry_policy) - .when(|e| matches!(e.code(), ErrorCodes::Unavailable)) + .when(|e| Self::is_retryable(e.code().into())) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -854,7 +1258,7 @@ impl ServiceBasedFrontend { // Any error other than a backoff is not safe to retry. let res = add_to_retry .retry(self.retry_policy) - .when(|e| matches!(e.code(), ErrorCodes::Unavailable)) + .when(|e| Self::is_retryable(e.code().into())) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -948,7 +1352,7 @@ impl ServiceBasedFrontend { // Any error other than a backoff is not safe to retry. let res = add_to_retry .retry(self.retry_policy) - .when(|e| matches!(e.code(), ErrorCodes::Unavailable)) + .when(|e| Self::is_retryable(e.code().into())) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -1073,10 +1477,8 @@ impl ServiceBasedFrontend { let read_event = if let Some(where_clause) = r#where { let collection_and_segments = self - .collections_with_segments_provider - .get_collection_with_segments(collection_id) - .await - .map_err(|err| Box::new(err) as Box)?; + .retryable_get_collection_with_segments(collection_id) + .await?; let latest_collection_logical_size_bytes = collection_and_segments .collection .size_bytes_post_compaction; @@ -1108,22 +1510,22 @@ impl ServiceBasedFrontend { .executor .get(get_plan.clone(), |code: tonic::Code| { let mut provider = self.collections_with_segments_provider.clone(); - let mut get_plan2 = get_plan.clone(); + let mut replan_get = get_plan.clone(); async move { if code == tonic::Code::NotFound { provider .collections_with_segments_cache .remove(&collection_id) .await; + let collection_and_segments = provider + .get_collection_with_segments(collection_id) + .await + .map_err(|err| Box::new(err) as Box)?; + replan_get.scan = Scan { + collection_and_segments, + }; } - let collection_and_segments = provider - .get_collection_with_segments(collection_id) - .await - .map_err(|err| Box::new(err) as Box)?; - get_plan2.scan = Scan { - collection_and_segments, - }; - Ok(get_plan2) + Ok(replan_get) } }) .await?; @@ -1185,59 +1587,8 @@ impl ServiceBasedFrontend { &mut self, request: DeleteCollectionRecordsRequest, ) -> Result { - let retries = Arc::new(AtomicUsize::new(0)); - let retryable_get_records_to_delete = || { - let mut self_clone = self.clone(); - let request_clone = request.clone(); - let cache_clone = self - .collections_with_segments_provider - .collections_with_segments_cache - .clone(); - async move { - let res = self_clone - .retryable_get_records_to_delete(request_clone) - .await; - match res { - Ok(res) => Ok(res), - Err(e) => { - if e.code() == ErrorCodes::NotFound { - tracing::info!( - "Invalidating cache for collection {}", - request.collection_id - ); - cache_clone.remove(&request.collection_id).await; - } - Err(e) - } - } - } - }; - let records_to_delete = retryable_get_records_to_delete - .retry(self.retry_policy) - // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| { - matches!( - e.code(), - ErrorCodes::NotFound | ErrorCodes::Unknown | ErrorCodes::Unavailable - ) - }) - .notify(|_, _| { - let retried = retries.fetch_add(1, Ordering::Relaxed); - if retried > 0 { - tracing::info!( - "Retrying get records to delete request for collection {}", - request.collection_id - ); - } - }) - .adjust(|e, d| { - if e.code() == ErrorCodes::NotFound { - // Retry 
immediately for cache invalidation - Some(Duration::from_micros(0)) - } else { - d - } - }) + let records_to_delete = self + .retryable_get_records_to_delete(request.clone()) .await?; let retries = Arc::new(AtomicUsize::new(0)); @@ -1260,7 +1611,7 @@ impl ServiceBasedFrontend { let res = retryable_push_delete_logs .retry(self.retry_policy) // NOTE: It's not safe to retry adds to the log except when it is unavailable. - .when(|e| matches!(e.code(), ErrorCodes::Unavailable)) + .when(|e| Self::is_retryable(e.code().into())) .notify(|_, _| { let retried = retries.fetch_add(1, Ordering::Relaxed); if retried > 0 { @@ -1283,19 +1634,37 @@ impl ServiceBasedFrontend { CountRequest { collection_id, .. }: CountRequest, ) -> Result { let collection_and_segments = self - .collections_with_segments_provider - .get_collection_with_segments(collection_id) - .await - .map_err(|err| Box::new(err) as Box)?; + .retryable_get_collection_with_segments(collection_id) + .await?; let latest_collection_logical_size_bytes = collection_and_segments .collection .size_bytes_post_compaction; + let count_plan = Count { + scan: Scan { + collection_and_segments, + }, + }; let count_result = self .executor - .count(Count { - scan: Scan { - collection_and_segments, - }, + .count(count_plan.clone(), |code: tonic::Code| { + let mut provider = self.collections_with_segments_provider.clone(); + let mut count_replanned = count_plan.clone(); + async move { + if code == tonic::Code::NotFound { + provider + .collections_with_segments_cache + .remove(&collection_id) + .await; + let collection_and_segments = provider + .get_collection_with_segments(collection_id) + .await + .map_err(|err| Box::new(err) as Box)?; + count_replanned.scan = Scan { + collection_and_segments, + }; + } + Ok(count_replanned) + } }) .await?; let return_bytes = count_result.size_bytes(); @@ -1340,61 +1709,44 @@ impl ServiceBasedFrontend { } pub async fn count(&mut self, request: CountRequest) -> Result { - let retries = Arc::new(AtomicUsize::new(0)); - let count_to_retry = || { + self.retryable_count(request).await + } + + fn is_retryable(code: tonic::Code) -> bool { + code == tonic::Code::Unavailable || code == tonic::Code::Unknown + } + + async fn retryable_get_collection_with_segments( + &mut self, + collection_id: CollectionUuid, + ) -> Result> { + let retry_count = Arc::new(AtomicUsize::new(0)); + let retryable_get_collection = || { let mut self_clone = self.clone(); - let request_clone = request.clone(); - let cache_clone = self - .collections_with_segments_provider - .collections_with_segments_cache - .clone(); async move { - let res = self_clone.retryable_count(request_clone).await; - match res { - Ok(res) => Ok(res), - Err(e) => { - if e.code() == ErrorCodes::NotFound { - tracing::info!( - "Invalidating cache for collection {}", - request.collection_id - ); - cache_clone.remove(&request.collection_id).await; - } - Err(e) - } - } + self_clone + .collections_with_segments_provider + .get_collection_with_segments(collection_id) + .await + .map_err(|err| Box::new(err) as Box) } }; - let res = count_to_retry + + let res = retryable_get_collection .retry(self.retry_policy) - // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| { - matches!( - e.code(), - ErrorCodes::NotFound | ErrorCodes::Unknown | ErrorCodes::Unavailable - ) - }) - .notify(|_, _| { - let retried = retries.fetch_add(1, Ordering::Relaxed); - if retried > 0 { - tracing::info!( - "Retrying count() request for collection {}", - 
request.collection_id - ); - } - }) - .adjust(|e, d| { - if e.code() == ErrorCodes::NotFound { - // Retry immediately for cache invalidation - Some(Duration::from_micros(0)) - } else { - d - } + .when(|e| Self::is_retryable(e.code().into())) + .notify(|e, _| { + retry_count.fetch_add(1, Ordering::Relaxed); + tracing::info!( + "get collection with segments failed with error {:?} for collection {}. Retrying", + e, + collection_id + ); }) .await; self.metrics - .count_retries_counter - .add(retries.load(Ordering::Relaxed) as u64, &[]); + .get_collection_with_segments_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); res } @@ -1411,10 +1763,8 @@ impl ServiceBasedFrontend { }: GetRequest, ) -> Result { let collection_and_segments = self - .collections_with_segments_provider - .get_collection_with_segments(collection_id) - .await - .map_err(|err| Box::new(err) as Box)?; + .retryable_get_collection_with_segments(collection_id) + .await?; let latest_collection_logical_size_bytes = collection_and_segments .collection .size_bytes_post_compaction; @@ -1447,22 +1797,22 @@ impl ServiceBasedFrontend { .executor .get(get_plan.clone(), |code: tonic::Code| { let mut provider = self.collections_with_segments_provider.clone(); - let mut get_plan2 = get_plan.clone(); + let mut get_replanned = get_plan.clone(); async move { if code == tonic::Code::NotFound { provider .collections_with_segments_cache .remove(&collection_id) .await; + let collection_and_segments = provider + .get_collection_with_segments(collection_id) + .await + .map_err(|err| Box::new(err) as Box)?; + get_replanned.scan = Scan { + collection_and_segments, + }; } - let collection_and_segments = provider - .get_collection_with_segments(collection_id) - .await - .map_err(|err| Box::new(err) as Box)?; - get_plan2.scan = Scan { - collection_and_segments, - }; - Ok(get_plan2) + Ok(get_replanned) } }) .await?; @@ -1509,62 +1859,7 @@ impl ServiceBasedFrontend { } pub async fn get(&mut self, request: GetRequest) -> Result { - let retries = Arc::new(AtomicUsize::new(0)); - let get_to_retry = || { - let mut self_clone = self.clone(); - let request_clone = request.clone(); - let cache_clone = self - .collections_with_segments_provider - .collections_with_segments_cache - .clone(); - async move { - let res = self_clone.retryable_get(request_clone).await; - match res { - Ok(res) => Ok(res), - Err(e) => { - if e.code() == ErrorCodes::NotFound { - tracing::info!( - "Invalidating cache for collection {}", - request.collection_id - ); - cache_clone.remove(&request.collection_id).await; - } - Err(e) - } - } - } - }; - let res = get_to_retry - .retry(self.retry_policy) - // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| { - matches!( - e.code(), - ErrorCodes::NotFound | ErrorCodes::Unknown | ErrorCodes::Unavailable - ) - }) - .notify(|_, _| { - let retried = retries.fetch_add(1, Ordering::Relaxed); - if retried > 0 { - tracing::info!( - "Retrying get() request for collection {}", - request.collection_id - ); - } - }) - .adjust(|e, d| { - if e.code() == ErrorCodes::NotFound { - // Retry immediately for cache invalidation - Some(Duration::from_micros(0)) - } else { - d - } - }) - .await; - self.metrics - .get_retries_counter - .add(retries.load(Ordering::Relaxed) as u64, &[]); - res + self.retryable_get(request).await } async fn retryable_query( @@ -1580,10 +1875,8 @@ impl ServiceBasedFrontend { }: QueryRequest, ) -> Result { let collection_and_segments = self - 
.collections_with_segments_provider - .get_collection_with_segments(collection_id) - .await - .map_err(|err| Box::new(err) as Box)?; + .retryable_get_collection_with_segments(collection_id) + .await?; let latest_collection_logical_size_bytes = collection_and_segments .collection .size_bytes_post_compaction; @@ -1596,30 +1889,50 @@ impl ServiceBasedFrontend { .map(Where::fts_query_length) .unwrap_or_default(); let query_embedding_count = embeddings.len() as u64; + let knn_plan = Knn { + scan: Scan { + collection_and_segments, + }, + filter: Filter { + query_ids: ids, + where_clause: r#where, + }, + knn: KnnBatch { + embeddings, + fetch: n_results, + }, + proj: KnnProjection { + projection: Projection { + document: include.0.contains(&Include::Document), + embedding: include.0.contains(&Include::Embedding), + // If URI is requested, metadata is also requested so we can extract the URI. + metadata: (include.0.contains(&Include::Metadata) + || include.0.contains(&Include::Uri)), + }, + distance: include.0.contains(&Include::Distance), + }, + }; let query_result = self .executor - .knn(Knn { - scan: Scan { - collection_and_segments, - }, - filter: Filter { - query_ids: ids, - where_clause: r#where, - }, - knn: KnnBatch { - embeddings, - fetch: n_results, - }, - proj: KnnProjection { - projection: Projection { - document: include.0.contains(&Include::Document), - embedding: include.0.contains(&Include::Embedding), - // If URI is requested, metadata is also requested so we can extract the URI. - metadata: (include.0.contains(&Include::Metadata) - || include.0.contains(&Include::Uri)), - }, - distance: include.0.contains(&Include::Distance), - }, + .knn(knn_plan.clone(), |code: tonic::Code| { + let mut provider = self.collections_with_segments_provider.clone(); + let mut knn_replanned = knn_plan.clone(); + async move { + if code == tonic::Code::NotFound { + provider + .collections_with_segments_cache + .remove(&collection_id) + .await; + let collection_and_segments = provider + .get_collection_with_segments(collection_id) + .await + .map_err(|err| Box::new(err) as Box)?; + knn_replanned.scan = Scan { + collection_and_segments, + }; + } + Ok(knn_replanned) + } }) .await?; let return_bytes = query_result.size_bytes(); @@ -1673,63 +1986,7 @@ impl ServiceBasedFrontend { ) .await .map_err(|err| err.boxed())?; - - let retries = Arc::new(AtomicUsize::new(0)); - let query_to_retry = || { - let mut self_clone = self.clone(); - let request_clone = request.clone(); - let cache_clone = self - .collections_with_segments_provider - .collections_with_segments_cache - .clone(); - async move { - let res = self_clone.retryable_query(request_clone).await; - match res { - Ok(res) => Ok(res), - Err(e) => { - if e.code() == ErrorCodes::NotFound { - tracing::info!( - "Invalidating cache for collection {}", - request.collection_id - ); - cache_clone.remove(&request.collection_id).await; - } - Err(e) - } - } - } - }; - let res = query_to_retry - .retry(self.retry_policy) - // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| { - matches!( - e.code(), - ErrorCodes::NotFound | ErrorCodes::Unknown | ErrorCodes::Unavailable - ) - }) - .notify(|_, _| { - let retried = retries.fetch_add(1, Ordering::Relaxed); - if retried > 0 { - tracing::info!( - "Retrying query() request for collection {}", - request.collection_id - ); - } - }) - .adjust(|e, d| { - if e.code() == ErrorCodes::NotFound { - // Retry immediately for cache invalidation - Some(Duration::from_micros(0)) - } 
else { - d - } - }) - .await; - self.metrics - .query_retries_counter - .add(retries.load(Ordering::Relaxed) as u64, &[]); - res + self.retryable_query(request).await } pub async fn retryable_search( diff --git a/rust/types/src/execution/operator.rs b/rust/types/src/execution/operator.rs index a635ddaca30..63d2007fbb4 100644 --- a/rust/types/src/execution/operator.rs +++ b/rust/types/src/execution/operator.rs @@ -1,3 +1,5 @@ +use chroma_error::{ChromaError, ErrorCodes}; +use core::mem::discriminant; use serde::{de::Error, Deserialize, Deserializer, Serialize}; use serde_json::Value; use std::{ @@ -59,6 +61,14 @@ pub enum ScanToProtoError { CollectionToProto(#[from] crate::CollectionToProtoError), } +impl ChromaError for ScanToProtoError { + fn code(&self) -> ErrorCodes { + match self { + ScanToProtoError::CollectionToProto(e) => e.code(), + } + } +} + impl TryFrom for chroma_proto::ScanOperator { type Error = ScanToProtoError; diff --git a/rust/types/src/execution/plan.rs b/rust/types/src/execution/plan.rs index 070a057f23c..326f3fa2ea5 100644 --- a/rust/types/src/execution/plan.rs +++ b/rust/types/src/execution/plan.rs @@ -5,6 +5,7 @@ use super::{ }, }; use crate::{chroma_proto, validators::validate_rank}; +use chroma_error::{ChromaError, ErrorCodes}; use serde::{Deserialize, Serialize}; use thiserror::Error; use utoipa::{ @@ -22,6 +23,14 @@ pub enum PlanToProtoError { Scan(#[from] ScanToProtoError), } +impl ChromaError for PlanToProtoError { + fn code(&self) -> ErrorCodes { + match self { + PlanToProtoError::Scan(e) => e.code(), + } + } +} + /// The `Count` plan shoud ouutput the total number of records in the collection #[derive(Clone)] pub struct Count { From d240bb321417f9b71f99fcf1dc5929065a4f6a6c Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Thu, 4 Sep 2025 15:49:17 -0700 Subject: [PATCH 3/5] Delete files --- decode_log.py | 26 ---------------- deserialize_version_file.py | 61 ------------------------------------- 2 files changed, 87 deletions(-) delete mode 100644 decode_log.py delete mode 100644 deserialize_version_file.py diff --git a/decode_log.py b/decode_log.py deleted file mode 100644 index a60022e90ae..00000000000 --- a/decode_log.py +++ /dev/null @@ -1,26 +0,0 @@ -# import pandas as pd -# import chromadb.proto.chroma_pb2 as chroma_pb2 - -# path = '/Users/sanketkedia/Downloads/FragmentSeqNo=0000000000000001.parquet' -# df = pd.read_parquet(path) -# body = df['body'].iloc[0] -# log_record = chroma_pb2.OperationRecord() -# err = log_record.ParseFromString(body) -# print(log_record) - - - -import sys -from chromadb.proto.chroma_pb2 import Operation, OperationRecord - -hex_line = "0a046665667712001a1b0a190a0f6368726f6d613a646f63756d656e7412060a0465667765" - -def decode_file(): - data = bytes.fromhex(hex_line) - # Decode data to OperationRecord and print - record = OperationRecord.FromString(data) - print(record) - print(record.operation) - -if __name__ == "__main__": - decode_file() \ No newline at end of file diff --git a/deserialize_version_file.py b/deserialize_version_file.py deleted file mode 100644 index b74685bdf27..00000000000 --- a/deserialize_version_file.py +++ /dev/null @@ -1,61 +0,0 @@ -import chromadb.proto.coordinator_pb2 as coordinator_pb2 - -# Read the serialized file -with open('/Users/sanketkedia/Downloads/000038_5415a1ef-4efc-4503-8a24-ef144a9d1911_flush', 'rb') as f: - serialized_data = f.read() - -# Deserialize the data -collection_version_file = coordinator_pb2.CollectionVersionFile() -collection_version_file.ParseFromString(serialized_data) - -if not 
collection_version_file.HasField('version_history'): - print("No version history found in the file.") - exit(0) - -versions = collection_version_file.version_history.versions -print(f"Found {len(versions)} versions in the file.") -print(collection_version_file) - -exit(0) - -output_file_path = "/Users/sanketkedia/Documents/000310_166f7650-8cd8-452d-904b-6f92ca5fd62c_flush" - - # Track the last non-empty segment_info we've seen -last_non_empty_segment_info = None -last_non_empty_version = None - -def has_non_empty_segment_info(version_info): - """Check if a version has non-empty segment_info.""" - if not version_info.HasField('segment_info'): - return False - - segment_info = version_info.segment_info - return len(segment_info.segment_compaction_info) > 0 - -# Process each version -for i, version in enumerate(versions): - version_num = version.version - - if has_non_empty_segment_info(version): - # This version has non-empty segment_info, update our tracker - last_non_empty_segment_info = version.segment_info - last_non_empty_version = version_num - print(f"Version {version_num}: Has non-empty segment_info with {len(version.segment_info.segment_compaction_info)} segments") - else: - # This version has empty segment_info - if last_non_empty_segment_info is not None: - # We have a previous non-empty segment_info to use - print(f"Version {version_num}: Patching with segment_info from version {last_non_empty_version}") - - # Clear any existing segment_info and copy from the last non-empty one - version.segment_info.CopyFrom(last_non_empty_segment_info) - else: - print(f"Version {version_num}: No previous non-empty segment_info found, keeping empty") - -print(f"\nWriting patched file to: {output_file_path}") -serialized_output = collection_version_file.SerializeToString() - -with open(output_file_path, 'wb') as f: - f.write(serialized_output) - -print("Patching complete!") \ No newline at end of file From 4c24011d044e84fd079d54676d91aae1b9e2cbdc Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Thu, 4 Sep 2025 16:31:10 -0700 Subject: [PATCH 4/5] Rebase --- rust/frontend/src/executor/distributed.rs | 50 ++++++++++- rust/frontend/src/executor/mod.rs | 14 ++- .../src/impls/service_based_frontend.rs | 88 ++++++++----------- 3 files changed, 96 insertions(+), 56 deletions(-) diff --git a/rust/frontend/src/executor/distributed.rs b/rust/frontend/src/executor/distributed.rs index 55cf4e2894b..3c45c6e229a 100644 --- a/rust/frontend/src/executor/distributed.rs +++ b/rust/frontend/src/executor/distributed.rs @@ -62,6 +62,7 @@ struct Metrics { delete_retries_counter: Counter, count_retries_counter: Counter, query_retries_counter: Counter, + search_retries_counter: Counter, get_retries_counter: Counter, add_retries_counter: Counter, update_retries_counter: Counter, @@ -122,6 +123,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx let delete_retries_counter = meter.u64_counter("delete_retries").build(); let count_retries_counter = meter.u64_counter("count_retries").build(); let query_retries_counter = meter.u64_counter("query_retries").build(); + let search_retries_counter = meter.u64_counter("search_retries").build(); let get_retries_counter = meter.u64_counter("get_retries").build(); let add_retries_counter = meter.u64_counter("add_retries").build(); let update_retries_counter = meter.u64_counter("update_retries").build(); @@ -135,6 +137,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx add_retries_counter, update_retries_counter, 
upsert_retries_counter, + search_retries_counter, }); Ok(Self { client_assigner, @@ -395,7 +398,16 @@ impl DistributedExecutor { Ok(res.into_inner().try_into()?) } - pub async fn search(&mut self, plan: Search) -> Result { + pub async fn search( + &mut self, + plan: Search, + replan_closure: F, + ) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: Future>>, + { + let retry_count = Arc::new(AtomicUsize::new(0)); // Get the collection ID from the plan let collection_id = &plan .scan @@ -413,6 +425,7 @@ impl DistributedExecutor { let request: chroma_types::chroma_proto::SearchPlan = plan.try_into()?; let attempt_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0)); + let last_error = std::sync::Arc::new(parking_lot::Mutex::new(tonic::Code::Ok)); let config = self.client_selection_config.clone(); let res = { let attempt_count = attempt_count.clone(); @@ -420,14 +433,49 @@ impl DistributedExecutor { let current_attempt = attempt_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); let is_retry = current_attempt > 0; + if is_retry { + let last_error_code = *last_error.lock(); + let replan_search = + replan_closure(last_error_code) + .await + .map_err(|e| -> tonic::Status { + Status::new( + e.code().into(), + format!("Failed to replan search {:?}", e), + ) + })?; + return choose_query_client_weighted(&clients, &config, is_retry)? + .search(Request::new(replan_search.try_into()?)) + .await; + } choose_query_client_weighted(&clients, &config, is_retry)? .search(Request::new(request.clone())) .await }) .retry(self.backoff) .when(is_retryable_error) + .notify(|e, _| { + let mut last_error = last_error.lock(); + *last_error = e.code(); + tracing::info!( + "Retrying search for collection {}, error {:?}", + collection_id, + e + ); + retry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + }) + .adjust(|e, d| { + if e.code() == Code::NotFound { + return Some(Duration::from_micros(0)); + } + d + }) .await? }; + self.metrics.search_retries_counter.add( + retry_count.load(std::sync::atomic::Ordering::Relaxed) as u64, + &[], + ); Ok(res.into_inner().try_into()?) 
} diff --git a/rust/frontend/src/executor/mod.rs b/rust/frontend/src/executor/mod.rs index 16f95bf8db3..ce71dfe863f 100644 --- a/rust/frontend/src/executor/mod.rs +++ b/rust/frontend/src/executor/mod.rs @@ -71,9 +71,19 @@ impl Executor { Executor::Local(local_executor) => local_executor.knn(plan, replan_closure).await, } } - pub async fn search(&mut self, plan: Search) -> Result { + pub async fn search( + &mut self, + plan: Search, + replan_closure: F, + ) -> Result + where + F: Fn(tonic::Code) -> Fut, + Fut: Future>>, + { match self { - Executor::Distributed(distributed_executor) => distributed_executor.search(plan).await, + Executor::Distributed(distributed_executor) => { + distributed_executor.search(plan, replan_closure).await + } Executor::Local(local_executor) => local_executor.search(plan).await, } } diff --git a/rust/frontend/src/impls/service_based_frontend.rs b/rust/frontend/src/impls/service_based_frontend.rs index 33cdb7480e7..a93cddff83e 100644 --- a/rust/frontend/src/impls/service_based_frontend.rs +++ b/rust/frontend/src/impls/service_based_frontend.rs @@ -75,7 +75,6 @@ struct Metrics { create_tenant_retries_counter: Counter, update_tenant_retries_counter: Counter, get_collection_with_segments_counter: Counter, - search_retries_counter: Counter, metering_fork_counter: Counter, metering_read_counter: Counter, metering_write_counter: Counter, @@ -113,7 +112,6 @@ impl ServiceBasedFrontend { let add_retries_counter = meter.u64_counter("add_retries").build(); let update_retries_counter = meter.u64_counter("update_retries").build(); let upsert_retries_counter = meter.u64_counter("upsert_retries").build(); - let search_retries_counter = meter.u64_counter("search_retries").build(); let metering_fork_counter = meter.u64_counter("metering_events_sent.fork").with_description("The number of fork metering events sent by the frontend to the metering event receiver.").build(); let metering_read_counter = meter.u64_counter("metering_events_sent.read").with_description("The number of read metering events sent by the frontend to the metering event receiver.").build(); let metering_write_counter = meter.u64_counter("metering_events_sent.write").with_description("The number of write metering events sent by the frontend to the metering event receiver.").build(); @@ -162,7 +160,6 @@ impl ServiceBasedFrontend { create_db_retries_counter, delete_db_retries_counter, delete_collection_retries_counter, - search_retries_counter, metering_fork_counter, metering_read_counter, metering_write_counter, @@ -1577,7 +1574,9 @@ impl ServiceBasedFrontend { }; if let Some(event) = read_event { - event.submit().await; + if let Ok(()) = event.submit().await { + self.metrics.metering_read_counter.add(1, &[]); + } } Ok(records) @@ -1996,10 +1995,8 @@ impl ServiceBasedFrontend { // TODO: The dispatch logic is mostly the same for count/get/query/search, we should consider unifying them // Get collection and segments once for all queries let collection_and_segments = self - .collections_with_segments_provider - .get_collection_with_segments(request.collection_id) - .await - .map_err(|err| QueryError::Other(Box::new(err) as Box))?; + .retryable_get_collection_with_segments(request.collection_id) + .await?; let latest_collection_logical_size_bytes = collection_and_segments .collection @@ -2032,8 +2029,36 @@ impl ServiceBasedFrontend { payloads: request.searches, }; + let collection_id = search_plan + .scan + .collection_and_segments + .collection + .collection_id; + // Execute the single search plan using the executor - 
let result = self.executor.search(search_plan).await?; + let result = self + .executor + .search(search_plan.clone(), |code: tonic::Code| { + let mut provider = self.collections_with_segments_provider.clone(); + let mut search_replanned = search_plan.clone(); + async move { + if code == tonic::Code::NotFound { + provider + .collections_with_segments_cache + .remove(&collection_id) + .await; + let collection_and_segments = provider + .get_collection_with_segments(collection_id) + .await + .map_err(|err| Box::new(err) as Box)?; + search_replanned.scan = Scan { + collection_and_segments, + }; + } + Ok(search_replanned) + } + }) + .await?; // Calculate return bytes (approximate size of the response) let return_bytes = result.size_bytes(); @@ -2079,50 +2104,7 @@ impl ServiceBasedFrontend { } pub async fn search(&mut self, request: SearchRequest) -> Result { - // TODO: The retry logic is mostly the same for count/get/query/search, we should consider unifying them - let retries = Arc::new(AtomicUsize::new(0)); - let search_to_retry = || { - let mut self_clone = self.clone(); - let request_clone = request.clone(); - let cache_clone = self - .collections_with_segments_provider - .collections_with_segments_cache - .clone(); - async move { - let res = self_clone.retryable_search(request_clone).await; - match res { - Ok(res) => Ok(res), - Err(e) => { - if e.code() == ErrorCodes::NotFound { - tracing::info!( - "Invalidating cache for collection {}", - request.collection_id - ); - cache_clone.remove(&request.collection_id).await; - } - Err(e) - } - } - } - }; - let res = search_to_retry - .retry(self.collections_with_segments_provider.get_retry_backoff()) - // NOTE: Transport level errors will manifest as unknown errors, and they should also be retried - .when(|e| matches!(e.code(), ErrorCodes::NotFound | ErrorCodes::Unknown)) - .notify(|_, _| { - let retried = retries.fetch_add(1, Ordering::Relaxed); - if retried > 0 { - tracing::info!( - "Retrying search() request for collection {}", - request.collection_id - ); - } - }) - .await; - self.metrics - .search_retries_counter - .add(retries.load(Ordering::Relaxed) as u64, &[]); - res + self.retryable_search(request).await } pub async fn healthcheck(&self) -> HealthCheckResponse { From e24519cb46f308f79e60177c32a8e827084fa0de Mon Sep 17 00:00:00 2001 From: Sanket Kedia Date: Thu, 4 Sep 2025 16:32:55 -0700 Subject: [PATCH 5/5] Remove sleep --- rust/frontend/src/impls/service_based_frontend.rs | 8 ++++---- rust/log-service/src/lib.rs | 3 --- rust/types/src/execution/operator.rs | 1 - 3 files changed, 4 insertions(+), 8 deletions(-) diff --git a/rust/frontend/src/impls/service_based_frontend.rs b/rust/frontend/src/impls/service_based_frontend.rs index a93cddff83e..53069db7706 100644 --- a/rust/frontend/src/impls/service_based_frontend.rs +++ b/rust/frontend/src/impls/service_based_frontend.rs @@ -887,6 +887,9 @@ impl ServiceBasedFrontend { .collections_with_segments_cache .remove(&collection_id) .await; + self.metrics + .create_collection_retries_counter + .add(retry_count.load(Ordering::Relaxed) as u64, &[]); Ok(res) } @@ -2183,10 +2186,7 @@ mod tests { use chroma_types::Collection; use uuid::Uuid; - use crate::{ - executor::config::{DistributedExecutorConfig, ExecutorConfig, RetryConfig}, - server::CreateCollectionPayload, - }; + use crate::server::CreateCollectionPayload; use super::*; diff --git a/rust/log-service/src/lib.rs b/rust/log-service/src/lib.rs index 5c6698391e9..fa91c122997 100644 --- a/rust/log-service/src/lib.rs +++ 
b/rust/log-service/src/lib.rs @@ -1175,9 +1175,6 @@ impl LogServer { &self, request: Request, ) -> Result, Status> { - println!("(Sanket-temp) Sleeping for 30 secs"); - tokio::time::sleep(std::time::Duration::from_secs(30)).await; - println!("(Sanket-temp) Slept for 30 secs"); let scout_logs = request.into_inner(); let collection_id = Uuid::parse_str(&scout_logs.collection_id) .map(CollectionUuid) diff --git a/rust/types/src/execution/operator.rs b/rust/types/src/execution/operator.rs index 63d2007fbb4..43e5f1b991a 100644 --- a/rust/types/src/execution/operator.rs +++ b/rust/types/src/execution/operator.rs @@ -1,5 +1,4 @@ use chroma_error::{ChromaError, ErrorCodes}; -use core::mem::discriminant; use serde::{de::Error, Deserialize, Deserializer, Serialize}; use serde_json::Value; use std::{
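
The sysdb-facing handlers above all repeat the same shape: clone `self`, wrap the call in a closure, drive it with `backon` under `self.retry_policy`, gate retries on `Self::is_retryable`, and report the attempt count on a per-operation counter. Below is a minimal sketch of that shape for reference; the free function `with_retries`, its parameters, and the standalone copy of `is_retryable` are illustrative stand-ins, not code added by this change.

use std::future::Future;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use backon::{ExponentialBuilder, Retryable};
use chroma_error::ChromaError;
use opentelemetry::metrics::Counter;

// Mirrors the helper introduced in this change: only Unavailable and Unknown
// are treated as transient.
fn is_retryable(code: tonic::Code) -> bool {
    code == tonic::Code::Unavailable || code == tonic::Code::Unknown
}

// Hypothetical generic wrapper over the repeated pattern: run the closure under
// the exponential policy, count the retries, and report them on the counter.
async fn with_retries<T, E, F, Fut>(
    op: F,
    policy: ExponentialBuilder,
    retries_counter: &Counter<u64>,
) -> Result<T, E>
where
    E: ChromaError + std::fmt::Debug,
    F: FnMut() -> Fut,
    Fut: Future<Output = Result<T, E>>,
{
    let retry_count = Arc::new(AtomicUsize::new(0));
    let res = op
        .retry(policy)
        .when(|e| is_retryable(e.code().into()))
        .notify(|e, _| {
            retry_count.fetch_add(1, Ordering::Relaxed);
            tracing::info!("request failed with error {:?}. Retrying", e);
        })
        .await;
    retries_counter.add(retry_count.load(Ordering::Relaxed) as u64, &[]);
    res
}

The handlers in the diff inline this pattern rather than factoring it out, since each retryable closure must clone a different set of request fields (tenant, database, collection name, and so on) into the cloned `self` before the async move block.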