-
Notifications
You must be signed in to change notification settings - Fork 0
Rewatch feeds on socket reconnection #61
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
SDK Size Comparison 📏
|
4607cee
to
4c84bae
Compare
4c84bae
to
f85d0ef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR implements feed rewatching functionality that automatically re-subscribes to feeds when the socket connection is re-established after a disconnection.
Key changes include:
- Added
FeedWatchHandler
to track watched feeds and trigger re-subscription on reconnection - Updated
FeedImpl
to notify the watch handler when feeds start/stop being watched - Modified client creation and initialization to include the new watch handler
Reviewed Changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
FeedWatchHandler.kt | New handler class that tracks watched feeds and re-subscribes them on connection events |
FeedWatchHandlerTest.kt | Comprehensive test coverage for the new watch handler functionality |
FeedImpl.kt | Integration of watch handler notifications for start/stop watching operations |
FeedImplTest.kt | Updated tests to cover watch handler integration and added helper methods |
FeedsClientImpl.kt | Added watch handler dependency and connection recovery initialization |
FeedsClientImplTest.kt | Updated test setup to include the new watch handler dependency |
Create.kt | Added watch handler creation and wiring in the client factory |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
.../main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt
Outdated
Show resolved
Hide resolved
.../main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt
Outdated
Show resolved
Hide resolved
9b4f9eb
to
7566b65
Compare
PR checklist ✅All required conditions are satisfied:
🎉 Great job! This PR is ready for review. |
.../main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt
Outdated
Show resolved
Hide resolved
.../main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt
Outdated
Show resolved
Hide resolved
.../main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt
Outdated
Show resolved
Hide resolved
eb13087
to
fdc1801
Compare
.../main/kotlin/io/getstream/feeds/android/client/internal/client/reconnect/FeedWatchHandler.kt
Outdated
Show resolved
Hide resolved
init { | ||
scope.launch { | ||
connectionState.filterIsInstance<StreamConnectionState.Connected>().collect { | ||
rewatchAll() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rewatchAll()
swallows the potential failure of the request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know but what's the alternative? I'm going to change it to queryFeeds
as you suggested above, so we can easily return a Result
, but we're still executing this code in background automatically, so clients won't receive anything anyway. Maybe we can log failures?
} | ||
} | ||
|
||
fun onStartWatching(feedId: FeedId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit:maybe startWatching(feedID)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put on
because the watch handler is not really doing the watching, it's just being notified that we started watching (by executing the actual query)
watched[feedId] = Unit | ||
} | ||
|
||
fun onStopWatching(feedId: FeedId) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: same as the other stopWatching(feedId)
@@ -153,6 +155,7 @@ internal class FeedsClientImpl( | |||
return Result.failure(IllegalArgumentException("Anonymous users cannot connect.")) | |||
} | |||
coreClient.subscribe(clientSubscription) | |||
connectionRecoveryHandler.start() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Isn't it better to just call rewatchAll
here in case we have non empty list of watched feeds. Since if connect
goes trough in the coreClient
we will definitely have a new connection ID which will require re-watching??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually a change that is not directly tied to the rewatch logic. I discovered that we lost this call when migrating to the core client, so we were never reconnecting the socket automatically.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not concerned about connectionRecoveryHandler.start()
it should be there.
My point is that right after that we can call rewatchAll
instead of initing a collector in the watch handler.
* @property scope The [CoroutineScope] in which to launch coroutines for re-watching feeds. | ||
*/ | ||
internal class FeedWatchHandler( | ||
private val connectionState: Flow<StreamConnectionState>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: Maybe its better for the FeedsWatchhandler
to be dumb and just collect the bunch of feed IDs and wire to the public rewatchAll() : Result<Whatever>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To do that, we'd need to add a rewatchAll
call everywhere we connect the core client (after we validated that the connection was successful), which atm would be in FeedsClientImpl
and ConnectionRecoveryHandler
. This means that if we ever add more code calling .connect()
, we need to remember to also call rewatchAll
. It's more error prone than using the socket state as the source of truth
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ConnectionRecoveryHandler
I think should not call connect()
directly to the core client, but that is another topic.
9491e16
to
f8fbdb1
Compare
b5379b0
to
16375c8
Compare
16375c8
to
37d171f
Compare
@@ -127,13 +129,17 @@ internal class FeedImpl( | |||
get() = _state | |||
|
|||
override suspend fun getOrCreate(): Result<FeedData> { | |||
if (query.watch) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am probably oversimplifying this whole logic, but wouldn't it be possible to handle the rewatching inside the FeedImpl
class?
The FeedImpl
already listens for WS events, so we could potentially react to ConnectedEvent
and re-run the getOrCreate
method (if watch = true
). We would probably need to have some logic/guards for making sure that we call that only after a terminated + restored connection, but it would potentially reduce the complexity of the whole problem.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we receive ConnectedEvent
, or at least I couldn't find it. Maybe it was there before integrating core?
We could still inject Flow<StreamConnectionState>
and collect it inside FeedImpl
, but I think we'd have a coordination problem in this scenario:
- Client creates two instances of the same feed, both having
watch=true
- Client calls
stopWatching
on one of them -> so we stop receiving events - The socket disconnects and reconnects
- At least one of the feeds won't know that the client called
stopWatching
, so it will callgetOrCreate
again and will resubscribe
I guess this can be solved by sharing some state across feed instances (i.e. the set of watched feeds), but at that point maybe it's better to leave the logic fully centralized? Not sure. 🤔 Maybe moving this responsibility into FeedImpl
is still easier to understand vs the current approach?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Centralising this is better.
LGTM (connection recovery will be handled in core at a later point) |
Goal
Make sure that feeds that the client started watching, are re-watched on socket reconnection.
Implementation
Introduce a
FeedWatchHandler
class that keeps track of watched feeds and re-watch them when the socket state becomes connectedTesting