Skip to content

Commit f85d0ef

Browse files
committed
Rewatch feeds on socket reconnection
1 parent 3bf7245 commit f85d0ef

File tree

7 files changed

+256
-11
lines changed

7 files changed

+256
-11
lines changed

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/Create.kt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ import io.getstream.feeds.android.client.api.model.FeedsConfig
4848
import io.getstream.feeds.android.client.api.model.User
4949
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
5050
import io.getstream.feeds.android.client.internal.client.reconnect.DefaultRetryStrategy
51+
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
5152
import io.getstream.feeds.android.client.internal.client.reconnect.lifecycle.StreamLifecycleObserver
5253
import io.getstream.feeds.android.client.internal.file.StreamFeedUploader
5354
import io.getstream.feeds.android.client.internal.http.FeedsSingleFlightApi
@@ -248,6 +249,13 @@ internal fun createFeedsClient(
248249

249250
val moderation = ModerationImpl(moderationRepository)
250251

252+
val feedWatchHandler =
253+
FeedWatchHandler(
254+
connectedEvents = client.connectionState,
255+
feedsRepository = feedsRepository,
256+
scope = clientScope,
257+
)
258+
251259
// Build client
252260
return FeedsClientImpl(
253261
apiKey = apiKey,
@@ -271,6 +279,7 @@ internal fun createFeedsClient(
271279
maxStrongSubscriptions = Integer.MAX_VALUE,
272280
maxWeakSubscriptions = Integer.MAX_VALUE,
273281
),
282+
feedWatchHandler = feedWatchHandler,
274283
logger = logger,
275284
)
276285
}

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/FeedsClientImpl.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ import io.getstream.feeds.android.client.api.state.query.ModerationConfigsQuery
6363
import io.getstream.feeds.android.client.api.state.query.PollVotesQuery
6464
import io.getstream.feeds.android.client.api.state.query.PollsQuery
6565
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
66+
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
6667
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
6768
import io.getstream.feeds.android.client.internal.repository.AppRepository
6869
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
@@ -118,6 +119,7 @@ internal class FeedsClientImpl(
118119
private val pollsRepository: PollsRepository,
119120
override val uploader: FeedUploader,
120121
override val moderation: Moderation,
122+
private val feedWatchHandler: FeedWatchHandler,
121123
private val logger: StreamLogger,
122124
) : FeedsClient {
123125

@@ -153,6 +155,7 @@ internal class FeedsClientImpl(
153155
return Result.failure(IllegalArgumentException("Anonymous users cannot connect."))
154156
}
155157
coreClient.subscribe(clientSubscription)
158+
connectionRecoveryHandler.start()
156159
return coreClient.connect()
157160
}
158161

@@ -175,6 +178,7 @@ internal class FeedsClientImpl(
175178
feedsRepository = feedsRepository,
176179
pollsRepository = pollsRepository,
177180
subscriptionManager = feedsEventsSubscriptionManager,
181+
feedWatchHandler = feedWatchHandler,
178182
)
179183

