Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package volcano
import (
"context"
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -25,9 +26,11 @@ import (
)

const (
PodGroupName = "podgroups.scheduling.volcano.sh"
pluginName = "volcano"
QueueNameLabelKey = "volcano.sh/queue-name"
PodGroupName = "podgroups.scheduling.volcano.sh"
pluginName = "volcano"
QueueNameLabelKey = "volcano.sh/queue-name"
NetworkTopologyModeLabelKey = "volcano.sh/network-topology-mode"
NetworkTopologyHighestTierAllowedLabelKey = "volcano.sh/network-topology-highest-tier-allowed"
)

type VolcanoBatchScheduler struct {
Expand Down Expand Up @@ -155,7 +158,11 @@ func (v *VolcanoBatchScheduler) syncPodGroup(ctx context.Context, owner metav1.O
return err
}

podGroup := createPodGroup(owner, podGroupName, size, totalResource)
podGroup, err := createPodGroup(owner, podGroupName, size, totalResource)
if err != nil {
logger.Error(err, "Failed to create pod group specification", "PodGroup.Error", err)
return err
}
if err := v.cli.Create(ctx, &podGroup); err != nil {
if errors.IsAlreadyExists(err) {
logger.Info("podGroup already exists, no need to create", "name", podGroupName)
Expand Down Expand Up @@ -187,7 +194,7 @@ func (v *VolcanoBatchScheduler) calculatePodGroupParams(ctx context.Context, ray
return utils.CalculateMinReplicas(rayCluster) + 1, utils.CalculateMinResources(rayCluster)
}

func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) volcanoschedulingv1beta1.PodGroup {
func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalResource corev1.ResourceList) (volcanoschedulingv1beta1.PodGroup, error) {
var ownerRef metav1.OwnerReference
switch obj := owner.(type) {
case *rayv1.RayCluster:
Expand All @@ -211,14 +218,30 @@ func createPodGroup(owner metav1.Object, podGroupName string, size int32, totalR
},
}

// Handle network topology configuration
mode, modeOk := owner.GetLabels()[NetworkTopologyModeLabelKey]
if modeOk {
podGroup.Spec.NetworkTopology = &volcanoschedulingv1beta1.NetworkTopologySpec{
Mode: volcanoschedulingv1beta1.NetworkTopologyMode(mode),
}
highestTier, tierOk := owner.GetLabels()[NetworkTopologyHighestTierAllowedLabelKey]
if tierOk {
highestTierInt, err := strconv.Atoi(highestTier)
if err != nil {
return podGroup, fmt.Errorf("failed to convert %s label to int: %w for podgroup %s in namespace %s", NetworkTopologyHighestTierAllowedLabelKey, err, podGroupName, owner.GetNamespace())
}
podGroup.Spec.NetworkTopology.HighestTierAllowed = &highestTierInt
}
}

if queue, ok := owner.GetLabels()[QueueNameLabelKey]; ok {
podGroup.Spec.Queue = queue
}
if priorityClassName, ok := owner.GetLabels()[utils.RayPriorityClassName]; ok {
podGroup.Spec.PriorityClassName = priorityClassName
}

return podGroup
return podGroup, nil
}

func (v *VolcanoBatchScheduler) AddMetadataToChildResource(_ context.Context, parent metav1.Object, child metav1.Object, groupName string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func TestCreatePodGroupForRayCluster(t *testing.T) {

minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
totalResource := utils.CalculateDesiredResources(&cluster)
pg := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
require.NoError(t, err)

a.Equal(cluster.Namespace, pg.Namespace)

Expand All @@ -185,7 +186,8 @@ func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) {

minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
totalResource := utils.CalculateDesiredResources(&cluster)
pg := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
require.NoError(t, err)

a.Equal(cluster.Namespace, pg.Namespace)

Expand All @@ -206,6 +208,74 @@ func TestCreatePodGroupForRayCluster_NumOfHosts2(t *testing.T) {
a.Equal("4", pg.Spec.MinResources.Name("nvidia.com/gpu", resource.BinarySI).String())
}

func createTestRayClusterWithLabels(labels map[string]string) rayv1.RayCluster {
cluster := createTestRayCluster(1)
if cluster.ObjectMeta.Labels == nil {
cluster.ObjectMeta.Labels = make(map[string]string)
}
for k, v := range labels {
cluster.ObjectMeta.Labels[k] = v
}
return cluster
}

func TestCreatePodGroup_NetworkTopologyBothLabels(t *testing.T) {
a := assert.New(t)

// Test with both network topology mode and highest tier allowed
cluster := createTestRayClusterWithLabels(map[string]string{
NetworkTopologyModeLabelKey: "soft",
NetworkTopologyHighestTierAllowedLabelKey: "3",
})

minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
totalResource := utils.CalculateDesiredResources(&cluster)
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
require.NoError(t, err)

a.Equal(cluster.Namespace, pg.Namespace)
a.Equal(volcanoschedulingv1beta1.NetworkTopologyMode("soft"), pg.Spec.NetworkTopology.Mode)
a.NotNil(pg.Spec.NetworkTopology.HighestTierAllowed)
a.Equal(3, *pg.Spec.NetworkTopology.HighestTierAllowed)
}

func TestCreatePodGroup_NetworkTopologyOnlyModeLabel(t *testing.T) {
a := assert.New(t)

// Test with only network topology mode set
cluster := createTestRayClusterWithLabels(map[string]string{
NetworkTopologyModeLabelKey: "hard",
})

minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
totalResource := utils.CalculateDesiredResources(&cluster)
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)
require.NoError(t, err)

a.Equal(cluster.Namespace, pg.Namespace)
a.NotNil(pg.Spec.NetworkTopology)
a.Equal(volcanoschedulingv1beta1.NetworkTopologyMode("hard"), pg.Spec.NetworkTopology.Mode)
a.Nil(pg.Spec.NetworkTopology.HighestTierAllowed)
}

func TestCreatePodGroup_NetworkTopologyHighestTierAllowedNotInt(t *testing.T) {
a := assert.New(t)

// Test with network topology mode set and highest tier allowed is not an int
cluster := createTestRayClusterWithLabels(map[string]string{
NetworkTopologyModeLabelKey: "soft",
NetworkTopologyHighestTierAllowedLabelKey: "not-an-int",
})

minMember := utils.CalculateDesiredReplicas(context.Background(), &cluster) + 1
totalResource := utils.CalculateDesiredResources(&cluster)
pg, err := createPodGroup(&cluster, getAppPodGroupName(&cluster), minMember, totalResource)

require.Error(t, err)
a.Contains(err.Error(), "failed to convert "+NetworkTopologyHighestTierAllowedLabelKey+" label to int")
a.Equal(cluster.Namespace, pg.Namespace)
}

func TestCreatePodGroupForRayJob(t *testing.T) {
a := assert.New(t)
ctx := context.Background()
Expand Down
Loading