Skip to content

Commit fabc26f

Browse files
committed
Post rewatch errors in a bus and log them
1 parent f8fbdb1 commit fabc26f

File tree

5 files changed

+88
-12
lines changed

5 files changed

+88
-12
lines changed

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/Create.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import io.getstream.android.core.api.log.StreamLogger
2626
import io.getstream.android.core.api.log.StreamLoggerProvider
2727
import io.getstream.android.core.api.model.config.StreamClientSerializationConfig
2828
import io.getstream.android.core.api.model.config.StreamHttpConfig
29+
import io.getstream.android.core.api.model.exceptions.StreamClientException
2930
import io.getstream.android.core.api.model.value.StreamApiKey
3031
import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader
3132
import io.getstream.android.core.api.model.value.StreamUserId
@@ -69,6 +70,7 @@ import kotlinx.coroutines.CoroutineExceptionHandler
6970
import kotlinx.coroutines.CoroutineScope
7071
import kotlinx.coroutines.Dispatchers
7172
import kotlinx.coroutines.SupervisorJob
73+
import kotlinx.coroutines.flow.MutableSharedFlow
7274
import kotlinx.coroutines.plus
7375
import okhttp3.OkHttpClient
7476
import okhttp3.logging.HttpLoggingInterceptor
@@ -248,12 +250,14 @@ internal fun createFeedsClient(
248250
val pollsRepository = PollsRepositoryImpl(feedsApi)
249251

250252
val moderation = ModerationImpl(moderationRepository)
253+
val errorBus = MutableSharedFlow<StreamClientException>(extraBufferCapacity = 100)
251254

252255
val feedWatchHandler =
253256
FeedWatchHandler(
254257
connectionState = client.connectionState,
255258
feedsRepository = feedsRepository,
256259
retryProcessor = StreamRetryProcessor(logProvider.taggedLogger("WatchHandler")),
260+
errorBus = errorBus,
257261
scope = clientScope,
258262
)
259263

@@ -281,6 +285,8 @@ internal fun createFeedsClient(
281285
maxWeakSubscriptions = Integer.MAX_VALUE,
282286
),
283287
feedWatchHandler = feedWatchHandler,
288+
errorBus = errorBus,
289+
scope = clientScope,
284290
logger = logger,
285291
)
286292
}

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/FeedsClientImpl.kt

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.getstream.android.core.api.StreamClient
1919
import io.getstream.android.core.api.log.StreamLogger
2020
import io.getstream.android.core.api.model.connection.StreamConnectedUser
2121
import io.getstream.android.core.api.model.connection.StreamConnectionState
22+
import io.getstream.android.core.api.model.exceptions.StreamClientException
2223
import io.getstream.android.core.api.model.value.StreamApiKey
2324
import io.getstream.android.core.api.socket.listeners.StreamClientListener
2425
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
@@ -96,11 +97,13 @@ import io.getstream.feeds.android.network.models.DeleteActivitiesRequest
9697
import io.getstream.feeds.android.network.models.DeleteActivitiesResponse
9798
import io.getstream.feeds.android.network.models.ListDevicesResponse
9899
import io.getstream.feeds.android.network.models.WSEvent
100+
import kotlinx.coroutines.CoroutineScope
99101
import kotlinx.coroutines.channels.BufferOverflow
100102
import kotlinx.coroutines.flow.Flow
101103
import kotlinx.coroutines.flow.MutableSharedFlow
102104
import kotlinx.coroutines.flow.StateFlow
103105
import kotlinx.coroutines.flow.asSharedFlow
106+
import kotlinx.coroutines.launch
104107

