Skip to content

Commit e7dddc9

Browse files
committed
fixup: Serialise ChannelMonitors and send them over inside Peer Storage
1 parent 6328b33 commit e7dddc9

File tree

2 files changed

+67
-32
lines changed

2 files changed

+67
-32
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1887,7 +1887,9 @@ mod tests {
18871887
Arc::clone(&logger),
18881888
Arc::clone(&fee_estimator),
18891889
Arc::clone(&kv_store),
1890+
#[cfg(peer_storage)]
18901891
Arc::clone(&keys_manager),
1892+
#[cfg(peer_storage)]
18911893
keys_manager.get_peer_storage_key(),
18921894
));
18931895
let best_block = BestBlock::from_network(network);

lightning/src/chain/chainmonitor.rs

Lines changed: 65 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ use crate::chain::transaction::{OutPoint, TransactionData};
3838
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
3939
use crate::events::{self, Event, EventHandler, ReplayEvent};
4040
use crate::ln::channel_state::ChannelDetails;
41-
use crate::ln::msgs::{self, BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
41+
#[cfg(peer_storage)]
42+
use crate::ln::msgs::PeerStorage;
43+
use crate::ln::msgs::{BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
44+
#[cfg(peer_storage)]
4245
use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHolder};
4346
use crate::ln::types::ChannelId;
4447
use crate::prelude::*;
@@ -53,6 +56,8 @@ use crate::util::persist::MonitorName;
5356
use crate::util::ser::{VecWriter, Writeable};
5457
use crate::util::wakers::{Future, Notifier};
5558
use bitcoin::secp256k1::PublicKey;
59+
#[cfg(peer_storage)]
60+
use core::iter::Cycle;
5661
use core::ops::Deref;
5762
use core::sync::atomic::{AtomicUsize, Ordering};
5863

@@ -268,6 +273,8 @@ pub struct ChainMonitor<
268273
logger: L,
269274
fee_estimator: F,
270275
persister: P,
276+
277+
#[cfg(peer_storage)]
271278
entropy_source: ES,
272279
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
273280
/// from the user and not from a [`ChannelMonitor`].
@@ -282,7 +289,9 @@ pub struct ChainMonitor<
282289
/// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners.
283290
pending_send_only_events: Mutex<Vec<MessageSendEvent>>,
284291

292+
#[cfg(peer_storage)]
285293
our_peerstorage_encryption_key: PeerStorageKey,
294+
_phantom: std::marker::PhantomData<ES>,
286295
}
287296

288297
impl<
@@ -481,7 +490,7 @@ where
481490
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
482491
pub fn new(
483492
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
484-
entropy_source: ES, our_peerstorage_encryption_key: PeerStorageKey,
493+
_entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey,
485494
) -> Self {
486495
Self {
487496
monitors: RwLock::new(new_hash_map()),
@@ -490,12 +499,15 @@ where
490499
logger,
491500
fee_estimator: feeest,
492501
persister,
493-
entropy_source,
502+
#[cfg(peer_storage)]
503+
entropy_source: _entropy_source,
494504
pending_monitor_events: Mutex::new(Vec::new()),
495505
highest_chain_height: AtomicUsize::new(0),
496506
event_notifier: Notifier::new(),
497507
pending_send_only_events: Mutex::new(Vec::new()),
498-
our_peerstorage_encryption_key,
508+
#[cfg(peer_storage)]
509+
our_peerstorage_encryption_key: _our_peerstorage_encryption_key,
510+
_phantom: std::marker::PhantomData,
499511
}
500512
}
501513

@@ -808,59 +820,80 @@ where
808820

809821
/// This function collects the counterparty node IDs from all monitors into a `HashSet`,
810822
/// ensuring unique IDs are returned.
823+
#[cfg(peer_storage)]
811824
fn all_counterparty_node_ids(&self) -> HashSet<PublicKey> {
812825
let mon = self.monitors.read().unwrap();
813826
mon.values().map(|monitor| monitor.monitor.get_counterparty_node_id()).collect()
814827
}
815828

