Skip to content

Commit 0ee4293

Browse files
committed
Bump google.golang.org/api; cache MIG workload policy
Bump google.golang.org/api to use InstanceGroupManagerResourcePolicies.
1 parent 79aea9b commit 0ee4293

File tree

9 files changed

+301
-159
lines changed

9 files changed

+301
-159
lines changed

cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ type AutoscalingGceClient interface {
123123
FetchAllMigs(zone string) ([]*gce.InstanceGroupManager, error)
124124
FetchAllInstances(project, zone string, filter string) ([]GceInstance, error)
125125
FetchMigTargetSize(GceRef) (int64, error)
126+
FetchMigWorkloadPolicy(GceRef) (*string, error)
126127
FetchMigBasename(GceRef) (string, error)
127128
FetchMigInstances(GceRef) ([]GceInstance, error)
128129
FetchMigTemplateName(migRef GceRef) (InstanceTemplateName, error)
@@ -239,6 +240,25 @@ func (client *autoscalingGceClientV1) FetchAllMigs(zone string) ([]*gce.Instance
239240
return migs, nil
240241
}
241242

243+
func (client *autoscalingGceClientV1) FetchMigWorkloadPolicy(migRef GceRef) (*string, error) {
244+
registerRequest("instance_group_managers", "get")
245+
ctx, cancel := context.WithTimeout(context.Background(), client.operationPerCallTimeout)
246+
defer cancel()
247+
igm, err := client.gceService.InstanceGroupManagers.Get(migRef.Project, migRef.Zone, migRef.Name).Context(ctx).Do()
248+
if err != nil {
249+
if err, ok := err.(*googleapi.Error); ok {
250+
if err.Code == http.StatusNotFound {
251+
return nil, errors.NewAutoscalerError(errors.NodeGroupDoesNotExistError, err.Error())
252+
}
253+
}
254+
return nil, err
255+
}
256+
if igm == nil || igm.ResourcePolicies == nil {
257+
return nil, nil
258+
}
259+
return &igm.ResourcePolicies.WorkloadPolicy, nil
260+
}
261+
242262
func (client *autoscalingGceClientV1) FetchMigTargetSize(migRef GceRef) (int64, error) {
243263
registerRequest("instance_group_managers", "get")
244264
ctx, cancel := context.WithTimeout(context.Background(), client.operationPerCallTimeout)

cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,13 @@ func TestAutoscalingClientTimeouts(t *testing.T) {
730730
},
731731
operationPerCallTimeout: &instantTimeout,
732732
},
733+
"FetchMigWorkloadPolicy_ContextTimeout": {
734+
clientFunc: func(client *autoscalingGceClientV1) error {
735+
_, err := client.FetchMigWorkloadPolicy(GceRef{})
736+
return err
737+
},
738+
operationPerCallTimeout: &instantTimeout,
739+
},
733740
"FetchMigTemplate_ContextTimeout": {
734741
clientFunc: func(client *autoscalingGceClientV1) error {
735742
_, err := client.FetchMigTemplate(GceRef{}, "", false)
@@ -839,6 +846,13 @@ func TestAutoscalingClientTimeouts(t *testing.T) {
839846
},
840847
httpTimeout: instantTimeout,
841848
},
849+
"FetchMigWorkloadPolicy_HttpClientTimeout": {
850+
clientFunc: func(client *autoscalingGceClientV1) error {
851+
_, err := client.FetchMigWorkloadPolicy(GceRef{})
852+
return err
853+
},
854+
httpTimeout: instantTimeout,
855+
},
842856
"FetchMigInstances_HttpClientTimeout": {
843857
clientFunc: func(client *autoscalingGceClientV1) error {
844858
_, err := client.FetchMigInstances(GceRef{})

cluster-autoscaler/cloudprovider/gce/cache.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type GceCache struct {
7272
autoscalingOptionsCache map[GceRef]map[string]string
7373
machinesCache map[MachineTypeKey]MachineType
7474
migTargetSizeCache map[GceRef]int64
75+
migWorkloadPolicyCache map[GceRef]*string
7576
migBaseNameCache map[GceRef]string
7677
migInstancesStateCountCache map[GceRef]map[cloudprovider.InstanceState]int64
7778
listManagedInstancesResultsCache map[GceRef]string
@@ -91,6 +92,7 @@ func NewGceCache() *GceCache {
9192
autoscalingOptionsCache: map[GceRef]map[string]string{},
9293
machinesCache: map[MachineTypeKey]MachineType{},
9394
migTargetSizeCache: map[GceRef]int64{},
95+
migWorkloadPolicyCache: map[GceRef]*string{},
9496
migBaseNameCache: map[GceRef]string{},
9597
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
9698
listManagedInstancesResultsCache: map[GceRef]string{},
@@ -352,6 +354,35 @@ func (gc *GceCache) InvalidateAllMigTargetSizes() {
352354
gc.migTargetSizeCache = map[GceRef]int64{}
353355
}
354356

357+
// GetMigWorkloadPolicy returns the cached WorkloadPolicy for a GceRef
358+
func (gc *GceCache) GetMigWorkloadPolicy(ref GceRef) (*string, bool) {
359+
gc.cacheMutex.Lock()
360+
defer gc.cacheMutex.Unlock()
361+
362+
wp, found := gc.migWorkloadPolicyCache[ref]
363+
if found {
364+
klog.V(5).Infof("WorkloadPolicy cache hit for %s", ref)
365+
}
366+
return wp, found
367+
}
368+
369+
// SetMigWorkloadPolicy sets WorkloadPolicy for a GceRef
370+
func (gc *GceCache) SetMigWorkloadPolicy(ref GceRef, wp *string) {
371+
gc.cacheMutex.Lock()
372+
defer gc.cacheMutex.Unlock()
373+
374+
gc.migWorkloadPolicyCache[ref] = wp
375+
}
376+
377+
// InvalidateAllMigWorkloadPolicies clears the WorkloadPolicy ref cache
378+
func (gc *GceCache) InvalidateAllMigWorkloadPolicies() {
379+
gc.cacheMutex.Lock()
380+
defer gc.cacheMutex.Unlock()
381+
382+
klog.V(5).Infof("WorkloadPolicy cache invalidated")
383+
gc.migWorkloadPolicyCache = map[GceRef]*string{}
384+
}
385+
355386
// GetMigInstanceTemplateName returns the cached instance template ref for a mig GceRef
356387
func (gc *GceCache) GetMigInstanceTemplateName(ref GceRef) (InstanceTemplateName, bool) {
357388
gc.cacheMutex.Lock()

cluster-autoscaler/cloudprovider/gce/gce_manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,7 @@ func (m *gceManagerImpl) Refresh() error {
303303
m.cache.InvalidateAllMigBasenames()
304304
m.cache.InvalidateAllListManagedInstancesResults()
305305
m.cache.InvalidateAllMigInstanceTemplateNames()
306+
m.cache.InvalidateAllMigWorkloadPolicies()
306307
if m.lastRefresh.Add(refreshInterval).After(time.Now()) {
307308
return nil
308309
}

cluster-autoscaler/cloudprovider/gce/gce_manager_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa
349349
migBaseNameCache: map[GceRef]string{},
350350
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
351351
listManagedInstancesResultsCache: map[GceRef]string{},
352+
migWorkloadPolicyCache: map[GceRef]*string{},
352353
}
353354
migLister := NewMigLister(cache)
354355
manager := &gceManagerImpl{

cluster-autoscaler/cloudprovider/gce/mig_info_provider.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ type MigInfoProvider interface {
4242
RegenerateMigInstancesCache() error
4343
// GetMigTargetSize returns target size for given MIG ref
4444
GetMigTargetSize(migRef GceRef) (int64, error)
45+
// GetMigWorkloadPolicy returns WorkloadPolicy for given MIG ref
46+
GetMigWorkloadPolicy(migRef GceRef) (*string, error)
4547
// GetMigBasename returns basename for given MIG ref
4648
GetMigBasename(migRef GceRef) (string, error)
4749
// GetMigInstanceTemplateName returns instance template name for given MIG ref
@@ -348,6 +350,31 @@ func (c *cachingMigInfoProvider) GetMigTargetSize(migRef GceRef) (int64, error)
348350
return targetSize, nil
349351
}
350352

353+
func (c *cachingMigInfoProvider) GetMigWorkloadPolicy(migRef GceRef) (*string, error) {
354+
c.migInfoMutex.Lock()
355+
defer c.migInfoMutex.Unlock()
356+
357+
wp, found := c.cache.GetMigWorkloadPolicy(migRef)
358+
if found {
359+
return wp, nil
360+
}
361+
362+
err := c.fillMigInfoCache()
363+
wp, found = c.cache.GetMigWorkloadPolicy(migRef)
364+
if err == nil && found {
365+
return wp, nil
366+
}
367+
368+
// fallback to querying for single mig
369+
wp, err = c.gceClient.FetchMigWorkloadPolicy(migRef)
370+
if err != nil {
371+
c.migLister.HandleMigIssue(migRef, err)
372+
return nil, err
373+
}
374+
c.cache.SetMigWorkloadPolicy(migRef, wp)
375+
return wp, nil
376+
}
377+
351378
func (c *cachingMigInfoProvider) GetMigBasename(migRef GceRef) (string, error) {
352379
c.migInfoMutex.Lock()
353380
defer c.migInfoMutex.Unlock()
@@ -491,6 +518,12 @@ func (c *cachingMigInfoProvider) fillMigInfoCache() error {
491518
c.cache.SetListManagedInstancesResults(zoneMigRef, zoneMig.ListManagedInstancesResults)
492519
c.cache.SetMigInstancesStateCount(zoneMigRef, createInstancesStateCount(zoneMig.TargetSize, zoneMig.CurrentActions))
493520

521+
if zoneMig.ResourcePolicies == nil {
522+
c.cache.SetMigWorkloadPolicy(zoneMigRef, nil)
523+
} else {
524+
c.cache.SetMigWorkloadPolicy(zoneMigRef, &zoneMig.ResourcePolicies.WorkloadPolicy)
525+
}
526+
494527
templateUrl, err := url.Parse(zoneMig.InstanceTemplate)
495528
if err == nil {
496529
_, templateName := path.Split(templateUrl.EscapedPath())

cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"github.com/stretchr/testify/assert"
2727
gce "google.golang.org/api/compute/v1"
28+
"google.golang.org/protobuf/proto"
2829
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
2930
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
3031
)
@@ -33,6 +34,7 @@ var (
3334
errFetchMig = errors.New("fetch migs error")
3435
errFetchMigInstances = errors.New("fetch mig instances error")
3536
errFetchMigTargetSize = errors.New("fetch mig target size error")
37+
errFetchMigWorkloadPolicy = errors.New("fetch mig workload policy error")
3638
errFetchMigBaseName = errors.New("fetch mig basename error")
3739
errFetchMigTemplateName = errors.New("fetch mig template name error")
3840
errFetchMigTemplate = errors.New("fetch mig template error")
@@ -112,6 +114,7 @@ type mockAutoscalingGceClient struct {
112114
fetchMigs func(string) ([]*gce.InstanceGroupManager, error)
113115
fetchAllInstances func(project, zone string, filter string) ([]GceInstance, error)
114116
fetchMigTargetSize func(GceRef) (int64, error)
117+
fetchMigWorkloadPolicy func(GceRef) (*string, error)
115118
fetchMigBasename func(GceRef) (string, error)
116119
fetchMigInstances func(GceRef) ([]GceInstance, error)
117120
fetchMigTemplateName func(GceRef) (InstanceTemplateName, error)
@@ -140,6 +143,10 @@ func (client *mockAutoscalingGceClient) FetchMigTargetSize(migRef GceRef) (int64
140143
return client.fetchMigTargetSize(migRef)
141144
}
142145

146+
func (client *mockAutoscalingGceClient) FetchMigWorkloadPolicy(migRef GceRef) (*string, error) {
147+
return client.fetchMigWorkloadPolicy(migRef)
148+
}
149+
143150
func (client *mockAutoscalingGceClient) FetchMigBasename(migRef GceRef) (string, error) {
144151
return client.fetchMigBasename(migRef)
145152
}
@@ -685,6 +692,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) {
685692
listManagedInstancesResultsCache: map[GceRef]string{},
686693
instanceTemplateNameCache: map[GceRef]InstanceTemplateName{},
687694
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
695+
migWorkloadPolicyCache: map[GceRef]*string{},
688696
},
689697
fetchMigInstances: fetchMigInstancesConst(mig1Instances),
690698
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig1Igm}),
@@ -710,6 +718,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) {
710718
listManagedInstancesResultsCache: map[GceRef]string{},
711719
instanceTemplateNameCache: map[GceRef]InstanceTemplateName{},
712720
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
721+
migWorkloadPolicyCache: map[GceRef]*string{},
713722
},
714723
fetchMigInstances: fetchMigInstancesConst(mig2Instances),
715724
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig2Igm}),
@@ -736,6 +745,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) {
736745
listManagedInstancesResultsCache: map[GceRef]string{},
737746
instanceTemplateNameCache: map[GceRef]InstanceTemplateName{},
738747
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
748+
migWorkloadPolicyCache: map[GceRef]*string{},
739749
},
740750
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig2Igm}),
741751
fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone2": {instance3, instance6}}),
@@ -863,6 +873,90 @@ func TestGetMigTargetSize(t *testing.T) {
863873
}
864874
}
865875

876+
func TestGetMigWorkloadPolicy(t *testing.T) {
877+
workloadPolicy := proto.String("wp-123")
878+
instanceGroupManager := &gce.InstanceGroupManager{
879+
Zone: mig.GceRef().Zone,
880+
Name: mig.GceRef().Name,
881+
ResourcePolicies: &gce.InstanceGroupManagerResourcePolicies{
882+
WorkloadPolicy: *workloadPolicy,
883+
},
884+
}
885+
886+
testCases := []struct {
887+
name string
888+
cache *GceCache
889+
fetchMigs func(string) ([]*gce.InstanceGroupManager, error)
890+
fetchMigWorkloadPolicy func(GceRef) (*string, error)
891+
expectedWorkloadPolicy *string
892+
expectedErr error
893+
}{
894+
{
895+
name: "workload policy in cache",
896+
cache: &GceCache{
897+
migs: map[GceRef]Mig{mig.GceRef(): mig},
898+
migWorkloadPolicyCache: map[GceRef]*string{mig.GceRef(): workloadPolicy},
899+
},
900+
expectedWorkloadPolicy: workloadPolicy,
901+
},
902+
{
903+
name: "workload policy from cache fill",
904+
cache: emptyCache(),
905+
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{instanceGroupManager}),
906+
expectedWorkloadPolicy: workloadPolicy,
907+
},
908+
{
909+
name: "cache fill without mig, fallback success",
910+
cache: emptyCache(),
911+
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{}),
912+
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyConst(workloadPolicy),
913+
expectedWorkloadPolicy: workloadPolicy,
914+
},
915+
{
916+
name: "cache fill failure, fallback success",
917+
cache: emptyCache(),
918+
fetchMigs: fetchMigsFail,
919+
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyConst(workloadPolicy),
920+
expectedWorkloadPolicy: workloadPolicy,
921+
},
922+
{
923+
name: "cache fill without mig, fallback failure",
924+
cache: emptyCache(),
925+
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{}),
926+
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyFail,
927+
expectedErr: errFetchMigWorkloadPolicy,
928+
},
929+
{
930+
name: "cache fill failure, fallback failure",
931+
cache: emptyCache(),
932+
fetchMigs: fetchMigsFail,
933+
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyFail,
934+
expectedErr: errFetchMigWorkloadPolicy,
935+
},
936+
}
937+
938+
for _, tc := range testCases {
939+
t.Run(tc.name, func(t *testing.T) {
940+
client := &mockAutoscalingGceClient{
941+
fetchMigs: tc.fetchMigs,
942+
fetchMigWorkloadPolicy: tc.fetchMigWorkloadPolicy,
943+
}
944+
migLister := NewMigLister(tc.cache)
945+
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false)
946+
947+
workloadPolicy, err := provider.GetMigWorkloadPolicy(mig.GceRef())
948+
cachedWorkloadPolicy, found := tc.cache.GetMigWorkloadPolicy(mig.GceRef())
949+
950+
assert.Equal(t, tc.expectedErr, err)
951+
assert.Equal(t, tc.expectedErr == nil, found)
952+
if tc.expectedErr == nil {
953+
assert.Equal(t, tc.expectedWorkloadPolicy, workloadPolicy)
954+
assert.Equal(t, tc.expectedWorkloadPolicy, cachedWorkloadPolicy)
955+
}
956+
})
957+
}
958+
}
959+
866960
func TestGetMigBasename(t *testing.T) {
867961
basename := "base-instance-name"
868962
instanceGroupManager := &gce.InstanceGroupManager{
@@ -1921,6 +2015,7 @@ func emptyCache() *GceCache {
19212015
instancesUpdateTime: make(map[GceRef]time.Time),
19222016
migTargetSizeCache: make(map[GceRef]int64),
19232017
migBaseNameCache: make(map[GceRef]string),
2018+
migWorkloadPolicyCache: make(map[GceRef]*string),
19242019
migInstancesStateCountCache: make(map[GceRef]map[cloudprovider.InstanceState]int64),
19252020
listManagedInstancesResultsCache: make(map[GceRef]string),
19262021
instanceTemplateNameCache: make(map[GceRef]InstanceTemplateName),
@@ -1972,10 +2067,20 @@ func fetchMigTargetSizeConst(targetSize int64) func(GceRef) (int64, error) {
19722067
}
19732068
}
19742069

2070+
func fetchMigWorkloadPolicyFail(_ GceRef) (*string, error) {
2071+
return nil, errFetchMigWorkloadPolicy
2072+
}
2073+
19752074
func fetchMigBasenameFail(_ GceRef) (string, error) {
19762075
return "", errFetchMigBaseName
19772076
}
19782077

2078+
func fetchMigWorkloadPolicyConst(workloadPolicy *string) func(GceRef) (*string, error) {
2079+
return func(GceRef) (*string, error) {
2080+
return workloadPolicy, nil
2081+
}
2082+
}
2083+
19792084
func fetchMigBasenameConst(basename string) func(GceRef) (string, error) {
19802085
return func(GceRef) (string, error) {
19812086
return basename, nil

0 commit comments

Comments
 (0)