Skip to content

Commit b4abbd4

Browse files
committed
feat(net): Add shutdown function
1 parent 6f2656f commit b4abbd4

File tree

1 file changed

+36
-2
lines changed

1 file changed

+36
-2
lines changed

src/net.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use n0_future::{
2626
use rand::rngs::StdRng;
2727
use rand_core::SeedableRng;
2828
use serde::{Deserialize, Serialize};
29-
use tokio::sync::mpsc;
29+
use tokio::sync::{mpsc, oneshot};
3030
use tokio_util::sync::CancellationToken;
3131
use tracing::{debug, error, error_span, trace, warn, Instrument};
3232

@@ -162,6 +162,15 @@ impl ProtocolHandler for Gossip {
162162
Ok(())
163163
})
164164
}
165+
166+
fn shutdown(&self) -> BoxFuture<()> {
167+
let this = self.clone();
168+
Box::pin(async move {
169+
if let Err(err) = this.shutdown().await {
170+
warn!("error while shutting down gossip: {err:#}");
171+
}
172+
})
173+
}
165174
}
166175

167176
/// Builder to configure and construct [`Gossip`].
@@ -323,6 +332,17 @@ impl Gossip {
323332
pub fn metrics(&self) -> &Arc<Metrics> {
324333
&self.inner.metrics
325334
}
335+
336+
/// Shutdown the gossip instance.
337+
///
338+
/// This leaves all topics, sending `Disconnect` messages to peers, and then
339+
/// stops the gossip actor loop and drops all state and connections.
340+
pub async fn shutdown(&self) -> anyhow::Result<()> {
341+
let (reply, reply_rx) = oneshot::channel();
342+
self.inner.send(ToActor::Shutdown { reply }).await?;
343+
reply_rx.await?;
344+
Ok(())
345+
}
326346
}
327347

328348
impl Inner {
@@ -457,6 +477,9 @@ enum ToActor {
457477
topic: TopicId,
458478
receiver_id: ReceiverId,
459479
},
480+
Shutdown {
481+
reply: oneshot::Sender<()>,
482+
},
460483
}
461484

462485
/// Actor that sends and handles messages between the connection and main state loops
@@ -590,7 +613,17 @@ impl Actor {
590613
trace!(?i, "tick: to_actor_rx");
591614
self.metrics.actor_tick_rx.inc();
592615
match msg {
593-
Some(msg) => self.handle_to_actor_msg(msg, Instant::now()).await?,
616+
Some(ToActor::Shutdown { reply }) => {
617+
debug!("received shutdown message, quit all topics");
618+
self.quit_queue.extend(self.topics.keys().copied());
619+
self.process_quit_queue().await.ok();
620+
debug!("all topics quit, stop gossip actor");
621+
reply.send(()).ok();
622+
return Ok(None)
623+
},
624+
Some(msg) => {
625+
self.handle_to_actor_msg(msg, Instant::now()).await?;
626+
}
594627
None => {
595628
debug!("all gossip handles dropped, stop gossip actor");
596629
return Ok(None)
@@ -818,6 +851,7 @@ impl Actor {
818851
ToActor::ReceiverGone { topic, receiver_id } => {
819852
self.handle_receiver_gone(topic, receiver_id).await?;
820853
}
854+
ToActor::Shutdown { .. } => unreachable!("handled in main loop"),
821855
}
822856
Ok(())
823857
}

0 commit comments

Comments
 (0)