From 6ad9311905e4d0b3c1e313b7b4ebf32225644746 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Wed, 6 Nov 2024 17:05:52 +0200 Subject: [PATCH] add net_protocol feature and mane some things private --- Cargo.toml | 3 +- src/lib.rs | 4 +- src/net_protocol.rs | 89 +++++++++++++++++++++-------------------- src/rpc.rs | 10 +++-- src/rpc/client/blobs.rs | 1 + 5 files changed, 56 insertions(+), 51 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0d2753f49..3324f1f5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,8 +76,9 @@ futures-util = "0.3.30" testdir = "0.9.1" [features] -default = ["fs-store", "rpc"] +default = ["fs-store", "rpc", "net_protocol"] downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"] +net_protocol = ["downloader"] fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"] metrics = ["iroh-metrics/metrics"] redb = ["dep:redb"] diff --git a/src/lib.rs b/src/lib.rs index 8b561c504..9a8b3c31a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -35,8 +35,8 @@ pub mod format; pub mod get; pub mod hashseq; pub mod metrics; -#[cfg(feature = "downloader")] -#[cfg_attr(iroh_docsrs, doc(cfg(feature = "downloader")))] +#[cfg(feature = "net_protocol")] +#[cfg_attr(iroh_docsrs, doc(cfg(feature = "net_protocol")))] pub mod net_protocol; pub mod protocol; pub mod provider; diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 917dc2f07..68b88f82d 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -28,44 +28,6 @@ use crate::{ HashAndFormat, TempTag, }; -#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)] -pub struct BatchId(u64); - -/// A request to the node to download and share the data specified by the hash. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct BlobDownloadRequest { - /// This mandatory field contains the hash of the data to download and share. - pub hash: Hash, - /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as - /// well. - pub format: BlobFormat, - /// This mandatory field specifies the nodes to download the data from. - /// - /// If set to more than a single node, they will all be tried. If `mode` is set to - /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds. - /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel, - /// if the concurrency limits permit. - pub nodes: Vec, - /// Optional tag to tag the data with. - pub tag: SetTagOption, - /// Whether to directly start the download or add it to the download queue. - pub mode: DownloadMode, -} - -/// Set the mode for whether to directly start the download or add it to the download queue. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum DownloadMode { - /// Start the download right away. - /// - /// No concurrency limits or queuing will be applied. It is up to the user to manage download - /// concurrency. - Direct, - /// Queue the download. - /// - /// The download queue will be processed in-order, while respecting the downloader concurrency limits. - Queued, -} - #[derive(Debug)] pub struct Blobs { rt: LocalPoolHandle, @@ -81,7 +43,7 @@ const BLOB_DOWNLOAD_SOURCE_NAME: &str = "blob_download"; /// Keeps track of all the currently active batch operations of the blobs api. #[derive(Debug, Default)] -pub struct BlobBatches { +pub(crate) struct BlobBatches { /// Currently active batches batches: BTreeMap, /// Used to generate new batch ids. @@ -152,19 +114,19 @@ impl Blobs { &self.store } - pub(crate) fn rt(&self) -> LocalPoolHandle { - self.rt.clone() + pub fn rt(&self) -> &LocalPoolHandle { + &self.rt } - pub(crate) fn endpoint(&self) -> &Endpoint { + pub fn endpoint(&self) -> &Endpoint { &self.endpoint } - pub async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> { + pub(crate) async fn batches(&self) -> tokio::sync::MutexGuard<'_, BlobBatches> { self.batches.lock().await } - pub async fn download( + pub(crate) async fn download( &self, endpoint: Endpoint, req: BlobDownloadRequest, @@ -318,3 +280,42 @@ impl ProtocolHandler for Blobs { }) } } + +/// A request to the node to download and share the data specified by the hash. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BlobDownloadRequest { + /// This mandatory field contains the hash of the data to download and share. + pub hash: Hash, + /// If the format is [`BlobFormat::HashSeq`], all children are downloaded and shared as + /// well. + pub format: BlobFormat, + /// This mandatory field specifies the nodes to download the data from. + /// + /// If set to more than a single node, they will all be tried. If `mode` is set to + /// [`DownloadMode::Direct`], they will be tried sequentially until a download succeeds. + /// If `mode` is set to [`DownloadMode::Queued`], the nodes may be dialed in parallel, + /// if the concurrency limits permit. + pub nodes: Vec, + /// Optional tag to tag the data with. + pub tag: SetTagOption, + /// Whether to directly start the download or add it to the download queue. + pub mode: DownloadMode, +} + +/// Set the mode for whether to directly start the download or add it to the download queue. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DownloadMode { + /// Start the download right away. + /// + /// No concurrency limits or queuing will be applied. It is up to the user to manage download + /// concurrency. + Direct, + /// Queue the download. + /// + /// The download queue will be processed in-order, while respecting the downloader concurrency limits. + Queued, +} + +/// Newtype for a batch id +#[derive(Debug, PartialEq, Eq, PartialOrd, Serialize, Deserialize, Ord, Clone, Copy, Hash)] +pub struct BatchId(pub u64); diff --git a/src/rpc.rs b/src/rpc.rs index 7264f8ac5..457691d12 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -73,7 +73,7 @@ impl Blobs { } /// Handle a tags request - pub async fn handle_tags_request( + async fn handle_tags_request( self: Arc, msg: proto::tags::Request, chan: RpcChannel, @@ -91,7 +91,7 @@ impl Blobs { } /// Handle a blobs request - pub async fn handle_blobs_request( + async fn handle_blobs_request( self: Arc, msg: proto::blobs::Request, chan: RpcChannel, @@ -308,7 +308,8 @@ impl Blobs { // provide a little buffer so that we don't slow down the sender let (tx, rx) = async_channel::bounded(32); let tx2 = tx.clone(); - self.rt().spawn_detached(|| async move { + let rt = self.rt().clone(); + rt.spawn_detached(|| async move { if let Err(e) = self.blob_add_from_path0(msg, tx).await { tx2.send(AddProgress::Abort(RpcError::new(&*e))).await.ok(); } @@ -386,7 +387,8 @@ impl Blobs { fn blob_export(self: Arc, msg: ExportRequest) -> impl Stream { let (tx, rx) = async_channel::bounded(1024); let progress = AsyncChannelProgressSender::new(tx); - self.rt().spawn_detached(move || async move { + let rt = self.rt().clone(); + rt.spawn_detached(move || async move { let res = crate::export::export( self.store(), msg.hash, diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 1f7ce7646..36e37b61c 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -106,6 +106,7 @@ use crate::rpc::proto::blobs::{ /// Iroh blobs client. #[derive(Debug, Clone)] +#[repr(transparent)] pub struct Client> { pub(super) rpc: RpcClient, }