Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 35 additions & 4 deletions cluster-autoscaler/expander/grpcplugin/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

v1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -111,11 +112,20 @@ func populateOptionsForGRPC(expansionOptions []expander.Option) ([]*protos.Optio
nodeGroupIDOptionMap := make(map[string]expander.Option)
for _, option := range expansionOptions {
nodeGroupIDOptionMap[option.NodeGroup.Id()] = option
grpcOptionsSlice = append(grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods))
similarNodeGroupIds := getSimilarNodeGroupIds(option)
grpcOptionsSlice = append(grpcOptionsSlice, newOptionMessage(option.NodeGroup.Id(), int32(option.NodeCount), option.Debug, option.Pods, similarNodeGroupIds))
}
return grpcOptionsSlice, nodeGroupIDOptionMap
}

func getSimilarNodeGroupIds(option expander.Option) []string {
var similarNodeGroupIds []string
for _, sng := range option.SimilarNodeGroups {
similarNodeGroupIds = append(similarNodeGroupIds, sng.Id())
}
return similarNodeGroupIds
}

// populateNodeInfoForGRPC looks at the corresponding v1.Node object per NodeInfo object, and populates the grpcNodeInfoMap with these to pass over grpc
func populateNodeInfoForGRPC(nodeInfos map[string]*schedulerframework.NodeInfo) map[string]*v1.Node {
grpcNodeInfoMap := make(map[string]*v1.Node)
Expand All @@ -133,7 +143,9 @@ func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Op
continue
}
if _, ok := nodeGroupIDOptionMap[option.NodeGroupId]; ok {
options = append(options, nodeGroupIDOptionMap[option.NodeGroupId])
expanderOption := nodeGroupIDOptionMap[option.NodeGroupId]
expanderOption.SimilarNodeGroups = getRetainedSimilarNodegroups(option, expanderOption)
options = append(options, expanderOption)
} else {
klog.Errorf("GRPC server returned invalid nodeGroup ID: ", option.NodeGroupId)
continue
Expand All @@ -142,6 +154,25 @@ func transformAndSanitizeOptionsFromGRPC(bestOptionsResponseOptions []*protos.Op
return options
}

func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod) *protos.Option {
return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods}
// Any similar options that were not included in the original grpcExpander request, but were added
// as part of the response, will be ignored
func getRetainedSimilarNodegroups(grpcOption *protos.Option, expanderOption expander.Option) []cloudprovider.NodeGroup {
var retainedSimilarNodeGroups []cloudprovider.NodeGroup
for _, sng := range expanderOption.SimilarNodeGroups {
retained := false
for _, id := range grpcOption.SimilarNodeGroupIds {
if sng.Id() == id {
retained = true
continue
}
}
if retained {
retainedSimilarNodeGroups = append(retainedSimilarNodeGroups, sng)
}
}
return retainedSimilarNodeGroups
}

func newOptionMessage(nodeGroupId string, nodeCount int32, debug string, pods []*v1.Pod, similarNodeGroupIds []string) *protos.Option {
return &protos.Option{NodeGroupId: nodeGroupId, NodeCount: nodeCount, Debug: debug, Pod: pods, SimilarNodeGroupIds: similarNodeGroupIds}
}
74 changes: 63 additions & 11 deletions cluster-autoscaler/expander/grpcplugin/grpc_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/expander/grpcplugin/protos"
"k8s.io/autoscaler/cluster-autoscaler/expander/mocks"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
Expand Down Expand Up @@ -58,6 +59,11 @@ var (
Debug: "m4.4xlarge",
NodeGroup: test.NewTestNodeGroup("my-asg.m4.4xlarge", 10, 1, 1, true, false, "m4.4xlarge", nil, nil),
}
eoT2MicroWithSimilar = expander.Option{
Debug: "t2.micro",
NodeGroup: test.NewTestNodeGroup("my-asg.t2.micro", 10, 1, 1, true, false, "t2.micro", nil, nil),
SimilarNodeGroups: []cloudprovider.NodeGroup{test.NewTestNodeGroup("my-similar-asg.t2.micro", 10, 1, 1, true, false, "t2.micro", nil, nil)},
}
options = []expander.Option{eoT2Micro, eoT2Large, eoT3Large, eoM44XLarge}

