Skip to content

Commit 16375c8

Browse files
committed
Revert to getOrCreateFeed instead of queryFeeds
1 parent 09b06ab commit 16375c8

File tree

2 files changed

+31
-31
lines changed
  • stream-feeds-android-client/src

2 files changed

+31
-31
lines changed

stream-feeds-android-client/src/main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,22 @@
1515
*/
1616
package io.getstream.feeds.android.client.internal.client.reconnect
1717

18-
import io.getstream.android.core.api.filter.`in`
1918
import io.getstream.android.core.api.model.StreamRetryPolicy
2019
import io.getstream.android.core.api.model.connection.StreamConnectionState
2120
import io.getstream.android.core.api.model.exceptions.StreamClientException
2221
import io.getstream.android.core.api.processing.StreamRetryProcessor
2322
import io.getstream.feeds.android.client.api.model.FeedId
24-
import io.getstream.feeds.android.client.api.state.query.FeedsFilterField
25-
import io.getstream.feeds.android.client.api.state.query.FeedsQuery
23+
import io.getstream.feeds.android.client.api.state.query.FeedQuery
2624
import io.getstream.feeds.android.client.internal.repository.FeedsRepository
27-
import java.util.concurrent.ConcurrentHashMap
25+
import io.getstream.feeds.android.client.internal.repository.GetOrCreateInfo
2826
import kotlinx.coroutines.CoroutineScope
27+
import kotlinx.coroutines.async
28+
import kotlinx.coroutines.coroutineScope
2929
import kotlinx.coroutines.flow.Flow
3030
import kotlinx.coroutines.flow.MutableSharedFlow
3131
import kotlinx.coroutines.flow.filterIsInstance
3232
import kotlinx.coroutines.launch
33+
import java.util.concurrent.ConcurrentHashMap
3334

3435
/**
3536
* Handles re-watching feeds upon reconnection.
@@ -68,25 +69,20 @@ internal class FeedWatchHandler(
6869
}
6970

7071
private suspend fun rewatchAll() {
71-
val watchedCopy = watched.keys.toSet()
72-
73-
if (watchedCopy.isNotEmpty()) {
74-
val toRewatch = watchedCopy.map(FeedId::rawValue)
75-
76-
retryProcessor
77-
.retry(retryPolicy) {
78-
feedsRepository
79-
.queryFeeds(FeedsQuery(FeedsFilterField.feed.`in`(toRewatch)))
80-
.getOrThrow()
81-
}
82-
.onFailure { errorBus.emit(StreamFeedRewatchException(watchedCopy, it)) }
83-
}
72+
coroutineScope { watched.keys.map { id -> async { rewatch(id) } } }
8473
}
8574

75+
private suspend fun rewatch(id: FeedId): Result<GetOrCreateInfo> =
76+
retryProcessor
77+
.retry(retryPolicy) {
78+
feedsRepository.getOrCreateFeed(FeedQuery(id, watch = true)).getOrThrow()
79+
}
80+
.onFailure { errorBus.emit(StreamFeedRewatchException(id)) }
81+
8682
companion object {
8783
private val retryPolicy = StreamRetryPolicy.exponential(maxRetries = 3)
8884
}
8985
}
9086

91-
internal class StreamFeedRewatchException(val ids: Set<FeedId>, cause: Throwable? = null) :
92-
StreamClientException("Failed to rewatch feeds", cause)
87+
internal class StreamFeedRewatchException(val id: FeedId, cause: Throwable? = null) :
88+
StreamClientException("Failed to rewatch feed", cause)

stream-feeds-android-client/src/test/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandlerTest.kt

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,13 @@
1616
package io.getstream.feeds.android.client.internal.client.reconnect
1717

1818
import app.cash.turbine.test
19-
import io.getstream.android.core.api.filter.`in`
2019
import io.getstream.android.core.api.model.StreamRetryPolicy
2120
import io.getstream.android.core.api.model.connection.StreamConnectionState
2221
import io.getstream.android.core.api.model.exceptions.StreamClientException
2322
import io.getstream.android.core.api.processing.StreamRetryProcessor
2423
import io.getstream.android.core.result.runSafely
2524
import io.getstream.feeds.android.client.api.model.FeedId
26-
import io.getstream.feeds.android.client.api.state.query.FeedsFilterField
27-
import io.getstream.feeds.android.client.api.state.query.FeedsQuery
25+
import io.getstream.feeds.android.client.api.state.query.FeedQuery
2826
import io.getstream.feeds.android.client.internal.repository.FeedsRepository
2927
import io.mockk.called
3028
import io.mockk.coEvery
@@ -58,11 +56,10 @@ internal class FeedWatchHandlerTest {
5856

5957
@Test
6058
fun `on connected event, query feeds that are still being watched`() = runTest {
61-
coEvery { feedsRepository.queryFeeds(any()) } returns Result.success(mockk())
59+
coEvery { feedsRepository.getOrCreateFeed(any()) } returns Result.success(mockk())
6260
val id1 = FeedId("group", "id1")
6361
val id2 = FeedId("group", "id2")
6462
val id3 = FeedId("group", "id3")
65-
val expectedQuery = FeedsQuery(FeedsFilterField.feed.`in`("group:id1", "group:id3"))
6663

6764
handler.onStartWatching(id1)
6865
handler.onStartWatching(id2)
@@ -71,7 +68,10 @@ internal class FeedWatchHandlerTest {
7168

7269
connectionEvents.emit(StreamConnectionState.Connected(mockk(), "connection-id"))
7370

74-
coVerify { feedsRepository.queryFeeds(expectedQuery) }
71+
coVerify {
72+
feedsRepository.getOrCreateFeed(FeedQuery(id1))
73+
feedsRepository.getOrCreateFeed(FeedQuery(id3))
74+
}
7575
}
7676

7777
@Test
@@ -85,7 +85,11 @@ internal class FeedWatchHandlerTest {
8585
errorBus.test {
8686
connectionEvents.emit(StreamConnectionState.Connected(mockk(), "connection-id"))
8787

88-
assertEquals(setOf(id1, id2), (awaitItem() as? StreamFeedRewatchException)?.ids)
88+
val erroredIds = setOf(
89+
(awaitItem() as? StreamFeedRewatchException)?.id,
90+
(awaitItem() as? StreamFeedRewatchException)?.id,
91+
)
92+
assertEquals(setOf(id1, id2), erroredIds)
8993
}
9094
}
9195

@@ -95,11 +99,11 @@ internal class FeedWatchHandlerTest {
9599
handler.onStartWatching(id1)
96100

97101
listOf(
98-
StreamConnectionState.Idle,
99-
StreamConnectionState.Connecting.Opening("user-id"),
100-
StreamConnectionState.Connecting.Authenticating("user-id"),
101-
StreamConnectionState.Disconnected(),
102-
)
102+
StreamConnectionState.Idle,
103+
StreamConnectionState.Connecting.Opening("user-id"),
104+
StreamConnectionState.Connecting.Authenticating("user-id"),
105+
StreamConnectionState.Disconnected(),
106+
)
103107
.forEach { connectionEvents.emit(it) }
104108

105109
verify { feedsRepository wasNot called }

0 commit comments

Comments
 (0)