Skip to content

Commit 1d663e8

Browse files
committed
feat: add option to validate k8s spec (#1152)
1 parent 1d26b39 commit 1d663e8

File tree

2 files changed

+138
-4
lines changed

2 files changed

+138
-4
lines changed

torchx/schedulers/kubernetes_scheduler.py

Lines changed: 40 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -369,7 +369,7 @@ def app_to_resource(
369369
queue: str,
370370
service_account: Optional[str],
371371
priority_class: Optional[str] = None,
372-
) -> Dict[str, object]:
372+
) -> Dict[str, Any]:
373373
"""
374374
app_to_resource creates a volcano job kubernetes resource definition from
375375
the provided AppDef. The resource definition can be used to launch the
@@ -444,7 +444,7 @@ def app_to_resource(
444444
if priority_class is not None:
445445
job_spec["priorityClassName"] = priority_class
446446

447-
resource: Dict[str, object] = {
447+
resource: Dict[str, Any] = {
448448
"apiVersion": "batch.volcano.sh/v1alpha1",
449449
"kind": "Job",
450450
"metadata": {"name": f"{unique_app_id}"},
@@ -456,7 +456,7 @@ def app_to_resource(
456456
@dataclass
457457
class KubernetesJob:
458458
images_to_push: Dict[str, Tuple[str, str]]
459-
resource: Dict[str, object]
459+
resource: Dict[str, Any]
460460

461461
def __str__(self) -> str:
462462
return yaml.dump(sanitize_for_serialization(self.resource))
@@ -471,6 +471,7 @@ class KubernetesOpts(TypedDict, total=False):
471471
image_repo: Optional[str]
472472
service_account: Optional[str]
473473
priority_class: Optional[str]
474+
validate_spec: Optional[bool]
474475

475476

476477
class KubernetesScheduler(
@@ -659,6 +660,36 @@ def _submit_dryrun(
659660
), "priority_class must be a str"
660661

661662
resource = app_to_resource(app, queue, service_account, priority_class)
663+
664+
if cfg.get("validate_spec"):
665+
try:
666+
self._custom_objects_api().create_namespaced_custom_object(
667+
group="batch.volcano.sh",
668+
version="v1alpha1",
669+
namespace=cfg.get("namespace") or "default",
670+
plural="jobs",
671+
body=resource,
672+
dry_run="All",
673+
)
674+
except Exception as e:
675+
from kubernetes.client.rest import ApiException
676+
677+
if isinstance(e, ApiException):
678+
raise ValueError(f"Invalid job spec: {e.reason}") from e
679+
raise
680+
681+
job_name = resource["metadata"]["name"]
682+
for task in resource["spec"]["tasks"]:
683+
task_name = task["name"]
684+
replicas = task.get("replicas", 1)
685+
max_index = replicas - 1
686+
pod_name = f"{job_name}-{task_name}-{max_index}"
687+
if len(pod_name) > 63:
688+
raise ValueError(
689+
f"Pod name '{pod_name}' ({len(pod_name)} chars) exceeds 63 character limit. "
690+
f"Shorten app.name or role names"
691+
)
692+
662693
req = KubernetesJob(
663694
resource=resource,
664695
images_to_push=images_to_push,
@@ -703,6 +734,12 @@ def _run_opts(self) -> runopts:
703734
type_=str,
704735
help="The name of the PriorityClass to set on the job specs",
705736
)
737+
opts.add(
738+
"validate_spec",
739+
type_=bool,
740+
help="Validate job spec using Kubernetes API dry-run before submission",
741+
default=True,
742+
)
706743
return opts
707744

708745
def describe(self, app_id: str) -> Optional[DescribeAppResponse]:

torchx/schedulers/test/kubernetes_scheduler_test.py

Lines changed: 98 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@
1111
import sys
1212
import unittest
1313
from datetime import datetime
14-
from typing import Any, Dict
14+
from typing import Any, cast, Dict
1515
from unittest.mock import MagicMock, patch
1616

1717
import torchx
@@ -726,6 +726,7 @@ def test_runopts(self) -> None:
726726
"image_repo",
727727
"service_account",
728728
"priority_class",
729+
"validate_spec",
729730
},
730731
)
731732

@@ -929,6 +930,102 @@ def test_min_replicas(self) -> None:
929930
]
930931
self.assertEqual(min_available, [1, 1, 0])
931932

933+
@patch(
    "torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
)
def test_validate_spec_invalid_name(self, mock_api: MagicMock) -> None:
    """
    With ``validate_spec`` enabled, an ``ApiException`` raised by the
    dry-run create call must surface from ``submit_dryrun`` as a
    ``ValueError`` whose message contains "Invalid job spec".
    """
    from kubernetes.client.rest import ApiException

    # The patched API factory hands back a client whose create call is rejected.
    create_call = mock_api.return_value.create_namespaced_custom_object
    create_call.side_effect = ApiException(status=422, reason="Invalid")

    app = _test_app()
    app.name = "Invalid_Name"
    cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})

    with self.assertRaises(ValueError) as ctx:
        create_scheduler("test").submit_dryrun(app, cfg)

    self.assertIn("Invalid job spec", str(ctx.exception))

    # Exactly one request was issued, and it was a dry-run request.
    create_call.assert_called_once()
    _, passed_kwargs = create_call.call_args
    self.assertEqual(passed_kwargs["dry_run"], "All")
959+
960+
def test_validate_spec_enabled_by_default(self) -> None:
    """
    When the config omits ``validate_spec``, ``submit_dryrun`` is still
    expected to issue exactly one dry-run create call against the API.
    """
    scheduler = create_scheduler("test")
    app = _test_app()

    # validate_spec is deliberately left out of the config.
    cfg = KubernetesOpts({"queue": "testqueue"})

    # Stand-in API client whose create call succeeds with an empty response.
    api_client = MagicMock()
    api_client.create_namespaced_custom_object.return_value = {}

    with patch(
        "torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api",
        return_value=api_client,
    ):
        info = scheduler.submit_dryrun(app, cfg)

    self.assertIsNotNone(info)

    create_call = api_client.create_namespaced_custom_object
    create_call.assert_called_once()
    _, passed_kwargs = create_call.call_args
    self.assertEqual(passed_kwargs["dry_run"], "All")
979+
980+
@patch(
    "torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
)
def test_validate_spec_invalid_task_name(self, mock_api: MagicMock) -> None:
    """
    Same contract as the invalid-app-name case, but the renamed field is
    the first role's name: an ``ApiException`` from the dry-run call must
    be re-raised as a ``ValueError`` containing "Invalid job spec".
    """
    from kubernetes.client.rest import ApiException

    # Simulate the API server rejecting the submitted resource.
    rejection = ApiException(status=422, reason="Invalid")
    mock_api.return_value.create_namespaced_custom_object.side_effect = rejection

    app = _test_app()
    app.roles[0].name = "Invalid-Task-Name"
    cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})

    with self.assertRaises(ValueError) as ctx:
        create_scheduler("test").submit_dryrun(app, cfg)

    self.assertIn("Invalid job spec", str(ctx.exception))
1003+
1004+
@patch(
    "torchx.schedulers.kubernetes_scheduler.KubernetesScheduler._custom_objects_api"
)
def test_validate_spec_long_pod_name(self, mock_api: MagicMock) -> None:
    """
    Even when the dry-run create call succeeds, ``submit_dryrun`` must raise
    a ``ValueError`` if a derived pod name exceeds the 63 character limit.
    """
    # The API accepts the spec, so any failure comes from the length check.
    mock_api.return_value.create_namespaced_custom_object.return_value = {}

    app = _test_app()
    app.name = "x" * 50
    app.roles[0].name = "y" * 20
    cfg = cast(KubernetesOpts, {"queue": "testqueue", "validate_spec": True})
    scheduler = create_scheduler("test")

    # Pin the generated app id so the resulting pod name length is deterministic.
    with patch(
        "torchx.schedulers.kubernetes_scheduler.make_unique",
        return_value="x" * 50,
    ), self.assertRaises(ValueError) as ctx:
        scheduler.submit_dryrun(app, cfg)

    message = str(ctx.exception)
    self.assertIn("Pod name", message)
    self.assertIn("exceeds 63 character limit", message)
1028+
9321029

9331030
class KubernetesSchedulerNoImportTest(unittest.TestCase):
9341031
"""

0 commit comments

Comments
 (0)