diff --git a/cluster-autoscaler/cloudprovider/resource_limiter.go b/cluster-autoscaler/cloudprovider/resource_limiter.go
index 56143017994..84dc9eaaaed 100644
--- a/cluster-autoscaler/cloudprovider/resource_limiter.go
+++ b/cluster-autoscaler/cloudprovider/resource_limiter.go
@@ -18,9 +18,11 @@ package cloudprovider
 
 import (
 	"fmt"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"math"
 	"strings"
+
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 )
 
 // ResourceLimiter contains limits (max, min) for resources (cores, memory etc.).
@@ -29,6 +31,11 @@ type ResourceLimiter struct {
	maxLimits map[string]int64
 }
 
+// ID returns the identifier of the limiter.
+func (r *ResourceLimiter) ID() string {
+	return "cluster-wide"
+}
+
 // NewResourceLimiter creates new ResourceLimiter for map. Maps are deep copied.
 func NewResourceLimiter(minLimits map[string]int64, maxLimits map[string]int64) *ResourceLimiter {
 	minLimitsCopy := make(map[string]int64)
@@ -88,3 +95,18 @@ func (r *ResourceLimiter) String() string {
 	}
 	return strings.Join(resourceDetails, ", ")
 }
+
+// AppliesTo checks if the limiter applies to the given node.
+//
+// As this is a compatibility layer for cluster-wide limits, it always returns true.
+func (r *ResourceLimiter) AppliesTo(node *apiv1.Node) bool {
+	return true
+}
+
+// Limits returns max limits of the limiter.
+//
+// The new resource quotas system supports only max limits, therefore only max
+// limits are returned here.
+func (r *ResourceLimiter) Limits() map[string]int64 {
+	return r.maxLimits
+}
diff --git a/cluster-autoscaler/resourcequotas/factory.go b/cluster-autoscaler/resourcequotas/factory.go
new file mode 100644
index 00000000000..a3916a707a8
--- /dev/null
+++ b/cluster-autoscaler/resourcequotas/factory.go
@@ -0,0 +1,81 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcequotas
+
+import (
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+)
+
+// TrackerFactory builds quota trackers.
+type TrackerFactory struct {
+	crp             customresources.CustomResourcesProcessor
+	quotasProvider  Provider
+	usageCalculator *usageCalculator
+}
+
+// TrackerOptions stores configuration for quota tracking.
+type TrackerOptions struct {
+	CustomResourcesProcessor customresources.CustomResourcesProcessor
+	QuotaProvider            Provider
+	NodeFilter               NodeFilter
+}
+
+// NewTrackerFactory creates a new TrackerFactory.
+func NewTrackerFactory(opts TrackerOptions) *TrackerFactory {
+	uc := newUsageCalculator(opts.CustomResourcesProcessor, opts.NodeFilter)
+	return &TrackerFactory{
+		crp:             opts.CustomResourcesProcessor,
+		quotasProvider:  opts.QuotaProvider,
+		usageCalculator: uc,
+	}
+}
+
+// NewQuotasTracker builds a new Tracker.
+//
+// NewQuotasTracker calculates resources used by the nodes for every
+// quota returned by the Provider. Then, based on usages and limits, it calculates
+// how many resources can still be added to the cluster. Returns a Tracker object.
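+//
+// The returned Tracker is a point-in-time snapshot of quota headroom; it does
+// not observe later cluster changes, so callers should rebuild it every loop.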
+func (f *TrackerFactory) NewQuotasTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node) (*Tracker, error) {
+	quotas, err := f.quotasProvider.Quotas()
+	if err != nil {
+		return nil, err
+	}
+	usages, err := f.usageCalculator.calculateUsages(ctx, nodes, quotas)
+	if err != nil {
+		return nil, err
+	}
+	var quotaStatuses []*quotaStatus
+	for _, rq := range quotas {
+		limitsLeft := make(resourceList)
+		limits := rq.Limits()
+		for resourceType, limit := range limits {
+			usage := usages[rq.ID()][resourceType]
+			limitsLeft[resourceType] = max(0, limit-usage)
+		}
+		quotaStatuses = append(quotaStatuses, &quotaStatus{
+			quota:      rq,
+			limitsLeft: limitsLeft,
+		})
+	}
+	tracker := newTracker(f.crp, quotaStatuses)
+	return tracker, nil
+}
diff --git a/cluster-autoscaler/resourcequotas/factory_test.go b/cluster-autoscaler/resourcequotas/factory_test.go
new file mode 100644
index 00000000000..56943010196
--- /dev/null
+++ b/cluster-autoscaler/resourcequotas/factory_test.go
@@ -0,0 +1,249 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcequotas
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/test"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/units"
+)
+
+type nodeExcludeFn func(node *apiv1.Node) bool
+
+func (n nodeExcludeFn) ExcludeFromTracking(node *apiv1.Node) bool {
+	return n(node)
+}
+
+func TestNewQuotasTracker(t *testing.T) {
+	testCases := []struct {
+		name       string
+		crp        customresources.CustomResourcesProcessor
+		nodeFilter NodeFilter
+		nodes      []*apiv1.Node
+		limits     map[string]int64
+		newNode    *apiv1.Node
+		nodeDelta  int
+		wantResult *CheckDeltaResult
+	}{
+		{
+			name: "default config allowed operation",
+			nodes: []*apiv1.Node{
+				test.BuildTestNode("n1", 1000, 2*units.GiB),
+				test.BuildTestNode("n2", 2000, 4*units.GiB),
+				test.BuildTestNode("n3", 3000, 8*units.GiB),
+			},
+			limits: map[string]int64{
+				"cpu":    12,
+				"memory": 32 * units.GiB,
+			},
+			newNode:   test.BuildTestNode("n4", 2000, 4*units.GiB),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 2,
+			},
+		},
+		{
+			name: "default config exceeded operation",
+			nodes: []*apiv1.Node{
+				test.BuildTestNode("n1", 1000, 2*units.GiB),
+				test.BuildTestNode("n2", 2000, 4*units.GiB),
+				test.BuildTestNode("n3", 3000, 8*units.GiB),
+			},
+			limits: map[string]int64{
+				"cpu":    6,
+				"memory": 16 * units.GiB,
+			},
+			newNode:   test.BuildTestNode("n4", 2000, 4*units.GiB),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 0,
+				ExceededQuotas: []ExceededQuota{
+					{ID: "cluster-wide", ExceededResources: []string{"cpu", "memory"}},
+				},
+			},
+		},
+		{
+			name: "default config partially allowed operation",
+			nodes: []*apiv1.Node{
test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 7, + "memory": 16 * units.GiB, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededQuotas: []ExceededQuota{ + {ID: "cluster-wide", ExceededResources: []string{"cpu", "memory"}}, + }, + }, + }, + { + name: "custom resource config allowed operation", + crp: &fakeCustomResourcesProcessor{ + NodeResourceTargets: func(n *apiv1.Node) []customresources.CustomResourceTarget { + if n.Name == "n1" { + return []customresources.CustomResourceTarget{ + { + ResourceType: "gpu", + ResourceCount: 1, + }, + } + } + return nil + }, + }, + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 12, + "memory": 32 * units.GiB, + "gpu": 6, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + }, + { + name: "custom resource config exceeded operation", + crp: &fakeCustomResourcesProcessor{ + NodeResourceTargets: func(n *apiv1.Node) []customresources.CustomResourceTarget { + if n.Name == "n1" || n.Name == "n4" { + return []customresources.CustomResourceTarget{ + { + ResourceType: "gpu", + ResourceCount: 1, + }, + } + } + return nil + }, + }, + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 12, + "memory": 32 * units.GiB, + "gpu": 1, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededQuotas: []ExceededQuota{ + {ID: "cluster-wide", ExceededResources: []string{"gpu"}}, + }, + }, + }, + { + name: "node filter config allowed operation", + nodeFilter: nodeExcludeFn(func(node *apiv1.Node) bool { + return node.Name == "n3" + }), + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 4, + "memory": 8 * units.GiB, + }, + newNode: test.BuildTestNode("n4", 1000, 2*units.GiB), + nodeDelta: 1, + wantResult: &CheckDeltaResult{ + AllowedDelta: 1, + }, + }, + { + name: "node filter config exceeded operation", + nodeFilter: nodeExcludeFn(func(node *apiv1.Node) bool { + return node.Name == "n3" + }), + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 4, + "memory": 8 * units.GiB, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 1, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededQuotas: []ExceededQuota{ + {ID: "cluster-wide", ExceededResources: []string{"cpu", "memory"}}, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cloudProvider := cptest.NewTestCloudProviderBuilder().Build() + resourceLimiter := cloudprovider.NewResourceLimiter(nil, tc.limits) + cloudProvider.SetResourceLimiter(resourceLimiter) + ctx := &context.AutoscalingContext{CloudProvider: cloudProvider} + crp := tc.crp + if crp == nil { + crp = 
&fakeCustomResourcesProcessor{} + } + factory := NewTrackerFactory(TrackerOptions{ + CustomResourcesProcessor: crp, + QuotaProvider: NewCloudQuotasProvider(cloudProvider), + NodeFilter: tc.nodeFilter, + }) + tracker, err := factory.NewQuotasTracker(ctx, tc.nodes) + if err != nil { + t.Errorf("failed to create tracker: %v", err) + } + var ng cloudprovider.NodeGroup + result, err := tracker.CheckDelta(ctx, ng, tc.newNode, tc.nodeDelta) + if err != nil { + t.Errorf("failed to check delta: %v", err) + } + opts := []cmp.Option{ + cmpopts.SortSlices(func(a, b string) bool { return a < b }), + cmpopts.EquateEmpty(), + } + if diff := cmp.Diff(tc.wantResult, result, opts...); diff != "" { + t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/cluster-autoscaler/resourcequotas/provider.go b/cluster-autoscaler/resourcequotas/provider.go new file mode 100644 index 00000000000..848c5734e5c --- /dev/null +++ b/cluster-autoscaler/resourcequotas/provider.go @@ -0,0 +1,50 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequotas + +import ( + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +// Provider provides Quotas. Each Provider implementation acts as a different +// source of Quotas. +type Provider interface { + Quotas() ([]Quota, error) +} + +// CloudQuotasProvider is an adapter for cloudprovider.ResourceLimiter. +type CloudQuotasProvider struct { + cloudProvider cloudprovider.CloudProvider +} + +// Quotas returns the cloud provider's ResourceLimiter, which implements Quota interface. +// +// This acts as a compatibility layer with the legacy resource limits system. +func (p *CloudQuotasProvider) Quotas() ([]Quota, error) { + rl, err := p.cloudProvider.GetResourceLimiter() + if err != nil { + return nil, err + } + return []Quota{rl}, nil +} + +// NewCloudQuotasProvider returns a new CloudQuotasProvider. +func NewCloudQuotasProvider(cloudProvider cloudprovider.CloudProvider) *CloudQuotasProvider { + return &CloudQuotasProvider{ + cloudProvider: cloudProvider, + } +} diff --git a/cluster-autoscaler/resourcequotas/provider_test.go b/cluster-autoscaler/resourcequotas/provider_test.go new file mode 100644 index 00000000000..bc4003fd594 --- /dev/null +++ b/cluster-autoscaler/resourcequotas/provider_test.go @@ -0,0 +1,46 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package resourcequotas
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/units"
+)
+
+func TestCloudQuotasProvider(t *testing.T) {
+	cloudProvider := test.NewTestCloudProviderBuilder().Build()
+	maxLimits := map[string]int64{"cpu": 4, "memory": 16 * units.GiB}
+	resourceLimiter := cloudprovider.NewResourceLimiter(nil, maxLimits)
+	cloudProvider.SetResourceLimiter(resourceLimiter)
+
+	quotasProvider := NewCloudQuotasProvider(cloudProvider)
+	quotas, err := quotasProvider.Quotas()
+	if err != nil {
+		t.Fatalf("failed to get quotas: %v", err)
+	}
+	if len(quotas) != 1 {
+		t.Fatalf("got %d quotas, expected 1", len(quotas))
+	}
+	quota := quotas[0]
+	if diff := cmp.Diff(maxLimits, quota.Limits()); diff != "" {
+		t.Errorf("Limits() mismatch (-want +got):\n%s", diff)
+	}
+}
diff --git a/cluster-autoscaler/resourcequotas/testutils.go b/cluster-autoscaler/resourcequotas/testutils.go
new file mode 100644
index 00000000000..8acdc43674a
--- /dev/null
+++ b/cluster-autoscaler/resourcequotas/testutils.go
@@ -0,0 +1,73 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcequotas
+
+import (
+	apiv1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+	drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
+)
+
+type fakeNodeFilter struct {
+	NodeFilterFn func(*apiv1.Node) bool
+}
+
+func (f *fakeNodeFilter) ExcludeFromTracking(node *apiv1.Node) bool {
+	if f.NodeFilterFn == nil {
+		return false
+	}
+	return f.NodeFilterFn(node)
+}
+
+type fakeCustomResourcesProcessor struct {
+	NodeResourceTargets func(*apiv1.Node) []customresources.CustomResourceTarget
+}
+
+func (f *fakeCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) {
+	return allNodes, readyNodes
+}
+
+func (f *fakeCustomResourcesProcessor) GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]customresources.CustomResourceTarget, errors.AutoscalerError) {
+	if f.NodeResourceTargets == nil {
+		return nil, nil
+	}
+	return f.NodeResourceTargets(node), nil
+}
+
+func (f *fakeCustomResourcesProcessor) CleanUp() {
+}
+
+type fakeQuota struct {
+	id          string
+	appliesToFn func(*apiv1.Node) bool
+	limits      resourceList
+}
+
+func (f *fakeQuota) ID() string {
+	return f.id
+}
+
+func (f *fakeQuota) AppliesTo(node *apiv1.Node) bool {
+	return f.appliesToFn(node)
+}
+
+func (f *fakeQuota) Limits() map[string]int64 {
+	return f.limits
+}
diff --git a/cluster-autoscaler/resourcequotas/tracker.go b/cluster-autoscaler/resourcequotas/tracker.go
new file mode 100644
index 00000000000..0c70e6c906b
--- /dev/null
+++ b/cluster-autoscaler/resourcequotas/tracker.go
@@ -0,0 +1,198 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcequotas
+
+import (
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/core/utils"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+)
+
+const (
+	// ResourceNodes is a resource name for number of nodes.
+	ResourceNodes = "nodes"
+)
+
+// Quota is an interface for a single quota.
+type Quota interface {
+	// ID returns a unique identifier of the quota.
+	ID() string
+	// AppliesTo returns true if the quota applies to the given node.
+	AppliesTo(node *corev1.Node) bool
+	// Limits returns the resource limits defined by the quota.
+	Limits() map[string]int64
+}
+
+// resourceList is a map of resource names to their quantities.
+type resourceList map[string]int64
+
+// Tracker tracks resource quotas.
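+//
+// A Tracker holds how much of each resource can still be consumed under every
+// quota and hands that headroom out via CheckDelta and ApplyDelta. It is not
+// safe for concurrent use.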
+type Tracker struct {
+	crp           customresources.CustomResourcesProcessor
+	quotaStatuses []*quotaStatus
+}
+
+type quotaStatus struct {
+	quota      Quota
+	limitsLeft resourceList
+}
+
+// newTracker creates a new Tracker.
+func newTracker(crp customresources.CustomResourcesProcessor, quotaStatuses []*quotaStatus) *Tracker {
+	return &Tracker{
+		crp:           crp,
+		quotaStatuses: quotaStatuses,
+	}
+}
+
+// ApplyDelta checks if a delta is within limits and applies it. The delta is applied only if it fits the limits entirely.
+func (t *Tracker) ApplyDelta(
+	ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int,
+) (*CheckDeltaResult, error) {
+	delta, err := deltaForNode(ctx, t.crp, node, nodeGroup)
+	if err != nil {
+		return nil, err
+	}
+	matchingQuotas := t.matchingQuotas(node)
+
+	result := t.checkDelta(delta, matchingQuotas, nodeDelta)
+
+	if result.AllowedDelta != nodeDelta {
+		return result, nil
+	}
+
+	for _, qs := range matchingQuotas {
+		for resource, resourceDelta := range delta {
+			if limit, ok := qs.limitsLeft[resource]; ok {
+				qs.limitsLeft[resource] = max(limit-resourceDelta*int64(result.AllowedDelta), 0)
+			}
+		}
+	}
+
+	return result, nil
+}
+
+// CheckDelta checks if a delta is within limits, without applying it.
+func (t *Tracker) CheckDelta(
+	ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int,
+) (*CheckDeltaResult, error) {
+	// TODO: cache deltas
+	delta, err := deltaForNode(ctx, t.crp, node, nodeGroup)
+	if err != nil {
+		return nil, err
+	}
+	matchingQuotas := t.matchingQuotas(node)
+	return t.checkDelta(delta, matchingQuotas, nodeDelta), nil
+}
+
+func (t *Tracker) checkDelta(delta resourceList, matchingQuotas []*quotaStatus, nodeDelta int) *CheckDeltaResult {
+	result := &CheckDeltaResult{
+		AllowedDelta: nodeDelta,
+	}
+
+	for _, qs := range matchingQuotas {
+		var exceededResources []string
+		for resource, resourceDelta := range delta {
+			if resourceDelta <= 0 {
+				continue
+			}
+
+			limitsLeft, ok := qs.limitsLeft[resource]
+			if !ok {
+				continue
+			}
+
+			if limitsLeft < resourceDelta*int64(nodeDelta) {
+				allowedNodes := limitsLeft / resourceDelta
+				if allowedNodes < int64(result.AllowedDelta) {
+					result.AllowedDelta = int(allowedNodes)
+				}
+				exceededResources = append(exceededResources, resource)
+			}
+		}
+		if len(exceededResources) > 0 {
+			result.ExceededQuotas = append(result.ExceededQuotas, ExceededQuota{
+				ID: qs.quota.ID(), ExceededResources: exceededResources,
+			})
+		}
+	}
+
+	return result
+}
+
+func (t *Tracker) matchingQuotas(node *corev1.Node) []*quotaStatus {
+	var matching []*quotaStatus
+	for _, qs := range t.quotaStatuses {
+		if qs.quota.AppliesTo(node) {
+			matching = append(matching, qs)
+		}
+	}
+	return matching
+}
+
+// CheckDeltaResult is a result of checking a delta.
+type CheckDeltaResult struct {
+	// ExceededQuotas contains information about quotas that were exceeded.
+	ExceededQuotas []ExceededQuota
+	// AllowedDelta specifies how many nodes could be added without violating the quotas.
+	AllowedDelta int
+}
+
+// Exceeded returns true if any resource limit was exceeded.
+func (r *CheckDeltaResult) Exceeded() bool {
+	return len(r.ExceededQuotas) > 0
+}
+
+// ExceededQuota contains information about a quota that was exceeded.
+type ExceededQuota struct {
+	ID                string
+	ExceededResources []string
+}
+
+// deltaForNode calculates the amount of resources that will be used from the cluster when creating a node.
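+//
+// The returned delta contains the node's CPU cores and memory bytes taken from
+// its capacity, a synthetic "nodes" resource equal to 1, and any custom
+// resources (e.g. GPUs) reported by the CustomResourcesProcessor.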
+func deltaForNode(ctx *context.AutoscalingContext, crp customresources.CustomResourcesProcessor, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) {
+	// TODO: storage?
+	nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(node)
+	nodeResources := resourceList{
+		string(corev1.ResourceCPU):    nodeCPU,
+		string(corev1.ResourceMemory): nodeMemory,
+		ResourceNodes:                 1,
+	}
+
+	resourceTargets, err := crp.GetNodeResourceTargets(ctx, node, nodeGroup)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get custom resources: %w", err)
+	}
+
+	for _, resourceTarget := range resourceTargets {
+		nodeResources[resourceTarget.ResourceType] = resourceTarget.ResourceCount
+	}
+
+	return nodeResources, nil
+}
diff --git a/cluster-autoscaler/resourcequotas/tracker_test.go b/cluster-autoscaler/resourcequotas/tracker_test.go
new file mode 100644
index 00000000000..fd3a274f10c
--- /dev/null
+++ b/cluster-autoscaler/resourcequotas/tracker_test.go
@@ -0,0 +1,331 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcequotas
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	apiv1 "k8s.io/api/core/v1"
+	cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/test"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/units"
+)
+
+func TestCheckDelta(t *testing.T) {
+	testCases := []struct {
+		name         string
+		tracker      *Tracker
+		node         *apiv1.Node
+		nodeDelta    int
+		wantResult   *CheckDeltaResult
+		wantExceeded bool
+	}{
+		{
+			name: "delta fits within limits",
+			tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{
+				{
+					quota:      &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }},
+					limitsLeft: resourceList{"cpu": 10, "memory": 1000, "nodes": 5},
+				},
+			}),
+			node:      test.BuildTestNode("n1", 1000, 200),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 2,
+			},
+		},
+		{
+			name: "delta exceeds one resource limit",
+			tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{
+				{
+					quota:      &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }},
+					limitsLeft: resourceList{"cpu": 1, "memory": 1000, "nodes": 5},
+				},
+			}),
+			node:      test.BuildTestNode("n1", 1000, 200),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 1,
+				ExceededQuotas: []ExceededQuota{
+					{ID: "limiter1", ExceededResources: []string{"cpu"}},
+				},
+			},
+			wantExceeded: true,
+		},
+		{
+			name: "delta exceeds multiple resource limits",
+			tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{
+				{
+					quota:      &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }},
+					limitsLeft: resourceList{"cpu": 1, "memory": 300, "nodes": 5},
+				},
+			}),
+			node:      test.BuildTestNode("n1", 1000, 200),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 1,
+				ExceededQuotas: []ExceededQuota{
+					{ID: "limiter1", ExceededResources: []string{"cpu", "memory"}},
+				},
+			},
+			wantExceeded: true,
+		},
+		{
+			name: "no matching quotas",
+			tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{
+				{
+					quota:      &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return false }},
+					limitsLeft: resourceList{"cpu": 1, "memory": 100, "nodes": 1},
+				},
+			}),
+			node:      test.BuildTestNode("n1", 1000, 200),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 2,
+			},
+		},
+		{
+			name: "resource in limits but not in the node",
+			tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{
+				{
+					quota:      &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }},
+					limitsLeft: resourceList{"cpu": 4, "memory": 32 * units.GiB, "gpu": 2},
+				},
+			}),
+			node:      test.BuildTestNode("n1", 1000, 2000),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 2,
+			},
+		},
+		{
+			name: "resource in the node but not in the limits",
+			tracker: newTracker(&fakeCustomResourcesProcessor{NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget {
+				return []customresources.CustomResourceTarget{
+					{
+						ResourceType:  "gpu",
+						ResourceCount: 1,
+					},
+				}
+			}}, []*quotaStatus{
+				{
+					quota:      &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }},
+					limitsLeft: resourceList{"cpu": 4, "memory": 32 * units.GiB},
+				},
+			}),
+			node:      test.BuildTestNode("n1", 1000, 2000),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 2,
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			provider := cptest.NewTestCloudProviderBuilder().Build()
+			ctx := &context.AutoscalingContext{CloudProvider: provider}
+			gotResult, err := tc.tracker.CheckDelta(ctx, nil, tc.node, tc.nodeDelta)
+			if err != nil {
+				t.Fatalf("CheckDelta() returned an unexpected error: %v", err)
+			}
+
+			if diff := cmp.Diff(tc.wantResult, gotResult, cmpopts.EquateEmpty()); diff != "" {
+				t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff)
+			}
+			if gotResult.Exceeded() != tc.wantExceeded {
+				t.Errorf("Exceeded() mismatch, want: %v, got: %v", tc.wantExceeded, gotResult.Exceeded())
+			}
+		})
+	}
+}
+
+func TestApplyDelta(t *testing.T) {
+	testCases := []struct {
+		name           string
+		tracker        *Tracker
+		node           *apiv1.Node
+		nodeDelta      int
+		wantResult     *CheckDeltaResult
+		wantLimitsLeft map[string]resourceList
+	}{
+		{
+			name: "delta applied successfully",
+			tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{
+				{
+					quota:      &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }},
+					limitsLeft: resourceList{"cpu": 10, "memory": 1000, "nodes": 5},
+				},
+			}),
+			node:      test.BuildTestNode("n1", 1000, 200),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 2,
+			},
+			wantLimitsLeft: map[string]resourceList{
+				"limiter1": {"cpu": 8, "memory": 600, "nodes": 3},
+			},
+		},
+		{
+			name: "partial delta calculated, nothing applied",
+			tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{
+				{
+					quota:      &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }},
+					limitsLeft: resourceList{"cpu": 3, "memory": 1000, "nodes": 5},
+				},
+			}),
+			node:      test.BuildTestNode("n1", 2000, 200),
+			nodeDelta: 2,
+			wantResult: &CheckDeltaResult{
+				AllowedDelta: 1,
+				ExceededQuotas: []ExceededQuota{
+					{ID: "limiter1", ExceededResources: []string{"cpu"}},
+				},
+			},
+			wantLimitsLeft: map[string]resourceList{
+				"limiter1": {"cpu": 3, "memory": 1000, "nodes": 5},
1000, "nodes": 5}, + }, + }, + { + name: "delta not applied because it exceeds limits", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 1, "memory": 100, "nodes": 5}, + }, + }), + node: test.BuildTestNode("n1", 2000, 200), + nodeDelta: 1, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededQuotas: []ExceededQuota{ + {ID: "limiter1", ExceededResources: []string{"cpu", "memory"}}, + }, + }, + wantLimitsLeft: map[string]resourceList{ + "limiter1": {"cpu": 1, "memory": 100, "nodes": 5}, + }, + }, + { + name: "applied delta results in zero limit", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 2, "memory": 500, "nodes": 10}, + }, + }), + node: test.BuildTestNode("n1", 1000, 200), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + wantLimitsLeft: map[string]resourceList{ + "limiter1": {"cpu": 0, "memory": 100, "nodes": 8}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + provider := cptest.NewTestCloudProviderBuilder().Build() + ctx := &context.AutoscalingContext{CloudProvider: provider} + gotResult, err := tc.tracker.ApplyDelta(ctx, nil, tc.node, tc.nodeDelta) + if err != nil { + t.Fatalf("ApplyDelta() returned an unexpected error: %v", err) + } + + if diff := cmp.Diff(tc.wantResult, gotResult, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("ApplyDelta() result mismatch (-want +got):\n%s", diff) + } + + gotLimitsLeft := make(map[string]resourceList) + for _, ls := range tc.tracker.quotaStatuses { + gotLimitsLeft[ls.quota.ID()] = ls.limitsLeft + } + if diff := cmp.Diff(tc.wantLimitsLeft, gotLimitsLeft, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("ApplyDelta() limitsLeft mismatch (-want +got):\n%s", diff) + } + + }) + } +} + +func TestDeltaForNode(t *testing.T) { + testCases := []struct { + name string + node *apiv1.Node + crp customresources.CustomResourcesProcessor + wantDelta resourceList + }{ + { + name: "node just with CPU and memory", + node: test.BuildTestNode("test", 1000, 2048), + crp: &fakeCustomResourcesProcessor{}, + wantDelta: resourceList{ + "cpu": 1, + "memory": 2048, + "nodes": 1, + }, + }, + { + // nodes should not have milliCPUs in the capacity, so we round it up + // to the nearest integer. 
+			name: "node just with CPU and memory, milli cores rounded up",
+			node: test.BuildTestNode("test", 2500, 4096),
+			crp:  &fakeCustomResourcesProcessor{},
+			wantDelta: resourceList{
+				"cpu":    3,
+				"memory": 4096,
+				"nodes":  1,
+			},
+		},
+		{
+			name: "node with custom resources",
+			node: test.BuildTestNode("test", 1000, 2048),
+			crp: &fakeCustomResourcesProcessor{NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget {
+				return []customresources.CustomResourceTarget{
+					{
+						ResourceType:  "gpu",
+						ResourceCount: 1,
+					},
+				}
+			}},
+			wantDelta: resourceList{
+				"cpu":    1,
+				"memory": 2048,
+				"gpu":    1,
+				"nodes":  1,
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx := &context.AutoscalingContext{}
+			delta, err := deltaForNode(ctx, tc.crp, tc.node, nil)
+			if err != nil {
+				t.Fatalf("deltaForNode: unexpected error: %v", err)
+			}
+			if diff := cmp.Diff(tc.wantDelta, delta); diff != "" {
+				t.Errorf("delta mismatch (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/cluster-autoscaler/resourcequotas/usage.go b/cluster-autoscaler/resourcequotas/usage.go
new file mode 100644
index 00000000000..484e8aab679
--- /dev/null
+++ b/cluster-autoscaler/resourcequotas/usage.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcequotas
+
+import (
+	"fmt"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+)
+
+// NodeFilter customizes what nodes should be included in usage calculations.
+type NodeFilter interface {
+	// ExcludeFromTracking returns true if the node should be excluded from usage calculations.
+	ExcludeFromTracking(node *corev1.Node) bool
+}
+
+type usageCalculator struct {
+	crp        customresources.CustomResourcesProcessor
+	nodeFilter NodeFilter
+}
+
+func newUsageCalculator(crp customresources.CustomResourcesProcessor, nodeFilter NodeFilter) *usageCalculator {
+	return &usageCalculator{
+		crp:        crp,
+		nodeFilter: nodeFilter,
+	}
+}
+
+// calculateUsages calculates resources used by nodes for every quota.
+// Returns a map with quota ID as a key and resources used in the
+// corresponding quota as a value.
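+//
+// Nodes excluded by the NodeFilter are skipped entirely, and a node counts
+// towards a quota only if that quota's AppliesTo returns true for the node.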
+func (u *usageCalculator) calculateUsages(ctx *context.AutoscalingContext, nodes []*corev1.Node, quotas []Quota) (map[string]resourceList, error) {
+	usages := make(map[string]resourceList)
+	for _, rq := range quotas {
+		usages[rq.ID()] = make(resourceList)
+	}
+
+	for _, node := range nodes {
+		if u.nodeFilter != nil && u.nodeFilter.ExcludeFromTracking(node) {
+			continue
+		}
+
+		ng, err := ctx.CloudProvider.NodeGroupForNode(node)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get node group for node %q: %w", node.Name, err)
+		}
+		delta, err := deltaForNode(ctx, u.crp, node, ng)
+		if err != nil {
+			return nil, err
+		}
+		for _, rq := range quotas {
+			if rq.AppliesTo(node) {
+				for resourceType, resourceCount := range delta {
+					usages[rq.ID()][resourceType] += resourceCount
+				}
+			}
+		}
+	}
+	return usages, nil
+}
diff --git a/cluster-autoscaler/resourcequotas/usage_test.go b/cluster-autoscaler/resourcequotas/usage_test.go
new file mode 100644
index 00000000000..fc8160eccfb
--- /dev/null
+++ b/cluster-autoscaler/resourcequotas/usage_test.go
@@ -0,0 +1,223 @@
+/*
+Copyright 2025 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package resourcequotas
+
+import (
+	"testing"
+
+	"github.com/google/go-cmp/cmp"
+	apiv1 "k8s.io/api/core/v1"
+	cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
+	"k8s.io/autoscaler/cluster-autoscaler/context"
+	"k8s.io/autoscaler/cluster-autoscaler/processors/customresources"
+	"k8s.io/autoscaler/cluster-autoscaler/utils/test"
+)
+
+func TestCalculateUsages(t *testing.T) {
+	testCases := []struct {
+		name          string
+		nodes         []*apiv1.Node
+		quotas        []Quota
+		nodeFilter    func(node *apiv1.Node) bool
+		customTargets map[string][]customresources.CustomResourceTarget
+		wantUsages    map[string]resourceList
+	}{
+		{
+			name: "cluster-wide limiter, no node filter",
+			nodes: []*apiv1.Node{
+				test.BuildTestNode("n1", 1000, 2000),
+				test.BuildTestNode("n2", 2000, 4000),
+				test.BuildTestNode("n3", 3000, 8000),
+			},
+			quotas: []Quota{
+				&fakeQuota{
+					id:          "cluster-wide",
+					appliesToFn: includeAll,
+				},
+			},
+			wantUsages: map[string]resourceList{
+				"cluster-wide": {
+					"cpu":    6,
+					"memory": 14000,
+					"nodes":  3,
+				},
+			},
+		},
+		{
+			name: "multiple quotas",
+			nodes: []*apiv1.Node{
+				addLabel(test.BuildTestNode("n1", 1000, 2000), "pool", "a"),
+				addLabel(test.BuildTestNode("n2", 2000, 4000), "pool", "b"),
+				addLabel(test.BuildTestNode("n3", 3000, 8000), "pool", "a"),
+			},
+			quotas: []Quota{
+				&fakeQuota{
+					id:          "pool-a",
+					appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" },
+				},
+				&fakeQuota{
+					id:          "pool-b",
+					appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" },
+				},
+			},
+			wantUsages: map[string]resourceList{
+				"pool-a": {
+					"cpu":    4,
+					"memory": 10000,
+					"nodes":  2,
+				},
+				"pool-b": {
+					"cpu":    2,
+					"memory": 4000,
+					"nodes":  1,
+				},
+			},
+		},
+		{
+			name: "with node filter",
+			nodes: []*apiv1.Node{
+				test.BuildTestNode("n1", 1000, 2000),
+				test.BuildTestNode("n2", 2000, 4000),
+				test.BuildTestNode("n3", 3000, 8000),
+			},
+			quotas: []Quota{
+				&fakeQuota{
+					id:          "cluster-wide",
+					appliesToFn: includeAll,
+				},
+			},
+			nodeFilter: func(node *apiv1.Node) bool { return node.Name == "n2" },
+			wantUsages: map[string]resourceList{
+				"cluster-wide": {
+					"cpu":    4,
+					"memory": 10000,
+					"nodes":  2,
+				},
+			},
+		},
+		{
+			name: "limiter doesn't match any node",
+			nodes: []*apiv1.Node{
+				test.BuildTestNode("n1", 1000, 2000),
+			},
+			quotas: []Quota{
+				&fakeQuota{
+					id:          "no-match",
+					appliesToFn: func(node *apiv1.Node) bool { return false },
+				},
+			},
+			wantUsages: map[string]resourceList{
+				"no-match": {},
+			},
+		},
+		{
+			name: "with custom resources",
+			nodes: []*apiv1.Node{
+				test.BuildTestNode("n1", 1000, 2000),
+				test.BuildTestNode("n2", 2000, 4000),
+			},
+			quotas: []Quota{
+				&fakeQuota{
+					id:          "cluster-wide",
+					appliesToFn: includeAll,
+				},
+			},
+			customTargets: map[string][]customresources.CustomResourceTarget{
+				"n1": {
+					{ResourceType: "gpu", ResourceCount: 2},
+				},
+			},
+			wantUsages: map[string]resourceList{
+				"cluster-wide": {
+					"cpu":    3,
+					"memory": 6000,
+					"gpu":    2,
+					"nodes":  2,
+				},
+			},
+		},
+		{
+			name: "multiple quotas and node filter",
+			nodes: []*apiv1.Node{
+				addLabel(test.BuildTestNode("n1", 1000, 2000), "pool", "a"),
+				addLabel(test.BuildTestNode("n2", 2000, 4000), "pool", "b"),
+				addLabel(test.BuildTestNode("n3", 3000, 8000), "pool", "a"),
+			},
+			quotas: []Quota{
+				&fakeQuota{
+					id:          "pool-a",
+					appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" },
+				},
+				&fakeQuota{
+					id:          "pool-b",
+					appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" },
+				},
+			},
+			nodeFilter: func(node *apiv1.Node) bool { return node.Name == "n3" },
+			wantUsages: map[string]resourceList{
+				"pool-a": {
+					"cpu":    1,
+					"memory": 2000,
+					"nodes":  1,
+				},
+				"pool-b": {
+					"cpu":    2,
+					"memory": 4000,
+					"nodes":  1,
+				},
+			},
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			provider := cptest.NewTestCloudProviderBuilder().Build()
+			ctx := &context.AutoscalingContext{CloudProvider: provider}
+			crp := &fakeCustomResourcesProcessor{
+				NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget {
+					if tc.customTargets == nil {
+						return nil
+					}
+					return tc.customTargets[node.Name]
+				},
+			}
+			var nf NodeFilter
+			if tc.nodeFilter != nil {
+				nf = &fakeNodeFilter{NodeFilterFn: tc.nodeFilter}
+			}
+			calculator := newUsageCalculator(crp, nf)
+			usages, err := calculator.calculateUsages(ctx, tc.nodes, tc.quotas)
+			if err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+			if diff := cmp.Diff(tc.wantUsages, usages); diff != "" {
+				t.Errorf("calculateUsages() mismatch (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
+
+func includeAll(node *apiv1.Node) bool {
+	return true
+}
+
+func addLabel(node *apiv1.Node, key, value string) *apiv1.Node {
+	if node.Labels == nil {
+		node.Labels = make(map[string]string)
+	}
+	node.Labels[key] = value
+	return node
+}