Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,12 +266,13 @@ struct TestChainMonitor {
Arc<KeyProvider>,
>,
>,
pub deferred: bool,
pub latest_monitors: Mutex<HashMap<ChannelId, LatestMonitorState>>,
}
impl TestChainMonitor {
pub fn new(
broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>,
persister: Arc<TestPersister>, keys: Arc<KeyProvider>,
persister: Arc<TestPersister>, keys: Arc<KeyProvider>, deferred: bool,
) -> Self {
Self {
chain_monitor: Arc::new(chainmonitor::ChainMonitor::new(
Expand All @@ -282,13 +283,44 @@ impl TestChainMonitor {
Arc::clone(&persister),
Arc::clone(&keys),
keys.get_peer_storage_key(),
deferred,
)),
logger,
keys,
persister,
deferred,
latest_monitors: Mutex::new(new_hash_map()),
}
}

/// Runs every monitor operation currently queued inside the `ChainMonitor` and then, when
/// the persister is configured to report `Completed`, marks the newest pending snapshot of
/// each channel as the persisted one in `latest_monitors`.
///
/// `latest_monitors` is this wrapper's own record of serialized monitor snapshots, kept
/// separately from `ChainMonitor` so the fuzzer can simulate a node restart by
/// deserializing from these snapshots instead of from the persister's storage.
///
/// The sequence mirrors the deferred-write pattern: snapshot the pending operation count,
/// persist the `ChannelManager`, then flush exactly that many queued monitor writes.
///
/// Does nothing when no operations are queued.
fn flush_and_update_latest_monitors(&self) {
	let queued_ops = self.chain_monitor.pending_operation_count();
	if queued_ops == 0 {
		return;
	}

	// Hand the queued watch_channel/update_channel operations to the ChainMonitor.
	self.chain_monitor.flush(queued_ops, &self.logger);

	// Copy the status out so the persister lock is released before we take the
	// `latest_monitors` lock below.
	let status = *self.persister.update_ret.lock().unwrap();
	if status != chain::ChannelMonitorUpdateStatus::Completed {
		// Persistence has not completed: leave the pending snapshots untouched so a
		// simulated restart still reloads from the last fully-persisted snapshot.
		return;
	}

	let mut shadow = self.latest_monitors.lock().unwrap();
	for state in shadow.values_mut() {
		// Keep only the most recent pending snapshot and discard the older ones.
		let newest = state.pending_monitors.pop();
		state.pending_monitors.clear();
		if let Some((id, data)) = newest {
			state.persisted_monitor_id = id;
			state.persisted_monitor = data;
		}
	}
}
}
impl chain::Watch<TestChannelSigner> for TestChainMonitor {
fn watch_channel(
Expand All @@ -298,6 +330,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
monitor.write(&mut ser).unwrap();
let monitor_id = monitor.get_latest_update_id();
let res = self.chain_monitor.watch_channel(channel_id, monitor);
if self.deferred {
assert_eq!(res, Ok(chain::ChannelMonitorUpdateStatus::InProgress));
}
let state = match res {
Ok(chain::ChannelMonitorUpdateStatus::Completed) => LatestMonitorState {
persisted_monitor_id: monitor_id,
Expand Down Expand Up @@ -347,6 +382,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
let mut ser = VecWriter(Vec::new());
deserialized_monitor.write(&mut ser).unwrap();
let res = self.chain_monitor.update_channel(channel_id, update);
if self.deferred {
assert_eq!(res, chain::ChannelMonitorUpdateStatus::InProgress);
}
match res {
chain::ChannelMonitorUpdateStatus::Completed => {
map_entry.persisted_monitor_id = update.update_id;
Expand All @@ -363,6 +401,9 @@ impl chain::Watch<TestChannelSigner> for TestChainMonitor {
/// Returns the monitor events currently pending in the wrapped `ChainMonitor`.
///
/// When this test monitor is in deferred mode, queued monitor operations are flushed
/// (and the shadow `latest_monitors` records updated) before the events are released.
fn release_pending_monitor_events(
	&self,
) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, PublicKey)> {
	if self.deferred {
		self.flush_and_update_latest_monitors();
	}
	self.chain_monitor.release_pending_monitor_events()
}
}
Expand Down Expand Up @@ -892,6 +933,11 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
ChannelMonitorUpdateStatus::Completed
}),
];
let deferred = [
initial_mon_styles & 0b001_000 != 0,
initial_mon_styles & 0b010_000 != 0,
initial_mon_styles & 0b100_000 != 0,
];

let mut chain_state = ChainState::new();
let mut node_height_a: u32 = 0;
Expand Down Expand Up @@ -920,6 +966,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
update_ret: Mutex::new(mon_style[$node_id as usize].borrow().clone()),
}),
Arc::clone(&keys_manager),
deferred[$node_id as usize],
));

