Skip to content

Commit 22b9962

Browse files
Rewatch feeds on socket reconnection (#61)
* Rewatch feeds on socket reconnection * Use ConcurrentHashMap and rename connection flow property in FeedWatchHandler * Use queryFeeds instead of getOrCreateFeed * Retry rewatching feeds on failures * Post rewatch errors in a bus and log them * Revert to getOrCreateFeed instead of queryFeeds --------- Co-authored-by: Aleksandar Apostolov <[email protected]>
1 parent 0582754 commit 22b9962

File tree

7 files changed

+355
-11
lines changed

7 files changed

+355
-11
lines changed

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/Create.kt

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import io.getstream.android.core.api.log.StreamLogger
2626
import io.getstream.android.core.api.log.StreamLoggerProvider
2727
import io.getstream.android.core.api.model.config.StreamClientSerializationConfig
2828
import io.getstream.android.core.api.model.config.StreamHttpConfig
29+
import io.getstream.android.core.api.model.exceptions.StreamClientException
2930
import io.getstream.android.core.api.model.value.StreamApiKey
3031
import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader
3132
import io.getstream.android.core.api.model.value.StreamUserId
@@ -48,6 +49,7 @@ import io.getstream.feeds.android.client.api.model.FeedsConfig
4849
import io.getstream.feeds.android.client.api.model.User
4950
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
5051
import io.getstream.feeds.android.client.internal.client.reconnect.DefaultRetryStrategy
52+
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
5153
import io.getstream.feeds.android.client.internal.client.reconnect.lifecycle.StreamLifecycleObserver
5254
import io.getstream.feeds.android.client.internal.file.StreamFeedUploader
5355
import io.getstream.feeds.android.client.internal.http.FeedsSingleFlightApi
@@ -68,6 +70,7 @@ import kotlinx.coroutines.CoroutineExceptionHandler
6870
import kotlinx.coroutines.CoroutineScope
6971
import kotlinx.coroutines.Dispatchers
7072
import kotlinx.coroutines.SupervisorJob
73+
import kotlinx.coroutines.flow.MutableSharedFlow
7174
import kotlinx.coroutines.plus
7275
import okhttp3.OkHttpClient
7376
import okhttp3.logging.HttpLoggingInterceptor
@@ -247,6 +250,16 @@ internal fun createFeedsClient(
247250
val pollsRepository = PollsRepositoryImpl(feedsApi)
248251

249252
val moderation = ModerationImpl(moderationRepository)
253+
val errorBus = MutableSharedFlow<StreamClientException>(extraBufferCapacity = 100)
254+
255+
val feedWatchHandler =
256+
FeedWatchHandler(
257+
connectionState = client.connectionState,
258+
feedsRepository = feedsRepository,
259+
retryProcessor = StreamRetryProcessor(logProvider.taggedLogger("WatchHandler")),
260+
errorBus = errorBus,
261+
scope = clientScope,
262+
)
250263

251264
// Build client
252265
return FeedsClientImpl(
@@ -271,6 +284,9 @@ internal fun createFeedsClient(
271284
maxStrongSubscriptions = Integer.MAX_VALUE,
272285
maxWeakSubscriptions = Integer.MAX_VALUE,
273286
),
287+
feedWatchHandler = feedWatchHandler,
288+
errorBus = errorBus,
289+
scope = clientScope,
274290
logger = logger,
275291
)
276292
}

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/FeedsClientImpl.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.getstream.android.core.api.StreamClient
1919
import io.getstream.android.core.api.log.StreamLogger
2020
import io.getstream.android.core.api.model.connection.StreamConnectedUser
2121
import io.getstream.android.core.api.model.connection.StreamConnectionState
22+
import io.getstream.android.core.api.model.exceptions.StreamClientException
2223
import io.getstream.android.core.api.model.value.StreamApiKey
2324
import io.getstream.android.core.api.socket.listeners.StreamClientListener
2425
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
@@ -63,6 +64,7 @@ import io.getstream.feeds.android.client.api.state.query.ModerationConfigsQuery
6364
import io.getstream.feeds.android.client.api.state.query.PollVotesQuery
6465
import io.getstream.feeds.android.client.api.state.query.PollsQuery
6566
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
67+
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
6668
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
6769
import io.getstream.feeds.android.client.internal.repository.AppRepository
6870
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
@@ -95,11 +97,13 @@ import io.getstream.feeds.android.network.models.DeleteActivitiesRequest
9597
import io.getstream.feeds.android.network.models.DeleteActivitiesResponse
9698
import io.getstream.feeds.android.network.models.ListDevicesResponse
9799
import io.getstream.feeds.android.network.models.WSEvent
100+
import kotlinx.coroutines.CoroutineScope
98101
import kotlinx.coroutines.channels.BufferOverflow
99102
import kotlinx.coroutines.flow.Flow
100103
import kotlinx.coroutines.flow.MutableSharedFlow
101104
import kotlinx.coroutines.flow.StateFlow
102105
import kotlinx.coroutines.flow.asSharedFlow
106+
import kotlinx.coroutines.launch
103107

104108
internal class FeedsClientImpl(
105109
private val coreClient: StreamClient,
@@ -118,7 +122,10 @@ internal class FeedsClientImpl(
118122
private val pollsRepository: PollsRepository,
119123
override val uploader: FeedUploader,
120124
override val moderation: Moderation,
125+
private val feedWatchHandler: FeedWatchHandler,
121126
private val logger: StreamLogger,
127+
scope: CoroutineScope,
128+
errorBus: Flow<StreamClientException>,
122129
) : FeedsClient {
123130

124131
override val state: StateFlow<StreamConnectionState>
@@ -147,12 +154,19 @@ internal class FeedsClientImpl(
147154
}
148155
}
149156

157+
init {
158+
scope.launch {
159+
errorBus.collect { logger.e(it) { "[FeedsClient] Received error from bus" } }
160+
}
161+
}
162+
150163
override suspend fun connect(): Result<StreamConnectedUser> {
151164
if (user.type == UserAuthType.ANONYMOUS) {
152165
logger.e { "[connect] Attempting to connect an anonymous user, returning an error." }
153166
return Result.failure(IllegalArgumentException("Anonymous users cannot connect."))
154167
}
155168
coreClient.subscribe(clientSubscription)
169+
connectionRecoveryHandler.start()
156170
return coreClient.connect()
157171
}
158172

@@ -175,6 +189,7 @@ internal class FeedsClientImpl(
175189
feedsRepository = feedsRepository,
176190
pollsRepository = pollsRepository,
177191
subscriptionManager = feedsEventsSubscriptionManager,
192+
feedWatchHandler = feedWatchHandler,
178193
)
179194

180195
override fun feedList(query: FeedsQuery): FeedList =
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-feeds-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.getstream.feeds.android.client.internal.client.reconnect
17+
18+
import io.getstream.android.core.api.model.StreamRetryPolicy
19+
import io.getstream.android.core.api.model.connection.StreamConnectionState
20+
import io.getstream.android.core.api.model.exceptions.StreamClientException
21+
import io.getstream.android.core.api.processing.StreamRetryProcessor
22+
import io.getstream.feeds.android.client.api.model.FeedId
23+
import io.getstream.feeds.android.client.api.state.query.FeedQuery
24+
import io.getstream.feeds.android.client.internal.repository.FeedsRepository
25+
import io.getstream.feeds.android.client.internal.repository.GetOrCreateInfo
26+
import java.util.concurrent.ConcurrentHashMap
27+
import kotlinx.coroutines.CoroutineScope
28+
import kotlinx.coroutines.async
29+
import kotlinx.coroutines.coroutineScope
30+
import kotlinx.coroutines.flow.Flow
31+
import kotlinx.coroutines.flow.MutableSharedFlow
32+
import kotlinx.coroutines.flow.filterIsInstance
33+
import kotlinx.coroutines.launch
34+
35+
/**
36+
* Handles re-watching feeds upon reconnection.
37+
*
38+
* Keeps track of feeds that are being watched and re-subscribes to them when the connection is
39+
* re-established.
40+
*
41+
* @property connectionState A [Flow] that emits events when the connection state changes to
42+
* [StreamConnectionState.Connected].
43+
* @property feedsRepository The [FeedsRepository] used to rewatch feeds on connection.
44+
* @property scope The [CoroutineScope] in which to launch coroutines for re-watching feeds.
45+
*/
46+
internal class FeedWatchHandler(
47+
private val connectionState: Flow<StreamConnectionState>,
48+
private val feedsRepository: FeedsRepository,
49+
private val retryProcessor: StreamRetryProcessor,
50+
private val errorBus: MutableSharedFlow<StreamClientException>,
51+
private val scope: CoroutineScope,
52+
) {
53+
private val watched = ConcurrentHashMap<FeedId, Unit>()
54+
55+
init {
56+
scope.launch {
57+
connectionState.filterIsInstance<StreamConnectionState.Connected>().collect {
58+
rewatchAll()
59+
}
60+
}
61+
}
62+
63+
fun onStartWatching(feedId: FeedId) {
64+
watched[feedId] = Unit
65+
}
66+
67+
fun onStopWatching(feedId: FeedId) {
68+
watched -= feedId
69+
}
70+
71+
private suspend fun rewatchAll() {
72+
coroutineScope { watched.keys.map { id -> async { rewatch(id) } } }
73+
}
74+
75+
private suspend fun rewatch(id: FeedId): Result<GetOrCreateInfo> =
76+
retryProcessor
77+
.retry(retryPolicy) {
78+
feedsRepository.getOrCreateFeed(FeedQuery(id, watch = true)).getOrThrow()
79+
}
80+
.onFailure { errorBus.emit(StreamFeedRewatchException(id)) }
81+
82+
companion object {
83+
private val retryPolicy = StreamRetryPolicy.exponential(maxRetries = 3)
84+
}
85+
}
86+
87+
internal class StreamFeedRewatchException(val id: FeedId, cause: Throwable? = null) :
88+
StreamClientException("Failed to rewatch feed", cause)

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/state/FeedImpl.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import io.getstream.feeds.android.client.api.state.Feed
3232
import io.getstream.feeds.android.client.api.state.FeedState
3333
import io.getstream.feeds.android.client.api.state.query.FeedQuery
3434
import io.getstream.feeds.android.client.api.state.query.MembersQuery
35+
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
3536
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
3637
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
3738
import io.getstream.feeds.android.client.internal.repository.CommentsRepository
@@ -92,6 +93,7 @@ internal class FeedImpl(
9293
private val feedsRepository: FeedsRepository,
9394
private val pollsRepository: PollsRepository,
9495
private val subscriptionManager: StreamSubscriptionManager<FeedsEventListener>,
96+
private val feedWatchHandler: FeedWatchHandler,
9597
) : Feed {
9698

9799
private val memberList: MemberListImpl =
@@ -127,13 +129,17 @@ internal class FeedImpl(
127129
get() = _state
128130

129131
override suspend fun getOrCreate(): Result<FeedData> {
132+
if (query.watch) {
133+
feedWatchHandler.onStartWatching(query.fid)
134+
}
130135
return feedsRepository
131136
.getOrCreateFeed(query)
132137
.onSuccess { _state.onQueryFeed(it) }
133138
.map { it.feed }
134139
}
135140

136141
override suspend fun stopWatching(): Result<Unit> {
142+
feedWatchHandler.onStopWatching(query.fid)
137143
return feedsRepository.stopWatching(groupId = group, feedId = id)
138144
}
139145

stream-feeds-android-client/src/test/kotlin/io/getstream/feeds/android/client/internal/client/FeedsClientImplTest.kt

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.getstream.android.core.api.StreamClient
1919
import io.getstream.android.core.api.log.StreamLogger
2020
import io.getstream.android.core.api.model.connection.StreamConnectedUser
2121
import io.getstream.android.core.api.model.connection.StreamConnectionState
22+
import io.getstream.android.core.api.model.exceptions.StreamClientException
2223
import io.getstream.android.core.api.model.value.StreamApiKey
2324
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
2425
import io.getstream.feeds.android.client.api.Moderation
@@ -27,6 +28,7 @@ import io.getstream.feeds.android.client.api.model.PushNotificationsProvider
2728
import io.getstream.feeds.android.client.api.model.User
2829
import io.getstream.feeds.android.client.api.model.UserAuthType
2930
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
31+
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
3032
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
3133
import io.getstream.feeds.android.client.internal.repository.AppRepository
3234
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
@@ -47,13 +49,19 @@ import io.getstream.feeds.android.network.models.ListDevicesResponse
4749
import io.mockk.coEvery
4850
import io.mockk.every
4951
import io.mockk.mockk
52+
import io.mockk.verify
5053
import java.util.Date
54+
import kotlinx.coroutines.ExperimentalCoroutinesApi
55+
import kotlinx.coroutines.flow.MutableSharedFlow
5156
import kotlinx.coroutines.flow.MutableStateFlow
57+
import kotlinx.coroutines.test.TestScope
58+
import kotlinx.coroutines.test.UnconfinedTestDispatcher
5259
import kotlinx.coroutines.test.runTest
5360
import org.junit.Assert.assertEquals
5461
import org.junit.Assert.assertTrue
5562
import org.junit.Test
5663

64+
@OptIn(ExperimentalCoroutinesApi::class)
5765
internal class FeedsClientImplTest {
5866
private val coreClient: StreamClient = mockk(relaxed = true)
5967
private val feedsEventsSubscriptionManager: StreamSubscriptionManager<FeedsEventListener> =
@@ -72,7 +80,10 @@ internal class FeedsClientImplTest {
7280
private val pollsRepository: PollsRepository = mockk(relaxed = true)
7381
private val uploader: FeedUploader = mockk(relaxed = true)
7482
private val moderation: Moderation = mockk(relaxed = true)
83+
private val feedWatchHandler: FeedWatchHandler = mockk(relaxed = true)
7584
private val logger: StreamLogger = mockk(relaxed = true)
85+
private val scope = TestScope(UnconfinedTestDispatcher())
86+
private val errorBus = MutableSharedFlow<StreamClientException>()
7687

7788
private val feedsClient: FeedsClientImpl =
7889
FeedsClientImpl(
@@ -92,9 +103,21 @@ internal class FeedsClientImplTest {
92103
pollsRepository = pollsRepository,
93104
uploader = uploader,
94105
moderation = moderation,
106+
feedWatchHandler = feedWatchHandler,
95107
logger = logger,
108+
scope = scope,
109+
errorBus = errorBus,
96110
)
97111

112+
@Test
113+
fun `on error bus event, then log it`() = runTest {
114+
val exception = StreamClientException("error!")
115+
116+
errorBus.emit(exception)
117+
118+
verify { logger.e(exception, any()) }
119+
}
120+
98121
@Test
99122
fun `connect when user is regular, then subscribes to client and connects successfully`() =
100123
runTest {
@@ -136,7 +159,10 @@ internal class FeedsClientImplTest {
136159
pollsRepository = pollsRepository,
137160
uploader = uploader,
138161
moderation = moderation,
162+
feedWatchHandler = feedWatchHandler,
139163
logger = logger,
164+
scope = scope,
165+
errorBus = errorBus,
140166
)
141167

142168
val result = anonymousClient.connect()

0 commit comments

Comments
 (0)