From 3bb795fd5deb0c553f09133ab47386ec16860db9 Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Fri, 6 Jun 2025 13:20:44 -0700 Subject: [PATCH 1/6] feat: initial commit of automated partition selection --- snakemake_executor_plugin_slurm/__init__.py | 24 +- snakemake_executor_plugin_slurm/partitions.py | 267 +++++++++ .../submit_string.py | 3 - tests/tests.py | 524 ++++++++++++++++++ 4 files changed, 813 insertions(+), 5 deletions(-) create mode 100644 snakemake_executor_plugin_slurm/partitions.py diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index badda89a..de9d2e36 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -29,6 +29,7 @@ from .utils import delete_slurm_environment, delete_empty_dirs, set_gres_string from .submit_string import get_submit_command +from .partitions import read_partition_file, get_best_partition @dataclass @@ -106,6 +107,14 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) + partitions: Optional[Path] = field( + default=None, + metadata={ + "help": "Path to YAML file that specifies partitions. See docs for details.", + "env_var": False, + "required": False, + }, + ) # Required: @@ -149,6 +158,11 @@ def __post_init__(self, test_mode: bool = False): if self.workflow.executor_settings.logdir else Path(".snakemake/slurm_logs").resolve() ) + self._partitions = ( + read_partition_file(self.workflow.executor_settings.partitions) + if self.workflow.executor_settings.partitions + else None + ) atexit.register(self.clean_old_logs) def clean_old_logs(self) -> None: @@ -231,12 +245,12 @@ def run_job(self, job: JobExecutorInterface): if job.resources.get("slurm_extra"): self.check_slurm_extra(job) + # NOTE removed partition from below, such that partition selection can benefit from resource checking as the call is built up. job_params = { "run_uuid": self.run_uuid, "slurm_logfile": slurm_logfile, "comment_str": comment_str, "account": self.get_account_arg(job), - "partition": self.get_partition_arg(job), "workdir": self.workflow.workdir_init, } @@ -279,6 +293,8 @@ def run_job(self, job: JobExecutorInterface): "Probably not what you want." ) + call += self.get_partition_arg(job) + exec_job = self.format_job_exec(job) # and finally the job to execute with all the snakemake parameters @@ -618,9 +634,13 @@ def get_partition_arg(self, job: JobExecutorInterface): returns a default partition, if applicable else raises an error - implicetly. """ + partition = None if job.resources.get("slurm_partition"): partition = job.resources.slurm_partition - else: + elif self._partitions: + partition = get_best_partition(self._partitions, job, self.logger) + # we didnt get a partition yet so try fallback. 
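+ # Precedence: an explicit `slurm_partition` resource always wins, then the
+ # best-scoring partition from the partition config (if one was supplied);
+ # only when both leave `partition` unset is the default partition used below.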
+ if not partition: if self._fallback_partition is None: self._fallback_partition = self.get_default_partition(job) partition = self._fallback_partition diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py new file mode 100644 index 00000000..12c8d553 --- /dev/null +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -0,0 +1,267 @@ +from dataclasses import dataclass +from typing import Optional, List +import yaml +from pathlib import Path +from math import inf, isinf +from snakemake_interface_common.exceptions import WorkflowError +from snakemake_interface_executor_plugins.jobs import ( + JobExecutorInterface, +) +from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface + + +def read_partition_file(partition_file: Path) -> List["Partition"]: + with open(partition_file, "r") as f: + out = [] + partitions = yaml.safe_load(f)["partitions"] + for p in partitions: + out.append( + Partition( + name=p["name"], + limits=PartitionLimits(**p["limits"]), + description=p["description"], + ) + ) + return out + + +def get_best_partition( + candidate_partitions: List["Partition"], + job: JobExecutorInterface, + logger: LoggerExecutorInterface, +) -> Optional[str]: + scored_partitions = [ + (p, score) + for p in candidate_partitions + if (score := p.score_job_fit(job)) is not None + ] + + if scored_partitions: + best_partition, best_score = max(scored_partitions, key=lambda x: x[1]) + partition = best_partition.name + logger.warning( + f"Auto-selected partition '{partition}' for job {job.name} " + f"with score {best_score:.3f}" + ) + return partition + else: + logger.warning( + f"No suitable partition found for job {job.name} based on " + f"resource requirements. Falling back to default behavior." + ) + return None + + +def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str]]: + """Parse GPU requirements from job resources. Returns (count, model)""" + gpu_required = job.resources.get("gpu", 0) + gres = job.resources.get("gres", "") + + # Convert to int if it's a string representation of a number + if isinstance(gpu_required, str): + try: + gpu_required = int(gpu_required) + except ValueError: + gpu_required = 0 + + # Ensure gres is a string + if not isinstance(gres, str): + gres = str(gres) if gres else "" + + if "gpu" in gres and gpu_required: + raise WorkflowError( + "GPU resource specified in both 'gpu' and 'gres'. These are mutually exclusive." + ) + + if gpu_required: + return int(gpu_required), job.resources.get("gpu_model") + elif "gpu" in gres: + # Parse gres string format: gpu: or gpu:: + gpu_parts = [part for part in gres.split(",") if part.strip().startswith("gpu")] + if gpu_parts: + gpu_spec = gpu_parts[0].strip().split(":") + if len(gpu_spec) == 2: # gpu: + return int(gpu_spec[1]), None + elif len(gpu_spec) == 3: # gpu:: + return int(gpu_spec[2]), gpu_spec[1] + + return 0, None + + +def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: + """ + This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of a arg string. 
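+ Returns a tuple (count, kind): kind is "task" when the count comes from
+ cpus_per_task (or falls back to job.threads), "gpu" when it comes from
+ cpus_per_gpu, and "none" when no CPU request should be emitted.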
+ """ + + gpu_required = job.resources.get("gpu", 0) + gres = job.resources.get("gres", "") + + # Convert gpu_required to int if it's a string + if isinstance(gpu_required, str): + try: + gpu_required = int(gpu_required) + except ValueError: + gpu_required = 0 + + # Ensure gres is a string for the "in" check + if not isinstance(gres, str): + gres = str(gres) if gres else "" + + has_gpu = bool(gpu_required) or "gpu" in gres + + cpus_per_task = job.resources.get("cpus_per_task") + if cpus_per_task is not None: + # Convert to int if it's a string + if isinstance(cpus_per_task, str): + try: + cpus_per_task = int(cpus_per_task) + except ValueError: + cpus_per_task = 0 + else: + cpus_per_task = int(cpus_per_task) + + if cpus_per_task < 0: + return (0, "none") + # ensure that at least 1 cpu is requested because 0 is not allowed by slurm + return (max(1, cpus_per_task), "task") + + elif has_gpu: + cpus_per_gpu = job.resources.get("cpus_per_gpu") + if cpus_per_gpu is not None: + # Convert to int if it's a string + if isinstance(cpus_per_gpu, str): + try: + cpus_per_gpu = int(cpus_per_gpu) + except ValueError: + cpus_per_gpu = 0 + else: + cpus_per_gpu = int(cpus_per_gpu) + + if cpus_per_gpu <= 0: + return (0, "none") + return (cpus_per_gpu, "gpu") + + return (job.threads, "task") + + +@dataclass +class PartitionLimits: + """Represents resource limits for a SLURM partition""" + + # Standard resources + max_runtime: float = inf # minutes + max_mem_mb: float = inf + max_mem_mb_per_cpu: float = inf + max_cpus_per_task: float = inf + + # SLURM-specific resources + max_nodes: float = inf + max_tasks: float = inf + max_tasks_per_node: float = inf + + # GPU resources + max_gpu: int = 0 + available_gpu_models: Optional[List[str]] = None + max_cpus_per_gpu: float = inf + + # MPI resources + supports_mpi: bool = True + max_mpi_tasks: float = inf + + # Node features/constraints + available_constraints: Optional[List[str]] = None + + +@dataclass +class Partition: + """Represents a SLURM partition with its properties and limits""" + + name: str + limits: PartitionLimits + description: Optional[str] = None + + def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: + """ + Check if a job can run on this partition. If not return none. + Calculate a score for how well a partition fits the job requirements + """ + + # try to score how closely a job matches a partition's limits, in order to handle case where multiple partitions can run a given job + # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources + # here a higher score indicates a better fit + # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. 
+ + score = 0.0 + + numerical_resources = { + "mem_mb": self.limits.max_mem_mb, + "mem_mb_per_cpu": self.limits.max_mem_mb_per_cpu, + "runtime": self.limits.max_runtime, + "nodes": self.limits.max_nodes, + "tasks": self.limits.max_tasks, + "tasks_per_node": self.limits.max_tasks_per_node, + "mpi_tasks": self.limits.max_mpi_tasks, + } + + for resource_key, limit in numerical_resources.items(): + job_requirement = job.resources.get(resource_key, 0) + # Convert to numeric value if it's a string + if isinstance(job_requirement, str): + try: + job_requirement = float(job_requirement) + except ValueError: + job_requirement = 0 + elif not isinstance(job_requirement, (int, float)): + job_requirement = 0 + + if job_requirement > 0: + if not isinf(limit) and job_requirement > limit: + return None + if not isinf(limit): + score += job_requirement / limit + + cpu_count, cpu_type = get_job_cpu_requirement(job) + if cpu_type == "task" and cpu_count > 0: + if ( + not isinf(self.limits.max_cpus_per_task) + and cpu_count > self.limits.max_cpus_per_task + ): + return None + if not isinf(self.limits.max_cpus_per_task): + score += cpu_count / self.limits.max_cpus_per_task + elif cpu_type == "gpu" and cpu_count > 0: + if ( + not isinf(self.limits.max_cpus_per_gpu) + and cpu_count > self.limits.max_cpus_per_gpu + ): + return None + if not isinf(self.limits.max_cpus_per_gpu): + score += cpu_count / self.limits.max_cpus_per_gpu + + gpu_count, gpu_model = parse_gpu_requirements(job) + if gpu_count > 0: + if self.limits.max_gpu == 0 or gpu_count > self.limits.max_gpu: + return None + score += gpu_count / self.limits.max_gpu + + if gpu_model and self.limits.available_gpu_models: + if gpu_model not in self.limits.available_gpu_models: + return None + + if job.resources.get("mpi") and not self.limits.supports_mpi: + return None + + constraint = job.resources.get("constraint") + if constraint and self.limits.available_constraints: + # Ensure constraint is a string + if not isinstance(constraint, str): + constraint = str(constraint) + required_constraints = [ + c.strip() for c in constraint.split(",") if c.strip() + ] + if not all( + req in self.limits.available_constraints for req in required_constraints + ): + return None + + return score diff --git a/snakemake_executor_plugin_slurm/submit_string.py b/snakemake_executor_plugin_slurm/submit_string.py index 2da03f69..f6798314 100644 --- a/snakemake_executor_plugin_slurm/submit_string.py +++ b/snakemake_executor_plugin_slurm/submit_string.py @@ -24,9 +24,6 @@ def get_submit_command(job, params): # here, only the string is used, as it already contains # '-A {account_name}' call += f" {params.account}" - # here, only the string is used, as it already contains - # '- p {partition_name}' - call += f" {params.partition}" if job.resources.get("clusters"): call += f" --clusters {job.resources.clusters}" diff --git a/tests/tests.py b/tests/tests.py index f98328b7..ddfad201 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -3,10 +3,17 @@ from snakemake_interface_executor_plugins.settings import ExecutorSettingsBase from unittest.mock import MagicMock, patch import pytest +import tempfile +import yaml +from pathlib import Path from snakemake_executor_plugin_slurm import ExecutorSettings from snakemake_executor_plugin_slurm.utils import set_gres_string from snakemake_executor_plugin_slurm.submit_string import get_submit_command +from snakemake_executor_plugin_slurm.partitions import ( + read_partition_file, + get_best_partition, +) from snakemake_interface_common.exceptions 
import WorkflowError @@ -474,3 +481,520 @@ def test_wildcard_slash_replacement(self): # Verify no slashes remain in the wildcard string assert "/" not in wildcard_str + +class TestPartitionSelection: + @pytest.fixture + def basic_partition_config(self): + """Basic partition configuration with two partitions.""" + return { + "partitions": [ + { + "name": "default", + "description": "General purpose compute nodes", + "limits": { + "max_runtime": 1440, + "max_mem_mb": 128000, + "max_cpus_per_task": 32, + "supports_mpi": True, + }, + }, + { + "name": "gpu", + "description": "GPU-enabled nodes", + "limits": { + "max_runtime": 720, + "max_mem_mb": 256000, + "max_gpu": 4, + "available_gpu_models": ["a100", "v100"], + "supports_mpi": False, + }, + }, + ] + } + + @pytest.fixture + def minimal_partition_config(self): + """Minimal partition configuration.""" + return { + "partitions": [ + { + "name": "minimal", + "description": "Minimal partition", + "limits": {}, + } + ] + } + + @pytest.fixture + def comprehensive_partition_config(self): + """Comprehensive partition configuration with all limit types.""" + return { + "partitions": [ + { + "name": "comprehensive", + "description": "Partition with all limits", + "limits": { + # Standard resources + "max_runtime": 2880, + "max_mem_mb": 500000, + "max_mem_mb_per_cpu": 8000, + "max_cpus_per_task": 64, + # SLURM-specific resources + "max_nodes": 4, + "max_tasks": 256, + "max_tasks_per_node": 64, + # GPU resources + "max_gpu": 8, + "available_gpu_models": ["a100", "v100", "rtx3090"], + "max_cpus_per_gpu": 16, + # MPI resources + "supports_mpi": True, + "max_mpi_tasks": 512, + # Node features/constraints + "available_constraints": ["intel", "avx2", "highmem"], + }, + } + ] + } + + @pytest.fixture + def empty_partitions_config(self): + """Empty partitions configuration.""" + return {"partitions": []} + + @pytest.fixture + def missing_name_config(self): + """Configuration with missing name field.""" + return { + "partitions": [ + { + # Missing 'name' field + "description": "Missing name", + "limits": {}, + } + ] + } + + @pytest.fixture + def invalid_key_config(self): + """Configuration with invalid key.""" + return {"invalid_key": []} + + @pytest.fixture + def temp_yaml_file(self): + """Helper fixture to create temporary YAML files.""" + + def _create_temp_file(config): + with tempfile.NamedTemporaryFile( + mode="w", suffix=".yaml", delete=False + ) as f: + yaml.dump(config, f) + return Path(f.name) + + return _create_temp_file + + def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file): + """Test reading a valid partition configuration file.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 2 + + # Check first partition + assert partitions[0].name == "default" + assert partitions[0].description == "General purpose compute nodes" + assert partitions[0].limits.max_runtime == 1440 + assert partitions[0].limits.max_mem_mb == 128000 + assert partitions[0].limits.max_cpus_per_task == 32 + assert partitions[0].limits.supports_mpi is True + + # Check second partition + assert partitions[1].name == "gpu" + assert partitions[1].description == "GPU-enabled nodes" + assert partitions[1].limits.max_runtime == 720 + assert partitions[1].limits.max_gpu == 4 + assert partitions[1].limits.available_gpu_models == ["a100", "v100"] + assert partitions[1].limits.supports_mpi is False + + finally: + temp_path.unlink() + + def test_read_minimal_partition_file( + 
self, minimal_partition_config, temp_yaml_file + ): + """Test reading a partition file with minimal configuration.""" + from math import isinf + + temp_path = temp_yaml_file(minimal_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + assert partitions[0].name == "minimal" + assert partitions[0].description == "Minimal partition" + + # Check that all limits are inf + limits = partitions[0].limits + assert isinf(limits.max_runtime) + assert isinf(limits.max_mem_mb) + assert limits.max_gpu == 0 + assert limits.supports_mpi is True + + finally: + temp_path.unlink() + + def test_read_partition_file_with_all_limits( + self, comprehensive_partition_config, temp_yaml_file + ): + """Test reading a partition file with all possible limit types.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + + assert len(partitions) == 1 + limits = partitions[0].limits + + # Check standard resources + assert limits.max_runtime == 2880 + assert limits.max_mem_mb == 500000 + assert limits.max_mem_mb_per_cpu == 8000 + assert limits.max_cpus_per_task == 64 + + # Check SLURM-specific resources + assert limits.max_nodes == 4 + assert limits.max_tasks == 256 + assert limits.max_tasks_per_node == 64 + + # Check GPU resources + assert limits.max_gpu == 8 + assert limits.available_gpu_models == ["a100", "v100", "rtx3090"] + assert limits.max_cpus_per_gpu == 16 + + # Check MPI resources + assert limits.supports_mpi is True + assert limits.max_mpi_tasks == 512 + + # Check constraints + assert limits.available_constraints == ["intel", "avx2", "highmem"] + + finally: + temp_path.unlink() + + def test_read_empty_partitions_list(self, empty_partitions_config, temp_yaml_file): + """Test reading a file with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + assert len(partitions) == 0 + + finally: + temp_path.unlink() + + def test_read_nonexistent_file(self): + """Test reading a non-existent file raises appropriate error.""" + nonexistent_path = Path("/nonexistent/path/to/file.yaml") + + with pytest.raises(FileNotFoundError): + read_partition_file(nonexistent_path) + + def test_read_invalid_yaml_file(self): + """Test reading an invalid YAML file raises appropriate error.""" + invalid_yaml = "partitions:\n - name: test\n invalid: {\n" + + with tempfile.NamedTemporaryFile(mode="w", suffix=".yaml", delete=False) as f: + f.write(invalid_yaml) + temp_path = Path(f.name) + + try: + with pytest.raises(yaml.YAMLError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_file_missing_partitions_key(self, invalid_key_config, temp_yaml_file): + """Test reading a file without 'partitions' key raises KeyError.""" + temp_path = temp_yaml_file(invalid_key_config) + + try: + with pytest.raises(KeyError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + def test_read_partition_missing_required_fields( + self, missing_name_config, temp_yaml_file + ): + """Test reading partition with missing required fields.""" + temp_path = temp_yaml_file(missing_name_config) + + try: + with pytest.raises(KeyError): + read_partition_file(temp_path) + finally: + temp_path.unlink() + + @pytest.fixture + def mock_job(self): + """Create a mock job with configurable resources and threads.""" + + def _create_job(threads=1, **resources): + mock_resources = MagicMock() + mock_resources.get.side_effect = lambda key, 
default=None: resources.get( + key, default + ) + + mock_job = MagicMock() + mock_job.resources = mock_resources + mock_job.threads = threads + mock_job.name = "test_job" + return mock_job + + return _create_job + + @pytest.fixture + def mock_logger(self): + """Create a mock logger.""" + return MagicMock() + + def test_basic_partition_selection_cpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a basic CPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=4, mem_mb=16000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports CPU jobs + assert selected_partition == "default" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'default'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_basic_partition_selection_gpu_job( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test selecting partition for a GPU job.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=2, mem_mb=32000, runtime=300, gpu=2, gpu_model="a100" + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports GPU jobs + assert selected_partition == "gpu" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'gpu'" in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_no_suitable_partition( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test when no partition can accommodate the job requirements.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more memory than any partition allows + job = mock_job(threads=1, mem_mb=500000, runtime=60) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no suitable partition found + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_comprehensive_partition_selection( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with comprehensive limits.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job( + threads=8, + mem_mb=64000, + runtime=1200, + gpu=2, + gpu_model="a100", + constraint="intel,avx2", + ) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select the comprehensive partition + assert selected_partition == "comprehensive" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_constraint_mismatch( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with constraints not available in partition.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires constraint not available in partition + job = 
mock_job(threads=2, constraint="amd,gpu_direct") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to constraint mismatch + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_mpi_job_selection( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test MPI job partition selection.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mpi=True, tasks=16) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'default' partition as it supports MPI, 'gpu' doesn't + assert selected_partition == "default" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'default'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_gpu_model_mismatch( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job with unsupported GPU model.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Request GPU model not available in any partition + job = mock_job(threads=2, gpu=1, gpu_model="rtx4090") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None due to GPU model mismatch + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_empty_partitions_list( + self, empty_partitions_config, temp_yaml_file, mock_job, mock_logger + ): + """Test partition selection with empty partitions list.""" + temp_path = temp_yaml_file(empty_partitions_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, mem_mb=1000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None when no partitions available + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() + + def test_gres_gpu_specification( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test GPU job specified via gres parameter.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=2, gres="gpu:v100:1", runtime=400) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select 'gpu' partition as it supports v100 GPUs + assert selected_partition == "gpu" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'gpu'" in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_cpus_per_task_specification( + self, comprehensive_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with cpus_per_task specification.""" + temp_path = temp_yaml_file(comprehensive_partition_config) + + try: + partitions = read_partition_file(temp_path) + job = mock_job(threads=1, cpus_per_task=32, mem_mb=64000) + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should select comprehensive partition as it can handle 32 cpus per task + assert selected_partition 
== "comprehensive" + mock_logger.warning.assert_called_once() + assert ( + "Auto-selected partition 'comprehensive'" + in mock_logger.warning.call_args[0][0] + ) + finally: + temp_path.unlink() + + def test_runtime_exceeds_limit( + self, basic_partition_config, temp_yaml_file, mock_job, mock_logger + ): + """Test job with runtime exceeding partition limits.""" + temp_path = temp_yaml_file(basic_partition_config) + + try: + partitions = read_partition_file(temp_path) + # Job requires more runtime than gpu partition allows (720 min max) + job = mock_job(threads=1, runtime=1000, gpu=1, gpu_model="a100") + + selected_partition = get_best_partition(partitions, job, mock_logger) + + # Should return None as no partition can accommodate the runtime + assert selected_partition is None + mock_logger.warning.assert_called_once() + assert "No suitable partition found" in mock_logger.warning.call_args[0][0] + finally: + temp_path.unlink() \ No newline at end of file From 6f475c3b683f80d7d6157da92b9393cb0b9acb29 Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Fri, 6 Jun 2025 14:04:58 -0700 Subject: [PATCH 2/6] fix: flatten expected yaml structure; remove partition description --- snakemake_executor_plugin_slurm/partitions.py | 13 ++- tests/tests.py | 104 +++++++----------- 2 files changed, 46 insertions(+), 71 deletions(-) diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 12c8d553..89c3edff 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -13,13 +13,15 @@ def read_partition_file(partition_file: Path) -> List["Partition"]: with open(partition_file, "r") as f: out = [] - partitions = yaml.safe_load(f)["partitions"] - for p in partitions: + partitions_dict = yaml.safe_load(f)["partitions"] + for partition_name, partition_config in partitions_dict.items(): + if not partition_name or not partition_name.strip(): + raise KeyError("Partition name cannot be empty") + out.append( Partition( - name=p["name"], - limits=PartitionLimits(**p["limits"]), - description=p["description"], + name=partition_name, + limits=PartitionLimits(**partition_config), ) ) return out @@ -178,7 +180,6 @@ class Partition: name: str limits: PartitionLimits - description: Optional[str] = None def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: """ diff --git a/tests/tests.py b/tests/tests.py index ddfad201..b75fe7c4 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -487,92 +487,69 @@ class TestPartitionSelection: def basic_partition_config(self): """Basic partition configuration with two partitions.""" return { - "partitions": [ - { - "name": "default", - "description": "General purpose compute nodes", - "limits": { - "max_runtime": 1440, - "max_mem_mb": 128000, - "max_cpus_per_task": 32, - "supports_mpi": True, - }, + "partitions": { + "default": { + "max_runtime": 1440, + "max_mem_mb": 128000, + "max_cpus_per_task": 32, + "supports_mpi": True, }, - { - "name": "gpu", - "description": "GPU-enabled nodes", - "limits": { - "max_runtime": 720, - "max_mem_mb": 256000, - "max_gpu": 4, - "available_gpu_models": ["a100", "v100"], - "supports_mpi": False, - }, + "gpu": { + "max_runtime": 720, + "max_mem_mb": 256000, + "max_gpu": 4, + "available_gpu_models": ["a100", "v100"], + "supports_mpi": False, }, - ] + } } @pytest.fixture def minimal_partition_config(self): """Minimal partition configuration.""" - return { - "partitions": [ - { - "name": "minimal", - "description": "Minimal partition", - 
"limits": {}, - } - ] - } + return {"partitions": {"minimal": {}}} @pytest.fixture def comprehensive_partition_config(self): """Comprehensive partition configuration with all limit types.""" return { - "partitions": [ - { - "name": "comprehensive", - "description": "Partition with all limits", - "limits": { - # Standard resources - "max_runtime": 2880, - "max_mem_mb": 500000, - "max_mem_mb_per_cpu": 8000, - "max_cpus_per_task": 64, - # SLURM-specific resources - "max_nodes": 4, - "max_tasks": 256, - "max_tasks_per_node": 64, - # GPU resources - "max_gpu": 8, - "available_gpu_models": ["a100", "v100", "rtx3090"], - "max_cpus_per_gpu": 16, - # MPI resources - "supports_mpi": True, - "max_mpi_tasks": 512, - # Node features/constraints - "available_constraints": ["intel", "avx2", "highmem"], - }, + "partitions": { + "comprehensive": { + # Standard resources + "max_runtime": 2880, + "max_mem_mb": 500000, + "max_mem_mb_per_cpu": 8000, + "max_cpus_per_task": 64, + # SLURM-specific resources + "max_nodes": 4, + "max_tasks": 256, + "max_tasks_per_node": 64, + # GPU resources + "max_gpu": 8, + "available_gpu_models": ["a100", "v100", "rtx3090"], + "max_cpus_per_gpu": 16, + # MPI resources + "supports_mpi": True, + "max_mpi_tasks": 512, + # Node features/constraints + "available_constraints": ["intel", "avx2", "highmem"], } - ] + } } @pytest.fixture def empty_partitions_config(self): """Empty partitions configuration.""" - return {"partitions": []} + return {"partitions": {}} @pytest.fixture def missing_name_config(self): """Configuration with missing name field.""" return { - "partitions": [ - { - # Missing 'name' field - "description": "Missing name", - "limits": {}, + "partitions": { + "": { # Empty partition name } - ] + } } @pytest.fixture @@ -604,7 +581,6 @@ def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file) # Check first partition assert partitions[0].name == "default" - assert partitions[0].description == "General purpose compute nodes" assert partitions[0].limits.max_runtime == 1440 assert partitions[0].limits.max_mem_mb == 128000 assert partitions[0].limits.max_cpus_per_task == 32 @@ -612,7 +588,6 @@ def test_read_valid_partition_file(self, basic_partition_config, temp_yaml_file) # Check second partition assert partitions[1].name == "gpu" - assert partitions[1].description == "GPU-enabled nodes" assert partitions[1].limits.max_runtime == 720 assert partitions[1].limits.max_gpu == 4 assert partitions[1].limits.available_gpu_models == ["a100", "v100"] @@ -634,7 +609,6 @@ def test_read_minimal_partition_file( assert len(partitions) == 1 assert partitions[0].name == "minimal" - assert partitions[0].description == "Minimal partition" # Check that all limits are inf limits = partitions[0].limits From f8d342ea6116b322922d9453c7df54e3f17b493f Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Fri, 6 Jun 2025 14:10:41 -0700 Subject: [PATCH 3/6] fix: update cli arg name and help --- snakemake_executor_plugin_slurm/__init__.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index de9d2e36..3c026012 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -107,10 +107,21 @@ class ExecutorSettings(ExecutorSettingsBase): "required": False, }, ) - partitions: Optional[Path] = field( + partition_config: Optional[Path] = field( default=None, metadata={ - "help": "Path to YAML file that specifies 
partitions. See docs for details.", + "help": "Path to YAML file defining partition limits for automatic partition selection. " + "When provided, jobs will be automatically assigned to the best-fitting partition based on " + "their resource requirements. Expected format:\n" + "partitions:\n" + " short:\n" + " max_runtime: 30 # minutes\n" + " max_memory_mb: 4000\n" + " gpu:\n" + " max_runtime: 120\n" + " max_gpu: 2\n" + " available_gpu_models: ['v100', 'a100']\n" + "See documentation for complete list of available limits.", "env_var": False, "required": False, }, @@ -159,8 +170,8 @@ def __post_init__(self, test_mode: bool = False): else Path(".snakemake/slurm_logs").resolve() ) self._partitions = ( - read_partition_file(self.workflow.executor_settings.partitions) - if self.workflow.executor_settings.partitions + read_partition_file(self.workflow.executor_settings.partition_config) + if self.workflow.executor_settings.partition_config else None ) atexit.register(self.clean_old_logs) From f5511ff769b7c79813aeba4d0d9e6974bae8ceff Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Fri, 6 Jun 2025 14:52:58 -0700 Subject: [PATCH 4/6] docs: update docs and help --- docs/further.md | 73 +++++++++++++++++++++ snakemake_executor_plugin_slurm/__init__.py | 9 --- 2 files changed, 73 insertions(+), 9 deletions(-) diff --git a/docs/further.md b/docs/further.md index c0cd20af..6bdeafa0 100644 --- a/docs/further.md +++ b/docs/further.md @@ -64,6 +64,79 @@ See the [snakemake documentation on profiles](https://snakemake.readthedocs.io/e How and where you set configurations on factors like file size or increasing the runtime with every `attempt` of running a job (if [`--retries` is greater than `0`](https://snakemake.readthedocs.io/en/stable/executing/cli.html#snakemake.cli-get_argument_parser-behavior)). [There are detailed examples for these in the snakemake documentation.](https://snakemake.readthedocs.io/en/stable/snakefiles/rules.html#dynamic-resources) +#### Automatic Partition Selection + +The SLURM executor plugin supports automatic partition selection based on job resource requirements, via the command line option `--slurm-partition-config`. This feature allows the plugin to choose the most appropriate partition for each job, without the need to manually specify partitions for different job types. This also enables variable partition selection as a job's resource requirements change based on [dynamic resources](#dynamic-resource-specification), ensuring that jobs are always scheduled to an appropriate partition. + +*Jobs that explicitly specify a `slurm_partition` resource will bypass automatic selection and use the specified partition directly.* + +##### Partition Limits Specification + +To enable automatic partition selection, create a YAML configuration file that defines the available partitions and their resource limits. This file should be structured as follows: + +```yaml +partitions: + some_partition: + max_runtime: 100 + another_partition: + ... +``` +Where `some_partition` and `another_partition` are the names of the partition on your cluster, according to `sinfo`. 
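+
+For example, assuming the file is saved as `partitions.yaml`, selection is
+enabled by passing the option described at the top of this section; the
+remaining options are the usual Snakemake CLI:
+
+```console
+snakemake --executor slurm --slurm-partition-config partitions.yaml --jobs 100
+```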
+ +The following limits can be defined for each partition: + +| Parameter | Type | Description | Default | +| ----------------------- | --------- | ---------------------------------- | --------- | +| `max_runtime` | int | Maximum walltime in minutes | unlimited | +| `max_mem_mb` | int | Maximum total memory in MB | unlimited | +| `max_mem_mb_per_cpu` | int | Maximum memory per CPU in MB | unlimited | +| `max_cpus_per_task` | int | Maximum CPUs per task | unlimited | +| `max_nodes` | int | Maximum number of nodes | unlimited | +| `max_tasks` | int | Maximum number of tasks | unlimited | +| `max_tasks_per_node` | int | Maximum tasks per node | unlimited | +| `max_gpu` | int | Maximum number of GPUs | 0 | +| `available_gpu_models` | list[str] | List of available GPU models | none | +| `max_cpus_per_gpu` | int | Maximum CPUs per GPU | unlimited | +| `supports_mpi` | bool | Whether MPI jobs are supported | true | +| `max_mpi_tasks` | int | Maximum MPI tasks | unlimited | +| `available_constraints` | list[str] | List of available node constraints | none | + +##### Example Partition Configuration + +```yaml +partitions: + standard: + max_runtime: 720 # 12 hours + max_mem_mb: 64000 # 64 GB + max_cpus_per_task: 24 + max_nodes: 1 + + highmem: + max_runtime: 1440 # 24 hours + max_mem_mb: 512000 # 512 GB + max_mem_mb_per_cpu: 16000 + max_cpus_per_task: 48 + max_nodes: 1 + + gpu: + max_runtime: 2880 # 48 hours + max_mem_mb: 128000 # 128 GB + max_cpus_per_task: 32 + max_gpu: 8 + available_gpu_models: ["a100", "v100", "rtx3090"] + max_cpus_per_gpu: 8 +``` + +##### How Partition Selection Works + +When automatic partition selection is enabled, the plugin evaluates each job's resource requirements against the defined partition limits to ensure the job is placed on a partition that can accommodate all of its requirements. When multiple partitions are compatible, the plugin uses a scoring algorithm that favors partitions with limits closer to the job's needs, preventing jobs from being assigned to partitions with excessively high resource limits. + +The scoring algorithm calculates a score by summing the ratios of requested resources to partition limits (e.g., if a job requests 8 CPUs and a partition allows 16, this contributes 0.5 to the score). Higher scores indicate better resource utilization, so a job requesting 8 CPUs would prefer a 16-CPU partition (score 0.5) over a 64-CPU partition (score 0.125). + +##### Fallback Behavior + +If no suitable partition is found based on the job's resource requirements, the plugin falls back to the default SLURM behavior, which typically uses the cluster's default partition or any partition specified explicitly in the job's resources. + #### Standard Resources diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 3c026012..1e4e04ac 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -112,15 +112,6 @@ class ExecutorSettings(ExecutorSettingsBase): metadata={ "help": "Path to YAML file defining partition limits for automatic partition selection. " "When provided, jobs will be automatically assigned to the best-fitting partition based on " - "their resource requirements. 
Expected format:\n" - "partitions:\n" - " short:\n" - " max_runtime: 30 # minutes\n" - " max_memory_mb: 4000\n" - " gpu:\n" - " max_runtime: 120\n" - " max_gpu: 2\n" - " available_gpu_models: ['v100', 'a100']\n" "See documentation for complete list of available limits.", "env_var": False, "required": False, From 01a94c6ac5d491b715380bad7a16027cfd3517b8 Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Sat, 7 Jun 2025 12:51:48 -0700 Subject: [PATCH 5/6] chore: linting; change automatic to dynamic partition selection --- snakemake_executor_plugin_slurm/__init__.py | 10 ++++++---- snakemake_executor_plugin_slurm/partitions.py | 10 +++++----- tests/tests.py | 3 ++- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py index 1e4e04ac..f5a3897a 100644 --- a/snakemake_executor_plugin_slurm/__init__.py +++ b/snakemake_executor_plugin_slurm/__init__.py @@ -110,8 +110,9 @@ class ExecutorSettings(ExecutorSettingsBase): partition_config: Optional[Path] = field( default=None, metadata={ - "help": "Path to YAML file defining partition limits for automatic partition selection. " - "When provided, jobs will be automatically assigned to the best-fitting partition based on " + "help": "Path to YAML file defining partition limits for dynamic " + "partition selection. When provided, jobs will be dynamically " + "assigned to the best-fitting partition based on " "See documentation for complete list of available limits.", "env_var": False, "required": False, @@ -247,7 +248,8 @@ def run_job(self, job: JobExecutorInterface): if job.resources.get("slurm_extra"): self.check_slurm_extra(job) - # NOTE removed partition from below, such that partition selection can benefit from resource checking as the call is built up. + # NOTE removed partition from below, such that partition + # selection can benefit from resource checking as the call is built up. job_params = { "run_uuid": self.run_uuid, "slurm_logfile": slurm_logfile, @@ -396,7 +398,7 @@ async def check_active_jobs( # We use this sacct syntax for argument 'starttime' to keep it compatible # with slurm < 20.11 - sacct_starttime = f"{datetime.now() - timedelta(days = 2):%Y-%m-%dT%H:00}" + sacct_starttime = f"{datetime.now() - timedelta(days=2):%Y-%m-%dT%H:00}" # previously we had # f"--starttime now-2days --endtime now --name {self.run_uuid}" # in line 218 - once v20.11 is definitively not in use any more, diff --git a/snakemake_executor_plugin_slurm/partitions.py b/snakemake_executor_plugin_slurm/partitions.py index 89c3edff..834d2467 100644 --- a/snakemake_executor_plugin_slurm/partitions.py +++ b/snakemake_executor_plugin_slurm/partitions.py @@ -72,7 +72,7 @@ def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str if "gpu" in gres and gpu_required: raise WorkflowError( - "GPU resource specified in both 'gpu' and 'gres'. These are mutually exclusive." + "GPU resource specified in both 'gpu' and 'gres'. These are mutually exclusive." # noqa: E501 ) if gpu_required: @@ -92,7 +92,7 @@ def parse_gpu_requirements(job: JobExecutorInterface) -> tuple[int, Optional[str def get_job_cpu_requirement(job: JobExecutorInterface) -> tuple[int, str]: """ - This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of a arg string. + This uses the same logic as snakemake_executor_plugin_slurm_jobstep.get_cpu_setting, but returns a tuple instead of a arg string. 
# noqa: E501 """ gpu_required = job.resources.get("gpu", 0) @@ -187,10 +187,10 @@ def score_job_fit(self, job: JobExecutorInterface) -> Optional[float]: Calculate a score for how well a partition fits the job requirements """ - # try to score how closely a job matches a partition's limits, in order to handle case where multiple partitions can run a given job - # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources + # try to score how closely a job matches a partition's limits, in order to handle case where multiple partitions can run a given job # noqa: E501 + # naive approach here is to just sum the ratio of requested resource to limit, of course this limits us to only consider numerical resources # noqa: E501 # here a higher score indicates a better fit - # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. + # TODO decide how to handle unspecified limits, for now we assume inf for numerical limits, none for others. # noqa: E501 score = 0.0 diff --git a/tests/tests.py b/tests/tests.py index b75fe7c4..41a81e6b 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -482,6 +482,7 @@ def test_wildcard_slash_replacement(self): # Verify no slashes remain in the wildcard string assert "/" not in wildcard_str + class TestPartitionSelection: @pytest.fixture def basic_partition_config(self): @@ -971,4 +972,4 @@ def test_runtime_exceeds_limit( mock_logger.warning.assert_called_once() assert "No suitable partition found" in mock_logger.warning.call_args[0][0] finally: - temp_path.unlink() \ No newline at end of file + temp_path.unlink() From bb8c47192bacd9baf343038faf21d88d623b99fc Mon Sep 17 00:00:00 2001 From: Cade Mirchandani Date: Sat, 7 Jun 2025 16:01:27 -0700 Subject: [PATCH 6/6] chore: format tests --- tests/tests.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tests/tests.py b/tests/tests.py index 41a81e6b..18a0d6f8 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -546,12 +546,7 @@ def empty_partitions_config(self): @pytest.fixture def missing_name_config(self): """Configuration with missing name field.""" - return { - "partitions": { - "": { # Empty partition name - } - } - } + return {"partitions": {"": {}}} # Empty partition name @pytest.fixture def invalid_key_config(self):