
Handle Dask worker-loss task failures #21673

Open

harsh21234i wants to merge 3 commits into PrefectHQ:main from harsh21234i:fix/issue-21638

Conversation

@harsh21234i (Contributor) commented Apr 22, 2026

Hey @desertaxle

Summary

This fixes prefect-dask so task runs are not left in RUNNING when Dask ultimately fails a task at the scheduler level after repeated worker loss.

When the wrapped Dask future raises a scheduler-side exception such as KilledWorker, Prefect now converts that into a terminal task-run state instead of
swallowing the exception and falling back to a stale API state.

Fixes #21638.

What changed

  • updated PrefectDaskFuture.wait() to treat unexpected Dask future exceptions as terminal task failures instead of returning without a final state
  • updated PrefectDaskFuture.result() to do the same when the wrapped future raises during result retrieval
  • converted those scheduler-side exceptions into a Prefect Crashed task-run state using Prefect’s existing exception-to-state logic (see the sketch after this list)
  • proposed that terminal state back to the API through the normal task-run state endpoint
  • if the worker already recorded a terminal state first, reused the server state instead of raising on a second state proposal

Why

The issue report describes a Dask cluster where workers are repeatedly killed under memory pressure until Dask gives up. In that case, the wrapped
distributed future can end in a scheduler exception like KilledWorker instead of returning a Prefect State.

Previously, the prefect-dask future wrapper would swallow that exception in .wait(), leave _final_state unset, and later read the task run state
back from the API. If that API row was still RUNNING, Prefect would treat the task as non-terminal and downstream dependencies could remain blocked
indefinitely.
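
Roughly, the pre-fix control flow described above looked like the sketch below (paraphrased from this description, not copied from the prefect-dask source; the class name is made up):

```python
# Paraphrased shape of the previous wait() behavior (illustrative only).
from prefect.states import State


class _OldStylePrefectDaskFuture:
    def __init__(self, wrapped_future):
        self._wrapped_future = wrapped_future
        self._final_state = None

    def wait(self, timeout=None):
        try:
            result = self._wrapped_future.result(timeout=timeout)
        except Exception:
            # Scheduler errors such as KilledWorker were swallowed here, so
            # _final_state stayed unset and callers later fell back to the
            # task run state stored in the API, which could still be RUNNING.
            return
        if isinstance(result, State):
            self._final_state = result
```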

With this change, scheduler-level Dask failures are translated into a terminal Prefect state immediately.

Tests

Added regression coverage in src/integrations/prefect-dask/tests/test_task_runners.py for:

  • direct wrapped-future scheduler exceptions causing a crashed task state
  • the existing Dask integration case where a worker-side KeyboardInterrupt results in Dask worker loss and the Prefect future still resolves to
    CRASHED

Validation

  • uv run --project src/integrations/prefect-dask ruff check src/integrations/prefect-dask/prefect_dask/task_runners.py src/integrations/prefect-dask/tests/test_task_runners.py
  • uv run --project src/integrations/prefect-dask ruff format --check src/integrations/prefect-dask/prefect_dask/task_runners.py src/integrations/prefect-dask/tests/test_task_runners.py
  • uv run --project src/integrations/prefect-dask pytest src/integrations/prefect-dask/tests/test_task_runners.py -k "PrefectDaskFuture or wait_captures_exceptions_as_crashed_state"

Results:

  • ruff check: passed
  • ruff format check: passed
  • pytest: 4 passed, 2 skipped

@github-actions bot added the bug label on Apr 22, 2026
@zzstoatzz (Collaborator) commented:

hi @harsh21234i - the enthusiasm is much appreciated! but i think taking time to reproduce and empirically iterate on these issues will be the most productive use of time! we at prefect also make heavy use of agent harnesses, but they often fail to reckon with the nuance that these issues require

you seem to be hoisting details of the engine into the task runner implementation, which seems like a bit of a design leak here. piping my response here directly back into claude et al will probably not yield a desirable outcome

let me know if you think this is unfair, or if this was all very intentionally designed. happy to iterate with you on this

@zzstoatzz (Collaborator) commented:

some suggestions on how you might iterate on this

  1. faithful repro: a dask cluster with tight memory limits and a task that OOMs enough times to exhaust DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES
  2. at that point, observe what self._wrapped_future.result() actually does. there are three possibilities and each implies a different fix:
    • (a) raises a specific distributed.* exception → narrow catch + translate (probably at the engine layer, not the future wrapper)
    • (b) returns silently without a prefect State → the isinstance(result, State) branch in wait() needs a fallback
    • (c) hangs indefinitely → a timeout or a scheduler-event hook (client.get_events, worker-state subscription), not an except clause

mikicz's reply in #21638 — "no terminal exception in any log, just OOM restarts" — doesn't rule out (b) or (c). pinning which branch we're in should determine the shape of the fix and the layer it belongs at
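
A repro along the lines of suggestion (1) could look something like the sketch below (untested; oom_task, repro, the memory limit, the timeout, and the allocation size are all arbitrary illustrative choices):

```python
# Hypothetical repro sketch. A tight memory limit plus a low allowed-failures
# setting should exhaust the scheduler's retries quickly.
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def oom_task():
    # Keep allocating until the nanny kills the worker for exceeding its limit.
    blobs = []
    while True:
        blobs.append(bytearray(50 * 1024 * 1024))


@flow(
    task_runner=DaskTaskRunner(
        cluster_kwargs={"n_workers": 1, "threads_per_worker": 1, "memory_limit": "256MiB"}
    )
)
def repro():
    future = oom_task.submit()
    # Observe which branch we are in once retries are exhausted:
    # (a) wait() raises, (b) it returns without a State, or (c) it hangs.
    future.wait(timeout=120)
    print("state after wait():", future.state)


if __name__ == "__main__":
    with dask.config.set({"distributed.scheduler.allowed-failures": 1}):
        repro()
```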

@harsh21234i (Contributor, Author) commented:

I agree the right next step is to reproduce this more faithfully and pin down which branch we’re actually in before changing the fix shape or layer. I had been assuming the wrapped future was ending in an exception path, but you’re right that the reporter’s logs don’t rule out a non-State return or a hang.

I’ll reproduce it against a constrained Dask cluster and check what self._wrapped_future.result() actually does once allowed failures are exhausted, then rework the fix from there.

@zzstoatzz (Collaborator) commented:

sounds great! thanks

@harsh21234i (Contributor, Author) commented Apr 23, 2026

@zzstoatzz I reproduced it more faithfully on main with a constrained Dask cluster and confirmed we were in the path where
self._wrapped_future.result() raises distributed.scheduler.KilledWorker, while the Prefect task run can still be left
in RUNNING.

The updated fix keeps prefect-dask narrow. The Dask future wrapper now just records unexpected wrapped-future
failures, and the crash reconciliation/state proposal logic lives in shared core future handling instead of the
integration layer.

I also replaced the earlier mock-heavy tests with a more faithful regression that exhausts worker failures under a
tight-memory Dask cluster and verifies the task run resolves to CRASHED.
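
For reference, that regression could take roughly the following shape (a hedged sketch only, not the test that was actually added; the task and test names are invented, and the fixtures the real suite relies on are omitted):

```python
# Hypothetical shape of the regression test (illustrative; the real test lives
# in src/integrations/prefect-dask/tests/test_task_runners.py and uses the
# suite's existing Prefect test fixtures).
import dask
from prefect import flow, task
from prefect_dask import DaskTaskRunner


@task
def allocate_until_killed():
    blobs = []
    while True:
        blobs.append(bytearray(50 * 1024 * 1024))


def test_worker_loss_resolves_to_crashed():
    @flow(
        task_runner=DaskTaskRunner(
            cluster_kwargs={"n_workers": 1, "memory_limit": "256MiB"}
        )
    )
    def run_doomed_task():
        future = allocate_until_killed.submit()
        future.wait(timeout=300)
        # Return a plain bool so the flow itself still completes.
        return future.state.is_crashed()

    with dask.config.set({"distributed.scheduler.allowed-failures": 1}):
        assert run_doomed_task() is True
```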

@codspeed-hq (bot) commented Apr 23, 2026

Merging this PR will not alter performance

✅ 2 untouched benchmarks


Comparing harsh21234i:fix/issue-21638 (c234aa7) with main (3efef43)

Open in CodSpeed

@harsh21234i (Contributor, Author) commented:

hey @zzstoatzz have you gone through this? is it good now?


Labels: bug

Successfully merging this pull request may close: Task not marked as Failed when task fails consistently in Dask Cluster (#21638)