
Commit ccd0b25

refactor
Signed-off-by: Britania Rodriguez Reyes <[email protected]>
1 parent 246e2f9 commit ccd0b25

1 file changed
pkg/controllers/updaterun/execution.go

Lines changed: 174 additions & 93 deletions
@@ -107,6 +107,7 @@ func (r *Reconciler) executeUpdatingStage(
 resourceIndex, _ := strconv.Atoi(updateRunStatus.ResourceSnapshotIndexUsed)
 resourceSnapshotName := fmt.Sprintf(placementv1beta1.ResourceSnapshotNameFmt, updateRunSpec.PlacementName, resourceIndex)
 updateRunRef := klog.KObj(updateRun)
+
 // Create the map of the toBeUpdatedBindings.
 toBeUpdatedBindingsMap := make(map[string]placementv1beta1.BindingObj, len(toBeUpdatedBindings))
 for _, binding := range toBeUpdatedBindings {
@@ -118,101 +119,47 @@ func (r *Reconciler) executeUpdatingStage(
 clusterUpdatingCount := 0
 var stuckClusterNames []string
 var clusterUpdateErrors []error
+
 // Go through each cluster in the stage and check if it's updating/succeeded/failed.
 for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ {
 clusterStatus := &updatingStageStatus.Clusters[i]
-clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
-if clusterUpdateSucceededCond != nil && clusterUpdateSucceededCond.Status == metav1.ConditionTrue {
-// The cluster has been updated successfully.
+
+// Process cluster status to determine if we should skip or handle errors
+processResult := r.processClusterStatus(clusterStatus, updatingStageStatus, updateRunRef)
+if processResult.skip {
 finishedClusterCount++
 continue
 }
-clusterUpdatingCount++
-if clusterUpdateSucceededCond != nil && clusterUpdateSucceededCond.Status == metav1.ConditionFalse {
-// The cluster is marked as failed to update, this cluster is counted as updating cluster since it's not finished to avoid processing more clusters than maxConcurrency in this round.
-failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
-klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
-clusterUpdateErrors = append(clusterUpdateErrors, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()))
+if processResult.err != nil {
+clusterUpdatingCount++
+clusterUpdateErrors = append(clusterUpdateErrors, processResult.err)
 continue
 }
-// The cluster needs to be processed.
-clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
+
+clusterUpdatingCount++
 binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName]
+clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
+
 if clusterStartedCond == nil || clusterStartedCond.Status == metav1.ConditionFalse {
 // The cluster has not started updating yet.
-if !isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus) {
-klog.V(2).InfoS("Found the first cluster that needs to be updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-// The binding is not up-to-date with the cluster status.
-bindingSpec := binding.GetBindingSpec()
-bindingSpec.State = placementv1beta1.BindingStateBound
-bindingSpec.ResourceSnapshotName = resourceSnapshotName
-bindingSpec.ResourceOverrideSnapshots = clusterStatus.ResourceOverrideSnapshots
-bindingSpec.ClusterResourceOverrideSnapshots = clusterStatus.ClusterResourceOverrideSnapshots
-bindingSpec.ApplyStrategy = updateRunStatus.ApplyStrategy
-if err := r.Client.Update(ctx, binding); err != nil {
-klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "updateRun", updateRunRef)
-clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err))
-continue
-}
-klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
-clusterUpdateErrors = append(clusterUpdateErrors, err)
-continue
-}
-} else {
-klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-bindingSpec := binding.GetBindingSpec()
-if bindingSpec.State != placementv1beta1.BindingStateBound {
-bindingSpec.State = placementv1beta1.BindingStateBound
-if err := r.Client.Update(ctx, binding); err != nil {
-klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err))
-continue
-}
-klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
-clusterUpdateErrors = append(clusterUpdateErrors, err)
-continue
-}
-} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration()) {
-klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
-clusterUpdateErrors = append(clusterUpdateErrors, err)
-continue
-}
-} else {
-if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
-clusterUpdateErrors = append(clusterUpdateErrors, updateErr)
-continue
-}
-}
+if err := r.updateBindingForCluster(ctx, binding, clusterStatus, resourceSnapshotName, updateRun, updatingStageStatus); err != nil {
+clusterUpdateErrors = append(clusterUpdateErrors, err)
+continue
 }
 markClusterUpdatingStarted(clusterStatus, updateRun.GetGeneration())
 if finishedClusterCount == 0 {
 markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
 }
-// Need to continue as we need to process at most maxConcurrency number of clusters in parallel.
 continue
 }

-// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
-inSync := isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus)
-rolloutStarted := condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration())
-bindingSpec := binding.GetBindingSpec()
-if !inSync || !rolloutStarted || bindingSpec.State != placementv1beta1.BindingStateBound {
-// This issue mostly happens when there are concurrent updateRuns referencing the same clusterResourcePlacement but releasing different versions.
-// After the 1st updateRun updates the binding, and before the controller re-checks the binding status, the 2nd updateRun updates the same binding, and thus the 1st updateRun is preempted and observes the binding not matching the desired state.
-preemptedErr := controller.NewUserError(fmt.Errorf("the binding of the updating cluster `%s` in the stage `%s` is not up-to-date with the desired status, "+
-"please check the status of binding `%s` and see if there is a concurrent updateRun referencing the same clusterResourcePlacement and updating the same cluster",
-clusterStatus.ClusterName, updatingStageStatus.StageName, klog.KObj(binding)))
-klog.ErrorS(preemptedErr, "The binding has been changed during updating",
-"bindingSpecInSync", inSync, "bindingState", bindingSpec.State,
-"bindingRolloutStarted", rolloutStarted, "binding", klog.KObj(binding), "updateRun", updateRunRef)
-markClusterUpdatingFailed(clusterStatus, updateRun.GetGeneration(), preemptedErr.Error())
-clusterUpdateErrors = append(clusterUpdateErrors, fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error()))
+// The cluster is already updating - validate it's properly synchronized
+if err := r.validateUpdatingCluster(binding, clusterStatus, resourceSnapshotName, updateRun, updatingStageStatus); err != nil {
+clusterUpdateErrors = append(clusterUpdateErrors, err)
 continue
 }

+// Check if the cluster update has finished
 finished, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun)
 if updateErr != nil {
 clusterUpdateErrors = append(clusterUpdateErrors, updateErr)
@@ -222,7 +169,7 @@ func (r *Reconciler) executeUpdatingStage(
 // The cluster has finished successfully, we can process another cluster in this round.
 clusterUpdatingCount--
 } else {
-// If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
+// Check if cluster is stuck
 timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
 if timeElapsed > updateRunStuckThreshold {
 klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
@@ -241,29 +188,163 @@ func (r *Reconciler) executeUpdatingStage(
 }

 if finishedClusterCount == len(updatingStageStatus.Clusters) {
-// All the clusters in the stage have been updated.
-markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
-markStageUpdatingWaiting(updatingStageStatus, updateRun.GetGeneration())
-klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-// Check if the after stage tasks are ready.
-approved, waitTime, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun)
-if err != nil {
-return 0, err
+return r.handleStageCompletion(ctx, updatingStageIndex, updateRun, updatingStageStatus)
+}
+
+// Some clusters are still updating.
+return clusterUpdatingWaitTime, nil
+}
+
+// clusterProcessResult represents the result of processing a cluster.
+type clusterProcessResult struct {
+finished bool
+skip bool // true if the cluster should be skipped (already processed)
+err error
+}
+
+// processClusterStatus evaluates the status of a cluster and determines if it's finished, failed, or needs processing.
+// Returns a clusterProcessResult indicating how to proceed with this cluster.
+func (r *Reconciler) processClusterStatus(
+clusterStatus *placementv1beta1.ClusterUpdatingStatus,
+updatingStageStatus *placementv1beta1.StageUpdatingStatus,
+updateRunRef klog.ObjectRef,
+) clusterProcessResult {
+clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
+
+if clusterUpdateSucceededCond != nil && clusterUpdateSucceededCond.Status == metav1.ConditionTrue {
+// The cluster has been updated successfully.
+return clusterProcessResult{finished: true, skip: true}
+}
+
+if clusterUpdateSucceededCond != nil && clusterUpdateSucceededCond.Status == metav1.ConditionFalse {
+// The cluster is marked as failed to update, this cluster is counted as updating cluster
+// since it's not finished to avoid processing more clusters than maxConcurrency in this round.
+failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
+klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
+return clusterProcessResult{
+err: fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()),
 }
-if approved {
-markUpdateRunProgressing(updateRun)
-markStageUpdatingSucceeded(updatingStageStatus, updateRun.GetGeneration())
-// No need to wait to get to the next stage.
-return 0, nil
+}
+
+// The cluster needs to be processed.
+return clusterProcessResult{}
+}
+
+// updateBindingForCluster handles updating the binding for a cluster that hasn't started updating yet.
+// Returns an error if the binding update fails.
+func (r *Reconciler) updateBindingForCluster(
+ctx context.Context,
+binding placementv1beta1.BindingObj,
+clusterStatus *placementv1beta1.ClusterUpdatingStatus,
+resourceSnapshotName string,
+updateRun placementv1beta1.UpdateRunObj,
+updatingStageStatus *placementv1beta1.StageUpdatingStatus,
+) error {
+updateRunRef := klog.KObj(updateRun)
+updateRunStatus := updateRun.GetUpdateRunStatus()
+
+if !isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus) {
+klog.V(2).InfoS("Found the first cluster that needs to be updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
+// The binding is not up-to-date with the cluster status.
+bindingSpec := binding.GetBindingSpec()
+bindingSpec.State = placementv1beta1.BindingStateBound
+bindingSpec.ResourceSnapshotName = resourceSnapshotName
+bindingSpec.ResourceOverrideSnapshots = clusterStatus.ResourceOverrideSnapshots
+bindingSpec.ClusterResourceOverrideSnapshots = clusterStatus.ClusterResourceOverrideSnapshots
+bindingSpec.ApplyStrategy = updateRunStatus.ApplyStrategy
+if err := r.Client.Update(ctx, binding); err != nil {
+klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "updateRun", updateRunRef)
+return controller.NewUpdateIgnoreConflictError(err)
+}
+klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
+return r.updateBindingRolloutStarted(ctx, binding, updateRun)
+}
+
+// The binding is synced but needs other updates
+klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
+bindingSpec := binding.GetBindingSpec()
+if bindingSpec.State != placementv1beta1.BindingStateBound {
+bindingSpec.State = placementv1beta1.BindingStateBound
+if err := r.Client.Update(ctx, binding); err != nil {
+klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
+return controller.NewUpdateIgnoreConflictError(err)
 }
-// The after stage tasks are not ready yet.
-if waitTime < 0 {
-waitTime = stageUpdatingWaitTime
+klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
+return r.updateBindingRolloutStarted(ctx, binding, updateRun)
+} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration()) {
+klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
+return r.updateBindingRolloutStarted(ctx, binding, updateRun)
+} else {
+if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
+return updateErr
 }
-return waitTime, nil
 }
-// Some clusters are still updating.
-return clusterUpdatingWaitTime, nil
+return nil
+}
+
+// validateUpdatingCluster validates that an updating cluster's binding is properly synchronized.
+// Returns an error if validation fails (indicating a concurrent update conflict).
+func (r *Reconciler) validateUpdatingCluster(
+binding placementv1beta1.BindingObj,
+clusterStatus *placementv1beta1.ClusterUpdatingStatus,
+resourceSnapshotName string,
+updateRun placementv1beta1.UpdateRunObj,
+updatingStageStatus *placementv1beta1.StageUpdatingStatus,
+) error {
+updateRunRef := klog.KObj(updateRun)
+
+// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
+inSync := isBindingSyncedWithClusterStatus(resourceSnapshotName, updateRun, binding, clusterStatus)
+rolloutStarted := condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration())
+bindingSpec := binding.GetBindingSpec()
+
+if !inSync || !rolloutStarted || bindingSpec.State != placementv1beta1.BindingStateBound {
+// This issue mostly happens when there are concurrent updateRuns referencing the same clusterResourcePlacement but releasing different versions.
+// After the 1st updateRun updates the binding, and before the controller re-checks the binding status, the 2nd updateRun updates the same binding, and thus the 1st updateRun is preempted and observes the binding not matching the desired state.
+preemptedErr := controller.NewUserError(fmt.Errorf("the binding of the updating cluster `%s` in the stage `%s` is not up-to-date with the desired status, "+
+"please check the status of binding `%s` and see if there is a concurrent updateRun referencing the same clusterResourcePlacement and updating the same cluster",
+clusterStatus.ClusterName, updatingStageStatus.StageName, klog.KObj(binding)))
+klog.ErrorS(preemptedErr, "The binding has been changed during updating",
+"bindingSpecInSync", inSync, "bindingState", bindingSpec.State,
+"bindingRolloutStarted", rolloutStarted, "binding", klog.KObj(binding), "updateRun", updateRunRef)
+markClusterUpdatingFailed(clusterStatus, updateRun.GetGeneration(), preemptedErr.Error())
+return fmt.Errorf("%w: %s", errStagedUpdatedAborted, preemptedErr.Error())
+}
+
+return nil
+}
+
+// handleStageCompletion handles the completion logic when all clusters in a stage are finished.
+// Returns the wait time and any error encountered.
+func (r *Reconciler) handleStageCompletion(
+ctx context.Context,
+updatingStageIndex int,
+updateRun placementv1beta1.UpdateRunObj,
+updatingStageStatus *placementv1beta1.StageUpdatingStatus,
+) (time.Duration, error) {
+updateRunRef := klog.KObj(updateRun)
+
+// All the clusters in the stage have been updated.
+markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
+markStageUpdatingWaiting(updatingStageStatus, updateRun.GetGeneration())
+klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
+
+// Check if the after stage tasks are ready.
+approved, waitTime, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun)
+if err != nil {
+return 0, err
+}
+if approved {
+markUpdateRunProgressing(updateRun)
+markStageUpdatingSucceeded(updatingStageStatus, updateRun.GetGeneration())
+// No need to wait to get to the next stage.
+return 0, nil
+}
+// The after stage tasks are not ready yet.
+if waitTime < 0 {
+waitTime = stageUpdatingWaitTime
+}
+return waitTime, nil
 }

 // executeDeleteStage executes the delete stage by deleting the bindings.
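For illustration only: the sketch below is a minimal, self-contained Go program that mimics the per-cluster decision flow this commit extracts into processClusterStatus and clusterProcessResult (skip a cluster that already succeeded, surface an error for one that already failed, otherwise keep processing it). The types and names here (condStatus, clusterStatus, processResult, errStageAborted, the member-N cluster names) are simplified hypothetical stand-ins, not the real placementv1beta1 or metav1 API; only the branching mirrors the diff.

package main

import (
	"errors"
	"fmt"
)

// condStatus is a hypothetical stand-in for metav1.ConditionStatus.
type condStatus string

const (
	condTrue  condStatus = "True"
	condFalse condStatus = "False"
)

// clusterStatus is a trimmed-down stand-in for the cluster updating status;
// succeeded == nil means the Succeeded condition has not been set yet.
type clusterStatus struct {
	name      string
	succeeded *condStatus
}

// processResult mirrors the shape of the commit's clusterProcessResult.
type processResult struct {
	finished bool
	skip     bool
	err      error
}

var errStageAborted = errors.New("staged update aborted")

// processClusterStatus reproduces the three-way branching shown in the diff.
func processClusterStatus(c clusterStatus, stage string) processResult {
	if c.succeeded != nil && *c.succeeded == condTrue {
		// Already updated successfully: count it as finished and skip it.
		return processResult{finished: true, skip: true}
	}
	if c.succeeded != nil && *c.succeeded == condFalse {
		// Already failed: report an error so the caller can abort the stage.
		return processResult{err: fmt.Errorf("%w: cluster %q in stage %q has failed", errStageAborted, c.name, stage)}
	}
	// No terminal condition yet: the caller still needs to drive this cluster.
	return processResult{}
}

func main() {
	t, f := condTrue, condFalse
	clusters := []clusterStatus{
		{name: "member-1", succeeded: &t},
		{name: "member-2", succeeded: &f},
		{name: "member-3"},
	}
	for _, c := range clusters {
		r := processClusterStatus(c, "staging")
		fmt.Printf("%s: skip=%t finished=%t err=%v\n", c.name, r.skip, r.finished, r.err)
	}
}

Returning a small result struct instead of mutating counters inline is what lets the refactored executeUpdatingStage loop stay short: the caller decides how skip, err, and "needs processing" translate into finishedClusterCount, clusterUpdatingCount, and clusterUpdateErrors.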
