Skip to content

Commit 2329565

Browse files
committed
Update KVStore and KVStoreSync arguments to being owned
This avoids the need for async KVStore implementations to copy data.
1 parent ecce268 commit 2329565

File tree

6 files changed

+236
-170
lines changed

6 files changed

+236
-170
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 45 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -415,10 +415,10 @@ macro_rules! define_run_body {
415415
if $channel_manager.get_cm().get_and_clear_needs_persistence() {
416416
log_trace!($logger, "Persisting ChannelManager...");
417417
maybe_await!($async_persist, $kv_store.write(
418-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
419-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
420-
CHANNEL_MANAGER_PERSISTENCE_KEY,
421-
&$channel_manager.get_cm().encode(),
418+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
419+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
420+
CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(),
421+
$channel_manager.get_cm().encode(),
422422
))?;
423423
log_trace!($logger, "Done persisting ChannelManager.");
424424
}
@@ -481,10 +481,10 @@ macro_rules! define_run_body {
481481
}
482482

483483
if let Err(e) = maybe_await!($async_persist, $kv_store.write(
484-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
485-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
486-
NETWORK_GRAPH_PERSISTENCE_KEY,
487-
&network_graph.encode(),
484+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
485+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
486+
NETWORK_GRAPH_PERSISTENCE_KEY.to_string(),
487+
network_graph.encode(),
488488
)) {
489489
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
490490
}
@@ -514,10 +514,10 @@ macro_rules! define_run_body {
514514
log_trace!($logger, "Persisting scorer");
515515
}
516516
if let Err(e) = maybe_await!($async_persist, $kv_store.write(
517-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
518-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
519-
SCORER_PERSISTENCE_KEY,
520-
&scorer.encode(),
517+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
518+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
519+
SCORER_PERSISTENCE_KEY.to_string(),
520+
scorer.encode(),
521521
)) {
522522
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
523523
}
@@ -542,29 +542,29 @@ macro_rules! define_run_body {
542542
// some races where users quit while channel updates were in-flight, with
543543
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
544544
maybe_await!($async_persist, $kv_store.write(
545-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
546-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
547-
CHANNEL_MANAGER_PERSISTENCE_KEY,
548-
&$channel_manager.get_cm().encode(),
545+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
546+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
547+
CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(),
548+
$channel_manager.get_cm().encode(),
549549
))?;
550550

551551
// Persist Scorer on exit
552552
if let Some(ref scorer) = $scorer {
553553
maybe_await!($async_persist, $kv_store.write(
554-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
555-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
556-
SCORER_PERSISTENCE_KEY,
557-
&scorer.encode(),
554+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
555+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
556+
SCORER_PERSISTENCE_KEY.to_string(),
557+
scorer.encode(),
558558
))?;
559559
}
560560

561561
// Persist NetworkGraph on exit
562562
if let Some(network_graph) = $gossip_sync.network_graph() {
563563
maybe_await!($async_persist, $kv_store.write(
564-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
565-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
566-
NETWORK_GRAPH_PERSISTENCE_KEY,
567-
&network_graph.encode(),
564+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
565+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
566+
NETWORK_GRAPH_PERSISTENCE_KEY.to_string(),
567+
network_graph.encode(),
568568
))?;
569569
}
570570

@@ -725,17 +725,17 @@ use futures_util::{dummy_waker, OptionalSelector, Selector, SelectorOutput};
725725
/// # }
726726
/// # struct StoreSync {}
727727
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
728-
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
729-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
730-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
731-
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
728+
/// # fn read(&self, primary_namespace: String, secondary_namespace: String, key: String) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
729+
/// # fn write(&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
730+
/// # fn remove(&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool) -> io::Result<()> { Ok(()) }
731+
/// # fn list(&self, primary_namespace: String, secondary_namespace: String) -> io::Result<Vec<String>> { Ok(Vec::new()) }
732732
/// # }
733733
/// # struct Store {}
734734
/// # impl lightning::util::persist::KVStore for Store {
735-
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
736-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
737-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
738-
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
735+
/// # fn read(&self, primary_namespace: String, secondary_namespace: String, key: String) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
736+
/// # fn write(&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
737+
/// # fn remove(&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
738+
/// # fn list(&self, primary_namespace: String, secondary_namespace: String) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
739739
/// # }
740740
/// # use core::time::Duration;
741741
/// # struct DefaultTimeProvider;
@@ -927,10 +927,10 @@ where
927927
log_trace!(logger, "Persisting scorer after update");
928928
if let Err(e) = kv_store
929929
.write(
930-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
931-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
932-
SCORER_PERSISTENCE_KEY,
933-
&scorer.encode(),
930+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
931+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
932+
SCORER_PERSISTENCE_KEY.to_string(),
933+
scorer.encode(),
934934
)
935935
.await
936936
{
@@ -1206,10 +1206,10 @@ impl BackgroundProcessor {
12061206
if update_scorer(scorer, &event, duration_since_epoch) {
12071207
log_trace!(logger, "Persisting scorer after update");
12081208
if let Err(e) = kv_store.write(
1209-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1210-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1211-
SCORER_PERSISTENCE_KEY,
1212-
&scorer.encode(),
1209+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1210+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1211+
SCORER_PERSISTENCE_KEY.to_string(),
1212+
scorer.encode(),
12131213
) {
12141214
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
12151215
}
@@ -1618,13 +1618,14 @@ mod tests {
16181618

16191619
impl KVStoreSync for Persister {
16201620
fn read(
1621-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
1621+
&self, primary_namespace: String, secondary_namespace: String, key: String,
16221622
) -> lightning::io::Result<Vec<u8>> {
16231623
self.kv_store.read(primary_namespace, secondary_namespace, key)
16241624
}
16251625

16261626
fn write(
1627-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
1627+
&self, primary_namespace: String, secondary_namespace: String, key: String,
1628+
buf: Vec<u8>,
16281629
) -> lightning::io::Result<()> {
16291630
if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
16301631
&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
@@ -1666,13 +1667,13 @@ mod tests {
16661667
}
16671668

16681669
fn remove(
1669-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
1670+
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
16701671
) -> lightning::io::Result<()> {
16711672
self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
16721673
}
16731674

16741675
fn list(
1675-
&self, primary_namespace: &str, secondary_namespace: &str,
1676+
&self, primary_namespace: String, secondary_namespace: String,
16761677
) -> lightning::io::Result<Vec<String>> {
16771678
self.kv_store.list(primary_namespace, secondary_namespace)
16781679
}

lightning-persister/src/fs_store.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,12 @@ impl FilesystemStore {
9898

9999
impl KVStoreSync for FilesystemStore {
100100
fn read(
101-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
101+
&self, primary_namespace: String, secondary_namespace: String, key: String,
102102
) -> lightning::io::Result<Vec<u8>> {
103-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
103+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
104104

105-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
105+
let mut dest_file_path =
106+
self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
106107
dest_file_path.push(key);
107108

108109
let mut buf = Vec::new();
@@ -123,11 +124,17 @@ impl KVStoreSync for FilesystemStore {
123124
}
124125

125126
fn write(
126-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
127+
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
127128
) -> lightning::io::Result<()> {
128-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
129-
130-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
129+
check_namespace_key_validity(
130+
&primary_namespace,
131+
&secondary_namespace,
132+
Some(&key),
133+
"write",
134+
)?;
135+
136+
let mut dest_file_path =
137+
self.get_dest_dir_path(primary_namespace.as_str(), secondary_namespace.as_str())?;
131138
dest_file_path.push(key);
132139

133140
let parent_directory = dest_file_path.parent().ok_or_else(|| {
@@ -210,11 +217,17 @@ impl KVStoreSync for FilesystemStore {
210217
}
211218

212219
fn remove(
213-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
220+
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
214221
) -> lightning::io::Result<()> {
215-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
216-
217-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
222+
check_namespace_key_validity(
223+
&primary_namespace,
224+
&secondary_namespace,
225+
Some(&key),
226+
"remove",
227+
)?;
228+
229+
let mut dest_file_path =
230+
self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
218231
dest_file_path.push(key);
219232

220233
if !dest_file_path.is_file() {
@@ -305,11 +318,11 @@ impl KVStoreSync for FilesystemStore {
305318
}
306319

307320
fn list(
308-
&self, primary_namespace: &str, secondary_namespace: &str,
321+
&self, primary_namespace: String, secondary_namespace: String,
309322
) -> lightning::io::Result<Vec<String>> {
310-
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
323+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
311324

312-
let prefixed_dest = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
325+
let prefixed_dest = self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
313326

314327
if !Path::new(&prefixed_dest).exists() {
315328
return Ok(Vec::new());

lightning-persister/src/test_utils.rs

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,60 +13,90 @@ use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event
1313
use std::panic::RefUnwindSafe;
1414

1515
pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(kv_store: &K) {
16-
let data = [42u8; 32];
16+
let data = vec![42u8; 32];
1717

1818
let primary_namespace = "testspace";
1919
let secondary_namespace = "testsubspace";
2020
let key = "testkey";
2121

2222
// Test the basic KVStore operations.
23-
kv_store.write(primary_namespace, secondary_namespace, key, &data).unwrap();
23+
kv_store
24+
.write(
25+
primary_namespace.to_string(),
26+
secondary_namespace.to_string(),
27+
key.to_string(),
28+
data.clone(),
29+
)
30+
.unwrap();
2431

2532
// Test empty primary_namespace/secondary_namespace is allowed, but not empty primary_namespace
2633
// and non-empty secondary_namespace, and not empty key.
27-
kv_store.write("", "", key, &data).unwrap();
28-
let res = std::panic::catch_unwind(|| kv_store.write("", secondary_namespace, key, &data));
34+
kv_store.write("".to_string(), "".to_string(), key.to_string(), data.clone()).unwrap();
35+
let res = std::panic::catch_unwind(|| {
36+
kv_store.write(
37+
"".to_string(),
38+
secondary_namespace.to_string(),
39+
key.to_string(),
40+
data.clone(),
41+
)
42+
});
2943
assert!(res.is_err());
3044
let res = std::panic::catch_unwind(|| {
31-
kv_store.write(primary_namespace, secondary_namespace, "", &data)
45+
kv_store.write(
46+
primary_namespace.to_string(),
47+
secondary_namespace.to_string(),
48+
"".to_string(),
49+
data.clone(),
50+
)
3251
});
3352
assert!(res.is_err());
3453

35-
let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
54+
let listed_keys =
55+
kv_store.list(primary_namespace.to_string(), secondary_namespace.to_string()).unwrap();
3656
assert_eq!(listed_keys.len(), 1);
3757
assert_eq!(listed_keys[0], key);
3858

39-
let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap();
59+
let read_data = kv_store
60+
.read(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string())
61+
.unwrap();
4062
assert_eq!(data, &*read_data);
4163

42-
kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap();
64+
kv_store
65+
.remove(
66+
primary_namespace.to_string(),
67+
secondary_namespace.to_string(),
68+
key.to_string(),
69+
false,
70+
)
71+
.unwrap();
4372

44-
let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
73+
let listed_keys =
74+
kv_store.list(primary_namespace.to_string(), secondary_namespace.to_string()).unwrap();
4575
assert_eq!(listed_keys.len(), 0);
4676

4777
// Ensure we have no issue operating with primary_namespace/secondary_namespace/key being
4878
// KVSTORE_NAMESPACE_KEY_MAX_LEN
4979
let max_chars = "A".repeat(KVSTORE_NAMESPACE_KEY_MAX_LEN);
50-
kv_store.write(&max_chars, &max_chars, &max_chars, &data).unwrap();
80+
kv_store.write(max_chars.clone(), max_chars.clone(), max_chars.clone(), data.clone()).unwrap();
5181

52-
let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
82+
let listed_keys = kv_store.list(max_chars.clone(), max_chars.clone()).unwrap();
5383
assert_eq!(listed_keys.len(), 1);
5484
assert_eq!(listed_keys[0], max_chars);
5585

56-
let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
86+
let read_data = kv_store.read(max_chars.clone(), max_chars.clone(), max_chars.clone()).unwrap();
5787
assert_eq!(data, &*read_data);
5888

59-
kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();
89+
kv_store.remove(max_chars.clone(), max_chars.clone(), max_chars.clone(), false).unwrap();
6090

61-
let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
91+
let listed_keys = kv_store.list(max_chars.clone(), max_chars.clone()).unwrap();
6292
assert_eq!(listed_keys.len(), 0);
6393
}
6494

6595
pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>(
6696
source_store: &mut S, target_store: &mut T,
6797
) {
6898
// We fill the source with some bogus keys.
69-
let dummy_data = [42u8; 32];
99+
let dummy_data = vec![42u8; 32];
70100
let num_primary_namespaces = 3;
71101
let num_secondary_namespaces = 3;
72102
let num_keys = 3;
@@ -87,7 +117,12 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
87117
let key =
88118
format!("testkey{}", KVSTORE_NAMESPACE_KEY_ALPHABET.chars().nth(k).unwrap());
89119
source_store
90-
.write(&primary_namespace, &secondary_namespace, &key, &dummy_data)
120+
.write(
121+
primary_namespace.clone(),
122+
secondary_namespace.clone(),
123+
key.clone(),
124+
dummy_data.clone(),
125+
)
91126
.unwrap();
92127
expected_keys.push((primary_namespace.clone(), secondary_namespace.clone(), key));
93128
}
@@ -107,7 +142,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
107142
assert_eq!(target_list, expected_keys);
108143

109144
for (p, s, k) in expected_keys.iter() {
110-
assert_eq!(target_store.read(p, s, k).unwrap(), dummy_data);
145+
assert_eq!(target_store.read(p.clone(), s.clone(), k.clone()).unwrap(), dummy_data);
111146
}
112147
}
113148

0 commit comments

Comments
 (0)