From 3d603b0d905d7e4de0d8f33df72c1d54f075e62b Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 15:43:05 +0300 Subject: [PATCH 1/4] remove options from BaoFileHandleInner it is present in global, so now we don't need to impl drop, we have it in on_shutdown --- src/store/fs.rs | 25 ++++++++++++------------- src/store/fs/bao_file.rs | 16 ++-------------- 2 files changed, 14 insertions(+), 27 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 53ed163a..ed9fa53c 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -218,9 +218,16 @@ impl entity_manager::Params for EmParams { type EntityState = Slot; async fn on_shutdown( - _state: entity_manager::ActiveEntityState, - _cause: entity_manager::ShutdownCause, + state: entity_manager::ActiveEntityState, + cause: entity_manager::ShutdownCause, ) { + // this isn't strictly necessary. Drop will run anyway as soon as the + // state is reset to it's default value. Doing it here means that we + // have exact control over where it happens. + if let Some(mut handle) = state.state.0.lock().await.take() { + trace!("shutting down hash: {}, cause: {cause:?}", state.id); + handle.persist(&state.global.options); + } } } @@ -312,10 +319,7 @@ impl HashContext { let res = self.db().get(hash).await.map_err(io::Error::other)?; let res = match res { Some(state) => open_bao_file(&hash, state, &self.global).await, - None => Ok(BaoFileHandle::new_partial_mem( - hash, - self.global.options.clone(), - )), + None => Ok(BaoFileHandle::new_partial_mem(hash)), }; Ok((res?, ())) }) @@ -362,7 +366,7 @@ async fn open_bao_file( MemOrFile::File(file) } }; - BaoFileHandle::new_complete(*hash, data, outboard, options.clone()) + BaoFileHandle::new_complete(*hash, data, outboard) } EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?, }) @@ -618,12 +622,7 @@ impl Actor { options: options.clone(), db: meta::Db::new(db_send), internal_cmd_tx: fs_commands_tx, - empty: BaoFileHandle::new_complete( - Hash::EMPTY, - MemOrFile::empty(), - MemOrFile::empty(), - options, - ), + empty: BaoFileHandle::new_complete(Hash::EMPTY, MemOrFile::empty(), MemOrFile::empty()), protect, }); rt.spawn(db_actor.run()); diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index bf150ae8..a12558fa 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -507,7 +507,6 @@ impl BaoFileStorage { pub struct BaoFileHandleInner { pub(crate) storage: watch::Sender, hash: Hash, - options: Arc, } impl fmt::Debug for BaoFileHandleInner { @@ -526,7 +525,7 @@ impl fmt::Debug for BaoFileHandleInner { pub struct BaoFileHandle(Arc); impl BaoFileHandle { - pub fn persist(&mut self) { + pub fn persist(&mut self, options: &Options) { self.0.storage.send_if_modified(|guard| { if Arc::strong_count(&self.0) > 1 { return false; @@ -534,7 +533,6 @@ impl BaoFileHandle { let BaoFileStorage::Partial(fs) = guard.take() else { return false; }; - let options = &self.options; let path = options.path.bitfield_path(&self.hash); trace!( "writing bitfield for hash {} to {}", @@ -554,12 +552,6 @@ impl BaoFileHandle { } } -impl Drop for BaoFileHandle { - fn drop(&mut self) { - self.persist(); - } -} - /// A reader for a bao file, reading just the data. #[derive(Debug)] pub struct DataReader(BaoFileHandle); @@ -601,12 +593,11 @@ impl BaoFileHandle { /// Create a new bao file handle. /// /// This will create a new file handle with an empty memory storage. - pub fn new_partial_mem(hash: Hash, options: Arc) -> Self { + pub fn new_partial_mem(hash: Hash) -> Self { let storage = BaoFileStorage::partial_mem(); Self(Arc::new(BaoFileHandleInner { storage: watch::Sender::new(storage), hash, - options: options.clone(), })) } @@ -626,7 +617,6 @@ impl BaoFileHandle { Ok(Self(Arc::new(BaoFileHandleInner { storage: watch::Sender::new(storage), hash, - options, }))) } @@ -635,13 +625,11 @@ impl BaoFileHandle { hash: Hash, data: MemOrFile>, outboard: MemOrFile, - options: Arc, ) -> Self { let storage = CompleteStorage { data, outboard }.into(); Self(Arc::new(BaoFileHandleInner { storage: watch::Sender::new(storage), hash, - options, })) } From e65bd074e05acf6e0408f252124c71fc32abe374 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 16:00:59 +0300 Subject: [PATCH 2/4] Get rid of BaoFileHandleInner and all that indirection --- src/store/fs.rs | 18 ++++---- src/store/fs/bao_file.rs | 96 +++++++++++++--------------------------- 2 files changed, 39 insertions(+), 75 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index ed9fa53c..1845ecb7 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -226,7 +226,7 @@ impl entity_manager::Params for EmParams { // have exact control over where it happens. if let Some(mut handle) = state.state.0.lock().await.take() { trace!("shutting down hash: {}, cause: {cause:?}", state.id); - handle.persist(&state.global.options); + handle.persist(&state.id, &state.global.options); } } } @@ -319,7 +319,7 @@ impl HashContext { let res = self.db().get(hash).await.map_err(io::Error::other)?; let res = match res { Some(state) => open_bao_file(&hash, state, &self.global).await, - None => Ok(BaoFileHandle::new_partial_mem(hash)), + None => Ok(BaoFileHandle::new_partial_mem()), }; Ok((res?, ())) }) @@ -366,7 +366,7 @@ async fn open_bao_file( MemOrFile::File(file) } }; - BaoFileHandle::new_complete(*hash, data, outboard) + BaoFileHandle::new_complete(data, outboard) } EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?, }) @@ -622,7 +622,7 @@ impl Actor { options: options.clone(), db: meta::Db::new(db_send), internal_cmd_tx: fs_commands_tx, - empty: BaoFileHandle::new_complete(Hash::EMPTY, MemOrFile::empty(), MemOrFile::empty()), + empty: BaoFileHandle::new_complete(MemOrFile::empty(), MemOrFile::empty()), protect, }); rt.spawn(db_actor.run()); @@ -926,7 +926,7 @@ async fn import_bao_impl( ) -> api::Result<()> { trace!( "importing bao: {} {} bytes", - handle.hash().fmt_short(), + ctx.id.fmt_short(), size ); let mut batch = Vec::::new(); @@ -935,7 +935,7 @@ async fn import_bao_impl( // if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx.global).await?; + handle.write_batch(&batch, &bitfield, &ctx).await?; batch.clear(); ranges = ChunkRanges::empty(); } @@ -951,7 +951,7 @@ async fn import_bao_impl( } if !batch.is_empty() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx.global).await?; + handle.write_batch(&batch, &bitfield, &ctx).await?; } Ok(()) } @@ -991,7 +991,6 @@ async fn export_ranges_impl( "exporting ranges: {hash} {ranges:?} size={}", handle.current_size()? ); - debug_assert!(handle.hash() == hash, "hash mismatch"); let bitfield = handle.bitfield()?; let data = handle.data_reader(); let size = bitfield.size(); @@ -1052,8 +1051,7 @@ async fn export_bao_impl( handle: BaoFileHandle, ) -> anyhow::Result<()> { let ExportBaoRequest { ranges, hash, .. } = cmd; - debug_assert!(handle.hash() == hash, "hash mismatch"); - let outboard = handle.outboard()?; + let outboard = handle.outboard(&hash)?; let size = outboard.tree.size(); if size == 0 && hash != Hash::EMPTY { // we have no data whatsoever, so we stop here diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index a12558fa..f63ebb8f 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -31,7 +31,7 @@ use super::{ use crate::{ api::blobs::Bitfield, store::{ - fs::{meta::raw_outboard_size, TaskContext}, + fs::{meta::raw_outboard_size, HashContext, TaskContext}, util::{ read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile, PartialMemStorage, DD, @@ -401,21 +401,20 @@ impl BaoFileStorage { self, batch: &[BaoContentItem], bitfield: &Bitfield, - ctx: &TaskContext, - hash: &Hash, + ctx: &HashContext, ) -> io::Result<(Self, Option>)> { Ok(match self { BaoFileStorage::PartialMem(mut ms) => { // check if we need to switch to file mode, otherwise write to memory - if max_offset(batch) <= ctx.options.inline.max_data_inlined { + if max_offset(batch) <= ctx.global.options.inline.max_data_inlined { ms.write_batch(bitfield.size(), batch)?; let changes = ms.bitfield.update(bitfield); let new = changes.new_state(); if new.complete { - let (cs, update) = ms.into_complete(hash, ctx)?; + let (cs, update) = ms.into_complete(&ctx.id, &ctx.global)?; (cs.into(), Some(update)) } else { - let fs = ms.persist(ctx, hash)?; + let fs = ms.persist(&ctx.global, &ctx.id)?; let update = EntryState::Partial { size: new.validated_size, }; @@ -428,13 +427,13 @@ impl BaoFileStorage { // a write at the end of a very large file. // // opt: we should check if we become complete to avoid going from mem to partial to complete - let mut fs = ms.persist(ctx, hash)?; + let mut fs = ms.persist(&ctx.global, &ctx.id)?; fs.write_batch(bitfield.size(), batch)?; let changes = fs.bitfield.update(bitfield); let new = changes.new_state(); if new.complete { let size = new.validated_size.unwrap(); - let (cs, update) = fs.into_complete(size, &ctx.options)?; + let (cs, update) = fs.into_complete(size, &ctx.global.options)?; (cs.into(), Some(update)) } else { let update = EntryState::Partial { @@ -450,7 +449,7 @@ impl BaoFileStorage { let new = changes.new_state(); if new.complete { let size = new.validated_size.unwrap(); - let (cs, update) = fs.into_complete(size, &ctx.options)?; + let (cs, update) = fs.into_complete(size, &ctx.global.options)?; (cs.into(), Some(update)) } else if changes.was_validated() { // we are still partial, but now we know the size @@ -503,46 +502,29 @@ impl BaoFileStorage { } } -/// The inner part of a bao file handle. -pub struct BaoFileHandleInner { - pub(crate) storage: watch::Sender, - hash: Hash, -} - -impl fmt::Debug for BaoFileHandleInner { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let guard = self.storage.borrow(); - let storage = guard.deref(); - f.debug_struct("BaoFileHandleInner") - .field("hash", &DD(self.hash)) - .field("storage", &storage) - .finish_non_exhaustive() - } -} - /// A cheaply cloneable handle to a bao file, including the hash and the configuration. #[derive(Debug, Clone, derive_more::Deref)] -pub struct BaoFileHandle(Arc); +pub(crate) struct BaoFileHandle(Arc>); impl BaoFileHandle { - pub fn persist(&mut self, options: &Options) { - self.0.storage.send_if_modified(|guard| { + pub fn persist(&mut self, hash: &Hash, options: &Options) { + self.send_if_modified(|guard| { if Arc::strong_count(&self.0) > 1 { return false; } let BaoFileStorage::Partial(fs) = guard.take() else { return false; }; - let path = options.path.bitfield_path(&self.hash); + let path = options.path.bitfield_path(hash); trace!( "writing bitfield for hash {} to {}", - self.hash, + hash, path.display() ); if let Err(cause) = fs.sync_all(&path) { error!( "failed to write bitfield for {} at {}: {:?}", - self.hash, + hash, path.display(), cause ); @@ -558,7 +540,7 @@ pub struct DataReader(BaoFileHandle); impl ReadBytesAt for DataReader { fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result { - let guard = self.0.storage.borrow(); + let guard = self.0.borrow(); match guard.deref() { BaoFileStorage::PartialMem(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Partial(x) => x.data.read_bytes_at(offset, size), @@ -574,7 +556,7 @@ pub struct OutboardReader(BaoFileHandle); impl ReadAt for OutboardReader { fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result { - let guard = self.0.storage.borrow(); + let guard = self.0.borrow(); match guard.deref() { BaoFileStorage::Complete(x) => x.outboard.read_at(offset, buf), BaoFileStorage::PartialMem(x) => x.outboard.read_at(offset, buf), @@ -593,12 +575,9 @@ impl BaoFileHandle { /// Create a new bao file handle. /// /// This will create a new file handle with an empty memory storage. - pub fn new_partial_mem(hash: Hash) -> Self { + pub fn new_partial_mem() -> Self { let storage = BaoFileStorage::partial_mem(); - Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - })) + Self(Arc::new(watch::Sender::new(storage))) } /// Create a new bao file handle with a partial file. @@ -614,23 +593,16 @@ impl BaoFileHandle { } else { storage.into() }; - Ok(Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - }))) + Ok(Self(Arc::new(watch::Sender::new(storage)))) } /// Create a new complete bao file handle. pub fn new_complete( - hash: Hash, data: MemOrFile>, outboard: MemOrFile, ) -> Self { let storage = CompleteStorage { data, outboard }.into(); - Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - })) + Self(Arc::new(watch::Sender::new(storage))) } /// Complete the handle @@ -639,7 +611,7 @@ impl BaoFileHandle { data: MemOrFile>, outboard: MemOrFile, ) { - self.storage.send_if_modified(|guard| { + self.send_if_modified(|guard| { let res = match guard { BaoFileStorage::Complete(_) => None, BaoFileStorage::PartialMem(entry) => Some(&mut entry.bitfield), @@ -657,13 +629,13 @@ impl BaoFileHandle { } pub fn subscribe(&self) -> BaoFileStorageSubscriber { - BaoFileStorageSubscriber::new(self.0.storage.subscribe()) + BaoFileStorageSubscriber::new(self.0.subscribe()) } /// True if the file is complete. #[allow(dead_code)] pub fn is_complete(&self) -> bool { - matches!(self.storage.borrow().deref(), BaoFileStorage::Complete(_)) + matches!(self.borrow().deref(), BaoFileStorage::Complete(_)) } /// An AsyncSliceReader for the data file. @@ -684,7 +656,7 @@ impl BaoFileHandle { /// The most precise known total size of the data file. pub fn current_size(&self) -> io::Result { - match self.storage.borrow().deref() { + match self.borrow().deref() { BaoFileStorage::Complete(mem) => Ok(mem.size()), BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), BaoFileStorage::Partial(file) => file.current_size(), @@ -694,7 +666,7 @@ impl BaoFileHandle { /// The most precise known total size of the data file. pub fn bitfield(&self) -> io::Result { - match self.storage.borrow().deref() { + match self.borrow().deref() { BaoFileStorage::Complete(mem) => Ok(mem.bitfield()), BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()), BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()), @@ -703,33 +675,27 @@ impl BaoFileHandle { } /// The outboard for the file. - pub fn outboard(&self) -> io::Result> { - let root = self.hash.into(); + pub fn outboard(&self, hash: &Hash) -> io::Result> { let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); let outboard = self.outboard_reader(); Ok(PreOrderOutboard { - root, + root: blake3::Hash::from(*hash), tree, data: outboard, }) } - /// The hash of the file. - pub fn hash(&self) -> Hash { - self.hash - } - /// Write a batch and notify the db pub(super) async fn write_batch( &self, batch: &[BaoContentItem], bitfield: &Bitfield, - ctx: &TaskContext, + ctx: &HashContext, ) -> io::Result<()> { trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); let mut res = Ok(None); - self.storage.send_if_modified(|state| { - let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx, &self.hash) + self.send_if_modified(|state| { + let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) else { res = Err(io::Error::other("write batch failed")); return false; @@ -739,7 +705,7 @@ impl BaoFileHandle { true }); if let Some(update) = res? { - ctx.db.update(self.hash, update).await?; + ctx.global.db.update(ctx.id, update).await?; } Ok(()) } From e81b1f3cb1084e38026cbafe0dea5af3e710046a Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 16:16:26 +0300 Subject: [PATCH 3/4] change a few fns to take HashContext directly instead of 2 args --- src/store/fs.rs | 8 ++------ src/store/fs/bao_file.rs | 44 ++++++++++++++++++---------------------- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 1845ecb7..1159e571 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -226,7 +226,7 @@ impl entity_manager::Params for EmParams { // have exact control over where it happens. if let Some(mut handle) = state.state.0.lock().await.take() { trace!("shutting down hash: {}, cause: {cause:?}", state.id); - handle.persist(&state.id, &state.global.options); + handle.persist(&state); } } } @@ -924,11 +924,7 @@ async fn import_bao_impl( handle: BaoFileHandle, ctx: HashContext, ) -> api::Result<()> { - trace!( - "importing bao: {} {} bytes", - ctx.id.fmt_short(), - size - ); + trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size); let mut batch = Vec::::new(); let mut ranges = ChunkRanges::empty(); while let Some(item) = rx.recv().await? { diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index f63ebb8f..30b790c5 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -335,18 +335,16 @@ impl Default for BaoFileStorage { impl PartialMemStorage { /// Converts this storage into a complete storage, using the given hash for /// path names and the given options for decisions about inlining. - fn into_complete( - self, - hash: &Hash, - ctx: &TaskContext, - ) -> io::Result<(CompleteStorage, EntryState)> { + fn into_complete(self, ctx: &HashContext) -> io::Result<(CompleteStorage, EntryState)> { + let options = &ctx.global.options; + let hash = &ctx.id; let size = self.current_size(); let outboard_size = raw_outboard_size(size); - let (data, data_location) = if ctx.options.is_inlined_data(size) { + let (data, data_location) = if options.is_inlined_data(size) { let data: Bytes = self.data.to_vec().into(); (MemOrFile::Mem(data.clone()), DataLocation::Inline(data)) } else { - let data_path = ctx.options.path.data_path(hash); + let data_path = options.path.data_path(hash); let mut data_file = create_read_write(&data_path)?; self.data.persist(&mut data_file)?; ( @@ -354,7 +352,8 @@ impl PartialMemStorage { DataLocation::Owned(size), ) }; - let (outboard, outboard_location) = if ctx.options.is_inlined_outboard(outboard_size) { + let (outboard, outboard_location) = if ctx.global.options.is_inlined_outboard(outboard_size) + { if outboard_size > 0 { let outboard: Bytes = self.outboard.to_vec().into(); ( @@ -365,7 +364,7 @@ impl PartialMemStorage { (MemOrFile::empty(), OutboardLocation::NotNeeded) } } else { - let outboard_path = ctx.options.path.outboard_path(hash); + let outboard_path = ctx.global.options.path.outboard_path(hash); let mut outboard_file = create_read_write(&outboard_path)?; self.outboard.persist(&mut outboard_file)?; let outboard_location = if outboard_size == 0 { @@ -411,10 +410,10 @@ impl BaoFileStorage { let changes = ms.bitfield.update(bitfield); let new = changes.new_state(); if new.complete { - let (cs, update) = ms.into_complete(&ctx.id, &ctx.global)?; + let (cs, update) = ms.into_complete(ctx)?; (cs.into(), Some(update)) } else { - let fs = ms.persist(&ctx.global, &ctx.id)?; + let fs = ms.persist(ctx)?; let update = EntryState::Partial { size: new.validated_size, }; @@ -427,7 +426,7 @@ impl BaoFileStorage { // a write at the end of a very large file. // // opt: we should check if we become complete to avoid going from mem to partial to complete - let mut fs = ms.persist(&ctx.global, &ctx.id)?; + let mut fs = ms.persist(ctx)?; fs.write_batch(bitfield.size(), batch)?; let changes = fs.bitfield.update(bitfield); let new = changes.new_state(); @@ -507,20 +506,17 @@ impl BaoFileStorage { pub(crate) struct BaoFileHandle(Arc>); impl BaoFileHandle { - pub fn persist(&mut self, hash: &Hash, options: &Options) { + pub(super) fn persist(&mut self, ctx: &HashContext) { self.send_if_modified(|guard| { + let hash = &ctx.id; if Arc::strong_count(&self.0) > 1 { return false; } let BaoFileStorage::Partial(fs) = guard.take() else { return false; }; - let path = options.path.bitfield_path(hash); - trace!( - "writing bitfield for hash {} to {}", - hash, - path.display() - ); + let path = ctx.global.options.path.bitfield_path(hash); + trace!("writing bitfield for hash {} to {}", hash, path.display()); if let Err(cause) = fs.sync_all(&path) { error!( "failed to write bitfield for {} at {}: {:?}", @@ -695,8 +691,7 @@ impl BaoFileHandle { trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); let mut res = Ok(None); self.send_if_modified(|state| { - let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) - else { + let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx) else { res = Err(io::Error::other("write batch failed")); return false; }; @@ -713,9 +708,10 @@ impl BaoFileHandle { impl PartialMemStorage { /// Persist the batch to disk. - fn persist(self, ctx: &TaskContext, hash: &Hash) -> io::Result { - let options = &ctx.options.path; - ctx.protect.protect( + fn persist(self, ctx: &HashContext) -> io::Result { + let options = &ctx.global.options.path; + let hash = &ctx.id; + ctx.global.protect.protect( *hash, [ BaoFilePart::Data, From 27df6da669d8868c070a7093225415345ec92908 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 9 Jul 2025 17:19:51 +0300 Subject: [PATCH 4/4] more using HashContext instead of TaskContext --- src/store/fs.rs | 15 ++++++--------- src/store/fs/bao_file.rs | 11 ++++++----- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/store/fs.rs b/src/store/fs.rs index 1159e571..b323571b 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -298,7 +298,7 @@ impl HashContext { .get_or_create(|| async { let res = self.db().get(hash).await.map_err(io::Error::other)?; let res = match res { - Some(state) => open_bao_file(&hash, state, &self.global).await, + Some(state) => open_bao_file(state, self).await, None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")), }; Ok((res?, ())) @@ -318,7 +318,7 @@ impl HashContext { .get_or_create(|| async { let res = self.db().get(hash).await.map_err(io::Error::other)?; let res = match res { - Some(state) => open_bao_file(&hash, state, &self.global).await, + Some(state) => open_bao_file(state, self).await, None => Ok(BaoFileHandle::new_partial_mem()), }; Ok((res?, ())) @@ -331,12 +331,9 @@ impl HashContext { } } -async fn open_bao_file( - hash: &Hash, - state: EntryState, - ctx: &TaskContext, -) -> io::Result { - let options = &ctx.options; +async fn open_bao_file(state: EntryState, ctx: &HashContext) -> io::Result { + let hash = &ctx.id; + let options = &ctx.global.options; Ok(match state { EntryState::Complete { data_location, @@ -368,7 +365,7 @@ async fn open_bao_file( }; BaoFileHandle::new_complete(data, outboard) } - EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?, + EntryState::Partial { .. } => BaoFileHandle::new_partial_file(ctx).await?, }) } diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 30b790c5..1fe91404 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -31,7 +31,7 @@ use super::{ use crate::{ api::blobs::Bitfield, store::{ - fs::{meta::raw_outboard_size, HashContext, TaskContext}, + fs::{meta::raw_outboard_size, HashContext}, util::{ read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile, PartialMemStorage, DD, @@ -577,14 +577,15 @@ impl BaoFileHandle { } /// Create a new bao file handle with a partial file. - pub(super) async fn new_partial_file(hash: Hash, ctx: &TaskContext) -> io::Result { - let options = ctx.options.clone(); - let storage = PartialFileStorage::load(&hash, &options.path)?; + pub(super) async fn new_partial_file(ctx: &HashContext) -> io::Result { + let hash = &ctx.id; + let options = ctx.global.options.clone(); + let storage = PartialFileStorage::load(hash, &options.path)?; let storage = if storage.bitfield.is_complete() { let size = storage.bitfield.size; let (storage, entry_state) = storage.into_complete(size, &options)?; debug!("File was reconstructed as complete"); - ctx.db.set(hash, entry_state).await?; + ctx.global.db.set(*hash, entry_state).await?; storage.into() } else { storage.into()