180184
override fun feedList(query: FeedsQuery): FeedList =
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-feeds-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.getstream.feeds.android.client.internal.client.reconnect
17+
18+
import io.getstream.android.core.api.model.connection.StreamConnectionState
19+
import io.getstream.feeds.android.client.api.model.FeedId
20+
import io.getstream.feeds.android.client.api.state.query.FeedQuery
21+
import io.getstream.feeds.android.client.internal.repository.FeedsRepository
22+
import kotlinx.coroutines.CoroutineScope
23+
import kotlinx.coroutines.coroutineScope
24+
import kotlinx.coroutines.flow.Flow
25+
import kotlinx.coroutines.flow.filterIsInstance
26+
import kotlinx.coroutines.joinAll
27+
import kotlinx.coroutines.launch
28+
29+
/**
30+
* Handles re-watching feeds upon reconnection.
31+
*
32+
* Keeps track of feeds that are being watched and re-subscribes to them when the connection is
33+
* re-established.
34+
*
35+
* @property connectedEvents A [Flow] that emits events when the connection state changes to
36+
* @property feedsRepository The [FeedsRepository] used to rewatch feeds on connection.
37+
* @property scope The [CoroutineScope] in which to launch coroutines for re-watching feeds.
38+
*/
39+
internal class FeedWatchHandler(
40+
private val connectedEvents: Flow<StreamConnectionState>,
41+
private val feedsRepository: FeedsRepository,
42+
scope: CoroutineScope,
43+
) {
44+
private val watched = mutableSetOf<FeedId>()
45+
46+
init {
47+
scope.launch {
48+
connectedEvents.filterIsInstance<StreamConnectionState.Connected>().collect {
49+
rewatchAll()
50+
}
51+
}
52+
}
53+
54+
@Synchronized
55+
fun onStartWatching(feedId: FeedId) {
56+
watched += feedId
57+
}
58+
59+
@Synchronized
60+
fun onStopWatching(feedId: FeedId) {
61+
watched -= feedId
62+
}
63+
64+
private suspend fun rewatchAll() {
65+
coroutineScope {
66+
synchronized(this) { watched.toSet() }
67+
.map { launch { feedsRepository.getOrCreateFeed(FeedQuery(it, watch = true)) } }
68+
.joinAll()
69+
}
70+
}
71+
}

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/state/FeedImpl.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import io.getstream.feeds.android.client.api.state.Feed
3232
import io.getstream.feeds.android.client.api.state.FeedState
3333
import io.getstream.feeds.android.client.api.state.query.FeedQuery
3434
import io.getstream.feeds.android.client.api.state.query.MembersQuery
35+
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
3536
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
3637
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
3738
import io.getstream.feeds.android.client.internal.repository.CommentsRepository
@@ -92,6 +93,7 @@ internal class FeedImpl(
9293
private val feedsRepository: FeedsRepository,
9394
private val pollsRepository: PollsRepository,
9495
private val subscriptionManager: StreamSubscriptionManager<FeedsEventListener>,
96+
private val feedWatchHandler: FeedWatchHandler,
9597
) : Feed {
9698

9799
private val memberList: MemberListImpl =
@@ -127,13 +129,17 @@ internal class FeedImpl(
127129
get() = _state
128130

129131
override suspend fun getOrCreate(): Result<FeedData> {
132+
if (query.watch) {
133+
feedWatchHandler.onStartWatching(query.fid)
134+
}
130135
return feedsRepository
131136
.getOrCreateFeed(query)
132137
.onSuccess { _state.onQueryFeed(it) }
133138
.map { it.feed }
134139
}
135140

136141
override suspend fun stopWatching(): Result<Unit> {
142+
feedWatchHandler.onStopWatching(query.fid)
137143
return feedsRepository.stopWatching(groupId = group, feedId = id)
138144
}
139145

stream-feeds-android-client/src/test/kotlin/io/getstream/feeds/android/client/internal/client/FeedsClientImplTest.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import io.getstream.feeds.android.client.api.model.PushNotificationsProvider
2727
import io.getstream.feeds.android.client.api.model.User
2828
import io.getstream.feeds.android.client.api.model.UserAuthType
2929
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
30+
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
3031
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
3132
import io.getstream.feeds.android.client.internal.repository.AppRepository
3233
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
@@ -72,6 +73,7 @@ internal class FeedsClientImplTest {
7273
private val pollsRepository: PollsRepository = mockk(relaxed = true)
7374
private val uploader: FeedUploader = mockk(relaxed = true)
7475
private val moderation: Moderation = mockk(relaxed = true)
76+
private val feedWatchHandler: FeedWatchHandler = mockk(relaxed = true)
7577
private val logger: StreamLogger = mockk(relaxed = true)
7678

7779
private val feedsClient: FeedsClientImpl =
@@ -92,6 +94,7 @@ internal class FeedsClientImplTest {
9294
pollsRepository = pollsRepository,
9395
uploader = uploader,
9496
moderation = moderation,
97+
feedWatchHandler = feedWatchHandler,
9598
logger = logger,
9699
)
97100

@@ -136,6 +139,7 @@ internal class FeedsClientImplTest {
136139
pollsRepository = pollsRepository,
137140
uploader = uploader,
138141
moderation = moderation,
142+
feedWatchHandler = feedWatchHandler,
139143
logger = logger,
140144
)
141145

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
3+
*
4+
* Licensed under the Stream License;
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://github.com/GetStream/stream-feeds-android/blob/main/LICENSE
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.getstream.feeds.android.client.internal.client.reconnect
17+
18+
import io.getstream.android.core.api.model.connection.StreamConnectionState
19+
import io.getstream.feeds.android.client.api.model.FeedId
20+
import io.getstream.feeds.android.client.api.state.query.FeedQuery
21+
import io.getstream.feeds.android.client.internal.repository.FeedsRepository
22+
import io.mockk.called
23+
import io.mockk.coEvery
24+
import io.mockk.coVerify
25+
import io.mockk.mockk
26+
import io.mockk.verify
27+
import kotlinx.coroutines.ExperimentalCoroutinesApi
28+
import kotlinx.coroutines.flow.MutableSharedFlow
29+
import kotlinx.coroutines.test.TestScope
30+
import kotlinx.coroutines.test.UnconfinedTestDispatcher
31+
import kotlinx.coroutines.test.runTest
32+
import org.junit.Test
33+
34+
@OptIn(ExperimentalCoroutinesApi::class)
35+
internal class FeedWatchHandlerTest {
36+
private val connectionEvents = MutableSharedFlow<StreamConnectionState>()
37+
private val feedsRepository: FeedsRepository = mockk()
38+
private val scope = TestScope(UnconfinedTestDispatcher())
39+
40+
private val handler = FeedWatchHandler(connectionEvents, feedsRepository, scope)
41+
42+
@Test
43+
fun `on connected event, get feeds that are still being watched`() = runTest {
44+
coEvery { feedsRepository.getOrCreateFeed(any()) } returns Result.failure(Exception())
45+
val id1 = FeedId("group", "id1")
46+
val id2 = FeedId("group", "id2")
47+
val id3 = FeedId("group", "id3")
48+
49+
handler.onStartWatching(id1)
50+
handler.onStartWatching(id2)
51+
handler.onStartWatching(id3)
52+
handler.onStopWatching(id2)
53+
54+
connectionEvents.emit(StreamConnectionState.Connected(mockk(), "connection-id"))
55+
56+
coVerify {
57+
feedsRepository.getOrCreateFeed(FeedQuery(id1))
58+
feedsRepository.getOrCreateFeed(FeedQuery(id3))
59+
}
60+
}
61+
62+
@Test
63+
fun `on non-connected event, do nothing`() = runTest {
64+
val id1 = FeedId("group", "id1")
65+
handler.onStartWatching(id1)
66+
67+
listOf(
68+
StreamConnectionState.Idle,
69+
StreamConnectionState.Connecting.Opening("user-id"),
70+
StreamConnectionState.Connecting.Authenticating("user-id"),
71+
StreamConnectionState.Disconnected(),
72+
)
73+
.forEach { connectionEvents.emit(it) }
74+
75+
verify { feedsRepository wasNot called }
76+
}
77+
}

0 commit comments

Comments
 (0)