Skip to content

Commit 38ab949

Browse files
committed
Add async implementation of FilesystemStore
1 parent 55baa15 commit 38ab949

File tree

2 files changed

+198
-32
lines changed

2 files changed

+198
-32
lines changed

lightning-persister/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@ edition = "2021"
1313
all-features = true
1414
rustdoc-args = ["--cfg", "docsrs"]
1515

16+
[features]
17+
tokio = ["dep:tokio"]
18+
1619
[dependencies]
1720
bitcoin = "0.32.2"
1821
lightning = { version = "0.2.0", path = "../lightning" }
22+
tokio = { version = "1.35", optional = true, features = [ "macros", "rt-multi-thread" ] }
1923

2024
[target.'cfg(windows)'.dependencies]
2125
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }

lightning-persister/src/fs_store.rs

Lines changed: 194 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,18 @@ use std::collections::HashMap;
88
use std::fs;
99
use std::io::{Read, Write};
1010
use std::path::{Path, PathBuf};
11+
#[cfg(feature = "tokio")]
12+
use std::sync::atomic::AtomicU64;
1113
use std::sync::atomic::{AtomicUsize, Ordering};
1214
use std::sync::{Arc, Mutex, RwLock};
1315

16+
#[cfg(feature = "tokio")]
17+
use core::future::Future;
18+
#[cfg(feature = "tokio")]
19+
use core::pin::Pin;
20+
#[cfg(feature = "tokio")]
21+
use lightning::util::persist::KVStore;
22+
1423
#[cfg(target_os = "windows")]
1524
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
1625

@@ -30,43 +39,70 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3039
path.as_ref().encode_wide().chain(Some(0)).collect()
3140
}
3241

33-
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34-
const GC_LOCK_INTERVAL: usize = 25;
35-
36-
/// A [`KVStoreSync`] implementation that writes to and reads from the file system.
37-
pub struct FilesystemStore {
42+
struct FilesystemStoreInner {
3843
data_dir: PathBuf,
3944
tmp_file_counter: AtomicUsize,
40-
gc_counter: AtomicUsize,
41-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
45+
46+
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
47+
// latest written version per key.
48+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
49+
}
50+
51+
/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
52+
pub struct FilesystemStore {
53+
inner: Arc<FilesystemStoreInner>,
54+
55+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
56+
// operations aren't sensitive to the order of execution.
57+
#[cfg(feature = "tokio")]
58+
version_counter: AtomicU64,
4259
}
4360

4461
impl FilesystemStore {
4562
/// Constructs a new [`FilesystemStore`].
4663
pub fn new(data_dir: PathBuf) -> Self {
4764
let locks = Mutex::new(HashMap::new());
4865
let tmp_file_counter = AtomicUsize::new(0);
49-
let gc_counter = AtomicUsize::new(1);
50-
Self { data_dir, tmp_file_counter, gc_counter, locks }
66+
Self {
67+
inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }),
68+
#[cfg(feature = "tokio")]
69+
version_counter: AtomicU64::new(0),
70+
}
5171
}
5272

5373
/// Returns the data directory.
5474
pub fn get_data_dir(&self) -> PathBuf {
55-
self.data_dir.clone()
75+
self.inner.data_dir.clone()
5676
}
77+
}
5778

