@@ -14,6 +14,12 @@ import (
14
14
ctrl "sigs.k8s.io/controller-runtime"
15
15
"sigs.k8s.io/controller-runtime/pkg/client"
16
16
"sigs.k8s.io/controller-runtime/pkg/log"
17
+ "sigs.k8s.io/controller-runtime/pkg/manager"
18
+ )
19
+
20
+ var (
21
+ _ manager.Runnable = (* Autoscaler )(nil )
22
+ _ manager.LeaderElectionRunnable = (* Autoscaler )(nil )
17
23
)
18
24
19
25
type Autoscaler struct {
@@ -32,7 +38,7 @@ func NewAutoscaler(c client.Client) (*Autoscaler, error) {
32
38
return & Autoscaler {
33
39
Client : c ,
34
40
Recommender : NewRecommender (),
35
- MetricsProvider : NewMetricsProvider (),
41
+ MetricsProvider : NewMetricsProvider (nil ),
36
42
WorkloadStates : map [string ]* WorkloadState {},
37
43
WorkerStates : map [string ]* WorkerState {},
38
44
}, nil
@@ -57,6 +63,10 @@ func (s *Autoscaler) Start(ctx context.Context) error {
57
63
}
58
64
}
59
65
66
+ func (s * Autoscaler ) NeedLeaderElection () bool {
67
+ return true
68
+ }
69
+
60
70
func (s * Autoscaler ) Run (ctx context.Context ) {
61
71
log := log .FromContext (ctx )
62
72
@@ -80,8 +90,8 @@ func (s *Autoscaler) LoadWorkloads(ctx context.Context) {
80
90
autoScalingConfig := workload .Spec .AutoScalingConfig
81
91
// Currently only supports enabling both AutoSetLimits and AutoSetRequests simultaneously
82
92
if ! workload .DeletionTimestamp .IsZero () ||
83
- ! ( autoScalingConfig .AutoSetLimits .Enable &&
84
- autoScalingConfig .AutoSetRequests .Enable ) {
93
+ ! autoScalingConfig .AutoSetLimits .Enable ||
94
+ ! autoScalingConfig .AutoSetRequests .Enable {
85
95
continue
86
96
}
87
97
@@ -138,15 +148,15 @@ func (s *Autoscaler) LoadHistoryMetrics(ctx context.Context) {
138
148
139
149
workersMetrics := s .MetricsProvider .GetHistoryMetrics ()
140
150
for _ , metrics := range workersMetrics {
141
- workloadState , exists := s .WorkloadStates [metrics .Workload ]
151
+ workloadState , exists := s .WorkloadStates [metrics .WorkloadName ]
142
152
if ! exists {
143
- workloadState = NewWorkloadState (metrics .Workload )
144
- s .WorkloadStates [metrics .Workload ] = workloadState
153
+ workloadState = NewWorkloadState (metrics .WorkloadName )
154
+ s .WorkloadStates [metrics .WorkloadName ] = workloadState
145
155
}
146
- workerState , exists := s .WorkerStates [metrics .Worker ]
156
+ workerState , exists := s .WorkerStates [metrics .WorkerName ]
147
157
if ! exists {
148
- workerState = NewWorkerState (metrics .Worker , metrics .Workload )
149
- s .WorkerStates [metrics .Worker ] = workerState
158
+ workerState = NewWorkerState (metrics .WorkerName , metrics .WorkloadName )
159
+ s .WorkerStates [metrics .WorkerName ] = workerState
150
160
}
151
161
152
162
s .addSamples (workloadState , workerState , metrics )
@@ -159,11 +169,11 @@ func (s *Autoscaler) LoadRealTimeMetrics(ctx context.Context) {
159
169
160
170
workersMetrics := s .MetricsProvider .GetWorkersMetrics ()
161
171
for _ , metrics := range workersMetrics {
162
- workloadState , workloadExists := s .WorkloadStates [metrics .Workload ]
172
+ workloadState , workloadExists := s .WorkloadStates [metrics .WorkloadName ]
163
173
if ! workloadExists {
164
174
continue
165
175
}
166
- workerState , workerExists := s .WorkerStates [metrics .Worker ]
176
+ workerState , workerExists := s .WorkerStates [metrics .WorkerName ]
167
177
if ! workerExists {
168
178
continue
169
179
}
@@ -186,8 +196,12 @@ func (s *Autoscaler) ProcessWorkloads(ctx context.Context) {
186
196
continue
187
197
}
188
198
199
+ if len (podList .Items ) <= 0 {
200
+ continue
201
+ }
202
+
189
203
// TODO: apply config
190
- // asConfig := workloadState.AutoScalingConfig
204
+ // asConfig := workloadState.AutoScalingConfig
191
205
rr := s .Recommender .GetRecommendedResources (workloadState )
192
206
log .Info ("Autoscaler processWorkloads" , "recommended resources" , rr )
193
207
@@ -197,13 +211,13 @@ func (s *Autoscaler) ProcessWorkloads(ctx context.Context) {
197
211
}
198
212
199
213
annotations := worker .GetAnnotations ()
214
+ newAnnotations := map [string ]string {}
215
+
200
216
tflopsRequest , err := resource .ParseQuantity (annotations [constants .TFLOPSRequestAnnotation ])
201
217
if err != nil {
202
218
log .Error (err , "failed to parse vram request" )
203
219
continue
204
220
}
205
-
206
- newAnnotations := map [string ]string {}
207
221
if tflopsRequest .Cmp (QuantityFromAmount (rr .LowerBoundTflops )) < 0 ||
208
222
tflopsRequest .Cmp (QuantityFromAmount (rr .UpperBoundTflops )) > 0 {
209
223
targetTflopsRequest := QuantityFromAmount (rr .TargetTflops )
@@ -248,6 +262,7 @@ func (s *Autoscaler) ProcessWorkloads(ctx context.Context) {
248
262
worker .Annotations [key ] = value
249
263
}
250
264
265
+ // TODO: replace using the patch method
251
266
if err := s .Update (ctx , & worker ); err != nil {
252
267
log .Error (err , "failed to update worker" )
253
268
}
0 commit comments