Skip to content

deps: bump to iroh@main and irpc@main #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jul 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
441 changes: 218 additions & 223 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ self_cell = "1.1.0"
genawaiter = { version = "0.99.1", features = ["futures03"] }
iroh-base = "0.90"
reflink-copy = "0.1.24"
irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"], default-features = false }
irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
iroh-metrics = { version = "0.35" }

[dev-dependencies]
Expand All @@ -69,4 +69,3 @@ default = ["hide-proto-docs"]
iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
irpc = { git = "https://github.com/n0-computer/irpc.git", branch = "main" }
irpc-derive = { git = "https://github.com/n0-computer/irpc.git", branch = "main" }
2 changes: 1 addition & 1 deletion examples/random_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
let router = iroh::protocol::Router::builder(endpoint.clone())
.accept(iroh_blobs::ALPN, blobs)
.spawn();
let addr = router.endpoint().node_addr().initialized().await?;
let addr = router.endpoint().node_addr().initialized().await;
let ticket = NodeTicket::from(addr.clone());
println!("Node address: {addr:?}");
println!("ticket:\n{ticket}");
Expand Down
44 changes: 5 additions & 39 deletions src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
//!
//! You can also [`connect`](Store::connect) to a remote store that is listening
//! to rpc requests.
use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
use std::{io, net::SocketAddr, ops::Deref};

use bao_tree::io::EncodeError;
use iroh::Endpoint;
use irpc::rpc::{listen, Handler};
use irpc::rpc::{listen, RemoteService};
use n0_snafu::SpanTrace;
use nested_enum_utils::common_fields;
use proto::{Request, ShutdownRequest, SyncDbRequest};
Expand All @@ -32,7 +32,7 @@ pub mod remote;
pub mod tags;
pub use crate::{store::util::Tag, util::temp_tag::TempTag};

pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
pub(crate) type ApiClient = irpc::Client<proto::Request>;

