Skip to content

Commit 23a1eb7

Browse files
committed
Update KVStore and KVStoreSync write data to being owned
This avoids the need for async KVStore implementations to copy data.
1 parent 664511b commit 23a1eb7

File tree

6 files changed

+42
-41
lines changed

6 files changed

+42
-41
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -593,14 +593,14 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
593593
/// # struct StoreSync {}
594594
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
595595
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
596-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
596+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
597597
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
598598
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
599599
/// # }
600600
/// # struct Store {}
601601
/// # impl lightning::util::persist::KVStore for Store {
602602
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
603-
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
603+
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
604604
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
605605
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
606606
/// # }
@@ -796,7 +796,7 @@ where
796796
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
797797
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
798798
SCORER_PERSISTENCE_KEY,
799-
&scorer.encode(),
799+
scorer.encode(),
800800
)
801801
.await
802802
{
@@ -932,7 +932,7 @@ where
932932
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
933933
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
934934
CHANNEL_MANAGER_PERSISTENCE_KEY,
935-
&channel_manager.get_cm().encode(),
935+
channel_manager.get_cm().encode(),
936936
)
937937
.await
938938
};
@@ -977,7 +977,7 @@ where
977977
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
978978
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
979979
NETWORK_GRAPH_PERSISTENCE_KEY,
980-
&network_graph.encode(),
980+
network_graph.encode(),
981981
)
982982
.await
983983
{
@@ -1020,7 +1020,7 @@ where
10201020
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
10211021
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
10221022
SCORER_PERSISTENCE_KEY,
1023-
&scorer.encode(),
1023+
scorer.encode(),
10241024
)
10251025
.await
10261026
{
@@ -1128,7 +1128,7 @@ where
11281128
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
11291129
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
11301130
CHANNEL_MANAGER_PERSISTENCE_KEY,
1131-
&channel_manager.get_cm().encode(),
1131+
channel_manager.get_cm().encode(),
11321132
)
11331133
.await?;
11341134
if let Some(ref scorer) = scorer {
@@ -1137,7 +1137,7 @@ where
11371137
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
11381138
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
11391139
SCORER_PERSISTENCE_KEY,
1140-
&scorer.encode(),
1140+
scorer.encode(),
11411141
)
11421142
.await?;
11431143
}
@@ -1147,7 +1147,7 @@ where
11471147
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
11481148
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
11491149
NETWORK_GRAPH_PERSISTENCE_KEY,
1150-
&network_graph.encode(),
1150+
network_graph.encode(),
11511151
)
11521152
.await?;
11531153
}
@@ -1352,7 +1352,7 @@ impl BackgroundProcessor {
13521352
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
13531353
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
13541354
SCORER_PERSISTENCE_KEY,
1355-
&scorer.encode(),
1355+
scorer.encode(),
13561356
) {
13571357
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
13581358
}
@@ -1452,7 +1452,7 @@ impl BackgroundProcessor {
14521452
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
14531453
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
14541454
CHANNEL_MANAGER_PERSISTENCE_KEY,
1455-
&channel_manager.get_cm().encode(),
1455+
channel_manager.get_cm().encode(),
14561456
))?;
14571457
log_trace!(logger, "Done persisting ChannelManager.");
14581458
}
@@ -1484,7 +1484,7 @@ impl BackgroundProcessor {
14841484
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
14851485
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
14861486
NETWORK_GRAPH_PERSISTENCE_KEY,
1487-
&network_graph.encode(),
1487+
network_graph.encode(),
14881488
) {
14891489
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
14901490
}
@@ -1513,7 +1513,7 @@ impl BackgroundProcessor {
15131513
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
15141514
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
15151515
SCORER_PERSISTENCE_KEY,
1516-
&scorer.encode(),
1516+
scorer.encode(),
15171517
) {
15181518
log_error!(logger,
15191519
"Error: Failed to persist scorer, check your disk and permissions {}",
@@ -1556,22 +1556,22 @@ impl BackgroundProcessor {
15561556
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
15571557
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
15581558
CHANNEL_MANAGER_PERSISTENCE_KEY,
1559-
&channel_manager.get_cm().encode(),
1559+
channel_manager.get_cm().encode(),
15601560
)?;
15611561
if let Some(ref scorer) = scorer {
15621562
kv_store.write(
15631563
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
15641564
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
15651565
SCORER_PERSISTENCE_KEY,
1566-
&scorer.encode(),
1566+
scorer.encode(),
15671567
)?;
15681568
}
15691569
if let Some(network_graph) = gossip_sync.network_graph() {
15701570
kv_store.write(
15711571
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
15721572
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
15731573
NETWORK_GRAPH_PERSISTENCE_KEY,
1574-
&network_graph.encode(),
1574+
network_graph.encode(),
15751575
)?;
15761576
}
15771577
Ok(())
@@ -1916,7 +1916,7 @@ mod tests {
19161916
}
19171917

19181918
fn write(
1919-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
1919+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
19201920
) -> lightning::io::Result<()> {
19211921
if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
19221922
&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE

lightning-persister/src/fs_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ impl KVStoreSync for FilesystemStore {
123123
}
124124

125125
fn write(
126-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
126+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
127127
) -> lightning::io::Result<()> {
128128
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
129129

lightning-persister/src/test_utils.rs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,23 @@ use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event
1313
use std::panic::RefUnwindSafe;
1414

1515
pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(kv_store: &K) {
16-
let data = [42u8; 32];
16+
let data = vec![42u8; 32];
1717

1818
let primary_namespace = "testspace";
1919
let secondary_namespace = "testsubspace";
2020
let key = "testkey";
2121

2222
// Test the basic KVStore operations.
23-
kv_store.write(primary_namespace, secondary_namespace, key, &data).unwrap();
23+
kv_store.write(primary_namespace, secondary_namespace, key, data.clone()).unwrap();
2424

2525
// Test empty primary_namespace/secondary_namespace is allowed, but not empty primary_namespace
2626
// and non-empty secondary_namespace, and not empty key.
27-
kv_store.write("", "", key, &data).unwrap();
28-
let res = std::panic::catch_unwind(|| kv_store.write("", secondary_namespace, key, &data));
27+
kv_store.write("", "", key, data.clone()).unwrap();
28+
let res =
29+
std::panic::catch_unwind(|| kv_store.write("", secondary_namespace, key, data.clone()));
2930
assert!(res.is_err());
3031
let res = std::panic::catch_unwind(|| {
31-
kv_store.write(primary_namespace, secondary_namespace, "", &data)
32+
kv_store.write(primary_namespace, secondary_namespace, "", data.clone())
3233
});
3334
assert!(res.is_err());
3435

@@ -47,7 +48,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
4748
// Ensure we have no issue operating with primary_namespace/secondary_namespace/key being
4849
// KVSTORE_NAMESPACE_KEY_MAX_LEN
4950
let max_chars = "A".repeat(KVSTORE_NAMESPACE_KEY_MAX_LEN);
50-
kv_store.write(&max_chars, &max_chars, &max_chars, &data).unwrap();
51+
kv_store.write(&max_chars, &max_chars, &max_chars, data.clone()).unwrap();
5152

5253
let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
5354
assert_eq!(listed_keys.len(), 1);
@@ -66,7 +67,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
6667
source_store: &mut S, target_store: &mut T,
6768
) {
6869
// We fill the source with some bogus keys.
69-
let dummy_data = [42u8; 32];
70+
let dummy_data = vec![42u8; 32];
7071
let num_primary_namespaces = 3;
7172
let num_secondary_namespaces = 3;
7273
let num_keys = 3;
@@ -87,7 +88,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
8788
let key =
8889
format!("testkey{}", KVSTORE_NAMESPACE_KEY_ALPHABET.chars().nth(k).unwrap());
8990
source_store
90-
.write(&primary_namespace, &secondary_namespace, &key, &dummy_data)
91+
.write(&primary_namespace, &secondary_namespace, &key, dummy_data.clone())
9192
.unwrap();
9293
expected_keys.push((primary_namespace.clone(), secondary_namespace.clone(), key));
9394
}
@@ -107,7 +108,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
107108
assert_eq!(target_list, expected_keys);
108109

109110
for (p, s, k) in expected_keys.iter() {
110-
assert_eq!(target_store.read(p, s, k).unwrap(), dummy_data);
111+
assert_eq!(target_store.read(p, s, k).unwrap(), dummy_data.clone());
111112
}
112113
}
113114

lightning/src/util/persist.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ pub trait KVStoreSync {
118118
) -> Result<Vec<u8>, io::Error>;
119119
/// A synchronous version of the [`KVStore::write`] method.
120120
fn write(
121-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
121+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
122122
) -> Result<(), io::Error>;
123123
/// A synchronous version of the [`KVStore::remove`] method.
124124
fn remove(
@@ -159,7 +159,7 @@ where
159159
}
160160

161161
fn write(
162-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
162+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
163163
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
164164
let res = self.0.write(primary_namespace, secondary_namespace, key, buf);
165165

@@ -233,7 +233,7 @@ pub trait KVStore {
233233
///
234234
/// Will create the given `primary_namespace` and `secondary_namespace` if not already present in the store.
235235
fn write(
236-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
236+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
237237
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>>;
238238
/// Removes any data that had previously been persisted under the given `key`.
239239
///
@@ -291,7 +291,7 @@ pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(
291291

292292
for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
293293
let data = source_store.read(primary_namespace, secondary_namespace, key)?;
294-
target_store.write(primary_namespace, secondary_namespace, key, &data)?;
294+
target_store.write(primary_namespace, secondary_namespace, key, data)?;
295295
}
296296

297297
Ok(())
@@ -310,7 +310,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<Channel
310310
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
311311
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
312312
&monitor_name.to_string(),
313-
&monitor.encode(),
313+
monitor.encode(),
314314
) {
315315
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
316316
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
@@ -325,7 +325,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<Channel
325325
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
326326
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
327327
&monitor_name.to_string(),
328-
&monitor.encode(),
328+
monitor.encode(),
329329
) {
330330
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
331331
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
@@ -346,7 +346,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<Channel
346346
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
347347
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
348348
monitor_key.as_str(),
349-
&monitor,
349+
monitor,
350350
) {
351351
Ok(()) => {},
352352
Err(_e) => return,
@@ -763,7 +763,7 @@ where
763763
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
764764
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
765765
monitor_key.as_str(),
766-
&monitor_bytes,
766+
monitor_bytes,
767767
) {
768768
Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
769769
Err(e) => {
@@ -804,7 +804,7 @@ where
804804
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
805805
monitor_key.as_str(),
806806
update_name.as_str(),
807-
&update.encode(),
807+
update.encode(),
808808
) {
809809
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
810810
Err(e) => {
@@ -876,7 +876,7 @@ where
876876
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
877877
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
878878
monitor_key.as_str(),
879-
&monitor.encode(),
879+
monitor.encode(),
880880
) {
881881
Ok(()) => {},
882882
Err(_e) => return,
@@ -1487,7 +1487,7 @@ mod tests {
14871487
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
14881488
&monitor_name.to_string(),
14891489
UpdateName::from(1).as_str(),
1490-
&[0u8; 1],
1490+
vec![0u8; 1],
14911491
)
14921492
.unwrap();
14931493

lightning/src/util/sweep.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -659,7 +659,7 @@ where
659659
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
660660
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
661661
OUTPUT_SWEEPER_PERSISTENCE_KEY,
662-
&encoded,
662+
encoded,
663663
)
664664
}
665665

lightning/src/util/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -858,7 +858,7 @@ impl KVStoreSync for TestStore {
858858
}
859859

860860
fn write(
861-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
861+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
862862
) -> io::Result<()> {
863863
if self.read_only {
864864
return Err(io::Error::new(
@@ -875,7 +875,7 @@ impl KVStoreSync for TestStore {
875875
};
876876
let outer_e = persisted_lock.entry(prefixed).or_insert(new_hash_map());
877877
let mut bytes = Vec::new();
878-
bytes.write_all(buf)?;
878+
bytes.write_all(&buf)?;
879879
outer_e.insert(key.to_string(), bytes);
880880
Ok(())
881881
}

0 commit comments

Comments
 (0)