diff --git a/dlrover/python/common/constants.py b/dlrover/python/common/constants.py
index f2730f5f2c..f4d6c738a6 100644
--- a/dlrover/python/common/constants.py
+++ b/dlrover/python/common/constants.py
@@ -26,6 +26,7 @@ class PlatformType(object):
     RAY = "ray"
     PY_KUBERNETES = "pyk8s"
     LOCAL = "local"
+    VOLCANO = "volcano"
 
 
 class CommunicationType(object):
diff --git a/dlrover/python/elastic_agent/torch/training.py b/dlrover/python/elastic_agent/torch/training.py
index cbb5652dff..c1c6c8087c 100644
--- a/dlrover/python/elastic_agent/torch/training.py
+++ b/dlrover/python/elastic_agent/torch/training.py
@@ -114,6 +114,7 @@
     version_less_than_230,
     version_less_than_240,
     version_less_than_280,
+    is_run_in_volcano,
 )
 
 _agent_evt = DLRoverAgentEvent().singleton_instance()
@@ -458,7 +459,12 @@ def _check_network_rdzv(self):
         )
 
         if num < 0:
-            raise JobStoppingError("Exit rendezvous when job is stopping")
+            if is_run_in_volcano():
+                # For a Volcano elastic job, some pods may be started after the job is finished.
+                logger.info("Job is stopping, exit rendezvous.")
+                sys.exit(0)
+            else:
+                raise JobStoppingError("Exit rendezvous when job is stopping")
 
     def _report_failure(self, err_msg, level):
         if self._node_rank == 0:
diff --git a/dlrover/python/master/args.py b/dlrover/python/master/args.py
index a0bb002978..31c50e9f25 100644
--- a/dlrover/python/master/args.py
+++ b/dlrover/python/master/args.py
@@ -13,6 +13,7 @@
 
 import argparse
 
+from dlrover.python.common.constants import DistributionStrategy, OptimizeMode
 from dlrover.python.common.global_context import DefaultValues
 from dlrover.python.common.log import default_logger as logger
 from dlrover.python.util.args_util import parse_tuple_list, pos_int
@@ -104,6 +105,18 @@
         type=pos_int,
         help="The timeout value of worker task process(For PS type job).",
     )
+    parser.add_argument(
+        "--distribution_strategy",
+        default=DistributionStrategy.ALLREDUCE,
+        type=str,
+        help="The distribution strategy of the job.",
+    )
+    parser.add_argument(
+        "--optimize_mode",
+        default=OptimizeMode.SINGLE_JOB,
+        type=str,
+        help="The optimize mode of the job.",
+    )
     return parser
 
 
diff --git a/dlrover/python/master/main.py b/dlrover/python/master/main.py
index 6fe744b274..f7b8780f06 100644
--- a/dlrover/python/master/main.py
+++ b/dlrover/python/master/main.py
@@ -81,6 +81,10 @@ def run(args):
     else:
         from dlrover.python.master.dist_master import DistributedJobMaster
 
+        if job_args.platform == PlatformType.VOLCANO:
+            job_args.distribution_strategy = args.distribution_strategy
+            job_args.optimize_mode = args.optimize_mode
+
         update_context(job_args)
         master = DistributedJobMaster(_dlrover_context.master_port, job_args)
         master.prepare()
diff --git a/dlrover/python/master/scaler/factory.py b/dlrover/python/master/scaler/factory.py
index 90ff80a0fa..6c475175f1 100644
--- a/dlrover/python/master/scaler/factory.py
+++ b/dlrover/python/master/scaler/factory.py
@@ -29,3 +29,7 @@ def new_job_scaler(platform, job_name, namespace):
         return ActorScaler(job_name, namespace)
     elif platform == PlatformType.LOCAL:
         return None
+    elif platform == PlatformType.VOLCANO:
+        from dlrover.python.master.scaler.volcano_scaler import VolcanoScaler
+
+        return VolcanoScaler(job_name, namespace)
diff --git a/dlrover/python/master/scaler/volcano_scaler.py b/dlrover/python/master/scaler/volcano_scaler.py
new file mode 100644
index 0000000000..08427df0e8
--- /dev/null
+++ b/dlrover/python/master/scaler/volcano_scaler.py
@@ -0,0 +1,26 @@
+# Copyright 2022 The DLRover Authors. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dlrover.python.master.scaler.base_scaler import Scaler, ScalePlan
+
+# VolcanoScaler does nothing; scaling is handled by Volcano itself.
+class VolcanoScaler(Scaler):
+    def __init__(self, job_name, namespace):
+        super(VolcanoScaler, self).__init__(job_name)
+        self._namespace = namespace
+
+    def start(self):
+        pass
+
+    def scale(self, plan: ScalePlan):
+        pass
diff --git a/dlrover/python/master/watcher/factory.py b/dlrover/python/master/watcher/factory.py
index cf019090d2..78a90a450e 100644
--- a/dlrover/python/master/watcher/factory.py
+++ b/dlrover/python/master/watcher/factory.py
@@ -17,7 +17,7 @@
 
 def new_node_watcher(platform, job_name, namespace="default"):
     logger.info("New %s Node Watcher", platform)
-    if platform in (PlatformType.KUBERNETES, PlatformType.PY_KUBERNETES):
+    if platform in (PlatformType.KUBERNETES, PlatformType.PY_KUBERNETES, PlatformType.VOLCANO):
         from dlrover.python.master.watcher.k8s_watcher import PodWatcher
 
         return PodWatcher(job_name, namespace)
@@ -43,6 +43,12 @@ def new_scale_plan_watcher(platform, job_name, namespace, job_uuid):
         )
 
         return RayScalePlanWatcher(job_name, namespace, job_uuid)
+    elif platform == PlatformType.VOLCANO:
+        from dlrover.python.master.watcher.volcano_watcher import (
+            VolcanoScalePlanWatcher,
+        )
+
+        return VolcanoScalePlanWatcher(job_name, namespace, job_uuid)
     else:
         raise ValueError("Not support engine %s", platform)
 
diff --git a/dlrover/python/master/watcher/volcano_watcher.py b/dlrover/python/master/watcher/volcano_watcher.py
new file mode 100644
index 0000000000..c770039ed7
--- /dev/null
+++ b/dlrover/python/master/watcher/volcano_watcher.py
@@ -0,0 +1,25 @@
+# Copyright 2022 The DLRover Authors. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+# VolcanoScalePlanWatcher does nothing; scaling is handled by Volcano itself.
+class VolcanoScalePlanWatcher:
+    def __init__(self, job_name, namespace, job_uuid):
+        self.job_name = job_name
+        self.namespace = namespace
+        self.job_uuid = job_uuid
+
+    def watch(self):
+        while True:
+            time.sleep(1000)
+            yield None
\ No newline at end of file
diff --git a/dlrover/python/scheduler/factory.py b/dlrover/python/scheduler/factory.py
index eeaf8a3362..2b1ddb08c0 100644
--- a/dlrover/python/scheduler/factory.py
+++ b/dlrover/python/scheduler/factory.py
@@ -26,6 +26,9 @@ def new_elastic_job(platform, job_name, namespace):
         from dlrover.python.scheduler.ray import RayElasticJob
 
         return RayElasticJob(job_name, namespace)
+    elif platform == PlatformType.VOLCANO:
+        from dlrover.python.scheduler.volcano import VolcanoElasticJob
+        return VolcanoElasticJob(job_name, namespace)
     else:
         raise ValueError("Not support engine %s", platform)
 
@@ -42,5 +45,9 @@
         return RayJobArgs(platform, namespace, job_name)
     elif platform == PlatformType.LOCAL:
         return LocalJobArgs(platform, namespace, job_name)
+    elif platform == PlatformType.VOLCANO:
+        from dlrover.python.scheduler.volcano import VolcanoJobArgs
+
+        return VolcanoJobArgs(platform, namespace, job_name)
     else:
         raise ValueError("Not support platform %s", platform)
diff --git a/dlrover/python/scheduler/volcano.py b/dlrover/python/scheduler/volcano.py
new file mode 100644
index 0000000000..3464960c12
--- /dev/null
+++ b/dlrover/python/scheduler/volcano.py
@@ -0,0 +1,99 @@
+# Copyright 2022 The DLRover Authors. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+
+from dlrover.python.common.node import NodeGroupResource, NodeResource
+
+from dlrover.python.common.constants import (
+    NodeType, NodeEnv,
+)
+from dlrover.python.scheduler.job import ElasticJob, JobArgs, NodeArgs
+from dlrover.python.scheduler.kubernetes import k8sClient, convert_cpu_to_decimal, convert_memory_to_mb, \
+    _dlrover_context
+from dlrover.python.common.log import default_logger as logger
+
+class VolcanoElasticJob(ElasticJob):
+    def __init__(self, job_name, namespace):
+        self._namespace = namespace
+        self._job_name = job_name
+
+    def get_node_name(self, type, id):
+        return "pod-name"
+
+    def get_node_service_addr(self, type, id):
+        return ""
+
+class VolcanoJobArgs(JobArgs):
+    def __init__(self, platform, namespace, job_name):
+        super(VolcanoJobArgs, self).__init__(platform, namespace, job_name)
+
+    def initilize(self):
+        self.user = os.getenv("USER", "")
+        k8s_client = k8sClient.singleton_instance(self.namespace)
+
+        self.job_uuid = os.getenv(NodeEnv.JOB_UID, "")
+        # Get the job parameters from the volcano job spec.
+        vcjob = self._retry_to_get_vcjob(k8s_client)
+        for task in vcjob["spec"]["tasks"]:
+            if task["name"] == NodeType.WORKER:
+                restart_policy = task["template"]["spec"].get(
+                    "restartPolicy", ""
+                )
+                self.relaunch_always = restart_policy == "Always"
+
+            num = int(task.get("replicas", 0))
+            assert len(task["template"]["spec"]["containers"]) == 1
+            container = task["template"]["spec"]["containers"][0]
+            resources = container.get("resources", {})
+            requests = resources.get("requests", {})
+            cpu = convert_cpu_to_decimal(requests.get("cpu", 0))
+            if "memory" in requests:
+                memory = convert_memory_to_mb(requests["memory"])
+            else:
+                memory = 0
+            gpu_type = None
+            gpu_num = 0
+            for k, v in requests.items():
+                if "nvidia.com" in k:
+                    gpu_type = k
+                    gpu_num = int(v)
+            group_resource = NodeGroupResource(
+                num,
+                NodeResource(
+                    cpu=cpu,
+                    memory=memory,
+                    gpu_type=gpu_type,
+                    gpu_num=gpu_num,
+                ),
+            )
+            self.node_args[task["name"]] = NodeArgs(
+                group_resource,
+                process_timeout=_dlrover_context.seconds_to_timeout_task_process,
+            )
+        logger.info("Job args = %s", self.__dict__)
+
+    def _retry_to_get_vcjob(self, k8s_client: k8sClient):
+        for _ in range(3):
+            vcjob = k8s_client.get_custom_resource(
+                name=self.job_name,
+                group="batch.volcano.sh",
+                version="v1alpha1",
+                plural="jobs",
+            )
+            if vcjob:
+                return vcjob
+            else:
+                time.sleep(5)
+        raise ValueError("Cannot get the training volcano job %s" % self.job_name)
diff --git a/dlrover/trainer/torch/elastic_run.py b/dlrover/trainer/torch/elastic_run.py
index fe234dc779..b79873dbc3 100644
--- a/dlrover/trainer/torch/elastic_run.py
+++ b/dlrover/trainer/torch/elastic_run.py
@@ -109,7 +109,7 @@
     JobConstant,
     NodeEnv,
     NodeEventType,
-    PreCheckStatus,
+    PreCheckStatus, NodeType,
 )
 from dlrover.python.common.log import default_logger as logger
 from dlrover.python.elastic_agent.master_client import MasterClient
@@ -118,7 +118,7 @@
     launch_agent,
 )
 from dlrover.python.training_event import DLRoverAgentEvent
-from dlrover.trainer.torch.utils import version_less_than_230
+from dlrover.trainer.torch.utils import version_less_than_230, is_run_in_volcano
 
 
 def parse_args(args):
@@ -216,6 +216,41 @@ def parse_args(args):
     )
     return parser.parse_args(args)
+# Requires the Volcano svc and env plugins for the job.
+# The master task name must be 'master' and the worker task name must be 'worker'.
+def setup_env_for_volcano():
+    if not is_run_in_volcano():
+        return
+    os.environ['GRPC_ENABLE_FORK'] = 'false'
+    os.environ['NODE_TYPE'] = 'worker'
+    vcMasterHosts = os.environ.get('VC_MASTER_HOSTS', "")
+    if vcMasterHosts == "":
+        raise RuntimeError("VC_MASTER_HOSTS env is not set; the dlrover "
+                           "master task must be named 'master'.")
+    if len(vcMasterHosts.split(',')) != 1:
+        raise RuntimeError(f"VC_MASTER_HOSTS {vcMasterHosts} should contain exactly one host.")
+    dlroverMasterPort = os.environ.get('DLROVER_MASTER_PORT', "")
+    if dlroverMasterPort == "":
+        raise RuntimeError("DLROVER_MASTER_PORT env is not set.")
+    os.environ['DLROVER_MASTER_ADDR'] = f"{vcMasterHosts}:{dlroverMasterPort}"
+    if os.environ.get('VC_TASK_INDEX', -1) == -1:
+        raise RuntimeError("VC_TASK_INDEX env is not set.")
+    # NODE_ID, NODE_RANK and RANK are the same as VC_TASK_INDEX in volcano.
+    os.environ['NODE_ID'] = os.environ.get('VC_TASK_INDEX')
+    os.environ['NODE_RANK'] = os.environ.get('VC_TASK_INDEX')
+    os.environ['RANK'] = os.environ.get('VC_TASK_INDEX')
+    if os.environ.get('VC_WORKER_NUM', -1) == -1:
+        raise RuntimeError("VC_WORKER_NUM env is not set; the dlrover "
+                           "worker task must be named 'worker'.")
+    # NODE_NUM and WORLD_SIZE are the same as VC_WORKER_NUM in volcano.
+    os.environ['NODE_NUM'] = os.environ.get('VC_WORKER_NUM')
+    os.environ['WORLD_SIZE'] = os.environ.get('VC_WORKER_NUM')
+    os.environ['DLROVER_MASTER_SERVICE_TYPE'] = 'grpc'
+    # Worker related env.
+    os.environ['WORKER_TYPE'] = NodeType.WORKER
+    os.environ['WORKER_ID'] = os.environ.get('VC_TASK_INDEX')
+    os.environ['WORKER_RANK'] = os.environ.get('VC_TASK_INDEX')
+    os.environ['WORKER_NUM'] = os.environ.get('VC_WORKER_NUM')
 
 
 class ElasticLaunch:
     """
@@ -518,6 +553,9 @@ def _check_to_use_dlrover_run(job_name, is_standalone=False):
 
 
 def run(args):
+    # reconfigure the env for volcano
+    setup_env_for_volcano()
+
     # export event for dlrover agent
     agent = DLRoverAgentEvent.singleton_instance()
     agent.start(pid=vars(args))
diff --git a/dlrover/trainer/torch/utils.py b/dlrover/trainer/torch/utils.py
index 0a29d1c262..3c6d7b0166 100644
--- a/dlrover/trainer/torch/utils.py
+++ b/dlrover/trainer/torch/utils.py
@@ -10,6 +10,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import os
 
 import torch
 from packaging import version
@@ -28,3 +29,7 @@
 def version_less_than_280():
     current_version = version.parse(torch.__version__).base_version
     return version.parse(current_version) <= version.parse("2.7.1")
+
+def is_run_in_volcano():
+    # Use the volcano env plugin variable to check whether we run in a volcano job.
+    return os.environ.get('VC_TASK_INDEX', -1) != -1
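
Note on the env mapping: the sketch below is a minimal illustration of what setup_env_for_volcano() in this patch derives from the Volcano svc/env plugin variables, assuming the job names its tasks 'master' and 'worker' as the comments above require. The host name, port and replica count are made-up example values, and the snippet mirrors the patch's logic rather than importing it.

# Illustrative only: mirrors the mapping performed by setup_env_for_volcano().
import os

# Environment assumed to be injected by the Volcano svc/env plugins for task
# index 0 of a job with one 'master' replica and four 'worker' replicas.
# DLROVER_MASTER_PORT is assumed to be set explicitly in the container env.
os.environ.update({
    "VC_MASTER_HOSTS": "demo-job-master-0.demo-job",  # hypothetical host name
    "VC_TASK_INDEX": "0",
    "VC_WORKER_NUM": "4",
    "DLROVER_MASTER_PORT": "12345",
})

# Derived variables, following the same rules as the patch.
master_addr = "{}:{}".format(
    os.environ["VC_MASTER_HOSTS"], os.environ["DLROVER_MASTER_PORT"]
)
node_rank = os.environ["VC_TASK_INDEX"]   # -> NODE_ID / NODE_RANK / RANK / WORKER_ID / WORKER_RANK
world_size = os.environ["VC_WORKER_NUM"]  # -> NODE_NUM / WORLD_SIZE / WORKER_NUM

print(master_addr)  # demo-job-master-0.demo-job:12345
print(node_rank)    # 0
print(world_size)   # 4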