Skip to content

Update KVStore and KVStoreSync arguments to being owned #3974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 1, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 17 additions & 17 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,14 +593,14 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
/// # struct StoreSync {}
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
/// # struct Store {}
/// # impl lightning::util::persist::KVStore for Store {
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
/// # }
Expand Down Expand Up @@ -796,7 +796,7 @@ where
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY,
&scorer.encode(),
scorer.encode(),
)
.await
{
Expand Down Expand Up @@ -932,7 +932,7 @@ where
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
&channel_manager.get_cm().encode(),
channel_manager.get_cm().encode(),
)
.await
};
Expand Down Expand Up @@ -977,7 +977,7 @@ where
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
&network_graph.encode(),
network_graph.encode(),
)
.await
{
Expand Down Expand Up @@ -1020,7 +1020,7 @@ where
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY,
&scorer.encode(),
scorer.encode(),
)
.await
{
Expand Down Expand Up @@ -1128,7 +1128,7 @@ where
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
&channel_manager.get_cm().encode(),
channel_manager.get_cm().encode(),
)
.await?;
if let Some(ref scorer) = scorer {
Expand All @@ -1137,7 +1137,7 @@ where
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY,
&scorer.encode(),
scorer.encode(),
)
.await?;
}
Expand All @@ -1147,7 +1147,7 @@ where
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
&network_graph.encode(),
network_graph.encode(),
)
.await?;
}
Expand Down Expand Up @@ -1352,7 +1352,7 @@ impl BackgroundProcessor {
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY,
&scorer.encode(),
scorer.encode(),
) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
Expand Down Expand Up @@ -1452,7 +1452,7 @@ impl BackgroundProcessor {
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
&channel_manager.get_cm().encode(),
channel_manager.get_cm().encode(),
))?;
log_trace!(logger, "Done persisting ChannelManager.");
}
Expand Down Expand Up @@ -1484,7 +1484,7 @@ impl BackgroundProcessor {
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
&network_graph.encode(),
network_graph.encode(),
) {
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}",e);
}
Expand Down Expand Up @@ -1513,7 +1513,7 @@ impl BackgroundProcessor {
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY,
&scorer.encode(),
scorer.encode(),
) {
log_error!(logger,
"Error: Failed to persist scorer, check your disk and permissions {}",
Expand Down Expand Up @@ -1556,22 +1556,22 @@ impl BackgroundProcessor {
CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_MANAGER_PERSISTENCE_KEY,
&channel_manager.get_cm().encode(),
channel_manager.get_cm().encode(),
)?;
if let Some(ref scorer) = scorer {
kv_store.write(
SCORER_PERSISTENCE_PRIMARY_NAMESPACE,
SCORER_PERSISTENCE_SECONDARY_NAMESPACE,
SCORER_PERSISTENCE_KEY,
&scorer.encode(),
scorer.encode(),
)?;
}
if let Some(network_graph) = gossip_sync.network_graph() {
kv_store.write(
NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE,
NETWORK_GRAPH_PERSISTENCE_KEY,
&network_graph.encode(),
network_graph.encode(),
)?;
}
Ok(())
Expand Down Expand Up @@ -1916,7 +1916,7 @@ mod tests {
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> lightning::io::Result<()> {
if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE
&& secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE
Expand Down
2 changes: 1 addition & 1 deletion lightning-persister/src/fs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl KVStoreSync for FilesystemStore {
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> lightning::io::Result<()> {
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;

Expand Down
19 changes: 10 additions & 9 deletions lightning-persister/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,23 @@ use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event
use std::panic::RefUnwindSafe;

pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(kv_store: &K) {
let data = [42u8; 32];
let data = vec![42u8; 32];

let primary_namespace = "testspace";
let secondary_namespace = "testsubspace";
let key = "testkey";

// Test the basic KVStore operations.
kv_store.write(primary_namespace, secondary_namespace, key, &data).unwrap();
kv_store.write(primary_namespace, secondary_namespace, key, data.clone()).unwrap();

// Test empty primary_namespace/secondary_namespace is allowed, but not empty primary_namespace
// and non-empty secondary_namespace, and not empty key.
kv_store.write("", "", key, &data).unwrap();
let res = std::panic::catch_unwind(|| kv_store.write("", secondary_namespace, key, &data));
kv_store.write("", "", key, data.clone()).unwrap();
let res =
std::panic::catch_unwind(|| kv_store.write("", secondary_namespace, key, data.clone()));
assert!(res.is_err());
let res = std::panic::catch_unwind(|| {
kv_store.write(primary_namespace, secondary_namespace, "", &data)
kv_store.write(primary_namespace, secondary_namespace, "", data.clone())
});
assert!(res.is_err());

Expand All @@ -47,7 +48,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
// Ensure we have no issue operating with primary_namespace/secondary_namespace/key being
// KVSTORE_NAMESPACE_KEY_MAX_LEN
let max_chars = "A".repeat(KVSTORE_NAMESPACE_KEY_MAX_LEN);
kv_store.write(&max_chars, &max_chars, &max_chars, &data).unwrap();
kv_store.write(&max_chars, &max_chars, &max_chars, data.clone()).unwrap();

let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
assert_eq!(listed_keys.len(), 1);
Expand All @@ -66,7 +67,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
source_store: &mut S, target_store: &mut T,
) {
// We fill the source with some bogus keys.
let dummy_data = [42u8; 32];
let dummy_data = vec![42u8; 32];
let num_primary_namespaces = 3;
let num_secondary_namespaces = 3;
let num_keys = 3;
Expand All @@ -87,7 +88,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
let key =
format!("testkey{}", KVSTORE_NAMESPACE_KEY_ALPHABET.chars().nth(k).unwrap());
source_store
.write(&primary_namespace, &secondary_namespace, &key, &dummy_data)
.write(&primary_namespace, &secondary_namespace, &key, dummy_data.clone())
.unwrap();
expected_keys.push((primary_namespace.clone(), secondary_namespace.clone(), key));
}
Expand All @@ -107,7 +108,7 @@ pub(crate) fn do_test_data_migration<S: MigratableKVStore, T: MigratableKVStore>
assert_eq!(target_list, expected_keys);

for (p, s, k) in expected_keys.iter() {
assert_eq!(target_store.read(p, s, k).unwrap(), dummy_data);
assert_eq!(target_store.read(p, s, k).unwrap(), dummy_data.clone());
}
}

Expand Down
22 changes: 11 additions & 11 deletions lightning/src/util/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ pub trait KVStoreSync {
) -> Result<Vec<u8>, io::Error>;
/// A synchronous version of the [`KVStore::write`] method.
fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> Result<(), io::Error>;
/// A synchronous version of the [`KVStore::remove`] method.
fn remove(
Expand Down Expand Up @@ -159,7 +159,7 @@ where
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> {
let res = self.0.write(primary_namespace, secondary_namespace, key, buf);

Expand Down Expand Up @@ -233,7 +233,7 @@ pub trait KVStore {
///
/// Will create the given `primary_namespace` and `secondary_namespace` if not already present in the store.
fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>>;
/// Removes any data that had previously been persisted under the given `key`.
///
Expand Down Expand Up @@ -291,7 +291,7 @@ pub fn migrate_kv_store_data<S: MigratableKVStore, T: MigratableKVStore>(

for (primary_namespace, secondary_namespace, key) in &keys_to_migrate {
let data = source_store.read(primary_namespace, secondary_namespace, key)?;
target_store.write(primary_namespace, secondary_namespace, key, &data)?;
target_store.write(primary_namespace, secondary_namespace, key, data)?;
}

Ok(())
Expand All @@ -310,7 +310,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<Channel
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&monitor_name.to_string(),
&monitor.encode(),
monitor.encode(),
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
Expand All @@ -325,7 +325,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<Channel
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
&monitor_name.to_string(),
&monitor.encode(),
monitor.encode(),
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(_) => chain::ChannelMonitorUpdateStatus::UnrecoverableError,
Expand All @@ -346,7 +346,7 @@ impl<ChannelSigner: EcdsaChannelSigner, K: KVStoreSync + ?Sized> Persist<Channel
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_key.as_str(),
&monitor,
monitor,
) {
Ok(()) => {},
Err(_e) => return,
Expand Down Expand Up @@ -763,7 +763,7 @@ where
CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_key.as_str(),
&monitor_bytes,
monitor_bytes,
) {
Ok(_) => chain::ChannelMonitorUpdateStatus::Completed,
Err(e) => {
Expand Down Expand Up @@ -804,7 +804,7 @@ where
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
monitor_key.as_str(),
update_name.as_str(),
&update.encode(),
update.encode(),
) {
Ok(()) => chain::ChannelMonitorUpdateStatus::Completed,
Err(e) => {
Expand Down Expand Up @@ -876,7 +876,7 @@ where
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE,
ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE,
monitor_key.as_str(),
&monitor.encode(),
monitor.encode(),
) {
Ok(()) => {},
Err(_e) => return,
Expand Down Expand Up @@ -1487,7 +1487,7 @@ mod tests {
CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE,
&monitor_name.to_string(),
UpdateName::from(1).as_str(),
&[0u8; 1],
vec![0u8; 1],
)
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion lightning/src/util/sweep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ where
OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
OUTPUT_SWEEPER_PERSISTENCE_KEY,
&encoded,
encoded,
)
}

Expand Down
4 changes: 2 additions & 2 deletions lightning/src/util/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -858,7 +858,7 @@ impl KVStoreSync for TestStore {
}

fn write(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>,
) -> io::Result<()> {
if self.read_only {
return Err(io::Error::new(
Expand All @@ -875,7 +875,7 @@ impl KVStoreSync for TestStore {
};
let outer_e = persisted_lock.entry(prefixed).or_insert(new_hash_map());
let mut bytes = Vec::new();
bytes.write_all(buf)?;
bytes.write_all(&buf)?;
outer_e.insert(key.to_string(), bytes);
Ok(())
}
Expand Down
Loading