Skip to content

Commit 4c24011

Browse files
sanketkediajairad26
authored and committed
Rebase
1 parent d240bb3 commit 4c24011

File tree

3 files changed

+96
-56
lines changed

3 files changed

+96
-56
lines changed

rust/frontend/src/executor/distributed.rs

Lines changed: 49 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -62,6 +62,7 @@ struct Metrics {
6262
delete_retries_counter: Counter<u64>,
6363
count_retries_counter: Counter<u64>,
6464
query_retries_counter: Counter<u64>,
65+
search_retries_counter: Counter<u64>,
6566
get_retries_counter: Counter<u64>,
6667
add_retries_counter: Counter<u64>,
6768
update_retries_counter: Counter<u64>,
@@ -122,6 +123,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
122123
let delete_retries_counter = meter.u64_counter("delete_retries").build();
123124
let count_retries_counter = meter.u64_counter("count_retries").build();
124125
let query_retries_counter = meter.u64_counter("query_retries").build();
126+
let search_retries_counter = meter.u64_counter("search_retries").build();
125127
let get_retries_counter = meter.u64_counter("get_retries").build();
126128
let add_retries_counter = meter.u64_counter("add_retries").build();
127129
let update_retries_counter = meter.u64_counter("update_retries").build();
@@ -135,6 +137,7 @@ impl Configurable<(config::DistributedExecutorConfig, System)> for DistributedEx
135137
add_retries_counter,
136138
update_retries_counter,
137139
upsert_retries_counter,
140+
search_retries_counter,
138141
});
139142
Ok(Self {
140143
client_assigner,
@@ -395,7 +398,16 @@ impl DistributedExecutor {
395398
Ok(res.into_inner().try_into()?)
396399
}
397400

398-
pub async fn search(&mut self, plan: Search) -> Result<SearchResult, ExecutorError> {
401+
pub async fn search<F, Fut>(
402+
&mut self,
403+
plan: Search,
404+
replan_closure: F,
405+
) -> Result<SearchResult, ExecutorError>
406+
where
407+
F: Fn(tonic::Code) -> Fut,
408+
Fut: Future<Output = Result<Search, Box<dyn ChromaError>>>,
409+
{
410+
let retry_count = Arc::new(AtomicUsize::new(0));
399411
// Get the collection ID from the plan
400412
let collection_id = &plan
401413
.scan
@@ -413,21 +425,57 @@ impl DistributedExecutor {
413425
let request: chroma_types::chroma_proto::SearchPlan = plan.try_into()?;
414426

415427
let attempt_count = std::sync::Arc::new(std::sync::atomic::AtomicU32::new(0));
428+
let last_error = std::sync::Arc::new(parking_lot::Mutex::new(tonic::Code::Ok));
416429
let config = self.client_selection_config.clone();
417430
let res = {
418431
let attempt_count = attempt_count.clone();
419432
(|| async {
420433
let current_attempt =
421434
attempt_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
422435
let is_retry = current_attempt > 0;
436+
if is_retry {
437+
let last_error_code = *last_error.lock();
438+
let replan_search =
439+
replan_closure(last_error_code)
440+
.await
441+
.map_err(|e| -> tonic::Status {
442+
Status::new(
443+
e.code().into(),
444+
format!("Failed to replan search {:?}", e),
445+
)
446+
})?;
447+
return choose_query_client_weighted(&clients, &config, is_retry)?
448+
.search(Request::new(replan_search.try_into()?))
449+
.await;
450+
}
423451
choose_query_client_weighted(&clients, &config, is_retry)?
424452
.search(Request::new(request.clone()))
425453
.await
426454
})
427455
.retry(self.backoff)
428456
.when(is_retryable_error)
457+
.notify(|e, _| {
458+
let mut last_error = last_error.lock();
459+
*last_error = e.code();
460+
tracing::info!(
461+
"Retrying search for collection {}, error {:?}",
462+
collection_id,
463+
e
464+
);
465+
retry_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
466+
})
467+
.adjust(|e, d| {
468+
if e.code() == Code::NotFound {
469+
return Some(Duration::from_micros(0));
470+
}
471+
d
472+
})
429473
.await?
430474
};
475+
self.metrics.search_retries_counter.add(
476+
retry_count.load(std::sync::atomic::Ordering::Relaxed) as u64,
477+
&[],
478+
);
431479
Ok(res.into_inner().try_into()?)
432480
}
433481

rust/frontend/src/executor/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,9 +71,19 @@ impl Executor {
7171
Executor::Local(local_executor) => local_executor.knn(plan, replan_closure).await,
7272
}
7373
}
74-
pub async fn search(&mut self, plan: Search) -> Result<SearchResult, ExecutorError> {
74+
pub async fn search<F, Fut>(
75+
&mut self,
76+
plan: Search,
77+
replan_closure: F,
78+
) -> Result<SearchResult, ExecutorError>
79+
where
80+
F: Fn(tonic::Code) -> Fut,
81+
Fut: Future<Output = Result<Search, Box<dyn ChromaError>>>,
82+
{
7583
match self {
76-
Executor::Distributed(distributed_executor) => distributed_executor.search(plan).await,
84+
Executor::Distributed(distributed_executor) => {
85+
distributed_executor.search(plan, replan_closure).await
86+
}
7787
Executor::Local(local_executor) => local_executor.search(plan).await,
7888
}
7989
}

rust/frontend/src/impls/service_based_frontend.rs

Lines changed: 35 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ struct Metrics {
7575
create_tenant_retries_counter: Counter<u64>,
7676
update_tenant_retries_counter: Counter<u64>,
7777
get_collection_with_segments_counter: Counter<u64>,
78-
search_retries_counter: Counter<u64>,
7978
metering_fork_counter: Counter<u64>,
8079
metering_read_counter: Counter<u64>,
8180
metering_write_counter: Counter<u64>,
@@ -113,7 +112,6 @@ impl ServiceBasedFrontend {
113112
let add_retries_counter = meter.u64_counter("add_retries").build();
114113
let update_retries_counter = meter.u64_counter("update_retries").build();
115114
let upsert_retries_counter = meter.u64_counter("upsert_retries").build();
116-
let search_retries_counter = meter.u64_counter("search_retries").build();
117115
let metering_fork_counter = meter.u64_counter("metering_events_sent.fork").with_description("The number of fork metering events sent by the frontend to the metering event receiver.").build();
118116
let metering_read_counter = meter.u64_counter("metering_events_sent.read").with_description("The number of read metering events sent by the frontend to the metering event receiver.").build();
119117
let metering_write_counter = meter.u64_counter("metering_events_sent.write").with_description("The number of write metering events sent by the frontend to the metering event receiver.").build();
@@ -162,7 +160,6 @@ impl ServiceBasedFrontend {
162160
create_db_retries_counter,
163161
delete_db_retries_counter,
164162
delete_collection_retries_counter,
165-
search_retries_counter,
166163
metering_fork_counter,
167164
metering_read_counter,
168165
metering_write_counter,
@@ -1577,7 +1574,9 @@ impl ServiceBasedFrontend {
15771574
};
15781575

15791576
if let Some(event) = read_event {
1580-
event.submit().await;
1577+
if let Ok(()) = event.submit().await {
1578+
self.metrics.metering_read_counter.add(1, &[]);
1579+
}
15811580
}
15821581

15831582
Ok(records)
@@ -1996,10 +1995,8 @@ impl ServiceBasedFrontend {
19961995
// TODO: The dispatch logic is mostly the same for count/get/query/search, we should consider unifying them
19971996
// Get collection and segments once for all queries
19981997
let collection_and_segments = self
1999-
.collections_with_segments_provider
2000-
.get_collection_with_segments(request.collection_id)
2001-
.await
2002-
.map_err(|err| QueryError::Other(Box::new(err) as Box<dyn ChromaError>))?;
1998+
.retryable_get_collection_with_segments(request.collection_id)
1999+
.await?;
20032000

20042001
let latest_collection_logical_size_bytes = collection_and_segments
20052002
.collection
@@ -2032,8 +2029,36 @@ impl ServiceBasedFrontend {
20322029
payloads: request.searches,
20332030
};
20342031

2032+
let collection_id = search_plan
2033+
.scan
2034+
.collection_and_segments
2035+
.collection
2036+
.collection_id;
2037+
20352038
// Execute the single search plan using the executor
2036-
let result = self.executor.search(search_plan).await?;
2039+
let result = self
2040+
.executor
2041+
.search(search_plan.clone(), |code: tonic::Code| {
2042+
let mut provider = self.collections_with_segments_provider.clone();
2043+
let mut search_replanned = search_plan.clone();
2044+
async move {
2045+
if code == tonic::Code::NotFound {
2046+
provider
2047+
.collections_with_segments_cache
2048+
.remove(&collection_id)
2049+
.await;
2050+
let collection_and_segments = provider
2051+
.get_collection_with_segments(collection_id)
2052+
.await
2053+
.map_err(|err| Box::new(err) as Box<dyn ChromaError>)?;
2054+
search_replanned.scan = Scan {
2055+
collection_and_segments,
2056+
};
2057+
}
2058+
Ok(search_replanned)
2059+
}
2060+
})
2061+
.await?;
20372062

20382063
// Calculate return bytes (approximate size of the response)
20392064
let return_bytes = result.size_bytes();
@@ -2079,50 +2104,7 @@ impl ServiceBasedFrontend {
20792104
}
20802105

20812106
pub async fn search(&mut self, request: SearchRequest) -> Result<SearchResponse, QueryError> {
2082-
// TODO: The retry logic is mostly the same for count/get/query/search, we should consider unifying them
2083-
let retries = Arc::new(AtomicUsize::new(0));
2084-
let search_to_retry = || {
2085-
let mut self_clone = self.clone();
2086-
let request_clone = request.clone();
2087-
let cache_clone = self
2088-
.collections_with_segments_provider
2089-
.collections_with_segments_cache
2090-
.clone();
2091-
async move {
2092-
let res = self_clone.retryable_search(request_clone).await;
2093-
match res {
2094-
Ok(res) => Ok(res),
2095-
Err(e) => {
2096-
if e.code() == ErrorCodes::NotFound {
2097-
tracing::info!(
2098-
"Invalidating cache for collection {}",
2099-
request.collection_id
2100-
);
2101-
cache_clone.remove(&request.collection_id).await;
2102-
}
2103-
Err(e)
2104-
}
2105-
}
2106-
}
2107-
};
2108-
let res = search_to_retry
2109-
.retry(self.collections_with_segments_provider.get_retry_backoff())
2110-
// NOTE: Transport level errors will manifest as unknown errors, and they should also be retried
2111-
.when(|e| matches!(e.code(), ErrorCodes::NotFound | ErrorCodes::Unknown))
2112-
.notify(|_, _| {
2113-
let retried = retries.fetch_add(1, Ordering::Relaxed);
2114-
if retried > 0 {
2115-
tracing::info!(
2116-
"Retrying search() request for collection {}",
2117-
request.collection_id
2118-
);
2119-
}
2120-
})
2121-
.await;
2122-
self.metrics
2123-
.search_retries_counter
2124-
.add(retries.load(Ordering::Relaxed) as u64, &[]);
2125-
res
2107+
self.retryable_search(request).await
21262108
}
21272109

21282110
pub async fn healthcheck(&self) -> HealthCheckResponse {

0 commit comments

Comments
 (0)