Skip to content

Commit f8fbdb1

Browse files
committed
Retry rewatching feeds on failures
1 parent 7a74ef4 commit f8fbdb1

File tree

3 files changed

+25
-2
lines changed

3 files changed

+25
-2
lines changed

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/Create.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ internal fun createFeedsClient(
253253
FeedWatchHandler(
254254
connectionState = client.connectionState,
255255
feedsRepository = feedsRepository,
256+
retryProcessor = StreamRetryProcessor(logProvider.taggedLogger("WatchHandler")),
256257
scope = clientScope,
257258
)
258259

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package io.getstream.feeds.android.client.internal.client.reconnect
1717

1818
import io.getstream.android.core.api.filter.`in`
19+
import io.getstream.android.core.api.model.StreamRetryPolicy
1920
import io.getstream.android.core.api.model.connection.StreamConnectionState
21+
import io.getstream.android.core.api.processing.StreamRetryProcessor
2022
import io.getstream.feeds.android.client.api.model.FeedId
2123
import io.getstream.feeds.android.client.api.state.query.FeedsFilterField
2224
import io.getstream.feeds.android.client.api.state.query.FeedsQuery
@@ -41,6 +43,7 @@ import kotlinx.coroutines.launch
4143
internal class FeedWatchHandler(
4244
private val connectionState: Flow<StreamConnectionState>,
4345
private val feedsRepository: FeedsRepository,
46+
private val retryProcessor: StreamRetryProcessor,
4447
scope: CoroutineScope,
4548
) {
4649
private val watched = ConcurrentHashMap<FeedId, Unit>()
@@ -64,7 +67,15 @@ internal class FeedWatchHandler(
6467
private suspend fun rewatchAll() {
6568
val toRewatch = watched.keys.mapTo(mutableListOf(), FeedId::rawValue)
6669
if (toRewatch.isNotEmpty()) {
67-
feedsRepository.queryFeeds(FeedsQuery(FeedsFilterField.feed.`in`(toRewatch)))
70+
retryProcessor.retry(retryPolicy) {
71+
feedsRepository
72+
.queryFeeds(FeedsQuery(FeedsFilterField.feed.`in`(toRewatch)))
73+
.getOrThrow()
74+
}
6875
}
6976
}
77+
78+
companion object {
79+
private val retryPolicy = StreamRetryPolicy.exponential(maxRetries = 3)
80+
}
7081
}

stream-feeds-android-client/src/test/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandlerTest.kt

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616
package io.getstream.feeds.android.client.internal.client.reconnect
1717

1818
import io.getstream.android.core.api.filter.`in`
19+
import io.getstream.android.core.api.model.StreamRetryPolicy
1920
import io.getstream.android.core.api.model.connection.StreamConnectionState
21+
import io.getstream.android.core.api.processing.StreamRetryProcessor
22+
import io.getstream.android.core.result.runSafely
2023
import io.getstream.feeds.android.client.api.model.FeedId
2124
import io.getstream.feeds.android.client.api.state.query.FeedsFilterField
2225
import io.getstream.feeds.android.client.api.state.query.FeedsQuery
@@ -38,8 +41,9 @@ internal class FeedWatchHandlerTest {
3841
private val connectionEvents = MutableSharedFlow<StreamConnectionState>()
3942
private val feedsRepository: FeedsRepository = mockk()
4043
private val scope = TestScope(UnconfinedTestDispatcher())
44+
private val retryProcessor: StreamRetryProcessor = TestRetryProcessor()
4145

42-
private val handler = FeedWatchHandler(connectionEvents, feedsRepository, scope)
46+
private val handler = FeedWatchHandler(connectionEvents, feedsRepository, retryProcessor, scope)
4347

4448
@Test
4549
fun `on connected event, get feeds that are still being watched`() = runTest {
@@ -74,4 +78,11 @@ internal class FeedWatchHandlerTest {
7478

7579
verify { feedsRepository wasNot called }
7680
}
81+
82+
private class TestRetryProcessor : StreamRetryProcessor {
83+
override suspend fun <T> retry(
84+
policy: StreamRetryPolicy,
85+
block: suspend () -> T,
86+
): Result<T> = runSafely { block() }
87+
}
7788
}

0 commit comments

Comments
 (0)