Commit dd38daa

Append submitter resources
Signed-off-by: win5923 <[email protected]>
1 parent 92e7d95 commit dd38daa

4 files changed (+130, -30 lines)

ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go

Lines changed: 13 additions & 5 deletions
@@ -20,6 +20,7 @@ import (
 
     rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
     schedulerinterface "github.com/ray-project/kuberay/ray-operator/controllers/ray/batchscheduler/interface"
+    "github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
     "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
 )
 
@@ -65,17 +66,25 @@ func (v *VolcanoBatchScheduler) handleRayCluster(ctx context.Context, raycluster
 }
 
 // handleRayJob calculates the PodGroup MinMember and MinResources for a RayJob
-// The submitter pod is intentionally excluded from MinMember calculation.
-// Including it before the RayCluster is ready may prevent the PodGroup from
-// ever meeting the MinMember requirement, leaving all pods stuck in Pending.
 func (v *VolcanoBatchScheduler) handleRayJob(ctx context.Context, rayJob *rayv1.RayJob) error {
     if rayJob.Spec.RayClusterSpec == nil {
         return fmt.Errorf("gang scheduling does not support RayJob %s/%s referencing an existing RayCluster", rayJob.Namespace, rayJob.Name)
     }
 
+    totalResourceList := []corev1.ResourceList{{}}
     minMember, totalResource := v.calculatePodGroupParams(ctx, rayJob.Spec.RayClusterSpec)
+    totalResourceList = append(totalResourceList, totalResource)
 
-    return v.syncPodGroup(ctx, rayJob, minMember, totalResource)
+    // MinMember intentionally excludes the submitter pod to avoid a startup deadlock
+    // (submitter waits for cluster; gang would wait for submitter). We still add the
+    // submitter's resource requests into MinResources so capacity is reserved.
+    if rayJob.Spec.SubmissionMode == rayv1.K8sJobMode {
+        submitterTemplate := common.GetSubmitterTemplate(&rayJob.Spec, rayJob.Spec.RayClusterSpec)
+        submitResource := utils.CalculatePodResource(submitterTemplate.Spec)
+        totalResourceList = append(totalResourceList, submitResource)
+    }
+
+    return v.syncPodGroup(ctx, rayJob, minMember, utils.SumResourceList(totalResourceList))
 }
 
 func getAppPodGroupName(object metav1.Object) string {
@@ -195,7 +204,6 @@ func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalR
         },
     }
 
-    // Copy scheduling labels to PodGroup spec
    if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok {
        podGroup.Spec.Queue = queue
    }
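
Note on the hunk above: the following is a minimal, self-contained sketch (not KubeRay code) of the accounting this change introduces. MinMember counts only the Ray pods, while the submitter pod's requests are still folded into MinResources when SubmissionMode is K8sJobMode. The per-pod values (256m CPU / 256Mi memory per Ray pod, 500m / 200Mi for the submitter) mirror the test fixtures in this commit and are illustrative only; sumResourceLists below is a stand-in for the real path through common.GetSubmitterTemplate, utils.CalculatePodResource, and utils.SumResourceList shown in the diff.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

// sumResourceLists adds quantities per resource name across the given lists.
func sumResourceLists(lists []corev1.ResourceList) corev1.ResourceList {
    total := corev1.ResourceList{}
    for _, l := range lists {
        for name, q := range l {
            if cur, ok := total[name]; ok {
                cur.Add(q)
                total[name] = cur
            } else {
                total[name] = q.DeepCopy()
            }
        }
    }
    return total
}

func main() {
    rayPod := corev1.ResourceList{
        corev1.ResourceCPU:    resource.MustParse("256m"),
        corev1.ResourceMemory: resource.MustParse("256Mi"),
    }
    submitter := corev1.ResourceList{
        corev1.ResourceCPU:    resource.MustParse("500m"),
        corev1.ResourceMemory: resource.MustParse("200Mi"),
    }

    // 1 head + 2 workers; the submitter pod is not gang-scheduled,
    // so it does not count toward MinMember.
    minMember := int32(3)

    // K8sJobMode: reserve the submitter's requested capacity anyway.
    minResources := sumResourceLists([]corev1.ResourceList{rayPod, rayPod, rayPod, submitter})

    fmt.Println(minMember, minResources.Cpu().String(), minResources.Memory().String())
    // Prints: 3 1268m 968Mi
}

Per the new comment in the hunk, the submitter waits for the RayCluster, so counting it toward MinMember could leave the gang waiting on a pod that does not exist yet; reserving only its resources avoids that cycle.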

ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go

Lines changed: 113 additions & 21 deletions
@@ -5,10 +5,14 @@ import (
     "testing"
 
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
     corev1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/resource"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/utils/ptr"
+    "sigs.k8s.io/controller-runtime/pkg/client"
+    "sigs.k8s.io/controller-runtime/pkg/client/fake"
     volcanobatchv1alpha1 "volcano.sh/apis/pkg/apis/batch/v1alpha1"
     volcanoschedulingv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
 
@@ -124,7 +128,7 @@ func createTestRayJob(numOfHosts int32) rayv1.RayJob {
             Namespace: "default",
             Labels: map[string]string{
                 QueueNameLabelKey: "test-queue",
-                utils.RayPriorityClassName: "high-priority",
+                utils.RayPriorityClassName: "test-priority",
             },
         },
         Spec: rayv1.RayJobSpec{
@@ -204,32 +208,120 @@ func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) {
 
 func TestCreatePodGroupForRayJob(t *testing.T) {
     a := assert.New(t)
+    ctx := context.Background()
 
-    rayJob := createTestRayJob(1)
+    scheme := runtime.NewScheme()
+    a.NoError(rayv1.AddToScheme(scheme))
+    a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+    fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
+    scheduler := &VolcanoBatchScheduler{cli: fakeCli}
 
-    // Create RayCluster from RayJob spec for calculation
-    rayCluster := &rayv1.RayCluster{
-        Spec: *rayJob.Spec.RayClusterSpec,
-    }
+    t.Run("No submitter pod resources", func(_ *testing.T) {
+        rayJob := createTestRayJob(1)
+        rayJob.Spec.SubmissionMode = rayv1.HTTPMode
 
-    minMember := utils.CalculateDesiredReplicas(context.Background(), rayCluster) + 1
-    totalResource := utils.CalculateDesiredResources(rayCluster)
-    pg := createPodGroup(&rayJob, getAppPodGroupName(&rayJob), minMember, totalResource)
+        err := scheduler.handleRayJob(ctx, &rayJob)
+        require.NoError(t, err)
 
-    a.Equal(rayJob.Namespace, pg.Namespace)
-    a.Equal("ray-rayjob-sample-pg", pg.Name)
+        var pg volcanoschedulingv1beta1.PodGroup
+        err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg)
+        require.NoError(t, err)
 
-    // Verify owner reference is set to RayJob
-    a.Len(pg.OwnerReferences, 1)
-    a.Equal("RayJob", pg.OwnerReferences[0].Kind)
-    a.Equal(rayJob.Name, pg.OwnerReferences[0].Name)
+        // 1 head + 2 workers (desired, not min replicas)
+        a.Equal(int32(3), pg.Spec.MinMember)
+        // 256m * 3 (requests, not limits)
+        a.Equal("768m", pg.Spec.MinResources.Cpu().String())
+        // 256Mi * 3 (requests, not limits)
+        a.Equal("768Mi", pg.Spec.MinResources.Memory().String())
+        a.Equal("test-queue", pg.Spec.Queue)
+        a.Equal("test-priority", pg.Spec.PriorityClassName)
+        a.Len(pg.OwnerReferences, 1)
+        a.Equal("RayJob", pg.OwnerReferences[0].Kind)
+    })
 
-    // Verify queue and priority class are set from RayJob labels
-    a.Equal("test-queue", pg.Spec.Queue)
-    a.Equal("high-priority", pg.Spec.PriorityClassName)
+    t.Run("K8sJobMode includes submitter pod resources", func(_ *testing.T) {
+        rayJob := createTestRayJob(1)
+        rayJob.Spec.SubmissionMode = rayv1.K8sJobMode
+
+        err := scheduler.handleRayJob(ctx, &rayJob)
+        require.NoError(t, err)
+
+        var pg volcanoschedulingv1beta1.PodGroup
+        err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg)
+        require.NoError(t, err)
+
+        // 1 head + 2 workers (desired, not min replicas)
+        a.Equal(int32(3), pg.Spec.MinMember)
+        // 768m + 500m = 1268m
+        a.Equal("1268m", pg.Spec.MinResources.Cpu().String())
+        // 768Mi + 200Mi = 968Mi
+        a.Equal("968Mi", pg.Spec.MinResources.Memory().String())
+        a.Equal("test-queue", pg.Spec.Queue)
+        a.Equal("test-priority", pg.Spec.PriorityClassName)
+        a.Len(pg.OwnerReferences, 1)
+        a.Equal("RayJob", pg.OwnerReferences[0].Kind)
+    })
+}
 
-    // 1 head + 2 workers (desired, not min replicas)
-    a.Equal(int32(3), pg.Spec.MinMember)
+func TestCreatePodGroupForRayJob_NumOfHosts2(t *testing.T) {
+    a := assert.New(t)
+    ctx := context.Background()
+
+    scheme := runtime.NewScheme()
+    a.NoError(rayv1.AddToScheme(scheme))
+    a.NoError(volcanoschedulingv1beta1.AddToScheme(scheme))
+    fakeCli := fake.NewClientBuilder().WithScheme(scheme).Build()
+    scheduler := &VolcanoBatchScheduler{cli: fakeCli}
+
+    t.Run("No submitter pod resources", func(_ *testing.T) {
+        rayJob := createTestRayJob(2)
+        rayJob.Spec.SubmissionMode = rayv1.HTTPMode
+
+        err := scheduler.handleRayJob(ctx, &rayJob)
+        require.NoError(t, err)
+
+        var pg volcanoschedulingv1beta1.PodGroup
+        err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg)
+        require.NoError(t, err)
+
+        // 2 workers (desired, not min replicas) * 2 (num of hosts) + 1 head
+        // 2 * 2 + 1 = 5
+        a.Equal(int32(5), pg.Spec.MinMember)
+        // 256m * (2 (requests, not limits) * 2 (num of hosts) + 1 head)
+        // 256m * 5 = 1280m
+        a.Equal("1280m", pg.Spec.MinResources.Cpu().String())
+        // 256Mi * (2 (requests, not limits) * 2 (num of hosts) + 1 head)
+        // 256Mi * 5 = 1280Mi
+        a.Equal("1280Mi", pg.Spec.MinResources.Memory().String())
+        a.Equal("test-queue", pg.Spec.Queue)
+        a.Equal("test-priority", pg.Spec.PriorityClassName)
+        a.Len(pg.OwnerReferences, 1)
+        a.Equal("RayJob", pg.OwnerReferences[0].Kind)
+    })
+
+    t.Run("K8sJobMode includes submitter pod resources", func(_ *testing.T) {
+        rayJob := createTestRayJob(2)
+        rayJob.Spec.SubmissionMode = rayv1.K8sJobMode
+
+        err := scheduler.handleRayJob(ctx, &rayJob)
+        require.NoError(t, err)
+
+        var pg volcanoschedulingv1beta1.PodGroup
+        err = fakeCli.Get(ctx, client.ObjectKey{Namespace: rayJob.Namespace, Name: getAppPodGroupName(&rayJob)}, &pg)
+        require.NoError(t, err)
+
+        // 2 workers (desired, not min replicas) * 2 (num of hosts) + 1 head
+        // 2 * 2 + 1 = 5
+        a.Equal(int32(5), pg.Spec.MinMember)
+        // 1280m + 500m = 1780m
+        a.Equal("1780m", pg.Spec.MinResources.Cpu().String())
+        // 1280Mi + 200Mi = 1480Mi
+        a.Equal("1480Mi", pg.Spec.MinResources.Memory().String())
+        a.Equal("test-queue", pg.Spec.Queue)
+        a.Equal("test-priority", pg.Spec.PriorityClassName)
+        a.Len(pg.OwnerReferences, 1)
+        a.Equal("RayJob", pg.OwnerReferences[0].Kind)
+    })
 }
 
 func TestAddMetadataToSubmitterPod(t *testing.T) {
@@ -253,7 +345,7 @@ func TestAddMetadataToSubmitterPod(t *testing.T) {
 
     // Check labels
     a.Equal("test-queue", submitterTemplate.Labels[QueueNameLabelKey])
-    a.Equal("high-priority", submitterTemplate.Labels[utils.RayPriorityClassName])
+    a.Equal("test-priority", submitterTemplate.Labels[utils.RayPriorityClassName])
 
     // Check scheduler name
     a.Equal(pluginName, submitterTemplate.Spec.SchedulerName)

ray-operator/controllers/ray/rayjob_controller.go

Lines changed: 1 addition & 1 deletion
@@ -937,7 +937,7 @@ func (r *RayJobReconciler) getOrCreateRayClusterInstance(ctx context.Context, ra
     if err != nil {
         return nil, err
     }
-    if r.options.BatchSchedulerManager != nil && rayJobInstance.Spec.SubmissionMode == rayv1.K8sJobMode {
+    if r.options.BatchSchedulerManager != nil {
         if scheduler, err := r.options.BatchSchedulerManager.GetScheduler(); err == nil {
             // Group name is only used for individual pods to specify their task group ("headgroup", "worker-group-1", etc.).
             // RayCluster contains multiple groups, so we pass an empty string.

ray-operator/controllers/ray/utils/util.go

Lines changed: 3 additions & 3 deletions
@@ -440,7 +440,7 @@ func CalculateDesiredResources(cluster *rayv1.RayCluster) corev1.ResourceList {
             desiredResourcesList = append(desiredResourcesList, podResource)
         }
     }
-    return sumResourceList(desiredResourcesList)
+    return SumResourceList(desiredResourcesList)
 }
 
 func CalculateMinResources(cluster *rayv1.RayCluster) corev1.ResourceList {
@@ -454,7 +454,7 @@ func CalculateMinResources(cluster *rayv1.RayCluster) corev1.ResourceList {
             minResourcesList = append(minResourcesList, podResource)
         }
     }
-    return sumResourceList(minResourcesList)
+    return SumResourceList(minResourcesList)
 }
 
 // calculateReplicaResource adjusts the resource quantities in a given ResourceList
@@ -503,7 +503,7 @@ func ConvertResourceListToMapString(resourceList corev1.ResourceList) map[string
     return result
 }
 
-func sumResourceList(list []corev1.ResourceList) corev1.ResourceList {
+func SumResourceList(list []corev1.ResourceList) corev1.ResourceList {
     totalResource := corev1.ResourceList{}
     for _, l := range list {
         for name, quantity := range l {
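
The rename sumResourceList -> SumResourceList is what lets the Volcano scheduler call the helper from another package (see utils.SumResourceList in the first file of this commit). A hypothetical caller-side sketch follows; the package name and the totalWithSubmitter wrapper are illustrative, only the helper's name and signature come from this diff.

package example

import (
    corev1 "k8s.io/api/core/v1"

    "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
)

// totalWithSubmitter merges the cluster-wide request total with the submitter
// pod's requests into a single ResourceList, the same call shape used when
// building the PodGroup's MinResources.
func totalWithSubmitter(clusterTotal, submitter corev1.ResourceList) corev1.ResourceList {
    return utils.SumResourceList([]corev1.ResourceList{clusterTotal, submitter})
}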
