Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benchmarks/benches/event_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ fn find_event_relations(c: &mut Criterion) {
let (target, relations) = room_event_cache
.find_event_with_relations(target_event_id, filter)
.await
.unwrap()
.unwrap();
assert_eq!(target.event_id().as_deref().unwrap(), target_event_id);
assert_eq!(relations.len(), num_related_events as usize);
Expand Down
2 changes: 2 additions & 0 deletions benchmarks/benches/room_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) {
.lock()
.await
.unwrap()
.as_clean()
.unwrap()
.clear_all_linked_chunks()
.await
.unwrap();
Expand Down
12 changes: 10 additions & 2 deletions crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use crate::{
InviteAcceptanceDetails, RoomStateFilter, SessionMeta,
deserialized_responses::DisplayName,
error::{Error, Result},
event_cache::store::EventCacheStoreLock,
event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState},
media::store::MediaStoreLock,
response_processors::{self as processors, Context},
room::{
Expand Down Expand Up @@ -1062,7 +1062,15 @@ impl BaseClient {
self.state_store.forget_room(room_id).await?;

// Remove the room in the event cache store too.
self.event_cache_store().lock().await?.remove_room(room_id).await?;
match self.event_cache_store().lock().await? {
// If the lock is clear, we can do the operation as expected.
// If the lock is dirty, we can ignore to refresh the state, we just need to remove a
// room. Also, we must not mark the lock as non-dirty because other operations may be
// critical and may need to refresh the `EventCache`' state.
EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => {
guard.remove_room(room_id).await?
}
}

Ok(())
}
Expand Down
35 changes: 26 additions & 9 deletions crates/matrix-sdk-base/src/event_cache/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ mod traits;

use matrix_sdk_common::cross_process_lock::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
TryLock,
MappedCrossProcessLockState, TryLock,
};
pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw};
Expand Down Expand Up @@ -83,37 +83,54 @@ impl EventCacheStoreLock {
}

/// Acquire a spin lock (see [`CrossProcessLock::spin_lock`]).
pub async fn lock(&self) -> Result<EventCacheStoreLockGuard<'_>, CrossProcessLockError> {
let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await??.into_guard();
pub async fn lock(&self) -> Result<EventCacheStoreLockState, CrossProcessLockError> {
let lock_state =
self.cross_process_lock.spin_lock(None).await??.map(|cross_process_lock_guard| {
EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.clone() }
});

Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() })
Ok(lock_state)
}
}

/// The equivalent of [`CrossProcessLockState`] but for the [`EventCacheStore`].
pub type EventCacheStoreLockState = MappedCrossProcessLockState<EventCacheStoreLockGuard>;

/// An RAII implementation of a “scoped lock” of an [`EventCacheStoreLock`].
/// When this structure is dropped (falls out of scope), the lock will be
/// unlocked.
pub struct EventCacheStoreLockGuard<'a> {
#[derive(Clone)]
pub struct EventCacheStoreLockGuard {
/// The cross process lock guard.
#[allow(unused)]
cross_process_lock_guard: CrossProcessLockGuard,

/// A reference to the store.
store: &'a DynEventCacheStore,
store: Arc<DynEventCacheStore>,
}

impl EventCacheStoreLockGuard {
/// Forward to [`CrossProcessLockGuard::clear_dirty`].
///
/// This is an associated method to avoid colliding with the [`Deref`]
/// implementation.
pub fn clear_dirty(this: &Self) {
this.cross_process_lock_guard.clear_dirty();
}
}

#[cfg(not(tarpaulin_include))]
impl fmt::Debug for EventCacheStoreLockGuard<'_> {
impl fmt::Debug for EventCacheStoreLockGuard {
fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive()
}
}

impl Deref for EventCacheStoreLockGuard<'_> {
impl Deref for EventCacheStoreLockGuard {
type Target = DynEventCacheStore;

fn deref(&self) -> &Self::Target {
self.store
self.store.as_ref()
}
}

Expand Down
8 changes: 4 additions & 4 deletions crates/matrix-sdk-base/src/media/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use std::{ops::Deref, sync::Arc};

