11use std:: collections:: HashMap ;
22use std:: io:: { BufRead , BufReader , Write } ;
33use std:: net:: { Shutdown , SocketAddr , TcpListener , TcpStream } ;
4- use std:: sync:: mpsc:: { Sender , SyncSender , TrySendError } ;
4+ use std:: sync:: atomic:: AtomicBool ;
5+ use std:: sync:: mpsc:: { Receiver , Sender } ;
56use std:: sync:: { Arc , Mutex } ;
67use std:: thread;
78
@@ -100,6 +101,7 @@ struct Connection {
100101 chan : SyncChannel < Message > ,
101102 stats : Arc < Stats > ,
102103 txs_limit : usize ,
104+ die_please : Option < Receiver < ( ) > > ,
103105 #[ cfg( feature = "electrum-discovery" ) ]
104106 discovery : Option < Arc < DiscoveryManager > > ,
105107}
@@ -111,6 +113,7 @@ impl Connection {
111113 addr : SocketAddr ,
112114 stats : Arc < Stats > ,
113115 txs_limit : usize ,
116+ die_please : Receiver < ( ) > ,
114117 #[ cfg( feature = "electrum-discovery" ) ] discovery : Option < Arc < DiscoveryManager > > ,
115118 ) -> Connection {
116119 Connection {
@@ -122,6 +125,7 @@ impl Connection {
122125 chan : SyncChannel :: new ( 10 ) ,
123126 stats,
124127 txs_limit,
128+ die_please : Some ( die_please) ,
125129 #[ cfg( feature = "electrum-discovery" ) ]
126130 discovery,
127131 }
@@ -501,40 +505,46 @@ impl Connection {
501505 Ok ( ( ) )
502506 }
503507
504- fn handle_replies ( & mut self ) -> Result < ( ) > {
508+ fn handle_replies ( & mut self , shutdown : crossbeam_channel :: Receiver < ( ) > ) -> Result < ( ) > {
505509 let empty_params = json ! ( [ ] ) ;
506510 loop {
507- let msg = self . chan . receiver ( ) . recv ( ) . chain_err ( || "channel closed" ) ?;
508- trace ! ( "RPC {:?}" , msg) ;
509- match msg {
510- Message :: Request ( line) => {
511- let cmd: Value = from_str ( & line) . chain_err ( || "invalid JSON format" ) ?;
512- let reply = match (
513- cmd. get ( "method" ) ,
514- cmd. get ( "params" ) . unwrap_or_else ( || & empty_params) ,
515- cmd. get ( "id" ) ,
516- ) {
517- (
518- Some ( & Value :: String ( ref method) ) ,
519- & Value :: Array ( ref params) ,
520- Some ( ref id) ,
521- ) => self . handle_command ( method, params, id) ?,
522- _ => bail ! ( "invalid command: {}" , cmd) ,
523- } ;
524- self . send_values ( & [ reply] ) ?
525- }
526- Message :: PeriodicUpdate => {
527- let values = self
528- . update_subscriptions ( )
529- . chain_err ( || "failed to update subscriptions" ) ?;
530- self . send_values ( & values) ?
511+ crossbeam_channel:: select! {
512+ recv( self . chan. receiver( ) ) -> msg => {
513+ let msg = msg. chain_err( || "channel closed" ) ?;
514+ trace!( "RPC {:?}" , msg) ;
515+ match msg {
516+ Message :: Request ( line) => {
517+ let cmd: Value = from_str( & line) . chain_err( || "invalid JSON format" ) ?;
518+ let reply = match (
519+ cmd. get( "method" ) ,
520+ cmd. get( "params" ) . unwrap_or( & empty_params) ,
521+ cmd. get( "id" ) ,
522+ ) {
523+ ( Some ( Value :: String ( method) ) , Value :: Array ( params) , Some ( id) ) => {
524+ self . handle_command( method, params, id) ?
525+ }
526+ _ => bail!( "invalid command: {}" , cmd) ,
527+ } ;
528+ self . send_values( & [ reply] ) ?
529+ }
530+ Message :: PeriodicUpdate => {
531+ let values = self
532+ . update_subscriptions( )
533+ . chain_err( || "failed to update subscriptions" ) ?;
534+ self . send_values( & values) ?
535+ }
536+ Message :: Done => return Ok ( ( ) ) ,
537+ }
531538 }
532- Message :: Done => return Ok ( ( ) ) ,
539+ recv ( shutdown ) -> _ => return Ok ( ( ) ) ,
533540 }
534541 }
535542 }
536543
537- fn handle_requests ( mut reader : BufReader < TcpStream > , tx : SyncSender < Message > ) -> Result < ( ) > {
544+ fn handle_requests (
545+ mut reader : BufReader < TcpStream > ,
546+ tx : crossbeam_channel:: Sender < Message > ,
547+ ) -> Result < ( ) > {
538548 loop {
539549 let mut line = Vec :: < u8 > :: new ( ) ;
540550 reader
@@ -566,8 +576,18 @@ impl Connection {
566576 self . stats . clients . inc ( ) ;
567577 let reader = BufReader :: new ( self . stream . try_clone ( ) . expect ( "failed to clone TcpStream" ) ) ;
568578 let tx = self . chan . sender ( ) ;
579+
580+ let stream = self . stream . try_clone ( ) . expect ( "failed to clone TcpStream" ) ;
581+ let die_please = self . die_please . take ( ) . unwrap ( ) ;
582+ let ( reply_killer, reply_receiver) = crossbeam_channel:: unbounded ( ) ;
583+ spawn_thread ( "properly-die" , move || {
584+ let _ = die_please. recv ( ) ;
585+ let _ = stream. shutdown ( Shutdown :: Both ) ;
586+ let _ = reply_killer. send ( ( ) ) ;
587+ } ) ;
588+
569589 let child = spawn_thread ( "reader" , || Connection :: handle_requests ( reader, tx) ) ;
570- if let Err ( e) = self . handle_replies ( ) {
590+ if let Err ( e) = self . handle_replies ( reply_receiver ) {
571591 error ! (
572592 "[{}] connection handling failed: {}" ,
573593 self . addr,
@@ -633,30 +653,38 @@ struct Stats {
633653impl RPC {
634654 fn start_notifier (
635655 notification : Channel < Notification > ,
636- senders : Arc < Mutex < Vec < SyncSender < Message > > > > ,
656+ senders : Arc < Mutex < Vec < crossbeam_channel :: Sender < Message > > > > ,
637657 acceptor : Sender < Option < ( TcpStream , SocketAddr ) > > ,
658+ acceptor_shutdown : Sender < ( ) > ,
638659 ) {
639660 spawn_thread ( "notification" , move || {
640661 for msg in notification. receiver ( ) . iter ( ) {
641662 let mut senders = senders. lock ( ) . unwrap ( ) ;
642663 match msg {
643664 Notification :: Periodic => {
644665 for sender in senders. split_off ( 0 ) {
645- if let Err ( TrySendError :: Disconnected ( _) ) =
666+ if let Err ( crossbeam_channel :: TrySendError :: Disconnected ( _) ) =
646667 sender. try_send ( Message :: PeriodicUpdate )
647668 {
648669 continue ;
649670 }
650671 senders. push ( sender) ;
651672 }
652673 }
653- Notification :: Exit => acceptor. send ( None ) . unwrap ( ) , // mark acceptor as done
674+ Notification :: Exit => {
675+ acceptor_shutdown. send ( ( ) ) . unwrap ( ) ; // Stop the acceptor itself
676+ acceptor. send ( None ) . unwrap ( ) ; // mark acceptor as done
677+ break ;
678+ }
654679 }
655680 }
656681 } ) ;
657682 }
658683
659- fn start_acceptor ( addr : SocketAddr ) -> Channel < Option < ( TcpStream , SocketAddr ) > > {
684+ fn start_acceptor (
685+ addr : SocketAddr ,
686+ shutdown_channel : Channel < ( ) > ,
687+ ) -> Channel < Option < ( TcpStream , SocketAddr ) > > {
660688 let chan = Channel :: unbounded ( ) ;
661689 let acceptor = chan. sender ( ) ;
662690 spawn_thread ( "acceptor" , move || {
@@ -666,10 +694,29 @@ impl RPC {
666694 . set_nonblocking ( false )
667695 . expect ( "cannot set nonblocking to false" ) ;
668696 let listener = TcpListener :: from ( socket) ;
697+ let local_addr = listener. local_addr ( ) . unwrap ( ) ;
698+ let shutdown_bool = Arc :: new ( AtomicBool :: new ( false ) ) ;
699+
700+ {
701+ let shutdown_bool = Arc :: clone ( & shutdown_bool) ;
702+ crate :: util:: spawn_thread ( "shutdown-acceptor" , move || {
703+ // Block until shutdown is sent.
704+ let _ = shutdown_channel. receiver ( ) . recv ( ) ;
705+ // Store the bool so after the next accept it will break the loop
706+ shutdown_bool. store ( true , std:: sync:: atomic:: Ordering :: Release ) ;
707+ // Connect to the socket to cause it to unblock
708+ let _ = TcpStream :: connect ( local_addr) ;
709+ } ) ;
710+ }
669711
670712 info ! ( "Electrum RPC server running on {}" , addr) ;
671713 loop {
672714 let ( stream, addr) = listener. accept ( ) . expect ( "accept failed" ) ;
715+
716+ if shutdown_bool. load ( std:: sync:: atomic:: Ordering :: Acquire ) {
717+ break ;
718+ }
719+
673720 stream
674721 . set_nonblocking ( false )
675722 . expect ( "failed to set connection as blocking" ) ;
@@ -726,10 +773,18 @@ impl RPC {
726773 RPC {
727774 notification : notification. sender ( ) ,
728775 server : Some ( spawn_thread ( "rpc" , move || {
729- let senders = Arc :: new ( Mutex :: new ( Vec :: < SyncSender < Message > > :: new ( ) ) ) ;
730-
731- let acceptor = RPC :: start_acceptor ( rpc_addr) ;
732- RPC :: start_notifier ( notification, senders. clone ( ) , acceptor. sender ( ) ) ;
776+ let senders =
777+ Arc :: new ( Mutex :: new ( Vec :: < crossbeam_channel:: Sender < Message > > :: new ( ) ) ) ;
778+
779+ let acceptor_shutdown = Channel :: unbounded ( ) ;
780+ let acceptor_shutdown_sender = acceptor_shutdown. sender ( ) ;
781+ let acceptor = RPC :: start_acceptor ( rpc_addr, acceptor_shutdown) ;
782+ RPC :: start_notifier (
783+ notification,
784+ senders. clone ( ) ,
785+ acceptor. sender ( ) ,
786+ acceptor_shutdown_sender,
787+ ) ;
733788
734789 let mut threads = HashMap :: new ( ) ;
735790 let ( garbage_sender, garbage_receiver) = crossbeam_channel:: unbounded ( ) ;
@@ -740,6 +795,10 @@ impl RPC {
740795 let senders = Arc :: clone ( & senders) ;
741796 let stats = Arc :: clone ( & stats) ;
742797 let garbage_sender = garbage_sender. clone ( ) ;
798+
799+ // Kill the peers properly
800+ let ( killer, peace_receiver) = std:: sync:: mpsc:: channel ( ) ;
801+
743802 #[ cfg( feature = "electrum-discovery" ) ]
744803 let discovery = discovery. clone ( ) ;
745804
@@ -751,6 +810,7 @@ impl RPC {
751810 addr,
752811 stats,
753812 txs_limit,
813+ peace_receiver,
754814 #[ cfg( feature = "electrum-discovery" ) ]
755815 discovery,
756816 ) ;
@@ -761,24 +821,29 @@ impl RPC {
761821 } ) ;
762822
763823 trace ! ( "[{}] spawned {:?}" , addr, spawned. thread( ) . id( ) ) ;
764- threads. insert ( spawned. thread ( ) . id ( ) , spawned) ;
824+ threads. insert ( spawned. thread ( ) . id ( ) , ( spawned, killer ) ) ;
765825 while let Ok ( id) = garbage_receiver. try_recv ( ) {
766- if let Some ( thread) = threads. remove ( & id) {
826+ if let Some ( ( thread, killer ) ) = threads. remove ( & id) {
767827 trace ! ( "[{}] joining {:?}" , addr, id) ;
828+ let _ = killer. send ( ( ) ) ;
768829 if let Err ( error) = thread. join ( ) {
769830 error ! ( "failed to join {:?}: {:?}" , id, error) ;
770831 }
771832 }
772833 }
773834 }
835+ // Drop these
836+ drop ( acceptor) ;
837+ drop ( garbage_receiver) ;
774838
775839 trace ! ( "closing {} RPC connections" , senders. lock( ) . unwrap( ) . len( ) ) ;
776840 for sender in senders. lock ( ) . unwrap ( ) . iter ( ) {
777- let _ = sender. send ( Message :: Done ) ;
841+ let _ = sender. try_send ( Message :: Done ) ;
778842 }
779843
780- for ( id, thread) in threads {
844+ for ( id, ( thread, killer ) ) in threads {
781845 trace ! ( "joining {:?}" , id) ;
846+ let _ = killer. send ( ( ) ) ;
782847 if let Err ( error) = thread. join ( ) {
783848 error ! ( "failed to join {:?}: {:?}" , id, error) ;
784849 }
@@ -802,5 +867,8 @@ impl Drop for RPC {
802867 handle. join ( ) . unwrap ( ) ;
803868 }
804869 trace ! ( "RPC server is stopped" ) ;
870+ crate :: util:: with_spawned_threads ( |threads| {
871+ trace ! ( "Threads after dropping RPC: {:?}" , threads) ;
872+ } ) ;
805873 }
806874}
0 commit comments