Skip to content

Commit e2dd5b0

Browse files
committed
fix: add back gc protection for blobs
1 parent 95a9959 commit e2dd5b0

File tree

6 files changed

+149
-60
lines changed

6 files changed

+149
-60
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,6 @@ rpc = ["engine"]
104104
[package.metadata.docs.rs]
105105
all-features = true
106106
rustdoc-args = ["--cfg", "iroh_docsrs"]
107+
108+
[patch.crates-io]
109+
iroh-blobs = { git = "https://github.com/n0-computer/iroh-blobs", branch = "Frando/gc-protect" }

src/engine.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use futures_lite::{Stream, StreamExt};
1313
use iroh::{Endpoint, NodeAddr, PublicKey};
1414
use iroh_blobs::{
1515
api::{blobs::BlobStatus, downloader::Downloader, Store},
16+
store::fs::options::{ProtectCb, ProtectOutcome},
1617
Hash,
1718
};
1819
use iroh_gossip::net::Gossip;
@@ -56,6 +57,7 @@ pub struct Engine {
5657
#[debug("ContentStatusCallback")]
5758
content_status_cb: ContentStatusCallback,
5859
blob_store: iroh_blobs::api::Store,
60+
_gc_protect_task: AbortOnDropHandle<()>,
5961
}
6062

