@@ -26,7 +26,7 @@ use n0_future::{
26
26
use rand:: rngs:: StdRng ;
27
27
use rand_core:: SeedableRng ;
28
28
use serde:: { Deserialize , Serialize } ;
29
- use tokio:: sync:: mpsc;
29
+ use tokio:: sync:: { mpsc, oneshot } ;
30
30
use tokio_util:: sync:: CancellationToken ;
31
31
use tracing:: { debug, error, error_span, trace, warn, Instrument } ;
32
32
@@ -162,6 +162,15 @@ impl ProtocolHandler for Gossip {
162
162
Ok ( ( ) )
163
163
} )
164
164
}
165
+
166
+ fn shutdown ( & self ) -> BoxFuture < ( ) > {
167
+ let this = self . clone ( ) ;
168
+ Box :: pin ( async move {
169
+ if let Err ( err) = this. shutdown ( ) . await {
170
+ warn ! ( "error while shutting down gossip: {err:#}" ) ;
171
+ }
172
+ } )
173
+ }
165
174
}
166
175
167
176
/// Builder to configure and construct [`Gossip`].
@@ -323,6 +332,17 @@ impl Gossip {
323
332
pub fn metrics ( & self ) -> & Arc < Metrics > {
324
333
& self . inner . metrics
325
334
}
335
+
336
+ /// Shutdown the gossip instance.
337
+ ///
338
+ /// This leaves all topics, sending `Disconnect` messages to peers, and then
339
+ /// stops the gossip actor loop and drops all state and connections.
340
+ pub async fn shutdown ( & self ) -> anyhow:: Result < ( ) > {
341
+ let ( reply, reply_rx) = oneshot:: channel ( ) ;
342
+ self . inner . send ( ToActor :: Shutdown { reply } ) . await ?;
343
+ reply_rx. await ?;
344
+ Ok ( ( ) )
345
+ }
326
346
}
327
347
328
348
impl Inner {
@@ -457,6 +477,9 @@ enum ToActor {
457
477
topic : TopicId ,
458
478
receiver_id : ReceiverId ,
459
479
} ,
480
+ Shutdown {
481
+ reply : oneshot:: Sender < ( ) > ,
482
+ } ,
460
483
}
461
484
462
485
/// Actor that sends and handles messages between the connection and main state loops
@@ -590,7 +613,18 @@ impl Actor {
590
613
trace!( ?i, "tick: to_actor_rx" ) ;
591
614
self . metrics. actor_tick_rx. inc( ) ;
592
615
match msg {
593
- Some ( msg) => self . handle_to_actor_msg( msg, Instant :: now( ) ) . await ?,
616
+ Some ( msg) => {
617
+ if let ToActor :: Shutdown { reply } = msg {
618
+ debug!( "received shutdown message, quit all topics" ) ;
619
+ self . quit_queue. extend( self . topics. keys( ) . copied( ) ) ;
620
+ self . process_quit_queue( ) . await . ok( ) ;
621
+ debug!( "all topics quit, stop gossip actor" ) ;
622
+ reply. send( ( ) ) . ok( ) ;
623
+ return Ok ( None )
624
+ } else {
625
+ self . handle_to_actor_msg( msg, Instant :: now( ) ) . await ?;
626
+ }
627
+ }
594
628
None => {
595
629
debug!( "all gossip handles dropped, stop gossip actor" ) ;
596
630
return Ok ( None )
@@ -818,6 +852,7 @@ impl Actor {
818
852
ToActor :: ReceiverGone { topic, receiver_id } => {
819
853
self . handle_receiver_gone ( topic, receiver_id) . await ?;
820
854
}
855
+ ToActor :: Shutdown { .. } => unreachable ! ( "handled in main loop" ) ,
821
856
}
822
857
Ok ( ( ) )
823
858
}
0 commit comments