Skip to content

Commit c824b41

Browse files
committed
Add async implementation of FilesystemStore
1 parent d37c595 commit c824b41

File tree

2 files changed

+212
-32
lines changed

2 files changed

+212
-32
lines changed

lightning-persister/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,13 @@ edition = "2021"
1313
all-features = true
1414
rustdoc-args = ["--cfg", "docsrs"]
1515

16+
[features]
17+
tokio = ["dep:tokio"]
18+
1619
[dependencies]
1720
bitcoin = "0.32.2"
1821
lightning = { version = "0.2.0", path = "../lightning" }
22+
tokio = { version = "1.35", optional = true, features = [ "macros", "rt-multi-thread" ] }
1923

2024
[target.'cfg(windows)'.dependencies]
2125
windows-sys = { version = "0.48.0", default-features = false, features = ["Win32_Storage_FileSystem", "Win32_Foundation"] }

lightning-persister/src/fs_store.rs

Lines changed: 208 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,18 @@ use std::collections::HashMap;
88
use std::fs;
99
use std::io::{Read, Write};
1010
use std::path::{Path, PathBuf};
11+
#[cfg(feature = "tokio")]
12+
use std::sync::atomic::AtomicU64;
1113
use std::sync::atomic::{AtomicUsize, Ordering};
1214
use std::sync::{Arc, Mutex, RwLock};
1315

16+
#[cfg(feature = "tokio")]
17+
use core::future::Future;
18+
#[cfg(feature = "tokio")]
19+
use core::pin::Pin;
20+
#[cfg(feature = "tokio")]
21+
use lightning::util::persist::KVStore;
22+
1423
#[cfg(target_os = "windows")]
1524
use {std::ffi::OsStr, std::os::windows::ffi::OsStrExt};
1625

@@ -30,47 +39,76 @@ fn path_to_windows_str<T: AsRef<OsStr>>(path: &T) -> Vec<u16> {
3039
path.as_ref().encode_wide().chain(Some(0)).collect()
3140
}
3241

33-
// The number of read/write/remove/list operations after which we clean up our `locks` HashMap.
34-
const GC_LOCK_INTERVAL: usize = 25;
35-
3642
// The number of times we retry listing keys in `FilesystemStore::list` before we give up reaching
3743
// a consistent view and error out.
3844
const LIST_DIR_CONSISTENCY_RETRIES: usize = 10;
3945