let mut config = UserConfig::default();
Expand Down Expand Up @@ -972,6 +1019,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed),
}),
Arc::clone(keys),
deferred[node_id as usize],
));

let mut config = UserConfig::default();
Expand Down Expand Up @@ -1038,18 +1086,28 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
let manager =
<(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager");
let res = (manager.1, chain_monitor.clone());
let expected_status = if deferred[node_id as usize] {
ChannelMonitorUpdateStatus::InProgress
} else {
ChannelMonitorUpdateStatus::Completed
};
for (channel_id, mon) in monitors.drain() {
assert_eq!(
chain_monitor.chain_monitor.watch_channel(channel_id, mon),
Ok(ChannelMonitorUpdateStatus::Completed)
Ok(expected_status)
);
}
if deferred[node_id as usize] {
let count = chain_monitor.chain_monitor.pending_operation_count();
chain_monitor.chain_monitor.flush(count, &chain_monitor.logger);
}
*chain_monitor.persister.update_ret.lock().unwrap() = *mon_style[node_id as usize].borrow();
res
};

macro_rules! complete_all_pending_monitor_updates {
($monitor: expr) => {{
$monitor.flush_and_update_latest_monitors();
for (channel_id, state) in $monitor.latest_monitors.lock().unwrap().iter_mut() {
for (id, data) in state.pending_monitors.drain(..) {
$monitor.chain_monitor.channel_monitor_updated(*channel_id, id).unwrap();
Expand Down Expand Up @@ -1977,6 +2035,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
|monitor: &Arc<TestChainMonitor>,
chan_funding,
compl_selector: &dyn Fn(&mut Vec<(u64, Vec<u8>)>) -> Option<(u64, Vec<u8>)>| {
monitor.flush_and_update_latest_monitors();
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_funding) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
Expand All @@ -1993,6 +2052,7 @@ pub fn do_test<Out: Output + MaybeSend + MaybeSync>(
};

let complete_all_monitor_updates = |monitor: &Arc<TestChainMonitor>, chan_id| {
monitor.flush_and_update_latest_monitors();
if let Some(state) = monitor.latest_monitors.lock().unwrap().get_mut(chan_id) {
assert!(
state.pending_monitors.windows(2).all(|pair| pair[0].0 < pair[1].0),
Expand Down
1 change: 1 addition & 0 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,7 @@ pub fn do_test(mut data: &[u8], logger: &Arc<dyn Logger + MaybeSend + MaybeSync>
Arc::new(TestPersister { update_ret: Mutex::new(ChannelMonitorUpdateStatus::Completed) }),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
false,
));

let network = Network::Bitcoin;
Expand Down
1 change: 1 addition & 0 deletions fuzz/src/lsps_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub fn do_test(data: &[u8]) {
Arc::clone(&kv_store),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
false,
));
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
Expand Down
85 changes: 83 additions & 2 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1118,9 +1118,18 @@ where
None => {},
}

// We capture pending_operation_count inside the persistence branch to
// avoid a race: ChannelManager handlers queue deferred monitor ops
// before the persistence flag is set. Capturing outside would let us
// observe pending ops while the flag is still unset, causing us to
// flush monitor writes without persisting the ChannelManager.
// Declared before futures so it outlives the Joiner (drop order).
let pending_monitor_writes;

let mut futures = Joiner::new();

if channel_manager.get_cm().get_and_clear_needs_persistence() {
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
log_trace!(logger, "Persisting ChannelManager...");

let fut = async {
Expand All @@ -1131,7 +1140,12 @@ where
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)
.await
.await?;

// Flush monitor operations that were pending before we persisted. New updates
// that arrived after are left for the next iteration.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);
Ok(())
};
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
let mut fut = Box::pin(fut);
Expand Down Expand Up @@ -1373,6 +1387,7 @@ where
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
kv_store
.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -1381,6 +1396,10 @@ where
channel_manager.get_cm().encode(),
)
.await?;

// Flush monitor operations that were pending before final persistence.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(ref scorer) = scorer {
kv_store
.write(
Expand Down Expand Up @@ -1684,7 +1703,17 @@ impl BackgroundProcessor {
channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = Instant::now();
}

// We capture pending_operation_count inside the persistence
// branch to avoid a race: ChannelManager handlers queue
// deferred monitor ops before the persistence flag is set.
// Capturing outside would let us observe pending ops while the
// flag is still unset, causing us to flush monitor writes
// without persisting the ChannelManager.
let mut pending_monitor_writes = 0;

if channel_manager.get_cm().get_and_clear_needs_persistence() {
pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
log_trace!(logger, "Persisting ChannelManager...");
(kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand All @@ -1695,6 +1724,10 @@ impl BackgroundProcessor {
log_trace!(logger, "Done persisting ChannelManager.");
}

// Flush monitor operations that were pending before we persisted. New
// updates that arrived after are left for the next iteration.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
log_trace!(logger, "Persisting LiquidityManager...");
let _ = liquidity_manager.get_lm().persist().map_err(|e| {
Expand Down Expand Up @@ -1809,12 +1842,17 @@ impl BackgroundProcessor {
// After we exit, ensure we persist the ChannelManager one final time - this avoids
// some races where users quit while channel updates were in-flight, with
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
let pending_monitor_writes = chain_monitor.get_cm().pending_operation_count();
kv_store.write(
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
channel_manager.get_cm().encode(),
)?;

// Flush monitor operations that were pending before final persistence.
chain_monitor.get_cm().flush(pending_monitor_writes, &logger);

if let Some(ref scorer) = scorer {
kv_store.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
Expand Down Expand Up @@ -1896,9 +1934,10 @@ mod tests {
use bitcoin::transaction::{Transaction, TxOut};
use bitcoin::{Amount, ScriptBuf, Txid};
use core::sync::atomic::{AtomicBool, Ordering};
use lightning::chain::chainmonitor;
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::transaction::OutPoint;
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
use lightning::chain::{BestBlock, Confirm, Filter};
use lightning::events::{Event, PathFailure, ReplayEvent};
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{
Expand Down Expand Up @@ -2444,6 +2483,7 @@ mod tests {
Arc::clone(&kv_store),
Arc::clone(&keys_manager),
keys_manager.get_peer_storage_key(),
true,
));
let best_block = BestBlock::from_network(network);
let params = ChainParameters { network, best_block };
Expand Down Expand Up @@ -2567,6 +2607,8 @@ mod tests {
(persist_dir, nodes)
}

/// Opens a channel between two nodes without a running `BackgroundProcessor`,
/// so deferred monitor operations are flushed manually at each step.
macro_rules! open_channel {
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
begin_open_channel!($node_a, $node_b, $channel_value);
Expand All @@ -2582,19 +2624,31 @@ mod tests {
tx.clone(),
)
.unwrap();
// funding_transaction_generated does not call watch_channel, so no
// deferred op is queued and FundingCreated is available immediately.
let msg_a = get_event_msg!(
$node_a,
MessageSendEvent::SendFundingCreated,
$node_b.node.get_our_node_id()
);
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
// Flush node_b's new monitor (watch_channel) so it releases the
// FundingSigned message.
$node_b
.chain_monitor
.flush($node_b.chain_monitor.pending_operation_count(), &$node_b.logger);
get_event!($node_b, Event::ChannelPending);
let msg_b = get_event_msg!(
$node_b,
MessageSendEvent::SendFundingSigned,
$node_a.node.get_our_node_id()
);
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
// Flush node_a's new monitor (watch_channel) queued by
// handle_funding_signed.
$node_a
.chain_monitor
.flush($node_a.chain_monitor.pending_operation_count(), &$node_a.logger);
get_event!($node_a, Event::ChannelPending);
tx
}};
Expand Down Expand Up @@ -2720,6 +2774,20 @@ mod tests {
confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
}

/// Blocks the calling thread until `chain_monitor` reports zero pending deferred monitor
/// operations, polling every 10 milliseconds.
///
/// The flushing itself is expected to be done elsewhere (by the background processor);
/// this helper only observes the pending count.
///
/// # Panics
///
/// Panics if operations are still pending once `EVENT_DEADLINE` has elapsed since the
/// call started.
fn wait_for_flushed(chain_monitor: &ChainMonitor) {
	let began = std::time::Instant::now();
	loop {
		if chain_monitor.pending_operation_count() == 0 {
			return;
		}
		assert!(
			began.elapsed() < EVENT_DEADLINE,
			"Pending monitor operations were not flushed within deadline"
		);
		std::thread::sleep(Duration::from_millis(10));
	}
}

#[test]
fn test_background_processor() {
// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
Expand Down Expand Up @@ -3060,11 +3128,21 @@ mod tests {
.node
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
.unwrap();
// funding_transaction_generated does not call watch_channel, so no deferred op is
// queued and the FundingCreated message is available immediately.
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
// Node 1 has no bg processor, flush its new monitor (watch_channel) manually so
// events and FundingSigned are released.
nodes[1]
.chain_monitor
.flush(nodes[1].chain_monitor.pending_operation_count(), &nodes[1].logger);
get_event!(nodes[1], Event::ChannelPending);
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
// Wait for the bg processor to flush the new monitor (watch_channel) queued by
// handle_funding_signed.
wait_for_flushed(&nodes[0].chain_monitor);
channel_pending_recv
.recv_timeout(EVENT_DEADLINE)
.expect("ChannelPending not handled within deadline");
Expand Down Expand Up @@ -3125,6 +3203,9 @@ mod tests {
error_message.to_string(),
)
.unwrap();
// Wait for the bg processor to flush the monitor update triggered by force close
// so the commitment tx is broadcast.
wait_for_flushed(&nodes[0].chain_monitor);
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

Expand Down
Loading
Loading