diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go index 44ccb55d5d8..0bd0b9deb58 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler.go @@ -3,6 +3,7 @@ package volcano import ( "context" "fmt" + "strconv" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -25,9 +26,11 @@ import ( ) const ( - PodGroupName = "podgroups.scheduling.volcano.sh" - pluginName = "volcano" - QueueNameLabelKey = "volcano.sh/queue-name" + PodGroupName = "podgroups.scheduling.volcano.sh" + pluginName = "volcano" + QueueNameLabelKey = "volcano.sh/queue-name" + NetworkTopologyModeLabelKey = "volcano.sh/network-topology-mode" + NetworkTopologyHighestTierAllowedLabelKey = "volcano.sh/network-topology-highest-tier-allowed" ) type VolcanoBatchScheduler struct { @@ -155,7 +158,11 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.O return err } - podGroup := createPodGroup(owner, podGroupName, size, totalResource) + podGroup, err := createPodGroup(owner, podGroupName, size, totalResource) + if err != nil { + logger.Error(err, "Failed to create pod group specification", "PodGroup.Error", err) + return err + } if err := v.cli.Create(ctx, &podGroup); err != nil { if errors.IsAlreadyExists(err) { logger.Info("podGroup already exists, no need to create", "name", podGroupName) @@ -187,7 +194,7 @@ func (v *VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, ray return utils.CalculateMinReplicas(rayCluster) + 1, utils.CalculateMinResources(rayCluster) } -func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup { +func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) (volcanoschedulingv1beta1.PodGroup, error) { var ownerRef metav1.OwnerReference switch obj := owner.(type) { case *rayv1.RayCluster: @@ -211,6 +218,22 @@ func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalR }, } + // Handle network topology configuration + mode, modeOk := owner.GetLabels()[NetworkTopologyModeLabelKey] + if modeOk { + podGroup.Spec.NetworkTopology = &volcanoschedulingv1beta1.NetworkTopologySpec{ + Mode: volcanoschedulingv1beta1.NetworkTopologyMode(mode), + } + highestTier, tierOk := owner.GetLabels()[NetworkTopologyHighestTierAllowedLabelKey] + if tierOk { + highestTierInt, err := strconv.Atoi(highestTier) + if err != nil { + return podGroup, fmt.Errorf("failed to convert %s label to int: %w for podgroup %s in namespace %s", NetworkTopologyHighestTierAllowedLabelKey, err, podGroupName, owner.GetNamespace()) + } + podGroup.Spec.NetworkTopology.HighestTierAllowed = &highestTierInt + } + } + if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok { podGroup.Spec.Queue = queue } @@ -218,7 +241,7 @@ func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalR podGroup.Spec.PriorityClassName = priorityClassName } - return podGroup + return podGroup, nil } func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, groupName string) { diff --git a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go index d137c01f76c..7783c3daf60 100644 --- a/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go +++ b/ray-operator/controllers/ray/batchscheduler/volcano/volcano_scheduler_test.go @@ -161,7 +161,8 @@ func TestCreatePodGroupForRayCluster(t *testing.T) { minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1 totalResource := utils.CalculateDesiredResources(&cluster) - pg := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource) + pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource) + require.NoError(t, err) a.Equal(cluster.Namespace, pg.Namespace) @@ -185,7 +186,8 @@ func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) { minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1 totalResource := utils.CalculateDesiredResources(&cluster) - pg := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource) + pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource) + require.NoError(t, err) a.Equal(cluster.Namespace, pg.Namespace) @@ -206,6 +208,74 @@ func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) { a.Equal("4", pg.Spec.MinResources.Name("nvidia.com/gpu", resource.BinarySI).String()) } +func createTestRayClusterWithLabels(labels map[string]string) rayv1.RayCluster { + cluster := createTestRayCluster(1) + if cluster.ObjectMeta.Labels == nil { + cluster.ObjectMeta.Labels = make(map[string]string) + } + for k, v := range labels { + cluster.ObjectMeta.Labels[k] = v + } + return cluster +} + +func TestCreatePodGroup_NetworkTopologyBothLabels(t *testing.T) { + a := assert.New(t) + + // Test with both network topology mode and highest tier allowed + cluster := createTestRayClusterWithLabels(map[string]string{ + NetworkTopologyModeLabelKey: "soft", + NetworkTopologyHighestTierAllowedLabelKey: "3", + }) + + minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1 + totalResource := utils.CalculateDesiredResources(&cluster) + pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource) + require.NoError(t, err) + + a.Equal(cluster.Namespace, pg.Namespace) + a.Equal(volcanoschedulingv1beta1.NetworkTopologyMode("soft"), pg.Spec.NetworkTopology.Mode) + a.NotNil(pg.Spec.NetworkTopology.HighestTierAllowed) + a.Equal(3, *pg.Spec.NetworkTopology.HighestTierAllowed) +} + +func TestCreatePodGroup_NetworkTopologyOnlyModeLabel(t *testing.T) { + a := assert.New(t) + + // Test with only network topology mode set + cluster := createTestRayClusterWithLabels(map[string]string{ + NetworkTopologyModeLabelKey: "hard", + }) + + minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1 + totalResource := utils.CalculateDesiredResources(&cluster) + pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource) + require.NoError(t, err) + + a.Equal(cluster.Namespace, pg.Namespace) + a.NotNil(pg.Spec.NetworkTopology) + a.Equal(volcanoschedulingv1beta1.NetworkTopologyMode("hard"), pg.Spec.NetworkTopology.Mode) + a.Nil(pg.Spec.NetworkTopology.HighestTierAllowed) +} + +func TestCreatePodGroup_NetworkTopologyHighestTierAllowedNotInt(t *testing.T) { + a := assert.New(t) + + // Test with network topology mode set and highest tier allowed is not an int + cluster := createTestRayClusterWithLabels(map[string]string{ + NetworkTopologyModeLabelKey: "soft", + NetworkTopologyHighestTierAllowedLabelKey: "not-an-int", + }) + + minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1 + totalResource := utils.CalculateDesiredResources(&cluster) + pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource) + + require.Error(t, err) + a.Contains(err.Error(), "failed to convert "+NetworkTopologyHighestTierAllowedLabelKey+" label to int") + a.Equal(cluster.Namespace, pg.Namespace) +} + func TestCreatePodGroupForRayJob(t *testing.T) { a := assert.New(t) ctx := context.Background()