@@ -27,7 +27,7 @@ use n0_future::{
27
27
use rand:: rngs:: StdRng ;
28
28
use rand_core:: SeedableRng ;
29
29
use serde:: { Deserialize , Serialize } ;
30
- use tokio:: sync:: mpsc;
30
+ use tokio:: sync:: { mpsc, oneshot } ;
31
31
use tokio_util:: sync:: CancellationToken ;
32
32
use tracing:: { debug, error, error_span, trace, warn, Instrument } ;
33
33
@@ -162,6 +162,15 @@ impl ProtocolHandler for Gossip {
162
162
Ok ( ( ) )
163
163
} )
164
164
}
165
+
166
+ fn shutdown ( & self ) -> BoxFuture < ( ) > {
167
+ let this = self . clone ( ) ;
168
+ Box :: pin ( async move {
169
+ if let Err ( err) = this. shutdown ( ) . await {
170
+ warn ! ( "error while shutting down gossip: {err:#}" ) ;
171
+ }
172
+ } )
173
+ }
165
174
}
166
175
167
176
/// Builder to configure and construct [`Gossip`].
@@ -316,6 +325,17 @@ impl Gossip {
316
325
) -> EventStream {
317
326
self . inner . subscribe_with_stream ( topic_id, options, updates)
318
327
}
328
+
329
+ /// Shutdown the gossip instance.
330
+ ///
331
+ /// This leaves all topics, sending `Disconnect` messages to peers, and then
332
+ /// stops the gossip actor loop and drops all state and connections.
333
+ pub async fn shutdown ( & self ) -> anyhow:: Result < ( ) > {
334
+ let ( reply, reply_rx) = oneshot:: channel ( ) ;
335
+ self . inner . send ( ToActor :: Shutdown { reply } ) . await ?;
336
+ reply_rx. await ?;
337
+ Ok ( ( ) )
338
+ }
319
339
}
320
340
321
341
impl Inner {
@@ -450,6 +470,9 @@ enum ToActor {
450
470
topic : TopicId ,
451
471
receiver_id : ReceiverId ,
452
472
} ,
473
+ Shutdown {
474
+ reply : oneshot:: Sender < ( ) > ,
475
+ } ,
453
476
}
454
477
455
478
/// Actor that sends and handles messages between the connection and main state loops
@@ -580,7 +603,18 @@ impl Actor {
580
603
trace!( ?i, "tick: to_actor_rx" ) ;
581
604
inc!( Metrics , actor_tick_rx) ;
582
605
match msg {
583
- Some ( msg) => self . handle_to_actor_msg( msg, Instant :: now( ) ) . await ?,
606
+ Some ( msg) => {
607
+ if let ToActor :: Shutdown { reply } = msg {
608
+ debug!( "received shutdown message, quit all topics" ) ;
609
+ self . quit_queue. extend( self . topics. keys( ) . copied( ) ) ;
610
+ self . process_quit_queue( ) . await . ok( ) ;
611
+ debug!( "all topics quit, stop gossip actor" ) ;
612
+ reply. send( ( ) ) . ok( ) ;
613
+ return Ok ( None )
614
+ } else {
615
+ self . handle_to_actor_msg( msg, Instant :: now( ) ) . await ?;
616
+ }
617
+ }
584
618
None => {
585
619
debug!( "all gossip handles dropped, stop gossip actor" ) ;
586
620
return Ok ( None )
@@ -808,6 +842,7 @@ impl Actor {
808
842
ToActor :: ReceiverGone { topic, receiver_id } => {
809
843
self . handle_receiver_gone ( topic, receiver_id) . await ?;
810
844
}
845
+ ToActor :: Shutdown { .. } => unreachable ! ( "handled in main loop" ) ,
811
846
}
812
847
Ok ( ( ) )
813
848
}
0 commit comments