KAFKA-19864: Handle TimeoutException from initializeIfNeeded() in StateUpdater Code #20829
for (final StreamTask restoredTask : restoredTasks) {
    verify(restoredTask).completeRestoration(noOpResetter);
    verify(restoredTask).clearTaskTimeout();
    verify(restoredTask, atLeastOnce()).clearTaskTimeout();
I added this because one of the tests, shouldAddNewActiveTasks, was failing as it expected the call to clearTaskTimeout() more than once.
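For context on the change to `atLeastOnce()`: Mockito's plain `verify(mock)` is shorthand for `verify(mock, times(1))`, so it fails when the method is invoked more than once, while `atLeastOnce()` accepts any count of one or more. The sketch below mirrors the two checks with a hand-rolled counter instead of Mockito; `CountingTask` and `VerificationModes` are hypothetical names used only for illustration and are not part of the PR.

```java
// Hypothetical hand-rolled spy; it only counts calls and is not Mockito itself.
class CountingTask {
    private int clearTaskTimeoutCalls = 0;

    void clearTaskTimeout() {
        clearTaskTimeoutCalls++;
    }

    int clearTaskTimeoutCalls() {
        return clearTaskTimeoutCalls;
    }
}

class VerificationModes {
    // Mirrors verify(task).clearTaskTimeout(), which defaults to times(1).
    static boolean calledExactlyOnce(final CountingTask task) {
        return task.clearTaskTimeoutCalls() == 1;
    }

    // Mirrors verify(task, atLeastOnce()).clearTaskTimeout().
    static boolean calledAtLeastOnce(final CountingTask task) {
        return task.clearTaskTimeoutCalls() >= 1;
    }
}
```

If a task's timeout is cleared both after initialization and after restoration, the exactly-once check stops holding while the at-least-once check still passes.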
Pull Request Overview
This PR adds handling for TimeoutException during task initialization in the state updater, aligning its behavior with existing LockException handling. It also ensures that task timeouts are properly cleared after successful initialization.
Key changes:
- Added a `TimeoutException` catch block in `addTaskToStateUpdater` that retries initialization with backoff
- Added a `clearTaskTimeout()` call after successful task initialization
- Updated tests to verify timeout handling behavior and `clearTaskTimeout` invocation
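The retry flow described in these key changes can be sketched roughly as follows. This is a simplified, self-contained illustration and not the code in `TaskManager.java`: `InitTimeoutException`, `SketchTask`, `InitRetrySketch`, the fixed `BACKOFF_MS`, and the deadline logic are all assumptions made for the example. Only the method names `initializeIfNeeded`, `maybeInitTaskTimeoutOrThrow`, and `clearTaskTimeout` come from the diff.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for an unchecked timeout exception.
class InitTimeoutException extends RuntimeException {
    InitTimeoutException(final String message) {
        super(message);
    }
}

// Hypothetical, simplified task with only the pieces needed for the retry flow.
class SketchTask {
    final String id;
    private final long taskTimeoutMs;
    private int failuresLeft;       // number of initialization attempts that will time out
    private Long deadlineMs = null; // null means no timeout is currently tracked

    SketchTask(final String id, final long taskTimeoutMs, final int failuresLeft) {
        this.id = id;
        this.taskTimeoutMs = taskTimeoutMs;
        this.failuresLeft = failuresLeft;
    }

    void initializeIfNeeded() {
        if (failuresLeft > 0) {
            failuresLeft--;
            throw new InitTimeoutException("simulated timeout for task " + id);
        }
    }

    // Assumed behavior: start a deadline on the first timeout, rethrow once it has passed.
    void maybeInitTaskTimeoutOrThrow(final long nowMs, final InitTimeoutException cause) {
        if (deadlineMs == null) {
            deadlineMs = nowMs + taskTimeoutMs;
        } else if (nowMs > deadlineMs) {
            throw cause;
        }
    }

    void clearTaskTimeout() {
        deadlineMs = null;
    }

    boolean hasPendingTimeout() {
        return deadlineMs != null;
    }
}

class InitRetrySketch {
    static final long BACKOFF_MS = 100L; // assumed fixed backoff for illustration

    final Queue<SketchTask> pendingTasksToInit = new ArrayDeque<>();
    final Map<String, Long> retryNotBeforeMs = new HashMap<>();

    // Returns true if the task initialized and could be handed to the state updater.
    boolean tryInitialize(final SketchTask task, final long nowMs) {
        try {
            task.initializeIfNeeded();
            task.clearTaskTimeout();           // success: stop tracking the timeout
            retryNotBeforeMs.remove(task.id);
            return true;
        } catch (final InitTimeoutException e) {
            task.maybeInitTaskTimeoutOrThrow(nowMs, e); // may rethrow if the deadline has passed
            pendingTasksToInit.add(task);               // the caller drains this queue on a later pass
            retryNotBeforeMs.put(task.id, nowMs + BACKOFF_MS);
            return false;
        }
    }
}
```

In this sketch a task that times out once is re-queued with a backoff and succeeds on a later pass, at which point its timeout is cleared. A task that keeps timing out past its deadline has the exception propagate out of `tryInitialize` instead of being retried indefinitely.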
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| TaskManager.java | Added TimeoutException handling with retry logic and clearTaskTimeout call after successful initialization |
| TaskManagerTest.java | Added test for TimeoutException during initialization and updated existing tests to verify clearTaskTimeout behavior |
Comments suppressed due to low confidence (1)
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:1160
- The test `shouldAddTasksToStateUpdater` should verify that `clearTaskTimeout()` is called on both tasks after successful initialization, consistent with the new behavior added in line 1085 of TaskManager.java and verified in other tests like `shouldRetryInitializationWhenLockExceptionInStateUpdater`.
verify(task00).initializeIfNeeded();
verify(task01).initializeIfNeeded();
verify(stateUpdater).add(task00);
verify(stateUpdater).add(task01);
lucasbru left a comment
Thanks! Looks good to me with minor comments.
task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);
tasks.addPendingTasksToInit(Collections.singleton(task));
updateOrCreateBackoffRecord(task.id(), nowMs);
log.debug("Task {} timed out during initialization; will retry", task.id(), timeoutException);
Could be info like above.
changed in a0e55f0
    tasks.addPendingTasksToInit(Collections.singleton(task));
    updateOrCreateBackoffRecord(task.id(), nowMs);
} catch (final TimeoutException timeoutException) {
    task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);
Maybe add a comment on when this can happen: either during producer initialization or while fetching committed offsets.
added comment in a0e55f0
a0e55f0 to 7c1bfc7
…teUpdater Code (apache#20829)

Handle TimeoutException in StateUpdater code path in `TaskManager.java`

Reviewers: Lucas Brutschy <[email protected]>