Skip to content

Commit 2fddc1c

Browse files
authored
feat: update stagedUpdateRun to use latest resource snapshot (#320)
Signed-off-by: Britania Rodriguez Reyes <[email protected]>
1 parent 4ce199e commit 2fddc1c

11 files changed

+586
-77
lines changed

apis/placement/v1beta1/stageupdate_types.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,8 @@ type UpdateRunSpec struct {
186186

187187
// The resource snapshot index of the selected resources to be updated across clusters.
188188
// The index represents a group of resource snapshots that includes all the resources a ResourcePlacement selected.
189-
// +kubebuilder:validation:Required
190189
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="resourceSnapshotIndex is immutable"
190+
// +kubebuilder:validation:Optional
191191
ResourceSnapshotIndex string `json:"resourceSnapshotIndex"`
192192

193193
// The name of the update strategy that specifies the stages and the sequence
@@ -374,6 +374,11 @@ type UpdateRunStatus struct {
374374
// +kubebuilder:validation:Optional
375375
PolicyObservedClusterCount int `json:"policyObservedClusterCount,omitempty"`
376376

377+
// ResourceSnapshotIndexUsed records the resource snapshot index that the update run is based on.
378+
// The index represents the same resource snapshots as specified in the spec field, or the latest.
379+
// +kubbebuilder:validation:Optional
380+
ResourceSnapshotIndexUsed string `json:"resourceSnapshotIndexUsed,omitempty"`
381+
377382
// ApplyStrategy is the apply strategy that the stagedUpdateRun is using.
378383
// It is the same as the apply strategy in the CRP when the staged update run starts.
379384
// The apply strategy is not updated during the update run even if it changes in the CRP.

config/crd/bases/placement.kubernetes-fleet.io_clusterstagedupdateruns.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1200,7 +1200,6 @@ spec:
12001200
type: string
12011201
required:
12021202
- placementName
1203-
- resourceSnapshotIndex
12041203
- stagedRolloutStrategyName
12051204
type: object
12061205
x-kubernetes-validations:
@@ -1895,6 +1894,11 @@ spec:
18951894
All clusters involved in the update run are selected from the list of clusters scheduled by the CRP according
18961895
to the current policy.
18971896
type: string
1897+
resourceSnapshotIndexUsed:
1898+
description: |-
1899+
ResourceSnapshotIndexUsed records the resource snapshot index that the update run is based on.
1900+
The index represents the same resource snapshots as specified in the spec field, or the latest.
1901+
type: string
18981902
stagedUpdateStrategySnapshot:
18991903
description: |-
19001904
UpdateStrategySnapshot is the snapshot of the UpdateStrategy used for the update run.

config/crd/bases/placement.kubernetes-fleet.io_stagedupdateruns.yaml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@ spec:
120120
type: string
121121
required:
122122
- placementName
123-
- resourceSnapshotIndex
124123
- stagedRolloutStrategyName
125124
type: object
126125
x-kubernetes-validations:
@@ -815,6 +814,11 @@ spec:
815814
All clusters involved in the update run are selected from the list of clusters scheduled by the CRP according
816815
to the current policy.
817816
type: string
817+
resourceSnapshotIndexUsed:
818+
description: |-
819+
ResourceSnapshotIndexUsed records the resource snapshot index that the update run is based on.
820+
The index represents the same resource snapshots as specified in the spec field, or the latest.
821+
type: string
818822
stagedUpdateStrategySnapshot:
819823
description: |-
820824
UpdateStrategySnapshot is the snapshot of the UpdateStrategy used for the update run.

pkg/controllers/updaterun/execution.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func (r *Reconciler) executeUpdatingStage(
9696
updateRunSpec := updateRun.GetUpdateRunSpec()
9797
updatingStageStatus := &updateRunStatus.StagesStatus[updatingStageIndex]
9898
// The parse error is ignored because the initialization should have caught it.
99-
resourceIndex, _ := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
99+
resourceIndex, _ := strconv.Atoi(updateRunStatus.ResourceSnapshotIndexUsed)
100100
resourceSnapshotName := fmt.Sprintf(placementv1beta1.ResourceSnapshotNameFmt, updateRunSpec.PlacementName, resourceIndex)
101101
updateRunRef := klog.KObj(updateRun)
102102
// Create the map of the toBeUpdatedBindings.

pkg/controllers/updaterun/execution_integration_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ var _ = Describe("UpdateRun execution tests - double stages", func() {
160160
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
161161

162162
By("Validating the initialization succeeded and the execution started")
163-
initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride)
163+
initialized := generateSucceededInitializationStatus(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy, clusterResourceOverride)
164164
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
165165
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
166166

@@ -521,7 +521,7 @@ var _ = Describe("UpdateRun execution tests - double stages", func() {
521521
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
522522

523523
By("Validating the initialization succeeded and the execution started")
524-
initialized := generateSucceededInitializationStatus(crp, updateRun, policySnapshot, updateStrategy, clusterResourceOverride)
524+
initialized := generateSucceededInitializationStatus(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy, clusterResourceOverride)
525525
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
526526
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
527527

@@ -680,7 +680,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
680680
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
681681

682682
By("Validating the initialization succeeded and the execution started")
683-
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
683+
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
684684
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
685685
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
686686

@@ -774,7 +774,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
774774
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
775775

776776
By("Validating the initialization succeeded and the execution started")
777-
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
777+
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
778778
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
779779
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
780780

@@ -883,7 +883,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
883883
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
884884

885885
By("Validating the initialization succeeded and the execution started")
886-
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
886+
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
887887
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
888888
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
889889

@@ -1014,7 +1014,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
10141014
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
10151015

10161016
By("Validating the initialization succeeded and the execution started")
1017-
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
1017+
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
10181018
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
10191019
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
10201020

@@ -1106,7 +1106,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
11061106
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
11071107

11081108
By("Validating the initialization succeeded and the execution started")
1109-
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
1109+
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
11101110
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
11111111
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
11121112

@@ -1163,7 +1163,7 @@ var _ = Describe("UpdateRun execution tests - single stage", func() {
11631163
Expect(k8sClient.Create(ctx, updateRun)).To(Succeed())
11641164

11651165
By("Validating the initialization succeeded and the execution started")
1166-
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, policySnapshot, updateStrategy)
1166+
initialized := generateSucceededInitializationStatusForSmallClusters(crp, updateRun, testResourceSnapshotIndex, policySnapshot, updateStrategy)
11671167
wantStatus = generateExecutionStartedStatus(updateRun, initialized)
11681168
validateClusterStagedUpdateRunStatus(ctx, updateRun, wantStatus, "")
11691169
})

pkg/controllers/updaterun/initialization.go

Lines changed: 62 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -471,30 +471,10 @@ func validateAfterStageTask(tasks []placementv1beta1.StageTask) error {
471471
func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey types.NamespacedName, updateRun placementv1beta1.UpdateRunObj) error {
472472
updateRunRef := klog.KObj(updateRun)
473473
updateRunSpec := updateRun.GetUpdateRunSpec()
474-
placementName := placementKey.Name
475474

476-
snapshotIndex, err := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
477-
if err != nil || snapshotIndex < 0 {
478-
err := controller.NewUserError(fmt.Errorf("invalid resource snapshot index `%s` provided, expected an integer >= 0", updateRunSpec.ResourceSnapshotIndex))
479-
klog.ErrorS(err, "Failed to parse the resource snapshot index", "updateRun", updateRunRef)
480-
// no more retries here.
481-
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
482-
}
483-
484-
resourceSnapshotList, err := controller.ListAllResourceSnapshotWithAnIndex(ctx, r.Client, updateRunSpec.ResourceSnapshotIndex, placementName, placementKey.Namespace)
475+
resourceSnapshotObjs, err := r.getResourceSnapshotObjs(ctx, placementKey, updateRun)
485476
if err != nil {
486-
klog.ErrorS(err, "Failed to list the resourceSnapshots associated with the placement",
487-
"placement", placementKey, "resourceSnapshotIndex", snapshotIndex, "updateRun", updateRunRef)
488-
// err can be retried.
489-
return controller.NewAPIServerError(true, err)
490-
}
491-
492-
resourceSnapshotObjs := resourceSnapshotList.GetResourceSnapshotObjs()
493-
if len(resourceSnapshotObjs) == 0 {
494-
err := controller.NewUserError(fmt.Errorf("no resourceSnapshots with index `%d` found for placement `%s`", snapshotIndex, placementKey))
495-
klog.ErrorS(err, "No specified resourceSnapshots found", "updateRun", updateRunRef)
496-
// no more retries here.
497-
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
477+
return err
498478
}
499479

500480
// Look for the master resourceSnapshot.
@@ -509,12 +489,18 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t
509489

510490
// No masterResourceSnapshot found.
511491
if masterResourceSnapshot == nil {
512-
err := controller.NewUnexpectedBehaviorError(fmt.Errorf("no master resourceSnapshot found for placement `%s` with index `%d`", placementKey, snapshotIndex))
492+
err := controller.NewUnexpectedBehaviorError(fmt.Errorf("no master resourceSnapshot found for placement %s", placementKey))
513493
klog.ErrorS(err, "Failed to find master resourceSnapshot", "updateRun", updateRunRef)
514494
// no more retries here.
515495
return fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
516496
}
517-
klog.V(2).InfoS("Found master resourceSnapshot", "placement", placementKey, "index", snapshotIndex, "updateRun", updateRunRef)
497+
498+
klog.V(2).InfoS("Found master resourceSnapshot", "placement", placementKey, "masterResourceSnapshot", masterResourceSnapshot.GetName(), "updateRun", updateRunRef)
499+
500+
// Record the resource snapshot index used.
501+
updateRunStatus := updateRun.GetUpdateRunStatus()
502+
updateRunStatus.ResourceSnapshotIndexUsed = masterResourceSnapshot.GetLabels()[placementv1beta1.ResourceIndexLabel]
503+
updateRun.SetUpdateRunStatus(*updateRunStatus)
518504

519505
resourceSnapshotRef := klog.KObj(masterResourceSnapshot)
520506
// Fetch all the matching overrides.
@@ -526,7 +512,6 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t
526512
}
527513

528514
// Pick the overrides associated with each target cluster.
529-
updateRunStatus := updateRun.GetUpdateRunStatus()
530515
for _, stageStatus := range updateRunStatus.StagesStatus {
531516
for i := range stageStatus.Clusters {
532517
clusterStatus := &stageStatus.Clusters[i]
@@ -543,6 +528,58 @@ func (r *Reconciler) recordOverrideSnapshots(ctx context.Context, placementKey t
543528
return nil
544529
}
545530

531+
// getResourceSnapshotObjs retrieves the list of resource snapshot objects from the specified ResourceSnapshotIndex.
532+
// If ResourceSnapshotIndex is unspecified, it returns the list of latest resource snapshots.
533+
func (r *Reconciler) getResourceSnapshotObjs(ctx context.Context, placementKey types.NamespacedName, updateRun placementv1beta1.UpdateRunObj) ([]placementv1beta1.ResourceSnapshotObj, error) {
534+
updateRunRef := klog.KObj(updateRun)
535+
updateRunSpec := updateRun.GetUpdateRunSpec()
536+
var resourceSnapshotObjs []placementv1beta1.ResourceSnapshotObj
537+
if updateRunSpec.ResourceSnapshotIndex != "" {
538+
snapshotIndex, err := strconv.Atoi(updateRunSpec.ResourceSnapshotIndex)
539+
if err != nil || snapshotIndex < 0 {
540+
err := controller.NewUserError(fmt.Errorf("invalid resource snapshot index `%s` provided, expected an integer >= 0", updateRunSpec.ResourceSnapshotIndex))
541+
klog.ErrorS(err, "Failed to parse the resource snapshot index", "updateRun", updateRunRef)
542+
// no more retries here.
543+
return nil, fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
544+
}
545+
546+
resourceSnapshotList, err := controller.ListAllResourceSnapshotWithAnIndex(ctx, r.Client, updateRunSpec.ResourceSnapshotIndex, placementKey.Name, placementKey.Namespace)
547+
if err != nil {
548+
klog.ErrorS(err, "Failed to list the resourceSnapshots associated with the placement",
549+
"placement", placementKey, "resourceSnapshotIndex", snapshotIndex, "updateRun", updateRunRef)
550+
// list err can be retried.
551+
return nil, controller.NewAPIServerError(true, err)
552+
}
553+
554+
resourceSnapshotObjs = resourceSnapshotList.GetResourceSnapshotObjs()
555+
if len(resourceSnapshotObjs) == 0 {
556+
err := controller.NewUserError(fmt.Errorf("no resourceSnapshots with index `%d` found for placement `%s`", snapshotIndex, placementKey))
557+
klog.ErrorS(err, "No specified resourceSnapshots found", "updateRun", updateRunRef)
558+
// no more retries here.
559+
return resourceSnapshotObjs, fmt.Errorf("%w: %s", errInitializedFailed, err.Error())
560+
}
561+
return resourceSnapshotObjs, nil
562+
}
563+
564+
klog.V(2).InfoS("No resource snapshot index specified, fetching latest resource snapshots", "placement", placementKey, "updateRun", updateRunRef)
565+
latestResourceSnapshots, err := controller.ListLatestResourceSnapshots(ctx, r.Client, placementKey)
566+
if err != nil {
567+
klog.ErrorS(err, "Failed to list the latest resourceSnapshots associated with the placement",
568+
"placement", placementKey, "updateRun", updateRunRef)
569+
// list err can be retried.
570+
return nil, controller.NewAPIServerError(true, err)
571+
}
572+
573+
resourceSnapshotObjs = latestResourceSnapshots.GetResourceSnapshotObjs()
574+
if len(resourceSnapshotObjs) == 0 {
575+
err := fmt.Errorf("no latest resourceSnapshots found for placement `%s`. This might be a transient state, need retry", placementKey)
576+
klog.ErrorS(err, "No latest resourceSnapshots found for placement. This might be transient, need retry", "placement", placementKey, "updateRun", updateRunRef)
577+
// retryable error.
578+
return resourceSnapshotObjs, err
579+
}
580+
return resourceSnapshotObjs, nil
581+
}
582+
546583
// recordInitializationSucceeded records the successful initialization condition in the UpdateRun status.
547584
func (r *Reconciler) recordInitializationSucceeded(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error {
548585
updateRunStatus := updateRun.GetUpdateRunStatus()

0 commit comments

Comments
 (0)