diff --git a/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml b/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml new file mode 100644 index 00000000000..96b97a3042f --- /dev/null +++ b/ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml @@ -0,0 +1,111 @@ +apiVersion: scheduling.volcano.sh/v1beta1 +kind: Queue +metadata: + name: kuberay-test-queue +spec: + weight: 1 + capability: + cpu: 4 + memory: 6Gi +--- +apiVersion: ray.io/v1 +kind: RayJob +metadata: + name: rayjob-sample-0 + labels: + ray.io/scheduler-name: volcano + volcano.sh/queue-name: kuberay-test-queue +spec: + entrypoint: python /home/ray/samples/sample_code.py + runtimeEnvYAML: | + pip: + - requests==2.26.0 + - pendulum==2.1.2 + env_vars: + counter_name: "test_counter" + rayClusterSpec: + rayVersion: '2.46.0' + headGroupSpec: + rayStartParams: {} + template: + spec: + containers: + - name: ray-head + image: rayproject/ray:2.46.0 + ports: + - containerPort: 6379 + name: gcs-server + - containerPort: 8265 + name: dashboard + - containerPort: 10001 + name: client + resources: + limits: + cpu: "1" + memory: "2Gi" + requests: + cpu: "1" + memory: "2Gi" + volumeMounts: + - mountPath: /home/ray/samples + name: code-sample + volumes: + - name: code-sample + configMap: + name: ray-job-code-sample + items: + - key: sample_code.py + path: sample_code.py + workerGroupSpecs: + - replicas: 2 + minReplicas: 2 + maxReplicas: 2 + groupName: small-group + rayStartParams: {} + template: + spec: + containers: + - name: ray-worker + image: rayproject/ray:2.46.0 + resources: + limits: + cpu: "1" + memory: "1Gi" + requests: + cpu: "1" + memory: "1Gi" +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: ray-job-code-sample +data: + sample_code.py: | + import ray + import os + import requests + + ray.init() + + @ray.remote + class Counter: + def __init__(self): + # Used to verify runtimeEnv + self.name = os.getenv("counter_name") + assert self.name == "test_counter" + self.counter = 0 + + def inc(self): + self.counter += 1 + + def get_counter(self): + return "{} got {}".format(self.name, self.counter) + + counter = Counter.remote() + + for _ in range(5): + ray.get(counter.inc.remote()) + print(ray.get(counter.get_counter.remote())) + + # Verify that the correct runtime env was used for the job. 
+    assert requests.__version__ == "2.26.0"
diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
index 7bc8bda6e76..44ccb55d5d8 100644
--- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
+++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go
@@ -15,16 +15,18 @@ import (
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/builder"
 	"sigs.k8s.io/controller-runtime/pkg/client"
-	volcanov1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
-	volcanov1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
+	volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
+	volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"

 	rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 	schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+	"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
 	"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
 )

 const (
 	PodGroupName = "podgroups.scheduling.volcano.sh"
+	pluginName = "volcano"
 	QueueNameLabelKey = "volcano.sh/queue-name"
 )

