From 75dd1dc97598fa931f434712b3b53a2a46803e9d Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Tue, 13 May 2025 14:04:46 +0200
Subject: [PATCH 1/6] Reduce syncing and shutdown timeouts considerably

Previously, we had to configure enormous syncing timeouts as the BDK
wallet syncing would hold a central mutex that could lead to large parts
of event handling and syncing locking up.

Here, we drop the configured timeouts considerably across the board,
since such huge values are hopefully not required anymore.
---
 src/chain/electrum.rs |  2 +-
 src/config.rs         |  7 +++++--
 src/lib.rs            | 10 ++++------
 3 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs
index 6e62d9c08..9882e652b 100644
--- a/src/chain/electrum.rs
+++ b/src/chain/electrum.rs
@@ -40,7 +40,7 @@ use std::time::{Duration, Instant};

 const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5;
 const ELECTRUM_CLIENT_NUM_RETRIES: u8 = 3;
-const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 20;
+const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 10;

 pub(crate) struct ElectrumRuntimeClient {
 	electrum_client: Arc<ElectrumClient>,
diff --git a/src/config.rs b/src/config.rs
index 4a39c1b56..544c6d3bb 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -65,10 +65,13 @@ pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60
 pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10;

 // The timeout after which we abort a wallet syncing operation.
-pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90;
+pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 20;

 // The timeout after which we abort a wallet syncing operation.
-pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30;
+pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 10;
+
+// The timeout after which we give up waiting on LDK's event handler to exit on shutdown.
+pub(crate) const LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS: u64 = 30;

 // The timeout after which we abort a fee rate cache update operation.
 pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5;
diff --git a/src/lib.rs b/src/lib.rs
index c3bfe16d8..592e5cbfd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -127,8 +127,9 @@ pub use builder::NodeBuilder as Builder;

 use chain::ChainSource;
 use config::{
-	default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
-	PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
+	default_user_config, may_announce_channel, ChannelConfig, Config,
+	LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
+	RGS_SYNC_INTERVAL,
 };
 use connection::ConnectionManager;
 use event::{EventHandler, EventQueue};
@@ -672,13 +673,10 @@ impl Node {
 		let event_handling_stopped_logger = Arc::clone(&self.logger);
 		let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();

-		// FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
-		// event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
-		// should drop this considerably post upgrading to BDK 1.0.
 		let timeout_res = tokio::task::block_in_place(move || {
 			runtime.block_on(async {
 				tokio::time::timeout(
-					Duration::from_secs(100),
+					Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
 					event_handling_stopped_receiver.changed(),
 				)
 				.await

From 8f8d7e57ceb6bc0b78da80fd41ac9ba3367f2971 Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Mon, 7 Jul 2025 11:37:52 +0200
Subject: [PATCH 2/6] Await on the background processing task's `JoinHandle`

Previously, we used a channel to indicate that the background processor
task had stopped. Here, we instead just await the task's `JoinHandle`,
which is more robust in that it avoids a race condition.
---
 src/builder.rs |  4 +--
 src/lib.rs     | 80 +++++++++++++++++++++++---------------------
 2 files changed, 38 insertions(+), 46 deletions(-)

diff --git a/src/builder.rs b/src/builder.rs
index 79982b4e3..f58521525 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -1495,12 +1495,12 @@ fn build_with_store_internal(
 	};

 	let (stop_sender, _) = tokio::sync::watch::channel(());
-	let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
+	let background_processor_task = Mutex::new(None);

 	Ok(Node {
 		runtime,
 		stop_sender,
-		event_handling_stopped_sender,
+		background_processor_task,
 		config,
 		wallet,
 		chain_source,
diff --git a/src/lib.rs b/src/lib.rs
index 592e5cbfd..b62cf573d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -179,7 +179,7 @@ uniffi::include_scaffolding!("ldk_node");
 pub struct Node {
 	runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
 	stop_sender: tokio::sync::watch::Sender<()>,
-	event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
+	background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
 	config: Arc<Config>,
 	wallet: Arc<Wallet>,
 	chain_source: Arc<ChainSource>,
@@ -578,8 +578,7 @@ impl Node {
 		};

 		let background_stop_logger = Arc::clone(&self.logger);
-		let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
-		runtime.spawn(async move {
+		let handle = runtime.spawn(async move {
 			process_events_async(
 				background_persister,
 				|e| background_event_handler.handle_event(e),
@@ -600,19 +599,9 @@
 				panic!("Failed to process events");
 			});
 			log_debug!(background_stop_logger, "Events processing stopped.",);
-
-			match event_handling_stopped_sender.send(()) {
-				Ok(_) => (),
-				Err(e) => {
-					log_error!(
-						background_stop_logger,
-						"Failed to send 'events handling stopped' signal. This should never happen: {}",
-						e
-					);
-					debug_assert!(false);
-				},
-			}
 		});
+		debug_assert!(self.background_processor_task.lock().unwrap().is_none());
+		*self.background_processor_task.lock().unwrap() = Some(handle);

 		if let Some(liquidity_source) = self.liquidity_source.as_ref() {
 			let mut stop_liquidity_handler = self.stop_sender.subscribe();
@@ -669,39 +658,42 @@ impl Node {
 		// Disconnect all peers.
 		self.peer_manager.disconnect_all_peers();

-		// Wait until event handling stopped, at least until a timeout is reached.
-		let event_handling_stopped_logger = Arc::clone(&self.logger);
-		let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();
+		// Stop any runtime-dependant chain sources.
+		self.chain_source.stop();

-		let timeout_res = tokio::task::block_in_place(move || {
-			runtime.block_on(async {
-				tokio::time::timeout(
-					Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
-					event_handling_stopped_receiver.changed(),
-				)
-				.await
-			})
-		});
+		// Wait until background processing stopped, at least until a timeout is reached.
+		if let Some(background_processor_task) =
+			self.background_processor_task.lock().unwrap().take()
+		{
+			let abort_handle = background_processor_task.abort_handle();
+			let timeout_res = tokio::task::block_in_place(move || {
+				runtime.block_on(async {
+					tokio::time::timeout(
+						Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
+						background_processor_task,
+					)
+					.await
+				})
+			});

-		match timeout_res {
-			Ok(stop_res) => match stop_res {
-				Ok(()) => {},
+			match timeout_res {
+				Ok(stop_res) => match stop_res {
+					Ok(()) => {},
+					Err(e) => {
+						abort_handle.abort();
+						log_error!(
+							self.logger,
+							"Stopping event handling failed. This should never happen: {}",
+							e
+						);
+						panic!("Stopping event handling failed. This should never happen.");
+					},
+				},
 				Err(e) => {
-					log_error!(
-						event_handling_stopped_logger,
-						"Stopping event handling failed. This should never happen: {}",
-						e
-					);
-					panic!("Stopping event handling failed. This should never happen.");
+					abort_handle.abort();
+					log_error!(self.logger, "Stopping event handling timed out: {}", e);
 				},
-			},
-			Err(e) => {
-				log_error!(
-					event_handling_stopped_logger,
-					"Stopping event handling timed out: {}",
-					e
-				);
-			},
+		}

From 8ce139a1cb31aeb624640a8e3d452903272ce7b9 Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Mon, 7 Jul 2025 13:28:03 +0200
Subject: [PATCH 3/6] Improve logging in `stop`

.. we provide finer-grained logging after each step of `stop`.
---
 src/lib.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index b62cf573d..9c404324d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -577,7 +577,6 @@ impl Node {
 			})
 		};

-		let background_stop_logger = Arc::clone(&self.logger);
 		let handle = runtime.spawn(async move {
 			process_events_async(
 				background_persister,
 				|e| background_event_handler.handle_event(e),
@@ -598,7 +597,6 @@
 				log_error!(background_error_logger, "Failed to process events: {}", e);
 				panic!("Failed to process events");
 			});
-			log_debug!(background_stop_logger, "Events processing stopped.",);
 		});
 		debug_assert!(self.background_processor_task.lock().unwrap().is_none());
 		*self.background_processor_task.lock().unwrap() = Some(handle);
@@ -644,7 +642,7 @@
 		// Stop the runtime.
 		match self.stop_sender.send(()) {
-			Ok(_) => (),
+			Ok(_) => log_trace!(self.logger, "Sent shutdown signal to background tasks."),
 			Err(e) => {
 				log_error!(
 					self.logger,
@@ -657,9 +655,11 @@
 		// Disconnect all peers.
 		self.peer_manager.disconnect_all_peers();
+		log_debug!(self.logger, "Disconnected all network peers.");

 		// Stop any runtime-dependant chain sources.
 		self.chain_source.stop();
+		log_debug!(self.logger, "Stopped chain sources.");

 		// Wait until background processing stopped, at least until a timeout is reached.
 		if let Some(background_processor_task) =
@@ -678,7 +678,7 @@
 			match timeout_res {
 				Ok(stop_res) => match stop_res {
-					Ok(()) => {},
+					Ok(()) => log_debug!(self.logger, "Stopped background processing of events."),
 					Err(e) => {
 						abort_handle.abort();
 						log_error!(

From a0d7ad40529c0505406e1812010d30295f3d546d Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Thu, 14 Aug 2025 14:23:59 +0200
Subject: [PATCH 4/6] Wait on all background tasks to finish (or abort)

Previously, we'd only wait for the background processor task to finish
successfully. It turned out that this could lead to races when the other
background tasks took too long to shut down.

Here, we wait for a bit on all background tasks to shut down before
moving on.
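For readers skimming the series: the shutdown pattern that patches 2 and 4
converge on (await each task with a bounded timeout, then abort whatever is
left) can be sketched in isolation as below. This standalone sketch is
illustrative rather than code from the patch; it assumes tokio >= 1.21 (for
`JoinSet`) with the `macros`, `rt-multi-thread`, and `time` features, and the
spawned task bodies and durations are placeholders:

    use std::time::Duration;

    #[tokio::main]
    async fn main() {
        let mut tasks = tokio::task::JoinSet::new();
        for i in 0..3u64 {
            // Placeholder tasks standing in for sync/gossip/liquidity workers.
            tasks.spawn(async move {
                tokio::time::sleep(Duration::from_millis(10 * i)).await;
            });
        }

        // Drain the set, giving each join attempt a bounded wait; abort the
        // stragglers as soon as a task fails or the timeout elapses.
        loop {
            match tokio::time::timeout(Duration::from_secs(5), tasks.join_next()).await {
                Ok(Some(Ok(()))) => continue, // One task finished cleanly.
                Ok(Some(Err(e))) => {
                    // A task panicked or was cancelled; abort the rest.
                    eprintln!("background task failed: {}", e);
                    tasks.abort_all();
                    break;
                },
                Ok(None) => break, // The set is drained: all tasks are done.
                Err(_elapsed) => {
                    // Timed out waiting; abort whatever is still running.
                    tasks.abort_all();
                    break;
                },
            }
        }
    }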
---
 src/builder.rs |   4 ++
 src/config.rs  |   3 ++
 src/lib.rs     | 136 ++++++++++++++++++++++++++++++++++++++-----------
 3 files changed, 113 insertions(+), 30 deletions(-)

diff --git a/src/builder.rs b/src/builder.rs
index f58521525..dbb7096dc 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -1496,11 +1496,15 @@ fn build_with_store_internal(

 	let (stop_sender, _) = tokio::sync::watch::channel(());
 	let background_processor_task = Mutex::new(None);
+	let background_tasks = Mutex::new(None);
+	let cancellable_background_tasks = Mutex::new(None);

 	Ok(Node {
 		runtime,
 		stop_sender,
 		background_processor_task,
+		background_tasks,
+		cancellable_background_tasks,
 		config,
 		wallet,
 		chain_source,
diff --git a/src/config.rs b/src/config.rs
index 544c6d3bb..6b02015fe 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -73,6 +73,9 @@ pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 10;
 // The timeout after which we give up waiting on LDK's event handler to exit on shutdown.
 pub(crate) const LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS: u64 = 30;

+// The timeout after which we give up waiting on a background task to exit on shutdown.
+pub(crate) const BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS: u64 = 5;
+
 // The timeout after which we abort a fee rate cache update operation.
 pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5;
diff --git a/src/lib.rs b/src/lib.rs
index 9c404324d..868354c1f 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -128,8 +128,8 @@ pub use builder::NodeBuilder as Builder;
 use chain::ChainSource;
 use config::{
 	default_user_config, may_announce_channel, ChannelConfig, Config,
-	LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
-	RGS_SYNC_INTERVAL,
+	BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS,
+	NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
 };
 use connection::ConnectionManager;
 use event::{EventHandler, EventQueue};
@@ -180,6 +180,8 @@
 pub struct Node {
 	runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
 	stop_sender: tokio::sync::watch::Sender<()>,
 	background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
+	background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
+	cancellable_background_tasks: Mutex<Option<tokio::task::JoinSet<()>>>,
 	config: Arc<Config>,
 	wallet: Arc<Wallet>,
 	chain_source: Arc<ChainSource>,
@@ -233,6 +235,10 @@ impl Node {
 			return Err(Error::AlreadyRunning);
 		}

+		let mut background_tasks = tokio::task::JoinSet::new();
+		let mut cancellable_background_tasks = tokio::task::JoinSet::new();
+		let runtime_handle = runtime.handle();
+
 		log_info!(
 			self.logger,
 			"Starting up LDK Node with node ID {} on network: {}",
@@ -259,11 +265,19 @@
 		let sync_cman = Arc::clone(&self.channel_manager);
 		let sync_cmon = Arc::clone(&self.chain_monitor);
 		let sync_sweeper = Arc::clone(&self.output_sweeper);
-		runtime.spawn(async move {
-			chain_source
-				.continuously_sync_wallets(stop_sync_receiver, sync_cman, sync_cmon, sync_sweeper)
-				.await;
-		});
+		background_tasks.spawn_on(
+			async move {
+				chain_source
+					.continuously_sync_wallets(
+						stop_sync_receiver,
+						sync_cman,
+						sync_cmon,
+						sync_sweeper,
+					)
+					.await;
+			},
+			runtime_handle,
+		);

 		if self.gossip_source.is_rgs() {
 			let gossip_source = Arc::clone(&self.gossip_source);
 			let gossip_sync_store = Arc::clone(&self.kv_store);
 			let gossip_sync_logger = Arc::clone(&self.logger);
 			let gossip_node_metrics = Arc::clone(&self.node_metrics);
 			let mut stop_gossip_sync = self.stop_sender.subscribe();
-			runtime.spawn(async move {
+			cancellable_background_tasks.spawn_on(async move {
 				let mut interval = tokio::time::interval(RGS_SYNC_INTERVAL);
 				loop {
 					tokio::select! {
@@ -312,7 +326,7 @@
 					}
 				}
 			}
-			});
+			}, runtime_handle);
 		}

 		if let Some(listening_addresses) = &self.config.listening_addresses {
@@ -338,7 +352,7 @@
 				bind_addrs.extend(resolved_address);
 			}

-			runtime.spawn(async move {
+			cancellable_background_tasks.spawn_on(async move {
 				{
 					let listener =
 						tokio::net::TcpListener::bind(&*bind_addrs).await
@@ -357,7 +371,7 @@
 						_ = stop_listen.changed() => {
 							log_debug!(
 								listening_logger,
-								"Stopping listening to inbound connections.",
+								"Stopping listening to inbound connections."
 							);
 							break;
 						}
@@ -376,7 +390,7 @@
 				}

 				listening_indicator.store(false, Ordering::Release);
-			});
+			}, runtime_handle);
 		}

 		// Regularly reconnect to persisted peers.
@@ -385,7 +399,7 @@
 		let connect_logger = Arc::clone(&self.logger);
 		let connect_peer_store = Arc::clone(&self.peer_store);
 		let mut stop_connect = self.stop_sender.subscribe();
-		runtime.spawn(async move {
+		cancellable_background_tasks.spawn_on(async move {
 			let mut interval = tokio::time::interval(PEER_RECONNECTION_INTERVAL);
 			interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
 			loop {
@@ -393,7 +407,7 @@
 				_ = stop_connect.changed() => {
 					log_debug!(
 						connect_logger,
-						"Stopping reconnecting known peers.",
+						"Stopping reconnecting known peers."
 					);
 					return;
 				}
@@ -413,7 +427,7 @@
 					}
 				}
 			}
-		});
+		}, runtime_handle);

 		// Regularly broadcast node announcements.
 		let bcast_cm = Arc::clone(&self.channel_manager);
@@ -425,7 +439,7 @@
 		let mut stop_bcast = self.stop_sender.subscribe();
 		let node_alias = self.config.node_alias.clone();
 		if may_announce_channel(&self.config).is_ok() {
-			runtime.spawn(async move {
+			cancellable_background_tasks.spawn_on(async move {
 				// We check every 30 secs whether our last broadcast is NODE_ANN_BCAST_INTERVAL away.
 				#[cfg(not(test))]
 				let mut interval = tokio::time::interval(Duration::from_secs(30));
@@ -496,7 +510,7 @@
 					}
 				}
 			}
-		});
+		}, runtime_handle);
 		}

 		let mut stop_tx_bcast = self.stop_sender.subscribe();
@@ -605,24 +619,33 @@
 			let mut stop_liquidity_handler = self.stop_sender.subscribe();
 			let liquidity_handler = Arc::clone(&liquidity_source);
 			let liquidity_logger = Arc::clone(&self.logger);
-			runtime.spawn(async move {
-				loop {
-					tokio::select! {
-						_ = stop_liquidity_handler.changed() => {
-							log_debug!(
-								liquidity_logger,
-								"Stopping processing liquidity events.",
-							);
-							return;
+			background_tasks.spawn_on(
+				async move {
+					loop {
+						tokio::select! {
+							_ = stop_liquidity_handler.changed() => {
+								log_debug!(
+									liquidity_logger,
+									"Stopping processing liquidity events.",
+								);
+								return;
+							}
+							_ = liquidity_handler.handle_next_event() => {}
 						}
-					_ = liquidity_handler.handle_next_event() => {}
 					}
-				}
-			});
+				},
+				runtime_handle,
+			);
 		}

 		*runtime_lock = Some(runtime);

+		debug_assert!(self.background_tasks.lock().unwrap().is_none());
+		*self.background_tasks.lock().unwrap() = Some(background_tasks);
+
+		debug_assert!(self.cancellable_background_tasks.lock().unwrap().is_none());
+		*self.cancellable_background_tasks.lock().unwrap() = Some(cancellable_background_tasks);
+
 		log_info!(self.logger, "Startup complete.");
 		Ok(())
 	}
@@ -653,6 +676,17 @@
 			},
 		}

+		// Cancel cancellable background tasks
+		if let Some(mut tasks) = self.cancellable_background_tasks.lock().unwrap().take() {
+			let runtime_2 = Arc::clone(&runtime);
+			tasks.abort_all();
+			tokio::task::block_in_place(move || {
+				runtime_2.block_on(async { while let Some(_) = tasks.join_next().await {} })
+			});
+		} else {
+			debug_assert!(false, "Expected some cancellable background tasks");
+		};
+
 		// Disconnect all peers.
 		self.peer_manager.disconnect_all_peers();
 		log_debug!(self.logger, "Disconnected all network peers.");
@@ -661,6 +695,46 @@
 		self.chain_source.stop();
 		log_debug!(self.logger, "Stopped chain sources.");

+		// Wait until non-cancellable background tasks (mod LDK's background processor) are done.
+		let runtime_3 = Arc::clone(&runtime);
+		if let Some(mut tasks) = self.background_tasks.lock().unwrap().take() {
+			tokio::task::block_in_place(move || {
+				runtime_3.block_on(async {
+					loop {
+						let timeout_fut = tokio::time::timeout(
+							Duration::from_secs(BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS),
+							tasks.join_next_with_id(),
+						);
+						match timeout_fut.await {
+							Ok(Some(Ok((id, _)))) => {
+								log_trace!(self.logger, "Stopped background task with id {}", id);
+							},
+							Ok(Some(Err(e))) => {
+								tasks.abort_all();
+								log_trace!(self.logger, "Stopping background task failed: {}", e);
+								break;
+							},
+							Ok(None) => {
+								log_debug!(self.logger, "Stopped all background tasks");
+								break;
+							},
+							Err(e) => {
+								tasks.abort_all();
+								log_error!(
+									self.logger,
+									"Stopping background task timed out: {}",
+									e
+								);
+								break;
+							},
+						}
+					}
+				})
+			});
+		} else {
+			debug_assert!(false, "Expected some background tasks");
+		};
+
 		// Wait until background processing stopped, at least until a timeout is reached.
 		if let Some(background_processor_task) =
 			self.background_processor_task.lock().unwrap().take()
@@ -694,7 +768,9 @@
 				log_error!(self.logger, "Stopping event handling timed out: {}", e);
 			},
 		}
-		}
+		} else {
+			debug_assert!(false, "Expected a background processing task");
+		};

 		#[cfg(tokio_unstable)]
 		{

From 672a3329e8bdbcbbe582a9c4bd3ffeb717c7e47f Mon Sep 17 00:00:00 2001
From: Elias Rohrer
Date: Thu, 14 Aug 2025 14:45:08 +0200
Subject: [PATCH 5/6] Add test that drops the node in an async context

.. as tokio tends to panic when dropping a runtime in an async context
unless we're super careful. Here, we add some test coverage for this
edge case in Rust tests.
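To make the edge case concrete, the following standalone sketch (illustrative,
not code from the patch) reproduces the footgun the test guards against; it
assumes tokio's default multi-threaded runtime:

    fn main() {
        let outer = tokio::runtime::Runtime::new().unwrap();
        outer.block_on(async {
            let inner = tokio::runtime::Runtime::new().unwrap();
            // A plain `drop(inner);` right here would panic along the lines of
            // "Cannot drop a runtime in a context where blocking is not
            // allowed", since dropping a runtime blocks on its shutdown.
            // Wrapping the drop in `block_in_place` marks this thread as
            // allowed to block (multi-threaded runtime only):
            tokio::task::block_in_place(move || drop(inner));
        });
    }

Dropping a `Node` from within a `#[tokio::test]` exercises the same path,
since the node owns its own runtime; the `tokio::task::block_in_place`
wrappers in `stop` above account for exactly this situation.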
---
 tests/integration_tests_rust.rs | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs
index db48eca23..b21387521 100644
--- a/tests/integration_tests_rust.rs
+++ b/tests/integration_tests_rust.rs
@@ -1381,3 +1381,14 @@ fn facade_logging() {
 		validate_log_entry(entry);
 	}
 }
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
+async fn drop_in_async_context() {
+	let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd();
+	let chain_source = TestChainSource::Esplora(&electrsd);
+	let seed_bytes = vec![42u8; 64];
+
+	let config = random_config(true);
+	let node = setup_node(&chain_source, config, Some(seed_bytes));
+	node.stop().unwrap();
+}

From 1a2d900e2a8fbf8703485f3aa7fb7a91f63bb07c Mon Sep 17 00:00:00 2001
From: moisesPomilio <93723302+moisesPompilio@users.noreply.github.com>
Date: Mon, 30 Jun 2025 21:52:19 -0300
Subject: [PATCH 6/6] Fix CLN crash by waiting for block height sync before
 channel open (#527)
---
 tests/integration_tests_cln.rs | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/tests/integration_tests_cln.rs b/tests/integration_tests_cln.rs
index b6300576c..f77311fb2 100644
--- a/tests/integration_tests_cln.rs
+++ b/tests/integration_tests_cln.rs
@@ -64,7 +64,17 @@ fn test_cln() {
 	// Setup CLN
 	let sock = "/tmp/lightning-rpc";
 	let cln_client = LightningRPC::new(&sock);
-	let cln_info = cln_client.getinfo().unwrap();
+	let cln_info = {
+		loop {
+			let info = cln_client.getinfo().unwrap();
+			// Wait for CLN to sync block height before channel open.
+			// Prevents crash due to unset blockheight (see LDK Node issue #527).
+			if info.blockheight > 0 {
+				break info;
+			}
+			std::thread::sleep(std::time::Duration::from_millis(250));
+		}
+	};
 	let cln_node_id = PublicKey::from_str(&cln_info.id).unwrap();
 	let cln_address: SocketAddress = match cln_info.binding.first().unwrap() {
 		NetworkAddress::Ipv4 { address, port } => {
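A side note on the polling fix above: the loop retries forever if CLN never
reports a block height, which is usually acceptable in a test harness. A
bounded variant is easy to write, though; the helper below is hypothetical and
not part of the patch:

    use std::time::{Duration, Instant};

    // Hypothetical helper: poll `check` every `interval` until it yields a
    // value or `deadline` elapses. Not part of the patch above.
    fn poll_until<T>(
        deadline: Duration, interval: Duration, mut check: impl FnMut() -> Option<T>,
    ) -> Option<T> {
        let start = Instant::now();
        loop {
            if let Some(value) = check() {
                return Some(value);
            }
            if start.elapsed() >= deadline {
                return None;
            }
            std::thread::sleep(interval);
        }
    }

With that, the call site would read roughly `poll_until(Duration::from_secs(60),
Duration::from_millis(250), || { let info = cln_client.getinfo().ok()?;
(info.blockheight > 0).then_some(info) }).expect("CLN failed to sync block
height in time")`, failing the test with a clear message if the deadline
passes.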