Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ type AutoscalingGceClient interface {
FetchAllMigs(zone string) ([]*gce.InstanceGroupManager, error)
FetchAllInstances(project, zone string, filter string) ([]GceInstance, error)
FetchMigTargetSize(GceRef) (int64, error)
FetchMigWorkloadPolicy(GceRef) (*string, error)
FetchMigBasename(GceRef) (string, error)
FetchMigInstances(GceRef) ([]GceInstance, error)
FetchMigTemplateName(migRef GceRef) (InstanceTemplateName, error)
Expand Down Expand Up @@ -239,6 +240,25 @@ func (client *autoscalingGceClientV1) FetchAllMigs(zone string) ([]*gce.Instance
return migs, nil
}

func (client *autoscalingGceClientV1) FetchMigWorkloadPolicy(migRef GceRef) (*string, error) {
registerRequest("instance_group_managers", "get")
ctx, cancel := context.WithTimeout(context.Background(), client.operationPerCallTimeout)
defer cancel()
igm, err := client.gceService.InstanceGroupManagers.Get(migRef.Project, migRef.Zone, migRef.Name).Context(ctx).Do()
if err != nil {
if err, ok := err.(*googleapi.Error); ok {
if err.Code == http.StatusNotFound {
return nil, errors.NewAutoscalerError(errors.NodeGroupDoesNotExistError, err.Error())
}
}
return nil, err
}
if igm == nil || igm.ResourcePolicies == nil {
return nil, nil
}
return &igm.ResourcePolicies.WorkloadPolicy, nil
}

func (client *autoscalingGceClientV1) FetchMigTargetSize(migRef GceRef) (int64, error) {
registerRequest("instance_group_managers", "get")
ctx, cancel := context.WithTimeout(context.Background(), client.operationPerCallTimeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,13 @@ func TestAutoscalingClientTimeouts(t *testing.T) {
},
operationPerCallTimeout: &instantTimeout,
},
"FetchMigWorkloadPolicy_ContextTimeout": {
clientFunc: func(client *autoscalingGceClientV1) error {
_, err := client.FetchMigWorkloadPolicy(GceRef{})
return err
},
operationPerCallTimeout: &instantTimeout,
},
"FetchMigTemplate_ContextTimeout": {
clientFunc: func(client *autoscalingGceClientV1) error {
_, err := client.FetchMigTemplate(GceRef{}, "", false)
Expand Down Expand Up @@ -839,6 +846,13 @@ func TestAutoscalingClientTimeouts(t *testing.T) {
},
httpTimeout: instantTimeout,
},
"FetchMigWorkloadPolicy_HttpClientTimeout": {
clientFunc: func(client *autoscalingGceClientV1) error {
_, err := client.FetchMigWorkloadPolicy(GceRef{})
return err
},
httpTimeout: instantTimeout,
},
"FetchMigInstances_HttpClientTimeout": {
clientFunc: func(client *autoscalingGceClientV1) error {
_, err := client.FetchMigInstances(GceRef{})
Expand Down
31 changes: 31 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type GceCache struct {
autoscalingOptionsCache map[GceRef]map[string]string
machinesCache map[MachineTypeKey]MachineType
migTargetSizeCache map[GceRef]int64
migWorkloadPolicyCache map[GceRef]*string
migBaseNameCache map[GceRef]string
migInstancesStateCountCache map[GceRef]map[cloudprovider.InstanceState]int64
listManagedInstancesResultsCache map[GceRef]string
Expand All @@ -91,6 +92,7 @@ func NewGceCache() *GceCache {
autoscalingOptionsCache: map[GceRef]map[string]string{},
machinesCache: map[MachineTypeKey]MachineType{},
migTargetSizeCache: map[GceRef]int64{},
migWorkloadPolicyCache: map[GceRef]*string{},
migBaseNameCache: map[GceRef]string{},
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
listManagedInstancesResultsCache: map[GceRef]string{},
Expand Down Expand Up @@ -352,6 +354,35 @@ func (gc *GceCache) InvalidateAllMigTargetSizes() {
gc.migTargetSizeCache = map[GceRef]int64{}
}

// GetMigWorkloadPolicy returns the cached WorkloadPolicy for a GceRef
func (gc *GceCache) GetMigWorkloadPolicy(ref GceRef) (*string, bool) {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()

wp, found := gc.migWorkloadPolicyCache[ref]
if found {
klog.V(5).Infof("WorkloadPolicy cache hit for %s", ref)
}
return wp, found
}

// SetMigWorkloadPolicy sets WorkloadPolicy for a GceRef
func (gc *GceCache) SetMigWorkloadPolicy(ref GceRef, wp *string) {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()

gc.migWorkloadPolicyCache[ref] = wp
}

// InvalidateAllMigWorkloadPolicies clears the WorkloadPolicy ref cache
func (gc *GceCache) InvalidateAllMigWorkloadPolicies() {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()

klog.V(5).Infof("WorkloadPolicy cache invalidated")
gc.migWorkloadPolicyCache = map[GceRef]*string{}
}

// GetMigInstanceTemplateName returns the cached instance template ref for a mig GceRef
func (gc *GceCache) GetMigInstanceTemplateName(ref GceRef) (InstanceTemplateName, bool) {
gc.cacheMutex.Lock()
Expand Down
1 change: 1 addition & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ func (m *gceManagerImpl) Refresh() error {
m.cache.InvalidateAllMigBasenames()
m.cache.InvalidateAllListManagedInstancesResults()
m.cache.InvalidateAllMigInstanceTemplateNames()
m.cache.InvalidateAllMigWorkloadPolicies()
if m.lastRefresh.Add(refreshInterval).After(time.Now()) {
return nil
}
Expand Down
1 change: 1 addition & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa
migBaseNameCache: map[GceRef]string{},
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
listManagedInstancesResultsCache: map[GceRef]string{},
migWorkloadPolicyCache: map[GceRef]*string{},
}
migLister := NewMigLister(cache)
manager := &gceManagerImpl{
Expand Down
33 changes: 33 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/mig_info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ type MigInfoProvider interface {
RegenerateMigInstancesCache() error
// GetMigTargetSize returns target size for given MIG ref
GetMigTargetSize(migRef GceRef) (int64, error)
// GetMigWorkloadPolicy returns WorkloadPolicy for given MIG ref
GetMigWorkloadPolicy(migRef GceRef) (*string, error)
// GetMigBasename returns basename for given MIG ref
GetMigBasename(migRef GceRef) (string, error)
// GetMigInstanceTemplateName returns instance template name for given MIG ref
Expand Down Expand Up @@ -348,6 +350,31 @@ func (c *cachingMigInfoProvider) GetMigTargetSize(migRef GceRef) (int64, error)
return targetSize, nil
}

func (c *cachingMigInfoProvider) GetMigWorkloadPolicy(migRef GceRef) (*string, error) {
c.migInfoMutex.Lock()
defer c.migInfoMutex.Unlock()

wp, found := c.cache.GetMigWorkloadPolicy(migRef)
if found {
return wp, nil
}

err := c.fillMigInfoCache()
wp, found = c.cache.GetMigWorkloadPolicy(migRef)
if err == nil && found {
return wp, nil
}

// fallback to querying for single mig
wp, err = c.gceClient.FetchMigWorkloadPolicy(migRef)
if err != nil {
c.migLister.HandleMigIssue(migRef, err)
return nil, err
}
c.cache.SetMigWorkloadPolicy(migRef, wp)
return wp, nil
}

func (c *cachingMigInfoProvider) GetMigBasename(migRef GceRef) (string, error) {
c.migInfoMutex.Lock()
defer c.migInfoMutex.Unlock()
Expand Down Expand Up @@ -491,6 +518,12 @@ func (c *cachingMigInfoProvider) fillMigInfoCache() error {
c.cache.SetListManagedInstancesResults(zoneMigRef, zoneMig.ListManagedInstancesResults)
c.cache.SetMigInstancesStateCount(zoneMigRef, createInstancesStateCount(zoneMig.TargetSize, zoneMig.CurrentActions))

if zoneMig.ResourcePolicies == nil {
c.cache.SetMigWorkloadPolicy(zoneMigRef, nil)
} else {
c.cache.SetMigWorkloadPolicy(zoneMigRef, &zoneMig.ResourcePolicies.WorkloadPolicy)
}

templateUrl, err := url.Parse(zoneMig.InstanceTemplate)
if err == nil {
_, templateName := path.Split(templateUrl.EscapedPath())
Expand Down
105 changes: 105 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/mig_info_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/stretchr/testify/assert"
gce "google.golang.org/api/compute/v1"
"google.golang.org/protobuf/proto"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
)
Expand All @@ -33,6 +34,7 @@ var (
errFetchMig = errors.New("fetch migs error")
errFetchMigInstances = errors.New("fetch mig instances error")
errFetchMigTargetSize = errors.New("fetch mig target size error")
errFetchMigWorkloadPolicy = errors.New("fetch mig workload policy error")
errFetchMigBaseName = errors.New("fetch mig basename error")
errFetchMigTemplateName = errors.New("fetch mig template name error")
errFetchMigTemplate = errors.New("fetch mig template error")
Expand Down Expand Up @@ -112,6 +114,7 @@ type mockAutoscalingGceClient struct {
fetchMigs func(string) ([]*gce.InstanceGroupManager, error)
fetchAllInstances func(project, zone string, filter string) ([]GceInstance, error)
fetchMigTargetSize func(GceRef) (int64, error)
fetchMigWorkloadPolicy func(GceRef) (*string, error)
fetchMigBasename func(GceRef) (string, error)
fetchMigInstances func(GceRef) ([]GceInstance, error)
fetchMigTemplateName func(GceRef) (InstanceTemplateName, error)
Expand Down Expand Up @@ -140,6 +143,10 @@ func (client *mockAutoscalingGceClient) FetchMigTargetSize(migRef GceRef) (int64
return client.fetchMigTargetSize(migRef)
}

func (client *mockAutoscalingGceClient) FetchMigWorkloadPolicy(migRef GceRef) (*string, error) {
return client.fetchMigWorkloadPolicy(migRef)
}

func (client *mockAutoscalingGceClient) FetchMigBasename(migRef GceRef) (string, error) {
return client.fetchMigBasename(migRef)
}
Expand Down Expand Up @@ -685,6 +692,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) {
listManagedInstancesResultsCache: map[GceRef]string{},
instanceTemplateNameCache: map[GceRef]InstanceTemplateName{},
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
migWorkloadPolicyCache: map[GceRef]*string{},
},
fetchMigInstances: fetchMigInstancesConst(mig1Instances),
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig1Igm}),
Expand All @@ -710,6 +718,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) {
listManagedInstancesResultsCache: map[GceRef]string{},
instanceTemplateNameCache: map[GceRef]InstanceTemplateName{},
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
migWorkloadPolicyCache: map[GceRef]*string{},
},
fetchMigInstances: fetchMigInstancesConst(mig2Instances),
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig2Igm}),
Expand All @@ -736,6 +745,7 @@ func TestRegenerateMigInstancesCache(t *testing.T) {
listManagedInstancesResultsCache: map[GceRef]string{},
instanceTemplateNameCache: map[GceRef]InstanceTemplateName{},
migInstancesStateCountCache: map[GceRef]map[cloudprovider.InstanceState]int64{},
migWorkloadPolicyCache: map[GceRef]*string{},
},
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{mig2Igm}),
fetchAllInstances: fetchAllInstancesInZone(map[string][]GceInstance{"myzone2": {instance3, instance6}}),
Expand Down Expand Up @@ -863,6 +873,90 @@ func TestGetMigTargetSize(t *testing.T) {
}
}

func TestGetMigWorkloadPolicy(t *testing.T) {
workloadPolicy := proto.String("wp-123")
instanceGroupManager := &gce.InstanceGroupManager{
Zone: mig.GceRef().Zone,
Name: mig.GceRef().Name,
ResourcePolicies: &gce.InstanceGroupManagerResourcePolicies{
WorkloadPolicy: *workloadPolicy,
},
}

testCases := []struct {
name string
cache *GceCache
fetchMigs func(string) ([]*gce.InstanceGroupManager, error)
fetchMigWorkloadPolicy func(GceRef) (*string, error)
expectedWorkloadPolicy *string
expectedErr error
}{
{
name: "workload policy in cache",
cache: &GceCache{
migs: map[GceRef]Mig{mig.GceRef(): mig},
migWorkloadPolicyCache: map[GceRef]*string{mig.GceRef(): workloadPolicy},
},
expectedWorkloadPolicy: workloadPolicy,
},
{
name: "workload policy from cache fill",
cache: emptyCache(),
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{instanceGroupManager}),
expectedWorkloadPolicy: workloadPolicy,
},
{
name: "cache fill without mig, fallback success",
cache: emptyCache(),
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{}),
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyConst(workloadPolicy),
expectedWorkloadPolicy: workloadPolicy,
},
{
name: "cache fill failure, fallback success",
cache: emptyCache(),
fetchMigs: fetchMigsFail,
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyConst(workloadPolicy),
expectedWorkloadPolicy: workloadPolicy,
},
{
name: "cache fill without mig, fallback failure",
cache: emptyCache(),
fetchMigs: fetchMigsConst([]*gce.InstanceGroupManager{}),
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyFail,
expectedErr: errFetchMigWorkloadPolicy,
},
{
name: "cache fill failure, fallback failure",
cache: emptyCache(),
fetchMigs: fetchMigsFail,
fetchMigWorkloadPolicy: fetchMigWorkloadPolicyFail,
expectedErr: errFetchMigWorkloadPolicy,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
client := &mockAutoscalingGceClient{
fetchMigs: tc.fetchMigs,
fetchMigWorkloadPolicy: tc.fetchMigWorkloadPolicy,
}
migLister := NewMigLister(tc.cache)
provider := NewCachingMigInfoProvider(tc.cache, migLister, client, mig.GceRef().Project, 1, 0*time.Second, false)

workloadPolicy, err := provider.GetMigWorkloadPolicy(mig.GceRef())
cachedWorkloadPolicy, found := tc.cache.GetMigWorkloadPolicy(mig.GceRef())

assert.Equal(t, tc.expectedErr, err)
assert.Equal(t, tc.expectedErr == nil, found)
if tc.expectedErr == nil {
assert.Equal(t, tc.expectedWorkloadPolicy, workloadPolicy)
assert.Equal(t, tc.expectedWorkloadPolicy, cachedWorkloadPolicy)
}
})
}
}

func TestGetMigBasename(t *testing.T) {
basename := "base-instance-name"
instanceGroupManager := &gce.InstanceGroupManager{
Expand Down Expand Up @@ -1921,6 +2015,7 @@ func emptyCache() *GceCache {
instancesUpdateTime: make(map[GceRef]time.Time),
migTargetSizeCache: make(map[GceRef]int64),
migBaseNameCache: make(map[GceRef]string),
migWorkloadPolicyCache: make(map[GceRef]*string),
migInstancesStateCountCache: make(map[GceRef]map[cloudprovider.InstanceState]int64),
listManagedInstancesResultsCache: make(map[GceRef]string),
instanceTemplateNameCache: make(map[GceRef]InstanceTemplateName),
Expand Down Expand Up @@ -1972,10 +2067,20 @@ func fetchMigTargetSizeConst(targetSize int64) func(GceRef) (int64, error) {
}
}

func fetchMigWorkloadPolicyFail(_ GceRef) (*string, error) {
return nil, errFetchMigWorkloadPolicy
}

func fetchMigBasenameFail(_ GceRef) (string, error) {
return "", errFetchMigBaseName
}

func fetchMigWorkloadPolicyConst(workloadPolicy *string) func(GceRef) (*string, error) {
return func(GceRef) (*string, error) {
return workloadPolicy, nil
}
}

func fetchMigBasenameConst(basename string) func(GceRef) (string, error) {
return func(GceRef) (string, error) {
return basename, nil
Expand Down
Loading
Loading