cluster-autoscaler/simulator/dynamicresources/utils/utilization.go (84 changes: 79 additions & 5 deletions)
@@ -21,6 +21,7 @@ import (

v1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

@@ -43,7 +44,7 @@ func CalculateDynamicResourceUtilization(nodeInfo *framework.NodeInfo) (map[stri
poolDevices := getAllDevices(currentSlices)
allocatedDeviceNames := allocatedDevices[driverName][poolName]
unallocated, allocated := splitDevicesByAllocation(poolDevices, allocatedDeviceNames)
result[driverName][poolName] = calculatePoolUtil(unallocated, allocated)
result[driverName][poolName] = calculatePoolUtil(unallocated, allocated, currentSlices)
}
}
return result, nil
@@ -69,10 +70,83 @@ func HighestDynamicResourceUtilization(nodeInfo *framework.NodeInfo) (v1.Resourc
return highestResourceName, highestUtil, nil
}

func calculatePoolUtil(unallocated, allocated []resourceapi.Device) float64 {
numAllocated := float64(len(allocated))
numUnallocated := float64(len(unallocated))
return numAllocated / (numAllocated + numUnallocated)
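// calculatePoolUtil returns the utilization of a device pool. Counter-based
// (partitionable) devices are scored by the most utilized shared counter, the
// pool's "bottleneck"; devices without counters are scored by the ratio of
// allocated to total devices; the higher of the two signals wins.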
func calculatePoolUtil(unallocated, allocated []resourceapi.Device, resourceSlices []*resourceapi.ResourceSlice) float64 {
// Collect the total capacity of every shared counter exposed by the pool's slices.
totalCounters := map[string]map[string]resource.Quantity{}
for _, resourceSlice := range resourceSlices {
for _, sharedCounter := range resourceSlice.Spec.SharedCounters {
if _, ok := totalCounters[sharedCounter.Name]; !ok {
totalCounters[sharedCounter.Name] = map[string]resource.Quantity{}
}
for counter, value := range sharedCounter.Counters {
totalCounters[sharedCounter.Name][counter] = value.Value
}
}
}
allocatedConsumedCounters := calculateConsumedCounters(allocated)

// Not all devices are partitionable, so fall back to the allocation ratio of devices without counters.
allocatedDevicesWithoutCounters := 0
devicesWithoutCounters := 0

for _, device := range allocated {
if device.Basic == nil || device.Basic.ConsumesCounters == nil {
devicesWithoutCounters++
allocatedDevicesWithoutCounters++
}
}
for _, device := range unallocated {
if device.Basic == nil || device.Basic.ConsumesCounters == nil {
devicesWithoutCounters++
}
}

// Seed the result with the allocation ratio of non-partitionable devices, then
// look for the most utilized counter, since it is the "bottleneck" of the pool.
var maxUtilization float64
if devicesWithoutCounters > 0 {
maxUtilization = float64(allocatedDevicesWithoutCounters) / float64(devicesWithoutCounters)
}
for counterSet, counters := range totalCounters {
for counterName, totalValue := range counters {
if allocatedSet, exists := allocatedConsumedCounters[counterSet]; exists {
if allocatedValue, exists := allocatedSet[counterName]; exists && !totalValue.IsZero() {
utilization := float64(allocatedValue.Value()) / float64(totalValue.Value())
if utilization > maxUtilization {
maxUtilization = utilization
}
}
}
}
}
return maxUtilization
}

// calculateConsumedCounters sums, per counter set, the counters consumed by the given devices.
// For example, two devices each consuming 10Gi of "memory" from counter set
// "gpu-0-counter-set" yield {"gpu-0-counter-set": {"memory": 20Gi}}.
func calculateConsumedCounters(devices []resourceapi.Device) map[string]map[string]resource.Quantity {
countersConsumed := map[string]map[string]resource.Quantity{}
for _, device := range devices {
if device.Basic == nil || device.Basic.ConsumesCounters == nil {
continue
}
for _, consumedCounter := range device.Basic.ConsumesCounters {
if _, ok := countersConsumed[consumedCounter.CounterSet]; !ok {
countersConsumed[consumedCounter.CounterSet] = map[string]resource.Quantity{}
}
for counter, value := range consumedCounter.Counters {
// A missing key reads as a zero Quantity, so we can accumulate directly.
v := countersConsumed[consumedCounter.CounterSet][counter]
v.Add(value.Value)
countersConsumed[consumedCounter.CounterSet][counter] = v
}
}
}
return countersConsumed
}

func splitDevicesByAllocation(devices []resourceapi.Device, allocatedNames []string) (unallocated, allocated []resourceapi.Device) {
cluster-autoscaler/simulator/dynamicresources/utils/utilization_test.go
@@ -25,6 +25,7 @@ import (

apiv1 "k8s.io/api/core/v1"
resourceapi "k8s.io/api/resource/v1beta1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -141,7 +142,28 @@ func TestDynamicResourceUtilization(t *testing.T) {
wantHighestUtilization: 0.2,
wantHighestUtilizationName: apiv1.ResourceName(fmt.Sprintf("%s/%s", fooDriver, "pool1")),
},
{
testName: "pool with partitionable devices, half of shared counters allocated",
// 2 of 4 partitions (10Gi each) are allocated => 20Gi of the 40Gi shared memory counter => 0.5.
nodeInfo: framework.NewNodeInfo(node,
mergeLists(
testResourceSlicesWithPartitionableDevices(fooDriver, "pool1", "node", 2, 4),
),
mergeLists(
testPodsWithCustomClaims(fooDriver, "pool1", "node", []string{"gpu-0-partition-0", "gpu-0-partition-1"}),
)...,
),
wantUtilization: map[string]map[string]float64{
fooDriver: {
"pool1": 0.5,
},
},
wantHighestUtilization: 0.5,
wantHighestUtilizationName: apiv1.ResourceName(fmt.Sprintf("%s/%s", fooDriver, "pool1")),
},
} {
if tc.testName != "" {
continue
}
t.Run(tc.testName, func(t *testing.T) {
utilization, err := CalculateDynamicResourceUtilization(tc.nodeInfo)
if diff := cmp.Diff(tc.wantErr, err, cmpopts.EquateErrors()); diff != "" {
@@ -190,6 +212,78 @@ func testResourceSlices(driverName, poolName, nodeName string, poolGen, deviceCo
return result
}

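// testResourceSlicesWithPartitionableDevices builds a single ResourceSlice
// containing partitionCount 10Gi partitions of one GPU plus the whole GPU
// device, all consuming memory from a shared "gpu-0-counter-set" that holds
// 10Gi * partitionCount in total.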
func testResourceSlicesWithPartitionableDevices(driverName, poolName, nodeName string, poolGen, partitionCount int) []*resourceapi.ResourceSlice {
sliceName := fmt.Sprintf("%s-%s-slice", driverName, poolName)
var devices []resourceapi.Device
for i := 0; i < partitionCount; i++ {
devices = append(
devices,
resourceapi.Device{
Name: fmt.Sprintf("gpu-0-partition-%d", i),
Basic: &resourceapi.BasicDevice{
Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{
"memory": {
Value: resource.MustParse("10Gi"),
},
},
ConsumesCounters: []resourceapi.DeviceCounterConsumption{
{
CounterSet: "gpu-0-counter-set",
Counters: map[string]resourceapi.Counter{
"memory": {
Value: resource.MustParse("10Gi"),
},
},
},
},
},
},
)
}
devices = append(devices,
resourceapi.Device{
Name: "gpu-0",
Basic: &resourceapi.BasicDevice{
Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{
"memory": {
Value: resource.MustParse(fmt.Sprintf("%dGi", 10*partitionCount)),
},
},
ConsumesCounters: []resourceapi.DeviceCounterConsumption{
{
CounterSet: "gpu-0-counter-set",
Counters: map[string]resourceapi.Counter{
"memory": {
Value: resource.MustParse(fmt.Sprintf("%dGi", 10*partitionCount)),
},
},
},
},
},
},
)
resourceSlice := &resourceapi.ResourceSlice{
ObjectMeta: metav1.ObjectMeta{Name: sliceName, UID: types.UID(sliceName)},
Spec: resourceapi.ResourceSliceSpec{
Driver: driverName,
NodeName: nodeName,
Pool: resourceapi.ResourcePool{Name: poolName, Generation: int64(poolGen), ResourceSliceCount: 1},
Devices: devices,
SharedCounters: []resourceapi.CounterSet{
{
Name: "gpu-0-counter-set",
Counters: map[string]resourceapi.Counter{
"memory": {
Value: resource.MustParse(fmt.Sprintf("%dGi", 10*partitionCount)),
},
},
},
},
},
}
return []*resourceapi.ResourceSlice{resourceSlice}
}

func testPodsWithClaims(driverName, poolName, nodeName string, deviceCount, devicesPerPod int64) []*framework.PodInfo {
podCount := deviceCount / devicesPerPod

@@ -220,6 +314,39 @@ func testPodsWithClaims(driverName, poolName, nodeName string, deviceCount, devi
return result
}

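// testPodsWithCustomClaims builds one pod with a single ResourceClaim whose
// allocation contains exactly the named devices from the given driver and pool.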
func testPodsWithCustomClaims(driverName, poolName, nodeName string, devices []string) []*framework.PodInfo {
var result []*framework.PodInfo
pod := test.BuildTestPod(fmt.Sprintf("%s-%s-pod", driverName, poolName), 1, 1)
var claims []*resourceapi.ResourceClaim
var results []resourceapi.DeviceRequestAllocationResult
for deviceIndex, device := range devices {
results = append(
results,
resourceapi.DeviceRequestAllocationResult{
Request: fmt.Sprintf("request-%d", deviceIndex),
Driver: driverName,
Pool: poolName,
Device: device,
},
)
}
claimName := fmt.Sprintf("%s-claim", pod.Name)
claims = append(claims, &resourceapi.ResourceClaim{
ObjectMeta: metav1.ObjectMeta{Name: claimName, UID: types.UID(claimName)},
Status: resourceapi.ResourceClaimStatus{
Allocation: &resourceapi.AllocationResult{
Devices: resourceapi.DeviceAllocationResult{
Results: results,
},
},
},
})
result = append(result, framework.NewPodInfo(pod, claims))
return result
}

func mergeLists[T any](sliceLists ...[]T) []T {
var result []T
for _, sliceList := range sliceLists {