Skip to content

Commit d9a6641

Browse files
Parallelize persistence in the async bg processor
Co-authored-by: Matt Corallo <[email protected]>
1 parent 269cff7 commit d9a6641

File tree

1 file changed

+177
-32
lines changed
  • lightning-background-processor/src

1 file changed

+177
-32
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 177 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -443,9 +443,117 @@ pub(crate) mod futures_util {
443443
pub(crate) fn dummy_waker() -> Waker {
444444
unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) }
445445
}
446+
447+
enum JoinerResult<E, F: Future<Output = Result<(), E>> + Unpin> {
448+
Pending(Option<F>),
449+
Ready(Result<(), E>),
450+
}
451+
452+
pub(crate) struct Joiner<
453+
E,
454+
A: Future<Output = Result<(), E>> + Unpin,
455+
B: Future<Output = Result<(), E>> + Unpin,
456+
C: Future<Output = Result<(), E>> + Unpin,
457+
D: Future<Output = Result<(), E>> + Unpin,
458+
> {
459+
a: JoinerResult<E, A>,
460+
b: JoinerResult<E, B>,
461+
c: JoinerResult<E, C>,
462+
d: JoinerResult<E, D>,
463+
}
464+
465+
impl<
466+
E,
467+
A: Future<Output = Result<(), E>> + Unpin,
468+
B: Future<Output = Result<(), E>> + Unpin,
469+
C: Future<Output = Result<(), E>> + Unpin,
470+
D: Future<Output = Result<(), E>> + Unpin,
471+
> Joiner<E, A, B, C, D>
472+
{
473+
pub(crate) fn new() -> Self {
474+
Self {
475+
a: JoinerResult::Pending(None),
476+
b: JoinerResult::Pending(None),
477+
c: JoinerResult::Pending(None),
478+
d: JoinerResult::Pending(None),
479+
}
480+
}
481+
482+
pub(crate) fn set_a(&mut self, fut: A) {
483+
self.a = JoinerResult::Pending(Some(fut));
484+
}
485+
pub(crate) fn set_b(&mut self, fut: B) {
486+
self.b = JoinerResult::Pending(Some(fut));
487+
}
488+
pub(crate) fn set_c(&mut self, fut: C) {
489+
self.c = JoinerResult::Pending(Some(fut));
490+
}
491+
pub(crate) fn set_d(&mut self, fut: D) {
492+
self.d = JoinerResult::Pending(Some(fut));
493+
}
494+
}
495+
496+
impl<
497+
E,
498+
A: Future<Output = Result<(), E>> + Unpin,
499+
B: Future<Output = Result<(), E>> + Unpin,
500+
C: Future<Output = Result<(), E>> + Unpin,
501+
D: Future<Output = Result<(), E>> + Unpin,
502+
> Future for Joiner<E, A, B, C, D>
503+
where
504+
Joiner<E, A, B, C, D>: Unpin,
505+
{
506+
type Output = [Result<(), E>; 4];
507+
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<Self::Output> {
508+
let mut all_complete = true;
509+
macro_rules! handle {
510+
($val: ident) => {
511+
match &mut (self.$val) {
512+
JoinerResult::Pending(None) => {
513+
self.$val = JoinerResult::Ready(Ok(()));
514+
},
515+
JoinerResult::<E, _>::Pending(Some(ref mut val)) => {
516+
match Pin::new(val).poll(ctx) {
517+
Poll::Ready(res) => {
518+
self.$val = JoinerResult::Ready(res);
519+
},
520+
Poll::Pending => {
521+
all_complete = false;
522+
},
523+
}
524+
},
525+
JoinerResult::Ready(_) => {},
526+
}
527+
};
528+
}
529+
handle!(a);
530+
handle!(b);
531+
handle!(c);
532+
handle!(d);
533+
534+
if all_complete {
535+
let mut res = [Ok(()), Ok(()), Ok(()), Ok(())];
536+
if let JoinerResult::Ready(ref mut val) = &mut self.a {
537+
core::mem::swap(&mut res[0], val);
538+
}
539+
if let JoinerResult::Ready(ref mut val) = &mut self.b {
540+
core::mem::swap(&mut res[1], val);
541+
}
542+
if let JoinerResult::Ready(ref mut val) = &mut self.c {
543+
core::mem::swap(&mut res[2], val);
544+
}
545+
if let JoinerResult::Ready(ref mut val) = &mut self.d {
546+
core::mem::swap(&mut res[3], val);
547+
}
548+
Poll::Ready(res)
549+
} else {
550+
Poll::Pending
551+
}
552+
}
553+
}
446554
}
447555
use core::task;
448-
use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
556+
use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutput};
449557

