From 6fe2d30c76107f4968b712ac809cfaae38736ed0 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 13 May 2025 14:04:46 +0200 Subject: [PATCH 1/3] Reduce syncing and shutdown timeouts considerably Previously, we had to configure enormous syncing timeouts as the BDK wallet syncing would hold a central mutex that could lead to large parts of event handling and syncing locking up. Here, we drop the configured timeouts considerably across the board, since such huge values are hopefully not required anymore. --- src/chain/electrum.rs | 2 +- src/config.rs | 7 +++++-- src/lib.rs | 10 ++++------ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 6e62d9c08..9882e652b 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -40,7 +40,7 @@ use std::time::{Duration, Instant}; const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5; const ELECTRUM_CLIENT_NUM_RETRIES: u8 = 3; -const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 20; +const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 10; pub(crate) struct ElectrumRuntimeClient { electrum_client: Arc, diff --git a/src/config.rs b/src/config.rs index a2930ea5a..7b7ed8156 100644 --- a/src/config.rs +++ b/src/config.rs @@ -65,10 +65,13 @@ pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60 pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10; // The timeout after which we abort a wallet syncing operation. -pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90; +pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 20; // The timeout after which we abort a wallet syncing operation. -pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30; +pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 10; + +// The timeout after which we give up waiting on LDK's event handler to exit on shutdown. +pub(crate) const LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS: u64 = 30; // The timeout after which we abort a fee rate cache update operation. pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5; diff --git a/src/lib.rs b/src/lib.rs index a75da763a..8579c29fc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,8 +126,9 @@ pub use builder::NodeBuilder as Builder; use chain::ChainSource; use config::{ - default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL, - PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL, + default_user_config, may_announce_channel, ChannelConfig, Config, + LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, + RGS_SYNC_INTERVAL, }; use connection::ConnectionManager; use event::{EventHandler, EventQueue}; @@ -673,13 +674,10 @@ impl Node { let event_handling_stopped_logger = Arc::clone(&self.logger); let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe(); - // FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow - // event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We - // should drop this considerably post upgrading to BDK 1.0. let timeout_res = tokio::task::block_in_place(move || { runtime.block_on(async { tokio::time::timeout( - Duration::from_secs(100), + Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS), event_handling_stopped_receiver.changed(), ) .await From 9eae61dc0a4b0f14a728d2019cd10d20c9b35a2a Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 7 Jul 2025 11:37:52 +0200 Subject: [PATCH 2/3] Await on the background processing task's `JoinHandle` Previously, we used to a channel to indicate that the background processor task has been stopped. Here, we rather just await the task's `JoinHandle` which is more robust in that it avoids a race condition. --- src/builder.rs | 4 +-- src/lib.rs | 80 +++++++++++++++++++++++--------------------------- 2 files changed, 38 insertions(+), 46 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index a177768f6..66b160e31 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1591,12 +1591,12 @@ fn build_with_store_internal( }; let (stop_sender, _) = tokio::sync::watch::channel(()); - let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(()); + let background_processor_task = Mutex::new(None); Ok(Node { runtime, stop_sender, - event_handling_stopped_sender, + background_processor_task, config, wallet, chain_source, diff --git a/src/lib.rs b/src/lib.rs index 8579c29fc..e0f8ff236 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -180,7 +180,7 @@ uniffi::include_scaffolding!("ldk_node"); pub struct Node { runtime: Arc>>>, stop_sender: tokio::sync::watch::Sender<()>, - event_handling_stopped_sender: tokio::sync::watch::Sender<()>, + background_processor_task: Mutex>>, config: Arc, wallet: Arc, chain_source: Arc, @@ -579,8 +579,7 @@ impl Node { }; let background_stop_logger = Arc::clone(&self.logger); - let event_handling_stopped_sender = self.event_handling_stopped_sender.clone(); - runtime.spawn(async move { + let handle = runtime.spawn(async move { process_events_async( background_persister, |e| background_event_handler.handle_event(e), @@ -601,19 +600,9 @@ impl Node { panic!("Failed to process events"); }); log_debug!(background_stop_logger, "Events processing stopped.",); - - match event_handling_stopped_sender.send(()) { - Ok(_) => (), - Err(e) => { - log_error!( - background_stop_logger, - "Failed to send 'events handling stopped' signal. This should never happen: {}", - e - ); - debug_assert!(false); - }, - } }); + debug_assert!(self.background_processor_task.lock().unwrap().is_none()); + *self.background_processor_task.lock().unwrap() = Some(handle); if let Some(liquidity_source) = self.liquidity_source.as_ref() { let mut stop_liquidity_handler = self.stop_sender.subscribe(); @@ -670,39 +659,42 @@ impl Node { // Disconnect all peers. self.peer_manager.disconnect_all_peers(); - // Wait until event handling stopped, at least until a timeout is reached. - let event_handling_stopped_logger = Arc::clone(&self.logger); - let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe(); + // Stop any runtime-dependant chain sources. + self.chain_source.stop(); - let timeout_res = tokio::task::block_in_place(move || { - runtime.block_on(async { - tokio::time::timeout( - Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS), - event_handling_stopped_receiver.changed(), - ) - .await - }) - }); + // Wait until background processing stopped, at least until a timeout is reached. + if let Some(background_processor_task) = + self.background_processor_task.lock().unwrap().take() + { + let abort_handle = background_processor_task.abort_handle(); + let timeout_res = tokio::task::block_in_place(move || { + runtime.block_on(async { + tokio::time::timeout( + Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS), + background_processor_task, + ) + .await + }) + }); - match timeout_res { - Ok(stop_res) => match stop_res { - Ok(()) => {}, + match timeout_res { + Ok(stop_res) => match stop_res { + Ok(()) => {}, + Err(e) => { + abort_handle.abort(); + log_error!( + self.logger, + "Stopping event handling failed. This should never happen: {}", + e + ); + panic!("Stopping event handling failed. This should never happen."); + }, + }, Err(e) => { - log_error!( - event_handling_stopped_logger, - "Stopping event handling failed. This should never happen: {}", - e - ); - panic!("Stopping event handling failed. This should never happen."); + abort_handle.abort(); + log_error!(self.logger, "Stopping event handling timed out: {}", e); }, - }, - Err(e) => { - log_error!( - event_handling_stopped_logger, - "Stopping event handling timed out: {}", - e - ); - }, + } } #[cfg(tokio_unstable)] From c3d6161b7e46ef20db6530827aeef3a21c57703a Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 7 Jul 2025 13:28:03 +0200 Subject: [PATCH 3/3] Improve logging in `stop` .. we provide finer-grained logging after each step of `stop`. --- src/lib.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e0f8ff236..0a53fbbb3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -147,9 +147,7 @@ use types::{ }; pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId}; -#[cfg(tokio_unstable)] -use logger::log_trace; -use logger::{log_debug, log_error, log_info, LdkLogger, Logger}; +use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use lightning::chain::BestBlock; use lightning::events::bump_transaction::Wallet as LdkWallet; @@ -578,7 +576,6 @@ impl Node { }) }; - let background_stop_logger = Arc::clone(&self.logger); let handle = runtime.spawn(async move { process_events_async( background_persister, @@ -599,7 +596,6 @@ impl Node { log_error!(background_error_logger, "Failed to process events: {}", e); panic!("Failed to process events"); }); - log_debug!(background_stop_logger, "Events processing stopped.",); }); debug_assert!(self.background_processor_task.lock().unwrap().is_none()); *self.background_processor_task.lock().unwrap() = Some(handle); @@ -645,7 +641,7 @@ impl Node { // Stop the runtime. match self.stop_sender.send(()) { - Ok(_) => (), + Ok(_) => log_trace!(self.logger, "Sent shutdown signal to background tasks."), Err(e) => { log_error!( self.logger, @@ -658,9 +654,11 @@ impl Node { // Disconnect all peers. self.peer_manager.disconnect_all_peers(); + log_debug!(self.logger, "Disconnected all network peers."); // Stop any runtime-dependant chain sources. self.chain_source.stop(); + log_debug!(self.logger, "Stopped chain sources."); // Wait until background processing stopped, at least until a timeout is reached. if let Some(background_processor_task) = @@ -679,7 +677,7 @@ impl Node { match timeout_res { Ok(stop_res) => match stop_res { - Ok(()) => {}, + Ok(()) => log_debug!(self.logger, "Stopped background processing of events."), Err(e) => { abort_handle.abort(); log_error!(