
Commit 8b8a9e5

Modify kai scheduler and scheduler plugins

Signed-off-by: win5923 <[email protected]>
1 parent 973fe82 commit 8b8a9e5

File tree

11 files changed (+295, -345 lines)

ray-operator/config/samples/ray-job.volcano-scheduler-queue.yaml

Lines changed: 4 additions & 36 deletions

@@ -11,7 +11,7 @@ spec:
 apiVersion: ray.io/v1
 kind: RayJob
 metadata:
-  name: rayjob-sample-2
+  name: rayjob-sample-0
   labels:
     ray.io/scheduler-name: volcano
     volcano.sh/queue-name: kuberay-test-queue
@@ -23,16 +23,10 @@ spec:
       - pendulum==2.1.2
     env_vars:
       counter_name: "test_counter"
-  # rayClusterSpec specifies the RayCluster instance to be created by the RayJob controller.
   rayClusterSpec:
-    rayVersion: '2.46.0' # should match the Ray version in the image of the containers
-    # Ray head pod template
+    rayVersion: '2.46.0'
     headGroupSpec:
-      # The `rayStartParams` are used to configure the `ray start` command.
-      # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
-      # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
       rayStartParams: {}
-      #pod template
       template:
         spec:
           containers:
@@ -41,7 +35,7 @@ spec:
             ports:
               - containerPort: 6379
                 name: gcs-server
-              - containerPort: 8265 # Ray dashboard
+              - containerPort: 8265
                 name: dashboard
              - containerPort: 10001
                name: client
@@ -56,31 +50,22 @@ spec:
              - mountPath: /home/ray/samples
                name: code-sample
          volumes:
-          # You set volumes at the Pod level, then mount them into containers inside that Pod
            - name: code-sample
              configMap:
-                # Provide the name of the ConfigMap you want to mount.
                name: ray-job-code-sample
-                # An array of keys from the ConfigMap to create as files
                items:
                  - key: sample_code.py
                    path: sample_code.py
    workerGroupSpecs:
-    # the pod replicas in this group typed worker
      - replicas: 2
        minReplicas: 2
        maxReplicas: 2
-        # logical group name, for this called small-group, also can be functional
        groupName: small-group
-        # The `rayStartParams` are used to configure the `ray start` command.
-        # See https://github.com/ray-project/kuberay/blob/master/docs/guidance/rayStartParams.md for the default settings of `rayStartParams` in KubeRay.
-        # See https://docs.ray.io/en/latest/cluster/cli.html#ray-start for all available options in `rayStartParams`.
        rayStartParams: {}
-        #pod template
        template:
          spec:
            containers:
-              - name: ray-worker # must consist of lower case alphanumeric characters or '-', and must start and end with an alphanumeric character (e.g. 'my-name', or '123-abc'
+              - name: ray-worker
                image: rayproject/ray:2.46.0
                resources:
                  limits:
@@ -89,23 +74,6 @@ spec:
                  requests:
                    cpu: "1"
                    memory: "1Gi"
-
-  # SubmitterPodTemplate is the template for the pod that will run the `ray job submit` command against the RayCluster.
-  # If SubmitterPodTemplate is specified, the first container is assumed to be the submitter container.
-  # submitterPodTemplate:
-  #   spec:
-  #     restartPolicy: Never
-  #     containers:
-  #       - name: my-custom-rayjob-submitter-pod
-  #         image: rayproject/ray:2.46.0
-  #         # If Command is not specified, the correct command will be supplied at runtime using the RayJob spec `entrypoint` field.
-  #         # Specifying Command is not recommended.
-  #         # command: ["sh", "-c", "ray job submit --address=http://$RAY_DASHBOARD_ADDRESS --submission-id=$RAY_JOB_SUBMISSION_ID -- echo hello world"]
-
-
-######################Ray code sample#################################
-# this sample is from https://docs.ray.io/en/latest/cluster/job-submission.html#quick-start-example
-# it is mounted into the container and executed to show the Ray job at work
 ---
 apiVersion: v1
 kind: ConfigMap

ray-operator/controllers/ray/batchscheduler/interface/interface.go

Lines changed: 0 additions & 11 deletions

@@ -3,14 +3,11 @@ package schedulerinterface
 import (
     "context"
 
-    corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/client-go/rest"
     "sigs.k8s.io/controller-runtime/pkg/builder"
     "sigs.k8s.io/controller-runtime/pkg/client"
-
-    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
 )
 
 // BatchScheduler manages submitting RayCluster pods to a third-party scheduler.
@@ -23,11 +20,6 @@ type BatchScheduler interface {
     // For most batch schedulers, this results in the creation of a PodGroup.
     DoBatchSchedulingOnSubmission(ctx context.Context, object metav1.Object) error
 
-    // AddMetadataToPod enriches the pod with metadata necessary to tie it to the scheduler.
-    // For example, setting labels for queues / priority, and setting schedulerName.
-    // This function will be removed once Rayjob Volcano scheduler integration is completed.
-    AddMetadataToPod(ctx context.Context, rayCluster *rayv1.RayCluster, groupName string, pod *corev1.Pod)
-
     // AddMetadataToChildResource enriches the child resource (batchv1.Job, rayv1.RayCluster) with metadata necessary to tie it to the scheduler.
     // For example, setting labels for queues / priority, and setting schedulerName.
     AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, groupName string)
@@ -63,9 +55,6 @@ func (d *DefaultBatchScheduler) DoBatchSchedulingOnSubmission(_ context.Context,
     return nil
 }
 
-func (d *DefaultBatchScheduler) AddMetadataToPod(_ context.Context, _ *rayv1.RayCluster, _ string, _ *corev1.Pod) {
-}
-
 func (d *DefaultBatchScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
 }
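
With AddMetadataToPod gone, AddMetadataToChildResource is now the single enrichment hook on the interface. For orientation, here is a minimal sketch of what a third-party integration could look like against the slimmed interface. Everything in it is illustrative, not part of this commit: the QueueLabelScheduler type, the example.com/queue label key, and the assumption that embedding DefaultBatchScheduler fills in the remaining BatchScheduler methods.

package customscheduler

import (
    "context"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

    schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
)

// QueueLabelScheduler is a hypothetical integration. Embedding
// DefaultBatchScheduler is assumed to supply no-op implementations of the
// other BatchScheduler methods.
type QueueLabelScheduler struct {
    schedulerinterface.DefaultBatchScheduler
}

// AddMetadataToChildResource copies an illustrative queue label from the
// parent (a RayJob or RayCluster) onto whatever child resource is passed in.
func (s *QueueLabelScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, _ string) {
    queue, ok := parent.GetLabels()["example.com/queue"] // illustrative label key, not used by KubeRay
    if !ok || queue == "" {
        return
    }
    labels := child.GetLabels()
    if labels == nil {
        labels = make(map[string]string)
    }
    labels["example.com/queue"] = queue
    child.SetLabels(labels)
}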

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler.go

Lines changed: 18 additions & 9 deletions

@@ -18,7 +18,6 @@ import (
     "sigs.k8s.io/controller-runtime/pkg/builder"
     "sigs.k8s.io/controller-runtime/pkg/client"
 
-    rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
     schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
 )
 
@@ -38,23 +37,33 @@ func (k *KaiScheduler) DoBatchSchedulingOnSubmission(_ context.Context, _ metav1
     return nil
 }
 
-func (k *KaiScheduler) AddMetadataToPod(ctx context.Context, app *rayv1.RayCluster, _ string, pod *corev1.Pod) {
+func (k *KaiScheduler) AddMetadataToChildResource(ctx context.Context, parent metav1.Object, child metav1.Object, _ string) {
     logger := ctrl.LoggerFrom(ctx).WithName("kai-scheduler")
-    pod.Spec.SchedulerName = k.Name()
+    addSchedulerNameToObject(child, k.Name())
 
-    queue, ok := app.Labels[QueueLabelName]
+    parentLabel := parent.GetLabels()
+    queue, ok := parentLabel[QueueLabelName]
     if !ok || queue == "" {
-        logger.Info("Queue label missing from RayCluster; pods will remain pending",
+        logger.Info("Queue label missing from parent; child will remain pending",
             "requiredLabel", QueueLabelName)
         return
     }
-    if pod.Labels == nil {
-        pod.Labels = make(map[string]string)
+
+    childLabels := child.GetLabels()
+    if childLabels == nil {
+        childLabels = make(map[string]string)
     }
-    pod.Labels[QueueLabelName] = queue
+    childLabels[QueueLabelName] = queue
+    child.SetLabels(childLabels)
 }
 
-func (k *KaiScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
+func addSchedulerNameToObject(obj metav1.Object, schedulerName string) {
+    switch obj := obj.(type) {
+    case *corev1.Pod:
+        obj.Spec.SchedulerName = schedulerName
+    case *corev1.PodTemplateSpec:
+        obj.Spec.SchedulerName = schedulerName
+    }
 }
 
 func (kf *KaiSchedulerFactory) New(_ context.Context, _ *rest.Config, _ client.Client) (schedulerinterface.BatchScheduler, error) {
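
A quick sketch of the new call from a caller's point of view (snippet only, assuming the kai-scheduler package context and the same corev1/metav1/rayv1 imports the tests below use):

parent := &rayv1.RayCluster{
    ObjectMeta: metav1.ObjectMeta{
        Name:   "demo",
        Labels: map[string]string{QueueLabelName: "team-a"},
    },
}
scheduler := &KaiScheduler{}

// A plain Pod child: the scheduler name lands on pod.Spec and the queue label is copied over.
pod := &corev1.Pod{}
scheduler.AddMetadataToChildResource(context.Background(), parent, pod, "head-group")
// pod.Spec.SchedulerName == "kai-scheduler"; pod.Labels[QueueLabelName] == "team-a"

// A PodTemplateSpec child hits the second case of the addSchedulerNameToObject type switch.
tmpl := &corev1.PodTemplateSpec{}
scheduler.AddMetadataToChildResource(context.Background(), parent, tmpl, "head-group")
// tmpl.Spec.SchedulerName == "kai-scheduler"; tmpl.Labels[QueueLabelName] == "team-a"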

ray-operator/controllers/ray/batchscheduler/kai-scheduler/kai_scheduler_test.go

Lines changed: 12 additions & 12 deletions

@@ -41,7 +41,7 @@ func createTestPod() *corev1.Pod {
     }
 }
 
-func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
+func TestAddMetadataToChildResource_WithQueueLabel(t *testing.T) {
     a := assert.New(t)
     scheduler := &KaiScheduler{}
     ctx := context.Background()
@@ -52,8 +52,8 @@ func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
     })
     pod := createTestPod()
 
-    // Call AddMetadataToPod
-    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
+    // Call AddMetadataToChildResource
+    scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
 
     // Assert scheduler name is set to kai-scheduler
     a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -63,7 +63,7 @@ func TestAddMetadataToPod_WithQueueLabel(t *testing.T) {
     a.Equal("test-queue", pod.Labels[QueueLabelName])
 }
 
-func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
+func TestAddMetadataToChildResource_WithoutQueueLabel(t *testing.T) {
     a := assert.New(t)
     scheduler := &KaiScheduler{}
     ctx := context.Background()
@@ -72,8 +72,8 @@ func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
     rayCluster := createTestRayCluster(map[string]string{})
     pod := createTestPod()
 
-    // Call AddMetadataToPod
-    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
+    // Call AddMetadataToChildResource
+    scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
 
     // Assert scheduler name is still set (always required)
     a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -85,7 +85,7 @@ func TestAddMetadataToPod_WithoutQueueLabel(t *testing.T) {
     }
 }
 
-func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
+func TestAddMetadataToChildResource_WithEmptyQueueLabel(t *testing.T) {
     a := assert.New(t)
     scheduler := &KaiScheduler{}
     ctx := context.Background()
@@ -96,8 +96,8 @@ func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
     })
     pod := createTestPod()
 
-    // Call AddMetadataToPod
-    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
+    // Call AddMetadataToChildResource
+    scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
 
     // Assert scheduler name is still set
     a.Equal("kai-scheduler", pod.Spec.SchedulerName)
@@ -109,7 +109,7 @@ func TestAddMetadataToPod_WithEmptyQueueLabel(t *testing.T) {
     }
 }
 
-func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
+func TestAddMetadataToChildResource_PreservesExistingPodLabels(t *testing.T) {
     a := assert.New(t)
     scheduler := &KaiScheduler{}
     ctx := context.Background()
@@ -126,8 +126,8 @@ func TestAddMetadataToPod_PreservesExistingPodLabels(t *testing.T) {
         "app": "ray",
     }
 
-    // Call AddMetadataToPod
-    scheduler.AddMetadataToPod(ctx, rayCluster, "test-group", pod)
+    // Call AddMetadataToChildResource
+    scheduler.AddMetadataToChildResource(ctx, rayCluster, pod, "test-group")
 
     // Assert scheduler name is set
     a.Equal("kai-scheduler", pod.Spec.SchedulerName)

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins.go

Lines changed: 19 additions & 10 deletions

@@ -93,21 +93,30 @@ func (k *KubeScheduler) DoBatchSchedulingOnSubmission(ctx context.Context, objec
     return nil
 }
 
-// AddMetadataToPod adds essential labels and annotations to the Ray pod
-// the scheduler needs these labels and annotations in order to do the scheduling properly
-func (k *KubeScheduler) AddMetadataToPod(_ context.Context, rayCluster *rayv1.RayCluster, _ string, pod *corev1.Pod) {
-    // when gang scheduling is enabled, extra labels need to be added to all pods
-    if k.isGangSchedulingEnabled(rayCluster) {
-        pod.Labels[kubeSchedulerPodGroupLabelKey] = rayCluster.Name
+func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, _ string) {
+    // when gang scheduling is enabled, extra labels need to be added to all child resources
+    if k.isGangSchedulingEnabled(parent) {
+        labels := child.GetLabels()
+        if labels == nil {
+            labels = make(map[string]string)
+        }
+        labels[kubeSchedulerPodGroupLabelKey] = parent.GetName()
+        child.SetLabels(labels)
     }
-    pod.Spec.SchedulerName = k.Name()
+    addSchedulerNameToObject(child, k.Name())
 }
 
-func (k *KubeScheduler) AddMetadataToChildResource(_ context.Context, _ metav1.Object, _ metav1.Object, _ string) {
+func addSchedulerNameToObject(obj metav1.Object, schedulerName string) {
+    switch obj := obj.(type) {
+    case *corev1.Pod:
+        obj.Spec.SchedulerName = schedulerName
+    case *corev1.PodTemplateSpec:
+        obj.Spec.SchedulerName = schedulerName
+    }
 }
 
-func (k *KubeScheduler) isGangSchedulingEnabled(app *rayv1.RayCluster) bool {
-    _, exist := app.Labels[utils.RayGangSchedulingEnabled]
+func (k *KubeScheduler) isGangSchedulingEnabled(obj metav1.Object) bool {
+    _, exist := obj.GetLabels()[utils.RayGangSchedulingEnabled]
     return exist
 }
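
Sketched effect of the reworked gang-scheduling path (snippet only, assuming the scheduler-plugins package context; KubeScheduler's Name() value is left symbolic here):

parent := &rayv1.RayCluster{
    ObjectMeta: metav1.ObjectMeta{
        Name:   "demo",
        Labels: map[string]string{utils.RayGangSchedulingEnabled: "true"},
    },
}
child := &corev1.PodTemplateSpec{} // nil labels are now tolerated: the new code allocates the map
scheduler := &KubeScheduler{}
scheduler.AddMetadataToChildResource(context.Background(), parent, child, "worker")
// child.Labels[kubeSchedulerPodGroupLabelKey] == "demo" (taken from parent.GetName())
// child.Spec.SchedulerName == scheduler.Name()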

ray-operator/controllers/ray/batchscheduler/scheduler-plugins/scheduler_plugins_test.go

Lines changed: 2 additions & 2 deletions

@@ -117,7 +117,7 @@ func TestCreatePodGroupWithMultipleHosts(t *testing.T) {
     a.Equal(int32(5), podGroup.Spec.MinMember)
 }
 
-func TestAddMetadataToPod(t *testing.T) {
+func TestAddMetadataToChildResource(t *testing.T) {
     tests := []struct {
         name       string
         enableGang bool
@@ -150,7 +150,7 @@ func TestAddMetadataToPod(t *testing.T) {
     }
 
     scheduler := &KubeScheduler{}
-    scheduler.AddMetadataToPod(context.TODO(), &cluster, "worker", pod)
+    scheduler.AddMetadataToChildResource(context.TODO(), &cluster, pod, "worker")
 
     if tt.enableGang {
         a.Equal(cluster.Name, pod.Labels[kubeSchedulerPodGroupLabelKey])
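
The updated table test still exercises a Pod child only. A companion test for the PodTemplateSpec branch of addSchedulerNameToObject could look like the following (hypothetical test, not part of this commit):

func TestAddMetadataToChildResource_PodTemplateSpec(t *testing.T) {
    a := assert.New(t)
    cluster := rayv1.RayCluster{
        ObjectMeta: metav1.ObjectMeta{
            Name:   "gang-cluster",
            Labels: map[string]string{utils.RayGangSchedulingEnabled: "true"},
        },
    }
    tmpl := &corev1.PodTemplateSpec{} // labels intentionally left nil

    scheduler := &KubeScheduler{}
    scheduler.AddMetadataToChildResource(context.TODO(), &cluster, tmpl, "worker")

    // The pod-group label comes from the parent's name; the scheduler name is
    // set through the PodTemplateSpec case of addSchedulerNameToObject.
    a.Equal("gang-cluster", tmpl.Labels[kubeSchedulerPodGroupLabelKey])
    a.Equal(scheduler.Name(), tmpl.Spec.SchedulerName)
}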
