From 4bfa953c0966ca071bd40292862f687c816ebe31 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 22 Jul 2025 12:52:22 +0300 Subject: [PATCH 1/7] Implement Reader for individual blobs. --- src/api/blobs.rs | 17 ++++ src/api/blobs/reader.rs | 170 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 187 insertions(+) create mode 100644 src/api/blobs/reader.rs diff --git a/src/api/blobs.rs b/src/api/blobs.rs index 0f79838f..cc141769 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt}; use quinn::SendStream; use range_collections::{range_set::RangeSetRange, RangeSet2}; use ref_cast::RefCast; +use serde::{Deserialize, Serialize}; use tokio::io::AsyncWriteExt; use tracing::trace; +mod reader; +pub use reader::Reader; // Public reexports from the proto module. // @@ -102,6 +105,14 @@ impl Blobs { }) } + pub fn reader(&self, hash: impl Into) -> Reader { + self.reader_with_opts(ReaderOptions { hash: hash.into() }) + } + + pub fn reader_with_opts(&self, options: ReaderOptions) -> Reader { + Reader::new(self.clone(), options) + } + /// Delete a blob. /// /// This function is not public, because it does not work as expected when called manually, @@ -647,6 +658,12 @@ impl<'a> AddProgress<'a> { } } +/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ReaderOptions { + pub hash: Hash, +} + /// An observe result. Awaiting this will return the current state. /// /// Calling [`ObserveProgress::stream`] will return a stream of updates, where diff --git a/src/api/blobs/reader.rs b/src/api/blobs/reader.rs new file mode 100644 index 00000000..44365a48 --- /dev/null +++ b/src/api/blobs/reader.rs @@ -0,0 +1,170 @@ +use std::{ + io::{self, ErrorKind, SeekFrom}, + ops::DerefMut, + sync::{Arc, Mutex}, + task::Poll, +}; + +use n0_future::FutureExt; + +use crate::api::{ + blobs::{Blobs, ReaderOptions}, + RequestResult, +}; + +pub struct Reader { + blobs: Blobs, + options: ReaderOptions, + state: Arc>, +} + +#[derive(Default)] +enum ReaderState { + Idle { + position: u64, + }, + Seeking { + position: u64, + }, + Reading { + position: u64, + op: n0_future::boxed::BoxFuture>>, + }, + #[default] + Poisoned, +} + +impl Reader { + pub fn new(blobs: Blobs, options: ReaderOptions) -> Self { + Self { + blobs, + options, + state: Arc::new(Mutex::new(ReaderState::Idle { position: 0 })), + } + } +} + +impl tokio::io::AsyncRead for Reader { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + match std::mem::take(this.state.lock().unwrap().deref_mut()) { + ReaderState::Idle { position } => { + // todo: read until next page boundary instead of fixed size + let len = buf.remaining().min(1024 * 16); + let end = position.checked_add(len as u64).ok_or_else(|| { + io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading") + })?; + let hash = this.options.hash; + let blobs = this.blobs.clone(); + let ranges = position..end; + let op = async move { blobs.export_ranges(hash, ranges).concatenate().await }; + *this.state.lock().unwrap() = ReaderState::Reading { + position, + op: Box::pin(op), + }; + } + ReaderState::Reading { position, mut op } => { + match op.poll(cx) { + Poll::Ready(Ok(data)) => { + let len = data.len(); + if len > buf.remaining() { + return Poll::Ready(Err(io::Error::new( + ErrorKind::UnexpectedEof, + "Read more data than buffer can hold", + ))); + } + buf.put_slice(&data); + let position = position + len as u64; + *this.state.lock().unwrap() = ReaderState::Idle { position }; + return Poll::Ready(Ok(())); + } + Poll::Ready(Err(e)) => { + *this.state.lock().unwrap() = ReaderState::Idle { position }; + let e = io::Error::new(ErrorKind::Other, e.to_string()); + return Poll::Ready(Err(e)); + } + Poll::Pending => { + // Put back the state + *this.state.lock().unwrap() = ReaderState::Reading { + position, + op: Box::pin(op), + }; + return Poll::Pending; + } + } + } + state @ ReaderState::Seeking { .. } => { + *this.state.lock().unwrap() = state; + return Poll::Ready(Err(io::Error::new( + ErrorKind::Other, + "Can't read while seeking", + ))); + } + ReaderState::Poisoned => { + return Poll::Ready(Err(io::Error::other("Reader is poisoned"))); + } + }; + todo!() + } +} + +impl tokio::io::AsyncSeek for Reader { + fn start_seek( + self: std::pin::Pin<&mut Self>, + seek_from: tokio::io::SeekFrom, + ) -> std::io::Result<()> { + let this = self.get_mut(); + match std::mem::take(this.state.lock().unwrap().deref_mut()) { + ReaderState::Idle { position } => { + let position1 = match seek_from { + SeekFrom::Start(pos) => pos, + SeekFrom::Current(offset) => { + position.checked_add_signed(offset).ok_or_else(|| { + io::Error::new( + ErrorKind::InvalidInput, + "Position overflow when seeking", + ) + })? + } + SeekFrom::End(_offset) => { + // todo: support seeking from end if we know the size + return Err(io::Error::new( + ErrorKind::InvalidInput, + "Seeking from end is not supported yet", + ))?; + } + }; + *this.state.lock().unwrap() = ReaderState::Seeking { + position: position1, + }; + Ok(()) + } + ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")), + ReaderState::Seeking { .. } => Err(io::Error::other("Already seeking")), + ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")), + } + } + + fn poll_complete( + self: std::pin::Pin<&mut Self>, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + Poll::Ready( + match std::mem::take(this.state.lock().unwrap().deref_mut()) { + ReaderState::Seeking { position } => { + // we only put the state back if we are in the right state + *this.state.lock().unwrap() = ReaderState::Idle { position }; + Ok(position) + } + ReaderState::Idle { .. } => Err(io::Error::other("No seek operation in progress")), + ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")), + ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")), + }, + ) + } +} From 6925dd6d1ae50e2e95e7149216d11988061c9678 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 22 Jul 2025 15:31:40 +0300 Subject: [PATCH 2/7] Fix some bugs --- src/api/blobs.rs | 2 +- src/api/blobs/reader.rs | 318 ++++++++++++++++++++++++++++++---------- src/store/fs.rs | 10 +- 3 files changed, 243 insertions(+), 87 deletions(-) diff --git a/src/api/blobs.rs b/src/api/blobs.rs index cc141769..69169582 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -873,7 +873,7 @@ impl ExportRangesProgress { /// range of 0..100, you will get the entire first chunk, 0..1024. /// /// It is up to the caller to clip the ranges to the requested ranges. - pub async fn stream(self) -> impl Stream { + pub fn stream(self) -> impl Stream { Gen::new(|co| async move { let mut rx = match self.inner.await { Ok(rx) => rx, diff --git a/src/api/blobs/reader.rs b/src/api/blobs/reader.rs index 44365a48..54fae35e 100644 --- a/src/api/blobs/reader.rs +++ b/src/api/blobs/reader.rs @@ -1,24 +1,26 @@ use std::{ io::{self, ErrorKind, SeekFrom}, ops::DerefMut, + pin::Pin, sync::{Arc, Mutex}, - task::Poll, + task::{Context, Poll}, }; -use n0_future::FutureExt; +use n0_future::StreamExt; use crate::api::{ blobs::{Blobs, ReaderOptions}, - RequestResult, + proto::ExportRangesItem, }; +#[derive(Debug)] pub struct Reader { blobs: Blobs, options: ReaderOptions, state: Arc>, } -#[derive(Default)] +#[derive(Default, derive_more::Debug)] enum ReaderState { Idle { position: u64, @@ -28,7 +30,8 @@ enum ReaderState { }, Reading { position: u64, - op: n0_future::boxed::BoxFuture>>, + #[debug(skip)] + op: n0_future::boxed::BoxStream, }, #[default] Poisoned, @@ -46,69 +49,101 @@ impl Reader { impl tokio::io::AsyncRead for Reader { fn poll_read( - self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, + self: Pin<&mut Self>, + cx: &mut Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { + ) -> Poll> { let this = self.get_mut(); - match std::mem::take(this.state.lock().unwrap().deref_mut()) { - ReaderState::Idle { position } => { - // todo: read until next page boundary instead of fixed size - let len = buf.remaining().min(1024 * 16); - let end = position.checked_add(len as u64).ok_or_else(|| { - io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading") - })?; - let hash = this.options.hash; - let blobs = this.blobs.clone(); - let ranges = position..end; - let op = async move { blobs.export_ranges(hash, ranges).concatenate().await }; - *this.state.lock().unwrap() = ReaderState::Reading { - position, - op: Box::pin(op), - }; - } - ReaderState::Reading { position, mut op } => { - match op.poll(cx) { - Poll::Ready(Ok(data)) => { - let len = data.len(); - if len > buf.remaining() { - return Poll::Ready(Err(io::Error::new( - ErrorKind::UnexpectedEof, - "Read more data than buffer can hold", + let mut position1 = None; + loop { + let mut guard = this.state.lock().unwrap(); + match std::mem::take(guard.deref_mut()) { + ReaderState::Idle { position } => { + // todo: read until next page boundary instead of fixed size + let len = buf.remaining() as u64; + let end = position.checked_add(len).ok_or_else(|| { + io::Error::new(ErrorKind::InvalidInput, "Position overflow when reading") + })?; + // start the export op for the entire size of the buffer, and convert to a stream + let stream = this + .blobs + .export_ranges(this.options.hash, position..end) + .stream(); + position1 = Some(position); + *guard = ReaderState::Reading { + position, + op: Box::pin(stream), + }; + } + ReaderState::Reading { position, mut op } => { + let position1 = position1.get_or_insert(position); + match op.poll_next(cx) { + Poll::Ready(Some(ExportRangesItem::Data(data))) => { + if data.offset != *position1 { + break Poll::Ready(Err(io::Error::other( + "Data offset does not match expected position", + ))); + } + buf.put_slice(&data.data); + // update just local position1, not the position in the state. + *position1 = + position1 + .checked_add(data.data.len() as u64) + .ok_or_else(|| { + io::Error::new(ErrorKind::InvalidInput, "Position overflow") + })?; + *guard = ReaderState::Reading { position, op }; + } + Poll::Ready(Some(ExportRangesItem::Error(err))) => { + *guard = ReaderState::Idle { position }; + break Poll::Ready(Err(io::Error::other( + format!("Error reading data: {err}"), ))); } - buf.put_slice(&data); - let position = position + len as u64; - *this.state.lock().unwrap() = ReaderState::Idle { position }; - return Poll::Ready(Ok(())); - } - Poll::Ready(Err(e)) => { - *this.state.lock().unwrap() = ReaderState::Idle { position }; - let e = io::Error::new(ErrorKind::Other, e.to_string()); - return Poll::Ready(Err(e)); - } - Poll::Pending => { - // Put back the state - *this.state.lock().unwrap() = ReaderState::Reading { - position, - op: Box::pin(op), - }; - return Poll::Pending; + Poll::Ready(Some(ExportRangesItem::Size(_size))) => { + // put back the state and continue reading + *guard = ReaderState::Reading { position, op }; + } + Poll::Ready(None) => { + // done with the stream, go back in idle. + *guard = ReaderState::Idle { + position: *position1, + }; + break Poll::Ready(Ok(())); + } + Poll::Pending => { + break if position != *position1 { + // we read some data so we need to abort the op. + // + // we can't be sure we won't be called with the same buf size next time. + *guard = ReaderState::Idle { + position: *position1, + }; + Poll::Ready(Ok(())) + } else { + // nothing was read yet, we remain in the reading state + // + // we make an assumption here that the next call will be with the same buf size. + *guard = ReaderState::Reading { + position: *position1, + op, + }; + Poll::Pending + }; + } } } - } - state @ ReaderState::Seeking { .. } => { - *this.state.lock().unwrap() = state; - return Poll::Ready(Err(io::Error::new( - ErrorKind::Other, - "Can't read while seeking", - ))); - } - ReaderState::Poisoned => { - return Poll::Ready(Err(io::Error::other("Reader is poisoned"))); - } - }; - todo!() + state @ ReaderState::Seeking { .. } => { + *this.state.lock().unwrap() = state; + break Poll::Ready(Err(io::Error::other( + "Can't read while seeking", + ))); + } + ReaderState::Poisoned => { + break Poll::Ready(Err(io::Error::other("Reader is poisoned"))); + } + }; + } } } @@ -116,9 +151,10 @@ impl tokio::io::AsyncSeek for Reader { fn start_seek( self: std::pin::Pin<&mut Self>, seek_from: tokio::io::SeekFrom, - ) -> std::io::Result<()> { + ) -> io::Result<()> { let this = self.get_mut(); - match std::mem::take(this.state.lock().unwrap().deref_mut()) { + let mut guard = this.state.lock().unwrap(); + match std::mem::take(guard.deref_mut()) { ReaderState::Idle { position } => { let position1 = match seek_from { SeekFrom::Start(pos) => pos, @@ -138,7 +174,7 @@ impl tokio::io::AsyncSeek for Reader { ))?; } }; - *this.state.lock().unwrap() = ReaderState::Seeking { + *guard = ReaderState::Seeking { position: position1, }; Ok(()) @@ -149,22 +185,144 @@ impl tokio::io::AsyncSeek for Reader { } } - fn poll_complete( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { + fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - Poll::Ready( - match std::mem::take(this.state.lock().unwrap().deref_mut()) { - ReaderState::Seeking { position } => { - // we only put the state back if we are in the right state - *this.state.lock().unwrap() = ReaderState::Idle { position }; - Ok(position) - } - ReaderState::Idle { .. } => Err(io::Error::other("No seek operation in progress")), - ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")), - ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")), + let mut guard = this.state.lock().unwrap(); + Poll::Ready(match std::mem::take(guard.deref_mut()) { + ReaderState::Seeking { position } => { + *guard = ReaderState::Idle { position }; + Ok(position) + } + ReaderState::Idle { position } => { + // seek calls poll_complete just in case, to finish a pending seek operation + // before the next seek operation. So it is poll_complete/start_seek/poll_complete + *guard = ReaderState::Idle { position }; + Ok(position) + } + ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")), + ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")), + }) + } +} + +#[cfg(test)] +mod tests { + use bao_tree::ChunkRanges; + use testresult::TestResult; + use tokio::io::{AsyncReadExt, AsyncSeekExt}; + + use super::*; + use crate::{ + store::{ + fs::{ + tests::{create_n0_bao, test_data, INTERESTING_SIZES}, + FsStore, }, - ) + mem::MemStore, + }, + util::ChunkRangesExt, + }; + + async fn reader_smoke(blobs: &Blobs) -> TestResult<()> { + for size in INTERESTING_SIZES { + let data = test_data(size); + let tag = blobs.add_bytes(data.clone()).await?; + // read all + { + let mut reader = blobs.reader(tag.hash); + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await?; + assert_eq!(buf, data); + let pos = reader.stream_position().await?; + assert_eq!(pos, data.len() as u64); + } + // seek to mid and read all + { + let mut reader = blobs.reader(tag.hash); + let mid = size / 2; + reader.seek(SeekFrom::Start(mid as u64)).await?; + let mut buf = Vec::new(); + reader.read_to_end(&mut buf).await?; + assert_eq!(buf, data[mid..].to_vec()); + let pos = reader.stream_position().await?; + assert_eq!(pos, data.len() as u64); + } + } + Ok(()) + } + + async fn reader_partial(blobs: &Blobs) -> TestResult<()> { + for size in INTERESTING_SIZES { + let data = test_data(size); + let ranges = ChunkRanges::chunk(0); + let (hash, bao) = create_n0_bao(&data, &ranges)?; + println!("importing {} bytes", bao.len()); + blobs.import_bao_bytes(hash, ranges.clone(), bao).await?; + // read the first chunk or the entire blob, whatever is smaller + // this should work! + { + let mut reader = blobs.reader(hash); + let valid = size.min(1024); + let mut buf = vec![0u8; valid]; + reader.read_exact(&mut buf).await?; + assert_eq!(buf, data[..valid]); + let pos = reader.stream_position().await?; + assert_eq!(pos, valid as u64); + } + if size > 1024 { + // read the part we don't have - should immediately return an error + { + let mut reader = blobs.reader(hash); + let mut rest = vec![0u8; size - 1024]; + reader.seek(SeekFrom::Start(1024)).await?; + let res = reader.read_exact(&mut rest).await; + assert!(res.is_err()); + } + // read crossing the end of the blob - should return an error despite + // the first bytes being valid. + // A read that fails should not update the stream position. + { + let mut reader = blobs.reader(hash); + let mut buf = vec![0u8; size]; + let res = reader.read(&mut buf).await; + assert!(res.is_err()); + let pos = reader.stream_position().await?; + assert_eq!(pos, 0); + } + } + } + Ok(()) + } + + #[tokio::test] + async fn reader_partial_fs() -> TestResult<()> { + let testdir = tempfile::tempdir()?; + let store = FsStore::load(testdir.path().to_owned()).await?; + // reader_smoke_raw(store.blobs()).await?; + reader_partial(store.blobs()).await?; + Ok(()) + } + + #[tokio::test] + async fn reader_partial_memory() -> TestResult<()> { + let store = MemStore::new(); + reader_partial(store.blobs()).await?; + Ok(()) + } + + #[tokio::test] + async fn reader_smoke_fs() -> TestResult<()> { + let testdir = tempfile::tempdir()?; + let store = FsStore::load(testdir.path().to_owned()).await?; + // reader_smoke_raw(store.blobs()).await?; + reader_smoke(store.blobs()).await?; + Ok(()) + } + + #[tokio::test] + async fn reader_smoke_memory() -> TestResult<()> { + let store = MemStore::new(); + reader_smoke(store.blobs()).await?; + Ok(()) } } diff --git a/src/store/fs.rs b/src/store/fs.rs index 024d9786..b0c1eb60 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -989,7 +989,7 @@ async fn export_ranges_impl( ) -> io::Result<()> { let ExportRangesRequest { ranges, hash } = cmd; trace!( - "exporting ranges: {hash} {ranges:?} size={}", + "export_ranges: exporting ranges: {hash} {ranges:?} size={}", handle.current_size()? ); debug_assert!(handle.hash() == hash, "hash mismatch"); @@ -1012,11 +1012,9 @@ async fn export_ranges_impl( loop { let end: u64 = (offset + bs).min(range.end); let size = (end - offset) as usize; - tx.send(ExportRangesItem::Data(Leaf { - offset, - data: data.read_bytes_at(offset, size)?, - })) - .await?; + let res = data.read_bytes_at(offset, size); + tx.send(ExportRangesItem::Data(Leaf { offset, data: res? })) + .await?; offset = end; if offset >= range.end { break; From 2d388a2041e21a0b87e0882ce5735c18e3049f78 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 22 Jul 2025 15:48:25 +0300 Subject: [PATCH 3/7] remove unused mutex --- src/api/blobs/reader.rs | 44 ++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/api/blobs/reader.rs b/src/api/blobs/reader.rs index 54fae35e..13869a51 100644 --- a/src/api/blobs/reader.rs +++ b/src/api/blobs/reader.rs @@ -1,8 +1,6 @@ use std::{ io::{self, ErrorKind, SeekFrom}, - ops::DerefMut, pin::Pin, - sync::{Arc, Mutex}, task::{Context, Poll}, }; @@ -17,7 +15,7 @@ use crate::api::{ pub struct Reader { blobs: Blobs, options: ReaderOptions, - state: Arc>, + state: ReaderState, } #[derive(Default, derive_more::Debug)] @@ -42,7 +40,7 @@ impl Reader { Self { blobs, options, - state: Arc::new(Mutex::new(ReaderState::Idle { position: 0 })), + state: ReaderState::Idle { position: 0 }, } } } @@ -56,8 +54,8 @@ impl tokio::io::AsyncRead for Reader { let this = self.get_mut(); let mut position1 = None; loop { - let mut guard = this.state.lock().unwrap(); - match std::mem::take(guard.deref_mut()) { + let guard = &mut this.state; + match std::mem::take(guard) { ReaderState::Idle { position } => { // todo: read until next page boundary instead of fixed size let len = buf.remaining() as u64; @@ -78,6 +76,9 @@ impl tokio::io::AsyncRead for Reader { ReaderState::Reading { position, mut op } => { let position1 = position1.get_or_insert(position); match op.poll_next(cx) { + Poll::Ready(Some(ExportRangesItem::Size(_))) => { + *guard = ReaderState::Reading { position, op }; + } Poll::Ready(Some(ExportRangesItem::Data(data))) => { if data.offset != *position1 { break Poll::Ready(Err(io::Error::other( @@ -96,13 +97,9 @@ impl tokio::io::AsyncRead for Reader { } Poll::Ready(Some(ExportRangesItem::Error(err))) => { *guard = ReaderState::Idle { position }; - break Poll::Ready(Err(io::Error::other( - format!("Error reading data: {err}"), - ))); - } - Poll::Ready(Some(ExportRangesItem::Size(_size))) => { - // put back the state and continue reading - *guard = ReaderState::Reading { position, op }; + break Poll::Ready(Err(io::Error::other(format!( + "Error reading data: {err}" + )))); } Poll::Ready(None) => { // done with the stream, go back in idle. @@ -134,10 +131,9 @@ impl tokio::io::AsyncRead for Reader { } } state @ ReaderState::Seeking { .. } => { - *this.state.lock().unwrap() = state; - break Poll::Ready(Err(io::Error::other( - "Can't read while seeking", - ))); + // should I try to recover from this or just keep it poisoned? + this.state = state; + break Poll::Ready(Err(io::Error::other("Can't read while seeking"))); } ReaderState::Poisoned => { break Poll::Ready(Err(io::Error::other("Reader is poisoned"))); @@ -153,8 +149,8 @@ impl tokio::io::AsyncSeek for Reader { seek_from: tokio::io::SeekFrom, ) -> io::Result<()> { let this = self.get_mut(); - let mut guard = this.state.lock().unwrap(); - match std::mem::take(guard.deref_mut()) { + let guard = &mut this.state; + match std::mem::take(guard) { ReaderState::Idle { position } => { let position1 = match seek_from { SeekFrom::Start(pos) => pos, @@ -187,8 +183,8 @@ impl tokio::io::AsyncSeek for Reader { fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { let this = self.get_mut(); - let mut guard = this.state.lock().unwrap(); - Poll::Ready(match std::mem::take(guard.deref_mut()) { + let guard = &mut this.state; + Poll::Ready(match std::mem::take(guard) { ReaderState::Seeking { position } => { *guard = ReaderState::Idle { position }; Ok(position) @@ -199,7 +195,11 @@ impl tokio::io::AsyncSeek for Reader { *guard = ReaderState::Idle { position }; Ok(position) } - ReaderState::Reading { .. } => Err(io::Error::other("Can't seek while reading")), + state @ ReaderState::Reading { .. } => { + // should I try to recover from this or just keep it poisoned? + *guard = state; + Err(io::Error::other("Can't seek while reading")) + } ReaderState::Poisoned => Err(io::Error::other("Reader is poisoned")), }) } From b2b0216bf8f39fa77fdc21aaa54faafb11142fed Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 22 Jul 2025 15:49:33 +0300 Subject: [PATCH 4/7] Rename Reader to BlobReader --- src/api/blobs.rs | 8 ++++---- src/api/blobs/reader.rs | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/api/blobs.rs b/src/api/blobs.rs index 69169582..a886f0fa 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize}; use tokio::io::AsyncWriteExt; use tracing::trace; mod reader; -pub use reader::Reader; +pub use reader::BlobReader; // Public reexports from the proto module. // @@ -105,12 +105,12 @@ impl Blobs { }) } - pub fn reader(&self, hash: impl Into) -> Reader { + pub fn reader(&self, hash: impl Into) -> BlobReader { self.reader_with_opts(ReaderOptions { hash: hash.into() }) } - pub fn reader_with_opts(&self, options: ReaderOptions) -> Reader { - Reader::new(self.clone(), options) + pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader { + BlobReader::new(self.clone(), options) } /// Delete a blob. diff --git a/src/api/blobs/reader.rs b/src/api/blobs/reader.rs index 13869a51..5e386343 100644 --- a/src/api/blobs/reader.rs +++ b/src/api/blobs/reader.rs @@ -12,7 +12,7 @@ use crate::api::{ }; #[derive(Debug)] -pub struct Reader { +pub struct BlobReader { blobs: Blobs, options: ReaderOptions, state: ReaderState, @@ -35,7 +35,7 @@ enum ReaderState { Poisoned, } -impl Reader { +impl BlobReader { pub fn new(blobs: Blobs, options: ReaderOptions) -> Self { Self { blobs, @@ -45,7 +45,7 @@ impl Reader { } } -impl tokio::io::AsyncRead for Reader { +impl tokio::io::AsyncRead for BlobReader { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, @@ -143,7 +143,7 @@ impl tokio::io::AsyncRead for Reader { } } -impl tokio::io::AsyncSeek for Reader { +impl tokio::io::AsyncSeek for BlobReader { fn start_seek( self: std::pin::Pin<&mut Self>, seek_from: tokio::io::SeekFrom, From b2658b6a7f9b4246d820f32b6208ab597ecd7056 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 22 Jul 2025 16:41:03 +0300 Subject: [PATCH 5/7] Add example how to use Blobs::reader --- src/api.rs | 2 +- src/api/blobs.rs | 24 ++++++++++++++++++++++++ src/api/blobs/reader.rs | 3 +-- 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/api.rs b/src/api.rs index 2296c3d7..56527994 100644 --- a/src/api.rs +++ b/src/api.rs @@ -4,7 +4,7 @@ //! with a remote store via rpc calls. //! //! The entry point for the api is the [`Store`] struct. There are several ways -//! to obtain a `Store` instance: it is available via [`Deref`](std::ops::Deref) +//! to obtain a `Store` instance: it is available via [`Deref`] //! from the different store implementations //! (e.g. [`MemStore`](crate::store::mem::MemStore) //! and [`FsStore`](crate::store::fs::FsStore)) as well as on the diff --git a/src/api/blobs.rs b/src/api/blobs.rs index a886f0fa..d0b94859 100644 --- a/src/api/blobs.rs +++ b/src/api/blobs.rs @@ -105,10 +105,34 @@ impl Blobs { }) } + /// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`] + /// and therefore can be used to read the blob's content. + /// + /// Any access to parts of the blob that are not present will result in an error. + /// + /// Example: + /// ```rust + /// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs}; + /// use tokio::io::AsyncReadExt; + /// + /// # async fn example() -> anyhow::Result<()> { + /// let store = MemStore::new(); + /// let tag = store.add_slice(b"Hello, world!").await?; + /// let mut reader = store.reader(tag.hash); + /// let mut buf = String::new(); + /// reader.read_to_string(&mut buf).await?; + /// assert_eq!(buf, "Hello, world!"); + /// # Ok(()) + /// } + /// ``` pub fn reader(&self, hash: impl Into) -> BlobReader { self.reader_with_opts(ReaderOptions { hash: hash.into() }) } + /// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`] + /// and therefore can be used to read the blob's content. + /// + /// Any access to parts of the blob that are not present will result in an error. pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader { BlobReader::new(self.clone(), options) } diff --git a/src/api/blobs/reader.rs b/src/api/blobs/reader.rs index 5e386343..e221b7cd 100644 --- a/src/api/blobs/reader.rs +++ b/src/api/blobs/reader.rs @@ -11,6 +11,7 @@ use crate::api::{ proto::ExportRangesItem, }; +/// A reader for blobs that implements `AsyncRead` and `AsyncSeek`. #[derive(Debug)] pub struct BlobReader { blobs: Blobs, @@ -298,7 +299,6 @@ mod tests { async fn reader_partial_fs() -> TestResult<()> { let testdir = tempfile::tempdir()?; let store = FsStore::load(testdir.path().to_owned()).await?; - // reader_smoke_raw(store.blobs()).await?; reader_partial(store.blobs()).await?; Ok(()) } @@ -314,7 +314,6 @@ mod tests { async fn reader_smoke_fs() -> TestResult<()> { let testdir = tempfile::tempdir()?; let store = FsStore::load(testdir.path().to_owned()).await?; - // reader_smoke_raw(store.blobs()).await?; reader_smoke(store.blobs()).await?; Ok(()) } From 709b14e6f8d969160cd474e7f71ebd9dd89a8311 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 22 Jul 2025 17:19:48 +0300 Subject: [PATCH 6/7] Allow getting the hash for a reader, and make the constructor pub(super). --- src/api/blobs/reader.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/api/blobs/reader.rs b/src/api/blobs/reader.rs index e221b7cd..e15e374d 100644 --- a/src/api/blobs/reader.rs +++ b/src/api/blobs/reader.rs @@ -6,9 +6,12 @@ use std::{ use n0_future::StreamExt; -use crate::api::{ - blobs::{Blobs, ReaderOptions}, - proto::ExportRangesItem, +use crate::{ + api::{ + blobs::{Blobs, ReaderOptions}, + proto::ExportRangesItem, + }, + Hash, }; /// A reader for blobs that implements `AsyncRead` and `AsyncSeek`. @@ -37,13 +40,17 @@ enum ReaderState { } impl BlobReader { - pub fn new(blobs: Blobs, options: ReaderOptions) -> Self { + pub(super) fn new(blobs: Blobs, options: ReaderOptions) -> Self { Self { blobs, options, state: ReaderState::Idle { position: 0 }, } } + + pub fn hash(&self) -> &Hash { + &self.options.hash + } } impl tokio::io::AsyncRead for BlobReader { From 5fc308eb292ec8b7537f24eebf256cd2de69038d Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Tue, 22 Jul 2025 17:56:47 +0300 Subject: [PATCH 7/7] reduce parallelism for cross tests. Maybe it helps with the memory allocation failures --- .github/workflows/ci.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index b4e086b2..7f7ed5dc 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -143,7 +143,7 @@ jobs: - uses: taiki-e/install-action@cross - name: test - run: cross test --all --target ${{ matrix.target }} -- --test-threads=12 + run: cross test --all --target ${{ matrix.target }} -- --test-threads=4 env: RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }}