Skip to content

Commit daf19e3

Browse files
committed
Update KVStore and KVStoreSync arguments to being owned
This avoids the need for async KVStore implementations to copy data.
1 parent 664511b commit daf19e3

File tree

6 files changed

+262
-194
lines changed

6 files changed

+262
-194
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 71 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ use std::time::Instant;
8484

8585
#[cfg(not(feature = "std"))]
8686
use alloc::boxed::Box;
87+
#[cfg(not(feature = "std"))]
88+
use alloc::string::ToString;
8789

8890
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
8991
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -592,17 +594,17 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
592594
/// # }
593595
/// # struct StoreSync {}
594596
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
595-
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
596-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
597-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
598-
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
597+
/// # fn read(&self, primary_namespace: String, secondary_namespace: String, key: String) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
598+
/// # fn write(&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
599+
/// # fn remove(&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool) -> io::Result<()> { Ok(()) }
600+
/// # fn list(&self, primary_namespace: String, secondary_namespace: String) -> io::Result<Vec<String>> { Ok(Vec::new()) }
599601
/// # }
600602
/// # struct Store {}
601603
/// # impl lightning::util::persist::KVStore for Store {
602-
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
603-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
604-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
605-
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
604+
/// # fn read(&self, primary_namespace: String, secondary_namespace: String, key: String) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
605+
/// # fn write(&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
606+
/// # fn remove(&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
607+
/// # fn list(&self, primary_namespace: String, secondary_namespace: String) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
606608
/// # }
607609
/// # use core::time::Duration;
608610
/// # struct DefaultTimeProvider;
@@ -793,10 +795,10 @@ where
793795
log_trace!(logger, "Persisting scorer after update");
794796
if let Err(e) = kv_store
795797
.write(
796-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
797-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
798-
SCORER_PERSISTENCE_KEY,
799-
&scorer.encode(),
798+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
799+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
800+
SCORER_PERSISTENCE_KEY.to_string(),
801+
scorer.encode(),
800802
)
801803
.await
802804
{
@@ -929,10 +931,10 @@ where
929931
let fut = async {
930932
kv_store
931933
.write(
932-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
933-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
934-
CHANNEL_MANAGER_PERSISTENCE_KEY,
935-
&channel_manager.get_cm().encode(),
934+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
935+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
936+
CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(),
937+
channel_manager.get_cm().encode(),
936938
)
937939
.await
938940
};
@@ -974,10 +976,10 @@ where
974976
let fut = async {
975977
if let Err(e) = kv_store
976978
.write(
977-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
978-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
979-
NETWORK_GRAPH_PERSISTENCE_KEY,
980-
&network_graph.encode(),
979+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
980+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
981+
NETWORK_GRAPH_PERSISTENCE_KEY.to_string(),
982+
network_graph.encode(),
981983
)
982984
.await
983985
{
@@ -1017,10 +1019,10 @@ where
10171019
let fut = async {
10181020
if let Err(e) = kv_store
10191021
.write(
1020-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1021-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1022-
SCORER_PERSISTENCE_KEY,
1023-
&scorer.encode(),
1022+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1023+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1024+
SCORER_PERSISTENCE_KEY.to_string(),
1025+
scorer.encode(),
10241026
)
10251027
.await
10261028
{
@@ -1125,29 +1127,29 @@ where
11251127
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
11261128
kv_store
11271129
.write(
1128-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1129-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1130-
CHANNEL_MANAGER_PERSISTENCE_KEY,
1131-
&channel_manager.get_cm().encode(),
1130+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1131+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1132+
CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(),
1133+
channel_manager.get_cm().encode(),
11321134
)
11331135
.await?;
11341136
if let Some(ref scorer) = scorer {
11351137
kv_store
11361138
.write(
1137-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1138-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1139-
SCORER_PERSISTENCE_KEY,
1140-
&scorer.encode(),
1139+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1140+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1141+
SCORER_PERSISTENCE_KEY.to_string(),
1142+
scorer.encode(),
11411143
)
11421144
.await?;
11431145
}
11441146
if let Some(network_graph) = gossip_sync.network_graph() {
11451147
kv_store
11461148
.write(
1147-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1148-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1149-
NETWORK_GRAPH_PERSISTENCE_KEY,
1150-
&network_graph.encode(),
1149+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1150+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1151+
NETWORK_GRAPH_PERSISTENCE_KEY.to_string(),
1152+
network_graph.encode(),
11511153
)
11521154
.await?;
11531155
}
@@ -1349,10 +1351,10 @@ impl BackgroundProcessor {
13491351
if update_scorer(scorer, &event, duration_since_epoch) {
13501352
log_trace!(logger, "Persisting scorer after update");
13511353
if let Err(e) = kv_store.write(
1352-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1353-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1354-
SCORER_PERSISTENCE_KEY,
1355-
&scorer.encode(),
1354+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1355+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1356+
SCORER_PERSISTENCE_KEY.to_string(),
1357+
scorer.encode(),
13561358
) {
13571359
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
13581360
}
@@ -1449,10 +1451,10 @@ impl BackgroundProcessor {
14491451
if channel_manager.get_cm().get_and_clear_needs_persistence() {
14501452
log_trace!(logger, "Persisting ChannelManager...");
14511453
(kv_store.write(
1452-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1453-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1454-
CHANNEL_MANAGER_PERSISTENCE_KEY,
1455-
&channel_manager.get_cm().encode(),
1454+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1455+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1456+
CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(),
1457+
channel_manager.get_cm().encode(),
14561458
))?;
14571459
log_trace!(logger, "Done persisting ChannelManager.");
14581460
}
@@ -1481,10 +1483,10 @@ impl BackgroundProcessor {
14811483
duration_since_epoch.as_secs(),
14821484
);
14831485
if let Err(e) = kv_store.write(
1484-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1485-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1486-
NETWORK_GRAPH_PERSISTENCE_KEY,
1487-
&network_graph.encode(),
1486+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1487+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1488+
NETWORK_GRAPH_PERSISTENCE_KEY.to_string(),
1489+
network_graph.encode(),
14881490
) {
14891491
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
14901492
}
@@ -1510,10 +1512,10 @@ impl BackgroundProcessor {
15101512
log_trace!(logger, "Calling time_passed and persisting scorer");
15111513
scorer.write_lock().time_passed(duration_since_epoch);
15121514
if let Err(e) = kv_store.write(
1513-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1514-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1515-
SCORER_PERSISTENCE_KEY,
1516-
&scorer.encode(),
1515+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1516+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1517+
SCORER_PERSISTENCE_KEY.to_string(),
1518+
scorer.encode(),
15171519
) {
15181520
log_error!(logger,
15191521
"Error: Failed to persist scorer, check your disk and permissions {}",
@@ -1553,25 +1555,25 @@ impl BackgroundProcessor {
15531555
// some races where users quit while channel updates were in-flight, with
15541556
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
15551557
kv_store.write(
1556-
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
1557-
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
1558-
CHANNEL_MANAGER_PERSISTENCE_KEY,
1559-
&channel_manager.get_cm().encode(),
1558+
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1559+
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1560+
CHANNEL_MANAGER_PERSISTENCE_KEY.to_string(),
1561+
channel_manager.get_cm().encode(),
15601562
)?;
15611563
if let Some(ref scorer) = scorer {
15621564
kv_store.write(
1563-
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
1564-
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
1565-
SCORER_PERSISTENCE_KEY,
1566-
&scorer.encode(),
1565+
SCORER_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1566+
SCORER_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1567+
SCORER_PERSISTENCE_KEY.to_string(),
1568+
scorer.encode(),
15671569
)?;
15681570
}
15691571
if let Some(network_graph) = gossip_sync.network_graph() {
15701572
kv_store.write(
1571-
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
1572-
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
1573-
NETWORK_GRAPH_PERSISTENCE_KEY,
1574-
&network_graph.encode(),
1573+
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
1574+
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
1575+
NETWORK_GRAPH_PERSISTENCE_KEY.to_string(),
1576+
network_graph.encode(),
15751577
)?;
15761578
}
15771579
Ok(())
@@ -1910,13 +1912,14 @@ mod tests {
19101912

19111913
impl KVStoreSync for Persister {
19121914
fn read(
1913-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
1915+
&self, primary_namespace: String, secondary_namespace: String, key: String,
19141916
) -> lightning::io::Result<Vec<u8>> {
19151917
self.kv_store.read(primary_namespace, secondary_namespace, key)
19161918
}
19171919

19181920
fn write(
1919-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
1921+
&self, primary_namespace: String, secondary_namespace: String, key: String,
1922+
buf: Vec<u8>,
19201923
) -> lightning::io::Result<()> {
19211924
if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
19221925
&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
@@ -1958,13 +1961,13 @@ mod tests {
19581961
}
19591962

19601963
fn remove(
1961-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
1964+
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
19621965
) -> lightning::io::Result<()> {
19631966
self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
19641967
}
19651968

19661969
fn list(
1967-
&self, primary_namespace: &str, secondary_namespace: &str,
1970+
&self, primary_namespace: String, secondary_namespace: String,
19681971
) -> lightning::io::Result<Vec<String>> {
19691972
self.kv_store.list(primary_namespace, secondary_namespace)
19701973
}

lightning-persister/src/fs_store.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,12 @@ impl FilesystemStore {
9898

9999
impl KVStoreSync for FilesystemStore {
100100
fn read(
101-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
101+
&self, primary_namespace: String, secondary_namespace: String, key: String,
102102
) -> lightning::io::Result<Vec<u8>> {
103-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?;
103+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?;
104104

105-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
105+
let mut dest_file_path =
106+
self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
106107
dest_file_path.push(key);
107108

108109
let mut buf = Vec::new();
@@ -123,11 +124,17 @@ impl KVStoreSync for FilesystemStore {
123124
}
124125

125126
fn write(
126-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
127+
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
127128
) -> lightning::io::Result<()> {
128-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
129-
130-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
129+
check_namespace_key_validity(
130+
&primary_namespace,
131+
&secondary_namespace,
132+
Some(&key),
133+
"write",
134+
)?;
135+
136+
let mut dest_file_path =
137+
self.get_dest_dir_path(primary_namespace.as_str(), secondary_namespace.as_str())?;
131138
dest_file_path.push(key);
132139

133140
let parent_directory = dest_file_path.parent().ok_or_else(|| {
@@ -210,11 +217,17 @@ impl KVStoreSync for FilesystemStore {
210217
}
211218

212219
fn remove(
213-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
220+
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
214221
) -> lightning::io::Result<()> {
215-
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?;
216-
217-
let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
222+
check_namespace_key_validity(
223+
&primary_namespace,
224+
&secondary_namespace,
225+
Some(&key),
226+
"remove",
227+
)?;
228+
229+
let mut dest_file_path =
230+
self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
218231
dest_file_path.push(key);
219232

220233
if !dest_file_path.is_file() {
@@ -305,11 +318,11 @@ impl KVStoreSync for FilesystemStore {
305318
}
306319

307320
fn list(
308-
&self, primary_namespace: &str, secondary_namespace: &str,
321+
&self, primary_namespace: String, secondary_namespace: String,
309322
) -> lightning::io::Result<Vec<String>> {
310-
check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?;
323+
check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?;
311324

312-
let prefixed_dest = self.get_dest_dir_path(primary_namespace, secondary_namespace)?;
325+
let prefixed_dest = self.get_dest_dir_path(&primary_namespace, &secondary_namespace)?;
313326

314327
if !Path::new(&prefixed_dest).exists() {
315328
return Ok(Vec::new());

0 commit comments

Comments
 (0)