Skip to content

Commit 9116e7a

Browse files
authored
[CHORE] Hard delete for manually GC'ed collections (#5490)
## Description of changes This PR makes it so that if we run manual gc on a collection that has been fully hard deleted from the sysdb, then the log will be deleted too. ## Test plan CI + manual testing. ## Migration plan N/A ## Observability plan Tracing ## Documentation Changes N/A
1 parent 6206073 commit 9116e7a

File tree

3 files changed

+321
-7
lines changed

3 files changed

+321
-7
lines changed

rust/garbage_collector/src/garbage_collector_component.rs

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use chroma_error::ChromaError;
1414
use chroma_log::Log;
1515
use chroma_memberlist::memberlist_provider::Memberlist;
1616
use chroma_storage::Storage;
17-
use chroma_sysdb::{CollectionToGcInfo, SysDb, SysDbConfig};
17+
use chroma_sysdb::{CollectionToGcInfo, GetCollectionsToGcError, SysDb, SysDbConfig};
1818
use chroma_system::{
1919
wrap, Component, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator, System,
2020
TaskResult,
@@ -118,6 +118,37 @@ impl GarbageCollector {
118118
self.system = Some(system);
119119
}
120120

121+
async fn garbage_collect_hard_delete_log(
122+
&self,
123+
collection_id: CollectionUuid,
124+
) -> Result<GarbageCollectorResponse, GarbageCollectCollectionError> {
125+
let dispatcher = self
126+
.dispatcher
127+
.as_ref()
128+
.ok_or(GarbageCollectCollectionError::Uninitialized)?;
129+
let system = self
130+
.system
131+
.as_ref()
132+
.ok_or(GarbageCollectCollectionError::Uninitialized)?;
133+
134+
let orchestrator =
135+
crate::log_only_orchestrator::HardDeleteLogOnlyGarbageCollectorOrchestrator::new(
136+
dispatcher.clone(),
137+
self.storage.clone(),
138+
self.logs.clone(),
139+
collection_id,
140+
);
141+
142+
let result = match orchestrator.run(system.clone()).await {
143+
Ok(res) => res,
144+
Err(e) => {
145+
tracing::error!("Failed to run garbage collection orchestrator v2: {:?}", e);
146+
return Err(GarbageCollectCollectionError::OrchestratorV2Error(e));
147+
}
148+
};
149+
Ok(result)
150+
}
151+
121152
async fn garbage_collect_collection(
122153
&self,
123154
version_absolute_cutoff_time: DateTime<Utc>,
@@ -433,6 +464,7 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
433464
}
434465
}
435466
}
467+
let mut collections_to_hard_delete_log = vec![];
436468
for collection_id in manual {
437469
if collections_to_gc.iter().any(|c| c.id == collection_id) {
438470
continue;
@@ -446,6 +478,9 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
446478
);
447479
collections_to_gc.push(collection_info);
448480
}
481+
Err(GetCollectionsToGcError::NoSuchCollection) => {
482+
collections_to_hard_delete_log.push(collection_id);
483+
}
449484
Err(err) => {
450485
tracing::event!(
451486
Level::ERROR,
@@ -491,7 +526,7 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
491526

492527
let mut sysdb = self.sysdb_client.clone();
493528

494-
let mut jobs_stream = futures::stream::iter(collections_to_gc)
529+
let jobs_iter1 = collections_to_gc.into_iter()
495530
.map(|(cleanup_mode, collection)| {
496531
tracing::info!(
497532
"Processing collection: {} (tenant: {}, version_file_path: {})",
@@ -504,21 +539,28 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
504539
let instrumented_span = span!(parent: None, tracing::Level::INFO, "Garbage collection job", collection_id = ?collection.id, tenant_id = %collection.tenant, cleanup_mode = ?cleanup_mode);
505540
Span::current().add_link(instrumented_span.context().span().span_context().clone());
506541

507-
self.garbage_collect_collection(
542+
Box::pin(self.garbage_collect_collection(
508543
version_absolute_cutoff_time,
509544
collection_soft_delete_absolute_cutoff_time,
510545
collection,
511546
cleanup_mode,
512547
self.config
513548
.enable_dangerous_option_to_ignore_min_versions_for_wal3,
514549
)
515-
.instrument(instrumented_span)
516-
})
517-
.buffer_unordered(100);
550+
.instrument(instrumented_span)) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<GarbageCollectorResponse, GarbageCollectCollectionError>> + Send + '_>>
551+
});
552+
let jobs_iter2 = collections_to_hard_delete_log.into_iter().map(|collection_id| {
553+
tracing::event!(Level::INFO, "hard delete log-only");
554+
let instrumented_span = span!(parent: None, tracing::Level::INFO, "Garbage collection job (hard delete log)", collection_id =? collection_id);
555+
Span::current().add_link(instrumented_span.context().span().span_context().clone());
556+
Box::pin(self.garbage_collect_hard_delete_log(collection_id).instrument(instrumented_span)) as std::pin::Pin<Box<dyn std::future::Future<Output = Result<GarbageCollectorResponse, GarbageCollectCollectionError>> + Send + '_>>
557+
});
558+
let mut jobs_stream1 = futures::stream::iter(jobs_iter1).buffer_unordered(100);
559+
let mut jobs_stream2 = futures::stream::iter(jobs_iter2).buffer_unordered(100);
518560

519561
let mut num_completed_jobs = 0;
520562
let mut num_failed_jobs = 0;
521-
while let Some(job_result) = jobs_stream.next().await {
563+
while let Some(job_result) = jobs_stream1.next().await {
522564
match job_result {
523565
Ok(result) => {
524566
{
@@ -534,6 +576,25 @@ impl Handler<GarbageCollectMessage> for GarbageCollector {
534576
}
535577
}
536578
}
579+
// NOTE(rescrv): I'm not proud of this duplication, but I cannot coerce the
580+
// futures::stream::iter above to take a chain of two different futures. It just won't
581+
// compile.
582+
while let Some(job_result) = jobs_stream2.next().await {
583+
match job_result {
584+
Ok(result) => {
585+
{
586+
let mut manual_collections = self.manual_collections.lock();
587+
manual_collections.remove(&result.collection_id);
588+
}
589+
tracing::info!("Garbage collection hard delete completed. Deleted all log files collection {}.", result.collection_id);
590+
num_completed_jobs += 1;
591+
}
592+
Err(e) => {
593+
tracing::error!("Garbage collection failed: {:?}", e);
594+
num_failed_jobs += 1;
595+
}
596+
}
597+
}
537598
tracing::info!(
538599
"Completed {} jobs, failed {} jobs",
539600
num_completed_jobs,

rust/garbage_collector/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ mod construct_version_graph_orchestrator;
2626
mod garbage_collector_component;
2727
pub mod garbage_collector_orchestrator;
2828
pub mod garbage_collector_orchestrator_v2;
29+
mod log_only_orchestrator;
2930

3031
#[cfg(test)]
3132
pub(crate) mod helper;
Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
use crate::garbage_collector_orchestrator_v2::GarbageCollectorError;
2+
use crate::operators::delete_unused_logs::{
3+
DeleteUnusedLogsError, DeleteUnusedLogsInput, DeleteUnusedLogsOperator, DeleteUnusedLogsOutput,
4+
};
5+
use crate::types::{CleanupMode, GarbageCollectorResponse};
6+
use async_trait::async_trait;
7+
use chroma_log::Log;
8+
use chroma_storage::Storage;
9+
use chroma_system::{
10+
wrap, ComponentContext, ComponentHandle, Dispatcher, Handler, Orchestrator,
11+
OrchestratorContext, TaskResult,
12+
};
13+
use chroma_types::CollectionUuid;
14+
use std::collections::{HashMap, HashSet};
15+
use tokio::sync::oneshot::Sender;
16+
use tracing::{Level, Span};
17+
18+
#[derive(Debug)]
19+
pub struct HardDeleteLogOnlyGarbageCollectorOrchestrator {
20+
context: OrchestratorContext,
21+
storage: Storage,
22+
logs: Log,
23+
result_channel: Option<Sender<Result<GarbageCollectorResponse, GarbageCollectorError>>>,
24+
collection_to_destroy: CollectionUuid,
25+
}
26+
27+
#[allow(clippy::too_many_arguments)]
28+
impl HardDeleteLogOnlyGarbageCollectorOrchestrator {
29+
pub fn new(
30+
dispatcher: ComponentHandle<Dispatcher>,
31+
storage: Storage,
32+
logs: Log,
33+
collection_to_destroy: CollectionUuid,
34+
) -> Self {
35+
Self {
36+
context: OrchestratorContext::new(dispatcher),
37+
storage,
38+
logs,
39+
result_channel: None,
40+
collection_to_destroy,
41+
}
42+
}
43+
}
44+
45+
#[async_trait]
46+
impl Orchestrator for HardDeleteLogOnlyGarbageCollectorOrchestrator {
47+
type Output = GarbageCollectorResponse;
48+
type Error = GarbageCollectorError;
49+
50+
fn dispatcher(&self) -> ComponentHandle<Dispatcher> {
51+
self.context.dispatcher.clone()
52+
}
53+
54+
fn context(&self) -> &OrchestratorContext {
55+
&self.context
56+
}
57+
58+
async fn on_start(&mut self, ctx: &ComponentContext<Self>) {
59+
let _ = self
60+
.try_start_delete_unused_logs_operator(ctx)
61+
.await
62+
.inspect_err(|_| {
63+
tracing::event!(
64+
Level::ERROR,
65+
"could not start job to hard delete unused logs",
66+
)
67+
});
68+
}
69+
70+
fn set_result_channel(
71+
&mut self,
72+
sender: Sender<Result<GarbageCollectorResponse, GarbageCollectorError>>,
73+
) {
74+
self.result_channel = Some(sender);
75+
}
76+
77+
fn take_result_channel(
78+
&mut self,
79+
) -> Option<Sender<Result<GarbageCollectorResponse, GarbageCollectorError>>> {
80+
self.result_channel.take()
81+
}
82+
}
83+
84+
impl HardDeleteLogOnlyGarbageCollectorOrchestrator {
85+
async fn try_start_delete_unused_logs_operator(
86+
&mut self,
87+
ctx: &ComponentContext<Self>,
88+
) -> Result<(), GarbageCollectorError> {
89+
let collections_to_destroy =
90+
HashSet::from_iter(vec![self.collection_to_destroy].into_iter());
91+
let collections_to_garbage_collect = HashMap::new();
92+
let task = wrap(
93+
Box::new(DeleteUnusedLogsOperator {
94+
enabled: true,
95+
mode: CleanupMode::DeleteV2,
96+
storage: self.storage.clone(),
97+
logs: self.logs.clone(),
98+
enable_dangerous_option_to_ignore_min_versions_for_wal3: false,
99+
}),
100+
DeleteUnusedLogsInput {
101+
collections_to_destroy,
102+
collections_to_garbage_collect,
103+
},
104+
ctx.receiver(),
105+
self.context.task_cancellation_token.clone(),
106+
);
107+
self.dispatcher()
108+
.send(task, Some(Span::current()))
109+
.await
110+
.map_err(GarbageCollectorError::Channel)?;
111+
Ok(())
112+
}
113+
}
114+
115+
#[async_trait]
116+
impl Handler<TaskResult<DeleteUnusedLogsOutput, DeleteUnusedLogsError>>
117+
for HardDeleteLogOnlyGarbageCollectorOrchestrator
118+
{
119+
type Result = ();
120+
121+
async fn handle(
122+
&mut self,
123+
message: TaskResult<DeleteUnusedLogsOutput, DeleteUnusedLogsError>,
124+
ctx: &ComponentContext<HardDeleteLogOnlyGarbageCollectorOrchestrator>,
125+
) {
126+
let _output = match self.ok_or_terminate(message.into_inner(), ctx).await {
127+
Some(output) => output,
128+
None => return,
129+
};
130+
self.terminate_with_result(
131+
Ok(GarbageCollectorResponse {
132+
collection_id: self.collection_to_destroy,
133+
num_versions_deleted: 0,
134+
num_files_deleted: 0,
135+
..Default::default()
136+
}),
137+
ctx,
138+
)
139+
.await;
140+
}
141+
}
142+
143+
#[cfg(test)]
144+
mod tests {
145+
//! Test suite for the `HardDeleteLogOnlyGarbageCollectorOrchestrator`.
146+
//!
147+
//! This module verifies the core functionality of the hard delete orchestrator,
148+
//! which is responsible for permanently removing log data for destroyed collections.
149+
//! The tests ensure proper initialization, configuration, and trait implementation
150+
//! of the orchestrator component.
151+
//!
152+
//! # Test Coverage
153+
//!
154+
//! The test suite validates:
155+
//! - Correct initialization with required dependencies
156+
//! - Proper storage of collection UUID for destruction
157+
//! - Result channel lifecycle management
158+
//! - Orchestrator trait contract fulfillment
159+
//!
160+
//! # Testing Approach
161+
//!
162+
//! Tests use mock components (test storage, dispatcher, logs) to isolate
163+
//! orchestrator behavior without requiring actual I/O operations.
164+
//! Each test is self-contained and can run in parallel using tokio's
165+
//! multi-threaded runtime.
166+
use super::*;
167+
use chroma_config::registry::Registry;
168+
use chroma_config::Configurable;
169+
use chroma_log::config::{GrpcLogConfig, LogConfig};
170+
use chroma_storage::test_storage;
171+
use chroma_system::{Dispatcher, System};
172+
173+
/// Verifies that the orchestrator correctly initializes with all required components.
174+
///
175+
/// This test ensures that when creating a new `HardDeleteLogOnlyGarbageCollectorOrchestrator`,
176+
/// all provided dependencies (dispatcher, storage, logs, collection UUID) are properly
177+
/// stored and the result channel starts in an uninitialized state.
178+
///
179+
/// # Test Invariants
180+
///
181+
/// - Collection UUID must match the one provided during construction
182+
/// - Result channel must be `None` initially (set later by the system)
183+
#[tokio::test(flavor = "multi_thread")]
184+
async fn test_k8s_integration_orchestrator_initialization() {
185+
let (_storage_dir, storage) = test_storage();
186+
let system = System::new();
187+
let dispatcher = Dispatcher::new(Default::default());
188+
let dispatcher_handle = system.start_component(dispatcher);
189+
let registry = Registry::new();
190+
let log_config = LogConfig::Grpc(GrpcLogConfig::default());
191+
let logs = Log::try_from_config(&(log_config, system.clone()), &registry)
192+
.await
193+
.unwrap();
194+
let collection_to_destroy = CollectionUuid::new();
195+
196+
// Create orchestrator with test dependencies
197+
let orchestrator = HardDeleteLogOnlyGarbageCollectorOrchestrator::new(
198+
dispatcher_handle.clone(),
199+
storage.clone(),
200+
logs.clone(),
201+
collection_to_destroy,
202+
);
203+
204+
// Verify the orchestrator is properly initialized
205+
assert_eq!(orchestrator.collection_to_destroy, collection_to_destroy);
206+
assert!(orchestrator.result_channel.is_none());
207+
}
208+
209+
/// Validates that the orchestrator correctly stores the collection UUID for hard deletion.
210+
///
211+
/// This test verifies that the orchestrator preserves the collection UUID that will be
212+
/// passed to the `DeleteUnusedLogsOperator` when `on_start` is called. It also documents
213+
/// the hardcoded configuration that will be used for the delete operation.
214+
///
215+
/// # Implementation Details
216+
///
217+
/// When the orchestrator starts the delete operator (in `try_start_delete_unused_logs_operator`),
218+
/// it uses the following hardcoded configuration:
219+
/// - `enabled`: true (operator is active)
220+
/// - `mode`: `CleanupMode::DeleteV2` (performs hard deletion)
221+
/// - `enable_dangerous_option_to_ignore_min_versions_for_wal3`: false (safety check enabled)
222+
///
223+
/// The collection UUID stored in `collection_to_destroy` is placed in the
224+
/// `collections_to_destroy` set, while `collections_to_garbage_collect` remains empty
225+
/// since this orchestrator only handles hard deletion, not soft garbage collection.
226+
#[tokio::test(flavor = "multi_thread")]
227+
async fn test_k8s_integration_delete_operator_params() {
228+
let (_storage_dir, storage) = test_storage();
229+
let system = System::new();
230+
let dispatcher = Dispatcher::new(Default::default());
231+
let dispatcher_handle = system.start_component(dispatcher);
232+
let registry = Registry::new();
233+
let log_config = LogConfig::Grpc(GrpcLogConfig::default());
234+
let logs = Log::try_from_config(&(log_config, system.clone()), &registry)
235+
.await
236+
.unwrap();
237+
let collection_to_destroy = CollectionUuid::new();
238+
239+
let orchestrator = HardDeleteLogOnlyGarbageCollectorOrchestrator::new(
240+
dispatcher_handle,
241+
storage.clone(),
242+
logs.clone(),
243+
collection_to_destroy,
244+
);
245+
246+
// Verify the orchestrator stores correct collection UUID for destruction
247+
assert_eq!(orchestrator.collection_to_destroy, collection_to_destroy);
248+
249+
// Note: The delete operator configuration is hardcoded in try_start_delete_unused_logs_operator
250+
// and cannot be modified externally. This ensures consistent deletion behavior.
251+
}
252+
}

0 commit comments

Comments
 (0)