Skip to content

Commit a9ab85a

Browse files
committed
fixup: Serialise ChannelMonitors and send them over inside Peer Storage
1 parent 6328b33 commit a9ab85a

File tree

3 files changed

+88
-30
lines changed

3 files changed

+88
-30
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1887,7 +1887,9 @@ mod tests {
18871887
Arc::clone(&logger),
18881888
Arc::clone(&fee_estimator),
18891889
Arc::clone(&kv_store),
1890+
#[cfg(peer_storage)]
18901891
Arc::clone(&keys_manager),
1892+
#[cfg(peer_storage)]
18911893
keys_manager.get_peer_storage_key(),
18921894
));
18931895
let best_block = BestBlock::from_network(network);

lightning/src/chain/chainmonitor.rs

Lines changed: 84 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,17 @@ use crate::chain::transaction::{OutPoint, TransactionData};
3838
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
3939
use crate::events::{self, Event, EventHandler, ReplayEvent};
4040
use crate::ln::channel_state::ChannelDetails;
41-
use crate::ln::msgs::{self, BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
41+
#[cfg(peer_storage)]
42+
use crate::ln::msgs::PeerStorage;
43+
use crate::ln::msgs::{BaseMessageHandler, Init, MessageSendEvent, SendOnlyMessageHandler};
44+
#[cfg(peer_storage)]
4245
use crate::ln::our_peer_storage::{DecryptedOurPeerStorage, PeerStorageMonitorHolder};
4346
use crate::ln::types::ChannelId;
4447
use crate::prelude::*;
4548
use crate::sign::ecdsa::EcdsaChannelSigner;
46-
use crate::sign::{EntropySource, PeerStorageKey};
49+
use crate::sign::EntropySource;
50+
#[cfg(peer_storage)]
51+
use crate::sign::PeerStorageKey;
4752
use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard};
4853
use crate::types::features::{InitFeatures, NodeFeatures};
4954
use crate::util::errors::APIError;
@@ -53,6 +58,8 @@ use crate::util::persist::MonitorName;
5358
use crate::util::ser::{VecWriter, Writeable};
5459
use crate::util::wakers::{Future, Notifier};
5560
use bitcoin::secp256k1::PublicKey;
61+
#[cfg(peer_storage)]
62+
use core::iter::Cycle;
5663
use core::ops::Deref;
5764
use core::sync::atomic::{AtomicUsize, Ordering};
5865

@@ -268,6 +275,8 @@ pub struct ChainMonitor<
268275
logger: L,
269276
fee_estimator: F,
270277
persister: P,
278+
279+
#[cfg(peer_storage)]
271280
entropy_source: ES,
272281
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
273282
/// from the user and not from a [`ChannelMonitor`].
@@ -282,7 +291,9 @@ pub struct ChainMonitor<
282291
/// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners.
283292
pending_send_only_events: Mutex<Vec<MessageSendEvent>>,
284293

294+
#[cfg(peer_storage)]
285295
our_peerstorage_encryption_key: PeerStorageKey,
296+
_phantom: std::marker::PhantomData<ES>,
286297
}
287298

288299
impl<
@@ -479,6 +490,27 @@ where
479490
/// [`NodeSigner`]: crate::sign::NodeSigner
480491
/// [`NodeSigner::get_peer_storage_key`]: crate::sign::NodeSigner::get_peer_storage_key
481492
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
493+
#[cfg(not(peer_storage))]
494+
pub fn new(
495+
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
496+
) -> Self {
497+
Self {
498+
monitors: RwLock::new(new_hash_map()),
499+
chain_source,
500+
broadcaster,
501+
logger,
502+
fee_estimator: feeest,
503+
persister,
504+
pending_monitor_events: Mutex::new(Vec::new()),
505+
highest_chain_height: AtomicUsize::new(0),
506+
event_notifier: Notifier::new(),
507+
pending_send_only_events: Mutex::new(Vec::new()),
508+
_phantom: std::marker::PhantomData,
509+
}
510+
}
511+
512+
/// Adding `entropy_source` and `our_peerstorage_encryption_key` for compiling with peer_storage attr.
513+
#[cfg(peer_storage)]
482514
pub fn new(
483515
chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P,
484516
entropy_source: ES, our_peerstorage_encryption_key: PeerStorageKey,
@@ -496,6 +528,7 @@ where
496528
event_notifier: Notifier::new(),
497529
pending_send_only_events: Mutex::new(Vec::new()),
498530
our_peerstorage_encryption_key,
531+
_phantom: std::marker::PhantomData,
499532
}
500533
}
501534

@@ -808,59 +841,80 @@ where
808841

809842
/// This function collects the counterparty node IDs from all monitors into a `HashSet`,
810843
/// ensuring unique IDs are returned.
844+
#[cfg(peer_storage)]
811845
fn all_counterparty_node_ids(&self) -> HashSet<PublicKey> {
812846
let mon = self.monitors.read().unwrap();
813847
mon.values().map(|monitor| monitor.monitor.get_counterparty_node_id()).collect()
814848
}
815849

