From 98663094344d4cf1a902e15f9c82eff1b6e2de83 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Thu, 24 Jul 2025 11:27:03 +0200 Subject: [PATCH 1/2] Add async implementation of FilesystemStore --- lightning-persister/Cargo.toml | 5 + lightning-persister/src/fs_store.rs | 308 ++++++++++++++++++++++------ 2 files changed, 255 insertions(+), 58 deletions(-) diff --git a/lightning-persister/Cargo.toml b/lightning-persister/Cargo.toml index ec34fa8a88d..593e19a95f7 100644 --- a/lightning-persister/Cargo.toml +++ b/lightning-persister/Cargo.toml @@ -13,9 +13,13 @@ edition = "2021" all-features = true rustdoc-args = ["--cfg", "docsrs"] +[features] +tokio = ["dep:tokio"] + [dependencies] bitcoin = "0.32.2" lightning = { version = "0.2.0", path = "../lightning" } +tokio = { version = "1.35", optional = true, default-features = false, features = ["rt-multi-thread"] } [target.'cfg(windows)'.dependencies] windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] } @@ -26,6 +30,7 @@ criterion = { version = "0.4", optional = true, default-features = false } [dev-dependencies] lightning = { version = "0.2.0", path = "../lightning", features = ["_test_utils"] } bitcoin = { version = "0.32.2", default-features = false } +tokio = { version = "1.35", default-features = false, features = ["macros"] } [lints] workspace = true diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index a9edb4e2e6f..d27add4e592 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -8,9 +8,18 @@ use std::collections::HashMap; use std::fs; use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +#[cfg(feature = "tokio")] +use std::sync::atomic::AtomicU64; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex, RwLock}; +#[cfg(feature = "tokio")] +use core::future::Future; +#[cfg(feature = "tokio")] +use core::pin::Pin; +#[cfg(feature = "tokio")] +use lightning::util::persist::KVStore; + #[cfg(target_os = "windows")] use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt}; @@ -30,19 +39,29 @@ fn path_to_windows_str>(path: &T) -> Vec { path.as_ref().encode_wide().chain(Some(0)).collect() } -// The number of read/write/remove/list operations after which we clean up our `locks` HashMap. -const GC_LOCK_INTERVAL: usize = 25; - // The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching // a consistent view and error out. const LIST_DIR_CONSISTENCY_RETRIES: usize = 10; -/// A [`KVStoreSync`] implementation that writes to and reads from the file system. -pub struct FilesystemStore { +struct FilesystemStoreInner { data_dir: PathBuf, tmp_file_counter: AtomicUsize, - gc_counter: AtomicUsize, - locks: Mutex>>>, + + // Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the + // latest written version per key. + locks: Mutex>>>, +} + +/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system. +/// +/// [`KVStore`]: lightning::util::persist::KVStore +pub struct FilesystemStore { + inner: Arc, + + // Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove + // operations aren't sensitive to the order of execution. + #[cfg(feature = "tokio")] + version_counter: AtomicU64, } impl FilesystemStore { @@ -50,27 +69,70 @@ impl FilesystemStore { pub fn new(data_dir: PathBuf) -> Self { let locks = Mutex::new(HashMap::new()); let tmp_file_counter = AtomicUsize::new(0); - let gc_counter = AtomicUsize::new(1); - Self { data_dir, tmp_file_counter, gc_counter, locks } + Self { + inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }), + #[cfg(feature = "tokio")] + version_counter: AtomicU64::new(0), + } } /// Returns the data directory. pub fn get_data_dir(&self) -> PathBuf { - self.data_dir.clone() + self.inner.data_dir.clone() } +} - fn garbage_collect_locks(&self) { - let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel); +impl KVStoreSync for FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Result, lightning::io::Error> { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "read", + )?; + self.inner.read(path) + } - if gc_counter % GC_LOCK_INTERVAL == 0 { - // Take outer lock for the cleanup. - let mut outer_lock = self.locks.lock().unwrap(); + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Result<(), lightning::io::Error> { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "write", + )?; + self.inner.write_version(path, buf, None) + } - // Garbage collect all lock entries that are not referenced anymore. - outer_lock.retain(|_, v| Arc::strong_count(&v) > 1); - } + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Result<(), lightning::io::Error> { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "remove", + )?; + self.inner.remove(path, lazy) } + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Result, lightning::io::Error> { + let path = self.inner.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + None, + "list", + )?; + self.inner.list(path) + } +} + +impl FilesystemStoreInner { fn get_dest_dir_path( &self, primary_namespace: &str, secondary_namespace: &str, ) -> std::io::Result { @@ -94,17 +156,22 @@ impl FilesystemStore { Ok(dest_dir_path) } -} -impl KVStoreSync for FilesystemStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> lightning::io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + fn get_checked_dest_file_path( + &self, primary_namespace: &str, secondary_namespace: &str, key: Option<&str>, + operation: &str, + ) -> lightning::io::Result { + check_namespace_key_validity(primary_namespace, secondary_namespace, key, operation)?; let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); + if let Some(key) = key { + dest_file_path.push(key); + } + Ok(dest_file_path) + } + + fn read(&self, dest_file_path: PathBuf) -> lightning::io::Result> { let mut buf = Vec::new(); { let inner_lock_ref = { @@ -117,19 +184,14 @@ impl KVStoreSync for FilesystemStore { f.read_to_end(&mut buf)?; } - self.garbage_collect_locks(); - Ok(buf) } - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + /// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function + /// returns early without writing. + fn write_version( + &self, dest_file_path: PathBuf, buf: Vec, version: Option, ) -> lightning::io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; - - let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); - let parent_directory = dest_file_path.parent().ok_or_else(|| { let msg = format!("Could not retrieve parent directory of {}.", dest_file_path.display()); @@ -157,7 +219,18 @@ impl KVStoreSync for FilesystemStore { let mut outer_lock = self.locks.lock().unwrap(); Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) }; - let _guard = inner_lock_ref.write().unwrap(); + let mut last_written_version = inner_lock_ref.write().unwrap(); + + // If a version is provided, we check if we already have a newer version written. This is used in async + // contexts to realize eventual consistency. + if let Some(version) = version { + if version <= *last_written_version { + // If the version is not greater, we don't write the file. + return Ok(()); + } + + *last_written_version = version; + } #[cfg(not(target_os = "windows"))] { @@ -204,19 +277,10 @@ impl KVStoreSync for FilesystemStore { } }; - self.garbage_collect_locks(); - res } - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> lightning::io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; - - let mut dest_file_path = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - dest_file_path.push(key); - + fn remove(&self, dest_file_path: PathBuf, lazy: bool) -> lightning::io::Result<()> { if !dest_file_path.is_file() { return Ok(()); } @@ -299,18 +363,10 @@ impl KVStoreSync for FilesystemStore { } } - self.garbage_collect_locks(); - Ok(()) } - fn list( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> lightning::io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; - - let prefixed_dest = self.get_dest_dir_path(primary_namespace, secondary_namespace)?; - + fn list(&self, prefixed_dest: PathBuf) -> lightning::io::Result> { if !Path::new(&prefixed_dest).exists() { return Ok(Vec::new()); } @@ -351,12 +407,106 @@ impl KVStoreSync for FilesystemStore { break 'retry_list; } - self.garbage_collect_locks(); - Ok(keys) } } +#[cfg(feature = "tokio")] +impl KVStore for FilesystemStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Pin, lightning::io::Error>> + 'static + Send>> { + let this = Arc::clone(&self.inner); + let path = match this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "read", + ) { + Ok(path) => path, + Err(e) => return Box::pin(async move { Err(e) }), + }; + + Box::pin(async move { + tokio::task::spawn_blocking(move || this.read(path)).await.unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) + }) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> Pin> + 'static + Send>> { + let this = Arc::clone(&self.inner); + let path = match this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "write", + ) { + Ok(path) => path, + Err(e) => return Box::pin(async move { Err(e) }), + }; + + // Obtain a version number to retain the call sequence. + let version = self.version_counter.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FilesystemStore version counter overflowed"); + } + + Box::pin(async move { + tokio::task::spawn_blocking(move || this.write_version(path, buf, Some(version))) + .await + .unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) + }) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> Pin> + 'static + Send>> { + let this = Arc::clone(&self.inner); + let path = match this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + Some(key), + "remove", + ) { + Ok(path) => path, + Err(e) => return Box::pin(async move { Err(e) }), + }; + + Box::pin(async move { + tokio::task::spawn_blocking(move || this.remove(path, lazy)).await.unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) + }) + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> Pin, lightning::io::Error>> + 'static + Send>> { + let this = Arc::clone(&self.inner); + + let path = match this.get_checked_dest_file_path( + primary_namespace, + secondary_namespace, + None, + "list", + ) { + Ok(path) => path, + Err(e) => return Box::pin(async move { Err(e) }), + }; + + Box::pin(async move { + tokio::task::spawn_blocking(move || this.list(path)).await.unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) + }) + } +} + fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result { let p = dir_entry.path(); if let Some(ext) = p.extension() { @@ -447,7 +597,7 @@ fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result Result, lightning::io::Error> { - let prefixed_dest = &self.data_dir; + let prefixed_dest = &self.inner.data_dir; if !prefixed_dest.exists() { return Ok(Vec::new()); } @@ -534,7 +684,7 @@ mod tests { fn drop(&mut self) { // We test for invalid directory names, so it's OK if directory removal // fails. - match fs::remove_dir_all(&self.data_dir) { + match fs::remove_dir_all(&self.inner.data_dir) { Err(e) => println!("Failed to remove test persister directory: {}", e), _ => {}, } @@ -549,6 +699,48 @@ mod tests { do_read_write_remove_list_persist(&fs_store); } + #[cfg(feature = "tokio")] + #[tokio::test] + async fn read_write_remove_list_persist_async() { + use crate::fs_store::FilesystemStore; + use lightning::util::persist::KVStore; + use std::sync::Arc; + + let mut temp_path = std::env::temp_dir(); + temp_path.push("test_read_write_remove_list_persist_async"); + let fs_store: Arc = Arc::new(FilesystemStore::new(temp_path)); + + let data1 = vec![42u8; 32]; + let data2 = vec![43u8; 32]; + + let primary_namespace = "testspace"; + let secondary_namespace = "testsubspace"; + let key = "testkey"; + + // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure + // that eventual consistency works. + let fut1 = fs_store.write(primary_namespace, secondary_namespace, key, data1); + let fut2 = fs_store.write(primary_namespace, secondary_namespace, key, data2.clone()); + + fut2.await.unwrap(); + fut1.await.unwrap(); + + // Test list. + let listed_keys = fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); + assert_eq!(listed_keys.len(), 1); + assert_eq!(listed_keys[0], key); + + // Test read. We expect to read data2, as the write call was initiated later. + let read_data = fs_store.read(primary_namespace, secondary_namespace, key).await.unwrap(); + assert_eq!(data2, &*read_data); + + // Test remove. + fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap(); + + let listed_keys = fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); + assert_eq!(listed_keys.len(), 0); + } + #[test] fn test_data_migration() { let mut source_temp_path = std::env::temp_dir(); From f4e8d62d81a807e665356ba9207744e69812dbf5 Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Fri, 1 Aug 2025 20:21:14 +0200 Subject: [PATCH 2/2] f: versioned remove --- lightning-persister/src/fs_store.rs | 43 ++++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index d27add4e592..215949fbab1 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -116,7 +116,7 @@ impl KVStoreSync for FilesystemStore { Some(key), "remove", )?; - self.inner.remove(path, lazy) + self.inner.remove_version(path, lazy, None) } fn list( @@ -221,11 +221,11 @@ impl FilesystemStoreInner { }; let mut last_written_version = inner_lock_ref.write().unwrap(); - // If a version is provided, we check if we already have a newer version written. This is used in async - // contexts to realize eventual consistency. + // If a version is provided, we check if we already have a newer version written/removed. This is used in + // async contexts to realize eventual consistency. if let Some(version) = version { if version <= *last_written_version { - // If the version is not greater, we don't write the file. + // If the version is not greater, we don't touch the file. return Ok(()); } @@ -280,7 +280,9 @@ impl FilesystemStoreInner { res } - fn remove(&self, dest_file_path: PathBuf, lazy: bool) -> lightning::io::Result<()> { + fn remove_version( + &self, dest_file_path: PathBuf, lazy: bool, version: Option, + ) -> lightning::io::Result<()> { if !dest_file_path.is_file() { return Ok(()); } @@ -290,7 +292,18 @@ impl FilesystemStoreInner { let mut outer_lock = self.locks.lock().unwrap(); Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default()) }; - let _guard = inner_lock_ref.write().unwrap(); + let mut last_written_version = inner_lock_ref.write().unwrap(); + + // If a version is provided, we check if we already have a newer version written/removed. This is used in + // async contexts to realize eventual consistency. + if let Some(version) = version { + if version <= *last_written_version { + // If the version is not greater, we don't touch the file. + return Ok(()); + } + + *last_written_version = version; + } if lazy { // If we're lazy we just call remove and be done with it. @@ -477,10 +490,18 @@ impl KVStore for FilesystemStore { Err(e) => return Box::pin(async move { Err(e) }), }; + // Obtain a version number to retain the call sequence. + let version = self.version_counter.fetch_add(1, Ordering::Relaxed); + if version == u64::MAX { + panic!("FilesystemStore version counter overflowed"); + } + Box::pin(async move { - tokio::task::spawn_blocking(move || this.remove(path, lazy)).await.unwrap_or_else(|e| { - Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) - }) + tokio::task::spawn_blocking(move || this.remove_version(path, lazy, Some(version))) + .await + .unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) }) } @@ -720,8 +741,10 @@ mod tests { // Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure // that eventual consistency works. let fut1 = fs_store.write(primary_namespace, secondary_namespace, key, data1); - let fut2 = fs_store.write(primary_namespace, secondary_namespace, key, data2.clone()); + let fut2 = fs_store.remove(primary_namespace, secondary_namespace, key, false); + let fut3 = fs_store.write(primary_namespace, secondary_namespace, key, data2.clone()); + fut3.await.unwrap(); fut2.await.unwrap(); fut1.await.unwrap();