58-
fn garbage_collect_locks(&self) {
59-
let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
79+
impl KVStoreSync for FilesystemStore {
80+
fn read(
81+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
82+
) -> Result<Vec<u8>, lightning::io::Error> {
83+
self.inner.read(primary_namespace, secondary_namespace, key)
84+
}
6085

61-
if gc_counter % GC_LOCK_INTERVAL == 0 {
62-
// Take outer lock for the cleanup.
63-
let mut outer_lock = self.locks.lock().unwrap();
86+
fn write(
87+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
88+
) -> Result<(), lightning::io::Error> {
89+
self.inner.write_version(primary_namespace, secondary_namespace, key, buf, None)
90+
}
6491

65-
// Garbage collect all lock entries that are not referenced anymore.
66-
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
67-
}
92+
fn remove(
93+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
94+
) -> Result<(), lightning::io::Error> {
95+
self.inner.remove(primary_namespace, secondary_namespace, key, lazy)
96+
}
97+
98+
fn list(
99+
&self, primary_namespace: &str, secondary_namespace: &str,
100+
) -> Result<Vec<String>, lightning::io::Error> {
101+
self.inner.list(primary_namespace, secondary_namespace)
68102
}
103+
}
69104

105+
impl FilesystemStoreInner {
70106
fn get_dest_dir_path(
71107
&self, primary_namespace: &str, secondary_namespace: &str,
72108
) -> std::io::Result<PathBuf> {
@@ -90,9 +126,7 @@ impl FilesystemStore {
90126

91127
Ok(dest_dir_path)
92128
}
93-
}
94129

95-
impl KVStoreSync for FilesystemStore {
96130
fn read(
97131
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
98132
) -> lightning::io::Result<Vec<u8>> {
@@ -113,13 +147,14 @@ impl KVStoreSync for FilesystemStore {
113147
f.read_to_end(&mut buf)?;
114148
}
115149

116-
self.garbage_collect_locks();
117-
118150
Ok(buf)
119151
}
120152

121-
fn write(
153+
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
154+
/// returns early without writing.
155+
fn write_version(
122156
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
157+
version: Option<u64>,
123158
) -> lightning::io::Result<()> {
124159
check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?;
125160

@@ -153,7 +188,18 @@ impl KVStoreSync for FilesystemStore {
153188
let mut outer_lock = self.locks.lock().unwrap();
154189
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
155190
};
156-
let _guard = inner_lock_ref.write().unwrap();
191+
let mut last_written_version = inner_lock_ref.write().unwrap();
192+
193+
// If a version is provided, we check if we already have a newer version written. This is used in async
194+
// contexts to realize eventual consistency.
195+
if let Some(version) = version {
196+
if version <= *last_written_version {
197+
// If the version is not greater, we don't write the file.
198+
return Ok(());
199+
}
200+
201+
*last_written_version = version;
202+
}
157203

158204
#[cfg(not(target_os = "windows"))]
159205
{
@@ -200,8 +246,6 @@ impl KVStoreSync for FilesystemStore {
200246
}
201247
};
202248

203-
self.garbage_collect_locks();
204-
205249
res
206250
}
207251

@@ -295,8 +339,6 @@ impl KVStoreSync for FilesystemStore {
295339
}
296340
}
297341

298-
self.garbage_collect_locks();
299-
300342
Ok(())
301343
}
302344

@@ -325,12 +367,90 @@ impl KVStoreSync for FilesystemStore {
325367
keys.push(key);
326368
}
327369

328-
self.garbage_collect_locks();
329-
330370
Ok(keys)
331371
}
332372
}
333373

