Skip to content

Commit 2cde0c1

Browse files
committed
Let BackgroundProcessor drive HTLC forwarding
Previously, we'd require the user to manually call `process_pending_htlc_forwards` as part of `PendingHTLCsForwardable` event handling. Here, we rather move this responsibility to `BackgroundProcessor`, which simplyfies the flow and allows us to implement reasonable forwarding delays on our side rather than delegating to users' implementations. Note this also introduces batching rounds rather than calling `process_pending_htlc_forwards` individually for each `PendingHTLCsForwardable` event, which had been unintuitive anyways, as subsequent `PendingHTLCsForwardable` could lead to overlapping batch intervals, resulting in the shortest timespan 'winning' every time, as `process_pending_htlc_forwards` would of course handle all pending HTLCs at once.
1 parent b587f82 commit 2cde0c1

File tree

2 files changed

+80
-4
lines changed

2 files changed

+80
-4
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
// This file is Copyright its original authors, visible in version control
2+
// history.
3+
//
4+
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
5+
// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
6+
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
7+
// You may not use this file except in accordance with one or both of these
8+
// licenses.
9+
10+
use core::sync::atomic::{AtomicU16, Ordering};
11+
use core::time::Duration;
12+
13+
pub(crate) struct BatchDelay {
14+
next_batch_delay_millis: AtomicU16,
15+
}
16+
17+
impl BatchDelay {
18+
pub(crate) fn new() -> Self {
19+
let next_batch_delay_millis = AtomicU16::new(rand_batch_delay_millis());
20+
Self { next_batch_delay_millis }
21+
}
22+
23+
pub(crate) fn get(&self) -> Duration {
24+
Duration::from_millis(self.next_batch_delay_millis.load(Ordering::Acquire) as u64)
25+
}
26+
27+
pub(crate) fn next(&self) -> Duration {
28+
let next = rand_batch_delay_millis();
29+
self.next_batch_delay_millis.store(next, Ordering::Release);
30+
Duration::from_millis(next as u64)
31+
}
32+
}
33+
34+
fn rand_batch_delay_millis() -> u16 {
35+
// TODO: actually randomize the result.
36+
100
37+
}

lightning-background-processor/src/lib.rs

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ extern crate alloc;
2626
extern crate lightning;
2727
extern crate lightning_rapid_gossip_sync;
2828

