From cc4fa6377e99f87b95bfd3c15115ae8eb90e53f5 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 14:34:31 +0200 Subject: [PATCH 1/6] Add a lazily initialized in mem client for ffi etc. --- src/net_protocol.rs | 4 +++ src/rpc.rs | 78 +++++++++++++++++++++++++++++++++++++++-- src/rpc/client.rs | 5 +++ src/rpc/client/blobs.rs | 5 +++ 4 files changed, 90 insertions(+), 2 deletions(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 68b88f82d..1b5b35a9b 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -36,6 +36,8 @@ pub struct Blobs { downloader: Downloader, batches: tokio::sync::Mutex, endpoint: Endpoint, + #[cfg(feature = "rpc")] + pub(crate) rpc_handler: Arc>, } /// Name used for logging when new node addresses are added from gossip. @@ -107,6 +109,8 @@ impl Blobs { downloader, endpoint, batches: Default::default(), + #[cfg(feature = "rpc")] + rpc_handler: Arc::new(OnceLock::new()), } } diff --git a/src/rpc.rs b/src/rpc.rs index 457691d12..cf829de7a 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -7,8 +7,9 @@ use std::{ use anyhow::anyhow; use client::{ - blobs::{BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption}, + blobs::{self, BlobInfo, BlobStatus, IncompleteBlobInfo, WrapOption}, tags::TagInfo, + MemConnector, }; use futures_buffered::BufferedStreamExt; use futures_lite::StreamExt; @@ -32,7 +33,13 @@ use proto::{ }, Request, RpcError, RpcResult, RpcService, }; -use quic_rpc::server::{ChannelTypes, RpcChannel, RpcServerError}; +use quic_rpc::{ + server::{ChannelTypes, RpcChannel, RpcServerError}, + RpcClient, RpcServer, +}; +use tokio::task::JoinSet; +use tokio_util::task::AbortOnDropHandle; +use tracing::{error, warn}; use crate::{ export::ExportProgress, @@ -56,6 +63,16 @@ const RPC_BLOB_GET_CHUNK_SIZE: usize = 1024 * 64; const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; impl Blobs { + /// Get a client for the blobs protocol + pub fn client(self: Arc) -> blobs::Client { + let client = self + .rpc_handler + .get_or_init(|| RpcHandler::new(&self)) + .client + .clone(); + blobs::Client::new(client) + } + /// Handle an RPC request pub async fn handle_rpc_request( self: Arc, @@ -871,3 +888,60 @@ impl Blobs { Ok(CreateCollectionResponse { hash, tag }) } } + +#[derive(Debug)] +pub(crate) struct RpcHandler { + /// Client to hand out + client: RpcClient, + /// Handler task + _handler: AbortOnDropHandle<()>, +} + +impl RpcHandler { + fn new(blobs: &Arc>) -> Self { + let blobs = blobs.clone(); + let (listener, connector) = quic_rpc::transport::flume::channel(1); + let listener = RpcServer::new(listener); + let client = RpcClient::new(connector); + let task = tokio::spawn(async move { + let mut tasks = JoinSet::new(); + loop { + tokio::select! { + Some(res) = tasks.join_next(), if !tasks.is_empty() => { + if let Err(e) = res { + if e.is_panic() { + error!("Panic handling RPC request: {e}"); + } + } + } + req = listener.accept() => { + let req = match req { + Ok(req) => req, + Err(e) => { + warn!("Error accepting RPC request: {e}"); + continue; + } + }; + let blobs = blobs.clone(); + tasks.spawn(async move { + let (req, client) = match req.read_first().await { + Ok((req, client)) => (req, client), + Err(e) => { + warn!("Error reading first message: {e}"); + return; + } + }; + if let Err(cause) = blobs.handle_rpc_request(req, client).await { + warn!("Error handling RPC request: {:?}", cause); + } + }); + } + } + } + }); + Self { + client, + _handler: AbortOnDropHandle::new(task), + } + } +} diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 4b11fdc19..a2450f496 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -1,10 +1,15 @@ //! Iroh blobs and tags client use anyhow::Result; use futures_util::{Stream, StreamExt}; +use quic_rpc::transport::flume::FlumeConnector; pub mod blobs; pub mod tags; +/// Type alias for a memory-backed client. +pub(crate) type MemConnector = + FlumeConnector; + fn flatten( s: impl Stream, E2>>, ) -> impl Stream> diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 36e37b61c..eb85bc2b8 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -120,6 +120,11 @@ where Self { rpc } } + /// Get a tags client. + pub fn tags(&self) -> tags::Client { + tags::Client::new(self.rpc.clone()) + } + /// Check if a blob is completely stored on the node. /// /// Note that this will return false for blobs that are partially stored on From b0d5f59fb3fffff97f8fc5249d9563588b54d944 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 14:47:21 +0200 Subject: [PATCH 2/6] Fix imports --- src/net_protocol.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/net_protocol.rs b/src/net_protocol.rs index 1b5b35a9b..9caff14e1 100644 --- a/src/net_protocol.rs +++ b/src/net_protocol.rs @@ -3,7 +3,10 @@ // TODO: reduce API surface and add documentation #![allow(missing_docs)] -use std::{collections::BTreeMap, sync::Arc}; +use std::{ + collections::BTreeMap, + sync::{Arc, OnceLock}, +}; use anyhow::{anyhow, Result}; use futures_lite::future::Boxed as BoxedFuture; From 8d8fb44dbd58c82a7ad3707ee046187d4479cdf5 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 14:50:32 +0200 Subject: [PATCH 3/6] fix cargo deny --- deny.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/deny.toml b/deny.toml index f5669dbf3..050bf2b46 100644 --- a/deny.toml +++ b/deny.toml @@ -34,6 +34,7 @@ license-files = [ [advisories] ignore = [ "RUSTSEC-2024-0370", # unmaintained, no upgrade available + "RUSTSEC-2024-0384", # unmaintained, no upgrade available ] [sources] From 80327e48c73f5b9bb97587c42e7106ff8c3bc2bd Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 18:23:05 +0200 Subject: [PATCH 4/6] use spawn_accept_loop --- Cargo.lock | 4 ++-- Cargo.toml | 1 + src/downloader.rs | 1 - src/protocol.rs | 13 +++++++------ src/rpc.rs | 47 ++++------------------------------------------- src/util/fs.rs | 1 - 6 files changed, 14 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ca1acf205..056333f1d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3134,8 +3134,7 @@ dependencies = [ [[package]] name = "quic-rpc" version = "0.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e131f594054d27d077162815db3b5e9ddd76a28fbb9091b68095971e75c286" +source = "git+https://github.com/n0-computer/quic-rpc?branch=main#32d5bc1a08609f4f0b5650980088f07d81971a55" dependencies = [ "anyhow", "derive_more", @@ -3149,6 +3148,7 @@ dependencies = [ "serde", "slab", "tokio", + "tokio-util", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 3324f1f5e..beb6d1680 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -131,3 +131,4 @@ iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } +quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" } diff --git a/src/downloader.rs b/src/downloader.rs index 90f3cd0b5..b9cd2df19 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -645,7 +645,6 @@ impl, D: Dialer> Service { } /// Handle receiving a [`Message`]. - /// // This is called in the actor loop, and only async because subscribing to an existing transfer // sends the initial state. async fn handle_message(&mut self, msg: Message) { diff --git a/src/protocol.rs b/src/protocol.rs index 9f24b7217..da0995f5f 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -148,7 +148,8 @@ //! # use bao_tree::{ChunkNum, ChunkRanges}; //! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq}; //! # let hash: iroh_blobs::Hash = [0; 32].into(); -//! let ranges = &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110)); +//! let ranges = +//! &ChunkRanges::from(..ChunkNum(10)) | &ChunkRanges::from(ChunkNum(100)..ChunkNum(110)); //! let spec = RangeSpecSeq::from_ranges([ranges]); //! let request = GetRequest::new(hash, spec); //! ``` @@ -236,8 +237,8 @@ //! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq}; //! # let hash: iroh_blobs::Hash = [0; 32].into(); //! let spec = RangeSpecSeq::from_ranges_infinite([ -//! ChunkRanges::all(), // the collection itself -//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child +//! ChunkRanges::all(), // the collection itself +//! ChunkRanges::from(..ChunkNum(1)), // the first chunk of each child //! ]); //! let request = GetRequest::new(hash, spec); //! ``` @@ -252,9 +253,9 @@ //! # use iroh_blobs::protocol::{GetRequest, RangeSpecSeq}; //! # let hash: iroh_blobs::Hash = [0; 32].into(); //! let spec = RangeSpecSeq::from_ranges([ -//! ChunkRanges::empty(), // we don't need the collection itself -//! ChunkRanges::empty(), // we don't need the first child either -//! ChunkRanges::all(), // we need the second child completely +//! ChunkRanges::empty(), // we don't need the collection itself +//! ChunkRanges::empty(), // we don't need the first child either +//! ChunkRanges::all(), // we need the second child completely //! ]); //! let request = GetRequest::new(hash, spec); //! ``` diff --git a/src/rpc.rs b/src/rpc.rs index cf829de7a..e250d7819 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -37,9 +37,7 @@ use quic_rpc::{ server::{ChannelTypes, RpcChannel, RpcServerError}, RpcClient, RpcServer, }; -use tokio::task::JoinSet; use tokio_util::task::AbortOnDropHandle; -use tracing::{error, warn}; use crate::{ export::ExportProgress, @@ -892,7 +890,7 @@ impl Blobs { #[derive(Debug)] pub(crate) struct RpcHandler { /// Client to hand out - client: RpcClient, + client: RpcClient, /// Handler task _handler: AbortOnDropHandle<()>, } @@ -903,45 +901,8 @@ impl RpcHandler { let (listener, connector) = quic_rpc::transport::flume::channel(1); let listener = RpcServer::new(listener); let client = RpcClient::new(connector); - let task = tokio::spawn(async move { - let mut tasks = JoinSet::new(); - loop { - tokio::select! { - Some(res) = tasks.join_next(), if !tasks.is_empty() => { - if let Err(e) = res { - if e.is_panic() { - error!("Panic handling RPC request: {e}"); - } - } - } - req = listener.accept() => { - let req = match req { - Ok(req) => req, - Err(e) => { - warn!("Error accepting RPC request: {e}"); - continue; - } - }; - let blobs = blobs.clone(); - tasks.spawn(async move { - let (req, client) = match req.read_first().await { - Ok((req, client)) => (req, client), - Err(e) => { - warn!("Error reading first message: {e}"); - return; - } - }; - if let Err(cause) = blobs.handle_rpc_request(req, client).await { - warn!("Error handling RPC request: {:?}", cause); - } - }); - } - } - } - }); - Self { - client, - _handler: AbortOnDropHandle::new(task), - } + let _handler = listener + .spawn_accept_loop(move |req, chan| blobs.clone().handle_rpc_request(req, chan)); + Self { client, _handler } } } diff --git a/src/util/fs.rs b/src/util/fs.rs index 068ebadc9..6095bc768 100644 --- a/src/util/fs.rs +++ b/src/util/fs.rs @@ -179,7 +179,6 @@ pub struct PathContent { } /// Walks the directory to get the total size and number of files in directory or file -/// // TODO: possible combine with `scan_dir` pub fn path_content_info(path: impl AsRef) -> anyhow::Result { path_content_info0(path) From 20800ddf8ec3bfeaa4d4df9ca92c2e0f75ccd8a6 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 19:12:52 +0200 Subject: [PATCH 5/6] use released quic-rpc --- Cargo.lock | 5 +++-- Cargo.toml | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 056333f1d..8d2af62fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3133,8 +3133,9 @@ dependencies = [ [[package]] name = "quic-rpc" -version = "0.15.0" -source = "git+https://github.com/n0-computer/quic-rpc?branch=main#32d5bc1a08609f4f0b5650980088f07d81971a55" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc623a188942fc875926f7baeb2cb08ed4288b64f29072656eb051e360ee7623" dependencies = [ "anyhow", "derive_more", diff --git a/Cargo.toml b/Cargo.toml index beb6d1680..c27f50da3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,7 +37,7 @@ parking_lot = { version = "0.12.1", optional = true } pin-project = "1.1.5" portable-atomic = { version = "1", optional = true } postcard = { version = "1", default-features = false, features = ["alloc", "use-std", "experimental-derive"] } -quic-rpc = { version = "0.15.0", optional = true } +quic-rpc = { version = "0.15.1", optional = true } quic-rpc-derive = { version = "0.15.0", optional = true } quinn = { package = "iroh-quinn", version = "0.12", features = ["ring"] } rand = "0.8" @@ -131,4 +131,4 @@ iroh-router = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-net = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-metrics = { git = "https://github.com/n0-computer/iroh", branch = "main" } iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" } -quic-rpc = { git = "https://github.com/n0-computer/quic-rpc", branch = "main" } + From 5ce5a0310158323c35c31710e3ed6e5fe954c113 Mon Sep 17 00:00:00 2001 From: Ruediger Klaehn Date: Thu, 14 Nov 2024 19:32:42 +0200 Subject: [PATCH 6/6] Add MemClient type alias --- src/rpc.rs | 2 +- src/rpc/client/blobs.rs | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/rpc.rs b/src/rpc.rs index e250d7819..04f9d00bc 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -62,7 +62,7 @@ const RPC_BLOB_GET_CHANNEL_CAP: usize = 2; impl Blobs { /// Get a client for the blobs protocol - pub fn client(self: Arc) -> blobs::Client { + pub fn client(self: Arc) -> blobs::MemClient { let client = self .rpc_handler .get_or_init(|| RpcHandler::new(&self)) diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index eb85bc2b8..64c4d7056 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -111,6 +111,9 @@ pub struct Client> { pub(super) rpc: RpcClient, } +/// Type alias for a memory-backed client. +pub type MemClient = Client; + impl Client where C: Connector,