diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 67d009b037f78..232487e2ebc98 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1082,6 +1082,7 @@ private void addTaskToStateUpdater(final Task task) { try { if (canTryInitializeTask(task.id(), nowMs)) { task.initializeIfNeeded(); + task.clearTaskTimeout(); taskIdToBackoffRecord.remove(task.id()); stateUpdater.add(task); } else { @@ -1095,6 +1096,14 @@ private void addTaskToStateUpdater(final Task task) { lockException.getMessage()); tasks.addPendingTasksToInit(Collections.singleton(task)); updateOrCreateBackoffRecord(task.id(), nowMs); + } catch (final TimeoutException timeoutException) { + // A timeout can occur either during producer initialization OR while fetching committed offsets. + // Retry in the next iteration. + task.maybeInitTaskTimeoutOrThrow(nowMs, timeoutException); + tasks.addPendingTasksToInit(Collections.singleton(task)); + updateOrCreateBackoffRecord(task.id(), nowMs); + log.info("Encountered timeout exception. Reattempting initialization in the next iteration. Error message was: {}", + timeoutException.getMessage()); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 0156d101ed8db..7e696b766b0d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -1178,6 +1178,36 @@ public void shouldRetryInitializationWhenLockExceptionInStateUpdater() { verify(task00).initializeIfNeeded(); verify(task01).initializeIfNeeded(); + verify(task00, never()).clearTaskTimeout(); + verify(task01).clearTaskTimeout(); + verify(tasks).addPendingTasksToInit( + argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01)) + ); + verify(stateUpdater, never()).add(task00); + verify(stateUpdater).add(task01); + } + + @Test + public void shouldRetryInitializationWhenTimeoutExceptionInStateUpdater() { + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.RESTORING).build(); + final StandbyTask task01 = standbyTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.RUNNING).build(); + final TasksRegistry tasks = mock(TasksRegistry.class); + when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); + final TimeoutException timeoutException = new TimeoutException("Timed out!"); + doThrow(timeoutException).when(task00).initializeIfNeeded(); + taskManager = setUpTaskManagerWithStateUpdater(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, tasks, false); + + taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); + + verify(task00).initializeIfNeeded(); + verify(task01).initializeIfNeeded(); + verify(task00).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException)); + verify(task00, never()).clearTaskTimeout(); + verify(task01).clearTaskTimeout(); verify(tasks).addPendingTasksToInit( argThat(tasksToInit -> tasksToInit.contains(task00) && !tasksToInit.contains(task01)) ); @@ -1565,7 +1595,7 @@ private void verifyTransitionToRunningOfRestoredTask(final Set resto final TasksRegistry tasks) { for (final StreamTask restoredTask : restoredTasks) { verify(restoredTask).completeRestoration(noOpResetter); - verify(restoredTask).clearTaskTimeout(); + verify(restoredTask, atLeastOnce()).clearTaskTimeout(); verify(tasks).addTask(restoredTask); verify(consumer).resume(restoredTask.inputPartitions()); } @@ -2872,40 +2902,48 @@ public void shouldAddNewActiveTasks() { @Test public void shouldNotCompleteRestorationIfTasksCannotInitialize() { + final StreamTask task00 = statefulTask(taskId00, taskId00ChangelogPartitions) + .withInputPartitions(taskId00Partitions) + .inState(State.CREATED) + .build(); + final StreamTask task01 = statefulTask(taskId01, taskId01ChangelogPartitions) + .withInputPartitions(taskId01Partitions) + .inState(State.CREATED) + .build(); + + final TasksRegistry tasks = mock(TasksRegistry.class); + final TaskManager taskManager = setUpTaskManagerWithStateUpdater(ProcessingMode.AT_LEAST_ONCE, tasks); final Map> assignment = mkMap( mkEntry(taskId00, taskId00Partitions), mkEntry(taskId01, taskId01Partitions) ); - final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true, stateManager) { - @Override - public void initializeIfNeeded() { - throw new LockException("can't lock"); - } - }; - final Task task01 = new StateMachineTask(taskId01, taskId01Partitions, true, stateManager) { - @Override - public void initializeIfNeeded() { - throw new TimeoutException("timed out"); - } - }; - - when(activeTaskCreator.createTasks(any(), eq(assignment))).thenReturn(asList(task00, task01)); + when(activeTaskCreator.createTasks(any(), eq(assignment))) + .thenReturn(asList(task00, task01)); taskManager.handleAssignment(assignment, emptyMap()); - assertThat(task00.state(), is(Task.State.CREATED)); - assertThat(task01.state(), is(Task.State.CREATED)); + verify(tasks).addPendingTasksToInit(asList(task00, task01)); - assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(false)); + when(tasks.drainPendingTasksToInit()).thenReturn(Set.of(task00, task01)); + final LockException lockException = new LockException("can't lock"); + final TimeoutException timeoutException = new TimeoutException("timeout during init"); + doThrow(lockException).when(task00).initializeIfNeeded(); + doThrow(timeoutException).when(task01).initializeIfNeeded(); + when(tasks.hasPendingTasksToInit()).thenReturn(true); - assertThat(task00.state(), is(Task.State.CREATED)); - assertThat(task01.state(), is(Task.State.CREATED)); - assertThat( - taskManager.activeTaskMap(), - Matchers.equalTo(mkMap(mkEntry(taskId00, task00), mkEntry(taskId01, task01))) - ); - assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); - verify(changeLogReader).enforceRestoreActive(); + final boolean restorationComplete = taskManager.checkStateUpdater(time.milliseconds(), noOpResetter); + + assertFalse(restorationComplete); + verify(task00).initializeIfNeeded(); + verify(task01).initializeIfNeeded(); + verify(task00, never()).maybeInitTaskTimeoutOrThrow(anyLong(), any()); + verify(task01).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException)); + verify(task00, never()).clearTaskTimeout(); + verify(task01, never()).clearTaskTimeout(); + verify(tasks).addPendingTasksToInit(Collections.singleton(task00)); + verify(tasks).addPendingTasksToInit(Collections.singleton(task01)); + verify(stateUpdater, never()).add(task00); + verify(stateUpdater, never()).add(task01); verifyNoInteractions(consumer); }