@@ -30,6 +30,7 @@ mod fwd_batch;
30
30
31
31
use fwd_batch:: BatchDelay ;
32
32
33
+ use crate :: lightning:: util:: ser:: Writeable ;
33
34
use lightning:: chain;
34
35
use lightning:: chain:: chaininterface:: { BroadcasterInterface , FeeEstimator } ;
35
36
use lightning:: chain:: chainmonitor:: { ChainMonitor , Persist } ;
@@ -53,7 +54,13 @@ use lightning::sign::ChangeDestinationSourceSync;
53
54
use lightning:: sign:: EntropySource ;
54
55
use lightning:: sign:: OutputSpender ;
55
56
use lightning:: util:: logger:: Logger ;
56
- use lightning:: util:: persist:: { KVStore , Persister } ;
57
+ use lightning:: util:: persist:: {
58
+ KVStore , CHANNEL_MANAGER_PERSISTENCE_KEY , CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
59
+ CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_KEY ,
60
+ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE , NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
61
+ SCORER_PERSISTENCE_KEY , SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
62
+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
63
+ } ;
57
64
use lightning:: util:: sweep:: OutputSweeper ;
58
65
#[ cfg( feature = "std" ) ]
59
66
use lightning:: util:: sweep:: OutputSweeperSync ;
@@ -326,7 +333,8 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
326
333
327
334
macro_rules! define_run_body {
328
335
(
329
- $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
336
+ $kv_store: ident,
337
+ $chain_monitor: ident, $process_chain_monitor_events: expr,
330
338
$channel_manager: ident, $process_channel_manager_events: expr,
331
339
$onion_messenger: ident, $process_onion_message_handler_events: expr,
332
340
$peer_manager: ident, $gossip_sync: ident,
@@ -404,7 +412,12 @@ macro_rules! define_run_body {
404
412
405
413
if $channel_manager. get_cm( ) . get_and_clear_needs_persistence( ) {
406
414
log_trace!( $logger, "Persisting ChannelManager..." ) ;
407
- $persister. persist_manager( & $channel_manager) ?;
415
+ $kv_store. write(
416
+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
417
+ CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
418
+ CHANNEL_MANAGER_PERSISTENCE_KEY ,
419
+ & $channel_manager. get_cm( ) . encode( ) ,
420
+ ) ?;
408
421
log_trace!( $logger, "Done persisting ChannelManager." ) ;
409
422
}
410
423
if $timer_elapsed( & mut last_freshness_call, FRESHNESS_TIMER ) {
@@ -465,7 +478,12 @@ macro_rules! define_run_body {
465
478
log_trace!( $logger, "Persisting network graph." ) ;
466
479
}
467
480
468
- if let Err ( e) = $persister. persist_graph( network_graph) {
481
+ if let Err ( e) = $kv_store. write(
482
+ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
483
+ NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
484
+ NETWORK_GRAPH_PERSISTENCE_KEY ,
485
+ & network_graph. encode( ) ,
486
+ ) {
469
487
log_error!( $logger, "Error: Failed to persist network graph, check your disk and permissions {}" , e)
470
488
}
471
489
@@ -493,7 +511,12 @@ macro_rules! define_run_body {
493
511
} else {
494
512
log_trace!( $logger, "Persisting scorer" ) ;
495
513
}
496
- if let Err ( e) = $persister. persist_scorer( & scorer) {
514
+ if let Err ( e) = $kv_store. write(
515
+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
516
+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
517
+ SCORER_PERSISTENCE_KEY ,
518
+ & scorer. encode( ) ,
519
+ ) {
497
520
log_error!( $logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
498
521
}
499
522
}
@@ -516,16 +539,31 @@ macro_rules! define_run_body {
516
539
// After we exit, ensure we persist the ChannelManager one final time - this avoids
517
540
// some races where users quit while channel updates were in-flight, with
518
541
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
519
- $persister. persist_manager( & $channel_manager) ?;
542
+ $kv_store. write(
543
+ CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE ,
544
+ CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE ,
545
+ CHANNEL_MANAGER_PERSISTENCE_KEY ,
546
+ & $channel_manager. get_cm( ) . encode( ) ,
547
+ ) ?;
520
548
521
549
// Persist Scorer on exit
522
550
if let Some ( ref scorer) = $scorer {
523
- $persister. persist_scorer( & scorer) ?;
551
+ $kv_store. write(
552
+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
553
+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
554
+ SCORER_PERSISTENCE_KEY ,
555
+ & scorer. encode( ) ,
556
+ ) ?;
524
557
}
525
558
526
559
// Persist NetworkGraph on exit
527
560
if let Some ( network_graph) = $gossip_sync. network_graph( ) {
528
- $persister. persist_graph( network_graph) ?;
561
+ $kv_store. write(
562
+ NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE ,
563
+ NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE ,
564
+ NETWORK_GRAPH_PERSISTENCE_KEY ,
565
+ & network_graph. encode( ) ,
566
+ ) ?;
529
567
}
530
568
531
569
Ok ( ( ) )
@@ -723,7 +761,6 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
723
761
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
724
762
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
725
763
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
726
- /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
727
764
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
728
765
/// # > {
729
766
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -736,7 +773,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
736
773
/// # persister: Arc<Store>,
737
774
/// # logger: Arc<Logger>,
738
775
/// # scorer: Arc<Scorer>,
739
- /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K >, Arc<Logger>, Arc<O>>>,
776
+ /// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store >, Arc<Logger>, Arc<O>>>,
740
777
/// # }
741
778
/// #
742
779
/// # async fn setup_background_processing<
@@ -745,9 +782,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
745
782
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
746
783
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
747
784
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
748
- /// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
749
785
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
750
- /// # >(node: Node<B, F, FE, UL, D, K, O>) {
786
+ /// # >(node: Node<B, F, FE, UL, D, O>) {
751
787
/// let background_persister = Arc::clone(&node.persister);
752
788
/// let background_event_handler = Arc::clone(&node.event_handler);
753
789
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -819,7 +855,6 @@ pub async fn process_events_async<
819
855
P : ' static + Deref ,
820
856
EventHandlerFuture : core:: future:: Future < Output = Result < ( ) , ReplayEvent > > ,
821
857
EventHandler : Fn ( Event ) -> EventHandlerFuture ,
822
- PS : ' static + Deref + Send ,
823
858
ES : ' static + Deref + Send ,
824
859
M : ' static
825
860
+ Deref < Target = ChainMonitor < <CM :: Target as AChannelManager >:: Signer , CF , T , F , L , P , ES > >
@@ -841,7 +876,7 @@ pub async fn process_events_async<
841
876
Sleeper : Fn ( Duration ) -> SleepFuture ,
842
877
FetchTime : Fn ( ) -> Option < Duration > ,
843
878
> (
844
- persister : PS , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
879
+ kv_store : K , event_handler : EventHandler , chain_monitor : M , channel_manager : CM ,
845
880
onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
846
881
liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
847
882
sleeper : Sleeper , mobile_interruptable_platform : bool , fetch_time : FetchTime ,
@@ -853,7 +888,6 @@ where
853
888
F :: Target : ' static + FeeEstimator ,
854
889
L :: Target : ' static + Logger ,
855
890
P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
856
- PS :: Target : ' static + Persister < ' a , CM , L , S > ,
857
891
ES :: Target : ' static + EntropySource ,
858
892
CM :: Target : AChannelManager ,
859
893
OM :: Target : AOnionMessenger ,
@@ -869,7 +903,7 @@ where
869
903
let event_handler = & event_handler;
870
904
let scorer = & scorer;
871
905
let logger = & logger;
872
- let persister = & persister ;
906
+ let kv_store = & kv_store ;
873
907
let fetch_time = & fetch_time;
874
908
// We should be able to drop the Box once our MSRV is 1.68
875
909
Box :: pin ( async move {
@@ -880,7 +914,12 @@ where
880
914
if let Some ( duration_since_epoch) = fetch_time ( ) {
881
915
if update_scorer ( scorer, & event, duration_since_epoch) {
882
916
log_trace ! ( logger, "Persisting scorer after update" ) ;
883
- if let Err ( e) = persister. persist_scorer ( & * scorer) {
917
+ if let Err ( e) = kv_store. write (
918
+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
919
+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
920
+ SCORER_PERSISTENCE_KEY ,
921
+ & scorer. encode ( ) ,
922
+ ) {
884
923
log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e) ;
885
924
// We opt not to abort early on persistence failure here as persisting
886
925
// the scorer is non-critical and we still hope that it will have
@@ -895,7 +934,7 @@ where
895
934
} ;
896
935
let mut batch_delay = BatchDelay :: new ( ) ;
897
936
define_run_body ! (
898
- persister ,
937
+ kv_store ,
899
938
chain_monitor,
900
939
chain_monitor. process_pending_events_async( async_event_handler) . await ,
901
940
channel_manager,
@@ -977,21 +1016,21 @@ impl BackgroundProcessor {
977
1016
/// documentation].
978
1017
///
979
1018
/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
980
- /// [`Persister::persist_manager `] returns an error. In case of an error, the error is retrieved by calling
1019
+ /// [`KVStore `] returns an error. In case of an error, the error is retrieved by calling
981
1020
/// either [`join`] or [`stop`].
982
1021
///
983
1022
/// # Data Persistence
984
1023
///
985
- /// [`Persister::persist_manager `] is responsible for writing out the [`ChannelManager`] to disk, and/or
1024
+ /// [`KVStore `] is responsible for writing out the [`ChannelManager`] to disk, and/or
986
1025
/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
987
1026
/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
988
1027
/// provided implementation.
989
1028
///
990
- /// [`Persister::persist_graph `] is responsible for writing out the [`NetworkGraph`] to disk, if
1029
+ /// [`KVStore `] is also responsible for writing out the [`NetworkGraph`] to disk, if
991
1030
/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
992
1031
/// See the `lightning-persister` crate for LDK's provided implementation.
993
1032
///
994
- /// Typically, users should either implement [`Persister::persist_manager `] to never return an
1033
+ /// Typically, users should either implement [`KVStore `] to never return an
995
1034
/// error or call [`join`] and handle any error that may arise. For the latter case,
996
1035
/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
997
1036
///
@@ -1013,8 +1052,6 @@ impl BackgroundProcessor {
1013
1052
/// [`stop`]: Self::stop
1014
1053
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
1015
1054
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
1016
- /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
1017
- /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
1018
1055
/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
1019
1056
/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
1020
1057
pub fn start <
@@ -1027,7 +1064,6 @@ impl BackgroundProcessor {
1027
1064
L : ' static + Deref + Send ,
1028
1065
P : ' static + Deref ,
1029
1066
EH : ' static + EventHandler + Send ,
1030
- PS : ' static + Deref + Send ,
1031
1067
ES : ' static + Deref + Send ,
1032
1068
M : ' static
1033
1069
+ Deref <
@@ -1045,10 +1081,10 @@ impl BackgroundProcessor {
1045
1081
SC : for < ' b > WriteableScore < ' b > ,
1046
1082
D : ' static + Deref ,
1047
1083
O : ' static + Deref ,
1048
- K : ' static + Deref ,
1084
+ K : ' static + Deref + Send ,
1049
1085
OS : ' static + Deref < Target = OutputSweeperSync < T , D , F , CF , K , L , O > > + Send ,
1050
1086
> (
1051
- persister : PS , event_handler : EH , chain_monitor : M , channel_manager : CM ,
1087
+ kv_store : K , event_handler : EH , chain_monitor : M , channel_manager : CM ,
1052
1088
onion_messenger : Option < OM > , gossip_sync : GossipSync < PGS , RGS , G , UL , L > , peer_manager : PM ,
1053
1089
liquidity_manager : Option < LM > , sweeper : Option < OS > , logger : L , scorer : Option < S > ,
1054
1090
) -> Self
@@ -1059,7 +1095,6 @@ impl BackgroundProcessor {
1059
1095
F :: Target : ' static + FeeEstimator ,
1060
1096
L :: Target : ' static + Logger ,
1061
1097
P :: Target : ' static + Persist < <CM :: Target as AChannelManager >:: Signer > ,
1062
- PS :: Target : ' static + Persister < ' a , CM , L , S > ,
1063
1098
ES :: Target : ' static + EntropySource ,
1064
1099
CM :: Target : AChannelManager ,
1065
1100
OM :: Target : AOnionMessenger ,
@@ -1084,7 +1119,12 @@ impl BackgroundProcessor {
1084
1119
. expect ( "Time should be sometime after 1970" ) ;
1085
1120
if update_scorer ( scorer, & event, duration_since_epoch) {
1086
1121
log_trace ! ( logger, "Persisting scorer after update" ) ;
1087
- if let Err ( e) = persister. persist_scorer ( & scorer) {
1122
+ if let Err ( e) = kv_store. write (
1123
+ SCORER_PERSISTENCE_PRIMARY_NAMESPACE ,
1124
+ SCORER_PERSISTENCE_SECONDARY_NAMESPACE ,
1125
+ SCORER_PERSISTENCE_KEY ,
1126
+ & scorer. encode ( ) ,
1127
+ ) {
1088
1128
log_error ! ( logger, "Error: Failed to persist scorer, check your disk and permissions {}" , e)
1089
1129
}
1090
1130
}
@@ -1093,7 +1133,7 @@ impl BackgroundProcessor {
1093
1133
} ;
1094
1134
let mut batch_delay = BatchDelay :: new ( ) ;
1095
1135
define_run_body ! (
1096
- persister ,
1136
+ kv_store ,
1097
1137
chain_monitor,
1098
1138
chain_monitor. process_pending_events( & event_handler) ,
1099
1139
channel_manager,
@@ -1314,7 +1354,7 @@ mod tests {
1314
1354
Arc < test_utils:: TestBroadcaster > ,
1315
1355
Arc < test_utils:: TestFeeEstimator > ,
1316
1356
Arc < test_utils:: TestLogger > ,
1317
- Arc < FilesystemStore > ,
1357
+ Arc < Persister > ,
1318
1358
Arc < KeysManager > ,
1319
1359
> ;
1320
1360
@@ -1372,7 +1412,7 @@ mod tests {
1372
1412
> ,
1373
1413
liquidity_manager : Arc < LM > ,
1374
1414
chain_monitor : Arc < ChainMonitor > ,
1375
- kv_store : Arc < FilesystemStore > ,
1415
+ kv_store : Arc < Persister > ,
1376
1416
tx_broadcaster : Arc < test_utils:: TestBroadcaster > ,
1377
1417
network_graph : Arc < NetworkGraph < Arc < test_utils:: TestLogger > > > ,
1378
1418
logger : Arc < test_utils:: TestLogger > ,
@@ -1384,7 +1424,7 @@ mod tests {
1384
1424
Arc < TestWallet > ,
1385
1425
Arc < test_utils:: TestFeeEstimator > ,
1386
1426
Arc < test_utils:: TestChainSource > ,
1387
- Arc < FilesystemStore > ,
1427
+ Arc < Persister > ,
1388
1428
Arc < test_utils:: TestLogger > ,
1389
1429
Arc < KeysManager > ,
1390
1430
> ,
@@ -1476,6 +1516,10 @@ mod tests {
1476
1516
fn with_scorer_error ( self , error : std:: io:: ErrorKind , message : & ' static str ) -> Self {
1477
1517
Self { scorer_error : Some ( ( error, message) ) , ..self }
1478
1518
}
1519
+
1520
+ pub fn get_data_dir ( & self ) -> PathBuf {
1521
+ self . kv_store . get_data_dir ( )
1522
+ }
1479
1523
}
1480
1524
1481
1525
impl KVStore for Persister {
@@ -1720,7 +1764,7 @@ mod tests {
1720
1764
) ) ;
1721
1765
let chain_source = Arc :: new ( test_utils:: TestChainSource :: new ( Network :: Bitcoin ) ) ;
1722
1766
let kv_store =
1723
- Arc :: new ( FilesystemStore :: new ( format ! ( "{}_persister_{}" , & persist_dir, i) . into ( ) ) ) ;
1767
+ Arc :: new ( Persister :: new ( format ! ( "{}_persister_{}" , & persist_dir, i) . into ( ) ) ) ;
1724
1768
let now = Duration :: from_secs ( genesis_block. header . time as u64 ) ;
1725
1769
let keys_manager = Arc :: new ( KeysManager :: new ( & seed, now. as_secs ( ) , now. subsec_nanos ( ) ) ) ;
1726
1770
let chain_monitor = Arc :: new ( chainmonitor:: ChainMonitor :: new (
0 commit comments