Skip to content

Commit a451124

Browse files
committed
Bump google.golang.org/api; cache MIG workload policy
Bump google.golang.org/api to use InstanceGroupManagerResourcePolicies.
1 parent 79aea9b commit a451124

File tree

9 files changed

+291
-159
lines changed

9 files changed

+291
-159
lines changed

cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ type AutoscalingGceClient interface {
123123
FetchAllMigs(zone string) ([]*gce.InstanceGroupManager, error)
124124
FetchAllInstances(project, zone string, filter string) ([]GceInstance, error)
125125
FetchMigTargetSize(GceRef) (int64, error)
126+
FetchMigWorkloadPolicy(GceRef) (*string, error)
126127
FetchMigBasename(GceRef) (string, error)
127128
FetchMigInstances(GceRef) ([]GceInstance, error)
128129
FetchMigTemplateName(migRef GceRef) (InstanceTemplateName, error)
@@ -239,6 +240,25 @@ func (client *autoscalingGceClientV1) FetchAllMigs(zone string) ([]*gce.Instance
239240
return migs, nil
240241
}
241242

243+
func (client *autoscalingGceClientV1) FetchMigWorkloadPolicy(migRef GceRef) (*string, error) {
244+
registerRequest("instance_group_managers", "get")
245+
ctx, cancel := context.WithTimeout(context.Background(), client.operationPerCallTimeout)
246+
defer cancel()
247+
igm, err := client.gceService.InstanceGroupManagers.Get(migRef.Project, migRef.Zone, migRef.Name).Context(ctx).Do()
248+
if err != nil {
249+
if err, ok := err.(*googleapi.Error); ok {
250+
if err.Code == http.StatusNotFound {
251+
return nil, errors.NewAutoscalerError(errors.NodeGroupDoesNotExistError, err.Error())
252+
}
253+
}
254+
return nil, err
255+
}
256+
if igm == nil || igm.ResourcePolicies == nil {
257+
return nil, nil
258+
}
259+
return &igm.ResourcePolicies.WorkloadPolicy, nil
260+
}
261+
242262
func (client *autoscalingGceClientV1) FetchMigTargetSize(migRef GceRef) (int64, error) {
243263
registerRequest("instance_group_managers", "get")
244264
ctx, cancel := context.WithTimeout(context.Background(), client.operationPerCallTimeout)

cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -839,6 +839,13 @@ func TestAutoscalingClientTimeouts(t *testing.T) {
839839
},
840840
httpTimeout: instantTimeout,
841841
},
842+
"FetchMigWorkloadPolicy_HttpClientTimeout": {
843+
clientFunc: func(client *autoscalingGceClientV1) error {
844+
_, err := client.FetchMigWorkloadPolicy(GceRef{})
845+
return err
846+
},
847+
httpTimeout: instantTimeout,
848+
},
842849
"FetchMigInstances_HttpClientTimeout": {
843850
clientFunc: func(client *autoscalingGceClientV1) error {
844851
_, err := client.FetchMigInstances(GceRef{})

cluster-autoscaler/cloudprovider/gce/cache.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type GceCache struct {
7272
autoscalingOptionsCache map[GceRef]map[string]string
7373
machinesCache map[MachineTypeKey]MachineType
7474
migTargetSizeCache map[GceRef]int64
75+
migWorkloadPolicyCache map[GceRef]*string
7576
migBaseNameCache map[GceRef]string
7677
migInstancesStateCountCache map[GceRef]map[cloudprovider.InstanceState]int64
7778
listManagedInstancesResultsCache map[GceRef]string
@@ -91,6 +92,7 @@ func NewGceCache() *GceCache {
9192
autoscalingOptionsCache: map[GceRef]map[string]string{},
9293
machinesCache: map[MachineTypeKey]MachineType{},
9394
migTargetSizeCache: map[GceRef]int64{},
95+
migWorkloadPolicyCache: map[GceRef]*string{},
9496
migBaseNameCache: map[GceRef]string{},
9597
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
9698
listManagedInstancesResultsCache: map[GceRef]string{},
@@ -352,6 +354,35 @@ func (gc *GceCache) InvalidateAllMigTargetSizes() {
352354
gc.migTargetSizeCache = map[GceRef]int64{}
353355
}
354356

357+
// GetMigWorkloadPolicy returns the cached WorkloadPolicy for a GceRef
358+
func (gc *GceCache) GetMigWorkloadPolicy(ref GceRef) (*string, bool) {
359+
gc.cacheMutex.Lock()
360+
defer gc.cacheMutex.Unlock()
361+
362+
wp, found := gc.migWorkloadPolicyCache[ref]
363+
if found {
364+
klog.V(5).Infof("WorkloadPolicy cache hit for %s", ref)
365+
}
366+
return wp, found
367+
}
368+
369+
// SetMigWorkloadPolicy sets WorkloadPolicy for a GceRef
370+
func (gc *GceCache) SetMigWorkloadPolicy(ref GceRef, wp *string) {
371+
gc.cacheMutex.Lock()
372+
defer gc.cacheMutex.Unlock()
373+
374+
gc.migWorkloadPolicyCache[ref] = wp
375+
}
376+
377+
// InvalidateAllMigWorkloadPolicies clears the WorkloadPolicy ref cache
378+
func (gc *GceCache) InvalidateAllMigWorkloadPolicies() {
379+
gc.cacheMutex.Lock()
380+
defer gc.cacheMutex.Unlock()
381+
382+
klog.V(5).Infof("WorkloadPolicy cache invalidated")
383+
gc.migWorkloadPolicyCache = map[GceRef]*string{}
384+
}
385+
355386
// GetMigInstanceTemplateName returns the cached instance template ref for a mig GceRef
356387
func (gc *GceCache) GetMigInstanceTemplateName(ref GceRef) (InstanceTemplateName, bool) {
357388
gc.cacheMutex.Lock()

cluster-autoscaler/cloudprovider/gce/gce_manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ func (m *gceManagerImpl) Refresh() error {
303303
m.cache.InvalidateAllMigBasenames()
304304
m.cache.InvalidateAllListManagedInstancesResults()
305305
m.cache.InvalidateAllMigInstanceTemplateNames()
306+
m.cache.InvalidateAllMigWorkloadPolicies()
306307
if m.lastRefresh.Add(refreshInterval).After(time.Now()) {
307308
return nil
308309
}

cluster-autoscaler/cloudprovider/gce/gce_manager_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa
349349
migBaseNameCache: map[GceRef]string{},
350350
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
351351
listManagedInstancesResultsCache: map[GceRef]string{},
352+
migWorkloadPolicyCache: map[GceRef]*string{},
352353
}
353354
migLister := NewMigLister(cache)
354355
manager := &gceManagerImpl{

cluster-autoscaler/cloudprovider/gce/mig_info_provider.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type MigInfoProvider interface {
4242
RegenerateMigInstancesCache() error
4343
// GetMigTargetSize returns target size for given MIG ref
4444
GetMigTargetSize(migRef GceRef) (int64, error)
45+
// GetMigWorkloadPolicy returns WorkloadPolicy for given MIG ref
46+
GetMigWorkloadPolicy(migRef GceRef) (*string, error)
4547
// GetMigBasename returns basename for given MIG ref
4648
GetMigBasename(migRef GceRef) (string, error)
4749
// GetMigInstanceTemplateName returns instance template name for given MIG ref
@@ -348,6 +350,31 @@ func (c *cachingMigInfoProvider) GetMigTargetSize(migRef GceRef) (int64, error)
348350
return targetSize, nil
349351
}
350352

353+
func (c *cachingMigInfoProvider) GetMigWorkloadPolicy(migRef GceRef) (*string, error) {
354+
c.migInfoMutex.Lock()
355+
defer c.migInfoMutex.Unlock()
356+
357+
wp, found := c.cache.GetMigWorkloadPolicy(migRef)
358+
if found {
359+
return wp, nil
360+
}
361+
362+
err := c.fillMigInfoCache()
363+
wp, found = c.cache.GetMigWorkloadPolicy(migRef)
364+
if err == nil && found {
365+
return wp, nil
366+
}
367+
368+
// fallback to querying for single mig
369+
wp, err = c.gceClient.FetchMigWorkloadPolicy(migRef)
370+
if err != nil {
371+
c.migLister.HandleMigIssue(migRef, err)
372+
return nil, err
373+
}
374+
c.cache.SetMigWorkloadPolicy(migRef, wp)
375+
return wp, nil
376+
}
377+
351378
func (c *cachingMigInfoProvider) GetMigBasename(migRef GceRef) (string, error) {
352379
c.migInfoMutex.Lock()
353380
defer c.migInfoMutex.Unlock()
@@ -491,6 +518,12 @@ func (c *cachingMigInfoProvider) fillMigInfoCache() error {
491518
c.cache.SetListManagedInstancesResults(zoneMigRef, zoneMig.ListManagedInstancesResults)
492519
c.cache.SetMigInstancesStateCount(zoneMigRef, createInstancesStateCount(zoneMig.TargetSize, zoneMig.CurrentActions))
493520

521+
if zoneMig.ResourcePolicies == nil {
522+
c.cache.SetMigWorkloadPolicy(zoneMigRef, nil)
523+
} else {
524+
c.cache.SetMigWorkloadPolicy(zoneMigRef, &zoneMig.ResourcePolicies.WorkloadPolicy)
525+
}
526+
494527
templateUrl, err := url.Parse(zoneMig.InstanceTemplate)
495528
if err == nil {
496529
_, templateName := path.Split(templateUrl.EscapedPath())

cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/stretchr/testify/assert"
2727
gce "google.golang.org/api/compute/v1"
28+
"google.golang.org/protobuf/proto"
2829
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
2930
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
3031
)
@@ -33,6 +34,7 @@ var (
3334
errFetchMig = errors.New("fetch migs error")
3435
errFetchMigInstances = errors.New("fetch mig instances error")
3536
errFetchMigTargetSize = errors.New("fetch mig target size error")
37+
errFetchMigWorkloadPolicy = errors.New("fetch mig workload policy error")
3638
errFetchMigBaseName = errors.New("fetch mig basename error")
3739
errFetchMigTemplateName = errors.New("fetch mig template name error")
3840
errFetchMigTemplate = errors.New("fetch mig template error")
@@ -112,6 +114,7 @@ type mockAutoscalingGceClient struct {
112114
fetchMigs func(string) ([]*gce.InstanceGroupManager, error)
113115
fetchAllInstances func(project, zone string, filter string) ([]GceInstance, error)
114116
fetchMigTargetSize func(GceRef) (int64, error)
117+
fetchMigWorkloadPolicy func(GceRef) (*string, error)
115118
fetchMigBasename func(GceRef) (string, error)
116119
fetchMigInstances func(GceRef) ([]GceInstance, error)
117120
fetchMigTemplateName func(GceRef) (InstanceTemplateName, error)
@@ -140,6 +143,10 @@ func (client *mockAutoscalingGceClient) FetchMigTargetSize(migRef GceRef) (int64
140143
return client.fetchMigTargetSize(migRef)
141144
}
142145

146+
func (client *mockAutoscalingGceClient) FetchMigWorkloadPolicy(migRef GceRef) (*string, error) {
147+
return client.fetchMigWorkloadPolicy(migRef)
148+
}
149+
143150
func (client *mockAutoscalingGceClient) FetchMigBasename(migRef GceRef) (string, error) {
144151
return client.fetchMigBasename(migRef)
145152
}
@@ -863,6 +870,90 @@ func TestGetMigTargetSize(t *testing.T) {
863870
}
864871
}
865872

873+
func TestGetMigWorkloadPolicy(t *testing.T) {
874+
workloadPolicy := proto.String("wp-123")
875+
instanceGroupManager := &gce.InstanceGroupManager{
876+
Zone: mig.GceRef().Zone,
877+
Name: mig.GceRef().Name,
878+
ResourcePolicies: &gce.InstanceGroupManagerResourcePolicies{
879+
WorkloadPolicy: *workloadPolicy,
880+
},
881+
}
882+
883+
testCases := []struct {
884+
name string
885+
cache *GceCache
886+
fetchMigs func(string) ([]*gce.InstanceGroupManager, error)
887+
fetchMigWorkloadPolicy func(GceRef) (*string, error)
888+
expectedWorkloadPolicy *string
889+
expectedErr error
890+
}{
891+
{
892+
name: "workload policy in cache",
893+
cache: &GceCache{
894+
migs: map[GceRef]Mig{mig.GceRef(): mig},
895+
migWorkloadPolicyCache: map[GceRef]*string{mig.GceRef(): workloadPolicy},
896+
},
897+
expectedWorkloadPolicy: workloadPolicy,
898+
},
899+
{
900+
name: "workload policy from cache fill",
901+
cache: emptyCache(),
902+
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{instanceGroupManager}),
903+
expectedWorkloadPolicy: workloadPolicy,
904+
},
905+
{
906+
name: "cache fill without mig, fallback success",
907+
cache: emptyCache(),
908+
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{}),
909+
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyConst(workloadPolicy),
910+
expectedWorkloadPolicy: workloadPolicy,
911+
},
912+
{
913+
name: "cache fill failure, fallback success",
914+
cache: emptyCache(),
915+
fetchMigs: fetchMigsFail,
916+
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyConst(workloadPolicy),
917+
expectedWorkloadPolicy: workloadPolicy,
918+
},
919+
{
920+
name: "cache fill without mig, fallback failure",
921+
cache: emptyCache(),
922+
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{}),
923+
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyFail,
924+
expectedErr: errFetchMigWorkloadPolicy,
925+
},
926+
{
927+
name: "cache fill failure, fallback failure",
928+
cache: emptyCache(),
929+
fetchMigs: fetchMigsFail,
930+
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyFail,
931+
expectedErr: errFetchMigWorkloadPolicy,
932+
},
933+
}
934+
935+
for _, tc := range testCases {
936+
t.Run(tc.name, func(t *testing.T) {
937+
client := &mockAutoscalingGceClient{
938+
fetchMigs: tc.fetchMigs,
939+
fetchMigWorkloadPolicy: tc.fetchMigWorkloadPolicy,
940+
}
941+
migLister := NewMigLister(tc.cache)
942+
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false)
943+
944+
workloadPolicy, err := provider.GetMigWorkloadPolicy(mig.GceRef())
945+
cachedWorkloadPolicy, found := tc.cache.GetMigWorkloadPolicy(mig.GceRef())
946+
947+
assert.Equal(t, tc.expectedErr, err)
948+
assert.Equal(t, tc.expectedErr == nil, found)
949+
if tc.expectedErr == nil {
950+
assert.Equal(t, tc.expectedWorkloadPolicy, workloadPolicy)
951+
assert.Equal(t, tc.expectedWorkloadPolicy, cachedWorkloadPolicy)
952+
}
953+
})
954+
}
955+
}
956+
866957
func TestGetMigBasename(t *testing.T) {
867958
basename := "base-instance-name"
868959
instanceGroupManager := &gce.InstanceGroupManager{
@@ -1921,6 +2012,7 @@ func emptyCache() *GceCache {
19212012
instancesUpdateTime: make(map[GceRef]time.Time),
19222013
migTargetSizeCache: make(map[GceRef]int64),
19232014
migBaseNameCache: make(map[GceRef]string),
2015+
migWorkloadPolicyCache: make(map[GceRef]*string),
19242016
migInstancesStateCountCache: make(map[GceRef]map[cloudprovider.InstanceState]int64),
19252017
listManagedInstancesResultsCache: make(map[GceRef]string),
19262018
instanceTemplateNameCache: make(map[GceRef]InstanceTemplateName),
@@ -1972,10 +2064,20 @@ func fetchMigTargetSizeConst(targetSize int64) func(GceRef) (int64, error) {
19722064
}
19732065
}
19742066

2067+
func fetchMigWorkloadPolicyFail(_ GceRef) (*string, error) {
2068+
return nil, errFetchMigWorkloadPolicy
2069+
}
2070+
19752071
func fetchMigBasenameFail(_ GceRef) (string, error) {
19762072
return "", errFetchMigBaseName
19772073
}
19782074

2075+
func fetchMigWorkloadPolicyConst(workloadPolicy *string) func(GceRef) (*string, error) {
2076+
return func(GceRef) (*string, error) {
2077+
return workloadPolicy, nil
2078+
}
2079+
}
2080+
19792081
func fetchMigBasenameConst(basename string) func(GceRef) (string, error) {
19802082
return func(GceRef) (string, error) {
19812083
return basename, nil

0 commit comments

Comments
 (0)