Skip to content

Commit 90411b1

Browse files
committed
Remove Persister
In preparation for the addition of an async KVStore, we here remove the Persister pseudo-wrapper. The wrapper is thin, would need to be duplicated for async, and KVStore isn't fully abstracted anyway anymore because the sweeper takes it directly.
1 parent 17c469c commit 90411b1

File tree

3 files changed

+92
-94
lines changed

3 files changed

+92
-94
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 78 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ extern crate alloc;
1717
extern crate lightning;
1818
extern crate lightning_rapid_gossip_sync;
1919

20+
use crate::lightning::util::ser::Writeable;
2021
use lightning::chain;
2122
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
2223
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
@@ -40,7 +41,13 @@ use lightning::sign::ChangeDestinationSourceSync;
4041
use lightning::sign::EntropySource;
4142
use lightning::sign::OutputSpender;
4243
use lightning::util::logger::Logger;
43-
use lightning::util::persist::{KVStore, Persister};
44+
use lightning::util::persist::{
45+
KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
46+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
47+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
48+
SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
49+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
50+
};
4451
use lightning::util::sweep::OutputSweeper;
4552
#[cfg(feature = "std")]
4653
use lightning::util::sweep::OutputSweeperSync;
@@ -313,7 +320,8 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
313320

314321
macro_rules! define_run_body {
315322
(
316-
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
323+
$kv_store: ident,
324+
$chain_monitor: ident, $process_chain_monitor_events: expr,
317325
$channel_manager: ident, $process_channel_manager_events: expr,
318326
$onion_messenger: ident, $process_onion_message_handler_events: expr,
319327
$peer_manager: ident, $gossip_sync: ident,
@@ -375,7 +383,12 @@ macro_rules! define_run_body {
375383

376384
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
377385
log_trace!($logger, "Persisting ChannelManager...");
378-
$persister.persist_manager(&$channel_manager)?;
386+
$kv_store.write(
387+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
388+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
389+
CHANNEL_MANAGER_PERSISTENCE_KEY,
390+
&$channel_manager.get_cm().encode(),
391+
)?;
379392
log_trace!($logger, "Done persisting ChannelManager.");
380393
}
381394
if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
@@ -436,7 +449,12 @@ macro_rules! define_run_body {
436449
log_trace!($logger, "Persisting network graph.");
437450
}
438451

439-
if let Err(e) = $persister.persist_graph(network_graph) {
452+
if let Err(e) = $kv_store.write(
453+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
454+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
455+
NETWORK_GRAPH_PERSISTENCE_KEY,
456+
&network_graph.encode(),
457+
) {
440458
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
441459
}
442460

@@ -464,7 +482,12 @@ macro_rules! define_run_body {
464482
} else {
465483
log_trace!($logger, "Persisting scorer");
466484
}
467-
if let Err(e) = $persister.persist_scorer(&scorer) {
485+
if let Err(e) = $kv_store.write(
486+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
487+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
488+
SCORER_PERSISTENCE_KEY,
489+
&scorer.encode(),
490+
) {
468491
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
469492
}
470493
}
@@ -487,16 +510,31 @@ macro_rules! define_run_body {
487510
// After we exit, ensure we persist the ChannelManager one final time - this avoids
488511
// some races where users quit while channel updates were in-flight, with
489512
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
490-
$persister.persist_manager(&$channel_manager)?;
513+
$kv_store.write(
514+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
515+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
516+
CHANNEL_MANAGER_PERSISTENCE_KEY,
517+
&$channel_manager.get_cm().encode(),
518+
)?;
491519

492520
// Persist Scorer on exit
493521
if let Some(ref scorer) = $scorer {
494-
$persister.persist_scorer(&scorer)?;
522+
$kv_store.write(
523+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
524+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
525+
SCORER_PERSISTENCE_KEY,
526+
&scorer.encode(),
527+
)?;
495528
}
496529

497530
// Persist NetworkGraph on exit
498531
if let Some(network_graph) = $gossip_sync.network_graph() {
499-
$persister.persist_graph(network_graph)?;
532+
$kv_store.write(
533+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
534+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
535+
NETWORK_GRAPH_PERSISTENCE_KEY,
536+
&network_graph.encode(),
537+
)?;
500538
}
501539

502540
Ok(())
@@ -684,7 +722,6 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
684722
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
685723
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
686724
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
687-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
688725
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
689726
/// # > {
690727
/// # peer_manager: Arc<PeerManager<B, F, FE, UL>>,
@@ -697,7 +734,7 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
697734
/// # persister: Arc<Store>,
698735
/// # logger: Arc<Logger>,
699736
/// # scorer: Arc<Scorer>,
700-
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<K>, Arc<Logger>, Arc<O>>>,
737+
/// # sweeper: Arc<OutputSweeper<Arc<B>, Arc<D>, Arc<FE>, Arc<F>, Arc<Store>, Arc<Logger>, Arc<O>>>,
701738
/// # }
702739
/// #
703740
/// # async fn setup_background_processing<
@@ -706,9 +743,8 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
706743
/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
707744
/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
708745
/// # D: lightning::sign::ChangeDestinationSource + Send + Sync + 'static,
709-
/// # K: lightning::util::persist::KVStore + Send + Sync + 'static,
710746
/// # O: lightning::sign::OutputSpender + Send + Sync + 'static,
711-
/// # >(node: Node<B, F, FE, UL, D, K, O>) {
747+
/// # >(node: Node<B, F, FE, UL, D, O>) {
712748
/// let background_persister = Arc::clone(&node.persister);
713749
/// let background_event_handler = Arc::clone(&node.event_handler);
714750
/// let background_chain_mon = Arc::clone(&node.chain_monitor);
@@ -780,7 +816,6 @@ pub async fn process_events_async<
780816
P: 'static + Deref,
781817
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
782818
EventHandler: Fn(Event) -> EventHandlerFuture,
783-
PS: 'static + Deref + Send,
784819
ES: 'static + Deref + Send,
785820
M: 'static
786821
+ Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>
@@ -802,7 +837,7 @@ pub async fn process_events_async<
802837
Sleeper: Fn(Duration) -> SleepFuture,
803838
FetchTime: Fn() -> Option<Duration>,
804839
>(
805-
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
840+
kv_store: K, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
806841
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
807842
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
808843
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
@@ -814,7 +849,6 @@ where
814849
F::Target: 'static + FeeEstimator,
815850
L::Target: 'static + Logger,
816851
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
817-
PS::Target: 'static + Persister<'a, CM, L, S>,
818852
ES::Target: 'static + EntropySource,
819853
CM::Target: AChannelManager,
820854
OM::Target: AOnionMessenger,
@@ -830,7 +864,7 @@ where
830864
let event_handler = &event_handler;
831865
let scorer = &scorer;
832866
let logger = &logger;
833-
let persister = &persister;
867+
let kv_store = &kv_store;
834868
let fetch_time = &fetch_time;
835869
// We should be able to drop the Box once our MSRV is 1.68
836870
Box::pin(async move {
@@ -841,7 +875,12 @@ where
841875
if let Some(duration_since_epoch) = fetch_time() {
842876
if update_scorer(scorer, &event, duration_since_epoch) {
843877
log_trace!(logger, "Persisting scorer after update");
844-
if let Err(e) = persister.persist_scorer(&*scorer) {
878+
if let Err(e) = kv_store.write(
879+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
880+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
881+
SCORER_PERSISTENCE_KEY,
882+
&scorer.encode(),
883+
) {
845884
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
846885
// We opt not to abort early on persistence failure here as persisting
847886
// the scorer is non-critical and we still hope that it will have
@@ -855,7 +894,7 @@ where
855894
})
856895
};
857896
define_run_body!(
858-
persister,
897+
kv_store,
859898
chain_monitor,
860899
chain_monitor.process_pending_events_async(async_event_handler).await,
861900
channel_manager,
@@ -928,21 +967,21 @@ impl BackgroundProcessor {
928967
/// documentation].
929968
///
930969
/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
931-
/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
970+
/// [`KVStore`] returns an error. In case of an error, the error is retrieved by calling
932971
/// either [`join`] or [`stop`].
933972
///
934973
/// # Data Persistence
935974
///
936-
/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
975+
/// [`KVStore`] is responsible for writing out the [`ChannelManager`] to disk, and/or
937976
/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
938977
/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
939978
/// provided implementation.
940979
///
941-
/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
980+
/// [`KVStore`] is also responsible for writing out the [`NetworkGraph`] to disk, if
942981
/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
943982
/// See the `lightning-persister` crate for LDK's provided implementation.
944983
///
945-
/// Typically, users should either implement [`Persister::persist_manager`] to never return an
984+
/// Typically, users should either implement [`KVStore`] to never return an
946985
/// error or call [`join`] and handle any error that may arise. For the latter case,
947986
/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
948987
///
@@ -964,8 +1003,6 @@ impl BackgroundProcessor {
9641003
/// [`stop`]: Self::stop
9651004
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
9661005
/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
967-
/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
968-
/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
9691006
/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
9701007
/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
9711008
pub fn start<
@@ -978,7 +1015,6 @@ impl BackgroundProcessor {
9781015
L: 'static + Deref + Send,
9791016
P: 'static + Deref,
9801017
EH: 'static + EventHandler + Send,
981-
PS: 'static + Deref + Send,
9821018
ES: 'static + Deref + Send,
9831019
M: 'static
9841020
+ Deref<
@@ -996,10 +1032,10 @@ impl BackgroundProcessor {
9961032
SC: for<'b> WriteableScore<'b>,
9971033
D: 'static + Deref,
9981034
O: 'static + Deref,
999-
K: 'static + Deref,
1035+
K: 'static + Deref + Send,
10001036
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
10011037
>(
1002-
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
1038+
kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
10031039
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
10041040
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
10051041
) -> Self
@@ -1010,7 +1046,6 @@ impl BackgroundProcessor {
10101046
F::Target: 'static + FeeEstimator,
10111047
L::Target: 'static + Logger,
10121048
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
1013-
PS::Target: 'static + Persister<'a, CM, L, S>,
10141049
ES::Target: 'static + EntropySource,
10151050
CM::Target: AChannelManager,
10161051
OM::Target: AOnionMessenger,
@@ -1035,15 +1070,20 @@ impl BackgroundProcessor {
10351070
.expect("Time should be sometime after 1970");
10361071
if update_scorer(scorer, &event, duration_since_epoch) {
10371072
log_trace!(logger, "Persisting scorer after update");
1038-
if let Err(e) = persister.persist_scorer(&scorer) {
1073+
if let Err(e) = kv_store.write(
1074+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1075+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1076+
SCORER_PERSISTENCE_KEY,
1077+
&scorer.encode(),
1078+
) {
10391079
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
10401080
}
10411081
}
10421082
}
10431083
event_handler.handle_event(event)
10441084
};
10451085
define_run_body!(
1046-
persister,
1086+
kv_store,
10471087
chain_monitor,
10481088
chain_monitor.process_pending_events(&event_handler),
10491089
channel_manager,
@@ -1256,7 +1296,7 @@ mod tests {
12561296
Arc<test_utils::TestBroadcaster>,
12571297
Arc<test_utils::TestFeeEstimator>,
12581298
Arc<test_utils::TestLogger>,
1259-
Arc<FilesystemStore>,
1299+
Arc<Persister>,
12601300
Arc<KeysManager>,
12611301
>;
12621302

@@ -1314,7 +1354,7 @@ mod tests {
13141354
>,
13151355
liquidity_manager: Arc<LM>,
13161356
chain_monitor: Arc<ChainMonitor>,
1317-
kv_store: Arc<FilesystemStore>,
1357+
kv_store: Arc<Persister>,
13181358
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
13191359
network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
13201360
logger: Arc<test_utils::TestLogger>,
@@ -1326,7 +1366,7 @@ mod tests {
13261366
Arc<TestWallet>,
13271367
Arc<test_utils::TestFeeEstimator>,
13281368
Arc<test_utils::TestChainSource>,
1329-
Arc<FilesystemStore>,
1369+
Arc<Persister>,
13301370
Arc<test_utils::TestLogger>,
13311371
Arc<KeysManager>,
13321372
>,
@@ -1418,6 +1458,10 @@ mod tests {
14181458
fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
14191459
Self { scorer_error: Some((error, message)), ..self }
14201460
}
1461+
1462+
pub fn get_data_dir(&self) -> PathBuf {
1463+
self.kv_store.get_data_dir()
1464+
}
14211465
}
14221466

14231467
impl KVStore for Persister {
@@ -1662,7 +1706,7 @@ mod tests {
16621706
));
16631707
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
16641708
let kv_store =
1665-
Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
1709+
Arc::new(Persister::new(format!("{}_persister_{}", &persist_dir, i).into()));
16661710
let now = Duration::from_secs(genesis_block.header.time as u64);
16671711
let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
16681712
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(

lightning/src/ln/channelmanager.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1886,7 +1886,7 @@ where
18861886
/// - Perform any periodic channel and payment checks by calling [`timer_tick_occurred`] roughly
18871887
/// every minute
18881888
/// - Persist to disk whenever [`get_and_clear_needs_persistence`] returns `true` using a
1889-
/// [`Persister`] such as a [`KVStore`] implementation
1889+
/// [`KVStore`] implementation
18901890
/// - Handle [`Event`]s obtained via its [`EventsProvider`] implementation
18911891
///
18921892
/// The [`Future`] returned by [`get_event_or_persistence_needed_future`] is useful in determining
@@ -2468,7 +2468,6 @@ where
24682468
/// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
24692469
/// [`timer_tick_occurred`]: Self::timer_tick_occurred
24702470
/// [`get_and_clear_needs_persistence`]: Self::get_and_clear_needs_persistence
2471-
/// [`Persister`]: crate::util::persist::Persister
24722471
/// [`KVStore`]: crate::util::persist::KVStore
24732472
/// [`get_event_or_persistence_needed_future`]: Self::get_event_or_persistence_needed_future
24742473
/// [`lightning-block-sync`]: https://docs.rs/lightning_block_sync/latest/lightning_block_sync

0 commit comments

Comments
 (0)