#[common_fields({
backtrace: Option<Backtrace>,
Expand Down Expand Up @@ -281,42 +281,8 @@ impl Store {

/// Listen on a quinn endpoint for incoming rpc connections.
pub async fn listen(self, endpoint: quinn::Endpoint) {
let local = self.client.local().unwrap().clone();
let handler: Handler<Request> = Arc::new(move |req, rx, tx| {
let local = local.clone();
Box::pin({
match req {
Request::SetTag(msg) => local.send((msg, tx)),
Request::CreateTag(msg) => local.send((msg, tx)),
Request::DeleteTags(msg) => local.send((msg, tx)),
Request::RenameTag(msg) => local.send((msg, tx)),
Request::ListTags(msg) => local.send((msg, tx)),

Request::ListTempTags(msg) => local.send((msg, tx)),
Request::CreateTempTag(msg) => local.send((msg, tx)),

Request::BlobStatus(msg) => local.send((msg, tx)),

Request::ImportBytes(msg) => local.send((msg, tx)),
Request::ImportByteStream(msg) => local.send((msg, tx, rx)),
Request::ImportBao(msg) => local.send((msg, tx, rx)),
Request::ImportPath(msg) => local.send((msg, tx)),
Request::ListBlobs(msg) => local.send((msg, tx)),
Request::DeleteBlobs(msg) => local.send((msg, tx)),
Request::Batch(msg) => local.send((msg, tx, rx)),

Request::ExportBao(msg) => local.send((msg, tx)),
Request::ExportRanges(msg) => local.send((msg, tx)),
Request::ExportPath(msg) => local.send((msg, tx)),

Request::Observe(msg) => local.send((msg, tx)),

Request::ClearProtected(msg) => local.send((msg, tx)),
Request::SyncDb(msg) => local.send((msg, tx)),
Request::Shutdown(msg) => local.send((msg, tx)),
}
})
});
let local = self.client.as_local().unwrap().clone();
let handler = Request::remote_handler(local);
listen::<Request>(endpoint, handler).await
}

Expand Down
21 changes: 8 additions & 13 deletions src/api/downloader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@ use crate::{

#[derive(Debug, Clone)]
pub struct Downloader {
client: irpc::Client<SwarmMsg, SwarmProtocol, DownloaderService>,
client: irpc::Client<SwarmProtocol>,
}

#[derive(Debug, Clone)]
pub struct DownloaderService;

impl irpc::Service for DownloaderService {}

#[rpc_requests(DownloaderService, message = SwarmMsg, alias = "Msg")]
#[rpc_requests(message = SwarmMsg, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
enum SwarmProtocol {
#[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
Expand Down Expand Up @@ -711,9 +706,9 @@ mod tests {
let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
let tt1 = store1.add_slice("hello world").await?;
let tt2 = store2.add_slice("hello world 2").await?;
let node1_addr = r1.endpoint().node_addr().initialized().await?;
let node1_addr = r1.endpoint().node_addr().initialized().await;
let node1_id = node1_addr.node_id;
let node2_addr = r2.endpoint().node_addr().initialized().await?;
let node2_addr = r2.endpoint().node_addr().initialized().await;
let node2_id = node2_addr.node_id;
let swarm = Downloader::new(&store3, r3.endpoint());
r3.endpoint().add_node_addr(node1_addr.clone())?;
Expand Down Expand Up @@ -750,9 +745,9 @@ mod tests {
format: crate::BlobFormat::HashSeq,
})
.await?;
let node1_addr = r1.endpoint().node_addr().initialized().await?;
let node1_addr = r1.endpoint().node_addr().initialized().await;
let node1_id = node1_addr.node_id;
let node2_addr = r2.endpoint().node_addr().initialized().await?;
let node2_addr = r2.endpoint().node_addr().initialized().await;
let node2_id = node2_addr.node_id;
let swarm = Downloader::new(&store3, r3.endpoint());
r3.endpoint().add_node_addr(node1_addr.clone())?;
Expand Down Expand Up @@ -819,9 +814,9 @@ mod tests {
format: crate::BlobFormat::HashSeq,
})
.await?;
let node1_addr = r1.endpoint().node_addr().initialized().await?;
let node1_addr = r1.endpoint().node_addr().initialized().await;
let node1_id = node1_addr.node_id;
let node2_addr = r2.endpoint().node_addr().initialized().await?;
let node2_addr = r2.endpoint().node_addr().initialized().await;
let node2_id = node2_addr.node_id;
let swarm = Downloader::new(&store3, r3.endpoint());
r3.endpoint().add_node_addr(node1_addr.clone())?;
Expand Down
6 changes: 1 addition & 5 deletions src/api/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ impl HashSpecific for CreateTagMsg {
}
}

#[derive(Debug, Clone)]
pub struct StoreService;
impl irpc::Service for StoreService {}

#[rpc_requests(StoreService, message = Command, alias = "Msg")]
#[rpc_requests(message = Command, alias = "Msg")]
#[derive(Debug, Serialize, Deserialize)]
pub enum Request {
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]
Expand Down
2 changes: 1 addition & 1 deletion src/net_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl BlobsProtocol {
/// just a convenience method to create a ticket from content and the address of this node.
pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
let content = content.into();
let addr = self.inner.endpoint.node_addr().initialized().await?;
let addr = self.inner.endpoint.node_addr().initialized().await;
let ticket = BlobTicket::new(addr, content.hash, content.format);
Ok(ticket)
}
Expand Down
2 changes: 1 addition & 1 deletion src/store/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1259,7 +1259,7 @@ impl AsRef<Store> for FsStore {

impl FsStore {
fn new(
sender: irpc::LocalSender<proto::Command, proto::StoreService>,
sender: irpc::LocalSender<proto::Request>,
db: tokio::sync::mpsc::Sender<InternalCommand>,
) -> Self {
Self {
Expand Down
7 changes: 3 additions & 4 deletions src/store/fs/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ use crate::{
blobs::{AddProgressItem, ImportMode},
proto::{
HashSpecific, ImportByteStreamMsg, ImportByteStreamRequest, ImportByteStreamUpdate,
ImportBytesMsg, ImportBytesRequest, ImportPathMsg, ImportPathRequest, Scope,
StoreService,
ImportBytesMsg, ImportBytesRequest, ImportPathMsg, ImportPathRequest, Request, Scope,
},
},
store::{
Expand Down Expand Up @@ -136,12 +135,12 @@ impl std::fmt::Debug for ImportEntry {
}
}

impl Channels<StoreService> for ImportEntry {
impl Channels<Request> for ImportEntry {
type Tx = mpsc::Sender<AddProgressItem>;
type Rx = NoReceiver;
}

pub type ImportEntryMsg = WithChannels<ImportEntry, StoreService>;
pub type ImportEntryMsg = WithChannels<ImportEntry, Request>;

impl HashSpecific for ImportEntryMsg {
fn hash(&self) -> Hash {
Expand Down
18 changes: 9 additions & 9 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ async fn two_nodes_get_blobs(
for size in sizes {
tts.push(store1.add_bytes(test_data(size)).await?);
}
let addr1 = r1.endpoint().node_addr().initialized().await?;
let addr1 = r1.endpoint().node_addr().initialized().await;
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
for size in sizes {
let hash = Hash::new(test_data(size));
Expand Down Expand Up @@ -259,7 +259,7 @@ async fn two_nodes_observe(
let size = 1024 * 1024 * 8 + 1;
let data = test_data(size);
let (hash, bao) = create_n0_bao(&data, &ChunkRanges::all())?;
let addr1 = r1.endpoint().node_addr().initialized().await?;
let addr1 = r1.endpoint().node_addr().initialized().await;
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
let mut stream = store2
.remote()
Expand Down Expand Up @@ -308,7 +308,7 @@ async fn two_nodes_get_many(
tts.push(store1.add_bytes(test_data(size)).await?);
}
let hashes = tts.iter().map(|tt| tt.hash).collect::<Vec<_>>();
let addr1 = r1.endpoint().node_addr().initialized().await?;
let addr1 = r1.endpoint().node_addr().initialized().await;
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
store2
.remote()
Expand Down Expand Up @@ -381,7 +381,7 @@ async fn two_nodes_push_blobs(
for size in sizes {
tts.push(store1.add_bytes(test_data(size)).await?);
}
let addr2 = r2.endpoint().node_addr().initialized().await?;
let addr2 = r2.endpoint().node_addr().initialized().await;
let conn = r1.endpoint().connect(addr2, crate::ALPN).await?;
for size in sizes {
let hash = Hash::new(test_data(size));
Expand Down Expand Up @@ -542,7 +542,7 @@ async fn two_nodes_hash_seq(
r2: Router,
store2: &Store,
) -> TestResult<()> {
let addr1 = r1.endpoint().node_addr().initialized().await?;
let addr1 = r1.endpoint().node_addr().initialized().await;
let sizes = INTERESTING_SIZES;
let root = add_test_hash_seq(store1, sizes).await?;
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
Expand All @@ -569,7 +569,7 @@ async fn two_nodes_hash_seq_mem() -> TestResult<()> {
async fn two_nodes_hash_seq_progress() -> TestResult<()> {
tracing_subscriber::fmt::try_init().ok();
let (_testdir, (r1, store1, _), (r2, store2, _)) = two_node_test_setup_fs().await?;
let addr1 = r1.endpoint().node_addr().initialized().await?;
let addr1 = r1.endpoint().node_addr().initialized().await;
let sizes = INTERESTING_SIZES;
let root = add_test_hash_seq(&store1, sizes).await?;
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
Expand Down Expand Up @@ -605,7 +605,7 @@ async fn node_serve_hash_seq() -> TestResult<()> {
let r1 = Router::builder(endpoint)
.accept(crate::protocol::ALPN, blobs)
.spawn();
let addr1 = r1.endpoint().node_addr().initialized().await?;
let addr1 = r1.endpoint().node_addr().initialized().await;
info!("node addr: {addr1:?}");
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;
Expand Down Expand Up @@ -636,7 +636,7 @@ async fn node_serve_blobs() -> TestResult<()> {
let r1 = Router::builder(endpoint)
.accept(crate::protocol::ALPN, blobs)
.spawn();
let addr1 = r1.endpoint().node_addr().initialized().await?;
let addr1 = r1.endpoint().node_addr().initialized().await;
info!("node addr: {addr1:?}");
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;
Expand Down Expand Up @@ -678,7 +678,7 @@ async fn node_smoke(store: &Store) -> TestResult<()> {
let r1 = Router::builder(endpoint)
.accept(crate::protocol::ALPN, blobs)
.spawn();
let addr1 = r1.endpoint().node_addr().initialized().await?;
let addr1 = r1.endpoint().node_addr().initialized().await;
info!("node addr: {addr1:?}");
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;
Expand Down
Loading