Skip to content

Commit d23d37a

Browse files
Frandorklaehn
andauthored
deps: bump to iroh@main and irpc@main (#128)
## Description This bumps irpc to the main version, adapting for the breaking changes introduced in n0-computer/irpc#46. ## Breaking Changes - `iroh_blobs::api::proto::StoreService` is removed, `Request` now implements `irpc::Service` - `iroh_blobs::api::downloader::DownloaderService` is removed, `SwarmProtocol` now implements `irpc::Service` ## Notes & open questions <!-- Any notes, remarks or open questions you have to make about the PR. --> ## Change checklist - [ ] Self-review. - [ ] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [ ] Tests if relevant. - [ ] All breaking changes documented. --------- Co-authored-by: Ruediger Klaehn <[email protected]>
1 parent 73ca073 commit d23d37a

File tree

10 files changed

+248
-298
lines changed

10 files changed

+248
-298
lines changed

Cargo.lock

Lines changed: 218 additions & 223 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ self_cell = "1.1.0"
4242
genawaiter = { version = "0.99.1", features = ["futures03"] }
4343
iroh-base = "0.90"
4444
reflink-copy = "0.1.24"
45-
irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "message_spans", "stream", "derive"], default-features = false }
45+
irpc = { version = "0.5.0", features = ["rpc", "quinn_endpoint_setup", "spans", "stream", "derive"], default-features = false }
4646
iroh-metrics = { version = "0.35" }
4747

4848
[dev-dependencies]
@@ -69,4 +69,3 @@ default = ["hide-proto-docs"]
6969
iroh = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
7070
iroh-base = { git = "https://github.com/n0-computer/iroh.git", branch = "main" }
7171
irpc = { git = "https://github.com/n0-computer/irpc.git", branch = "main" }
72-
irpc-derive = { git = "https://github.com/n0-computer/irpc.git", branch = "main" }

