
Commit 12d0924

fix: job transition after failed recovery (#330)
When a worker cannot be recovered, the recovery logic removes that worker's pipeline and checks whether the job is still able to run. If the data loop is incomplete after the pipeline removal, the job transitions to the Failing state, which waits for every worker to report a terminal status before transitioning to Failed. The bug was in how worker statuses were updated in the job context: _reconcile_wrk_status iterated over workers from the wrong source, the class attribute 'self._new_cfg', instead of the 'new_cfg' received as an argument.
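A minimal sketch of the bug pattern described above: a reconcile method receives a fresh config as an argument but iterates a stale copy stored on the instance, so removed workers keep reappearing in the status map. All class and helper names besides `_reconcile_wrk_status`, `wrk_status`, and `WorkerStatus` are hypothetical, and the real method also diffs and removes stale entries, which is simplified away here.

```python
# Simplified illustration of the fix; not the project's actual classes.
from dataclasses import dataclass, field
from enum import Enum


class WorkerStatus(Enum):
    READY = "ready"
    FAILED = "failed"
    TERMINATED = "terminated"


@dataclass
class Worker:
    id: str


@dataclass
class JobConfig:
    workers: list = field(default_factory=list)


class JobContext:
    def __init__(self, cfg: JobConfig):
        self._new_cfg = cfg  # becomes stale once a pipeline is removed
        self.wrk_status = {w.id: WorkerStatus.READY for w in cfg.workers}

    def _reconcile_wrk_status(self, cur_cfg: JobConfig, new_cfg: JobConfig) -> None:
        # The buggy version iterated self._new_cfg.workers here, so a worker
        # that existed only in the stale config was re-added to wrk_status.
        for w in new_cfg.workers:  # fixed: use the argument, not the attribute
            if w.id not in self.wrk_status:
                self.wrk_status[w.id] = WorkerStatus.READY


ctx = JobContext(JobConfig([Worker("w1"), Worker("w2")]))
# w2's pipeline is removed, then we reconcile against the trimmed config
ctx.wrk_status.pop("w2")
ctx._reconcile_wrk_status(JobConfig([Worker("w1"), Worker("w2")]),
                          JobConfig([Worker("w1")]))
assert "w2" not in ctx.wrk_status  # with the bug, w2 would reappear as READY
```

With the stale `self._new_cfg`, the removed worker would be re-added as READY and the Failing state would wait forever for a status that never arrives.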
1 parent 3d3cd7d commit 12d0924

File tree

1 file changed (+10 −1)


infscale/controller/job_context.py

Lines changed: 10 additions & 1 deletion
@@ -410,6 +410,15 @@ async def start(self):
         self.context.set_state(JobStateEnum.STARTING)
 
+    def cond_stopped(self):
+        """Handle the transition to stopped."""
+        # when a worker fails and is unrecoverable, we need to
+        # remove that worker and the pipeline of that worker.
+        # This means that some workers will send Failed status,
+        # others will send Terminated status. This will avoid any
+        # unnecessary exceptions since the job is already in Failed.
+        pass
+
 
 class RecoveryState(BaseJobState):
     """RecoveryState class."""
@@ -836,7 +845,7 @@ def _reconcile_wrk_status(self, cur_cfg: JobConfig, new_cfg: JobConfig) -> None:
         worker_diff = JobConfig.get_workers_diff(cur_cfg, new_cfg)
         self.remove_wrk_status(worker_diff)
 
-        for w in self._new_cfg.workers:
+        for w in new_cfg.workers:
             if w.id not in self.wrk_status:
                 self.wrk_status[w.id] = WorkerStatus.READY