
Commit 1ff6927

feat: use k8s job API and improve status check robustness in case of injected containers (#43)
Sometimes, just checking the status of a job is not enough: depending on the cluster setup, additional containers can be injected into pods, which prevents the job status from reflecting that a pod has already terminated. This PR therefore checks the status of the snakemake container in addition to the job status.

## Summary by CodeRabbit

- **New Features**
  - Switched the execution mechanism from Kubernetes pods to jobs, leading to enhanced task reliability.
  - Standardized container naming for greater consistency.
- **Improvements**
  - Upgraded error handling and log retrieval for smoother operation.
  - Refined cleanup procedures to ensure more dependable job management.
  - Enhanced resource management with updated toleration handling for GPU nodes.
  - Introduced a new class for improved failure policy management.
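The gist of the extra robustness check, as a minimal standalone sketch using the official `kubernetes` Python client (the helper function, namespace, and job name are illustrative, not part of the plugin's API):

```python
# Minimal sketch (not the plugin's actual code): derive success/failure from the
# terminated state of the "snakemake" container, ignoring injected sidecar containers.
import kubernetes.client
import kubernetes.config


def snakemake_container_exit_code(core_api, namespace, job_name):
    """Return the exit code of the "snakemake" container, or None if it has not terminated."""
    # Pods created by a Job carry a "job-name" label pointing back to that Job.
    pods = core_api.list_namespaced_pod(
        namespace=namespace,
        label_selector=f"job-name={job_name}",
    )
    if not pods.items:
        return None
    for status in pods.items[0].status.container_statuses or []:
        # Only the snakemake container counts; any injected containers are skipped.
        if status.name == "snakemake" and status.state.terminated is not None:
            return status.state.terminated.exit_code
    return None


if __name__ == "__main__":
    kubernetes.config.load_kube_config()  # assumes a reachable cluster and kubeconfig
    core_api = kubernetes.client.CoreV1Api()
    # Illustrative namespace and job name.
    print(snakemake_container_exit_code(core_api, "default", "snakejob-example"))
```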
1 parent 92375e6 commit 1ff6927

File tree

1 file changed: +71 −31


snakemake_executor_plugin_kubernetes/__init__.py

Lines changed: 71 additions & 31 deletions
```diff
--- a/snakemake_executor_plugin_kubernetes/__init__.py
+++ b/snakemake_executor_plugin_kubernetes/__init__.py
@@ -167,12 +167,12 @@ def run_job(self, job: JobExecutorInterface):
             get_uuid(f"{self.run_namespace}-{job.jobid}-{job.attempt}")
         )
 
-        body = kubernetes.client.V1Pod()
+        body = kubernetes.client.V1Job()
         body.metadata = kubernetes.client.V1ObjectMeta(labels={"app": "snakemake"})
         body.metadata.name = jobid
 
         # Container setup
-        container = kubernetes.client.V1Container(name=jobid)
+        container = kubernetes.client.V1Container(name="snakemake")
         container.image = self.container_image
         container.command = shlex.split("/bin/sh")
         container.args = ["-c", exec_job]
@@ -196,9 +196,13 @@ def run_job(self, job: JobExecutorInterface):
         self.logger.debug(f"Set node selector for machine type: {node_selector}")
 
         # Initialize PodSpec
-        body.spec = kubernetes.client.V1PodSpec(
+        pod_spec = kubernetes.client.V1PodSpec(
             containers=[container], node_selector=node_selector, restart_policy="Never"
         )
+        body.spec = kubernetes.client.V1JobSpec(
+            backoff_limit=0,
+            template=kubernetes.client.V1PodTemplateSpec(spec=pod_spec),
+        )
 
         # Add toleration for GPU nodes if GPU is requested
         if "gpu" in resources_dict:
@@ -212,9 +216,9 @@ def run_job(self, job: JobExecutorInterface):
             manufacturer_lc = manufacturer.lower()
             if manufacturer_lc == "nvidia":
                 # Toleration for nvidia.com/gpu
-                if body.spec.tolerations is None:
-                    body.spec.tolerations = []
-                body.spec.tolerations.append(
+                if pod_spec.tolerations is None:
+                    pod_spec.tolerations = []
+                pod_spec.tolerations.append(
                     kubernetes.client.V1Toleration(
                         key="nvidia.com/gpu",
                         operator="Equal",
@@ -223,14 +227,14 @@ def run_job(self, job: JobExecutorInterface):
                     )
                 )
                 self.logger.debug(
-                    f"Added toleration for NVIDIA GPU: {body.spec.tolerations}"
+                    f"Added toleration for NVIDIA GPU: {pod_spec.tolerations}"
                 )
 
             elif manufacturer_lc == "amd":
                 # Toleration for amd.com/gpu
-                if body.spec.tolerations is None:
-                    body.spec.tolerations = []
-                body.spec.tolerations.append(
+                if pod_spec.tolerations is None:
+                    pod_spec.tolerations = []
+                pod_spec.tolerations.append(
                     kubernetes.client.V1Toleration(
                         key="amd.com/gpu",
                         operator="Equal",
@@ -239,7 +243,7 @@ def run_job(self, job: JobExecutorInterface):
                     )
                 )
                 self.logger.debug(
-                    f"Added toleration for AMD GPU: {body.spec.tolerations}"
+                    f"Added toleration for AMD GPU: {pod_spec.tolerations}"
                 )
 
             else:
@@ -273,15 +277,15 @@ def run_job(self, job: JobExecutorInterface):
 
         # Add service account name if provided
         if self.k8s_service_account_name:
-            body.spec.service_account_name = self.k8s_service_account_name
+            pod_spec.service_account_name = self.k8s_service_account_name
             self.logger.debug(
                 f"Set service account name: {self.k8s_service_account_name}"
             )
 
         # Workdir volume
         workdir_volume = kubernetes.client.V1Volume(name="workdir")
         workdir_volume.empty_dir = kubernetes.client.V1EmptyDirVolumeSource()
-        body.spec.volumes = [workdir_volume]
+        pod_spec.volumes = [workdir_volume]
 
         for pvc in self.persistent_volumes:
             volume = kubernetes.client.V1Volume(name=pvc.name)
@@ -290,7 +294,7 @@ def run_job(self, job: JobExecutorInterface):
                     claim_name=pvc.name
                 )
             )
-            body.spec.volumes.append(volume)
+            pod_spec.volumes.append(volume)
 
         # Env vars
         container.env = []
@@ -378,7 +382,7 @@ def run_job(self, job: JobExecutorInterface):
         # Try creating the pod with exception handling
         try:
             pod = self._kubernetes_retry(
-                lambda: self.kubeapi.create_namespaced_pod(self.namespace, body)
+                lambda: self.batchapi.create_namespaced_job(self.namespace, body)
             )
         except kubernetes.client.rest.ApiException as e:
             self.logger.error(f"Failed to create pod: {e}")
@@ -416,7 +420,7 @@ async def check_active_jobs(
             async with self.status_rate_limiter:
                 try:
                     res = self._kubernetes_retry(
-                        lambda: self.kubeapi.read_namespaced_pod_status(
+                        lambda: self.batchapi.read_namespaced_job_status(
                             j.external_jobid, self.namespace
                         )
                     )
@@ -436,34 +440,66 @@ async def check_active_jobs(
                     self.report_job_error(j, msg=str(e))
                     continue
 
+                # Sometimes, just checking the status of a job is not enough, because
+                # apparently, depending on the cluster setup, there can be additional
+                # containers injected into pods that will prevent the job to detect
+                # that a pod is already terminated.
+                # We therefore check the status of the snakemake container in addition
+                # to the job status.
+                pods = self.kubeapi.list_namespaced_pod(
+                    namespace=self.namespace,
+                    label_selector=f"job-name={j.external_jobid}",
+                )
+                assert len(pods.items) <= 1
+                if pods.items:
+                    pod = pods.items[0]
+                    snakemake_container = [
+                        container
+                        for container in pod.status.container_statuses
+                        if container.name == "snakemake"
+                    ][0]
+                    snakemake_container_exit_code = (
+                        snakemake_container.state.terminated.exit_code
+                        if snakemake_container.state.terminated is not None
+                        else None
+                    )
+                else:
+                    snakemake_container = None
+                    snakemake_container_exit_code = None
+
                 if res is None:
                     msg = (
-                        "Unknown pod {jobid}. Has the pod been deleted manually?"
+                        "Unknown job {jobid}. Has the job been deleted manually?"
                     ).format(jobid=j.external_jobid)
                     self.logger.error(msg)
                     self.report_job_error(j, msg=msg)
-                elif res.status.phase == "Failed":
+                elif res.status.failed == 1 or (
+                    snakemake_container_exit_code is not None
+                    and snakemake_container_exit_code != 0
+                ):
                     msg = (
                         "For details, please issue:\n"
-                        "kubectl describe pod {jobid}\n"
-                        "kubectl logs {jobid}"
-                    ).format(jobid=j.external_jobid)
-                    # failed
-                    kube_log_content = self.kubeapi.read_namespaced_pod_log(
-                        name=j.external_jobid, namespace=self.namespace
+                        f"kubectl describe job {j.external_jobid}\n"
+                        f"kubectl logs {j.external_jobid}"
                     )
+                    # failed
                     kube_log = self.log_path / f"{j.external_jobid}.log"
                     with open(kube_log, "w") as f:
-                        f.write(kube_log_content)
+                        kube_log_content = self.kubeapi.read_namespaced_pod_log(
+                            name=pod.metadata.name,
+                            namespace=self.namespace,
+                            container=snakemake_container.name,
+                        )
+                        print(kube_log_content, file=f)
                     self.logger.error(f"Job {j.external_jobid} failed. {msg}")
                     self.report_job_error(j, msg=msg, aux_logs=[str(kube_log)])
-                elif res.status.phase == "Succeeded":
+                elif res.status.succeeded == 1 or (snakemake_container_exit_code == 0):
                     # finished
                     self.logger.info(f"Job {j.external_jobid} succeeded.")
                     self.report_job_success(j)
 
                     self._kubernetes_retry(
-                        lambda: self.safe_delete_pod(
+                        lambda: self.safe_delete_job(
                             j.external_jobid, ignore_not_found=True
                         )
                     )
@@ -476,7 +512,9 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
         # Cancel all active jobs.
         for j in active_jobs:
             self._kubernetes_retry(
-                lambda: self.safe_delete_pod(j.external_jobid, ignore_not_found=True)
+                lambda jobid=j.external_jobid: self.safe_delete_job(
+                    jobid, ignore_not_found=True
+                )
             )
 
     def shutdown(self):
@@ -521,16 +559,18 @@ def unregister_secret(self):
                 )
             )
 
-    def safe_delete_pod(self, jobid, ignore_not_found=True):
+    def safe_delete_job(self, jobid, ignore_not_found=True):
         import kubernetes.client
 
         body = kubernetes.client.V1DeleteOptions()
         try:
-            self.kubeapi.delete_namespaced_pod(jobid, self.namespace, body=body)
+            self.batchapi.delete_namespaced_job(
+                jobid, self.namespace, propagation_policy="Foreground", body=body
+            )
         except kubernetes.client.rest.ApiException as e:
             if e.status == 404 and ignore_not_found:
                 self.logger.warning(
-                    "[WARNING] 404 not found when trying to delete the pod: {jobid}\n"
+                    "[WARNING] 404 not found when trying to delete the job: {jobid}\n"
                     "[WARNING] Ignore this error\n".format(jobid=jobid)
                 )
            else:
```

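For context on the pod-to-job switch summarized above, a minimal standalone sketch of creating a one-shot Job through `BatchV1Api` and cleaning it up with foreground propagation; the image, command, job name, and namespace are illustrative placeholders, and the immediate deletion is only there to show the call:

```python
# Minimal sketch (not the plugin's actual code): run a one-shot Job via the batch API
# and delete it with foreground propagation so its pods are removed with it.
import kubernetes.client
import kubernetes.config

kubernetes.config.load_kube_config()  # assumes a reachable cluster and kubeconfig
batch_api = kubernetes.client.BatchV1Api()

container = kubernetes.client.V1Container(
    name="snakemake",
    image="snakemake/snakemake:latest",
    command=["/bin/sh"],
    args=["-c", "echo hello from the job"],
)
pod_spec = kubernetes.client.V1PodSpec(containers=[container], restart_policy="Never")
job = kubernetes.client.V1Job(
    metadata=kubernetes.client.V1ObjectMeta(name="snakejob-example"),
    spec=kubernetes.client.V1JobSpec(
        backoff_limit=0,  # no Kubernetes-level retries; a failure surfaces immediately
        template=kubernetes.client.V1PodTemplateSpec(spec=pod_spec),
    ),
)
batch_api.create_namespaced_job(namespace="default", body=job)

# Foreground propagation deletes the Job's pods before the Job object itself disappears.
batch_api.delete_namespaced_job(
    name="snakejob-example",
    namespace="default",
    propagation_policy="Foreground",
    body=kubernetes.client.V1DeleteOptions(),
)
```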