diff --git a/Cargo.lock b/Cargo.lock index 88da4393..8c754a17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -170,18 +170,6 @@ dependencies = [ "syn 2.0.90", ] -[[package]] -name = "async-channel" -version = "2.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" -dependencies = [ - "concurrent-queue", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-recursion" version = "1.1.1" @@ -896,27 +884,6 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a02a5d186d7bf1cb21f1f95e1a9cfa5c1f2dcd803a47aad454423ceec13525c5" -[[package]] -name = "event-listener" -version = "5.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" -dependencies = [ - "event-listener", - "pin-project-lite", -] - [[package]] name = "fallible-iterator" version = "0.3.0" @@ -1811,7 +1778,6 @@ name = "iroh-gossip" version = "0.30.0" dependencies = [ "anyhow", - "async-channel", "bytes", "clap", "data-encoding", @@ -1838,6 +1804,7 @@ dependencies = [ "strum", "testresult", "tokio", + "tokio-stream", "tokio-util", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 516a49d4..31846292 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,6 @@ unused-async = "warn" [dependencies] anyhow = { version = "1" } -async-channel = { version = "2.3.1", optional = true } blake3 = { package = "iroh-blake3", version = "1.4.5"} bytes = { version = "1.7", features = ["serde"] } derive_more = { version = "1.0.0", features = ["add", "debug", "deref", "display", "from", "try_into", "into"] } @@ -51,6 +50,7 @@ tokio-util = { version = "0.7.12", optional = true, features = ["codec", "rt"] } tracing = "0.1" data-encoding = { version = "2.6.0", optional = true } hex = "0.4.3" +tokio-stream = { version = "0.1.17", optional = true } # rpc dependencies (optional) nested_enum_utils = { version = "0.1.0", optional = true } @@ -76,9 +76,9 @@ net = [ "dep:iroh", "dep:tokio", "dep:tokio-util", - "dep:async-channel", "dep:futures-util", - "dep:futures-concurrency" + "dep:futures-concurrency", + "dep:tokio-stream", ] rpc = [ "dep:nested_enum_utils", @@ -92,6 +92,7 @@ cli = [ ] examples = ["net", "dep:data-encoding"] +tokio-stream = ["dep:tokio-stream"] [[example]] name = "chat" diff --git a/src/net.rs b/src/net.rs index f066bb32..7c522940 100644 --- a/src/net.rs +++ b/src/net.rs @@ -13,7 +13,6 @@ use anyhow::{anyhow, Context as _, Result}; use bytes::BytesMut; use futures_concurrency::stream::{stream_group, StreamGroup}; use futures_lite::{future::Boxed as BoxedFuture, stream::Stream, StreamExt}; -use futures_util::TryFutureExt; use iroh::{ endpoint::{get_remote_node_id, Connecting, Connection, DirectAddr}, protocol::ProtocolHandler, @@ -24,6 +23,7 @@ use rand::rngs::StdRng; use rand_core::SeedableRng; use serde::{Deserialize, Serialize}; use tokio::{sync::mpsc, task::JoinSet}; +use tokio_stream::wrappers::ReceiverStream; use tokio_util::{sync::CancellationToken, task::AbortOnDropHandle}; use tracing::{debug, error, error_span, trace, warn, Instrument}; @@ -190,7 +190,9 @@ impl Gossip { topic_id: TopicId, bootstrap: Vec, ) -> Result { - let mut sub = self.subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap)); + let mut sub = self + .subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap)) + .await?; sub.joined().await?; Ok(sub) } @@ -198,8 +200,14 @@ impl Gossip { /// Join a gossip topic with the default options. /// /// Note that this will not wait for any bootstrap node to be available. To ensure the topic is connected to at least one node, use [`GossipTopic::joined`] or [`Gossip::subscribe_and_join`] - pub fn subscribe(&self, topic_id: TopicId, bootstrap: Vec) -> Result { - let sub = self.subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap)); + pub async fn subscribe( + &self, + topic_id: TopicId, + bootstrap: Vec, + ) -> Result { + let sub = self + .subscribe_with_opts(topic_id, JoinOptions::with_bootstrap(bootstrap)) + .await?; Ok(sub) } @@ -211,11 +219,17 @@ impl Gossip { /// /// Messages will be queued until a first connection is available. If the internal channel becomes full, /// the oldest messages will be dropped from the channel. - pub fn subscribe_with_opts(&self, topic_id: TopicId, opts: JoinOptions) -> GossipTopic { - let (command_tx, command_rx) = async_channel::bounded(TOPIC_COMMANDS_DEFAULT_CAP); - let command_rx: CommandStream = Box::pin(command_rx); - let event_rx = self.subscribe_with_stream(topic_id, opts, command_rx); - GossipTopic::new(command_tx, event_rx) + pub async fn subscribe_with_opts( + &self, + topic_id: TopicId, + opts: JoinOptions, + ) -> Result { + let (command_tx, command_rx) = mpsc::channel(TOPIC_COMMANDS_DEFAULT_CAP); + let command_rx: CommandStream = Box::pin(ReceiverStream::new(command_rx)); + let event_rx = self + .subscribe_with_stream(topic_id, opts, command_rx) + .await?; + Ok(GossipTopic::new(command_tx, event_rx)) } /// Join a gossip topic with options and an externally-created update stream. @@ -225,24 +239,26 @@ impl Gossip { /// /// It returns a stream of events. If you want to wait for the topic to become active, wait for /// the [`GossipEvent::Joined`] event. - pub fn subscribe_with_stream( + pub async fn subscribe_with_stream( &self, topic_id: TopicId, options: JoinOptions, updates: CommandStream, - ) -> EventStream { - self.inner.subscribe_with_stream(topic_id, options, updates) + ) -> Result { + self.inner + .subscribe_with_stream(topic_id, options, updates) + .await } } impl Inner { - pub fn subscribe_with_stream( + pub async fn subscribe_with_stream( &self, topic_id: TopicId, options: JoinOptions, updates: CommandStream, - ) -> EventStream { - let (event_tx, event_rx) = async_channel::bounded(options.subscription_capacity); + ) -> Result { + let (event_tx, event_rx) = mpsc::channel(options.subscription_capacity); let to_actor_tx = self.to_actor_tx.clone(); let receiver_id = ReceiverId( self.next_receiver_id @@ -253,33 +269,22 @@ impl Inner { command_rx: updates, event_tx, }; - // We spawn a task to send the subscribe action to the actor, because we want the send to - // succeed even if the returned stream is dropped right away without being polled, because - // it is legit to keep only the `updates` stream and drop the event stream. This situation - // is handled fine within the actor, but we have to make sure that the message reaches the - // actor. - let task = tokio::task::spawn(async move { - to_actor_tx - .send(ToActor::Join { - topic_id, - bootstrap: options.bootstrap, - channels, - }) - .await - .map_err(|_| anyhow!("Gossip actor dropped")) - }); - let stream = async move { - task.await - .map_err(|err| anyhow!("Task for sending to gossip actor failed: {err:?}"))??; - Ok(event_rx) - } - .try_flatten_stream(); - EventStream { - inner: Box::pin(stream), + + to_actor_tx + .send(ToActor::Join { + topic_id, + bootstrap: options.bootstrap, + channels, + }) + .await + .map_err(|_| anyhow!("Gossip actor dropped"))?; + + Ok(EventStream { + inner: ReceiverStream::new(event_rx), to_actor_tx: self.to_actor_tx.clone(), topic: topic_id, receiver_id, - } + }) } async fn send(&self, event: ToActor) -> anyhow::Result<()> { @@ -298,11 +303,10 @@ impl Inner { } /// Stream of events for a topic. -#[derive(derive_more::Debug)] +#[derive(Debug)] pub struct EventStream { /// The actual stream polled to return [`Event`]s to the application. - #[debug("Stream")] - inner: Pin> + Send + 'static>>, + inner: ReceiverStream>, /// Channel to the actor task. /// @@ -323,7 +327,7 @@ impl Stream for EventStream { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.inner.poll_next(cx) + Pin::new(&mut self.inner).poll_next(cx) } } @@ -930,7 +934,7 @@ enum ConnOrigin { struct SubscriberChannels { /// Id for the receiver counter part of [`Self::event_tx`]. receiver_id: ReceiverId, - event_tx: async_channel::Sender>, + event_tx: mpsc::Sender>, #[debug("CommandStream")] command_rx: CommandStream, } @@ -1028,7 +1032,7 @@ struct EventSenders { /// Channels to communicate [`Event`] to [`EventStream`]s. /// /// This is indexed by receiver id. The boolean indicates a lagged channel ([`Event::Lagged`]). - senders: HashMap>, bool)>, + senders: HashMap>, bool)>, } /// Id for a gossip receiver. @@ -1042,7 +1046,7 @@ impl EventSenders { self.senders.is_empty() } - fn push(&mut self, id: ReceiverId, sender: async_channel::Sender>) { + fn push(&mut self, id: ReceiverId, sender: mpsc::Sender>) { self.senders.insert(id, (sender, false)); } @@ -1050,19 +1054,16 @@ impl EventSenders { /// /// This will not wait until the sink is full, but send a `Lagged` response if the sink is almost full. fn send(&mut self, event: &GossipEvent) { - let mut remove = Vec::new(); - for (&id, (send, lagged)) in self.senders.iter_mut() { + self.senders.retain(|_id, (send, lagged)| { // If the stream is disconnected, we don't need to send to it. if send.is_closed() { - remove.push(id); - continue; + return false; } // Check if the send buffer is almost full, and send a lagged response if it is. - let cap = send.capacity().expect("we only use bounded channels"); - let event = if send.len() >= cap - 1 { + let event = if send.capacity() <= 1 { if *lagged { - continue; + return true; } *lagged = true; Event::Lagged @@ -1071,14 +1072,15 @@ impl EventSenders { Event::Gossip(event.clone()) }; - if let Err(async_channel::TrySendError::Closed(_)) = send.try_send(Ok(event)) { - remove.push(id); + match send.try_send(Ok(event)) { + Ok(()) => true, + Err(mpsc::error::TrySendError::Full(_)) => { + *lagged = true; + true + } + Err(mpsc::error::TrySendError::Closed(_)) => false, } - } - - for id in remove.into_iter() { - self.senders.remove(&id); - } + }); } /// Removes a sender based on the corresponding receiver's id. @@ -1676,7 +1678,7 @@ mod test { let addr1 = NodeAddr::new(node_id1).with_relay_url(relay_url.clone()); ep2.add_node_addr(addr1)?; let go2_task = async move { - let mut sub = go2.subscribe(topic, Vec::new())?; + let mut sub = go2.subscribe(topic, Vec::new()).await?; sub.joined().await?; rx.recv().await.expect("signal to unsubscribe"); @@ -1685,7 +1687,7 @@ mod test { rx.recv().await.expect("signal to subscribe again"); tracing::info!("resubscribing"); - let mut sub = go2.subscribe(topic, vec![node_id1])?; + let mut sub = go2.subscribe(topic, vec![node_id1]).await?; sub.joined().await?; tracing::info!("subscription successful!"); @@ -1700,7 +1702,7 @@ mod test { let addr2 = NodeAddr::new(node_id2).with_relay_url(relay_url); ep1.add_node_addr(addr2)?; - let mut sub = go1.subscribe(topic, vec![node_id2])?; + let mut sub = go1.subscribe(topic, vec![node_id2]).await?; // wait for subscribed notification sub.joined().await?; diff --git a/src/net/handles.rs b/src/net/handles.rs index 53b24d0b..460e1fbf 100644 --- a/src/net/handles.rs +++ b/src/net/handles.rs @@ -13,16 +13,17 @@ use bytes::Bytes; use futures_lite::{Stream, StreamExt}; use iroh::NodeId; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; use super::EventStream; use crate::{net::TOPIC_EVENTS_DEFAULT_CAP, proto::DeliveryScope}; /// Sender for a gossip topic. #[derive(Debug)] -pub struct GossipSender(async_channel::Sender); +pub struct GossipSender(mpsc::Sender); impl GossipSender { - pub(crate) fn new(sender: async_channel::Sender) -> Self { + pub(crate) fn new(sender: mpsc::Sender) -> Self { Self(sender) } @@ -63,7 +64,7 @@ pub struct GossipTopic { } impl GossipTopic { - pub(crate) fn new(sender: async_channel::Sender, receiver: EventStream) -> Self { + pub(crate) fn new(sender: mpsc::Sender, receiver: EventStream) -> Self { Self { sender: GossipSender::new(sender), receiver: GossipReceiver::new(receiver), @@ -99,7 +100,7 @@ impl GossipTopic { impl Stream for GossipTopic { type Item = Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.receiver).poll_next(cx) + self.receiver.stream.inner.poll_next(cx) } } @@ -157,8 +158,9 @@ impl GossipReceiver { impl Stream for GossipReceiver { type Item = Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let item = std::task::ready!(Pin::new(&mut self.stream).poll_next(cx)); + let item = std::task::ready!(self.stream.poll_next(cx)); if let Some(Ok(item)) = &item { match item { Event::Gossip(GossipEvent::Joined(neighbors)) => {