Skip to content

feat: Read and seek #119

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ jobs:
- uses: taiki-e/install-action@cross

- name: test
run: cross test --all --target ${{ matrix.target }} -- --test-threads=12
run: cross test --all --target ${{ matrix.target }} -- --test-threads=4
env:
RUST_LOG: ${{ runner.debug && 'TRACE' || 'DEBUG' }}

Expand Down
2 changes: 1 addition & 1 deletion src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! with a remote store via rpc calls.
//!
//! The entry point for the api is the [`Store`] struct. There are several ways
//! to obtain a `Store` instance: it is available via [`Deref`](std::ops::Deref)
//! to obtain a `Store` instance: it is available via [`Deref`]
//! from the different store implementations
//! (e.g. [`MemStore`](crate::store::mem::MemStore)
//! and [`FsStore`](crate::store::fs::FsStore)) as well as on the
Expand Down
43 changes: 42 additions & 1 deletion src/api/blobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,11 @@ use n0_future::{future, stream, Stream, StreamExt};
use quinn::SendStream;
use range_collections::{range_set::RangeSetRange, RangeSet2};
use ref_cast::RefCast;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tracing::trace;
mod reader;
pub use reader::BlobReader;

// Public reexports from the proto module.
//
Expand Down Expand Up @@ -102,6 +105,38 @@ impl Blobs {
})
}

/// Create a reader for the given hash. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
/// and therefore can be used to read the blob's content.
///
/// Any access to parts of the blob that are not present will result in an error.
///
/// Example:
/// ```rust
/// use iroh_blobs::{store::mem::MemStore, api::blobs::Blobs};
/// use tokio::io::AsyncReadExt;
///
/// # async fn example() -> anyhow::Result<()> {
/// let store = MemStore::new();
/// let tag = store.add_slice(b"Hello, world!").await?;
/// let mut reader = store.reader(tag.hash);
/// let mut buf = String::new();
/// reader.read_to_string(&mut buf).await?;
/// assert_eq!(buf, "Hello, world!");
/// # Ok(())
/// }
/// ```
pub fn reader(&self, hash: impl Into<Hash>) -> BlobReader {
self.reader_with_opts(ReaderOptions { hash: hash.into() })
}

/// Create a reader for the given options. The reader implements [`tokio::io::AsyncRead`] and [`tokio::io::AsyncSeek`]
/// and therefore can be used to read the blob's content.
///
/// Any access to parts of the blob that are not present will result in an error.
pub fn reader_with_opts(&self, options: ReaderOptions) -> BlobReader {
BlobReader::new(self.clone(), options)
}

/// Delete a blob.
///
/// This function is not public, because it does not work as expected when called manually,
Expand Down Expand Up @@ -647,6 +682,12 @@ impl<'a> AddProgress<'a> {
}
}

/// Options for an async reader for blobs that supports AsyncRead and AsyncSeek.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReaderOptions {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might think that this options object is a bit much, but who knows, maybe we want to add more options in the future.

One option would be the "magic wait until download" option, e.g. you seek to somewhere where the blob isn't downloaded yet, it waits instead of failing.

Or even, you seek to somewhere where the blob isn't downloaded yet, it triggers a download and waits instead of failing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe mark it as non_exhaustive in this case?

pub hash: Hash,
}

/// An observe result. Awaiting this will return the current state.
///
/// Calling [`ObserveProgress::stream`] will return a stream of updates, where
Expand Down Expand Up @@ -856,7 +897,7 @@ impl ExportRangesProgress {
/// range of 0..100, you will get the entire first chunk, 0..1024.
///
/// It is up to the caller to clip the ranges to the requested ranges.
pub async fn stream(self) -> impl Stream<Item = ExportRangesItem> {
pub fn stream(self) -> impl Stream<Item = ExportRangesItem> {
Gen::new(|co| async move {
let mut rx = match self.inner.await {
Ok(rx) => rx,
Expand Down
Loading
Loading