examples/random_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ async fn provide(args: ProvideArgs) -> anyhow::Result<()> {
241241
let router = iroh::protocol::Router::builder(endpoint.clone())
242242
.accept(iroh_blobs::ALPN, blobs)
243243
.spawn();
244-
let addr = router.endpoint().node_addr().initialized().await?;
244+
let addr = router.endpoint().node_addr().initialized().await;
245245
let ticket = NodeTicket::from(addr.clone());
246246
println!("Node address: {addr:?}");
247247
println!("ticket:\n{ticket}");

src/api.rs

Lines changed: 5 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@
1212
//!
1313
//! You can also [`connect`](Store::connect) to a remote store that is listening
1414
//! to rpc requests.
15-
use std::{io, net::SocketAddr, ops::Deref, sync::Arc};
15+
use std::{io, net::SocketAddr, ops::Deref};
1616

1717
use bao_tree::io::EncodeError;
1818
use iroh::Endpoint;
19-
use irpc::rpc::{listen, Handler};
19+
use irpc::rpc::{listen, RemoteService};
2020
use n0_snafu::SpanTrace;
2121
use nested_enum_utils::common_fields;
2222
use proto::{Request, ShutdownRequest, SyncDbRequest};
@@ -32,7 +32,7 @@ pub mod remote;
3232
pub mod tags;
3333
pub use crate::{store::util::Tag, util::temp_tag::TempTag};
3434

35-
pub(crate) type ApiClient = irpc::Client<proto::Command, proto::Request, proto::StoreService>;
35+
pub(crate) type ApiClient = irpc::Client<proto::Request>;
3636

3737
#[common_fields({
3838
backtrace: Option<Backtrace>,
@@ -281,42 +281,8 @@ impl Store {
281281

282282
/// Listen on a quinn endpoint for incoming rpc connections.
283283
pub async fn listen(self, endpoint: quinn::Endpoint) {
284-
let local = self.client.local().unwrap().clone();
285-
let handler: Handler<Request> = Arc::new(move |req, rx, tx| {
286-
let local = local.clone();
287-
Box::pin({
288-
match req {
289-
Request::SetTag(msg) => local.send((msg, tx)),
290-
Request::CreateTag(msg) => local.send((msg, tx)),
291-
Request::DeleteTags(msg) => local.send((msg, tx)),
292-
Request::RenameTag(msg) => local.send((msg, tx)),
293-
Request::ListTags(msg) => local.send((msg, tx)),
294-
295-
Request::ListTempTags(msg) => local.send((msg, tx)),
296-
Request::CreateTempTag(msg) => local.send((msg, tx)),
297-
298-
Request::BlobStatus(msg) => local.send((msg, tx)),
299-
300-
Request::ImportBytes(msg) => local.send((msg, tx)),
301-
Request::ImportByteStream(msg) => local.send((msg, tx, rx)),
302-
Request::ImportBao(msg) => local.send((msg, tx, rx)),
303-
Request::ImportPath(msg) => local.send((msg, tx)),
304-
Request::ListBlobs(msg) => local.send((msg, tx)),
305-
Request::DeleteBlobs(msg) => local.send((msg, tx)),
306-
Request::Batch(msg) => local.send((msg, tx, rx)),
307-
308-
Request::ExportBao(msg) => local.send((msg, tx)),
309-
Request::ExportRanges(msg) => local.send((msg, tx)),
310-
Request::ExportPath(msg) => local.send((msg, tx)),
311-
312-
Request::Observe(msg) => local.send((msg, tx)),
313-
314-
Request::ClearProtected(msg) => local.send((msg, tx)),
315-
Request::SyncDb(msg) => local.send((msg, tx)),
316-
Request::Shutdown(msg) => local.send((msg, tx)),
317-
}
318-
})
319-
});
284+
let local = self.client.as_local().unwrap().clone();
285+
let handler = Request::remote_handler(local);
320286
listen::<Request>(endpoint, handler).await
321287
}
322288

src/api/downloader.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,10 @@ use crate::{
2929

3030
#[derive(Debug, Clone)]
3131
pub struct Downloader {
32-
client: irpc::Client<SwarmMsg, SwarmProtocol, DownloaderService>,
32+
client: irpc::Client<SwarmProtocol>,
3333
}
3434

35-
#[derive(Debug, Clone)]
36-
pub struct DownloaderService;
37-
38-
impl irpc::Service for DownloaderService {}
39-
40-
#[rpc_requests(DownloaderService, message = SwarmMsg, alias = "Msg")]
35+
#[rpc_requests(message = SwarmMsg, alias = "Msg")]
4136
#[derive(Debug, Serialize, Deserialize)]
4237
enum SwarmProtocol {
4338
#[rpc(tx = mpsc::Sender<DownloadProgessItem>)]
@@ -711,9 +706,9 @@ mod tests {
711706
let (r3, store3, _) = node_test_setup_fs(testdir.path().join("c")).await?;
712707
let tt1 = store1.add_slice("hello world").await?;
713708
let tt2 = store2.add_slice("hello world 2").await?;
714-
let node1_addr = r1.endpoint().node_addr().initialized().await?;
709+
let node1_addr = r1.endpoint().node_addr().initialized().await;
715710
let node1_id = node1_addr.node_id;
716-
let node2_addr = r2.endpoint().node_addr().initialized().await?;
711+
let node2_addr = r2.endpoint().node_addr().initialized().await;
717712
let node2_id = node2_addr.node_id;
718713
let swarm = Downloader::new(&store3, r3.endpoint());
719714
r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -750,9 +745,9 @@ mod tests {
750745
format: crate::BlobFormat::HashSeq,
751746
})
752747
.await?;
753-
let node1_addr = r1.endpoint().node_addr().initialized().await?;
748+
let node1_addr = r1.endpoint().node_addr().initialized().await;
754749
let node1_id = node1_addr.node_id;
755-
let node2_addr = r2.endpoint().node_addr().initialized().await?;
750+
let node2_addr = r2.endpoint().node_addr().initialized().await;
756751
let node2_id = node2_addr.node_id;
757752
let swarm = Downloader::new(&store3, r3.endpoint());
758753
r3.endpoint().add_node_addr(node1_addr.clone())?;
@@ -819,9 +814,9 @@ mod tests {
819814
format: crate::BlobFormat::HashSeq,
820815
})
821816
.await?;
822-
let node1_addr = r1.endpoint().node_addr().initialized().await?;
817+
let node1_addr = r1.endpoint().node_addr().initialized().await;
823818
let node1_id = node1_addr.node_id;
824-
let node2_addr = r2.endpoint().node_addr().initialized().await?;
819+
let node2_addr = r2.endpoint().node_addr().initialized().await;
825820
let node2_id = node2_addr.node_id;
826821
let swarm = Downloader::new(&store3, r3.endpoint());
827822
r3.endpoint().add_node_addr(node1_addr.clone())?;

src/api/proto.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,7 @@ impl HashSpecific for CreateTagMsg {
8686
}
8787
}
8888

89-
#[derive(Debug, Clone)]
90-
pub struct StoreService;
91-
impl irpc::Service for StoreService {}
92-
93-
#[rpc_requests(StoreService, message = Command, alias = "Msg")]
89+
#[rpc_requests(message = Command, alias = "Msg")]
9490
#[derive(Debug, Serialize, Deserialize)]
9591
pub enum Request {
9692
#[rpc(tx = mpsc::Sender<super::Result<Hash>>)]

src/net_protocol.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ impl BlobsProtocol {
9999
/// just a convenience method to create a ticket from content and the address of this node.
100100
pub async fn ticket(&self, content: impl Into<HashAndFormat>) -> anyhow::Result<BlobTicket> {
101101
let content = content.into();
102-
let addr = self.inner.endpoint.node_addr().initialized().await?;
102+
let addr = self.inner.endpoint.node_addr().initialized().await;
103103
let ticket = BlobTicket::new(addr, content.hash, content.format);
104104
Ok(ticket)
105105
}

src/store/fs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1259,7 +1259,7 @@ impl AsRef<Store> for FsStore {
12591259

12601260
impl FsStore {
12611261
fn new(
1262-
sender: irpc::LocalSender<proto::Command, proto::StoreService>,
1262+
sender: irpc::LocalSender<proto::Request>,
12631263
db: tokio::sync::mpsc::Sender<InternalCommand>,
12641264
) -> Self {
12651265
Self {

src/store/fs/import.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@ use crate::{
3939
blobs::{AddProgressItem, ImportMode},
4040
proto::{
4141
HashSpecific, ImportByteStreamMsg, ImportByteStreamRequest, ImportByteStreamUpdate,
42-
ImportBytesMsg, ImportBytesRequest, ImportPathMsg, ImportPathRequest, Scope,
43-
StoreService,
42+
ImportBytesMsg, ImportBytesRequest, ImportPathMsg, ImportPathRequest, Request, Scope,
4443
},
4544
},
4645
store::{
@@ -136,12 +135,12 @@ impl std::fmt::Debug for ImportEntry {
136135
}
137136
}
138137

139-
impl Channels<StoreService> for ImportEntry {
138+
impl Channels<Request> for ImportEntry {
140139
type Tx = mpsc::Sender<AddProgressItem>;
141140
type Rx = NoReceiver;
142141
}
143142

144-
pub type ImportEntryMsg = WithChannels<ImportEntry, StoreService>;
143+
pub type ImportEntryMsg = WithChannels<ImportEntry, Request>;
145144

146145
impl HashSpecific for ImportEntryMsg {
147146
fn hash(&self) -> Hash {

src/tests.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ async fn two_nodes_get_blobs(
226226
for size in sizes {
227227
tts.push(store1.add_bytes(test_data(size)).await?);
228228
}
229-
let addr1 = r1.endpoint().node_addr().initialized().await?;
229+
let addr1 = r1.endpoint().node_addr().initialized().await;
230230
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
231231
for size in sizes {
232232
let hash = Hash::new(test_data(size));
@@ -259,7 +259,7 @@ async fn two_nodes_observe(
259259
let size = 1024 * 1024 * 8 + 1;
260260
let data = test_data(size);
261261
let (hash, bao) = create_n0_bao(&data, &ChunkRanges::all())?;
262-
let addr1 = r1.endpoint().node_addr().initialized().await?;
262+
let addr1 = r1.endpoint().node_addr().initialized().await;
263263
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
264264
let mut stream = store2
265265
.remote()
@@ -308,7 +308,7 @@ async fn two_nodes_get_many(
308308
tts.push(store1.add_bytes(test_data(size)).await?);
309309
}
310310
let hashes = tts.iter().map(|tt| tt.hash).collect::<Vec<_>>();
311-
let addr1 = r1.endpoint().node_addr().initialized().await?;
311+
let addr1 = r1.endpoint().node_addr().initialized().await;
312312
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
313313
store2
314314
.remote()
@@ -381,7 +381,7 @@ async fn two_nodes_push_blobs(
381381
for size in sizes {
382382
tts.push(store1.add_bytes(test_data(size)).await?);
383383
}
384-
let addr2 = r2.endpoint().node_addr().initialized().await?;
384+
let addr2 = r2.endpoint().node_addr().initialized().await;
385385
let conn = r1.endpoint().connect(addr2, crate::ALPN).await?;
386386
for size in sizes {
387387
let hash = Hash::new(test_data(size));
@@ -542,7 +542,7 @@ async fn two_nodes_hash_seq(
542542
r2: Router,
543543
store2: &Store,
544544
) -> TestResult<()> {
545-
let addr1 = r1.endpoint().node_addr().initialized().await?;
545+
let addr1 = r1.endpoint().node_addr().initialized().await;
546546
let sizes = INTERESTING_SIZES;
547547
let root = add_test_hash_seq(store1, sizes).await?;
548548
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
@@ -569,7 +569,7 @@ async fn two_nodes_hash_seq_mem() -> TestResult<()> {
569569
async fn two_nodes_hash_seq_progress() -> TestResult<()> {
570570
tracing_subscriber::fmt::try_init().ok();
571571
let (_testdir, (r1, store1, _), (r2, store2, _)) = two_node_test_setup_fs().await?;
572-
let addr1 = r1.endpoint().node_addr().initialized().await?;
572+
let addr1 = r1.endpoint().node_addr().initialized().await;
573573
let sizes = INTERESTING_SIZES;
574574
let root = add_test_hash_seq(&store1, sizes).await?;
575575
let conn = r2.endpoint().connect(addr1, crate::ALPN).await?;
@@ -605,7 +605,7 @@ async fn node_serve_hash_seq() -> TestResult<()> {
605605
let r1 = Router::builder(endpoint)
606606
.accept(crate::protocol::ALPN, blobs)
607607
.spawn();
608-
let addr1 = r1.endpoint().node_addr().initialized().await?;
608+
let addr1 = r1.endpoint().node_addr().initialized().await;
609609
info!("node addr: {addr1:?}");
610610
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
611611
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;
@@ -636,7 +636,7 @@ async fn node_serve_blobs() -> TestResult<()> {
636636
let r1 = Router::builder(endpoint)
637637
.accept(crate::protocol::ALPN, blobs)
638638
.spawn();
639-
let addr1 = r1.endpoint().node_addr().initialized().await?;
639+
let addr1 = r1.endpoint().node_addr().initialized().await;
640640
info!("node addr: {addr1:?}");
641641
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
642642
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;
@@ -678,7 +678,7 @@ async fn node_smoke(store: &Store) -> TestResult<()> {
678678
let r1 = Router::builder(endpoint)
679679
.accept(crate::protocol::ALPN, blobs)
680680
.spawn();
681-
let addr1 = r1.endpoint().node_addr().initialized().await?;
681+
let addr1 = r1.endpoint().node_addr().initialized().await;
682682
info!("node addr: {addr1:?}");
683683
let endpoint2 = Endpoint::builder().discovery_n0().bind().await?;
684684
let conn = endpoint2.connect(addr1, crate::protocol::ALPN).await?;

0 commit comments

Comments
 (0)