diff --git a/Cargo.lock b/Cargo.lock
index 5f1e6623..d6d33ad8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -193,6 +193,12 @@ version = "1.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
 
+[[package]]
+name = "atomic_refcell"
+version = "0.1.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "41e67cd8309bbd06cd603a9e693a784ac2e5d1e955f11286e355089fcab3047c"
+
 [[package]]
 name = "attohttpc"
 version = "0.24.1"
@@ -1615,6 +1621,17 @@ dependencies = [
  "web-sys",
 ]
 
+[[package]]
+name = "io-uring"
+version = "0.7.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013"
+dependencies = [
+ "bitflags 2.9.1",
+ "cfg-if",
+ "libc",
+]
+
 [[package]]
 name = "ipconfig"
 version = "0.3.2"
@@ -1728,6 +1745,7 @@ version = "0.91.0"
 dependencies = [
  "anyhow",
  "arrayvec",
+ "atomic_refcell",
  "bao-tree",
  "bytes",
  "chrono",
@@ -3972,17 +3990,19 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.45.1"
+version = "1.46.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779"
+checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17"
 dependencies = [
  "backtrace",
  "bytes",
+ "io-uring",
  "libc",
  "mio",
  "parking_lot",
  "pin-project-lite",
  "signal-hook-registry",
+ "slab",
  "socket2",
  "tokio-macros",
  "windows-sys 0.52.0",
diff --git a/Cargo.toml b/Cargo.toml
index f7ff49b6..23b79141 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -58,6 +58,7 @@ testresult = "0.4.1"
 tracing-subscriber = { version = "0.3.19", features = ["fmt"] }
 tracing-test = "0.2.5"
 walkdir = "2.5.0"
+atomic_refcell = "0.1.13"
 iroh = { version = "0.90", features = ["discovery-local-network"]}
 
 [features]
diff --git a/src/api.rs b/src/api.rs
index 56527994..c7994032 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -30,6 +30,7 @@ pub mod downloader;
 pub mod proto;
 pub mod remote;
 pub mod tags;
+use crate::api::proto::WaitIdleRequest;
 pub use crate::{store::util::Tag, util::temp_tag::TempTag};
 
 pub(crate) type ApiClient = irpc::Client<proto::Request>;
@@ -314,6 +315,7 @@ impl Store {
                     Request::ClearProtected(msg) => local.send((msg, tx)),
                     Request::SyncDb(msg) => local.send((msg, tx)),
                     Request::Shutdown(msg) => local.send((msg, tx)),
+                    Request::WaitIdle(msg) => local.send((msg, tx)),
                 }
             })
         });
@@ -332,6 +334,19 @@ impl Store {
         Ok(())
     }
 
+    /// Waits for the store to become completely idle.
+    ///
+    /// This is mostly useful for tests, where you want to check that e.g. the
+    /// store has written all data to disk.
+    ///
+    /// Note that a store is not guaranteed to become idle if it is being
+    /// interacted with concurrently, so this might wait forever.
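+    ///
+    /// A minimal usage sketch (hypothetical test setup; the path and the
+    /// `add_bytes` call are illustrative, not part of this change):
+    ///
+    /// ```ignore
+    /// let store = FsStore::load("/tmp/blobs").await?;
+    /// let tag = store.add_bytes(vec![0u8; 1024]).await?;
+    /// // block until all background writes have settled
+    /// store.wait_idle().await?;
+    /// ```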
+ pub async fn wait_idle(&self) -> irpc::Result<()> { + let msg = WaitIdleRequest; + self.client.rpc(msg).await?; + Ok(()) + } + pub(crate) fn from_sender(client: ApiClient) -> Self { Self { client } } diff --git a/src/api/proto.rs b/src/api/proto.rs index ed3686e1..7ff8e9d2 100644 --- a/src/api/proto.rs +++ b/src/api/proto.rs @@ -134,11 +134,16 @@ pub enum Request { #[rpc(tx = oneshot::Sender>)] SyncDb(SyncDbRequest), #[rpc(tx = oneshot::Sender<()>)] + WaitIdle(WaitIdleRequest), + #[rpc(tx = oneshot::Sender<()>)] Shutdown(ShutdownRequest), #[rpc(tx = oneshot::Sender>)] ClearProtected(ClearProtectedRequest), } +#[derive(Debug, Serialize, Deserialize)] +pub struct WaitIdleRequest; + #[derive(Debug, Serialize, Deserialize)] pub struct SyncDbRequest; diff --git a/src/api/remote.rs b/src/api/remote.rs index 9b010f69..47c3eea2 100644 --- a/src/api/remote.rs +++ b/src/api/remote.rs @@ -1064,8 +1064,15 @@ mod tests { use testresult::TestResult; use crate::{ + api::blobs::Blobs, protocol::{ChunkRangesSeq, GetRequest}, - store::fs::{tests::INTERESTING_SIZES, FsStore}, + store::{ + fs::{ + tests::{create_n0_bao, test_data, INTERESTING_SIZES}, + FsStore, + }, + mem::MemStore, + }, tests::{add_test_hash_seq, add_test_hash_seq_incomplete}, util::ChunkRangesExt, }; @@ -1117,6 +1124,38 @@ mod tests { Ok(()) } + async fn test_observe_partial(blobs: &Blobs) -> TestResult<()> { + let sizes = INTERESTING_SIZES; + for size in sizes { + let data = test_data(size); + let ranges = ChunkRanges::chunk(0); + let (hash, bao) = create_n0_bao(&data, &ranges)?; + blobs.import_bao_bytes(hash, ranges.clone(), bao).await?; + let bitfield = blobs.observe(hash).await?; + if size > 1024 { + assert_eq!(bitfield.ranges, ranges); + } else { + assert_eq!(bitfield.ranges, ChunkRanges::all()); + } + } + Ok(()) + } + + #[tokio::test] + async fn test_observe_partial_mem() -> TestResult<()> { + let store = MemStore::new(); + test_observe_partial(store.blobs()).await?; + Ok(()) + } + + #[tokio::test] + async fn test_observe_partial_fs() -> TestResult<()> { + let td = tempfile::tempdir()?; + let store = FsStore::load(td.path()).await?; + test_observe_partial(store.blobs()).await?; + Ok(()) + } + #[tokio::test] async fn test_local_info_hash_seq() -> TestResult<()> { let sizes = INTERESTING_SIZES; diff --git a/src/hash.rs b/src/hash.rs index 8190009a..006f4a9d 100644 --- a/src/hash.rs +++ b/src/hash.rs @@ -111,7 +111,7 @@ impl From<&[u8; 32]> for Hash { impl PartialOrd for Hash { fn partial_cmp(&self, other: &Self) -> Option { - Some(self.0.as_bytes().cmp(other.0.as_bytes())) + Some(self.cmp(other)) } } diff --git a/src/metrics.rs b/src/metrics.rs index c47fb6ea..0ff5cd2a 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -4,6 +4,7 @@ use iroh_metrics::{Counter, MetricsGroup}; /// Enum of metrics for the module #[allow(missing_docs)] +#[allow(dead_code)] #[derive(Debug, Default, MetricsGroup)] #[metrics(name = "iroh-blobs")] pub struct Metrics { diff --git a/src/store/fs.rs b/src/store/fs.rs index b0c1eb60..55cf7c67 100644 --- a/src/store/fs.rs +++ b/src/store/fs.rs @@ -64,35 +64,41 @@ //! safely shut down as well. Any store refs you are holding will be inoperable //! after this. 
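+//!
+//! A minimal lifecycle sketch (hypothetical path, error handling elided;
+//! `add_slice` and `shutdown` are assumed to be the existing store APIs):
+//!
+//! ```ignore
+//! let store = FsStore::load("/tmp/blobs").await?;
+//! let tag = store.add_slice(b"hello world").await?;
+//! store.shutdown().await?;
+//! ```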
use std::{
-    collections::{HashMap, HashSet},
-    fmt, fs,
+    fmt::{self, Debug},
+    fs,
     future::Future,
     io::Write,
     num::NonZeroU64,
     ops::Deref,
     path::{Path, PathBuf},
-    sync::Arc,
+    sync::{
+        atomic::{AtomicU64, Ordering},
+        Arc,
+    },
 };
 
 use bao_tree::{
+    blake3,
     io::{
         mixed::{traverse_ranges_validated, EncodedItem, ReadBytesAt},
+        outboard::PreOrderOutboard,
         sync::ReadAt,
         BaoContentItem, Leaf,
     },
-    ChunkNum, ChunkRanges,
+    BaoTree, ChunkNum, ChunkRanges,
 };
 use bytes::Bytes;
 use delete_set::{BaoFilePart, ProtectHandle};
+use entity_manager::{EntityManagerState, SpawnArg};
 use entry_state::{DataLocation, OutboardLocation};
 use gc::run_gc;
 use import::{ImportEntry, ImportSource};
 use irpc::channel::mpsc;
-use meta::{list_blobs, Snapshot};
+use meta::list_blobs;
 use n0_future::{future::yield_now, io};
 use nested_enum_utils::enum_conversions;
 use range_collections::range_set::RangeSetRange;
-use tokio::task::{Id, JoinError, JoinSet};
+use tokio::task::{JoinError, JoinSet};
 use tracing::{error, instrument, trace};
 
 use crate::{
@@ -106,8 +112,15 @@ use crate::{
         ApiClient,
     },
     store::{
+        fs::{
+            bao_file::{
+                BaoFileStorage, BaoFileStorageSubscriber, CompleteStorage, DataReader,
+                OutboardReader,
+            },
+            util::entity_manager::{self, ActiveEntityState},
+        },
         util::{BaoTreeSender, FixedSize, MemOrFile, ValueOrPoisioned},
-        Hash,
+        Hash, IROH_BLOCK_SIZE,
     },
     util::{
         channel::oneshot,
@@ -116,7 +129,7 @@ use crate::{
     },
 };
 mod bao_file;
-use bao_file::{BaoFileHandle, BaoFileHandleWeak};
+use bao_file::BaoFileHandle;
 mod delete_set;
 mod entry_state;
 mod import;
@@ -185,8 +198,6 @@ struct TaskContext {
     pub db: meta::Db,
     // Handle to send internal commands
     pub internal_cmd_tx: tokio::sync::mpsc::Sender<InternalCommand>,
-    /// The file handle for the empty hash.
-    pub empty: BaoFileHandle,
     /// Handle to protect files from deletion.
     pub protect: ProtectHandle,
 }
@@ -200,6 +211,24 @@ impl TaskContext {
     }
 }
 
+#[derive(Debug)]
+struct EmParams;
+
+impl entity_manager::Params for EmParams {
+    type EntityId = Hash;
+
+    type GlobalState = Arc<TaskContext>;
+
+    type EntityState = BaoFileHandle;
+
+    async fn on_shutdown(
+        state: entity_manager::ActiveEntityState<Self>,
+        _cause: entity_manager::ShutdownCause,
+    ) {
+        state.persist().await;
+    }
+}
+
 #[derive(Debug)]
 struct Actor {
     // Context that can be cheaply shared with tasks.
@@ -210,227 +239,182 @@ struct Actor {
     fs_cmd_rx: tokio::sync::mpsc::Receiver<InternalCommand>,
     // Tasks for import and export operations.
     tasks: JoinSet<()>,
-    // Running tasks
-    running: HashSet<Id>,
-    // handles
-    handles: HashMap<Hash, Slot>,
+    // Entity manager that handles concurrency for entities.
+    handles: EntityManagerState<EmParams>,
     // temp tags
     temp_tags: TempTags,
+    // waiters for idle state.
+    idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>,
     // our private tokio runtime. It has to live somewhere.
     _rt: RtWrapper,
 }
 
-/// Wraps a slot and the task context.
-///
-/// This contains everything a hash-specific task should need.
-struct HashContext {
-    slot: Slot,
-    ctx: Arc<TaskContext>,
-}
+type HashContext = ActiveEntityState<EmParams>;
+
+impl SyncEntityApi for HashContext {
+    /// Load the state from the database.
+    ///
+    /// If the state is Initial, this will start the load.
+    /// If it is Loading, it will wait until loading is done.
+    /// If it is any other state, it will be a noop.
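+    ///
+    /// A sketch of the transitions this method performs (inferred from the
+    /// match arms below):
+    ///
+    /// ```text
+    /// Initial -> Loading -> Complete | Partial | NonExisting
+    /// Loading -> Poisoned    (db lookup or file open failed)
+    /// ```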
+    async fn load(&self) {
+        enum Action {
+            Load,
+            Wait,
+            None,
+        }
+        let mut action = Action::None;
+        self.state.send_if_modified(|guard| match guard.deref() {
+            BaoFileStorage::Initial => {
+                *guard = BaoFileStorage::Loading;
+                action = Action::Load;
+                true
+            }
+            BaoFileStorage::Loading => {
+                action = Action::Wait;
+                false
+            }
+            _ => false,
+        });
+        match action {
+            Action::Load => {
+                let state = if self.id == Hash::EMPTY {
+                    BaoFileStorage::Complete(CompleteStorage {
+                        data: MemOrFile::Mem(Bytes::new()),
+                        outboard: MemOrFile::empty(),
+                    })
+                } else {
+                    // we must assign a new state even in the error case, otherwise
+                    // tasks waiting for loading would stall!
+                    match self.global.db.get(self.id).await {
+                        Ok(state) => match BaoFileStorage::open(state, self).await {
+                            Ok(handle) => handle,
+                            Err(_) => BaoFileStorage::Poisoned,
+                        },
+                        Err(_) => BaoFileStorage::Poisoned,
+                    }
+                };
+                self.state.send_replace(state);
+            }
+            Action::Wait => {
+                // we are in state loading already, so we just need to wait for the
+                // other task to complete loading.
+                while matches!(self.state.borrow().deref(), BaoFileStorage::Loading) {
+                    self.state.0.subscribe().changed().await.ok();
+                }
+            }
+            Action::None => {}
+        }
+    }
+
+    /// Write a batch and notify the db
+    async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()> {
+        trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len());
+        let mut res = Ok(None);
+        self.state.send_if_modified(|state| {
+            let Ok((state1, update)) = state.take().write_batch(batch, bitfield, self) else {
+                res = Err(io::Error::other("write batch failed"));
+                return false;
+            };
+            res = Ok(update);
+            *state = state1;
+            true
+        });
+        if let Some(update) = res? {
+            self.global.db.update(self.id, update).await?;
+        }
+        Ok(())
+    }
+
+    /// An AsyncSliceReader for the data file.
+    ///
+    /// Caution: this is a reader for the unvalidated data file. Reading this
+    /// can produce data that does not match the hash.
+    #[allow(refining_impl_trait_internal)]
+    fn data_reader(&self) -> DataReader {
+        DataReader(self.state.clone())
+    }
+
+    /// An AsyncSliceReader for the outboard file.
+    ///
+    /// The outboard file is used to validate the data file. It is not guaranteed
+    /// to be complete.
+    #[allow(refining_impl_trait_internal)]
+    fn outboard_reader(&self) -> OutboardReader {
+        OutboardReader(self.state.clone())
+    }
+
+    /// The most precise known total size of the data file.
+    fn current_size(&self) -> io::Result<u64> {
+        match self.state.borrow().deref() {
+            BaoFileStorage::Complete(mem) => Ok(mem.size()),
+            BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()),
+            BaoFileStorage::Partial(file) => file.current_size(),
+            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
+            BaoFileStorage::Initial => Err(io::Error::other("initial")),
+            BaoFileStorage::Loading => Err(io::Error::other("loading")),
+            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
+        }
+    }
+
+    /// The bitfield of the entry, i.e. which chunk ranges are present and verified.
+    fn bitfield(&self) -> io::Result<Bitfield> {
+        match self.state.borrow().deref() {
+            BaoFileStorage::Complete(mem) => Ok(mem.bitfield()),
+            BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()),
+            BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()),
+            BaoFileStorage::Poisoned => Err(io::Error::other("poisoned storage")),
+            BaoFileStorage::Initial => Err(io::Error::other("initial")),
+            BaoFileStorage::Loading => Err(io::Error::other("loading")),
+            BaoFileStorage::NonExisting => Err(io::ErrorKind::NotFound.into()),
+        }
+    }
 }
 
 impl HashContext {
-    pub fn db(&self) -> &meta::Db {
-        &self.ctx.db
+    /// The outboard for the file.
+    pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> {
+        let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE);
+        let outboard = self.outboard_reader();
+        Ok(PreOrderOutboard {
+            root: blake3::Hash::from(self.id),
+            tree,
+            data: outboard,
+        })
     }
 
-    pub fn options(&self) -> &Arc<Options> {
-        &self.ctx.options
+    fn db(&self) -> &meta::Db {
+        &self.global.db
     }
 
-    pub async fn lock(&self) -> tokio::sync::MutexGuard<'_, Option<BaoFileHandleWeak>> {
-        self.slot.0.lock().await
+    pub fn options(&self) -> &Arc<Options> {
+        &self.global.options
     }
 
-    pub fn protect(&self, hash: Hash, parts: impl IntoIterator<Item = BaoFilePart>) {
-        self.ctx.protect.protect(hash, parts);
+    pub fn protect(&self, parts: impl IntoIterator<Item = BaoFilePart>) {
+        self.global.protect.protect(self.id, parts);
     }
 
     /// Update the entry state in the database, and wait for completion.
-    pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> {
-        let (tx, rx) = oneshot::channel();
-        self.db()
-            .send(
-                meta::Update {
-                    hash,
-                    state,
-                    tx: Some(tx),
-                    span: tracing::Span::current(),
-                }
-                .into(),
-            )
-            .await?;
-        rx.await.map_err(|_e| io::Error::other(""))??;
+    pub async fn update_await(&self, state: EntryState<Bytes>) -> io::Result<()> {
+        self.db().update_await(self.id, state).await?;
         Ok(())
     }
 
-    pub async fn get_entry_state(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> {
+    pub async fn get_entry_state(&self) -> io::Result<Option<EntryState<Bytes>>> {
+        let hash = self.id;
         if hash == Hash::EMPTY {
             return Ok(Some(EntryState::Complete {
                 data_location: DataLocation::Inline(Bytes::new()),
                 outboard_location: OutboardLocation::NotNeeded,
             }));
-        }
-        let (tx, rx) = oneshot::channel();
-        self.db()
-            .send(
-                meta::Get {
-                    hash,
-                    tx,
-                    span: tracing::Span::current(),
-                }
-                .into(),
-            )
-            .await
-            .ok();
-        let res = rx.await.map_err(io::Error::other)?;
-        Ok(res.state?)
+        };
+        self.db().get(hash).await
     }
 
     /// Update the entry state in the database, and wait for completion.
- pub async fn set(&self, hash: Hash, state: EntryState) -> io::Result<()> { - let (tx, rx) = oneshot::channel(); - self.db() - .send( - meta::Set { - hash, - state, - tx, - span: tracing::Span::current(), - } - .into(), - ) - .await - .map_err(io::Error::other)?; - rx.await.map_err(|_e| io::Error::other(""))??; - Ok(()) - } - - pub async fn get_maybe_create(&self, hash: Hash, create: bool) -> api::Result { - if create { - self.get_or_create(hash).await - } else { - self.get(hash).await - } - } - - pub async fn get(&self, hash: Hash) -> api::Result { - if hash == Hash::EMPTY { - return Ok(self.ctx.empty.clone()); - } - let res = self - .slot - .get_or_create(|| async { - let res = self.db().get(hash).await.map_err(io::Error::other)?; - let res = match res { - Some(state) => open_bao_file(&hash, state, &self.ctx).await, - None => Err(io::Error::new(io::ErrorKind::NotFound, "hash not found")), - }; - Ok((res?, ())) - }) - .await - .map_err(api::Error::from); - let (res, _) = res?; - Ok(res) - } - - pub async fn get_or_create(&self, hash: Hash) -> api::Result { - if hash == Hash::EMPTY { - return Ok(self.ctx.empty.clone()); - } - let res = self - .slot - .get_or_create(|| async { - let res = self.db().get(hash).await.map_err(io::Error::other)?; - let res = match res { - Some(state) => open_bao_file(&hash, state, &self.ctx).await, - None => Ok(BaoFileHandle::new_partial_mem( - hash, - self.ctx.options.clone(), - )), - }; - Ok((res?, ())) - }) - .await - .map_err(api::Error::from); - trace!("{res:?}"); - let (res, _) = res?; - Ok(res) - } -} - -async fn open_bao_file( - hash: &Hash, - state: EntryState, - ctx: &TaskContext, -) -> io::Result { - let options = &ctx.options; - Ok(match state { - EntryState::Complete { - data_location, - outboard_location, - } => { - let data = match data_location { - DataLocation::Inline(data) => MemOrFile::Mem(data), - DataLocation::Owned(size) => { - let path = options.path.data_path(hash); - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, size)) - } - DataLocation::External(paths, size) => { - let Some(path) = paths.into_iter().next() else { - return Err(io::Error::other("no external data path")); - }; - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, size)) - } - }; - let outboard = match outboard_location { - OutboardLocation::NotNeeded => MemOrFile::empty(), - OutboardLocation::Inline(data) => MemOrFile::Mem(data), - OutboardLocation::Owned => { - let path = options.path.outboard_path(hash); - let file = fs::File::open(&path)?; - MemOrFile::File(file) - } - }; - BaoFileHandle::new_complete(*hash, data, outboard, options.clone()) - } - EntryState::Partial { .. } => BaoFileHandle::new_partial_file(*hash, ctx).await?, - }) -} - -/// An entry for each hash, containing a weak reference to a BaoFileHandle -/// wrapped in a tokio mutex so handle creation is sequential. -#[derive(Debug, Clone, Default)] -pub(crate) struct Slot(Arc>>); - -impl Slot { - pub async fn is_live(&self) -> bool { - let slot = self.0.lock().await; - slot.as_ref().map(|weak| !weak.is_dead()).unwrap_or(false) - } - - /// Get the handle if it exists and is still alive, otherwise load it from the database. - /// If there is nothing in the database, create a new in-memory handle. - /// - /// `make` will be called if the a live handle does not exist. 
- pub async fn get_or_create(&self, make: F) -> io::Result<(BaoFileHandle, T)> - where - F: FnOnce() -> Fut, - Fut: std::future::Future>, - T: Default, - { - let mut slot = self.0.lock().await; - if let Some(weak) = &*slot { - if let Some(handle) = weak.upgrade() { - return Ok((handle, Default::default())); - } - } - let handle = make().await; - if let Ok((handle, _)) = &handle { - *slot = Some(handle.downgrade()); - } - handle + pub async fn set(&self, state: EntryState) -> io::Result<()> { + self.db().set(self.id, state).await } } @@ -445,17 +429,12 @@ impl Actor { fn spawn(&mut self, fut: impl Future + Send + 'static) { let span = tracing::Span::current(); - let id = self.tasks.spawn(fut.instrument(span)).id(); - self.running.insert(id); + self.tasks.spawn(fut.instrument(span)); } - fn log_task_result(&mut self, res: Result<(Id, ()), JoinError>) { + fn log_task_result(res: Result<(), JoinError>) { match res { - Ok((id, _)) => { - // println!("task {id} finished"); - self.running.remove(&id); - // println!("{:?}", self.running); - } + Ok(_) => {} Err(e) => { error!("task failed: {e}"); } @@ -471,26 +450,6 @@ impl Actor { tx.send(tt).await.ok(); } - async fn clear_dead_handles(&mut self) { - let mut to_remove = Vec::new(); - for (hash, slot) in &self.handles { - if !slot.is_live().await { - to_remove.push(*hash); - } - } - for hash in to_remove { - if let Some(slot) = self.handles.remove(&hash) { - // do a quick check if the handle has become alive in the meantime, and reinsert it - let guard = slot.0.lock().await; - let is_live = guard.as_ref().map(|x| !x.is_dead()).unwrap_or_default(); - if is_live { - drop(guard); - self.handles.insert(hash, slot); - } - } - } - } - async fn handle_command(&mut self, cmd: Command) { let span = cmd.parent_span(); let _entered = span.enter(); @@ -499,6 +458,16 @@ impl Actor { trace!("{cmd:?}"); self.db().send(cmd.into()).await.ok(); } + Command::WaitIdle(cmd) => { + trace!("{cmd:?}"); + if self.tasks.is_empty() { + // we are currently idle + cmd.tx.send(()).await.ok(); + } else { + // wait for idle state + self.idle_waiters.push(cmd.tx); + } + } Command::Shutdown(cmd) => { trace!("{cmd:?}"); self.db().send(cmd.into()).await.ok(); @@ -525,34 +494,22 @@ impl Actor { } Command::ClearProtected(cmd) => { trace!("{cmd:?}"); - self.clear_dead_handles().await; self.db().send(cmd.into()).await.ok(); } Command::BlobStatus(cmd) => { trace!("{cmd:?}"); self.db().send(cmd.into()).await.ok(); } + Command::DeleteBlobs(cmd) => { + trace!("{cmd:?}"); + self.db().send(cmd.into()).await.ok(); + } Command::ListBlobs(cmd) => { trace!("{cmd:?}"); - let (tx, rx) = tokio::sync::oneshot::channel(); - self.db() - .send( - Snapshot { - tx, - span: cmd.span.clone(), - } - .into(), - ) - .await - .ok(); - if let Ok(snapshot) = rx.await { + if let Ok(snapshot) = self.db().snapshot(cmd.span.clone()).await { self.spawn(list_blobs(snapshot, cmd)); } } - Command::DeleteBlobs(cmd) => { - trace!("{cmd:?}"); - self.db().send(cmd.into()).await.ok(); - } Command::Batch(cmd) => { trace!("{cmd:?}"); let (id, scope) = self.temp_tags.create_scope(); @@ -581,40 +538,27 @@ impl Actor { } Command::ExportPath(cmd) => { trace!("{cmd:?}"); - let ctx = self.hash_context(cmd.hash); - self.spawn(export_path(cmd, ctx)); + cmd.spawn(&mut self.handles, &mut self.tasks).await; } Command::ExportBao(cmd) => { trace!("{cmd:?}"); - let ctx = self.hash_context(cmd.hash); - self.spawn(export_bao(cmd, ctx)); + cmd.spawn(&mut self.handles, &mut self.tasks).await; } Command::ExportRanges(cmd) => { trace!("{cmd:?}"); - 
let ctx = self.hash_context(cmd.hash);
-                self.spawn(export_ranges(cmd, ctx));
+                cmd.spawn(&mut self.handles, &mut self.tasks).await;
             }
             Command::ImportBao(cmd) => {
                 trace!("{cmd:?}");
-                let ctx = self.hash_context(cmd.hash);
-                self.spawn(import_bao(cmd, ctx));
+                cmd.spawn(&mut self.handles, &mut self.tasks).await;
             }
             Command::Observe(cmd) => {
                 trace!("{cmd:?}");
-                let ctx = self.hash_context(cmd.hash);
-                self.spawn(observe(cmd, ctx));
+                cmd.spawn(&mut self.handles, &mut self.tasks).await;
             }
         }
     }
 
-    /// Create a hash context for a given hash.
-    fn hash_context(&mut self, hash: Hash) -> HashContext {
-        HashContext {
-            slot: self.handles.entry(hash).or_default().clone(),
-            ctx: self.context.clone(),
-        }
-    }
-
     async fn handle_fs_command(&mut self, cmd: InternalCommand) {
         let span = cmd.parent_span();
         let _entered = span.enter();
@@ -642,8 +586,7 @@ impl Actor {
                         format: cmd.format,
                     },
                 );
-                let ctx = self.hash_context(cmd.hash);
-                self.spawn(finish_import(cmd, tt, ctx));
+                (tt, cmd).spawn(&mut self.handles, &mut self.tasks).await;
             }
         }
     }
@@ -652,6 +595,11 @@ impl Actor {
     async fn run(mut self) {
        loop {
            tokio::select! {
+                task = self.handles.tick() => {
+                    if let Some(task) = task {
+                        self.spawn(task);
+                    }
+                }
                cmd = self.cmd_rx.recv() => {
                    let Some(cmd) = cmd else {
                        break;
                    };
                    self.handle_command(cmd).await;
                }
                Some(cmd) = self.fs_cmd_rx.recv() => {
                    self.handle_fs_command(cmd).await;
                }
-                Some(res) = self.tasks.join_next_with_id(), if !self.tasks.is_empty() => {
-                    self.log_task_result(res);
+                Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => {
+                    Self::log_task_result(res);
+                    if self.tasks.is_empty() {
+                        for tx in self.idle_waiters.drain(..) {
+                            tx.send(()).await.ok();
+                        }
+                    }
                }
            }
        }
+        self.handles.shutdown().await;
+        while let Some(res) = self.tasks.join_next().await {
+            Self::log_task_result(res);
+        }
    }
 
    async fn new(
@@ -698,28 +655,149 @@
            options: options.clone(),
            db: meta::Db::new(db_send),
            internal_cmd_tx: fs_commands_tx,
-            empty: BaoFileHandle::new_complete(
-                Hash::EMPTY,
-                MemOrFile::empty(),
-                MemOrFile::empty(),
-                options,
-            ),
            protect,
        });
        rt.spawn(db_actor.run());
        Ok(Self {
-            context: slot_context,
+            context: slot_context.clone(),
            cmd_rx,
            fs_cmd_rx: fs_commands_rx,
            tasks: JoinSet::new(),
-            running: HashSet::new(),
-            handles: Default::default(),
+            handles: EntityManagerState::new(slot_context, 1024, 32, 32, 2),
            temp_tags: Default::default(),
+            idle_waiters: Vec::new(),
            _rt: rt,
        })
    }
}

+trait HashSpecificCommand: HashSpecific + Send + 'static {
+    /// Handle the command on success by spawning a task into the per-hash context.
+    fn handle(self, ctx: HashContext) -> impl Future<Output = ()> + Send + 'static;
+
+    /// Opportunity to send an error if spawning fails due to the task being busy (inbox full)
+    /// or dead (e.g. panic in one of the running tasks).
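+    ///
+    /// All implementations below follow the same pattern: `SpawnArg::Busy` is
+    /// mapped to `io::ErrorKind::ResourceBusy`, `SpawnArg::Dead` to a generic
+    /// "entity is dead" error, and the resulting error is sent back through
+    /// the command's reply channel.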
+ fn on_error(self, arg: SpawnArg) -> impl Future + Send + 'static; + + async fn spawn( + self, + manager: &mut entity_manager::EntityManagerState, + tasks: &mut JoinSet<()>, + ) where + Self: Sized, + { + let span = tracing::Span::current(); + let task = manager + .spawn(self.hash(), |arg| { + async move { + match arg { + SpawnArg::Active(state) => { + self.handle(state).await; + } + SpawnArg::Busy => { + self.on_error(arg).await; + } + SpawnArg::Dead => { + self.on_error(arg).await; + } + } + } + .instrument(span) + }) + .await; + if let Some(task) = task { + tasks.spawn(task); + } + } +} + +impl HashSpecificCommand for ObserveMsg { + async fn handle(self, ctx: HashContext) { + ctx.observe(self).await + } + async fn on_error(self, _arg: SpawnArg) {} +} +impl HashSpecificCommand for ExportPathMsg { + async fn handle(self, ctx: HashContext) { + ctx.export_path(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(ExportProgressItem::Error(api::Error::Io(err))) + .await + .ok(); + } +} +impl HashSpecificCommand for ExportBaoMsg { + async fn handle(self, ctx: HashContext) { + ctx.export_bao(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(EncodedItem::Error(bao_tree::io::EncodeError::Io(err))) + .await + .ok(); + } +} +impl HashSpecificCommand for ExportRangesMsg { + async fn handle(self, ctx: HashContext) { + ctx.export_ranges(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx + .send(ExportRangesItem::Error(api::Error::Io(err))) + .await + .ok(); + } +} +impl HashSpecificCommand for ImportBaoMsg { + async fn handle(self, ctx: HashContext) { + ctx.import_bao(self).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.tx.send(Err(api::Error::Io(err))).await.ok(); + } +} +impl HashSpecific for (TempTag, ImportEntryMsg) { + fn hash(&self) -> Hash { + self.1.hash() + } +} +impl HashSpecificCommand for (TempTag, ImportEntryMsg) { + async fn handle(self, ctx: HashContext) { + let (tt, cmd) = self; + ctx.finish_import(cmd, tt).await + } + async fn on_error(self, arg: SpawnArg) { + let err = match arg { + SpawnArg::Busy => io::ErrorKind::ResourceBusy.into(), + SpawnArg::Dead => io::Error::other("entity is dead"), + _ => unreachable!(), + }; + self.1.tx.send(AddProgressItem::Error(err)).await.ok(); + } +} + struct RtWrapper(Option); impl From for RtWrapper { @@ -774,24 +852,156 @@ async fn handle_batch_impl(cmd: BatchMsg, id: Scope, scope: &Arc) Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn finish_import(cmd: ImportEntryMsg, mut tt: TempTag, ctx: HashContext) { - let res = match finish_import_impl(cmd.inner, ctx).await { - Ok(()) => { - // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag - // it will be cleaned up when either the process exits or scope ends - if cmd.tx.is_rpc() { - trace!("leaking temp tag {}", 
tt.hash_and_format()); - tt.leak(); - } - AddProgressItem::Done(tt) +/// The minimal API you need to implement for an entity for a store to work. +trait EntityApi { + /// Import from a stream of n0 bao encoded data. + async fn import_bao(&self, cmd: ImportBaoMsg); + /// Finish an import from a local file or memory. + async fn finish_import(&self, cmd: ImportEntryMsg, tt: TempTag); + /// Observe the bitfield of the entry. + async fn observe(&self, cmd: ObserveMsg); + /// Export byte ranges of the entry as data + async fn export_ranges(&self, cmd: ExportRangesMsg); + /// Export chunk ranges of the entry as a n0 bao encoded stream. + async fn export_bao(&self, cmd: ExportBaoMsg); + /// Export the entry to a local file. + async fn export_path(&self, cmd: ExportPathMsg); + /// Persist the entry at the end of its lifecycle. + async fn persist(&self); +} + +/// A more opinionated API that can be used as a helper to save implementation +/// effort when implementing the EntityApi trait. +trait SyncEntityApi: EntityApi { + /// Load the entry state from the database. This must make sure that it is + /// not run concurrently, so if load is called multiple times, all but one + /// must wait. You can use a tokio::sync::OnceCell or similar to achieve this. + async fn load(&self); + + /// Get a synchronous reader for the data file. + fn data_reader(&self) -> impl ReadBytesAt; + + /// Get a synchronous reader for the outboard file. + fn outboard_reader(&self) -> impl ReadAt; + + /// Get the best known size of the data file. + fn current_size(&self) -> io::Result; + + /// Get the bitfield of the entry. + fn bitfield(&self) -> io::Result; + + /// Write a batch of content items to the entry. + async fn write_batch(&self, batch: &[BaoContentItem], bitfield: &Bitfield) -> io::Result<()>; +} + +/// The high level entry point per entry. +impl EntityApi for HashContext { + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn import_bao(&self, cmd: ImportBaoMsg) { + trace!("{cmd:?}"); + self.load().await; + let ImportBaoMsg { + inner: ImportBaoRequest { size, .. }, + rx, + tx, + .. + } = cmd; + let res = import_bao_impl(self, size, rx).await; + trace!("{res:?}"); + tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn observe(&self, cmd: ObserveMsg) { + trace!("{cmd:?}"); + self.load().await; + BaoFileStorageSubscriber::new(self.state.subscribe()) + .forward(cmd.tx) + .await + .ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_ranges(&self, mut cmd: ExportRangesMsg) { + trace!("{cmd:?}"); + self.load().await; + if let Err(cause) = export_ranges_impl(self, cmd.inner, &mut cmd.tx).await { + cmd.tx + .send(ExportRangesItem::Error(cause.into())) + .await + .ok(); } - Err(cause) => AddProgressItem::Error(cause), - }; - cmd.tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_bao(&self, mut cmd: ExportBaoMsg) { + trace!("{cmd:?}"); + self.load().await; + if let Err(cause) = export_bao_impl(self, cmd.inner, &mut cmd.tx).await { + // if the entry is in state NonExisting, this will be an io error with + // kind NotFound. So we must not wrap this somehow but pass it on directly. + cmd.tx + .send(bao_tree::io::EncodeError::Io(cause).into()) + .await + .ok(); + } + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn export_path(&self, cmd: ExportPathMsg) { + trace!("{cmd:?}"); + self.load().await; + let ExportPathMsg { inner, mut tx, .. 
} = cmd; + if let Err(cause) = export_path_impl(self, inner, &mut tx).await { + tx.send(cause.into()).await.ok(); + } + } + + #[instrument(skip_all, fields(hash = %cmd.hash_short()))] + async fn finish_import(&self, cmd: ImportEntryMsg, mut tt: TempTag) { + trace!("{cmd:?}"); + self.load().await; + let res = match finish_import_impl(self, cmd.inner).await { + Ok(()) => { + // for a remote call, we can't have the on_drop callback, so we have to leak the temp tag + // it will be cleaned up when either the process exits or scope ends + if cmd.tx.is_rpc() { + trace!("leaking temp tag {}", tt.hash_and_format()); + tt.leak(); + } + AddProgressItem::Done(tt) + } + Err(cause) => AddProgressItem::Error(cause), + }; + cmd.tx.send(res).await.ok(); + } + + #[instrument(skip_all, fields(hash = %self.id.fmt_short()))] + async fn persist(&self) { + self.state.send_if_modified(|guard| { + let hash = &self.id; + let BaoFileStorage::Partial(fs) = guard.take() else { + return false; + }; + let path = self.global.options.path.bitfield_path(hash); + trace!("writing bitfield for hash {} to {}", hash, path.display()); + if let Err(cause) = fs.sync_all(&path) { + error!( + "failed to write bitfield for {} at {}: {:?}", + hash, + path.display(), + cause + ); + } + false + }); + } } -async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::Result<()> { +async fn finish_import_impl(ctx: &HashContext, import_data: ImportEntry) -> io::Result<()> { + if ctx.id == Hash::EMPTY { + return Ok(()); // nothing to do for the empty hash + } let ImportEntry { source, hash, @@ -810,14 +1020,14 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R debug_assert!(!options.is_inlined_data(*size)); } } - let guard = ctx.lock().await; - let handle = guard.as_ref().and_then(|x| x.upgrade()); + ctx.load().await; + let handle = &ctx.state; // if I do have an existing handle, I have to possibly deal with observers. // if I don't have an existing handle, there are 2 cases: // the entry exists in the db, but we don't have a handle // the entry does not exist at all. 
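+    // In all cases the flow below is the same: move the imported data into
+    // place, complete the in-memory handle so any observers see the final
+    // bitfield, and then persist the new entry state to the metadata db.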
// convert the import source to a data location and drop the open files - ctx.protect(hash, [BaoFilePart::Data, BaoFilePart::Outboard]); + ctx.protect([BaoFilePart::Data, BaoFilePart::Outboard]); let data_location = match source { ImportSource::Memory(data) => DataLocation::Inline(data), ImportSource::External(path, _file, size) => DataLocation::External(vec![path], size), @@ -861,58 +1071,39 @@ async fn finish_import_impl(import_data: ImportEntry, ctx: HashContext) -> io::R OutboardLocation::Owned } }; - if let Some(handle) = handle { - let data = match &data_location { - DataLocation::Inline(data) => MemOrFile::Mem(data.clone()), - DataLocation::Owned(size) => { - let path = ctx.options().path.data_path(&hash); - let file = fs::File::open(&path)?; - MemOrFile::File(FixedSize::new(file, *size)) - } - DataLocation::External(paths, size) => { - let Some(path) = paths.iter().next() else { - return Err(io::Error::other("no external data path")); - }; - let file = fs::File::open(path)?; - MemOrFile::File(FixedSize::new(file, *size)) - } - }; - let outboard = match &outboard_location { - OutboardLocation::NotNeeded => MemOrFile::empty(), - OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()), - OutboardLocation::Owned => { - let path = ctx.options().path.outboard_path(&hash); - let file = fs::File::open(&path)?; - MemOrFile::File(file) - } - }; - handle.complete(data, outboard); - } + let data = match &data_location { + DataLocation::Inline(data) => MemOrFile::Mem(data.clone()), + DataLocation::Owned(size) => { + let path = ctx.options().path.data_path(&hash); + let file = fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, *size)) + } + DataLocation::External(paths, size) => { + let Some(path) = paths.iter().next() else { + return Err(io::Error::other("no external data path")); + }; + let file = fs::File::open(path)?; + MemOrFile::File(FixedSize::new(file, *size)) + } + }; + let outboard = match &outboard_location { + OutboardLocation::NotNeeded => MemOrFile::empty(), + OutboardLocation::Inline(data) => MemOrFile::Mem(data.clone()), + OutboardLocation::Owned => { + let path = ctx.options().path.outboard_path(&hash); + let file = fs::File::open(&path)?; + MemOrFile::File(file) + } + }; + handle.complete(data, outboard); let state = EntryState::Complete { data_location, outboard_location, }; - ctx.update(hash, state).await?; + ctx.update_await(state).await?; Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn import_bao(cmd: ImportBaoMsg, ctx: HashContext) { - trace!("{cmd:?}"); - let ImportBaoMsg { - inner: ImportBaoRequest { size, hash }, - rx, - tx, - .. - } = cmd; - let res = match ctx.get_or_create(hash).await { - Ok(handle) => import_bao_impl(size, rx, handle, ctx).await, - Err(cause) => Err(cause), - }; - trace!("{res:?}"); - tx.send(res).await.ok(); -} - fn chunk_range(leaf: &Leaf) -> ChunkRanges { let start = ChunkNum::chunks(leaf.offset); let end = ChunkNum::chunks(leaf.offset + leaf.data.len() as u64); @@ -920,23 +1111,18 @@ fn chunk_range(leaf: &Leaf) -> ChunkRanges { } async fn import_bao_impl( + ctx: &HashContext, size: NonZeroU64, mut rx: mpsc::Receiver, - handle: BaoFileHandle, - ctx: HashContext, ) -> api::Result<()> { - trace!( - "importing bao: {} {} bytes", - handle.hash().fmt_short(), - size - ); + trace!("importing bao: {} {} bytes", ctx.id.fmt_short(), size); let mut batch = Vec::::new(); let mut ranges = ChunkRanges::empty(); while let Some(item) = rx.recv().await? 
{ // if the batch is not empty, the last item is a leaf and the current item is a parent, write the batch if !batch.is_empty() && batch[batch.len() - 1].is_leaf() && item.is_parent() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx.ctx).await?; + ctx.write_batch(&batch, &bitfield).await?; batch.clear(); ranges = ChunkRanges::empty(); } @@ -952,49 +1138,23 @@ async fn import_bao_impl( } if !batch.is_empty() { let bitfield = Bitfield::new_unchecked(ranges, size.into()); - handle.write_batch(&batch, &bitfield, &ctx.ctx).await?; + ctx.write_batch(&batch, &bitfield).await?; } Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn observe(cmd: ObserveMsg, ctx: HashContext) { - let Ok(handle) = ctx.get_or_create(cmd.hash).await else { - return; - }; - handle.subscribe().forward(cmd.tx).await.ok(); -} - -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_ranges(mut cmd: ExportRangesMsg, ctx: HashContext) { - match ctx.get(cmd.hash).await { - Ok(handle) => { - if let Err(cause) = export_ranges_impl(cmd.inner, &mut cmd.tx, handle).await { - cmd.tx - .send(ExportRangesItem::Error(cause.into())) - .await - .ok(); - } - } - Err(cause) => { - cmd.tx.send(ExportRangesItem::Error(cause)).await.ok(); - } - } -} - async fn export_ranges_impl( + ctx: &HashContext, cmd: ExportRangesRequest, tx: &mut mpsc::Sender, - handle: BaoFileHandle, ) -> io::Result<()> { let ExportRangesRequest { ranges, hash } = cmd; trace!( - "export_ranges: exporting ranges: {hash} {ranges:?} size={}", - handle.current_size()? + "exporting ranges: {hash} {ranges:?} size={}", + ctx.current_size()? ); - debug_assert!(handle.hash() == hash, "hash mismatch"); - let bitfield = handle.bitfield()?; - let data = handle.data_reader(); + let bitfield = ctx.bitfield()?; + let data = ctx.data_reader(); let size = bitfield.size(); for range in ranges.iter() { let range = match range { @@ -1024,59 +1184,29 @@ async fn export_ranges_impl( Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_bao(mut cmd: ExportBaoMsg, ctx: HashContext) { - match ctx.get_maybe_create(cmd.hash, false).await { - Ok(handle) => { - if let Err(cause) = export_bao_impl(cmd.inner, &mut cmd.tx, handle).await { - cmd.tx - .send(bao_tree::io::EncodeError::Io(io::Error::other(cause)).into()) - .await - .ok(); - } - } - Err(cause) => { - let crate::api::Error::Io(cause) = cause; - cmd.tx - .send(bao_tree::io::EncodeError::Io(cause).into()) - .await - .ok(); - } - } -} - async fn export_bao_impl( + ctx: &HashContext, cmd: ExportBaoRequest, tx: &mut mpsc::Sender, - handle: BaoFileHandle, -) -> anyhow::Result<()> { +) -> io::Result<()> { let ExportBaoRequest { ranges, hash, .. } = cmd; - debug_assert!(handle.hash() == hash, "hash mismatch"); - let outboard = handle.outboard()?; + let outboard = ctx.outboard()?; let size = outboard.tree.size(); - if size == 0 && hash != Hash::EMPTY { + if size == 0 && cmd.hash != Hash::EMPTY { // we have no data whatsoever, so we stop here return Ok(()); } trace!("exporting bao: {hash} {ranges:?} size={size}",); - let data = handle.data_reader(); + let data = ctx.data_reader(); let tx = BaoTreeSender::new(tx); traverse_ranges_validated(data, outboard, &ranges, tx).await?; Ok(()) } -#[instrument(skip_all, fields(hash = %cmd.hash_short()))] -async fn export_path(cmd: ExportPathMsg, ctx: HashContext) { - let ExportPathMsg { inner, mut tx, .. 
} = cmd; - if let Err(cause) = export_path_impl(inner, &mut tx, ctx).await { - tx.send(cause.into()).await.ok(); - } -} - async fn export_path_impl( + ctx: &HashContext, cmd: ExportPathRequest, tx: &mut mpsc::Sender, - ctx: HashContext, ) -> api::Result<()> { let ExportPathRequest { mode, target, .. } = cmd; if !target.is_absolute() { @@ -1088,8 +1218,7 @@ async fn export_path_impl( if let Some(parent) = target.parent() { fs::create_dir_all(parent)?; } - let _guard = ctx.lock().await; - let state = ctx.get_entry_state(cmd.hash).await?; + let state = ctx.get_entry_state().await?; let (data_location, outboard_location) = match state { Some(EntryState::Complete { data_location, @@ -1151,13 +1280,10 @@ async fn export_path_impl( } } } - ctx.set( - cmd.hash, - EntryState::Complete { - data_location: DataLocation::External(vec![target], size), - outboard_location, - }, - ) + ctx.set(EntryState::Complete { + data_location: DataLocation::External(vec![target], size), + outboard_location, + }) .await?; } }, @@ -1201,8 +1327,14 @@ impl FsStore { /// Load or create a new store with custom options, returning an additional sender for file store specific commands. pub async fn load_with_opts(db_path: PathBuf, options: Options) -> anyhow::Result { + static THREAD_NR: AtomicU64 = AtomicU64::new(0); let rt = tokio::runtime::Builder::new_multi_thread() - .thread_name("iroh-blob-store") + .thread_name_fn(|| { + format!( + "iroh-blob-store-{}", + THREAD_NR.fetch_add(1, Ordering::SeqCst) + ) + }) .enable_time() .build()?; let handle = rt.handle().clone(); @@ -1420,7 +1552,7 @@ pub mod tests { // import data via import_bytes, check that we can observe it and that it is complete #[tokio::test] - async fn test_import_bytes() -> TestResult<()> { + async fn test_import_bytes_simple() -> TestResult<()> { tracing_subscriber::fmt::try_init().ok(); let testdir = tempfile::tempdir()?; let db_dir = testdir.path().join("db"); @@ -1851,8 +1983,11 @@ pub mod tests { .await; assert!(tts.contains(tt1.hash_and_format())); assert!(tts.contains(tt2.hash_and_format())); + println!("dropping batch"); drop(batch); store.sync_db().await?; + store.wait_idle().await?; + println!("reading temp tags after batch drop"); let tts = store .tags() .list_temp_tags() @@ -1951,7 +2086,6 @@ pub mod tests { if path.is_file() { if let Some(file_ext) = path.extension() { if file_ext.to_string_lossy().to_lowercase() == ext { - println!("Deleting: {}", path.display()); fs::remove_file(path)?; } } diff --git a/src/store/fs/bao_file.rs b/src/store/fs/bao_file.rs index 410317c2..d0217040 100644 --- a/src/store/fs/bao_file.rs +++ b/src/store/fs/bao_file.rs @@ -4,7 +4,6 @@ use std::{ io, ops::Deref, path::Path, - sync::{Arc, Weak}, }; use bao_tree::{ @@ -21,24 +20,20 @@ use bytes::{Bytes, BytesMut}; use derive_more::Debug; use irpc::channel::mpsc; use tokio::sync::watch; -use tracing::{debug, error, info, trace, Span}; +use tracing::{debug, info, trace}; use super::{ entry_state::{DataLocation, EntryState, OutboardLocation}, - meta::Update, options::{Options, PathOptions}, BaoFilePart, }; use crate::{ api::blobs::Bitfield, store::{ - fs::{ - meta::{raw_outboard_size, Set}, - TaskContext, - }, + fs::{meta::raw_outboard_size, util::entity_manager, HashContext}, util::{ read_checksummed_and_truncate, write_checksummed, FixedSize, MemOrFile, - PartialMemStorage, SizeInfo, SparseMemFile, DD, + PartialMemStorage, DD, }, Hash, IROH_BLOCK_SIZE, }, @@ -147,7 +142,7 @@ impl PartialFileStorage { &self.bitfield } - fn sync_all(&self, bitfield_path: &Path) -> 
io::Result<()> {
+    pub(super) fn sync_all(&self, bitfield_path: &Path) -> io::Result<()> {
         self.data.sync_all()?;
         self.outboard.sync_all()?;
         self.sizes.sync_all()?;
@@ -240,7 +235,7 @@ impl PartialFileStorage {
         ))
     }
 
-    fn current_size(&self) -> io::Result<u64> {
+    pub(super) fn current_size(&self) -> io::Result<u64> {
         read_size(&self.sizes)
     }
 
@@ -290,8 +285,24 @@ fn read_size(size_file: &File) -> io::Result<u64> {
 }
 
 /// The storage for a bao file. This can be either in memory or on disk.
-#[derive(derive_more::From)]
+///
+/// The two initial states `Initial` and `Loading` are used to coordinate the
+/// loading of the entry from the metadata database. Once that is complete,
+/// you should never see these states again.
+///
+/// From the remaining states you can get into `Poisoned` if there is an
+/// IO error during an operation.
+///
+/// `Poisoned` is also used once the handle is persisted and no longer usable.
+#[derive(derive_more::From, Default)]
 pub(crate) enum BaoFileStorage {
+    /// Initial state, we don't know anything yet.
+    #[default]
+    Initial,
+    /// Currently loading the entry from the metadata.
+    Loading,
+    /// There is no info about this hash in the metadata db.
+    NonExisting,
     /// The entry is incomplete and in memory.
     ///
     /// Since it is incomplete, it must be writeable.
     ///
@@ -309,13 +320,8 @@ pub(crate) enum BaoFileStorage {
     ///
     /// Writing to this is a no-op, since it is already complete.
     Complete(CompleteStorage),
-    /// We will get into that state if there is an io error in the middle of an operation
-    ///
-    /// Also, when the handle is dropped we will poison the storage, so poisoned
-    /// can be seen when the handle is revived during the drop.
-    ///
-    /// BaoFileHandleWeak::upgrade() will return None if the storage is poisoned,
-    /// treat it as dead.
+    /// We will get into that state if there is an io error in the middle of an operation,
+    /// or after the handle is persisted and no longer usable.
     Poisoned,
 }
 
@@ -326,31 +332,26 @@ impl fmt::Debug for BaoFileStorage {
             BaoFileStorage::Partial(x) => x.fmt(f),
             BaoFileStorage::Complete(x) => x.fmt(f),
             BaoFileStorage::Poisoned => f.debug_struct("Poisoned").finish(),
+            BaoFileStorage::Initial => f.debug_struct("Initial").finish(),
+            BaoFileStorage::Loading => f.debug_struct("Loading").finish(),
+            BaoFileStorage::NonExisting => f.debug_struct("NonExisting").finish(),
         }
     }
 }
 
-impl Default for BaoFileStorage {
-    fn default() -> Self {
-        BaoFileStorage::Complete(Default::default())
-    }
-}
-
 impl PartialMemStorage {
     /// Converts this storage into a complete storage, using the given hash for
     /// path names and the given options for decisions about inlining.
- fn into_complete( - self, - hash: &Hash, - ctx: &TaskContext, - ) -> io::Result<(CompleteStorage, EntryState)> { + fn into_complete(self, ctx: &HashContext) -> io::Result<(CompleteStorage, EntryState)> { + let options = &ctx.global.options; + let hash = &ctx.id; let size = self.current_size(); let outboard_size = raw_outboard_size(size); - let (data, data_location) = if ctx.options.is_inlined_data(size) { + let (data, data_location) = if options.is_inlined_data(size) { let data: Bytes = self.data.to_vec().into(); (MemOrFile::Mem(data.clone()), DataLocation::Inline(data)) } else { - let data_path = ctx.options.path.data_path(hash); + let data_path = options.path.data_path(hash); let mut data_file = create_read_write(&data_path)?; self.data.persist(&mut data_file)?; ( @@ -358,7 +359,8 @@ impl PartialMemStorage { DataLocation::Owned(size), ) }; - let (outboard, outboard_location) = if ctx.options.is_inlined_outboard(outboard_size) { + let (outboard, outboard_location) = if ctx.global.options.is_inlined_outboard(outboard_size) + { if outboard_size > 0 { let outboard: Bytes = self.outboard.to_vec().into(); ( @@ -369,7 +371,7 @@ impl PartialMemStorage { (MemOrFile::empty(), OutboardLocation::NotNeeded) } } else { - let outboard_path = ctx.options.path.outboard_path(hash); + let outboard_path = ctx.global.options.path.outboard_path(hash); let mut outboard_file = create_read_write(&outboard_path)?; self.outboard.persist(&mut outboard_file)?; let outboard_location = if outboard_size == 0 { @@ -392,34 +394,43 @@ impl PartialMemStorage { impl BaoFileStorage { pub fn bitfield(&self) -> Bitfield { match self { - BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()), + BaoFileStorage::Initial => { + panic!("initial storage should not be used") + } + BaoFileStorage::Loading => { + panic!("loading storage should not be used") + } + BaoFileStorage::NonExisting => Bitfield::empty(), BaoFileStorage::PartialMem(x) => x.bitfield.clone(), BaoFileStorage::Partial(x) => x.bitfield.clone(), + BaoFileStorage::Complete(x) => Bitfield::complete(x.data.size()), BaoFileStorage::Poisoned => { panic!("poisoned storage should not be used") } } } - fn write_batch( + pub(super) fn write_batch( self, batch: &[BaoContentItem], bitfield: &Bitfield, - ctx: &TaskContext, - hash: &Hash, + ctx: &HashContext, ) -> io::Result<(Self, Option>)> { Ok(match self { + BaoFileStorage::NonExisting => { + Self::new_partial_mem().write_batch(batch, bitfield, ctx)? + } BaoFileStorage::PartialMem(mut ms) => { // check if we need to switch to file mode, otherwise write to memory - if max_offset(batch) <= ctx.options.inline.max_data_inlined { + if max_offset(batch) <= ctx.global.options.inline.max_data_inlined { ms.write_batch(bitfield.size(), batch)?; let changes = ms.bitfield.update(bitfield); let new = changes.new_state(); if new.complete { - let (cs, update) = ms.into_complete(hash, ctx)?; + let (cs, update) = ms.into_complete(ctx)?; (cs.into(), Some(update)) } else { - let fs = ms.persist(ctx, hash)?; + let fs = ms.persist(ctx)?; let update = EntryState::Partial { size: new.validated_size, }; @@ -432,13 +443,13 @@ impl BaoFileStorage { // a write at the end of a very large file. 
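+                    // (The deciding check is `max_offset(batch) <= inline.max_data_inlined`
+                    // above; larger writes always go through the file-based path.)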
// // opt: we should check if we become complete to avoid going from mem to partial to complete - let mut fs = ms.persist(ctx, hash)?; + let mut fs = ms.persist(ctx)?; fs.write_batch(bitfield.size(), batch)?; let changes = fs.bitfield.update(bitfield); let new = changes.new_state(); if new.complete { let size = new.validated_size.unwrap(); - let (cs, update) = fs.into_complete(size, &ctx.options)?; + let (cs, update) = fs.into_complete(size, &ctx.global.options)?; (cs.into(), Some(update)) } else { let update = EntryState::Partial { @@ -454,7 +465,7 @@ impl BaoFileStorage { let new = changes.new_state(); if new.complete { let size = new.validated_size.unwrap(); - let (cs, update) = fs.into_complete(size, &ctx.options)?; + let (cs, update) = fs.into_complete(size, &ctx.global.options)?; (cs.into(), Some(update)) } else if changes.was_validated() { // we are still partial, but now we know the size @@ -471,7 +482,7 @@ impl BaoFileStorage { // unless there is a bug, this would just write the exact same data (self, None) } - BaoFileStorage::Poisoned => { + _ => { // we are poisoned, so just ignore the write (self, None) } @@ -479,7 +490,7 @@ impl BaoFileStorage { } /// Create a new mutable mem storage. - pub fn partial_mem() -> Self { + pub fn new_partial_mem() -> Self { Self::PartialMem(Default::default()) } @@ -489,13 +500,14 @@ impl BaoFileStorage { match self { Self::Complete(_) => Ok(()), Self::PartialMem(_) => Ok(()), + Self::NonExisting => Ok(()), Self::Partial(file) => { file.data.sync_all()?; file.outboard.sync_all()?; file.sizes.sync_all()?; Ok(()) } - Self::Poisoned => { + Self::Poisoned | Self::Initial | Self::Loading => { // we are poisoned, so just ignore the sync Ok(()) } @@ -507,199 +519,139 @@ impl BaoFileStorage { } } -/// A weak reference to a bao file handle. -#[derive(Debug, Clone)] -pub struct BaoFileHandleWeak(Weak); - -impl BaoFileHandleWeak { - /// Upgrade to a strong reference if possible. - pub fn upgrade(&self) -> Option { - let inner = self.0.upgrade()?; - if let &BaoFileStorage::Poisoned = inner.storage.borrow().deref() { - trace!("poisoned storage, cannot upgrade"); - return None; - }; - Some(BaoFileHandle(inner)) - } - - /// True if the handle is definitely dead. - pub fn is_dead(&self) -> bool { - self.0.strong_count() == 0 - } -} - -/// The inner part of a bao file handle. -pub struct BaoFileHandleInner { - pub(crate) storage: watch::Sender, - hash: Hash, - options: Arc, -} +/// A cheaply cloneable handle to a bao file, including the hash and the configuration. +#[derive(Debug, Clone, Default, derive_more::Deref)] +pub(crate) struct BaoFileHandle(pub(super) watch::Sender); -impl fmt::Debug for BaoFileHandleInner { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - let guard = self.storage.borrow(); - let storage = guard.deref(); - f.debug_struct("BaoFileHandleInner") - .field("hash", &DD(self.hash)) - .field("storage", &storage) - .finish_non_exhaustive() +impl entity_manager::Reset for BaoFileHandle { + fn reset(&mut self) { + self.send_replace(BaoFileStorage::Initial); } -} -/// A cheaply cloneable handle to a bao file, including the hash and the configuration. -#[derive(Debug, Clone, derive_more::Deref)] -pub struct BaoFileHandle(Arc); - -impl Drop for BaoFileHandle { - fn drop(&mut self) { - self.0.storage.send_if_modified(|guard| { - if Arc::strong_count(&self.0) > 1 { - return false; - } - // there is the possibility that somebody else will increase the strong count - // here. 
there is nothing we can do about it, but they won't be able to - // access the internals of the handle because we have the lock. - // - // We poison the storage. A poisoned storage is considered dead and will - // have to be recreated, but only *after* we are done with persisting - // the bitfield. - let BaoFileStorage::Partial(fs) = guard.take() else { - return false; - }; - let options = &self.options; - let path = options.path.bitfield_path(&self.hash); - trace!( - "writing bitfield for hash {} to {}", - self.hash, - path.display() - ); - if let Err(cause) = fs.sync_all(&path) { - error!( - "failed to write bitfield for {} at {}: {:?}", - self.hash, - path.display(), - cause - ); - } - false - }); + fn ref_count(&self) -> usize { + self.0.receiver_count() + self.0.sender_count() } } /// A reader for a bao file, reading just the data. #[derive(Debug)] -pub struct DataReader(BaoFileHandle); +pub struct DataReader(pub(super) BaoFileHandle); impl ReadBytesAt for DataReader { fn read_bytes_at(&self, offset: u64, size: usize) -> std::io::Result { - let guard = self.0.storage.borrow(); + let guard = self.0.borrow(); match guard.deref() { BaoFileStorage::PartialMem(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Partial(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Complete(x) => x.data.read_bytes_at(offset, size), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } } /// A reader for the outboard part of a bao file. #[derive(Debug)] -pub struct OutboardReader(BaoFileHandle); +pub struct OutboardReader(pub(super) BaoFileHandle); impl ReadAt for OutboardReader { fn read_at(&self, offset: u64, buf: &mut [u8]) -> io::Result { - let guard = self.0.storage.borrow(); + let guard = self.0.borrow(); match guard.deref() { BaoFileStorage::Complete(x) => x.outboard.read_at(offset, buf), BaoFileStorage::PartialMem(x) => x.outboard.read_at(offset, buf), BaoFileStorage::Partial(x) => x.outboard.read_at(offset, buf), BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), + BaoFileStorage::Initial => io::Result::Err(io::Error::other("initial")), + BaoFileStorage::Loading => io::Result::Err(io::Error::other("loading")), + BaoFileStorage::NonExisting => io::Result::Err(io::ErrorKind::NotFound.into()), } } } -impl BaoFileHandle { - #[allow(dead_code)] - pub fn id(&self) -> usize { - Arc::as_ptr(&self.0) as usize - } - - /// Create a new bao file handle. - /// - /// This will create a new file handle with an empty memory storage. 
- pub fn new_partial_mem(hash: Hash, options: Arc<Options>) -> Self { - let storage = BaoFileStorage::partial_mem(); - Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - options: options.clone(), - })) +impl BaoFileStorage { + pub async fn open(state: Option<EntryState<Bytes>>, ctx: &HashContext) -> io::Result<Self> { + let hash = &ctx.id; + let options = &ctx.global.options; + Ok(match state { + Some(EntryState::Complete { + data_location, + outboard_location, + }) => { + let data = match data_location { + DataLocation::Inline(data) => MemOrFile::Mem(data), + DataLocation::Owned(size) => { + let path = options.path.data_path(hash); + let file = std::fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, size)) + } + DataLocation::External(paths, size) => { + let Some(path) = paths.into_iter().next() else { + return Err(io::Error::other("no external data path")); + }; + let file = std::fs::File::open(&path)?; + MemOrFile::File(FixedSize::new(file, size)) + } + }; + let outboard = match outboard_location { + OutboardLocation::NotNeeded => MemOrFile::empty(), + OutboardLocation::Inline(data) => MemOrFile::Mem(data), + OutboardLocation::Owned => { + let path = options.path.outboard_path(hash); + let file = std::fs::File::open(&path)?; + MemOrFile::File(file) + } + }; + Self::new_complete(data, outboard) + } + Some(EntryState::Partial { .. }) => Self::new_partial_file(ctx).await?, + None => Self::NonExisting, + }) } /// Create a new bao file handle with a partial file. - pub(super) async fn new_partial_file(hash: Hash, ctx: &TaskContext) -> io::Result<Self> { - let options = ctx.options.clone(); - let storage = PartialFileStorage::load(&hash, &options.path)?; - let storage = if storage.bitfield.is_complete() { + pub(super) async fn new_partial_file(ctx: &HashContext) -> io::Result<Self> { + let hash = &ctx.id; + let options = ctx.global.options.clone(); + let storage = PartialFileStorage::load(hash, &options.path)?; + Ok(if storage.bitfield.is_complete() { let size = storage.bitfield.size; let (storage, entry_state) = storage.into_complete(size, &options)?; debug!("File was reconstructed as complete"); - let (tx, rx) = crate::util::channel::oneshot::channel(); - ctx.db - .sender - .send( - Set { - hash, - state: entry_state, - tx, - span: Span::current(), - } - .into(), - ) - .await - .map_err(|_| io::Error::other("send update"))?; - rx.await.map_err(|_| io::Error::other("receive update"))??; + ctx.global.db.set(*hash, entry_state).await?; storage.into() } else { storage.into() - }; - Ok(Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - options, - }))) + }) } /// Create a new complete bao file handle. pub fn new_complete( - hash: Hash, data: MemOrFile<Bytes, FixedSize<File>>, outboard: MemOrFile<Bytes, File>, - options: Arc<Options>, ) -> Self { - let storage = CompleteStorage { data, outboard }.into(); - Self(Arc::new(BaoFileHandleInner { - storage: watch::Sender::new(storage), - hash, - options, - })) + CompleteStorage { data, outboard }.into() } +} +impl BaoFileHandle { /// Complete the handle pub fn complete( &self, data: MemOrFile<Bytes, FixedSize<File>>, outboard: MemOrFile<Bytes, File>, ) { - self.storage.send_if_modified(|guard| { - let res = match guard { - BaoFileStorage::Complete(_) => None, - BaoFileStorage::PartialMem(entry) => Some(&mut entry.bitfield), - BaoFileStorage::Partial(entry) => Some(&mut entry.bitfield), - BaoFileStorage::Poisoned => None, + self.send_if_modified(|guard| { + let needs_complete = match guard { + BaoFileStorage::NonExisting => true, + BaoFileStorage::Complete(_) => false, + BaoFileStorage::PartialMem(_) => true, + BaoFileStorage::Partial(_) => true, + _ => false, }; - if let Some(bitfield) = res { - bitfield.update(&Bitfield::complete(data.size())); + if needs_complete { *guard = BaoFileStorage::Complete(CompleteStorage { data, outboard }); true } else { @@ -707,118 +659,14 @@ impl BaoFileHandle { } }); } - - pub fn subscribe(&self) -> BaoFileStorageSubscriber { - BaoFileStorageSubscriber::new(self.0.storage.subscribe()) - } - - /// True if the file is complete. - #[allow(dead_code)] - pub fn is_complete(&self) -> bool { - matches!(self.storage.borrow().deref(), BaoFileStorage::Complete(_)) - } - - /// An AsyncSliceReader for the data file. - /// - /// Caution: this is a reader for the unvalidated data file. Reading this - /// can produce data that does not match the hash. - pub fn data_reader(&self) -> DataReader { - DataReader(self.clone()) - } - - /// An AsyncSliceReader for the outboard file. - /// - /// The outboard file is used to validate the data file. It is not guaranteed - /// to be complete. - pub fn outboard_reader(&self) -> OutboardReader { - OutboardReader(self.clone()) - } - - /// The most precise known total size of the data file. - pub fn current_size(&self) -> io::Result<u64> { - match self.storage.borrow().deref() { - BaoFileStorage::Complete(mem) => Ok(mem.size()), - BaoFileStorage::PartialMem(mem) => Ok(mem.current_size()), - BaoFileStorage::Partial(file) => file.current_size(), - BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), - } - } - - /// The most precise known total size of the data file. - pub fn bitfield(&self) -> io::Result<Bitfield> { - match self.storage.borrow().deref() { - BaoFileStorage::Complete(mem) => Ok(mem.bitfield()), - BaoFileStorage::PartialMem(mem) => Ok(mem.bitfield().clone()), - BaoFileStorage::Partial(file) => Ok(file.bitfield().clone()), - BaoFileStorage::Poisoned => io::Result::Err(io::Error::other("poisoned storage")), - } - } - - /// The outboard for the file. - pub fn outboard(&self) -> io::Result<PreOrderOutboard<OutboardReader>> { - let root = self.hash.into(); - let tree = BaoTree::new(self.current_size()?, IROH_BLOCK_SIZE); - let outboard = self.outboard_reader(); - Ok(PreOrderOutboard { - root, - tree, - data: outboard, - }) - } - - /// The hash of the file. - pub fn hash(&self) -> Hash { - self.hash - } - - /// Downgrade to a weak reference.
- pub fn downgrade(&self) -> BaoFileHandleWeak { - BaoFileHandleWeak(Arc::downgrade(&self.0)) - } - - /// Write a batch and notify the db - pub(super) async fn write_batch( - &self, - batch: &[BaoContentItem], - bitfield: &Bitfield, - ctx: &TaskContext, - ) -> io::Result<()> { - trace!("write_batch bitfield={:?} batch={}", bitfield, batch.len()); - let mut res = Ok(None); - self.storage.send_if_modified(|state| { - let Ok((state1, update)) = state.take().write_batch(batch, bitfield, ctx, &self.hash) - else { - res = Err(io::Error::other("write batch failed")); - return false; - }; - res = Ok(update); - *state = state1; - true - }); - if let Some(update) = res? { - ctx.db - .sender - .send( - Update { - hash: self.hash, - state: update, - tx: None, - span: Span::current(), - } - .into(), - ) - .await - .map_err(|_| io::Error::other("send update"))?; - } - Ok(()) - } } impl PartialMemStorage { - /// Persist the batch to disk, creating a FileBatch. - fn persist(self, ctx: &TaskContext, hash: &Hash) -> io::Result<PartialFileStorage> { - let options = &ctx.options.path; - ctx.protect.protect( + /// Persist the batch to disk. + fn persist(self, ctx: &HashContext) -> io::Result<PartialFileStorage> { + let options = &ctx.global.options.path; + let hash = &ctx.id; + ctx.global.protect.protect( *hash, [ BaoFilePart::Data, @@ -843,12 +691,6 @@ impl PartialMemStorage { bitfield: self.bitfield, }) } - - /// Get the parts data, outboard and sizes - #[allow(dead_code)] - pub fn into_parts(self) -> (SparseMemFile, SparseMemFile, SizeInfo) { - (self.data, self.outboard, self.size) - } } pub struct BaoFileStorageSubscriber { diff --git a/src/store/fs/gc.rs b/src/store/fs/gc.rs index a394dc19..70333f3e 100644 --- a/src/store/fs/gc.rs +++ b/src/store/fs/gc.rs @@ -192,6 +192,7 @@ mod tests { use std::{ io::{self}, path::Path, + time::Duration, }; use bao_tree::{io::EncodeError, ChunkNum}; @@ -299,6 +300,7 @@ mod tests { let outboard_path = options.outboard_path(&bh); let sizes_path = options.sizes_path(&bh); let bitfield_path = options.bitfield_path(&bh); + tokio::time::sleep(Duration::from_millis(100)).await; // allow some time for the file to be written assert!(data_path.exists()); assert!(outboard_path.exists()); assert!(sizes_path.exists()); diff --git a/src/store/fs/meta.rs b/src/store/fs/meta.rs index 617db98c..21fbd9ed 100644 --- a/src/store/fs/meta.rs +++ b/src/store/fs/meta.rs @@ -34,7 +34,7 @@ mod proto; pub use proto::*; pub(crate) mod tables; use tables::{ReadOnlyTables, ReadableTables, Tables}; -use tracing::{debug, error, info_span, trace}; +use tracing::{debug, error, info_span, trace, Span}; use super::{ delete_set::DeleteHandle, @@ -88,7 +88,7 @@ pub type ActorResult<T> = Result<T, ActorError>; #[derive(Debug, Clone)] pub struct Db { - pub sender: tokio::sync::mpsc::Sender<ActorMessage>, + sender: tokio::sync::mpsc::Sender<ActorMessage>, } impl Db { @@ -96,8 +96,71 @@ impl Db { Self { sender } } + pub async fn snapshot(&self, span: tracing::Span) -> io::Result<ReadOnlyTables> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.sender + .send(Snapshot { tx, span }.into()) + .await + .map_err(|_| io::Error::other("send snapshot"))?; + rx.await.map_err(|_| io::Error::other("receive snapshot")) + } + + pub async fn update_await(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> { + let (tx, rx) = oneshot::channel(); + self.sender + .send( + Update { + hash, + state, + tx: Some(tx), + span: tracing::Span::current(), + } + .into(), + ) + .await + .map_err(|_| io::Error::other("send update"))?; + rx.await + .map_err(|_e| io::Error::other("receive update"))??; + Ok(()) + } + + /// Update the entry state for a hash, without awaiting completion. + pub async fn update(&self, hash: Hash, state: EntryState<Bytes>) -> io::Result<()> { + self.sender + .send( + Update { + hash, + state, + tx: None, + span: Span::current(), + } + .into(), + ) + .await + .map_err(|_| io::Error::other("send update")) + } + + /// Set the entry state and await completion. + pub async fn set(&self, hash: Hash, entry_state: EntryState<Bytes>) -> io::Result<()> { + let (tx, rx) = oneshot::channel(); + self.sender + .send( + Set { + hash, + state: entry_state, + tx, + span: Span::current(), + } + .into(), + ) + .await + .map_err(|_| io::Error::other("send update"))?; + rx.await.map_err(|_| io::Error::other("receive update"))??; + Ok(()) + } + /// Get the entry state for a hash, if any. - pub async fn get(&self, hash: Hash) -> anyhow::Result<Option<EntryState<Bytes>>> { + pub async fn get(&self, hash: Hash) -> io::Result<Option<EntryState<Bytes>>> { let (tx, rx) = oneshot::channel(); self.sender .send( @@ -108,8 +171,9 @@ impl Db { } .into(), ) - .await?; - let res = rx.await?; + .await + .map_err(|_| io::Error::other("send get"))?; + let res = rx.await.map_err(|_| io::Error::other("receive get"))?; Ok(res.state?) } diff --git a/src/store/fs/util.rs b/src/store/fs/util.rs index f2949a7c..1cbd01bc 100644 --- a/src/store/fs/util.rs +++ b/src/store/fs/util.rs @@ -1,6 +1,7 @@ use std::future::Future; use tokio::{select, sync::mpsc}; +pub(crate) mod entity_manager; /// A wrapper for a tokio mpsc receiver that allows peeking at the next message. #[derive(Debug)] diff --git a/src/store/fs/util/entity_manager.rs b/src/store/fs/util/entity_manager.rs new file mode 100644 index 00000000..64fd9f85 --- /dev/null +++ b/src/store/fs/util/entity_manager.rs @@ -0,0 +1,1338 @@ +#![allow(dead_code)] +use std::{fmt::Debug, future::Future, hash::Hash}; + +use n0_future::{future, FuturesUnordered}; +use tokio::sync::{mpsc, oneshot}; + +/// Trait to reset an entity state in place. +/// +/// In many cases this is just assigning the default value, but e.g. for an +/// `Arc<Mutex<T>>` resetting to the default value means an allocation, whereas +/// reset can be done without. +pub trait Reset: Default { + /// Reset the state to its default value. + fn reset(&mut self); + + /// A ref count to ensure that the state is unique when shutting down. + /// + /// You are not allowed to clone the state out of a task, even though that + /// is possible. + fn ref_count(&self) -> usize; +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ShutdownCause { + /// The entity is shutting down gracefully because the entity is idle. + Idle, + /// The entity is shutting down because the entity manager is shutting down. + Soft, + /// The entity is shutting down because the sender was dropped. + Drop, +} + +/// Parameters for the entity manager system. +pub trait Params: Send + Sync + 'static { + /// Entity id type. + /// + /// This does not require Copy to allow for more complex types, such as `String`, + /// but you have to make sure that ids are small and cheap to clone, since they are + /// used as keys in maps. + type EntityId: Debug + Hash + Eq + Clone + Send + Sync + 'static; + /// Global state type. + /// + /// This is passed into all entity actors. It also needs to be a cheap handle. + /// If you don't need it, just set it to `()`. + type GlobalState: Debug + Clone + Send + Sync + 'static; + /// Entity state type. + /// + /// This is the actual distinct per-entity state. This needs to implement + /// `Default` and a matching `Reset`.
It also needs to implement `Clone` + /// since we unfortunately need to pass an owned copy of the state to the + /// callback - otherwise we run into some rust lifetime limitations. + /// + /// Frequently this is an `Arc<Mutex<T>>` or similar. Note that per entity + /// access is concurrent but not parallel, so you can use a more efficient + /// synchronization primitive like [`AtomicRefCell`](https://crates.io/crates/atomic_refcell) if you want to. + type EntityState: Default + Debug + Reset + Clone + Send + Sync + 'static; + /// Function being called when an entity actor is shutting down. + fn on_shutdown( + state: entity_actor::State<Self>, + cause: ShutdownCause, + ) -> impl Future<Output = ()> + Send + 'static + where + Self: Sized; +} + +/// Sent to the main actor and then delegated to the entity actor to spawn a new task. +pub(crate) struct Spawn<P: Params> { + id: P::EntityId, + f: Box<dyn FnOnce(SpawnArg<P>) -> future::Boxed<()> + Send>, +} + +pub(crate) struct EntityShutdown; + +/// Argument for the `EntityManager::spawn` function. +pub enum SpawnArg<P: Params> { + /// The entity is active, and we were able to spawn a task. + Active(ActiveEntityState<P>
), + /// The entity is busy and cannot spawn a new task. + Busy, + /// The entity is dead. + Dead, +} + +/// Sent from the entity actor to the main actor to notify that it is shutting down. +/// +/// With this message the entity actor gives back the receiver for its command channel, +/// so it can be reused either immediately if commands come in during shutdown, or later +/// if the entity actor is reused for a different entity. +struct Shutdown<P: Params> { + id: P::EntityId, + receiver: mpsc::Receiver<entity_actor::Command<P>>, +} + +struct ShutdownAll { + tx: oneshot::Sender<()>, +} + +/// Sent from the main actor to the entity actor to notify that it has completed shutdown. +/// +/// With this message the entity actor sends back the remaining state. The tasks set +/// at this point must be empty, as the entity actor has already completed all tasks. +struct ShutdownComplete<P: Params> { + state: ActiveEntityState<P>
, + tasks: FuturesUnordered<future::Boxed<()>>, +} + +mod entity_actor { + #![allow(dead_code)] + use n0_future::{future, FuturesUnordered, StreamExt}; + use tokio::sync::mpsc; + + use super::{ + EntityShutdown, Params, Reset, Shutdown, ShutdownCause, ShutdownComplete, Spawn, SpawnArg, + }; + + /// State of an active entity. + #[derive(Debug)] + pub struct State<P: Params> { + /// The entity id. + pub id: P::EntityId, + /// A copy of the global state. + pub global: P::GlobalState, + /// The per-entity state which might have internal mutability. + pub state: P::EntityState, + } + + impl<P: Params> Clone for State<P>
{ + fn clone(&self) -> Self { + Self { + id: self.id.clone(), + global: self.global.clone(), + state: self.state.clone(), + } + } + } + + pub enum Command<P: Params> { + Spawn(Spawn<P>
), + EntityShutdown(EntityShutdown), + } + + impl<P: Params> From<EntityShutdown> for Command<P>
{ + fn from(_: EntityShutdown) -> Self { + Self::EntityShutdown(EntityShutdown) + } + } + + #[derive(Debug)] + pub struct Actor<P: Params> { + pub recv: mpsc::Receiver<Command<P>>, + pub main: mpsc::Sender<super::main_actor::InternalCommand<P>>, + pub state: State<P>
, + pub tasks: FuturesUnordered<future::Boxed<()>>, + } + + impl<P: Params> Actor<P>
{ + pub async fn run(mut self) { + loop { + tokio::select! { + command = self.recv.recv() => { + let Some(command) = command else { + // Channel closed, this means that the main actor is shutting down. + self.drop_shutdown_state().await; + break; + }; + match command { + Command::Spawn(spawn) => { + let task = (spawn.f)(SpawnArg::Active(self.state.clone())); + self.tasks.push(task); + } + Command::EntityShutdown(_) => { + self.soft_shutdown_state().await; + break; + } + } + } + Some(_) = self.tasks.next(), if !self.tasks.is_empty() => {} + } + if self.tasks.is_empty() && self.recv.is_empty() { + // No more tasks and no more commands, we can recycle the actor. + self.recycle_state().await; + break; // Exit the loop, actor is done. + } + } + } + + /// drop shutdown state. + /// + /// All senders for our receive channel were dropped, so we shut down without waiting for any tasks to complete. + async fn drop_shutdown_state(self) { + let Self { state, .. } = self; + P::on_shutdown(state, ShutdownCause::Drop).await; + } + + /// Soft shutdown state. + /// + /// We have received an explicit shutdown command, so we wait for all tasks to complete and then call the shutdown function. + async fn soft_shutdown_state(mut self) { + while (self.tasks.next().await).is_some() {} + P::on_shutdown(self.state.clone(), ShutdownCause::Soft).await; + } + + async fn recycle_state(self) { + // we can't check if recv is empty here, since new messages might come in while we are in recycle_state. + assert!( + self.tasks.is_empty(), + "Tasks must be empty before recycling" + ); + // notify main actor that we are starting to shut down. + // if the main actor is shutting down, this could fail, but we don't care. + self.main + .send( + Shutdown { + id: self.state.id.clone(), + receiver: self.recv, + } + .into(), + ) + .await + .ok(); + println!("Calling on_shutdown {}", self.state.state.ref_count()); + assert_eq!(self.state.state.ref_count(), 1); + P::on_shutdown(self.state.clone(), ShutdownCause::Idle).await; + // Notify the main actor that we have completed shutdown. + // here we also give back the rest of ourselves so the main actor can recycle us. + self.main + .send( + ShutdownComplete { + state: self.state, + tasks: self.tasks, + } + .into(), + ) + .await + .ok(); + } + + /// Recycle the actor for reuse by setting its state to default. + /// + /// This also checks several invariants: + /// - There must be no pending messages in the receive channel. + /// - The sender must have a strong count of 1, meaning no other references exist + /// - The tasks set must be empty, meaning no tasks are running. + /// - The global state must match the scope provided. + /// - The state must be unique to the actor, meaning no other references exist. 
+ pub fn recycle(&mut self) { + assert!( + self.recv.is_empty(), + "Cannot recycle actor with pending messages" + ); + assert!( + self.recv.sender_strong_count() == 1, + "There must be only one sender left" + ); + assert!( + self.tasks.is_empty(), + "Tasks must be empty before recycling" + ); + self.state.state.reset(); + } + } +} +pub use entity_actor::State as ActiveEntityState; +pub use main_actor::ActorState as EntityManagerState; + +mod main_actor { + #![allow(dead_code)] + use std::{collections::HashMap, future::Future}; + + use n0_future::{future, FuturesUnordered}; + use tokio::{sync::mpsc, task::JoinSet}; + use tracing::{error, warn}; + + use super::{ + entity_actor, EntityShutdown, Params, Reset, Shutdown, ShutdownAll, ShutdownComplete, + Spawn, SpawnArg, + }; + + pub(super) enum Command<P: Params> { + Spawn(Spawn<P>
), + ShutdownAll(ShutdownAll), + } + + impl<P: Params> From<ShutdownAll> for Command<P>
{ + fn from(shutdown_all: ShutdownAll) -> Self { + Self::ShutdownAll(shutdown_all) + } + } + + pub(super) enum InternalCommand<P: Params> { + ShutdownComplete(ShutdownComplete<P>
), + Shutdown(Shutdown<P>
), + } + + impl<P: Params> From<Shutdown<P>> for InternalCommand<P>
{ + fn from(shutdown: Shutdown<P>
) -> Self { + Self::Shutdown(shutdown) + } + } + + impl<P: Params> From<ShutdownComplete<P>> for InternalCommand<P>
{ + fn from(shutdown_complete: ShutdownComplete<P>
) -> Self { + Self::ShutdownComplete(shutdown_complete) + } + } + + #[derive(Debug)] + pub enum EntityHandle<P: Params> { + /// A running entity actor. + Live { + send: mpsc::Sender<entity_actor::Command<P>>, + }, + ShuttingDown { + send: mpsc::Sender<entity_actor::Command<P>>, + recv: mpsc::Receiver<entity_actor::Command<P>>, + }, + } + + impl<P: Params> EntityHandle<P>
{ + pub fn send(&self) -> &mpsc::Sender<entity_actor::Command<P>> { + match self { + EntityHandle::Live { send } => send, + EntityHandle::ShuttingDown { send, .. } => send, + } + } + } + + /// State machine for an entity actor manager. + /// + /// This is for when you don't want a separate manager actor, but want to inline the entity + /// actor management into your main actor. + #[derive(Debug)] + pub struct ActorState<P: Params> { + /// Channel to receive internal commands from the entity actors. + /// This channel will never be closed since we also hold a sender to it. + internal_recv: mpsc::Receiver<InternalCommand<P>>, + /// Channel to send internal commands to ourselves, to hand out to entity actors. + internal_send: mpsc::Sender<InternalCommand<P>>, + /// Map of live entity actors. + live: HashMap<P::EntityId, EntityHandle<P>>, + /// Global state shared across all entity actors. + state: P::GlobalState, + /// Pool of inactive entity actors to reuse. + pool: Vec<( + mpsc::Sender<entity_actor::Command<P>>, + entity_actor::Actor<P>
, + )>, + /// Maximum size of the inbox of an entity actor. + entity_inbox_size: usize, + /// Initial capacity of the futures set for entity actors. + entity_futures_initial_capacity: usize, + } + + impl<P: Params> ActorState<P>
{ + pub fn new( + state: P::GlobalState, + pool_capacity: usize, + entity_inbox_size: usize, + entity_response_inbox_size: usize, + entity_futures_initial_capacity: usize, + ) -> Self { + let (internal_send, internal_recv) = mpsc::channel(entity_response_inbox_size); + Self { + internal_recv, + internal_send, + live: HashMap::new(), + state, + pool: Vec::with_capacity(pool_capacity), + entity_inbox_size, + entity_futures_initial_capacity, + } + } + + #[must_use = "this function may return a future that must be spawned by the caller"] + /// Friendly version of `spawn_boxed` that does the boxing + pub async fn spawn<F, Fut>( + &mut self, + id: P::EntityId, + f: F, + ) -> Option<impl Future<Output = ()> + Send + 'static> + where + F: FnOnce(SpawnArg<P>
) -> Fut + Send + 'static, + Fut: Future<Output = ()> + Send + 'static, + { + self.spawn_boxed( + id, + Box::new(|x| { + Box::pin(async move { + f(x).await; + }) + }), + ) + .await + } + + #[must_use = "this function may return a future that must be spawned by the caller"] + pub async fn spawn_boxed( + &mut self, + id: P::EntityId, + f: Box<dyn FnOnce(SpawnArg<P>) -> future::Boxed<()> + Send>, + ) -> Option<impl Future<Output = ()> + Send + 'static> { + let (entity_handle, task) = self.get_or_create(id.clone()); + let sender = entity_handle.send(); + if let Err(e) = + sender.try_send(entity_actor::Command::Spawn(Spawn { id: id.clone(), f })) + { + match e { + mpsc::error::TrySendError::Full(cmd) => { + let entity_actor::Command::Spawn(spawn) = cmd else { + panic!() + }; + warn!( + "Entity actor inbox is full, cannot send command to entity actor {:?}.", + id + ); + // we await in the select here, but I think this is fine, since the actor is busy. + // maybe slowing things down a bit is helpful. + (spawn.f)(SpawnArg::Busy).await; + } + mpsc::error::TrySendError::Closed(cmd) => { + let entity_actor::Command::Spawn(spawn) = cmd else { + panic!() + }; + error!( + "Entity actor inbox is closed, cannot send command to entity actor {:?}.", + id + ); + // give the caller a chance to react to this bad news. + // at this point we are in trouble anyway, so awaiting is going to be the least of our problems. + (spawn.f)(SpawnArg::Dead).await; + } + } + }; + task + } + + /// This function needs to be polled by the owner of the actor state to advance the + /// entity manager state machine. If it returns a future, that future must be spawned + /// by the caller. + #[must_use = "this function may return a future that must be spawned by the caller"] + pub async fn tick(&mut self) -> Option<impl Future<Output = ()> + Send + 'static> { + if let Some(cmd) = self.internal_recv.recv().await { + match cmd { + InternalCommand::Shutdown(Shutdown { id, receiver }) => { + let Some(entity_handle) = self.live.remove(&id) else { + error!("Received shutdown command for unknown entity actor {id:?}"); + return None; + }; + let EntityHandle::Live { send } = entity_handle else { + error!( + "Received shutdown command for entity actor {id:?} that is already shutting down" + ); + return None; + }; + self.live.insert( + id.clone(), + EntityHandle::ShuttingDown { + send, + recv: receiver, + }, + ); + } + InternalCommand::ShutdownComplete(ShutdownComplete { state, tasks }) => { + let id = state.id.clone(); + let Some(entity_handle) = self.live.remove(&id) else { + error!( + "Received shutdown complete command for unknown entity actor {id:?}" + ); + return None; + }; + let EntityHandle::ShuttingDown { send, recv } = entity_handle else { + error!( + "Received shutdown complete command for entity actor {id:?} that is not shutting down" + ); + return None; + }; + // re-assemble the actor from the parts + let mut actor = entity_actor::Actor { + main: self.internal_send.clone(), + recv, + state, + tasks, + }; + if actor.recv.is_empty() { + // No commands during shutdown, we can recycle the actor. + self.recycle(send, actor); + } else { + actor.state.state.reset(); + self.live.insert(id.clone(), EntityHandle::Live { send }); + return Some(actor.run()); + } + } + } + } + None + } + + /// Send a shutdown command to all live entity actors. + pub async fn shutdown(self) { + for handle in self.live.values() { + handle.send().send(EntityShutdown {}.into()).await.ok(); + } + } + + /// Get or create an entity actor for the given id. + /// + /// If this function returns a future, it must be spawned by the caller.
+ fn get_or_create( + &mut self, + id: P::EntityId, + ) -> ( + &mut EntityHandle<P>
, + Option<impl Future<Output = ()> + Send + 'static>, + ) { + let mut task = None; + let handle = self.live.entry(id.clone()).or_insert_with(|| { + if let Some((send, mut actor)) = self.pool.pop() { + // Get an actor from the pool of inactive actors and initialize it. + actor.state.id = id.clone(); + actor.state.global = self.state.clone(); + // strictly speaking this is not needed, since we reset the state when adding the actor to the pool. + actor.state.state.reset(); + task = Some(actor.run()); + EntityHandle::Live { send } + } else { + // Create a new entity actor and inbox. + let (send, recv) = mpsc::channel(self.entity_inbox_size); + let state: entity_actor::State<P>
= entity_actor::State { + id: id.clone(), + global: self.state.clone(), + state: Default::default(), + }; + let actor = entity_actor::Actor { + main: self.internal_send.clone(), + recv, + state, + tasks: FuturesUnordered::with_capacity( + self.entity_futures_initial_capacity, + ), + }; + task = Some(actor.run()); + EntityHandle::Live { send } + } + }); + (handle, task) + } + + fn recycle( + &mut self, + sender: mpsc::Sender<entity_actor::Command<P>>, + mut actor: entity_actor::Actor<P>
, + ) { + assert!(sender.strong_count() == 1); + // todo: check that sender and receiver are the same channel. tokio does not have an api for this, unfortunately. + // reset the actor in any case, just to check the invariants. + actor.recycle(); + // Recycle the actor for later use. + if self.pool.len() < self.pool.capacity() { + self.pool.push((sender, actor)); + } + } + } + + pub struct Actor<P: Params> { + /// Channel to receive commands from the outside world. + /// If this channel is closed, it means we need to shut down in a hurry. + recv: mpsc::Receiver<Command<P>>, + /// Tasks that are currently running. + tasks: JoinSet<()>, + /// Internal state of the actor + state: ActorState<P>
, + } + + impl<P: Params> Actor<P>
{ + pub fn new( + state: P::GlobalState, + recv: tokio::sync::mpsc::Receiver<Command<P>>, + pool_capacity: usize, + entity_inbox_size: usize, + entity_response_inbox_size: usize, + entity_futures_initial_capacity: usize, + ) -> Self { + Self { + recv, + tasks: JoinSet::new(), + state: ActorState::new( + state, + pool_capacity, + entity_inbox_size, + entity_response_inbox_size, + entity_futures_initial_capacity, + ), + } + } + + pub async fn run(mut self) { + enum SelectOutcome<A, B, C> { + Command(A), + Tick(B), + TaskDone(C), + } + loop { + let res = tokio::select! { + x = self.recv.recv() => SelectOutcome::Command(x), + x = self.state.tick() => SelectOutcome::Tick(x), + Some(task) = self.tasks.join_next(), if !self.tasks.is_empty() => SelectOutcome::TaskDone(task), + }; + match res { + SelectOutcome::Command(cmd) => { + let Some(cmd) = cmd else { + // Channel closed, this means that the main actor is shutting down. + self.hard_shutdown().await; + break; + }; + match cmd { + Command::Spawn(spawn) => { + if let Some(task) = self.state.spawn_boxed(spawn.id, spawn.f).await + { + self.tasks.spawn(task); + } + } + Command::ShutdownAll(arg) => { + self.soft_shutdown().await; + arg.tx.send(()).ok(); + break; + } + } + // Handle incoming command + } + SelectOutcome::Tick(future) => { + if let Some(task) = future { + self.tasks.spawn(task); + } + } + SelectOutcome::TaskDone(result) => { + // Handle completed task + if let Err(e) = result { + error!("Task failed: {e:?}"); + } + } + } + } + } + + async fn soft_shutdown(self) { + let Self { + mut tasks, state, .. + } = self; + state.shutdown().await; + while let Some(res) = tasks.join_next().await { + if let Err(e) = res { + eprintln!("Task failed during shutdown: {e:?}"); + } + } + } + + async fn hard_shutdown(self) { + let Self { + mut tasks, state, .. + } = self; + // this is needed so calls to internal_send in idle shutdown fail fast. + // otherwise we would have to drain the channel, but we don't care about the messages at + // this point. + drop(state); + while let Some(res) = tasks.join_next().await { + if let Err(e) = res { + eprintln!("Task failed during shutdown: {e:?}"); + } + } + } + } +} + +/// A manager for entities identified by an entity id. +/// +/// The manager provides parallelism between entities, but just concurrency within a single entity. +/// This is useful if the entity wraps an external resource such as a file that does not benefit +/// from parallelism. +/// +/// The entity manager internally uses a main actor and per-entity actors. Per entity actors +/// and their inbox queues are recycled when they become idle, to save allocations. +/// +/// You can mostly ignore these implementation details, except when you want to customize the +/// queue sizes in the [`Options`] struct. +/// +/// The main entry point is the [`EntityManager::spawn`] function. +/// +/// Dropping the `EntityManager` will shut down the entity actors without waiting for their +/// tasks to complete. For a more gentle shutdown, use the [`EntityManager::shutdown`] function +/// that does wait for tasks to complete. +#[derive(Debug, Clone)] +pub struct EntityManager<P: Params>(mpsc::Sender<main_actor::Command<P>>); + +#[derive(Debug, Clone, Copy)] +pub struct Options { + /// Maximum number of inactive entity actors that are being pooled for reuse. + pub pool_capacity: usize, + /// Size of the inbox for the manager actor. + pub inbox_size: usize, + /// Size of the inbox for entity actors. + pub entity_inbox_size: usize, + /// Size of the inbox for entity actor responses to the manager actor.
+ pub entity_response_inbox_size: usize, + /// Initial capacity of the futures set for entity actors. + /// + /// Set this to the expected average concurrency level of your entities. + pub entity_futures_initial_capacity: usize, +} + +impl Default for Options { + fn default() -> Self { + Self { + pool_capacity: 10, + inbox_size: 10, + entity_inbox_size: 10, + entity_response_inbox_size: 100, + entity_futures_initial_capacity: 16, + } + } +} + +impl<P: Params> EntityManager<P>
{ + pub fn new(state: P::GlobalState, options: Options) -> Self { + let (send, recv) = mpsc::channel(options.inbox_size); + let actor = main_actor::Actor::new( + state, + recv, + options.pool_capacity, + options.entity_inbox_size, + options.entity_response_inbox_size, + options.entity_futures_initial_capacity, + ); + tokio::spawn(actor.run()); + Self(send) + } + + /// Spawn a new task on the entity actor with the given id. + /// + /// Unless the world is ending - e.g. tokio runtime is shutting down - the passed function + /// is guaranteed to be called. However, there is no guarantee that the entity actor is + /// alive and responsive. See [`SpawnArg`] for details. + /// + /// Multiple callbacks for the same entity will be executed sequentially. There is no + /// parallelism within a single entity. So you can use synchronization primitives that + /// assume unique access in P::EntityState. And even if you do use multithreaded synchronization + /// primitives, they will never be contended. + /// + /// The future returned by `f` will be executed concurrently with other tasks, but again + /// there will be no real parallelism within a single entity actor. + pub async fn spawn<F, Fut>(&self, id: P::EntityId, f: F) -> Result<(), &'static str> + where + F: FnOnce(SpawnArg<P>
) -> Fut + Send + 'static, + Fut: future::Future<Output = ()> + Send + 'static, + { + let spawn = Spawn { + id, + f: Box::new(|arg| { + Box::pin(async move { + f(arg).await; + }) + }), + }; + self.0 + .send(main_actor::Command::Spawn(spawn)) + .await + .map_err(|_| "Failed to send spawn command") + } + + pub async fn shutdown(&self) -> std::result::Result<(), &'static str> { + let (tx, rx) = oneshot::channel(); + self.0 + .send(ShutdownAll { tx }.into()) + .await + .map_err(|_| "Failed to send shutdown command")?; + rx.await + .map_err(|_| "Failed to receive shutdown confirmation") + } +} + +#[cfg(test)] +mod tests { + //! Tests for the entity manager. + //! + //! We implement a simple database for u128 counters, identified by u64 ids, + //! with both an in-memory and a file-based implementation. + //! + //! The database does internal consistency checks, to ensure that each + //! entity is only ever accessed by a single tokio task at a time, and to + //! ensure that wakeup and shutdown events are interleaved. + //! + //! We also check that the database behaves correctly by comparing with an + //! in-memory implementation. + //! + //! Database operations are done in parallel, so the fact that we are using + //! AtomicRefCell provides another test - if there was parallel write access + //! to a single entity due to a bug, it would panic. + use std::collections::HashMap; + + use n0_future::{BufferedStreamExt, StreamExt}; + use testresult::TestResult; + + use super::*; + + // a simple database for u128 counters, identified by u64 ids. + trait CounterDb { + async fn add(&self, id: u64, value: u128) -> Result<(), &'static str>; + async fn get(&self, id: u64) -> Result<u128, &'static str>; + async fn shutdown(&self) -> Result<(), &'static str>; + async fn check_consistency(&self, values: HashMap<u64, u128>); + } + + #[derive(Debug, PartialEq, Eq)] + enum Event { + Wakeup, + Shutdown, + } + + mod mem { + //! The in-memory database uses a HashMap in the global state to store + //! the values of the counters. Loading means reading from the global + //! state into the entity state, and persisting means writing to the + //! global state from the entity state.
+ use std::{ + collections::{HashMap, HashSet}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::Instant, + }; + + use atomic_refcell::AtomicRefCell; + + use super::*; + + #[derive(Debug, Default)] + struct Inner { + value: Option<u128>, + tasks: HashSet<tokio::task::Id>, + } + + #[derive(Debug, Clone, Default)] + struct State(Arc<AtomicRefCell<Inner>>); + + impl Reset for State { + fn reset(&mut self) { + *self.0.borrow_mut() = Default::default(); + } + + fn ref_count(&self) -> usize { + Arc::strong_count(&self.0) + } + } + + #[derive(Debug, Default)] + struct Global { + // the "database" of entity values + data: HashMap<u64, u128>, + // log of awake and shutdown events + log: HashMap<u64, Vec<(Event, Instant)>>, + } + + struct Counters; + impl Params for Counters { + type EntityId = u64; + type GlobalState = Arc<Mutex<Global>>; + type EntityState = State; + async fn on_shutdown(entity: entity_actor::State<Self>, _cause: ShutdownCause) { + let state = entity.state.0.borrow(); + let mut global = entity.global.lock().unwrap(); + assert_eq!(state.tasks.len(), 1); + // persist the state + if let Some(value) = state.value { + global.data.insert(entity.id, value); + } + // log the shutdown event + global + .log + .entry(entity.id) + .or_default() + .push((Event::Shutdown, Instant::now())); + } + } + + pub struct MemDb { + m: EntityManager<Counters>, + global: Arc<Mutex<Global>>, + } + + impl entity_actor::State<Counters> { + async fn with_value(&self, f: impl FnOnce(&mut u128)) -> Result<(), &'static str> { + let mut state = self.state.0.borrow_mut(); + // lazily load the data from the database + if state.value.is_none() { + let mut global = self.global.lock().unwrap(); + state.value = Some(global.data.get(&self.id).copied().unwrap_or_default()); + // log the wakeup event + global + .log + .entry(self.id) + .or_default() + .push((Event::Wakeup, Instant::now())); + } + // insert the task id into the tasks set to check that access is always + // from the same tokio task (not necessarily the same thread).
+ state.tasks.insert(tokio::task::id()); + // do the actual work + let r = state.value.as_mut().unwrap(); + f(r); + Ok(()) + } + } + + impl MemDb { + pub fn new() -> Self { + let global = Arc::new(Mutex::new(Global::default())); + Self { + global: global.clone(), + m: EntityManager::<Counters>::new(global, Options::default()), + } + } + } + + impl super::CounterDb for MemDb { + async fn add(&self, id: u64, value: u128) -> Result<(), &'static str> { + self.m + .spawn(id, move |arg| async move { + match arg { + SpawnArg::Active(state) => { + state + .with_value(|v| *v = v.wrapping_add(value)) + .await + .unwrap(); + } + SpawnArg::Busy => println!("Entity actor is busy"), + SpawnArg::Dead => println!("Entity actor is dead"), + } + }) + .await + } + + async fn get(&self, id: u64) -> Result<u128, &'static str> { + let (tx, rx) = oneshot::channel(); + self.m + .spawn(id, move |arg| async move { + match arg { + SpawnArg::Active(state) => { + state + .with_value(|v| { + tx.send(*v) + .unwrap_or_else(|_| println!("Failed to send value")) + }) + .await + .unwrap(); + } + SpawnArg::Busy => println!("Entity actor is busy"), + SpawnArg::Dead => println!("Entity actor is dead"), + } + }) + .await?; + rx.await.map_err(|_| "Failed to receive value") + } + + async fn shutdown(&self) -> Result<(), &'static str> { + self.m.shutdown().await + } + + async fn check_consistency(&self, values: HashMap<u64, u128>) { + let global = self.global.lock().unwrap(); + assert_eq!(global.data, values, "Data mismatch"); + for id in values.keys() { + let log = global.log.get(id).unwrap(); + assert!( + log.len() % 2 == 0, + "Log must contain alternating wakeup and shutdown events" + ); + for (i, (event, _)) in log.iter().enumerate() { + assert_eq!( + *event, + if i % 2 == 0 { + Event::Wakeup + } else { + Event::Shutdown + }, + "Unexpected event type" + ); + } + } + } + } + + /// If a task is so busy that it can't drain its inbox in time, we will + /// get a SpawnArg::Busy instead of access to the actual state. + /// + /// This will only happen if the system is seriously overloaded, since + /// the entity actor just spawns tasks for each message. So here we + /// simulate it by just not spawning the task as we are supposed to. + #[tokio::test] + async fn test_busy() -> TestResult<()> { + let mut state = EntityManagerState::<Counters>::new( + Arc::new(Mutex::new(Global::default())), + 1024, + 8, + 8, + 2, + ); + let active = Arc::new(AtomicUsize::new(0)); + let busy = Arc::new(AtomicUsize::new(0)); + let inc = || { + let active = active.clone(); + let busy = busy.clone(); + |arg: SpawnArg<Counters>| async move { + match arg { + SpawnArg::Active(_) => { + active.fetch_add(1, Ordering::SeqCst); + } + SpawnArg::Busy => { + busy.fetch_add(1, Ordering::SeqCst); + } + SpawnArg::Dead => { + println!("Entity actor is dead"); + } + } + } + }; + let fut1 = state.spawn(1, inc()).await; + assert!(fut1.is_some(), "First spawn should give us a task to spawn"); + for _ in 0..9 { + let fut = state.spawn(1, inc()).await; + assert!( + fut.is_none(), + "Subsequent spawns should assume first task has been spawned" + ); + } + assert_eq!( + active.load(Ordering::SeqCst), + 0, + "Active should have never been called, since we did not spawn the task!" + ); + assert_eq!(busy.load(Ordering::SeqCst), 2, "Busy should have been called two times, since we sent 10 msgs to a queue with capacity 8, and nobody is draining it"); + Ok(()) + } + + /// If there is a panic in any of the fns that run on an entity actor, + /// the entire entity becomes dead.
This cannot be recovered from, and + /// trying to spawn a new task on the dead entity actor will result in + /// a SpawnArg::Dead. + #[tokio::test] + async fn test_dead() -> TestResult<()> { + let manager = EntityManager::<Counters>::new( + Arc::new(Mutex::new(Global::default())), + Options::default(), + ); + let (tx, rx) = oneshot::channel(); + let killer = |arg: SpawnArg<Counters>| async move { + if let SpawnArg::Active(_) = arg { + tx.send(()).ok(); + panic!("Panic to kill the task"); + } + }; + // spawn a task that kills the entity actor + manager.spawn(1, killer).await?; + rx.await.expect("Failed to receive kill confirmation"); + let (tx, rx) = oneshot::channel(); + let counter = |arg: SpawnArg<Counters>| async move { + if let SpawnArg::Dead = arg { + tx.send(()).ok(); + } + }; + // spawn another task on the - now dead - entity actor + manager.spawn(1, counter).await?; + rx.await.expect("Failed to receive dead confirmation"); + Ok(()) + } + } + + mod fs { + //! The fs db uses one file per counter, stored as a 16-byte big-endian u128. + use std::{ + collections::HashSet, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + time::Instant, + }; + + use atomic_refcell::AtomicRefCell; + + use super::*; + + #[derive(Debug, Clone, Default)] + struct State { + value: Option<u128>, + tasks: HashSet<tokio::task::Id>, + } + + #[derive(Debug)] + struct Global { + path: PathBuf, + log: HashMap<u64, Vec<(Event, Instant)>>, + } + + #[derive(Debug, Clone, Default)] + struct EntityState(Arc<AtomicRefCell<State>>); + + impl Reset for EntityState { + fn reset(&mut self) { + *self.0.borrow_mut() = Default::default(); + } + + fn ref_count(&self) -> usize { + 1 + } + } + + fn get_path(root: impl AsRef<Path>, id: u64) -> PathBuf { + root.as_ref().join(hex::encode(id.to_be_bytes())) + } + + impl entity_actor::State<Counters> { + async fn with_value(&self, f: impl FnOnce(&mut u128)) -> Result<(), &'static str> { + let Ok(mut r) = self.state.0.try_borrow_mut() else { + panic!("failed to borrow state mutably"); + }; + if r.value.is_none() { + let mut global = self.global.lock().unwrap(); + global + .log + .entry(self.id) + .or_default() + .push((Event::Wakeup, Instant::now())); + let path = get_path(&global.path, self.id); + // note: if we were to use async IO, we would need to make sure not to hold the + // lock guard over an await point. The entity manager makes sure that all fns + // are run on the same tokio task, but there is still concurrency, which + // a mutable borrow of the state does not allow. + let value = match std::fs::read(path) { + Ok(value) => value, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // If the file does not exist, we initialize it to 0.
+ vec![0; 16] + } + Err(_) => return Err("Failed to read disk state"), + }; + let value = u128::from_be_bytes( + value.try_into().map_err(|_| "Invalid disk state format")?, + ); + r.value = Some(value); + } + let Some(value) = r.value.as_mut() else { + panic!("State must be Memory at this point"); + }; + f(value); + Ok(()) + } + } + + struct Counters; + impl Params for Counters { + type EntityId = u64; + type GlobalState = Arc<Mutex<Global>>; + type EntityState = EntityState; + async fn on_shutdown(state: entity_actor::State<Self>, _cause: ShutdownCause) { + let r = state.state.0.borrow(); + let mut global = state.global.lock().unwrap(); + if let Some(value) = r.value { + let path = get_path(&global.path, state.id); + let value_bytes = value.to_be_bytes(); + std::fs::write(&path, value_bytes).expect("Failed to write disk state"); + } + global + .log + .entry(state.id) + .or_default() + .push((Event::Shutdown, Instant::now())); + } + } + + pub struct FsDb { + global: Arc<Mutex<Global>>, + m: EntityManager<Counters>, + } + + impl FsDb { + pub fn new(path: impl AsRef<Path>) -> Self { + let global = Global { + path: path.as_ref().to_owned(), + log: HashMap::new(), + }; + let global = Arc::new(Mutex::new(global)); + Self { + global: global.clone(), + m: EntityManager::<Counters>::new(global, Options::default()), + } + } + } + + impl super::CounterDb for FsDb { + async fn add(&self, id: u64, value: u128) -> Result<(), &'static str> { + self.m + .spawn(id, move |arg| async move { + match arg { + SpawnArg::Active(state) => { + println!( + "Adding value {} to entity actor with id {:?}", + value, state.id + ); + state + .with_value(|v| *v = v.wrapping_add(value)) + .await + .unwrap(); + } + SpawnArg::Busy => println!("Entity actor is busy"), + SpawnArg::Dead => println!("Entity actor is dead"), + } + }) + .await + } + + async fn get(&self, id: u64) -> Result<u128, &'static str> { + let (tx, rx) = oneshot::channel(); + self.m + .spawn(id, move |arg| async move { + match arg { + SpawnArg::Active(state) => { + state + .with_value(|v| { + tx.send(*v) + .unwrap_or_else(|_| println!("Failed to send value")) + }) + .await + .unwrap(); + } + SpawnArg::Busy => println!("Entity actor is busy"), + SpawnArg::Dead => println!("Entity actor is dead"), + } + }) + .await?; + rx.await.map_err(|_| "Failed to receive value in get") + } + + async fn shutdown(&self) -> Result<(), &'static str> { + self.m.shutdown().await + } + + async fn check_consistency(&self, values: HashMap<u64, u128>) { + let global = self.global.lock().unwrap(); + for (id, value) in &values { + let path = get_path(&global.path, *id); + let disk_value = match std::fs::read(path) { + Ok(data) => u128::from_be_bytes(data.try_into().unwrap()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0, + Err(_) => panic!("Failed to read disk state for id {id}"), + }; + assert_eq!(disk_value, *value, "Disk value mismatch for id {id}"); + } + for id in values.keys() { + let log = global.log.get(id).unwrap(); + assert!( + log.len() % 2 == 0, + "Log must contain alternating wakeup and shutdown events" + ); + for (i, (event, _)) in log.iter().enumerate() { + assert_eq!( + *event, + if i % 2 == 0 { + Event::Wakeup + } else { + Event::Shutdown + }, + "Unexpected event type" + ); + } + } + } + } + } + + async fn test_random( + db: impl CounterDb, + entries: &[(u64, u128)], + ) -> testresult::TestResult<()> { + // compute the expected values + let mut reference = HashMap::new(); + for (id, value) in entries { + let v: &mut u128 = reference.entry(*id).or_default(); + *v = v.wrapping_add(*value); + } + // do the same computation using the database, with some concurrency + // and parallelism (we will get parallelism if we are using a multi-threaded runtime). + let mut errors = Vec::new(); + n0_future::stream::iter(entries) + .map(|(id, value)| db.add(*id, *value)) + .buffered_unordered(16) + .for_each(|result| { + if let Err(e) = result { + errors.push(e); + } + }) + .await; + assert!(errors.is_empty(), "Failed to add some entries: {errors:?}"); + // check that the db contains the expected values + let ids = reference.keys().copied().collect::<Vec<_>>(); + for id in &ids { + let res = db.get(*id).await?; + assert_eq!(res, reference.get(id).copied().unwrap_or_default()); + } + db.shutdown().await?; + // check that the db is consistent with the reference + db.check_consistency(reference).await; + Ok(()) + } + + #[test_strategy::proptest] + fn test_counters_manager_proptest_mem(entries: Vec<(u64, u128)>) { + let rt = tokio::runtime::Builder::new_multi_thread() + .build() + .expect("Failed to create tokio runtime"); + rt.block_on(async move { + let db = mem::MemDb::new(); + test_random(db, &entries).await + }) + .expect("Test failed"); + } + + #[test_strategy::proptest] + fn test_counters_manager_proptest_fs(entries: Vec<(u64, u128)>) { + let dir = tempfile::tempdir().unwrap(); + let rt = tokio::runtime::Builder::new_multi_thread() + .build() + .expect("Failed to create tokio runtime"); + rt.block_on(async move { + let db = fs::FsDb::new(dir.path()); + test_random(db, &entries).await + }) + .expect("Test failed"); + } +} diff --git a/src/store/mem.rs b/src/store/mem.rs index 083e95f2..6d022e0f 100644 --- a/src/store/mem.rs +++ b/src/store/mem.rs @@ -51,7 +51,7 @@ use crate::{ ImportByteStreamMsg, ImportByteStreamUpdate, ImportBytesMsg, ImportBytesRequest, ImportPathMsg, ImportPathRequest, ListBlobsMsg, ListTagsMsg, ListTagsRequest, ObserveMsg, ObserveRequest, RenameTagMsg, RenameTagRequest, Scope, SetTagMsg, - SetTagRequest, ShutdownMsg, SyncDbMsg, + SetTagRequest, ShutdownMsg, SyncDbMsg, WaitIdleMsg, }, tags::TagInfo, ApiClient, @@ -122,6 +122,7 @@ impl MemStore { options: Arc::new(Options::default()), temp_tags: Default::default(), protected: Default::default(), + idle_waiters: Default::default(), } .run(), ); @@ -137,6 +138,8 @@ struct Actor { options: Arc<Options>, // temp tags temp_tags: TempTags, + // idle waiters + idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>, protected: HashSet<Hash>, } @@ -162,6 +165,16 @@ impl Actor { let entry = self.get_or_create_entry(hash); self.spawn(import_bao(entry, size, data, tx)); } + Command::WaitIdle(WaitIdleMsg { tx, .. }) => { + trace!("wait idle"); + if self.tasks.is_empty() { + // we are currently idle + tx.send(()).await.ok(); + } else { + // wait for idle state + self.idle_waiters.push(tx); + } + } Command::Observe(ObserveMsg { inner: ObserveRequest { hash }, tx, @@ -485,6 +498,12 @@ } TaskResult::Unit(_) => {} } + if self.tasks.is_empty() { + // we are idle now + for tx in self.idle_waiters.drain(..)
{ + tx.send(()).await.ok(); + } + } } } }; diff --git a/src/store/readonly_mem.rs b/src/store/readonly_mem.rs index 55ef3693..42274b2e 100644 --- a/src/store/readonly_mem.rs +++ b/src/store/readonly_mem.rs @@ -37,7 +37,7 @@ use crate::{ self, BlobStatus, Command, ExportBaoMsg, ExportBaoRequest, ExportPathMsg, ExportPathRequest, ExportRangesItem, ExportRangesMsg, ExportRangesRequest, ImportBaoMsg, ImportByteStreamMsg, ImportBytesMsg, ImportPathMsg, ObserveMsg, - ObserveRequest, + ObserveRequest, WaitIdleMsg, }, ApiClient, TempTag, }, @@ -62,6 +62,7 @@ impl Deref for ReadonlyMemStore { struct Actor { commands: tokio::sync::mpsc::Receiver<Command>, tasks: JoinSet<()>, + idle_waiters: Vec<irpc::channel::oneshot::Sender<()>>, data: HashMap<Hash, Bytes>, } @@ -74,6 +75,7 @@ impl Actor { data, commands, tasks: JoinSet::new(), + idle_waiters: Vec::new(), } } @@ -86,6 +88,15 @@ impl Actor { .await .ok(); } + Command::WaitIdle(WaitIdleMsg { tx, .. }) => { + if self.tasks.is_empty() { + // we are currently idle + tx.send(()).await.ok(); + } else { + // wait for idle state + self.idle_waiters.push(tx); + } + } Command::ImportBytes(ImportBytesMsg { tx, .. }) => { tx.send(io::Error::other("import not supported").into()) .await @@ -226,6 +237,12 @@ impl Actor { }, Some(res) = self.tasks.join_next(), if !self.tasks.is_empty() => { self.log_unit_task(res); + if self.tasks.is_empty() { + // we are idle now + for tx in self.idle_waiters.drain(..) { + tx.send(()).await.ok(); + } + } }, else => break, } diff --git a/src/util.rs b/src/util.rs index e1c30921..7b9ad4e6 100644 --- a/src/util.rs +++ b/src/util.rs @@ -472,6 +472,7 @@ pub mod sink { } } + #[allow(dead_code)] pub struct IrpcSenderSink<T>(pub irpc::channel::mpsc::Sender<T>); impl Sink for IrpcSenderSink
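For illustration, here is a minimal sketch of how the entity manager introduced in src/store/fs/util/entity_manager.rs above is meant to be driven. This code is not part of the diff: `Counter`, `CounterParams` and `demo` are hypothetical names, and the counter logic is made up; only `Params`, `Reset`, `EntityManager`, `ActiveEntityState`, `SpawnArg`, `ShutdownCause` and `Options` come from the patch.

use std::sync::{Arc, Mutex};

use crate::store::fs::util::entity_manager::{
    ActiveEntityState, EntityManager, Options, Params, Reset, ShutdownCause, SpawnArg,
};

// Hypothetical per-entity state: a counter behind an Arc<Mutex<..>>.
// Per-entity access is concurrent but never parallel, so the lock is never contended.
#[derive(Debug, Clone, Default)]
struct Counter(Arc<Mutex<u64>>);

impl Reset for Counter {
    fn reset(&mut self) {
        // reset in place instead of allocating a fresh Arc
        *self.0.lock().unwrap() = 0;
    }
    fn ref_count(&self) -> usize {
        Arc::strong_count(&self.0)
    }
}

// Hypothetical Params impl: u64 entity ids, no global state.
struct CounterParams;

impl Params for CounterParams {
    type EntityId = u64;
    type GlobalState = ();
    type EntityState = Counter;
    async fn on_shutdown(state: ActiveEntityState<Self>, _cause: ShutdownCause) {
        // this is where the entity state would be persisted
        let _ = *state.state.0.lock().unwrap();
    }
}

async fn demo() -> Result<(), &'static str> {
    let manager = EntityManager::<CounterParams>::new((), Options::default());
    manager
        .spawn(1, |arg| async move {
            match arg {
                // tasks for the same id run sequentially on one entity actor
                SpawnArg::Active(state) => *state.state.0.lock().unwrap() += 1,
                SpawnArg::Busy | SpawnArg::Dead => { /* inbox full, or entity died */ }
            }
        })
        .await?;
    // graceful shutdown: waits for entity tasks and runs on_shutdown for each entity
    manager.shutdown().await
}

As in the mem and fs test databases above, all access to a given id is serialized onto a single entity actor, so a non-parallel primitive such as `AtomicRefCell` would work here just as well as the mutex.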