105108
internal class FeedsClientImpl(
106109
private val coreClient: StreamClient,
@@ -121,6 +124,8 @@ internal class FeedsClientImpl(
121124
override val moderation: Moderation,
122125
private val feedWatchHandler: FeedWatchHandler,
123126
private val logger: StreamLogger,
127+
scope: CoroutineScope,
128+
errorBus: Flow<StreamClientException>,
124129
) : FeedsClient {
125130

126131
override val state: StateFlow<StreamConnectionState>
@@ -149,6 +154,12 @@ internal class FeedsClientImpl(
149154
}
150155
}
151156

157+
init {
158+
scope.launch {
159+
errorBus.collect { logger.e(it) { "[FeedsClient] Received error from bus" } }
160+
}
161+
}
162+
152163
override suspend fun connect(): Result<StreamConnectedUser> {
153164
if (user.type == UserAuthType.ANONYMOUS) {
154165
logger.e { "[connect] Attempting to connect an anonymous user, returning an error." }

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package io.getstream.feeds.android.client.internal.client.reconnect
1818
import io.getstream.android.core.api.filter.`in`
1919
import io.getstream.android.core.api.model.StreamRetryPolicy
2020
import io.getstream.android.core.api.model.connection.StreamConnectionState
21+
import io.getstream.android.core.api.model.exceptions.StreamClientException
2122
import io.getstream.android.core.api.processing.StreamRetryProcessor
2223
import io.getstream.feeds.android.client.api.model.FeedId
2324
import io.getstream.feeds.android.client.api.state.query.FeedsFilterField
@@ -26,6 +27,7 @@ import io.getstream.feeds.android.client.internal.repository.FeedsRepository
2627
import java.util.concurrent.ConcurrentHashMap
2728
import kotlinx.coroutines.CoroutineScope
2829
import kotlinx.coroutines.flow.Flow
30+
import kotlinx.coroutines.flow.MutableSharedFlow
2931
import kotlinx.coroutines.flow.filterIsInstance
3032
import kotlinx.coroutines.launch
3133

@@ -44,7 +46,8 @@ internal class FeedWatchHandler(
4446
private val connectionState: Flow<StreamConnectionState>,
4547
private val feedsRepository: FeedsRepository,
4648
private val retryProcessor: StreamRetryProcessor,
47-
scope: CoroutineScope,
49+
private val errorBus: MutableSharedFlow<StreamClientException>,
50+
private val scope: CoroutineScope,
4851
) {
4952
private val watched = ConcurrentHashMap<FeedId, Unit>()
5053

@@ -65,17 +68,25 @@ internal class FeedWatchHandler(
6568
}
6669

6770
private suspend fun rewatchAll() {
68-
val toRewatch = watched.keys.mapTo(mutableListOf(), FeedId::rawValue)
69-
if (toRewatch.isNotEmpty()) {
70-
retryProcessor.retry(retryPolicy) {
71-
feedsRepository
72-
.queryFeeds(FeedsQuery(FeedsFilterField.feed.`in`(toRewatch)))
73-
.getOrThrow()
74-
}
71+
val watchedCopy = watched.keys.toSet()
72+
73+
if (watchedCopy.isNotEmpty()) {
74+
val toRewatch = watchedCopy.map(FeedId::rawValue)
75+
76+
retryProcessor
77+
.retry(retryPolicy) {
78+
feedsRepository
79+
.queryFeeds(FeedsQuery(FeedsFilterField.feed.`in`(toRewatch)))
80+
.getOrThrow()
81+
}
82+
.onFailure { errorBus.emit(StreamFeedRewatchException(watchedCopy, it)) }
7583
}
7684
}
7785

7886
companion object {
7987
private val retryPolicy = StreamRetryPolicy.exponential(maxRetries = 3)
8088
}
8189
}
90+
91+
internal class StreamFeedRewatchException(val ids: Set<FeedId>, cause: Throwable? = null) :
92+
StreamClientException("Failed to rewatch feeds", cause)

stream-feeds-android-client/src/test/kotlin/io/getstream/feeds/android/client/internal/client/FeedsClientImplTest.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import io.getstream.android.core.api.StreamClient
1919
import io.getstream.android.core.api.log.StreamLogger
2020
import io.getstream.android.core.api.model.connection.StreamConnectedUser
2121
import io.getstream.android.core.api.model.connection.StreamConnectionState
22+
import io.getstream.android.core.api.model.exceptions.StreamClientException
2223
import io.getstream.android.core.api.model.value.StreamApiKey
2324
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
2425
import io.getstream.feeds.android.client.api.Moderation
@@ -48,13 +49,19 @@ import io.getstream.feeds.android.network.models.ListDevicesResponse
4849
import io.mockk.coEvery
4950
import io.mockk.every
5051
import io.mockk.mockk
52+
import io.mockk.verify
5153
import java.util.Date
54+
import kotlinx.coroutines.ExperimentalCoroutinesApi
55+
import kotlinx.coroutines.flow.MutableSharedFlow
5256
import kotlinx.coroutines.flow.MutableStateFlow
57+
import kotlinx.coroutines.test.TestScope
58+
import kotlinx.coroutines.test.UnconfinedTestDispatcher
5359
import kotlinx.coroutines.test.runTest
5460
import org.junit.Assert.assertEquals
5561
import org.junit.Assert.assertTrue
5662
import org.junit.Test
5763

64+
@OptIn(ExperimentalCoroutinesApi::class)
5865
internal class FeedsClientImplTest {
5966
private val coreClient: StreamClient = mockk(relaxed = true)
6067
private val feedsEventsSubscriptionManager: StreamSubscriptionManager<FeedsEventListener> =
@@ -75,6 +82,8 @@ internal class FeedsClientImplTest {
7582
private val moderation: Moderation = mockk(relaxed = true)
7683
private val feedWatchHandler: FeedWatchHandler = mockk(relaxed = true)
7784
private val logger: StreamLogger = mockk(relaxed = true)
85+
private val scope = TestScope(UnconfinedTestDispatcher())
86+
private val errorBus = MutableSharedFlow<StreamClientException>()
7887

7988
private val feedsClient: FeedsClientImpl =
8089
FeedsClientImpl(
@@ -96,8 +105,19 @@ internal class FeedsClientImplTest {
96105
moderation = moderation,
97106
feedWatchHandler = feedWatchHandler,
98107
logger = logger,
108+
scope = scope,
109+
errorBus = errorBus,
99110
)
100111

112+
@Test
113+
fun `on error bus event, then log it`() = runTest {
114+
val exception = StreamClientException("error!")
115+
116+
errorBus.emit(exception)
117+
118+
verify { logger.e(exception, any()) }
119+
}
120+
101121
@Test
102122
fun `connect when user is regular, then subscribes to client and connects successfully`() =
103123
runTest {
@@ -141,6 +161,8 @@ internal class FeedsClientImplTest {
141161
moderation = moderation,
142162
feedWatchHandler = feedWatchHandler,
143163
logger = logger,
164+
scope = scope,
165+
errorBus = errorBus,
144166
)
145167

146168
val result = anonymousClient.connect()

stream-feeds-android-client/src/test/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandlerTest.kt

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515
*/
1616
package io.getstream.feeds.android.client.internal.client.reconnect
1717

18+
import app.cash.turbine.test
1819
import io.getstream.android.core.api.filter.`in`
1920
import io.getstream.android.core.api.model.StreamRetryPolicy
2021
import io.getstream.android.core.api.model.connection.StreamConnectionState
22+
import io.getstream.android.core.api.model.exceptions.StreamClientException
2123
import io.getstream.android.core.api.processing.StreamRetryProcessor
2224
import io.getstream.android.core.result.runSafely
2325
import io.getstream.feeds.android.client.api.model.FeedId
@@ -34,20 +36,29 @@ import kotlinx.coroutines.flow.MutableSharedFlow
3436
import kotlinx.coroutines.test.TestScope
3537
import kotlinx.coroutines.test.UnconfinedTestDispatcher
3638
import kotlinx.coroutines.test.runTest
39+
import org.junit.Assert.assertEquals
3740
import org.junit.Test
3841

3942
@OptIn(ExperimentalCoroutinesApi::class)
4043
internal class FeedWatchHandlerTest {
4144
private val connectionEvents = MutableSharedFlow<StreamConnectionState>()
4245
private val feedsRepository: FeedsRepository = mockk()
43-
private val scope = TestScope(UnconfinedTestDispatcher())
4446
private val retryProcessor: StreamRetryProcessor = TestRetryProcessor()
47+
private val errorBus = MutableSharedFlow<StreamClientException>()
48+
private val scope = TestScope(UnconfinedTestDispatcher())
4549

46-
private val handler = FeedWatchHandler(connectionEvents, feedsRepository, retryProcessor, scope)
50+
private val handler =
51+
FeedWatchHandler(
52+
connectionState = connectionEvents,
53+
feedsRepository = feedsRepository,
54+
retryProcessor = retryProcessor,
55+
errorBus = errorBus,
56+
scope = scope,
57+
)
4758

4859
@Test
49-
fun `on connected event, get feeds that are still being watched`() = runTest {
50-
coEvery { feedsRepository.queryFeeds(any()) } returns Result.failure(Exception())
60+
fun `on connected event, query feeds that are still being watched`() = runTest {
61+
coEvery { feedsRepository.queryFeeds(any()) } returns Result.success(mockk())
5162
val id1 = FeedId("group", "id1")
5263
val id2 = FeedId("group", "id2")
5364
val id3 = FeedId("group", "id3")
@@ -63,6 +74,21 @@ internal class FeedWatchHandlerTest {
6374
coVerify { feedsRepository.queryFeeds(expectedQuery) }
6475
}
6576

77+
@Test
78+
fun `on connected event when query fails, post error`() = runTest {
79+
coEvery { feedsRepository.queryFeeds(any()) } returns Result.failure(Exception())
80+
val id1 = FeedId("group", "id1")
81+
val id2 = FeedId("group", "id2")
82+
handler.onStartWatching(id1)
83+
handler.onStartWatching(id2)
84+
85+
errorBus.test {
86+
connectionEvents.emit(StreamConnectionState.Connected(mockk(), "connection-id"))
87+
88+
assertEquals(setOf(id1, id2), (awaitItem() as? StreamFeedRewatchException)?.ids)
89+
}
90+
}
91+
6692
@Test
6793
fun `on non-connected event, do nothing`() = runTest {
6894
val id1 = FeedId("group", "id1")

0 commit comments

Comments
 (0)