From 5ddb228a72a2ac759e67cfd085a8f7f1a89529fe Mon Sep 17 00:00:00 2001 From: Norbert Cyran Date: Fri, 26 Sep 2025 13:15:36 +0200 Subject: [PATCH 1/2] Add support for multiple resource limits --- .../cloudprovider/resource_limiter.go | 20 +- cluster-autoscaler/resourcelimits/factory.go | 104 ++++++ .../resourcelimits/factory_test.go | 309 ++++++++++++++++++ cluster-autoscaler/resourcelimits/provider.go | 26 ++ .../resourcelimits/provider_test.go | 34 ++ .../resourcelimits/testutils.go | 86 +++++ cluster-autoscaler/resourcelimits/tracker.go | 166 ++++++++++ .../resourcelimits/tracker_test.go | 236 +++++++++++++ cluster-autoscaler/resourcelimits/usage.go | 56 ++++ .../resourcelimits/usage_test.go | 207 ++++++++++++ 10 files changed, 1243 insertions(+), 1 deletion(-) create mode 100644 cluster-autoscaler/resourcelimits/factory.go create mode 100644 cluster-autoscaler/resourcelimits/factory_test.go create mode 100644 cluster-autoscaler/resourcelimits/provider.go create mode 100644 cluster-autoscaler/resourcelimits/provider_test.go create mode 100644 cluster-autoscaler/resourcelimits/testutils.go create mode 100644 cluster-autoscaler/resourcelimits/tracker.go create mode 100644 cluster-autoscaler/resourcelimits/tracker_test.go create mode 100644 cluster-autoscaler/resourcelimits/usage.go create mode 100644 cluster-autoscaler/resourcelimits/usage_test.go diff --git a/cluster-autoscaler/cloudprovider/resource_limiter.go b/cluster-autoscaler/cloudprovider/resource_limiter.go index 56143017994a..eb07c2ff6b21 100644 --- a/cluster-autoscaler/cloudprovider/resource_limiter.go +++ b/cluster-autoscaler/cloudprovider/resource_limiter.go @@ -18,9 +18,11 @@ package cloudprovider import ( "fmt" - "k8s.io/apimachinery/pkg/util/sets" "math" "strings" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" ) // ResourceLimiter contains limits (max, min) for resources (cores, memory etc.). @@ -29,6 +31,10 @@ type ResourceLimiter struct { maxLimits map[string]int64 } +func (r *ResourceLimiter) ID() string { + return "cluster-wide" +} + // NewResourceLimiter creates new ResourceLimiter for map. Maps are deep copied. func NewResourceLimiter(minLimits map[string]int64, maxLimits map[string]int64) *ResourceLimiter { minLimitsCopy := make(map[string]int64) @@ -88,3 +94,15 @@ func (r *ResourceLimiter) String() string { } return strings.Join(resourceDetails, ", ") } + +func (r *ResourceLimiter) AppliesTo(node *apiv1.Node) bool { + return true +} + +func (r *ResourceLimiter) MaxLimits() map[string]int64 { + return r.maxLimits +} + +func (r *ResourceLimiter) MinLimits() map[string]int64 { + return r.minLimits +} diff --git a/cluster-autoscaler/resourcelimits/factory.go b/cluster-autoscaler/resourcelimits/factory.go new file mode 100644 index 000000000000..5008d8fa671a --- /dev/null +++ b/cluster-autoscaler/resourcelimits/factory.go @@ -0,0 +1,104 @@ +package resourcelimits + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" +) + +// TrackerFactory builds trackers. +type TrackerFactory struct { + crp customresources.CustomResourcesProcessor + limitProviders []Provider + usageCalculator *usageCalculator +} + +// NewTrackerFactory creates a new TrackerFactory. 
+func NewTrackerFactory(opts TrackerOptions) *TrackerFactory { + uc := newUsageCalculator(opts.CRP, opts.NodeFilter) + return &TrackerFactory{ + crp: opts.CRP, + limitProviders: opts.Providers, + usageCalculator: uc, + } +} + +// NewMaxLimitsTracker builds a new Tracker for max limits. +func (f *TrackerFactory) NewMaxLimitsTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node) (*Tracker, error) { + return f.newLimitsTracker(ctx, nodes, &maxLimitsStrategy{}) +} + +// NewMinLimitsTracker builds a new Tracker for min limits. +func (f *TrackerFactory) NewMinLimitsTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node) (*Tracker, error) { + return f.newLimitsTracker(ctx, nodes, &minLimitsStrategy{}) +} + +func (f *TrackerFactory) newLimitsTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node, strategy limitStrategy) (*Tracker, error) { + limiters, err := f.limiters() + if err != nil { + return nil, err + } + usages, err := f.usageCalculator.calculateUsages(ctx, nodes, limiters) + if err != nil { + return nil, err + } + limitsLeft := make(map[string]resourceList) + for _, rl := range limiters { + limitsLeft[rl.ID()] = make(resourceList) + limits := strategy.GetLimits(rl) + for resourceType, limit := range limits { + usage := usages[rl.ID()][resourceType] + limitsLeft[rl.ID()][resourceType] = strategy.CalculateLimitsLeft(limit, usage) + } + } + tracker := newTracker(f.crp, limiters, limitsLeft) + return tracker, nil +} + +func (f *TrackerFactory) limiters() ([]Limiter, error) { + var limiters []Limiter + for _, provider := range f.limitProviders { + provLimiters, err := provider.AllLimiters() + if err != nil { + return nil, fmt.Errorf("failed to get limiters from provider: %w", err) + } + for _, limiter := range provLimiters { + limiters = append(limiters, limiter) + } + } + return limiters, nil +} + +// limitStrategy is an interface for defining limit calculation strategies. +type limitStrategy interface { + GetLimits(rl Limiter) resourceList + CalculateLimitsLeft(limit, usage int64) int64 +} + +// maxLimitsStrategy is a strategy for max limits. +type maxLimitsStrategy struct{} + +// GetLimits returns max limits. +func (s *maxLimitsStrategy) GetLimits(rl Limiter) resourceList { + return rl.MaxLimits() +} + +// CalculateLimitsLeft calculates the remaining limits for max limits. +func (s *maxLimitsStrategy) CalculateLimitsLeft(limit, usage int64) int64 { + return max(0, limit-usage) +} + +// minLimitsStrategy is a strategy for min limits. +type minLimitsStrategy struct{} + +// GetLimits returns min limits. +func (s *minLimitsStrategy) GetLimits(rl Limiter) resourceList { + return rl.MinLimits() +} + +// CalculateLimitsLeft calculates the remaining limits for min limits. 
+func (s *minLimitsStrategy) CalculateLimitsLeft(limit, usage int64) int64 { + return max(0, usage-limit) +} diff --git a/cluster-autoscaler/resourcelimits/factory_test.go b/cluster-autoscaler/resourcelimits/factory_test.go new file mode 100644 index 000000000000..b633ee6a18be --- /dev/null +++ b/cluster-autoscaler/resourcelimits/factory_test.go @@ -0,0 +1,309 @@ +package resourcelimits + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" + "k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/autoscaler/cluster-autoscaler/utils/units" +) + +type nodeExcludeFn func(node *apiv1.Node) bool + +func (n nodeExcludeFn) ExcludeFromTracking(node *apiv1.Node) bool { + return n(node) +} + +func TestMaxLimitsTracker(t *testing.T) { + testCases := []struct { + name string + crp customresources.CustomResourcesProcessor + nodeFilter NodeFilter + nodes []*apiv1.Node + limits map[string]int64 + newNode *apiv1.Node + nodeDelta int + wantResult *CheckDeltaResult + }{ + { + name: "default config allowed operation", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 12, + "memory": 32 * units.GiB, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + }, + { + name: "default config exceeded operation", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 6, + "memory": 16 * units.GiB, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededResources: map[string][]string{ + "cluster-wide": {"cpu", "memory"}, + }, + }, + }, + { + name: "default config partially allowed operation", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 7, + "memory": 16 * units.GiB, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededResources: map[string][]string{ + "cluster-wide": {"cpu", "memory"}, + }, + }, + }, + { + name: "custom resource config allowed operation", + crp: &fakeCustomResourcesProcessor{ + NodeResourceTargets: func(n *apiv1.Node) []customresources.CustomResourceTarget { + if n.Name == "n1" { + return []customresources.CustomResourceTarget{ + { + ResourceType: "gpu", + ResourceCount: 1, + }, + } + } + return nil + }, + }, + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 12, + "memory": 32 * units.GiB, + "gpu": 6, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + }, + { + name: "custom resource config exceeded operation", + crp: 
&fakeCustomResourcesProcessor{ + NodeResourceTargets: func(n *apiv1.Node) []customresources.CustomResourceTarget { + if n.Name == "n1" || n.Name == "n4" { + return []customresources.CustomResourceTarget{ + { + ResourceType: "gpu", + ResourceCount: 1, + }, + } + } + return nil + }, + }, + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 12, + "memory": 32 * units.GiB, + "gpu": 1, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededResources: map[string][]string{ + "cluster-wide": {"gpu"}, + }, + }, + }, + { + name: "node filter config allowed operation", + nodeFilter: nodeExcludeFn(func(node *apiv1.Node) bool { + return node.Name == "n3" + }), + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 4, + "memory": 8 * units.GiB, + }, + newNode: test.BuildTestNode("n4", 1000, 2*units.GiB), + nodeDelta: 1, + wantResult: &CheckDeltaResult{ + AllowedDelta: 1, + }, + }, + { + name: "node filter config exceeded operation", + nodeFilter: nodeExcludeFn(func(node *apiv1.Node) bool { + return node.Name == "n3" + }), + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 4, + "memory": 8 * units.GiB, + }, + newNode: test.BuildTestNode("n4", 2000, 4*units.GiB), + nodeDelta: 1, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededResources: map[string][]string{ + "cluster-wide": {"cpu", "memory"}, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cloudProvider := cptest.NewTestCloudProviderBuilder().Build() + resourceLimiter := cloudprovider.NewResourceLimiter(nil, tc.limits) + cloudProvider.SetResourceLimiter(resourceLimiter) + ctx := &context.AutoscalingContext{CloudProvider: cloudProvider} + crp := tc.crp + if crp == nil { + crp = &fakeCustomResourcesProcessor{} + } + factory := NewTrackerFactory(TrackerOptions{ + CRP: crp, + Providers: []Provider{NewCloudLimitersProvider(cloudProvider)}, + NodeFilter: tc.nodeFilter, + }) + tracker, err := factory.NewMaxLimitsTracker(ctx, tc.nodes) + if err != nil { + t.Errorf("failed to create tracker: %v", err) + } + var ng cloudprovider.NodeGroup + result, err := tracker.CheckDelta(ctx, ng, tc.newNode, tc.nodeDelta) + if err != nil { + t.Errorf("failed to check delta: %v", err) + } + if diff := cmp.Diff(tc.wantResult, result, cmpopts.SortSlices(func(a, b string) bool { return a < b }), cmpopts.EquateEmpty()); diff != "" { + t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestMinLimitsTracker(t *testing.T) { + testCases := []struct { + name string + crp customresources.CustomResourcesProcessor + nodeFilter NodeFilter + nodes []*apiv1.Node + limits map[string]int64 + newNode *apiv1.Node + nodeDelta int + wantResult *CheckDeltaResult + }{ + { + name: "default config allowed operation", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 1, + "memory": 2 * units.GiB, + }, + newNode: 
test.BuildTestNode("n1", 1000, 2*units.GiB), + nodeDelta: 1, + wantResult: &CheckDeltaResult{ + AllowedDelta: 1, + }, + }, + { + name: "default config exceeded operation", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2*units.GiB), + test.BuildTestNode("n2", 2000, 4*units.GiB), + test.BuildTestNode("n3", 3000, 8*units.GiB), + }, + limits: map[string]int64{ + "cpu": 6, + "memory": 10 * units.GiB, + }, + newNode: test.BuildTestNode("n3", 3000, 8*units.GiB), + nodeDelta: 1, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededResources: map[string][]string{ + "cluster-wide": {"cpu", "memory"}, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + cloudProvider := cptest.NewTestCloudProviderBuilder().Build() + resourceLimiter := cloudprovider.NewResourceLimiter(tc.limits, nil) + cloudProvider.SetResourceLimiter(resourceLimiter) + ctx := &context.AutoscalingContext{CloudProvider: cloudProvider} + crp := tc.crp + if crp == nil { + crp = &fakeCustomResourcesProcessor{} + } + factory := NewTrackerFactory(TrackerOptions{ + CRP: crp, + Providers: []Provider{NewCloudLimitersProvider(cloudProvider)}, + NodeFilter: tc.nodeFilter, + }) + tracker, err := factory.NewMinLimitsTracker(ctx, tc.nodes) + if err != nil { + t.Errorf("failed to create tracker: %v", err) + } + var ng cloudprovider.NodeGroup + result, err := tracker.CheckDelta(ctx, ng, tc.newNode, tc.nodeDelta) + if err != nil { + t.Errorf("failed to check delta: %v", err) + } + if diff := cmp.Diff(tc.wantResult, result, cmpopts.SortSlices(func(a, b string) bool { return a < b }), cmpopts.EquateEmpty()); diff != "" { + t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/cluster-autoscaler/resourcelimits/provider.go b/cluster-autoscaler/resourcelimits/provider.go new file mode 100644 index 000000000000..abbbd87aedfc --- /dev/null +++ b/cluster-autoscaler/resourcelimits/provider.go @@ -0,0 +1,26 @@ +package resourcelimits + +import ( + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +type Provider interface { + AllLimiters() ([]Limiter, error) +} +type CloudLimitersProvider struct { + cloudProvider cloudprovider.CloudProvider +} + +func (p *CloudLimitersProvider) AllLimiters() ([]Limiter, error) { + rl, err := p.cloudProvider.GetResourceLimiter() + if err != nil { + return nil, err + } + return []Limiter{rl}, nil +} + +func NewCloudLimitersProvider(cloudProvider cloudprovider.CloudProvider) *CloudLimitersProvider { + return &CloudLimitersProvider{ + cloudProvider: cloudProvider, + } +} diff --git a/cluster-autoscaler/resourcelimits/provider_test.go b/cluster-autoscaler/resourcelimits/provider_test.go new file mode 100644 index 000000000000..cf88ba4085b0 --- /dev/null +++ b/cluster-autoscaler/resourcelimits/provider_test.go @@ -0,0 +1,34 @@ +package resourcelimits + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/utils/units" +) + +func TestCloudLimitersProvider(t *testing.T) { + cloudProvider := test.NewTestCloudProviderBuilder().Build() + minLimits := map[string]int64{"cpu": 2, "memory": 8 * units.GiB} + maxLimits := map[string]int64{"cpu": 4, "memory": 16 * units.GiB} + resourceLimiter := cloudprovider.NewResourceLimiter(minLimits, maxLimits) + cloudProvider.SetResourceLimiter(resourceLimiter) + + limitsProvider := NewCloudLimitersProvider(cloudProvider) + limiters, err := 
limitsProvider.AllLimiters() + if err != nil { + t.Errorf("failed to get limiters: %v", err) + } + if len(limiters) != 1 { + t.Errorf("got %d limiters, expected 1", len(limiters)) + } + limiter := limiters[0] + if diff := cmp.Diff(minLimits, limiter.MinLimits()); diff != "" { + t.Errorf("MinLimits() mismatch (-want +got):\n%s", diff) + } + if diff := cmp.Diff(maxLimits, limiter.MaxLimits()); diff != "" { + t.Errorf("MaxLimits() mismatch (-want +got):\n%s", diff) + } +} diff --git a/cluster-autoscaler/resourcelimits/testutils.go b/cluster-autoscaler/resourcelimits/testutils.go new file mode 100644 index 000000000000..ab25740f75e5 --- /dev/null +++ b/cluster-autoscaler/resourcelimits/testutils.go @@ -0,0 +1,86 @@ +package resourcelimits + +import ( + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" + drasnapshot "k8s.io/autoscaler/cluster-autoscaler/simulator/dynamicresources/snapshot" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" +) + +// fakeProvider is a fake implementation of a resource limit provider. +type fakeProvider struct { + limiters []Limiter +} + +// NewFakeProvider creates a new fakeProvider. +func NewFakeProvider() *fakeProvider { + return &fakeProvider{} +} + +// AddLimiter adds a limiter to the provider. +func (p *fakeProvider) AddLimiter(limiter Limiter) { + p.limiters = append(p.limiters, limiter) +} + +// AllLimiters returns all limiters from the provider. +func (p *fakeProvider) AllLimiters() ([]Limiter, error) { + return p.limiters, nil +} + +type fakeNodeFilter struct { + NodeFilterFn func(*apiv1.Node) bool +} + +func (f *fakeNodeFilter) ExcludeFromTracking(node *apiv1.Node) bool { + if f.NodeFilterFn == nil { + return false + } + return f.NodeFilterFn(node) +} + +type fakeCustomResourcesProcessor struct { + NodeResourceTargets func(*apiv1.Node) []customresources.CustomResourceTarget + GetNodeResourceTargetsError errors.AutoscalerError +} + +func (f *fakeCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) { + return allNodes, readyNodes +} + +func (f *fakeCustomResourcesProcessor) GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]customresources.CustomResourceTarget, errors.AutoscalerError) { + if f.GetNodeResourceTargetsError != nil { + return nil, f.GetNodeResourceTargetsError + } + if f.NodeResourceTargets == nil { + return nil, nil + } + return f.NodeResourceTargets(node), nil +} + +func (f *fakeCustomResourcesProcessor) CleanUp() { +} + +type fakeLimiter struct { + id string + appliesToFn func(*apiv1.Node) bool + minLimits resourceList + maxLimits resourceList +} + +func (f *fakeLimiter) ID() string { + return f.id +} + +func (f *fakeLimiter) AppliesTo(node *apiv1.Node) bool { + return f.appliesToFn(node) +} + +func (f *fakeLimiter) MaxLimits() map[string]int64 { + return f.maxLimits +} + +func (f *fakeLimiter) MinLimits() map[string]int64 { + return f.minLimits +} diff --git a/cluster-autoscaler/resourcelimits/tracker.go b/cluster-autoscaler/resourcelimits/tracker.go new file mode 100644 index 000000000000..56772b0b5622 --- /dev/null +++ b/cluster-autoscaler/resourcelimits/tracker.go @@ -0,0 +1,166 @@ +package resourcelimits + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + 
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core/utils" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" +) + +const ( + ResourceNameCores = "cpu" + ResourceNameMemory = "memory" + ResourceNameNodes = "nodes" +) + +// Limiter is an interface for a single resource limit. +type Limiter interface { + ID() string + AppliesTo(node *corev1.Node) bool + MaxLimits() map[string]int64 + MinLimits() map[string]int64 +} + +// resourceList is a map of resource names to their quantities. +type resourceList map[string]int64 + +// Tracker tracks resource limits. A single tracker should track either max or min limits. +type Tracker struct { + crp customresources.CustomResourcesProcessor + limiters []Limiter + limitsLeft map[string]resourceList +} + +// newTracker creates a new Tracker. +func newTracker(crp customresources.CustomResourcesProcessor, limiters []Limiter, limitsLeft map[string]resourceList) *Tracker { + return &Tracker{ + crp: crp, + limiters: limiters, + limitsLeft: limitsLeft, + } +} + +type TrackerOptions struct { + CRP customresources.CustomResourcesProcessor + Providers []Provider + NodeFilter NodeFilter +} + +// ApplyDelta checks if a delta is within limits and applies it. +func (t *Tracker) ApplyDelta( + ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int, +) (*CheckDeltaResult, error) { + delta, err := deltaForNode(ctx, t.crp, node, nodeGroup) + if err != nil { + return nil, err + } + matchingLimiters := t.matchingLimiters(node) + + result := t.checkDelta(delta, matchingLimiters, nodeDelta) + + if result.AllowedDelta != nodeDelta { + return result, nil + } + + for _, rl := range matchingLimiters { + if t.limitsLeft[rl.ID()] == nil { + continue + } + for resource, resourceDelta := range delta { + if limit, ok := t.limitsLeft[rl.ID()][resource]; ok { + t.limitsLeft[rl.ID()][resource] = max(limit-resourceDelta*int64(result.AllowedDelta), 0) + } + } + } + + return result, nil +} + +// CheckDelta checks if a delta is within limits, without applying it. 
+func (t *Tracker) CheckDelta( + ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int, +) (*CheckDeltaResult, error) { + // TODO: cache deltas + delta, err := deltaForNode(ctx, t.crp, node, nodeGroup) + if err != nil { + return nil, err + } + matchingLimiters := t.matchingLimiters(node) + return t.checkDelta(delta, matchingLimiters, nodeDelta), nil +} + +func (t *Tracker) checkDelta(delta resourceList, matchingLimiters []Limiter, nodeDelta int) *CheckDeltaResult { + result := &CheckDeltaResult{ + AllowedDelta: nodeDelta, + } + + exceededResources := make(map[string][]string) + for _, rl := range matchingLimiters { + limiterLimitsLeft := t.limitsLeft[rl.ID()] + for resource, resourceDelta := range delta { + if resourceDelta <= 0 { + continue + } + + limitsLeft, ok := limiterLimitsLeft[resource] + if !ok { + continue + } + + if limitsLeft < resourceDelta*int64(nodeDelta) { + allowedNodes := limitsLeft / resourceDelta + if allowedNodes < int64(result.AllowedDelta) { + result.AllowedDelta = int(allowedNodes) + } + exceededResources[rl.ID()] = append(exceededResources[rl.ID()], resource) + } + } + } + result.ExceededResources = exceededResources + return result +} + +func (t *Tracker) matchingLimiters(node *corev1.Node) []Limiter { + var matchingLimiters []Limiter + for _, rl := range t.limiters { + if rl.AppliesTo(node) { + matchingLimiters = append(matchingLimiters, rl) + } + } + return matchingLimiters +} + +// CheckDeltaResult is a result of checking a delta. +type CheckDeltaResult struct { + ExceededResources map[string][]string + AllowedDelta int +} + +// Exceeded returns true if any resource limit was exceeded. +func (r *CheckDeltaResult) Exceeded() bool { + return len(r.ExceededResources) > 0 +} + +// deltaForNode calculates the amount of resources that will be used from the cluster when creating a node. 
+func deltaForNode(ctx *context.AutoscalingContext, crp customresources.CustomResourcesProcessor, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) { + nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(node) + nodeResources := resourceList{ + ResourceNameCores: nodeCPU, + ResourceNameMemory: nodeMemory, + ResourceNameNodes: 1, + } + + resourceTargets, err := crp.GetNodeResourceTargets(ctx, node, nodeGroup) + if err != nil { + return nil, fmt.Errorf("failed to get custom resources: %w", err) + } + + for _, resourceTarget := range resourceTargets { + nodeResources[resourceTarget.ResourceType] = resourceTarget.ResourceCount + } + + return nodeResources, nil +} diff --git a/cluster-autoscaler/resourcelimits/tracker_test.go b/cluster-autoscaler/resourcelimits/tracker_test.go new file mode 100644 index 000000000000..fe6f1799df37 --- /dev/null +++ b/cluster-autoscaler/resourcelimits/tracker_test.go @@ -0,0 +1,236 @@ +package resourcelimits + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + apiv1 "k8s.io/api/core/v1" + cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" + "k8s.io/autoscaler/cluster-autoscaler/utils/test" +) + +func TestCheckDelta(t *testing.T) { + testCases := []struct { + name string + tracker *Tracker + node *apiv1.Node + nodeDelta int + wantResult *CheckDeltaResult + }{ + { + name: "delta fits within limits", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ + "limiter1": {"cpu": 10, "memory": 1000, "nodes": 5}, + }), + node: test.BuildTestNode("n1", 1000, 200), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + }, + { + name: "delta exceeds one resource limit", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ + "limiter1": {"cpu": 1, "memory": 1000, "nodes": 5}, + }), + node: test.BuildTestNode("n1", 1000, 200), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 1, + ExceededResources: map[string][]string{"limiter1": {"cpu"}}, + }, + }, + { + name: "delta exceeds multiple resource limits", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ + "limiter1": {"cpu": 1, "memory": 300, "nodes": 5}, + }), + node: test.BuildTestNode("n1", 1000, 200), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 1, + ExceededResources: map[string][]string{"limiter1": {"cpu", "memory"}}, + }, + }, + { + name: "no matching limiters", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return false }}}, map[string]resourceList{ + "limiter1": {"cpu": 1, "memory": 100, "nodes": 1}, + }), + node: test.BuildTestNode("n1", 1000, 200), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + provider := cptest.NewTestCloudProviderBuilder().Build() + ctx := &context.AutoscalingContext{CloudProvider: provider} + gotResult, err := tc.tracker.CheckDelta(ctx, nil, tc.node, tc.nodeDelta) + if err != nil { + 
t.Fatalf("CheckDelta() returned an unexpected error: %v", err) + } + + if diff := cmp.Diff(tc.wantResult, gotResult, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestApplyDelta(t *testing.T) { + testCases := []struct { + name string + tracker *Tracker + node *apiv1.Node + nodeDelta int + wantResult *CheckDeltaResult + wantLimitsLeft map[string]resourceList + }{ + { + name: "delta applied successfully", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ + "limiter1": {"cpu": 10, "memory": 1000, "nodes": 5}, + }), + node: test.BuildTestNode("n1", 1000, 200), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + wantLimitsLeft: map[string]resourceList{ + "limiter1": {"cpu": 8, "memory": 600, "nodes": 3}, + }, + }, + { + name: "partial delta calculated, nothing applied", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ + "limiter1": {"cpu": 3, "memory": 1000, "nodes": 5}, + }), + node: test.BuildTestNode("n1", 2000, 200), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 1, + ExceededResources: map[string][]string{"limiter1": {"cpu"}}, + }, + wantLimitsLeft: map[string]resourceList{ + "limiter1": {"cpu": 3, "memory": 1000, "nodes": 5}, + }, + }, + { + name: "delta not applied because it exceeds limits", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ + "limiter1": {"cpu": 1, "memory": 100, "nodes": 5}, + }), + node: test.BuildTestNode("n1", 2000, 200), + nodeDelta: 1, + wantResult: &CheckDeltaResult{ + AllowedDelta: 0, + ExceededResources: map[string][]string{"limiter1": {"cpu", "memory"}}, + }, + wantLimitsLeft: map[string]resourceList{ + "limiter1": {"cpu": 1, "memory": 100, "nodes": 5}, + }, + }, + { + name: "applied delta results in zero limit", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ + "limiter1": {"cpu": 2, "memory": 500, "nodes": 10}, + }), + node: test.BuildTestNode("n1", 1000, 200), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + wantLimitsLeft: map[string]resourceList{ + "limiter1": {"cpu": 0, "memory": 100, "nodes": 8}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + provider := cptest.NewTestCloudProviderBuilder().Build() + ctx := &context.AutoscalingContext{CloudProvider: provider} + gotResult, err := tc.tracker.ApplyDelta(ctx, nil, tc.node, tc.nodeDelta) + if err != nil { + t.Fatalf("ApplyDelta() returned an unexpected error: %v", err) + } + + if diff := cmp.Diff(tc.wantResult, gotResult, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("ApplyDelta() result mismatch (-want +got):\n%s", diff) + } + + if diff := cmp.Diff(tc.wantLimitsLeft, tc.tracker.limitsLeft, cmpopts.EquateEmpty()); diff != "" { + t.Errorf("ApplyDelta() limitsLeft mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func TestDeltaForNode(t *testing.T) { + testCases := []struct { + name string + node *apiv1.Node + crp customresources.CustomResourcesProcessor + wantDelta resourceList + }{ + { + name: "node just with CPU and memory", + 
node: test.BuildTestNode("test", 1000, 2048), + crp: &fakeCustomResourcesProcessor{}, + wantDelta: resourceList{ + "cpu": 1, + "memory": 2048, + "nodes": 1, + }, + }, + { + // nodes should not have milliCPUs in the capacity, so we round it up + // to the nearest integer. + name: "node just with CPU and memory, milli cores rounded up", + node: test.BuildTestNode("test", 2500, 4096), + crp: &fakeCustomResourcesProcessor{}, + wantDelta: resourceList{ + "cpu": 3, + "memory": 4096, + "nodes": 1, + }, + }, + { + name: "node with custom resources", + node: test.BuildTestNode("test", 1000, 2048), + crp: &fakeCustomResourcesProcessor{NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget { + return []customresources.CustomResourceTarget{ + { + ResourceType: "gpu", + ResourceCount: 1, + }, + } + }}, + wantDelta: resourceList{ + "cpu": 1, + "memory": 2048, + "gpu": 1, + "nodes": 1, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctx := &context.AutoscalingContext{} + delta, err := deltaForNode(ctx, tc.crp, tc.node, nil) + if err != nil { + t.Errorf("deltaForNode: unexpected error: %v", err) + } + if diff := cmp.Diff(tc.wantDelta, delta); diff != "" { + t.Errorf("delta mismatch (-want +got):\n%s", diff) + } + }) + } +} diff --git a/cluster-autoscaler/resourcelimits/usage.go b/cluster-autoscaler/resourcelimits/usage.go new file mode 100644 index 000000000000..7a9ec69487e7 --- /dev/null +++ b/cluster-autoscaler/resourcelimits/usage.go @@ -0,0 +1,56 @@ +package resourcelimits + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" +) + +// NodeFilter is an interface for filtering nodes which should be included in usage calculations. 
+type NodeFilter interface { + ExcludeFromTracking(node *corev1.Node) bool +} + +type usageCalculator struct { + crp customresources.CustomResourcesProcessor + nodeFilter NodeFilter +} + +func newUsageCalculator(crp customresources.CustomResourcesProcessor, nodeFilter NodeFilter) *usageCalculator { + return &usageCalculator{ + crp: crp, + nodeFilter: nodeFilter, + } +} + +func (u *usageCalculator) calculateUsages(ctx *context.AutoscalingContext, nodes []*corev1.Node, limiters []Limiter) (map[string]resourceList, error) { + usages := make(map[string]resourceList) + for _, rl := range limiters { + usages[rl.ID()] = make(resourceList) + } + + for _, node := range nodes { + if u.nodeFilter != nil && u.nodeFilter.ExcludeFromTracking(node) { + continue + } + + ng, err := ctx.CloudProvider.NodeGroupForNode(node) + if err != nil { + return nil, fmt.Errorf("failed to get node group for node %q: %w", node.Name, err) + } + delta, err := deltaForNode(ctx, u.crp, node, ng) + if err != nil { + return nil, err + } + for _, rl := range limiters { + if rl.AppliesTo(node) { + for resourceType, resourceCount := range delta { + usages[rl.ID()][resourceType] += resourceCount + } + } + } + } + return usages, nil +} diff --git a/cluster-autoscaler/resourcelimits/usage_test.go b/cluster-autoscaler/resourcelimits/usage_test.go new file mode 100644 index 000000000000..d28e0c5b0676 --- /dev/null +++ b/cluster-autoscaler/resourcelimits/usage_test.go @@ -0,0 +1,207 @@ +package resourcelimits + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + apiv1 "k8s.io/api/core/v1" + cptest "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" + "k8s.io/autoscaler/cluster-autoscaler/utils/test" +) + +func TestCalculateUsages(t *testing.T) { + testCases := []struct { + name string + nodes []*apiv1.Node + limiters []Limiter + nodeFilter func(node *apiv1.Node) bool + customTargets map[string][]customresources.CustomResourceTarget + wantUsages map[string]resourceList + }{ + { + name: "cluster-wide limiter, no node filter", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2000), + test.BuildTestNode("n2", 2000, 4000), + test.BuildTestNode("n3", 3000, 8000), + }, + limiters: []Limiter{ + &fakeLimiter{ + id: "cluster-wide", + appliesToFn: includeAll, + }, + }, + wantUsages: map[string]resourceList{ + "cluster-wide": { + "cpu": 6, + "memory": 14000, + "nodes": 3, + }, + }, + }, + { + name: "multiple limiters", + nodes: []*apiv1.Node{ + addLabel(test.BuildTestNode("n1", 1000, 2000), "pool", "a"), + addLabel(test.BuildTestNode("n2", 2000, 4000), "pool", "b"), + addLabel(test.BuildTestNode("n3", 3000, 8000), "pool", "a"), + }, + limiters: []Limiter{ + &fakeLimiter{ + id: "pool-a", + appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" }, + }, + &fakeLimiter{ + id: "pool-b", + appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" }, + }, + }, + wantUsages: map[string]resourceList{ + "pool-a": { + "cpu": 4, + "memory": 10000, + "nodes": 2, + }, + "pool-b": { + "cpu": 2, + "memory": 4000, + "nodes": 1, + }, + }, + }, + { + name: "with node filter", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2000), + test.BuildTestNode("n2", 2000, 4000), + test.BuildTestNode("n3", 3000, 8000), + }, + limiters: []Limiter{ + &fakeLimiter{ + id: "cluster-wide", + appliesToFn: includeAll, + }, + }, + nodeFilter: func(node *apiv1.Node) bool { return node.Name == "n2" }, + 
wantUsages: map[string]resourceList{ + "cluster-wide": { + "cpu": 4, + "memory": 10000, + "nodes": 2, + }, + }, + }, + { + name: "limiter doesn't match any node", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2000), + }, + limiters: []Limiter{ + &fakeLimiter{ + id: "no-match", + appliesToFn: func(node *apiv1.Node) bool { return false }, + }, + }, + wantUsages: map[string]resourceList{ + "no-match": {}, + }, + }, + { + name: "with custom resources", + nodes: []*apiv1.Node{ + test.BuildTestNode("n1", 1000, 2000), + test.BuildTestNode("n2", 2000, 4000), + }, + limiters: []Limiter{ + &fakeLimiter{ + id: "cluster-wide", + appliesToFn: includeAll, + }, + }, + customTargets: map[string][]customresources.CustomResourceTarget{ + "n1": { + {ResourceType: "gpu", ResourceCount: 2}, + }, + }, + wantUsages: map[string]resourceList{ + "cluster-wide": { + "cpu": 3, + "memory": 6000, + "gpu": 2, + "nodes": 2, + }, + }, + }, + { + name: "multiple limiters and node filter", + nodes: []*apiv1.Node{ + addLabel(test.BuildTestNode("n1", 1000, 2000), "pool", "a"), + addLabel(test.BuildTestNode("n2", 2000, 4000), "pool", "b"), + addLabel(test.BuildTestNode("n3", 3000, 8000), "pool", "a"), + }, + limiters: []Limiter{ + &fakeLimiter{ + id: "pool-a", + appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" }, + }, + &fakeLimiter{ + id: "pool-b", + appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" }, + }, + }, + nodeFilter: func(node *apiv1.Node) bool { return node.Name == "n3" }, + wantUsages: map[string]resourceList{ + "pool-a": { + "cpu": 1, + "memory": 2000, + "nodes": 1, + }, + "pool-b": { + "cpu": 2, + "memory": 4000, + "nodes": 1, + }, + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + provider := cptest.NewTestCloudProviderBuilder().Build() + ctx := &context.AutoscalingContext{CloudProvider: provider} + crp := &fakeCustomResourcesProcessor{ + NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget { + if tc.customTargets == nil { + return nil + } + return tc.customTargets[node.Name] + }, + } + var nf NodeFilter + if tc.nodeFilter != nil { + nf = &fakeNodeFilter{NodeFilterFn: tc.nodeFilter} + } + calculator := newUsageCalculator(crp, nf) + usages, err := calculator.calculateUsages(ctx, tc.nodes, tc.limiters) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if diff := cmp.Diff(tc.wantUsages, usages); diff != "" { + t.Errorf("calculateUsages() mismatch (-want +got):\n%s", diff) + } + }) + } +} + +func includeAll(node *apiv1.Node) bool { + return true +} + +func addLabel(node *apiv1.Node, key, value string) *apiv1.Node { + if node.Labels == nil { + node.Labels = make(map[string]string) + } + node.Labels[key] = value + return node +} From 080fd155c93b656398057475478a57af209706b8 Mon Sep 17 00:00:00 2001 From: Norbert Cyran Date: Wed, 15 Oct 2025 17:01:06 +0200 Subject: [PATCH 2/2] Add support for multiple resource limits --- .../cloudprovider/resource_limiter.go | 14 +- cluster-autoscaler/resourcelimits/factory.go | 104 ---------- cluster-autoscaler/resourcelimits/provider.go | 26 --- .../resourcelimits/provider_test.go | 34 ---- cluster-autoscaler/resourcelimits/tracker.go | 166 --------------- cluster-autoscaler/resourcelimits/usage.go | 56 ------ cluster-autoscaler/resourcequotas/factory.go | 78 ++++++++ .../factory_test.go | 128 ++++-------- cluster-autoscaler/resourcequotas/provider.go | 50 +++++ .../resourcequotas/provider_test.go | 46 +++++ .../testutils.go | 61 +++--- 
cluster-autoscaler/resourcequotas/tracker.go | 189 ++++++++++++++++++ .../tracker_test.go | 159 ++++++++++++--- cluster-autoscaler/resourcequotas/usage.go | 75 +++++++ .../usage_test.go | 54 +++-- 15 files changed, 667 insertions(+), 573 deletions(-) delete mode 100644 cluster-autoscaler/resourcelimits/factory.go delete mode 100644 cluster-autoscaler/resourcelimits/provider.go delete mode 100644 cluster-autoscaler/resourcelimits/provider_test.go delete mode 100644 cluster-autoscaler/resourcelimits/tracker.go delete mode 100644 cluster-autoscaler/resourcelimits/usage.go create mode 100644 cluster-autoscaler/resourcequotas/factory.go rename cluster-autoscaler/{resourcelimits => resourcequotas}/factory_test.go (64%) create mode 100644 cluster-autoscaler/resourcequotas/provider.go create mode 100644 cluster-autoscaler/resourcequotas/provider_test.go rename cluster-autoscaler/{resourcelimits => resourcequotas}/testutils.go (55%) create mode 100644 cluster-autoscaler/resourcequotas/tracker.go rename cluster-autoscaler/{resourcelimits => resourcequotas}/tracker_test.go (51%) create mode 100644 cluster-autoscaler/resourcequotas/usage.go rename cluster-autoscaler/{resourcelimits => resourcequotas}/usage_test.go (82%) diff --git a/cluster-autoscaler/cloudprovider/resource_limiter.go b/cluster-autoscaler/cloudprovider/resource_limiter.go index eb07c2ff6b21..84dc9eaaaeda 100644 --- a/cluster-autoscaler/cloudprovider/resource_limiter.go +++ b/cluster-autoscaler/cloudprovider/resource_limiter.go @@ -31,6 +31,7 @@ type ResourceLimiter struct { maxLimits map[string]int64 } +// ID returns the identifier of the limiter. func (r *ResourceLimiter) ID() string { return "cluster-wide" } @@ -95,14 +96,17 @@ func (r *ResourceLimiter) String() string { return strings.Join(resourceDetails, ", ") } +// AppliesTo checks if the limiter applies to node. +// +// As this is a compatibility layer for cluster-wide limits, it always returns true. func (r *ResourceLimiter) AppliesTo(node *apiv1.Node) bool { return true } -func (r *ResourceLimiter) MaxLimits() map[string]int64 { +// Limits returns max limits of the limiter. +// +// New resource quotas system supports only max limits, therefore only max limits +// are returned here. +func (r *ResourceLimiter) Limits() map[string]int64 { return r.maxLimits } - -func (r *ResourceLimiter) MinLimits() map[string]int64 { - return r.minLimits -} diff --git a/cluster-autoscaler/resourcelimits/factory.go b/cluster-autoscaler/resourcelimits/factory.go deleted file mode 100644 index 5008d8fa671a..000000000000 --- a/cluster-autoscaler/resourcelimits/factory.go +++ /dev/null @@ -1,104 +0,0 @@ -package resourcelimits - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" -) - -// TrackerFactory builds trackers. -type TrackerFactory struct { - crp customresources.CustomResourcesProcessor - limitProviders []Provider - usageCalculator *usageCalculator -} - -// NewTrackerFactory creates a new TrackerFactory. -func NewTrackerFactory(opts TrackerOptions) *TrackerFactory { - uc := newUsageCalculator(opts.CRP, opts.NodeFilter) - return &TrackerFactory{ - crp: opts.CRP, - limitProviders: opts.Providers, - usageCalculator: uc, - } -} - -// NewMaxLimitsTracker builds a new Tracker for max limits. 
-func (f *TrackerFactory) NewMaxLimitsTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node) (*Tracker, error) { - return f.newLimitsTracker(ctx, nodes, &maxLimitsStrategy{}) -} - -// NewMinLimitsTracker builds a new Tracker for min limits. -func (f *TrackerFactory) NewMinLimitsTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node) (*Tracker, error) { - return f.newLimitsTracker(ctx, nodes, &minLimitsStrategy{}) -} - -func (f *TrackerFactory) newLimitsTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node, strategy limitStrategy) (*Tracker, error) { - limiters, err := f.limiters() - if err != nil { - return nil, err - } - usages, err := f.usageCalculator.calculateUsages(ctx, nodes, limiters) - if err != nil { - return nil, err - } - limitsLeft := make(map[string]resourceList) - for _, rl := range limiters { - limitsLeft[rl.ID()] = make(resourceList) - limits := strategy.GetLimits(rl) - for resourceType, limit := range limits { - usage := usages[rl.ID()][resourceType] - limitsLeft[rl.ID()][resourceType] = strategy.CalculateLimitsLeft(limit, usage) - } - } - tracker := newTracker(f.crp, limiters, limitsLeft) - return tracker, nil -} - -func (f *TrackerFactory) limiters() ([]Limiter, error) { - var limiters []Limiter - for _, provider := range f.limitProviders { - provLimiters, err := provider.AllLimiters() - if err != nil { - return nil, fmt.Errorf("failed to get limiters from provider: %w", err) - } - for _, limiter := range provLimiters { - limiters = append(limiters, limiter) - } - } - return limiters, nil -} - -// limitStrategy is an interface for defining limit calculation strategies. -type limitStrategy interface { - GetLimits(rl Limiter) resourceList - CalculateLimitsLeft(limit, usage int64) int64 -} - -// maxLimitsStrategy is a strategy for max limits. -type maxLimitsStrategy struct{} - -// GetLimits returns max limits. -func (s *maxLimitsStrategy) GetLimits(rl Limiter) resourceList { - return rl.MaxLimits() -} - -// CalculateLimitsLeft calculates the remaining limits for max limits. -func (s *maxLimitsStrategy) CalculateLimitsLeft(limit, usage int64) int64 { - return max(0, limit-usage) -} - -// minLimitsStrategy is a strategy for min limits. -type minLimitsStrategy struct{} - -// GetLimits returns min limits. -func (s *minLimitsStrategy) GetLimits(rl Limiter) resourceList { - return rl.MinLimits() -} - -// CalculateLimitsLeft calculates the remaining limits for min limits. 
-func (s *minLimitsStrategy) CalculateLimitsLeft(limit, usage int64) int64 { - return max(0, usage-limit) -} diff --git a/cluster-autoscaler/resourcelimits/provider.go b/cluster-autoscaler/resourcelimits/provider.go deleted file mode 100644 index abbbd87aedfc..000000000000 --- a/cluster-autoscaler/resourcelimits/provider.go +++ /dev/null @@ -1,26 +0,0 @@ -package resourcelimits - -import ( - "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" -) - -type Provider interface { - AllLimiters() ([]Limiter, error) -} -type CloudLimitersProvider struct { - cloudProvider cloudprovider.CloudProvider -} - -func (p *CloudLimitersProvider) AllLimiters() ([]Limiter, error) { - rl, err := p.cloudProvider.GetResourceLimiter() - if err != nil { - return nil, err - } - return []Limiter{rl}, nil -} - -func NewCloudLimitersProvider(cloudProvider cloudprovider.CloudProvider) *CloudLimitersProvider { - return &CloudLimitersProvider{ - cloudProvider: cloudProvider, - } -} diff --git a/cluster-autoscaler/resourcelimits/provider_test.go b/cluster-autoscaler/resourcelimits/provider_test.go deleted file mode 100644 index cf88ba4085b0..000000000000 --- a/cluster-autoscaler/resourcelimits/provider_test.go +++ /dev/null @@ -1,34 +0,0 @@ -package resourcelimits - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" - "k8s.io/autoscaler/cluster-autoscaler/utils/units" -) - -func TestCloudLimitersProvider(t *testing.T) { - cloudProvider := test.NewTestCloudProviderBuilder().Build() - minLimits := map[string]int64{"cpu": 2, "memory": 8 * units.GiB} - maxLimits := map[string]int64{"cpu": 4, "memory": 16 * units.GiB} - resourceLimiter := cloudprovider.NewResourceLimiter(minLimits, maxLimits) - cloudProvider.SetResourceLimiter(resourceLimiter) - - limitsProvider := NewCloudLimitersProvider(cloudProvider) - limiters, err := limitsProvider.AllLimiters() - if err != nil { - t.Errorf("failed to get limiters: %v", err) - } - if len(limiters) != 1 { - t.Errorf("got %d limiters, expected 1", len(limiters)) - } - limiter := limiters[0] - if diff := cmp.Diff(minLimits, limiter.MinLimits()); diff != "" { - t.Errorf("MinLimits() mismatch (-want +got):\n%s", diff) - } - if diff := cmp.Diff(maxLimits, limiter.MaxLimits()); diff != "" { - t.Errorf("MaxLimits() mismatch (-want +got):\n%s", diff) - } -} diff --git a/cluster-autoscaler/resourcelimits/tracker.go b/cluster-autoscaler/resourcelimits/tracker.go deleted file mode 100644 index 56772b0b5622..000000000000 --- a/cluster-autoscaler/resourcelimits/tracker.go +++ /dev/null @@ -1,166 +0,0 @@ -package resourcelimits - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/core/utils" - "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" -) - -const ( - ResourceNameCores = "cpu" - ResourceNameMemory = "memory" - ResourceNameNodes = "nodes" -) - -// Limiter is an interface for a single resource limit. -type Limiter interface { - ID() string - AppliesTo(node *corev1.Node) bool - MaxLimits() map[string]int64 - MinLimits() map[string]int64 -} - -// resourceList is a map of resource names to their quantities. -type resourceList map[string]int64 - -// Tracker tracks resource limits. A single tracker should track either max or min limits. 
-type Tracker struct { - crp customresources.CustomResourcesProcessor - limiters []Limiter - limitsLeft map[string]resourceList -} - -// newTracker creates a new Tracker. -func newTracker(crp customresources.CustomResourcesProcessor, limiters []Limiter, limitsLeft map[string]resourceList) *Tracker { - return &Tracker{ - crp: crp, - limiters: limiters, - limitsLeft: limitsLeft, - } -} - -type TrackerOptions struct { - CRP customresources.CustomResourcesProcessor - Providers []Provider - NodeFilter NodeFilter -} - -// ApplyDelta checks if a delta is within limits and applies it. -func (t *Tracker) ApplyDelta( - ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int, -) (*CheckDeltaResult, error) { - delta, err := deltaForNode(ctx, t.crp, node, nodeGroup) - if err != nil { - return nil, err - } - matchingLimiters := t.matchingLimiters(node) - - result := t.checkDelta(delta, matchingLimiters, nodeDelta) - - if result.AllowedDelta != nodeDelta { - return result, nil - } - - for _, rl := range matchingLimiters { - if t.limitsLeft[rl.ID()] == nil { - continue - } - for resource, resourceDelta := range delta { - if limit, ok := t.limitsLeft[rl.ID()][resource]; ok { - t.limitsLeft[rl.ID()][resource] = max(limit-resourceDelta*int64(result.AllowedDelta), 0) - } - } - } - - return result, nil -} - -// CheckDelta checks if a delta is within limits, without applying it. -func (t *Tracker) CheckDelta( - ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int, -) (*CheckDeltaResult, error) { - // TODO: cache deltas - delta, err := deltaForNode(ctx, t.crp, node, nodeGroup) - if err != nil { - return nil, err - } - matchingLimiters := t.matchingLimiters(node) - return t.checkDelta(delta, matchingLimiters, nodeDelta), nil -} - -func (t *Tracker) checkDelta(delta resourceList, matchingLimiters []Limiter, nodeDelta int) *CheckDeltaResult { - result := &CheckDeltaResult{ - AllowedDelta: nodeDelta, - } - - exceededResources := make(map[string][]string) - for _, rl := range matchingLimiters { - limiterLimitsLeft := t.limitsLeft[rl.ID()] - for resource, resourceDelta := range delta { - if resourceDelta <= 0 { - continue - } - - limitsLeft, ok := limiterLimitsLeft[resource] - if !ok { - continue - } - - if limitsLeft < resourceDelta*int64(nodeDelta) { - allowedNodes := limitsLeft / resourceDelta - if allowedNodes < int64(result.AllowedDelta) { - result.AllowedDelta = int(allowedNodes) - } - exceededResources[rl.ID()] = append(exceededResources[rl.ID()], resource) - } - } - } - result.ExceededResources = exceededResources - return result -} - -func (t *Tracker) matchingLimiters(node *corev1.Node) []Limiter { - var matchingLimiters []Limiter - for _, rl := range t.limiters { - if rl.AppliesTo(node) { - matchingLimiters = append(matchingLimiters, rl) - } - } - return matchingLimiters -} - -// CheckDeltaResult is a result of checking a delta. -type CheckDeltaResult struct { - ExceededResources map[string][]string - AllowedDelta int -} - -// Exceeded returns true if any resource limit was exceeded. -func (r *CheckDeltaResult) Exceeded() bool { - return len(r.ExceededResources) > 0 -} - -// deltaForNode calculates the amount of resources that will be used from the cluster when creating a node. 
-func deltaForNode(ctx *context.AutoscalingContext, crp customresources.CustomResourcesProcessor, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) { - nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(node) - nodeResources := resourceList{ - ResourceNameCores: nodeCPU, - ResourceNameMemory: nodeMemory, - ResourceNameNodes: 1, - } - - resourceTargets, err := crp.GetNodeResourceTargets(ctx, node, nodeGroup) - if err != nil { - return nil, fmt.Errorf("failed to get custom resources: %w", err) - } - - for _, resourceTarget := range resourceTargets { - nodeResources[resourceTarget.ResourceType] = resourceTarget.ResourceCount - } - - return nodeResources, nil -} diff --git a/cluster-autoscaler/resourcelimits/usage.go b/cluster-autoscaler/resourcelimits/usage.go deleted file mode 100644 index 7a9ec69487e7..000000000000 --- a/cluster-autoscaler/resourcelimits/usage.go +++ /dev/null @@ -1,56 +0,0 @@ -package resourcelimits - -import ( - "fmt" - - corev1 "k8s.io/api/core/v1" - "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" -) - -// NodeFilter is an interface for filtering nodes which should be included in usage calculations. -type NodeFilter interface { - ExcludeFromTracking(node *corev1.Node) bool -} - -type usageCalculator struct { - crp customresources.CustomResourcesProcessor - nodeFilter NodeFilter -} - -func newUsageCalculator(crp customresources.CustomResourcesProcessor, nodeFilter NodeFilter) *usageCalculator { - return &usageCalculator{ - crp: crp, - nodeFilter: nodeFilter, - } -} - -func (u *usageCalculator) calculateUsages(ctx *context.AutoscalingContext, nodes []*corev1.Node, limiters []Limiter) (map[string]resourceList, error) { - usages := make(map[string]resourceList) - for _, rl := range limiters { - usages[rl.ID()] = make(resourceList) - } - - for _, node := range nodes { - if u.nodeFilter != nil && u.nodeFilter.ExcludeFromTracking(node) { - continue - } - - ng, err := ctx.CloudProvider.NodeGroupForNode(node) - if err != nil { - return nil, fmt.Errorf("failed to get node group for node %q: %w", node.Name, err) - } - delta, err := deltaForNode(ctx, u.crp, node, ng) - if err != nil { - return nil, err - } - for _, rl := range limiters { - if rl.AppliesTo(node) { - for resourceType, resourceCount := range delta { - usages[rl.ID()][resourceType] += resourceCount - } - } - } - } - return usages, nil -} diff --git a/cluster-autoscaler/resourcequotas/factory.go b/cluster-autoscaler/resourcequotas/factory.go new file mode 100644 index 000000000000..a3916a707a88 --- /dev/null +++ b/cluster-autoscaler/resourcequotas/factory.go @@ -0,0 +1,78 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequotas + +import ( + corev1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" +) + +// TrackerFactory builds quota trackers. 
+type TrackerFactory struct { + crp customresources.CustomResourcesProcessor + quotasProvider Provider + usageCalculator *usageCalculator +} + +// TrackerOptions stores configuration for quota tracking. +type TrackerOptions struct { + CustomResourcesProcessor customresources.CustomResourcesProcessor + QuotaProvider Provider + NodeFilter NodeFilter +} + +// NewTrackerFactory creates a new TrackerFactory. +func NewTrackerFactory(opts TrackerOptions) *TrackerFactory { + uc := newUsageCalculator(opts.CustomResourcesProcessor, opts.NodeFilter) + return &TrackerFactory{ + crp: opts.CustomResourcesProcessor, + quotasProvider: opts.QuotaProvider, + usageCalculator: uc, + } +} + +// NewQuotasTracker builds a new Tracker. +// +// NewQuotasTracker calculates resources used by the nodes for every +// quota returned by the Provider. Then, based on usages and limits it calculates +// how many resources can be still added to the cluster. Returns a Tracker object. +func (f *TrackerFactory) NewQuotasTracker(ctx *context.AutoscalingContext, nodes []*corev1.Node) (*Tracker, error) { + quotas, err := f.quotasProvider.Quotas() + if err != nil { + return nil, err + } + usages, err := f.usageCalculator.calculateUsages(ctx, nodes, quotas) + if err != nil { + return nil, err + } + var quotaStatuses []*quotaStatus + for _, rq := range quotas { + limitsLeft := make(resourceList) + limits := rq.Limits() + for resourceType, limit := range limits { + usage := usages[rq.ID()][resourceType] + limitsLeft[resourceType] = max(0, limit-usage) + } + quotaStatuses = append(quotaStatuses, "aStatus{ + quota: rq, + limitsLeft: limitsLeft, + }) + } + tracker := newTracker(f.crp, quotaStatuses) + return tracker, nil +} diff --git a/cluster-autoscaler/resourcelimits/factory_test.go b/cluster-autoscaler/resourcequotas/factory_test.go similarity index 64% rename from cluster-autoscaler/resourcelimits/factory_test.go rename to cluster-autoscaler/resourcequotas/factory_test.go index b633ee6a18be..569430101968 100644 --- a/cluster-autoscaler/resourcelimits/factory_test.go +++ b/cluster-autoscaler/resourcequotas/factory_test.go @@ -1,4 +1,20 @@ -package resourcelimits +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package resourcequotas import ( "testing" @@ -20,7 +36,7 @@ func (n nodeExcludeFn) ExcludeFromTracking(node *apiv1.Node) bool { return n(node) } -func TestMaxLimitsTracker(t *testing.T) { +func TestNewQuotasTracker(t *testing.T) { testCases := []struct { name string crp customresources.CustomResourcesProcessor @@ -63,8 +79,8 @@ func TestMaxLimitsTracker(t *testing.T) { nodeDelta: 2, wantResult: &CheckDeltaResult{ AllowedDelta: 0, - ExceededResources: map[string][]string{ - "cluster-wide": {"cpu", "memory"}, + ExceededQuotas: []ExceededQuota{ + {ID: "cluster-wide", ExceededResources: []string{"cpu", "memory"}}, }, }, }, @@ -83,8 +99,8 @@ func TestMaxLimitsTracker(t *testing.T) { nodeDelta: 2, wantResult: &CheckDeltaResult{ AllowedDelta: 0, - ExceededResources: map[string][]string{ - "cluster-wide": {"cpu", "memory"}, + ExceededQuotas: []ExceededQuota{ + {ID: "cluster-wide", ExceededResources: []string{"cpu", "memory"}}, }, }, }, @@ -148,8 +164,8 @@ func TestMaxLimitsTracker(t *testing.T) { nodeDelta: 2, wantResult: &CheckDeltaResult{ AllowedDelta: 0, - ExceededResources: map[string][]string{ - "cluster-wide": {"gpu"}, + ExceededQuotas: []ExceededQuota{ + {ID: "cluster-wide", ExceededResources: []string{"gpu"}}, }, }, }, @@ -191,8 +207,8 @@ func TestMaxLimitsTracker(t *testing.T) { nodeDelta: 1, wantResult: &CheckDeltaResult{ AllowedDelta: 0, - ExceededResources: map[string][]string{ - "cluster-wide": {"cpu", "memory"}, + ExceededQuotas: []ExceededQuota{ + {ID: "cluster-wide", ExceededResources: []string{"cpu", "memory"}}, }, }, }, @@ -208,11 +224,11 @@ func TestMaxLimitsTracker(t *testing.T) { crp = &fakeCustomResourcesProcessor{} } factory := NewTrackerFactory(TrackerOptions{ - CRP: crp, - Providers: []Provider{NewCloudLimitersProvider(cloudProvider)}, - NodeFilter: tc.nodeFilter, + CustomResourcesProcessor: crp, + QuotaProvider: NewCloudQuotasProvider(cloudProvider), + NodeFilter: tc.nodeFilter, }) - tracker, err := factory.NewMaxLimitsTracker(ctx, tc.nodes) + tracker, err := factory.NewQuotasTracker(ctx, tc.nodes) if err != nil { t.Errorf("failed to create tracker: %v", err) } @@ -221,87 +237,11 @@ func TestMaxLimitsTracker(t *testing.T) { if err != nil { t.Errorf("failed to check delta: %v", err) } - if diff := cmp.Diff(tc.wantResult, result, cmpopts.SortSlices(func(a, b string) bool { return a < b }), cmpopts.EquateEmpty()); diff != "" { - t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff) - } - }) - } -} - -func TestMinLimitsTracker(t *testing.T) { - testCases := []struct { - name string - crp customresources.CustomResourcesProcessor - nodeFilter NodeFilter - nodes []*apiv1.Node - limits map[string]int64 - newNode *apiv1.Node - nodeDelta int - wantResult *CheckDeltaResult - }{ - { - name: "default config allowed operation", - nodes: []*apiv1.Node{ - test.BuildTestNode("n1", 1000, 2*units.GiB), - test.BuildTestNode("n2", 2000, 4*units.GiB), - test.BuildTestNode("n3", 3000, 8*units.GiB), - }, - limits: map[string]int64{ - "cpu": 1, - "memory": 2 * units.GiB, - }, - newNode: test.BuildTestNode("n1", 1000, 2*units.GiB), - nodeDelta: 1, - wantResult: &CheckDeltaResult{ - AllowedDelta: 1, - }, - }, - { - name: "default config exceeded operation", - nodes: []*apiv1.Node{ - test.BuildTestNode("n1", 1000, 2*units.GiB), - test.BuildTestNode("n2", 2000, 4*units.GiB), - test.BuildTestNode("n3", 3000, 8*units.GiB), - }, - limits: map[string]int64{ - "cpu": 6, - "memory": 10 * units.GiB, - }, - newNode: test.BuildTestNode("n3", 3000, 8*units.GiB), - nodeDelta: 1, - wantResult: 
&CheckDeltaResult{ - AllowedDelta: 0, - ExceededResources: map[string][]string{ - "cluster-wide": {"cpu", "memory"}, - }, - }, - }, - } - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - cloudProvider := cptest.NewTestCloudProviderBuilder().Build() - resourceLimiter := cloudprovider.NewResourceLimiter(tc.limits, nil) - cloudProvider.SetResourceLimiter(resourceLimiter) - ctx := &context.AutoscalingContext{CloudProvider: cloudProvider} - crp := tc.crp - if crp == nil { - crp = &fakeCustomResourcesProcessor{} - } - factory := NewTrackerFactory(TrackerOptions{ - CRP: crp, - Providers: []Provider{NewCloudLimitersProvider(cloudProvider)}, - NodeFilter: tc.nodeFilter, - }) - tracker, err := factory.NewMinLimitsTracker(ctx, tc.nodes) - if err != nil { - t.Errorf("failed to create tracker: %v", err) - } - var ng cloudprovider.NodeGroup - result, err := tracker.CheckDelta(ctx, ng, tc.newNode, tc.nodeDelta) - if err != nil { - t.Errorf("failed to check delta: %v", err) + opts := []cmp.Option{ + cmpopts.SortSlices(func(a, b string) bool { return a < b }), + cmpopts.EquateEmpty(), } - if diff := cmp.Diff(tc.wantResult, result, cmpopts.SortSlices(func(a, b string) bool { return a < b }), cmpopts.EquateEmpty()); diff != "" { + if diff := cmp.Diff(tc.wantResult, result, opts...); diff != "" { t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff) } }) diff --git a/cluster-autoscaler/resourcequotas/provider.go b/cluster-autoscaler/resourcequotas/provider.go new file mode 100644 index 000000000000..848c5734e5cd --- /dev/null +++ b/cluster-autoscaler/resourcequotas/provider.go @@ -0,0 +1,50 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequotas + +import ( + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" +) + +// Provider provides Quotas. Each Provider implementation acts as a different +// source of Quotas. +type Provider interface { + Quotas() ([]Quota, error) +} + +// CloudQuotasProvider is an adapter for cloudprovider.ResourceLimiter. +type CloudQuotasProvider struct { + cloudProvider cloudprovider.CloudProvider +} + +// Quotas returns the cloud provider's ResourceLimiter, which implements Quota interface. +// +// This acts as a compatibility layer with the legacy resource limits system. +func (p *CloudQuotasProvider) Quotas() ([]Quota, error) { + rl, err := p.cloudProvider.GetResourceLimiter() + if err != nil { + return nil, err + } + return []Quota{rl}, nil +} + +// NewCloudQuotasProvider returns a new CloudQuotasProvider. +func NewCloudQuotasProvider(cloudProvider cloudprovider.CloudProvider) *CloudQuotasProvider { + return &CloudQuotasProvider{ + cloudProvider: cloudProvider, + } +} diff --git a/cluster-autoscaler/resourcequotas/provider_test.go b/cluster-autoscaler/resourcequotas/provider_test.go new file mode 100644 index 000000000000..bc4003fd5942 --- /dev/null +++ b/cluster-autoscaler/resourcequotas/provider_test.go @@ -0,0 +1,46 @@ +/* +Copyright 2025 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequotas + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" + "k8s.io/autoscaler/cluster-autoscaler/utils/units" +) + +func TestCloudLimitersProvider(t *testing.T) { + cloudProvider := test.NewTestCloudProviderBuilder().Build() + maxLimits := map[string]int64{"cpu": 4, "memory": 16 * units.GiB} + resourceLimiter := cloudprovider.NewResourceLimiter(nil, maxLimits) + cloudProvider.SetResourceLimiter(resourceLimiter) + + quotasProvider := NewCloudQuotasProvider(cloudProvider) + quotas, err := quotasProvider.Quotas() + if err != nil { + t.Errorf("failed to get quotas: %v", err) + } + if len(quotas) != 1 { + t.Errorf("got %d quotas, expected 1", len(quotas)) + } + quota := quotas[0] + if diff := cmp.Diff(maxLimits, quota.Limits()); diff != "" { + t.Errorf("Limits() mismatch (-want +got):\n%s", diff) + } +} diff --git a/cluster-autoscaler/resourcelimits/testutils.go b/cluster-autoscaler/resourcequotas/testutils.go similarity index 55% rename from cluster-autoscaler/resourcelimits/testutils.go rename to cluster-autoscaler/resourcequotas/testutils.go index ab25740f75e5..8acdc43674a4 100644 --- a/cluster-autoscaler/resourcelimits/testutils.go +++ b/cluster-autoscaler/resourcequotas/testutils.go @@ -1,4 +1,20 @@ -package resourcelimits +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequotas import ( apiv1 "k8s.io/api/core/v1" @@ -9,26 +25,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/errors" ) -// fakeProvider is a fake implementation of a resource limit provider. -type fakeProvider struct { - limiters []Limiter -} - -// NewFakeProvider creates a new fakeProvider. -func NewFakeProvider() *fakeProvider { - return &fakeProvider{} -} - -// AddLimiter adds a limiter to the provider. -func (p *fakeProvider) AddLimiter(limiter Limiter) { - p.limiters = append(p.limiters, limiter) -} - -// AllLimiters returns all limiters from the provider. 
-func (p *fakeProvider) AllLimiters() ([]Limiter, error) { - return p.limiters, nil -} - type fakeNodeFilter struct { NodeFilterFn func(*apiv1.Node) bool } @@ -41,8 +37,7 @@ func (f *fakeNodeFilter) ExcludeFromTracking(node *apiv1.Node) bool { } type fakeCustomResourcesProcessor struct { - NodeResourceTargets func(*apiv1.Node) []customresources.CustomResourceTarget - GetNodeResourceTargetsError errors.AutoscalerError + NodeResourceTargets func(*apiv1.Node) []customresources.CustomResourceTarget } func (f *fakeCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node, draSnapshot *drasnapshot.Snapshot) ([]*apiv1.Node, []*apiv1.Node) { @@ -50,9 +45,6 @@ func (f *fakeCustomResourcesProcessor) FilterOutNodesWithUnreadyResources(contex } func (f *fakeCustomResourcesProcessor) GetNodeResourceTargets(context *context.AutoscalingContext, node *apiv1.Node, nodeGroup cloudprovider.NodeGroup) ([]customresources.CustomResourceTarget, errors.AutoscalerError) { - if f.GetNodeResourceTargetsError != nil { - return nil, f.GetNodeResourceTargetsError - } if f.NodeResourceTargets == nil { return nil, nil } @@ -62,25 +54,20 @@ func (f *fakeCustomResourcesProcessor) GetNodeResourceTargets(context *context.A func (f *fakeCustomResourcesProcessor) CleanUp() { } -type fakeLimiter struct { +type fakeQuota struct { id string appliesToFn func(*apiv1.Node) bool - minLimits resourceList - maxLimits resourceList + limits resourceList } -func (f *fakeLimiter) ID() string { +func (f *fakeQuota) ID() string { return f.id } -func (f *fakeLimiter) AppliesTo(node *apiv1.Node) bool { +func (f *fakeQuota) AppliesTo(node *apiv1.Node) bool { return f.appliesToFn(node) } -func (f *fakeLimiter) MaxLimits() map[string]int64 { - return f.maxLimits -} - -func (f *fakeLimiter) MinLimits() map[string]int64 { - return f.minLimits +func (f *fakeQuota) Limits() map[string]int64 { + return f.limits } diff --git a/cluster-autoscaler/resourcequotas/tracker.go b/cluster-autoscaler/resourcequotas/tracker.go new file mode 100644 index 000000000000..0c70e6c906ba --- /dev/null +++ b/cluster-autoscaler/resourcequotas/tracker.go @@ -0,0 +1,189 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequotas + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/core/utils" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" +) + +const ( + // ResourceNodes is a resource name for number of nodes. + ResourceNodes = "nodes" +) + +// Quota is an interface for a single quota. +type Quota interface { + ID() string + // AppliesTo returns true if the quota applies to the given node. + AppliesTo(node *corev1.Node) bool + // Limits returns the resource limits defined by the quota. + Limits() map[string]int64 +} + +// resourceList is a map of resource names to their quantities. 
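+// Quantities are plain integers in the units produced by deltaForNode
+// (CPU cores, memory bytes, node counts, custom resource counts). An
+// illustrative value:
+//
+//	resourceList{"cpu": 8, "memory": 32 * 1024 * 1024 * 1024, "nodes": 2}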
+type resourceList map[string]int64 + +// Tracker tracks resource quotas. +type Tracker struct { + crp customresources.CustomResourcesProcessor + quotaStatuses []*quotaStatus +} + +type quotaStatus struct { + quota Quota + limitsLeft resourceList +} + +// newTracker creates a new Tracker. +func newTracker(crp customresources.CustomResourcesProcessor, quotaStatuses []*quotaStatus) *Tracker { + return &Tracker{ + crp: crp, + quotaStatuses: quotaStatuses, + } +} + +// ApplyDelta checks if a delta is within limits and applies it. Delta is applied only if it can be applied entirely. +func (t *Tracker) ApplyDelta( + ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int, +) (*CheckDeltaResult, error) { + delta, err := deltaForNode(ctx, t.crp, node, nodeGroup) + if err != nil { + return nil, err + } + matchingQuotas := t.matchingQuotas(node) + + result := t.checkDelta(delta, matchingQuotas, nodeDelta) + + if result.AllowedDelta != nodeDelta { + return result, nil + } + + for _, ls := range matchingQuotas { + for resource, resourceDelta := range delta { + if limit, ok := ls.limitsLeft[resource]; ok { + ls.limitsLeft[resource] = max(limit-resourceDelta*int64(result.AllowedDelta), 0) + } + } + } + + return result, nil +} + +// CheckDelta checks if a delta is within limits, without applying it. +func (t *Tracker) CheckDelta( + ctx *context.AutoscalingContext, nodeGroup cloudprovider.NodeGroup, node *corev1.Node, nodeDelta int, +) (*CheckDeltaResult, error) { + // TODO: cache deltas + delta, err := deltaForNode(ctx, t.crp, node, nodeGroup) + if err != nil { + return nil, err + } + matchingQuotas := t.matchingQuotas(node) + return t.checkDelta(delta, matchingQuotas, nodeDelta), nil +} + +func (t *Tracker) checkDelta(delta resourceList, matchingQuotas []*quotaStatus, nodeDelta int) *CheckDeltaResult { + result := &CheckDeltaResult{ + AllowedDelta: nodeDelta, + } + + for _, qs := range matchingQuotas { + var exceededResources []string + for resource, resourceDelta := range delta { + if resourceDelta <= 0 { + continue + } + + limitsLeft, ok := qs.limitsLeft[resource] + if !ok { + continue + } + + if limitsLeft < resourceDelta*int64(nodeDelta) { + allowedNodes := limitsLeft / resourceDelta + if allowedNodes < int64(result.AllowedDelta) { + result.AllowedDelta = int(allowedNodes) + } + exceededResources = append(exceededResources, resource) + } + } + if len(exceededResources) > 0 { + result.ExceededQuotas = append(result.ExceededQuotas, ExceededQuota{ + ID: qs.quota.ID(), ExceededResources: exceededResources, + }) + } + } + + return result +} + +func (t *Tracker) matchingQuotas(node *corev1.Node) []*quotaStatus { + var limits []*quotaStatus + for _, ls := range t.quotaStatuses { + if ls.quota.AppliesTo(node) { + limits = append(limits, ls) + } + } + return limits +} + +// CheckDeltaResult is a result of checking a delta. +type CheckDeltaResult struct { + // ExceededQuotas contains information about quotas that were exceeded. + ExceededQuotas []ExceededQuota + // AllowedDelta specifies how many nodes could be added without violating the quotas. + AllowedDelta int +} + +// Exceeded returns true if any resource limit was exceeded. +func (r *CheckDeltaResult) Exceeded() bool { + return len(r.ExceededQuotas) > 0 +} + +// ExceededQuota contains information about quota that was exceeded. +type ExceededQuota struct { + ID string + ExceededResources []string +} + +// deltaForNode calculates the amount of resources that will be used from the cluster when creating a node. 
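+//
+// For example, for a 4-core, 16 GiB node whose CustomResourcesProcessor
+// reports a single "gpu" target of 2, the result would look like
+// (illustrative values only):
+//
+//	resourceList{"cpu": 4, "memory": 16 * 1024 * 1024 * 1024, "nodes": 1, "gpu": 2}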
+func deltaForNode(ctx *context.AutoscalingContext, crp customresources.CustomResourcesProcessor, node *corev1.Node, nodeGroup cloudprovider.NodeGroup) (resourceList, error) { + // TODO: storage? + nodeCPU, nodeMemory := utils.GetNodeCoresAndMemory(node) + nodeResources := resourceList{ + string(corev1.ResourceCPU): nodeCPU, + string(corev1.ResourceMemory): nodeMemory, + ResourceNodes: 1, + } + + resourceTargets, err := crp.GetNodeResourceTargets(ctx, node, nodeGroup) + if err != nil { + return nil, fmt.Errorf("failed to get custom resources: %w", err) + } + + for _, resourceTarget := range resourceTargets { + nodeResources[resourceTarget.ResourceType] = resourceTarget.ResourceCount + } + + return nodeResources, nil +} diff --git a/cluster-autoscaler/resourcelimits/tracker_test.go b/cluster-autoscaler/resourcequotas/tracker_test.go similarity index 51% rename from cluster-autoscaler/resourcelimits/tracker_test.go rename to cluster-autoscaler/resourcequotas/tracker_test.go index fe6f1799df37..fd3a274f10c5 100644 --- a/cluster-autoscaler/resourcelimits/tracker_test.go +++ b/cluster-autoscaler/resourcequotas/tracker_test.go @@ -1,4 +1,20 @@ -package resourcelimits +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequotas import ( "testing" @@ -10,20 +26,25 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" "k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/autoscaler/cluster-autoscaler/utils/units" ) func TestCheckDelta(t *testing.T) { testCases := []struct { - name string - tracker *Tracker - node *apiv1.Node - nodeDelta int - wantResult *CheckDeltaResult + name string + tracker *Tracker + node *apiv1.Node + nodeDelta int + wantResult *CheckDeltaResult + wantExceeded bool }{ { name: "delta fits within limits", - tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ - "limiter1": {"cpu": 10, "memory": 1000, "nodes": 5}, + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 10, "memory": 1000, "nodes": 5}, + }, }), node: test.BuildTestNode("n1", 1000, 200), nodeDelta: 2, @@ -33,32 +54,47 @@ func TestCheckDelta(t *testing.T) { }, { name: "delta exceeds one resource limit", - tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ - "limiter1": {"cpu": 1, "memory": 1000, "nodes": 5}, + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 1, "memory": 1000, "nodes": 5}, + }, }), node: test.BuildTestNode("n1", 1000, 200), nodeDelta: 2, wantResult: 
&CheckDeltaResult{ - AllowedDelta: 1, - ExceededResources: map[string][]string{"limiter1": {"cpu"}}, + AllowedDelta: 1, + ExceededQuotas: []ExceededQuota{ + {ID: "limiter1", ExceededResources: []string{"cpu"}}, + }, }, + wantExceeded: true, }, { name: "delta exceeds multiple resource limits", - tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ - "limiter1": {"cpu": 1, "memory": 300, "nodes": 5}, + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 1, "memory": 300, "nodes": 5}, + }, }), node: test.BuildTestNode("n1", 1000, 200), nodeDelta: 2, wantResult: &CheckDeltaResult{ - AllowedDelta: 1, - ExceededResources: map[string][]string{"limiter1": {"cpu", "memory"}}, + AllowedDelta: 1, + ExceededQuotas: []ExceededQuota{ + {ID: "limiter1", ExceededResources: []string{"cpu", "memory"}}, + }, }, + wantExceeded: true, }, { - name: "no matching limiters", - tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return false }}}, map[string]resourceList{ - "limiter1": {"cpu": 1, "memory": 100, "nodes": 1}, + name: "no matching quotas", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return false }}, + limitsLeft: resourceList{"cpu": 1, "memory": 100, "nodes": 1}, + }, }), node: test.BuildTestNode("n1", 1000, 200), nodeDelta: 2, @@ -66,6 +102,41 @@ func TestCheckDelta(t *testing.T) { AllowedDelta: 2, }, }, + { + name: "resource in limits but not in the node", + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 4, "memory": 32 * units.GiB, "gpu": 2}, + }, + }), + node: test.BuildTestNode("n1", 1000, 2000), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + }, + { + name: "resource in the node but not in the limits", + tracker: newTracker(&fakeCustomResourcesProcessor{NodeResourceTargets: func(node *apiv1.Node) []customresources.CustomResourceTarget { + return []customresources.CustomResourceTarget{ + { + ResourceType: "gpu", + ResourceCount: 1, + }, + } + }}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 4, "memory": 32 * units.GiB}, + }, + }), + node: test.BuildTestNode("n1", 1000, 2000), + nodeDelta: 2, + wantResult: &CheckDeltaResult{ + AllowedDelta: 2, + }, + }, } for _, tc := range testCases { @@ -80,6 +151,9 @@ func TestCheckDelta(t *testing.T) { if diff := cmp.Diff(tc.wantResult, gotResult, cmpopts.EquateEmpty()); diff != "" { t.Errorf("CheckDelta() mismatch (-want +got):\n%s", diff) } + if gotResult.Exceeded() != tc.wantExceeded { + t.Errorf("Exceeded() mismatch, want: %v, got: %v", tc.wantExceeded, gotResult.Exceeded()) + } }) } } @@ -95,8 +169,11 @@ func TestApplyDelta(t *testing.T) { }{ { name: "delta applied successfully", - tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ - "limiter1": {"cpu": 10, "memory": 1000, "nodes": 5}, + tracker: 
newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 10, "memory": 1000, "nodes": 5}, + }, }), node: test.BuildTestNode("n1", 1000, 200), nodeDelta: 2, @@ -109,14 +186,19 @@ func TestApplyDelta(t *testing.T) { }, { name: "partial delta calculated, nothing applied", - tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ - "limiter1": {"cpu": 3, "memory": 1000, "nodes": 5}, + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 3, "memory": 1000, "nodes": 5}, + }, }), node: test.BuildTestNode("n1", 2000, 200), nodeDelta: 2, wantResult: &CheckDeltaResult{ - AllowedDelta: 1, - ExceededResources: map[string][]string{"limiter1": {"cpu"}}, + AllowedDelta: 1, + ExceededQuotas: []ExceededQuota{ + {ID: "limiter1", ExceededResources: []string{"cpu"}}, + }, }, wantLimitsLeft: map[string]resourceList{ "limiter1": {"cpu": 3, "memory": 1000, "nodes": 5}, @@ -124,14 +206,19 @@ func TestApplyDelta(t *testing.T) { }, { name: "delta not applied because it exceeds limits", - tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ - "limiter1": {"cpu": 1, "memory": 100, "nodes": 5}, + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 1, "memory": 100, "nodes": 5}, + }, }), node: test.BuildTestNode("n1", 2000, 200), nodeDelta: 1, wantResult: &CheckDeltaResult{ - AllowedDelta: 0, - ExceededResources: map[string][]string{"limiter1": {"cpu", "memory"}}, + AllowedDelta: 0, + ExceededQuotas: []ExceededQuota{ + {ID: "limiter1", ExceededResources: []string{"cpu", "memory"}}, + }, }, wantLimitsLeft: map[string]resourceList{ "limiter1": {"cpu": 1, "memory": 100, "nodes": 5}, @@ -139,8 +226,11 @@ func TestApplyDelta(t *testing.T) { }, { name: "applied delta results in zero limit", - tracker: newTracker(&fakeCustomResourcesProcessor{}, []Limiter{&fakeLimiter{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}}, map[string]resourceList{ - "limiter1": {"cpu": 2, "memory": 500, "nodes": 10}, + tracker: newTracker(&fakeCustomResourcesProcessor{}, []*quotaStatus{ + { + quota: &fakeQuota{id: "limiter1", appliesToFn: func(*apiv1.Node) bool { return true }}, + limitsLeft: resourceList{"cpu": 2, "memory": 500, "nodes": 10}, + }, }), node: test.BuildTestNode("n1", 1000, 200), nodeDelta: 2, @@ -166,9 +256,14 @@ func TestApplyDelta(t *testing.T) { t.Errorf("ApplyDelta() result mismatch (-want +got):\n%s", diff) } - if diff := cmp.Diff(tc.wantLimitsLeft, tc.tracker.limitsLeft, cmpopts.EquateEmpty()); diff != "" { + gotLimitsLeft := make(map[string]resourceList) + for _, ls := range tc.tracker.quotaStatuses { + gotLimitsLeft[ls.quota.ID()] = ls.limitsLeft + } + if diff := cmp.Diff(tc.wantLimitsLeft, gotLimitsLeft, cmpopts.EquateEmpty()); diff != "" { t.Errorf("ApplyDelta() limitsLeft mismatch (-want +got):\n%s", diff) } + }) } } diff --git a/cluster-autoscaler/resourcequotas/usage.go b/cluster-autoscaler/resourcequotas/usage.go new file mode 100644 index 
000000000000..484e8aab679a --- /dev/null +++ b/cluster-autoscaler/resourcequotas/usage.go @@ -0,0 +1,75 @@ +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcequotas + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" +) + +// NodeFilter customizes what nodes should be included in usage calculations. +type NodeFilter interface { + // ExcludeFromTracking returns true if the node should be excluded from usage calculations. + ExcludeFromTracking(node *corev1.Node) bool +} + +type usageCalculator struct { + crp customresources.CustomResourcesProcessor + nodeFilter NodeFilter +} + +func newUsageCalculator(crp customresources.CustomResourcesProcessor, nodeFilter NodeFilter) *usageCalculator { + return &usageCalculator{ + crp: crp, + nodeFilter: nodeFilter, + } +} + +// calculateUsages calculates resources used by nodes for every quota. +// Returns a map with quota ID as a key and resources used in the corresponding quota as a value. +func (u *usageCalculator) calculateUsages(ctx *context.AutoscalingContext, nodes []*corev1.Node, quotas []Quota) (map[string]resourceList, error) { + usages := make(map[string]resourceList) + for _, rl := range quotas { + usages[rl.ID()] = make(resourceList) + } + + for _, node := range nodes { + if u.nodeFilter != nil && u.nodeFilter.ExcludeFromTracking(node) { + continue + } + + ng, err := ctx.CloudProvider.NodeGroupForNode(node) + if err != nil { + return nil, fmt.Errorf("failed to get node group for node %q: %w", node.Name, err) + } + delta, err := deltaForNode(ctx, u.crp, node, ng) + if err != nil { + return nil, err + } + for _, rq := range quotas { + if rq.AppliesTo(node) { + for resourceType, resourceCount := range delta { + usages[rq.ID()][resourceType] += resourceCount + } + } + } + } + return usages, nil +} diff --git a/cluster-autoscaler/resourcelimits/usage_test.go b/cluster-autoscaler/resourcequotas/usage_test.go similarity index 82% rename from cluster-autoscaler/resourcelimits/usage_test.go rename to cluster-autoscaler/resourcequotas/usage_test.go index d28e0c5b0676..fc8160eccfb6 100644 --- a/cluster-autoscaler/resourcelimits/usage_test.go +++ b/cluster-autoscaler/resourcequotas/usage_test.go @@ -1,4 +1,20 @@ -package resourcelimits +/* +Copyright 2025 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package resourcequotas import ( "testing" @@ -15,7 +31,7 @@ func TestCalculateUsages(t *testing.T) { testCases := []struct { name string nodes []*apiv1.Node - limiters []Limiter + quotas []Quota nodeFilter func(node *apiv1.Node) bool customTargets map[string][]customresources.CustomResourceTarget wantUsages map[string]resourceList @@ -27,8 +43,8 @@ func TestCalculateUsages(t *testing.T) { test.BuildTestNode("n2", 2000, 4000), test.BuildTestNode("n3", 3000, 8000), }, - limiters: []Limiter{ - &fakeLimiter{ + quotas: []Quota{ + &fakeQuota{ id: "cluster-wide", appliesToFn: includeAll, }, @@ -42,18 +58,18 @@ func TestCalculateUsages(t *testing.T) { }, }, { - name: "multiple limiters", + name: "multiple quotas", nodes: []*apiv1.Node{ addLabel(test.BuildTestNode("n1", 1000, 2000), "pool", "a"), addLabel(test.BuildTestNode("n2", 2000, 4000), "pool", "b"), addLabel(test.BuildTestNode("n3", 3000, 8000), "pool", "a"), }, - limiters: []Limiter{ - &fakeLimiter{ + quotas: []Quota{ + &fakeQuota{ id: "pool-a", appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" }, }, - &fakeLimiter{ + &fakeQuota{ id: "pool-b", appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" }, }, @@ -78,8 +94,8 @@ func TestCalculateUsages(t *testing.T) { test.BuildTestNode("n2", 2000, 4000), test.BuildTestNode("n3", 3000, 8000), }, - limiters: []Limiter{ - &fakeLimiter{ + quotas: []Quota{ + &fakeQuota{ id: "cluster-wide", appliesToFn: includeAll, }, @@ -98,8 +114,8 @@ func TestCalculateUsages(t *testing.T) { nodes: []*apiv1.Node{ test.BuildTestNode("n1", 1000, 2000), }, - limiters: []Limiter{ - &fakeLimiter{ + quotas: []Quota{ + &fakeQuota{ id: "no-match", appliesToFn: func(node *apiv1.Node) bool { return false }, }, @@ -114,8 +130,8 @@ func TestCalculateUsages(t *testing.T) { test.BuildTestNode("n1", 1000, 2000), test.BuildTestNode("n2", 2000, 4000), }, - limiters: []Limiter{ - &fakeLimiter{ + quotas: []Quota{ + &fakeQuota{ id: "cluster-wide", appliesToFn: includeAll, }, @@ -135,18 +151,18 @@ func TestCalculateUsages(t *testing.T) { }, }, { - name: "multiple limiters and node filter", + name: "multiple quotas and node filter", nodes: []*apiv1.Node{ addLabel(test.BuildTestNode("n1", 1000, 2000), "pool", "a"), addLabel(test.BuildTestNode("n2", 2000, 4000), "pool", "b"), addLabel(test.BuildTestNode("n3", 3000, 8000), "pool", "a"), }, - limiters: []Limiter{ - &fakeLimiter{ + quotas: []Quota{ + &fakeQuota{ id: "pool-a", appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "a" }, }, - &fakeLimiter{ + &fakeQuota{ id: "pool-b", appliesToFn: func(node *apiv1.Node) bool { return node.Labels["pool"] == "b" }, }, @@ -183,7 +199,7 @@ func TestCalculateUsages(t *testing.T) { nf = &fakeNodeFilter{NodeFilterFn: tc.nodeFilter} } calculator := newUsageCalculator(crp, nf) - usages, err := calculator.calculateUsages(ctx, tc.nodes, tc.limiters) + usages, err := calculator.calculateUsages(ctx, tc.nodes, tc.quotas) if err != nil { t.Errorf("unexpected error: %v", err) }