Skip to content

Commit 619ec16

Browse files
committed
Update KVStore and KVStoreSync arguments to being owned
This avoids the need for async KVStore implementations to copy data.
1 parent ecce268 commit 619ec16

File tree

6 files changed

+238
-170
lines changed

6 files changed

+238
-170
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 47 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ use std::time::Instant;
8484

8585
#[cfg(not(feature = "std"))]
8686
use alloc::boxed::Box;
87+
#[cfg(not(feature = "std"))]
88+
use alloc::string::ToString;
8789

8890
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
8991
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -415,10 +417,10 @@ macro_rules! define_run_body {
415417
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
416418
log_trace!($logger, "Persisting ChannelManager...");
417419
maybe_await!($async_persist, $kv_store.write(
418-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
419-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
420-
CHANNEL_MANAGER_PERSISTENCE_KEY,
421-
&$channel_manager.get_cm().encode(),
420+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
421+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
422+
CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(),
423+
$channel_manager.get_cm().encode(),
422424
))?;
423425
log_trace!($logger, "Done persisting ChannelManager.");
424426
}
@@ -481,10 +483,10 @@ macro_rules! define_run_body {
481483
}
482484

483485
if let Err(e) = maybe_await!($async_persist, $kv_store.write(
484-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
485-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
486-
NETWORK_GRAPH_PERSISTENCE_KEY,
487-
&network_graph.encode(),
486+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
487+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
488+
NETWORK_GRAPH_PERSISTENCE_KEY.to_string(),
489+
network_graph.encode(),
488490
)) {
489491
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
490492
}
@@ -514,10 +516,10 @@ macro_rules! define_run_body {
514516
log_trace!($logger, "Persisting scorer");
515517
}
516518
if let Err(e) = maybe_await!($async_persist, $kv_store.write(
517-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
518-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
519-
SCORER_PERSISTENCE_KEY,
520-
&scorer.encode(),
519+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
520+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
521+
SCORER_PERSISTENCE_KEY.to_string(),
522+
scorer.encode(),
521523
)) {
522524
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
523525
}
@@ -542,29 +544,29 @@ macro_rules! define_run_body {
542544
// some races where users quit while channel updates were in-flight, with
543545
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
544546
maybe_await!($async_persist, $kv_store.write(
545-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
546-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
547-
CHANNEL_MANAGER_PERSISTENCE_KEY,
548-
&$channel_manager.get_cm().encode(),
547+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
548+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
549+
CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(),
550+
$channel_manager.get_cm().encode(),
549551
))?;
550552

551553
// Persist Scorer on exit
552554
if let Some(ref scorer) = $scorer {
553555
maybe_await!($async_persist, $kv_store.write(
554-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
555-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
556-
SCORER_PERSISTENCE_KEY,
557-
&scorer.encode(),
556+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
557+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
558+
SCORER_PERSISTENCE_KEY.to_string(),
559+
scorer.encode(),
558560
))?;
559561
}
560562

561563
// Persist NetworkGraph on exit
562564
if let Some(network_graph) = $gossip_sync.network_graph() {
563565
maybe_await!($async_persist, $kv_store.write(
564-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
565-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
566-
NETWORK_GRAPH_PERSISTENCE_KEY,
567-
&network_graph.encode(),
566+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
567+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
568+
NETWORK_GRAPH_PERSISTENCE_KEY.to_string(),
569+
network_graph.encode(),
568570
))?;
569571
}
570572

@@ -725,17 +727,17 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
725727
/// # }
726728
/// # struct StoreSync {}
727729
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
728-
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
729-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
730-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
731-
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
730+
/// # fn read(&self, primary_namespace: String, secondary_namespace: String, key: String) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
731+
/// # fn write(&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
732+
/// # fn remove(&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool) -> io::Result<()> { Ok(()) }
733+
/// # fn list(&self, primary_namespace: String, secondary_namespace: String) -> io::Result<Vec<String>> { Ok(Vec::new()) }
732734
/// # }
733735
/// # struct Store {}
734736
/// # impl lightning::util::persist::KVStore for Store {
735-
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
736-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
737-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
738-
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
737+
/// # fn read(&self, primary_namespace: String, secondary_namespace: String, key: String) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
738+
/// # fn write(&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
739+
/// # fn remove(&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
740+
/// # fn list(&self, primary_namespace: String, secondary_namespace: String) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
739741
/// # }
740742
/// # use core::time::Duration;
741743
/// # struct DefaultTimeProvider;
@@ -927,10 +929,10 @@ where
927929
log_trace!(logger, "Persisting scorer after update");
928930
if let Err(e) = kv_store
929931
.write(
930-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
931-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
932-
SCORER_PERSISTENCE_KEY,
933-
&scorer.encode(),
932+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
933+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
934+
SCORER_PERSISTENCE_KEY.to_string(),
935+
scorer.encode(),
934936
)
935937
.await
936938
{
@@ -1206,10 +1208,10 @@ impl BackgroundProcessor {
12061208
if update_scorer(scorer, &event, duration_since_epoch) {
12071209
log_trace!(logger, "Persisting scorer after update");
12081210
if let Err(e) = kv_store.write(
1209-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1210-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1211-
SCORER_PERSISTENCE_KEY,
1212-
&scorer.encode(),
1211+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1212+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1213+
SCORER_PERSISTENCE_KEY.to_string(),
1214+
scorer.encode(),
12131215
) {
12141216
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
12151217
}
@@ -1618,13 +1620,14 @@ mod tests {
16181620

16191621
impl KVStoreSync for Persister {
16201622
fn read(
1621-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
1623+
&self, primary_namespace: String, secondary_namespace: String, key: String,
16221624
) -> lightning::io::Result<Vec<u8>> {
16231625
self.kv_store.read(primary_namespace, secondary_namespace, key)
16241626
}
16251627

16261628
fn write(
1627-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
1629+
&self, primary_namespace: String, secondary_namespace: String, key: String,
1630+
buf: Vec<u8>,
16281631
) -> lightning::io::Result<()> {
16291632
if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
16301633
&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
@@ -1666,13 +1669,13 @@ mod tests {
16661669
}
16671670

16681671
fn remove(
1669-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
1672+
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
16701673
) -> lightning::io::Result<()> {
16711674
self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
16721675
}
16731676

16741677
fn list(
1675-
&self, primary_namespace: &str, secondary_namespace: &str,
1678+
&self, primary_namespace: String, secondary_namespace: String,
16761679
) -> lightning::io::Result<Vec<String>> {
16771680
self.kv_store.list(primary_namespace, secondary_namespace)
16781681
}

lightning-persister/src/fs_store.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,12 @@ impl FilesystemStore {
9898

9999
impl KVStoreSync for FilesystemStore {
100100
fn read(
101-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
101+
&self, primary_namespace: String, secondary_namespace: String, key: String,
102102
) -> lightning::io::Result<Vec<u8>> {
103-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
103+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
104104

105-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
105+
let mut dest_file_path =
106+
self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
106107
dest_file_path.push(key);
107108

108109
let mut buf = Vec::new();
@@ -123,11 +124,17 @@ impl KVStoreSync for FilesystemStore {
123124
}
124125

125126
fn write(
126-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
127+
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
127128
) -> lightning::io::Result<()> {
128-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
129-
130-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
129+
check_namespace_key_validity(
130+
&primary_namespace,
131+
&secondary_namespace,
132+
Some(&key),
133+
"write",
134+
)?;
135+
136+
let mut dest_file_path =
137+
self.get_dest_dir_path(primary_namespace.as_str(), secondary_namespace.as_str())?;
131138
dest_file_path.push(key);
132139

133140
let parent_directory = dest_file_path.parent().ok_or_else(|| {
@@ -210,11 +217,17 @@ impl KVStoreSync for FilesystemStore {
210217
}
211218

212219
fn remove(
213-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
220+
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
214221
) -> lightning::io::Result<()> {
215-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
216-
217-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
222+
check_namespace_key_validity(
223+
&primary_namespace,
224+
&secondary_namespace,
225+
Some(&key),
226+
"remove",
227+
)?;
228+
229+
let mut dest_file_path =
230+
self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
218231
dest_file_path.push(key);
219232

220233
if !dest_file_path.is_file() {
@@ -305,11 +318,11 @@ impl KVStoreSync for FilesystemStore {
305318
}
306319

307320
fn list(
308-
&self, primary_namespace: &str, secondary_namespace: &str,
321+
&self, primary_namespace: String, secondary_namespace: String,
309322
) -> lightning::io::Result<Vec<String>> {
310-
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
323+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
311324

312-
let prefixed_dest = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
325+
let prefixed_dest = self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
313326

314327
if !Path::new(&prefixed_dest).exists() {
315328
return Ok(Vec::new());

lightning-persister/src/test_utils.rs

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,60 +13,90 @@ use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event
1313
use std::panic::RefUnwindSafe;
1414

1515
pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(kv_store: &K) {
16-
let data = [42u8; 32];
16+
let data = vec![42u8; 32];
1717

1818
let primary_namespace = "testspace";
1919
let secondary_namespace = "testsubspace";
2020
let key = "testkey";
2121

2222
// Test the basic KVStore operations.
23-
kv_store.write(primary_namespace, secondary_namespace, key, &data).unwrap();
23+
kv_store
24+
.write(
25+
primary_namespace.to_string(),
26+
secondary_namespace.to_string(),
27+
key.to_string(),
28+
data.clone(),
29+
)
30+
.unwrap();
2431

2532
// Test empty primary_namespace/secondary_namespace is allowed, but not empty primary_namespace
2633
// and non-empty secondary_namespace, and not empty key.
27-
kv_store.write("", "", key, &data).unwrap();
28-
let res = std::panic::catch_unwind(|| kv_store.write("", secondary_namespace, key, &data));
34+
kv_store.write("".to_string(), "".to_string(), key.to_string(), data.clone()).unwrap();
35+
let res = std::panic::catch_unwind(|| {
36+
kv_store.write(
37+
"".to_string(),
38+
secondary_namespace.to_string(),
39+
key.to_string(),
40+
data.clone(),
41+
)
42+
});
2943
assert!(res.is_err());
3044
let res = std::panic::catch_unwind(|| {
31-
kv_store.write(primary_namespace, secondary_namespace, "", &data)
45+
kv_store.write(
46+
primary_namespace.to_string(),
47+
secondary_namespace.to_string(),
48+
"".to_string(),
49+
data.clone(),
50+
)
3251
});
3352
assert!(res.is_err());
3453

35-
let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
54+
let listed_keys =
55+
kv_store.list(primary_namespace.to_string(), secondary_namespace.to_string()).unwrap();
3656
assert_eq!(listed_keys.len(), 1);
3757
assert_eq!(listed_keys[0], key);
3858

39-
let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap();
59+
let read_data = kv_store
60+
.read(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string())
61+
.unwrap();
4062
assert_eq!(data, &*read_data);
4163

42-
kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap();
64+
kv_store
65+
.remove(
66+
primary_namespace.to_string(),
67+
secondary_namespace.to_string(),
68+
key.to_string(),
69+
false,
70+
)
71+
.unwrap();
4372

44-
let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
73+
let listed_keys =
74+
kv_store.list(primary_namespace.to_string(), secondary_namespace.to_string()).unwrap();
4575
assert_eq!(listed_keys.len(), 0);
4676

4777
// Ensure we have no issue operating with primary_namespace/secondary_namespace/key being
4878
// KVSTORE_NAMESPACE_KEY_MAX_LEN
4979
let max_chars = "A".repeat(KVSTORE_NAMESPACE_KEY_MAX_LEN);
50-
kv_store.write(&max_chars, &max_chars, &max_chars, &data).unwrap();
80+
kv_store.write(max_chars.clone(), max_chars.clone(), max_chars.clone(), data.clone()).unwrap();
5181

52-
let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
82+
let listed_keys = kv_store.list(max_chars.clone(), max_chars.clone()).unwrap();
5383
assert_eq!(listed_keys.len(), 1);
5484
assert_eq!(listed_keys[0], max_chars);
5585

56-
let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
86+
let read_data = kv_store.read(max_chars.clone(), max_chars.clone(), max_chars.clone()).unwrap();
5787
assert_eq!(data, &*read_data);
5888

59-
kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();
89+
kv_store.remove(max_chars.clone(), max_chars.clone(), max_chars.clone(), false).unwrap();
6090

61-
let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
91+
let listed_keys = kv_store.list(max_chars.clone(), max_chars.clone()).unwrap();
6292
assert_eq!(listed_keys.len(), 0);
6393
}
6494

6595
pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>(
6696
source_store: &mut S, target_store: &mut T,
6797
) {
6898
// We fill the source with some bogus keys.
69-
let dummy_data = [42u8; 32];
99+
let dummy_data = vec![42u8; 32];
70100
let num_primary_namespaces = 3;
71101
let num_secondary_namespaces = 3;
72102
let num_keys = 3;
@@ -87,7 +117,12 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
87117
let key =
88118
format!("testkey{}", KVSTORE_NAMESPACE_KEY_ALPHABET.chars().nth(k).unwrap());
89119
source_store
90-
.write(&primary_namespace, &secondary_namespace, &key, &dummy_data)
120+
.write(
121+
primary_namespace.clone(),
122+
secondary_namespace.clone(),
123+
key.clone(),
124+
dummy_data.clone(),
125+
)
91126
.unwrap();
92127
expected_keys.push((primary_namespace.clone(), secondary_namespace.clone(), key));
93128
}
@@ -107,7 +142,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
107142
assert_eq!(target_list, expected_keys);
108143

109144
for (p, s, k) in expected_keys.iter() {
110-
assert_eq!(target_store.read(p, s, k).unwrap(), dummy_data);
145+
assert_eq!(target_store.read(p.clone(), s.clone(), k.clone()).unwrap(), dummy_data);
111146
}
112147
}
113148

0 commit comments

Comments
 (0)