816850
#[cfg(peer_storage)]
817851
fn send_peer_storage(&self, their_node_id: PublicKey) {
818-
#[allow(unused_mut)]
819852
let mut monitors_list: Vec<PeerStorageMonitorHolder> = Vec::new();
820853
let random_bytes = self.entropy_source.get_secure_random_bytes();
821854

822855
const MAX_PEER_STORAGE_SIZE: usize = 65531;
823856
const USIZE_LEN: usize = core::mem::size_of::<usize>();
824-
let mut usize_bytes = [0u8; USIZE_LEN];
825-
usize_bytes.copy_from_slice(&random_bytes[0..USIZE_LEN]);
826-
let random_usize = usize::from_le_bytes(usize_bytes);
827-
828-
let mut curr_size = 0;
829-
let monitors = self.monitors.read().unwrap();
830-
let mut stored_chanmon_idx = alloc::collections::BTreeSet::<usize>::new();
831-
// Used as a fallback reference if the set is empty
832-
let zero = 0;
857+
let mut random_bytes_cycle_iter = random_bytes.iter().cycle();
858+
859+
let mut current_size = 0;
860+
let monitors_lock = self.monitors.read().unwrap();
861+
let mut channel_ids = monitors_lock.keys().copied().collect();
862+
863+
fn next_random_id(
864+
channel_ids: &mut Vec<ChannelId>,
865+
random_bytes_cycle_iter: &mut Cycle<core::slice::Iter<u8>>,
866+
) -> Option<ChannelId> {
867+
if channel_ids.is_empty() {
868+
return None;
869+
}
870+
let random_idx = {
871+
let mut usize_bytes = [0u8; USIZE_LEN];
872+
usize_bytes.iter_mut().for_each(|b| {
873+
*b = *random_bytes_cycle_iter.next().expect("A cycle never ends")
874+
});
875+
// Take one more to introduce a slight misalignment.
876+
random_bytes_cycle_iter.next().expect("A cycle never ends");
877+
usize::from_le_bytes(usize_bytes) % channel_ids.len()
878+
};
879+
Some(channel_ids.swap_remove(random_idx))
880+
}
833881

834-
while curr_size < MAX_PEER_STORAGE_SIZE
835-
&& *stored_chanmon_idx.last().unwrap_or(&zero) < monitors.len()
882+
while let Some(channel_id) = next_random_id(&mut channel_ids, &mut random_bytes_cycle_iter)
836883
{
837-
let idx = random_usize % monitors.len();
838-
stored_chanmon_idx.insert(idx + 1);
839-
let (cid, mon) = monitors.iter().skip(idx).next().unwrap();
884+
let monitor_holder = if let Some(monitor_holder) = monitors_lock.get(&channel_id) {
885+
monitor_holder
886+
} else {
887+
debug_assert!(
888+
false,
889+
"Tried to access non-existing monitor, this should never happen"
890+
);
891+
break;
892+
};
840893

841-
let mut ser_chan = VecWriter(Vec::new());
842-
let min_seen_secret = mon.monitor.get_min_seen_secret();
843-
let counterparty_node_id = mon.monitor.get_counterparty_node_id();
894+
let mut serialized_channel = VecWriter(Vec::new());
895+
let min_seen_secret = monitor_holder.monitor.get_min_seen_secret();
896+
let counterparty_node_id = monitor_holder.monitor.get_counterparty_node_id();
844897
{
845-
let chan_mon = mon.monitor.inner.lock().unwrap();
898+
let inner_lock = monitor_holder.monitor.inner.lock().unwrap();
846899

847-
write_chanmon_internal(&chan_mon, true, &mut ser_chan)
900+
write_chanmon_internal(&inner_lock, true, &mut serialized_channel)
848901
.expect("can not write Channel Monitor for peer storage message");
849902
}
850903
let peer_storage_monitor = PeerStorageMonitorHolder {
851-
channel_id: *cid,
904+
channel_id,
852905
min_seen_secret,
853906
counterparty_node_id,
854-
monitor_bytes: ser_chan.0,
907+
monitor_bytes: serialized_channel.0,
855908
};
856909

857-
// Adding size of peer_storage_monitor.
858-
curr_size += peer_storage_monitor.serialized_length();
910+
let serialized_length = peer_storage_monitor.serialized_length();
859911

860-
if curr_size > MAX_PEER_STORAGE_SIZE {
861-
break;
912+
if current_size + serialized_length > MAX_PEER_STORAGE_SIZE {
913+
continue;
914+
} else {
915+
current_size += serialized_length;
916+
monitors_list.push(peer_storage_monitor);
862917
}
863-
monitors_list.push(peer_storage_monitor);
864918
}
865919

866920
let serialised_channels = monitors_list.encode();
@@ -870,7 +924,7 @@ where
870924
log_debug!(self.logger, "Sending Peer Storage to {}", log_pubkey!(their_node_id));
871925
let send_peer_storage_event = MessageSendEvent::SendPeerStorage {
872926
node_id: their_node_id,
873-
msg: msgs::PeerStorage { data: cipher.into_vec() },
927+
msg: PeerStorage { data: cipher.into_vec() },
874928
};
875929

876930
self.pending_send_only_events.lock().unwrap().push(send_peer_storage_event)

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,9 @@ impl<'a> TestChainMonitor<'a> {
471471
logger,
472472
fee_estimator,
473473
persister,
474+
#[cfg(peer_storage)]
474475
keys_manager,
476+
#[cfg(peer_storage)]
475477
keys_manager.get_peer_storage_key(),
476478
),
477479
keys_manager,

0 commit comments

Comments
 (0)