@@ -28,7 +28,7 @@ use matrix_sdk_base::{
2828use matrix_sdk_common:: executor:: spawn;
2929use once_cell:: sync:: OnceCell ;
3030use ruma:: {
31- OwnedEventId , OwnedRoomId ,
31+ EventId , OwnedEventId , OwnedRoomId , RoomId ,
3232 api:: client:: threads:: get_thread_subscriptions_changes:: unstable:: {
3333 ThreadSubscription , ThreadUnsubscription ,
3434 } ,
@@ -179,70 +179,21 @@ impl ThreadSubscriptionCatchup {
179179 unsubscribed : BTreeMap < OwnedRoomId , BTreeMap < OwnedEventId , ThreadUnsubscription > > ,
180180 token : Option < ThreadSubscriptionCatchupToken > ,
181181 ) -> Result < ( ) > {
182+ // Precompute the updates so we don't hold the guard for too long.
183+ let updates = build_subscription_updates ( & subscribed, & unsubscribed) ;
182184 let Some ( guard) = self . lock ( ) . await else {
183185 // Client is shutting down.
184186 return Ok ( ( ) ) ;
185187 } ;
186188 self . save_catchup_token ( & guard, token) . await ?;
187- self . store_subscriptions ( & guard, subscribed, unsubscribed) . await ?;
188- Ok ( ( ) )
189- }
190-
191- async fn store_subscriptions (
192- & self ,
193- guard : & GuardedStoreAccess ,
194- subscribed : BTreeMap < OwnedRoomId , BTreeMap < OwnedEventId , ThreadSubscription > > ,
195- unsubscribed : BTreeMap < OwnedRoomId , BTreeMap < OwnedEventId , ThreadUnsubscription > > ,
196- ) -> Result < ( ) > {
197- if subscribed. is_empty ( ) && unsubscribed. is_empty ( ) {
198- // Nothing to do.
199- return Ok ( ( ) ) ;
200- }
201-
202- trace ! (
203- "saving {} new subscriptions and {} unsubscriptions" ,
204- subscribed. values( ) . map( |by_room| by_room. len( ) ) . sum:: <usize >( ) ,
205- unsubscribed. values( ) . map( |by_room| by_room. len( ) ) . sum:: <usize >( ) ,
206- ) ;
207-
208- // Take into account the new unsubscriptions.
209- for ( room_id, room_map) in unsubscribed {
210- for ( event_id, thread_sub) in room_map {
211- guard
212- . client
213- . state_store ( )
214- . upsert_thread_subscription (
215- & room_id,
216- & event_id,
217- StoredThreadSubscription {
218- status : ThreadSubscriptionStatus :: Unsubscribed ,
219- bump_stamp : Some ( thread_sub. bump_stamp . into ( ) ) ,
220- } ,
221- )
222- . await ?;
223- }
224- }
225-
226- // Take into account the new subscriptions.
227- for ( room_id, room_map) in subscribed {
228- for ( event_id, thread_sub) in room_map {
229- guard
230- . client
231- . state_store ( )
232- . upsert_thread_subscription (
233- & room_id,
234- & event_id,
235- StoredThreadSubscription {
236- status : ThreadSubscriptionStatus :: Subscribed {
237- automatic : thread_sub. automatic ,
238- } ,
239- bump_stamp : Some ( thread_sub. bump_stamp . into ( ) ) ,
240- } ,
241- )
242- . await ?;
243- }
189+ if !updates. is_empty ( ) {
190+ trace ! (
191+ "saving {} new subscriptions and {} unsubscriptions" ,
192+ subscribed. values( ) . map( |by_room| by_room. len( ) ) . sum:: <usize >( ) ,
193+ unsubscribed. values( ) . map( |by_room| by_room. len( ) ) . sum:: <usize >( ) ,
194+ ) ;
195+ guard. client . state_store ( ) . upsert_thread_subscriptions ( updates) . await ?;
244196 }
245-
246197 Ok ( ( ) )
247198 }
248199
@@ -348,16 +299,27 @@ impl ThreadSubscriptionCatchup {
348299
349300 match client. send ( req) . await {
350301 Ok ( resp) => {
302+ // Precompute the updates so we don't hold the guard for too long.
303+ let updates = build_subscription_updates ( & resp. subscribed , & resp. unsubscribed ) ;
304+
351305 let guard = this
352306 . lock ( )
353307 . await
354308 . expect ( "a client instance is alive, so the locking should not fail" ) ;
355309
356- if let Err ( err) =
357- this. store_subscriptions ( & guard, resp. subscribed , resp. unsubscribed ) . await
358- {
359- warn ! ( "Failed to store caught up thread subscriptions: {err}" ) ;
360- continue ;
310+ if !updates. is_empty ( ) {
311+ trace ! (
312+ "saving {} new subscriptions and {} unsubscriptions" ,
313+ resp. subscribed. values( ) . map( |by_room| by_room. len( ) ) . sum:: <usize >( ) ,
314+ resp. unsubscribed. values( ) . map( |by_room| by_room. len( ) ) . sum:: <usize >( ) ,
315+ ) ;
316+
317+ if let Err ( err) =
318+ guard. client . state_store ( ) . upsert_thread_subscriptions ( updates) . await
319+ {
320+ warn ! ( "Failed to store caught up thread subscriptions: {err}" ) ;
321+ continue ;
322+ }
361323 }
362324
363325 // Refresh the tokens, as the list might have changed while we sent the
@@ -398,6 +360,47 @@ impl ThreadSubscriptionCatchup {
398360 }
399361}
400362
363+ /// Internal helper for building the thread subscription updates Vec.
364+ fn build_subscription_updates < ' a > (
365+ subscribed : & ' a BTreeMap < OwnedRoomId , BTreeMap < OwnedEventId , ThreadSubscription > > ,
366+ unsubscribed : & ' a BTreeMap < OwnedRoomId , BTreeMap < OwnedEventId , ThreadUnsubscription > > ,
367+ ) -> Vec < ( & ' a RoomId , & ' a EventId , StoredThreadSubscription ) > {
368+ let mut updates: Vec < ( & RoomId , & EventId , StoredThreadSubscription ) > =
369+ Vec :: with_capacity ( unsubscribed. len ( ) + subscribed. len ( ) ) ;
370+
371+ // Take into account the new unsubscriptions.
372+ for ( room_id, room_map) in unsubscribed {
373+ for ( event_id, thread_sub) in room_map {
374+ updates. push ( (
375+ room_id,
376+ event_id,
377+ StoredThreadSubscription {
378+ status : ThreadSubscriptionStatus :: Unsubscribed ,
379+ bump_stamp : Some ( thread_sub. bump_stamp . into ( ) ) ,
380+ } ,
381+ ) ) ;
382+ }
383+ }
384+
385+ // Take into account the new subscriptions.
386+ for ( room_id, room_map) in subscribed {
387+ for ( event_id, thread_sub) in room_map {
388+ updates. push ( (
389+ room_id,
390+ event_id,
391+ StoredThreadSubscription {
392+ status : ThreadSubscriptionStatus :: Subscribed {
393+ automatic : thread_sub. automatic ,
394+ } ,
395+ bump_stamp : Some ( thread_sub. bump_stamp . into ( ) ) ,
396+ } ,
397+ ) ) ;
398+ }
399+ }
400+
401+ updates
402+ }
403+
401404#[ cfg( test) ]
402405mod tests {
403406 use std:: ops:: Not as _;
0 commit comments