[ENH]: Retries everywhere #5417
Conversation
Reviewer Checklist
Please leverage this checklist to ensure your code review is thorough before approving:
- Testing, Bugs, Errors, Logs, Documentation
- System Compatibility
- Quality
Consolidate and Enhance Retry Logic Across Executor and Frontend

This PR refactors and centralizes retry logic in the Chroma Rust codebase. Previously, retries for downstream node queries were scattered across both the frontend and executor layers; now, all query retries are consolidated into the

The retry mechanism is improved to handle different error classes, including immediate retries for

Key Changes
- Centralized all downstream query retry logic within the

Affected Areas

This summary was automatically generated by @propel-code-bot
Force-pushed d9d5eba to 73e64da
```rust
        self.retryable_count(request).await
    }

    fn is_retryable(code: tonic::Code) -> bool {
```
[BestPractice]
The error condition mapping in `is_retryable` is inconsistent with the actual error codes returned by the system. The method only retries on `Unavailable` and `Unknown` errors, but the code shows that log operations return `Unavailable` errors (line 1426), while other operations may return different error codes. This could lead to missing legitimate retry opportunities.
```rust
fn is_retryable(code: tonic::Code) -> bool {
    matches!(code,
        tonic::Code::Unavailable |
        tonic::Code::Unknown |
        tonic::Code::DeadlineExceeded |
        tonic::Code::Aborted
    )
}
```
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1737
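The suggested predicate is easy to exercise in isolation. The sketch below uses a local stand-in enum instead of the real `tonic::Code` (which lives in the `tonic` crate), so the retryability logic can be tested without any gRPC dependency; the variant set mirrors the codes discussed above.

```rust
// Stand-in for tonic::Code; the real enum lives in the tonic crate.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Code {
    Unavailable,
    Unknown,
    DeadlineExceeded,
    Aborted,
    NotFound,
    InvalidArgument,
}

// The expanded retry predicate from the suggestion above: transient
// transport/server conditions retry, caller errors do not.
fn is_retryable(code: Code) -> bool {
    matches!(
        code,
        Code::Unavailable | Code::Unknown | Code::DeadlineExceeded | Code::Aborted
    )
}
```

Note that `NotFound` is deliberately excluded here: in this PR it signals stale segment info and is handled by replanning rather than by a blind retry.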
```rust
pub async fn delete(
    &mut self,
    request: DeleteCollectionRecordsRequest,
```
[CriticalError]
The retry logic in the `delete` method separates record fetching from log pushing, but doesn't handle the case where records change between retries. If the first operation succeeds but the second fails and retries, the same records will be deleted multiple times, potentially causing data inconsistency.
```rust
// Consider fetching records within the retry loop or implementing idempotency checks
pub async fn delete(
    &mut self,
    request: DeleteCollectionRecordsRequest,
) -> Result<DeleteCollectionRecordsResponse, DeleteCollectionRecordsError> {
    let retries = Arc::new(AtomicUsize::new(0));
    let retryable_operation = || {
        let mut self_clone = self.clone();
        let request_clone = request.clone();
        async move {
            let records = self_clone
                .retryable_get_records_to_delete(request_clone.clone())
                .await?;
            self_clone
                .retryable_push_delete_logs(
                    request_clone.tenant_id,
                    request_clone.database_name,
                    request_clone.collection_id,
                    records,
                )
                .await
        }
    };
    // ... rest of retry logic
}
```
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1610
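The fetch-inside-the-loop idea can be shown without any of the Chroma types. In this simplified synchronous sketch (`fetch` and `push` are mocks standing in for `retryable_get_records_to_delete` and `retryable_push_delete_logs`), the snapshot of records is taken fresh on every attempt, so a retried push never reuses a stale list:

```rust
// Illustrative only: a bounded retry loop that re-fetches the records to
// delete on every attempt, so a retried push never acts on a stale snapshot.
fn delete_with_retry(
    fetch: &mut dyn FnMut() -> Vec<u32>,
    push: &mut dyn FnMut(&[u32]) -> Result<(), ()>,
    max_attempts: usize,
) -> Result<usize, ()> {
    for _ in 0..max_attempts {
        // Fresh snapshot per attempt, never a capture from a prior attempt.
        let records = fetch();
        if push(&records).is_ok() {
            return Ok(records.len());
        }
    }
    Err(())
}
```

This only addresses staleness; true idempotency would additionally need deduplication on the log side.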
Deletes are only retried in case the log is unavailable or has asked to back off, so there is no double write.
```rust
pub async fn count<F, Fut>(
    &mut self,
    plan: Count,
    replan_closure: F,
) -> Result<CountResult, ExecutorError>
where
    F: Fn(tonic::Code) -> Fut,
    Fut: Future<Output = Result<Count, Box<dyn ChromaError>>>,
{
```
[BestPractice]
The new retry logic with replanning is a great improvement and aligns well with the PR description. However, the implementation is duplicated across `count`, `get`, `knn`, and `search`. This could be refactored into a single generic private method to improve maintainability and reduce code duplication.
A helper function could encapsulate the retry pattern. Here's a potential signature to illustrate the idea (the result type parameter is named `Res` to avoid shadowing `std::result::Result`):
```rust
async fn execute_with_retry<Plan, PlanProto, Res, ReplanFuture, GrpcFuture>(
    &self,
    initial_plan: Plan,
    clients: &[QueryClient],
    replan_closure: impl Fn(tonic::Code) -> ReplanFuture,
    grpc_call: impl Fn(QueryClient, PlanProto) -> GrpcFuture,
    metric_counter: &Counter<u64>,
) -> Result<Res, ExecutorError>
where
    Plan: Clone + TryInto<PlanProto>, // and other traits
    ReplanFuture: Future<Output = Result<Plan, Box<dyn ChromaError>>>,
    GrpcFuture: Future<Output = Result<tonic::Response<Res>, tonic::Status>>,
    Res: 'static, // and other traits
{
    // ... shared retry logic here ...
}
```
This would make each of the public methods (`count`, `get`, etc.) a simple call to this helper with the appropriate closures and types.
File: rust/frontend/src/executor/distributed.rs
Line: 173
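Stripped of the tonic and planning machinery, the consolidation reduces to one generic driver owning the loop while each caller supplies its operation and retryability predicate. A minimal synchronous sketch of that shape (all names here are illustrative, not the executor's API):

```rust
// Generic retry driver: the operation and the retryability predicate are
// supplied by the caller, so count/get/knn/search can share one loop.
fn execute_with_retry<T, E>(
    mut op: impl FnMut() -> Result<T, E>,
    is_retryable: impl Fn(&E) -> bool,
    max_attempts: usize,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op() {
            Ok(v) => return Ok(v),
            // Retry only while attempts remain and the error class allows it.
            Err(e) if attempt + 1 < max_attempts && is_retryable(&e) => attempt += 1,
            Err(e) => return Err(e),
        }
    }
}
```

In the real async version, `op` would be a closure producing a future that converts the (possibly replanned) plan and issues the gRPC call.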
I agree with this
```rust
    &mut self,
    CreateTenantRequest { name, .. }: CreateTenantRequest,
) -> Result<CreateTenantResponse, CreateTenantError> {
```
[BestPractice]
It's great to see robust retry logic being added to the sysdb calls. I've noticed that the retry pattern (creating a closure, calling `.retry()`, `.when()`, `.notify()`, and updating metrics) is repeated for many of the sysdb methods (`create_tenant`, `get_tenant`, `update_tenant`, `create_database`, etc.).
To reduce this boilerplate, you could introduce a helper function or a macro. A helper function might look something like this:
```rust
async fn with_sysdb_retry<
    Fut: Future<Output = Result<T, E>>,
    T,
    E: ChromaError + 'static,
>(
    &self,
    operation: impl Fn() -> Fut,
    metric_counter: &Counter<u64>,
    log_message: &str,
) -> Result<T, E> {
    let retry_count = Arc::new(AtomicUsize::new(0));
    let result = operation
        .retry(self.retry_policy)
        .when(|e: &E| Self::is_retryable(e.code().into()))
        .notify(|e, _| {
            retry_count.fetch_add(1, Ordering::Relaxed);
            tracing::info!("{} failed with error {:?}. Retrying", log_message, e);
        })
        .await?;
    metric_counter.add(retry_count.load(Ordering::Relaxed) as u64, &[]);
    Ok(result)
}
```
This would simplify methods like `create_tenant` to a much more concise form, improving readability and maintainability.
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 314
```rust
let list_db_retries_counter = meter.u64_counter("list_database_retries").build();
let create_db_retries_counter = meter.u64_counter("create_database_retries").build();
let get_db_retries_counter = meter.u64_counter("get_database_retries").build();
let delete_db_retries_counter = meter.u64_counter("delete_database_retries").build();
let list_collections_retries_counter =
    meter.u64_counter("list_collections_retries").build();
let count_collections_retries_counter =
    meter.u64_counter("count_collections_retries").build();
let get_collection_retries_counter = meter.u64_counter("get_collection_retries").build();
let get_collection_by_crn_retries_counter =
    meter.u64_counter("get_collection_by_crn_retries").build();
let get_tenant_retries_counter = meter.u64_counter("get_tenant_retries").build();
let create_collection_retries_counter =
    meter.u64_counter("create_collection_retries").build();
let update_collection_retries_counter =
    meter.u64_counter("update_collection_retries").build();
let delete_collection_retries_counter =
    meter.u64_counter("delete_collection_retries").build();
let create_tenant_retries_counter = meter.u64_counter("create_tenant_retries").build();
let update_tenant_retries_counter = meter.u64_counter("update_tenant_retries").build();
let get_collection_with_segments_counter = meter
    .u64_counter("get_collection_with_segments_retries")
    .build();
```
Might be better if there is a single `retries` metric in this case and we distinguish with labels instead.
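In OpenTelemetry terms that would mean a single counter (e.g. one `u64_counter("frontend_retries")`) with an `operation` attribute, instead of one instrument per method. The sketch below uses a simplified stand-in for a labeled counter, not the real `opentelemetry` types, just to show how one instrument aggregates per label:

```rust
use std::collections::HashMap;

// Simplified stand-in for a labeled counter: one instrument, many label sets.
// A real OTel counter would take attributes on each add() call.
#[derive(Default)]
struct LabeledCounter {
    by_operation: HashMap<&'static str, u64>,
}

impl LabeledCounter {
    // Mirrors counter.add(n, &[KeyValue::new("operation", op)]).
    fn add(&mut self, n: u64, operation: &'static str) {
        *self.by_operation.entry(operation).or_insert(0) += n;
    }

    fn get(&self, operation: &'static str) -> u64 {
        self.by_operation.get(operation).copied().unwrap_or(0)
    }
}
```

One caveat with the labeled approach is attribute cardinality; with a fixed, small set of operation names that is not a concern here.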
```rust
    CreateTenantRequest { name, .. }: CreateTenantRequest,
) -> Result<CreateTenantResponse, CreateTenantError> {
    self.sysdb_client.create_tenant(name).await
    let retry_count = Arc::new(AtomicUsize::new(0));
```
nit: do we need this new var? could we just clone self.metrics.create_tenant_retries_counter and pass it to the closure?
```rust
fork_retries_counter: Counter<u64>,
delete_retries_counter: Counter<u64>,
count_retries_counter: Counter<u64>,
query_retries_counter: Counter<u64>,
search_retries_counter: Counter<u64>,
get_retries_counter: Counter<u64>,
add_retries_counter: Counter<u64>,
update_retries_counter: Counter<u64>,
upsert_retries_counter: Counter<u64>,
```
similar comment, should this be one metric with labels?
Force-pushed b614a69 to b381e79
```rust
    code == tonic::Code::Unavailable || code == tonic::Code::Unknown
}
```
[PerformanceOptimization]
Potential N+1 query pattern detected in the retry mechanism:
```rust
retryable_get_collection_with_segments(collection_id).await
```
This method is called inside executor operations that could be part of loops (e.g., batch operations, multiple collection queries). If this method is called for the same collection_id multiple times in quick succession, each call might result in separate database queries to fetch collection metadata.
Consider:
- Adding request deduplication for concurrent requests to the same collection
- Using a short-lived cache to prevent repeated identical queries
- Batching collection metadata requests when possible
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1720
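Of those options, a short-lived cache is the smallest change. A standard-library sketch of the idea (key type, value type, and TTL are illustrative, not the Chroma cache API): repeated lookups for the same collection within the TTL are served from memory instead of re-querying the sysdb.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Tiny TTL cache: entries younger than `ttl` are returned without
// invoking the fetch closure again.
struct TtlCache<V: Clone> {
    ttl: Duration,
    entries: HashMap<String, (Instant, V)>,
}

impl<V: Clone> TtlCache<V> {
    fn new(ttl: Duration) -> Self {
        Self { ttl, entries: HashMap::new() }
    }

    fn get_or_insert_with(&mut self, key: &str, fetch: impl FnOnce() -> V) -> V {
        if let Some((at, v)) = self.entries.get(key) {
            if at.elapsed() < self.ttl {
                return v.clone(); // fresh enough: no extra query
            }
        }
        let v = fetch();
        self.entries.insert(key.to_string(), (Instant::now(), v.clone()));
        v
    }
}
```

The TTL has to stay well below the window in which segment info can go stale, otherwise it would fight the replan-on-`NotFound` mechanism this PR adds.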
```rust
let retry_count = Arc::new(AtomicUsize::new(0));
let retryable_create_collection = || {
    let mut self_clone = self.clone();
```
[PerformanceOptimization]
Potential resource leak: The `retryable_create_collection` closure captures multiple owned values (`tenant_clone`, `db_name_clone`, etc.) and creates a new `self_clone` on each retry attempt. If retries fail repeatedly, this could accumulate significant memory usage.
The pattern is repeated across many methods. Consider:
- Limiting the clone depth
- Adding memory pressure checks before retries
- Using weak references where possible to prevent excessive memory usage during retry storms
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 847
```rust
Executor::Distributed(distributed_executor) => {
    distributed_executor.search(plan, replan_closure).await
}
Executor::Local(local_executor) => local_executor.search(plan).await,
```
[CriticalError]
It appears there's a mismatch in the `search` method dispatch for `LocalExecutor`. The `replan_closure` is not being passed, which will likely cause a compilation error.
Additionally, the `search` method signature in `rust/frontend/src/executor/local.rs` needs to be updated to accept the `replan_closure`, similar to how `count`, `get`, and `knn` were updated in that file.
Here's the suggested fix for the dispatch:
```suggestion
Executor::Local(local_executor) => local_executor.search(plan, replan_closure).await,
```
⚡ Committable suggestion: carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.
File: rust/frontend/src/executor/mod.rs
Line: 87
Force-pushed b381e79 to 128ad9d
Force-pushed 128ad9d to e24519c
```rust
        .add(retry_count.load(Ordering::Relaxed) as u64, &[]);
    res
}
```
[CriticalError]
Race condition: Multiple concurrent operations on the same collection can cause cache invalidation race conditions. When one operation removes a collection from cache (`cache.remove(&collection_id).await`), other concurrent operations might fail if they're in the middle of using stale cached data.
```rust
// Thread 1: Gets cached data
let cached_data = cache.get(&collection_id).await;
// Thread 2: Invalidates cache
cache.remove(&collection_id).await;
// Thread 1: Uses stale cached_data -> potential failure
```
Consider using cache versioning or atomic cache operations to prevent this race condition.
File: rust/frontend/src/impls/service_based_frontend.rs
Line: 1753
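One way to make the invalidation race benign is version-tagged entries: invalidation bumps a version counter instead of mutating entries out from under readers, so a reader that raced an invalidation simply observes a miss and refetches rather than failing on stale data. A single-threaded standard-library sketch of the versioning idea (types and method names are illustrative):

```rust
use std::collections::HashMap;

// Versioned cache: invalidation bumps a version instead of deleting
// entries; a read that raced an invalidation just sees None and refetches.
struct VersionedCache<V: Clone> {
    version: u64,
    entries: HashMap<String, (u64, V)>,
}

impl<V: Clone> VersionedCache<V> {
    fn new() -> Self {
        Self { version: 0, entries: HashMap::new() }
    }

    fn insert(&mut self, key: &str, value: V) {
        self.entries.insert(key.to_string(), (self.version, value));
    }

    // Entries tagged with an older version are treated as absent.
    fn invalidate_all(&mut self) {
        self.version += 1;
    }

    fn get(&self, key: &str) -> Option<V> {
        match self.entries.get(key) {
            Some((v, value)) if *v == self.version => Some(value.clone()),
            _ => None,
        }
    }
}
```

A concurrent variant would wrap this in a lock or use atomic swaps, but the invariant is the same: readers validate the version at read time instead of trusting a previously fetched entry.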
Description of changes
- `replan_closure` and gives the FE a way to prepare a fresh plan for the query on a retry.
- `NotFound` error, meaning it has to update the segments info and retry again.

Test plan
- `pytest` for python
- `yarn test` for js
- `cargo test` for rust

Migration plan
None

Observability plan
Observed in local tilt. Will observe it in staging too.

Documentation Changes
None