diff --git a/Cargo.lock b/Cargo.lock index 28f6736f7..3b8b2d154 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2026,7 +2026,7 @@ checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" [[package]] name = "iroh" version = "0.28.1" -source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#b59a05621e3721b9ec41ead1f4754a58dc457d19" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" dependencies = [ "anyhow", "async-channel", @@ -2039,6 +2039,7 @@ dependencies = [ "iroh-io", "iroh-metrics", "iroh-net", + "iroh-node-util", "iroh-quinn", "iroh-relay", "iroh-router", @@ -2063,7 +2064,7 @@ dependencies = [ [[package]] name = "iroh-base" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#b59a05621e3721b9ec41ead1f4754a58dc457d19" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" dependencies = [ "aead", "anyhow", @@ -2181,7 +2182,7 @@ dependencies = [ [[package]] name = "iroh-metrics" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#b59a05621e3721b9ec41ead1f4754a58dc457d19" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" dependencies = [ "anyhow", "erased_set", @@ -2201,7 +2202,7 @@ dependencies = [ [[package]] name = "iroh-net" version = "0.28.1" -source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#b59a05621e3721b9ec41ead1f4754a58dc457d19" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" dependencies = [ "anyhow", "axum", @@ -2280,6 +2281,25 @@ dependencies = [ "z32", ] +[[package]] +name = "iroh-node-util" +version = "0.28.0" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" +dependencies = [ + "anyhow", + "futures-lite 2.5.0", + "iroh-net", + "nested_enum_utils", + "quic-rpc", + "quic-rpc-derive", + "serde", + "serde-error", + "strum 0.26.3", + "tempfile", + "tokio", + "tracing", +] + [[package]] name = "iroh-quinn" version = "0.12.0" @@ -2332,7 +2352,7 @@ dependencies = [ [[package]] name = "iroh-relay" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#b59a05621e3721b9ec41ead1f4754a58dc457d19" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" dependencies = [ "anyhow", "base64 0.22.1", @@ -2392,7 +2412,7 @@ dependencies = [ [[package]] name = "iroh-router" version = "0.28.0" -source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#b59a05621e3721b9ec41ead1f4754a58dc457d19" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" dependencies = [ "anyhow", "futures-buffered", @@ -2744,7 +2764,7 @@ dependencies = [ [[package]] name = "netwatch" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#b59a05621e3721b9ec41ead1f4754a58dc457d19" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" dependencies = [ "anyhow", "bytes", @@ -3270,7 +3290,7 @@ checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" [[package]] name = "portmapper" version = "0.1.0" -source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#b59a05621e3721b9ec41ead1f4754a58dc457d19" +source = "git+https://github.com/n0-computer/iroh?branch=refactor-drop-external-protocols#bc8389cf18ac15d216eb8a9d7f02f8a2a775d0ad" dependencies = [ "anyhow", "base64 0.22.1", diff --git a/Cargo.toml b/Cargo.toml index 7821b54c9..c0c40cbf7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,7 +97,7 @@ futures-util = "0.3.30" testdir = "0.9.1" [features] -default = ["fs-store", "rpc", "net_protocol"] +default = ["fs-store", "rpc", "net_protocol", "example-iroh"] downloader = ["dep:parking_lot", "tokio-util/time", "dep:hashlink"] net_protocol = ["downloader"] fs-store = ["dep:reflink-copy", "redb", "dep:redb_v1", "dep:tempfile"] @@ -115,8 +115,9 @@ rpc = [ "dep:walkdir", "downloader", ] +test-utils = ["rpc"] -example-iroh = ["dep:iroh", "dep:clap", "dep:indicatif", "dep:console"] +example-iroh = ["dep:iroh", "dep:clap", "dep:indicatif", "dep:console", "test-utils"] [package.metadata.docs.rs] all-features = true diff --git a/examples/hello-world-fetch.rs b/examples/hello-world-fetch.rs index 3c86be600..0bfd7437a 100644 --- a/examples/hello-world-fetch.rs +++ b/examples/hello-world-fetch.rs @@ -3,13 +3,11 @@ //! //! This is using an in memory database and a random node id. //! Run the `provide` example, which will give you instructions on how to run this example. -use std::{env, str::FromStr, sync::Arc}; +use std::{env, str::FromStr}; use anyhow::{bail, ensure, Context, Result}; use iroh::base::ticket::BlobTicket; -use iroh_blobs::{ - downloader::Downloader, net_protocol::Blobs, util::local_pool::LocalPool, BlobFormat, -}; +use iroh_blobs::{test_utils::Node, BlobFormat}; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -37,35 +35,19 @@ async fn main() -> Result<()> { BlobTicket::from_str(&args[1]).context("failed parsing blob ticket\n\nGet a ticket by running the follow command in a separate terminal:\n\n`cargo run --example hello-world-provide`")?; // create a new node - let mut builder = iroh::node::Node::memory().build().await?; - let local_pool = LocalPool::default(); - let store = iroh_blobs::store::mem::Store::new(); - let downloader = Downloader::new( - store.clone(), - builder.endpoint().clone(), - local_pool.handle().clone(), - ); - let blobs = Arc::new(Blobs::new_with_events( - store, - local_pool.handle().clone(), - Default::default(), - downloader, - builder.endpoint().clone(), - )); - let blobs_client = blobs.clone().client(); - builder = builder.accept(iroh_blobs::protocol::ALPN.to_vec(), blobs); - let node = builder.spawn().await?; + let node = Node::memory().build().await?; println!("fetching hash: {}", ticket.hash()); println!("node id: {}", node.node_id()); println!("node listening addresses:"); - let addrs = node.net().node_addr().await?; + let addrs = node.endpoint().node_addr().await?; for addr in addrs.direct_addresses() { println!("\t{:?}", addr); } println!( "node relay server url: {:?}", - node.home_relay() + node.endpoint() + .home_relay() .expect("a default relay url should be provided") .to_string() ); @@ -78,7 +60,8 @@ async fn main() -> Result<()> { // `download` returns a stream of `DownloadProgress` events. You can iterate through these updates to get progress // on the state of your download. - let download_stream = blobs_client + let download_stream = node + .blobs() .download(ticket.hash(), ticket.node_addr().clone()) .await?; @@ -93,7 +76,7 @@ async fn main() -> Result<()> { // Get the content we have just fetched from the iroh database. - let bytes = blobs_client.read_to_bytes(ticket.hash()).await?; + let bytes = node.blobs().read_to_bytes(ticket.hash()).await?; let s = std::str::from_utf8(&bytes).context("unable to parse blob as as utf-8 string")?; println!("{s}"); diff --git a/examples/hello-world-provide.rs b/examples/hello-world-provide.rs index 53fd92a32..13d0a9e09 100644 --- a/examples/hello-world-provide.rs +++ b/examples/hello-world-provide.rs @@ -3,10 +3,7 @@ //! This is using an in memory database and a random node id. //! run this example from the project root: //! $ cargo run --example hello-world-provide -use std::sync::Arc; - use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; -use iroh_blobs::{downloader::Downloader, net_protocol::Blobs, util::local_pool::LocalPool}; use tracing_subscriber::{prelude::*, EnvFilter}; // set the RUST_LOG env var to one of {debug,info,warn} to see logging info @@ -24,30 +21,13 @@ async fn main() -> anyhow::Result<()> { println!("'Hello World' provide example!"); // create a new node - let mut builder = iroh::node::Node::memory().build().await?; - let local_pool = LocalPool::default(); - let store = iroh_blobs::store::mem::Store::new(); - let downloader = Downloader::new( - store.clone(), - builder.endpoint().clone(), - local_pool.handle().clone(), - ); - let blobs = Arc::new(Blobs::new_with_events( - store, - local_pool.handle().clone(), - Default::default(), - downloader, - builder.endpoint().clone(), - )); - let blobs_client = blobs.clone().client(); - builder = builder.accept(iroh_blobs::protocol::ALPN.to_vec(), blobs); - let node = builder.spawn().await?; + let node = iroh_blobs::test_utils::Node::memory().build().await?; // add some data and remember the hash - let res = blobs_client.add_bytes("Hello, world!").await?; + let res = node.blobs().add_bytes("Hello, world!").await?; // create a ticket - let mut addr = node.net().node_addr().await?; + let mut addr = node.endpoint().node_addr().await?; addr.apply_options(AddrInfoOptions::RelayAndAddresses); let ticket = BlobTicket::new(addr, res.hash, res.format)?; diff --git a/examples/local-swarm-discovery.rs b/examples/local-swarm-discovery.rs index 20af4a4bd..50421ac0e 100644 --- a/examples/local-swarm-discovery.rs +++ b/examples/local-swarm-discovery.rs @@ -5,19 +5,15 @@ //! Wait for output that looks like the following: //! $ cargo run --example local_swarm_discovery --features="discovery-local-network" -- connect [NODE_ID] [HASH] -o [FILE_PATH] //! Run that command on another machine in the same local network, replacing [FILE_PATH] to the path on which you want to save the transferred content. -use std::{path::PathBuf, sync::Arc}; +use std::path::PathBuf; use anyhow::ensure; use clap::{Parser, Subcommand}; use iroh::{ base::{hash::Hash, key::SecretKey}, net::{discovery::local_swarm_discovery::LocalSwarmDiscovery, key::PublicKey, NodeAddr}, - node::DiscoveryConfig, -}; -use iroh_blobs::{ - downloader::Downloader, net_protocol::Blobs, rpc::client::blobs::WrapOption, - util::local_pool::LocalPool, }; +use iroh_blobs::{rpc::client::blobs::WrapOption, test_utils::Node}; use tracing_subscriber::{prelude::*, EnvFilter}; use self::progress::show_download_progress; @@ -65,33 +61,14 @@ async fn main() -> anyhow::Result<()> { let key = SecretKey::generate(); let discovery = LocalSwarmDiscovery::new(key.public())?; - let cfg = DiscoveryConfig::Custom(Box::new(discovery)); println!("Starting iroh node with local node discovery..."); - // create a new node - let mut builder = iroh::node::Node::memory() + let endpoint = iroh_net::Endpoint::builder() .secret_key(key) - .node_discovery(cfg) - .relay_mode(iroh_net::RelayMode::Disabled) - .build() - .await?; - let local_pool = LocalPool::default(); - let store = iroh_blobs::store::mem::Store::new(); - let downloader = Downloader::new( - store.clone(), - builder.endpoint().clone(), - local_pool.handle().clone(), - ); - let blobs = Arc::new(Blobs::new_with_events( - store, - local_pool.handle().clone(), - Default::default(), - downloader, - builder.endpoint().clone(), - )); - let blobs_client = blobs.clone().client(); - builder = builder.accept(iroh_blobs::protocol::ALPN.to_vec(), blobs); - let node = builder.spawn().await?; + .discovery(Box::new(discovery)) + .relay_mode(iroh_net::RelayMode::Disabled); + // create a new node + let node = Node::memory().endpoint(endpoint).build().await?; match &cli.command { Commands::Accept { path } => { @@ -102,7 +79,8 @@ async fn main() -> anyhow::Result<()> { } let absolute = path.canonicalize()?; println!("Adding {} as {}...", path.display(), absolute.display()); - let stream = blobs_client + let stream = node + .blobs() .add_from_path( absolute, true, @@ -118,7 +96,8 @@ async fn main() -> anyhow::Result<()> { } Commands::Connect { node_id, hash, out } => { println!("NodeID: {}", node.node_id()); - let mut stream = blobs_client + let mut stream = node + .blobs() .download(*hash, NodeAddr::new(*node_id)) .await?; show_download_progress(*hash, &mut stream).await?; @@ -130,7 +109,8 @@ async fn main() -> anyhow::Result<()> { path.display(), absolute.display() ); - let stream = blobs_client + let stream = node + .blobs() .export( *hash, absolute, diff --git a/src/lib.rs b/src/lib.rs index 886d1d746..172941b51 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -47,6 +47,9 @@ pub mod provider; #[cfg_attr(iroh_docsrs, doc(cfg(feature = "rpc")))] pub mod rpc; pub mod store; +#[cfg(any(test, feature = "test-utils"))] +#[cfg_attr(iroh_docsrs, doc(cfg(any(test, feature = "test-utils"))))] +pub mod test_utils; pub mod util; use bao_tree::BlockSize; diff --git a/src/rpc/client/blobs.rs b/src/rpc/client/blobs.rs index 4d6ce6f32..9e710655f 100644 --- a/src/rpc/client/blobs.rs +++ b/src/rpc/client/blobs.rs @@ -988,168 +988,24 @@ pub struct DownloadOptions { } #[cfg(test)] +#[cfg(feature = "test-utils")] mod tests { use std::{path::Path, time::Duration}; use iroh_base::{node_addr::AddrInfoOptions, ticket::BlobTicket}; use iroh_net::{key::SecretKey, test_utils::DnsPkarrServer, NodeId, RelayMode}; - use node::Node; use rand::RngCore; use testresult::TestResult; use tokio::{io::AsyncWriteExt, sync::mpsc}; use super::*; - use crate::hashseq::HashSeq; - - mod node { - //! An iroh node that just has the blobs transport - use std::{path::Path, sync::Arc}; - - use iroh_net::{Endpoint, NodeAddr, NodeId}; - use iroh_router::Router; - use tokio_util::task::AbortOnDropHandle; - - use super::RpcService; - use crate::{ - downloader::Downloader, - net_protocol::Blobs, - provider::{CustomEventSender, EventSender}, - rpc::client::{blobs, tags}, - util::local_pool::LocalPool, - }; - - type RpcClient = quic_rpc::RpcClient; - - /// An iroh node that just has the blobs transport - #[derive(Debug)] - pub struct Node { - router: iroh_router::Router, - client: RpcClient, - _local_pool: LocalPool, - _rpc_task: AbortOnDropHandle<()>, - } - - /// An iroh node builder - #[derive(Debug)] - pub struct Builder { - store: S, - events: EventSender, - endpoint: Option, - } - - impl Builder { - /// Sets the event sender - pub fn blobs_events(self, events: impl CustomEventSender) -> Self { - Self { - events: events.into(), - ..self - } - } - - /// Set an endpoint builder - pub fn endpoint(self, endpoint: iroh_net::endpoint::Builder) -> Self { - Self { - endpoint: Some(endpoint), - ..self - } - } - - /// Spawns the node - pub async fn spawn(self) -> anyhow::Result { - let store = self.store; - let events = self.events; - let endpoint = self - .endpoint - .unwrap_or_else(|| Endpoint::builder().discovery_n0()) - .bind() - .await?; - let local_pool = LocalPool::single(); - let mut router = Router::builder(endpoint.clone()); - - // Setup blobs - let downloader = - Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone()); - let blobs = Arc::new(Blobs::new_with_events( - store.clone(), - local_pool.handle().clone(), - events, - downloader, - endpoint.clone(), - )); - router = router.accept(crate::protocol::ALPN.to_vec(), blobs.clone()); - - // Build the router - let router = router.spawn().await?; - - // Setup RPC - let (internal_rpc, controller) = quic_rpc::transport::flume::channel(32); - let internal_rpc = quic_rpc::RpcServer::new(internal_rpc).boxed(); - let _rpc_task = internal_rpc.spawn_accept_loop(move |msg, chan| { - blobs.clone().handle_rpc_request(msg, chan) - }); - let client = quic_rpc::RpcClient::new(controller).boxed(); - Ok(Node { - router, - client, - _rpc_task, - _local_pool: local_pool, - }) - } - } - - impl Node { - /// Creates a new node with memory storage - pub fn memory() -> Builder { - Builder { - store: crate::store::mem::Store::new(), - events: Default::default(), - endpoint: None, - } - } - - /// Creates a new node with persistent storage - pub async fn persistent( - path: impl AsRef, - ) -> anyhow::Result> { - Ok(Builder { - store: crate::store::fs::Store::load(path).await?, - events: Default::default(), - endpoint: None, - }) - } - - /// Returns the node id - pub fn node_id(&self) -> NodeId { - self.router.endpoint().node_id() - } - - /// Returns the node address - pub async fn node_addr(&self) -> anyhow::Result { - self.router.endpoint().node_addr().await - } - - /// Shuts down the node - pub async fn shutdown(self) -> anyhow::Result<()> { - self.router.shutdown().await - } - - /// Returns an in-memory blobs client - pub fn blobs(&self) -> blobs::Client { - blobs::Client::new(self.client.clone()) - } - - /// Returns an in-memory tags client - pub fn tags(&self) -> tags::Client { - tags::Client::new(self.client.clone()) - } - } - } + use crate::{hashseq::HashSeq, test_utils::Node}; #[tokio::test] async fn test_blob_create_collection() -> Result<()> { let _guard = iroh_test::logging::setup(); - let node = node::Node::memory().spawn().await?; + let node = Node::memory().build().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; @@ -1220,7 +1076,7 @@ mod tests { } // check that "temp" tags have been deleted - let tags: Vec<_> = node.tags().list().await?.try_collect().await?; + let tags: Vec<_> = node.blobs().tags().list().await?.try_collect().await?; assert_eq!(tags.len(), 1); assert_eq!(tags[0].hash, hash); assert_eq!(tags[0].name, tag); @@ -1233,7 +1089,7 @@ mod tests { async fn test_blob_read_at() -> Result<()> { // let _guard = iroh_test::logging::setup(); - let node = node::Node::memory().spawn().await?; + let node = Node::memory().build().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; @@ -1372,7 +1228,7 @@ mod tests { async fn test_blob_get_collection() -> Result<()> { let _guard = iroh_test::logging::setup(); - let node = node::Node::memory().spawn().await?; + let node = Node::memory().build().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; @@ -1438,7 +1294,7 @@ mod tests { async fn test_blob_share() -> Result<()> { let _guard = iroh_test::logging::setup(); - let node = node::Node::memory().spawn().await?; + let node = Node::memory().build().await?; // create temp file let temp_dir = tempfile::tempdir().context("tempdir")?; @@ -1513,16 +1369,10 @@ mod tests { let _guard = iroh_test::logging::setup(); let (node1_events, mut node1_events_r) = BlobEvents::new(16); - let node1 = node::Node::memory() - .blobs_events(node1_events) - .spawn() - .await?; + let node1 = Node::memory().blobs_events(node1_events).build().await?; let (node2_events, mut node2_events_r) = BlobEvents::new(16); - let node2 = node::Node::memory() - .blobs_events(node2_events) - .spawn() - .await?; + let node2 = Node::memory().blobs_events(node2_events).build().await?; let import_outcome = node1.blobs().add_bytes(&b"hello world"[..]).await?; @@ -1577,7 +1427,7 @@ mod tests { async fn test_blob_get_self_existing() -> TestResult<()> { let _guard = iroh_test::logging::setup(); - let node = node::Node::memory().spawn().await?; + let node = Node::memory().build().await?; let node_id = node.node_id(); let blobs = node.blobs(); @@ -1625,7 +1475,7 @@ mod tests { async fn test_blob_get_self_missing() -> TestResult<()> { let _guard = iroh_test::logging::setup(); - let node = node::Node::memory().spawn().await?; + let node = Node::memory().build().await?; let node_id = node.node_id(); let blobs = node.blobs(); @@ -1677,7 +1527,7 @@ mod tests { async fn test_blob_get_existing_collection() -> TestResult<()> { let _guard = iroh_test::logging::setup(); - let node = node::Node::memory().spawn().await?; + let node = Node::memory().build().await?; // We use a nonexisting node id because we just want to check that this succeeds without // hitting the network. let node_id = NodeId::from_bytes(&[0u8; 32])?; @@ -1750,7 +1600,7 @@ mod tests { async fn test_blob_delete_mem() -> Result<()> { let _guard = iroh_test::logging::setup(); - let node = node::Node::memory().spawn().await?; + let node = Node::memory().build().await?; let res = node.blobs().add_bytes(&b"hello world"[..]).await?; @@ -1772,7 +1622,7 @@ mod tests { let _guard = iroh_test::logging::setup(); let dir = tempfile::tempdir()?; - let node = node::Node::persistent(dir.path()).await?.spawn().await?; + let node = Node::persistent(dir.path()).await?.build().await?; let res = node.blobs().add_bytes(&b"hello world"[..]).await?; @@ -1793,7 +1643,7 @@ mod tests { async fn test_ticket_multiple_addrs() -> TestResult<()> { let _guard = iroh_test::logging::setup(); - let node = Node::memory().spawn().await?; + let node = Node::memory().build().await?; let hash = node .blobs() .add_bytes(Bytes::from_static(b"hello")) @@ -1813,7 +1663,7 @@ mod tests { let _guard = iroh_test::logging::setup(); use std::io::Cursor; - let node = Node::memory().spawn().await?; + let node = Node::memory().build().await?; let blobs = node.blobs(); let input = vec![2u8; 1024 * 256]; // 265kb so actually streaming, chunk size is 64kb @@ -1830,7 +1680,7 @@ mod tests { async fn test_node_add_tagged_blob_event() -> Result<()> { let _guard = iroh_test::logging::setup(); - let node = Node::memory().spawn().await?; + let node = Node::memory().build().await?; let _got_hash = tokio::time::timeout(Duration::from_secs(10), async move { let mut stream = node @@ -1871,11 +1721,11 @@ mod tests { let endpoint1 = iroh_net::Endpoint::builder() .relay_mode(RelayMode::Custom(relay_map.clone())) .insecure_skip_relay_cert_verify(true); - let node1 = Node::memory().endpoint(endpoint1).spawn().await?; + let node1 = Node::memory().endpoint(endpoint1).build().await?; let endpoint2 = iroh_net::Endpoint::builder() .relay_mode(RelayMode::Custom(relay_map.clone())) .insecure_skip_relay_cert_verify(true); - let node2 = Node::memory().endpoint(endpoint2).spawn().await?; + let node2 = Node::memory().endpoint(endpoint2).build().await?; let AddOutcome { hash, .. } = node1.blobs().add_bytes(b"foo".to_vec()).await?; // create a node addr with only a relay URL, no direct addresses @@ -1907,7 +1757,7 @@ mod tests { .dns_resolver(dns_pkarr_server.dns_resolver()) .secret_key(secret1.clone()) .discovery(dns_pkarr_server.discovery(secret1)); - let node1 = Node::memory().endpoint(endpoint1).spawn().await?; + let node1 = Node::memory().endpoint(endpoint1).build().await?; let secret2 = SecretKey::generate(); let endpoint2 = iroh_net::Endpoint::builder() .relay_mode(RelayMode::Custom(relay_map.clone())) @@ -1915,7 +1765,7 @@ mod tests { .dns_resolver(dns_pkarr_server.dns_resolver()) .secret_key(secret2.clone()) .discovery(dns_pkarr_server.discovery(secret2)); - let node2 = Node::memory().endpoint(endpoint2).spawn().await?; + let node2 = Node::memory().endpoint(endpoint2).build().await?; let hash = node1.blobs().add_bytes(b"foo".to_vec()).await?.hash; // create a node addr with node id only diff --git a/src/test_utils.rs b/src/test_utils.rs new file mode 100644 index 000000000..bee68a094 --- /dev/null +++ b/src/test_utils.rs @@ -0,0 +1,131 @@ +//! An iroh node that just has the blobs transport +use std::{path::Path, sync::Arc}; + +use iroh_net::{Endpoint, NodeAddr, NodeId}; +use iroh_router::Router; + +use crate::{ + downloader::Downloader, + net_protocol::Blobs, + provider::{CustomEventSender, EventSender}, + rpc::client::blobs, + util::local_pool::LocalPool, +}; + +/// An iroh node that just has the blobs transport +#[derive(Debug)] + +pub struct Node { + router: iroh_router::Router, + client: blobs::MemClient, + _local_pool: LocalPool, +} + +/// An iroh node builder +#[derive(Debug)] +pub struct Builder { + store: S, + events: EventSender, + endpoint: Option, +} + +impl Builder { + /// Sets the event sender + pub fn blobs_events(self, events: impl CustomEventSender) -> Self { + Self { + events: events.into(), + ..self + } + } + + /// Set an endpoint builder + pub fn endpoint(self, endpoint: iroh_net::endpoint::Builder) -> Self { + Self { + endpoint: Some(endpoint), + ..self + } + } + + /// Build the node + pub async fn build(self) -> anyhow::Result { + let store = self.store; + let events = self.events; + let endpoint = self + .endpoint + .unwrap_or_else(|| Endpoint::builder().discovery_n0()) + .bind() + .await?; + let local_pool = LocalPool::single(); + let mut router = Router::builder(endpoint.clone()); + + // Setup blobs + let downloader = + Downloader::new(store.clone(), endpoint.clone(), local_pool.handle().clone()); + let blobs = Arc::new(Blobs::new_with_events( + store.clone(), + local_pool.handle().clone(), + events, + downloader, + endpoint.clone(), + )); + router = router.accept(crate::protocol::ALPN.to_vec(), blobs.clone()); + + // Build the router + let router = router.spawn().await?; + + // Setup RPC + let client = blobs.client(); + Ok(Node { + router, + client, + _local_pool: local_pool, + }) + } +} + +impl Node { + /// Creates a new node with memory storage + pub fn memory() -> Builder { + Builder { + store: crate::store::mem::Store::new(), + events: Default::default(), + endpoint: None, + } + } + + /// Creates a new node with persistent storage + pub async fn persistent( + path: impl AsRef, + ) -> anyhow::Result> { + Ok(Builder { + store: crate::store::fs::Store::load(path).await?, + events: Default::default(), + endpoint: None, + }) + } + + /// Returns the endpoint + pub fn endpoint(&self) -> &Endpoint { + self.router.endpoint() + } + + /// Returns the node id + pub fn node_id(&self) -> NodeId { + self.router.endpoint().node_id() + } + + /// Returns the node address + pub async fn node_addr(&self) -> anyhow::Result { + self.router.endpoint().node_addr().await + } + + /// Shuts down the node + pub async fn shutdown(self) -> anyhow::Result<()> { + self.router.shutdown().await + } + + /// Returns an in-memory blobs client + pub fn blobs(&self) -> &blobs::MemClient { + &self.client + } +}