diff --git a/benchmarks/benches/event_cache.rs b/benchmarks/benches/event_cache.rs index 10f7627edaa..53c225e5c2f 100644 --- a/benchmarks/benches/event_cache.rs +++ b/benchmarks/benches/event_cache.rs @@ -317,6 +317,7 @@ fn find_event_relations(c: &mut Criterion) { let (target, relations) = room_event_cache .find_event_with_relations(target_event_id, filter) .await + .unwrap() .unwrap(); assert_eq!(target.event_id().as_deref().unwrap(), target_event_id); assert_eq!(relations.len(), num_related_events as usize); diff --git a/benchmarks/benches/room_bench.rs b/benchmarks/benches/room_bench.rs index fc9d7dcb54b..d9a11227e22 100644 --- a/benchmarks/benches/room_bench.rs +++ b/benchmarks/benches/room_bench.rs @@ -181,6 +181,8 @@ pub fn load_pinned_events_benchmark(c: &mut Criterion) { .lock() .await .unwrap() + .as_clean() + .unwrap() .clear_all_linked_chunks() .await .unwrap(); diff --git a/crates/matrix-sdk-base/src/client.rs b/crates/matrix-sdk-base/src/client.rs index 4837483ac8e..4f273010f14 100644 --- a/crates/matrix-sdk-base/src/client.rs +++ b/crates/matrix-sdk-base/src/client.rs @@ -57,7 +57,7 @@ use crate::{ InviteAcceptanceDetails, RoomStateFilter, SessionMeta, deserialized_responses::DisplayName, error::{Error, Result}, - event_cache::store::EventCacheStoreLock, + event_cache::store::{EventCacheStoreLock, EventCacheStoreLockState}, media::store::MediaStoreLock, response_processors::{self as processors, Context}, room::{ @@ -1062,7 +1062,15 @@ impl BaseClient { self.state_store.forget_room(room_id).await?; // Remove the room in the event cache store too. - self.event_cache_store().lock().await?.remove_room(room_id).await?; + match self.event_cache_store().lock().await? { + // If the lock is clear, we can do the operation as expected. + // If the lock is dirty, we can ignore to refresh the state, we just need to remove a + // room. Also, we must not mark the lock as non-dirty because other operations may be + // critical and may need to refresh the `EventCache`' state. + EventCacheStoreLockState::Clean(guard) | EventCacheStoreLockState::Dirty(guard) => { + guard.remove_room(room_id).await? + } + } Ok(()) } diff --git a/crates/matrix-sdk-base/src/event_cache/store/mod.rs b/crates/matrix-sdk-base/src/event_cache/store/mod.rs index 3062b3de361..dad964fd0de 100644 --- a/crates/matrix-sdk-base/src/event_cache/store/mod.rs +++ b/crates/matrix-sdk-base/src/event_cache/store/mod.rs @@ -29,7 +29,7 @@ mod traits; use matrix_sdk_common::cross_process_lock::{ CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard, - TryLock, + MappedCrossProcessLockState, TryLock, }; pub use matrix_sdk_store_encryption::Error as StoreEncryptionError; use ruma::{OwnedEventId, events::AnySyncTimelineEvent, serde::Raw}; @@ -83,37 +83,54 @@ impl EventCacheStoreLock { } /// Acquire a spin lock (see [`CrossProcessLock::spin_lock`]). - pub async fn lock(&self) -> Result, CrossProcessLockError> { - let cross_process_lock_guard = self.cross_process_lock.spin_lock(None).await??.into_guard(); + pub async fn lock(&self) -> Result { + let lock_state = + self.cross_process_lock.spin_lock(None).await??.map(|cross_process_lock_guard| { + EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.clone() } + }); - Ok(EventCacheStoreLockGuard { cross_process_lock_guard, store: self.store.deref() }) + Ok(lock_state) } } +/// The equivalent of [`CrossProcessLockState`] but for the [`EventCacheStore`]. +pub type EventCacheStoreLockState = MappedCrossProcessLockState; + /// An RAII implementation of a “scoped lock” of an [`EventCacheStoreLock`]. /// When this structure is dropped (falls out of scope), the lock will be /// unlocked. -pub struct EventCacheStoreLockGuard<'a> { +#[derive(Clone)] +pub struct EventCacheStoreLockGuard { /// The cross process lock guard. #[allow(unused)] cross_process_lock_guard: CrossProcessLockGuard, /// A reference to the store. - store: &'a DynEventCacheStore, + store: Arc, +} + +impl EventCacheStoreLockGuard { + /// Forward to [`CrossProcessLockGuard::clear_dirty`]. + /// + /// This is an associated method to avoid colliding with the [`Deref`] + /// implementation. + pub fn clear_dirty(this: &Self) { + this.cross_process_lock_guard.clear_dirty(); + } } #[cfg(not(tarpaulin_include))] -impl fmt::Debug for EventCacheStoreLockGuard<'_> { +impl fmt::Debug for EventCacheStoreLockGuard { fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { formatter.debug_struct("EventCacheStoreLockGuard").finish_non_exhaustive() } } -impl Deref for EventCacheStoreLockGuard<'_> { +impl Deref for EventCacheStoreLockGuard { type Target = DynEventCacheStore; fn deref(&self) -> &Self::Target { - self.store + self.store.as_ref() } } diff --git a/crates/matrix-sdk-base/src/media/store/mod.rs b/crates/matrix-sdk-base/src/media/store/mod.rs index b861673e890..b7f6b4ba02b 100644 --- a/crates/matrix-sdk-base/src/media/store/mod.rs +++ b/crates/matrix-sdk-base/src/media/store/mod.rs @@ -33,7 +33,7 @@ use std::{ops::Deref, sync::Arc}; use matrix_sdk_common::cross_process_lock::{ CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockGuard, - CrossProcessLockKind, TryLock, + CrossProcessLockState, TryLock, }; use matrix_sdk_store_encryption::Error as StoreEncryptionError; pub use traits::{DynMediaStore, IntoMediaStore, MediaStore, MediaStoreInner}; @@ -135,14 +135,14 @@ impl MediaStoreLock { pub async fn lock(&self) -> Result, CrossProcessLockError> { let cross_process_lock_guard = match self.cross_process_lock.spin_lock(None).await?? { // The lock is clean: no other hold acquired it, all good! - CrossProcessLockKind::Clean(guard) => guard, + CrossProcessLockState::Clean(guard) => guard, // The lock is dirty: another holder acquired it since the last time we acquired it. // It's not a problem in the case of the `MediaStore` because this API is “stateless” at // the time of writing (2025-11-11). There is nothing that can be out-of-sync: all the // state is in the database, nothing in memory. - CrossProcessLockKind::Dirty(guard) => { - self.cross_process_lock.clear_dirty(); + CrossProcessLockState::Dirty(guard) => { + guard.clear_dirty(); guard } diff --git a/crates/matrix-sdk-common/src/cross_process_lock.rs b/crates/matrix-sdk-common/src/cross_process_lock.rs index 2dbbefc168b..e55c37d1e5c 100644 --- a/crates/matrix-sdk-common/src/cross_process_lock.rs +++ b/crates/matrix-sdk-common/src/cross_process_lock.rs @@ -107,14 +107,38 @@ enum WaitingTime { /// /// The lock will be automatically released a short period of time after all the /// guards have dropped. -#[derive(Debug)] +#[derive(Clone, Debug)] +#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"] pub struct CrossProcessLockGuard { + /// A clone of [`CrossProcessLock::num_holders`]. num_holders: Arc, + + /// A clone of [`CrossProcessLock::is_dirty`]. + is_dirty: Arc, } impl CrossProcessLockGuard { - fn new(num_holders: Arc) -> Self { - Self { num_holders } + fn new(num_holders: Arc, is_dirty: Arc) -> Self { + Self { num_holders, is_dirty } + } + + /// Determine whether the cross-process lock associated to this guard is + /// dirty. + /// + /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics + /// of _dirty_. + pub fn is_dirty(&self) -> bool { + self.is_dirty.load(Ordering::SeqCst) + } + + /// Clear the dirty state from the cross-process lock associated to this + /// guard. + /// + /// If the cross-process lock is dirtied, it will remain dirtied until + /// this method is called. This allows recovering from a dirty state and + /// marking that it has recovered. + pub fn clear_dirty(&self) { + self.is_dirty.store(false, Ordering::SeqCst); } } @@ -169,7 +193,7 @@ where /// Whether the lock has been dirtied. /// - /// See [`CrossProcessLockKind::Dirty`] to learn more about the semantics + /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics /// of _dirty_. is_dirty: Arc, } @@ -231,7 +255,7 @@ where /// Determine whether the cross-process lock is dirty. /// - /// See [`CrossProcessLockKind::Dirty`] to learn more about the semantics + /// See [`CrossProcessLockState::Dirty`] to learn more about the semantics /// of _dirty_. pub fn is_dirty(&self) -> bool { self.is_dirty.load(Ordering::SeqCst) @@ -244,7 +268,6 @@ where /// marking that it has recovered. pub fn clear_dirty(&self) { self.is_dirty.store(false, Ordering::SeqCst); - self.generation.store(NO_CROSS_PROCESS_LOCK_GENERATION, Ordering::SeqCst); } /// Try to lock once, returns whether the lock was obtained or not. @@ -254,7 +277,7 @@ where #[instrument(skip(self), fields(?self.lock_key, ?self.lock_holder))] pub async fn try_lock_once( &self, - ) -> Result, L::LockError> { + ) -> Result, L::LockError> { // Hold onto the locking attempt mutex for the entire lifetime of this // function, to avoid multiple reentrant calls. let mut _attempt = self.locking_attempt.lock().await; @@ -270,8 +293,9 @@ where self.num_holders.fetch_add(1, Ordering::SeqCst); - return Ok(Ok(CrossProcessLockKind::Clean(CrossProcessLockGuard::new( + return Ok(Ok(CrossProcessLockState::Clean(CrossProcessLockGuard::new( self.num_holders.clone(), + self.is_dirty.clone(), )))); } @@ -396,12 +420,12 @@ where self.num_holders.fetch_add(1, Ordering::SeqCst); - let guard = CrossProcessLockGuard::new(self.num_holders.clone()); + let guard = CrossProcessLockGuard::new(self.num_holders.clone(), self.is_dirty.clone()); Ok(Ok(if self.is_dirty() { - CrossProcessLockKind::Dirty(guard) + CrossProcessLockState::Dirty(guard) } else { - CrossProcessLockKind::Clean(guard) + CrossProcessLockState::Clean(guard) })) } @@ -417,7 +441,7 @@ where pub async fn spin_lock( &self, max_backoff: Option, - ) -> Result, L::LockError> { + ) -> Result, L::LockError> { let max_backoff = max_backoff.unwrap_or(MAX_BACKOFF_MS); // Note: reads/writes to the backoff are racy across threads in theory, but the @@ -467,7 +491,8 @@ where /// Represent a successful result of a locking attempt, either by /// [`CrossProcessLock::try_lock_once`] or [`CrossProcessLock::spin_lock`]. #[derive(Debug)] -pub enum CrossProcessLockKind { +#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"] +pub enum CrossProcessLockState { /// The lock has been obtained successfully, all good. Clean(CrossProcessLockGuard), @@ -487,13 +512,51 @@ pub enum CrossProcessLockKind { Dirty(CrossProcessLockGuard), } -impl CrossProcessLockKind { +impl CrossProcessLockState { /// Map this value into the inner [`CrossProcessLockGuard`]. pub fn into_guard(self) -> CrossProcessLockGuard { match self { Self::Clean(guard) | Self::Dirty(guard) => guard, } } + + /// Map this [`CrossProcessLockState`] into a + /// [`MappedCrossProcessLockState`]. + /// + /// This is helpful when one wants to create its own wrapper over + /// [`CrossProcessLockGuard`]. + pub fn map(self, mapper: F) -> MappedCrossProcessLockState + where + F: FnOnce(CrossProcessLockGuard) -> G, + { + match self { + Self::Clean(guard) => MappedCrossProcessLockState::Clean(mapper(guard)), + Self::Dirty(guard) => MappedCrossProcessLockState::Dirty(mapper(guard)), + } + } +} + +/// A mapped [`CrossProcessLockState`]. +/// +/// Created by [`CrossProcessLockState::map`]. +#[derive(Debug)] +#[must_use = "If unused, the `CrossProcessLock` will unlock at the end of the lease"] +pub enum MappedCrossProcessLockState { + /// The equivalent of [`CrossProcessLockState::Clean`]. + Clean(G), + + /// The equivalent of [`CrossProcessLockState::Dirty`]. + Dirty(G), +} + +impl MappedCrossProcessLockState { + /// Return `Some(G)` if `Self` is [`Clean`][Self::Clean]. + pub fn as_clean(&self) -> Option<&G> { + match self { + Self::Clean(guard) => Some(guard), + Self::Dirty(_) => None, + } + } } /// Represent an unsuccessful result of a lock attempt, either by @@ -544,7 +607,7 @@ mod tests { }; use super::{ - CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockKind, + CrossProcessLock, CrossProcessLockError, CrossProcessLockGeneration, CrossProcessLockState, CrossProcessLockUnobtained, EXTEND_LEASE_EVERY_MS, TryLock, memory_store_helper::{Lease, try_take_leased_lock}, }; @@ -588,7 +651,7 @@ mod tests { } } - async fn release_lock(lock: CrossProcessLockKind) { + async fn release_lock(lock: CrossProcessLockState) { drop(lock); sleep(Duration::from_millis(EXTEND_LEASE_EVERY_MS)).await; } diff --git a/crates/matrix-sdk-ui/src/timeline/builder.rs b/crates/matrix-sdk-ui/src/timeline/builder.rs index 90e443583f0..e918cd8b685 100644 --- a/crates/matrix-sdk-ui/src/timeline/builder.rs +++ b/crates/matrix-sdk-ui/src/timeline/builder.rs @@ -164,7 +164,7 @@ impl TimelineBuilder { room.client().event_cache().subscribe()?; let (room_event_cache, event_cache_drop) = room.event_cache().await?; - let (_, event_subscriber) = room_event_cache.subscribe().await; + let (_, event_subscriber) = room_event_cache.subscribe().await.unwrap(); let is_room_encrypted = room .latest_encryption_state() @@ -209,36 +209,36 @@ impl TimelineBuilder { .instrument(span) }); - let thread_update_join_handle = if let TimelineFocus::Thread { root_event_id: root } = - &focus - { - Some({ - let span = info_span!( - parent: Span::none(), - "thread_live_update_handler", - room_id = ?room.room_id(), - focus = focus.debug_string(), - prefix = internal_id_prefix - ); - span.follows_from(Span::current()); - - // Note: must be done here *before* spawning the task, to avoid race conditions - // with event cache updates happening in the background. - let (_events, receiver) = room_event_cache.subscribe_to_thread(root.clone()).await; - - spawn( - thread_updates_task( - receiver, - room_event_cache.clone(), - controller.clone(), - root.clone(), + let thread_update_join_handle = + if let TimelineFocus::Thread { root_event_id: root } = &focus { + Some({ + let span = info_span!( + parent: Span::none(), + "thread_live_update_handler", + room_id = ?room.room_id(), + focus = focus.debug_string(), + prefix = internal_id_prefix + ); + span.follows_from(Span::current()); + + // Note: must be done here *before* spawning the task, to avoid race conditions + // with event cache updates happening in the background. + let (_events, receiver) = + room_event_cache.subscribe_to_thread(root.clone()).await.unwrap(); + + spawn( + thread_updates_task( + receiver, + room_event_cache.clone(), + controller.clone(), + root.clone(), + ) + .instrument(span), ) - .instrument(span), - ) - }) - } else { - None - }; + }) + } else { + None + }; let local_echo_listener_handle = { let timeline_controller = controller.clone(); diff --git a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs index 342317d2edc..1f5bb505dcc 100644 --- a/crates/matrix-sdk-ui/src/timeline/controller/mod.rs +++ b/crates/matrix-sdk-ui/src/timeline/controller/mod.rs @@ -446,7 +446,7 @@ impl TimelineController

{ match focus { TimelineFocus::Live { .. } => { // Retrieve the cached events, and add them to the timeline. - let events = room_event_cache.events().await; + let events = room_event_cache.events().await?; let has_events = !events.is_empty(); @@ -556,7 +556,8 @@ impl TimelineController

{ } TimelineFocus::Thread { root_event_id, .. } => { - let (events, _) = room_event_cache.subscribe_to_thread(root_event_id.clone()).await; + let (events, _) = + room_event_cache.subscribe_to_thread(root_event_id.clone()).await?; let has_events = !events.is_empty(); // For each event, we also need to find the related events, as they don't @@ -565,7 +566,7 @@ impl TimelineController

{ let mut related_events = Vector::new(); for event_id in events.iter().filter_map(|event| event.event_id()) { if let Some((_original, related)) = - room_event_cache.find_event_with_relations(&event_id, None).await + room_event_cache.find_event_with_relations(&event_id, None).await? { related_events.extend(related); } diff --git a/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs b/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs index d605d6b6c9f..d4d0e585191 100644 --- a/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs +++ b/crates/matrix-sdk-ui/src/timeline/pinned_events_loader.rs @@ -173,7 +173,7 @@ impl PinnedEventsRoom for Room { Box::pin(async move { if let Ok((cache, _handles)) = self.event_cache().await && let Some(ret) = - cache.find_event_with_relations(event_id, related_event_filters).await + cache.find_event_with_relations(event_id, related_event_filters).await? { debug!("Loaded pinned event {event_id} and related events from cache"); return Ok(ret); diff --git a/crates/matrix-sdk-ui/src/timeline/tasks.rs b/crates/matrix-sdk-ui/src/timeline/tasks.rs index b40f0c9cb25..5f1455ba2f4 100644 --- a/crates/matrix-sdk-ui/src/timeline/tasks.rs +++ b/crates/matrix-sdk-ui/src/timeline/tasks.rs @@ -28,7 +28,7 @@ use matrix_sdk::{ use ruma::OwnedEventId; use tokio::sync::broadcast::{Receiver, error::RecvError}; use tokio_stream::StreamExt as _; -use tracing::{instrument, trace, warn}; +use tracing::{error, instrument, trace, warn}; use crate::timeline::{TimelineController, TimelineFocus, event_item::RemoteEventOrigin}; @@ -93,7 +93,14 @@ pub(in crate::timeline) async fn thread_updates_task( // The updates might have lagged, but the room event cache might // have events, so retrieve them and add them back again to the // timeline, after clearing it. - let (initial_events, _) = room_event_cache.subscribe_to_thread(root.clone()).await; + let (initial_events, _) = + match room_event_cache.subscribe_to_thread(root.clone()).await { + Ok(values) => values, + Err(err) => { + error!(?err, "Subscribing to thread failed"); + break; + } + }; timeline_controller .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache) @@ -145,7 +152,16 @@ pub(in crate::timeline) async fn room_event_cache_updates_task( // The updates might have lagged, but the room event cache might have // events, so retrieve them and add them back again to the timeline, // after clearing it. - let initial_events = room_event_cache.events().await; + let initial_events = match room_event_cache.events().await { + Ok(initial_events) => initial_events, + Err(err) => { + error!( + ?err, + "Failed to replace the initial remote events in the event cache" + ); + break; + } + }; timeline_controller .replace_with_initial_remote_events(initial_events, RemoteEventOrigin::Cache) diff --git a/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs b/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs index 27794fc5cd3..1441ebaaa1d 100644 --- a/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs +++ b/crates/matrix-sdk-ui/tests/integration/timeline/decryption.rs @@ -73,6 +73,8 @@ async fn test_an_utd_from_the_event_cache_as_an_initial_item_is_decrypted() { // The item is an encrypted event! It has been stored before having a chance to // be decrypted. Damn. We want to see if decryption will trigger automatically. event_cache_store + .as_clean() + .unwrap() .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -212,6 +214,8 @@ async fn test_an_utd_from_the_event_cache_as_a_paginated_item_is_decrypted() { // chance to be decrypted. Damn. We want to see if decryption will trigger // automatically. event_cache_store + .as_clean() + .unwrap() .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ diff --git a/crates/matrix-sdk/src/event_cache/deduplicator.rs b/crates/matrix-sdk/src/event_cache/deduplicator.rs index 9d9f69facf2..581afed9d93 100644 --- a/crates/matrix-sdk/src/event_cache/deduplicator.rs +++ b/crates/matrix-sdk/src/event_cache/deduplicator.rs @@ -18,7 +18,7 @@ use std::collections::BTreeSet; use matrix_sdk_base::{ - event_cache::store::EventCacheStoreLock, + event_cache::store::EventCacheStoreLockGuard, linked_chunk::{LinkedChunkId, Position}, }; use ruma::OwnedEventId; @@ -32,7 +32,7 @@ use super::{ /// information about the duplicates found in the new events, including the /// events that are not loaded in memory. pub async fn filter_duplicate_events( - store: &EventCacheStoreLock, + store_guard: &EventCacheStoreLockGuard, linked_chunk_id: LinkedChunkId<'_>, linked_chunk: &EventLinkedChunk, mut new_events: Vec, @@ -50,10 +50,8 @@ pub async fn filter_duplicate_events( }); } - let store = store.lock().await?; - // Let the store do its magic ✨ - let duplicated_event_ids = store + let duplicated_event_ids = store_guard .filter_duplicated_events( linked_chunk_id, new_events.iter().filter_map(|event| event.event_id()).collect(), @@ -148,7 +146,10 @@ pub(super) struct DeduplicationOutcome { mod tests { use std::ops::Not as _; - use matrix_sdk_base::{deserialized_responses::TimelineEvent, linked_chunk::ChunkIdentifier}; + use matrix_sdk_base::{ + deserialized_responses::TimelineEvent, event_cache::store::EventCacheStoreLock, + linked_chunk::ChunkIdentifier, + }; use matrix_sdk_test::{async_test, event_factory::EventFactory}; use ruma::{EventId, owned_event_id, serde::Raw, user_id}; @@ -222,6 +223,8 @@ mod tests { .unwrap(); let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned()); + let event_cache_store = event_cache_store.lock().await.unwrap(); + let event_cache_store_guard = event_cache_store.as_clean().unwrap(); { // When presenting with only duplicate events, some of them in the in-memory @@ -232,7 +235,7 @@ mod tests { linked_chunk.push_events([event_1.clone(), event_2.clone(), event_3.clone()]); let outcome = filter_duplicate_events( - &event_cache_store, + event_cache_store_guard, LinkedChunkId::Room(room_id), &linked_chunk, vec![event_0.clone(), event_1.clone(), event_2.clone(), event_3.clone()], @@ -247,7 +250,7 @@ mod tests { linked_chunk.push_events([event_2.clone(), event_3.clone()]); let outcome = filter_duplicate_events( - &event_cache_store, + event_cache_store_guard, LinkedChunkId::Room(room_id), &linked_chunk, vec![event_0, event_1, event_2, event_3, event_4], @@ -351,6 +354,8 @@ mod tests { // Wrap the store into its lock. let event_cache_store = EventCacheStoreLock::new(event_cache_store, "hodor".to_owned()); + let event_cache_store = event_cache_store.lock().await.unwrap(); + let event_cache_store_guard = event_cache_store.as_clean().unwrap(); let linked_chunk = EventLinkedChunk::new(); @@ -360,7 +365,7 @@ mod tests { in_store_duplicated_event_ids, non_empty_all_duplicates, } = filter_duplicate_events( - &event_cache_store, + event_cache_store_guard, LinkedChunkId::Room(room_id), &linked_chunk, vec![ev1, ev2, ev3, ev4], diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index 810e07fa553..c00fdf9c7f1 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -42,7 +42,7 @@ use matrix_sdk_base::{ deserialized_responses::{AmbiguityChange, TimelineEvent}, event_cache::{ Gap, - store::{EventCacheStoreError, EventCacheStoreLock}, + store::{EventCacheStoreError, EventCacheStoreLock, EventCacheStoreLockState}, }, executor::AbortOnDrop, linked_chunk::{self, OwnedLinkedChunkId, lazy_loader::LazyLoaderError}, @@ -51,7 +51,6 @@ use matrix_sdk_base::{ timer, }; use matrix_sdk_common::executor::{JoinHandle, spawn}; -use room::RoomEventCacheState; use ruma::{ OwnedEventId, OwnedRoomId, OwnedTransactionId, RoomId, events::AnySyncEphemeralRoomEvent, serde::Raw, @@ -69,6 +68,7 @@ use tracing::{Instrument as _, Span, debug, error, info, info_span, instrument, use crate::{ Client, client::WeakClient, + event_cache::room::RoomEventCacheStateLock, send_queue::{LocalEchoContent, RoomSendQueueUpdate, SendQueueUpdate}, }; @@ -374,7 +374,13 @@ impl EventCache { }; trace!("waiting for state lock…"); - let mut state = room.inner.state.write().await; + let mut state = match room.inner.state.write().await { + Ok(state) => state, + Err(err) => { + warn!(for_room = %room_id, "Failed to get the `RoomEventCacheStateLock`: {err}"); + continue; + } + }; match state.auto_shrink_if_no_subscribers().await { Ok(diffs) => { @@ -900,12 +906,17 @@ impl EventCacheInner { .await; // Clear the storage for all the rooms, using the storage facility. - self.store.lock().await?.clear_all_linked_chunks().await?; + let store_guard = match self.store.lock().await? { + EventCacheStoreLockState::Clean(store_guard) => store_guard, + EventCacheStoreLockState::Dirty(store_guard) => store_guard, + }; + store_guard.clear_all_linked_chunks().await?; // At this point, all the in-memory linked chunks are desynchronized from the // storage. Resynchronize them manually by calling reset(), and // propagate updates to observers. - try_join_all(room_locks.into_iter().map(|(room, mut state_guard)| async move { + try_join_all(room_locks.into_iter().map(|(room, state_guard)| async move { + let mut state_guard = state_guard?; let updates_as_vector_diffs = state_guard.reset().await?; let _ = room.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { @@ -1005,7 +1016,7 @@ impl EventCacheInner { ThreadingSupport::Enabled { .. } ); - let room_state = RoomEventCacheState::new( + let room_state = RoomEventCacheStateLock::new( room_id.to_owned(), room_version_rules, enabled_thread_support, @@ -1016,7 +1027,7 @@ impl EventCacheInner { .await?; let timeline_is_not_empty = - room_state.room_linked_chunk().revents().next().is_some(); + room_state.read().await?.room_linked_chunk().revents().next().is_some(); // SAFETY: we must have subscribed before reaching this code, otherwise // something is very wrong. @@ -1214,7 +1225,7 @@ mod tests { let mut generic_stream = event_cache.subscribe_to_room_generic_updates(); let (room_event_cache, _drop_handles) = event_cache.for_room(room_id).await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await; + let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); assert!(events.is_empty()); @@ -1300,15 +1311,15 @@ mod tests { let (room_event_cache, _drop_handles) = room1.event_cache().await.unwrap(); - let found1 = room_event_cache.find_event(eid1).await.unwrap(); + let found1 = room_event_cache.find_event(eid1).await.unwrap().unwrap(); assert_event_matches_msg(&found1, "hey"); - let found2 = room_event_cache.find_event(eid2).await.unwrap(); + let found2 = room_event_cache.find_event(eid2).await.unwrap().unwrap(); assert_event_matches_msg(&found2, "you"); // Retrieving the event with id3 from the room which doesn't contain it will // fail… - assert!(room_event_cache.find_event(eid3).await.is_none()); + assert!(room_event_cache.find_event(eid3).await.unwrap().is_none()); } #[async_test] @@ -1329,7 +1340,7 @@ mod tests { room_event_cache.save_events([f.text_msg("hey there").event_id(event_id).into()]).await; // Retrieving the event at the room-wide cache works. - assert!(room_event_cache.find_event(event_id).await.is_some()); + assert!(room_event_cache.find_event(event_id).await.unwrap().is_some()); } #[async_test] @@ -1352,7 +1363,9 @@ mod tests { .event_cache_store() .lock() .await - .unwrap() + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(room_id_0), vec![ @@ -1417,7 +1430,9 @@ mod tests { .event_cache_store() .lock() .await - .unwrap() + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 1d2ec50c027..df4ee95451b 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -14,7 +14,10 @@ //! A sub-object for running pagination tasks on a given room. -use std::{sync::Arc, time::Duration}; +use std::{ + sync::{Arc, atomic::Ordering}, + time::Duration, +}; use eyeball::{SharedObservable, Subscriber}; use matrix_sdk_base::timeout::timeout; @@ -182,11 +185,13 @@ impl RoomPagination { // there's no previous events chunk to load. loop { - let mut state_guard = self.inner.state.write().await; + let mut state_guard = self.inner.state.write().await?; match state_guard.load_more_events_backwards().await? { LoadMoreEventsBackwardsOutcome::Gap { prev_token } => { - if prev_token.is_none() && !state_guard.waited_for_initial_prev_token { + if prev_token.is_none() + && !state_guard.waited_for_initial_prev_token().load(Ordering::SeqCst) + { // We didn't reload a pagination token, and we haven't waited for one; wait // and start over. @@ -205,7 +210,12 @@ impl RoomPagination { .await; trace!("done waiting"); - self.inner.state.write().await.waited_for_initial_prev_token = true; + self.inner + .state + .write() + .await? + .waited_for_initial_prev_token() + .store(true, Ordering::SeqCst); // Retry! // @@ -296,7 +306,7 @@ impl RoomPagination { .inner .state .write() - .await + .await? .handle_backpagination(events, new_token, prev_token) .await? { diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index 78a14c1b16d..0b81dcc2418 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -40,7 +40,7 @@ use ruma::{ serde::Raw, }; use tokio::sync::{ - Notify, RwLock, + Notify, broadcast::{Receiver, Sender}, mpsc, }; @@ -164,7 +164,7 @@ impl RoomEventCache { /// Create a new [`RoomEventCache`] using the given room and store. pub(super) fn new( client: WeakClient, - state: RoomEventCacheState, + state: RoomEventCacheStateLock, pagination_status: SharedObservable, room_id: OwnedRoomId, auto_shrink_sender: mpsc::Sender, @@ -186,10 +186,10 @@ impl RoomEventCache { /// /// Use [`RoomEventCache::subscribe`] to get all current events, plus a /// subscriber. - pub async fn events(&self) -> Vec { - let state = self.inner.state.read().await; + pub async fn events(&self) -> Result> { + let state = self.inner.state.read().await?; - state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect() + Ok(state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect()) } /// Subscribe to this room updates, after getting the initial list of @@ -198,12 +198,13 @@ impl RoomEventCache { /// Use [`RoomEventCache::events`] to get all current events without the /// subscriber. Creating, and especially dropping, a /// [`RoomEventCacheSubscriber`] isn't free, as it triggers side-effects. - pub async fn subscribe(&self) -> (Vec, RoomEventCacheSubscriber) { - let state = self.inner.state.read().await; + pub async fn subscribe(&self) -> Result<(Vec, RoomEventCacheSubscriber)> { + let state = self.inner.state.read().await?; let events = state.room_linked_chunk().events().map(|(_position, item)| item.clone()).collect(); - let previous_subscriber_count = state.subscriber_count.fetch_add(1, Ordering::SeqCst); + let subscriber_count = state.subscriber_count(); + let previous_subscriber_count = subscriber_count.fetch_add(1, Ordering::SeqCst); trace!("added a room event cache subscriber; new count: {}", previous_subscriber_count + 1); let recv = self.inner.sender.subscribe(); @@ -211,10 +212,10 @@ impl RoomEventCache { recv, room_id: self.inner.room_id.clone(), auto_shrink_sender: self.inner.auto_shrink_sender.clone(), - subscriber_count: state.subscriber_count.clone(), + subscriber_count: subscriber_count.clone(), }; - (events, subscriber) + Ok((events, subscriber)) } /// Subscribe to thread for a given root event, and get a (maybe empty) @@ -222,9 +223,9 @@ impl RoomEventCache { pub async fn subscribe_to_thread( &self, thread_root: OwnedEventId, - ) -> (Vec, Receiver) { - let mut state = self.inner.state.write().await; - state.subscribe_to_thread(thread_root) + ) -> Result<(Vec, Receiver)> { + let mut state = self.inner.state.write().await?; + Ok(state.subscribe_to_thread(thread_root)) } /// Paginate backwards in a thread, given its root event ID. @@ -241,7 +242,7 @@ impl RoomEventCache { // Take the lock only for a short time here. let mut outcome = - self.inner.state.write().await.load_more_thread_events_backwards(thread_root.clone()); + self.inner.state.write().await?.load_more_thread_events_backwards(thread_root.clone()); loop { match outcome { @@ -275,7 +276,7 @@ impl RoomEventCache { None }; - let mut state = self.inner.state.write().await; + let mut state = self.inner.state.write().await?; // Save all the events (but the thread root) in the store. state.save_events(result.chunk.iter().cloned()).await?; @@ -321,27 +322,28 @@ impl RoomEventCache { /// /// **Warning**! It looks into the loaded events from the in-memory linked /// chunk **only**. It doesn't look inside the storage. - pub async fn rfind_map_event_in_memory_by(&self, predicate: P) -> Option + pub async fn rfind_map_event_in_memory_by(&self, predicate: P) -> Result> where P: FnMut(&Event) -> Option, { - self.inner.state.read().await.rfind_map_event_in_memory_by(predicate) + Ok(self.inner.state.read().await?.rfind_map_event_in_memory_by(predicate)) } /// Try to find an event by ID in this room. /// /// It starts by looking into loaded events before looking inside the /// storage. - pub async fn find_event(&self, event_id: &EventId) -> Option { - self.inner + pub async fn find_event(&self, event_id: &EventId) -> Result> { + Ok(self + .inner .state .read() - .await + .await? .find_event(event_id) .await .ok() .flatten() - .map(|(_loc, event)| event) + .map(|(_loc, event)| event)) } /// Try to find an event by ID in this room, along with its related events. @@ -359,16 +361,17 @@ impl RoomEventCache { &self, event_id: &EventId, filter: Option>, - ) -> Option<(Event, Vec)> { + ) -> Result)>> { // Search in all loaded or stored events. - self.inner + Ok(self + .inner .state .read() - .await + .await? .find_event_with_relations(event_id, filter.clone()) .await .ok() - .flatten() + .flatten()) } /// Clear all the storage for this [`RoomEventCache`]. @@ -377,7 +380,7 @@ impl RoomEventCache { /// storage. pub async fn clear(&self) -> Result<()> { // Clear the linked chunk and persisted storage. - let updates_as_vector_diffs = self.inner.state.write().await.reset().await?; + let updates_as_vector_diffs = self.inner.state.write().await?.reset().await?; // Notify observers about the update. let _ = self.inner.sender.send(RoomEventCacheUpdate::UpdateTimelineEvents { @@ -397,15 +400,23 @@ impl RoomEventCache { /// Save some events in the event cache, for further retrieval with /// [`Self::event`]. pub(crate) async fn save_events(&self, events: impl IntoIterator) { - if let Err(err) = self.inner.state.write().await.save_events(events).await { - warn!("couldn't save event in the event cache: {err}"); + match self.inner.state.write().await { + Ok(mut state_guard) => { + if let Err(err) = state_guard.save_events(events).await { + warn!("couldn't save event in the event cache: {err}"); + } + } + + Err(err) => { + warn!("couldn't save event in the event cache: {err}"); + } } } /// Return a nice debug string (a vector of lines) for the linked chunk of /// events for this room. pub async fn debug_string(&self) -> Vec { - self.inner.state.read().await.room_linked_chunk().debug_string() + self.inner.state.read().await.unwrap().room_linked_chunk().debug_string() } } @@ -420,7 +431,7 @@ pub(super) struct RoomEventCacheInner { pub sender: Sender, /// State for this room's event cache. - pub state: RwLock, + pub state: RoomEventCacheStateLock, /// A notifier that we received a new pagination token. pub pagination_batch_token_notifier: Notify, @@ -446,7 +457,7 @@ impl RoomEventCacheInner { /// to handle new timeline events. fn new( client: WeakClient, - state: RoomEventCacheState, + state: RoomEventCacheStateLock, pagination_status: SharedObservable, room_id: OwnedRoomId, auto_shrink_sender: mpsc::Sender, @@ -457,7 +468,7 @@ impl RoomEventCacheInner { Self { room_id: weak_room.room_id().to_owned(), weak_room, - state: RwLock::new(state), + state, sender, pagination_batch_token_notifier: Default::default(), auto_shrink_sender, @@ -545,7 +556,7 @@ impl RoomEventCacheInner { trace!("adding new events"); let (stored_prev_batch_token, timeline_event_diffs) = - self.state.write().await.handle_sync(timeline).await?; + self.state.write().await?.handle_sync(timeline).await?; // Now that all events have been added, we can trigger the // `pagination_token_notifier`. @@ -602,7 +613,10 @@ pub(super) enum LoadMoreEventsBackwardsOutcome { mod private { use std::{ collections::{BTreeMap, HashMap, HashSet}, - sync::{Arc, atomic::AtomicUsize}, + sync::{ + Arc, + atomic::{AtomicBool, AtomicUsize, Ordering}, + }, }; use eyeball::SharedObservable; @@ -612,12 +626,11 @@ mod private { deserialized_responses::{ThreadSummary, ThreadSummaryStatus, TimelineEventKind}, event_cache::{ Event, Gap, - store::{DynEventCacheStore, EventCacheStoreLock}, + store::{EventCacheStoreLock, EventCacheStoreLockGuard, EventCacheStoreLockState}, }, linked_chunk::{ ChunkContent, ChunkIdentifierGenerator, ChunkMetadata, LinkedChunkId, - OwnedLinkedChunkId, Position, Update, - lazy_loader::{self}, + OwnedLinkedChunkId, Position, Update, lazy_loader, }, serde_helpers::{extract_edit_target, extract_thread_root}, sync::Timeline, @@ -632,35 +645,39 @@ mod private { room_version_rules::RoomVersionRules, serde::Raw, }; - use tokio::sync::broadcast::{Receiver, Sender}; + use tokio::sync::{ + RwLock, RwLockReadGuard, RwLockWriteGuard, + broadcast::{Receiver, Sender}, + }; use tracing::{debug, error, instrument, trace, warn}; use super::{ - super::{EventCacheError, deduplicator::DeduplicationOutcome}, + super::{ + BackPaginationOutcome, EventCacheError, RoomEventCacheLinkedChunkUpdate, + RoomPaginationStatus, ThreadEventCacheUpdate, + deduplicator::{DeduplicationOutcome, filter_duplicate_events}, + room::threads::ThreadEventCache, + }, EventLocation, LoadMoreEventsBackwardsOutcome, events::EventLinkedChunk, sort_positions_descending, }; - use crate::event_cache::{ - BackPaginationOutcome, RoomEventCacheLinkedChunkUpdate, RoomPaginationStatus, - ThreadEventCacheUpdate, deduplicator::filter_duplicate_events, - room::threads::ThreadEventCache, - }; /// State for a single room's event cache. /// /// This contains all the inner mutable states that ought to be updated at /// the same time. - pub struct RoomEventCacheState { - /// The room this state relates to. - room: OwnedRoomId, - - /// The rules for the version of this room. - room_version_rules: RoomVersionRules, + pub struct RoomEventCacheStateLock { + locked_state: RwLock, + } + struct RoomEventCacheStateLockInner { /// Whether thread support has been enabled for the event cache. enabled_thread_support: bool, + /// The room this state relates to. + room_id: OwnedRoomId, + /// Reference to the underlying backing store. store: EventCacheStoreLock, @@ -673,24 +690,27 @@ mod private { /// Keyed by the thread root event ID. threads: HashMap, - /// Have we ever waited for a previous-batch-token to come from sync, in - /// the context of pagination? We do this at most once per room, - /// the first time we try to run backward pagination. We reset - /// that upon clearing the timeline events. - pub waited_for_initial_prev_token: bool, - pagination_status: SharedObservable, /// See doc comment of /// [`super::super::EventCacheInner::linked_chunk_update_sender`]. linked_chunk_update_sender: Sender, + /// The rules for the version of this room. + room_version_rules: RoomVersionRules, + + /// Have we ever waited for a previous-batch-token to come from sync, in + /// the context of pagination? We do this at most once per room, + /// the first time we try to run backward pagination. We reset + /// that upon clearing the timeline events. + waited_for_initial_prev_token: Arc, + /// An atomic count of the current number of subscriber of the /// [`super::RoomEventCache`]. - pub(super) subscriber_count: Arc, + subscriber_count: Arc, } - impl RoomEventCacheState { + impl RoomEventCacheStateLock { /// Create a new state, or reload it from storage if it's been enabled. /// /// Not all events are going to be loaded. Only a portion of them. The @@ -708,7 +728,17 @@ mod private { store: EventCacheStoreLock, pagination_status: SharedObservable, ) -> Result { - let store_lock = store.lock().await?; + let store_guard = match store.lock().await? { + // + EventCacheStoreLockState::Clean(guard) => guard, + + // + EventCacheStoreLockState::Dirty(guard) => { + EventCacheStoreLockGuard::clear_dirty(&guard); + + guard + } + }; let linked_chunk_id = LinkedChunkId::Room(&room_id); @@ -717,7 +747,7 @@ mod private { // If loading the full linked chunk failed, we'll clear the event cache, as it // indicates that at some point, there's some malformed data. let full_linked_chunk_metadata = - match Self::load_linked_chunk_metadata(&*store_lock, linked_chunk_id).await { + match load_linked_chunk_metadata(&store_guard, linked_chunk_id).await { Ok(metas) => metas, Err(err) => { error!( @@ -725,7 +755,7 @@ mod private { ); // Try to clear storage for this room. - store_lock + store_guard .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) .await?; @@ -734,7 +764,7 @@ mod private { } }; - let linked_chunk = match store_lock + let linked_chunk = match store_guard .load_last_chunk(linked_chunk_id) .await .map_err(EventCacheError::from) @@ -749,7 +779,7 @@ mod private { ); // Try to clear storage for this room. - store_lock + store_guard .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) .await?; @@ -757,174 +787,253 @@ mod private { } }; - let room_linked_chunk = EventLinkedChunk::with_initial_linked_chunk( - linked_chunk, - full_linked_chunk_metadata, - ); - - // The threads mapping is intentionally empty at start, since we're going to - // reload threads lazily, as soon as we need to (based on external - // subscribers) or when we get new information about those (from - // sync). - let threads = HashMap::new(); + let waited_for_initial_prev_token = Arc::new(AtomicBool::new(false)); Ok(Self { - room: room_id, - room_version_rules, - enabled_thread_support, - store, - room_linked_chunk, - threads, - waited_for_initial_prev_token: false, - subscriber_count: Default::default(), - pagination_status, - linked_chunk_update_sender, + locked_state: RwLock::new(RoomEventCacheStateLockInner { + enabled_thread_support, + room_id, + store, + room_linked_chunk: EventLinkedChunk::with_initial_linked_chunk( + linked_chunk, + full_linked_chunk_metadata, + ), + // The threads mapping is intentionally empty at start, since we're going to + // reload threads lazily, as soon as we need to (based on external + // subscribers) or when we get new information about those (from + // sync). + threads: HashMap::new(), + pagination_status, + linked_chunk_update_sender, + room_version_rules, + waited_for_initial_prev_token, + subscriber_count: Default::default(), + }), }) } - /// Load a linked chunk's full metadata, making sure the chunks are - /// according to their their links. - /// - /// Returns `None` if there's no such linked chunk in the store, or an - /// error if the linked chunk is malformed. - async fn load_linked_chunk_metadata( - store: &DynEventCacheStore, - linked_chunk_id: LinkedChunkId<'_>, - ) -> Result>, EventCacheError> { - let mut all_chunks = store - .load_all_chunks_metadata(linked_chunk_id) - .await - .map_err(EventCacheError::from)?; - - if all_chunks.is_empty() { - // There are no chunks, so there's nothing to do. - return Ok(None); - } + pub async fn read(&self) -> Result, EventCacheError> { + let state_guard = self.locked_state.read().await; + let store_guard = match state_guard.store.lock().await? { + EventCacheStoreLockState::Clean(guard) => guard, + EventCacheStoreLockState::Dirty(_guard) => todo!("Dirty lock"), + }; - // Transform the vector into a hashmap, for quick lookup of the predecessors. - let chunk_map: HashMap<_, _> = - all_chunks.iter().map(|meta| (meta.identifier, meta)).collect(); + Ok(RoomEventCacheStateLockReadGuard { state: state_guard, store: store_guard }) + } - // Find a last chunk. - let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none()); - let Some(last) = iter.next() else { - return Err(EventCacheError::InvalidLinkedChunkMetadata { - details: "no last chunk found".to_owned(), - }); + pub async fn write( + &self, + ) -> Result, EventCacheError> { + let state_guard = self.locked_state.write().await; + let store_guard = match state_guard.store.lock().await? { + EventCacheStoreLockState::Clean(guard) => guard, + EventCacheStoreLockState::Dirty(_guard) => todo!("Dirty lock"), }; - // There must at most one last chunk. - if let Some(other_last) = iter.next() { - return Err(EventCacheError::InvalidLinkedChunkMetadata { - details: format!( - "chunks {} and {} both claim to be last chunks", - last.identifier.index(), - other_last.identifier.index() - ), - }); + Ok(RoomEventCacheStateLockWriteGuard { state: state_guard, store: store_guard }) + } + } + + pub struct RoomEventCacheStateLockReadGuard<'a> { + state: RwLockReadGuard<'a, RoomEventCacheStateLockInner>, + store: EventCacheStoreLockGuard, + } + + pub struct RoomEventCacheStateLockWriteGuard<'a> { + state: RwLockWriteGuard<'a, RoomEventCacheStateLockInner>, + store: EventCacheStoreLockGuard, + } + + impl<'a> RoomEventCacheStateLockWriteGuard<'a> { + fn as_read_guard(&'a self) -> RoomEventCacheStateLockReadGuard<'a> { + todo!() + /* + RoomEventCacheStateLockReadGuard { + room_id: self.room_id, + store_guard: self.store_guard.clone(), + room_linked_chunk: self.room_linked_chunk, } + */ + } + } - // Rewind the chain back to the first chunk, and do some checks at the same - // time. - let mut seen = HashSet::new(); - let mut current = last; - loop { - // If we've already seen this chunk, there's a cycle somewhere. - if !seen.insert(current.identifier) { - return Err(EventCacheError::InvalidLinkedChunkMetadata { - details: format!( - "cycle detected in linked chunk at {}", - current.identifier.index() - ), - }); + impl<'a> RoomEventCacheStateLockReadGuard<'a> { + /// Returns a read-only reference to the underlying room linked chunk. + pub fn room_linked_chunk(&self) -> &EventLinkedChunk { + &self.state.room_linked_chunk + } + + pub fn subscriber_count(&self) -> &Arc { + &self.state.subscriber_count + } + + /// Find a single event in this room. + /// + /// It starts by looking into loaded events in `EventLinkedChunk` before + /// looking inside the storage. + pub async fn find_event( + &self, + event_id: &EventId, + ) -> Result, EventCacheError> { + // There are supposedly fewer events loaded in memory than in the store. Let's + // start by looking up in the `EventLinkedChunk`. + for (position, event) in self.state.room_linked_chunk.revents() { + if event.event_id().as_deref() == Some(event_id) { + return Ok(Some((EventLocation::Memory(position), event.clone()))); } + } - let Some(prev_id) = current.previous else { - // If there's no previous chunk, we're done. - if seen.len() != all_chunks.len() { - return Err(EventCacheError::InvalidLinkedChunkMetadata { - details: format!( - "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected", - seen.len(), - all_chunks.len() - ), - }); - } - break; - }; + Ok(self + .store + .find_event(&self.state.room_id, event_id) + .await? + .map(|event| (EventLocation::Store, event))) + } - // If the previous chunk is not in the map, then it's unknown - // and missing. - let Some(pred_meta) = chunk_map.get(&prev_id) else { - return Err(EventCacheError::InvalidLinkedChunkMetadata { - details: format!( - "missing predecessor {} chunk for {}", - prev_id.index(), - current.identifier.index() - ), - }); - }; + /// Find an event and all its relations in the persisted storage. + /// + /// This goes straight to the database, as a simplification; we don't + /// expect to need to have to look up in memory events, or that + /// all the related events are actually loaded. + /// + /// The related events are sorted like this: + /// - events saved out-of-band with + /// [`super::RoomEventCache::save_events`] will be located at the + /// beginning of the array. + /// - events present in the linked chunk (be it in memory or in the + /// database) will be sorted according to their ordering in the linked + /// chunk. + pub async fn find_event_with_relations( + &self, + event_id: &EventId, + filters: Option>, + ) -> Result)>, EventCacheError> { + // First, hit storage to get the target event and its related events. + let found = self.store.find_event(&self.state.room_id, event_id).await?; - // If the previous chunk isn't connected to the next, then the link is invalid. - if pred_meta.next != Some(current.identifier) { - return Err(EventCacheError::InvalidLinkedChunkMetadata { - details: format!( - "chunk {}'s next ({:?}) doesn't match the current chunk ({})", - pred_meta.identifier.index(), - pred_meta.next.map(|chunk_id| chunk_id.index()), - current.identifier.index() - ), - }); + let Some(target) = found else { + // We haven't found the event: return early. + return Ok(None); + }; + + // Then, initialize the stack with all the related events, to find the + // transitive closure of all the related events. + let mut related = self + .store + .find_event_relations(&self.state.room_id, event_id, filters.as_deref()) + .await?; + let mut stack = + related.iter().filter_map(|(event, _pos)| event.event_id()).collect::>(); + + // Also keep track of already seen events, in case there's a loop in the + // relation graph. + let mut already_seen = HashSet::new(); + already_seen.insert(event_id.to_owned()); + + let mut num_iters = 1; + + // Find the related event for each previously-related event. + while let Some(event_id) = stack.pop() { + if !already_seen.insert(event_id.clone()) { + // Skip events we've already seen. + continue; } - current = *pred_meta; + let other_related = self + .store + .find_event_relations(&self.state.room_id, &event_id, filters.as_deref()) + .await?; + + stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id())); + related.extend(other_related); + + num_iters += 1; } - // At this point, `current` is the identifier of the first chunk. - // - // Reorder the resulting vector, by going through the chain of `next` links, and - // swapping items into their final position. + trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events"); + + // Sort the results by their positions in the linked chunk, if available. // - // Invariant in this loop: all items in [0..i[ are in their final, correct - // position. - let mut current = current.identifier; - for i in 0..all_chunks.len() { - // Find the target metadata. - let j = all_chunks - .iter() - .rev() - .position(|meta| meta.identifier == current) - .map(|j| all_chunks.len() - 1 - j) - .expect("the target chunk must be present in the metadata"); - if i != j { - all_chunks.swap(i, j); - } - if let Some(next) = all_chunks[i].next { - current = next; + // If an event doesn't have a known position, it goes to the start of the array. + related.sort_by(|(_, lhs), (_, rhs)| { + use std::cmp::Ordering; + match (lhs, rhs) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => { + let lhs = self.room_event_order(*lhs); + let rhs = self.room_event_order(*rhs); + + // The events should have a definite position, but in the case they don't, + // still consider that not having a position means you'll end at the start + // of the array. + match (lhs, rhs) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(lhs), Some(rhs)) => lhs.cmp(&rhs), + } + } } - } + }); + + // Keep only the events, not their positions. + let related = related.into_iter().map(|(event, _pos)| event).collect(); + + Ok(Some((target, related))) + } + + //// Find a single event in this room, starting from the most recent event. + /// + /// **Warning**! It looks into the loaded events from the in-memory + /// linked chunk **only**. It doesn't look inside the storage, + /// contrary to [`Self::find_event`]. + pub fn rfind_map_event_in_memory_by(&self, mut predicate: P) -> Option + where + P: FnMut(&Event) -> Option, + { + self.state.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event)) + } + + pub(super) fn room_event_order(&self, event_pos: Position) -> Option { + self.state.room_linked_chunk.event_order(event_pos) + } + } - Ok(Some(all_chunks)) + impl<'a> RoomEventCacheStateLockWriteGuard<'a> { + /// Returns a write reference to the underlying room linked chunk. + pub fn room_linked_chunk(&mut self) -> &mut EventLinkedChunk { + &mut self.state.room_linked_chunk + } + + pub fn waited_for_initial_prev_token(&self) -> &Arc { + &self.state.waited_for_initial_prev_token } /// Load more events backwards if the last chunk is **not** a gap. - pub(in super::super) async fn load_more_events_backwards( + pub async fn load_more_events_backwards( &mut self, ) -> Result { // If any in-memory chunk is a gap, don't load more events, and let the caller // resolve the gap. - if let Some(prev_token) = self.room_linked_chunk.rgap().map(|gap| gap.prev_token) { + if let Some(prev_token) = self.state.room_linked_chunk.rgap().map(|gap| gap.prev_token) + { return Ok(LoadMoreEventsBackwardsOutcome::Gap { prev_token: Some(prev_token) }); } - let store = self.store.lock().await?; - - let prev_first_chunk = - self.room_linked_chunk.chunks().next().expect("a linked chunk is never empty"); + let prev_first_chunk = self + .state + .room_linked_chunk + .chunks() + .next() + .expect("a linked chunk is never empty"); // The first chunk is not a gap, we can load its previous chunk. - let linked_chunk_id = LinkedChunkId::Room(&self.room); - let new_first_chunk = match store + let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id); + let new_first_chunk = match self + .store .load_previous_chunk(linked_chunk_id, prev_first_chunk.identifier()) .await { @@ -938,7 +1047,7 @@ mod private { // sync for that room, because every room must have *at least* a room creation // event. Otherwise, we have reached the start of the timeline. - if self.room_linked_chunk.events().next().is_some() { + if self.state.room_linked_chunk.events().next().is_some() { // If there's at least one event, this means we've reached the start of the // timeline, since the chunk is fully loaded. trace!("chunk is fully loaded and non-empty: reached_start=true"); @@ -953,7 +1062,9 @@ mod private { error!("error when loading the previous chunk of a linked chunk: {err}"); // Clear storage for this room. - store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?; + self.store + .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) + .await?; // Return the error. return Err(err.into()); @@ -969,11 +1080,18 @@ mod private { // `Items`. let reached_start = new_first_chunk.previous.is_none(); - if let Err(err) = self.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk) { + if let Err(err) = + self.state.room_linked_chunk.insert_new_chunk_as_first(new_first_chunk) + { error!("error when inserting the previous chunk into its linked chunk: {err}"); // Clear storage for this room. - store.handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]).await?; + self.store + .handle_linked_chunk_updates( + LinkedChunkId::Room(&self.state.room_id), + vec![Update::Clear], + ) + .await?; // Return the error. return Err(err.into()); @@ -981,10 +1099,10 @@ mod private { // ⚠️ Let's not propagate the updates to the store! We already have these data // in the store! Let's drain them. - let _ = self.room_linked_chunk.store_updates().take(); + let _ = self.state.room_linked_chunk.store_updates().take(); // However, we want to get updates as `VectorDiff`s. - let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs(); + let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); Ok(match chunk_content { ChunkContent::Gap(gap) => { @@ -1011,13 +1129,11 @@ mod private { /// pending diff updates with the result of this function. /// /// Otherwise, returns `None`. - pub(super) async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> { - let store_lock = self.store.lock().await?; - + pub async fn shrink_to_last_chunk(&mut self) -> Result<(), EventCacheError> { // Attempt to load the last chunk. - let linked_chunk_id = LinkedChunkId::Room(&self.room); + let linked_chunk_id = LinkedChunkId::Room(&self.state.room_id); let (last_chunk, chunk_identifier_generator) = - match store_lock.load_last_chunk(linked_chunk_id).await { + match self.store.load_last_chunk(linked_chunk_id).await { Ok(pair) => pair, Err(err) => { @@ -1025,7 +1141,7 @@ mod private { error!("error when reloading a linked chunk from memory: {err}"); // Clear storage for this room. - store_lock + self.store .handle_linked_chunk_updates(linked_chunk_id, vec![Update::Clear]) .await?; @@ -1039,7 +1155,7 @@ mod private { // Remove all the chunks from the linked chunks, except for the last one, and // updates the chunk identifier generator. if let Err(err) = - self.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator) + self.state.room_linked_chunk.replace_with(last_chunk, chunk_identifier_generator) { error!("error when replacing the linked chunk: {err}"); return self.reset_internal().await; @@ -1048,11 +1164,13 @@ mod private { // Let pagination observers know that we may have not reached the start of the // timeline. // TODO: likely need to cancel any ongoing pagination. - self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false }); + self.state + .pagination_status + .set(RoomPaginationStatus::Idle { hit_timeline_start: false }); // Don't propagate those updates to the store; this is only for the in-memory // representation that we're doing this. Let's drain those store updates. - let _ = self.room_linked_chunk.store_updates().take(); + let _ = self.state.room_linked_chunk.store_updates().take(); Ok(()) } @@ -1060,10 +1178,10 @@ mod private { /// Automatically shrink the room if there are no more subscribers, as /// indicated by the atomic number of active subscribers. #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] - pub(crate) async fn auto_shrink_if_no_subscribers( + pub async fn auto_shrink_if_no_subscribers( &mut self, ) -> Result>>, EventCacheError> { - let subscriber_count = self.subscriber_count.load(std::sync::atomic::Ordering::SeqCst); + let subscriber_count = self.state.subscriber_count.load(Ordering::SeqCst); trace!(subscriber_count, "received request to auto-shrink"); @@ -1071,66 +1189,20 @@ mod private { // If we are the last strong reference to the auto-shrinker, we can shrink the // events data structure to its last chunk. self.shrink_to_last_chunk().await?; - Ok(Some(self.room_linked_chunk.updates_as_vector_diffs())) + + Ok(Some(self.state.room_linked_chunk.updates_as_vector_diffs())) } else { Ok(None) } } #[cfg(test)] - pub(crate) async fn force_shrink_to_last_chunk( + pub async fn force_shrink_to_last_chunk( &mut self, ) -> Result>, EventCacheError> { self.shrink_to_last_chunk().await?; - Ok(self.room_linked_chunk.updates_as_vector_diffs()) - } - - pub(crate) fn room_event_order(&self, event_pos: Position) -> Option { - self.room_linked_chunk.event_order(event_pos) - } - - /// Removes the bundled relations from an event, if they were present. - /// - /// Only replaces the present if it contained bundled relations. - fn strip_relations_if_present(event: &mut Raw) { - // We're going to get rid of the `unsigned`/`m.relations` field, if it's - // present. - // Use a closure that returns an option so we can quickly short-circuit. - let mut closure = || -> Option<()> { - let mut val: serde_json::Value = event.deserialize_as().ok()?; - let unsigned = val.get_mut("unsigned")?; - let unsigned_obj = unsigned.as_object_mut()?; - if unsigned_obj.remove("m.relations").is_some() { - *event = Raw::new(&val).ok()?.cast_unchecked(); - } - None - }; - let _ = closure(); - } - - fn strip_relations_from_event(ev: &mut Event) { - match &mut ev.kind { - TimelineEventKind::Decrypted(decrypted) => { - // Remove all information about encryption info for - // the bundled events. - decrypted.unsigned_encryption_info = None; - - // Remove the `unsigned`/`m.relations` field, if needs be. - Self::strip_relations_if_present(&mut decrypted.event); - } - - TimelineEventKind::UnableToDecrypt { event, .. } - | TimelineEventKind::PlainText { event } => { - Self::strip_relations_if_present(event); - } - } - } - /// Strips the bundled relations from a collection of events. - fn strip_relations_from_events(items: &mut [Event]) { - for ev in items.iter_mut() { - Self::strip_relations_from_event(ev); - } + Ok(self.state.room_linked_chunk.updates_as_vector_diffs()) } /// Remove events by their position, in `EventLinkedChunk` and in @@ -1139,7 +1211,7 @@ mod private { /// This method is purposely isolated because it must ensure that /// positions are sorted appropriately or it can be disastrous. #[instrument(skip_all)] - async fn remove_events( + pub async fn remove_events( &mut self, in_memory_events: Vec<(OwnedEventId, Position)>, in_store_events: Vec<(OwnedEventId, Position)>, @@ -1168,7 +1240,8 @@ mod private { } // `remove_events_by_position` is responsible of sorting positions. - self.room_linked_chunk + self.state + .room_linked_chunk .remove_events_by_position( in_memory_events.into_iter().map(|(_event_id, position)| position).collect(), ) @@ -1177,9 +1250,8 @@ mod private { self.propagate_changes().await } - /// Propagate changes to the underlying storage. async fn propagate_changes(&mut self) -> Result<(), EventCacheError> { - let updates = self.room_linked_chunk.store_updates().take(); + let updates = self.state.room_linked_chunk.store_updates().take(); self.send_updates_to_store(updates).await } @@ -1193,7 +1265,7 @@ mod private { &mut self, updates: Vec>, ) -> Result<(), EventCacheError> { - self.room_linked_chunk.order_tracker.map_updates(&updates); + self.state.room_linked_chunk.order_tracker.map_updates(&updates); self.send_updates_to_store(updates).await } @@ -1208,8 +1280,8 @@ mod private { // Strip relations from updates which insert or replace items. for update in updates.iter_mut() { match update { - Update::PushItems { items, .. } => Self::strip_relations_from_events(items), - Update::ReplaceItem { item, .. } => Self::strip_relations_from_event(item), + Update::PushItems { items, .. } => strip_relations_from_events(items), + Update::ReplaceItem { item, .. } => strip_relations_from_event(item), // Other update kinds don't involve adding new events. Update::NewItemsChunk { .. } | Update::NewGapChunk { .. } @@ -1229,12 +1301,10 @@ mod private { // storing updates happens in the expected order. let store = self.store.clone(); - let room_id = self.room.clone(); + let room_id = self.state.room_id.clone(); let cloned_updates = updates.clone(); spawn(async move { - let store = store.lock().await?; - trace!(updates = ?cloned_updates, "sending linked chunk updates to the store"); let linked_chunk_id = LinkedChunkId::Room(&room_id); store.handle_linked_chunk_updates(linked_chunk_id, cloned_updates).await?; @@ -1246,8 +1316,8 @@ mod private { .expect("joining failed")?; // Forward that the store got updated to observers. - let _ = self.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate { - linked_chunk_id: OwnedLinkedChunkId::Room(self.room.clone()), + let _ = self.state.linked_chunk_update_sender.send(RoomEventCacheLinkedChunkUpdate { + linked_chunk_id: OwnedLinkedChunkId::Room(self.state.room_id.clone()), updates, }); @@ -1259,11 +1329,10 @@ mod private { /// Return a single diff update that is a clear of all events; as a /// result, the caller may override any pending diff updates /// with the result of this function. - #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] pub async fn reset(&mut self) -> Result>, EventCacheError> { self.reset_internal().await?; - let diff_updates = self.room_linked_chunk.updates_as_vector_diffs(); + let diff_updates = self.state.room_linked_chunk.updates_as_vector_diffs(); // Ensure the contract defined in the doc comment is true: debug_assert_eq!(diff_updates.len(), 1); @@ -1273,14 +1342,13 @@ mod private { } async fn reset_internal(&mut self) -> Result<(), EventCacheError> { - self.room_linked_chunk.reset(); + self.state.room_linked_chunk.reset(); // No need to update the thread summaries: the room events are - // gone because of the - // reset of `room_linked_chunk`. + // gone because of the reset of `room_linked_chunk`. // // Clear the threads. - for thread in self.threads.values_mut() { + for thread in self.state.threads.values_mut() { thread.clear(); } @@ -1289,146 +1357,264 @@ mod private { // Reset the pagination state too: pretend we never waited for the initial // prev-batch token, and indicate that we're not at the start of the // timeline, since we don't know about that anymore. - self.waited_for_initial_prev_token = false; + self.state.waited_for_initial_prev_token.store(false, Ordering::SeqCst); // TODO: likely must cancel any ongoing back-paginations too - self.pagination_status.set(RoomPaginationStatus::Idle { hit_timeline_start: false }); + self.state + .pagination_status + .set(RoomPaginationStatus::Idle { hit_timeline_start: false }); Ok(()) } - /// Returns a read-only reference to the underlying room linked chunk. - pub fn room_linked_chunk(&self) -> &EventLinkedChunk { - &self.room_linked_chunk - } - - //// Find a single event in this room, starting from the most recent event. + /// Handle the result of a sync. /// - /// **Warning**! It looks into the loaded events from the in-memory - /// linked chunk **only**. It doesn't look inside the storage, - /// contrary to [`Self::find_event`]. - pub fn rfind_map_event_in_memory_by(&self, mut predicate: P) -> Option - where - P: FnMut(&Event) -> Option, - { - self.room_linked_chunk.revents().find_map(|(_position, event)| predicate(event)) - } - - /// Find a single event in this room. + /// It may send room event cache updates to the given sender, if it + /// generated any of those. /// - /// It starts by looking into loaded events in `EventLinkedChunk` before - /// looking inside the storage. - pub async fn find_event( - &self, - event_id: &EventId, - ) -> Result, EventCacheError> { - // There are supposedly fewer events loaded in memory than in the store. Let's - // start by looking up in the `EventLinkedChunk`. - for (position, event) in self.room_linked_chunk.revents() { - if event.event_id().as_deref() == Some(event_id) { - return Ok(Some((EventLocation::Memory(position), event.clone()))); + /// Returns `true` for the first part of the tuple if a new gap + /// (previous-batch token) has been inserted, `false` otherwise. + #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] + pub async fn handle_sync( + &mut self, + mut timeline: Timeline, + ) -> Result<(bool, Vec>), EventCacheError> { + let mut prev_batch = timeline.prev_batch.take(); + + let DeduplicationOutcome { + all_events: events, + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + non_empty_all_duplicates: all_duplicates, + } = filter_duplicate_events( + &self.store, + LinkedChunkId::Room(&self.state.room_id), + &self.state.room_linked_chunk, + timeline.events, + ) + .await?; + + // If the timeline isn't limited, and we already knew about some past events, + // then this definitely knows what the timeline head is (either we know + // about all the events persisted in storage, or we have a gap + // somewhere). In this case, we can ditch the previous-batch + // token, which is an optimization to avoid unnecessary future back-pagination + // requests. + // + // We can also ditch it if we knew about all the events that came from sync, + // namely, they were all deduplicated. In this case, using the + // previous-batch token would only result in fetching other events we + // knew about. This is slightly incorrect in the presence of + // network splits, but this has shown to be Good Enough™. + if !timeline.limited && self.state.room_linked_chunk.events().next().is_some() + || all_duplicates + { + prev_batch = None; + } + + if prev_batch.is_some() { + // Sad time: there's a gap, somewhere, in the timeline, and there's at least one + // non-duplicated event. We don't know which threads might have gappy, so we + // must invalidate them all :( + // TODO(bnjbvr): figure out a better catchup mechanism for threads. + let mut summaries_to_update = Vec::new(); + + for (thread_root, thread) in self.state.threads.iter_mut() { + // Empty the thread's linked chunk. + thread.clear(); + + summaries_to_update.push(thread_root.clone()); } + + // Now, update the summaries to indicate that we're not sure what the latest + // thread event is. The thread count can remain as is, as it might still be + // valid, and there's no good value to reset it to, anyways. + for thread_root in summaries_to_update { + let Some((location, mut target_event)) = + self.as_read_guard().find_event(&thread_root).await? + else { + trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync"); + continue; + }; + + if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() { + prev_summary.latest_reply = None; + + target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary); + + self.replace_event_at(location, target_event).await?; + } + } + } + + if all_duplicates { + // No new events and no gap (per the previous check), thus no need to change the + // room state. We're done! + return Ok((false, Vec::new())); } - let store = self.store.lock().await?; + let has_new_gap = prev_batch.is_some(); - Ok(store - .find_event(&self.room, event_id) - .await? - .map(|event| (EventLocation::Store, event))) - } + // If we've never waited for an initial previous-batch token, and we've now + // inserted a gap, no need to wait for a previous-batch token later. + if !self.state.waited_for_initial_prev_token.load(Ordering::SeqCst) && has_new_gap { + self.state.waited_for_initial_prev_token.store(true, Ordering::SeqCst); + } - /// Find an event and all its relations in the persisted storage. - /// - /// This goes straight to the database, as a simplification; we don't - /// expect to need to have to look up in memory events, or that - /// all the related events are actually loaded. - /// - /// The related events are sorted like this: - /// - events saved out-of-band with - /// [`super::RoomEventCache::save_events`] will be located at the - /// beginning of the array. - /// - events present in the linked chunk (be it in memory or in the - /// database) will be sorted according to their ordering in the linked - /// chunk. - pub async fn find_event_with_relations( - &self, - event_id: &EventId, - filters: Option>, - ) -> Result)>, EventCacheError> { - let store = self.store.lock().await?; + // Remove the old duplicated events. + // + // We don't have to worry the removals can change the position of the existing + // events, because we are pushing all _new_ `events` at the back. + self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) + .await?; - // First, hit storage to get the target event and its related events. - let found = store.find_event(&self.room, event_id).await?; + self.state + .room_linked_chunk + .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events); - let Some(target) = found else { - // We haven't found the event: return early. - return Ok(None); - }; + self.post_process_new_events(events, true).await?; - // Then, initialize the stack with all the related events, to find the - // transitive closure of all the related events. - let mut related = - store.find_event_relations(&self.room, event_id, filters.as_deref()).await?; - let mut stack = - related.iter().filter_map(|(event, _pos)| event.event_id()).collect::>(); + if timeline.limited && has_new_gap { + // If there was a previous batch token for a limited timeline, unload the chunks + // so it only contains the last one; otherwise, there might be a + // valid gap in between, and observers may not render it (yet). + // + // We must do this *after* persisting these events to storage (in + // `post_process_new_events`). + self.shrink_to_last_chunk().await?; + } - // Also keep track of already seen events, in case there's a loop in the - // relation graph. - let mut already_seen = HashSet::new(); - already_seen.insert(event_id.to_owned()); + let timeline_event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); - let mut num_iters = 1; + Ok((has_new_gap, timeline_event_diffs)) + } - // Find the related event for each previously-related event. - while let Some(event_id) = stack.pop() { - if !already_seen.insert(event_id.clone()) { - // Skip events we've already seen. - continue; + /// Handle the result of a single back-pagination request. + /// + /// If the `prev_token` is set, then this function will check that the + /// corresponding gap is present in the in-memory linked chunk. + /// If it's not the case, `Ok(None)` will be returned, and the + /// caller may decide to do something based on that (e.g. restart a + /// pagination). + #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] + pub async fn handle_backpagination( + &mut self, + events: Vec, + mut new_token: Option, + prev_token: Option, + ) -> Result>)>, EventCacheError> + { + // Check that the previous token still exists; otherwise it's a sign that the + // room's timeline has been cleared. + let prev_gap_id = if let Some(token) = prev_token { + // Find the corresponding gap in the in-memory linked chunk. + let gap_chunk_id = self.state.room_linked_chunk.chunk_identifier(|chunk| { + matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token) + }); + + if gap_chunk_id.is_none() { + // We got a previous-batch token from the linked chunk *before* running the + // request, but it is missing *after* completing the request. + // + // It may be a sign the linked chunk has been reset, but it's fine, per this + // function's contract. + return Ok(None); } - let other_related = - store.find_event_relations(&self.room, &event_id, filters.as_deref()).await?; + gap_chunk_id + } else { + None + }; - stack.extend(other_related.iter().filter_map(|(event, _pos)| event.event_id())); - related.extend(other_related); + let DeduplicationOutcome { + all_events: mut events, + in_memory_duplicated_event_ids, + in_store_duplicated_event_ids, + non_empty_all_duplicates: all_duplicates, + } = filter_duplicate_events( + &self.store, + LinkedChunkId::Room(&self.state.room_id), + &self.state.room_linked_chunk, + events, + ) + .await?; - num_iters += 1; + // If not all the events have been back-paginated, we need to remove the + // previous ones, otherwise we can end up with misordered events. + // + // Consider the following scenario: + // - sync returns [D, E, F] + // - then sync returns [] with a previous batch token PB1, so the internal + // linked chunk state is [D, E, F, PB1]. + // - back-paginating with PB1 may return [A, B, C, D, E, F]. + // + // Only inserting the new events when replacing PB1 would result in a timeline + // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove + // all the events, in case this happens (see also #4746). + + if !all_duplicates { + // Let's forget all the previous events. + self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) + .await?; + } else { + // All new events are duplicated, they can all be ignored. + events.clear(); + // The gap can be ditched too, as it won't be useful to backpaginate any + // further. + new_token = None; } - trace!(num_related = %related.len(), num_iters, "computed transitive closure of related events"); + // `/messages` has been called with `dir=b` (backwards), so the events are in + // the inverted order; reorder them. + let topo_ordered_events = events.iter().rev().cloned().collect::>(); - // Sort the results by their positions in the linked chunk, if available. - // - // If an event doesn't have a known position, it goes to the start of the array. - related.sort_by(|(_, lhs), (_, rhs)| { - use std::cmp::Ordering; - match (lhs, rhs) { - (None, None) => Ordering::Equal, - (None, Some(_)) => Ordering::Less, - (Some(_), None) => Ordering::Greater, - (Some(lhs), Some(rhs)) => { - let lhs = self.room_event_order(*lhs); - let rhs = self.room_event_order(*rhs); + let new_gap = new_token.map(|prev_token| Gap { prev_token }); + let reached_start = self.state.room_linked_chunk.finish_back_pagination( + prev_gap_id, + new_gap, + &topo_ordered_events, + ); - // The events should have a definite position, but in the case they don't, - // still consider that not having a position means you'll end at the start - // of the array. - match (lhs, rhs) { - (None, None) => Ordering::Equal, - (None, Some(_)) => Ordering::Less, - (Some(_), None) => Ordering::Greater, - (Some(lhs), Some(rhs)) => lhs.cmp(&rhs), - } - } - } - }); + // Note: this flushes updates to the store. + self.post_process_new_events(topo_ordered_events, false).await?; - // Keep only the events, not their positions. - let related = related.into_iter().map(|(event, _pos)| event).collect(); + let event_diffs = self.state.room_linked_chunk.updates_as_vector_diffs(); - Ok(Some((target, related))) + Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs))) } + /// Subscribe to thread for a given root event, and get a (maybe empty) + /// initially known list of events for that thread. + pub fn subscribe_to_thread( + &mut self, + root: OwnedEventId, + ) -> (Vec, Receiver) { + self.get_or_reload_thread(root).subscribe() + } + + /// Back paginate in the given thread. + /// + /// Will always start from the end, unless we previously paginated. + pub fn finish_thread_network_pagination( + &mut self, + root: OwnedEventId, + prev_token: Option, + new_token: Option, + events: Vec, + ) -> Option { + self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events) + } + + pub fn load_more_thread_events_backwards( + &mut self, + root: OwnedEventId, + ) -> LoadMoreEventsBackwardsOutcome { + self.get_or_reload_thread(root).load_more_events_backwards() + } + + // -------------------------------------------- + // utility methods + // -------------------------------------------- + /// Post-process new events, after they have been added to the in-memory /// linked chunk. /// @@ -1446,7 +1632,7 @@ mod private { for event in events { self.maybe_apply_new_redaction(&event).await?; - if self.enabled_thread_support { + if self.state.enabled_thread_support { // Only add the event to a thread if: // - thread support is enabled, // - and if this is a sync (we can't know where to insert backpaginated events @@ -1459,7 +1645,7 @@ mod private { .push(event.clone()); } else if let Some(event_id) = event.event_id() { // If we spot the root of a thread, add it to its linked chunk. - if self.threads.contains_key(&event_id) { + if self.state.threads.contains_key(&event_id) { new_events_by_thread .entry(event_id) .or_default() @@ -1472,7 +1658,7 @@ mod private { if let Some(edit_target) = extract_edit_target(event.raw()) { // If the edited event is known, and part of a thread, if let Some((_location, edit_target_event)) = - self.find_event(&edit_target).await? + self.as_read_guard().find_event(&edit_target).await? && let Some(thread_root) = extract_thread_root(edit_target_event.raw()) { // Mark the thread for processing, unless it was already marked as @@ -1488,7 +1674,7 @@ mod private { } } - if self.enabled_thread_support { + if self.state.enabled_thread_support { self.update_threads(new_events_by_thread).await?; } @@ -1498,12 +1684,11 @@ mod private { fn get_or_reload_thread(&mut self, root_event_id: OwnedEventId) -> &mut ThreadEventCache { // TODO: when there's persistent storage, try to lazily reload from disk, if // missing from memory. - self.threads.entry(root_event_id.clone()).or_insert_with(|| { - ThreadEventCache::new( - self.room.clone(), - root_event_id, - self.linked_chunk_update_sender.clone(), - ) + let room_id = self.state.room_id.clone(); + let linked_chunk_update_sender = self.state.linked_chunk_update_sender.clone(); + + self.state.threads.entry(root_event_id.clone()).or_insert_with(|| { + ThreadEventCache::new(room_id, root_event_id, linked_chunk_update_sender) }) } @@ -1523,6 +1708,7 @@ mod private { // event id as the latest event id for the thread summary. if let Some(event_id) = latest_event_id.as_ref() && let Some((_, edits)) = self + .as_read_guard() .find_event_with_relations(event_id, Some(vec![RelationType::Replacement])) .await? && let Some(latest_edit) = edits.last() @@ -1545,7 +1731,9 @@ mod private { // Add a thread summary to the (room) event which has the thread root, if we // knew about it. - let Some((location, mut target_event)) = self.find_event(&thread_root).await? else { + let Some((location, mut target_event)) = + self.as_read_guard().find_event(&thread_root).await? + else { trace!(%thread_root, "thread root event is missing from the room linked chunk"); return Ok(()); }; @@ -1561,9 +1749,13 @@ mod private { // worry about filtering out aggregation events (like // reactions/edits/etc.). Pretty neat, huh? let num_replies = { - let store_guard = &*self.store.lock().await?; - let thread_replies = store_guard - .find_event_relations(&self.room, &thread_root, Some(&[RelationType::Thread])) + let thread_replies = self + .store + .find_event_relations( + &self.state.room_id, + &thread_root, + Some(&[RelationType::Thread]), + ) .await?; thread_replies.len().try_into().unwrap_or(u32::MAX) }; @@ -1598,7 +1790,8 @@ mod private { ) -> Result<(), EventCacheError> { match location { EventLocation::Memory(position) => { - self.room_linked_chunk + self.state + .room_linked_chunk .replace_event_at(position, event) .expect("should have been a valid position of an item"); // We just changed the in-memory representation; synchronize this with @@ -1640,13 +1833,15 @@ mod private { return Ok(()); }; - let Some(event_id) = redaction.redacts(&self.room_version_rules.redaction) else { + let Some(event_id) = redaction.redacts(&self.state.room_version_rules.redaction) else { warn!("missing target event id from the redaction event"); return Ok(()); }; // Replace the redacted event by a redacted form, if we knew about it. - let Some((location, mut target_event)) = self.find_event(event_id).await? else { + let Some((location, mut target_event)) = + self.as_read_guard().find_event(event_id).await? + else { trace!("redacted event is missing from the linked chunk"); return Ok(()); }; @@ -1679,7 +1874,7 @@ mod private { if let Some(redacted_event) = apply_redaction( target_event.raw(), event.raw().cast_ref_unchecked::(), - &self.room_version_rules.redaction, + &self.state.room_version_rules.redaction, ) { // It's safe to cast `redacted_event` here: // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent` @@ -1697,7 +1892,7 @@ mod private { // happen BEFORE we recompute the summary, otherwise the set of // replies may include the to-be-redacted event. if let Some(thread_root) = thread_root - && let Some(thread_cache) = self.threads.get_mut(&thread_root) + && let Some(thread_cache) = self.state.threads.get_mut(&thread_root) { thread_cache.remove_if_present(event_id); @@ -1713,16 +1908,15 @@ mod private { /// Save events into the database, without notifying observers. pub async fn save_events( - &self, + &mut self, events: impl IntoIterator, ) -> Result<(), EventCacheError> { let store = self.store.clone(); - let room_id = self.room.clone(); + let room_id = self.state.room_id.clone(); let events = events.into_iter().collect::>(); // Spawn a task so the save is uninterrupted by task cancellation. spawn(async move { - let store = store.lock().await?; for event in events { store.save_event(&room_id, event).await?; } @@ -1733,248 +1927,174 @@ mod private { Ok(()) } + } - /// Handle the result of a sync. - /// - /// It may send room event cache updates to the given sender, if it - /// generated any of those. - /// - /// Returns `true` for the first part of the tuple if a new gap - /// (previous-batch token) has been inserted, `false` otherwise. - #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] - pub async fn handle_sync( - &mut self, - mut timeline: Timeline, - ) -> Result<(bool, Vec>), EventCacheError> { - let mut prev_batch = timeline.prev_batch.take(); - - let DeduplicationOutcome { - all_events: events, - in_memory_duplicated_event_ids, - in_store_duplicated_event_ids, - non_empty_all_duplicates: all_duplicates, - } = filter_duplicate_events( - &self.store, - LinkedChunkId::Room(self.room.as_ref()), - &self.room_linked_chunk, - timeline.events, - ) - .await?; - - // If the timeline isn't limited, and we already knew about some past events, - // then this definitely knows what the timeline head is (either we know - // about all the events persisted in storage, or we have a gap - // somewhere). In this case, we can ditch the previous-batch - // token, which is an optimization to avoid unnecessary future back-pagination - // requests. - // - // We can also ditch it if we knew about all the events that came from sync, - // namely, they were all deduplicated. In this case, using the - // previous-batch token would only result in fetching other events we - // knew about. This is slightly incorrect in the presence of - // network splits, but this has shown to be Good Enough™. - if !timeline.limited && self.room_linked_chunk.events().next().is_some() - || all_duplicates - { - prev_batch = None; - } - - if prev_batch.is_some() { - // Sad time: there's a gap, somewhere, in the timeline, and there's at least one - // non-duplicated event. We don't know which threads might have gappy, so we - // must invalidate them all :( - // TODO(bnjbvr): figure out a better catchup mechanism for threads. - let mut summaries_to_update = Vec::new(); - - for (thread_root, thread) in self.threads.iter_mut() { - // Empty the thread's linked chunk. - thread.clear(); - - summaries_to_update.push(thread_root.clone()); - } - - // Now, update the summaries to indicate that we're not sure what the latest - // thread event is. The thread count can remain as is, as it might still be - // valid, and there's no good value to reset it to, anyways. - for thread_root in summaries_to_update { - let Some((location, mut target_event)) = self.find_event(&thread_root).await? - else { - trace!(%thread_root, "thread root event is unknown, when updating thread summary after a gappy sync"); - continue; - }; - - if let Some(mut prev_summary) = target_event.thread_summary.summary().cloned() { - prev_summary.latest_reply = None; - - target_event.thread_summary = ThreadSummaryStatus::Some(prev_summary); - - self.replace_event_at(location, target_event).await?; - } - } - } - - if all_duplicates { - // No new events and no gap (per the previous check), thus no need to change the - // room state. We're done! - return Ok((false, Vec::new())); - } - - let has_new_gap = prev_batch.is_some(); - - // If we've never waited for an initial previous-batch token, and we've now - // inserted a gap, no need to wait for a previous-batch token later. - if !self.waited_for_initial_prev_token && has_new_gap { - self.waited_for_initial_prev_token = true; - } - - // Remove the old duplicated events. - // - // We don't have to worry the removals can change the position of the existing - // events, because we are pushing all _new_ `events` at the back. - self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) - .await?; - - self.room_linked_chunk - .push_live_events(prev_batch.map(|prev_token| Gap { prev_token }), &events); + /// Load a linked chunk's full metadata, making sure the chunks are + /// according to their their links. + /// + /// Returns `None` if there's no such linked chunk in the store, or an + /// error if the linked chunk is malformed. + async fn load_linked_chunk_metadata( + store_guard: &EventCacheStoreLockGuard, + linked_chunk_id: LinkedChunkId<'_>, + ) -> Result>, EventCacheError> { + let mut all_chunks = store_guard + .load_all_chunks_metadata(linked_chunk_id) + .await + .map_err(EventCacheError::from)?; - self.post_process_new_events(events, true).await?; + if all_chunks.is_empty() { + // There are no chunks, so there's nothing to do. + return Ok(None); + } - if timeline.limited && has_new_gap { - // If there was a previous batch token for a limited timeline, unload the chunks - // so it only contains the last one; otherwise, there might be a - // valid gap in between, and observers may not render it (yet). - // - // We must do this *after* persisting these events to storage (in - // `post_process_new_events`). - self.shrink_to_last_chunk().await?; - } + // Transform the vector into a hashmap, for quick lookup of the predecessors. + let chunk_map: HashMap<_, _> = + all_chunks.iter().map(|meta| (meta.identifier, meta)).collect(); - let timeline_event_diffs = self.room_linked_chunk.updates_as_vector_diffs(); + // Find a last chunk. + let mut iter = all_chunks.iter().filter(|meta| meta.next.is_none()); + let Some(last) = iter.next() else { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: "no last chunk found".to_owned(), + }); + }; - Ok((has_new_gap, timeline_event_diffs)) + // There must at most one last chunk. + if let Some(other_last) = iter.next() { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "chunks {} and {} both claim to be last chunks", + last.identifier.index(), + other_last.identifier.index() + ), + }); } - /// Handle the result of a single back-pagination request. - /// - /// If the `prev_token` is set, then this function will check that the - /// corresponding gap is present in the in-memory linked chunk. - /// If it's not the case, `Ok(None)` will be returned, and the - /// caller may decide to do something based on that (e.g. restart a - /// pagination). - #[must_use = "Propagate `VectorDiff` updates via `RoomEventCacheUpdate`"] - pub async fn handle_backpagination( - &mut self, - events: Vec, - mut new_token: Option, - prev_token: Option, - ) -> Result>)>, EventCacheError> - { - // Check that the previous token still exists; otherwise it's a sign that the - // room's timeline has been cleared. - let prev_gap_id = if let Some(token) = prev_token { - // Find the corresponding gap in the in-memory linked chunk. - let gap_chunk_id = self.room_linked_chunk.chunk_identifier(|chunk| { - matches!(chunk.content(), ChunkContent::Gap(Gap { prev_token }) if *prev_token == token) + // Rewind the chain back to the first chunk, and do some checks at the same + // time. + let mut seen = HashSet::new(); + let mut current = last; + loop { + // If we've already seen this chunk, there's a cycle somewhere. + if !seen.insert(current.identifier) { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "cycle detected in linked chunk at {}", + current.identifier.index() + ), }); + } - if gap_chunk_id.is_none() { - // We got a previous-batch token from the linked chunk *before* running the - // request, but it is missing *after* completing the request. - // - // It may be a sign the linked chunk has been reset, but it's fine, per this - // function's contract. - return Ok(None); + let Some(prev_id) = current.previous else { + // If there's no previous chunk, we're done. + if seen.len() != all_chunks.len() { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "linked chunk likely has multiple components: {} chunks seen through the chain of predecessors, but {} expected", + seen.len(), + all_chunks.len() + ), + }); } - - gap_chunk_id - } else { - None + break; }; - let DeduplicationOutcome { - all_events: mut events, - in_memory_duplicated_event_ids, - in_store_duplicated_event_ids, - non_empty_all_duplicates: all_duplicates, - } = filter_duplicate_events( - &self.store, - LinkedChunkId::Room(self.room.as_ref()), - &self.room_linked_chunk, - events, - ) - .await?; - - // If not all the events have been back-paginated, we need to remove the - // previous ones, otherwise we can end up with misordered events. - // - // Consider the following scenario: - // - sync returns [D, E, F] - // - then sync returns [] with a previous batch token PB1, so the internal - // linked chunk state is [D, E, F, PB1]. - // - back-paginating with PB1 may return [A, B, C, D, E, F]. - // - // Only inserting the new events when replacing PB1 would result in a timeline - // ordering of [D, E, F, A, B, C], which is incorrect. So we do have to remove - // all the events, in case this happens (see also #4746). + // If the previous chunk is not in the map, then it's unknown + // and missing. + let Some(pred_meta) = chunk_map.get(&prev_id) else { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "missing predecessor {} chunk for {}", + prev_id.index(), + current.identifier.index() + ), + }); + }; - if !all_duplicates { - // Let's forget all the previous events. - self.remove_events(in_memory_duplicated_event_ids, in_store_duplicated_event_ids) - .await?; - } else { - // All new events are duplicated, they can all be ignored. - events.clear(); - // The gap can be ditched too, as it won't be useful to backpaginate any - // further. - new_token = None; + // If the previous chunk isn't connected to the next, then the link is invalid. + if pred_meta.next != Some(current.identifier) { + return Err(EventCacheError::InvalidLinkedChunkMetadata { + details: format!( + "chunk {}'s next ({:?}) doesn't match the current chunk ({})", + pred_meta.identifier.index(), + pred_meta.next.map(|chunk_id| chunk_id.index()), + current.identifier.index() + ), + }); } - // `/messages` has been called with `dir=b` (backwards), so the events are in - // the inverted order; reorder them. - let topo_ordered_events = events.iter().rev().cloned().collect::>(); + current = *pred_meta; + } - let new_gap = new_token.map(|prev_token| Gap { prev_token }); - let reached_start = self.room_linked_chunk.finish_back_pagination( - prev_gap_id, - new_gap, - &topo_ordered_events, - ); + // At this point, `current` is the identifier of the first chunk. + // + // Reorder the resulting vector, by going through the chain of `next` links, and + // swapping items into their final position. + // + // Invariant in this loop: all items in [0..i[ are in their final, correct + // position. + let mut current = current.identifier; + for i in 0..all_chunks.len() { + // Find the target metadata. + let j = all_chunks + .iter() + .rev() + .position(|meta| meta.identifier == current) + .map(|j| all_chunks.len() - 1 - j) + .expect("the target chunk must be present in the metadata"); + if i != j { + all_chunks.swap(i, j); + } + if let Some(next) = all_chunks[i].next { + current = next; + } + } - // Note: this flushes updates to the store. - self.post_process_new_events(topo_ordered_events, false).await?; + Ok(Some(all_chunks)) + } - let event_diffs = self.room_linked_chunk.updates_as_vector_diffs(); + /// Removes the bundled relations from an event, if they were present. + /// + /// Only replaces the present if it contained bundled relations. + fn strip_relations_if_present(event: &mut Raw) { + // We're going to get rid of the `unsigned`/`m.relations` field, if it's + // present. + // Use a closure that returns an option so we can quickly short-circuit. + let mut closure = || -> Option<()> { + let mut val: serde_json::Value = event.deserialize_as().ok()?; + let unsigned = val.get_mut("unsigned")?; + let unsigned_obj = unsigned.as_object_mut()?; + if unsigned_obj.remove("m.relations").is_some() { + *event = Raw::new(&val).ok()?.cast_unchecked(); + } + None + }; + let _ = closure(); + } - Ok(Some((BackPaginationOutcome { events, reached_start }, event_diffs))) - } + fn strip_relations_from_event(ev: &mut Event) { + match &mut ev.kind { + TimelineEventKind::Decrypted(decrypted) => { + // Remove all information about encryption info for + // the bundled events. + decrypted.unsigned_encryption_info = None; - /// Subscribe to thread for a given root event, and get a (maybe empty) - /// initially known list of events for that thread. - pub fn subscribe_to_thread( - &mut self, - root: OwnedEventId, - ) -> (Vec, Receiver) { - self.get_or_reload_thread(root).subscribe() - } + // Remove the `unsigned`/`m.relations` field, if needs be. + strip_relations_if_present(&mut decrypted.event); + } - /// Back paginate in the given thread. - /// - /// Will always start from the end, unless we previously paginated. - pub fn finish_thread_network_pagination( - &mut self, - root: OwnedEventId, - prev_token: Option, - new_token: Option, - events: Vec, - ) -> Option { - self.get_or_reload_thread(root).finish_network_pagination(prev_token, new_token, events) + TimelineEventKind::UnableToDecrypt { event, .. } + | TimelineEventKind::PlainText { event } => { + strip_relations_if_present(event); + } } + } - pub fn load_more_thread_events_backwards( - &mut self, - root: OwnedEventId, - ) -> LoadMoreEventsBackwardsOutcome { - self.get_or_reload_thread(root).load_more_events_backwards() + /// Strips the bundled relations from a collection of events. + fn strip_relations_from_events(items: &mut [Event]) { + for ev in items.iter_mut() { + strip_relations_from_event(ev); } } } @@ -1988,7 +2108,9 @@ pub(super) enum EventLocation { Store, } -pub(super) use private::RoomEventCacheState; +pub(super) use private::{ + RoomEventCacheStateLock, RoomEventCacheStateLockReadGuard, RoomEventCacheStateLockWriteGuard, +}; #[cfg(test)] mod tests { @@ -2129,8 +2251,11 @@ mod tests { room_event_cache.save_events([associated_related_event]).await; let filter = Some(vec![RelationType::Replacement]); - let (event, related_events) = - room_event_cache.find_event_with_relations(original_id, filter).await.unwrap(); + let (event, related_events) = room_event_cache + .find_event_with_relations(original_id, filter) + .await + .expect("Failed to find the event with relations") + .expect("Event has no relation"); // Fetched event is the right one. let cached_event_id = event.event_id().unwrap(); assert_eq!(cached_event_id, original_id); @@ -2143,8 +2268,12 @@ mod tests { // Now we'll filter threads instead, there should be no related events let filter = Some(vec![RelationType::Thread]); - let (event, related_events) = - room_event_cache.find_event_with_relations(original_id, filter).await.unwrap(); + let (event, related_events) = room_event_cache + .find_event_with_relations(original_id, filter) + .await + .expect("Failed to find the event with relations") + .expect("Event has no relation"); + // Fetched event is the right one. let cached_event_id = event.event_id().unwrap(); assert_eq!(cached_event_id, original_id); @@ -2188,8 +2317,11 @@ mod tests { // Save the associated related event, which redacts the related event. room_event_cache.save_events([associated_related_event]).await; - let (event, related_events) = - room_event_cache.find_event_with_relations(original_id, None).await.unwrap(); + let (event, related_events) = room_event_cache + .find_event_with_relations(original_id, None) + .await + .expect("Failed to find the event with relations") + .expect("Event has no relation"); // Fetched event is the right one. let cached_event_id = event.event_id().unwrap(); assert_eq!(cached_event_id, original_id); @@ -2236,8 +2368,11 @@ mod tests { let related_id = related_event.event_id().unwrap(); room_event_cache.save_events([related_event]).await; - let (event, related_events) = - room_event_cache.find_event_with_relations(&original_event_id, None).await.unwrap(); + let (event, related_events) = room_event_cache + .find_event_with_relations(&original_event_id, None) + .await + .expect("Failed to find the event with relations") + .expect("Event has no relation"); // Fetched event is the right one. let cached_event_id = event.event_id().unwrap(); assert_eq!(cached_event_id, original_event_id); @@ -2413,7 +2548,7 @@ mod timed_tests { // The in-memory linked chunk keeps the bundled relation. { - let events = room_event_cache.events().await; + let events = room_event_cache.events().await.unwrap(); assert_eq!(events.len(), 1); @@ -2529,13 +2664,13 @@ mod timed_tests { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (items, mut stream) = room_event_cache.subscribe().await; + let (items, mut stream) = room_event_cache.subscribe().await.unwrap(); let mut generic_stream = event_cache.subscribe_to_room_generic_updates(); // The rooms knows about all cached events. { - assert!(room_event_cache.find_event(event_id1).await.is_some()); - assert!(room_event_cache.find_event(event_id2).await.is_some()); + assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some()); + assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some()); } // But only part of events are loaded from the store @@ -2581,10 +2716,10 @@ mod timed_tests { // Events individually are not forgotten by the event cache, after clearing a // room. - assert!(room_event_cache.find_event(event_id1).await.is_some()); + assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some()); // But their presence in a linked chunk is forgotten. - let items = room_event_cache.events().await; + let items = room_event_cache.events().await.unwrap(); assert!(items.is_empty()); // The event cache store too. @@ -2689,7 +2824,7 @@ mod timed_tests { } ); - let (items, mut stream) = room_event_cache.subscribe().await; + let (items, mut stream) = room_event_cache.subscribe().await.unwrap(); // The initial items contain one event because only the last chunk is loaded by // default. @@ -2698,8 +2833,8 @@ mod timed_tests { assert!(stream.is_empty()); // The event cache knows only all events though, even if they aren't loaded. - assert!(room_event_cache.find_event(event_id1).await.is_some()); - assert!(room_event_cache.find_event(event_id2).await.is_some()); + assert!(room_event_cache.find_event(event_id1).await.unwrap().is_some()); + assert!(room_event_cache.find_event(event_id2).await.unwrap().is_some()); // Let's paginate to load more events. room_event_cache.pagination().run_backwards_once(20).await.unwrap(); @@ -2739,7 +2874,7 @@ mod timed_tests { // when subscribing, to check that the items correspond to their new // positions. The duplicated item is removed (so it's not the first // element anymore), and it's added to the back of the list. - let items = room_event_cache.events().await; + let items = room_event_cache.events().await.unwrap(); assert_eq!(items.len(), 2); assert_eq!(items[0].event_id().unwrap(), event_id1); assert_eq!(items[1].event_id().unwrap(), event_id2); @@ -2801,7 +2936,7 @@ mod timed_tests { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let items = room_event_cache.events().await; + let items = room_event_cache.events().await.unwrap(); // Because the persisted content was invalid, the room store is reset: there are // no events in the cache. @@ -2854,7 +2989,7 @@ mod timed_tests { ); { - let mut state = room_event_cache.inner.state.write().await; + let mut state = room_event_cache.inner.state.write().await.unwrap(); let mut num_gaps = 0; let mut num_events = 0; @@ -2915,7 +3050,7 @@ mod timed_tests { ); { - let state = room_event_cache.inner.state.read().await; + let state = room_event_cache.inner.state.read().await.unwrap(); let mut num_gaps = 0; let mut num_events = 0; @@ -2952,9 +3087,13 @@ mod timed_tests { // Fill the event cache store with an initial linked chunk with 2 events chunks. { - let store = client.event_cache_store(); - let store = store.lock().await.unwrap(); - store + client + .event_cache_store() + .lock() + .await + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -2990,7 +3129,7 @@ mod timed_tests { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); // Sanity check: lazily loaded, so only includes one item at start. - let (events, mut stream) = room_event_cache.subscribe().await; + let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); assert_eq!(events.len(), 1); assert_eq!(events[0].event_id().as_deref(), Some(evid2)); assert!(stream.is_empty()); @@ -3026,6 +3165,7 @@ mod timed_tests { .state .write() .await + .unwrap() .force_shrink_to_last_chunk() .await .expect("shrinking should succeed"); @@ -3044,7 +3184,7 @@ mod timed_tests { assert!(generic_stream.is_empty()); // When reading the events, we do get only the last one. - let events = room_event_cache.events().await; + let events = room_event_cache.events().await.unwrap(); assert_eq!(events.len(), 1); assert_eq!(events[0].event_id().as_deref(), Some(evid2)); @@ -3074,9 +3214,13 @@ mod timed_tests { // Fill the event cache store with an initial linked chunk with 2 events chunks. { - let store = client.event_cache_store(); - let store = store.lock().await.unwrap(); - store + client + .event_cache_store() + .lock() + .await + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -3114,7 +3258,7 @@ mod timed_tests { // Initially, the linked chunk only contains the last chunk, so only ev3 is // loaded. { - let state = room_event_cache.inner.state.read().await; + let state = room_event_cache.inner.state.read().await.unwrap(); // But we can get the order of ev1. assert_eq!(state.room_event_order(Position::new(ChunkIdentifier::new(0), 0)), Some(0)); @@ -3140,7 +3284,7 @@ mod timed_tests { // All events are now loaded, so their order is precisely their enumerated index // in a linear iteration. { - let state = room_event_cache.inner.state.read().await; + let state = room_event_cache.inner.state.read().await.unwrap(); for (i, (pos, _)) in state.room_linked_chunk().events().enumerate() { assert_eq!(state.room_event_order(pos), Some(i)); } @@ -3165,7 +3309,7 @@ mod timed_tests { .unwrap(); { - let state = room_event_cache.inner.state.read().await; + let state = room_event_cache.inner.state.read().await.unwrap(); // After the shrink, only evid3 and evid4 are loaded. let mut events = state.room_linked_chunk().events(); @@ -3207,9 +3351,13 @@ mod timed_tests { // Fill the event cache store with an initial linked chunk with 2 events chunks. { - let store = client.event_cache_store(); - let store = store.lock().await.unwrap(); - store + client + .event_cache_store() + .lock() + .await + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -3245,7 +3393,7 @@ mod timed_tests { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); // Sanity check: lazily loaded, so only includes one item at start. - let (events1, mut stream1) = room_event_cache.subscribe().await; + let (events1, mut stream1) = room_event_cache.subscribe().await.unwrap(); assert_eq!(events1.len(), 1); assert_eq!(events1[0].event_id().as_deref(), Some(evid2)); assert!(stream1.is_empty()); @@ -3271,7 +3419,7 @@ mod timed_tests { // Have another subscriber. // Since it's not the first one, and the previous one loaded some more events, // the second subscribers sees them all. - let (events2, stream2) = room_event_cache.subscribe().await; + let (events2, stream2) = room_event_cache.subscribe().await.unwrap(); assert_eq!(events2.len(), 2); assert_eq!(events2[0].event_id().as_deref(), Some(evid1)); assert_eq!(events2[1].event_id().as_deref(), Some(evid2)); @@ -3292,12 +3440,12 @@ mod timed_tests { { // Check the inner state: there's no more shared auto-shrinker. - let state = room_event_cache.inner.state.read().await; - assert_eq!(state.subscriber_count.load(std::sync::atomic::Ordering::SeqCst), 0); + let state = room_event_cache.inner.state.read().await.unwrap(); + assert_eq!(state.subscriber_count().load(std::sync::atomic::Ordering::SeqCst), 0); } // Getting the events will only give us the latest chunk. - let events3 = room_event_cache.events().await; + let events3 = room_event_cache.events().await.unwrap(); assert_eq!(events3.len(), 1); assert_eq!(events3[0].event_id().as_deref(), Some(evid2)); } @@ -3326,9 +3474,13 @@ mod timed_tests { // Fill the event cache store with an initial linked chunk of 2 chunks, and 4 // events. { - let store = client.event_cache_store(); - let store = store.lock().await.unwrap(); - store + client + .event_cache_store() + .lock() + .await + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -3370,7 +3522,7 @@ mod timed_tests { (event.raw().get_field::("sender").unwrap().as_deref() == Some(*BOB)).then(|| event.event_id()) }) .await, - Some(event_id) => { + Ok(Some(event_id)) => { assert_eq!(event_id.as_deref(), Some(event_id_0)); } ); @@ -3383,7 +3535,7 @@ mod timed_tests { (event.raw().get_field::("sender").unwrap().as_deref() == Some(*ALICE)).then(|| event.event_id()) }) .await, - Some(event_id) => { + Ok(Some(event_id)) => { assert_eq!(event_id.as_deref(), Some(event_id_2)); } ); @@ -3397,10 +3549,13 @@ mod timed_tests { .then(|| event.event_id()) }) .await + .unwrap() .is_none() ); // Look for an event that doesn't exist. - assert!(room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.is_none()); + assert!( + room_event_cache.rfind_map_event_in_memory_by(|_| None::<()>).await.unwrap().is_none() + ); } } diff --git a/crates/matrix-sdk/src/latest_events/latest_event.rs b/crates/matrix-sdk/src/latest_events/latest_event.rs index ac173cf24a0..6edd284bece 100644 --- a/crates/matrix-sdk/src/latest_events/latest_event.rs +++ b/crates/matrix-sdk/src/latest_events/latest_event.rs @@ -292,7 +292,9 @@ mod tests_latest_event { .event_cache_store() .lock() .await - .unwrap() + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(&room_id), vec![ @@ -410,7 +412,9 @@ mod tests_latest_event { .event_cache_store() .lock() .await - .unwrap() + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(&room_id), vec![ @@ -515,13 +519,16 @@ impl LatestEventValueBuilder { own_user_id: Option<&UserId>, power_levels: Option<&RoomPowerLevels>, ) -> LatestEventValue { - room_event_cache + if let Ok(Some(event)) = room_event_cache .rfind_map_event_in_memory_by(|event| { filter_timeline_event(event, own_user_id, power_levels).then(|| event.clone()) }) .await - .map(LatestEventValue::Remote) - .unwrap_or_default() + { + LatestEventValue::Remote(event) + } else { + LatestEventValue::default() + } } /// Create a new [`LatestEventValue::LocalIsSending`] or @@ -1693,7 +1700,9 @@ mod tests_latest_event_value_builder { .event_cache_store() .lock() .await - .unwrap() + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -2316,7 +2325,9 @@ mod tests_latest_event_value_builder { .event_cache_store() .lock() .await - .unwrap() + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ diff --git a/crates/matrix-sdk/src/latest_events/mod.rs b/crates/matrix-sdk/src/latest_events/mod.rs index 5443bbff7e6..3d292429064 100644 --- a/crates/matrix-sdk/src/latest_events/mod.rs +++ b/crates/matrix-sdk/src/latest_events/mod.rs @@ -1154,7 +1154,9 @@ mod tests { .event_cache_store() .lock() .await - .unwrap() + .expect("Could not acquire the event cache lock") + .as_clean() + .expect("Could not acquire a clean event cache lock") .handle_linked_chunk_updates( LinkedChunkId::Room(&room_id), vec![ diff --git a/crates/matrix-sdk/src/room/mod.rs b/crates/matrix-sdk/src/room/mod.rs index fd70abfaea9..77e6d3ff992 100644 --- a/crates/matrix-sdk/src/room/mod.rs +++ b/crates/matrix-sdk/src/room/mod.rs @@ -816,7 +816,7 @@ impl Room { ) -> Result { match self.event_cache().await { Ok((event_cache, _drop_handles)) => { - if let Some(event) = event_cache.find_event(event_id).await { + if let Some(event) = event_cache.find_event(event_id).await? { return Ok(event); } // Fallthrough: try with a request. diff --git a/crates/matrix-sdk/src/search_index/mod.rs b/crates/matrix-sdk/src/search_index/mod.rs index 2493afa14bd..6f696e42671 100644 --- a/crates/matrix-sdk/src/search_index/mod.rs +++ b/crates/matrix-sdk/src/search_index/mod.rs @@ -218,7 +218,7 @@ async fn get_most_recent_edit( ) -> Option { use ruma::events::{AnySyncTimelineEvent, relation::RelationType}; - let Some((original_ev, related)) = + let Ok(Some((original_ev, related))) = cache.find_event_with_relations(original, Some(vec![RelationType::Replacement])).await else { debug!("Couldn't find relations for {}", original); @@ -275,7 +275,7 @@ async fn handle_room_redaction( rules: &RedactionRules, ) -> Option { if let Some(redacted_event_id) = event.redacts(rules) - && let Some(redacted_event) = cache.find_event(redacted_event_id).await + && let Ok(Some(redacted_event)) = cache.find_event(redacted_event_id).await && let Ok(AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage( redacted_event, ))) = redacted_event.raw().deserialize() diff --git a/crates/matrix-sdk/src/sliding_sync/client.rs b/crates/matrix-sdk/src/sliding_sync/client.rs index 7f96fbef764..ab689b76899 100644 --- a/crates/matrix-sdk/src/sliding_sync/client.rs +++ b/crates/matrix-sdk/src/sliding_sync/client.rs @@ -355,7 +355,7 @@ async fn handle_receipts_extension( return Ok::<_, crate::Error>(None); }; - let previous_events = room_event_cache.events().await; + let previous_events = room_event_cache.events().await?; let receipt_event = client .base_client() diff --git a/crates/matrix-sdk/tests/integration/event_cache/mod.rs b/crates/matrix-sdk/tests/integration/event_cache/mod.rs index 6e36881350e..9fd44fdeb11 100644 --- a/crates/matrix-sdk/tests/integration/event_cache/mod.rs +++ b/crates/matrix-sdk/tests/integration/event_cache/mod.rs @@ -74,7 +74,7 @@ async fn test_event_cache_receives_events() { // If I create a room event subscriber, let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut subscriber) = room_event_cache.subscribe().await; + let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); // Then at first it's empty, and the subscriber doesn't yield anything. assert!(events.is_empty()); @@ -148,7 +148,7 @@ async fn test_ignored_unignored() { // And subscribe to the room, let room = client.get_room(room_id).unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await; + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); // Then at first it contains the two initial events. assert_eq!(events.len(), 2); @@ -199,7 +199,7 @@ async fn test_ignored_unignored() { { let room = client.get_room(other_room_id).unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let events = room_event_cache.events().await; + let events = room_event_cache.events().await.unwrap(); assert!(events.is_empty()); } @@ -254,7 +254,7 @@ async fn test_backpaginate_once() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await; + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); // This is racy: either the initial message has been processed by the event // cache (and no room updates will happen in this case), or it hasn't, and @@ -343,7 +343,7 @@ async fn test_backpaginate_many_times_with_many_iterations() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await; + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); // This is racy: either the initial message has been processed by the event // cache (and no room updates will happen in this case), or it hasn't, and @@ -424,7 +424,7 @@ async fn test_backpaginate_many_times_with_many_iterations() { assert!(room_stream.is_empty()); // And next time I'll open the room, I'll get the events in the right order. - let (events, room_stream) = room_event_cache.subscribe().await; + let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "oh well"); assert_event_matches_msg(&events[1], "hello"); @@ -465,7 +465,7 @@ async fn test_backpaginate_many_times_with_one_iteration() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await; + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); // This is racy: either the initial message has been processed by the event // cache (and no room updates will happen in this case), or it hasn't, and @@ -543,7 +543,7 @@ async fn test_backpaginate_many_times_with_one_iteration() { }); // And next time I'll open the room, I'll get the events in the right order. - let (events, room_stream) = room_event_cache.subscribe().await; + let (events, room_stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "oh well"); assert_event_matches_msg(&events[1], "hello"); @@ -585,7 +585,7 @@ async fn test_reset_while_backpaginating() { let (room_event_cache, _drop_handles) = client.get_room(room_id).unwrap().event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await; + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); wait_for_initial_events(events, &mut room_stream).await; @@ -698,7 +698,7 @@ async fn test_backpaginating_without_token() { let room = server.sync_joined_room(&client, room_id).await; let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await; + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); assert!(events.is_empty()); assert!(room_stream.is_empty()); @@ -755,7 +755,7 @@ async fn test_limited_timeline_resets_pagination() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut room_stream) = room_event_cache.subscribe().await; + let (events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); assert!(events.is_empty()); assert!(room_stream.is_empty()); @@ -849,7 +849,7 @@ async fn test_limited_timeline_with_storage() { ) .await; - let (initial_events, mut subscriber) = room_event_cache.subscribe().await; + let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); // This is racy: either the sync has been handled, or it hasn't yet. if initial_events.is_empty() { @@ -980,7 +980,7 @@ async fn test_backpaginate_with_no_initial_events() { pagination.run_backwards_once(20).await.unwrap(); // The linked chunk should contain the events in the correct order. - let events = room_event_cache.events().await; + let events = room_event_cache.events().await.unwrap(); assert_eq!(events.len(), 3, "{events:?}"); assert_event_matches_msg(&events[0], "oh well"); @@ -1015,7 +1015,7 @@ async fn test_backpaginate_replace_empty_gap() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await; + let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); wait_for_initial_events(events, &mut stream).await; // The first back-pagination will return a previous-batch token, but no events. @@ -1043,7 +1043,7 @@ async fn test_backpaginate_replace_empty_gap() { pagination.run_backwards_once(20).await.unwrap(); // The linked chunk should contain the events in the correct order. - let events = room_event_cache.events().await; + let events = room_event_cache.events().await.unwrap(); assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); @@ -1083,7 +1083,7 @@ async fn test_no_gap_stored_after_deduplicated_sync() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await; + let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); if events.is_empty() { assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv()); @@ -1126,7 +1126,7 @@ async fn test_no_gap_stored_after_deduplicated_sync() { assert!(stream.is_empty()); { - let events = room_event_cache.events().await; + let events = room_event_cache.events().await.unwrap(); assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); assert_event_matches_msg(&events[2], "sup"); @@ -1145,7 +1145,7 @@ async fn test_no_gap_stored_after_deduplicated_sync() { assert!(outcome.reached_start); { - let events = room_event_cache.events().await; + let events = room_event_cache.events().await.unwrap(); assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); assert_event_matches_msg(&events[2], "sup"); @@ -1180,7 +1180,7 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await; + let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); if events.is_empty() { assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv()); @@ -1268,7 +1268,7 @@ async fn test_no_gap_stored_after_deduplicated_backpagination() { // we shouldn't have to, since it is useless; all events were deduplicated // from the previous pagination. - let (events, stream) = room_event_cache.subscribe().await; + let (events, stream) = room_event_cache.subscribe().await.unwrap(); assert_event_matches_msg(&events[0], "hello"); assert_event_matches_msg(&events[1], "world"); assert_event_matches_msg(&events[2], "sup"); @@ -1304,7 +1304,7 @@ async fn test_dont_delete_gap_that_wasnt_inserted() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (events, mut stream) = room_event_cache.subscribe().await; + let (events, mut stream) = room_event_cache.subscribe().await.unwrap(); if events.is_empty() { assert_let_timeout!(Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = stream.recv()); } @@ -1378,7 +1378,7 @@ async fn test_apply_redaction_when_redaction_comes_later() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); // Wait for the event. - let (events, mut subscriber) = room_event_cache.subscribe().await; + let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); if events.is_empty() { assert_let_timeout!( Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv() @@ -1438,7 +1438,7 @@ async fn test_apply_redaction_when_redaction_comes_later() { let room = client.get_room(room_id).unwrap(); let (cache, _drop_handles) = room.event_cache().await.unwrap(); - let events = cache.events().await; + let events = cache.events().await.unwrap(); // We have two events: assert_eq!(events.len(), 2); @@ -1470,6 +1470,8 @@ async fn test_apply_redaction_on_an_in_store_event() { // 1. a chunk of 1 item, the one we are going to redact! // 2. a chunk of 1 item, the chunk that is going to be loaded. event_cache_store + .as_clean() + .unwrap() .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -1512,7 +1514,7 @@ async fn test_apply_redaction_on_an_in_store_event() { let room = mock_server.sync_joined_room(&client, room_id).await; let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap(); - let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await; + let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await.unwrap(); // Initial events! // @@ -1603,7 +1605,7 @@ async fn test_apply_redaction_when_redacted_and_redaction_are_in_same_sync() { let room_id = room_id!("!omelette:fromage.fr"); let room = server.sync_joined_room(&client, room_id).await; let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (_events, mut subscriber) = room_event_cache.subscribe().await; + let (_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); @@ -1676,6 +1678,8 @@ async fn test_lazy_loading() { // 2. a chunk of a gap // 1. a chunk of 6 items event_cache_store + .as_clean() + .unwrap() .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -1753,7 +1757,7 @@ async fn test_lazy_loading() { let room = mock_server.sync_joined_room(&client, room_id).await; let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap(); - let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await; + let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await.unwrap(); // Initial events! // @@ -2040,6 +2044,8 @@ async fn test_deduplication() { // 2. a chunk of 4 items // 1. a chunk of 3 items event_cache_store + .as_clean() + .unwrap() .handle_linked_chunk_updates( LinkedChunkId::Room(room_id), vec![ @@ -2092,7 +2098,7 @@ async fn test_deduplication() { let room = mock_server.sync_joined_room(&client, room_id).await; let (room_event_cache, _room_event_cache_drop_handle) = room.event_cache().await.unwrap(); - let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await; + let (initial_updates, mut updates_stream) = room_event_cache.subscribe().await.unwrap(); // One chunk has been loaded, so there are 3 events in memory. { @@ -2221,7 +2227,7 @@ async fn test_timeline_then_empty_timeline_then_deduplication_with_storage() { ]; let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (initial_events, mut subscriber) = room_event_cache.subscribe().await; + let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); assert!(initial_events.is_empty()); // Receive a sync with only the latest events. @@ -2383,7 +2389,7 @@ async fn test_clear_all_rooms() { .await; let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (initial, mut room_updates) = room_event_cache.subscribe().await; + let (initial, mut room_updates) = room_event_cache.subscribe().await.unwrap(); let mut initial = Vector::from(initial); // Wait for the ev1 event. @@ -2466,7 +2472,7 @@ async fn test_sync_while_back_paginate() { client.event_cache().subscribe().unwrap(); let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (initial_events, mut subscriber) = room_event_cache.subscribe().await; + let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); assert!(initial_events.is_empty()); // Mock /messages in case we use the prev_batch token from sync. @@ -2577,13 +2583,13 @@ async fn test_relations_ordering() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (initial_events, mut listener) = room_event_cache.subscribe().await; + let (initial_events, mut listener) = room_event_cache.subscribe().await.unwrap(); assert_eq!(initial_events.len(), 1); assert!(listener.recv().now_or_never().is_none()); // Sanity check: there are no relations for the target event yet. let (_, relations) = - room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap(); + room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap().unwrap(); assert!(relations.is_empty()); let edit2 = event_id!("$edit2"); @@ -2631,7 +2637,7 @@ async fn test_relations_ordering() { // At this point, relations are known for the target event. let (_, relations) = - room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap(); + room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap().unwrap(); assert_eq!(relations.len(), 2); // And the edit events are correctly ordered according to their position in the // linked chunk. @@ -2662,7 +2668,7 @@ async fn test_relations_ordering() { // Relations are returned accordingly. let (_, relations) = - room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap(); + room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap().unwrap(); assert_eq!(relations.len(), 3); assert_eq!(relations[0].event_id().unwrap(), edit2); assert_eq!(relations[1].event_id().unwrap(), edit3); @@ -2683,7 +2689,7 @@ async fn test_relations_ordering() { room.event(edit5, None).await.unwrap(); let (_, relations) = - room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap(); + room_event_cache.find_event_with_relations(target_event_id, None).await.unwrap().unwrap(); assert_eq!(relations.len(), 4); assert_eq!(relations[0].event_id().unwrap(), edit5); assert_eq!(relations[1].event_id().unwrap(), edit2); diff --git a/crates/matrix-sdk/tests/integration/event_cache/threads.rs b/crates/matrix-sdk/tests/integration/event_cache/threads.rs index e15b7d24036..748abb7e66a 100644 --- a/crates/matrix-sdk/tests/integration/event_cache/threads.rs +++ b/crates/matrix-sdk/tests/integration/event_cache/threads.rs @@ -162,7 +162,7 @@ async fn test_ignored_user_empties_threads() { // And we subscribe to the thread, let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (events, mut thread_stream) = - room_event_cache.subscribe_to_thread(thread_root.to_owned()).await; + room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap(); // Then, at first, the thread contains the two initial events. let events = wait_for_initial_events(events, &mut thread_stream).await; @@ -238,13 +238,13 @@ async fn test_gappy_sync_empties_all_threads() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (thread1_events, mut thread1_stream) = - room_event_cache.subscribe_to_thread(thread_root1.to_owned()).await; + room_event_cache.subscribe_to_thread(thread_root1.to_owned()).await.unwrap(); assert!(thread1_events.is_empty()); assert!(thread1_stream.is_empty()); let (thread2_events, mut thread2_stream) = - room_event_cache.subscribe_to_thread(thread_root2.to_owned()).await; + room_event_cache.subscribe_to_thread(thread_root2.to_owned()).await.unwrap(); assert!(thread2_events.is_empty()); assert!(thread2_stream.is_empty()); @@ -399,7 +399,7 @@ async fn test_deduplication() { // And we subscribe to the thread, let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (events, mut thread_stream) = - room_event_cache.subscribe_to_thread(thread_root.to_owned()).await; + room_event_cache.subscribe_to_thread(thread_root.to_owned()).await.unwrap(); // Then, at first, the thread contains the two initial events. let events = wait_for_initial_events(events, &mut thread_stream).await; @@ -487,7 +487,7 @@ async fn thread_subscription_test_setup() -> ThreadSubscriptionTestSetup { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - let (initial_events, mut subscriber) = room_event_cache.subscribe().await; + let (initial_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); assert!(initial_events.is_empty()); assert!(subscriber.is_empty()); @@ -647,7 +647,7 @@ async fn test_auto_subscribe_on_thread_paginate() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (thread_events, mut thread_stream) = - room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await.unwrap(); // Sanity check: the sync event is added to the thread. let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await; @@ -729,7 +729,7 @@ async fn test_auto_subscribe_on_thread_paginate_root_event() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (thread_events, mut thread_stream) = - room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await.unwrap(); // Sanity check: the sync event is added to the thread. let mut thread_events = wait_for_initial_events(thread_events, &mut thread_stream).await; @@ -802,7 +802,7 @@ async fn test_redact_touches_threads() { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); let (thread_events, mut thread_stream) = - room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await; + room_event_cache.subscribe_to_thread(thread_root_id.to_owned()).await.unwrap(); // Receive a thread root, and a threaded reply. s.server @@ -823,7 +823,7 @@ async fn test_redact_touches_threads() { assert_eq!(thread_events.remove(0).event_id().as_ref(), Some(&thread_resp1)); assert_eq!(thread_events.remove(0).event_id().as_ref(), Some(&thread_resp2)); - let (room_events, mut room_stream) = room_event_cache.subscribe().await; + let (room_events, mut room_stream) = room_event_cache.subscribe().await.unwrap(); assert_eq!(room_events.len(), 3); { diff --git a/labs/multiverse/src/main.rs b/labs/multiverse/src/main.rs index c569ca07a0d..226ab353758 100644 --- a/labs/multiverse/src/main.rs +++ b/labs/multiverse/src/main.rs @@ -379,7 +379,7 @@ impl App { async fn index_event_cache( client: &Client, update_sender: &Sender<(bool, IndexingMessage)>, - store: EventCacheStoreLockGuard<'_>, + store: &EventCacheStoreLockGuard, search_index_guard: &mut SearchIndexGuard<'_>, mut count: usize, ) -> Result { @@ -525,9 +525,14 @@ impl App { debug!("Start indexing from the event cache."); // First index everything in the cache - let Ok(count) = - App::index_event_cache(&client, &update_sender, store, &mut search_index_guard, count) - .await + let Ok(count) = App::index_event_cache( + &client, + &update_sender, + store.as_clean().expect("Only one process should access the event cache store"), + &mut search_index_guard, + count, + ) + .await else { debug!("Quitting index task."); return; @@ -948,6 +953,9 @@ async fn get_events_from_event_ids( event_ids: Vec, ) -> Vec { if let Ok(cache_lock) = client.event_cache_store().lock().await { + let cache_lock = + cache_lock.as_clean().expect("Only one process must access the event cache store"); + futures_util::future::join_all(event_ids.iter().map(|event_id| async { let event_id = event_id.clone(); match cache_lock.find_event(room.room_id(), &event_id).await { diff --git a/labs/multiverse/src/widgets/room_view/details/events.rs b/labs/multiverse/src/widgets/room_view/details/events.rs index bdfa5de33ce..9ea9acab3e7 100644 --- a/labs/multiverse/src/widgets/room_view/details/events.rs +++ b/labs/multiverse/src/widgets/room_view/details/events.rs @@ -28,7 +28,7 @@ impl Widget for &mut EventsView<'_> { let events = tokio::task::block_in_place(|| { Handle::current().block_on(async { let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); - room_event_cache.events().await + room_event_cache.events().await.unwrap() }) });