29+
mod fwd_batch;
30+
31+
use fwd_batch::BatchDelay;
32+
2933
use lightning::chain;
3034
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
3135
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
@@ -328,7 +332,7 @@ macro_rules! define_run_body {
328332
$peer_manager: ident, $gossip_sync: ident,
329333
$process_sweeper: expr,
330334
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
331-
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
335+
$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $batch_delay: expr,
332336
) => { {
333337
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
334338
$channel_manager.get_cm().timer_tick_occurred();
@@ -345,6 +349,9 @@ macro_rules! define_run_body {
345349
let mut have_pruned = false;
346350
let mut have_decayed_scorer = false;
347351

352+
let mut cur_batch_delay = $batch_delay.get();
353+
let mut last_forwards_processing_call = $get_timer(cur_batch_delay);
354+
348355
loop {
349356
$process_channel_manager_events;
350357
$process_chain_monitor_events;
@@ -369,6 +376,18 @@ macro_rules! define_run_body {
369376
break;
370377
}
371378

379+
if $timer_elapsed(&mut last_forwards_processing_call, cur_batch_delay) {
380+
$channel_manager.get_cm().process_pending_htlc_forwards();
381+
cur_batch_delay = $batch_delay.next();
382+
last_forwards_processing_call = $get_timer(cur_batch_delay);
383+
}
384+
385+
// Exit the loop if the background processor was requested to stop.
386+
if $loop_exit_check {
387+
log_trace!($logger, "Terminating background processor.");
388+
break;
389+
}
390+
372391
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
373392
// see `await_start`'s use below.
374393
let mut await_start = None;
@@ -523,12 +542,14 @@ pub(crate) mod futures_util {
523542
C: Future<Output = ()> + Unpin,
524543
D: Future<Output = ()> + Unpin,
525544
E: Future<Output = bool> + Unpin,
545+
F: Future<Output = bool> + Unpin,
526546
> {
527547
pub a: A,
528548
pub b: B,
529549
pub c: C,
530550
pub d: D,
531551
pub e: E,
552+
pub f: F,
532553
}
533554

534555
pub(crate) enum SelectorOutput {
@@ -537,6 +558,7 @@ pub(crate) mod futures_util {
537558
C,
538559
D,
539560
E(bool),
561+
F(bool),
540562
}
541563

542564
impl<
@@ -545,7 +567,8 @@ pub(crate) mod futures_util {
545567
C: Future<Output = ()> + Unpin,
546568
D: Future<Output = ()> + Unpin,
547569
E: Future<Output = bool> + Unpin,
548-
> Future for Selector<A, B, C, D, E>
570+
F: Future<Output = bool> + Unpin,
571+
> Future for Selector<A, B, C, D, E, F>
549572
{
550573
type Output = SelectorOutput;
551574
fn poll(
@@ -581,6 +604,12 @@ pub(crate) mod futures_util {
581604
},
582605
Poll::Pending => {},
583606
}
607+
match Pin::new(&mut self.f).poll(ctx) {
608+
Poll::Ready(res) => {
609+
return Poll::Ready(SelectorOutput::F(res));
610+
},
611+
Poll::Pending => {},
612+
}
584613
Poll::Pending
585614
}
586615
}
@@ -863,6 +892,7 @@ where
863892
event_handler(event).await
864893
})
865894
};
895+
let batch_delay = Arc::new(BatchDelay::new());
866896
define_run_body!(
867897
persister,
868898
chain_monitor,
@@ -901,7 +931,8 @@ where
901931
b: chain_monitor.get_update_future(),
902932
c: om_fut,
903933
d: lm_fut,
904-
e: sleeper(if mobile_interruptable_platform {
934+
e: sleeper(batch_delay.get()),
935+
f: sleeper(if mobile_interruptable_platform {
905936
Duration::from_millis(100)
906937
} else {
907938
FASTEST_TIMER
@@ -912,6 +943,9 @@ where
912943
SelectorOutput::E(exit) => {
913944
should_break = exit;
914945
},
946+
SelectorOutput::F(exit) => {
947+
should_break = exit;
948+
},
915949
}
916950
},
917951
|t| sleeper(t),
@@ -928,6 +962,7 @@ where
928962
},
929963
mobile_interruptable_platform,
930964
fetch_time,
965+
batch_delay,
931966
)
932967
}
933968

@@ -1051,6 +1086,7 @@ impl BackgroundProcessor {
10511086
}
10521087
event_handler.handle_event(event)
10531088
};
1089+
let batch_delay = Arc::new(BatchDelay::new());
10541090
define_run_body!(
10551091
persister,
10561092
chain_monitor,
@@ -1094,7 +1130,9 @@ impl BackgroundProcessor {
10941130
&chain_monitor.get_update_future(),
10951131
),
10961132
};
1097-
sleeper.wait_timeout(Duration::from_millis(100));
1133+
let batch_delay = batch_delay.get();
1134+
let fastest_timeout = batch_delay.min(Duration::from_millis(100));
1135+
sleeper.wait_timeout(fastest_timeout);
10981136
},
10991137
|_| Instant::now(),
11001138
|time: &Instant, dur| time.elapsed() > dur,
@@ -1107,6 +1145,7 @@ impl BackgroundProcessor {
11071145
.expect("Time should be sometime after 1970"),
11081146
)
11091147
},
1148+
batch_delay,
11101149
)
11111150
});
11121151
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }

0 commit comments

Comments
 (0)