@@ -25,47 +25,46 @@ import kotlinx.coroutines.flow.Flow
25
25
import kotlinx.coroutines.flow.filterIsInstance
26
26
import kotlinx.coroutines.joinAll
27
27
import kotlinx.coroutines.launch
28
+ import java.util.concurrent.ConcurrentHashMap
28
29
29
30
/* *
30
31
* Handles re-watching feeds upon reconnection.
31
32
*
32
33
* Keeps track of feeds that are being watched and re-subscribes to them when the connection is
33
34
* re-established.
34
35
*
35
- * @property connectedEvents A [Flow] that emits events when the connection state changes to
36
+ * @property connectionState A [Flow] that emits events when the connection state changes to
36
37
* [StreamConnectionState.Connected].
37
38
* @property feedsRepository The [FeedsRepository] used to rewatch feeds on connection.
38
39
* @property scope The [CoroutineScope] in which to launch coroutines for re-watching feeds.
39
40
*/
40
41
internal class FeedWatchHandler (
41
- private val connectedEvents : Flow <StreamConnectionState >,
42
+ private val connectionState : Flow <StreamConnectionState >,
42
43
private val feedsRepository : FeedsRepository ,
43
44
scope : CoroutineScope ,
44
45
) {
45
- private val watched = mutableSetOf <FeedId >()
46
+ private val watched = ConcurrentHashMap <FeedId , Unit >()
46
47
47
48
init {
48
49
scope.launch {
49
- connectedEvents .filterIsInstance<StreamConnectionState .Connected >().collect {
50
+ connectionState .filterIsInstance<StreamConnectionState .Connected >().collect {
50
51
rewatchAll()
51
52
}
52
53
}
53
54
}
54
55
55
- @Synchronized
56
56
fun onStartWatching (feedId : FeedId ) {
57
- watched + = feedId
57
+ watched[feedId] = Unit
58
58
}
59
59
60
- @Synchronized
61
60
fun onStopWatching (feedId : FeedId ) {
62
61
watched - = feedId
63
62
}
64
63
65
64
private suspend fun rewatchAll () {
66
65
coroutineScope {
67
- synchronized( this @FeedWatchHandler) { watched.toSet() }
68
- .map { launch { feedsRepository.getOrCreateFeed(FeedQuery (it, watch = true )) } }
66
+ watched
67
+ .map { launch { feedsRepository.getOrCreateFeed(FeedQuery (it.key , watch = true )) } }
69
68
.joinAll()
70
69
}
71
70
}
0 commit comments