grpcEoT2Micro = protos.Option{
Expand All @@ -84,6 +90,20 @@ var (
Debug: eoM44XLarge.Debug,
Pod: eoM44XLarge.Pods,
}
grpcEoT2MicroWithSimilar = protos.Option{
NodeGroupId: eoT2Micro.NodeGroup.Id(),
NodeCount: int32(eoT2Micro.NodeCount),
Debug: eoT2Micro.Debug,
Pod: eoT2Micro.Pods,
SimilarNodeGroupIds: []string{eoT2MicroWithSimilar.SimilarNodeGroups[0].Id()},
}
grpcEoT2MicroWithSimilarWithExtraOptions = protos.Option{
NodeGroupId: eoT2Micro.NodeGroup.Id(),
NodeCount: int32(eoT2Micro.NodeCount),
Debug: eoT2Micro.Debug,
Pod: eoT2Micro.Pods,
SimilarNodeGroupIds: []string{eoT2MicroWithSimilar.SimilarNodeGroups[0].Id(), "extra-ng-id"},
}
)

func TestPopulateOptionsForGrpc(t *testing.T) {
Expand Down Expand Up @@ -116,6 +136,12 @@ func TestPopulateOptionsForGrpc(t *testing.T) {
eoM44XLarge.NodeGroup.Id(): eoM44XLarge,
},
},
{
desc: "similar nodegroups are included",
opts: []expander.Option{eoT2MicroWithSimilar},
expectedOpts: []*protos.Option{&grpcEoT2MicroWithSimilar},
expectedMap: map[string]expander.Option{eoT2MicroWithSimilar.NodeGroup.Id(): eoT2MicroWithSimilar},
},
}
for _, tc := range testCases {
grpcOptionsSlice, nodeGroupIDOptionMap := populateOptionsForGRPC(tc.opts)
Expand Down Expand Up @@ -146,18 +172,44 @@ func TestPopulateNodeInfoForGRPC(t *testing.T) {
}

func TestValidTransformAndSanitizeOptionsFromGRPC(t *testing.T) {
responseOptionsSlice := []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge}
nodeGroupIDOptionMap := map[string]expander.Option{
eoT2Micro.NodeGroup.Id(): eoT2Micro,
eoT2Large.NodeGroup.Id(): eoT2Large,
eoT3Large.NodeGroup.Id(): eoT3Large,
eoM44XLarge.NodeGroup.Id(): eoM44XLarge,
testCases := []struct {
desc string
responseOptions []*protos.Option
expectedOptions []expander.Option
nodegroupIDOptionaMap map[string]expander.Option
}{
{
desc: "valid transform and sanitize options",
responseOptions: []*protos.Option{&grpcEoT2Micro, &grpcEoT3Large, &grpcEoM44XLarge},
nodegroupIDOptionaMap: map[string]expander.Option{
eoT2Micro.NodeGroup.Id(): eoT2Micro,
eoT2Large.NodeGroup.Id(): eoT2Large,
eoT3Large.NodeGroup.Id(): eoT3Large,
eoM44XLarge.NodeGroup.Id(): eoM44XLarge,
},
expectedOptions: []expander.Option{eoT2Micro, eoT3Large, eoM44XLarge},
},
{
desc: "similar ngs are retained in proto options are retained",
responseOptions: []*protos.Option{&grpcEoT2MicroWithSimilar},
nodegroupIDOptionaMap: map[string]expander.Option{
eoT2MicroWithSimilar.NodeGroup.Id(): eoT2MicroWithSimilar,
},
expectedOptions: []expander.Option{eoT2MicroWithSimilar},
},
{
desc: "extra similar ngs added to expander response are ignored",
responseOptions: []*protos.Option{&grpcEoT2MicroWithSimilarWithExtraOptions},
nodegroupIDOptionaMap: map[string]expander.Option{
eoT2MicroWithSimilar.NodeGroup.Id(): eoT2MicroWithSimilar,
},
expectedOptions: []expander.Option{eoT2MicroWithSimilar},
},
}
for _, tc := range testCases {
ret := transformAndSanitizeOptionsFromGRPC(tc.responseOptions, tc.nodegroupIDOptionaMap)
assert.Equal(t, tc.expectedOptions, ret)
}

expectedOptions := []expander.Option{eoT2Micro, eoT3Large, eoM44XLarge}

ret := transformAndSanitizeOptionsFromGRPC(responseOptionsSlice, nodeGroupIDOptionMap)
assert.Equal(t, expectedOptions, ret)
}

func TestAnInvalidTransformAndSanitizeOptionsFromGRPC(t *testing.T) {
Expand Down
Loading