Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import io.getstream.android.core.api.log.StreamLogger
import io.getstream.android.core.api.log.StreamLoggerProvider
import io.getstream.android.core.api.model.config.StreamClientSerializationConfig
import io.getstream.android.core.api.model.config.StreamHttpConfig
import io.getstream.android.core.api.model.exceptions.StreamClientException
import io.getstream.android.core.api.model.value.StreamApiKey
import io.getstream.android.core.api.model.value.StreamHttpClientInfoHeader
import io.getstream.android.core.api.model.value.StreamUserId
Expand All @@ -48,6 +49,7 @@ import io.getstream.feeds.android.client.api.model.FeedsConfig
import io.getstream.feeds.android.client.api.model.User
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
import io.getstream.feeds.android.client.internal.client.reconnect.DefaultRetryStrategy
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
import io.getstream.feeds.android.client.internal.client.reconnect.lifecycle.StreamLifecycleObserver
import io.getstream.feeds.android.client.internal.file.StreamFeedUploader
import io.getstream.feeds.android.client.internal.http.FeedsSingleFlightApi
Expand All @@ -68,6 +70,7 @@ import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.plus
import okhttp3.OkHttpClient
import okhttp3.logging.HttpLoggingInterceptor
Expand Down Expand Up @@ -247,6 +250,16 @@ internal fun createFeedsClient(
val pollsRepository = PollsRepositoryImpl(feedsApi)

val moderation = ModerationImpl(moderationRepository)
val errorBus = MutableSharedFlow<StreamClientException>(extraBufferCapacity = 100)

val feedWatchHandler =
FeedWatchHandler(
connectionState = client.connectionState,
feedsRepository = feedsRepository,
retryProcessor = StreamRetryProcessor(logProvider.taggedLogger("WatchHandler")),
errorBus = errorBus,
scope = clientScope,
)

// Build client
return FeedsClientImpl(
Expand All @@ -271,6 +284,9 @@ internal fun createFeedsClient(
maxStrongSubscriptions = Integer.MAX_VALUE,
maxWeakSubscriptions = Integer.MAX_VALUE,
),
feedWatchHandler = feedWatchHandler,
errorBus = errorBus,
scope = clientScope,
logger = logger,
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.getstream.android.core.api.StreamClient
import io.getstream.android.core.api.log.StreamLogger
import io.getstream.android.core.api.model.connection.StreamConnectedUser
import io.getstream.android.core.api.model.connection.StreamConnectionState
import io.getstream.android.core.api.model.exceptions.StreamClientException
import io.getstream.android.core.api.model.value.StreamApiKey
import io.getstream.android.core.api.socket.listeners.StreamClientListener
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
Expand Down Expand Up @@ -63,6 +64,7 @@ import io.getstream.feeds.android.client.api.state.query.ModerationConfigsQuery
import io.getstream.feeds.android.client.api.state.query.PollVotesQuery
import io.getstream.feeds.android.client.api.state.query.PollsQuery
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
import io.getstream.feeds.android.client.internal.repository.AppRepository
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
Expand Down Expand Up @@ -95,11 +97,13 @@ import io.getstream.feeds.android.network.models.DeleteActivitiesRequest
import io.getstream.feeds.android.network.models.DeleteActivitiesResponse
import io.getstream.feeds.android.network.models.ListDevicesResponse
import io.getstream.feeds.android.network.models.WSEvent
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch

internal class FeedsClientImpl(
private val coreClient: StreamClient,
Expand All @@ -118,7 +122,10 @@ internal class FeedsClientImpl(
private val pollsRepository: PollsRepository,
override val uploader: FeedUploader,
override val moderation: Moderation,
private val feedWatchHandler: FeedWatchHandler,
private val logger: StreamLogger,
scope: CoroutineScope,
errorBus: Flow<StreamClientException>,
) : FeedsClient {

override val state: StateFlow<StreamConnectionState>
Expand Down Expand Up @@ -147,12 +154,19 @@ internal class FeedsClientImpl(
}
}

init {
scope.launch {
errorBus.collect { logger.e(it) { "[FeedsClient] Received error from bus" } }
}
}

override suspend fun connect(): Result<StreamConnectedUser> {
if (user.type == UserAuthType.ANONYMOUS) {
logger.e { "[connect] Attempting to connect an anonymous user, returning an error." }
return Result.failure(IllegalArgumentException("Anonymous users cannot connect."))
}
coreClient.subscribe(clientSubscription)
connectionRecoveryHandler.start()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it better to just call rewatchAll here in case we have non empty list of watched feeds. Since if connect goes trough in the coreClient we will definitely have a new connection ID which will require re-watching??

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a change that is not directly tied to the rewatch logic. I discovered that we lost this call when migrating to the core client, so we were never reconnecting the socket automatically.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not concerned about connectionRecoveryHandler.start() it should be there.
My point is that right after that we can call rewatchAll instead of initing a collector in the watch handler.

return coreClient.connect()
}

Expand All @@ -175,6 +189,7 @@ internal class FeedsClientImpl(
feedsRepository = feedsRepository,
pollsRepository = pollsRepository,
subscriptionManager = feedsEventsSubscriptionManager,
feedWatchHandler = feedWatchHandler,
)

override fun feedList(query: FeedsQuery): FeedList =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2014-2025 Stream.io Inc. All rights reserved.
*
* Licensed under the Stream License;
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://github.com/GetStream/stream-feeds-android/blob/main/LICENSE
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.getstream.feeds.android.client.internal.client.reconnect

import io.getstream.android.core.api.model.StreamRetryPolicy
import io.getstream.android.core.api.model.connection.StreamConnectionState
import io.getstream.android.core.api.model.exceptions.StreamClientException
import io.getstream.android.core.api.processing.StreamRetryProcessor
import io.getstream.feeds.android.client.api.model.FeedId
import io.getstream.feeds.android.client.api.state.query.FeedQuery
import io.getstream.feeds.android.client.internal.repository.FeedsRepository
import io.getstream.feeds.android.client.internal.repository.GetOrCreateInfo
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.launch

/**
* Handles re-watching feeds upon reconnection.
*
* Keeps track of feeds that are being watched and re-subscribes to them when the connection is
* re-established.
*
* @property connectionState A [Flow] that emits events when the connection state changes to
* [StreamConnectionState.Connected].
* @property feedsRepository The [FeedsRepository] used to rewatch feeds on connection.
* @property scope The [CoroutineScope] in which to launch coroutines for re-watching feeds.
*/
internal class FeedWatchHandler(
private val connectionState: Flow<StreamConnectionState>,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Maybe its better for the FeedsWatchhandler to be dumb and just collect the bunch of feed IDs and wire to the public rewatchAll() : Result<Whatever>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To do that, we'd need to add a rewatchAll call everywhere we connect the core client (after we validated that the connection was successful), which atm would be in FeedsClientImpl and ConnectionRecoveryHandler. This means that if we ever add more code calling .connect(), we need to remember to also call rewatchAll. It's more error prone than using the socket state as the source of truth

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ConnectionRecoveryHandler I think should not call connect() directly to the core client, but that is another topic.

private val feedsRepository: FeedsRepository,
private val retryProcessor: StreamRetryProcessor,
private val errorBus: MutableSharedFlow<StreamClientException>,
private val scope: CoroutineScope,
) {
private val watched = ConcurrentHashMap<FeedId, Unit>()

init {
scope.launch {
connectionState.filterIsInstance<StreamConnectionState.Connected>().collect {
rewatchAll()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rewatchAll() swallows the potential failure of the request.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know but what's the alternative? I'm going to change it to queryFeeds as you suggested above, so we can easily return a Result, but we're still executing this code in background automatically, so clients won't receive anything anyway. Maybe we can log failures?

}
}
}

fun onStartWatching(feedId: FeedId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit:maybe startWatching(feedID) ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I put on because the watch handler is not really doing the watching, it's just being notified that we started watching (by executing the actual query)

watched[feedId] = Unit
}

fun onStopWatching(feedId: FeedId) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: same as the other stopWatching(feedId)

watched -= feedId
}

private suspend fun rewatchAll() {
coroutineScope { watched.keys.map { id -> async { rewatch(id) } } }
}

private suspend fun rewatch(id: FeedId): Result<GetOrCreateInfo> =
retryProcessor
.retry(retryPolicy) {
feedsRepository.getOrCreateFeed(FeedQuery(id, watch = true)).getOrThrow()
}
.onFailure { errorBus.emit(StreamFeedRewatchException(id)) }

companion object {
private val retryPolicy = StreamRetryPolicy.exponential(maxRetries = 3)
}
}

internal class StreamFeedRewatchException(val id: FeedId, cause: Throwable? = null) :
StreamClientException("Failed to rewatch feed", cause)
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import io.getstream.feeds.android.client.api.state.Feed
import io.getstream.feeds.android.client.api.state.FeedState
import io.getstream.feeds.android.client.api.state.query.FeedQuery
import io.getstream.feeds.android.client.api.state.query.MembersQuery
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
import io.getstream.feeds.android.client.internal.repository.CommentsRepository
Expand Down Expand Up @@ -92,6 +93,7 @@ internal class FeedImpl(
private val feedsRepository: FeedsRepository,
private val pollsRepository: PollsRepository,
private val subscriptionManager: StreamSubscriptionManager<FeedsEventListener>,
private val feedWatchHandler: FeedWatchHandler,
) : Feed {

private val memberList: MemberListImpl =
Expand Down Expand Up @@ -127,13 +129,17 @@ internal class FeedImpl(
get() = _state

override suspend fun getOrCreate(): Result<FeedData> {
if (query.watch) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am probably oversimplifying this whole logic, but wouldn't it be possible to handle the rewatching inside the FeedImpl class?
The FeedImpl already listens for WS events, so we could potentially react to ConnectedEvent and re-run the getOrCreate method (if watch = true). We would probably need to have some logic/guards for making sure that we call that only after a terminated + restored connection, but it would potentially reduce the complexity of the whole problem.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we receive ConnectedEvent, or at least I couldn't find it. Maybe it was there before integrating core?

We could still inject Flow<StreamConnectionState> and collect it inside FeedImpl, but I think we'd have a coordination problem in this scenario:

  1. Client creates two instances of the same feed, both having watch=true
  2. Client calls stopWatching on one of them -> so we stop receiving events
  3. The socket disconnects and reconnects
  4. At least one of the feeds won't know that the client called stopWatching, so it will call getOrCreate again and will resubscribe

I guess this can be solved by sharing some state across feed instances (i.e. the set of watched feeds), but at that point maybe it's better to leave the logic fully centralized? Not sure. 🤔 Maybe moving this responsibility into FeedImpl is still easier to understand vs the current approach?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Centralising this is better.

feedWatchHandler.onStartWatching(query.fid)
}
return feedsRepository
.getOrCreateFeed(query)
.onSuccess { _state.onQueryFeed(it) }
.map { it.feed }
}

override suspend fun stopWatching(): Result<Unit> {
feedWatchHandler.onStopWatching(query.fid)
return feedsRepository.stopWatching(groupId = group, feedId = id)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import io.getstream.android.core.api.StreamClient
import io.getstream.android.core.api.log.StreamLogger
import io.getstream.android.core.api.model.connection.StreamConnectedUser
import io.getstream.android.core.api.model.connection.StreamConnectionState
import io.getstream.android.core.api.model.exceptions.StreamClientException
import io.getstream.android.core.api.model.value.StreamApiKey
import io.getstream.android.core.api.subscribe.StreamSubscriptionManager
import io.getstream.feeds.android.client.api.Moderation
Expand All @@ -27,6 +28,7 @@ import io.getstream.feeds.android.client.api.model.PushNotificationsProvider
import io.getstream.feeds.android.client.api.model.User
import io.getstream.feeds.android.client.api.model.UserAuthType
import io.getstream.feeds.android.client.internal.client.reconnect.ConnectionRecoveryHandler
import io.getstream.feeds.android.client.internal.client.reconnect.FeedWatchHandler
import io.getstream.feeds.android.client.internal.repository.ActivitiesRepository
import io.getstream.feeds.android.client.internal.repository.AppRepository
import io.getstream.feeds.android.client.internal.repository.BookmarksRepository
Expand All @@ -47,13 +49,19 @@ import io.getstream.feeds.android.network.models.ListDevicesResponse
import io.mockk.coEvery
import io.mockk.every
import io.mockk.mockk
import io.mockk.verify
import java.util.Date
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Assert.assertTrue
import org.junit.Test

@OptIn(ExperimentalCoroutinesApi::class)
internal class FeedsClientImplTest {
private val coreClient: StreamClient = mockk(relaxed = true)
private val feedsEventsSubscriptionManager: StreamSubscriptionManager<FeedsEventListener> =
Expand All @@ -72,7 +80,10 @@ internal class FeedsClientImplTest {
private val pollsRepository: PollsRepository = mockk(relaxed = true)
private val uploader: FeedUploader = mockk(relaxed = true)
private val moderation: Moderation = mockk(relaxed = true)
private val feedWatchHandler: FeedWatchHandler = mockk(relaxed = true)
private val logger: StreamLogger = mockk(relaxed = true)
private val scope = TestScope(UnconfinedTestDispatcher())
private val errorBus = MutableSharedFlow<StreamClientException>()

private val feedsClient: FeedsClientImpl =
FeedsClientImpl(
Expand All @@ -92,9 +103,21 @@ internal class FeedsClientImplTest {
pollsRepository = pollsRepository,
uploader = uploader,
moderation = moderation,
feedWatchHandler = feedWatchHandler,
logger = logger,
scope = scope,
errorBus = errorBus,
)

@Test
fun `on error bus event, then log it`() = runTest {
val exception = StreamClientException("error!")

errorBus.emit(exception)

verify { logger.e(exception, any()) }
}

@Test
fun `connect when user is regular, then subscribes to client and connects successfully`() =
runTest {
Expand Down Expand Up @@ -136,7 +159,10 @@ internal class FeedsClientImplTest {
pollsRepository = pollsRepository,
uploader = uploader,
moderation = moderation,
feedWatchHandler = feedWatchHandler,
logger = logger,
scope = scope,
errorBus = errorBus,
)

val result = anonymousClient.connect()
Expand Down
Loading