816829
#[cfg(peer_storage)]
817830
fn send_peer_storage(&self, their_node_id: PublicKey) {
818-
#[allow(unused_mut)]
819831
let mut monitors_list: Vec<PeerStorageMonitorHolder> = Vec::new();
820832
let random_bytes = self.entropy_source.get_secure_random_bytes();
821833

822834
const MAX_PEER_STORAGE_SIZE: usize = 65531;
823835
const USIZE_LEN: usize = core::mem::size_of::<usize>();
824-
let mut usize_bytes = [0u8; USIZE_LEN];
825-
usize_bytes.copy_from_slice(&random_bytes[0..USIZE_LEN]);
826-
let random_usize = usize::from_le_bytes(usize_bytes);
827-
828-
let mut curr_size = 0;
829-
let monitors = self.monitors.read().unwrap();
830-
let mut stored_chanmon_idx = alloc::collections::BTreeSet::<usize>::new();
831-
// Used as a fallback reference if the set is empty
832-
let zero = 0;
836+
let mut random_bytes_cycle_iter = random_bytes.iter().cycle();
837+
838+
let mut current_size = 0;
839+
let monitors_lock = self.monitors.read().unwrap();
840+
let mut channel_ids = monitors_lock.keys().copied().collect();
841+
842+
fn next_random_id(
843+
channel_ids: &mut Vec<ChannelId>,
844+
random_bytes_cycle_iter: &mut Cycle<core::slice::Iter<u8>>,
845+
) -> Option<ChannelId> {
846+
if channel_ids.is_empty() {
847+
return None;
848+
}
849+
let random_idx = {
850+
let mut usize_bytes = [0u8; USIZE_LEN];
851+
usize_bytes.iter_mut().for_each(|b| {
852+
*b = *random_bytes_cycle_iter.next().expect("A cycle never ends")
853+
});
854+
// Take one more to introduce a slight misalignment.
855+
random_bytes_cycle_iter.next().expect("A cycle never ends");
856+
usize::from_le_bytes(usize_bytes) % channel_ids.len()
857+
};
858+
Some(channel_ids.swap_remove(random_idx))
859+
}
833860

834-
while curr_size < MAX_PEER_STORAGE_SIZE
835-
&& *stored_chanmon_idx.last().unwrap_or(&zero) < monitors.len()
861+
while let Some(channel_id) = next_random_id(&mut channel_ids, &mut random_bytes_cycle_iter)
836862
{
837-
let idx = random_usize % monitors.len();
838-
stored_chanmon_idx.insert(idx + 1);
839-
let (cid, mon) = monitors.iter().skip(idx).next().unwrap();
863+
let monitor_holder = if let Some(monitor_holder) = monitors_lock.get(&channel_id) {
864+
monitor_holder
865+
} else {
866+
debug_assert!(
867+
false,
868+
"Tried to access non-existing monitor, this should never happen"
869+
);
870+
break;
871+
};
840872

841-
let mut ser_chan = VecWriter(Vec::new());
842-
let min_seen_secret = mon.monitor.get_min_seen_secret();
843-
let counterparty_node_id = mon.monitor.get_counterparty_node_id();
873+
let mut serialized_channel = VecWriter(Vec::new());
874+
let min_seen_secret = monitor_holder.monitor.get_min_seen_secret();
875+
let counterparty_node_id = monitor_holder.monitor.get_counterparty_node_id();
844876
{
845-
let chan_mon = mon.monitor.inner.lock().unwrap();
877+
let inner_lock = monitor_holder.monitor.inner.lock().unwrap();
846878

847-
write_chanmon_internal(&chan_mon, true, &mut ser_chan)
879+
write_chanmon_internal(&inner_lock, true, &mut serialized_channel)
848880
.expect("can not write Channel Monitor for peer storage message");
849881
}
850882
let peer_storage_monitor = PeerStorageMonitorHolder {
851-
channel_id: *cid,
883+
channel_id,
852884
min_seen_secret,
853885
counterparty_node_id,
854-
monitor_bytes: ser_chan.0,
886+
monitor_bytes: serialized_channel.0,
855887
};
856888

857-
// Adding size of peer_storage_monitor.
858-
curr_size += peer_storage_monitor.serialized_length();
889+
let serialized_length = peer_storage_monitor.serialized_length();
859890

860-
if curr_size > MAX_PEER_STORAGE_SIZE {
861-
break;
891+
if current_size + serialized_length > MAX_PEER_STORAGE_SIZE {
892+
continue;
893+
} else {
894+
current_size += serialized_length;
895+
monitors_list.push(peer_storage_monitor);
862896
}
863-
monitors_list.push(peer_storage_monitor);
864897
}
865898

866899
let serialised_channels = monitors_list.encode();
@@ -870,7 +903,7 @@ where
870903
log_debug!(self.logger, "Sending Peer Storage to {}", log_pubkey!(their_node_id));
871904
let send_peer_storage_event = MessageSendEvent::SendPeerStorage {
872905
node_id: their_node_id,
873-
msg: msgs::PeerStorage { data: cipher.into_vec() },
906+
msg: PeerStorage { data: cipher.into_vec() },
874907
};
875908

876909
self.pending_send_only_events.lock().unwrap().push(send_peer_storage_event)

0 commit comments

Comments
 (0)