diff --git a/snakemake_executor_plugin_slurm/__init__.py b/snakemake_executor_plugin_slurm/__init__.py
index badda89a..e187b96d 100644
--- a/snakemake_executor_plugin_slurm/__init__.py
+++ b/snakemake_executor_plugin_slurm/__init__.py
@@ -141,6 +141,7 @@ def __post_init__(self, test_mode: bool = False):
         self.test_mode = test_mode
         self.run_uuid = str(uuid.uuid4())
         self.logger.info(f"SLURM run ID: {self.run_uuid}")
+        self._failed_nodes = []
         self._fallback_account_arg = None
         self._fallback_partition = None
         self._preemption_warning = False  # no preemption warning has been issued
@@ -279,6 +280,22 @@ def run_job(self, job: JobExecutorInterface):
                 "Probably not what you want."
             )

+        # we need to set cpus-per-task OR cpus-per-gpu, the function
+        # will return a string with the corresponding value
+        call += f" {get_cpu_setting(job, gpu_job)}"
+        if job.resources.get("slurm_extra"):
+            self.check_slurm_extra(job)
+            call += f" {job.resources.slurm_extra}"
+
+        # if the workflow encountered any failed jobs, due to node failures,
+        # we now exclude these nodes from the job submission
+        if self._failed_nodes:
+            call += f" --exclude={','.join(self._failed_nodes)}"
+            self.logger.debug(
+                f"Excluding the following nodes from job submission: "
+                f"{','.join(self._failed_nodes)}"
+            )
+
         exec_job = self.format_job_exec(job)

         # and finally the job to execute with all the snakemake parameters
@@ -496,6 +513,11 @@ async def check_active_jobs(
                     j, msg=msg, aux_logs=[j.aux["slurm_logfile"]._str]
                 )
                 active_jobs_seen_by_sacct.remove(j.external_jobid)
+                if status == "NODE_FAIL":
+                    # get the node from the job which failed
+                    # and add it to the list of failed nodes
+                    node = j.aux["slurm_logfile"].parent.parent.name
+                    self._failed_nodes.append(node)
             else:
                 # still running?
                 yield j
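
For illustration only, a minimal standalone sketch of the node-exclusion logic this diff adds, not the plugin's actual implementation. The helper names (node_from_logfile, add_exclude_flag) and the example log directory layout (slurm_logs/<node>/<rule>/<jobid>.log) are assumptions for the sketch; the diff itself takes the node name from j.aux["slurm_logfile"].parent.parent.name and appends --exclude to the sbatch call string.

    # sketch: accumulate nodes from NODE_FAIL jobs and exclude them on resubmission
    from pathlib import Path


    def node_from_logfile(logfile: Path) -> str:
        # the grandparent directory of the log file is assumed to be the node name
        return logfile.parent.parent.name


    def add_exclude_flag(call: str, failed_nodes: list[str]) -> str:
        # append --exclude only if at least one node failure was recorded
        if failed_nodes:
            call += f" --exclude={','.join(failed_nodes)}"
        return call


    failed_nodes: list[str] = []
    # hypothetical log path; yields "node123" as the failed node
    failed_nodes.append(node_from_logfile(Path("slurm_logs/node123/rule_a/4242.log")))

    print(add_exclude_flag("sbatch --job-name demo", failed_nodes))
    # sbatch --job-name demo --exclude=node123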