6163
impl Engine {
@@ -70,6 +72,7 @@ impl Engine {
7072
bao_store: iroh_blobs::api::Store,
7173
downloader: Downloader,
7274
default_author_storage: DefaultAuthorStorage,
75+
protect_cb: Option<ProtectCallbackHandler>,
7376
) -> anyhow::Result<Self> {
7477
let (live_actor_tx, to_live_actor_recv) = mpsc::channel(ACTOR_CHANNEL_CAP);
7578
let me = endpoint.node_id().fmt_short();
@@ -86,6 +89,33 @@ impl Engine {
8689
};
8790
let sync = SyncHandle::spawn(replica_store, Some(content_status_cb.clone()), me.clone());
8891

92+
let sync2 = sync.clone();
93+
let gc_protect_task = AbortOnDropHandle::new(n0_future::task::spawn(async move {
94+
let Some(mut protect_handler) = protect_cb else {
95+
return;
96+
};
97+
while let Some(reply_tx) = protect_handler.0.recv().await {
98+
let (tx, rx) = mpsc::channel(64);
99+
if let Err(_err) = reply_tx.send(rx) {
100+
continue;
101+
}
102+
let hashes = match sync2.content_hashes().await {
103+
Ok(hashes) => hashes,
104+
Err(err) => {
105+
if let Err(_err) = tx.send(Err(err)).await {
106+
break;
107+
}
108+
continue;
109+
}
110+
};
111+
for hash in hashes {
112+
if let Err(_err) = tx.send(hash).await {
113+
break;
114+
}
115+
}
116+
}
117+
}));
118+
89119
let actor = LiveActor::new(
90120
sync.clone(),
91121
endpoint.clone(),
@@ -123,6 +153,7 @@ impl Engine {
123153
content_status_cb,
124154
default_author,
125155
blob_store: bao_store,
156+
_gc_protect_task: gc_protect_task,
126157
})
127158
}
128159

@@ -457,3 +488,57 @@ impl DefaultAuthor {
457488
Ok(())
458489
}
459490
}
491+
492+
/// Sending half of the GC-protect request channel.
///
/// Each request sent over the wrapped channel is a oneshot sender; the receiving side
/// answers it with a receiver that yields the protected content hashes (or an error).
/// Turned into a [`ProtectCb`] via `into_cb`.
#[derive(Debug)]
493+
struct ProtectCallbackSender(mpsc::Sender<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>);
494+
495+
/// Receiving half of the GC-protect request channel, created by [`ProtectCallbackHandler::new`].
///
/// Each incoming request is a oneshot sender that expects a receiver of content hashes
/// in reply. The handler is consumed by the docs engine, whose GC-protect task answers
/// every request with the hashes reported by the sync handle's `content_hashes()`.
496+
#[derive(Debug)]
497+
pub struct ProtectCallbackHandler(
498+
pub(crate) mpsc::Receiver<oneshot::Sender<mpsc::Receiver<Result<Hash>>>>,
499+
);
500+
501+
impl ProtectCallbackHandler {
502+
/// Creates a connected pair of handler and [`ProtectCb`] callback.
///
/// The returned handler is the receiving side of the request channel; the returned
/// callback is the sending side wrapped as a [`ProtectCb`]. The callback is intended
/// to be registered with the blob store's GC configuration, while the handler is
/// passed to the docs builder/engine.
503+
pub fn new() -> (Self, ProtectCb) {
504+
// Request channel; at most 4 requests are buffered before senders have to wait.
let (tx, rx) = mpsc::channel(4);
505+
let cb = ProtectCallbackSender(tx).into_cb();
506+
let handler = ProtectCallbackHandler(rx);
507+
(handler, cb)
508+
}
509+
}
510+
511+
impl ProtectCallbackSender {
512+
/// Converts this sender into a [`ProtectCb`] closure.
///
/// When invoked with the `live` set, the callback:
/// 1. sends a fresh oneshot reply channel over the request channel,
/// 2. waits for the reply, which is a receiver of `Result<Hash>` items,
/// 3. inserts every received hash into `live`.
///
/// Returns [`ProtectOutcome::Skip`] (after logging a warning) if the request cannot
/// be sent, if the reply channel is dropped without an answer, or if any received
/// item is an error. Returns [`ProtectOutcome::Continue`] once the hash stream ends
/// without an error.
fn into_cb(self) -> ProtectCb {
513+
let start_tx = self.0.clone();
514+
Arc::new(move |live| {
515+
// Clone per invocation so each returned future owns its own sender.
let start_tx = start_tx.clone();
516+
Box::pin(async move {
517+
// Reply channel on which the handler side hands back the hash receiver.
let (tx, rx) = oneshot::channel();
518+
if let Err(_err) = start_tx.send(tx).await {
519+
tracing::warn!("Failed to get protected hashes from docs: ProtectCallback receiver dropped");
520+
return ProtectOutcome::Skip;
521+
}
522+
let mut rx = match rx.await {
523+
Ok(rx) => rx,
524+
Err(_err) => {
525+
tracing::warn!("Failed to get protected hashes from docs: ProtectCallback sender dropped");
526+
return ProtectOutcome::Skip;
527+
}
528+
};
529+
// Drain the stream; it ends when the sending side drops its sender.
while let Some(res) = rx.recv().await {
530+
match res {
531+
Err(err) => {
532+
tracing::warn!("Getting protected hashes produces error: {err:#}");
533+
return ProtectOutcome::Skip;
534+
}
535+
Ok(hash) => {
536+
live.insert(hash);
537+
}
538+
}
539+
}
540+
ProtectOutcome::Continue
541+
})
542+
})
543+
}
544+
}

src/protocol.rs

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use iroh_gossip::net::Gossip;
99

1010
use crate::{
1111
api::DocsApi,
12-
engine::{DefaultAuthorStorage, Engine},
12+
engine::{DefaultAuthorStorage, Engine, ProtectCallbackHandler},
1313
store::Store,
1414
};
1515

