diff --git a/libparsec/crates/client/src/workspace/store/cache.rs b/libparsec/crates/client/src/workspace/store/cache.rs index 01d5d066175..73f97e4ebab 100644 --- a/libparsec/crates/client/src/workspace/store/cache.rs +++ b/libparsec/crates/client/src/workspace/store/cache.rs @@ -193,17 +193,20 @@ pub(super) async fn populate_cache_from_local_storage( ) -> Result { // 1) Local storage lookup - let maybe_found = { - let mut maybe_storage = store.storage.lock().await; - let storage = match &mut *maybe_storage { - None => return Err(PopulateCacheFromLocalStorageError::Stopped), - Some(storage) => storage, - }; - - storage.get_manifest(entry_id).await.map_err(|err| { - PopulateCacheFromLocalStorageError::Internal(err.context("cannot access local storage")) - })? - }; + let maybe_found = store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| PopulateCacheFromLocalStorageError::Stopped)?; + + storage.get_manifest(entry_id).await.map_err(|err| { + PopulateCacheFromLocalStorageError::Internal( + err.context("cannot access local storage"), + ) + }) + }) + .await?; let manifest = match maybe_found { Some(encrypted) => { @@ -230,8 +233,9 @@ pub(super) async fn populate_cache_from_local_storage( // 2) We got our manifest, don't forget to update the cache before returning it - let mut cache = store.current_view_cache.lock().expect("Mutex is poisoned"); - let manifest = cache.manifests.insert_if_missing(manifest); + let manifest = store + .data + .with_current_view_cache(|cache| cache.manifests.insert_if_missing(manifest)); Ok(manifest) } @@ -368,21 +372,23 @@ pub(super) async fn populate_cache_from_local_storage_or_server( encrypted: manifest.dump_and_encrypt(&store.device.local_symkey), }, }; - let outcome = { - let mut maybe_storage = store.storage.lock().await; - let storage = match &mut *maybe_storage { - None => return Err(PopulateCacheFromLocalStorageOrServerError::Stopped), - Some(storage) => storage, - }; - storage - .populate_manifest(&update_data) - .await - .map_err(|err| { - PopulateCacheFromLocalStorageOrServerError::Internal( - err.context("cannot populate local storage with manifest"), - ) - })? - }; + let outcome = store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| PopulateCacheFromLocalStorageOrServerError::Stopped)?; + + storage + .populate_manifest(&update_data) + .await + .map_err(|err| { + PopulateCacheFromLocalStorageOrServerError::Internal( + err.context("cannot populate local storage with manifest"), + ) + }) + }) + .await?; match outcome { PopulateManifestOutcome::Stored => break manifest, // A concurrent operation has populated the local storage ! 
@@ -424,8 +430,9 @@ pub(super) async fn populate_cache_from_local_storage_or_server( // 4) Also update the cache - let mut cache = store.current_view_cache.lock().expect("Mutex is poisoned"); - let manifest = cache.manifests.insert_if_missing(manifest); + let manifest = store + .data + .with_current_view_cache(|cache| cache.manifests.insert_if_missing(manifest)); Ok(manifest) } diff --git a/libparsec/crates/client/src/workspace/store/file_updater.rs b/libparsec/crates/client/src/workspace/store/file_updater.rs index c8553e65a00..70ed352b3be 100644 --- a/libparsec/crates/client/src/workspace/store/file_updater.rs +++ b/libparsec/crates/client/src/workspace/store/file_updater.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use libparsec_platform_async::event::EventListener; use libparsec_types::prelude::*; use crate::certif::{InvalidCertificateError, InvalidKeysBundleError, InvalidManifestError}; @@ -49,8 +50,9 @@ pub(super) async fn for_update_file( // Guard's drop will panic if the lock is not released macro_rules! release_guard_on_error { ($update_guard:expr) => { - let mut cache_guard = store.current_view_cache.lock().expect("Mutex is poisoned"); - cache_guard.lock_update_manifests.release($update_guard); + store.data.with_current_view_cache(|cache| { + cache.lock_update_manifests.release($update_guard); + }); }; } @@ -62,36 +64,49 @@ pub(super) async fn for_update_file( listener.await; } - let update_guard = { - let mut cache_guard = store.current_view_cache.lock().expect("Mutex is poisoned"); - - // 1) Lock for update + // 1) Lock for update - let outcome = cache_guard.lock_update_manifests.take(entry_id); - match outcome { + enum LockForUpdateOutcome { + WaitAndRetryStep1(EventListener), + // There is no GoToStep2 because it's the dispatching step + GoToStep3(ManifestUpdateLockGuard), + GoToStep4((ManifestUpdateLockGuard, ArcLocalChildManifest)), + } + let outcome = store.data.with_current_view_cache(|cache| { + match cache.lock_update_manifests.take(entry_id) { ManifestUpdateLockTakeOutcome::Taken(update_guard) => { // 2) Cache lookup for entry... - let found = cache_guard.manifests.get(&entry_id); + let found = cache.manifests.get(&entry_id); if let Some(manifest) = found { // Cache hit ! We go to step 4. - break (update_guard, manifest.clone()); + LockForUpdateOutcome::GoToStep4((update_guard, manifest.clone())) + } else { + // The entry is not in cache, go to step 3 for a lookup in the local storage. + // Note we keep the update lock: this has no impact on read operation, and + // any other write operation taking the lock will have no choice but to try + // to populate the cache just like we are going to do. + LockForUpdateOutcome::GoToStep3(update_guard) } - // The entry is not in cache, go to step 3 for a lookup in the local storage. - // Note we keep the update lock: this has no impact on read operation, and - // any other write operation taking the lock will have no choice but to try - // to populate the cache just like we are going to do. 
- update_guard } ManifestUpdateLockTakeOutcome::NeedWait(listener) => { - if !wait { - return Err(ForUpdateFileError::WouldBlock); - } - maybe_need_wait = Some(listener); - continue; + LockForUpdateOutcome::WaitAndRetryStep1(listener) } } + }); + let update_guard = match outcome { + LockForUpdateOutcome::GoToStep3(update_guard) => update_guard, + LockForUpdateOutcome::GoToStep4((update_guard, manifest)) => { + break (update_guard, manifest) + } + LockForUpdateOutcome::WaitAndRetryStep1(listener) => { + if !wait { + return Err(ForUpdateFileError::WouldBlock); + } + maybe_need_wait = Some(listener); + continue; + } }; // Be careful here: `update_guard` must be manually released in case of error ! @@ -143,7 +158,7 @@ pub(super) async fn for_update_file( }; let updater = FileUpdater { - _update_guard: update_guard, + update_guard, #[cfg(debug_assertions)] entry_id: manifest.base.id, }; @@ -159,7 +174,7 @@ pub(super) async fn for_update_file( /// keep hold on the store. #[derive(Debug)] pub(crate) struct FileUpdater { - _update_guard: ManifestUpdateLockGuard, + update_guard: ManifestUpdateLockGuard, #[cfg(debug_assertions)] entry_id: VlobID, } @@ -176,39 +191,41 @@ impl FileUpdater { #[cfg(debug_assertions)] assert_eq!(manifest.base.id, self.entry_id); - let mut storage_guard = store.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| UpdateFileManifestAndContinueError::Stopped)?; - let update_data = UpdateManifestData { entry_id: manifest.base.id, base_version: manifest.base.version, need_sync: manifest.need_sync, encrypted: manifest.dump_and_encrypt(&store.device.local_symkey), }; - let new_chunks = new_chunks .map(|(chunk_id, cleartext)| (chunk_id, store.device.local_symkey.encrypt(cleartext))); - storage - .update_manifest_and_chunks(&update_data, new_chunks, removed_chunks) + store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateFileManifestAndContinueError::Stopped)?; + + storage + .update_manifest_and_chunks(&update_data, new_chunks, removed_chunks) + .await + .map_err(UpdateFileManifestAndContinueError::Internal) + }) .await?; // Finally update cache - let mut cache = store.current_view_cache.lock().expect("Mutex is poisoned"); - cache - .manifests - .insert(ArcLocalChildManifest::File(manifest)); + store.data.with_current_view_cache(|cache| { + cache + .manifests + .insert(ArcLocalChildManifest::File(manifest)); + }); Ok(()) } pub fn close(self, store: &super::WorkspaceStore) { - store - .current_view_cache - .lock() - .expect("Mutex is poisoned") - .lock_update_manifests - .release(self._update_guard); + store.data.with_current_view_cache(|cache| { + cache.lock_update_manifests.release(self.update_guard); + }); } } diff --git a/libparsec/crates/client/src/workspace/store/folder_updater.rs b/libparsec/crates/client/src/workspace/store/folder_updater.rs index 378077408f7..2fa3f90bcbe 100644 --- a/libparsec/crates/client/src/workspace/store/folder_updater.rs +++ b/libparsec/crates/client/src/workspace/store/folder_updater.rs @@ -2,6 +2,7 @@ use std::sync::Arc; +use libparsec_platform_async::event::EventListener; use libparsec_types::prelude::*; use crate::certif::{InvalidCertificateError, InvalidKeysBundleError, InvalidManifestError}; @@ -47,8 +48,9 @@ pub(super) async fn for_update_folder( // Guard's drop will panic if the lock is not released macro_rules! 
release_guard_on_error { ($update_guard:expr) => { - let mut cache_guard = store.current_view_cache.lock().expect("Mutex is poisoned"); - cache_guard.lock_update_manifests.release($update_guard); + store.data.with_current_view_cache(|cache| { + cache.lock_update_manifests.release($update_guard); + }); }; } @@ -60,33 +62,43 @@ pub(super) async fn for_update_folder( listener.await; } - let update_guard = { - let mut cache_guard = store.current_view_cache.lock().expect("Mutex is poisoned"); - - // 1) Lock for update - - let outcome = cache_guard.lock_update_manifests.take(entry_id); - match outcome { + enum LockForUpdateOutcome { + WaitAndRetryStep1(EventListener), + GoToStep3(ManifestUpdateLockGuard), + GoToStep4((ManifestUpdateLockGuard, ArcLocalChildManifest)), + } + let outcome = store.data.with_current_view_cache(|cache| { + match cache.lock_update_manifests.take(entry_id) { ManifestUpdateLockTakeOutcome::Taken(update_guard) => { // 2) Cache lookup for entry... - let found = cache_guard.manifests.get(&entry_id); + let found = cache.manifests.get(&entry_id); if let Some(manifest) = found { // Cache hit ! We go to step 4. - break (update_guard, manifest.clone()); + LockForUpdateOutcome::GoToStep4((update_guard, manifest.clone())) + } else { + // The entry is not in cache, go to step 3 for a lookup in the local storage. + // Note we keep the update lock: this has no impact on read operation, and + // any other write operation taking the lock will have no choice but to try + // to populate the cache just like we are going to do. + LockForUpdateOutcome::GoToStep3(update_guard) } - // The entry is not in cache, go to step 3 for a lookup in the local storage. - // Note we keep the update lock: this has no impact on read operation, and - // any other write operation taking the lock will have no choice but to try - // to populate the cache just like we are going to do. - update_guard } ManifestUpdateLockTakeOutcome::NeedWait(listener) => { - maybe_need_wait = Some(listener); - continue; + LockForUpdateOutcome::WaitAndRetryStep1(listener) } } + }); + let update_guard = match outcome { + LockForUpdateOutcome::GoToStep3(update_guard) => update_guard, + LockForUpdateOutcome::GoToStep4((update_guard, manifest)) => { + break (update_guard, manifest) + } + LockForUpdateOutcome::WaitAndRetryStep1(listener) => { + maybe_need_wait = Some(listener); + continue; + } }; // Be careful here: `update_guard` must be manually released in case of error ! 
@@ -223,31 +235,34 @@ impl<'a> FolderUpdater<'a> { } } - let mut storage_guard = self.store.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| UpdateFolderManifestError::Stopped)?; - let update_data = UpdateManifestData { entry_id: manifest.base.id, base_version: manifest.base.version, need_sync: manifest.need_sync, encrypted: manifest.dump_and_encrypt(&self.store.device.local_symkey), }; - match new_child { None => { - storage.update_manifest(&update_data).await?; + self.store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateFolderManifestError::Stopped)?; + + storage + .update_manifest(&update_data) + .await + .map_err(UpdateFolderManifestError::Internal) + }) + .await?; // Finally update cache - let mut cache = self - .store - .current_view_cache - .lock() - .expect("Mutex is poisoned"); - cache - .manifests - .insert(ArcLocalChildManifest::Folder(manifest)); + self.store.data.with_current_view_cache(|cache| { + cache + .manifests + .insert(ArcLocalChildManifest::Folder(manifest)); + }); } Some(new_child) => { @@ -265,20 +280,27 @@ impl<'a> FolderUpdater<'a> { encrypted: new_child.dump_and_encrypt(&self.store.device.local_symkey), }, }; - storage - .update_manifests([update_data, new_child_update_data].into_iter()) + self.store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateFolderManifestError::Stopped)?; + + storage + .update_manifests([update_data, new_child_update_data].into_iter()) + .await + .map_err(UpdateFolderManifestError::Internal) + }) .await?; // Finally update cache - let mut cache = self - .store - .current_view_cache - .lock() - .expect("Mutex is poisoned"); - cache - .manifests - .insert(ArcLocalChildManifest::Folder(manifest)); - cache.manifests.insert(new_child); + self.store.data.with_current_view_cache(|cache| { + cache + .manifests + .insert(ArcLocalChildManifest::Folder(manifest)); + cache.manifests.insert(new_child); + }); } } @@ -289,12 +311,9 @@ impl<'a> FolderUpdater<'a> { impl Drop for FolderUpdater<'_> { fn drop(&mut self) { if let Some(update_guard) = self.update_guard.take() { - self.store - .current_view_cache - .lock() - .expect("Mutex is poisoned") - .lock_update_manifests - .release(update_guard); + self.store.data.with_current_view_cache(|cache| { + cache.lock_update_manifests.release(update_guard); + }); } } } diff --git a/libparsec/crates/client/src/workspace/store/manifest_access.rs b/libparsec/crates/client/src/workspace/store/manifest_access.rs index 25fcc897657..1a5dea35d97 100644 --- a/libparsec/crates/client/src/workspace/store/manifest_access.rs +++ b/libparsec/crates/client/src/workspace/store/manifest_access.rs @@ -33,11 +33,11 @@ pub(super) async fn get_manifest( entry_id: VlobID, ) -> Result { // Fast path: cache lookup - { - let cache = store.current_view_cache.lock().expect("Mutex is poisoned"); - if let Some(manifest) = cache.manifests.get(&entry_id) { - return Ok(manifest.clone()); - } + let maybe_found = store + .data + .with_current_view_cache(|cache| cache.manifests.get(&entry_id).cloned()); + if let Some(manifest) = maybe_found { + return Ok(manifest); } // Entry not in the cache, try to fetch it from the local storage... 
diff --git a/libparsec/crates/client/src/workspace/store/mod.rs b/libparsec/crates/client/src/workspace/store/mod.rs
index 1eec798a652..4908abfa47e 100644
--- a/libparsec/crates/client/src/workspace/store/mod.rs
+++ b/libparsec/crates/client/src/workspace/store/mod.rs
@@ -16,7 +16,7 @@ use std::{
 };
 
 use libparsec_client_connection::AuthenticatedCmds;
-use libparsec_platform_async::lock::Mutex as AsyncMutex;
+use libparsec_platform_async::{lock::Mutex as AsyncMutex, try_task_id, TaskID};
 use libparsec_platform_storage::workspace::{UpdateManifestData, WorkspaceStorage};
 use libparsec_types::prelude::*;
 
@@ -107,6 +107,197 @@ pub(super) enum EnsureManifestExistsWithParentError {
  * WorkspaceStore & friends
  */
 
+mod data {
+    //! This module keeps the internals of `WorkspaceStoreData` private in order to reduce
+    //! the risk of deadlocks.
+    //!
+    //! Indeed, the current view cache and storage (i.e. local database) are both protected
+    //! by a mutex and used in conjunction, so deadlocks may occur due to a wrong order of
+    //! acquisition or if a lock is kept too long (especially since locks are not reentrant).
+    //!
+    //! So to solve this issue we only give access to the cache and the storage through a
+    //! callback-based API:
+    //! - It limits and makes explicit the scope of each lock.
+    //! - It is not possible to acquire the storage lock when the view cache lock is acquired
+    //!   (since the former is protected by an async lock and the latter by a sync lock).
+    //! - In debug builds, a deadlock detection mechanism ensures only a single lock is
+    //!   acquired at a time for any given task.
+
+    use super::*;
+
+    /*
+     * WorkspaceStoreData
+     */
+
+    #[derive(Debug)]
+    pub(super) struct WorkspaceStoreData {
+        /// Note cache also contains the update locks.
+        current_view_cache: Mutex<CurrentViewCache>,
+        /// Given accessing `storage` requires exclusive access, it is better to have it
+        /// under its own lock so that all cache hit operations can occur concurrently.
+        storage: AsyncMutex<Option<WorkspaceStorage>>,
+        #[cfg(debug_assertions)]
+        lock_tracking: Mutex<Vec<(RunningFutureID, DataLock)>>,
+    }
+
+    impl WorkspaceStoreData {
+        pub fn new(storage: WorkspaceStorage, root_manifest: LocalWorkspaceManifest) -> Self {
+            Self {
+                current_view_cache: Mutex::new(CurrentViewCache::new(Arc::new(
+                    root_manifest.into(),
+                ))),
+                storage: AsyncMutex::new(Some(storage)),
+                #[cfg(debug_assertions)]
+                lock_tracking: Default::default(),
+            }
+        }
+
+        /// Acquire the lock on the current view cache.
+        ///
+        /// There are two rules when acquiring a data lock:
+        /// - A task should never lock both current view cache and storage at the same time.
+        /// - You should do the minimum amount of work inside the closure (e.g. avoid
+        ///   doing encryption).
+        pub fn with_current_view_cache<T>(&self, cb: impl FnOnce(&mut CurrentViewCache) -> T) -> T {
+            #[cfg(debug_assertions)]
+            let _lock_tracker_guard =
+                DataLockTrackerGuard::register(&self.lock_tracking, DataLock::Cache);
+
+            let mut guard = self.current_view_cache.lock().expect("Mutex is poisoned !");
+            cb(&mut guard)
+        }
+
+        /// Acquire the lock on the storage.
+        ///
+        /// There are two rules when acquiring a data lock:
+        /// - A task should never lock both current view cache and storage at the same time.
+        /// - You should do the minimum amount of work inside the closure (e.g. avoid
+        ///   doing encryption).
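+        ///
+        /// Illustrative sketch of the intended call pattern (not taken verbatim from a
+        /// call site; `MyError` and its `Stopped`/`Internal` variants stand in for the
+        /// caller's actual error type):
+        ///
+        /// ```ignore
+        /// let checkpoint = store
+        ///     .data
+        ///     .with_storage(|maybe_storage| async move {
+        ///         // `None` means the store has been stopped and the storage taken away.
+        ///         let storage = maybe_storage.as_mut().ok_or_else(|| MyError::Stopped)?;
+        ///         storage
+        ///             .get_realm_checkpoint()
+        ///             .await
+        ///             .map_err(MyError::Internal)
+        ///     })
+        ///     .await?;
+        /// ```
+        ///
+        /// Note the view cache lock is never held across this call: cache lookups and
+        /// updates are done in separate `with_current_view_cache` calls before or after.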
+        pub async fn with_storage<T, Fut>(
+            &self,
+            cb: impl FnOnce(&'static mut Option<WorkspaceStorage>) -> Fut,
+        ) -> T
+        where
+            Fut: std::future::Future<Output = T>,
+        {
+            let mut guard = self.storage.lock().await;
+            let storage_mut_ref = &mut *guard;
+
+            unsafe fn pretend_static(
+                src: &mut Option<WorkspaceStorage>,
+            ) -> &'static mut Option<WorkspaceStorage> {
+                std::mem::transmute(src)
+            }
+            // SAFETY: It is not currently possible to express the fact that the lifetime
+            // of a Future returned by a closure depends on the closure parameter if
+            // they are references.
+            // Here things are even worse because we have references coming from
+            // `with_storage`'s body and from the `cb` closure (so workarounds such as
+            // boxed futures don't work).
+            // However in practice all our references have a lifetime bound to the
+            // caller (i.e. the store method invoking `with_storage`), which is going to
+            // poll this future directly, so the references' lifetimes *are* long enough.
+            // TODO: Remove this once async closures are available
+            let static_storage_mut_ref = unsafe { pretend_static(storage_mut_ref) };
+
+            #[cfg(debug_assertions)]
+            let _lock_tracker_guard =
+                DataLockTrackerGuard::register(&self.lock_tracking, DataLock::Storage);
+
+            let fut = cb(static_storage_mut_ref);
+            fut.await
+        }
+    }
+
+    /*
+     * Deadlock detection stuff
+     */
+
+    #[cfg(debug_assertions)]
+    #[derive(Debug)]
+    enum DataLock {
+        Cache,
+        Storage,
+    }
+
+    #[cfg(debug_assertions)]
+    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+    enum RunningFutureID {
+        /// The future is being executed as a task (i.e. `tokio::spawn()`).
+        AsTask(TaskID),
+        /// The future is being executed as-is by a thread (i.e. `block_on()`).
+        FromBlockOn(std::thread::ThreadId),
+    }
+
+    #[cfg(debug_assertions)]
+    struct DataLockTrackerGuard<'a> {
+        running_future_id: RunningFutureID,
+        lock_tracking: &'a Mutex<Vec<(RunningFutureID, DataLock)>>,
+    }
+
+    #[cfg(debug_assertions)]
+    impl<'a> DataLockTrackerGuard<'a> {
+        fn register(
+            lock_tracking: &'a Mutex<Vec<(RunningFutureID, DataLock)>>,
+            lock: DataLock,
+        ) -> Self {
+            // Task ID is only available when within a task (i.e. a spawned future) !
+            //
+            // Put another way, `try_task_id()` will return `None` if:
+            // a) It is called from a synchronous context (e.g. the main function).
+            // b) It is called from an asynchronous context that is not a task.
+            //
+            // In practice a) is not a problem since this code is deep within asynchronous
+            // land; however b) is very common since it occurs whenever Tokio's `block_on`
+            // is used, i.e.:
+            // b1) Functions decorated by `tokio::main` (using `tokio::Runtime::block_on` under the hood).
+            // b2) Functions decorated by `tokio::test` (using `tokio::Runtime::block_on` under the hood).
+            // b3) Non-Tokio threads calling `tokio::Handle::block_on`.
+            //
+            // So, in a nutshell, we are going to end up with `None` as task ID in any
+            // code coming from:
+            // - Our CLI main functions (due to b1).
+            // - Our test functions (due to b2).
+            // - Mountpoint operations on Windows (due to b3). This is because WinFSP
+            //   schedules them on a pool of threads with a synchronous API.
+            //
+            // And now the final twist: whenever task ID is `None`, it means we are in a
+            // thread doing a `block_on` on a future.
This means we can simply use the ID + // of the thread to identify this running future \o/ + let id = { + match try_task_id() { + Some(task_id) => RunningFutureID::AsTask(task_id), + None => RunningFutureID::FromBlockOn(std::thread::current().id()), + } + }; + + let mut guard = lock_tracking.lock().expect("Mutex is poisoned !"); + for (candidate_id, candidate_lock) in guard.iter() { + if *candidate_id == id { + panic!("Running future {:?} is trying to acquire {:?} lock while holding another {:?} lock !", id, lock, *candidate_lock); + } + } + guard.push((id, lock)); + + Self { + running_future_id: id, + lock_tracking, + } + } + } + + #[cfg(debug_assertions)] + impl Drop for DataLockTrackerGuard<'_> { + fn drop(&mut self) { + let mut guard = self.lock_tracking.lock().expect("Mutex is poisoned !"); + let index = guard + .iter() + .position(|(running_future_id, _)| *running_future_id == self.running_future_id) + .expect("Running future not found !"); + guard.swap_remove(index); + } + } +} + #[derive(Debug)] pub(super) struct WorkspaceStore { realm_id: VlobID, @@ -114,11 +305,7 @@ pub(super) struct WorkspaceStore { cmds: Arc, certificates_ops: Arc, - /// Note cache also contains the update locks. - current_view_cache: Mutex, - /// Given accessing `storage` requires exclusive access, it is better to have it - /// under its own lock so that all cache hit operations can occur concurrently. - storage: AsyncMutex>, + data: data::WorkspaceStoreData, prevent_sync_pattern: PreventSyncPattern, } @@ -192,17 +379,21 @@ impl WorkspaceStore { device, cmds, certificates_ops, - current_view_cache: Mutex::new(CurrentViewCache::new(Arc::new(root_manifest.into()))), - storage: AsyncMutex::new(Some(storage)), + data: data::WorkspaceStoreData::new(storage, root_manifest), prevent_sync_pattern: prevent_sync_pattern.clone(), }) } pub async fn stop(&self) -> anyhow::Result<()> { - let maybe_storage = self.storage.lock().await.take(); - if let Some(storage) = maybe_storage { - storage.stop().await?; - } + self.data + .with_storage(|maybe_storage| async move { + if let Some(storage) = maybe_storage.take() { + storage.stop().await + } else { + Ok(()) + } + }) + .await?; Ok(()) } @@ -221,12 +412,8 @@ impl WorkspaceStore { } pub fn get_root_manifest(&self) -> Arc { - self.current_view_cache - .lock() - .expect("Mutex is poisoned") - .manifests - .root_manifest() - .clone() + self.data + .with_current_view_cache(|cache| cache.manifests.root_manifest().clone()) } pub async fn get_manifest( @@ -376,37 +563,40 @@ impl WorkspaceStore { } pub async fn is_entry_locked(&self, entry_id: VlobID) -> bool { - let cache_guard = self.current_view_cache.lock().expect("Mutex is poisoned"); - cache_guard.lock_update_manifests.is_taken(entry_id) + self.data + .with_current_view_cache(|cache| cache.lock_update_manifests.is_taken(entry_id)) } pub async fn get_chunk_or_block_local_only( &self, chunk_view: &ChunkView, ) -> Result { - { - let cache = self.current_view_cache.lock().expect("Mutex is poisoned"); - let found = cache.chunks.get(&chunk_view.id); - if let Some(data) = found { - return Ok(data); - } + let found = self + .data + .with_current_view_cache(|cache| cache.chunks.get(&chunk_view.id)); + if let Some(data) = found { + return Ok(data); } // Cache miss ! 
Try to fetch from the local storage - let mut maybe_storage = self.storage.lock().await; - let storage = match &mut *maybe_storage { - None => return Err(ReadChunkOrBlockLocalOnlyError::Stopped), - Some(storage) => storage, - }; - - let mut maybe_encrypted = storage.get_chunk(chunk_view.id).await?; + let maybe_encrypted = self + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| ReadChunkOrBlockLocalOnlyError::Stopped)?; + + let mut maybe_encrypted = storage.get_chunk(chunk_view.id).await?; + if maybe_encrypted.is_none() { + maybe_encrypted = storage + .get_block(chunk_view.id.into(), self.device.now()) + .await?; + } - if maybe_encrypted.is_none() { - maybe_encrypted = storage - .get_block(chunk_view.id.into(), self.device.now()) - .await?; - } + Result::<_, ReadChunkOrBlockLocalOnlyError>::Ok(maybe_encrypted) + }) + .await?; if let Some(encrypted) = maybe_encrypted { let data: Bytes = self @@ -418,8 +608,9 @@ impl WorkspaceStore { // Don't forget to update the cache ! - let mut cache = self.current_view_cache.lock().expect("Mutex is poisoned"); - cache.chunks.push(chunk_view.id, data.clone()); + self.data.with_current_view_cache(|cache| { + cache.chunks.push(chunk_view.id, data.clone()); + }); return Ok(data); } @@ -482,20 +673,25 @@ impl WorkspaceStore { // Should both store the data in local storage... - let mut maybe_storage = self.storage.lock().await; - let storage = match &mut *maybe_storage { - None => return Err(ReadChunkOrBlockError::Stopped), - Some(storage) => storage, - }; let encrypted = self.device.local_symkey.encrypt(&data); - storage - .set_block(access.id, &encrypted, self.device.now()) + self.data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| ReadChunkOrBlockError::Stopped)?; + + storage + .set_block(access.id, &encrypted, self.device.now()) + .await + .map_err(ReadChunkOrBlockError::Internal) + }) .await?; // ...and update the cache ! - let mut cache = self.current_view_cache.lock().expect("Mutex is poisoned"); - cache.chunks.push(chunk_view.id, data.clone()); + self.data.with_current_view_cache(|cache| { + cache.chunks.push(chunk_view.id, data.clone()); + }); Ok(data) } @@ -506,16 +702,19 @@ impl WorkspaceStore { ) -> Result, GetNotUploadedChunkError> { // Don't use the in-memory cache given it doesn't tell if the data is from and // uploaded block or not. 
- let mut maybe_storage = self.storage.lock().await; - let storage = match &mut *maybe_storage { - None => return Err(GetNotUploadedChunkError::Stopped), - Some(storage) => storage, - }; - - let maybe_encrypted = storage - .get_chunk(chunk_id) - .await - .map_err(GetNotUploadedChunkError::Internal)?; + let maybe_encrypted = self + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| GetNotUploadedChunkError::Stopped)?; + + storage + .get_chunk(chunk_id) + .await + .map_err(GetNotUploadedChunkError::Internal) + }) + .await?; match maybe_encrypted { None => Ok(None), @@ -537,53 +736,69 @@ impl WorkspaceStore { &self, chunk_id: ChunkID, ) -> Result<(), PromoteLocalOnlyChunkToUploadedBlockError> { - let mut storage_guard = self.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| PromoteLocalOnlyChunkToUploadedBlockError::Stopped)?; - storage - .promote_chunk_to_block(chunk_id, self.device.now()) + self.data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| PromoteLocalOnlyChunkToUploadedBlockError::Stopped)?; + + storage + .promote_chunk_to_block(chunk_id, self.device.now()) + .await + .map_err(PromoteLocalOnlyChunkToUploadedBlockError::Internal) + }) .await - .map_err(PromoteLocalOnlyChunkToUploadedBlockError::Internal) } pub async fn get_inbound_need_sync_entries( &self, limit: u32, ) -> Result, GetNeedSyncEntriesError> { - let mut storage_guard = self.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| GetNeedSyncEntriesError::Stopped)?; - storage - .get_inbound_need_sync(limit) + self.data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| GetNeedSyncEntriesError::Stopped)?; + + storage + .get_inbound_need_sync(limit) + .await + .map_err(GetNeedSyncEntriesError::Internal) + }) .await - .map_err(GetNeedSyncEntriesError::Internal) } pub async fn get_outbound_need_sync_entries( &self, limit: u32, ) -> Result, GetNeedSyncEntriesError> { - let mut storage_guard = self.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| GetNeedSyncEntriesError::Stopped)?; - storage - .get_outbound_need_sync(limit) + self.data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| GetNeedSyncEntriesError::Stopped)?; + + storage + .get_outbound_need_sync(limit) + .await + .map_err(GetNeedSyncEntriesError::Internal) + }) .await - .map_err(GetNeedSyncEntriesError::Internal) } pub async fn get_realm_checkpoint(&self) -> Result { - let mut storage_guard = self.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| GetRealmCheckpointError::Stopped)?; - storage - .get_realm_checkpoint() + self.data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| GetRealmCheckpointError::Stopped)?; + + storage + .get_realm_checkpoint() + .await + .map_err(GetRealmCheckpointError::Internal) + }) .await - .map_err(GetRealmCheckpointError::Internal) } pub async fn update_realm_checkpoint( @@ -591,13 +806,17 @@ impl WorkspaceStore { current_checkpoint: IndexInt, changes: &[(VlobID, VersionInt)], ) -> Result<(), UpdateRealmCheckpointError> { - let mut storage_guard = self.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| UpdateRealmCheckpointError::Stopped)?; - storage - .update_realm_checkpoint(current_checkpoint, changes) + self.data + 
.with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateRealmCheckpointError::Stopped)?; + + storage + .update_realm_checkpoint(current_checkpoint, changes) + .await + .map_err(UpdateRealmCheckpointError::Internal) + }) .await - .map_err(UpdateRealmCheckpointError::Internal) } } diff --git a/libparsec/crates/client/src/workspace/store/reparent_updater.rs b/libparsec/crates/client/src/workspace/store/reparent_updater.rs index 8033cc91c2f..5f420b6821f 100644 --- a/libparsec/crates/client/src/workspace/store/reparent_updater.rs +++ b/libparsec/crates/client/src/workspace/store/reparent_updater.rs @@ -75,17 +75,14 @@ struct ReparentingUpdaterGuards<'a> { impl Drop for ReparentingUpdaterGuards<'_> { fn drop(&mut self) { if let Some((guard1, guard2, guard3, guard4)) = self.update_guards.take() { - let mut cache = self - .store - .current_view_cache - .lock() - .expect("Mutex is poisoned"); - cache.lock_update_manifests.release(guard1); - cache.lock_update_manifests.release(guard2); - cache.lock_update_manifests.release(guard3); - if let Some(guard4) = guard4 { - cache.lock_update_manifests.release(guard4); - } + self.store.data.with_current_view_cache(move |cache| { + cache.lock_update_manifests.release(guard1); + cache.lock_update_manifests.release(guard2); + cache.lock_update_manifests.release(guard3); + if let Some(guard4) = guard4 { + cache.lock_update_manifests.release(guard4); + } + }); } } } @@ -103,11 +100,6 @@ impl ReparentingUpdater<'_> { } = self; let store = guards.store; - let mut storage_guard = store.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| UpdateManifestsForReparentingError::Stopped)?; - // 1) Insert in database let src_parent_update_data = UpdateManifestData { @@ -140,15 +132,25 @@ impl ReparentingUpdater<'_> { match &dst_child_manifest { // 3 manifests to update None => { - storage - .update_manifests( - [ - src_parent_update_data, - src_child_update_data, - dst_parent_update_data, - ] - .into_iter(), - ) + store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateManifestsForReparentingError::Stopped)?; + + storage + .update_manifests( + [ + src_parent_update_data, + src_child_update_data, + dst_parent_update_data, + ] + .into_iter(), + ) + .await + .map_err(UpdateManifestsForReparentingError::Internal) + }) .await?; } @@ -169,34 +171,44 @@ impl ReparentingUpdater<'_> { }, }; - storage - .update_manifests( - [ - src_parent_update_data, - src_child_update_data, - dst_parent_update_data, - dst_child_update_data, - ] - .into_iter(), - ) + store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateManifestsForReparentingError::Stopped)?; + + storage + .update_manifests( + [ + src_parent_update_data, + src_child_update_data, + dst_parent_update_data, + dst_child_update_data, + ] + .into_iter(), + ) + .await + .map_err(UpdateManifestsForReparentingError::Internal) + }) .await?; } } // 2) Update the cache - let mut cache = store.current_view_cache.lock().expect("Mutex is poisoned"); - - cache - .manifests - .insert(ArcLocalChildManifest::Folder(src_parent_manifest)); - cache - .manifests - .insert(ArcLocalChildManifest::Folder(dst_parent_manifest)); - cache.manifests.insert(src_child_manifest); - if let Some(dst_child_manifest) = dst_child_manifest { - cache.manifests.insert(dst_child_manifest); - } + store.data.with_current_view_cache(move |cache| { + cache + 
.manifests + .insert(ArcLocalChildManifest::Folder(src_parent_manifest)); + cache + .manifests + .insert(ArcLocalChildManifest::Folder(dst_parent_manifest)); + cache.manifests.insert(src_child_manifest); + if let Some(dst_child_manifest) = dst_child_manifest { + cache.manifests.insert(dst_child_manifest); + } + }); Ok(()) } diff --git a/libparsec/crates/client/src/workspace/store/resolve_path.rs b/libparsec/crates/client/src/workspace/store/resolve_path.rs index 9d67a221551..5cf0f24e258 100644 --- a/libparsec/crates/client/src/workspace/store/resolve_path.rs +++ b/libparsec/crates/client/src/workspace/store/resolve_path.rs @@ -128,10 +128,9 @@ async fn resolve_path_maybe_lock_for_update( // Most of the time we should have each entry in the path already in the cache, // so we want to lock the cache once and only release it in the unlikely case // we need to fetch from the local storage or server. - let cache_only_outcome = { - let mut cache = store.current_view_cache.lock().expect("Mutex is poisoned"); - cache_only_path_resolution(store.realm_id, &mut cache, path_parts, lock_for_update) - }; + let cache_only_outcome = store.data.with_current_view_cache(|cache| { + cache_only_path_resolution(store.realm_id, cache, path_parts, lock_for_update) + }); match cache_only_outcome { CacheOnlyPathResolutionOutcome::Done { manifest, @@ -422,10 +421,14 @@ pub(crate) async fn resolve_path_for_reparenting( DstChild, } - let (needed_before_resolution, who_need) = 'needed_before_resolution: { - let mut cache = store.current_view_cache.lock().expect("Mutex is poisoned"); + enum CacheOnlyResolutionOutcome { + Resolved(ResolvePathForReparenting), + Need((CacheOnlyPathResolutionOutcome, WhoIsInNeed)), + } + + let outcome = store.data.with_current_view_cache(|cache| { let mut auto_release_guards = AutoReleaseGuards { - cache: &mut cache, + cache, src_parent_guard: None, src_child_guard: None, dst_parent_guard: None, @@ -461,7 +464,10 @@ pub(crate) async fn resolve_path_for_reparenting( } other_outcome => { - break 'needed_before_resolution (other_outcome, WhoIsInNeed::DstParent) + return Ok(CacheOnlyResolutionOutcome::Need(( + other_outcome, + WhoIsInNeed::DstParent, + ))); } }; @@ -494,7 +500,10 @@ pub(crate) async fn resolve_path_for_reparenting( } other_outcome => { - break 'needed_before_resolution (other_outcome, WhoIsInNeed::SrcParent) + return Ok(CacheOnlyResolutionOutcome::Need(( + other_outcome, + WhoIsInNeed::SrcParent, + ))); } }; @@ -511,10 +520,10 @@ pub(crate) async fn resolve_path_for_reparenting( Some(manifest) => manifest.to_owned(), // Cache miss ! None => { - break 'needed_before_resolution ( + return Ok(CacheOnlyResolutionOutcome::Need(( CacheOnlyPathResolutionOutcome::NeedPopulateCache(src_child_id), WhoIsInNeed::SrcChild, - ) + ))); } }; @@ -530,10 +539,10 @@ pub(crate) async fn resolve_path_for_reparenting( { ManifestUpdateLockTakeOutcome::Taken(lock) => Some(lock), ManifestUpdateLockTakeOutcome::NeedWait(need_wait) => { - break 'needed_before_resolution ( + return Ok(CacheOnlyResolutionOutcome::Need(( CacheOnlyPathResolutionOutcome::NeedWaitForTakenUpdateLock(need_wait), WhoIsInNeed::SrcChild, - ); + ))); } }; auto_release_guards.src_child_guard = @@ -555,10 +564,10 @@ pub(crate) async fn resolve_path_for_reparenting( Some(manifest) => manifest.to_owned(), // Cache miss ! 
None => { - break 'needed_before_resolution ( + return Ok(CacheOnlyResolutionOutcome::Need(( CacheOnlyPathResolutionOutcome::NeedPopulateCache(dst_child_id), WhoIsInNeed::DstChild, - ) + ))); } }; @@ -574,10 +583,10 @@ pub(crate) async fn resolve_path_for_reparenting( { ManifestUpdateLockTakeOutcome::Taken(lock) => Some(lock), ManifestUpdateLockTakeOutcome::NeedWait(need_wait) => { - break 'needed_before_resolution ( + return Ok(CacheOnlyResolutionOutcome::Need(( CacheOnlyPathResolutionOutcome::NeedWaitForTakenUpdateLock(need_wait), WhoIsInNeed::DstChild, - ); + ))); } }; auto_release_guards.dst_child_guard = @@ -590,25 +599,36 @@ pub(crate) async fn resolve_path_for_reparenting( // 5) All done, we defuse the auto release system to return its guards - return Ok(ResolvePathForReparenting { - src_parent_update_lock_guard: auto_release_guards - .src_parent_guard - .take() - .expect("always present"), - src_child_update_lock_guard: auto_release_guards - .src_child_guard - .take() - .expect("always present"), - dst_parent_update_lock_guard: auto_release_guards - .dst_parent_guard - .take() - .expect("always present"), - dst_child_update_lock_guard: auto_release_guards.dst_parent_guard.take(), - src_parent_manifest, - src_child_manifest, - dst_parent_manifest, - dst_child_manifest, - }); + Ok(CacheOnlyResolutionOutcome::Resolved( + ResolvePathForReparenting { + src_parent_update_lock_guard: auto_release_guards + .src_parent_guard + .take() + .expect("always present"), + src_child_update_lock_guard: auto_release_guards + .src_child_guard + .take() + .expect("always present"), + dst_parent_update_lock_guard: auto_release_guards + .dst_parent_guard + .take() + .expect("always present"), + dst_child_update_lock_guard: auto_release_guards.dst_parent_guard.take(), + src_parent_manifest, + src_child_manifest, + dst_parent_manifest, + dst_child_manifest, + }, + )) + })?; + + let (needed_before_resolution, who_need) = match outcome { + CacheOnlyResolutionOutcome::Resolved(resolve_path_for_reparenting) => { + return Ok(resolve_path_for_reparenting) + } + CacheOnlyResolutionOutcome::Need((needed_before_resolution, who_need)) => { + (needed_before_resolution, who_need) + } }; match needed_before_resolution { @@ -899,15 +919,9 @@ pub(crate) async fn retrieve_path_from_id_and_lock_for_update( // Most of the time we should have each entry in the path already in the cache, // so we want to lock the cache once and only release it in the unlikely case // we need to fetch from the local storage or server. - let cache_only_outcome = { - let mut cache = store.current_view_cache.lock().expect("Mutex is poisoned"); - cache_only_retrieve_path_from_id( - &mut cache, - entry_id, - true, - last_not_found_during_populate, - ) - }; + let cache_only_outcome = store.data.with_current_view_cache(|cache| { + cache_only_retrieve_path_from_id(cache, entry_id, true, last_not_found_during_populate) + }); match cache_only_outcome { CacheOnlyPathRetrievalOutcome::Done { entry, @@ -1009,15 +1023,9 @@ pub(crate) async fn retrieve_path_from_id( // Most of the time we should have each entry in the path already in the cache, // so we want to lock the cache once and only release it in the unlikely case // we need to fetch from the local storage or server. 
- let cache_only_outcome = { - let mut cache = store.current_view_cache.lock().expect("Mutex is poisoned"); - cache_only_retrieve_path_from_id( - &mut cache, - entry_id, - false, - last_not_found_during_populate, - ) - }; + let cache_only_outcome = store.data.with_current_view_cache(|cache| { + cache_only_retrieve_path_from_id(cache, entry_id, false, last_not_found_during_populate) + }); match cache_only_outcome { CacheOnlyPathRetrievalOutcome::Done { entry, diff --git a/libparsec/crates/client/src/workspace/store/sync_updater.rs b/libparsec/crates/client/src/workspace/store/sync_updater.rs index 5736ce8d68e..d79afd65167 100644 --- a/libparsec/crates/client/src/workspace/store/sync_updater.rs +++ b/libparsec/crates/client/src/workspace/store/sync_updater.rs @@ -84,44 +84,58 @@ pub(super) async fn for_update_sync_local_only( // Guard's drop will panic if the lock is not released macro_rules! release_guard_on_error { ($entry_guard:expr) => { - let mut cache_guard = store.current_view_cache.lock().expect("Mutex is poisoned"); - cache_guard.lock_update_manifests.release($entry_guard); + store.data.with_current_view_cache(|cache| { + cache.lock_update_manifests.release($entry_guard); + }); }; } // Step 1, 2 and 3 are about retrieving the manifest and locking it for update let (entry_guard, manifest) = 'get_lock_and_manifest: { - let entry_guard = { - let mut cache_guard = store.current_view_cache.lock().expect("Mutex is poisoned"); + // 1) Lock for update - // 1) Lock for update + enum LockForUpdateOutcome { + GoToStep3(ManifestUpdateLockGuard), + GoToStep4((ManifestUpdateLockGuard, ArcLocalChildManifest)), + WouldBlock, + } - // Don't wait for the lock: we'd rather wait for the entry to settle - // before synchronizing it anyway. - let outcome = cache_guard.lock_update_manifests.try_take(entry_id); - match outcome { + // Don't wait for the lock: we'd rather wait for the entry to settle + // before synchronizing it anyway. + let outcome = store.data.with_current_view_cache(|cache| { + match cache.lock_update_manifests.try_take(entry_id) { Some(entry_guard) => { // 2) Cache lookup for entry... - let found = cache_guard.manifests.get(&entry_id); + let found = cache.manifests.get(&entry_id); if let Some(manifest) = found { // Cache hit ! We go to step 4. - break 'get_lock_and_manifest (entry_guard, Some(manifest.clone())); + LockForUpdateOutcome::GoToStep4((entry_guard, manifest.clone())) + } else { + // The entry is not in cache, go to step 3 for a lookup in the local storage. + // Note we keep the update lock: this has no impact on read operation, and + // any other write operation taking the lock will have no choice but to try + // to populate the cache just like we are going to do. + LockForUpdateOutcome::GoToStep3(entry_guard) } - // The entry is not in cache, go to step 3 for a lookup in the local storage. - // Note we keep the update lock: this has no impact on read operation, and - // any other write operation taking the lock will have no choice but to try - // to populate the cache just like we are going to do. - entry_guard } None => { // Note it's not a big deal to return `WouldBlock` here: the caller // will just re-schedule the operation and try again later. 
- return Err(ForUpdateSyncLocalOnlyError::WouldBlock); + LockForUpdateOutcome::WouldBlock } } + }); + let entry_guard = match outcome { + LockForUpdateOutcome::GoToStep3(entry_guard) => entry_guard, + LockForUpdateOutcome::GoToStep4((entry_guard, manifest)) => { + break 'get_lock_and_manifest (entry_guard, Some(manifest)) + } + LockForUpdateOutcome::WouldBlock => { + return Err(ForUpdateSyncLocalOnlyError::WouldBlock); + } }; // Be careful here: `entry_guard` must be manually released in case of error ! @@ -261,11 +275,6 @@ impl<'a> SyncUpdater<'a> { } } - let mut storage_guard = self.store.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| UpdateManifestForSyncError::Stopped)?; - let update_data = match &manifest { ArcLocalChildManifest::File(manifest) => UpdateManifestData { entry_id: manifest.base.id, @@ -280,16 +289,24 @@ impl<'a> SyncUpdater<'a> { encrypted: manifest.dump_and_encrypt(&self.store.device.local_symkey), }, }; - - storage.update_manifest(&update_data).await?; + self.store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateManifestForSyncError::Stopped)?; + + storage + .update_manifest(&update_data) + .await + .map_err(UpdateManifestForSyncError::Internal) + }) + .await?; // Finally update cache - let mut cache = self - .store - .current_view_cache - .lock() - .expect("Mutex is poisoned"); - cache.manifests.insert(manifest); + self.store.data.with_current_view_cache(|cache| { + cache.manifests.insert(manifest); + }); Ok(()) } @@ -314,34 +331,35 @@ impl<'a> SyncUpdater<'a> { } } - let mut storage_guard = self.store.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| UpdateManifestForSyncError::Stopped)?; - let update_data = UpdateManifestData { entry_id: manifest.base.id, base_version: manifest.base.version, need_sync: manifest.need_sync, encrypted: manifest.dump_and_encrypt(&self.store.device.local_symkey), }; - let new_chunks = new_chunks.map(|(chunk_id, cleartext)| { (chunk_id, self.store.device.local_symkey.encrypt(cleartext)) }); - storage - .update_manifest_and_chunks(&update_data, new_chunks, removed_chunks) + self.store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateManifestForSyncError::Stopped)?; + + storage + .update_manifest_and_chunks(&update_data, new_chunks, removed_chunks) + .await + .map_err(UpdateManifestForSyncError::Internal) + }) .await?; // Finally update cache - let mut cache = self - .store - .current_view_cache - .lock() - .expect("Mutex is poisoned"); - cache - .manifests - .insert(ArcLocalChildManifest::File(manifest)); + self.store.data.with_current_view_cache(|cache| { + cache + .manifests + .insert(ArcLocalChildManifest::File(manifest)); + }); Ok(()) } @@ -374,39 +392,46 @@ impl<'a> SyncUpdater<'a> { // lead to a deadlock ! let (parent_update_guard, parent_manifest) = 'get_lock_and_manifest: { - let parent_update_guard = { - let mut cache_guard = self - .store - .current_view_cache - .lock() - .expect("Mutex is poisoned"); + // 1) Lock for update - // 1) Lock for update + enum LockForUpdateOutcome { + GoToStep3(ManifestUpdateLockGuard), + GoToStep4((ManifestUpdateLockGuard, ArcLocalChildManifest)), + WouldBlock, + } + let outcome = self.store.data.with_current_view_cache(|cache| { // Don't try to wait for the lock as it may lead to a deadlock ! 
- let outcome = cache_guard.lock_update_manifests.try_take(parent_id); - match outcome { + match cache.lock_update_manifests.try_take(parent_id) { Some(parent_update_guard) => { // 2) Cache lookup for entry... - let found = cache_guard.manifests.get(&parent_id); + let found = cache.manifests.get(&parent_id); if let Some(parent_manifest) = found { // Cache hit ! We go to step 4. - break 'get_lock_and_manifest ( + LockForUpdateOutcome::GoToStep4(( parent_update_guard, parent_manifest.to_owned(), - ); + )) + } else { + // The entry is not in cache, go to step 3 for a lookup in the local storage. + // Note we keep the update lock: this has no impact on read operation, and + // any other write operation taking the lock will have no choice but to try + // to populate the cache just like we are going to do. + LockForUpdateOutcome::GoToStep3(parent_update_guard) } - // The entry is not in cache, go to step 3 for a lookup in the local storage. - // Note we keep the update lock: this has no impact on read operation, and - // any other write operation taking the lock will have no choice but to try - // to populate the cache just like we are going to do. - parent_update_guard } - None => { - return Err((self, IntoSyncConflictUpdaterError::WouldBlock)); - } + None => LockForUpdateOutcome::WouldBlock, + } + }); + let parent_update_guard = match outcome { + LockForUpdateOutcome::GoToStep4((parent_update_guard, parent_manifest)) => { + break 'get_lock_and_manifest (parent_update_guard, parent_manifest); + } + LockForUpdateOutcome::GoToStep3(parent_update_guard) => parent_update_guard, + LockForUpdateOutcome::WouldBlock => { + return Err((self, IntoSyncConflictUpdaterError::WouldBlock)); } }; @@ -478,12 +503,9 @@ impl<'a> SyncUpdater<'a> { impl Drop for SyncUpdater<'_> { fn drop(&mut self) { if let Some(update_guard) = self.update_guard.take() { - self.store - .current_view_cache - .lock() - .expect("Mutex is poisoned") - .lock_update_manifests - .release(update_guard); + self.store.data.with_current_view_cache(|cache| { + cache.lock_update_manifests.release(update_guard); + }); } } } @@ -540,11 +562,6 @@ impl<'a> SyncConflictUpdater<'a> { } } - let mut storage_guard = self.store.storage.lock().await; - let storage = storage_guard - .as_mut() - .ok_or_else(|| UpdateManifestForSyncError::Stopped)?; - let child_update_data = match &child_manifest { ArcLocalChildManifest::File(manifest) => UpdateManifestData { entry_id: manifest.base.id, @@ -559,14 +576,12 @@ impl<'a> SyncConflictUpdater<'a> { encrypted: manifest.dump_and_encrypt(&self.store.device.local_symkey), }, }; - let parent_update_data = UpdateManifestData { entry_id: parent_manifest.base.id, base_version: parent_manifest.base.version, need_sync: parent_manifest.need_sync, encrypted: parent_manifest.dump_and_encrypt(&self.store.device.local_symkey), }; - let conflicting_new_child_update_data = match &conflicting_new_child_manifest { ArcLocalChildManifest::File(manifest) => UpdateManifestData { entry_id: manifest.base.id, @@ -581,31 +596,35 @@ impl<'a> SyncConflictUpdater<'a> { encrypted: manifest.dump_and_encrypt(&self.store.device.local_symkey), }, }; - - storage - .update_manifests( - [ - child_update_data, - parent_update_data, - conflicting_new_child_update_data, - ] - .into_iter(), - ) + self.store + .data + .with_storage(|maybe_storage| async move { + let storage = maybe_storage + .as_mut() + .ok_or_else(|| UpdateManifestForSyncError::Stopped)?; + + storage + .update_manifests( + [ + child_update_data, + parent_update_data, + 
conflicting_new_child_update_data, + ] + .into_iter(), + ) + .await + .map_err(UpdateManifestForSyncError::Internal) + }) .await?; // Finally update cache - - let mut cache = self - .store - .current_view_cache - .lock() - .expect("Mutex is poisoned"); - - cache - .manifests - .insert(ArcLocalChildManifest::Folder(parent_manifest)); - cache.manifests.insert(child_manifest); - cache.manifests.insert(conflicting_new_child_manifest); + self.store.data.with_current_view_cache(|cache| { + cache + .manifests + .insert(ArcLocalChildManifest::Folder(parent_manifest)); + cache.manifests.insert(child_manifest); + cache.manifests.insert(conflicting_new_child_manifest); + }); Ok(()) } @@ -614,13 +633,10 @@ impl<'a> SyncConflictUpdater<'a> { impl Drop for SyncConflictUpdater<'_> { fn drop(&mut self) { if let Some((child_update_guard, parent_update_guard)) = self.update_guards.take() { - let mut store = self - .store - .current_view_cache - .lock() - .expect("Mutex is poisoned"); - store.lock_update_manifests.release(child_update_guard); - store.lock_update_manifests.release(parent_update_guard); + self.store.data.with_current_view_cache(|cache| { + cache.lock_update_manifests.release(child_update_guard); + cache.lock_update_manifests.release(parent_update_guard); + }); } } } diff --git a/libparsec/crates/platform_async/src/lib.rs b/libparsec/crates/platform_async/src/lib.rs index 923da038438..0705802ba91 100644 --- a/libparsec/crates/platform_async/src/lib.rs +++ b/libparsec/crates/platform_async/src/lib.rs @@ -107,7 +107,8 @@ macro_rules! select3_biased { // Platform specific stuff pub use platform::{ - oneshot, pretend_future_is_send_on_web, sleep, spawn, watch, AbortHandle, JoinHandle, + oneshot, pretend_future_is_send_on_web, sleep, spawn, try_task_id, watch, AbortHandle, + JoinHandle, TaskID, }; pub use std::time::Duration; // Re-exposed to simplify use of `sleep` diff --git a/libparsec/crates/platform_async/src/native/mod.rs b/libparsec/crates/platform_async/src/native/mod.rs index 0023ef23609..028034fde49 100644 --- a/libparsec/crates/platform_async/src/native/mod.rs +++ b/libparsec/crates/platform_async/src/native/mod.rs @@ -2,6 +2,8 @@ use futures::FutureExt; +pub use tokio::task::{try_id as try_task_id, Id as TaskID}; + pub mod oneshot { pub use tokio::sync::oneshot::{ channel, diff --git a/libparsec/crates/platform_async/src/web/mod.rs b/libparsec/crates/platform_async/src/web/mod.rs index 6cb372ae2ae..7f368ebddc4 100644 --- a/libparsec/crates/platform_async/src/web/mod.rs +++ b/libparsec/crates/platform_async/src/web/mod.rs @@ -11,6 +11,13 @@ pub async fn sleep(duration: std::time::Duration) { pretend_future_is_send_on_web(not_send_future).await } +#[derive(Debug, Eq, PartialEq, Clone, Copy)] +pub struct TaskID; + +pub fn try_task_id() -> Option { + todo!() +} + #[derive(Debug)] pub struct JoinHandle { phantom: std::marker::PhantomData,