@alisterburt commented Oct 7, 2025

This PR supersedes #9104 and implements missing support for grouped workers in SpecCluster.

As described in #9104, support for grouped workers was originally implemented in #3013 and is still documented in the SpecCluster docstring, but it appears to have been broken for a while when used from dask-jobqueue (dask/dask-jobqueue#498 and dask/dask-jobqueue#691).

Key Concepts Disambiguated

Added clear terminology and documentation distinguishing:

  • Spec name: Key in worker_spec dict (e.g., "0", "my-worker")
  • Worker name: Name reported to scheduler (e.g., "0-0", "0-1" for grouped workers)
  • self.workers dict: Always keyed by spec names (not worker names), mapping to Worker/Nanny/MultiWorker instances

This disambiguation was critical: it fixed a bug where worker names were incorrectly used as keys to access self.workers.
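To make the distinction concrete, here is a minimal standalone sketch (not distributed's implementation; the helper name is hypothetical) showing how a grouped spec's `"group"` suffixes produce the worker names the scheduler sees:

```python
from __future__ import annotations

# Hypothetical illustration of the spec-name / worker-name split described above.
worker_spec = {
    "0": {"cls": object, "options": {}, "group": ["-0", "-1"]},  # grouped spec
    "my-worker": {"cls": object, "options": {}},                 # regular spec
}

def worker_names_for(spec_name: str, spec: dict) -> list[str]:
    """Worker names the scheduler would see for a single spec entry."""
    suffixes = spec.get("group")
    if suffixes is None:
        return [spec_name]                    # regular: spec name == worker name
    return [spec_name + s for s in suffixes]  # grouped: one name per group member

assert worker_names_for("0", worker_spec["0"]) == ["0-0", "0-1"]
assert worker_names_for("my-worker", worker_spec["my-worker"]) == ["my-worker"]
```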

Semantic Decisions

  1. Conservative Scaling (all parameters are maximums)
  • scale(n=5) with 2 grouped workers per spec → creates 2 specs = 4 workers (not 6)
  • scale(memory="6GB") with 4GB/spec → creates 1 spec = 4GB (not 8GB)
  • Rationale: Prevents resource overcommitment, matches user expectation of "at most N"
  • Special case: scale(n=1) from 0 workers creates 1 spec to avoid deadlock (see the sketch after this list)
  2. Grouped Worker Failure Handling
  • When any worker in a group fails, entire spec is removed
  • Removal triggered after lost-worker-timeout expires
  • Adaptive scaling recreates the group as a unit
  • Rationale: HPC allocations fail as one unit; partial groups are invalid
  3. Spec Names Must Be Strings
  • All spec names (keys in worker_spec) are now strings
  • Auto-generated spec names use str(i) instead of int(i)
  • Rationale: Type consistency, eliminates int/str conversion bugs
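As a rough illustration of the conservative rounding in decision 1, here is a sketch only; `specs_for_target` is a hypothetical helper, not the actual scale() implementation:

```python
def specs_for_target(n_workers: int, workers_per_spec: int, current_specs: int = 0) -> int:
    """Sketch of conservative ("at most N") rounding for grouped workers.

    Floor division keeps the total worker count at or below the request,
    except when scaling up from zero, where one spec is created to avoid
    deadlocking at zero workers.
    """
    n_specs = n_workers // workers_per_spec
    if n_workers > 0 and n_specs == 0 and current_specs == 0:
        n_specs = 1  # special case: e.g. scale(n=1) from 0 workers -> 1 spec
    return n_specs

assert specs_for_target(5, 2) * 2 == 4  # scale(n=5), 2 workers/spec -> 4 workers, not 6
assert specs_for_target(1, 2) * 2 == 2  # scale(n=1) from zero -> 1 spec (2 workers)
```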

New Tests

Helper method tests:

  • test_spec_name_to_worker_names_regular
  • test_spec_name_to_worker_names_grouped
  • test_worker_name_to_spec_name_regular
  • test_worker_name_to_spec_name_grouped
  • test_worker_name_to_spec_name_mixed

Grouped worker functionality:

  • test_grouped_worker_death_removes_spec
  • test_grouped_worker_spec_removal_multiple_rounds
  • test_unexpected_close_whole_worker_group
  • test_scale_down_with_grouped_workers
  • test_mixed_regular_and_grouped_workers

String spec name tests:

  • test_new_spec_name_returns_string
  • test_worker_spec_keys_are_strings
  • test_run_spec_cluster_custom_spec_names

Implementation Notes

This implementation uses type hints for new methods and parameters. This is not consistent with the rest of the codebase, and I am happy to remove them if that's the maintainers' preference.

…nsistency

Establish four key concepts with precise terminology:
  - Spec name: String key in worker_spec dict (e.g., "0", "my-worker")
  - Worker name: String scheduler sees (e.g., "0-0", "0-1")
  - Worker spec: Dict with 'cls', 'options', 'group' (in worker_spec)
  - Worker instance: Actual Worker/Nanny object (in workers)

Changes:
 - Rename _new_worker_name() → _new_spec_name() for clarity
 - Add type hints: worker_spec: dict[str, dict], workers: dict[str, Worker | Nanny]
 - Update SpecCluster docstring with Terminology and Grouped Workers sections
 - Fix examples to use string keys ("0", "1") and consistent variable names
 - Add failing tests (TDD): test_new_spec_name_returns_string, test_worker_spec_keys_are_strings
 - Ensure spec numbers are strings in worker spec dictionary
…ted tests to enable grouped worker support in SpecCluster

  - `_spec_name_to_worker_names()`: Maps spec name → worker names
    - Regular workers: 1:1 mapping (spec "0" → worker "0")
    - Grouped workers: 1:many mapping (spec "0" → workers "0-0", "0-1", "0-2")

  - `_worker_name_to_spec_name()`: Maps worker name → spec name
    - Handles both regular and grouped workers
    - Returns None if worker name not found
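  The following is a standalone sketch of the mapping logic these two helpers implement (the real methods live on SpecCluster and read `self.worker_spec`; this version takes the dict explicitly):

```python
from __future__ import annotations

def spec_name_to_worker_names(spec_name: str, worker_spec: dict[str, dict]) -> list[str]:
    """Map one spec name to the worker name(s) it produces."""
    group = worker_spec[spec_name].get("group")
    if group is None:
        return [spec_name]                           # regular worker: 1:1
    return [spec_name + suffix for suffix in group]  # grouped worker: 1:many

def worker_name_to_spec_name(worker_name: str, worker_spec: dict[str, dict]) -> str | None:
    """Reverse lookup; returns None if the worker name is unknown."""
    for spec_name in worker_spec:
        if worker_name in spec_name_to_worker_names(spec_name, worker_spec):
            return spec_name
    return None

spec = {"0": {"cls": object, "group": ["-0", "-1", "-2"]}, "1": {"cls": object}}
assert spec_name_to_worker_names("0", spec) == ["0-0", "0-1", "0-2"]
assert worker_name_to_spec_name("0-1", spec) == "0"
assert worker_name_to_spec_name("1", spec) == "1"
assert worker_name_to_spec_name("missing", spec) is None
```

  The methods below were updated on top of these helpers: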
  1. _update_worker_status(): When a grouped worker fails, removes entire spec
  2. _correct_state_internal(): Maps spec names → worker names before retiring workers
  3. scale(): Correctly identifies launched specs by mapping worker names → spec names
  4. scale_down(): Simplified using helper methods, handles both regular and grouped workers
  Changes scale() behavior for grouped workers to use conservative (floor)
  rounding instead of ceiling, ensuring resource limits are never exceeded:

  - scale(n): Rounds DOWN to complete specs (e.g., scale(5) with 2-worker
    specs → 4 workers, not 6)
  - scale(memory/cores): Uses floor division for grouped workers to stay
    under limits
  - Special case: When scaling from 0 workers, creates at least 1 spec to
    prevent deadlock (e.g., scale(1) with 2-worker specs → 2 workers)

  This makes all scaling parameters consistent ("at most X") and prevents
  resource overcommitment, OOM, and CPU oversubscription.
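  For the memory/cores parameters, the same floor logic applies per spec. A sketch with illustrative numbers only (not the actual scale() code):

```python
GiB = 2**30

def specs_for_memory(requested_bytes: int, bytes_per_spec: int) -> int:
    """Floor division: stay at or under the requested memory ("at most X")."""
    return requested_bytes // bytes_per_spec

# scale(memory="6GB") when each grouped spec provides ~4GB:
# 1 spec (~4GB) is created rather than 2 specs (~8GB), so the limit is never exceeded.
assert specs_for_memory(6 * GiB, 4 * GiB) == 1
```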

  Updated scale() docstring with comprehensive documentation of conservative
  behavior and updated tests to reflect new semantics.
  The `_update_worker_status()` method was checking worker names (e.g., "2-0")
  against `self.workers` which is keyed by spec names (e.g., "2"). This caused
  the grouped worker spec removal code to never execute.

  The bug manifested when:
  1. Spec "0" is removed (leaving spec "1")
  2. Adaptive scaling creates new spec "2"
  3. Attempting to remove spec "2" fails silently

  Fix:
  - Map worker name → spec name first using `_worker_name_to_spec_name()`
  - Use spec name to access `self.workers[spec_name]` for grouped workers
  - Properly close and remove MultiWorker instances before removing specs
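  A simplified, hypothetical sketch of that fix (the real `_update_worker_status()` also waits for the lost-worker timeout and closes the MultiWorker instance asynchronously):

```python
class SpecClusterFixSketch:
    """Toy stand-in for SpecCluster showing the key-mapping part of the fix."""

    def __init__(self, worker_spec: dict, workers: dict):
        self.worker_spec = worker_spec  # keyed by spec names, e.g. "2"
        self.workers = workers          # also keyed by spec names, never worker names

    def _worker_name_to_spec_name(self, worker_name: str):
        for spec_name, spec in self.worker_spec.items():
            names = [spec_name + s for s in spec.get("group", [""])]
            if worker_name in names:
                return spec_name
        return None

    def on_unexpected_worker_loss(self, worker_name: str) -> None:
        # Translate the scheduler-reported worker name ("2-0") into a spec name ("2")
        # *before* indexing self.workers, then remove the whole spec as a unit.
        spec_name = self._worker_name_to_spec_name(worker_name)
        if spec_name is None:
            return
        self.workers.pop(spec_name, None)
        self.worker_spec.pop(spec_name, None)

cluster = SpecClusterFixSketch(
    worker_spec={"2": {"cls": object, "group": ["-0", "-1"]}},
    workers={"2": object()},
)
cluster.on_unexpected_worker_loss("2-0")  # any member dying removes the whole spec
assert cluster.worker_spec == {} and cluster.workers == {}
```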

  Testing:
  - Added `test_grouped_worker_spec_removal_multiple_rounds` to validate
    spec removal works across multiple rounds with different spec names

Also checked that all usages of self.workers use the correct keys (spec names).

github-actions bot commented Oct 7, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

27 files ±0    27 suites ±0    9h 33m 26s ⏱️ (-3m 11s)
4,124 tests +12    4,017 ✅ +12    104 💤 ±0    3 ❌ ±0
51,672 runs +156    49,485 ✅ +156    2,184 💤 ±0    3 ❌ ±0

For more details on these failures, see this check.

Results for commit 6901bc1. Comparison against base commit c9e7aca.

This pull request removes 1 and adds 13 tests. Note that renamed tests count towards both.
Removed:
  • distributed.deploy.tests.test_spec_cluster ‑ test_run_spec_cluster_worker_names

Added:
  • distributed.deploy.tests.test_spec_cluster ‑ test_grouped_worker_death_removes_spec
  • distributed.deploy.tests.test_spec_cluster ‑ test_grouped_worker_spec_removal_multiple_rounds
  • distributed.deploy.tests.test_spec_cluster ‑ test_mixed_regular_and_grouped_workers
  • distributed.deploy.tests.test_spec_cluster ‑ test_new_spec_name_returns_string
  • distributed.deploy.tests.test_spec_cluster ‑ test_run_spec_cluster_custom_spec_names
  • distributed.deploy.tests.test_spec_cluster ‑ test_scale_down_with_grouped_workers
  • distributed.deploy.tests.test_spec_cluster ‑ test_spec_name_to_worker_names_grouped
  • distributed.deploy.tests.test_spec_cluster ‑ test_spec_name_to_worker_names_regular
  • distributed.deploy.tests.test_spec_cluster ‑ test_unexpected_close_whole_worker_group
  • distributed.deploy.tests.test_spec_cluster ‑ test_worker_name_to_spec_name_grouped
  • …

@jacobtomlinson (Member)

Thanks for taking the time to pull things apart and figure this stuff out! As you can imagine, with a big PR like this it's going to take a little while to review. I particularly want to open up some draft PRs in downstream projects like dask-kubernetes with this PR installed to ensure there aren't impacts that we can't predict. This PR touches a lot of "internal" APIs that aren't well documented and are used in other parts of Dask, so there may be dragons here.

In the meantime here are a few high-level thoughts:

Spec name: Key in worker_spec dict (e.g., "0", "my-worker")

I like the terminology around differentiating spec name from worker name

Rationale: Prevents resource overcommitment, matches user expectation of "at most N"

I want to challenge this expectation. If I did scale(memory=140GB), it would probably be because I knew my workload will need that amount of memory at some point. If I actually got 132GB of memory, that shortfall might be enough to make my workload fail. My expectation would be "at least N".

I wonder if we want to pick a position for memory= and document how it behaves, then also introduce max_memory= and min_memory= to allow people to be explicit?

When any worker in a group fails, entire spec is removed

This logic sounds fine to me, but we should check that things are removed gracefully enough that we don't lose too much in one go. Imagine each spec was a large node with many workers; we may need to gradually remove them.

This implementation uses type hints for new methods and parameters). This is not consistent with the rest of the codebase

This is great. We like type hints, but there is so much code here without it the best we can do is aim for eventual consistency.

@alisterburt (Author)

@jacobtomlinson thanks for taking a look quickly, glad the work is appreciated!

Your points are all valid and understood, I think. I'll incorporate your suggestions, and there's no rush from my side to get this in; let's do it right 🙂

One thing I'm not 100% on is your comment about whole-spec removal when one worker in a group disappears: could you be more specific with your suggestion around graceful closure? I was building against the assumption that the reason for unexpected worker loss was preemption of a whole allocation in an HPC system, but the use case you present makes sense... happy to add delays/more grace to the spec removal process or rethink this.

Looking forward to hearing how it goes with dask-kubernetes. I'm running some tests against dask-jobqueue with this and will report any issues that crop up there too.

@alisterburt (Author)

Good news: so far this PR has supported workers with multiple processes with SLURMCluster from dask-jobqueue without issue. Previous issues with this setup have not cropped up (yet?).

@jacobtomlinson (Member)

That's great!

I've opened a few draft PRs in downstream projects that install the changes from this PR to see if CI is happy.

It looks like there are some scaling failures in dask/dask-jobqueue#694 that might be related to the changes here.