@@ -34,62 +36,141 @@ type VolcanoBatchScheduler struct {

 type VolcanoBatchSchedulerFactory struct{}

-func GetPluginName() string {
-	return "volcano"
-}
+func GetPluginName() string { return pluginName }

 func (v *VolcanoBatchScheduler) Name() string {
 	return GetPluginName()
 }

 func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error {
-	app, ok := object.(*rayv1.RayCluster)
-	if !ok {
-		return fmt.Errorf("currently only RayCluster is supported, got %T", object)
-	}
-	var minMember int32
-	var totalResource corev1.ResourceList
-	if !utils.IsAutoscalingEnabled(&app.Spec) {
-		minMember = utils.CalculateDesiredReplicas(ctx, app) + 1
-		totalResource = utils.CalculateDesiredResources(app)
-	} else {
-		minMember = utils.CalculateMinReplicas(app) + 1
-		totalResource = utils.CalculateMinResources(app)
+	switch obj := object.(type) {
+	case *rayv1.RayCluster:
+		return v.handleRayCluster(ctx, obj)
+	case *rayv1.RayJob:
+		return v.handleRayJob(ctx, obj)
+	default:
+		return fmt.Errorf("unsupported object type %T, only RayCluster and RayJob are supported", object)
 	}
+}
+
+// handleRayCluster calculates the PodGroup MinMember and MinResources for a RayCluster
+func (v *VolcanoBatchScheduler) handleRayCluster(ctx context.Context, raycluster *rayv1.RayCluster) error {
+	// If this RayCluster was created by a RayJob, skip PodGroup creation here; the PodGroup is created for the RayJob itself
+	if crdType, ok := raycluster.Labels[utils.RayOriginatedFromCRDLabelKey]; ok && crdType == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) {
+		return nil
 	}
-	return v.syncPodGroup(ctx, app, minMember, totalResource)
+	minMember, totalResource := v.calculatePodGroupParams(ctx, &raycluster.Spec)
+
+	return v.syncPodGroup(ctx, raycluster, minMember, totalResource)
 }

-func getAppPodGroupName(app *rayv1.RayCluster) string {
-	return fmt.Sprintf("ray-%s-pg", app.Name)
+// handleRayJob calculates the PodGroup MinMember and MinResources for a RayJob
+func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1.RayJob) error {
+	if rayJob.Spec.RayClusterSpec == nil {
+		return fmt.Errorf("gang scheduling does not support RayJob %s/%s referencing an existing RayCluster", rayJob.Namespace, rayJob.Name)
+	}
+
+	var totalResourceList []corev1.ResourceList
+	
minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec) + totalResourceList = append(totalResourceList, totalResource) + + // MinMember intentionally excludes the submitter pod to avoid a startup deadlock + // (submitter waits for cluster; gang would wait for submitter). We still add the + // submitter's resource requests into MinResources so capacity is reserved. + submitterResource := getSubmitterResource(rayJob) + totalResourceList = append(totalResourceList, submitterResource) + return v.syncPodGroup(ctx, rayJob, minMember, utils.SumResourceList(totalResourceList)) } -func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, app *rayv1.RayCluster, size int32, totalResource corev1.ResourceList) error { - logger := ctrl.LoggerFrom(ctx).WithName(v.Name()) +func getSubmitterResource(rayJob *rayv1.RayJob) corev1.ResourceList { + if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode { + submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec) + return utils.CalculatePodResource(submitterTemplate.Spec) + } else if rayJob.Spec.SubmissionMode == rayv1.SidecarMode { + submitterContainer := common.GetDefaultSubmitterContainer(rayJob.Spec.RayClusterSpec) + containerResource := submitterContainer.Resources.Requests + for name, quantity := range submitterContainer.Resources.Limits { + if _, ok := containerResource[name]; !ok { + containerResource[name] = quantity + } + } + return containerResource + } + return corev1.ResourceList{} +} + +func getAppPodGroupName(object metav1.Object) string { + // Prefer the RayJob name if this object originated from a RayJob + name := object.GetName() + if labels := object.GetLabels(); labels != nil && + labels[utils.RayOriginatedFromCRDLabelKey] == utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD) { + if rayJobName := labels[utils.RayOriginatedFromCRNameLabelKey]; rayJobName != "" { + name = rayJobName + } + } + return fmt.Sprintf("ray-%s-pg", name) +} - podGroupName := getAppPodGroupName(app) - podGroup := volcanov1beta1.PodGroup{} - if err := v.cli.Get(ctx, types.NamespacedName{Namespace: app.Namespace, Name: podGroupName}, &podGroup); err != nil { +func addSchedulerName(obj metav1.Object, schedulerName string) { + switch obj := obj.(type) { + case *corev1.Pod: + obj.Spec.SchedulerName = schedulerName + case *corev1.PodTemplateSpec: + obj.Spec.SchedulerName = schedulerName + } +} + +func populateAnnotations(parent metav1.Object, child metav1.Object, groupName string) { + annotations := child.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(parent) + annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName + child.SetAnnotations(annotations) +} + +func populateLabelsFromObject(parent metav1.Object, child metav1.Object, key string) { + labels := child.GetLabels() + if labels == nil { + labels = make(map[string]string) + } + if parentLabel, exist := parent.GetLabels()[key]; exist && parentLabel != "" { + labels[key] = parentLabel + } + child.SetLabels(labels) +} + +// syncPodGroup ensures a Volcano PodGroup exists/updated for the given object +// with the provided size (MinMember) and total resources. 
+func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.Object, size int32, totalResource corev1.ResourceList) error { + logger := ctrl.LoggerFrom(ctx).WithName(pluginName) + + podGroupName := getAppPodGroupName(owner) + podGroup := volcanoschedulingv1beta1.PodGroup{} + if err := v.cli.Get(ctx, types.NamespacedName{Namespace: owner.GetNamespace(), Name: podGroupName}, &podGroup); err != nil { if !errors.IsNotFound(err) { + logger.Error(err, "failed to get PodGroup", "podGroupName", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace()) return err } - podGroup := createPodGroup(app, podGroupName, size, totalResource) + podGroup := createPodGroup(owner, podGroupName, size, totalResource) if err := v.cli.Create(ctx, &podGroup); err != nil { if errors.IsAlreadyExists(err) { - logger.Info("pod group already exists, no need to create") + logger.Info("podGroup already exists, no need to create", "name", podGroupName) return nil } - logger.Error(err, "Pod group CREATE error!", "PodGroup.Error", err) + logger.Error(err, "failed to create PodGroup", "name", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace()) return err } } else { - if podGroup.Spec.MinMember != size || !quotav1.Equals(*podGroup.Spec.MinResources, totalResource) { + if podGroup.Spec.MinMember != size || podGroup.Spec.MinResources == nil || !quotav1.Equals(*podGroup.Spec.MinResources, totalResource) { podGroup.Spec.MinMember = size podGroup.Spec.MinResources = &totalResource if err := v.cli.Update(ctx, &podGroup); err != nil { - logger.Error(err, "Pod group UPDATE error!", "podGroup", podGroupName) + logger.Error(err, "failed to update PodGroup", "name", podGroupName, "ownerKind", utils.GetCRDType(owner.GetLabels()[utils.RayOriginatedFromCRDLabelKey]), "ownerName", owner.GetName(), "ownerNamespace", owner.GetNamespace()) return err } } @@ -97,43 +178,60 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, app *rayv1.Ray return nil } -func createPodGroup( - app *rayv1.RayCluster, - podGroupName string, - size int32, - totalResource corev1.ResourceList, -) volcanov1beta1.PodGroup { - podGroup := volcanov1beta1.PodGroup{ +func (v *VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, rayClusterSpec *rayv1.RayClusterSpec) (int32, corev1.ResourceList) { + rayCluster := &rayv1.RayCluster{Spec: *rayClusterSpec} + + if !utils.IsAutoscalingEnabled(rayClusterSpec) { + return utils.CalculateDesiredReplicas(ctx, rayCluster) + 1, utils.CalculateDesiredResources(rayCluster) + } + return utils.CalculateMinReplicas(rayCluster) + 1, utils.CalculateMinResources(rayCluster) +} + +func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup { + var ownerRef metav1.OwnerReference + switch obj := owner.(type) { + case *rayv1.RayCluster: + ownerRef = *metav1.NewControllerRef(obj, rayv1.SchemeGroupVersion.WithKind("RayCluster")) + case *rayv1.RayJob: + ownerRef = *metav1.NewControllerRef(obj, rayv1.SchemeGroupVersion.WithKind("RayJob")) + } + + podGroup := volcanoschedulingv1beta1.PodGroup{ ObjectMeta: metav1.ObjectMeta{ - Namespace: app.Namespace, - Name: podGroupName, - OwnerReferences: []metav1.OwnerReference{ - *metav1.NewControllerRef(app, rayv1.SchemeGroupVersion.WithKind("RayCluster")), - 
}, + Namespace: owner.GetNamespace(), + Name: podGroupName, + OwnerReferences: []metav1.OwnerReference{ownerRef}, }, - Spec: volcanov1beta1.PodGroupSpec{ + Spec: volcanoschedulingv1beta1.PodGroupSpec{ MinMember: size, MinResources: &totalResource, }, - Status: volcanov1beta1.PodGroupStatus{ - Phase: volcanov1beta1.PodGroupPending, + Status: volcanoschedulingv1beta1.PodGroupStatus{ + Phase: volcanoschedulingv1beta1.PodGroupPending, }, } - if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok { + if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok { podGroup.Spec.Queue = queue } - - if priorityClassName, ok := app.ObjectMeta.Labels[utils.RayPriorityClassName]; ok { + if priorityClassName, ok := owner.GetLabels()[utils.RayPriorityClassName]; ok { podGroup.Spec.PriorityClassName = priorityClassName } return podGroup } +func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, groupName string) { + populateLabelsFromObject(parent, child, QueueNameLabelKey) + populateLabelsFromObject(parent, child, utils.RayPriorityClassName) + populateAnnotations(parent, child, groupName) + addSchedulerName(child, v.Name()) +} + +// This function will be removed in interface migration PR func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.RayCluster, groupName string, pod *corev1.Pod) { - pod.Annotations[volcanov1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app) - pod.Annotations[volcanov1alpha1.TaskSpecKey] = groupName + pod.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey] = getAppPodGroupName(app) + pod.Annotations[volcanobatchv1alpha1.TaskSpecKey] = groupName if queue, ok := app.ObjectMeta.Labels[QueueNameLabelKey]; ok { pod.Labels[QueueNameLabelKey] = queue } @@ -143,11 +241,8 @@ func (v *VolcanoBatchScheduler) AddMetadataToPod(_ context.Context, app *rayv1.R pod.Spec.SchedulerName = v.Name() } -func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) { -} - func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, cli client.Client) (schedulerinterface.BatchScheduler, error) { - if err := volcanov1beta1.AddToScheme(cli.Scheme()); err != nil { + if err := volcanoschedulingv1beta1.AddToScheme(cli.Scheme()); err != nil { return nil, fmt.Errorf("failed to add volcano to scheme with error %w", err) } return &VolcanoBatchScheduler{ @@ -156,9 +251,9 @@ func (vf *VolcanoBatchSchedulerFactory) New(_ context.Context, _ *rest.Config, c } func (vf *VolcanoBatchSchedulerFactory) AddToScheme(scheme *runtime.Scheme) { - utilruntime.Must(volcanov1beta1.AddToScheme(scheme)) + utilruntime.Must(volcanoschedulingv1beta1.AddToScheme(scheme)) } func (vf *VolcanoBatchSchedulerFactory) ConfigureReconciler(b *builder.Builder) *builder.Builder { - return b.Owns(&volcanov1beta1.PodGroup{}) + return b.Owns(&volcanoschedulingv1beta1.PodGroup{}) } diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go index 2e810b34302..d137c01f76c 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go @@ -5,12 +5,19 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 
"k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1" + volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" + "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" ) @@ -78,7 +85,76 @@ func createTestRayCluster(numOfHosts int32) rayv1.RayCluster { } } -func TestCreatePodGroup(t *testing.T) { +func createTestRayJob(numOfHosts int32) rayv1.RayJob { + headSpec := corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "ray-head", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("256m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + } + + workerSpec := corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "ray-worker", + Resources: corev1.ResourceRequirements{ + Limits: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("500m"), + corev1.ResourceMemory: resource.MustParse("512Mi"), + }, + Requests: corev1.ResourceList{ + corev1.ResourceCPU: resource.MustParse("256m"), + corev1.ResourceMemory: resource.MustParse("256Mi"), + }, + }, + }, + }, + } + + return rayv1.RayJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "rayjob-sample", + Namespace: "default", + Labels: map[string]string{ + QueueNameLabelKey: "test-queue", + utils.RayPriorityClassName: "test-priority", + }, + }, + Spec: rayv1.RayJobSpec{ + RayClusterSpec: &rayv1.RayClusterSpec{ + HeadGroupSpec: rayv1.HeadGroupSpec{ + Template: corev1.PodTemplateSpec{ + Spec: headSpec, + }, + }, + WorkerGroupSpecs: []rayv1.WorkerGroupSpec{ + { + Template: corev1.PodTemplateSpec{ + Spec: workerSpec, + }, + Replicas: ptr.To[int32](2), + NumOfHosts: numOfHosts, + MinReplicas: ptr.To[int32](1), + MaxReplicas: ptr.To[int32](4), + }, + }, + }, + }, + } +} + +func TestCreatePodGroupForRayCluster(t *testing.T) { a := assert.New(t) cluster := createTestRayCluster(1) @@ -102,7 +178,7 @@ func TestCreatePodGroup(t *testing.T) { a.Equal("2", pg.Spec.MinResources.Name("nvidia.com/gpu", resource.BinarySI).String()) } -func TestCreatePodGroup_NumOfHosts2(t *testing.T) { +func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) { a := assert.New(t) cluster := createTestRayCluster(2) @@ -129,3 +205,241 @@ func TestCreatePodGroup_NumOfHosts2(t *testing.T) { // 2 GPUs * 2 = 4 GPUs a.Equal("4", pg.Spec.MinResources.Name("nvidia.com/gpu", resource.BinarySI).String()) } + +func TestCreatePodGroupForRayJob(t *testing.T) { + a := assert.New(t) + ctx := context.Background() + + scheme := runtime.NewScheme() + a.NoError(rayv1.AddToScheme(scheme)) + a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme)) + fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build() + scheduler := &VolcanoBatchScheduler{cli: fakeCli} + + t.Run("No submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(1) + rayJob.Spec.SubmissionMode = rayv1.HTTPMode + + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) + + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: 
getAppPodGroupName(&rayJob)}, &pg)
+		require.NoError(t, err)
+
+		// 1 head + 2 workers (desired, not min replicas)
+		a.Equal(int32(3), pg.Spec.MinMember)
+		// 256m * 3 (requests, not limits)
+		a.Equal("768m", pg.Spec.MinResources.Cpu().String())
+		// 256Mi * 3 (requests, not limits)
+		a.Equal("768Mi", pg.Spec.MinResources.Memory().String())
+		a.Equal("test-queue", pg.Spec.Queue)
+		a.Equal("test-priority", pg.Spec.PriorityClassName)
+		a.Len(pg.OwnerReferences, 1)
+		a.Equal("RayJob", pg.OwnerReferences[0].Kind)
+	})
+
+	t.Run("K8sJobMode includes submitter pod resources", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		rayJob.Spec.SubmissionMode = rayv1.K8sJobMode
+
+		err := scheduler.handleRayJob(ctx, &rayJob)
+		require.NoError(t, err)
+
+		var pg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg)
+		require.NoError(t, err)
+
+		// 1 head + 2 workers (desired, not min replicas)
+		a.Equal(int32(3), pg.Spec.MinMember)
+		// 768m + 500m = 1268m
+		a.Equal("1268m", pg.Spec.MinResources.Cpu().String())
+		// 768Mi + 200Mi = 968Mi
+		a.Equal("968Mi", pg.Spec.MinResources.Memory().String())
+		a.Equal("test-queue", pg.Spec.Queue)
+		a.Equal("test-priority", pg.Spec.PriorityClassName)
+		a.Len(pg.OwnerReferences, 1)
+		a.Equal("RayJob", pg.OwnerReferences[0].Kind)
+	})
+
+	t.Run("SidecarMode includes submitter pod resources", func(_ *testing.T) {
+		rayJob := createTestRayJob(1)
+		rayJob.Spec.SubmissionMode = rayv1.SidecarMode
+
+		err := scheduler.handleRayJob(ctx, &rayJob)
+		require.NoError(t, err)
+
+		var pg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg)
+		require.NoError(t, err)
+
+		// 1 head + 2 workers (desired, not min replicas)
+		a.Equal(int32(3), pg.Spec.MinMember)
+		// 768m + 500m = 1268m
+		a.Equal("1268m", pg.Spec.MinResources.Cpu().String())
+		// 768Mi + 200Mi = 968Mi
+		a.Equal("968Mi", pg.Spec.MinResources.Memory().String())
+		a.Equal("test-queue", pg.Spec.Queue)
+		a.Equal("test-priority", pg.Spec.PriorityClassName)
+		a.Len(pg.OwnerReferences, 1)
+		a.Equal("RayJob", pg.OwnerReferences[0].Kind)
+	})
+}
+
+func TestCreatePodGroupForRayJob_NumOfHosts2(t *testing.T) {
+	a := assert.New(t)
+	ctx := context.Background()
+
+	scheme := runtime.NewScheme()
+	a.NoError(rayv1.AddToScheme(scheme))
+	a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+	fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
+	scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+	t.Run("No submitter pod resources", func(_ *testing.T) {
+		rayJob := createTestRayJob(2)
+		rayJob.Spec.SubmissionMode = rayv1.HTTPMode
+
+		err := scheduler.handleRayJob(ctx, &rayJob)
+		require.NoError(t, err)
+
+		var pg volcanoschedulingv1beta1.PodGroup
+		err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg)
+		require.NoError(t, err)
+
+		// 2 workers (desired, not min replicas) * 2 (num of hosts) + 1 head
+		// 2 * 2 + 1 = 5
+		a.Equal(int32(5), pg.Spec.MinMember)
+		// 256m (requests, not limits) * (2 workers * 2 (num of hosts) + 1 head)
+		// 256m * 5 = 1280m
+		a.Equal("1280m", pg.Spec.MinResources.Cpu().String())
+		// 256Mi (requests, not limits) * (2 workers * 2 (num of hosts) + 1 head)
+		// 256Mi * 5 = 1280Mi
+		a.Equal("1280Mi", pg.Spec.MinResources.Memory().String())
+		a.Equal("test-queue", pg.Spec.Queue)
+		a.Equal("test-priority", pg.Spec.PriorityClassName)
+		a.Len(pg.OwnerReferences, 1)
+		
a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) + + t.Run("K8sJobMode includes submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(2) + rayJob.Spec.SubmissionMode = rayv1.K8sJobMode + + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) + + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg) + require.NoError(t, err) + + // 2 workers (desired, not min replicas) * 2 (num of hosts) + 1 head + // 2 * 2 + 1 = 5 + a.Equal(int32(5), pg.Spec.MinMember) + // 1280m + 500m = 1780m + a.Equal("1780m", pg.Spec.MinResources.Cpu().String()) + // 1280Mi + 200Mi = 1480Mi + a.Equal("1480Mi", pg.Spec.MinResources.Memory().String()) + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("test-priority", pg.Spec.PriorityClassName) + a.Len(pg.OwnerReferences, 1) + a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) + + t.Run("SidecarMode includes submitter pod resources", func(_ *testing.T) { + rayJob := createTestRayJob(2) + rayJob.Spec.SubmissionMode = rayv1.SidecarMode + + err := scheduler.handleRayJob(ctx, &rayJob) + require.NoError(t, err) + + var pg volcanoschedulingv1beta1.PodGroup + err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg) + require.NoError(t, err) + + // 2 workers (desired, not min replicas) * 2 (num of hosts) + 1 head + // 2 * 2 + 1 = 5 + a.Equal(int32(5), pg.Spec.MinMember) + // 1280m + 500m = 1780m + a.Equal("1780m", pg.Spec.MinResources.Cpu().String()) + // 1280Mi + 200Mi = 1480Mi + a.Equal("1480Mi", pg.Spec.MinResources.Memory().String()) + a.Equal("test-queue", pg.Spec.Queue) + a.Equal("test-priority", pg.Spec.PriorityClassName) + a.Len(pg.OwnerReferences, 1) + a.Equal("RayJob", pg.OwnerReferences[0].Kind) + }) +} + +func TestAddMetadataToSubmitterPod(t *testing.T) { + a := assert.New(t) + scheduler := &VolcanoBatchScheduler{} + + rayJob := createTestRayJob(1) + rayCluster := &rayv1.RayCluster{Spec: *rayJob.Spec.RayClusterSpec} + submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, &rayCluster.Spec) + + scheduler.AddMetadataToChildResource( + context.Background(), + &rayJob, + &submitterTemplate, + utils.RayNodeSubmitterGroupLabelValue, + ) + + // Check annotations + a.Equal(getAppPodGroupName(&rayJob), submitterTemplate.Annotations[volcanoschedulingv1beta1.KubeGroupNameAnnotationKey]) + a.Equal(utils.RayNodeSubmitterGroupLabelValue, submitterTemplate.Annotations[volcanobatchv1alpha1.TaskSpecKey]) + + // Check labels + a.Equal("test-queue", submitterTemplate.Labels[QueueNameLabelKey]) + a.Equal("test-priority", submitterTemplate.Labels[utils.RayPriorityClassName]) + + // Check scheduler name + a.Equal(pluginName, submitterTemplate.Spec.SchedulerName) +} + +func TestCalculatePodGroupParams(t *testing.T) { + a := assert.New(t) + scheduler := &VolcanoBatchScheduler{} + + t.Run("Autoscaling disabled", func(_ *testing.T) { + cluster := createTestRayCluster(1) + + minMember, totalResource := scheduler.calculatePodGroupParams(context.Background(), &cluster.Spec) + + // 1 head + 2 workers (desired replicas) + a.Equal(int32(3), minMember) + + // 256m * 3 (requests, not limits) + a.Equal("768m", totalResource.Cpu().String()) + + // 256Mi * 3 (requests, not limits) + a.Equal("768Mi", totalResource.Memory().String()) + }) + + t.Run("Autoscaling enabled", func(_ *testing.T) { + cluster := createTestRayCluster(1) + cluster.Spec.EnableInTreeAutoscaling = ptr.To(true) + + minMember, totalResource 
:= scheduler.calculatePodGroupParams(context.Background(), &cluster.Spec) + + // 1 head + 1 worker (min replicas) + a.Equal(int32(2), minMember) + + // 256m * 2 (requests, not limits) + a.Equal("512m", totalResource.Cpu().String()) + + // 256Mi * 2 (requests, not limits) + a.Equal("512Mi", totalResource.Memory().String()) + }) +} + +func TestGetAppPodGroupName(t *testing.T) { + a := assert.New(t) + + rayCluster := &rayv1.RayCluster{ObjectMeta: metav1.ObjectMeta{Name: "raycluster-sample", Namespace: "default"}} + a.Equal("ray-raycluster-sample-pg", getAppPodGroupName(rayCluster)) + + rayJob := createTestRayJob(1) + a.Equal("ray-rayjob-sample-pg", getAppPodGroupName(&rayJob)) +} diff --git a/ray-operator/controllers/ray/utils/util.go b/ray-operator/controllers/ray/utils/util.go index cf6b9066323..3bb63f79189 100644 --- a/ray-operator/controllers/ray/utils/util.go +++ b/ray-operator/controllers/ray/utils/util.go @@ -440,7 +440,7 @@ func CalculateDesiredResources(cluster *rayv1.RayCluster) corev1.ResourceList { desiredResourcesList = append(desiredResourcesList, podResource) } } - return sumResourceList(desiredResourcesList) + return SumResourceList(desiredResourcesList) } func CalculateMinResources(cluster *rayv1.RayCluster) corev1.ResourceList { @@ -454,7 +454,7 @@ func CalculateMinResources(cluster *rayv1.RayCluster) corev1.ResourceList { minResourcesList = append(minResourcesList, podResource) } } - return sumResourceList(minResourcesList) + return SumResourceList(minResourcesList) } // calculateReplicaResource adjusts the resource quantities in a given ResourceList @@ -503,7 +503,7 @@ func ConvertResourceListToMapString(resourceList corev1.ResourceList) map[string return result } -func sumResourceList(list []corev1.ResourceList) corev1.ResourceList { +func SumResourceList(list []corev1.ResourceList) corev1.ResourceList { totalResource := corev1.ResourceList{} for _, l := range list { for name, quantity := range l {