diff --git a/README.md b/README.md index 38817043..6136a720 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # iroh-gossip This crate implements the `iroh-gossip` protocol. -It is based on *epidemic broadcast trees* to disseminate messages among a swarm of peers interested in a *topic*. +It is based on *epidemic broadcast trees* to disseminate messages among a swarm of peers interested in a *topic*. The implementation is based on the papers [HyParView](https://asc.di.fct.unl.pt/~jleitao/pdf/dsn07-leitao.pdf) and [PlumTree](https://asc.di.fct.unl.pt/~jleitao/pdf/srds07-leitao.pdf). The crate is made up from two modules: @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> { let endpoint = Endpoint::builder().discovery_n0().bind().await?; // build gossip protocol - let gossip = Gossip::builder().spawn(endpoint.clone()).await?; + let gossip = Gossip::builder().spawn(endpoint.clone()); // setup router let router = Router::builder(endpoint.clone()) diff --git a/src/net.rs b/src/net.rs index 893b3681..49250435 100644 --- a/src/net.rs +++ b/src/net.rs @@ -10,19 +10,18 @@ use std::{ use anyhow::Context as _; use bytes::BytesMut; +use discovery::GossipDiscovery; use futures_concurrency::stream::{stream_group, StreamGroup}; -use futures_util::FutureExt as _; use iroh::{ - endpoint::{Connection, DirectAddr}, - protocol::ProtocolHandler, - Endpoint, NodeAddr, NodeId, PublicKey, RelayUrl, + endpoint::Connection, node_info::NodeData, protocol::ProtocolHandler, Endpoint, NodeAddr, + NodeId, PublicKey, RelayUrl, }; use iroh_metrics::inc; use n0_future::{ boxed::BoxFuture, task::{self, AbortOnDropHandle, JoinSet}, time::Instant, - Stream, StreamExt as _, TryFutureExt as _, + Stream, StreamExt, TryFutureExt as _, }; use rand::rngs::StdRng; use rand_core::SeedableRng; @@ -37,6 +36,7 @@ use crate::{ proto::{self, HyparviewConfig, PeerData, PlumtreeConfig, Scope, TopicId}, }; +mod discovery; mod handles; pub mod util; @@ -57,8 +57,6 @@ const SEND_QUEUE_CAP: usize = 64; const 
TO_ACTOR_CAP: usize = 64; /// Channel capacity for the InEvent message queue (single) const IN_EVENT_CAP: usize = 1024; -/// Name used for logging when new node addresses are added from gossip. -const SOURCE_NAME: &str = "gossip"; /// Events emitted from the gossip protocol pub type ProtoEvent = proto::Event; @@ -168,6 +166,7 @@ impl ProtocolHandler for Gossip { #[derive(Debug, Clone)] pub struct Builder { config: proto::Config, + discovery: Option, } impl Builder { @@ -190,36 +189,26 @@ impl Builder { self } - /// Spawn a gossip actor and get a handle for it - pub async fn spawn(self, endpoint: Endpoint) -> Result { - // We want to wait for our endpoint to be addressable by other nodes before launching gossip, - // because otherwise our Join messages, which will be forwarded into the swarm through a random - // walk, might not include an address to talk back to us. - // `Endpoint::node_addr` always waits for direct addresses to be available, which never completes - // when running as WASM in browser. Therefore, we instead race the futures that wait for the direct - // addresses or the home relay to be initialized, and construct our node address from that. - // TODO: Make `Endpoint` provide a more straightforward API for that. - let addr = { - n0_future::future::race( - endpoint.direct_addresses().initialized().map(|_| ()), - endpoint.home_relay().initialized().map(|_| ()), - ) - .await; - let addrs = endpoint - .direct_addresses() - .get() - .expect("endpoint alive") - .unwrap_or_default() - .into_iter() - .map(|x| x.addr); - let home_relay = endpoint.home_relay().get().expect("endpoint alive"); - NodeAddr::from_parts(endpoint.node_id(), home_relay, addrs) - }; - - let (actor, to_actor_tx) = Actor::new(endpoint, self.config, &addr.into()); - let me = actor.endpoint.node_id().fmt_short(); - let max_message_size = actor.state.max_message_size(); + /// Optionally enable broadcasting and receiving node addresses over gossip. 
+ /// + /// If you are using a discovery service on your [`Endpoint`], and all nodes participating in gossip + /// are discoverable through this discovery service, you do not need to enable this. + /// + /// If you are managing node addresses manually, you can create an instance of [`GossipDiscovery`] and + /// add it to both the endpoint and here. Then gossip will include our node's address info in join + /// and forward join messages, so that other nodes can contact us through that info. + /// We will then also collect the address info retrieved via gossip messages and make them available + /// to the endpoint. + pub fn use_gossip_for_discovery(mut self, discovery: GossipDiscovery) -> Self { + self.discovery = Some(discovery); + self + } + /// Spawns a gossip actor and returns the [`Gossip`] handle. + pub fn spawn(self, endpoint: Endpoint) -> Gossip { + let me = endpoint.node_id().fmt_short(); + let max_message_size = self.config.max_message_size; + let (actor, to_actor_tx) = Actor::new(endpoint, self.config, self.discovery); let actor_handle = task::spawn( async move { if let Err(err) = actor.run().await { @@ -229,7 +218,7 @@ impl Builder { .instrument(error_span!("gossip", %me)), ); - Ok(Gossip { + Gossip { inner: Inner { to_actor_tx, _actor_handle: AbortOnDropHandle::new(actor_handle), @@ -239,7 +228,7 @@ impl Builder { .into(), #[cfg(feature = "rpc")] rpc_handler: Default::default(), - }) + } } } @@ -248,22 +237,23 @@ impl Gossip { pub fn builder() -> Builder { Builder { config: Default::default(), + discovery: None, } } - /// Get the maximum message size configured for this gossip actor. + /// Returns the maximum message size configured for this gossip actor. pub fn max_message_size(&self) -> usize { self.inner.max_message_size } - /// Handle an incoming [`Connection`]. + /// Handles an incoming [`Connection`]. /// /// Make sure to check the ALPN protocol yourself before passing the connection. 
pub async fn handle_connection(&self, conn: Connection) -> Result<(), Error> { self.inner.handle_connection(conn).await } - /// Join a gossip topic with the default options and wait for at least one active connection. + /// Joins a gossip topic with the default options and waits for at least one active connection to be established. pub async fn subscribe_and_join( &self, topic_id: TopicId, @@ -274,9 +264,10 @@ impl Gossip { Ok(sub) } - /// Join a gossip topic with the default options. + /// Joins a gossip topic with the default options. /// - /// Note that this will not wait for any bootstrap node to be available. To ensure the topic is connected to at least one node, use [`GossipTopic::joined`] or [`Gossip::subscribe_and_join`] + /// Note that this will not wait for any bootstrap node to be available. + /// To ensure the topic is connected to at least one node, use [`GossipTopic::joined`] or [`Gossip::subscribe_and_join`] pub fn subscribe( &self, topic_id: TopicId, @@ -287,7 +278,7 @@ impl Gossip { Ok(sub) } - /// Join a gossip topic with options. + /// Joins a gossip topic with options. /// /// Returns a [`GossipTopic`] instantly. To wait for at least one connection to be established, /// you can await [`GossipTopic::joined`]. @@ -301,7 +292,7 @@ impl Gossip { GossipTopic::new(command_tx, event_rx) } - /// Join a gossip topic with options and an externally-created update stream. + /// Joins a gossip topic with options and an externally-created update stream. /// /// This method differs from [`Self::subscribe_with_opts`] by letting you pass in a `updates` command stream yourself /// instead of using a channel created for you. 
@@ -319,7 +310,7 @@ impl Gossip { } impl Inner { - pub fn subscribe_with_stream( + pub(crate) fn subscribe_with_stream( &self, topic_id: TopicId, options: JoinOptions, @@ -456,8 +447,8 @@ enum ToActor { struct Actor { /// Protocol state state: proto::State, - /// The endpoint through which we dial peers - endpoint: Endpoint, + /// Optional discovery to publish addresses to. + discovery: Option, /// Dial machine to connect to peers dialer: Dialer, /// Input messages to the actor @@ -484,13 +475,20 @@ impl Actor { fn new( endpoint: Endpoint, config: proto::Config, - my_addr: &AddrInfo, + discovery: Option, ) -> (Self, mpsc::Sender) { let peer_id = endpoint.node_id(); - let dialer = Dialer::new(endpoint.clone()); + let dialer = Dialer::new(endpoint); + + let initial_peer_data = discovery + .as_ref() + .and_then(|discovery| discovery.our_addr.as_ref()) + .and_then(|our_addr| our_addr.get()) + .map(|addr_info| encode_peer_data(&addr_info).unwrap()); + let state = proto::State::new( peer_id, - encode_peer_data(my_addr).unwrap(), + initial_peer_data, config, rand::rngs::StdRng::from_entropy(), ); @@ -498,7 +496,6 @@ impl Actor { let (in_event_tx, in_event_rx) = mpsc::channel(IN_EVENT_CAP); let actor = Actor { - endpoint, state, dialer, to_actor_rx, @@ -510,57 +507,53 @@ impl Actor { topics: Default::default(), quit_queue: Default::default(), connection_tasks: Default::default(), + discovery, }; (actor, to_actor_tx) } - pub async fn run(mut self) -> Result<(), Error> { - let (mut current_addresses, mut home_relay_stream, mut direct_addresses_stream) = - self.setup().await?; + #[cfg(test)] + fn endpoint(&self) -> &Endpoint { + &self.dialer.endpoint + } + #[cfg(test)] + fn node_id(&self) -> NodeId { + self.dialer.endpoint.node_id() + } + + pub async fn run(mut self) -> Result<(), Error> { let mut i = 0; - while let Some(()) = self - .event_loop( - &mut current_addresses, - &mut home_relay_stream, - &mut direct_addresses_stream, - i, - ) - .await? 
- { + let mut addr_update_stream = self.setup_addr_stream().await?; + while let Some(()) = self.event_loop(i, &mut addr_update_stream).await? { i += 1; } Ok(()) } - /// Performs the initial actor setup to run the [`Actor::event_loop`]. - /// - /// This updates our current address and return it. It also returns the home relay stream and - /// direct addr stream. - async fn setup( - &mut self, - ) -> Result< - ( - BTreeSet, - impl Stream + Unpin, - impl Stream> + Unpin, - ), - Error, - > { - // Watch for changes in direct addresses to update our peer data. - let direct_addresses_stream = self.endpoint.direct_addresses().stream().filter_map(|i| i); - // Watch for changes of our home relay to update our peer data. - let home_relay_stream = self.endpoint.home_relay().stream().filter_map(|i| i); - // With each gossip message we provide addressing information to reach our node. - let current_addresses = self.endpoint.direct_addresses().get()?.unwrap_or_default(); - - self.handle_addr_update(&current_addresses).await?; - Ok(( - current_addresses, - home_relay_stream, - direct_addresses_stream, - )) + async fn setup_addr_stream(&mut self) -> Result, Error> { + match self.discovery.as_ref().and_then(|d| d.our_addr.as_ref()) { + Some(our_addr) => { + let watcher = our_addr.watch(); + let mut stream = watcher.stream().filter_map(|x| x).boxed(); + // We want to wait for our endpoint to be addressable by other nodes before launching gossip, + // because otherwise our Join messages, which will be forwarded into the swarm through a random + // walk, might not include an address to talk back to us. + // We therefore wait for the first address published on the discovery's `our_addr` watcher + // and apply it before entering the event loop; if the watcher stream ends without ever + // yielding an address, setup fails with an error.
+ let Some(initial) = stream.next().await else { + return Err(anyhow::anyhow!( + "Failed to retrieve initial address from endpoint" + ) + .into()); + }; + self.handle_addr_update(initial).await?; + Ok(stream) + } + None => Ok(n0_future::stream::pending().boxed()), + } } /// One event loop processing step. @@ -568,10 +561,8 @@ impl Actor { /// None is returned when no further processing should be performed. async fn event_loop( &mut self, - current_addresses: &mut BTreeSet, - home_relay_stream: &mut (impl Stream + Unpin), - direct_addresses_stream: &mut (impl Stream> + Unpin), i: usize, + mut our_addr_updates: impl Stream + Unpin, ) -> Result, Error> { inc!(Metrics, actor_tick_main); tokio::select! { @@ -591,15 +582,11 @@ impl Actor { trace!(?i, "tick: command_rx"); self.handle_command(topic, key, command).await?; }, - Some(new_addresses) = direct_addresses_stream.next() => { - trace!(?i, "tick: new_endpoints"); + Some(addr_info) = our_addr_updates.next() => { + trace!(?i, "tick: new_addr_info"); inc!(Metrics, actor_tick_endpoint); - *current_addresses = new_addresses; - self.handle_addr_update(current_addresses).await?; - } - Some(_relay_url) = home_relay_stream.next() => { - trace!(?i, "tick: new_home_relay"); - self.handle_addr_update(current_addresses).await?; + tracing::info!("addr update {addr_info:?}"); + self.handle_addr_update(addr_info).await?; } (peer_id, res) = self.dialer.next_conn() => { trace!(?i, "tick: dialer"); @@ -613,12 +600,6 @@ impl Actor { Some(Err(err)) => { warn!(peer = %peer_id.fmt_short(), "dial failed: {err}"); inc!(Metrics, actor_tick_dialer_failure); - let peer_state = self.peers.get(&peer_id); - let is_active = matches!(peer_state, Some(PeerState::Active { .. 
})); - if !is_active { - self.handle_in_event(InEvent::PeerDisconnected(peer_id), Instant::now()) - .await?; - } } None => { warn!(peer = %peer_id.fmt_short(), "dial disconnected"); @@ -650,11 +631,8 @@ impl Actor { Ok(Some(())) } - async fn handle_addr_update( - &mut self, - current_addresses: &BTreeSet, - ) -> Result<(), Error> { - let peer_data = our_peer_data(&self.endpoint, current_addresses)?; + async fn handle_addr_update(&mut self, info: AddrInfo) -> Result<(), Error> { + let peer_data = encode_peer_data(&info)?; self.handle_in_event(InEvent::UpdatePeerData(peer_data), Instant::now()) .await } @@ -782,23 +760,16 @@ impl Actor { event_senders, command_rx_keys, } = state; - let mut sender_dead = false; if !neighbors.is_empty() { for neighbor in neighbors.iter() { - if let Err(_err) = channels + channels .event_tx - .send(Ok(Event::Gossip(GossipEvent::NeighborUp(*neighbor)))) - .await - { - sender_dead = true; - break; - } + .try_send(Ok(Event::Gossip(GossipEvent::NeighborUp(*neighbor)))) + .ok(); } } - if !sender_dead { - event_senders.push(channels.receiver_id, channels.event_tx); - } + event_senders.push(channels.receiver_id, channels.event_tx); let command_rx = TopicCommandStream::new(topic_id, channels.command_rx); let key = self.command_rx.insert(command_rx); command_rx_keys.insert(key); @@ -909,23 +880,26 @@ impl Actor { debug!(peer=%peer_id.fmt_short(), "gossip state indicates disconnect: drop peer"); self.peers.remove(&peer_id); } - OutEvent::PeerData(node_id, data) => match decode_peer_data(&data) { - Err(err) => warn!("Failed to decode {data:?} from {node_id}: {err}"), - Ok(info) => { - debug!(peer = ?node_id, "add known addrs: {info:?}"); - let node_addr = NodeAddr { - node_id, - relay_url: info.relay_url, - direct_addresses: info.direct_addresses, - }; - if let Err(err) = self - .endpoint - .add_node_addr_with_source(node_addr, SOURCE_NAME) - { - debug!(peer = ?node_id, "add known failed: {err:?}"); + OutEvent::PeerData(node_id, data) => { + if 
let Some(discovery) = &self.discovery { + match decode_peer_data(&data) { + Ok(Some(info)) => { + debug!(peer = %node_id.fmt_short(), "add addr info to discovery: {info:?}"); + let node_addr = NodeAddr { + node_id, + relay_url: info.relay_url, + direct_addresses: info.direct_addresses, + }; + discovery.add_node_addr(node_addr); + } + Err(err) => warn!( + "Failed to decode peer data from {}: {err}", + node_id.fmt_short() + ), + Ok(None) => {} } } - }, + } } } Ok(()) @@ -1093,12 +1067,21 @@ async fn connection_loop( res.0.context("send_loop").and(res.1.context("recv_loop")) } -#[derive(Default, Debug, Clone, Serialize, Deserialize)] +#[derive(Default, Debug, Clone, Serialize, Deserialize, Eq, PartialEq)] struct AddrInfo { relay_url: Option, direct_addresses: BTreeSet, } +impl From<&NodeData> for AddrInfo { + fn from(value: &NodeData) -> Self { + Self { + relay_url: value.relay_url().cloned(), + direct_addresses: value.direct_addresses().clone(), + } + } +} + impl From for AddrInfo { fn from( NodeAddr { @@ -1123,13 +1106,13 @@ fn encode_peer_data(info: &AddrInfo) -> Result { Ok(PeerData::new(bytes)) } -fn decode_peer_data(peer_data: &PeerData) -> Result { - let bytes = peer_data.as_bytes(); - if bytes.is_empty() { - return Ok(AddrInfo::default()); +fn decode_peer_data(peer_data: &PeerData) -> Result, Error> { + if peer_data.is_empty() { + Ok(None) + } else { + let info = postcard::from_bytes(peer_data.as_bytes())?; + Ok(Some(info)) } - let info = postcard::from_bytes(bytes)?; - Ok(info) } #[derive(Debug, Default)] @@ -1231,16 +1214,6 @@ impl Stream for TopicCommandStream { } } -fn our_peer_data( - endpoint: &Endpoint, - direct_addresses: &BTreeSet, -) -> Result { - encode_peer_data(&AddrInfo { - relay_url: endpoint.home_relay().get().ok().flatten(), - direct_addresses: direct_addresses.iter().map(|x| x.addr).collect(), - }) -} - #[derive(Debug)] struct Dialer { endpoint: Endpoint, @@ -1316,7 +1289,9 @@ mod test { use bytes::Bytes; use 
futures_concurrency::future::TryJoin; use iroh::{protocol::Router, RelayMap, RelayMode, SecretKey}; + use n0_future::{FuturesOrdered, StreamExt}; use rand::Rng; + use testresult::TestResult; use tokio::{spawn, time::timeout}; use tokio_util::sync::CancellationToken; use tracing::{info, instrument}; @@ -1326,7 +1301,6 @@ mod test { struct ManualActorLoop { actor: Actor, - current_addresses: BTreeSet, step: usize, } @@ -1347,38 +1321,18 @@ mod test { type EndpointHandle = tokio::task::JoinHandle>; impl ManualActorLoop { - #[instrument(skip_all, fields(me = %actor.endpoint.node_id().fmt_short()))] - async fn new(mut actor: Actor) -> Result { - let (current_addresses, _, _) = actor.setup().await?; - let test_rig = Self { - actor, - current_addresses, - step: 0, - }; + #[instrument(skip_all, fields(me = %actor.node_id().fmt_short()))] + async fn new(actor: Actor) -> Result { + let test_rig = Self { actor, step: 0 }; Ok(test_rig) } - #[instrument(skip_all, fields(me = %self.endpoint.node_id().fmt_short()))] + #[instrument(skip_all, fields(me = %self.node_id().fmt_short()))] async fn step(&mut self) -> Result, Error> { - let ManualActorLoop { - actor, - current_addresses, - step, - } = self; + let ManualActorLoop { actor, step } = self; *step += 1; - // ignore updates that change our published address. 
This gives us better control over - // events since the endpoint it no longer emitting changes - let home_relay_stream = &mut futures_lite::stream::pending(); - let direct_addresses_stream = &mut futures_lite::stream::pending(); - actor - .event_loop( - current_addresses, - home_relay_stream, - direct_addresses_stream, - *step, - ) - .await + actor.event_loop(*step, n0_future::stream::pending()).await } async fn steps(&mut self, n: usize) -> Result<(), Error> { @@ -1407,13 +1361,9 @@ mod test { relay_map: RelayMap, cancel: &CancellationToken, ) -> Result<(Self, Actor, EndpointHandle), Error> { - let my_addr = AddrInfo { - relay_url: relay_map.nodes().next().map(|relay| relay.url.clone()), - direct_addresses: Default::default(), - }; let endpoint = create_endpoint(rng, relay_map).await?; - let (actor, to_actor_tx) = Actor::new(endpoint, config, &my_addr); + let (actor, to_actor_tx) = Actor::new(endpoint, config, None); let max_message_size = actor.state.max_message_size(); let _actor_handle = @@ -1431,7 +1381,7 @@ mod test { }; let endpoing_task = task::spawn(endpoint_loop( - actor.endpoint.clone(), + actor.endpoint().clone(), gossip.clone(), cancel.child_token(), )); @@ -1448,7 +1398,7 @@ mod test { ) -> Result<(Self, Endpoint, EndpointHandle, impl Drop), Error> { let (g, actor, ep_handle) = Gossip::t_new_with_actor(rng, config, relay_map, cancel).await?; - let ep = actor.endpoint.clone(); + let ep = actor.endpoint().clone(); let me = ep.node_id().fmt_short(); let actor_handle = task::spawn( async move { @@ -1517,9 +1467,9 @@ mod test { let ep2 = create_endpoint(&mut rng, relay_map.clone()).await.unwrap(); let ep3 = create_endpoint(&mut rng, relay_map.clone()).await.unwrap(); - let go1 = Gossip::builder().spawn(ep1.clone()).await.unwrap(); - let go2 = Gossip::builder().spawn(ep2.clone()).await.unwrap(); - let go3 = Gossip::builder().spawn(ep3.clone()).await.unwrap(); + let go1 = Gossip::builder().spawn(ep1.clone()); + let go2 = Gossip::builder().spawn(ep2.clone()); 
+ let go3 = Gossip::builder().spawn(ep3.clone()); debug!("peer1 {:?}", ep1.node_id()); debug!("peer2 {:?}", ep2.node_id()); debug!("peer3 {:?}", ep3.node_id()); @@ -1650,7 +1600,7 @@ mod test { let (go2, ep2, ep2_handle, _test_actor_handle) = Gossip::t_new(rng, Default::default(), relay_map, &ct).await?; - let node_id1 = actor.endpoint.node_id(); + let node_id1 = actor.node_id(); let node_id2 = ep2.node_id(); tracing::info!( node_1 = node_id1.fmt_short(), @@ -1696,7 +1646,7 @@ mod test { // first node let addr2 = NodeAddr::new(node_id2).with_relay_url(relay_url); - actor.endpoint.add_node_addr(addr2)?; + actor.endpoint().add_node_addr(addr2)?; // we use a channel to signal advancing steps to the task let (tx, mut rx) = mpsc::channel::<()>(1); let ct1 = ct.clone(); @@ -1875,7 +1825,7 @@ mod test { .insecure_skip_relay_cert_verify(true) .bind() .await?; - let gossip = Gossip::builder().spawn(ep.clone()).await?; + let gossip = Gossip::builder().spawn(ep.clone()); let router = Router::builder(ep.clone()) .accept(GOSSIP_ALPN, gossip.clone()) .spawn() @@ -1902,7 +1852,7 @@ mod test { } let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap(); - let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1); + let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(183187); let topic_id = TopicId::from_bytes(rng.gen()); // spawn a gossip node, send the node's address on addr_tx, @@ -1950,6 +1900,7 @@ mod test { assert_eq!(&msg, "msg1"); info!("kill broadcast node"); cancel.cancel(); + assert!(join_handle_1.join().unwrap().is_none()); // spawns the node again with the same node id, and send another message let cancel = CancellationToken::new(); @@ -1969,11 +1920,111 @@ mod test { assert_eq!(&msg, "msg2"); info!("kill broadcast node"); cancel.cancel(); + assert!(join_handle_2.join().unwrap().is_none()); info!("kill recv node"); recv_task.abort(); - assert!(join_handle_1.join().unwrap().is_none()); - 
assert!(join_handle_2.join().unwrap().is_none()); + + Ok(()) + } + + #[tokio::test] + #[traced_test] + async fn gossip_discovery() -> TestResult { + /// Spawns a new endpoint and gossip instance. + async fn spawn_gossip( + secret_key: SecretKey, + relay_map: RelayMap, + use_discovery: bool, + ) -> anyhow::Result<(Router, Gossip)> { + let discovery = use_discovery.then(GossipDiscovery::default); + let mut ep_builder = Endpoint::builder() + .secret_key(secret_key) + .relay_mode(RelayMode::Custom(relay_map)) + .insecure_skip_relay_cert_verify(true); + if let Some(discovery) = &discovery { + ep_builder = ep_builder.discovery(Box::new(discovery.clone())); + } + let ep = ep_builder.bind().await?; + let mut gossip_builder = Gossip::builder(); + if let Some(discovery) = discovery { + gossip_builder = gossip_builder.use_gossip_for_discovery(discovery) + } + let gossip = gossip_builder.spawn(ep.clone()); + let router = Router::builder(ep.clone()) + .accept(GOSSIP_ALPN, gossip.clone()) + .spawn() + .await?; + Ok((router, gossip)) + } + + let (relay_map, relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap(); + let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(5); + let topic_id = TopicId::from_bytes(rng.gen()); + + let use_discovery_steps = [false, true]; + for use_discovery in use_discovery_steps { + // create 3 gossip instances + let (routers, gossips): (Vec<_>, Vec<_>) = + FuturesOrdered::from_iter((0..3).map(|_i| { + let secret_key = SecretKey::generate(&mut rng); + spawn_gossip(secret_key, relay_map.clone(), use_discovery) + })) + .try_collect::<_, _, Vec<_>>() + .await? 
+ .into_iter() + .unzip(); + + let node_ids: Vec<_> = routers.iter().map(|r| r.endpoint().node_id()).collect(); + let node_addrs: Vec<_> = node_ids + .iter() + .map(|node_id| NodeAddr::new(*node_id).with_relay_url(relay_url.clone())) + .collect(); + + // connect 1 to 2 and 2 to 3 + routers[1].endpoint().add_node_addr(node_addrs[0].clone())?; + routers[2].endpoint().add_node_addr(node_addrs[0].clone())?; + // note: 1 does not know about 3, and 3 does not know about 1 + + let topics = vec![ + gossips[0].subscribe(topic_id, vec![])?, + gossips[1].subscribe(topic_id, vec![node_ids[0]])?, + gossips[2].subscribe(topic_id, vec![node_ids[0]])?, + ]; + + let futs = topics.into_iter().enumerate().map(|(i, mut topic)| { + async move { + // this is the heart of the test. + let expected_neighbors = match (i, use_discovery) { + (0, _) => 2, + (1 | 2, false) => 1, + (1 | 2, true) => 2, + _ => unreachable!(), + }; + let fut = async { + loop { + let event = topic.next().await; + assert!( + matches!( + event, + Some(Ok(Event::Gossip(GossipEvent::NeighborUp(_)))) + ), + "unexpected event on node {i}: {event:?}" + ); + } + }; + // run for 2s + assert!(n0_future::time::timeout(Duration::from_secs(2), fut) + .await + .is_err()); + assert_eq!(topic.neighbors().count(), expected_neighbors); + topic + } + }); + let fut = FuturesOrdered::from_iter(futs).collect::>(); + let topics = n0_future::time::timeout(Duration::from_secs(3), fut).await?; + drop(topics); + } Ok(()) } diff --git a/src/net/discovery.rs b/src/net/discovery.rs new file mode 100644 index 00000000..e321613c --- /dev/null +++ b/src/net/discovery.rs @@ -0,0 +1,74 @@ +use anyhow::Result; +use iroh::{ + discovery::{static_provider::StaticProvider, Discovery, DiscoveryItem}, + watchable::Watchable, + NodeAddr, +}; +use n0_future::boxed::BoxStream; + +use super::AddrInfo; + +#[derive(Debug, Clone)] +pub struct GossipDiscovery { + pub(super) inner: StaticProvider, + pub(super) our_addr: Option>>, +} + +impl Default for 
GossipDiscovery { + fn default() -> Self { + Self::new() + } +} + +impl GossipDiscovery { + /// Create a new [`GossipDiscovery`] that publishes our address with our join messages. + /// + /// By adding this to both an [`Endpoint`] and a [`Gossip`], this will both make address info + /// retrieved from other nodes available to the endpoint, and make our address info available to + /// other nodes. + /// + /// [`Endpoint`]: iroh::Endpoint + /// [`Gossip`]: crate::net::Gossip + pub fn new() -> Self { + Self { + inner: Default::default(), + our_addr: Some(Default::default()), + } + } + + /// Create a new [`GossipDiscovery`] that does not publish our address with our join messages. + /// + /// By adding this to both an [`Endpoint`] and a [`Gossip`], this will make address info + /// retrieved from other nodes available to the endpoint, but does not publish our address + /// info to other nodes. + /// + /// [`Endpoint`]: iroh::Endpoint + /// [`Gossip`]: crate::net::Gossip + pub fn without_publish() -> Self { + Self { + inner: Default::default(), + our_addr: None, + } + } + + pub(super) fn add_node_addr(&self, addr: NodeAddr) { + self.inner.add_node_info(addr); + } +} + +impl Discovery for GossipDiscovery { + fn publish(&self, data: &iroh::node_info::NodeData) { + if let Some(watchable) = self.our_addr.as_ref() { + tracing::info!("PUBLISH {data:?}"); + watchable.set(Some(data.into())).ok(); + } + } + + fn resolve( + &self, + endpoint: iroh::Endpoint, + node_id: iroh::NodeId, + ) -> Option>> { + self.inner.resolve(endpoint, node_id) + } +} diff --git a/src/net/handles.rs b/src/net/handles.rs index adf82bea..7268c974 100644 --- a/src/net/handles.rs +++ b/src/net/handles.rs @@ -92,6 +92,11 @@ impl GossipTopic { pub fn is_joined(&self) -> bool { self.receiver.is_joined() } + + /// Returns an iterator of the current neighbors in the gossip swarm. 
+ pub fn neighbors(&self) -> impl Iterator + '_ { + self.receiver.neighbors() + } } impl Stream for GossipTopic { diff --git a/src/proto.rs b/src/proto.rs index e730ce75..eda78d72 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -101,6 +101,11 @@ impl PeerData { pub fn as_bytes(&self) -> &[u8] { &self.0 } + + /// Returns true if the peer data is empty. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } } /// PeerInfo contains a peer's identifier and the opaque peer data as provided by the implementer. diff --git a/src/proto/state.rs b/src/proto/state.rs index 1cf66fcf..bb95ec2b 100644 --- a/src/proto/state.rs +++ b/src/proto/state.rs @@ -134,7 +134,7 @@ impl From> for InEventMapped { #[derive(Debug)] pub struct State { me: PI, - me_data: PeerData, + me_data: Option, config: Config, rng: R, states: HashMap>, @@ -149,7 +149,7 @@ impl State { /// (which can be updated over time). /// For the protocol to perform as recommended in the papers, the [`Config`] should be /// identical for all nodes in the network. - pub fn new(me: PI, me_data: PeerData, config: Config, rng: R) -> Self { + pub fn new(me: PI, me_data: Option, config: Config, rng: R) -> Self { Self { me, me_data, @@ -219,7 +219,7 @@ impl State { if let hash_map::Entry::Vacant(e) = self.states.entry(topic) { e.insert(topic::State::with_rng( self.me, - Some(self.me_data.clone()), + self.me_data.clone(), self.config.clone(), self.rng.clone(), )); @@ -250,7 +250,7 @@ impl State { // when a peer disconnected on the network level, forward event to all states InEventMapped::All(event) => { if let topic::InEvent::UpdatePeerData(data) = &event { - self.me_data = data.clone(); + self.me_data = Some(data.clone()); } for (topic, state) in self.states.iter_mut() { let out = state.handle(event.clone(), now);