Skip to content

Commit 9a19738

Browse files
committed
feat(sdk): Bulk process thread subscription updates from sync and companion enpoint
1 parent 540c60b commit 9a19738

File tree

2 files changed

+75
-64
lines changed

2 files changed

+75
-64
lines changed

crates/matrix-sdk/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ All notable changes to this project will be documented in this file.
4141
([#5678](https://github.com/matrix-org/matrix-rust-sdk/pull/5678).
4242
- Add new API to decline calls ([MSC4310](https://github.com/matrix-org/matrix-spec-proposals/pull/4310)): `Room::make_decline_call_event` and `Room::subscribe_to_call_decline_events`
4343
([#5614](https://github.com/matrix-org/matrix-rust-sdk/pull/5614))
44+
- Use `StateStore::upsert_thread_subscriptions()` to bulk process thread subscription updates received
45+
via the sync response or from the MSC4308 companion endpoint.
46+
([#5848](https://github.com/matrix-org/matrix-rust-sdk/pull/5848))
4447

4548
### Refactor
4649

crates/matrix-sdk/src/client/thread_subscriptions.rs

Lines changed: 72 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use matrix_sdk_base::{
2828
use matrix_sdk_common::executor::spawn;
2929
use once_cell::sync::OnceCell;
3030
use ruma::{
31-
OwnedEventId, OwnedRoomId,
31+
EventId, OwnedEventId, OwnedRoomId, RoomId,
3232
api::client::threads::get_thread_subscriptions_changes::unstable::{
3333
ThreadSubscription, ThreadUnsubscription,
3434
},
@@ -179,70 +179,21 @@ impl ThreadSubscriptionCatchup {
179179
unsubscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
180180
token: Option<ThreadSubscriptionCatchupToken>,
181181
) -> Result<()> {
182+
// Precompute the updates so we don't hold the guard for too long.
183+
let maybe_updates = build_subscription_updates(&subscribed, &unsubscribed);
182184
let Some(guard) = self.lock().await else {
183185
// Client is shutting down.
184186
return Ok(());
185187
};
186188
self.save_catchup_token(&guard, token).await?;
187-
self.store_subscriptions(&guard, subscribed, unsubscribed).await?;
188-
Ok(())
189-
}
190-
191-
async fn store_subscriptions(
192-
&self,
193-
guard: &GuardedStoreAccess,
194-
subscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadSubscription>>,
195-
unsubscribed: BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
196-
) -> Result<()> {
197-
if subscribed.is_empty() && unsubscribed.is_empty() {
198-
// Nothing to do.
199-
return Ok(());
200-
}
201-
202-
trace!(
203-
"saving {} new subscriptions and {} unsubscriptions",
204-
subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
205-
unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
206-
);
207-
208-
// Take into account the new unsubscriptions.
209-
for (room_id, room_map) in unsubscribed {
210-
for (event_id, thread_sub) in room_map {
211-
guard
212-
.client
213-
.state_store()
214-
.upsert_thread_subscription(
215-
&room_id,
216-
&event_id,
217-
StoredThreadSubscription {
218-
status: ThreadSubscriptionStatus::Unsubscribed,
219-
bump_stamp: Some(thread_sub.bump_stamp.into()),
220-
},
221-
)
222-
.await?;
223-
}
224-
}
225-
226-
// Take into account the new subscriptions.
227-
for (room_id, room_map) in subscribed {
228-
for (event_id, thread_sub) in room_map {
229-
guard
230-
.client
231-
.state_store()
232-
.upsert_thread_subscription(
233-
&room_id,
234-
&event_id,
235-
StoredThreadSubscription {
236-
status: ThreadSubscriptionStatus::Subscribed {
237-
automatic: thread_sub.automatic,
238-
},
239-
bump_stamp: Some(thread_sub.bump_stamp.into()),
240-
},
241-
)
242-
.await?;
243-
}
189+
if let Some(updates) = maybe_updates {
190+
trace!(
191+
"saving {} new subscriptions and {} unsubscriptions",
192+
subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
193+
unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
194+
);
195+
guard.client.state_store().upsert_thread_subscriptions(updates).await?;
244196
}
245-
246197
Ok(())
247198
}
248199

@@ -348,16 +299,28 @@ impl ThreadSubscriptionCatchup {
348299

349300
match client.send(req).await {
350301
Ok(resp) => {
302+
// Precompute the updates so we don't hold the guard for too long.
303+
let maybe_updates =
304+
build_subscription_updates(&resp.subscribed, &resp.unsubscribed);
305+
351306
let guard = this
352307
.lock()
353308
.await
354309
.expect("a client instance is alive, so the locking should not fail");
355310

356-
if let Err(err) =
357-
this.store_subscriptions(&guard, resp.subscribed, resp.unsubscribed).await
358-
{
359-
warn!("Failed to store caught up thread subscriptions: {err}");
360-
continue;
311+
if let Some(updates) = maybe_updates {
312+
trace!(
313+
"saving {} new subscriptions and {} unsubscriptions",
314+
resp.subscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
315+
resp.unsubscribed.values().map(|by_room| by_room.len()).sum::<usize>(),
316+
);
317+
318+
if let Err(err) =
319+
guard.client.state_store().upsert_thread_subscriptions(updates).await
320+
{
321+
warn!("Failed to store caught up thread subscriptions: {err}");
322+
continue;
323+
}
361324
}
362325

363326
// Refresh the tokens, as the list might have changed while we sent the
@@ -398,6 +361,51 @@ impl ThreadSubscriptionCatchup {
398361
}
399362
}
400363

364+
/// Internal helper for building the thread subscription updates Vec.
365+
fn build_subscription_updates<'a>(
366+
subscribed: &'a BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadSubscription>>,
367+
unsubscribed: &'a BTreeMap<OwnedRoomId, BTreeMap<OwnedEventId, ThreadUnsubscription>>,
368+
) -> Option<Vec<(&'a RoomId, &'a EventId, StoredThreadSubscription)>> {
369+
if subscribed.is_empty() && unsubscribed.is_empty() {
370+
return None;
371+
}
372+
373+
let mut updates: Vec<(&RoomId, &EventId, StoredThreadSubscription)> =
374+
Vec::with_capacity(unsubscribed.len() + subscribed.len());
375+
376+
// Take into account the new unsubscriptions.
377+
for (room_id, room_map) in unsubscribed {
378+
for (event_id, thread_sub) in room_map {
379+
updates.push((
380+
room_id,
381+
event_id,
382+
StoredThreadSubscription {
383+
status: ThreadSubscriptionStatus::Unsubscribed,
384+
bump_stamp: Some(thread_sub.bump_stamp.into()),
385+
},
386+
));
387+
}
388+
}
389+
390+
// Take into account the new subscriptions.
391+
for (room_id, room_map) in subscribed {
392+
for (event_id, thread_sub) in room_map {
393+
updates.push((
394+
room_id,
395+
event_id,
396+
StoredThreadSubscription {
397+
status: ThreadSubscriptionStatus::Subscribed {
398+
automatic: thread_sub.automatic,
399+
},
400+
bump_stamp: Some(thread_sub.bump_stamp.into()),
401+
},
402+
));
403+
}
404+
}
405+
406+
Some(updates)
407+
}
408+
401409
#[cfg(test)]
402410
mod tests {
403411
use std::ops::Not as _;

0 commit comments

Comments
 (0)