Skip to content

Commit 9b0c9db

Browse files
authored
KAFKA-19864: Handle TimeoutException from initializeIfNeeded() in StateUpdater Code (#20829)
Handle the TimeoutException thrown by `initializeIfNeeded()` in the StateUpdater code path of `TaskManager.java`.

Reviewers: Lucas Brutschy <[email protected]>
1 parent 22db7b9 commit 9b0c9db

File tree

2 files changed

+73
-26
lines changed

2 files changed

+73
-26
lines changed

streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1082,6 +1082,7 @@ private void addTaskToStateUpdater(final Task task) {
10821082
try {
10831083
if (canTryInitializeTask(task.id(), nowMs)) {
10841084
task.initializeIfNeeded();
1085+
task.clearTaskTimeout();
10851086
taskIdToBackoffRecord.remove(task.id());
10861087
stateUpdater.add(task);
10871088
} else {
@@ -1095,6 +1096,14 @@ private void addTaskToStateUpdater(final Task task) {
10951096
lockException.getMessage());
10961097
tasks.addPendingTasksToInit(Collections.singleton(task));
10971098
updateOrCreateBackoffRecord(task.id(), nowMs);
1099+
} catch (final TimeoutException timeoutException) {
1100+
// A timeout can occur either during producer initialization OR while fetching committed offsets.
1101+
// Retry in the next iteration.
1102+
task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException);
1103+
tasks.addPendingTasksToInit(Collections.singleton(task));
1104+
updateOrCreateBackoffRecord(task.id(), nowMs);
1105+
log.info("Encountered timeout exception. Reattempting initialization in the next iteration. Error message was: {}",
1106+
timeoutException.getMessage());
10981107
}
10991108
}
11001109

streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,36 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() {
11801180

11811181
verify(task00).initializeIfNeeded();
11821182
verify(task01).initializeIfNeeded();
1183+
verify(task00, never()).clearTaskTimeout();
1184+
verify(task01).clearTaskTimeout();
1185+
verify(tasks).addPendingTasksToInit(
1186+
argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
1187+
);
1188+
verify(stateUpdater, never()).add(task00);
1189+
verify(stateUpdater).add(task01);
1190+
}
1191+
1192+
@Test
1193+
public void shouldRetryInitializationWhenTimeoutExceptionInStateUpdater() {
1194+
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
1195+
.withInputPartitions(taskId00Partitions)
1196+
.inState(State.RESTORING).build();
1197+
final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions)
1198+
.withInputPartitions(taskId01Partitions)
1199+
.inState(State.RUNNING).build();
1200+
final TasksRegistry tasks = mock(TasksRegistry.class);
1201+
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01));
1202+
final TimeoutException timeoutException = new TimeoutException("Timed out!");
1203+
doThrow(timeoutException).when(task00).initializeIfNeeded();
1204+
taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false);
1205+
1206+
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
1207+
1208+
verify(task00).initializeIfNeeded();
1209+
verify(task01).initializeIfNeeded();
1210+
verify(task00).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException));
1211+
verify(task00, never()).clearTaskTimeout();
1212+
verify(task01).clearTaskTimeout();
11831213
verify(tasks).addPendingTasksToInit(
11841214
argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01))
11851215
);
@@ -1567,7 +1597,7 @@ private void verifyTransitionToRunningOfRestoredTask(final Set<StreamTask> resto
15671597
final TasksRegistry tasks) {
15681598
for (final StreamTask restoredTask : restoredTasks) {
15691599
verify(restoredTask).completeRestoration(noOpResetter);
1570-
verify(restoredTask).clearTaskTimeout();
1600+
verify(restoredTask, atLeastOnce()).clearTaskTimeout();
15711601
verify(tasks).addTask(restoredTask);
15721602
verify(consumer).resume(restoredTask.inputPartitions());
15731603
}
@@ -2874,40 +2904,48 @@ public void shouldAddNewActiveTasks() {
28742904

28752905
@Test
28762906
public void shouldNotCompleteRestorationIfTasksCannotInitialize() {
2907+
final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions)
2908+
.withInputPartitions(taskId00Partitions)
2909+
.inState(State.CREATED)
2910+
.build();
2911+
final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions)
2912+
.withInputPartitions(taskId01Partitions)
2913+
.inState(State.CREATED)
2914+
.build();
2915+
2916+
final TasksRegistry tasks = mock(TasksRegistry.class);
2917+
final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks);
28772918
final Map<TaskId, Set<TopicPartition>> assignment = mkMap(
28782919
mkEntry(taskId00, taskId00Partitions),
28792920
mkEntry(taskId01, taskId01Partitions)
28802921
);
2881-
final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) {
2882-
@Override
2883-
public void initializeIfNeeded() {
2884-
throw new LockException("can't lock");
2885-
}
2886-
};
2887-
final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
2888-
@Override
2889-
public void initializeIfNeeded() {
2890-
throw new TimeoutException("timed out");
2891-
}
2892-
};
2893-
2894-
when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01));
28952922

2923+
when(activeTaskCreator.createTasks(any(), eq(assignment)))
2924+
.thenReturn(asList(task00, task01));
28962925
taskManager.handleAssignment(assignment, emptyMap());
28972926

2898-
assertThat(task00.state(), is(Task.State.CREATED));
2899-
assertThat(task01.state(), is(Task.State.CREATED));
2927+
verify(tasks).addPendingTasksToInit(asList(task00, task01));
29002928

2901-
assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false));
2929+
when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01));
2930+
final LockException lockException = new LockException("can't lock");
2931+
final TimeoutException timeoutException = new TimeoutException("timeout during init");
2932+
doThrow(lockException).when(task00).initializeIfNeeded();
2933+
doThrow(timeoutException).when(task01).initializeIfNeeded();
2934+
when(tasks.hasPendingTasksToInit()).thenReturn(true);
29022935

2903-
assertThat(task00.state(), is(Task.State.CREATED));
2904-
assertThat(task01.state(), is(Task.State.CREATED));
2905-
assertThat(
2906-
taskManager.activeTaskMap(),
2907-
Matchers.equalTo(mkMap(mkEntry(taskId00, task00), mkEntry(taskId01, task01)))
2908-
);
2909-
assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
2910-
verify(changeLogReader).enforceRestoreActive();
2936+
final boolean restorationComplete = taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
2937+
2938+
assertFalse(restorationComplete);
2939+
verify(task00).initializeIfNeeded();
2940+
verify(task01).initializeIfNeeded();
2941+
verify(task00, never()).maybeInitTaskTimeoutOrThrow(anyLong(), any());
2942+
verify(task01).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException));
2943+
verify(task00, never()).clearTaskTimeout();
2944+
verify(task01, never()).clearTaskTimeout();
2945+
verify(tasks).addPendingTasksToInit(Collections.singleton(task00));
2946+
verify(tasks).addPendingTasksToInit(Collections.singleton(task01));
2947+
verify(stateUpdater, never()).add(task00);
2948+
verify(stateUpdater, never()).add(task01);
29112949
verifyNoInteractions(consumer);
29122950
}
29132951

0 commit comments

Comments
 (0)