Commit d3dd617

Merge pull request #3978 from joostjager/remove-bg-macro-follow-up
Async background processor check_sleeper refactor
2 parents 7e68f54 + d91575d commit d3dd617


lightning-background-processor/src/lib.rs

Lines changed: 34 additions & 31 deletions
@@ -832,8 +832,7 @@ where
 	let mut have_pruned = false;
 	let mut have_decayed_scorer = false;
 
-	let mut cur_batch_delay = batch_delay.get();
-	let mut last_forwards_processing_call = sleeper(cur_batch_delay);
+	let mut last_forwards_processing_call = sleeper(batch_delay.get());
 
 	loop {
 		channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
@@ -854,11 +853,11 @@ where
 		// generally, and as a fallback place such blocking only immediately before
 		// persistence.
 		peer_manager.as_ref().process_events();
-		match check_sleeper(&mut last_forwards_processing_call) {
+		match check_and_reset_sleeper(&mut last_forwards_processing_call, || {
+			sleeper(batch_delay.next())
+		}) {
 			Some(false) => {
 				channel_manager.get_cm().process_pending_htlc_forwards();
-				cur_batch_delay = batch_delay.next();
-				last_forwards_processing_call = sleeper(cur_batch_delay);
 			},
 			Some(true) => break,
 			None => {},
@@ -906,19 +905,20 @@ where
 		}
 
 		let await_slow = if mobile_interruptable_platform {
-			match check_sleeper(&mut await_start.unwrap()) {
+			// Specify a zero new sleeper timeout because we won't use the new sleeper. It is re-initialized in the next
+			// loop iteration.
+			match check_and_reset_sleeper(&mut await_start.unwrap(), || sleeper(Duration::ZERO)) {
 				Some(true) => break,
 				Some(false) => true,
 				None => false,
 			}
 		} else {
 			false
 		};
-		match check_sleeper(&mut last_freshness_call) {
+		match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
 			Some(false) => {
 				log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
 				channel_manager.get_cm().timer_tick_occurred();
-				last_freshness_call = sleeper(FRESHNESS_TIMER);
 			},
 			Some(true) => break,
 			None => {},
@@ -963,8 +963,13 @@ where
 		// pruning their network graph. We run once 60 seconds after startup before
 		// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
 		// we prune after an initial sync completes.
+		let prune_timer = if gossip_sync.prunable_network_graph().is_some() {
+			NETWORK_PRUNE_TIMER
+		} else {
+			FIRST_NETWORK_PRUNE_TIMER
+		};
 		let prune_timer_elapsed = {
-			match check_sleeper(&mut last_prune_call) {
+			match check_and_reset_sleeper(&mut last_prune_call, || sleeper(prune_timer)) {
 				Some(false) => true,
 				Some(true) => break,
 				None => false,
@@ -1008,9 +1013,6 @@ where
 
 				have_pruned = true;
 			}
-			let prune_timer =
-				if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
-			last_prune_call = sleeper(prune_timer);
 		}
 		if !have_decayed_scorer {
 			if let Some(ref scorer) = scorer {
@@ -1021,7 +1023,9 @@ where
 			}
 			have_decayed_scorer = true;
 		}
-		match check_sleeper(&mut last_scorer_persist_call) {
+		match check_and_reset_sleeper(&mut last_scorer_persist_call, || {
+			sleeper(SCORER_PERSIST_TIMER)
+		}) {
 			Some(false) => {
 				if let Some(ref scorer) = scorer {
 					if let Some(duration_since_epoch) = fetch_time() {
@@ -1053,12 +1057,11 @@ where
 					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
 					futures.set_c(Box::pin(fut));
 				}
-				last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
 			},
 			Some(true) => break,
 			None => {},
 		}
-		match check_sleeper(&mut last_sweeper_call) {
+		match check_and_reset_sleeper(&mut last_sweeper_call, || sleeper(SWEEPER_TIMER)) {
 			Some(false) => {
 				log_trace!(logger, "Regenerating sweeper spends if necessary");
 				if let Some(ref sweeper) = sweeper {
@@ -1071,7 +1074,6 @@ where
 					// TODO: Once our MSRV is 1.68 we should be able to drop the Box
 					futures.set_d(Box::pin(fut));
 				}
-				last_sweeper_call = sleeper(SWEEPER_TIMER);
 			},
 			Some(true) => break,
 			None => {},
@@ -1082,13 +1084,14 @@ where
 			res?;
 		}
 
-		match check_sleeper(&mut last_onion_message_handler_call) {
+		match check_and_reset_sleeper(&mut last_onion_message_handler_call, || {
+			sleeper(ONION_MESSAGE_HANDLER_TIMER)
+		}) {
 			Some(false) => {
 				if let Some(om) = &onion_messenger {
 					log_trace!(logger, "Calling OnionMessageHandler's timer_tick_occurred");
 					om.get_om().timer_tick_occurred();
 				}
-				last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
 			},
 			Some(true) => break,
 			None => {},
@@ -1112,23 +1115,21 @@ where
 			peer_manager.as_ref().disconnect_all_peers();
 			last_ping_call = sleeper(PING_TIMER);
 		} else {
-			match check_sleeper(&mut last_ping_call) {
+			match check_and_reset_sleeper(&mut last_ping_call, || sleeper(PING_TIMER)) {
 				Some(false) => {
 					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
 					peer_manager.as_ref().timer_tick_occurred();
-					last_ping_call = sleeper(PING_TIMER);
 				},
 				Some(true) => break,
 				_ => {},
 			}
 		}
 
 		// Rebroadcast pending claims.
-		match check_sleeper(&mut last_rebroadcast_call) {
+		match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
 			Some(false) => {
 				log_trace!(logger, "Rebroadcasting monitor's pending claims");
 				chain_monitor.rebroadcast_pending_claims();
-				last_rebroadcast_call = sleeper(REBROADCAST_TIMER);
 			},
 			Some(true) => break,
 			None => {},
@@ -1170,13 +1171,18 @@ where
 	Ok(())
 }
 
-fn check_sleeper<SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin>(
-	fut: &mut SleepFuture,
+fn check_and_reset_sleeper<
+	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
+>(
+	fut: &mut SleepFuture, mut new_sleeper: impl FnMut() -> SleepFuture,
 ) -> Option<bool> {
 	let mut waker = dummy_waker();
 	let mut ctx = task::Context::from_waker(&mut waker);
-	match core::pin::Pin::new(fut).poll(&mut ctx) {
-		task::Poll::Ready(exit) => Some(exit),
+	match core::pin::Pin::new(&mut *fut).poll(&mut ctx) {
+		task::Poll::Ready(exit) => {
+			*fut = new_sleeper();
+			Some(exit)
+		},
 		task::Poll::Pending => None,
 	}
 }
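For context, the sketch below shows, outside of LDK, the pattern the new check_and_reset_sleeper helper implements: poll a sleep future once with a no-op waker and, whenever it has completed, immediately re-arm it with a fresh sleeper built by the supplied closure, so call sites no longer have to reset the timer in their Some(false) arms. The dummy_waker, toy sleeper, FRESHNESS_TIMER constant, and driver loop here are illustrative stand-ins written for this example only, not LDK's actual implementations.

use core::future::Future;
use core::pin::Pin;
use core::task::{self, Poll, RawWaker, RawWakerVTable, Waker};
use std::time::{Duration, Instant};

// No-op waker, standing in for the dummy_waker() used by the background processor.
fn dummy_waker() -> Waker {
	fn clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &VTABLE) }
	fn no_op(_: *const ()) {}
	static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, no_op, no_op, no_op);
	unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &VTABLE)) }
}

// Toy "sleeper": a future that resolves to `false` ("don't exit") once `dur` has
// elapsed, checked each time it is polled. Not LDK's sleeper.
fn sleeper(dur: Duration) -> Pin<Box<dyn Future<Output = bool>>> {
	let deadline = Instant::now() + dur;
	Box::pin(async move {
		core::future::poll_fn(move |_cx| {
			if Instant::now() >= deadline { Poll::Ready(()) } else { Poll::Pending }
		})
		.await;
		false
	})
}

// Same shape as the helper introduced in this commit: poll once, and on completion
// swap in a fresh sleeper built by `new_sleeper` before returning the exit flag.
fn check_and_reset_sleeper<S: Future<Output = bool> + Unpin>(
	fut: &mut S, mut new_sleeper: impl FnMut() -> S,
) -> Option<bool> {
	let waker = dummy_waker();
	let mut ctx = task::Context::from_waker(&waker);
	match Pin::new(&mut *fut).poll(&mut ctx) {
		Poll::Ready(exit) => {
			*fut = new_sleeper();
			Some(exit)
		},
		Poll::Pending => None,
	}
}

fn main() {
	// Illustrative timer value; LDK's FRESHNESS_TIMER is defined in lib.rs.
	const FRESHNESS_TIMER: Duration = Duration::from_millis(50);
	let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
	for _ in 0..10 {
		// The re-arm now lives in the closure, so the Some(false) arm only does its work.
		match check_and_reset_sleeper(&mut last_freshness_call, || sleeper(FRESHNESS_TIMER)) {
			Some(false) => println!("timer fired, doing periodic work"),
			Some(true) => break,
			None => {},
		}
		std::thread::sleep(Duration::from_millis(20));
	}
}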
@@ -1502,7 +1508,7 @@ impl BackgroundProcessor {
 							NETWORK_GRAPH_PERSISTENCE_KEY,
 							&network_graph.encode(),
 						) {
-							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
+							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
 						}
 						have_pruned = true;
 					}
@@ -1531,10 +1537,7 @@ impl BackgroundProcessor {
 							SCORER_PERSISTENCE_KEY,
 							&scorer.encode(),
 						) {
-							log_error!(logger,
-								"Error: Failed to persist scorer, check your disk and permissions {}",
-								e,
-							);
+							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
 						}
 					}
 					last_scorer_persist_call = Instant::now();
