@@ -40,7 +40,7 @@ use lightning::sign::ChangeDestinationSourceSync;
40
40
use lightning::sign::EntropySource;
41
41
use lightning::sign::OutputSpender;
42
42
use lightning::util::logger::Logger;
43
- use lightning::util::persist::{KVStoreSync, PersisterSync};
43
+ use lightning::util::persist::{KVStoreSync, Persister, PersisterSync};
44
44
use lightning::util::sweep::OutputSweeper;
45
45
#[cfg(feature = "std")]
46
46
use lightning::util::sweep::OutputSweeperSync;
@@ -311,6 +311,15 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
311
311
true
312
312
}
313
313
314
+ macro_rules! maybe_await {
315
+ (true, $e:expr) => {
316
+ $e.await
317
+ };
318
+ (false, $e:expr) => {
319
+ $e
320
+ };
321
+ }
322
+
314
323
macro_rules! define_run_body {
315
324
(
316
325
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
@@ -319,7 +328,7 @@ macro_rules! define_run_body {
319
328
$peer_manager: ident, $gossip_sync: ident,
320
329
$process_sweeper: expr,
321
330
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
322
- $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
331
+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, $async: tt,
323
332
) => { {
324
333
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
325
334
$channel_manager.get_cm().timer_tick_occurred();
@@ -375,7 +384,7 @@ macro_rules! define_run_body {
375
384
376
385
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377
386
log_trace!($logger, "Persisting ChannelManager...");
378
- $ persister.persist_manager(&$channel_manager)?;
387
+ maybe_await!($async, $ persister.persist_manager(&$channel_manager) )?;
379
388
log_trace!($logger, "Done persisting ChannelManager.");
380
389
}
381
390
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +445,7 @@ macro_rules! define_run_body {
436
445
log_trace!($logger, "Persisting network graph.");
437
446
}
438
447
439
- if let Err(e) = $ persister.persist_graph(network_graph) {
448
+ if let Err(e) = maybe_await!($async, $ persister.persist_graph(network_graph) ) {
440
449
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441
450
}
442
451
@@ -464,8 +473,8 @@ macro_rules! define_run_body {
464
473
} else {
465
474
log_trace!($logger, "Persisting scorer");
466
475
}
467
- if let Err(e) = $ persister.persist_scorer(&scorer) {
468
- log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
476
+ if let Err(e) = maybe_await!($async, $ persister.persist_scorer(&scorer) ) {
477
+ log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469
478
}
470
479
}
471
480
last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
@@ -487,16 +496,16 @@ macro_rules! define_run_body {
487
496
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488
497
// some races where users quit while channel updates were in-flight, with
489
498
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490
- $ persister.persist_manager(&$channel_manager)?;
499
+ maybe_await!($async, $ persister.persist_manager(&$channel_manager) )?;
491
500
492
501
// Persist Scorer on exit
493
502
if let Some(ref scorer) = $scorer {
494
- $ persister.persist_scorer(&scorer)?;
503
+ maybe_await!($async, $ persister.persist_scorer(&scorer) )?;
495
504
}
496
505
497
506
// Persist NetworkGraph on exit
498
507
if let Some(network_graph) = $gossip_sync.network_graph() {
499
- $ persister.persist_graph(network_graph)?;
508
+ maybe_await!($async, $ persister.persist_graph(network_graph) )?;
500
509
}
501
510
502
511
Ok(())
@@ -643,22 +652,30 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
643
652
/// ```
644
653
/// # use lightning::io;
645
654
/// # use lightning::events::ReplayEvent;
646
- /// # use lightning::util::sweep::OutputSweeper;
647
655
/// # use std::sync::{Arc, RwLock};
648
656
/// # use std::sync::atomic::{AtomicBool, Ordering};
649
657
/// # use std::time::SystemTime;
650
658
/// # use lightning_background_processor::{process_events_async, GossipSync};
659
+ /// # use core::future::Future;
660
+ /// # use core::pin::Pin;
651
661
/// # struct Logger {}
652
662
/// # impl lightning::util::logger::Logger for Logger {
653
663
/// # fn log(&self, _record: lightning::util::logger::Record) {}
654
664
/// # }
655
- /// # struct Store {}
656
- /// # impl lightning::util::persist::KVStore for Store {
665
+ /// # struct StoreSync {}
666
+ /// # impl lightning::util::persist::KVStoreSync for StoreSync {
657
667
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
658
668
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
659
669
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
660
670
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
661
671
/// # }
672
+ /// # struct Store {}
673
+ /// # impl lightning::util::persist::KVStore for Store {
674
+ /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
675
+ /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
676
+ /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
677
+ /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
678
+ /// # }
662
679
/// # struct EventHandler {}
663
680
/// # impl EventHandler {
664
681
/// # async fn handle_event(&self, _: lightning::events::Event) -> Result<(), ReplayEvent> { Ok(()) }
@@ -669,22 +686,22 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
669
686
/// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
670
687
/// # fn disconnect_socket(&mut self) {}
671
688
/// # }
672
- /// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store >, Arc<lightning::sign::KeysManager>>;
689
+ /// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync >, Arc<lightning::sign::KeysManager>>;
673
690
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
674
691
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
675
692
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
676
693
/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
677
694
/// # type LiquidityManager<B, F, FE> = lightning_liquidity::LiquidityManager<Arc<lightning::sign::KeysManager>, Arc<ChannelManager<B, F, FE>>, Arc<F>>;
678
695
/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
679
- /// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, Store>;
680
- /// #
696
+ /// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger, F, StoreSync>;
697
+ /// # type OutputSweeper<B, D, FE, F, O> = lightning::util::sweep::OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<StoreSync>, Arc<Logger>, Arc<O>>;
698
+ ///
681
699
/// # struct Node<
682
700
/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
683
701
/// # F: lightning::chain::Filter + Send + Sync + 'static,
684
702
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685
703
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686
704
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687
- /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
688
705
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689
706
/// # > {
690
707
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -697,7 +714,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697
714
/// # persister: Arc<Store>,
698
715
/// # logger: Arc<Logger>,
699
716
/// # scorer: Arc<Scorer>,
700
- /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O> >>,
717
+ /// # sweeper: Arc<OutputSweeper<B, D, FE, F, O >>,
701
718
/// # }
702
719
/// #
703
720
/// # async fn setup_background_processing<
@@ -706,10 +723,9 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706
723
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707
724
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708
725
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709
- /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
710
726
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711
- /// # >(node: Node<B, F, FE, UL, D, K, O>) {
712
- /// let background_persister = Arc::clone(&node.persister);
727
+ /// # >(node: Node<B, F, FE, UL, D, O>) {
728
+ /// let background_persister = Arc::new(Arc:: clone(&node.persister) );
713
729
/// let background_event_handler = Arc::clone(&node.event_handler);
714
730
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
715
731
/// let background_chan_man = Arc::clone(&node.channel_manager);
@@ -814,7 +830,7 @@ where
814
830
F::Target: 'static + FeeEstimator,
815
831
L::Target: 'static + Logger,
816
832
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
817
- PS::Target: 'static + PersisterSync <'a, CM, L, S>,
833
+ PS::Target: 'static + Persister <'a, CM, L, S>,
818
834
ES::Target: 'static + EntropySource,
819
835
CM::Target: AChannelManager,
820
836
OM::Target: AOnionMessenger,
@@ -841,7 +857,7 @@ where
841
857
if let Some(duration_since_epoch) = fetch_time() {
842
858
if update_scorer(scorer, &event, duration_since_epoch) {
843
859
log_trace!(logger, "Persisting scorer after update");
844
- if let Err(e) = persister.persist_scorer(&*scorer) {
860
+ if let Err(e) = persister.persist_scorer(&*scorer).await {
845
861
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846
862
// We opt not to abort early on persistence failure here as persisting
847
863
// the scorer is non-critical and we still hope that it will have
@@ -919,6 +935,7 @@ where
919
935
},
920
936
mobile_interruptable_platform,
921
937
fetch_time,
938
+ true,
922
939
)
923
940
}
924
941
@@ -1098,6 +1115,7 @@ impl BackgroundProcessor {
1098
1115
.expect("Time should be sometime after 1970"),
1099
1116
)
1100
1117
},
1118
+ false,
1101
1119
)
1102
1120
});
1103
1121
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -1162,6 +1180,8 @@ mod tests {
1162
1180
use bitcoin::transaction::Version;
1163
1181
use bitcoin::transaction::{Transaction, TxOut};
1164
1182
use bitcoin::{Amount, ScriptBuf, Txid};
1183
+ use core::future::Future;
1184
+ use core::pin::Pin;
1165
1185
use core::sync::atomic::{AtomicBool, Ordering};
1166
1186
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
1167
1187
use lightning::chain::transaction::OutPoint;
@@ -1186,7 +1206,7 @@ mod tests {
1186
1206
use lightning::types::payment::PaymentHash;
1187
1207
use lightning::util::config::UserConfig;
1188
1208
use lightning::util::persist::{
1189
- KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
1209
+ KVStore, KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
1190
1210
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1191
1211
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
1192
1212
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
@@ -1483,6 +1503,42 @@ mod tests {
1483
1503
}
1484
1504
}
1485
1505
1506
+ struct PersisterSyncWrapper(PersisterSync);
1507
+
1508
+ impl KVStore for PersisterSyncWrapper {
1509
+ fn read(
1510
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str,
1511
+ ) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
1512
+ let res = self.0.read(primary_namespace, secondary_namespace, key);
1513
+
1514
+ Box::pin(async move { res })
1515
+ }
1516
+
1517
+ fn write(
1518
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
1519
+ ) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
1520
+ let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
1521
+
1522
+ Box::pin(async move { res })
1523
+ }
1524
+
1525
+ fn remove(
1526
+ &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
1527
+ ) -> Result<(), lightning::io::Error> {
1528
+ let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy);
1529
+
1530
+ return res;
1531
+ }
1532
+
1533
+ fn list(
1534
+ &self, primary_namespace: &str, secondary_namespace: &str,
1535
+ ) -> Result<Vec<String>, lightning::io::Error> {
1536
+ let res = self.0.list(primary_namespace, secondary_namespace);
1537
+
1538
+ return res;
1539
+ }
1540
+ }
1541
+
1486
1542
struct TestScorer {
1487
1543
event_expectations: Option<VecDeque<TestResult>>,
1488
1544
}
@@ -2107,9 +2163,9 @@ mod tests {
2107
2163
open_channel!(nodes[0], nodes[1], 100000);
2108
2164
2109
2165
let data_dir = nodes[0].kv_store.get_data_dir();
2110
- let persister = Arc::new(
2111
- PersisterSync::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"),
2112
- );
2166
+ let persister_sync =
2167
+ PersisterSync::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test");
2168
+ let persister = Arc::new(Arc::new(PersisterSyncWrapper(persister_sync)) );
2113
2169
2114
2170
let bp_future = super::process_events_async(
2115
2171
persister,
@@ -2618,8 +2674,8 @@ mod tests {
2618
2674
let (_, nodes) =
2619
2675
create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
2620
2676
let data_dir = nodes[0].kv_store.get_data_dir();
2621
- let persister =
2622
- Arc::new(PersisterSync ::new(data_dir).with_graph_persistence_notifier(sender ));
2677
+ let persister_sync = PersisterSync::new(data_dir).with_graph_persistence_notifier(sender);
2678
+ let persister = Arc::new(Arc ::new(PersisterSyncWrapper(persister_sync) ));
2623
2679
2624
2680
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2625
2681
let bp_future = super::process_events_async(
@@ -2835,7 +2891,8 @@ mod tests {
2835
2891
2836
2892
let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
2837
2893
let data_dir = nodes[0].kv_store.get_data_dir();
2838
- let persister = Arc::new(PersisterSync::new(data_dir));
2894
+ let persister_sync = PersisterSync::new(data_dir);
2895
+ let persister = Arc::new(Arc::new(PersisterSyncWrapper(persister_sync)));
2839
2896
2840
2897
let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
2841
2898
0 commit comments