Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/matrix-sdk-base/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ All notable changes to this project will be documented in this file.
([#5817](https://github.com/matrix-org/matrix-rust-sdk/pull/5817))
- `ComposerDraft` can now store attachments alongside text messages.
([#5794](https://github.com/matrix-org/matrix-rust-sdk/pull/5794))
- Add `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))

## [0.14.1] - 2025-09-10

Expand Down
185 changes: 185 additions & 0 deletions crates/matrix-sdk-base/src/store/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ pub trait StateStoreIntegrationTests {
async fn test_thread_subscriptions(&self) -> TestResult;
/// Test thread subscription bumpstamp semantics.
async fn test_thread_subscriptions_bumpstamps(&self) -> TestResult;
/// Test thread subscriptions bulk upsert, including bumpstamp semantics.
async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult;
}

impl StateStoreIntegrationTests for DynStateStore {
Expand Down Expand Up @@ -1955,6 +1957,183 @@ impl StateStoreIntegrationTests for DynStateStore {

Ok(())
}

async fn test_thread_subscriptions_bulk_upsert(&self) -> TestResult {
let threads = [
event_id!("$t1"),
event_id!("$t2"),
event_id!("$t3"),
event_id!("$t4"),
event_id!("$t5"),
event_id!("$t6"),
];
// Helper for building the input for `upsert_thread_subscriptions()`,
// which is of the type: Vec<(&RoomId, &EventId, StoredThreadSubscription)>
let build_subscription_updates = |subs: &[StoredThreadSubscription]| {
threads
.iter()
.zip(subs)
.map(|(&event_id, &sub)| (room_id(), event_id, sub))
.collect::<Vec<_>>()
};

// Test bump_stamp logic
let initial_subscriptions = build_subscription_updates(&[
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(14),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: None,
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(210),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(5),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(100),
},
]);

let update_subscriptions = build_subscription_updates(&[
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: None,
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: None,
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(1101),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(222),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(1),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(100),
},
]);

let expected_subscriptions = build_subscription_updates(&[
// Status should be updated, because prev and new bump_stamp are both None
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: None,
},
// Status should be updated, but keep initial bump_stamp (new is None)
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(14),
},
// Status should be updated and also bump_stamp should be updated (initial was None)
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(1101),
},
// Status should be updated and also bump_stamp should be updated (initial was lower)
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(222),
},
// Status shouldn't change, as new bump_stamp is lower
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(5),
},
// Status shouldn't change, as bump_stamp is equal to the previous one
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(100),
},
]);

// Set the initial subscriptions
self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;

// Assert the subscriptions have been added
for (room_id, thread_id, expected_sub) in &initial_subscriptions {
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
assert_eq!(stored_subscription, Some(*expected_sub));
}

// Update subscriptions
self.upsert_thread_subscriptions(update_subscriptions).await?;

// Assert the expected subscriptions and bump_stamps
for (room_id, thread_id, expected_sub) in &expected_subscriptions {
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
assert_eq!(stored_subscription, Some(*expected_sub));
}

// Test just state changes, but first remove previous subscriptions
for (room_id, thread_id, _) in &expected_subscriptions {
self.remove_thread_subscription(room_id, thread_id).await?;
}

let initial_subscriptions = build_subscription_updates(&[
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(1),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: Some(1),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(1),
},
]);

self.upsert_thread_subscriptions(initial_subscriptions.clone()).await?;

for (room_id, thread_id, expected_sub) in &initial_subscriptions {
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
assert_eq!(stored_subscription, Some(*expected_sub));
}

let update_subscriptions = build_subscription_updates(&[
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: true },
bump_stamp: Some(2),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Unsubscribed,
bump_stamp: Some(2),
},
StoredThreadSubscription {
status: ThreadSubscriptionStatus::Subscribed { automatic: false },
bump_stamp: Some(2),
},
]);

self.upsert_thread_subscriptions(update_subscriptions.clone()).await?;

for (room_id, thread_id, expected_sub) in &update_subscriptions {
let stored_subscription = self.load_thread_subscription(room_id, thread_id).await?;
assert_eq!(stored_subscription, Some(*expected_sub));
}

Ok(())
}
}

/// Macro building to allow your StateStore implementation to run the entire
Expand Down Expand Up @@ -2141,6 +2320,12 @@ macro_rules! statestore_integration_tests {
let store = get_store().await?.into_state_store();
store.test_thread_subscriptions_bumpstamps().await
}

#[async_test]
async fn test_thread_subscriptions_bulk_upsert() -> TestResult {
let store = get_store().await?.into_state_store();
store.test_thread_subscriptions_bulk_upsert().await
}
}
};
}
Expand Down
27 changes: 27 additions & 0 deletions crates/matrix-sdk-base/src/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,33 @@ impl StateStore for MemoryStore {
Ok(())
}

async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
let mut inner = self.inner.write().unwrap();

for (room_id, thread_id, mut new) in updates {
let room_subs = inner.thread_subscriptions.entry(room_id.to_owned()).or_default();

if let Some(previous) = room_subs.get(thread_id) {
if *previous == new {
continue;
}
if !compare_thread_subscription_bump_stamps(
previous.bump_stamp,
&mut new.bump_stamp,
) {
continue;
}
}

room_subs.insert(thread_id.to_owned(), new);
}

Ok(())
}

async fn load_thread_subscription(
&self,
room: &RoomId,
Expand Down
23 changes: 22 additions & 1 deletion crates/matrix-sdk-base/src/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ pub trait StateStore: AsyncTraitDeps {
/// bumpstamp is kept.
///
/// If the new thread subscription has a bumpstamp that's lower than or
/// equal to a previously one, the existing subscription is kept, i.e.
/// equal to a previous one, the existing subscription is kept, i.e.
/// this method must have no effect.
async fn upsert_thread_subscription(
&self,
Expand All @@ -496,6 +496,20 @@ pub trait StateStore: AsyncTraitDeps {
subscription: StoredThreadSubscription,
) -> Result<(), Self::Error>;

/// Inserts or updates multiple thread subscriptions.
///
/// If the new thread subscription hasn't set a bumpstamp, and there was a
/// previous subscription in the database with a bumpstamp, the existing
/// bumpstamp is kept.
///
/// If the new thread subscription has a bumpstamp that's lower than or
/// equal to a previous one, the existing subscription is kept, i.e.
/// this method must have no effect.
async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error>;

/// Remove a previous thread subscription for a given room and thread.
///
/// Note: removing an unknown thread subscription is a no-op.
Expand Down Expand Up @@ -817,6 +831,13 @@ impl<T: StateStore> StateStore for EraseStateStoreError<T> {
self.0.upsert_thread_subscription(room, thread_id, subscription).await.map_err(Into::into)
}

async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<(), Self::Error> {
self.0.upsert_thread_subscriptions(updates).await.map_err(Into::into)
}

async fn load_thread_subscription(
&self,
room: &RoomId,
Expand Down
2 changes: 2 additions & 0 deletions crates/matrix-sdk-indexeddb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ All notable changes to this project will be documented in this file.
([#5819](https://github.com/matrix-org/matrix-rust-sdk/pull/5819))
- [**breaking**] `IndexeddbCryptoStore::get_withheld_info` now returns `Result<Option<RoomKeyWithheldEntry>, ...>`.
([#5737](https://github.com/matrix-org/matrix-rust-sdk/pull/5737))
- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))

### Performance

Expand Down
41 changes: 41 additions & 0 deletions crates/matrix-sdk-indexeddb/src/state_store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1964,6 +1964,47 @@ impl_state_store!({
Ok(())
}

async fn upsert_thread_subscriptions(
&self,
updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)>,
) -> Result<()> {
let tx = self
.inner
.transaction(keys::THREAD_SUBSCRIPTIONS)
.with_mode(TransactionMode::Readwrite)
.build()?;
let obj = tx.object_store(keys::THREAD_SUBSCRIPTIONS)?;

for (room_id, thread_id, subscription) in updates {
let encoded_key = self.encode_key(keys::THREAD_SUBSCRIPTIONS, (room_id, thread_id));
let mut new = PersistedThreadSubscription::from(subscription);

// See if there's a previous subscription.
if let Some(previous_value) = obj.get(&encoded_key).await? {
let previous: PersistedThreadSubscription =
self.deserialize_value(&previous_value)?;

// If the previous status is the same as the new one, don't do anything.
if new == previous {
continue;
}
if !compare_thread_subscription_bump_stamps(
previous.bump_stamp,
&mut new.bump_stamp,
) {
continue;
}
}

let serialized_value = self.serialize_value(&new);
obj.put(&serialized_value?).with_key(encoded_key).build()?;
}

tx.commit().await?;

Ok(())
}

async fn load_thread_subscription(
&self,
room: &RoomId,
Expand Down
2 changes: 2 additions & 0 deletions crates/matrix-sdk-sqlite/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ All notable changes to this project will be documented in this file.

- Implement a new constructor that allows to open `SqliteCryptoStore` with a cryptographic key
([#5472](https://github.com/matrix-org/matrix-rust-sdk/pull/5472))
- Implement `StateStore::upsert_thread_subscriptions()` method for bulk upserts.
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))

### Refactor
- [breaking] Change the logic for opening a store so as to use a `Secret` enum in the function `open_with_pool` instead of a `passphrase`
Expand Down
Loading
Loading