From 65b5e2b78fd536bc4e4f783548973aaa0ec394ea Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Sat, 28 Jun 2025 15:24:59 +0530 Subject: [PATCH 1/8] Write structs to serialise-deserialise Channels inside Peer-storage `PeerStorageMonitorHolder` is used to wrap a single ChannelMonitor; here we are adding some fields separately so that we do not need to read the whole ChannelMonitor to identify if we have lost some states. `PeerStorageMonitorHolderList` is used to keep the list of all the channels which would be sent over the wire inside Peer Storage. --- lightning/src/ln/our_peer_storage.rs | 30 ++++++++++++++++++++++++++++ lightning/src/util/ser.rs | 1 + 2 files changed, 31 insertions(+) diff --git a/lightning/src/ln/our_peer_storage.rs b/lightning/src/ln/our_peer_storage.rs index 430c9f559f9..178637430b1 100644 --- a/lightning/src/ln/our_peer_storage.rs +++ b/lightning/src/ln/our_peer_storage.rs @@ -13,7 +13,9 @@ use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::{Hash, HashEngine, Hmac, HmacEngine}; +use bitcoin::secp256k1::PublicKey; +use crate::ln::types::ChannelId; use crate::sign::PeerStorageKey; use crate::crypto::chacha20poly1305rfc::ChaCha20Poly1305RFC; @@ -146,6 +148,34 @@ fn derive_nonce(key: &PeerStorageKey, random_bytes: &[u8]) -> [u8; 12] { nonce } +/// [`PeerStorageMonitorHolder`] represents a single channel sent over the wire. +/// This would be used inside [`ChannelManager`] to determine +/// if the user has lost channel states so that we can do something about it. +/// +/// The main idea here is to just enable a node to figure out that it has lost some data +/// using peer storage backups. +/// +/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager +/// +/// TODO(aditya): Write FundRecoverer to use `monitor_bytes` to drop onchain. +pub(crate) struct PeerStorageMonitorHolder { + /// Channel Id of the channel. + pub(crate) channel_id: ChannelId, + /// Node Id of the channel partner. + pub(crate) counterparty_node_id: PublicKey, + /// Minimum seen secret to determine if we have lost state. + pub(crate) min_seen_secret: u64, + /// Whole serialised ChannelMonitor to recover funds. + pub(crate) monitor_bytes: Vec<u8>, +} + +impl_writeable_tlv_based!(PeerStorageMonitorHolder, { + (0, channel_id, required), + (2, counterparty_node_id, required), + (4, min_seen_secret, required), + (6, monitor_bytes, required_vec), +}); + #[cfg(test)] mod tests { use crate::ln::our_peer_storage::{derive_nonce, DecryptedOurPeerStorage}; diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index ac2b529f0bd..95554b10d0e 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -1084,6 +1084,7 @@ impl_for_vec!((A, B), A, B); impl_for_vec!(SerialId); impl_for_vec!(NegotiatedTxInput); impl_for_vec!(InteractiveTxOutput); +impl_for_vec!(crate::ln::our_peer_storage::PeerStorageMonitorHolder); impl_writeable_for_vec!(&crate::routing::router::BlindedTail); impl_readable_for_vec!(crate::routing::router::BlindedTail); impl_for_vec!(crate::routing::router::TrampolineHop); From 28d7874cb9d8ec82f3636f9197f722d8516c0c9a Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Thu, 17 Jul 2025 00:33:31 +0530 Subject: [PATCH 2/8] Remove #[rustfmt::skip] from fn write Fixed formatting for write() in ChannelMonitorImpl. This would make the next commit cleaner by ensuring it only contains direct code shifts, without unrelated formatting changes.
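To make the round trip for the struct added in PATCH 1/8 above concrete: thanks to the `impl_writeable_tlv_based!` and `impl_for_vec!` lines, a `Vec<PeerStorageMonitorHolder>` can be encoded and read back through LDK's `Writeable`/`Readable` traits. The crate-internal sketch below is illustrative only; the helper name and field values are made up and it is not part of the patch:

use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};

use crate::io::Cursor;
use crate::ln::our_peer_storage::PeerStorageMonitorHolder;
use crate::ln::types::ChannelId;
use crate::util::ser::{Readable, Writeable};

// Hypothetical helper: encode a holder list the way peer storage would, then parse it back.
fn peer_storage_round_trip_sketch() {
	let secp = Secp256k1::new();
	let sk = SecretKey::from_slice(&[42u8; 32]).unwrap();
	let holder = PeerStorageMonitorHolder {
		channel_id: ChannelId([1u8; 32]),
		counterparty_node_id: PublicKey::from_secret_key(&secp, &sk),
		min_seen_secret: (1 << 48) - 1,
		monitor_bytes: vec![0u8; 16], // stands in for a serialised ChannelMonitor
	};

	// `impl_for_vec!` gives Vec<PeerStorageMonitorHolder> a length-prefixed encoding.
	let encoded = vec![holder].encode();
	let decoded: Vec<PeerStorageMonitorHolder> =
		Readable::read(&mut Cursor::new(encoded)).expect("just-encoded data should parse");
	assert_eq!(decoded.len(), 1);
	assert_eq!(decoded[0].min_seen_secret, (1 << 48) - 1);
}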
--- lightning/src/chain/channelmonitor.rs | 57 ++++++++++++++++++--------- 1 file changed, 39 insertions(+), 18 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index ddb1e31f645..777db413f4c 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1357,7 +1357,6 @@ const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; impl Writeable for ChannelMonitorImpl { - #[rustfmt::skip] fn write(&self, writer: &mut W) -> Result<(), Error> { write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION); @@ -1367,7 +1366,9 @@ impl Writeable for ChannelMonitorImpl { U48(self.commitment_transaction_number_obscure_factor).write(writer)?; self.destination_script.write(writer)?; - if let Some(ref broadcasted_holder_revokable_script) = self.broadcasted_holder_revokable_script { + if let Some(ref broadcasted_holder_revokable_script) = + self.broadcasted_holder_revokable_script + { writer.write_all(&[0; 1])?; broadcasted_holder_revokable_script.0.write(writer)?; broadcasted_holder_revokable_script.1.write(writer)?; @@ -1430,27 +1431,34 @@ impl Writeable for ChannelMonitorImpl { } } - writer.write_all(&(self.funding.counterparty_claimable_outpoints.len() as u64).to_be_bytes())?; + writer.write_all( + &(self.funding.counterparty_claimable_outpoints.len() as u64).to_be_bytes(), + )?; for (ref txid, ref htlc_infos) in self.funding.counterparty_claimable_outpoints.iter() { writer.write_all(&txid[..])?; writer.write_all(&(htlc_infos.len() as u64).to_be_bytes())?; for &(ref htlc_output, ref htlc_source) in htlc_infos.iter() { - debug_assert!(htlc_source.is_none() || Some(**txid) == self.funding.current_counterparty_commitment_txid + debug_assert!( + htlc_source.is_none() + || Some(**txid) == self.funding.current_counterparty_commitment_txid || Some(**txid) == self.funding.prev_counterparty_commitment_txid, - "HTLC Sources for all revoked commitment transactions should be none!"); + "HTLC Sources for all revoked commitment transactions should be none!" 
+ ); serialize_htlc_in_commitment!(htlc_output); htlc_source.as_ref().map(|b| b.as_ref()).write(writer)?; } } - writer.write_all(&(self.counterparty_commitment_txn_on_chain.len() as u64).to_be_bytes())?; + writer + .write_all(&(self.counterparty_commitment_txn_on_chain.len() as u64).to_be_bytes())?; for (ref txid, commitment_number) in self.counterparty_commitment_txn_on_chain.iter() { writer.write_all(&txid[..])?; writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; } writer.write_all(&(self.counterparty_hash_commitment_number.len() as u64).to_be_bytes())?; - for (ref payment_hash, commitment_number) in self.counterparty_hash_commitment_number.iter() { + for (ref payment_hash, commitment_number) in self.counterparty_hash_commitment_number.iter() + { writer.write_all(&payment_hash.0[..])?; writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; } @@ -1458,17 +1466,22 @@ impl Writeable for ChannelMonitorImpl { if let Some(holder_commitment_tx) = &self.funding.prev_holder_commitment_tx { writer.write_all(&[1; 1])?; write_legacy_holder_commitment_data( - writer, holder_commitment_tx, &self.prev_holder_htlc_data.as_ref().unwrap(), + writer, + holder_commitment_tx, + &self.prev_holder_htlc_data.as_ref().unwrap(), )?; } else { writer.write_all(&[0; 1])?; } write_legacy_holder_commitment_data( - writer, &self.funding.current_holder_commitment_tx, &self.current_holder_htlc_data, + writer, + &self.funding.current_holder_commitment_tx, + &self.current_holder_htlc_data, )?; - writer.write_all(&byte_utils::be48_to_array(self.current_counterparty_commitment_number))?; + writer + .write_all(&byte_utils::be48_to_array(self.current_counterparty_commitment_number))?; writer.write_all(&byte_utils::be48_to_array(self.current_holder_commitment_number))?; writer.write_all(&(self.payment_preimages.len() as u64).to_be_bytes())?; @@ -1476,12 +1489,19 @@ impl Writeable for ChannelMonitorImpl { writer.write_all(&payment_preimage.0[..])?; } - writer.write_all(&(self.pending_monitor_events.iter().filter(|ev| match ev { - MonitorEvent::HTLCEvent(_) => true, - MonitorEvent::HolderForceClosed(_) => true, - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }).count() as u64).to_be_bytes())?; + writer.write_all( + &(self + .pending_monitor_events + .iter() + .filter(|ev| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::HolderForceClosed(_) => true, + MonitorEvent::HolderForceClosedWithInfo { .. 
} => true, + _ => false, + }) + .count() as u64) + .to_be_bytes(), + )?; for event in self.pending_monitor_events.iter() { match event { MonitorEvent::HTLCEvent(upd) => { @@ -1505,7 +1525,8 @@ impl Writeable for ChannelMonitorImpl { self.best_block.block_hash.write(writer)?; writer.write_all(&self.best_block.height.to_be_bytes())?; - writer.write_all(&(self.onchain_events_awaiting_threshold_conf.len() as u64).to_be_bytes())?; + writer + .write_all(&(self.onchain_events_awaiting_threshold_conf.len() as u64).to_be_bytes())?; for ref entry in self.onchain_events_awaiting_threshold_conf.iter() { entry.write(writer)?; } @@ -1533,7 +1554,7 @@ impl Writeable for ChannelMonitorImpl { let mut pending_monitor_events = self.pending_monitor_events.clone(); pending_monitor_events.push(MonitorEvent::HolderForceClosed(*outpoint)); pending_monitor_events - } + }, _ => self.pending_monitor_events.clone(), }; From 341fe0af775219d60ee9ac14b5a3b6ffc8dbb39d Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Thu, 17 Jul 2025 01:12:30 +0530 Subject: [PATCH 3/8] Serialise ChannelMonitors and send them over inside Peer Storage Create a utility function to prevent code duplication while writing ChannelMonitors. Serialise them inside ChainMonitor::send_peer_storage and send them over. Cfg-tag the sending logic because we are unsure of what to omit from ChannelMonitors stored inside peer-storage. --- Cargo.toml | 1 + ci/ci-tests.sh | 2 + lightning/src/chain/chainmonitor.rs | 54 +++- lightning/src/chain/channelmonitor.rs | 399 ++++++++++++++------------ 4 files changed, 263 insertions(+), 193 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3891b11a2b4..4bd5efdfbf6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,4 +68,5 @@ check-cfg = [ "cfg(splicing)", "cfg(async_payments)", "cfg(simple_close)", + "cfg(peer_storage)", ] diff --git a/ci/ci-tests.sh b/ci/ci-tests.sh index 2ab512e2d3e..1c8a53602c1 100755 --- a/ci/ci-tests.sh +++ b/ci/ci-tests.sh @@ -158,3 +158,5 @@ RUSTFLAGS="--cfg=async_payments" cargo test --verbose --color always -p lightnin RUSTFLAGS="--cfg=simple_close" cargo test --verbose --color always -p lightning [ "$CI_MINIMIZE_DISK_USAGE" != "" ] && cargo clean RUSTFLAGS="--cfg=lsps1_service" cargo test --verbose --color always -p lightning-liquidity +[ "$CI_MINIMIZE_DISK_USAGE" != "" ] && cargo clean +RUSTFLAGS="--cfg=peer_storage" cargo test --verbose --color always -p lightning diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 0de372813b3..ef81749568c 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -29,15 +29,15 @@ use bitcoin::hash_types::{BlockHash, Txid}; use crate::chain; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::channelmonitor::{ - Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, TransactionOutputs, - WithChannelMonitor, + write_chanmon_internal, Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, + TransactionOutputs, WithChannelMonitor, }; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; use crate::events::{self, Event, EventHandler, ReplayEvent}; use crate::ln::channel_state::ChannelDetails; use crate::ln::msgs::{self, BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler}; -use crate::ln::our_peer_storage::DecryptedOurPeerStorage; +use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHolder}; use 
crate::ln::types::ChannelId; use crate::prelude::*; use crate::sign::ecdsa::EcdsaChannelSigner; @@ -47,6 +47,7 @@ use crate::types::features::{InitFeatures, NodeFeatures}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; use crate::util::persist::MonitorName; +use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; use bitcoin::secp256k1::PublicKey; use core::ops::Deref; @@ -810,10 +811,51 @@ where } fn send_peer_storage(&self, their_node_id: PublicKey) { - // TODO: Serialize `ChannelMonitor`s inside `our_peer_storage`. - + let mut monitors_list: Vec<PeerStorageMonitorHolder> = Vec::new(); let random_bytes = self.entropy_source.get_secure_random_bytes(); - let serialised_channels = Vec::new(); + + #[cfg(peer_storage)] + { + const MAX_PEER_STORAGE_SIZE: usize = 65531; + const USIZE_LEN: usize = core::mem::size_of::<usize>(); + let random_usize = usize::from_le_bytes(random_bytes[0..USIZE_LEN].try_into().unwrap()); + let mut curr_size = 0; + let monitors = self.monitors.read().unwrap(); + // Randomising Keys in the HashMap to fetch monitors without repetition. + let mut keys: Vec<&ChannelId> = monitors.keys().collect(); + for i in (1..keys.len()).rev() { + let j = random_usize % (i + 1); + keys.swap(i, j); + } + + for chan_id in keys { + let mon = &monitors[chan_id]; + let mut ser_chan = VecWriter(Vec::new()); + let min_seen_secret = mon.monitor.get_min_seen_secret(); + let counterparty_node_id = mon.monitor.get_counterparty_node_id(); + let chan_mon = mon.monitor.inner.lock().unwrap(); + + write_chanmon_internal(&chan_mon, true, &mut ser_chan) + .expect("can not write Channel Monitor for peer storage message"); + + let peer_storage_monitor = PeerStorageMonitorHolder { + channel_id: *chan_id, + min_seen_secret, + counterparty_node_id, + monitor_bytes: ser_chan.0, + }; + + // Adding size of peer_storage_monitor. 
+ curr_size += peer_storage_monitor.serialized_length(); + + if curr_size > MAX_PEER_STORAGE_SIZE { + break; + } + monitors_list.push(peer_storage_monitor); + } + } + + let serialised_channels = monitors_list.encode(); let our_peer_storage = DecryptedOurPeerStorage::new(serialised_channels); let cipher = our_peer_storage.encrypt(&self.our_peerstorage_encryption_key, &random_bytes); diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 777db413f4c..c5ff9a8d949 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1356,229 +1356,254 @@ impl Writeable for ChannelMonitor { const SERIALIZATION_VERSION: u8 = 1; const MIN_SERIALIZATION_VERSION: u8 = 1; -impl Writeable for ChannelMonitorImpl { - fn write(&self, writer: &mut W) -> Result<(), Error> { - write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION); - - self.latest_update_id.write(writer)?; - - // Set in initial Channel-object creation, so should always be set by now: - U48(self.commitment_transaction_number_obscure_factor).write(writer)?; - - self.destination_script.write(writer)?; - if let Some(ref broadcasted_holder_revokable_script) = - self.broadcasted_holder_revokable_script - { - writer.write_all(&[0; 1])?; - broadcasted_holder_revokable_script.0.write(writer)?; - broadcasted_holder_revokable_script.1.write(writer)?; - broadcasted_holder_revokable_script.2.write(writer)?; - } else { - writer.write_all(&[1; 1])?; - } - - self.counterparty_payment_script.write(writer)?; - match &self.shutdown_script { - Some(script) => script.write(writer)?, - None => ScriptBuf::new().write(writer)?, - } +/// Utility function for writing [`ChannelMonitor`] to prevent code duplication in [`ChainMonitor`] while sending Peer Storage. +/// +/// NOTE: `is_stub` is true only when we are using this to serialise for Peer Storage. +/// +/// TODO: Determine which fields of each `ChannelMonitor` should be included in Peer Storage, and which should be omitted. 
+/// +/// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor +pub(crate) fn write_chanmon_internal( + channel_monitor: &ChannelMonitorImpl, _is_stub: bool, writer: &mut W, +) -> Result<(), Error> { + write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION); - self.channel_keys_id.write(writer)?; - self.holder_revocation_basepoint.write(writer)?; - let funding_outpoint = self.get_funding_txo(); - writer.write_all(&funding_outpoint.txid[..])?; - writer.write_all(&funding_outpoint.index.to_be_bytes())?; - let redeem_script = self.funding.channel_parameters.make_funding_redeemscript(); - let script_pubkey = redeem_script.to_p2wsh(); - script_pubkey.write(writer)?; - self.funding.current_counterparty_commitment_txid.write(writer)?; - self.funding.prev_counterparty_commitment_txid.write(writer)?; - - self.counterparty_commitment_params.write(writer)?; - redeem_script.write(writer)?; - self.funding.channel_parameters.channel_value_satoshis.write(writer)?; + channel_monitor.latest_update_id.write(writer)?; - match self.their_cur_per_commitment_points { - Some((idx, pubkey, second_option)) => { - writer.write_all(&byte_utils::be48_to_array(idx))?; - writer.write_all(&pubkey.serialize())?; - match second_option { - Some(second_pubkey) => { - writer.write_all(&second_pubkey.serialize())?; - }, - None => { - writer.write_all(&[0; 33])?; - }, - } - }, - None => { - writer.write_all(&byte_utils::be48_to_array(0))?; - }, - } + // Set in initial Channel-object creation, so should always be set by now: + U48(channel_monitor.commitment_transaction_number_obscure_factor).write(writer)?; - writer.write_all(&self.on_holder_tx_csv.to_be_bytes())?; + channel_monitor.destination_script.write(writer)?; + if let Some(ref broadcasted_holder_revokable_script) = + channel_monitor.broadcasted_holder_revokable_script + { + writer.write_all(&[0; 1])?; + broadcasted_holder_revokable_script.0.write(writer)?; + broadcasted_holder_revokable_script.1.write(writer)?; + broadcasted_holder_revokable_script.2.write(writer)?; + } else { + writer.write_all(&[1; 1])?; + } + + channel_monitor.counterparty_payment_script.write(writer)?; + match &channel_monitor.shutdown_script { + Some(script) => script.write(writer)?, + None => ScriptBuf::new().write(writer)?, + } + + channel_monitor.channel_keys_id.write(writer)?; + channel_monitor.holder_revocation_basepoint.write(writer)?; + let funding_outpoint = channel_monitor.get_funding_txo(); + writer.write_all(&funding_outpoint.txid[..])?; + writer.write_all(&funding_outpoint.index.to_be_bytes())?; + let redeem_script = channel_monitor.funding.channel_parameters.make_funding_redeemscript(); + let script_pubkey = redeem_script.to_p2wsh(); + script_pubkey.write(writer)?; + channel_monitor.funding.current_counterparty_commitment_txid.write(writer)?; + channel_monitor.funding.prev_counterparty_commitment_txid.write(writer)?; + + channel_monitor.counterparty_commitment_params.write(writer)?; + redeem_script.write(writer)?; + channel_monitor.funding.channel_parameters.channel_value_satoshis.write(writer)?; + + match channel_monitor.their_cur_per_commitment_points { + Some((idx, pubkey, second_option)) => { + writer.write_all(&byte_utils::be48_to_array(idx))?; + writer.write_all(&pubkey.serialize())?; + match second_option { + Some(second_pubkey) => { + writer.write_all(&second_pubkey.serialize())?; + }, + None => { + writer.write_all(&[0; 33])?; + }, + } + }, + None => { + writer.write_all(&byte_utils::be48_to_array(0))?; + }, + } - 
self.commitment_secrets.write(writer)?; + writer.write_all(&channel_monitor.on_holder_tx_csv.to_be_bytes())?; - #[rustfmt::skip] - macro_rules! serialize_htlc_in_commitment { - ($htlc_output: expr) => { - writer.write_all(&[$htlc_output.offered as u8; 1])?; - writer.write_all(&$htlc_output.amount_msat.to_be_bytes())?; - writer.write_all(&$htlc_output.cltv_expiry.to_be_bytes())?; - writer.write_all(&$htlc_output.payment_hash.0[..])?; - $htlc_output.transaction_output_index.write(writer)?; - } - } + channel_monitor.commitment_secrets.write(writer)?; - writer.write_all( - &(self.funding.counterparty_claimable_outpoints.len() as u64).to_be_bytes(), - )?; - for (ref txid, ref htlc_infos) in self.funding.counterparty_claimable_outpoints.iter() { - writer.write_all(&txid[..])?; - writer.write_all(&(htlc_infos.len() as u64).to_be_bytes())?; - for &(ref htlc_output, ref htlc_source) in htlc_infos.iter() { - debug_assert!( - htlc_source.is_none() - || Some(**txid) == self.funding.current_counterparty_commitment_txid - || Some(**txid) == self.funding.prev_counterparty_commitment_txid, - "HTLC Sources for all revoked commitment transactions should be none!" - ); - serialize_htlc_in_commitment!(htlc_output); - htlc_source.as_ref().map(|b| b.as_ref()).write(writer)?; - } + #[rustfmt::skip] + macro_rules! serialize_htlc_in_commitment { + ($htlc_output: expr) => { + writer.write_all(&[$htlc_output.offered as u8; 1])?; + writer.write_all(&$htlc_output.amount_msat.to_be_bytes())?; + writer.write_all(&$htlc_output.cltv_expiry.to_be_bytes())?; + writer.write_all(&$htlc_output.payment_hash.0[..])?; + $htlc_output.transaction_output_index.write(writer)?; } + } - writer - .write_all(&(self.counterparty_commitment_txn_on_chain.len() as u64).to_be_bytes())?; - for (ref txid, commitment_number) in self.counterparty_commitment_txn_on_chain.iter() { - writer.write_all(&txid[..])?; - writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; + writer.write_all( + &(channel_monitor.funding.counterparty_claimable_outpoints.len() as u64).to_be_bytes(), + )?; + for (ref txid, ref htlc_infos) in + channel_monitor.funding.counterparty_claimable_outpoints.iter() + { + writer.write_all(&txid[..])?; + writer.write_all(&(htlc_infos.len() as u64).to_be_bytes())?; + for &(ref htlc_output, ref htlc_source) in htlc_infos.iter() { + debug_assert!( + htlc_source.is_none() + || Some(**txid) == channel_monitor.funding.current_counterparty_commitment_txid + || Some(**txid) == channel_monitor.funding.prev_counterparty_commitment_txid, + "HTLC Sources for all revoked commitment transactions should be none!" 
+ ); + serialize_htlc_in_commitment!(htlc_output); + htlc_source.as_ref().map(|b| b.as_ref()).write(writer)?; } + } - writer.write_all(&(self.counterparty_hash_commitment_number.len() as u64).to_be_bytes())?; - for (ref payment_hash, commitment_number) in self.counterparty_hash_commitment_number.iter() - { - writer.write_all(&payment_hash.0[..])?; - writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; - } + writer.write_all( + &(channel_monitor.counterparty_commitment_txn_on_chain.len() as u64).to_be_bytes(), + )?; + for (ref txid, commitment_number) in channel_monitor.counterparty_commitment_txn_on_chain.iter() + { + writer.write_all(&txid[..])?; + writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; + } - if let Some(holder_commitment_tx) = &self.funding.prev_holder_commitment_tx { - writer.write_all(&[1; 1])?; - write_legacy_holder_commitment_data( - writer, - holder_commitment_tx, - &self.prev_holder_htlc_data.as_ref().unwrap(), - )?; - } else { - writer.write_all(&[0; 1])?; - } + writer.write_all( + &(channel_monitor.counterparty_hash_commitment_number.len() as u64).to_be_bytes(), + )?; + for (ref payment_hash, commitment_number) in + channel_monitor.counterparty_hash_commitment_number.iter() + { + writer.write_all(&payment_hash.0[..])?; + writer.write_all(&byte_utils::be48_to_array(*commitment_number))?; + } + if let Some(holder_commitment_tx) = &channel_monitor.funding.prev_holder_commitment_tx { + writer.write_all(&[1; 1])?; write_legacy_holder_commitment_data( writer, - &self.funding.current_holder_commitment_tx, - &self.current_holder_htlc_data, + holder_commitment_tx, + &channel_monitor.prev_holder_htlc_data.as_ref().unwrap(), )?; + } else { + writer.write_all(&[0; 1])?; + } - writer - .write_all(&byte_utils::be48_to_array(self.current_counterparty_commitment_number))?; - writer.write_all(&byte_utils::be48_to_array(self.current_holder_commitment_number))?; + write_legacy_holder_commitment_data( + writer, + &channel_monitor.funding.current_holder_commitment_tx, + &channel_monitor.current_holder_htlc_data, + )?; - writer.write_all(&(self.payment_preimages.len() as u64).to_be_bytes())?; - for (payment_preimage, _) in self.payment_preimages.values() { - writer.write_all(&payment_preimage.0[..])?; - } + writer.write_all(&byte_utils::be48_to_array( + channel_monitor.current_counterparty_commitment_number, + ))?; + writer + .write_all(&byte_utils::be48_to_array(channel_monitor.current_holder_commitment_number))?; - writer.write_all( - &(self - .pending_monitor_events - .iter() - .filter(|ev| match ev { - MonitorEvent::HTLCEvent(_) => true, - MonitorEvent::HolderForceClosed(_) => true, - MonitorEvent::HolderForceClosedWithInfo { .. } => true, - _ => false, - }) - .count() as u64) - .to_be_bytes(), - )?; - for event in self.pending_monitor_events.iter() { - match event { - MonitorEvent::HTLCEvent(upd) => { - 0u8.write(writer)?; - upd.write(writer)?; - }, - MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, - // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep - // backwards compatibility, we write a `HolderForceClosed` event along with the - // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. - MonitorEvent::HolderForceClosedWithInfo { .. 
} => 1u8.write(writer)?, - _ => {}, // Covered in the TLV writes below - } - } + writer.write_all(&(channel_monitor.payment_preimages.len() as u64).to_be_bytes())?; + for (payment_preimage, _) in channel_monitor.payment_preimages.values() { + writer.write_all(&payment_preimage.0[..])?; + } - writer.write_all(&(self.pending_events.len() as u64).to_be_bytes())?; - for event in self.pending_events.iter() { - event.write(writer)?; + writer.write_all( + &(channel_monitor + .pending_monitor_events + .iter() + .filter(|ev| match ev { + MonitorEvent::HTLCEvent(_) => true, + MonitorEvent::HolderForceClosed(_) => true, + MonitorEvent::HolderForceClosedWithInfo { .. } => true, + _ => false, + }) + .count() as u64) + .to_be_bytes(), + )?; + for event in channel_monitor.pending_monitor_events.iter() { + match event { + MonitorEvent::HTLCEvent(upd) => { + 0u8.write(writer)?; + upd.write(writer)?; + }, + MonitorEvent::HolderForceClosed(_) => 1u8.write(writer)?, + // `HolderForceClosedWithInfo` replaced `HolderForceClosed` in v0.0.122. To keep + // backwards compatibility, we write a `HolderForceClosed` event along with the + // `HolderForceClosedWithInfo` event. This is deduplicated in the reader. + MonitorEvent::HolderForceClosedWithInfo { .. } => 1u8.write(writer)?, + _ => {}, // Covered in the TLV writes below } + } - self.best_block.block_hash.write(writer)?; - writer.write_all(&self.best_block.height.to_be_bytes())?; + writer.write_all(&(channel_monitor.pending_events.len() as u64).to_be_bytes())?; + for event in channel_monitor.pending_events.iter() { + event.write(writer)?; + } - writer - .write_all(&(self.onchain_events_awaiting_threshold_conf.len() as u64).to_be_bytes())?; - for ref entry in self.onchain_events_awaiting_threshold_conf.iter() { - entry.write(writer)?; - } + channel_monitor.best_block.block_hash.write(writer)?; + writer.write_all(&channel_monitor.best_block.height.to_be_bytes())?; - (self.outputs_to_watch.len() as u64).write(writer)?; - for (txid, idx_scripts) in self.outputs_to_watch.iter() { - txid.write(writer)?; - (idx_scripts.len() as u64).write(writer)?; - for (idx, script) in idx_scripts.iter() { - idx.write(writer)?; - script.write(writer)?; - } + writer.write_all( + &(channel_monitor.onchain_events_awaiting_threshold_conf.len() as u64).to_be_bytes(), + )?; + for ref entry in channel_monitor.onchain_events_awaiting_threshold_conf.iter() { + entry.write(writer)?; + } + + (channel_monitor.outputs_to_watch.len() as u64).write(writer)?; + for (txid, idx_scripts) in channel_monitor.outputs_to_watch.iter() { + txid.write(writer)?; + (idx_scripts.len() as u64).write(writer)?; + for (idx, script) in idx_scripts.iter() { + idx.write(writer)?; + script.write(writer)?; } - self.onchain_tx_handler.write(writer)?; + } - self.lockdown_from_offchain.write(writer)?; - self.holder_tx_signed.write(writer)?; + channel_monitor.onchain_tx_handler.write(writer)?; - // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` for backwards compatibility. - let pending_monitor_events = match self.pending_monitor_events.iter().find(|ev| match ev { + channel_monitor.lockdown_from_offchain.write(writer)?; + channel_monitor.holder_tx_signed.write(writer)?; + + // If we have a `HolderForceClosedWithInfo` event, we need to write the `HolderForceClosed` for backwards compatibility. + let pending_monitor_events = + match channel_monitor.pending_monitor_events.iter().find(|ev| match ev { MonitorEvent::HolderForceClosedWithInfo { .. 
} => true, _ => false, }) { Some(MonitorEvent::HolderForceClosedWithInfo { outpoint, .. }) => { - let mut pending_monitor_events = self.pending_monitor_events.clone(); + let mut pending_monitor_events = channel_monitor.pending_monitor_events.clone(); pending_monitor_events.push(MonitorEvent::HolderForceClosed(*outpoint)); pending_monitor_events }, - _ => self.pending_monitor_events.clone(), + _ => channel_monitor.pending_monitor_events.clone(), }; - write_tlv_fields!(writer, { - (1, self.funding_spend_confirmed, option), - (3, self.htlcs_resolved_on_chain, required_vec), - (5, pending_monitor_events, required_vec), - (7, self.funding_spend_seen, required), - (9, self.counterparty_node_id, required), - (11, self.confirmed_commitment_tx_counterparty_output, option), - (13, self.spendable_txids_confirmed, required_vec), - (15, self.counterparty_fulfilled_htlcs, required), - (17, self.initial_counterparty_commitment_info, option), - (19, self.channel_id, required), - (21, self.balances_empty_height, option), - (23, self.holder_pays_commitment_tx_fee, option), - (25, self.payment_preimages, required), - (27, self.first_negotiated_funding_txo, required), - (29, self.initial_counterparty_commitment_tx, option), - (31, self.funding.channel_parameters, required), - (32, self.pending_funding, optional_vec), - }); + write_tlv_fields!(writer, { + (1, channel_monitor.funding_spend_confirmed, option), + (3, channel_monitor.htlcs_resolved_on_chain, required_vec), + (5, pending_monitor_events, required_vec), + (7, channel_monitor.funding_spend_seen, required), + (9, channel_monitor.counterparty_node_id, required), + (11, channel_monitor.confirmed_commitment_tx_counterparty_output, option), + (13, channel_monitor.spendable_txids_confirmed, required_vec), + (15, channel_monitor.counterparty_fulfilled_htlcs, required), + (17, channel_monitor.initial_counterparty_commitment_info, option), + (19, channel_monitor.channel_id, required), + (21, channel_monitor.balances_empty_height, option), + (23, channel_monitor.holder_pays_commitment_tx_fee, option), + (25, channel_monitor.payment_preimages, required), + (27, channel_monitor.first_negotiated_funding_txo, required), + (29, channel_monitor.initial_counterparty_commitment_tx, option), + (31, channel_monitor.funding.channel_parameters, required), + (32, channel_monitor.pending_funding, optional_vec), + }); - Ok(()) + Ok(()) +} + +impl Writeable for ChannelMonitorImpl { + fn write(&self, writer: &mut W) -> Result<(), Error> { + write_chanmon_internal(self, false, writer) } } From 2b3fdb9d9b7000b0e7db986a0b2a6b2da9ca323e Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Sat, 28 Jun 2025 15:37:29 +0530 Subject: [PATCH 4/8] Determine if we have lost data Deserialise the ChannelMonitors and compare the data to determine if we have lost some states. 
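The heart of this check is a comparison between the `min_seen_secret` carried in each retrieved `PeerStorageMonitorHolder` and the revoked counterparty commitment number we currently track for that channel. A simplified, hedged sketch of that comparison follows; the real handler in the diff below walks `per_peer_state` and panics rather than returning a list, and the helper name and map parameter here are illustrative only:

use crate::ln::our_peer_storage::PeerStorageMonitorHolder;
use crate::ln::types::ChannelId;
use crate::prelude::*; // HashMap and Vec in both std and no-std builds

// Hypothetical helper, not part of the patch: report every channel for which the
// peer-storage backup has seen more revocations than our local state.
pub(crate) fn channels_with_lost_state(
	mon_list: &[PeerStorageMonitorHolder], local_revoked_numbers: &HashMap<ChannelId, u64>,
) -> Vec<ChannelId> {
	let mut stale = Vec::new();
	for holder in mon_list {
		if let Some(&local_number) = local_revoked_numbers.get(&holder.channel_id) {
			// Commitment numbers count down from 2^48 - 1, so a strictly larger local
			// value means the backup is ahead of us: we have lost channel state.
			if local_number > holder.min_seen_secret {
				stale.push(holder.channel_id);
			}
		}
	}
	stale
}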
--- lightning/src/ln/channelmanager.rs | 50 +++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 38f8728113f..6128edbc1e6 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -81,7 +81,7 @@ use crate::ln::onion_utils::{ decode_fulfill_attribution_data, HTLCFailReason, LocalHTLCFailureReason, }; use crate::ln::onion_utils::{process_fulfill_attribution_data, AttributionData}; -use crate::ln::our_peer_storage::EncryptedOurPeerStorage; +use crate::ln::our_peer_storage::{EncryptedOurPeerStorage, PeerStorageMonitorHolder}; #[cfg(test)] use crate::ln::outbound_payment; use crate::ln::outbound_payment::{ @@ -2959,6 +2959,7 @@ pub(super) const MAX_UNFUNDED_CHANNEL_PEERS: usize = 50; /// This constant defines the upper limit for the size of data /// that can be stored for a peer. It is set to 1024 bytes (1 kilobyte) /// to prevent excessive resource consumption. +#[cfg(not(test))] const MAX_PEER_STORAGE_SIZE: usize = 1024; /// The maximum number of peers which we do not have a (funded) channel with. Once we reach this @@ -9332,7 +9333,53 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ }; log_trace!(logger, "Got valid {}-byte peer backup from {}", decrypted.len(), peer_node_id); + let per_peer_state = self.per_peer_state.read().unwrap(); + + let mut cursor = io::Cursor::new(decrypted); + let mon_list = <Vec<PeerStorageMonitorHolder> as Readable>::read(&mut cursor) + .unwrap_or_else(|e| { + // This should NEVER happen. + log_debug!(self.logger, "Unable to unpack the retrieved peer storage {:?}", e); + Vec::new() + }); + + for mon_holder in mon_list.iter() { + let peer_state_mutex = match per_peer_state.get(&mon_holder.counterparty_node_id) { + Some(mutex) => mutex, + None => { + log_debug!( + logger, + "Not able to find peer_state for the counterparty {}, channelId {}", + log_pubkey!(mon_holder.counterparty_node_id), + mon_holder.channel_id + ); + continue; + }, + }; + let peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &*peer_state_lock; + + match peer_state.channel_by_id.get(&mon_holder.channel_id) { + Some(chan) => { + if let Some(funded_chan) = chan.as_funded() { + if funded_chan.get_revoked_counterparty_commitment_transaction_number() + > mon_holder.min_seen_secret + { + panic!( + "Lost channel state for channel {}.\n\ + Received peer storage with a more recent state than what our node had.\n\ + Use the FundRecoverer to initiate a force close and sweep the funds.", + &mon_holder.channel_id + ); + } + } + }, + None => { + log_debug!(logger, "Found an unknown channel {}", &mon_holder.channel_id); + }, + } + } Ok(()) } @@ -9358,6 +9405,7 @@ This indicates a bug inside LDK. Please report this error at https://github.com/ ), ChannelId([0; 32]))); } + #[cfg(not(test))] if msg.data.len() > MAX_PEER_STORAGE_SIZE { log_debug!(logger, "Sending warning to peer and ignoring peer storage request from {} as its over 1KiB", log_pubkey!(counterparty_node_id)); From 662fff1102cb824248a1902981864e0d81809a58 Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Sat, 28 Jun 2025 15:38:40 +0530 Subject: [PATCH 5/8] test: Modify test_peer_storage to check latest changes Node should now determine lost states using retrieved peer storage. 
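The direction of the comparison that the expected panic relies on is easiest to see with concrete numbers: commitment transaction numbers in LDK start at 2^48 - 1 and count down as new commitments are exchanged, so the node that has processed more revocations holds the smaller number. The snippet below is an illustrative sketch only; the exact per-payment decrements are made up:

// Commitment numbers count down from 2^48 - 1 (LDK's INITIAL_COMMITMENT_NUMBER).
const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;

fn lost_state_arithmetic_sketch() {
	// The reloaded node only remembers the channel as of the first payment...
	let our_revoked_counterparty_commitment_number = INITIAL_COMMITMENT_NUMBER - 1;
	// ...while the backup held by the peer was refreshed after two more payments.
	let backup_min_seen_secret = INITIAL_COMMITMENT_NUMBER - 3;

	// A larger local number means fewer revocations seen, i.e. stale state, which is
	// the condition under which handle_peer_storage_retrieval panics in this series.
	assert!(our_revoked_counterparty_commitment_number > backup_min_seen_secret);
}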
--- lightning/src/ln/channelmanager.rs | 104 +++++++++++++---------------- lightning/src/util/test_utils.rs | 5 ++ 2 files changed, 53 insertions(+), 56 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 6128edbc1e6..cb2d83c3a79 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -17165,38 +17165,62 @@ mod tests { #[test] #[rustfmt::skip] + #[cfg(peer_storage)] + #[should_panic(expected = "Lost channel state for channel ae3367da2c13bc1ceb86bf56418f62828f7ce9d6bfb15a46af5ba1f1ed8b124f.\n\ + Received peer storage with a more recent state than what our node had.\n\ + Use the FundRecoverer to initiate a force close and sweep the funds.")] fn test_peer_storage() { let chanmon_cfgs = create_chanmon_cfgs(2); + let (persister, chain_monitor); let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); + let nodes_0_deserialized; let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); - let nodes = create_network(2, &node_cfgs, &node_chanmgrs); + let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1); + let (_, _, cid, _) = create_announced_chan_between_nodes(&nodes, 0, 1); + send_payment(&nodes[0], &vec!(&nodes[1])[..], 1000); + let nodes_0_serialized = nodes[0].node.encode(); + let old_state_monitor = get_monitor!(nodes[0], cid).encode(); + send_payment(&nodes[0], &vec!(&nodes[1])[..], 10000); + send_payment(&nodes[0], &vec!(&nodes[1])[..], 9999); + + // Update peer storage with latest commitment txns + connect_blocks(&nodes[0], 1); + connect_blocks(&nodes[0], 1); let peer_storage_msg_events_node0 = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_msg_events(); let peer_storage_msg_events_node1 = nodes[1].chain_monitor.chain_monitor.get_and_clear_pending_msg_events(); assert_ne!(peer_storage_msg_events_node0.len(), 0); assert_ne!(peer_storage_msg_events_node1.len(), 0); - match peer_storage_msg_events_node0[0] { - MessageSendEvent::SendPeerStorage { ref node_id, ref msg } => { - assert_eq!(*node_id, nodes[1].node.get_our_node_id()); - nodes[1].node.handle_peer_storage(nodes[0].node.get_our_node_id(), msg.clone()); + for ps_msg in peer_storage_msg_events_node0 { + match ps_msg { + MessageSendEvent::SendPeerStorage { ref node_id, ref msg } => { + assert_eq!(*node_id, nodes[1].node.get_our_node_id()); + nodes[1].node.handle_peer_storage(nodes[0].node.get_our_node_id(), msg.clone()); + } + _ => panic!("Unexpected event"), } - _ => panic!("Unexpected event"), } - match peer_storage_msg_events_node1[0] { - MessageSendEvent::SendPeerStorage { ref node_id, ref msg } => { - assert_eq!(*node_id, nodes[0].node.get_our_node_id()); - nodes[0].node.handle_peer_storage(nodes[1].node.get_our_node_id(), msg.clone()); + for ps_msg in peer_storage_msg_events_node1 { + match ps_msg { + MessageSendEvent::SendPeerStorage { ref node_id, ref msg } => { + assert_eq!(*node_id, nodes[0].node.get_our_node_id()); + nodes[0].node.handle_peer_storage(nodes[1].node.get_our_node_id(), msg.clone()); + } + _ => panic!("Unexpected event"), } - _ => panic!("Unexpected event"), } + nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id()); nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id()); + // Reload Node! 
+ // nodes[0].chain_source.clear_watched_txn_and_outputs(); + reload_node!(nodes[0], test_default_channel_config(), &nodes_0_serialized, &[&old_state_monitor[..]], persister, chain_monitor, nodes_0_deserialized); + nodes[0].node.peer_connected(nodes[1].node.get_our_node_id(), &msgs::Init { features: nodes[1].node.init_features(), networks: None, remote_network_address: None }, true).unwrap(); @@ -17207,62 +17231,30 @@ mod tests { let node_1_events = nodes[1].node.get_and_clear_pending_msg_events(); assert_eq!(node_1_events.len(), 2); + // Since, node-0 does not have any memory it would not send any message. let node_0_events = nodes[0].node.get_and_clear_pending_msg_events(); - assert_eq!(node_0_events.len(), 2); + assert_eq!(node_0_events.len(), 1); + + match node_0_events[0] { + MessageSendEvent::SendChannelReestablish { ref node_id, .. } => { + assert_eq!(*node_id, nodes[1].node.get_our_node_id()); + // nodes[0] would send a bogus channel reestablish, so there's no need to handle this. + } + _ => panic!("Unexpected event"), + } - for msg in node_1_events{ + for msg in node_1_events { if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg { nodes[0].node.handle_channel_reestablish(nodes[1].node.get_our_node_id(), msg); assert_eq!(*node_id, nodes[0].node.get_our_node_id()); } else if let MessageSendEvent::SendPeerStorageRetrieval { ref node_id, ref msg } = msg { + // Should Panic here! nodes[0].node.handle_peer_storage_retrieval(nodes[1].node.get_our_node_id(), msg.clone()); assert_eq!(*node_id, nodes[0].node.get_our_node_id()); } else { panic!("Unexpected event") } } - - for msg in node_0_events{ - if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg { - nodes[1].node.handle_channel_reestablish(nodes[0].node.get_our_node_id(), msg); - assert_eq!(*node_id, nodes[1].node.get_our_node_id()); - } else if let MessageSendEvent::SendPeerStorageRetrieval { ref node_id, ref msg } = msg { - nodes[1].node.handle_peer_storage_retrieval(nodes[0].node.get_our_node_id(), msg.clone()); - assert_eq!(*node_id, nodes[1].node.get_our_node_id()); - } else { - panic!("Unexpected event") - } - } - - let node_1_msg_events = nodes[1].node.get_and_clear_pending_msg_events(); - let node_0_msg_events = nodes[0].node.get_and_clear_pending_msg_events(); - - assert_eq!(node_1_msg_events.len(), 3); - assert_eq!(node_0_msg_events.len(), 3); - - for msg in node_1_msg_events { - if let MessageSendEvent::SendChannelReady { ref node_id, .. } = msg { - assert_eq!(*node_id, nodes[0].node.get_our_node_id()); - } else if let MessageSendEvent::SendAnnouncementSignatures { ref node_id, .. } = msg { - assert_eq!(*node_id, nodes[0].node.get_our_node_id()); - } else if let MessageSendEvent::SendChannelUpdate { ref node_id, .. } = msg { - assert_eq!(*node_id, nodes[0].node.get_our_node_id()); - } else { - panic!("Unexpected event") - } - } - - for msg in node_0_msg_events { - if let MessageSendEvent::SendChannelReady { ref node_id, .. } = msg { - assert_eq!(*node_id, nodes[1].node.get_our_node_id()); - } else if let MessageSendEvent::SendAnnouncementSignatures { ref node_id, .. } = msg { - assert_eq!(*node_id, nodes[1].node.get_our_node_id()); - } else if let MessageSendEvent::SendChannelUpdate { ref node_id, .. 
} = msg { - assert_eq!(*node_id, nodes[1].node.get_our_node_id()); - } else { - panic!("Unexpected event") - } - } } #[test] diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index e5b02b34824..bce3253398d 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -1822,6 +1822,11 @@ impl TestChainSource { self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone())); self.watched_txn.lock().unwrap().remove(&(outpoint.txid, script_pubkey)); } + + pub fn clear_watched_txn_and_outputs(&self) { + self.watched_outputs.lock().unwrap().clear(); + self.watched_txn.lock().unwrap().clear(); + } } impl UtxoLookup for TestChainSource { From 8a05784136788bbb5d6af5461f9d8d42c2d8c8af Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Fri, 25 Jul 2025 11:49:21 +0530 Subject: [PATCH 6/8] fixup: Allow unused_import, these are used in cfg(peer_storage) --- lightning/src/chain/chainmonitor.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index ef81749568c..1681d01870b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -28,6 +28,7 @@ use bitcoin::hash_types::{BlockHash, Txid}; use crate::chain; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; +#[allow(unused_imports)] use crate::chain::channelmonitor::{ write_chanmon_internal, Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, TransactionOutputs, WithChannelMonitor, @@ -47,6 +48,7 @@ use crate::types::features::{InitFeatures, NodeFeatures}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; use crate::util::persist::MonitorName; +#[allow(unused_imports)] use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; use bitcoin::secp256k1::PublicKey; @@ -811,6 +813,7 @@ where } fn send_peer_storage(&self, their_node_id: PublicKey) { + #[allow(unused_mut)] let mut monitors_list: Vec = Vec::new(); let random_bytes = self.entropy_source.get_secure_random_bytes(); From e1f702fabacd13eba98c92edf3af2cfe9d398e60 Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Tue, 29 Jul 2025 17:31:15 +0530 Subject: [PATCH 7/8] fixup: Serialise ChannelMonitors and send them over inside Peer Storage --- Cargo.toml | 2 +- lightning/src/chain/chainmonitor.rs | 79 ++++++++++++++++------------- 2 files changed, 44 insertions(+), 37 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4bd5efdfbf6..e350336bb57 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,5 +68,5 @@ check-cfg = [ "cfg(splicing)", "cfg(async_payments)", "cfg(simple_close)", - "cfg(peer_storage)", + "cfg(peer_storage)", ] diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 1681d01870b..4a40ba8723b 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -28,10 +28,11 @@ use bitcoin::hash_types::{BlockHash, Txid}; use crate::chain; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -#[allow(unused_imports)] +#[cfg(peer_storage)] +use crate::chain::channelmonitor::write_chanmon_internal; use crate::chain::channelmonitor::{ - write_chanmon_internal, Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, - TransactionOutputs, WithChannelMonitor, + Balance, ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, TransactionOutputs, + WithChannelMonitor, }; use crate::chain::transaction::{OutPoint, TransactionData}; use 
crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput}; @@ -48,7 +49,7 @@ use crate::types::features::{InitFeatures, NodeFeatures}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; use crate::util::persist::MonitorName; -#[allow(unused_imports)] +#[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; use bitcoin::secp256k1::PublicKey; @@ -812,50 +813,54 @@ where mon.values().map(|monitor| monitor.monitor.get_counterparty_node_id()).collect() } + #[cfg(peer_storage)] fn send_peer_storage(&self, their_node_id: PublicKey) { #[allow(unused_mut)] let mut monitors_list: Vec = Vec::new(); let random_bytes = self.entropy_source.get_secure_random_bytes(); - #[cfg(peer_storage)] + const MAX_PEER_STORAGE_SIZE: usize = 65531; + const USIZE_LEN: usize = core::mem::size_of::(); + let mut usize_bytes = [0u8; USIZE_LEN]; + usize_bytes.copy_from_slice(&random_bytes[0..USIZE_LEN]); + let random_usize = usize::from_le_bytes(usize_bytes); + + let mut curr_size = 0; + let monitors = self.monitors.read().unwrap(); + let mut stored_chanmon_idx = alloc::collections::BTreeSet::::new(); + // Used as a fallback reference if the set is empty + let zero = 0; + + while curr_size < MAX_PEER_STORAGE_SIZE + && *stored_chanmon_idx.last().unwrap_or(&zero) < monitors.len() { - const MAX_PEER_STORAGE_SIZE: usize = 65531; - const USIZE_LEN: usize = core::mem::size_of::(); - let random_usize = usize::from_le_bytes(random_bytes[0..USIZE_LEN].try_into().unwrap()); - let mut curr_size = 0; - let monitors = self.monitors.read().unwrap(); - // Randomising Keys in the HashMap to fetch monitors without repetition. - let mut keys: Vec<&ChannelId> = monitors.keys().collect(); - for i in (1..keys.len()).rev() { - let j = random_usize % (i + 1); - keys.swap(i, j); - } + let idx = random_usize % monitors.len(); + stored_chanmon_idx.insert(idx + 1); + let (cid, mon) = monitors.iter().skip(idx).next().unwrap(); - for chan_id in keys { - let mon = &monitors[chan_id]; - let mut ser_chan = VecWriter(Vec::new()); - let min_seen_secret = mon.monitor.get_min_seen_secret(); - let counterparty_node_id = mon.monitor.get_counterparty_node_id(); + let mut ser_chan = VecWriter(Vec::new()); + let min_seen_secret = mon.monitor.get_min_seen_secret(); + let counterparty_node_id = mon.monitor.get_counterparty_node_id(); + { let chan_mon = mon.monitor.inner.lock().unwrap(); write_chanmon_internal(&chan_mon, true, &mut ser_chan) .expect("can not write Channel Monitor for peer storage message"); - - let peer_storage_monitor = PeerStorageMonitorHolder { - channel_id: *chan_id, - min_seen_secret, - counterparty_node_id, - monitor_bytes: ser_chan.0, - }; - - // Adding size of peer_storage_monitor. - curr_size += peer_storage_monitor.serialized_length(); - - if curr_size > MAX_PEER_STORAGE_SIZE { - break; - } - monitors_list.push(peer_storage_monitor); } + let peer_storage_monitor = PeerStorageMonitorHolder { + channel_id: *cid, + min_seen_secret, + counterparty_node_id, + monitor_bytes: ser_chan.0, + }; + + // Adding size of peer_storage_monitor. + curr_size += peer_storage_monitor.serialized_length(); + + if curr_size > MAX_PEER_STORAGE_SIZE { + break; + } + monitors_list.push(peer_storage_monitor); } let serialised_channels = monitors_list.encode(); @@ -965,6 +970,7 @@ where ) }); + #[cfg(peer_storage)] // Send peer storage everytime a new block arrives. 
for node_id in self.all_counterparty_node_ids() { self.send_peer_storage(node_id); @@ -1066,6 +1072,7 @@ where ) }); + #[cfg(peer_storage)] // Send peer storage everytime a new block arrives. for node_id in self.all_counterparty_node_ids() { self.send_peer_storage(node_id); From 6328b33fef1ecb29d0c18005f4789b31b3407adc Mon Sep 17 00:00:00 2001 From: Aditya Sharma Date: Tue, 29 Jul 2025 17:32:46 +0530 Subject: [PATCH 8/8] fixup: test: Modify test_peer_storage to check latest changes --- lightning/src/ln/channelmanager.rs | 2 +- lightning/src/util/test_utils.rs | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index cb2d83c3a79..e14777cb37b 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -17218,7 +17218,7 @@ mod tests { nodes[1].node.peer_disconnected(nodes[0].node.get_our_node_id()); // Reload Node! - // nodes[0].chain_source.clear_watched_txn_and_outputs(); + // TODO: Handle the case where we've completely forgotten about an active channel. reload_node!(nodes[0], test_default_channel_config(), &nodes_0_serialized, &[&old_state_monitor[..]], persister, chain_monitor, nodes_0_deserialized); nodes[0].node.peer_connected(nodes[1].node.get_our_node_id(), &msgs::Init { diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index bce3253398d..e5b02b34824 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -1822,11 +1822,6 @@ impl TestChainSource { self.watched_outputs.lock().unwrap().remove(&(outpoint, script_pubkey.clone())); self.watched_txn.lock().unwrap().remove(&(outpoint.txid, script_pubkey)); } - - pub fn clear_watched_txn_and_outputs(&self) { - self.watched_outputs.lock().unwrap().clear(); - self.watched_txn.lock().unwrap().clear(); - } } impl UtxoLookup for TestChainSource {
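A closing note on the sizing logic introduced in PATCH 3/8 and reworked in PATCH 7/8: each peer_storage payload is capped at MAX_PEER_STORAGE_SIZE = 65531 bytes, presumably to leave headroom under the 65535-byte lightning message limit, and serialised monitors are accumulated until the next one would exceed that budget. The standalone sketch below mirrors only that accumulation step; the stand-in type and function name are illustrative and not part of the patch:

// Stand-in for a PeerStorageMonitorHolder and its serialized_length().
struct SerialisedMonitor {
	bytes: Vec<u8>,
}

impl SerialisedMonitor {
	fn serialized_length(&self) -> usize {
		// The real code asks LDK's serialisation machinery for the exact encoded
		// length, including TLV framing; the raw byte length is close enough here.
		self.bytes.len()
	}
}

const MAX_PEER_STORAGE_SIZE: usize = 65531;

// Keep taking candidates (already in the desired, e.g. randomised, order) until the
// next one would push the payload past the budget, mirroring send_peer_storage.
fn select_within_budget(candidates: Vec<SerialisedMonitor>) -> Vec<SerialisedMonitor> {
	let mut selected = Vec::new();
	let mut curr_size = 0usize;
	for candidate in candidates {
		curr_size += candidate.serialized_length();
		if curr_size > MAX_PEER_STORAGE_SIZE {
			break;
		}
		selected.push(candidate);
	}
	selected
}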