374+
#[cfg(feature = "tokio")]
375+
impl KVStore for FilesystemStore {
376+
fn read(
377+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
378+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
379+
let primary_namespace = primary_namespace.to_string();
380+
let secondary_namespace = secondary_namespace.to_string();
381+
let key = key.to_string();
382+
let this = Arc::clone(&self.inner);
383+
384+
Box::pin(async move {
385+
tokio::task::spawn_blocking(move || {
386+
this.read(&primary_namespace, &secondary_namespace, &key)
387+
})
388+
.await
389+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
390+
})
391+
}
392+
393+
fn write(
394+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8],
395+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
396+
let primary_namespace = primary_namespace.to_string();
397+
let secondary_namespace = secondary_namespace.to_string();
398+
let key = key.to_string();
399+
let buf = buf.to_vec();
400+
let this = Arc::clone(&self.inner);
401+
402+
// Obtain a version number to retain the call sequence.
403+
let version = self.version_counter.fetch_add(1, Ordering::SeqCst);
404+
405+
Box::pin(async move {
406+
tokio::task::spawn_blocking(move || {
407+
this.write_version(
408+
&primary_namespace,
409+
&secondary_namespace,
410+
&key,
411+
&buf,
412+
Some(version),
413+
)
414+
})
415+
.await
416+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
417+
})
418+
}
419+
420+
fn remove(
421+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
422+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
423+
let primary_namespace = primary_namespace.to_string();
424+
let secondary_namespace = secondary_namespace.to_string();
425+
let key = key.to_string();
426+
let this = Arc::clone(&self.inner);
427+
428+
Box::pin(async move {
429+
tokio::task::spawn_blocking(move || {
430+
this.remove(&primary_namespace, &secondary_namespace, &key, lazy)
431+
})
432+
.await
433+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
434+
})
435+
}
436+
437+
fn list(
438+
&self, primary_namespace: &str, secondary_namespace: &str,
439+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send>> {
440+
let primary_namespace = primary_namespace.to_string();
441+
let secondary_namespace = secondary_namespace.to_string();
442+
let this = Arc::clone(&self.inner);
443+
444+
Box::pin(async move {
445+
tokio::task::spawn_blocking(move || this.list(&primary_namespace, &secondary_namespace))
446+
.await
447+
.unwrap_or_else(|e| {
448+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
449+
})
450+
})
451+
}
452+
}
453+
334454
fn dir_entry_is_key(p: &Path) -> Result<bool, lightning::io::Error> {
335455
if let Some(ext) = p.extension() {
336456
#[cfg(target_os = "windows")]
@@ -427,7 +547,7 @@ fn get_key_from_dir_entry(p: &Path, base_path: &Path) -> Result<String, lightnin
427547

428548
impl MigratableKVStore for FilesystemStore {
429549
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
430-
let prefixed_dest = &self.data_dir;
550+
let prefixed_dest = &self.inner.data_dir;
431551
if !prefixed_dest.exists() {
432552
return Ok(Vec::new());
433553
}
@@ -511,7 +631,7 @@ mod tests {
511631
fn drop(&mut self) {
512632
// We test for invalid directory names, so it's OK if directory removal
513633
// fails.
514-
match fs::remove_dir_all(&self.data_dir) {
634+
match fs::remove_dir_all(&self.inner.data_dir) {
515635
Err(e) => println!("Failed to remove test persister directory: {}", e),
516636
_ => {},
517637
}
@@ -526,6 +646,48 @@ mod tests {
526646
do_read_write_remove_list_persist(&fs_store);
527647
}
528648

649+
#[cfg(feature = "tokio")]
650+
#[tokio::test]
651+
async fn read_write_remove_list_persist_async() {
652+
use crate::fs_store::FilesystemStore;
653+
use lightning::util::persist::KVStore;
654+
use std::sync::Arc;
655+
656+
let mut temp_path = std::env::temp_dir();
657+
temp_path.push("test_read_write_remove_list_persist_async");
658+
let fs_store: Arc<dyn KVStore> = Arc::new(FilesystemStore::new(temp_path));
659+
660+
let data1 = [42u8; 32];
661+
let data2 = [43u8; 32];
662+
663+
let primary_namespace = "testspace";
664+
let secondary_namespace = "testsubspace";
665+
let key = "testkey";
666+
667+
// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
668+
// that eventual consistency works.
669+
let fut1 = fs_store.write(primary_namespace, secondary_namespace, key, &data1);
670+
let fut2 = fs_store.write(primary_namespace, secondary_namespace, key, &data2);
671+
672+
fut2.await.unwrap();
673+
fut1.await.unwrap();
674+
675+
// Test list.
676+
let listed_keys = fs_store.list(primary_namespace, secondary_namespace).await.unwrap();
677+
assert_eq!(listed_keys.len(), 1);
678+
assert_eq!(listed_keys[0], key);
679+
680+
// Test read. We expect to read data2, as the write call was initiated later.
681+
let read_data = fs_store.read(primary_namespace, secondary_namespace, key).await.unwrap();
682+
assert_eq!(data2, &*read_data);
683+
684+
// Test remove.
685+
fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
686+
687+
let listed_keys = fs_store.list(primary_namespace, secondary_namespace).await.unwrap();
688+
assert_eq!(listed_keys.len(), 0);
689+
}
690+
529691
#[test]
530692
fn test_data_migration() {
531693
let mut source_temp_path = std::env::temp_dir();

0 commit comments

Comments
 (0)