use matrix_sdk_common::cross_process_lock::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard,
CrossProcessLockKind, TryLock,
CrossProcessLockState, TryLock,
};
use matrix_sdk_store_encryption::Error as StoreEncryptionError;
pub use traits::{DynMediaStore, IntoMediaStore, MediaStore, MediaStoreInner};
Expand Down Expand Up @@ -135,14 +135,14 @@ impl MediaStoreLock {
pub async fn lock(&self) -> Result<MediaStoreLockGuard<'_>, CrossProcessLockError> {
let cross_process_lock_guard = match self.cross_process_lock.spin_lock(None).await?? {
// The lock is clean: no other hold acquired it, all good!
CrossProcessLockKind::Clean(guard) => guard,
CrossProcessLockState::Clean(guard) => guard,

// The lock is dirty: another holder acquired it since the last time we acquired it.
// It's not a problem in the case of the `MediaStore` because this API is “stateless” at
// the time of writing (2025-11-11). There is nothing that can be out-of-sync: all the
// state is in the database, nothing in memory.
CrossProcessLockKind::Dirty(guard) => {
self.cross_process_lock.clear_dirty();
CrossProcessLockState::Dirty(guard) => {
guard.clear_dirty();

guard
}
Expand Down
95 changes: 79 additions & 16 deletions crates/matrix-sdk-common/src/cross_process_lock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,38 @@ enum WaitingTime {
///
/// The lock will be automatically released a short period of time after all the
/// guards have dropped.
#[derive(Debug)]
#[derive(Clone, Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub struct CrossProcessLockGuard {
/// A clone of [`CrossProcessLock::num_holders`].
num_holders: Arc<AtomicU32>,

/// A clone of [`CrossProcessLock::is_dirty`].
is_dirty: Arc<AtomicBool>,
}

impl CrossProcessLockGuard {
fn new(num_holders: Arc<AtomicU32>) -> Self {
Self { num_holders }
fn new(num_holders: Arc<AtomicU32>, is_dirty: Arc<AtomicBool>) -> Self {
Self { num_holders, is_dirty }
}

/// Determine whether the cross-process lock associated to this guard is
/// dirty.
///
/// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
/// of _dirty_.
pub fn is_dirty(&self) -> bool {
self.is_dirty.load(Ordering::SeqCst)
}

/// Clear the dirty state from the cross-process lock associated to this
/// guard.
///
/// If the cross-process lock is dirtied, it will remain dirtied until
/// this method is called. This allows recovering from a dirty state and
/// marking that it has recovered.
pub fn clear_dirty(&self) {
self.is_dirty.store(false, Ordering::SeqCst);
}
}

Expand Down Expand Up @@ -169,7 +193,7 @@ where

/// Whether the lock has been dirtied.
///
/// See [`CrossProcessLockKind::Dirty`] to learn more about the semantics
/// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
/// of _dirty_.
is_dirty: Arc<AtomicBool>,
}
Expand Down Expand Up @@ -231,7 +255,7 @@ where

/// Determine whether the cross-process lock is dirty.
///
/// See [`CrossProcessLockKind::Dirty`] to learn more about the semantics
/// See [`CrossProcessLockState::Dirty`] to learn more about the semantics
/// of _dirty_.
pub fn is_dirty(&self) -> bool {
self.is_dirty.load(Ordering::SeqCst)
Expand All @@ -244,7 +268,6 @@ where
/// marking that it has recovered.
pub fn clear_dirty(&self) {
self.is_dirty.store(false, Ordering::SeqCst);
self.generation.store(NO_CROSS_PROCESS_LOCK_GENERATION, Ordering::SeqCst);
}

/// Try to lock once, returns whether the lock was obtained or not.
Expand All @@ -254,7 +277,7 @@ where
#[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))]
pub async fn try_lock_once(
&self,
) -> Result<Result<CrossProcessLockKind, CrossProcessLockUnobtained>, L::LockError> {
) -> Result<Result<CrossProcessLockState, CrossProcessLockUnobtained>, L::LockError> {
// Hold onto the locking attempt mutex for the entire lifetime of this
// function, to avoid multiple reentrant calls.
let mut _attempt = self.locking_attempt.lock().await;
Expand All @@ -270,8 +293,9 @@ where

self.num_holders.fetch_add(1, Ordering::SeqCst);

return Ok(Ok(CrossProcessLockKind::Clean(CrossProcessLockGuard::new(
return Ok(Ok(CrossProcessLockState::Clean(CrossProcessLockGuard::new(
self.num_holders.clone(),
self.is_dirty.clone(),
))));
}

Expand Down Expand Up @@ -396,12 +420,12 @@ where

self.num_holders.fetch_add(1, Ordering::SeqCst);

let guard = CrossProcessLockGuard::new(self.num_holders.clone());
let guard = CrossProcessLockGuard::new(self.num_holders.clone(), self.is_dirty.clone());

Ok(Ok(if self.is_dirty() {
CrossProcessLockKind::Dirty(guard)
CrossProcessLockState::Dirty(guard)
} else {
CrossProcessLockKind::Clean(guard)
CrossProcessLockState::Clean(guard)
}))
}

Expand All @@ -417,7 +441,7 @@ where
pub async fn spin_lock(
&self,
max_backoff: Option<u32>,
) -> Result<Result<CrossProcessLockKind, CrossProcessLockUnobtained>, L::LockError> {
) -> Result<Result<CrossProcessLockState, CrossProcessLockUnobtained>, L::LockError> {
let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS);

// Note: reads/writes to the backoff are racy across threads in theory, but the
Expand Down Expand Up @@ -467,7 +491,8 @@ where
/// Represent a successful result of a locking attempt, either by
/// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`].
#[derive(Debug)]
pub enum CrossProcessLockKind {
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum CrossProcessLockState {
/// The lock has been obtained successfully, all good.
Clean(CrossProcessLockGuard),

Expand All @@ -487,13 +512,51 @@ pub enum CrossProcessLockKind {
Dirty(CrossProcessLockGuard),
}

impl CrossProcessLockKind {
impl CrossProcessLockState {
/// Map this value into the inner [`CrossProcessLockGuard`].
pub fn into_guard(self) -> CrossProcessLockGuard {
match self {
Self::Clean(guard) | Self::Dirty(guard) => guard,
}
}

/// Map this [`CrossProcessLockState`] into a
/// [`MappedCrossProcessLockState`].
///
/// This is helpful when one wants to create its own wrapper over
/// [`CrossProcessLockGuard`].
pub fn map<F, G>(self, mapper: F) -> MappedCrossProcessLockState<G>
where
F: FnOnce(CrossProcessLockGuard) -> G,
{
match self {
Self::Clean(guard) => MappedCrossProcessLockState::Clean(mapper(guard)),
Self::Dirty(guard) => MappedCrossProcessLockState::Dirty(mapper(guard)),
}
}
}

/// A mapped [`CrossProcessLockState`].
///
/// Created by [`CrossProcessLockState::map`].
#[derive(Debug)]
#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"]
pub enum MappedCrossProcessLockState<G> {
/// The equivalent of [`CrossProcessLockState::Clean`].
Clean(G),

/// The equivalent of [`CrossProcessLockState::Dirty`].
Dirty(G),
}

impl<G> MappedCrossProcessLockState<G> {
/// Return `Some(G)` if `Self` is [`Clean`][Self::Clean].
pub fn as_clean(&self) -> Option<&G> {
match self {
Self::Clean(guard) => Some(guard),
Self::Dirty(_) => None,
}
}
}

/// Represent an unsuccessful result of a lock attempt, either by
Expand Down Expand Up @@ -544,7 +607,7 @@ mod tests {
};

use super::{
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockKind,
CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockState,
CrossProcessLockUnobtained, EXTEND_LEASE_EVERY_MS, TryLock,
memory_store_helper::{Lease, try_take_leased_lock},
};
Expand Down Expand Up @@ -588,7 +651,7 @@ mod tests {
}
}

async fn release_lock(lock: CrossProcessLockKind) {
async fn release_lock(lock: CrossProcessLockState) {
drop(lock);
sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await;
}
Expand Down
Loading