Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1591,12 +1591,12 @@ fn build_with_store_internal(
};

let (stop_sender, _) = tokio::sync::watch::channel(());
let (event_handling_stopped_sender, _) = tokio::sync::watch::channel(());
let background_processor_task = Mutex::new(None);

Ok(Node {
runtime,
stop_sender,
event_handling_stopped_sender,
background_processor_task,
config,
wallet,
chain_source,
Expand Down
2 changes: 1 addition & 1 deletion src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use std::time::{Duration, Instant};

const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5;
const ELECTRUM_CLIENT_NUM_RETRIES: u8 = 3;
const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 20;
const ELECTRUM_CLIENT_TIMEOUT_SECS: u8 = 10;

pub(crate) struct ElectrumRuntimeClient {
electrum_client: Arc<ElectrumClient>,
Expand Down
7 changes: 5 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,13 @@ pub(crate) const NODE_ANN_BCAST_INTERVAL: Duration = Duration::from_secs(60 * 60
pub(crate) const WALLET_SYNC_INTERVAL_MINIMUM_SECS: u64 = 10;

// The timeout after which we abort a wallet syncing operation.
pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 90;
pub(crate) const BDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 20;

// The timeout after which we abort a wallet syncing operation.
pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 30;
pub(crate) const LDK_WALLET_SYNC_TIMEOUT_SECS: u64 = 10;

// The timeout after which we give up waiting on LDK's event handler to exit on shutdown.
pub(crate) const LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS: u64 = 30;

// The timeout after which we abort a fee rate cache update operation.
pub(crate) const FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS: u64 = 5;
Expand Down
100 changes: 44 additions & 56 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,9 @@ pub use builder::NodeBuilder as Builder;

use chain::ChainSource;
use config::{
default_user_config, may_announce_channel, ChannelConfig, Config, NODE_ANN_BCAST_INTERVAL,
PEER_RECONNECTION_INTERVAL, RGS_SYNC_INTERVAL,
default_user_config, may_announce_channel, ChannelConfig, Config,
LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, NODE_ANN_BCAST_INTERVAL, PEER_RECONNECTION_INTERVAL,
RGS_SYNC_INTERVAL,
};
use connection::ConnectionManager;
use event::{EventHandler, EventQueue};
Expand All @@ -146,9 +147,7 @@ use types::{
};
pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, UserChannelId};

#[cfg(tokio_unstable)]
use logger::log_trace;
use logger::{log_debug, log_error, log_info, LdkLogger, Logger};
use logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};

use lightning::chain::BestBlock;
use lightning::events::bump_transaction::Wallet as LdkWallet;
Expand Down Expand Up @@ -179,7 +178,7 @@ uniffi::include_scaffolding!("ldk_node");
pub struct Node {
runtime: Arc<RwLock<Option<Arc<tokio::runtime::Runtime>>>>,
stop_sender: tokio::sync::watch::Sender<()>,
event_handling_stopped_sender: tokio::sync::watch::Sender<()>,
background_processor_task: Mutex<Option<tokio::task::JoinHandle<()>>>,
config: Arc<Config>,
wallet: Arc<Wallet>,
chain_source: Arc<ChainSource>,
Expand Down Expand Up @@ -577,9 +576,7 @@ impl Node {
})
};

let background_stop_logger = Arc::clone(&self.logger);
let event_handling_stopped_sender = self.event_handling_stopped_sender.clone();
runtime.spawn(async move {
let handle = runtime.spawn(async move {
process_events_async(
background_persister,
|e| background_event_handler.handle_event(e),
Expand All @@ -599,20 +596,9 @@ impl Node {
log_error!(background_error_logger, "Failed to process events: {}", e);
panic!("Failed to process events");
});
log_debug!(background_stop_logger, "Events processing stopped.",);

match event_handling_stopped_sender.send(()) {
Ok(_) => (),
Err(e) => {
log_error!(
background_stop_logger,
"Failed to send 'events handling stopped' signal. This should never happen: {}",
e
);
debug_assert!(false);
},
}
});
debug_assert!(self.background_processor_task.lock().unwrap().is_none());
*self.background_processor_task.lock().unwrap() = Some(handle);

if let Some(liquidity_source) = self.liquidity_source.as_ref() {
let mut stop_liquidity_handler = self.stop_sender.subscribe();
Expand Down Expand Up @@ -655,7 +641,7 @@ impl Node {

// Stop the runtime.
match self.stop_sender.send(()) {
Ok(_) => (),
Ok(_) => log_trace!(self.logger, "Sent shutdown signal to background tasks."),
Err(e) => {
log_error!(
self.logger,
Expand All @@ -668,43 +654,45 @@ impl Node {

// Disconnect all peers.
self.peer_manager.disconnect_all_peers();
log_debug!(self.logger, "Disconnected all network peers.");

// Wait until event handling stopped, at least until a timeout is reached.
let event_handling_stopped_logger = Arc::clone(&self.logger);
let mut event_handling_stopped_receiver = self.event_handling_stopped_sender.subscribe();

// FIXME: For now, we wait up to 100 secs (BDK_WALLET_SYNC_TIMEOUT_SECS + 10) to allow
// event handling to exit gracefully even if it was blocked on the BDK wallet syncing. We
// should drop this considerably post upgrading to BDK 1.0.
let timeout_res = tokio::task::block_in_place(move || {
runtime.block_on(async {
tokio::time::timeout(
Duration::from_secs(100),
event_handling_stopped_receiver.changed(),
)
.await
})
});
// Stop any runtime-dependant chain sources.
self.chain_source.stop();
log_debug!(self.logger, "Stopped chain sources.");

// Wait until background processing stopped, at least until a timeout is reached.
if let Some(background_processor_task) =
self.background_processor_task.lock().unwrap().take()
{
let abort_handle = background_processor_task.abort_handle();
let timeout_res = tokio::task::block_in_place(move || {
runtime.block_on(async {
tokio::time::timeout(
Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS),
background_processor_task,
)
.await
})
});

match timeout_res {
Ok(stop_res) => match stop_res {
Ok(()) => {},
match timeout_res {
Ok(stop_res) => match stop_res {
Ok(()) => log_debug!(self.logger, "Stopped background processing of events."),
Err(e) => {
abort_handle.abort();
log_error!(
self.logger,
"Stopping event handling failed. This should never happen: {}",
e
);
panic!("Stopping event handling failed. This should never happen.");
},
},
Err(e) => {
log_error!(
event_handling_stopped_logger,
"Stopping event handling failed. This should never happen: {}",
e
);
panic!("Stopping event handling failed. This should never happen.");
abort_handle.abort();
log_error!(self.logger, "Stopping event handling timed out: {}", e);
},
},
Err(e) => {
log_error!(
event_handling_stopped_logger,
"Stopping event handling timed out: {}",
e
);
},
}
}

#[cfg(tokio_unstable)]
Expand Down
Loading