Skip to content

Commit 2417153

Browse files
committed
feat(sdk): Bulk process thread subscription updates from sync and companion enpoint
1 parent df99d2f commit 2417153

File tree

2 files changed

+58
-45
lines changed

2 files changed

+58
-45
lines changed

crates/matrix-sdk/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ All notable changes to this project will be documented in this file.
4141
([#5678](https://github.com/matrix-org/matrix-rust-sdk/pull/5678).
4242
- Add new API to decline calls ([MSC4310](https://github.com/matrix-org/matrix-spec-proposals/pull/4310)): `Room::make_decline_call_event` and `Room::subscribe_to_call_decline_events`
4343
([#5614](https://github.com/matrix-org/matrix-rust-sdk/pull/5614))
44+
- Use `StateStore::upsert_thread_subscriptions()` to bulk process thread subscription updates received
45+
via the sync response or from the MSC4308 companion endpoint.
46+
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
4447

4548
### Refactor
4649

crates/matrix-sdk/src/client/thread_subscriptions.rs

Lines changed: 55 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use matrix_sdk_base::{
2828
use matrix_sdk_common::executor::spawn;
2929
use once_cell::sync::OnceCell;
3030
use ruma::{
31-
OwnedEventId, OwnedRoomId,
31+
EventId, OwnedEventId, OwnedRoomId, RoomId,
3232
api::client::threads::get_thread_subscriptions_changes::unstable::{
3333
ThreadSubscription, ThreadUnsubscription,
3434
},
@@ -179,71 +179,69 @@ impl ThreadSubscriptionCatchup {
179179
unsubscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
180180
token: Option<ThreadSubscriptionCatchupToken>,
181181
) -> Result<()> {
182+
// Precompute the updates so we don't hold the guard for too long.
183+
let maybe_updates = self.build_subscription_updates(&subscribed, &unsubscribed);
182184
let Some(guard) = self.lock().await else {
183185
// Client is shutting down.
184186
return Ok(());
185187
};
188+
// Save the token even if there aren't any updates.
186189
self.save_catchup_token(&guard, token).await?;
187-
self.store_subscriptions(&guard, subscribed, unsubscribed).await?;
190+
if let Some(updates) = maybe_updates {
191+
trace!(
192+
"saving {} new subscriptions and {} unsubscriptions",
193+
subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
194+
unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
195+
);
196+
guard.client.state_store().upsert_thread_subscriptions(updates).await?;
197+
}
188198
Ok(())
189199
}
190200

191-
async fn store_subscriptions(
201+
/// Internal helper for building the thread subscription updates Vec.
202+
fn build_subscription_updates<'a>(
192203
&self,
193-
guard: &GuardedStoreAccess,
194-
subscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadSubscription>>,
195-
unsubscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
196-
) -> Result<()> {
204+
subscribed: &'a BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadSubscription>>,
205+
unsubscribed: &'a BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
206+
) -> Option<Vec<(&'a RoomId, &'a EventId, StoredThreadSubscription)>> {
197207
if subscribed.is_empty() && unsubscribed.is_empty() {
198-
// Nothing to do.
199-
return Ok(());
208+
return None;
200209
}
201210

202-
trace!(
203-
"saving {} new subscriptions and {} unsubscriptions",
204-
subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
205-
unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
206-
);
211+
let mut updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)> =
212+
Vec::with_capacity(unsubscribed.len() + subscribed.len());
207213

208214
// Take into account the new unsubscriptions.
209215
for (room_id, room_map) in unsubscribed {
210216
for (event_id, thread_sub) in room_map {
211-
guard
212-
.client
213-
.state_store()
214-
.upsert_thread_subscription(
215-
&room_id,
216-
&event_id,
217-
StoredThreadSubscription {
218-
status: ThreadSubscriptionStatus::Unsubscribed,
219-
bump_stamp: Some(thread_sub.bump_stamp.into()),
220-
},
221-
)
222-
.await?;
217+
updates.push((
218+
room_id,
219+
event_id,
220+
StoredThreadSubscription {
221+
status: ThreadSubscriptionStatus::Unsubscribed,
222+
bump_stamp: Some(thread_sub.bump_stamp.into()),
223+
},
224+
));
223225
}
224226
}
225227

226228
// Take into account the new subscriptions.
227229
for (room_id, room_map) in subscribed {
228230
for (event_id, thread_sub) in room_map {
229-
guard
230-
.client
231-
.state_store()
232-
.upsert_thread_subscription(
233-
&room_id,
234-
&event_id,
235-
StoredThreadSubscription {
236-
status: ThreadSubscriptionStatus::Subscribed {
237-
automatic: thread_sub.automatic,
238-
},
239-
bump_stamp: Some(thread_sub.bump_stamp.into()),
231+
updates.push((
232+
room_id,
233+
event_id,
234+
StoredThreadSubscription {
235+
status: ThreadSubscriptionStatus::Subscribed {
236+
automatic: thread_sub.automatic,
240237
},
241-
)
242-
.await?;
238+
bump_stamp: Some(thread_sub.bump_stamp.into()),
239+
},
240+
));
243241
}
244242
}
245243

246-
Ok(())
244+
Some(updates)
247245
}
248246

249247
/// Internal helper to lock writes to the thread subscriptions catchup
@@ -348,16 +346,28 @@ impl ThreadSubscriptionCatchup {
348346

349347
match client.send(req).await {
350348
Ok(resp) => {
349+
// Precompute the updates so we don't hold the guard for too long.
350+
let maybe_updates =
351+
this.build_subscription_updates(&resp.subscribed, &resp.unsubscribed);
352+
351353
let guard = this
352354
.lock()
353355
.await
354356
.expect("a client instance is alive, so the locking should not fail");
355357

356-
if let Err(err) =
357-
this.store_subscriptions(&guard, resp.subscribed, resp.unsubscribed).await
358-
{
359-
warn!("Failed to store caught up thread subscriptions: {err}");
360-
continue;
358+
if let Some(updates) = maybe_updates {
359+
trace!(
360+
"saving {} new subscriptions and {} unsubscriptions",
361+
resp.subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
362+
resp.unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
363+
);
364+
365+
if let Err(err) =
366+
guard.client.state_store().upsert_thread_subscriptions(updates).await
367+
{
368+
warn!("Failed to store caught up thread subscriptions: {err}");
369+
continue;
370+
}
361371
}
362372

363373
// Refresh the tokens, as the list might have changed while we sent the

0 commit comments

Comments
 (0)