From af9f5157762a9bcbc6a2234c53530b1810345673 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 2 Jul 2025 11:35:17 +0200 Subject: [PATCH 1/3] feat: add back gc protect callback --- src/store/fs/gc.rs | 36 ++++++++++++++++++++++++++++++++---- src/store/fs/options.rs | 3 ++- 2 files changed, 34 insertions(+), 5 deletions(-) diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index a394dc19..65b947db 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -1,9 +1,9 @@ -use std::collections::HashSet; +use std::{collections::HashSet, pin::Pin, sync::Arc}; use bao_tree::ChunkRanges; use genawaiter::sync::{Co, Gen}; use n0_future::{Stream, StreamExt}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use crate::{api::Store, Hash, HashAndFormat}; @@ -130,14 +130,31 @@ fn gc_sweep<'a>( }) } -#[derive(Debug, Clone)] +#[derive(derive_more::Debug, Clone)] pub struct GcConfig { pub interval: std::time::Duration, + #[debug("ProtectCallback")] + pub add_protected: Option, } +#[derive(Debug)] +pub enum ProtectOutcome { + Continue, + Skip, +} + +pub type ProtectCb = Arc< + dyn for<'a> Fn( + &'a mut HashSet, + ) + -> Pin + Send + Sync + 'a>> + + Send + + Sync + + 'static, +>; + pub async fn gc_run_once(store: &Store, live: &mut HashSet) -> crate::api::Result<()> { { - live.clear(); store.clear_protected().await?; let mut stream = gc_mark(store, live); while let Some(ev) = stream.next().await { @@ -179,7 +196,17 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet) -> crate::api: pub async fn run_gc(store: Store, config: GcConfig) { let mut live = HashSet::new(); loop { + live.clear(); tokio::time::sleep(config.interval).await; + if let Some(ref cb) = config.add_protected { + match (cb)(&mut live).await { + ProtectOutcome::Continue => {} + ProtectOutcome::Skip => { + info!("Skip gc run: protect callback indicated skip"); + continue; + } + } + } if let Err(e) = gc_run_once(&store, &mut live).await { error!("error during gc run: {e}"); break; 
@@ -288,6 +315,7 @@ mod tests { assert!(!data_path.exists()); assert!(!outboard_path.exists()); } + live.clear(); // create a large partial file and check that the data and outboard file as well as // the sizes and bitfield files are deleted by gc { diff --git a/src/store/fs/options.rs b/src/store/fs/options.rs index 6e123b75..f7dfa82f 100644 --- a/src/store/fs/options.rs +++ b/src/store/fs/options.rs @@ -4,7 +4,8 @@ use std::{ time::Duration, }; -use super::{gc::GcConfig, meta::raw_outboard_size, temp_name}; +pub use super::gc::{GcConfig, ProtectCb, ProtectOutcome}; +use super::{meta::raw_outboard_size, temp_name}; use crate::Hash; /// Options for directories used by the file store. From ff1c2f8b99080abf155beabb88ce985516dbef0c Mon Sep 17 00:00:00 2001 From: Frando Date: Thu, 3 Jul 2025 10:57:40 +0200 Subject: [PATCH 2/3] chore: more debug logs for gc --- src/store/fs/gc.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index 65b947db..671e2318 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -154,6 +154,7 @@ pub type ProtectCb = Arc< >; pub async fn gc_run_once(store: &Store, live: &mut HashSet) -> crate::api::Result<()> { + debug!(externally_protected = live.len(), "gc: start"); { store.clear_protected().await?; let mut stream = gc_mark(store, live); @@ -172,6 +173,7 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet) -> crate::api: } } } + debug!(total_protected = live.len(), "gc: sweep"); { let mut stream = gc_sweep(store, live); while let Some(ev) = stream.next().await { @@ -189,11 +191,13 @@ pub async fn gc_run_once(store: &Store, live: &mut HashSet) -> crate::api: } } } + debug!("gc: done"); Ok(()) } pub async fn run_gc(store: Store, config: GcConfig) { + debug!("gc enabled with interval {:?}", config.interval); let mut live = HashSet::new(); loop { live.clear(); From b1c92700836e76c4f4ef6458721b095121e14a43 Mon Sep 17 00:00:00 2001 From: Frando Date: Thu, 3 Jul 2025 12:57:52 +0200 
Subject: [PATCH 3/3] docs: add docs for gc protection --- src/store/fs/gc.rs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index 671e2318..df272dbb 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -130,19 +130,39 @@ fn gc_sweep<'a>( }) } +/// Configuration for garbage collection. #[derive(derive_more::Debug, Clone)] pub struct GcConfig { + /// Interval in which to run garbage collection. pub interval: std::time::Duration, + /// Optional callback to manually add protected blobs. + /// + /// The callback is called before each garbage collection run. It gets a `&mut HashSet` + /// and returns a future that returns [`ProtectOutcome`]. All hashes that are added to the + /// [`HashSet`] will be protected from garbage collection during this run. + /// + /// In normal operation, return [`ProtectOutcome::Continue`] from the callback. If you return + /// [`ProtectOutcome::Abort`], the garbage collection run will be aborted. Use this if your + /// source of hashes to protect returned an error, and thus garbage collection should be skipped + /// completely to not unintentionally delete blobs that should be protected. #[debug("ProtectCallback")] pub add_protected: Option, } +/// Returned from [`ProtectCb`]. +/// +/// See [`GcConfig::add_protected`] for details. #[derive(Debug)] pub enum ProtectOutcome { + /// Continue with the garbage collection run. Continue, - Skip, + /// Abort the garbage collection run. + Abort, } +/// The type of the garbage collection callback. +/// +/// See [`GcConfig::add_protected`] for details. 
pub type ProtectCb = Arc< dyn for<'a> Fn( &'a mut HashSet, @@ -205,8 +225,8 @@ pub async fn run_gc(store: Store, config: GcConfig) { if let Some(ref cb) = config.add_protected { match (cb)(&mut live).await { ProtectOutcome::Continue => {} - ProtectOutcome::Skip => { - info!("Skip gc run: protect callback indicated skip"); + ProtectOutcome::Abort => { + info!("abort gc run: protect callback indicated abort"); continue; } }