Skip to content

Commit 73e64da

Browse files
committed
Rebase
1 parent 36ec7d8 commit 73e64da

File tree

3 files changed

+119
-57
lines changed

3 files changed

+119
-57
lines changed

rust/frontend/src/executor/distributed.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ struct Metrics {
6262
delete_retries_counter: Counter<u64>,
6363
count_retries_counter: Counter<u64>,
6464
query_retries_counter: Counter<u64>,
65+
search_retries_counter: Counter<u64>,
6566
get_retries_counter: Counter<u64>,
6667
add_retries_counter: Counter<u64>,
6768
update_retries_counter: Counter<u64>,
@@ -122,6 +123,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
122123
let delete_retries_counter = meter.u64_counter("delete_retries").build();
123124
let count_retries_counter = meter.u64_counter("count_retries").build();
124125
let query_retries_counter = meter.u64_counter("query_retries").build();
126+
let search_retries_counter = meter.u64_counter("search_retries").build();
125127
let get_retries_counter = meter.u64_counter("get_retries").build();
126128
let add_retries_counter = meter.u64_counter("add_retries").build();
127129
let update_retries_counter = meter.u64_counter("update_retries").build();
@@ -135,6 +137,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
135137
add_retries_counter,
136138
update_retries_counter,
137139
upsert_retries_counter,
140+
search_retries_counter,
138141
});
139142
Ok(Self {
140143
client_assigner,
@@ -395,7 +398,16 @@ impl DistributedExecutor {
395398
Ok(res.into_inner().try_into()?)
396399
}
397400

398-
pub async fn search(&mut self, plan: Search) -> Result<SearchResult, ExecutorError> {
401+
pub async fn search<F, Fut>(
402+
&mut self,
403+
plan: Search,
404+
replan_closure: F,
405+
) -> Result<SearchResult, ExecutorError>
406+
where
407+
F: Fn(tonic::Code) -> Fut,
408+
Fut: Future<Output = Result<Search, Box<dyn ChromaError>>>,
409+
{
410+
let retry_count = Arc::new(AtomicUsize::new(0));
399411
// Get the collection ID from the plan
400412
let collection_id = &plan
401413
.scan
@@ -413,21 +425,57 @@ impl DistributedExecutor {
413425
let request: chroma_types::chroma_proto::SearchPlan = plan.try_into()?;
414426

415427
let attempt_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
428+
let last_error = std::sync::Arc::new(parking_lot::Mutex::new(tonic::Code::Ok));
416429
let config = self.client_selection_config.clone();
417430
let res = {
418431
let attempt_count = attempt_count.clone();
419432
(|| async {
420433
let current_attempt =
421434
attempt_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
422435
let is_retry = current_attempt > 0;
436+
if is_retry {
437+
let last_error_code = *last_error.lock();
438+
let replan_search =
439+
replan_closure(last_error_code)
440+
.await
441+
.map_err(|e| -> tonic::Status {
442+
Status::new(
443+
e.code().into(),
444+
format!("Failed to replan search {:?}", e),
445+
)
446+
})?;
447+
return choose_query_client_weighted(&clients, &config, is_retry)?
448+
.search(Request::new(replan_search.try_into()?))
449+
.await;
450+
}
423451
choose_query_client_weighted(&clients, &config, is_retry)?
424452
.search(Request::new(request.clone()))
425453
.await
426454
})
427455
.retry(self.backoff)
428456
.when(is_retryable_error)
457+
.notify(|e, _| {
458+
let mut last_error = last_error.lock();
459+
*last_error = e.code();
460+
tracing::info!(
461+
"Retrying search for collection {}, error {:?}",
462+
collection_id,
463+
e
464+
);
465+
retry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
466+
})
467+
.adjust(|e, d| {
468+
if e.code() == Code::NotFound {
469+
return Some(Duration::from_micros(0));
470+
}
471+
d
472+
})
429473
.await?
430474
};
475+
self.metrics.search_retries_counter.add(
476+
retry_count.load(std::sync::atomic::Ordering::Relaxed) as u64,
477+
&[],
478+
);
431479
Ok(res.into_inner().try_into()?)
432480
}
433481

rust/frontend/src/executor/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,19 @@ impl Executor {
7171
Executor::Local(local_executor) => local_executor.knn(plan, replan_closure).await,
7272
}
7373
}
74-
pub async fn search(&mut self, plan: Search) -> Result<SearchResult, ExecutorError> {
74+
pub async fn search<F, Fut>(
75+
&mut self,
76+
plan: Search,
77+
replan_closure: F,
78+
) -> Result<SearchResult, ExecutorError>
79+
where
80+
F: Fn(tonic::Code) -> Fut,
81+
Fut: Future<Output = Result<Search, Box<dyn ChromaError>>>,
82+
{
7583
match self {
76-
Executor::Distributed(distributed_executor) => distributed_executor.search(plan).await,
84+
Executor::Distributed(distributed_executor) => {
85+
distributed_executor.search(plan, replan_closure).await
86+
}
7787
Executor::Local(local_executor) => local_executor.search(plan).await,
7888
}
7989
}

rust/frontend/src/impls/service_based_frontend.rs

Lines changed: 58 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,29 @@ use chroma_sqlite::db::SqliteDb;
1717
use chroma_sysdb::{GetCollectionsOptions, SysDb};
1818
use chroma_system::System;
1919
use chroma_types::{
20-
operator::{Filter, KnnBatch, KnnProjection, Limit, Projection, Scan}, plan::{Count, Get, Knn, Search}, AddCollectionRecordsError, AddCollectionRecordsRequest, AddCollectionRecordsResponse, Collection, CollectionAndSegments, CollectionUuid, CountCollectionsError, CountCollectionsRequest, CountCollectionsResponse, CountRequest, CountResponse, CreateCollectionError, CreateCollectionRequest, CreateCollectionResponse, CreateDatabaseError, CreateDatabaseRequest, CreateDatabaseResponse, CreateTenantError, CreateTenantRequest, CreateTenantResponse, DeleteCollectionError, DeleteCollectionRecordsError, DeleteCollectionRecordsRequest, DeleteCollectionRecordsResponse, DeleteCollectionRequest, DeleteDatabaseError, DeleteDatabaseRequest, DeleteDatabaseResponse, ForkCollectionError, ForkCollectionRequest, ForkCollectionResponse, GetCollectionByCrnError, GetCollectionByCrnRequest, GetCollectionByCrnResponse, GetCollectionError, GetCollectionRequest, GetCollectionResponse, GetCollectionsError, GetDatabaseError, GetDatabaseRequest, GetDatabaseResponse, GetRequest, GetResponse, GetTenantError, GetTenantRequest, GetTenantResponse, HealthCheckResponse, HeartbeatError, HeartbeatResponse, Include, KnnIndex, ListCollectionsRequest, ListCollectionsResponse, ListDatabasesError, ListDatabasesRequest, ListDatabasesResponse, Operation, OperationRecord, QueryError, QueryRequest, QueryResponse, ResetError, ResetResponse, SearchRequest, SearchResponse, Segment, SegmentScope, SegmentType, SegmentUuid, UpdateCollectionError, UpdateCollectionRecordsError, UpdateCollectionRecordsRequest, UpdateCollectionRecordsResponse, UpdateCollectionRequest, UpdateCollectionResponse, UpdateTenantError, UpdateTenantRequest, UpdateTenantResponse, UpsertCollectionRecordsError, UpsertCollectionRecordsRequest, UpsertCollectionRecordsResponse, VectorIndexConfiguration, Where
20+
operator::{Filter, KnnBatch, KnnProjection, Limit, Projection, Scan},
21+
plan::{Count, Get, Knn, Search},
22+
AddCollectionRecordsError, AddCollectionRecordsRequest, AddCollectionRecordsResponse,
23+
Collection, CollectionAndSegments, CollectionUuid, CountCollectionsError,
24+
CountCollectionsRequest, CountCollectionsResponse, CountRequest, CountResponse,
25+
CreateCollectionError, CreateCollectionRequest, CreateCollectionResponse, CreateDatabaseError,
26+
CreateDatabaseRequest, CreateDatabaseResponse, CreateTenantError, CreateTenantRequest,
27+
CreateTenantResponse, DeleteCollectionError, DeleteCollectionRecordsError,
28+
DeleteCollectionRecordsRequest, DeleteCollectionRecordsResponse, DeleteCollectionRequest,
29+
DeleteDatabaseError, DeleteDatabaseRequest, DeleteDatabaseResponse, ForkCollectionError,
30+
ForkCollectionRequest, ForkCollectionResponse, GetCollectionByCrnError,
31+
GetCollectionByCrnRequest, GetCollectionByCrnResponse, GetCollectionError,
32+
GetCollectionRequest, GetCollectionResponse, GetCollectionsError, GetDatabaseError,
33+
GetDatabaseRequest, GetDatabaseResponse, GetRequest, GetResponse, GetTenantError,
34+
GetTenantRequest, GetTenantResponse, HealthCheckResponse, HeartbeatError, HeartbeatResponse,
35+
Include, KnnIndex, ListCollectionsRequest, ListCollectionsResponse, ListDatabasesError,
36+
ListDatabasesRequest, ListDatabasesResponse, Operation, OperationRecord, QueryError,
37+
QueryRequest, QueryResponse, ResetError, ResetResponse, SearchRequest, SearchResponse, Segment,
38+
SegmentScope, SegmentType, SegmentUuid, UpdateCollectionError, UpdateCollectionRecordsError,
39+
UpdateCollectionRecordsRequest, UpdateCollectionRecordsResponse, UpdateCollectionRequest,
40+
UpdateCollectionResponse, UpdateTenantError, UpdateTenantRequest, UpdateTenantResponse,
41+
UpsertCollectionRecordsError, UpsertCollectionRecordsRequest, UpsertCollectionRecordsResponse,
42+
VectorIndexConfiguration, Where,
2143
};
2244
use opentelemetry::global;
2345
use opentelemetry::metrics::Counter;
@@ -53,7 +75,6 @@ struct Metrics {
5375
create_tenant_retries_counter: Counter<u64>,
5476
update_tenant_retries_counter: Counter<u64>,
5577
get_collection_with_segments_counter: Counter<u64>,
56-
search_retries_counter: Counter<u64>,
5778
metering_fork_counter: Counter<u64>,
5879
metering_read_counter: Counter<u64>,
5980
metering_write_counter: Counter<u64>,
@@ -95,7 +116,6 @@ impl ServiceBasedFrontend {
95116
let add_retries_counter = meter.u64_counter("add_retries").build();
96117
let update_retries_counter = meter.u64_counter("update_retries").build();
97118
let upsert_retries_counter = meter.u64_counter("upsert_retries").build();
98-
let search_retries_counter = meter.u64_counter("search_retries").build();
99119
let metering_fork_counter = meter.u64_counter("metering_events_sent.fork").with_description("The number of fork metering events sent by the frontend to the metering event receiver.").build();
100120
let metering_read_counter = meter.u64_counter("metering_events_sent.read").with_description("The number of read metering events sent by the frontend to the metering event receiver.").build();
101121
let metering_write_counter = meter.u64_counter("metering_events_sent.write").with_description("The number of write metering events sent by the frontend to the metering event receiver.").build();
@@ -144,7 +164,6 @@ impl ServiceBasedFrontend {
144164
create_db_retries_counter,
145165
delete_db_retries_counter,
146166
delete_collection_retries_counter,
147-
search_retries_counter,
148167
metering_fork_counter,
149168
metering_read_counter,
150169
metering_write_counter,
@@ -1578,7 +1597,9 @@ impl ServiceBasedFrontend {
15781597
};
15791598

15801599
if let Some(event) = read_event {
1581-
event.submit().await;
1600+
if let Ok(()) = event.submit().await {
1601+
self.metrics.metering_read_counter.add(1, &[]);
1602+
}
15821603
}
15831604

15841605
Ok(records)
@@ -1997,10 +2018,8 @@ impl ServiceBasedFrontend {
19972018
// TODO: The dispatch logic is mostly the same for count/get/query/search, we should consider unifying them
19982019
// Get collection and segments once for all queries
19992020
let collection_and_segments = self
2000-
.collections_with_segments_provider
2001-
.get_collection_with_segments(request.collection_id)
2002-
.await
2003-
.map_err(|err| QueryError::Other(Box::new(err) as Box<dyn ChromaError>))?;
2021+
.retryable_get_collection_with_segments(request.collection_id)
2022+
.await?;
20042023

20052024
let latest_collection_logical_size_bytes = collection_and_segments
20062025
.collection
@@ -2032,8 +2051,36 @@ impl ServiceBasedFrontend {
20322051
payloads: request.searches,
20332052
};
20342053

2054+
let collection_id = search_plan
2055+
.scan
2056+
.collection_and_segments
2057+
.collection
2058+
.collection_id;
2059+
20352060
// Execute the single search plan using the executor
2036-
let result = self.executor.search(search_plan).await?;
2061+
let result = self
2062+
.executor
2063+
.search(search_plan.clone(), |code: tonic::Code| {
2064+
let mut provider = self.collections_with_segments_provider.clone();
2065+
let mut search_replanned = search_plan.clone();
2066+
async move {
2067+
if code == tonic::Code::NotFound {
2068+
provider
2069+
.collections_with_segments_cache
2070+
.remove(&collection_id)
2071+
.await;
2072+
let collection_and_segments = provider
2073+
.get_collection_with_segments(collection_id)
2074+
.await
2075+
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
2076+
search_replanned.scan = Scan {
2077+
collection_and_segments,
2078+
};
2079+
}
2080+
Ok(search_replanned)
2081+
}
2082+
})
2083+
.await?;
20372084

20382085
// Calculate return bytes (approximate size of the response)
20392086
let return_bytes = result.size_bytes();
@@ -2085,50 +2132,7 @@ impl ServiceBasedFrontend {
20852132
}
20862133

20872134
pub async fn search(&mut self, request: SearchRequest) -> Result<SearchResponse, QueryError> {
2088-
// TODO: The retry logic is mostly the same for count/get/query/search, we should consider unifying them
2089-
let retries = Arc::new(AtomicUsize::new(0));
2090-
let search_to_retry = || {
2091-
let mut self_clone = self.clone();
2092-
let request_clone = request.clone();
2093-
let cache_clone = self
2094-
.collections_with_segments_provider
2095-
.collections_with_segments_cache
2096-
.clone();
2097-
async move {
2098-
let res = self_clone.retryable_search(request_clone).await;
2099-
match res {
2100-
Ok(res) => Ok(res),
2101-
Err(e) => {
2102-
if e.code() == ErrorCodes::NotFound {
2103-
tracing::info!(
2104-
"Invalidating cache for collection {}",
2105-
request.collection_id
2106-
);
2107-
cache_clone.remove(&request.collection_id).await;
2108-
}
2109-
Err(e)
2110-
}
2111-
}
2112-
}
2113-
};
2114-
let res = search_to_retry
2115-
.retry(self.collections_with_segments_provider.get_retry_backoff())
2116-
// NOTE: Transport level errors will manifest as unknown errors, and they should also be retried
2117-
.when(|e| matches!(e.code(), ErrorCodes::NotFound | ErrorCodes::Unknown))
2118-
.notify(|_, _| {
2119-
let retried = retries.fetch_add(1, Ordering::Relaxed);
2120-
if retried > 0 {
2121-
tracing::info!(
2122-
"Retrying search() request for collection {}",
2123-
request.collection_id
2124-
);
2125-
}
2126-
})
2127-
.await;
2128-
self.metrics
2129-
.search_retries_counter
2130-
.add(retries.load(Ordering::Relaxed) as u64, &[]);
2131-
res
2135+
self.retryable_search(request).await
21322136
}
21332137

21342138
pub async fn healthcheck(&self) -> HealthCheckResponse {

0 commit comments

Comments
 (0)