Skip to content

Commit c748df6

Browse files
committed
feat: apply updates to specified target resources
1 parent 36b76a5 commit c748df6

File tree

2 files changed

+62
-24
lines changed

2 files changed

+62
-24
lines changed

internal/autoscaler/autoscaler.go

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ func (s *Autoscaler) LoadWorkloads(ctx context.Context) {
9999

100100
observedWorkloads := map[string]bool{}
101101
for _, workload := range workloadList.Items {
102-
autoScalingConfig := workload.Spec.AutoScalingConfig
103102
if !workload.DeletionTimestamp.IsZero() {
104103
continue
105104
}
@@ -111,7 +110,7 @@ func (s *Autoscaler) LoadWorkloads(ctx context.Context) {
111110
}
112111
workloadState.Namespace = workload.Namespace
113112
workloadState.Resources = workload.Spec.Resources
114-
workloadState.AutoScalingConfig = autoScalingConfig
113+
workloadState.AutoScalingConfig = workload.Spec.AutoScalingConfig
115114
s.WorkloadStates[workloadName] = workloadState
116115

117116
observedWorkloads[workloadName] = true
@@ -218,44 +217,40 @@ func (s *Autoscaler) ProcessWorkloads(ctx context.Context) {
218217
continue
219218
}
220219

221-
// TODO: apply config
222-
// asConfig := workloadState.AutoScalingConfig
223-
// NewResourceRecommenderFromAutoScalingConfig(ResouceRecomenderConfig{
224-
// }).GetRecommendedResources(workloadState)
225220
rr := s.ResourceRecommender.GetRecommendedResources(workloadState)
226-
log.Info("Autoscaler processWorkloads", "recommended resources", rr)
221+
log.Info("recommend resources", "workload", workloadState.Name, "resources", rr)
227222

228223
for _, worker := range podList.Items {
229224
if !worker.DeletionTimestamp.IsZero() {
230225
continue
231226
}
232227

233-
if err := s.updateWorker(ctx, &worker, rr); err != nil {
228+
if err := s.updateWorkerResourcesIfNeeded(ctx, workloadState, &worker, rr); err != nil {
234229
log.Error(err, "failed to update worker")
235230
}
236231
}
237232
}
238233
}
239234

240-
func (s *Autoscaler) updateWorker(ctx context.Context, worker *corev1.Pod, rr *RecommendedResources) error {
241-
annotations := worker.GetAnnotations()
242-
newAnnotations := map[string]string{}
243-
235+
func (s *Autoscaler) updateWorkerResourcesIfNeeded(ctx context.Context, workloadState *WorkloadState, worker *corev1.Pod, rr *RecommendedResources) error {
244236
resourcesInfo := []struct {
237+
name string
245238
requestKey string
246239
limitKey string
247240
lowerBound ResourceAmount
248241
upperBound ResourceAmount
249242
target ResourceAmount
250243
}{
251244
{
245+
name: "tflops",
252246
requestKey: constants.TFLOPSRequestAnnotation,
253247
limitKey: constants.TFLOPSLimitAnnotation,
254248
lowerBound: rr.LowerBoundTflops,
255249
upperBound: rr.UpperBoundTflops,
256250
target: rr.TargetTflops,
257251
},
258252
{
253+
name: "vram",
259254
requestKey: constants.VRAMRequestAnnotation,
260255
limitKey: constants.VRAMLimitAnnotation,
261256
lowerBound: rr.LowerBoundVram,
@@ -264,8 +259,13 @@ func (s *Autoscaler) updateWorker(ctx context.Context, worker *corev1.Pod, rr *R
264259
},
265260
}
266261

262+
annotations := worker.GetAnnotations()
263+
newAnnotations := map[string]string{}
267264
for _, resInfo := range resourcesInfo {
268-
if err := updateResource(
265+
if !workloadState.IsTargetResource(resInfo.name) {
266+
continue
267+
}
268+
if err := detectResourceChanges(
269269
annotations, newAnnotations,
270270
resInfo.requestKey, resInfo.limitKey,
271271
resInfo.lowerBound, resInfo.upperBound, resInfo.target,
@@ -291,7 +291,7 @@ func (s *Autoscaler) updateWorker(ctx context.Context, worker *corev1.Pod, rr *R
291291
return nil
292292
}
293293

294-
func updateResource(annotations, newAnnotations map[string]string, requestKey, limitKey string, lowerBound, upperBound, target ResourceAmount) error {
294+
func detectResourceChanges(annotations, newAnnotations map[string]string, requestKey, limitKey string, lowerBound, upperBound, target ResourceAmount) error {
295295
currentRequest, err := resource.ParseQuantity(annotations[requestKey])
296296
if err != nil {
297297
return fmt.Errorf("failed to parse %s: %v", requestKey, err)

internal/autoscaler/autoscaler_test.go

Lines changed: 48 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ var _ = Describe("Autoscaler", func() {
158158
})
159159

160160
Context("when processing workloads", func() {
161-
FIt("should update only those resources exceeding the recommended resource boundaries", func() {
161+
It("should update only those resources exceeding the recommended resource boundaries", func() {
162162
tfEnv := NewTensorFusionEnvBuilder().
163163
AddPoolWithNodeCount(1).SetGpuCountPerNode(1).
164164
Build()
@@ -184,6 +184,34 @@ var _ = Describe("Autoscaler", func() {
184184
}).Should(Succeed())
185185
})
186186

187+
It("should update specific resources based on TargetResource", func() {
188+
tfEnv := NewTensorFusionEnvBuilder().
189+
AddPoolWithNodeCount(1).SetGpuCountPerNode(1).
190+
Build()
191+
defer tfEnv.Cleanup()
192+
workload := createWorkload(tfEnv.GetGPUPool(0), 0, 1)
193+
defer deleteWorkload(workload)
194+
195+
scaler, _ := NewAutoscaler(k8sClient, &FakeAllocator{})
196+
scaler.LoadWorkloads(ctx)
197+
198+
scaler.ResourceRecommender = &FakeOutBoundRecommender{}
199+
rr := scaler.ResourceRecommender.GetRecommendedResources(nil)
200+
201+
workloadState := scaler.WorkloadStates[workload.Name]
202+
workloadState.AutoScalingConfig.AutoSetResources.TargetResource = "tflops"
203+
204+
oldRes := workloadState.Resources
205+
scaler.ProcessWorkloads(ctx)
206+
Eventually(func(g Gomega) {
207+
tflopsRequest, tflopsLimit, vramRequest, vramLimit := parseResourceAnnotations(getWorkers(workload)[0])
208+
Expect(tflopsRequest.Value()).To(Equal(int64(rr.TargetTflops)))
209+
Expect(tflopsLimit.Value()).To(Equal(int64(rr.TargetTflops * 2)))
210+
Expect(vramRequest.Equal(oldRes.Requests.Vram)).To(BeTrue())
211+
Expect(vramLimit.Equal(oldRes.Limits.Vram)).To(BeTrue())
212+
}).Should(Succeed())
213+
})
214+
187215
It("should return an error if failed to reallocate resources", func() {
188216
tfEnv := NewTensorFusionEnvBuilder().
189217
AddPoolWithNodeCount(1).SetGpuCountPerNode(1).
@@ -196,7 +224,7 @@ var _ = Describe("Autoscaler", func() {
196224
scaler.LoadWorkloads(ctx)
197225
scaler.ResourceRecommender = &FakeOutBoundRecommender{}
198226
rr := scaler.ResourceRecommender.GetRecommendedResources(nil)
199-
err := scaler.updateWorker(ctx, getWorkers(workload)[0], rr)
227+
err := scaler.updateWorkerResourcesIfNeeded(ctx, scaler.WorkloadStates[workload.Name], getWorkers(workload)[0], rr)
200228
Expect(err.Error()).To(ContainSubstring("failed to reallocate resources"))
201229
})
202230

@@ -388,16 +416,26 @@ func cleanupWorkload(key client.ObjectKey) {
388416
}
389417

390418
func assertWorkerAnnotations(worker *corev1.Pod, rr *RecommendedResources) {
391-
annotations := worker.GetAnnotations()
392-
tflopsRequest := resource.MustParse(annotations[constants.TFLOPSRequestAnnotation])
419+
tflopsRequest, tflopsLimit, vramRequest, vramLimit := parseResourceAnnotations(worker)
393420
Expect(tflopsRequest.Value()).To(Equal(int64(rr.TargetTflops)))
394-
395-
tflopsLimit := resource.MustParse(annotations[constants.TFLOPSLimitAnnotation])
396421
Expect(tflopsLimit.Value()).To(Equal(int64(rr.TargetTflops * 2)))
397-
398-
vramRequest := resource.MustParse(annotations[constants.VRAMRequestAnnotation])
399422
Expect(vramRequest.Value()).To(Equal(int64(rr.TargetVram)))
400-
401-
vramLimit := resource.MustParse(annotations[constants.VRAMLimitAnnotation])
402423
Expect(vramLimit.Value()).To(Equal(int64(rr.TargetVram * 2)))
403424
}
425+
426+
func parseResourceAnnotations(worker *corev1.Pod) (tflopsRequest, tflopsLimit, vramRequest, vramLimit resource.Quantity) {
427+
annotations := worker.GetAnnotations()
428+
keys := []struct {
429+
key string
430+
dst *resource.Quantity
431+
}{
432+
{constants.TFLOPSRequestAnnotation, &tflopsRequest},
433+
{constants.TFLOPSLimitAnnotation, &tflopsLimit},
434+
{constants.VRAMRequestAnnotation, &vramRequest},
435+
{constants.VRAMLimitAnnotation, &vramLimit},
436+
}
437+
for _, k := range keys {
438+
*k.dst = resource.MustParse(annotations[k.key])
439+
}
440+
return
441+
}

0 commit comments

Comments
 (0)