diff --git a/crates/matrix-sdk-base/CHANGELOG.md b/crates/matrix-sdk-base/CHANGELOG.md index 2e4a0130ff4..a3105fcf937 100644 --- a/crates/matrix-sdk-base/CHANGELOG.md +++ b/crates/matrix-sdk-base/CHANGELOG.md @@ -20,6 +20,8 @@ All notable changes to this project will be documented in this file. ([#5817](https://github.com/matrix-org/matrix-rust-sdk/pull/5817)) - `ComposerDraft` can now store attachments alongside text messages. ([#5794](https://github.com/matrix-org/matrix-rust-sdk/pull/5794)) +- Add `StateStore::upsert_thread_subscriptions()` method for bulk upserts. + ([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848)) ## [0.14.1] - 2025-09-10 diff --git a/crates/matrix-sdk-base/src/store/integration_tests.rs b/crates/matrix-sdk-base/src/store/integration_tests.rs index 5b5f8f5a835..c0da97519ec 100644 --- a/crates/matrix-sdk-base/src/store/integration_tests.rs +++ b/crates/matrix-sdk-base/src/store/integration_tests.rs @@ -109,6 +109,8 @@ pub trait StateStoreIntegrationTests { async fn test_thread_subscriptions(&self) -> TestResult; /// Test thread subscription bumpstamp semantics. async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult; + /// Test thread subscriptions bulk upsert, including bumpstamp semantics. + async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult; } impl StateStoreIntegrationTests for DynStateStore { @@ -1955,6 +1957,183 @@ impl StateStoreIntegrationTests for DynStateStore { Ok(()) } + + async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult { + let threads = [ + event_id!("$t1"), + event_id!("$t2"), + event_id!("$t3"), + event_id!("$t4"), + event_id!("$t5"), + event_id!("$t6"), + ]; + // Helper for building the input for `upsert_thread_subscriptions()`, + // which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)> + let build_subscription_updates = |subs: &[StoredThreadSubscription]| { + threads + .iter() + .zip(subs) + .map(|(&event_id, &sub)| (room_id(), event_id, sub)) + .collect::>() + }; + + // Test bump_stamp logic + let initial_subscriptions = build_subscription_updates(&[ + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: None, + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(14), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: None, + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(210), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(5), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(100), + }, + ]); + + let update_subscriptions = build_subscription_updates(&[ + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: None, + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: None, + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(1101), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(222), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(1), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(100), + }, + ]); + + let expected_subscriptions = build_subscription_updates(&[ + // Status should be updated, because prev and new bump_stamp are both None + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: None, + }, + // Status should be updated, but keep initial bump_stamp (new is None) + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(14), + }, + // Status should be updated and also bump_stamp should be updated (initial was None) + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(1101), + }, + // Status should be updated and also bump_stamp should be updated (initial was lower) + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(222), + }, + // Status shouldn't change, as new bump_stamp is lower + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(5), + }, + // Status shouldn't change, as bump_stamp is equal to the previous one + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(100), + }, + ]); + + // Set the initial subscriptions + self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?; + + // Assert the subscriptions have been added + for (room_id, thread_id, expected_sub) in &initial_subscriptions { + let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?; + assert_eq!(stored_subscription, Some(*expected_sub)); + } + + // Update subscriptions + self.upsert_thread_subscriptions(update_subscriptions).await?; + + // Assert the expected subscriptions and bump_stamps + for (room_id, thread_id, expected_sub) in &expected_subscriptions { + let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?; + assert_eq!(stored_subscription, Some(*expected_sub)); + } + + // Test just state changes, but first remove previous subscriptions + for (room_id, thread_id, _) in &expected_subscriptions { + self.remove_thread_subscription(room_id, thread_id).await?; + } + + let initial_subscriptions = build_subscription_updates(&[ + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(1), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: false }, + bump_stamp: Some(1), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(1), + }, + ]); + + self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?; + + for (room_id, thread_id, expected_sub) in &initial_subscriptions { + let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?; + assert_eq!(stored_subscription, Some(*expected_sub)); + } + + let update_subscriptions = build_subscription_updates(&[ + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: true }, + bump_stamp: Some(2), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(2), + }, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { automatic: false }, + bump_stamp: Some(2), + }, + ]); + + self.upsert_thread_subscriptions(update_subscriptions.clone()).await?; + + for (room_id, thread_id, expected_sub) in &update_subscriptions { + let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?; + assert_eq!(stored_subscription, Some(*expected_sub)); + } + + Ok(()) + } } /// Macro building to allow your StateStore implementation to run the entire @@ -2141,6 +2320,12 @@ macro_rules! statestore_integration_tests { let store = get_store().await?.into_state_store(); store.test_thread_subscriptions_bumpstamps().await } + + #[async_test] + async fn test_thread_subscriptions_bulk_upsert() -> TestResult { + let store = get_store().await?.into_state_store(); + store.test_thread_subscriptions_bulk_upsert().await + } } }; } diff --git a/crates/matrix-sdk-base/src/store/memory_store.rs b/crates/matrix-sdk-base/src/store/memory_store.rs index 6ffedcf686a..f5fc018ffa1 100644 --- a/crates/matrix-sdk-base/src/store/memory_store.rs +++ b/crates/matrix-sdk-base/src/store/memory_store.rs @@ -1005,6 +1005,33 @@ impl StateStore for MemoryStore { Ok(()) } + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<(), Self::Error> { + let mut inner = self.inner.write().unwrap(); + + for (room_id, thread_id, mut new) in updates { + let room_subs = inner.thread_subscriptions.entry(room_id.to_owned()).or_default(); + + if let Some(previous) = room_subs.get(thread_id) { + if *previous == new { + continue; + } + if !compare_thread_subscription_bump_stamps( + previous.bump_stamp, + &mut new.bump_stamp, + ) { + continue; + } + } + + room_subs.insert(thread_id.to_owned(), new); + } + + Ok(()) + } + async fn load_thread_subscription( &self, room: &RoomId, diff --git a/crates/matrix-sdk-base/src/store/traits.rs b/crates/matrix-sdk-base/src/store/traits.rs index fccce238e94..1bf4b604a19 100644 --- a/crates/matrix-sdk-base/src/store/traits.rs +++ b/crates/matrix-sdk-base/src/store/traits.rs @@ -487,7 +487,7 @@ pub trait StateStore: AsyncTraitDeps { /// bumpstamp is kept. /// /// If the new thread subscription has a bumpstamp that's lower than or - /// equal to a previously one, the existing subscription is kept, i.e. + /// equal to a previous one, the existing subscription is kept, i.e. /// this method must have no effect. async fn upsert_thread_subscription( &self, @@ -496,6 +496,20 @@ pub trait StateStore: AsyncTraitDeps { subscription: StoredThreadSubscription, ) -> Result<(), Self::Error>; + /// Inserts or updates multiple thread subscriptions. + /// + /// If the new thread subscription hasn't set a bumpstamp, and there was a + /// previous subscription in the database with a bumpstamp, the existing + /// bumpstamp is kept. + /// + /// If the new thread subscription has a bumpstamp that's lower than or + /// equal to a previous one, the existing subscription is kept, i.e. + /// this method must have no effect. + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<(), Self::Error>; + /// Remove a previous thread subscription for a given room and thread. /// /// Note: removing an unknown thread subscription is a no-op. @@ -817,6 +831,13 @@ impl StateStore for EraseStateStoreError { self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into) } + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<(), Self::Error> { + self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into) + } + async fn load_thread_subscription( &self, room: &RoomId, diff --git a/crates/matrix-sdk-indexeddb/CHANGELOG.md b/crates/matrix-sdk-indexeddb/CHANGELOG.md index a4f2ae92bf9..441e54e3032 100644 --- a/crates/matrix-sdk-indexeddb/CHANGELOG.md +++ b/crates/matrix-sdk-indexeddb/CHANGELOG.md @@ -12,6 +12,8 @@ All notable changes to this project will be documented in this file. ([#5819](https://github.com/matrix-org/matrix-rust-sdk/pull/5819)) - [**breaking**] `IndexeddbCryptoStore::get_withheld_info` now returns `Result, ...>`. ([#5737](https://github.com/matrix-org/matrix-rust-sdk/pull/5737)) +- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts. + ([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848)) ### Performance diff --git a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs index 99107b9e6ed..89303e89338 100644 --- a/crates/matrix-sdk-indexeddb/src/state_store/mod.rs +++ b/crates/matrix-sdk-indexeddb/src/state_store/mod.rs @@ -1964,6 +1964,47 @@ impl_state_store!({ Ok(()) } + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<()> { + let tx = self + .inner + .transaction(keys::THREAD_SUBSCRIPTIONS) + .with_mode(TransactionMode::Readwrite) + .build()?; + let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?; + + for (room_id, thread_id, subscription) in updates { + let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id)); + let mut new = PersistedThreadSubscription::from(subscription); + + // See if there's a previous subscription. + if let Some(previous_value) = obj.get(&encoded_key).await? { + let previous: PersistedThreadSubscription = + self.deserialize_value(&previous_value)?; + + // If the previous status is the same as the new one, don't do anything. + if new == previous { + continue; + } + if !compare_thread_subscription_bump_stamps( + previous.bump_stamp, + &mut new.bump_stamp, + ) { + continue; + } + } + + let serialized_value = self.serialize_value(&new); + obj.put(&serialized_value?).with_key(encoded_key).build()?; + } + + tx.commit().await?; + + Ok(()) + } + async fn load_thread_subscription( &self, room: &RoomId, diff --git a/crates/matrix-sdk-sqlite/CHANGELOG.md b/crates/matrix-sdk-sqlite/CHANGELOG.md index 1ac0d9d2cea..19bc44ec97d 100644 --- a/crates/matrix-sdk-sqlite/CHANGELOG.md +++ b/crates/matrix-sdk-sqlite/CHANGELOG.md @@ -15,6 +15,8 @@ All notable changes to this project will be documented in this file. - Implement a new constructor that allows to open `SqliteCryptoStore` with a cryptographic key ([#5472](https://github.com/matrix-org/matrix-rust-sdk/pull/5472)) +- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts. + ([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848)) ### Refactor - [breaking] Change the logic for opening a store so as to use a `Secret` enum in the function `open_with_pool` instead of a `passphrase` diff --git a/crates/matrix-sdk-sqlite/src/state_store.rs b/crates/matrix-sdk-sqlite/src/state_store.rs index cca55782817..c9decd43335 100644 --- a/crates/matrix-sdk-sqlite/src/state_store.rs +++ b/crates/matrix-sdk-sqlite/src/state_store.rs @@ -2210,6 +2210,57 @@ impl StateStore for SqliteStateStore { Ok(()) } + async fn upsert_thread_subscriptions( + &self, + updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>, + ) -> Result<(), Self::Error> { + let values: Vec<_> = updates + .into_iter() + .map(|(room_id, thread_id, subscription)| { + ( + self.encode_key(keys::THREAD_SUBSCRIPTIONS, room_id), + self.encode_key(keys::THREAD_SUBSCRIPTIONS, thread_id), + subscription.status.as_str(), + subscription.bump_stamp, + ) + }) + .collect(); + + self.write() + .await + .with_transaction(move |txn| { + let mut txn = txn.prepare_cached( + "INSERT INTO thread_subscriptions (room_id, event_id, status, bump_stamp) + VALUES (?, ?, ?, ?) + ON CONFLICT (room_id, event_id) DO UPDATE + SET + status = + CASE + WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.status + WHEN EXCLUDED.bump_stamp IS NULL THEN EXCLUDED.status + WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.status + ELSE thread_subscriptions.status + END, + bump_stamp = + CASE + WHEN thread_subscriptions.bump_stamp IS NULL THEN EXCLUDED.bump_stamp + WHEN EXCLUDED.bump_stamp IS NULL THEN thread_subscriptions.bump_stamp + WHEN thread_subscriptions.bump_stamp < EXCLUDED.bump_stamp THEN EXCLUDED.bump_stamp + ELSE thread_subscriptions.bump_stamp + END", + )?; + + for value in values { + txn.execute(value)?; + } + + Result::<_, Error>::Ok(()) + }) + .await?; + + Ok(()) + } + async fn load_thread_subscription( &self, room_id: &RoomId, diff --git a/crates/matrix-sdk/CHANGELOG.md b/crates/matrix-sdk/CHANGELOG.md index 3d5fbc9c6e3..9558eb2053b 100644 --- a/crates/matrix-sdk/CHANGELOG.md +++ b/crates/matrix-sdk/CHANGELOG.md @@ -41,6 +41,9 @@ All notable changes to this project will be documented in this file. ([#5678](https://github.com/matrix-org/matrix-rust-sdk/pull/5678). - Add new API to decline calls ([MSC4310](https://github.com/matrix-org/matrix-spec-proposals/pull/4310)): `Room::make_decline_call_event` and `Room::subscribe_to_call_decline_events` ([#5614](https://github.com/matrix-org/matrix-rust-sdk/pull/5614)) +- Use `StateStore::upsert_thread_subscriptions()` to bulk process thread subscription updates received + via the sync response or from the MSC4308 companion endpoint. + ([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848)) ### Refactor diff --git a/crates/matrix-sdk/src/client/thread_subscriptions.rs b/crates/matrix-sdk/src/client/thread_subscriptions.rs index 45a9afe16c9..fc359da8e59 100644 --- a/crates/matrix-sdk/src/client/thread_subscriptions.rs +++ b/crates/matrix-sdk/src/client/thread_subscriptions.rs @@ -28,7 +28,7 @@ use matrix_sdk_base::{ use matrix_sdk_common::executor::spawn; use once_cell::sync::OnceCell; use ruma::{ - OwnedEventId, OwnedRoomId, + EventId, OwnedEventId, OwnedRoomId, RoomId, api::client::threads::get_thread_subscriptions_changes::unstable::{ ThreadSubscription, ThreadUnsubscription, }, @@ -179,70 +179,21 @@ impl ThreadSubscriptionCatchup { unsubscribed: BTreeMap>, token: Option, ) -> Result<()> { + // Precompute the updates so we don't hold the guard for too long. + let updates = build_subscription_updates(&subscribed, &unsubscribed); let Some(guard) = self.lock().await else { // Client is shutting down. return Ok(()); }; self.save_catchup_token(&guard, token).await?; - self.store_subscriptions(&guard, subscribed, unsubscribed).await?; - Ok(()) - } - - async fn store_subscriptions( - &self, - guard: &GuardedStoreAccess, - subscribed: BTreeMap>, - unsubscribed: BTreeMap>, - ) -> Result<()> { - if subscribed.is_empty() && unsubscribed.is_empty() { - // Nothing to do. - return Ok(()); - } - - trace!( - "saving {} new subscriptions and {} unsubscriptions", - subscribed.values().map(|by_room| by_room.len()).sum::(), - unsubscribed.values().map(|by_room| by_room.len()).sum::(), - ); - - // Take into account the new unsubscriptions. - for (room_id, room_map) in unsubscribed { - for (event_id, thread_sub) in room_map { - guard - .client - .state_store() - .upsert_thread_subscription( - &room_id, - &event_id, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Unsubscribed, - bump_stamp: Some(thread_sub.bump_stamp.into()), - }, - ) - .await?; - } - } - - // Take into account the new subscriptions. - for (room_id, room_map) in subscribed { - for (event_id, thread_sub) in room_map { - guard - .client - .state_store() - .upsert_thread_subscription( - &room_id, - &event_id, - StoredThreadSubscription { - status: ThreadSubscriptionStatus::Subscribed { - automatic: thread_sub.automatic, - }, - bump_stamp: Some(thread_sub.bump_stamp.into()), - }, - ) - .await?; - } + if !updates.is_empty() { + trace!( + "saving {} new subscriptions and {} unsubscriptions", + subscribed.values().map(|by_room| by_room.len()).sum::(), + unsubscribed.values().map(|by_room| by_room.len()).sum::(), + ); + guard.client.state_store().upsert_thread_subscriptions(updates).await?; } - Ok(()) } @@ -348,16 +299,27 @@ impl ThreadSubscriptionCatchup { match client.send(req).await { Ok(resp) => { + // Precompute the updates so we don't hold the guard for too long. + let updates = build_subscription_updates(&resp.subscribed, &resp.unsubscribed); + let guard = this .lock() .await .expect("a client instance is alive, so the locking should not fail"); - if let Err(err) = - this.store_subscriptions(&guard, resp.subscribed, resp.unsubscribed).await - { - warn!("Failed to store caught up thread subscriptions: {err}"); - continue; + if !updates.is_empty() { + trace!( + "saving {} new subscriptions and {} unsubscriptions", + resp.subscribed.values().map(|by_room| by_room.len()).sum::(), + resp.unsubscribed.values().map(|by_room| by_room.len()).sum::(), + ); + + if let Err(err) = + guard.client.state_store().upsert_thread_subscriptions(updates).await + { + warn!("Failed to store caught up thread subscriptions: {err}"); + continue; + } } // Refresh the tokens, as the list might have changed while we sent the @@ -398,6 +360,47 @@ impl ThreadSubscriptionCatchup { } } +/// Internal helper for building the thread subscription updates Vec. +fn build_subscription_updates<'a>( + subscribed: &'a BTreeMap>, + unsubscribed: &'a BTreeMap>, +) -> Vec<(&'a RoomId, &'a EventId, StoredThreadSubscription)> { + let mut updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)> = + Vec::with_capacity(unsubscribed.len() + subscribed.len()); + + // Take into account the new unsubscriptions. + for (room_id, room_map) in unsubscribed { + for (event_id, thread_sub) in room_map { + updates.push(( + room_id, + event_id, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Unsubscribed, + bump_stamp: Some(thread_sub.bump_stamp.into()), + }, + )); + } + } + + // Take into account the new subscriptions. + for (room_id, room_map) in subscribed { + for (event_id, thread_sub) in room_map { + updates.push(( + room_id, + event_id, + StoredThreadSubscription { + status: ThreadSubscriptionStatus::Subscribed { + automatic: thread_sub.automatic, + }, + bump_stamp: Some(thread_sub.bump_stamp.into()), + }, + )); + } + } + + updates +} + #[cfg(test)] mod tests { use std::ops::Not as _;