@@ -29,7 +29,10 @@ impl Docs {
2929
/// Create a new [`Builder`] for the docs protocol, using a persistent replica and author storage
3030
/// in the given directory.
3131
pub fn persistent(path: PathBuf) -> Builder {
32-
Builder { path: Some(path) }
32+
Builder {
33+
path: Some(path),
34+
protect_cb: None,
35+
}
3336
}
3437

3538
/// Creates a new [`Docs`] from an [`Engine`].
@@ -79,9 +82,18 @@ impl ProtocolHandler for Docs {
7982
#[derive(Debug, Default)]
8083
pub struct Builder {
8184
path: Option<PathBuf>,
85+
protect_cb: Option<ProtectCallbackHandler>,
8286
}
8387

8488
impl Builder {
89+
/// Sets the GC protect handler.
90+
///
91+
/// The handler is stored on the builder and passed on to the engine when the docs
/// protocol is spawned. It is the receiving half returned by
/// `ProtectCallbackHandler::new`; the `ProtectCb` returned alongside it is presumably
/// what must be registered with the blob store's GC configuration for the pair to be
/// connected (NOTE(review): confirm against the iroh-blobs `GcConfig` docs).
92+
pub fn protect_handler(mut self, protect_handler: ProtectCallbackHandler) -> Self {
93+
self.protect_cb = Some(protect_handler);
94+
self
95+
}
96+
8597
/// Build a [`Docs`] protocol given a [`Blobs`] and [`Gossip`] protocol.
8698
pub async fn spawn(
8799
self,
@@ -105,6 +117,7 @@ impl Builder {
105117
blobs,
106118
downloader,
107119
author_store,
120+
self.protect_cb,
108121
)
109122
.await?;
110123
Ok(Docs::new(engine))

tests/client.rs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#![cfg(feature = "rpc")]
22
use anyhow::{Context, Result};
3-
use futures_util::TryStreamExt;
43
use iroh_blobs::api::blobs::{ExportMode, ImportMode};
54
use iroh_docs::store::Query;
5+
use n0_future::StreamExt;
66
use rand::RngCore;
77
use testresult::TestResult;
88
use tokio::io::AsyncWriteExt;
@@ -40,8 +40,9 @@ async fn test_doc_close() -> Result<()> {
4040
}
4141

4242
#[tokio::test]
43-
#[traced_test]
43+
// #[traced_test]
4444
async fn test_doc_import_export() -> TestResult<()> {
45+
tracing_subscriber::fmt::init();
4546
let node = Node::memory().spawn().await?;
4647

4748
// create temp file
@@ -99,14 +100,21 @@ async fn test_doc_import_export() -> TestResult<()> {
99100
let key = entry.key().to_vec();
100101
let path = key_to_path(key, None, Some(out_root))?;
101102
// TODO(Frando): iroh-blobs should do this IMO.
102-
tokio::fs::create_dir_all(path.parent().unwrap()).await?;
103-
let _export_outcome = doc
103+
// tokio::fs::create_dir_all(path.parent().unwrap()).await?;
104+
let progress = doc
104105
.export_file(blobs, entry, path.clone(), ExportMode::Copy)
105-
.await
106-
.context("export file")?
107-
.finish()
108-
.await
109-
.context("export finish")?;
106+
.await?;
107+
let mut progress = progress.stream().await;
108+
while let Some(msg) = progress.next().await {
109+
println!("MSG {msg:?}");
110+
}
111+
// let _export_outcome = doc
112+
// .export_file(blobs, entry, path.clone(), ExportMode::Copy)
113+
// .await
114+
// .context("export file")?
115+
// .finish()
116+
// .await
117+
// .context("export finish")?;
110118

111119
let got_bytes = tokio::fs::read(path).await.context("tokio read")?;
112120
assert_eq!(buf, got_bytes);

tests/util.rs

Lines changed: 28 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,8 @@ use std::{
77
};
88

99
use iroh::{discovery::IntoDiscovery, dns::DnsResolver, NodeId, RelayMode, SecretKey};
10-
use iroh_docs::protocol::Docs;
10+
use iroh_blobs::store::fs::options::{GcConfig, Options};
11+
use iroh_docs::{engine::ProtectCallbackHandler, protocol::Docs};
1112
use iroh_gossip::net::Gossip;
1213

1314
/// Default bind address for the node.
@@ -74,7 +75,11 @@ pub struct Builder {
7475

7576
impl Builder {
7677
/// Spawns the node
77-
async fn spawn0(self, blobs: iroh_blobs::api::Store) -> anyhow::Result<Node> {
78+
async fn spawn0(
79+
self,
80+
blobs: iroh_blobs::api::Store,
81+
protect_cb: Option<ProtectCallbackHandler>,
82+
) -> anyhow::Result<Node> {
7883
let mut addr_v4 = DEFAULT_BIND_ADDR_V4;
7984
let mut addr_v6 = DEFAULT_BIND_ADDR_V6;
8085
if self.bind_random_port {
@@ -88,12 +93,14 @@ impl Builder {
8893
builder = builder.discovery_n0();
8994
let endpoint = builder.bind().await?;
9095
let mut router = iroh::protocol::Router::builder(endpoint.clone());
91-
// let blobs = Blobs::builder(store.clone()).build(&endpoint);
9296
let gossip = Gossip::builder().spawn(endpoint.clone());
93-
let docs_builder = match self.path {
97+
let mut docs_builder = match self.path {
9498
Some(ref path) => Docs::persistent(path.to_path_buf()),
9599
None => Docs::memory(),
96100
};
101+
if let Some(protect_cb) = protect_cb {
102+
docs_builder = docs_builder.protect_handler(protect_cb);
103+
}
97104
let docs = match docs_builder
98105
.spawn(endpoint.clone(), blobs.clone(), gossip.clone())
99106
.await
@@ -114,44 +121,6 @@ impl Builder {
114121
// Build the router
115122
let router = router.spawn();
116123

117-
// Setup RPC
118-
// let (internal_rpc, controller) =
119-
// quic_rpc::transport::flume::channel::<Request, Response>(1);
120-
// let controller = controller.boxed();
121-
// let internal_rpc = internal_rpc.boxed();
122-
// let internal_rpc = quic_rpc::RpcServer::<Service>::new(internal_rpc);
123-
124-
// let docs2 = docs.clone();
125-
// let blobs2 = blobs.clone();
126-
// let rpc_task: tokio::task::JoinHandle<()> = tokio::task::spawn(async move {
127-
// loop {
128-
// let request = internal_rpc.accept().await;
129-
// match request {
130-
// Ok(accepting) => {
131-
// let blobs = blobs2.clone();
132-
// let docs = docs2.clone();
133-
// tokio::task::spawn(async move {
134-
// let (msg, chan) = accepting.read_first().await?;
135-
// match msg {
136-
// Request::BlobsOrTags(msg) => {
137-
// blobs.handle_rpc_request(msg, chan.map().boxed()).await?;
138-
// }
139-
// Request::Docs(msg) => {
140-
// docs.handle_rpc_request(msg, chan.map().boxed()).await?;
141-
// }
142-
// }
143-
// anyhow::Ok(())
144-
// });
145-
// }
146-
// Err(err) => {
147-
// tracing::warn!("rpc error: {:?}", err);
148-
// }
149-
// }
150-
// }
151-
// });
152-
153-
// let client = quic_rpc::RpcClient::new(controller);
154-
155124
// TODO: Make this work again.
156125
// if let Some(period) = self.gc_interval {
157126
// blobs.add_protected(docs.protect_cb())?;
@@ -241,17 +210,29 @@ impl Node {
241210
impl Builder {
242211
/// Spawns the node
243212
pub async fn spawn(self) -> anyhow::Result<Node> {
244-
let store = match self.path {
213+
let (store, protect_handler) = match self.path {
245214
None => {
246215
let store = iroh_blobs::store::mem::MemStore::new();
247-
(&*store).clone()
216+
((*store).clone(), None)
248217
}
249218
Some(ref path) => {
250-
let store = iroh_blobs::store::fs::FsStore::load(path.clone()).await?;
251-
(&*store).clone()
219+
let db_path = path.join("blobs.db");
220+
let mut opts = Options::new(path);
221+
let protect_handler = if let Some(interval) = self.gc_interval {
222+
let (handler, cb) = ProtectCallbackHandler::new();
223+
opts.gc = Some(GcConfig {
224+
interval,
225+
add_protected: Some(cb),
226+
});
227+
Some(handler)
228+
} else {
229+
None
230+
};
231+
let store = iroh_blobs::store::fs::FsStore::load_with_opts(db_path, opts).await?;
232+
((*store).clone(), protect_handler)
252233
}
253234
};
254-
self.spawn0(store).await
235+
self.spawn0(store, protect_handler).await
255236
}
256237
}
257238

0 commit comments

Comments
 (0)