40-
/// A [`KVStoreSync`] implementation that writes to and reads from the file system.
41-
pub struct FilesystemStore {
46+
struct FilesystemStoreInner {
4247
data_dir: PathBuf,
4348
tmp_file_counter: AtomicUsize,
44-
gc_counter: AtomicUsize,
45-
locks: Mutex<HashMap<PathBuf, Arc<RwLock<()>>>>,
49+
50+
// Per path lock that ensures that we don't have concurrent writes to the same file. The lock also encapsulates the
51+
// latest written version per key.
52+
locks: Mutex<HashMap<PathBuf, Arc<RwLock<u64>>>>,
53+
}
54+
55+
/// A [`KVStore`] and [`KVStoreSync`] implementation that writes to and reads from the file system.
56+
///
57+
/// [`KVStore`]: lightning::util::persist::KVStore
58+
pub struct FilesystemStore {
59+
inner: Arc<FilesystemStoreInner>,
60+
61+
// Version counter to ensure that writes are applied in the correct order. It is assumed that read, list and remove
62+
// operations aren't sensitive to the order of execution.
63+
#[cfg(feature = "tokio")]
64+
version_counter: AtomicU64,
4665
}
4766

4867
impl FilesystemStore {
4968
/// Constructs a new [`FilesystemStore`].
5069
pub fn new(data_dir: PathBuf) -> Self {
5170
let locks = Mutex::new(HashMap::new());
5271
let tmp_file_counter = AtomicUsize::new(0);
53-
let gc_counter = AtomicUsize::new(1);
54-
Self { data_dir, tmp_file_counter, gc_counter, locks }
72+
Self {
73+
inner: Arc::new(FilesystemStoreInner { data_dir, tmp_file_counter, locks }),
74+
#[cfg(feature = "tokio")]
75+
version_counter: AtomicU64::new(0),
76+
}
5577
}
5678

5779
/// Returns the data directory.
5880
pub fn get_data_dir(&self) -> PathBuf {
59-
self.data_dir.clone()
81+
self.inner.data_dir.clone()
6082
}
83+
}
6184

62-
fn garbage_collect_locks(&self) {
63-
let gc_counter = self.gc_counter.fetch_add(1, Ordering::AcqRel);
85+
impl KVStoreSync for FilesystemStore {
86+
fn read(
87+
&self, primary_namespace: String, secondary_namespace: String, key: String,
88+
) -> Result<Vec<u8>, lightning::io::Error> {
89+
self.inner.read(primary_namespace, secondary_namespace, key)
90+
}
6491

65-
if gc_counter % GC_LOCK_INTERVAL == 0 {
66-
// Take outer lock for the cleanup.
67-
let mut outer_lock = self.locks.lock().unwrap();
92+
fn write(
93+
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
94+
) -> Result<(), lightning::io::Error> {
95+
self.inner.write_version(primary_namespace, secondary_namespace, key, buf, None)
96+
}
6897

69-
// Garbage collect all lock entries that are not referenced anymore.
70-
outer_lock.retain(|_, v| Arc::strong_count(&v) > 1);
71-
}
98+
fn remove(
99+
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
100+
) -> Result<(), lightning::io::Error> {
101+
self.inner.remove(primary_namespace, secondary_namespace, key, lazy)
72102
}
73103

104+
fn list(
105+
&self, primary_namespace: String, secondary_namespace: String,
106+
) -> Result<Vec<String>, lightning::io::Error> {
107+
self.inner.list(primary_namespace, secondary_namespace)
108+
}
109+
}
110+
111+
impl FilesystemStoreInner {
74112
fn get_dest_dir_path(
75113
&self, primary_namespace: &str, secondary_namespace: &str,
76114
) -> std::io::Result<PathBuf> {
@@ -94,9 +132,7 @@ impl FilesystemStore {
94132

95133
Ok(dest_dir_path)
96134
}
97-
}
98135

99-
impl KVStoreSync for FilesystemStore {
100136
fn read(
101137
&self, primary_namespace: String, secondary_namespace: String, key: String,
102138
) -> lightning::io::Result<Vec<u8>> {
@@ -118,13 +154,14 @@ impl KVStoreSync for FilesystemStore {
118154
f.read_to_end(&mut buf)?;
119155
}
120156

121-
self.garbage_collect_locks();
122-
123157
Ok(buf)
124158
}
125159

126-
fn write(
160+
/// Writes a specific version of a key to the filesystem. If a newer version has been written already, this function
161+
/// returns early without writing.
162+
fn write_version(
127163
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
164+
version: Option<u64>,
128165
) -> lightning::io::Result<()> {
129166
check_namespace_key_validity(
130167
&primary_namespace,
@@ -164,7 +201,18 @@ impl KVStoreSync for FilesystemStore {
164201
let mut outer_lock = self.locks.lock().unwrap();
165202
Arc::clone(&outer_lock.entry(dest_file_path.clone()).or_default())
166203
};
167-
let _guard = inner_lock_ref.write().unwrap();
204+
let mut last_written_version = inner_lock_ref.write().unwrap();
205+
206+
// If a version is provided, we check if we already have a newer version written. This is used in async
207+
// contexts to realize eventual consistency.
208+
if let Some(version) = version {
209+
if version <= *last_written_version {
210+
// If the version is not greater, we don't write the file.
211+
return Ok(());
212+
}
213+
214+
*last_written_version = version;
215+
}
168216

169217
#[cfg(not(target_os = "windows"))]
170218
{
@@ -211,8 +259,6 @@ impl KVStoreSync for FilesystemStore {
211259
}
212260
};
213261

214-
self.garbage_collect_locks();
215-
216262
res
217263
}
218264

@@ -312,8 +358,6 @@ impl KVStoreSync for FilesystemStore {
312358
}
313359
}
314360

315-
self.garbage_collect_locks();
316-
317361
Ok(())
318362
}
319363

@@ -364,12 +408,75 @@ impl KVStoreSync for FilesystemStore {
364408
break 'retry_list;
365409
}
366410

367-
self.garbage_collect_locks();
368-
369411
Ok(keys)
370412
}
371413
}
372414

415+
#[cfg(feature = "tokio")]
416+
impl KVStore for FilesystemStore {
417+
fn read(
418+
&self, primary_namespace: String, secondary_namespace: String, key: String,
419+
) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, lightning::io::Error>> + 'static + Send>> {
420+
let this = Arc::clone(&self.inner);
421+
422+
Box::pin(async move {
423+
tokio::task::spawn_blocking(move || {
424+
this.read(primary_namespace, secondary_namespace, key)
425+
})
426+
.await
427+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
428+
})
429+
}
430+
431+
fn write(
432+
&self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec<u8>,
433+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
434+
let this = Arc::clone(&self.inner);
435+
436+
// Obtain a version number to retain the call sequence.
437+
let version = self.version_counter.fetch_add(1, Ordering::Relaxed);
438+
if version == u64::MAX {
439+
panic!("FilesystemStore version counter overflowed");
440+
}
441+
442+
Box::pin(async move {
443+
tokio::task::spawn_blocking(move || {
444+
this.write_version(primary_namespace, secondary_namespace, key, buf, Some(version))
445+
})
446+
.await
447+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
448+
})
449+
}
450+
451+
fn remove(
452+
&self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool,
453+
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
454+
let this = Arc::clone(&self.inner);
455+
456+
Box::pin(async move {
457+
tokio::task::spawn_blocking(move || {
458+
this.remove(primary_namespace, secondary_namespace, key, lazy)
459+
})
460+
.await
461+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
462+
})
463+
}
464+
465+
fn list(
466+
&self, primary_namespace: String, secondary_namespace: String,
467+
) -> Pin<Box<dyn Future<Output = Result<Vec<String>, lightning::io::Error>> + 'static + Send>> {
468+
let this = Arc::clone(&self.inner);
469+
470+
Box::pin(async move {
471+
tokio::task::spawn_blocking(move || this.list(primary_namespace, secondary_namespace))
472+
.await
473+
.unwrap_or_else(|e| {
474+
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
475+
})
476+
})
477+
}
478+
}
479+
373480
fn dir_entry_is_key(dir_entry: &fs::DirEntry) -> Result<bool, lightning::io::Error> {
374481
let p = dir_entry.path();
375482
if let Some(ext) = p.extension() {
@@ -460,7 +567,7 @@ fn get_key_from_dir_entry_path(p: &Path, base_path: &Path) -> Result<String, lig
460567

461568
impl MigratableKVStore for FilesystemStore {
462569
fn list_all_keys(&self) -> Result<Vec<(String, String, String)>, lightning::io::Error> {
463-
let prefixed_dest = &self.data_dir;
570+
let prefixed_dest = &self.inner.data_dir;
464571
if !prefixed_dest.exists() {
465572
return Ok(Vec::new());
466573
}
@@ -547,7 +654,7 @@ mod tests {
547654
fn drop(&mut self) {
548655
// We test for invalid directory names, so it's OK if directory removal
549656
// fails.
550-
match fs::remove_dir_all(&self.data_dir) {
657+
match fs::remove_dir_all(&self.inner.data_dir) {
551658
Err(e) => println!("Failed to remove test persister directory: {}", e),
552659
_ => {},
553660
}
@@ -562,6 +669,75 @@ mod tests {
562669
do_read_write_remove_list_persist(&fs_store);
563670
}
564671

672+
#[cfg(feature = "tokio")]
673+
#[tokio::test]
674+
async fn read_write_remove_list_persist_async() {
675+
use crate::fs_store::FilesystemStore;
676+
use lightning::util::persist::KVStore;
677+
use std::sync::Arc;
678+
679+
let mut temp_path = std::env::temp_dir();
680+
temp_path.push("test_read_write_remove_list_persist_async");
681+
let fs_store: Arc<dyn KVStore> = Arc::new(FilesystemStore::new(temp_path));
682+
683+
let data1 = vec![42u8; 32];
684+
let data2 = vec![43u8; 32];
685+
686+
let primary_namespace = "testspace";
687+
let secondary_namespace = "testsubspace";
688+
let key = "testkey";
689+
690+
// Test writing the same key twice with different data. Execute the asynchronous part out of order to ensure
691+
// that eventual consistency works.
692+
let fut1 = fs_store.write(
693+
primary_namespace.to_string(),
694+
secondary_namespace.to_string(),
695+
key.to_string(),
696+
data1,
697+
);
698+
let fut2 = fs_store.write(
699+
primary_namespace.to_string(),
700+
secondary_namespace.to_string(),
701+
key.to_string(),
702+
data2.clone(),
703+
);
704+
705+
fut2.await.unwrap();
706+
fut1.await.unwrap();
707+
708+
// Test list.
709+
let listed_keys = fs_store
710+
.list(primary_namespace.to_string(), secondary_namespace.to_string())
711+
.await
712+
.unwrap();
713+
assert_eq!(listed_keys.len(), 1);
714+
assert_eq!(listed_keys[0], key);
715+
716+
// Test read. We expect to read data2, as the write call was initiated later.
717+
let read_data = fs_store
718+
.read(primary_namespace.to_string(), secondary_namespace.to_string(), key.to_string())
719+
.await
720+
.unwrap();
721+
assert_eq!(data2, &*read_data);
722+
723+
// Test remove.
724+
fs_store
725+
.remove(
726+
primary_namespace.to_string(),
727+
secondary_namespace.to_string(),
728+
key.to_string(),
729+
false,
730+
)
731+
.await
732+
.unwrap();
733+
734+
let listed_keys = fs_store
735+
.list(primary_namespace.to_string(), secondary_namespace.to_string())
736+
.await
737+
.unwrap();
738+
assert_eq!(listed_keys.len(), 0);
739+
}
740+
565741
#[test]
566742
fn test_data_migration() {
567743
let mut source_temp_path = std::env::temp_dir();

0 commit comments

Comments
 (0)