1 change: 1 addition & 0 deletions dlrover/python/common/constants.py
@@ -26,6 +26,7 @@ class PlatformType(object):
RAY = "ray"
PY_KUBERNETES = "pyk8s"
LOCAL = "local"
VOLCANO = "volcano"


class CommunicationType(object):
8 changes: 7 additions & 1 deletion dlrover/python/elastic_agent/torch/training.py
@@ -114,6 +114,7 @@
version_less_than_230,
version_less_than_240,
version_less_than_280,
is_run_in_volcano,
)

_agent_evt = DLRoverAgentEvent().singleton_instance()
@@ -458,7 +459,12 @@ def _check_network_rdzv(self):
)

if num < 0:
raise JobStoppingError("Exit rendezvous when job is stopping")
if is_run_in_volcano():
# For a Volcano elastic job, some pods may start only after the job has already finished.
logger.info("Job is stopping, exiting rendezvous.")
sys.exit(0)
else:
raise JobStoppingError("Exit rendezvous when job is stopping")

def _report_failure(self, err_msg, level):
if self._node_rank == 0:
13 changes: 13 additions & 0 deletions dlrover/python/master/args.py
@@ -13,6 +13,7 @@

import argparse

from dlrover.python.common.constants import DistributionStrategy, OptimizeMode
from dlrover.python.common.global_context import DefaultValues
from dlrover.python.common.log import default_logger as logger
from dlrover.python.util.args_util import parse_tuple_list, pos_int
@@ -104,6 +105,18 @@ def _build_master_args_parser():
type=pos_int,
help="The timeout value of worker task process(For PS type job).",
)
parser.add_argument(
"--distribution_strategy",
default=DistributionStrategy.ALLREDUCE,
type=str,
help="distribution strategy",
)
parser.add_argument(
"--optimize_mode",
default=OptimizeMode.SINGLE_JOB,
type=str,
help="optimize mode",
)
return parser


4 changes: 4 additions & 0 deletions dlrover/python/master/main.py
@@ -81,6 +81,10 @@ def run(args):
else:
from dlrover.python.master.dist_master import DistributedJobMaster

if job_args.platform == PlatformType.VOLCANO:
job_args.distribution_strategy = args.distribution_strategy
job_args.optimize_mode = args.optimize_mode

update_context(job_args)
master = DistributedJobMaster(_dlrover_context.master_port, job_args)
master.prepare()
4 changes: 4 additions & 0 deletions dlrover/python/master/scaler/factory.py
@@ -29,3 +29,7 @@ def new_job_scaler(platform, job_name, namespace):
return ActorScaler(job_name, namespace)
elif platform == PlatformType.LOCAL:
return None
elif platform == PlatformType.VOLCANO:
from dlrover.python.master.scaler.volcano_scaler import VolcanoScaler

return VolcanoScaler(job_name, namespace)
26 changes: 26 additions & 0 deletions dlrover/python/master/scaler/volcano_scaler.py
@@ -0,0 +1,26 @@
# Copyright 2022 The DLRover Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from dlrover.python.master.scaler.base_scaler import Scaler, ScalePlan

# The VolcanoScaler is a no-op: Volcano itself handles pod scaling.
class VolcanoScaler(Scaler):
def __init__(self, job_name, namespace):
super(VolcanoScaler, self).__init__(job_name)
self._namespace = namespace

def start(self):
pass

def scale(self, plan: ScalePlan):
pass
8 changes: 7 additions & 1 deletion dlrover/python/master/watcher/factory.py
@@ -17,7 +17,7 @@

def new_node_watcher(platform, job_name, namespace="default"):
logger.info("New %s Node Watcher", platform)
if platform in (PlatformType.KUBERNETES, PlatformType.PY_KUBERNETES):
if platform in (PlatformType.KUBERNETES, PlatformType.PY_KUBERNETES, PlatformType.VOLCANO):
from dlrover.python.master.watcher.k8s_watcher import PodWatcher

return PodWatcher(job_name, namespace)
@@ -43,6 +43,12 @@ def new_scale_plan_watcher(platform, job_name, namespace, job_uuid):
)

return RayScalePlanWatcher(job_name, namespace, job_uuid)
elif platform == PlatformType.VOLCANO:
from dlrover.python.master.watcher.volcano_watcher import (
VolcanoScalePlanWatcher,
)

return VolcanoScalePlanWatcher(job_name, namespace, job_uuid)
else:
raise ValueError("Not support engine %s", platform)

25 changes: 25 additions & 0 deletions dlrover/python/master/watcher/volcano_watcher.py
@@ -0,0 +1,25 @@
# Copyright 2022 The DLRover Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time

# The VolcanoScalePlanWatcher is a no-op: Volcano itself handles scaling, so no scale plans are produced.
class VolcanoScalePlanWatcher:
def __init__(self, job_name, namespace, job_uuid):
self.job_name = job_name
self.namespace = namespace
self.job_uuid = job_uuid

def watch(self):
while True:
time.sleep(1000)
yield None
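Like the other scale-plan watchers, this one exposes a generator-based watch() API, but it never produces a plan because Volcano owns rescaling. A minimal consumer sketch (the names and values here are illustrative, not taken from this PR):

watcher = VolcanoScalePlanWatcher("demo-job", "default", "demo-uuid")
for plan in watcher.watch():
    # Blocks for ~1000s per iteration and always yields None:
    # Volcano, not DLRover, decides when the job is rescaled.
    assert plan is None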
7 changes: 7 additions & 0 deletions dlrover/python/scheduler/factory.py
@@ -26,6 +26,9 @@ def new_elastic_job(platform, job_name, namespace):
from dlrover.python.scheduler.ray import RayElasticJob

return RayElasticJob(job_name, namespace)
elif platform == PlatformType.VOLCANO:
from dlrover.python.scheduler.volcano import VolcanoElasticJob

return VolcanoElasticJob(job_name, namespace)
else:
raise ValueError("Not support engine %s", platform)

@@ -42,5 +45,9 @@ def new_job_args(platform, job_name, namespace):
return RayJobArgs(platform, namespace, job_name)
elif platform == PlatformType.LOCAL:
return LocalJobArgs(platform, namespace, job_name)
elif platform == PlatformType.VOLCANO:
from dlrover.python.scheduler.volcano import VolcanoJobArgs

return VolcanoJobArgs(platform, namespace, job_name)
else:
raise ValueError("Not support platform %s", platform)
99 changes: 99 additions & 0 deletions dlrover/python/scheduler/volcano.py
@@ -0,0 +1,99 @@
# Copyright 2022 The DLRover Authors. All rights reserved.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import time

from dlrover.python.common.constants import NodeEnv, NodeType
from dlrover.python.common.log import default_logger as logger
from dlrover.python.common.node import NodeGroupResource, NodeResource
from dlrover.python.scheduler.job import ElasticJob, JobArgs, NodeArgs
from dlrover.python.scheduler.kubernetes import (
_dlrover_context,
convert_cpu_to_decimal,
convert_memory_to_mb,
k8sClient,
)

class VolcanoElasticJob(ElasticJob):
def __init__(self, job_name, namespace):
self._namespace = namespace
self._job_name = job_name

def get_node_name(self, type, id):
# Volcano creates and names the pods, so only a placeholder is returned here.
return "pod-name"

def get_node_service_addr(self, type, id):
return ""

class VolcanoJobArgs(JobArgs):
def __init__(self, platform, namespace, job_name):
super(VolcanoJobArgs, self).__init__(platform, namespace, job_name)

def initilize(self):
self.user = os.getenv("USER", "")
k8s_client = k8sClient.singleton_instance(self.namespace)

self.job_uuid = os.getenv(NodeEnv.JOB_UID, "")
# Get job parameters from the Volcano job spec.
vcjob = self._retry_to_get_vcjob(k8s_client)
for task in vcjob["spec"]["tasks"]:
if task["name"] == NodeType.WORKER:
restart_policy = task["template"]["spec"].get(
"restartPolicy", ""
)
self.relaunch_always = restart_policy == "Always"

num = int(task.get("replicas", 0))
assert len(task["template"]["spec"]["containers"]) == 1
container = task["template"]["spec"]["containers"][0]
resources = container.get("resources", {})
requests = resources.get("requests", {})
cpu = convert_cpu_to_decimal(requests.get("cpu", 0))
if "memory" in requests:
memory = convert_memory_to_mb(requests["memory"])
else:
memory = 0
gpu_type = None
gpu_num = 0
for k, v in requests.items():
if "nvidia.com" in k:
gpu_type = k
gpu_num = int(v)
group_resource = NodeGroupResource(
num,
NodeResource(
cpu=cpu,
memory=memory,
gpu_type=gpu_type,
gpu_num=gpu_num,
),
)
self.node_args[task["name"]] = NodeArgs(
group_resource,
process_timeout=_dlrover_context.seconds_to_timeout_task_process,
)
logger.info("Job args = %s", self.__dict__)

def _retry_to_get_vcjob(self, k8s_client: k8sClient):
for _ in range(3):
vcjob = k8s_client.get_custom_resource(
name=self.job_name,
group="batch.volcano.sh",
version="v1alpha1",
plural="jobs",
)
if vcjob:
return vcjob
else:
time.sleep(5)
raise ValueError("Cannot get the training volcano job %s" % self.job_name)
42 changes: 40 additions & 2 deletions dlrover/trainer/torch/elastic_run.py
@@ -109,7 +109,7 @@
JobConstant,
NodeEnv,
NodeEventType,
PreCheckStatus,
PreCheckStatus, NodeType,
)
from dlrover.python.common.log import default_logger as logger
from dlrover.python.elastic_agent.master_client import MasterClient
Expand All @@ -118,7 +118,7 @@
launch_agent,
)
from dlrover.python.training_event import DLRoverAgentEvent
from dlrover.trainer.torch.utils import version_less_than_230
from dlrover.trainer.torch.utils import version_less_than_230, is_run_in_volcano


def parse_args(args):
@@ -216,6 +216,41 @@ def parse_args(args):
)
return parser.parse_args(args)

# Volcano jobs must enable the svc and env plugins.
# The master task must be named 'master' and the worker task must be named 'worker'.
def setup_env_for_volcano():
if not is_run_in_volcano():
return
os.environ['GRPC_ENABLE_FORK'] = 'false'
os.environ['NODE_TYPE'] = 'worker'
vc_master_hosts = os.environ.get('VC_MASTER_HOSTS', "")
if vc_master_hosts == "":
raise RuntimeError("VC_MASTER_HOSTS env is not set, you must use "
"'master' as the task name for the dlrover master.")
if len(vc_master_hosts.split(',')) != 1:
raise RuntimeError(f"The length of env VC_MASTER_HOSTS {vc_master_hosts} should be 1.")
dlrover_master_port = os.environ.get('DLROVER_MASTER_PORT', "")
if dlrover_master_port == "":
raise RuntimeError("DLROVER_MASTER_PORT env is not set.")
os.environ['DLROVER_MASTER_ADDR'] = f"{vc_master_hosts}:{dlrover_master_port}"
if 'VC_TASK_INDEX' not in os.environ:
raise RuntimeError("VC_TASK_INDEX env is not set.")
# NODE_ID, NODE_RANK and RANK all equal VC_TASK_INDEX under volcano.
os.environ['NODE_ID'] = os.environ.get('VC_TASK_INDEX')
os.environ['NODE_RANK'] = os.environ.get('VC_TASK_INDEX')
os.environ['RANK'] = os.environ.get('VC_TASK_INDEX')
if 'VC_WORKER_NUM' not in os.environ:
raise RuntimeError("VC_WORKER_NUM env is not set, you must use "
"'worker' as the task name for the dlrover workers.")
# NODE_NUM and WORLD_SIZE both equal VC_WORKER_NUM under volcano.
os.environ['NODE_NUM'] = os.environ.get('VC_WORKER_NUM')
os.environ['WORLD_SIZE'] = os.environ.get('VC_WORKER_NUM')
os.environ['DLROVER_MASTER_SERVICE_TYPE'] = 'grpc'
# worker related env
os.environ['WORKER_TYPE'] = NodeType.WORKER
os.environ['WORKER_ID'] = os.environ.get('VC_TASK_INDEX')
os.environ['WORKER_RANK'] = os.environ.get('VC_TASK_INDEX')
os.environ['WORKER_NUM'] = os.environ.get('VC_WORKER_NUM')

class ElasticLaunch:
"""
@@ -518,6 +553,9 @@ def _check_to_use_dlrover_run(job_name, is_standalone=False):


def run(args):
# Reconfigure the environment for volcano before starting the agent.
setup_env_for_volcano()

# export event for dlrover agent
agent = DLRoverAgentEvent.singleton_instance()
agent.start(pid=vars(args))
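To make the translation performed by setup_env_for_volcano() concrete, here is a minimal sketch for the second worker of a two-worker job; the VC_* and port values are hypothetical examples of what Volcano's svc/env plugins and the job spec would provide:

import os

# Hypothetical values: VC_* come from Volcano's env/svc plugins,
# DLROVER_MASTER_PORT from the job spec.
os.environ.update({
    "VC_MASTER_HOSTS": "demo-job-master-0.demo-job",
    "VC_TASK_INDEX": "1",
    "VC_WORKER_NUM": "2",
    "DLROVER_MASTER_PORT": "50001",
})

setup_env_for_volcano()

assert os.environ["DLROVER_MASTER_ADDR"] == "demo-job-master-0.demo-job:50001"
assert os.environ["NODE_RANK"] == os.environ["RANK"] == "1"
assert os.environ["WORLD_SIZE"] == os.environ["WORKER_NUM"] == "2"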
5 changes: 5 additions & 0 deletions dlrover/trainer/torch/utils.py
@@ -10,6 +10,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import torch
from packaging import version
@@ -28,3 +29,7 @@ def version_less_than_240():
def version_less_than_280():
current_version = version.parse(torch.__version__).base_version
return version.parse(current_version) <= version.parse("2.7.1")

def is_run_in_volcano():
# VC_TASK_INDEX is injected by volcano's env plugin, so its presence indicates
# that the process is running inside a volcano job.
return 'VC_TASK_INDEX' in os.environ