diff --git a/cluster-autoscaler/cloudprovider/aws/aws_local_storage.go b/cluster-autoscaler/cloudprovider/aws/aws_local_storage.go new file mode 100644 index 000000000000..c1c0e57bfe2a --- /dev/null +++ b/cluster-autoscaler/cloudprovider/aws/aws_local_storage.go @@ -0,0 +1,46 @@ +package aws + +import ( + "strings" +) + +var volumesPerInstanceFamily = map[string]int64{ + "c1": 1, + "c3": 1, + "c5ad": 1, + "c5d": 1, + "c6gd": 1, + "d2": 1, + "f1": 1, + "g2": 1, + "g4dn": 1, + "h1": 1, + "i2": 1, + "i3": 1, + "i3en": 1, + "i3p": 1, + "m1": 1, + "m2": 1, + "m3": 1, + "m5ad": 1, + "m5d": 1, + "m5dn": 1, + "m6gd": 1, + "p3dn": 1, + "r3": 1, + "r5ad": 1, + "r5d": 1, + "r5dn": 1, + "r6gd": 1, + "x1": 1, + "x1e": 1, + "z1d": 1, +} + +func numberOfLocalVolumes(instanceType string) int64 { + family := strings.Split(instanceType, ".")[0] + if volumes, ok := volumesPerInstanceFamily[family]; ok { + return volumes + } + return 0 +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index cd91b402ec36..9136896b2256 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -369,6 +369,7 @@ func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*ap node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(template.InstanceType.VCPU, resource.DecimalSI) node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(template.InstanceType.GPU, resource.DecimalSI) node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(instanceMemoryBi, resource.BinarySI) + node.Status.Capacity["storageclass/local-data"] = *resource.NewQuantity(numberOfLocalVolumes(template.InstanceType.InstanceType), resource.DecimalSI) resourcesFromTags := extractAllocatableResourcesFromAsg(template.Tags) for resourceName, val := range resourcesFromTags { diff --git a/cluster-autoscaler/cloudprovider/azure/azure_local_storage.go b/cluster-autoscaler/cloudprovider/azure/azure_local_storage.go new file mode 100644 index 000000000000..832629b9c697 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/azure/azure_local_storage.go @@ -0,0 +1,21 @@ +package azure + +import ( + "regexp" +) + +// https://github.com/DataDog/k8s-nodegroups/blob/controller-runtime-v1/pkg/cloud/azure/clients/resource_skus_cache.go#L21 +var volumesPerInstanceFamily = map[string]int64{ + "Standard_L\\d+s_v2": 1, // standardLSv2Family + "Standard_E\\d+d_v4": 1, // standardEDv4Family + "Standard_E.*ds_v4": 1, // standardEDSv4Family +} + +func numberOfLocalVolumes(instanceType string) int64 { + for family, volumes := range volumesPerInstanceFamily { + if match, _ := regexp.MatchString(family, instanceType); match { + return volumes + } + } + return 0 +} diff --git a/cluster-autoscaler/cloudprovider/azure/azure_template.go b/cluster-autoscaler/cloudprovider/azure/azure_template.go index 81c7e9ae0348..98984b98419b 100644 --- a/cluster-autoscaler/cloudprovider/azure/azure_template.go +++ b/cluster-autoscaler/cloudprovider/azure/azure_template.go @@ -118,6 +118,11 @@ func buildNodeFromTemplate(scaleSetName string, template compute.VirtualMachineS node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val } + if vmssType.InstanceType != "" { + volumes := numberOfLocalVolumes(vmssType.InstanceType) + node.Status.Capacity["storageclass/local-data"] = *resource.NewQuantity(volumes, resource.DecimalSI) + } + // TODO: set real allocatable.
node.Status.Allocatable = node.Status.Capacity diff --git a/cluster-autoscaler/cloudprovider/gce/cache.go b/cluster-autoscaler/cloudprovider/gce/cache.go index 23323e16fa84..90c9f2a508fe 100644 --- a/cluster-autoscaler/cloudprovider/gce/cache.go +++ b/cluster-autoscaler/cloudprovider/gce/cache.go @@ -17,12 +17,16 @@ limitations under the License. package gce import ( + "context" "fmt" + "os" "reflect" + "strconv" "strings" "sync" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/client-go/util/workqueue" gce "google.golang.org/api/compute/v1" klog "k8s.io/klog/v2" @@ -222,9 +226,28 @@ func (gc *GceCache) getMigNoLock(migRef GceRef) (mig Mig, found bool) { // RegenerateInstanceCacheForMig triggers instances cache regeneration for single MIG under lock. func (gc *GceCache) RegenerateInstanceCacheForMig(migRef GceRef) error { + klog.V(4).Infof("Regenerating MIG information for %s", migRef.String()) + + instances, err := gc.GceService.FetchMigInstances(migRef) + if err != nil { + klog.V(4).Infof("Failed MIG info request for %s: %v", migRef.String(), err) + return err + } + gc.cacheMutex.Lock() defer gc.cacheMutex.Unlock() - return gc.regenerateInstanceCacheForMigNoLock(migRef) + + // cleanup old entries + gc.removeInstancesForMigs(migRef) + + for _, instance := range instances { + instanceRef, err := GceRefFromProviderId(instance.Id) + if err != nil { + return err + } + gc.instanceRefToMigRef[instanceRef] = migRef + } + return nil } func (gc *GceCache) regenerateInstanceCacheForMigNoLock(migRef GceRef) error { @@ -250,17 +273,32 @@ func (gc *GceCache) regenerateInstanceCacheForMigNoLock(migRef GceRef) error { // RegenerateInstancesCache triggers instances cache regeneration under lock. func (gc *GceCache) RegenerateInstancesCache() error { - gc.cacheMutex.Lock() - defer gc.cacheMutex.Unlock() - gc.instanceRefToMigRef = make(map[GceRef]GceRef) gc.instancesFromUnknownMigs = make(map[GceRef]struct{}) - for _, migRef := range gc.getMigRefs() { - err := gc.regenerateInstanceCacheForMigNoLock(migRef) + + concurrency := 5 + if value, exists := os.LookupEnv("GCP_MAX_CONCURRENCY"); exists { + if i, err := strconv.Atoi(value); err == nil { + concurrency = i + } + } + + migs := gc.getMigRefs() + errors := make([]error, len(migs)) + ctx, cancel := context.WithCancel(context.Background()) + workqueue.ParallelizeUntil(ctx, concurrency, len(migs), func(piece int) { + errors[piece] = gc.RegenerateInstanceCacheForMig(migs[piece]) + if errors[piece] != nil { + cancel() + } + }, workqueue.WithChunkSize(concurrency)) + + for _, err := range errors { if err != nil { return err } } + return nil } diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager.go b/cluster-autoscaler/cloudprovider/gce/gce_manager.go index dfb9f89c3ec4..ffb9d441ba9c 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager.go @@ -17,6 +17,7 @@ limitations under the License. 
package gce import ( + "context" "errors" "fmt" "io" @@ -24,14 +25,16 @@ import ( "regexp" "strconv" "strings" + "sync/atomic" "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/utils/units" + "k8s.io/client-go/util/workqueue" apiv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/util/wait" provider_gce "k8s.io/legacy-cloud-providers/gce" "cloud.google.com/go/compute/metadata" @@ -359,12 +362,22 @@ func (m *gceManagerImpl) buildMigFromSpec(s *dynamic.NodeGroupSpec) (Mig, error) // they no longer exist in GCE. func (m *gceManagerImpl) fetchAutoMigs() error { exists := make(map[GceRef]bool) - changed := false + var changed int32 = 0 + + concurrency := 5 + if value, exists := os.LookupEnv("GCP_MAX_CONCURRENCY"); exists { + if i, err := strconv.Atoi(value); err == nil { + concurrency = i + } + } + + toRegister := make([]Mig, 0) for _, cfg := range m.migAutoDiscoverySpecs { links, err := m.findMigsNamed(cfg.Re) if err != nil { return fmt.Errorf("cannot autodiscover managed instance groups: %v", err) } + for _, link := range links { mig, err := m.buildMigFromAutoCfg(link, cfg) if err != nil { @@ -378,21 +391,26 @@ func (m *gceManagerImpl) fetchAutoMigs() error { klog.V(3).Infof("Ignoring explicitly configured MIG %s in autodiscovery.", mig.GceRef().String()) continue } - if m.registerMig(mig) { - klog.V(3).Infof("Autodiscovered MIG %s using regexp %s", mig.GceRef().String(), cfg.Re.String()) - changed = true - } + toRegister = append(toRegister, mig) } } + workqueue.ParallelizeUntil(context.Background(), concurrency, len(toRegister), func(piece int) { + mig := toRegister[piece] + if m.registerMig(mig) { + klog.V(3).Infof("Autodiscovered MIG %s", mig.GceRef().String()) + atomic.StoreInt32(&changed, int32(1)) + } + }, workqueue.WithChunkSize(concurrency)) + for _, mig := range m.GetMigs() { if !exists[mig.GceRef()] && !m.explicitlyConfigured[mig.GceRef()] { m.cache.UnregisterMig(mig) - changed = true + atomic.StoreInt32(&changed, int32(1)) } } - if changed { + if atomic.LoadInt32(&changed) > 0 { return m.cache.RegenerateInstancesCache() } diff --git a/cluster-autoscaler/cloudprovider/gce/mig_instance_templates_provider.go b/cluster-autoscaler/cloudprovider/gce/mig_instance_templates_provider.go index e934aaa6a907..afe5aa541ec5 100644 --- a/cluster-autoscaler/cloudprovider/gce/mig_instance_templates_provider.go +++ b/cluster-autoscaler/cloudprovider/gce/mig_instance_templates_provider.go @@ -52,14 +52,13 @@ func NewCachingMigInstanceTemplatesProvider(cache *GceCache, gceClient Autoscali // GetMigInstanceTemplate returns instance template for MIG with given ref func (p *CachingMigInstanceTemplatesProvider) GetMigInstanceTemplate(migRef GceRef) (*gce.InstanceTemplate, error) { p.mutex.Lock() - defer p.mutex.Unlock() - if !p.lastRefresh.Add(migInstanceCacheRefreshInterval).After(time.Now()) { p.cache.InvalidateAllMigInstanceTemplates() p.lastRefresh = time.Now() } instanceTemplate, found := p.cache.GetMigInstanceTemplate(migRef) + p.mutex.Unlock() if found { return instanceTemplate, nil @@ -69,6 +68,8 @@ func (p *CachingMigInstanceTemplatesProvider) GetMigInstanceTemplate(migRef GceR if err != nil { return nil, err } + p.mutex.Lock() p.cache.SetMigInstanceTemplate(migRef, instanceTemplate) + p.mutex.Unlock() return instanceTemplate, nil } diff --git a/cluster-autoscaler/cloudprovider/gce/templates.go b/cluster-autoscaler/cloudprovider/gce/templates.go index 
b90410e6411d..254c28ce8b55 100644 --- a/cluster-autoscaler/cloudprovider/gce/templates.go +++ b/cluster-autoscaler/cloudprovider/gce/templates.go @@ -150,6 +150,11 @@ func (t *GceTemplateBuilder) BuildNodeFromTemplate(mig Mig, template *gce.Instan if err != nil { return nil, err } + var storage int64 = 0 + if len(template.Properties.Disks) > 1 { + storage = 1 + } + capacity["storageclass/local-data"] = *resource.NewQuantity(storage, resource.DecimalSI) node.Status = apiv1.NodeStatus{ Capacity: capacity, } diff --git a/cluster-autoscaler/core/filter_out_schedulable.go b/cluster-autoscaler/core/filter_out_schedulable.go index 5bd766de17c1..8df6379cbc92 100644 --- a/cluster-autoscaler/core/filter_out_schedulable.go +++ b/cluster-autoscaler/core/filter_out_schedulable.go @@ -20,11 +20,13 @@ import ( "sort" "time" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/types" "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" + schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1" apiv1 "k8s.io/api/core/v1" klog "k8s.io/klog/v2" @@ -66,6 +68,37 @@ func (p *filterOutSchedulablePodListProcessor) Process( filterOutSchedulableStart := time.Now() var unschedulablePodsToHelp []*apiv1.Pod + pvcLister := context.ListerRegistry.PersistentVolumeClaimLister() + for _, po := range unschedulablePods { + var volumes []apiv1.Volume + for _, vol := range po.Spec.Volumes { + if vol.PersistentVolumeClaim == nil { + volumes = append(volumes, vol) + continue + } + pvc, err := pvcLister.PersistentVolumeClaims(po.Namespace).Get(vol.PersistentVolumeClaim.ClaimName) + if err != nil { + volumes = append(volumes, vol) + continue + } + if pvc.Spec.StorageClassName == nil || *pvc.Spec.StorageClassName != "local-data" { + volumes = append(volumes, vol) + continue + } + + if len(po.Spec.Containers[0].Resources.Requests) == 0 { + po.Spec.Containers[0].Resources.Requests = apiv1.ResourceList{} + } + if len(po.Spec.Containers[0].Resources.Limits) == 0 { + po.Spec.Containers[0].Resources.Limits = apiv1.ResourceList{} + } + + po.Spec.Containers[0].Resources.Requests["storageclass/local-data"] = *resource.NewQuantity(1, resource.DecimalSI) + po.Spec.Containers[0].Resources.Limits["storageclass/local-data"] = *resource.NewQuantity(1, resource.DecimalSI) + } + po.Spec.Volumes = volumes + } + unschedulablePodsToHelp, err := p.filterOutSchedulableByPacking(unschedulablePods, context.ClusterSnapshot, context.PredicateChecker) @@ -111,7 +144,8 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking( for _, pod := range unschedulableCandidates { scheduledOnHintedNode := false if hintedNodeName, hintFound := p.schedulablePodsNodeHints[pod.UID]; hintFound { - if predicateChecker.CheckPredicates(clusterSnapshot, pod, hintedNodeName) == nil { + nodeInfo, _ := clusterSnapshot.NodeInfos().Get(hintedNodeName) + if predicateChecker.CheckPredicates(clusterSnapshot, pod, hintedNodeName) == nil && isLivingNode(nodeInfo) { // We treat predicate error and missing node error here in the same way scheduledOnHintedNode = true podsFilteredUsingHints++ @@ -152,7 +186,9 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking( unschedulePodsCacheHitCounter++ continue } - nodeName, err := predicateChecker.FitsAnyNode(clusterSnapshot, pod) + nodeName, err := predicateChecker.FitsAnyNodeMatching(clusterSnapshot, pod, func(nodeInfo *schedulerframework.NodeInfo) bool { + return
isLivingNode(nodeInfo) + }) + if err == nil { + klog.V(4).Infof("Pod %s.%s marked as unschedulable can be scheduled on node %s. Ignoring"+ " in scale up.", pod.Namespace, pod.Name, nodeName) @@ -172,6 +208,36 @@ func (p *filterOutSchedulablePodListProcessor) filterOutSchedulableByPacking( return unschedulablePods, nil } +// filter out dead nodes (having an "unknown" NodeReady condition for over 10 minutes), so we can ignore them if hinted. +// Needed for 1.10 clusters, until we set the TaintBasedEvictions feature gate to "true" there (already enabled +// by default on clusters using k8s v1.14 and up): TaintBasedEvictions places a node.kubernetes.io/unreachable +// taint on dead nodes, which helps the CA consider them unschedulable (unless explicitly tolerated). +func isLivingNode(nodeInfo *schedulerframework.NodeInfo) bool { + if nodeInfo == nil { + // we only care about filtering out nodes having "unknown" status. + return true + } + + node := nodeInfo.Node() + if node == nil || node.Status.Conditions == nil { + return true + } + + for _, cond := range node.Status.Conditions { + if cond.Type != apiv1.NodeReady { + continue + } + if cond.Status != apiv1.ConditionUnknown { + continue + } + if cond.LastTransitionTime.Time.Add(10 * time.Minute).Before(time.Now()) { + return false + } + } + + return true +} + func moreImportantPod(pod1, pod2 *apiv1.Pod) bool { // based on schedulers MoreImportantPod but does not compare Pod.Status.StartTime which does not make sense // for unschedulable pods diff --git a/cluster-autoscaler/core/scale_down.go b/cluster-autoscaler/core/scale_down.go index bc3f5b1cea0f..0c9ac2b101b9 100644 --- a/cluster-autoscaler/core/scale_down.go +++ b/cluster-autoscaler/core/scale_down.go @@ -414,7 +414,7 @@ func (sd *ScaleDown) checkNodeUtilization(timestamp time.Time, node *apiv1.Node, return simulator.ScaleDownDisabledAnnotation, nil } - utilInfo, err := simulator.CalculateUtilization(node, nodeInfo, sd.context.IgnoreDaemonSetsUtilization, sd.context.IgnoreMirrorPodsUtilization, sd.context.CloudProvider.GPULabel()) + utilInfo, err := simulator.CalculateUtilization(node, nodeInfo, sd.context.IgnoreDaemonSetsUtilization, sd.context.IgnoreMirrorPodsUtilization, sd.context.CloudProvider.GPULabel(), timestamp) if err != nil { klog.Warningf("Failed to calculate utilization for %s: %v", node.Name, err) } @@ -488,7 +488,7 @@ func (sd *ScaleDown) UpdateUnneededNodes( klog.V(1).Infof("Scale-down calculation: ignoring %v nodes unremovable in the last %v", skipped, sd.context.AutoscalingOptions.UnremovableNodeRecheckTimeout) } - emptyNodesList := sd.getEmptyNodesNoResourceLimits(currentlyUnneededNodeNames, len(currentlyUnneededNodeNames)) + emptyNodesList := sd.getEmptyNodesNoResourceLimits(currentlyUnneededNodeNames, len(currentlyUnneededNodeNames), timestamp) emptyNodes := make(map[string]bool) for _, node := range emptyNodesList { @@ -873,7 +873,7 @@ func (sd *ScaleDown) TryToScaleDown( // Trying to delete empty nodes in bulk. If there are no empty nodes then CA will // try to delete not-so-empty nodes, possibly killing some pods and allowing them // to recreate on other nodes.
- emptyNodes := sd.getEmptyNodes(candidateNames, sd.context.MaxEmptyBulkDelete, scaleDownResourcesLeft) + emptyNodes := sd.getEmptyNodes(candidateNames, sd.context.MaxEmptyBulkDelete, scaleDownResourcesLeft, currentTime) if len(emptyNodes) > 0 { nodeDeletionStart := time.Now() deletedNodes, err := sd.scheduleDeleteEmptyNodes(emptyNodes, sd.context.ClientSet, sd.context.Recorder, readinessMap, candidateNodeGroups) @@ -979,16 +979,16 @@ func updateScaleDownMetrics(scaleDownStart time.Time, findNodesToRemoveDuration metrics.UpdateDuration(metrics.ScaleDownMiscOperations, miscDuration) } -func (sd *ScaleDown) getEmptyNodesNoResourceLimits(candidates []string, maxEmptyBulkDelete int) []*apiv1.Node { - return sd.getEmptyNodes(candidates, maxEmptyBulkDelete, noScaleDownLimitsOnResources()) +func (sd *ScaleDown) getEmptyNodesNoResourceLimits(candidates []string, maxEmptyBulkDelete int, timestamp time.Time) []*apiv1.Node { + return sd.getEmptyNodes(candidates, maxEmptyBulkDelete, noScaleDownLimitsOnResources(), timestamp) } // This functions finds empty nodes among passed candidates and returns a list of empty nodes // that can be deleted at the same time. func (sd *ScaleDown) getEmptyNodes(candidates []string, maxEmptyBulkDelete int, - resourcesLimits scaleDownResourcesLimits) []*apiv1.Node { + resourcesLimits scaleDownResourcesLimits, timestamp time.Time) []*apiv1.Node { - emptyNodes := simulator.FindEmptyNodesToRemove(sd.context.ClusterSnapshot, candidates) + emptyNodes := simulator.FindEmptyNodesToRemove(sd.context.ClusterSnapshot, candidates, timestamp) availabilityMap := make(map[string]int) result := make([]*apiv1.Node, 0) resourcesLimitsCopy := copyScaleDownResourcesLimits(resourcesLimits) // we do not want to modify input parameter diff --git a/cluster-autoscaler/core/scale_down_test.go b/cluster-autoscaler/core/scale_down_test.go index a6050d560022..9af4ee529948 100644 --- a/cluster-autoscaler/core/scale_down_test.go +++ b/cluster-autoscaler/core/scale_down_test.go @@ -635,7 +635,7 @@ func TestDeleteNode(t *testing.T) { fakeClient.Fake.AddReactor("get", "pods", podNotFoundFunc) // build context - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{}, fakeClient, registry, provider, nil) assert.NoError(t, err) @@ -966,7 +966,7 @@ func TestScaleDown(t *testing.T) { } jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job}) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) assert.NoError(t, err) @@ -1218,7 +1218,7 @@ func simpleScaleDownEmpty(t *testing.T, config *scaleTestConfig) { assert.NotNil(t, provider) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(config.options, fakeClient, registry, provider, nil) assert.NoError(t, err) @@ -1305,7 +1305,7 @@ func TestNoScaleDownUnready(t *testing.T) { ScaleDownUnreadyTime: time.Hour, MaxGracefulTerminationSec: 60, } - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, 
nil, nil, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) assert.NoError(t, err) @@ -1415,7 +1415,7 @@ func TestScaleDownNoMove(t *testing.T) { } jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job}) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) assert.NoError(t, err) @@ -1664,7 +1664,7 @@ func TestSoftTaint(t *testing.T) { } jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job}) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) assert.NoError(t, err) @@ -1783,7 +1783,7 @@ func TestSoftTaintTimeLimit(t *testing.T) { } jobLister, err := kube_util.NewTestJobLister([]*batchv1.Job{&job}) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, nil, nil, jobLister, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, registry, provider, nil) assert.NoError(t, err) diff --git a/cluster-autoscaler/core/scale_up_test.go b/cluster-autoscaler/core/scale_up_test.go index b7f8ce699e45..63a53ad41336 100644 --- a/cluster-autoscaler/core/scale_up_test.go +++ b/cluster-autoscaler/core/scale_up_test.go @@ -489,7 +489,7 @@ func runSimpleScaleUpTest(t *testing.T, config *scaleTestConfig) *scaleTestResul } podLister := kube_util.NewTestPodLister(pods) - listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil, nil) provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { expandedGroups <- groupSizeChange{groupName: nodeGroup, sizeChange: increase} @@ -661,7 +661,7 @@ func TestScaleUpUnhealthy(t *testing.T) { p2.Spec.NodeName = "n2" podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2}) - listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil, nil) provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { t.Fatalf("No expansion is expected, but increased %s by %d", nodeGroup, increase) @@ -702,7 +702,7 @@ func TestScaleUpNoHelp(t *testing.T) { p1.Spec.NodeName = "n1" podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1}) - listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil, nil) provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { t.Fatalf("No expansion is expected") @@ -775,7 +775,7 @@ func TestScaleUpBalanceGroups(t *testing.T) { } podLister := kube_util.NewTestPodLister(podList) - listers := kube_util.NewListerRegistry(nil, nil, 
podLister, nil, nil, nil, nil, nil, nil, nil) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil, nil) options := config.AutoscalingOptions{ EstimatorName: estimator.BinpackingEstimatorName, @@ -843,7 +843,7 @@ func TestScaleUpAutoprovisionedNodeGroup(t *testing.T) { MaxAutoprovisionedNodeGroupCount: 10, } podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) - listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil) assert.NoError(t, err) @@ -896,7 +896,7 @@ func TestScaleUpBalanceAutoprovisionedNodeGroups(t *testing.T) { MaxAutoprovisionedNodeGroupCount: 10, } podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) - listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil, nil) context, err := NewScaleTestAutoscalingContext(options, fakeClient, listers, provider, nil) assert.NoError(t, err) diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 50aaa1817612..a2ed564e9f81 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -188,7 +188,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, - nil, nil, nil, nil) + nil, nil, nil, nil, nil) context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ @@ -376,7 +376,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, - nil, nil, nil, nil) + nil, nil, nil, nil, nil) context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ @@ -509,7 +509,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, - nil, nil, nil, nil) + nil, nil, nil, nil, nil) context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ @@ -652,7 +652,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, - nil, nil, nil, nil) + nil, nil, nil, nil, nil) context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ @@ -777,7 +777,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, - nil, nil, nil, nil) + nil, nil, nil, nil, nil) context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ @@ -870,7 
+870,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodMock, unschedulablePodMock, podDisruptionBudgetListerMock, daemonSetListerMock, - nil, nil, nil, nil) + nil, nil, nil, nil, nil) context.ListerRegistry = listerRegistry clusterStateConfig := clusterstate.ClusterStateRegistryConfig{ diff --git a/cluster-autoscaler/core/utils/utils_test.go b/cluster-autoscaler/core/utils/utils_test.go index df8a29563363..b67dddded578 100644 --- a/cluster-autoscaler/core/utils/utils_test.go +++ b/cluster-autoscaler/core/utils/utils_test.go @@ -64,7 +64,7 @@ func TestGetNodeInfosForGroups(t *testing.T) { provider2.AddNodeGroup("ng5", 1, 10, 1) // Nodegroup without nodes. podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) - registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil, nil) predicateChecker, err := simulator.NewTestPredicateChecker() assert.NoError(t, err) @@ -133,7 +133,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { provider1.AddNode("ng4", ready6) podLister := kube_util.NewTestPodLister([]*apiv1.Pod{}) - registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil) + registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil, nil) predicateChecker, err := simulator.NewTestPredicateChecker() assert.NoError(t, err) diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index 87701248092a..8fea5fdcee0d 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -157,10 +157,10 @@ candidateloop: if fastCheck { podsToRemove, blockingPod, err = FastGetPodsToMove(nodeInfo, *skipNodesWithSystemPods, *skipNodesWithLocalStorage, - podDisruptionBudgets) + podDisruptionBudgets, timestamp) } else { podsToRemove, blockingPod, err = DetailedGetPodsForMove(nodeInfo, *skipNodesWithSystemPods, *skipNodesWithLocalStorage, listers, int32(*minReplicaCount), - podDisruptionBudgets) + podDisruptionBudgets, timestamp) } if err != nil { @@ -194,7 +194,7 @@ candidateloop: } // FindEmptyNodesToRemove finds empty nodes that can be removed. -func FindEmptyNodesToRemove(snapshot ClusterSnapshot, candidates []string) []string { +func FindEmptyNodesToRemove(snapshot ClusterSnapshot, candidates []string, timestamp time.Time) []string { result := make([]string, 0) for _, node := range candidates { nodeInfo, err := snapshot.NodeInfos().Get(node) @@ -203,7 +203,7 @@ func FindEmptyNodesToRemove(snapshot ClusterSnapshot, candidates []string) []str continue } // Should block on all pods. - podsToRemove, _, err := FastGetPodsToMove(nodeInfo, true, true, nil) + podsToRemove, _, err := FastGetPodsToMove(nodeInfo, true, true, nil, timestamp) if err == nil && len(podsToRemove) == 0 { result = append(result, node) } @@ -214,9 +214,9 @@ func FindEmptyNodesToRemove(snapshot ClusterSnapshot, candidates []string) []str // CalculateUtilization calculates utilization of a node, defined as maximum of (cpu, memory) or gpu utilization // based on if the node has GPU or not. Per resource utilization is the sum of requests for it divided by allocatable. // It also returns the individual cpu, memory and gpu utilization. 
-func CalculateUtilization(node *apiv1.Node, nodeInfo *schedulerframework.NodeInfo, skipDaemonSetPods, skipMirrorPods bool, gpuLabel string) (utilInfo UtilizationInfo, err error) { +func CalculateUtilization(node *apiv1.Node, nodeInfo *schedulerframework.NodeInfo, skipDaemonSetPods, skipMirrorPods bool, gpuLabel string, currentTime time.Time) (utilInfo UtilizationInfo, err error) { if gpu.NodeHasGpu(gpuLabel, node) { - gpuUtil, err := calculateUtilizationOfResource(node, nodeInfo, gpu.ResourceNvidiaGPU, skipDaemonSetPods, skipMirrorPods) + gpuUtil, err := calculateUtilizationOfResource(node, nodeInfo, gpu.ResourceNvidiaGPU, skipDaemonSetPods, skipMirrorPods, currentTime) if err != nil { klog.V(3).Infof("node %s has unready GPU", node.Name) // Return 0 if GPU is unready. This will guarantee we can still scale down a node with unready GPU. @@ -227,11 +227,11 @@ func CalculateUtilization(node *apiv1.Node, nodeInfo *schedulerframework.NodeInf return UtilizationInfo{GpuUtil: gpuUtil, ResourceName: gpu.ResourceNvidiaGPU, Utilization: gpuUtil}, nil } - cpu, err := calculateUtilizationOfResource(node, nodeInfo, apiv1.ResourceCPU, skipDaemonSetPods, skipMirrorPods) + cpu, err := calculateUtilizationOfResource(node, nodeInfo, apiv1.ResourceCPU, skipDaemonSetPods, skipMirrorPods, currentTime) if err != nil { return UtilizationInfo{}, err } - mem, err := calculateUtilizationOfResource(node, nodeInfo, apiv1.ResourceMemory, skipDaemonSetPods, skipMirrorPods) + mem, err := calculateUtilizationOfResource(node, nodeInfo, apiv1.ResourceMemory, skipDaemonSetPods, skipMirrorPods, currentTime) if err != nil { return UtilizationInfo{}, err } @@ -249,7 +249,7 @@ func CalculateUtilization(node *apiv1.Node, nodeInfo *schedulerframework.NodeInf return utilization, nil } -func calculateUtilizationOfResource(node *apiv1.Node, nodeInfo *schedulerframework.NodeInfo, resourceName apiv1.ResourceName, skipDaemonSetPods, skipMirrorPods bool) (float64, error) { +func calculateUtilizationOfResource(node *apiv1.Node, nodeInfo *schedulerframework.NodeInfo, resourceName apiv1.ResourceName, skipDaemonSetPods, skipMirrorPods bool, currentTime time.Time) (float64, error) { nodeAllocatable, found := node.Status.Allocatable[resourceName] if !found { return 0, fmt.Errorf("failed to get %v from %s", resourceName, node.Name) @@ -273,6 +273,10 @@ func calculateUtilizationOfResource(node *apiv1.Node, nodeInfo *schedulerframewo if skipMirrorPods && pod_util.IsMirrorPod(podInfo.Pod) { continue } + // ignore Pods that should be terminated + if drain.IsPodLongTerminating(podInfo.Pod, currentTime) { + continue + } for _, container := range podInfo.Pod.Spec.Containers { if resourceValue, found := container.Resources.Requests[resourceName]; found { podsRequest.Add(resourceValue) diff --git a/cluster-autoscaler/simulator/cluster_test.go b/cluster-autoscaler/simulator/cluster_test.go index 76d637bf52ac..409c71092c68 100644 --- a/cluster-autoscaler/simulator/cluster_test.go +++ b/cluster-autoscaler/simulator/cluster_test.go @@ -23,6 +23,7 @@ import ( apiv1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/utils/drain" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/kubernetes/pkg/kubelet/types" @@ -32,6 +33,7 @@ import ( ) func TestUtilization(t *testing.T) { + testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC) gpuLabel := GetGPULabel() pod := BuildTestPod("p1", 100, 200000) pod2 := BuildTestPod("p2", -1, -1) @@ -40,13 +42,13 @@ func TestUtilization(t *testing.T) { node := BuildTestNode("node1", 2000, 2000000) SetNodeReadyState(node, true, time.Time{}) - utilInfo, err := CalculateUtilization(node, nodeInfo, false, false, gpuLabel) + utilInfo, err := CalculateUtilization(node, nodeInfo, false, false, gpuLabel, testTime) assert.NoError(t, err) assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01) node2 := BuildTestNode("node1", 2000, -1) - _, err = CalculateUtilization(node2, nodeInfo, false, false, gpuLabel) + _, err = CalculateUtilization(node2, nodeInfo, false, false, gpuLabel, testTime) assert.Error(t, err) daemonSetPod3 := BuildTestPod("p3", 100, 200000) @@ -57,12 +59,19 @@ func TestUtilization(t *testing.T) { daemonSetPod4.Annotations = map[string]string{"cluster-autoscaler.kubernetes.io/daemonset-pod": "true"} nodeInfo = schedulerframework.NewNodeInfo(pod, pod, pod2, daemonSetPod3, daemonSetPod4) - utilInfo, err = CalculateUtilization(node, nodeInfo, true, false, gpuLabel) + utilInfo, err = CalculateUtilization(node, nodeInfo, true, false, gpuLabel, testTime) assert.NoError(t, err) assert.InEpsilon(t, 2.5/10, utilInfo.Utilization, 0.01) nodeInfo = schedulerframework.NewNodeInfo(pod, pod2, daemonSetPod3) - utilInfo, err = CalculateUtilization(node, nodeInfo, false, false, gpuLabel) + utilInfo, err = CalculateUtilization(node, nodeInfo, false, false, gpuLabel, testTime) + assert.NoError(t, err) + assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01) + + terminatedPod := BuildTestPod("podTerminated", 100, 200000) + terminatedPod.DeletionTimestamp = &metav1.Time{testTime.Add(-10 * time.Minute)} + nodeInfo = schedulerframework.NewNodeInfo(pod, pod, pod2, terminatedPod) + utilInfo, err = CalculateUtilization(node, nodeInfo, false, false, gpuLabel, testTime) assert.NoError(t, err) assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01) @@ -72,22 +81,27 @@ func TestUtilization(t *testing.T) { } nodeInfo = schedulerframework.NewNodeInfo(pod, pod, pod2, mirrorPod4) - utilInfo, err = CalculateUtilization(node, nodeInfo, false, true, gpuLabel) + utilInfo, err = CalculateUtilization(node, nodeInfo, false, true, gpuLabel, testTime) assert.NoError(t, err) assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01) nodeInfo = schedulerframework.NewNodeInfo(pod, pod2, mirrorPod4) - utilInfo, err = CalculateUtilization(node, nodeInfo, false, false, gpuLabel) + utilInfo, err = CalculateUtilization(node, nodeInfo, false, false, gpuLabel, testTime) assert.NoError(t, err) assert.InEpsilon(t, 2.0/10, utilInfo.Utilization, 0.01) + nodeInfo = schedulerframework.NewNodeInfo(pod, mirrorPod4, daemonSetPod3) + utilInfo, err = CalculateUtilization(node, nodeInfo, true, true, gpuLabel, testTime) + assert.NoError(t, err) + assert.InEpsilon(t, 1.0/8.0, utilInfo.Utilization, 0.01) + gpuNode := BuildTestNode("gpu_node", 2000, 2000000) AddGpusToNode(gpuNode, 1) gpuPod := BuildTestPod("gpu_pod", 100, 200000) RequestGpuForPod(gpuPod, 1) TolerateGpuForPod(gpuPod) nodeInfo = schedulerframework.NewNodeInfo(pod, pod, gpuPod) - utilInfo, err = CalculateUtilization(gpuNode, nodeInfo, false, false, gpuLabel) + utilInfo, err = CalculateUtilization(gpuNode, nodeInfo, false, false, gpuLabel, testTime) 
assert.NoError(t, err) assert.InEpsilon(t, 1/1, utilInfo.Utilization, 0.01) @@ -95,7 +109,7 @@ func TestUtilization(t *testing.T) { gpuNode = BuildTestNode("gpu_node", 2000, 2000000) AddGpuLabelToNode(gpuNode) nodeInfo = schedulerframework.NewNodeInfo(pod, pod) - utilInfo, err = CalculateUtilization(gpuNode, nodeInfo, false, false, gpuLabel) + utilInfo, err = CalculateUtilization(gpuNode, nodeInfo, false, false, gpuLabel, testTime) assert.NoError(t, err) assert.Zero(t, utilInfo.Utilization) } @@ -246,8 +260,8 @@ func TestFindEmptyNodes(t *testing.T) { clusterSnapshot := NewBasicClusterSnapshot() InitializeClusterSnapshotOrDie(t, clusterSnapshot, []*apiv1.Node{nodes[0], nodes[1], nodes[2], nodes[3]}, []*apiv1.Pod{pod1, pod2}) - - emptyNodes := FindEmptyNodesToRemove(clusterSnapshot, nodeNames) + testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC) + emptyNodes := FindEmptyNodesToRemove(clusterSnapshot, nodeNames, testTime) assert.Equal(t, []string{nodeNames[0], nodeNames[2], nodeNames[3]}, emptyNodes) } diff --git a/cluster-autoscaler/simulator/drain.go b/cluster-autoscaler/simulator/drain.go index 3c0f990f9cf9..8d6991d5aa8b 100644 --- a/cluster-autoscaler/simulator/drain.go +++ b/cluster-autoscaler/simulator/drain.go @@ -35,7 +35,7 @@ import ( // along with their pods (no abandoned pods with dangling created-by annotation). Useful for fast // checks. func FastGetPodsToMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWithSystemPods bool, skipNodesWithLocalStorage bool, - pdbs []*policyv1.PodDisruptionBudget) ([]*apiv1.Pod, *drain.BlockingPod, error) { + pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) ([]*apiv1.Pod, *drain.BlockingPod, error) { var pods []*apiv1.Pod for _, podInfo := range nodeInfo.Pods { pods = append(pods, podInfo.Pod) @@ -48,7 +48,7 @@ func FastGetPodsToMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWithSyste false, nil, 0, - time.Now()) + timestamp) if err != nil { return pods, blockingPod, err @@ -66,7 +66,7 @@ func FastGetPodsToMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWithSyste // still exist. 
func DetailedGetPodsForMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWithSystemPods bool, skipNodesWithLocalStorage bool, listers kube_util.ListerRegistry, minReplicaCount int32, - pdbs []*policyv1.PodDisruptionBudget) ([]*apiv1.Pod, *drain.BlockingPod, error) { + pdbs []*policyv1.PodDisruptionBudget, timestamp time.Time) ([]*apiv1.Pod, *drain.BlockingPod, error) { var pods []*apiv1.Pod for _, podInfo := range nodeInfo.Pods { pods = append(pods, podInfo.Pod) @@ -79,7 +79,7 @@ func DetailedGetPodsForMove(nodeInfo *schedulerframework.NodeInfo, skipNodesWith true, listers, minReplicaCount, - time.Now()) + timestamp) if err != nil { return pods, blockingPod, err } diff --git a/cluster-autoscaler/simulator/drain_test.go b/cluster-autoscaler/simulator/drain_test.go index 949817777f0f..3388400960c1 100644 --- a/cluster-autoscaler/simulator/drain_test.go +++ b/cluster-autoscaler/simulator/drain_test.go @@ -18,6 +18,7 @@ package simulator import ( "testing" + "time" apiv1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1beta1" @@ -32,7 +33,7 @@ import ( ) func TestFastGetPodsToMove(t *testing.T) { - + testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC) // Unreplicated pod pod1 := &apiv1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -40,7 +41,7 @@ func TestFastGetPodsToMove(t *testing.T) { Namespace: "ns", }, } - _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod1), true, true, nil) + _, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod1), true, true, nil, testTime) assert.Error(t, err) assert.Equal(t, &drain.BlockingPod{Pod: pod1, Reason: drain.NotReplicated}, blockingPod) @@ -52,7 +53,7 @@ func TestFastGetPodsToMove(t *testing.T) { OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""), }, } - r2, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod2), true, true, nil) + r2, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod2), true, true, nil, testTime) assert.NoError(t, err) assert.Nil(t, blockingPod) assert.Equal(t, 1, len(r2)) @@ -68,7 +69,7 @@ func TestFastGetPodsToMove(t *testing.T) { }, }, } - r3, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod3), true, true, nil) + r3, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod3), true, true, nil, testTime) assert.NoError(t, err) assert.Nil(t, blockingPod) assert.Equal(t, 0, len(r3)) @@ -81,7 +82,7 @@ func TestFastGetPodsToMove(t *testing.T) { OwnerReferences: GenerateOwnerReferences("ds", "DaemonSet", "extensions/v1beta1", ""), }, } - r4, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod2, pod3, pod4), true, true, nil) + r4, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod2, pod3, pod4), true, true, nil, testTime) assert.NoError(t, err) assert.Nil(t, blockingPod) assert.Equal(t, 1, len(r4)) @@ -95,7 +96,7 @@ func TestFastGetPodsToMove(t *testing.T) { OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""), }, } - _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod5), true, true, nil) + _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod5), true, true, nil, testTime) assert.Error(t, err) assert.Equal(t, &drain.BlockingPod{Pod: pod5, Reason: drain.UnmovableKubeSystemPod}, blockingPod) @@ -116,7 +117,7 @@ func TestFastGetPodsToMove(t *testing.T) { }, }, } - _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod6), true, 
true, nil) + _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod6), true, true, nil, testTime) assert.Error(t, err) assert.Equal(t, &drain.BlockingPod{Pod: pod6, Reason: drain.LocalStorageRequested}, blockingPod) @@ -139,7 +140,7 @@ func TestFastGetPodsToMove(t *testing.T) { }, }, } - r7, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod7), true, true, nil) + r7, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod7), true, true, nil, testTime) assert.NoError(t, err) assert.Nil(t, blockingPod) assert.Equal(t, 1, len(r7)) @@ -175,7 +176,7 @@ func TestFastGetPodsToMove(t *testing.T) { }, } - _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod8), true, true, []*policyv1.PodDisruptionBudget{pdb8}) + _, blockingPod, err = FastGetPodsToMove(schedulerframework.NewNodeInfo(pod8), true, true, []*policyv1.PodDisruptionBudget{pdb8}, testTime) assert.Error(t, err) assert.Equal(t, &drain.BlockingPod{Pod: pod8, Reason: drain.NotEnoughPdb}, blockingPod) @@ -209,8 +210,41 @@ func TestFastGetPodsToMove(t *testing.T) { }, } - r9, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod9), true, true, []*policyv1.PodDisruptionBudget{pdb9}) + r9, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod9), true, true, []*policyv1.PodDisruptionBudget{pdb9}, testTime) assert.NoError(t, err) assert.Nil(t, blockingPod) assert.Equal(t, 1, len(r9)) + + pod10 := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod10", + Namespace: "ns", + OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""), + }, + } + pod10Terminated := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod10Terminated", + Namespace: "ns", + OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""), + DeletionTimestamp: &metav1.Time{ + Time: testTime.Add(-1*drain.PodLongTerminatingExtraThreshold - time.Minute), // more than PodLongTerminatingExtraThreshold + }, + }, + } + pod10Terminating := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod10Terminating", + Namespace: "ns", + OwnerReferences: GenerateOwnerReferences("rs", "ReplicaSet", "extensions/v1beta1", ""), + DeletionTimestamp: &metav1.Time{ + Time: testTime.Add(-1*drain.PodLongTerminatingExtraThreshold + time.Minute), // still terminating, below the default terminationGracePeriod + }, + }, + } + + r10SkipPodsThatShouldBeTerminatedTrue, blockingPod, err := FastGetPodsToMove(schedulerframework.NewNodeInfo(pod10, pod10Terminated, pod10Terminating), true, true, nil, testTime) + assert.NoError(t, err) + assert.Nil(t, blockingPod) + assert.ElementsMatch(t, []*apiv1.Pod{pod10, pod10Terminating}, r10SkipPodsThatShouldBeTerminatedTrue) } diff --git a/cluster-autoscaler/utils/drain/drain.go b/cluster-autoscaler/utils/drain/drain.go index b3fdd3ba0c7d..90a0648e93e9 100644 --- a/cluster-autoscaler/utils/drain/drain.go +++ b/cluster-autoscaler/utils/drain/drain.go @@ -30,8 +30,8 @@ import ( ) const ( - // PodDeletionTimeout - time after which a pod to be deleted is not included in the list of pods for drain.
- PodDeletionTimeout = 12 * time.Minute + // PodLongTerminatingExtraThreshold - time after which a pod, that is terminating and that has run over its terminationGracePeriod, should be ignored and considered as deleted + PodLongTerminatingExtraThreshold = 30 * time.Second ) const ( @@ -100,7 +100,7 @@ func GetPodsForDeletionOnNodeDrain( // Possibly skip a pod under deletion but only if it was being deleted for long enough // to avoid a situation when we delete the empty node immediately after the pod was marked for // deletion without respecting any graceful termination. - if pod.DeletionTimestamp != nil && pod.DeletionTimestamp.Time.Before(currentTime.Add(-1*PodDeletionTimeout)) { + if IsPodLongTerminating(pod, currentTime) { // pod is being deleted for long enough - no need to care about it. continue } @@ -287,3 +287,18 @@ func hasSafeToEvictAnnotation(pod *apiv1.Pod) bool { func hasNotSafeToEvictAnnotation(pod *apiv1.Pod) bool { return pod.GetAnnotations()[PodSafeToEvictKey] == "false" } + +// IsPodLongTerminating checks if a pod has been terminating for a long time (pod's terminationGracePeriod + an additional const buffer) +func IsPodLongTerminating(pod *apiv1.Pod, currentTime time.Time) bool { + // pod has not even been deleted + if pod.DeletionTimestamp == nil { + return false + } + + gracePeriod := pod.Spec.TerminationGracePeriodSeconds + if gracePeriod == nil { + defaultGracePeriod := int64(apiv1.DefaultTerminationGracePeriodSeconds) + gracePeriod = &defaultGracePeriod + } + return pod.DeletionTimestamp.Time.Add(time.Duration(*gracePeriod) * time.Second).Add(PodLongTerminatingExtraThreshold).Before(currentTime) +} diff --git a/cluster-autoscaler/utils/drain/drain_test.go b/cluster-autoscaler/utils/drain/drain_test.go index 79d23b29f8f0..80b803bed1ea 100644 --- a/cluster-autoscaler/utils/drain/drain_test.go +++ b/cluster-autoscaler/utils/drain/drain_test.go @@ -34,6 +34,7 @@ import ( ) func TestDrain(t *testing.T) { + testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC) replicas := int32(5) rc := apiv1.ReplicationController{ @@ -175,7 +176,7 @@ func TestDrain(t *testing.T) { Name: "bar", Namespace: "default", OwnerReferences: GenerateOwnerReferences(rs.Name, "ReplicaSet", "apps/v1", ""), - DeletionTimestamp: &metav1.Time{Time: time.Now().Add(-time.Hour)}, + DeletionTimestamp: &metav1.Time{Time: testTime.Add(-time.Hour)}, }, Spec: apiv1.PodSpec{ NodeName: "node", @@ -223,6 +224,41 @@ func TestDrain(t *testing.T) { }, } + zeroGracePeriod := int64(0) + longTerminatingPod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", + Namespace: "default", + DeletionTimestamp: &metav1.Time{Time: testTime.Add(-2 * PodLongTerminatingExtraThreshold)}, + OwnerReferences: GenerateOwnerReferences(rc.Name, "ReplicationController", "core/v1", ""), + }, + Spec: apiv1.PodSpec{ + NodeName: "node", + RestartPolicy: apiv1.RestartPolicyOnFailure, + TerminationGracePeriodSeconds: &zeroGracePeriod, + }, + Status: apiv1.PodStatus{ + Phase: apiv1.PodUnknown, + }, + } + extendedGracePeriod := int64(6 * 60) // 6 minutes + longTerminatingPodWithExtendedGracePeriod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", + Namespace: "default", + DeletionTimestamp: &metav1.Time{Time: testTime.Add(-time.Duration(extendedGracePeriod/2) * time.Second)}, + OwnerReferences: GenerateOwnerReferences(rc.Name, "ReplicationController", "core/v1", ""), + }, + Spec: apiv1.PodSpec{ + NodeName: "node", + RestartPolicy: apiv1.RestartPolicyOnFailure, + TerminationGracePeriodSeconds: 
&extendedGracePeriod, + }, + Status: apiv1.PodStatus{ + Phase: apiv1.PodUnknown, + }, + } + failedPod := &apiv1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: "bar", @@ -451,11 +487,27 @@ func TestDrain(t *testing.T) { expectPods: []*apiv1.Pod{failedPod}, }, { - description: "evicted pod", - pods: []*apiv1.Pod{evictedPod}, - pdbs: []*policyv1.PodDisruptionBudget{}, - expectFatal: false, - expectPods: []*apiv1.Pod{evictedPod}, + description: "long terminating pod with 0 grace period", + pods: []*apiv1.Pod{longTerminatingPod}, + pdbs: []*policyv1.PodDisruptionBudget{}, + rcs: []*apiv1.ReplicationController{&rc}, + expectFatal: false, + expectPods: []*apiv1.Pod{}, + }, + { + description: "long terminating pod with extended grace period", + pods: []*apiv1.Pod{longTerminatingPodWithExtendedGracePeriod}, + pdbs: []*policyv1.PodDisruptionBudget{}, + rcs: []*apiv1.ReplicationController{&rc}, + expectFatal: false, + expectPods: []*apiv1.Pod{longTerminatingPodWithExtendedGracePeriod}, + }, + { + description: "evicted pod", + pods: []*apiv1.Pod{evictedPod}, + pdbs: []*policyv1.PodDisruptionBudget{}, + expectFatal: false, + expectPods: []*apiv1.Pod{evictedPod}, }, { description: "pod in terminal state", @@ -567,9 +619,9 @@ func TestDrain(t *testing.T) { ssLister, err := kube_util.NewTestStatefulSetLister([]*appsv1.StatefulSet{&statefulset}) assert.NoError(t, err) - registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister) + registry := kube_util.NewListerRegistry(nil, nil, nil, nil, nil, dsLister, rcLister, jobLister, rsLister, ssLister, nil) - pods, blockingPod, err := GetPodsForDeletionOnNodeDrain(test.pods, test.pdbs, true, true, true, registry, 0, time.Now()) + pods, blockingPod, err := GetPodsForDeletionOnNodeDrain(test.pods, test.pdbs, true, true, true, registry, 0, testTime) if test.expectFatal { assert.Equal(t, test.expectBlockingPod, blockingPod) @@ -590,3 +642,108 @@ func TestDrain(t *testing.T) { } } } + +func TestIsPodLongTerminating(t *testing.T) { + testTime := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC) + twoMinGracePeriod := int64(2 * 60) + zeroGracePeriod := int64(0) + + tests := []struct { + name string + pod apiv1.Pod + want bool + }{ + { + name: "No deletion timestamp", + pod: apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: nil, + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: &zeroGracePeriod, + }, + }, + want: false, + }, + { + name: "Just deleted no grace period defined", + pod: apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &metav1.Time{Time: testTime}, // default Grace Period is 30s so this pod can still be terminating + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: nil, + }, + }, + want: false, + }, + { + name: "Deleted for longer than PodLongTerminatingExtraThreshold with no grace period", + pod: apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &metav1.Time{Time: testTime.Add(-3 * PodLongTerminatingExtraThreshold)}, + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: nil, + }, + }, + want: true, + }, + { + name: "Just deleted with grace period defined to 0", + pod: apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &metav1.Time{Time: testTime}, + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: &zeroGracePeriod, + }, + }, + want: false, + }, + { + name: "Deleted for longer than PodLongTerminatingExtraThreshold with grace period defined to 0", + pod: apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + 
DeletionTimestamp: &metav1.Time{Time: testTime.Add(-2 * PodLongTerminatingExtraThreshold)}, + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: &zeroGracePeriod, + }, + }, + want: true, + }, + { + name: "Deleted for longer than PodLongTerminatingExtraThreshold but not longer than grace period (2 min)", + pod: apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &metav1.Time{Time: testTime.Add(-2 * PodLongTerminatingExtraThreshold)}, + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: &twoMinGracePeriod, + }, + }, + want: false, + }, + { + name: "Deleted for longer than grace period (2 min) and PodLongTerminatingExtraThreshold", + pod: apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + DeletionTimestamp: &metav1.Time{Time: testTime.Add(-2*PodLongTerminatingExtraThreshold - time.Duration(twoMinGracePeriod)*time.Second)}, + }, + Spec: apiv1.PodSpec{ + TerminationGracePeriodSeconds: &twoMinGracePeriod, + }, + }, + want: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + if got := IsPodLongTerminating(&tc.pod, testTime); got != tc.want { + t.Errorf("IsPodLongTerminating() = %v, want %v", got, tc.want) + } + }) + } +} diff --git a/cluster-autoscaler/utils/kubernetes/listers.go b/cluster-autoscaler/utils/kubernetes/listers.go index b70ca1db3ca4..4d31acaeb6dc 100644 --- a/cluster-autoscaler/utils/kubernetes/listers.go +++ b/cluster-autoscaler/utils/kubernetes/listers.go @@ -46,6 +46,7 @@ type ListerRegistry interface { JobLister() v1batchlister.JobLister ReplicaSetLister() v1appslister.ReplicaSetLister StatefulSetLister() v1appslister.StatefulSetLister + PersistentVolumeClaimLister() v1lister.PersistentVolumeClaimLister } type listerRegistryImpl struct { @@ -59,6 +60,7 @@ type listerRegistryImpl struct { jobLister v1batchlister.JobLister replicaSetLister v1appslister.ReplicaSetLister statefulSetLister v1appslister.StatefulSetLister + persistentVolumeClaimLister v1lister.PersistentVolumeClaimLister } // NewListerRegistry returns a registry providing various listers to list pods or nodes matching conditions @@ -66,7 +68,8 @@ func NewListerRegistry(allNode NodeLister, readyNode NodeLister, scheduledPod Po unschedulablePod PodLister, podDisruptionBudgetLister PodDisruptionBudgetLister, daemonSetLister v1appslister.DaemonSetLister, replicationControllerLister v1lister.ReplicationControllerLister, jobLister v1batchlister.JobLister, replicaSetLister v1appslister.ReplicaSetLister, - statefulSetLister v1appslister.StatefulSetLister) ListerRegistry { + statefulSetLister v1appslister.StatefulSetLister, + persistentVolumeClaimLister v1lister.PersistentVolumeClaimLister) ListerRegistry { return listerRegistryImpl{ allNodeLister: allNode, readyNodeLister: readyNode, @@ -78,6 +81,7 @@ func NewListerRegistry(allNode NodeLister, readyNode NodeLister, scheduledPod Po jobLister: jobLister, replicaSetLister: replicaSetLister, statefulSetLister: statefulSetLister, + persistentVolumeClaimLister: persistentVolumeClaimLister, } } @@ -93,9 +97,10 @@ func NewListerRegistryWithDefaultListers(kubeClient client.Interface, stopChanne jobLister := NewJobLister(kubeClient, stopChannel) replicaSetLister := NewReplicaSetLister(kubeClient, stopChannel) statefulSetLister := NewStatefulSetLister(kubeClient, stopChannel) + persistentVolumeClaimLister := NewPersistentVolumeClaimLister(kubeClient, stopChannel) return NewListerRegistry(allNodeLister, readyNodeLister, scheduledPodLister, unschedulablePodLister, podDisruptionBudgetLister, daemonSetLister, - 
replicationControllerLister, jobLister, replicaSetLister, statefulSetLister) + replicationControllerLister, jobLister, replicaSetLister, statefulSetLister, persistentVolumeClaimLister) } // AllNodeLister returns the AllNodeLister registered to this registry @@ -148,6 +153,20 @@ func (r listerRegistryImpl) StatefulSetLister() v1appslister.StatefulSetLister { return r.statefulSetLister } +// PersistentVolumeClaimLister returns the persistentVolumeClaimLister registered to this registry +func (r listerRegistryImpl) PersistentVolumeClaimLister() v1lister.PersistentVolumeClaimLister { + return r.persistentVolumeClaimLister +} + +// NewPersistentVolumeClaimLister builds a persistentvolumeclaim lister. +func NewPersistentVolumeClaimLister(kubeClient client.Interface, stopchannel <-chan struct{}) v1lister.PersistentVolumeClaimLister { + listWatcher := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "persistentvolumeclaims", apiv1.NamespaceAll, fields.Everything()) + store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(listWatcher, &apiv1.PersistentVolumeClaim{}, time.Hour) + lister := v1lister.NewPersistentVolumeClaimLister(store) + go reflector.Run(stopchannel) + return lister +} + // PodLister lists pods. type PodLister interface { List() ([]*apiv1.Pod, error)
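Reviewer note, not part of the patch: the two halves of the local-storage change meet in the scheduling simulation. Node templates advertise a "storageclass/local-data" capacity equal to the number of local volumes the instance type offers, while unschedulable pods whose PVCs use the hard-coded "local-data" StorageClass get a synthetic request of 1 for the same resource, so binpacking admits at most `capacity` such pods per simulated node. A minimal standalone sketch of that accounting (the `main` package and variable names are illustrative, not from the patch):

```go
package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// localData mirrors the extended resource name hard-coded in the patch.
const localData = apiv1.ResourceName("storageclass/local-data")

func main() {
	// Node side: a template node whose instance family provides one local volume.
	capacity := apiv1.ResourceList{localData: *resource.NewQuantity(1, resource.DecimalSI)}

	// Pod side: the filter_out_schedulable processor injects a request of 1
	// for every pod that claims a "local-data" PVC.
	request := *resource.NewQuantity(1, resource.DecimalSI)

	available := capacity[localData]
	fmt.Println("first pod fits:", request.Cmp(available) <= 0)  // true
	available.Sub(request)
	fmt.Println("second pod fits:", request.Cmp(available) <= 0) // false: the single local volume slot is taken
}
```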
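Reviewer note, not part of the patch: IsPodLongTerminating only starts ignoring a pod once the current time is past DeletionTimestamp + terminationGracePeriodSeconds (default 30s) + PodLongTerminatingExtraThreshold (30s). A small usage sketch against the helper added in cluster-autoscaler/utils/drain (the timestamps and grace period below are illustrative):

```go
package main

import (
	"fmt"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/autoscaler/cluster-autoscaler/utils/drain"
)

func main() {
	now := time.Date(2020, time.December, 18, 17, 0, 0, 0, time.UTC)
	grace := int64(60)

	pod := &apiv1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			// Deleted two minutes ago: past the 60s grace period plus the 30s extra threshold.
			DeletionTimestamp: &metav1.Time{Time: now.Add(-2 * time.Minute)},
		},
		Spec: apiv1.PodSpec{TerminationGracePeriodSeconds: &grace},
	}

	// Prints true: utilization and drain calculations now skip this pod.
	fmt.Println(drain.IsPodLongTerminating(pod, now))
}
```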