diff --git a/Cargo.lock b/Cargo.lock index fb835def430..2e4e4bff740 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2547,7 +2547,7 @@ dependencies = [ [[package]] name = "iroh-blobs" version = "0.27.0" -source = "git+https://github.com/n0-computer/iroh-blobs?branch=main#f87c3bf59e5040382d29d872ba543351a908f500" +source = "git+https://github.com/n0-computer/iroh-blobs?branch=main#378fc0fd8257c0beeb3a655a8378226a630b2d73" dependencies = [ "anyhow", "async-channel", @@ -2565,6 +2565,7 @@ dependencies = [ "iroh-metrics", "iroh-net", "iroh-quinn", + "iroh-router", "num_cpus", "oneshot", "parking_lot", diff --git a/iroh/src/client/blobs.rs b/iroh/src/client/blobs.rs index 3dde9e39a87..7bc4d889ad3 100644 --- a/iroh/src/client/blobs.rs +++ b/iroh/src/client/blobs.rs @@ -74,10 +74,12 @@ use futures_lite::{Stream, StreamExt}; use futures_util::SinkExt; use genawaiter::sync::{Co, Gen}; use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; +pub use iroh_blobs::net_protocol::DownloadMode; use iroh_blobs::{ export::ExportProgress as BytesExportProgress, format::collection::{Collection, SimpleStore}, get::db::DownloadProgress as BytesDownloadProgress, + net_protocol::BlobDownloadRequest, store::{BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ValidateProgress}, util::SetTagOption, BlobFormat, Hash, Tag, @@ -90,6 +92,7 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncReadExt, ReadBuf}; use tokio_util::io::{ReaderStream, StreamReader}; use tracing::warn; + mod batch; pub use batch::{AddDirOpts, AddFileOpts, AddReaderOpts, Batch}; @@ -98,8 +101,8 @@ use crate::rpc_protocol::{ blobs::{ AddPathRequest, AddStreamRequest, AddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BlobStatusRequest, ConsistencyCheckRequest, CreateCollectionRequest, - CreateCollectionResponse, DeleteRequest, DownloadRequest, ExportRequest, - ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse, ValidateRequest, + CreateCollectionResponse, DeleteRequest, ExportRequest, ListIncompleteRequest, ListRequest, + ReadAtRequest, ReadAtResponse, ValidateRequest, }, node::StatusRequest, }; @@ -357,7 +360,7 @@ impl Client { } = opts; let stream = self .rpc - .server_streaming(DownloadRequest { + .server_streaming(BlobDownloadRequest { hash, format, nodes, @@ -980,20 +983,6 @@ pub struct DownloadOptions { pub mode: DownloadMode, } -/// Set the mode for whether to directly start the download or add it to the download queue. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum DownloadMode { - /// Start the download right away. - /// - /// No concurrency limits or queuing will be applied. It is up to the user to manage download - /// concurrency. - Direct, - /// Queue the download. - /// - /// The download queue will be processed in-order, while respecting the downloader concurrency limits. - Queued, -} - #[cfg(test)] mod tests { use iroh_blobs::hashseq::HashSeq; diff --git a/iroh/src/client/blobs/batch.rs b/iroh/src/client/blobs/batch.rs index 4c72fc3a241..9dad9797ba6 100644 --- a/iroh/src/client/blobs/batch.rs +++ b/iroh/src/client/blobs/batch.rs @@ -11,6 +11,7 @@ use futures_lite::StreamExt; use futures_util::{sink::Buffer, FutureExt, SinkExt, Stream}; use iroh_blobs::{ format::collection::Collection, + net_protocol::BatchId, provider::BatchAddPathProgress, store::ImportMode, util::{SetTagOption, TagDrop}, @@ -27,7 +28,7 @@ use crate::{ rpc_protocol::{ blobs::{ BatchAddPathRequest, BatchAddStreamRequest, BatchAddStreamResponse, - BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchId, BatchUpdate, + BatchAddStreamUpdate, BatchCreateTempTagRequest, BatchUpdate, }, tags::{self, SyncMode}, }, diff --git a/iroh/src/node.rs b/iroh/src/node.rs index f665ebcb14a..482a4da2a8c 100644 --- a/iroh/src/node.rs +++ b/iroh/src/node.rs @@ -50,6 +50,7 @@ use futures_lite::StreamExt; use futures_util::future::{MapErr, Shared}; use iroh_base::key::PublicKey; use iroh_blobs::{ + net_protocol::Blobs as BlobsProtocol, store::Store as BaoStore, util::local_pool::{LocalPool, LocalPoolHandle}, }; @@ -59,7 +60,6 @@ use iroh_net::{ AddrInfo, Endpoint, NodeAddr, }; use iroh_router::{ProtocolHandler, Router}; -use protocol::blobs::BlobsProtocol; use quic_rpc::{transport::ServerEndpoint as _, RpcServer}; use tokio::task::{JoinError, JoinSet}; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; diff --git a/iroh/src/node/builder.rs b/iroh/src/node/builder.rs index 4970c61ca14..9599591e02e 100644 --- a/iroh/src/node/builder.rs +++ b/iroh/src/node/builder.rs @@ -11,6 +11,7 @@ use futures_util::{FutureExt as _, TryFutureExt as _}; use iroh_base::key::SecretKey; use iroh_blobs::{ downloader::Downloader, + net_protocol::Blobs as BlobsProtocol, provider::EventSender, store::{Map, Store as BaoStore}, util::local_pool::{self, LocalPool, LocalPoolHandle, PanicMode}, @@ -36,10 +37,7 @@ use tracing::{debug, error_span, trace, Instrument}; use super::{rpc_status::RpcStatus, IrohServerEndpoint, JoinErrToStr, Node, NodeInner}; use crate::{ client::RPC_ALPN, - node::{ - nodes_storage::load_node_addrs, - protocol::{blobs::BlobsProtocol, docs::DocsProtocol}, - }, + node::{nodes_storage::load_node_addrs, protocol::docs::DocsProtocol}, rpc_protocol::RpcService, util::{fs::load_secret_key, path::IrohPaths}, }; diff --git a/iroh/src/node/protocol.rs b/iroh/src/node/protocol.rs index 8096736eb8a..7ad25a44ff7 100644 --- a/iroh/src/node/protocol.rs +++ b/iroh/src/node/protocol.rs @@ -1,2 +1 @@ -pub(crate) mod blobs; pub(crate) mod docs; diff --git a/iroh/src/node/protocol/blobs.rs b/iroh/src/node/protocol/blobs.rs deleted file mode 100644 index 9cca5f14c36..00000000000 --- a/iroh/src/node/protocol/blobs.rs +++ /dev/null @@ -1,268 +0,0 @@ -use std::{collections::BTreeMap, sync::Arc}; - -use anyhow::{anyhow, Result}; -use futures_lite::future::Boxed as BoxedFuture; -use iroh_blobs::{ - downloader::{DownloadRequest, Downloader}, - get::{ - db::{DownloadProgress, GetState}, - Stats, - }, - provider::EventSender, - util::{ - local_pool::LocalPoolHandle, - progress::{AsyncChannelProgressSender, ProgressSender}, - SetTagOption, - }, - HashAndFormat, TempTag, -}; -use iroh_net::{endpoint::Connecting, Endpoint, NodeAddr}; -use iroh_router::ProtocolHandler; -use tracing::{debug, warn}; - -use crate::{ - client::blobs::DownloadMode, - rpc_protocol::blobs::{BatchId, DownloadRequest as BlobDownloadRequest}, -}; - -#[derive(Debug)] -pub(crate) struct BlobsProtocol { - rt: LocalPoolHandle, - store: S, - events: EventSender, - downloader: Downloader, - batches: tokio::sync::Mutex, -} - -/// Name used for logging when new node addresses are added from gossip. -const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download"; - -/// Keeps track of all the currently active batch operations of the blobs api. -#[derive(Debug, Default)] -pub(crate) struct BlobBatches { - /// Currently active batches - batches: BTreeMap, - /// Used to generate new batch ids. - max: u64, -} - -/// A single batch of blob operations -#[derive(Debug, Default)] -struct BlobBatch { - /// The tags in this batch. - tags: BTreeMap>, -} - -impl BlobBatches { - /// Create a new unique batch id. - pub(crate) fn create(&mut self) -> BatchId { - let id = self.max; - self.max += 1; - BatchId(id) - } - - /// Store a temp tag in a batch identified by a batch id. - pub(crate) fn store(&mut self, batch: BatchId, tt: TempTag) { - let entry = self.batches.entry(batch).or_default(); - entry.tags.entry(tt.hash_and_format()).or_default().push(tt); - } - - /// Remove a tag from a batch. - pub(crate) fn remove_one(&mut self, batch: BatchId, content: &HashAndFormat) -> Result<()> { - if let Some(batch) = self.batches.get_mut(&batch) { - if let Some(tags) = batch.tags.get_mut(content) { - tags.pop(); - if tags.is_empty() { - batch.tags.remove(content); - } - return Ok(()); - } - } - // this can happen if we try to upgrade a tag from an expired batch - anyhow::bail!("tag not found in batch"); - } - - /// Remove an entire batch. - pub(crate) fn remove(&mut self, batch: BatchId) { - self.batches.remove(&batch); - } -} - -impl BlobsProtocol { - pub(crate) fn new_with_events( - store: S, - rt: LocalPoolHandle, - events: EventSender, - downloader: Downloader, - ) -> Self { - Self { - rt, - store, - events, - downloader, - batches: Default::default(), - } - } - - pub(crate) fn store(&self) -> &S { - &self.store - } - - pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> { - self.batches.lock().await - } - - pub(crate) async fn download( - &self, - endpoint: Endpoint, - req: BlobDownloadRequest, - progress: AsyncChannelProgressSender, - ) -> Result<()> { - let BlobDownloadRequest { - hash, - format, - nodes, - tag, - mode, - } = req; - let hash_and_format = HashAndFormat { hash, format }; - let temp_tag = self.store.temp_tag(hash_and_format); - let stats = match mode { - DownloadMode::Queued => { - self.download_queued(endpoint, hash_and_format, nodes, progress.clone()) - .await? - } - DownloadMode::Direct => { - self.download_direct_from_nodes(endpoint, hash_and_format, nodes, progress.clone()) - .await? - } - }; - - progress.send(DownloadProgress::AllDone(stats)).await.ok(); - match tag { - SetTagOption::Named(tag) => { - self.store.set_tag(tag, Some(hash_and_format)).await?; - } - SetTagOption::Auto => { - self.store.create_tag(hash_and_format).await?; - } - } - drop(temp_tag); - - Ok(()) - } - - async fn download_queued( - &self, - endpoint: Endpoint, - hash_and_format: HashAndFormat, - nodes: Vec, - progress: AsyncChannelProgressSender, - ) -> Result { - let mut node_ids = Vec::with_capacity(nodes.len()); - let mut any_added = false; - for node in nodes { - node_ids.push(node.node_id); - if !node.info.is_empty() { - endpoint.add_node_addr_with_source(node, BLOB_DOWNLOAD_SOURCE_NAME)?; - any_added = true; - } - } - let can_download = !node_ids.is_empty() && (any_added || endpoint.discovery().is_some()); - anyhow::ensure!(can_download, "no way to reach a node for download"); - let req = DownloadRequest::new(hash_and_format, node_ids).progress_sender(progress); - let handle = self.downloader.queue(req).await; - let stats = handle.await?; - Ok(stats) - } - - #[tracing::instrument("download_direct", skip_all, fields(hash=%hash_and_format.hash.fmt_short()))] - async fn download_direct_from_nodes( - &self, - endpoint: Endpoint, - hash_and_format: HashAndFormat, - nodes: Vec, - progress: AsyncChannelProgressSender, - ) -> Result { - let mut last_err = None; - let mut remaining_nodes = nodes.len(); - let mut nodes_iter = nodes.into_iter(); - 'outer: loop { - match iroh_blobs::get::db::get_to_db_in_steps( - self.store.clone(), - hash_and_format, - progress.clone(), - ) - .await? - { - GetState::Complete(stats) => return Ok(stats), - GetState::NeedsConn(needs_conn) => { - let (conn, node_id) = 'inner: loop { - match nodes_iter.next() { - None => break 'outer, - Some(node) => { - remaining_nodes -= 1; - let node_id = node.node_id; - if node_id == endpoint.node_id() { - debug!( - ?remaining_nodes, - "skip node {} (it is the node id of ourselves)", - node_id.fmt_short() - ); - continue 'inner; - } - match endpoint.connect(node, iroh_blobs::protocol::ALPN).await { - Ok(conn) => break 'inner (conn, node_id), - Err(err) => { - debug!( - ?remaining_nodes, - "failed to connect to {}: {err}", - node_id.fmt_short() - ); - continue 'inner; - } - } - } - } - }; - match needs_conn.proceed(conn).await { - Ok(stats) => return Ok(stats), - Err(err) => { - warn!( - ?remaining_nodes, - "failed to download from {}: {err}", - node_id.fmt_short() - ); - last_err = Some(err); - } - } - } - } - } - match last_err { - Some(err) => Err(err.into()), - None => Err(anyhow!("No nodes to download from provided")), - } - } -} - -impl ProtocolHandler for BlobsProtocol { - fn accept(self: Arc, conn: Connecting) -> BoxedFuture> { - Box::pin(async move { - iroh_blobs::provider::handle_connection( - conn.await?, - self.store.clone(), - self.events.clone(), - self.rt.clone(), - ) - .await; - Ok(()) - }) - } - - fn shutdown(self: Arc) -> BoxedFuture<()> { - Box::pin(async move { - self.store.shutdown().await; - }) - } -} diff --git a/iroh/src/node/rpc.rs b/iroh/src/node/rpc.rs index fbfb0b23d13..a4f32fb0707 100644 --- a/iroh/src/node/rpc.rs +++ b/iroh/src/node/rpc.rs @@ -14,6 +14,7 @@ use iroh_blobs::{ export::ExportProgress, format::collection::Collection, get::db::DownloadProgress, + net_protocol::{BlobDownloadRequest, Blobs as BlobsProtocol}, provider::{AddProgress, BatchAddPathProgress}, store::{ ConsistencyCheckProgress, ExportFormat, ImportProgress, MapEntry, Store as BaoStore, @@ -43,10 +44,7 @@ use crate::{ tags::TagInfo, NodeStatus, }, - node::{ - protocol::{blobs::BlobsProtocol, docs::DocsProtocol}, - NodeInner, - }, + node::{protocol::docs::DocsProtocol, NodeInner}, rpc_protocol::{ authors, blobs::{ @@ -55,9 +53,8 @@ use crate::{ BatchAddStreamResponse, BatchAddStreamUpdate, BatchCreateRequest, BatchCreateResponse, BatchCreateTempTagRequest, BatchUpdate, BlobStatusRequest, BlobStatusResponse, ConsistencyCheckRequest, CreateCollectionRequest, CreateCollectionResponse, - DeleteRequest, DownloadRequest as BlobDownloadRequest, DownloadResponse, ExportRequest, - ExportResponse, ListIncompleteRequest, ListRequest, ReadAtRequest, ReadAtResponse, - ValidateRequest, + DeleteRequest, DownloadResponse, ExportRequest, ExportResponse, ListIncompleteRequest, + ListRequest, ReadAtRequest, ReadAtResponse, ValidateRequest, }, docs::{ ExportFileRequest, ExportFileResponse, ImportFileRequest, ImportFileResponse, diff --git a/iroh/src/rpc_protocol/blobs.rs b/iroh/src/rpc_protocol/blobs.rs index e7fd876f121..b27c83809b9 100644 --- a/iroh/src/rpc_protocol/blobs.rs +++ b/iroh/src/rpc_protocol/blobs.rs @@ -6,6 +6,7 @@ use iroh_blobs::{ export::ExportProgress, format::collection::Collection, get::db::DownloadProgress, + net_protocol::{BatchId, BlobDownloadRequest}, provider::{AddProgress, BatchAddPathProgress}, store::{ BaoBlobSize, ConsistencyCheckProgress, ExportFormat, ExportMode, ImportMode, @@ -14,16 +15,13 @@ use iroh_blobs::{ util::SetTagOption, BlobFormat, HashAndFormat, Tag, }; -use iroh_net::NodeAddr; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; use super::RpcService; use crate::{ - client::blobs::{ - BlobInfo, BlobStatus, DownloadMode, IncompleteBlobInfo, ReadAtLen, WrapOption, - }, + client::blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, ReadAtLen, WrapOption}, node::{RpcError, RpcResult}, }; @@ -40,7 +38,7 @@ pub enum Request { #[server_streaming(response = AddPathResponse)] AddPath(AddPathRequest), #[server_streaming(response = DownloadResponse)] - Download(DownloadRequest), + Download(BlobDownloadRequest), #[server_streaming(response = ExportResponse)] Export(ExportRequest), #[server_streaming(response = RpcResult)] @@ -114,28 +112,7 @@ pub struct AddPathRequest { #[derive(Debug, Serialize, Deserialize, derive_more::Into)] pub struct AddPathResponse(pub AddProgress); -/// A request to the node to download and share the data specified by the hash. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct DownloadRequest { - /// This mandatory field contains the hash of the data to download and share. - pub hash: Hash, - /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as - /// well. - pub format: BlobFormat, - /// This mandatory field specifies the nodes to download the data from. - /// - /// If set to more than a single node, they will all be tried. If `mode` is set to - /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds. - /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel, - /// if the concurrency limits permit. - pub nodes: Vec, - /// Optional tag to tag the data with. - pub tag: SetTagOption, - /// Whether to directly start the download or add it to the download queue. - pub mode: DownloadMode, -} - -/// Progress response for [`DownloadRequest`] +/// Progress response for [`BlobDownloadRequest`] #[derive(Debug, Clone, Serialize, Deserialize, derive_more::From, derive_more::Into)] pub struct DownloadResponse(pub DownloadProgress); @@ -340,6 +317,3 @@ pub struct BatchAddPathRequest { /// Response to a batch add path request #[derive(Serialize, Deserialize, Debug)] pub struct BatchAddPathResponse(pub BatchAddPathProgress); - -#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)] -pub struct BatchId(pub(crate) u64); diff --git a/iroh/src/rpc_protocol/tags.rs b/iroh/src/rpc_protocol/tags.rs index ecc20775f2e..34e4edd9ae0 100644 --- a/iroh/src/rpc_protocol/tags.rs +++ b/iroh/src/rpc_protocol/tags.rs @@ -1,9 +1,9 @@ -use iroh_blobs::{HashAndFormat, Tag}; +use iroh_blobs::{net_protocol::BatchId, HashAndFormat, Tag}; use nested_enum_utils::enum_conversions; use quic_rpc_derive::rpc_requests; use serde::{Deserialize, Serialize}; -use super::{blobs::BatchId, RpcService}; +use super::RpcService; use crate::{client::tags::TagInfo, node::RpcResult}; #[allow(missing_docs)]