450558
/// Processes background events in a future.
451559
///
@@ -812,16 +920,25 @@ where
812920
Some(true) => break,
813921
None => {},
814922
}
923+
924+
let mut futures = Joiner::new();
925+
815926
if channel_manager.get_cm().get_and_clear_needs_persistence() {
816927
log_trace!(logger, "Persisting ChannelManager...");
817-
kv_store
818-
.write(
819-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
820-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
821-
CHANNEL_MANAGER_PERSISTENCE_KEY,
822-
&channel_manager.get_cm().encode(),
823-
)
824-
.await?;
928+
929+
let fut = async {
930+
kv_store
931+
.write(
932+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
933+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
934+
CHANNEL_MANAGER_PERSISTENCE_KEY,
935+
&channel_manager.get_cm().encode(),
936+
)
937+
.await
938+
};
939+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
940+
futures.set_a(Box::pin(fut));
941+
825942
log_trace!(logger, "Done persisting ChannelManager.");
826943
}
827944

@@ -854,17 +971,25 @@ where
854971
log_warn!(logger, "Not pruning network graph, consider implementing the fetch_time argument or calling remove_stale_channels_and_tracking_with_time manually.");
855972
log_trace!(logger, "Persisting network graph.");
856973
}
857-
if let Err(e) = kv_store
858-
.write(
859-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
860-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
861-
NETWORK_GRAPH_PERSISTENCE_KEY,
862-
&network_graph.encode(),
863-
)
864-
.await
865-
{
866-
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
867-
}
974+
let fut = async {
975+
if let Err(e) = kv_store
976+
.write(
977+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
978+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
979+
NETWORK_GRAPH_PERSISTENCE_KEY,
980+
&network_graph.encode(),
981+
)
982+
.await
983+
{
984+
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
985+
}
986+
987+
Ok(())
988+
};
989+
990+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
991+
futures.set_b(Box::pin(fut));
992+
868993
have_pruned = true;
869994
}
870995
let prune_timer =
@@ -889,21 +1014,28 @@ where
8891014
} else {
8901015
log_trace!(logger, "Persisting scorer");
8911016
}
892-
if let Err(e) = kv_store
893-
.write(
894-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
895-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
896-
SCORER_PERSISTENCE_KEY,
897-
&scorer.encode(),
898-
)
899-
.await
900-
{
901-
log_error!(
1017+
let fut = async {
1018+
if let Err(e) = kv_store
1019+
.write(
1020+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1021+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1022+
SCORER_PERSISTENCE_KEY,
1023+
&scorer.encode(),
1024+
)
1025+
.await
1026+
{
1027+
log_error!(
9021028
logger,
9031029
"Error: Failed to persist scorer, check your disk and permissions {}",
9041030
e
9051031
);
906-
}
1032+
}
1033+
1034+
Ok(())
1035+
};
1036+
1037+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1038+
futures.set_c(Box::pin(fut));
9071039
}
9081040
last_scorer_persist_call = sleeper(SCORER_PERSIST_TIMER);
9091041
},
@@ -914,13 +1046,26 @@ where
9141046
Some(false) => {
9151047
log_trace!(logger, "Regenerating sweeper spends if necessary");
9161048
if let Some(ref sweeper) = sweeper {
917-
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1049+
let fut = async {
1050+
let _ = sweeper.regenerate_and_broadcast_spend_if_necessary().await;
1051+
1052+
Ok(())
1053+
};
1054+
1055+
// TODO: Once our MSRV is 1.68 we should be able to drop the Box
1056+
futures.set_d(Box::pin(fut));
9181057
}
9191058
last_sweeper_call = sleeper(SWEEPER_TIMER);
9201059
},
9211060
Some(true) => break,
9221061
None => {},
9231062
}
1063+
1064+
// Run persistence tasks in parallel and exit if any of them returns an error.
1065+
for res in futures.await {
1066+
res?;
1067+
}
1068+
9241069
match check_sleeper(&mut last_onion_message_handler_call) {
9251070
Some(false) => {
9261071
if let Some(om) = &onion_messenger {

0 commit comments

Comments
 (0)