diff --git a/src/builder.rs b/src/builder.rs index a177768f6..66b160e31 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1591,12 +1591,12 @@ fn build_with_store_internal( }; let (stop_sender, _) = tokio::sync::watch::channel(()); - let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(()); + let background_processor_task = Mutex::new(None); Ok(Node { runtime, stop_sender, - event_handling_stopped_sender, + background_processor_task, config, wallet, chain_source, diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 6e62d9c08..9882e652b 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -40,7 +40,7 @@ use std::time::{Duration, Instant}; const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5; const ELECTRUM_CLIENT_NUM_RETRIES: u8 = 3; -const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 20; +const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 10; pub(crate) struct ElectrumRuntimeClient { electrum_client: Arc, diff --git a/src/config.rs b/src/config.rs index a2930ea5a..7b7ed8156 100644 --- a/src/config.rs +++ b/src/config.rs @@ -65,10 +65,13 @@ pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60 pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10; // The timeout after which we abort a wallet syncing operation. -pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90; +pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 20; // The timeout after which we abort a wallet syncing operation. -pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30; +pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 10; + +// The timeout after which we give up waiting on LDK's event handler to exit on shutdown. +pub(crate) const LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS: u64 = 30; // The timeout after which we abort a fee rate cache update operation. pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5; diff --git a/src/lib.rs b/src/lib.rs index a75da763a..0a53fbbb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,8 +126,9 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ - default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, - PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + default_user_config, may_announce_channel, ChannelConfig, Config, + LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, + RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; use event::{EventHandler, EventQueue}; @@ -146,9 +147,7 @@ use types::{ }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; -#[cfg(tokio_unstable)] -use logger::log_trace; -use logger::{log_debug, log_error, log_info, LdkLogger, Logger}; +use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use lightning::chain::BestBlock; use lightning::events::bump_transaction::Wallet as LdkWallet; @@ -179,7 +178,7 @@ uniffi::include_scaffolding!("ldk_node"); pub struct Node { runtime: Arc>>>, stop_sender: tokio::sync::watch::Sender<()>, - event_handling_stopped_sender: tokio::sync::watch::Sender<()>, + background_processor_task: Mutex>>, config: Arc, wallet: Arc, chain_source: Arc, @@ -577,9 +576,7 @@ impl Node { }) }; - let background_stop_logger = Arc::clone(&self.logger); - let event_handling_stopped_sender = self.event_handling_stopped_sender.clone(); - runtime.spawn(async move { + let handle = runtime.spawn(async move { process_events_async( background_persister, |e| background_event_handler.handle_event(e), @@ -599,20 +596,9 @@ impl Node { log_error!(background_error_logger, "Failed to process events: {}", e); panic!("Failed to process events"); }); - log_debug!(background_stop_logger, "Events processing stopped.",); - - match event_handling_stopped_sender.send(()) { - Ok(_) => (), - Err(e) => { - log_error!( - background_stop_logger, - "Failed to send 'events handling stopped' signal. This should never happen: {}", - e - ); - debug_assert!(false); - }, - } }); + debug_assert!(self.background_processor_task.lock().unwrap().is_none()); + *self.background_processor_task.lock().unwrap() = Some(handle); if let Some(liquidity_source) = self.liquidity_source.as_ref() { let mut stop_liquidity_handler = self.stop_sender.subscribe(); @@ -655,7 +641,7 @@ impl Node { // Stop the runtime. match self.stop_sender.send(()) { - Ok(_) => (), + Ok(_) => log_trace!(self.logger, "Sent shutdown signal to background tasks."), Err(e) => { log_error!( self.logger, @@ -668,43 +654,45 @@ impl Node { // Disconnect all peers. self.peer_manager.disconnect_all_peers(); + log_debug!(self.logger, "Disconnected all network peers."); - // Wait until event handling stopped, at least until a timeout is reached. - let event_handling_stopped_logger = Arc::clone(&self.logger); - let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe(); - - // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow - // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We - // should drop this considerably post upgrading to BDK 1.0. - let timeout_res = tokio::task::block_in_place(move || { - runtime.block_on(async { - tokio::time::timeout( - Duration::from_secs(100), - event_handling_stopped_receiver.changed(), - ) - .await - }) - }); + // Stop any runtime-dependant chain sources. + self.chain_source.stop(); + log_debug!(self.logger, "Stopped chain sources."); + + // Wait until background processing stopped, at least until a timeout is reached. + if let Some(background_processor_task) = + self.background_processor_task.lock().unwrap().take() + { + let abort_handle = background_processor_task.abort_handle(); + let timeout_res = tokio::task::block_in_place(move || { + runtime.block_on(async { + tokio::time::timeout( + Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS), + background_processor_task, + ) + .await + }) + }); - match timeout_res { - Ok(stop_res) => match stop_res { - Ok(()) => {}, + match timeout_res { + Ok(stop_res) => match stop_res { + Ok(()) => log_debug!(self.logger, "Stopped background processing of events."), + Err(e) => { + abort_handle.abort(); + log_error!( + self.logger, + "Stopping event handling failed. This should never happen: {}", + e + ); + panic!("Stopping event handling failed. This should never happen."); + }, + }, Err(e) => { - log_error!( - event_handling_stopped_logger, - "Stopping event handling failed. This should never happen: {}", - e - ); - panic!("Stopping event handling failed. This should never happen."); + abort_handle.abort(); + log_error!(self.logger, "Stopping event handling timed out: {}", e); }, - }, - Err(e) => { - log_error!( - event_handling_stopped_logger, - "Stopping event handling timed out: {}", - e - ); - }, + } } #[cfg(tokio_unstable)]