Skip to content

Commit 1c1fe71

Browse files
Rebuild manager forwarded htlcs maps from Channels
We have an overarching goal of (mostly) getting rid of ChannelManager persistence and rebuilding the ChannelManager's state from existing ChannelMonitors, due to issues when the two structs are out-of-sync on restart. The main issue that can arise is channel force closure. Here we start this process by rebuilding ChannelManager::decode_update_add_htlcs, forward_htlcs, and pending_intercepted_htlcs from Channel data, which will soon be included in the ChannelMonitors as part of a different series of PRs. We also fix the reload_node test util to use the node's pre-reload config after restart. The previous behavior was a bit surprising and led to one of this commit's tests failing.
1 parent cfd2bd9 commit 1c1fe71

File tree

3 files changed

+159
-3
lines changed

3 files changed

+159
-3
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18271,6 +18271,76 @@ where
1827118271
}
1827218272
}
1827318273

18274+
// Remove HTLCs from `forward_htlcs` if they are also present in `decode_update_add_htlcs`.
18275+
//
18276+
// In the future, the full set of pending HTLCs will be pulled from `Channel{Monitor}` data and
18277+
// placed in `ChannelManager::decode_update_add_htlcs` on read, to be handled on the next call
18278+
// to `process_pending_htlc_forwards`. This is part of a larger effort to remove the requirement
18279+
// of regularly persisting the `ChannelManager`. The new pipeline is supported for HTLC forwards
18280+
// received on LDK 0.3+ but not <= 0.2, so prune non-legacy HTLCs from `forward_htlcs`.
18281+
forward_htlcs_legacy.retain(|scid, pending_fwds| {
18282+
for fwd in pending_fwds {
18283+
let (prev_scid, prev_htlc_id) = match fwd {
18284+
HTLCForwardInfo::AddHTLC(htlc) => {
18285+
(htlc.prev_outbound_scid_alias, htlc.prev_htlc_id)
18286+
},
18287+
HTLCForwardInfo::FailHTLC { htlc_id, .. }
18288+
| HTLCForwardInfo::FailMalformedHTLC { htlc_id, .. } => (*scid, *htlc_id),
18289+
};
18290+
if let Some(pending_update_adds) = decode_update_add_htlcs.get_mut(&prev_scid) {
18291+
if pending_update_adds
18292+
.iter()
18293+
.any(|update_add| update_add.htlc_id == prev_htlc_id)
18294+
{
18295+
return false;
18296+
}
18297+
}
18298+
}
18299+
true
18300+
});
18301+
// Remove intercepted HTLC forwards if they are also present in `decode_update_add_htlcs`. See
18302+
// the above comment.
18303+
pending_intercepted_htlcs_legacy.retain(|id, fwd| {
18304+
let prev_scid = fwd.prev_outbound_scid_alias;
18305+
if let Some(pending_update_adds) = decode_update_add_htlcs.get_mut(&prev_scid) {
18306+
if pending_update_adds
18307+
.iter()
18308+
.any(|update_add| update_add.htlc_id == fwd.prev_htlc_id)
18309+
{
18310+
pending_events_read.retain(
18311+
|(ev, _)| !matches!(ev, Event::HTLCIntercepted { intercept_id, .. } if intercept_id == id),
18312+
);
18313+
return false;
18314+
}
18315+
}
18316+
if !pending_events_read.iter().any(
18317+
|(ev, _)| matches!(ev, Event::HTLCIntercepted { intercept_id, .. } if intercept_id == id),
18318+
) {
18319+
match create_htlc_intercepted_event(*id, &fwd) {
18320+
Ok(ev) => pending_events_read.push_back((ev, None)),
18321+
Err(()) => debug_assert!(false),
18322+
}
18323+
}
18324+
true
18325+
});
18326+
// Add legacy update_adds that were received on LDK <= 0.2 that are not present in the
18327+
// `decode_update_add_htlcs` map that was rebuilt from `Channel{Monitor}` data, see above
18328+
// comment.
18329+
for (scid, legacy_update_adds) in decode_update_add_htlcs_legacy.drain() {
18330+
match decode_update_add_htlcs.entry(scid) {
18331+
hash_map::Entry::Occupied(mut update_adds) => {
18332+
for legacy_update_add in legacy_update_adds {
18333+
if !update_adds.get().contains(&legacy_update_add) {
18334+
update_adds.get_mut().push(legacy_update_add);
18335+
}
18336+
}
18337+
},
18338+
hash_map::Entry::Vacant(entry) => {
18339+
entry.insert(legacy_update_adds);
18340+
},
18341+
}
18342+
}
18343+
1827418344
let best_block = BestBlock::new(best_block_hash, best_block_height);
1827518345
let flow = OffersMessageFlow::new(
1827618346
chain_hash,
@@ -18300,7 +18370,7 @@ where
1830018370
pending_intercepted_htlcs: Mutex::new(pending_intercepted_htlcs_legacy),
1830118371

1830218372
forward_htlcs: Mutex::new(forward_htlcs_legacy),
18303-
decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs_legacy),
18373+
decode_update_add_htlcs: Mutex::new(decode_update_add_htlcs),
1830418374
claimable_payments: Mutex::new(ClaimablePayments {
1830518375
claimable_payments,
1830618376
pending_claiming_payments: pending_claiming_payments.unwrap(),

lightning/src/ln/functional_test_utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1382,9 +1382,10 @@ macro_rules! reload_node {
13821382
$node.onion_messenger.set_async_payments_handler(&$new_channelmanager);
13831383
};
13841384
($node: expr, $chanman_encoded: expr, $monitors_encoded: expr, $persister: ident, $new_chain_monitor: ident, $new_channelmanager: ident) => {
1385+
let config = $node.node.get_current_config();
13851386
reload_node!(
13861387
$node,
1387-
test_default_channel_config(),
1388+
config,
13881389
$chanman_encoded,
13891390
$monitors_encoded,
13901391
$persister,

lightning/src/ln/reload_tests.rs

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
508508

509509
#[cfg(feature = "std")]
510510
fn do_test_data_loss_protect(reconnect_panicing: bool, substantially_old: bool, not_stale: bool) {
511-
use crate::ln::channelmanager::Retry;
511+
use crate::ln::outbound_payment::Retry;
512512
use crate::types::string::UntrustedString;
513513
// When we get a data_loss_protect proving we're behind, we immediately panic as the
514514
// chain::Watch API requirements have been violated (e.g. the user restored from a backup). The
@@ -1173,6 +1173,91 @@ fn removed_payment_no_manager_persistence() {
11731173
expect_payment_failed!(nodes[0], payment_hash, false);
11741174
}
11751175

1176+
// Test that an HTLC pending in `ChannelManager::forward_htlcs` at the outbound edge survives a
// reload of the forwarding node's `ChannelManager`.
#[test]
1177+
fn manager_persisted_pre_outbound_edge_forward() {
1178+
do_manager_persisted_pre_outbound_edge_forward(false);
1179+
}
1180+
1181+
// Test that an intercepted HTLC pending in `ChannelManager::pending_intercepted_htlcs` survives a
// reload of the forwarding node's `ChannelManager`.
#[test]
1183+
fn manager_persisted_pre_outbound_edge_intercept_forward() {
1184+
do_manager_persisted_pre_outbound_edge_forward(true);
1185+
}
1185+
1186+
// Sends a payment node_a -> node_b -> node_c, stops node_b after it has decoded the HTLC onion
// but before it forwards to node_c (so the HTLC sits in `ChannelManager::forward_htlcs`, or in
// `ChannelManager::pending_intercepted_htlcs` when `intercept_htlc` is set), then reloads node_b
// and verifies the HTLC can still be forwarded and claimed end-to-end.
fn do_manager_persisted_pre_outbound_edge_forward(intercept_htlc: bool) {
1187+
let chanmon_cfgs = create_chanmon_cfgs(3);
1188+
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
1189+
let persister;
1190+
let new_chain_monitor;
1191+
// node_b (the forwarding node) must opt in to HTLC interception for the intercept variant.
let mut intercept_forwards_config = test_default_channel_config();
1192+
intercept_forwards_config.accept_intercept_htlcs = true;
1193+
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, Some(intercept_forwards_config), None]);
1194+
let nodes_1_deserialized;
1195+
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
1196+
1197+
// Open node_a <-> node_b <-> node_c so node_b acts as the forwarding hop.
let chan_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1).2;
1198+
let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2).2;
1199+
1200+
// Lock in the HTLC from node_a <> node_b.
1201+
let amt_msat = 5000;
1202+
let (mut route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[2], amt_msat);
1203+
if intercept_htlc {
1204+
// Route the middle hop over node_b's intercept SCID so the HTLC is intercepted rather
// than forwarded automatically.
route.paths[0].hops[1].short_channel_id = nodes[1].node.get_intercept_scid();
1205+
}
1206+
nodes[0].node.send_payment_with_route(route, payment_hash, RecipientOnionFields::secret_only(payment_secret), PaymentId(payment_hash.0)).unwrap();
1207+
check_added_monitors(&nodes[0], 1);
1208+
let updates = get_htlc_update_msgs(&nodes[0], &nodes[1].node.get_our_node_id());
1209+
nodes[1].node.handle_update_add_htlc(nodes[0].node.get_our_node_id(), &updates.update_add_htlcs[0]);
1210+
do_commitment_signed_dance(&nodes[1], &nodes[0], &updates.commitment_signed, false, false);
1211+
1212+
// Decode the HTLC onion but don't forward it to the next hop, such that the HTLC ends up in
1213+
// `ChannelManager::forward_htlcs` or `ChannelManager::pending_intercepted_htlcs`.
1214+
nodes[1].node.test_process_pending_update_add_htlcs();
1215+
1216+
// Disconnect peers and reload the forwarding node_b.
1217+
nodes[0].node.peer_disconnected(nodes[1].node.get_our_node_id());
1218+
nodes[2].node.peer_disconnected(nodes[1].node.get_our_node_id());
1219+
1220+
let node_b_encoded = nodes[1].node.encode();
1221+
1222+
// Reload node_b from the serialized manager plus both channel monitors.
let chan_0_monitor_serialized = get_monitor!(nodes[1], chan_id_1).encode();
1223+
let chan_1_monitor_serialized = get_monitor!(nodes[1], chan_id_2).encode();
1224+
reload_node!(nodes[1], node_b_encoded, &[&chan_0_monitor_serialized, &chan_1_monitor_serialized], persister, new_chain_monitor, nodes_1_deserialized);
1225+
1226+
reconnect_nodes(ReconnectArgs::new(&nodes[1], &nodes[0]));
1227+
let mut args_b_c = ReconnectArgs::new(&nodes[1], &nodes[2]);
1228+
args_b_c.send_channel_ready = (true, true);
1229+
args_b_c.send_announcement_sigs = (true, true);
1230+
reconnect_nodes(args_b_c);
1231+
1232+
// Forward the HTLC and ensure we can claim it post-reload.
1233+
nodes[1].node.process_pending_htlc_forwards();
1234+
1235+
if intercept_htlc {
1236+
// An `HTLCIntercepted` event should be pending post-reload; use it to manually forward
// the intercepted HTLC over the node_b <-> node_c channel.
let events = nodes[1].node.get_and_clear_pending_events();
1237+
assert_eq!(events.len(), 1);
1238+
let (intercept_id, expected_outbound_amt_msat) = match events[0] {
1239+
Event::HTLCIntercepted { intercept_id, expected_outbound_amount_msat, .. } => {
1240+
(intercept_id, expected_outbound_amount_msat)
1241+
},
1242+
_ => panic!()
1243+
};
1244+
nodes[1].node.forward_intercepted_htlc(intercept_id, &chan_id_2,
1245+
nodes[2].node.get_our_node_id(), expected_outbound_amt_msat).unwrap();
1246+
nodes[1].node.process_pending_htlc_forwards();
1247+
}
1248+
check_added_monitors(&nodes[1], 1);
1249+
1250+
// Deliver the forwarded HTLC to node_c and complete the claim back to node_a.
let updates = get_htlc_update_msgs(&nodes[1], &nodes[2].node.get_our_node_id());
1251+
nodes[2].node.handle_update_add_htlc(nodes[1].node.get_our_node_id(), &updates.update_add_htlcs[0]);
1252+
do_commitment_signed_dance(&nodes[2], &nodes[1], &updates.commitment_signed, false, false);
1253+
expect_and_process_pending_htlcs(&nodes[2], false);
1254+
1255+
expect_payment_claimable!(nodes[2], payment_hash, payment_secret, amt_msat, None, nodes[2].node.get_our_node_id());
1256+
let path: &[&[_]] = &[&[&nodes[1], &nodes[2]]];
1257+
do_claim_payment_along_route(ClaimAlongRouteArgs::new(&nodes[0], path, payment_preimage));
1258+
expect_payment_sent(&nodes[0], payment_preimage, None, true, true);
1259+
}
1260+
11761261
#[test]
11771262
fn test_reload_partial_funding_batch() {
11781263
let chanmon_cfgs = create_chanmon_cfgs(3);

0 commit comments

Comments
 (0)