Skip to content

Commit c6029ae

Browse files
authored
Merge pull request #14 from n0-computer/Frando/gossip
feat: add support for iroh-gossip
2 parents e030fb1 + 0ac8e14 commit c6029ae

File tree

4 files changed

+135
-0
lines changed

4 files changed

+135
-0
lines changed

Cargo.lock

Lines changed: 23 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ tracing = "0.1.41"
2828
rand = "0.8"
2929
derive_more = { version = "2.0.1", features = ["from"] }
3030
strum = { version = "0.27.1", features = ["derive"] }
31+
iroh-gossip = { version = "0.34.1", default-features = false }
3132

3233
[features]
3334
bin = ["iroh-blobs/rpc"]

src/client.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use iroh::{
66
Endpoint, NodeAddr, NodeId,
77
};
88
use iroh_blobs::{ticket::BlobTicket, BlobFormat, Hash};
9+
use iroh_gossip::proto::TopicId;
910
use n0_future::{task::AbortOnDropHandle, SinkExt, StreamExt};
1011
use rand::Rng;
1112
use rcan::Rcan;
@@ -192,6 +193,36 @@ impl Client {
192193
let res = r.await??;
193194
Ok(res)
194195
}
196+
197+
/// Create a gossip topic.
198+
pub async fn put_gossip_topic(
199+
&mut self,
200+
topic: TopicId,
201+
label: String,
202+
bootstrap: Vec<NodeId>,
203+
) -> Result<()> {
204+
let (s, r) = oneshot::channel();
205+
self.sender
206+
.send(ActorMessage::PutTopic {
207+
topic,
208+
label,
209+
bootstrap,
210+
s,
211+
})
212+
.await?;
213+
let res = r.await??;
214+
Ok(res)
215+
}
216+
217+
/// Delete a gossip topic.
218+
pub async fn delete_gossip_topic(&mut self, topic: TopicId) -> Result<()> {
219+
let (s, r) = oneshot::channel();
220+
self.sender
221+
.send(ActorMessage::DeleteTopic { topic, s })
222+
.await?;
223+
let res = r.await??;
224+
Ok(res)
225+
}
195226
}
196227

197228
struct Actor {
@@ -237,6 +268,16 @@ enum ActorMessage {
237268
name: String,
238269
s: oneshot::Sender<anyhow::Result<Hash>>,
239270
},
271+
PutTopic {
272+
topic: iroh_gossip::proto::TopicId,
273+
label: String,
274+
bootstrap: Vec<NodeId>,
275+
s: oneshot::Sender<anyhow::Result<()>>,
276+
},
277+
DeleteTopic {
278+
topic: iroh_gossip::proto::TopicId,
279+
s: oneshot::Sender<anyhow::Result<()>>,
280+
},
240281
}
241282

242283
impl Actor {
@@ -378,6 +419,61 @@ impl Actor {
378419
};
379420
s.send(response).ok();
380421
}
422+
ActorMessage::PutTopic {
423+
topic,
424+
label,
425+
bootstrap,
426+
s,
427+
} => {
428+
if let Err(err) = self
429+
.writer
430+
.send(ServerMessage::PutTopic {
431+
topic: *topic.as_bytes(),
432+
label,
433+
bootstrap,
434+
})
435+
.await
436+
{
437+
s.send(Err(err.into())).ok();
438+
return;
439+
}
440+
let response = match self.reader.next().await {
441+
Some(Ok(msg)) => match msg {
442+
ClientMessage::PutTopicResponse(None) => Ok(()),
443+
ClientMessage::PutTopicResponse(Some(err)) => {
444+
Err(anyhow!("put topic failed: {}", err))
445+
}
446+
_ => Err(anyhow!("unexpected message from server: {:?}", msg)),
447+
},
448+
Some(Err(err)) => Err(anyhow!("failed to receive response: {:?}", err)),
449+
None => Err(anyhow!("connection closed")),
450+
};
451+
s.send(response).ok();
452+
}
453+
ActorMessage::DeleteTopic { topic, s } => {
454+
if let Err(err) = self
455+
.writer
456+
.send(ServerMessage::DeleteTopic {
457+
topic: *topic.as_bytes(),
458+
})
459+
.await
460+
{
461+
s.send(Err(err.into())).ok();
462+
return;
463+
}
464+
let response = match self.reader.next().await {
465+
Some(Ok(msg)) => match msg {
466+
ClientMessage::DeleteTopicResponse(None) => Ok(()),
467+
ClientMessage::DeleteTopicResponse(Some(err)) => {
468+
Err(anyhow!("delete topic failed: {}", err))
469+
}
470+
_ => Err(anyhow!("unexpected message from server: {:?}", msg)),
471+
},
472+
Some(Err(err)) => Err(anyhow!("failed to receive response: {:?}", err)),
473+
None => Err(anyhow!("connection closed")),
474+
};
475+
s.send(response).ok();
476+
}
381477
}
382478
}
383479

src/protocol.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use iroh::NodeId;
12
use iroh_blobs::{ticket::BlobTicket, Hash};
23
use rcan::Rcan;
34
use serde::{Deserialize, Serialize};
@@ -7,6 +8,8 @@ use crate::caps::Caps;
78

89
pub const ALPN: &[u8] = b"/iroh/n0des/1";
910

11+
pub type ProtoTopicId = [u8; 32];
12+
1013
/// Messages sent from the client to the server
1114
#[allow(clippy::large_enum_variant)]
1215
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -15,6 +18,14 @@ pub enum ServerMessage {
1518
Auth(Rcan<Caps>),
1619
/// Request that the node fetches the given blob.
1720
PutBlob { ticket: BlobTicket, name: String },
21+
/// Request that the node joins the given tossip topic
22+
PutTopic {
23+
topic: ProtoTopicId,
24+
label: String,
25+
bootstrap: Vec<NodeId>,
26+
},
27+
/// Request that the node joins the given tossip topic
28+
DeleteTopic { topic: ProtoTopicId },
1829
/// Request the name of a blob held by the node
1930
GetTag { name: String },
2031
/// Request to store the given metrics data
@@ -33,6 +44,10 @@ pub enum ClientMessage {
3344
AuthResponse(Option<String>),
3445
/// If set, this means it was an error.
3546
PutBlobResponse(Option<String>),
47+
/// If set, this means it was an error.
48+
PutTopicResponse(Option<String>),
49+
/// If set, this means it was an error.
50+
DeleteTopicResponse(Option<String>),
3651
/// Simple pong response
3752
Pong {
3853
req: [u8; 32],

0 commit comments

Comments
 (0)