106 changes: 85 additions & 21 deletions pkg/controllers/updaterun/controller.go
@@ -104,11 +104,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// Emit the update run status metric based on status conditions in the updateRun.
defer emitUpdateRunStatusMetric(updateRun)

state := updateRun.GetUpdateRunSpec().State
switch state { // Early check for abandoned state - this is a terminal state, no initialization needed.
case placementv1beta1.StateAbandoned:
klog.V(2).InfoS("The updateRun is abandoned, terminating", "state", state, "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunAbandoned(ctx, updateRun)
case placementv1beta1.StateStopped: // Early check for stopped state - pause the update run if needed.
klog.V(2).InfoS("The updateRun is paused, waiting to resume", "state", state, "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunPaused(ctx, updateRun)
}

var updatingStageIndex int
var toBeUpdatedBindings, toBeDeletedBindings []placementv1beta1.BindingObj
updateRunStatus := updateRun.GetUpdateRunStatus()
initCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
if !condition.IsConditionStatusTrue(initCond, updateRun.GetGeneration()) {
// Check if initialized regardless of generation.
// The updateRun spec fields are immutable except for the state field. When the state changes,
// the update run generation increments, but we don't need to reinitialize since initialization is a one-time setup.
isInitialized := initCond != nil && initCond.Status == metav1.ConditionTrue
if !isInitialized {
// Check if initialization failed for the current generation.
if condition.IsConditionStatusFalse(initCond, updateRun.GetGeneration()) {
klog.V(2).InfoS("The updateRun has failed to initialize", "errorMsg", initCond.Message, "updateRun", runObjRef)
return runtime.Result{}, nil
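
The comment above separates two different condition checks. Initialization is a one-time setup, and the spec is immutable except for the state field, so a state flip (for example Started to Stopped) bumps metadata.generation without invalidating it; the "is initialized" check therefore ignores ObservedGeneration, while the "did initialization fail" check stays generation-aware. A minimal sketch of the two predicates under those assumptions (the helper names here are hypothetical, and this assumes condition.IsConditionStatusFalse compares ObservedGeneration, as its use here implies):

package updaterun

import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

// isInitialized is generation-agnostic: once initialization has succeeded,
// later generation bumps from state changes do not undo it.
func isInitialized(initCond *metav1.Condition) bool {
	return initCond != nil && initCond.Status == metav1.ConditionTrue
}

// initFailedForGeneration is generation-aware: an initialization failure only
// blocks reconciliation if it happened for the spec currently being reconciled.
func initFailedForGeneration(initCond *metav1.Condition, generation int64) bool {
	return initCond != nil &&
		initCond.Status == metav1.ConditionFalse &&
		initCond.ObservedGeneration == generation
}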
@@ -122,7 +137,7 @@ }
}
return runtime.Result{}, initErr
}
updatingStageIndex = 0 // start from the first stage.
updatingStageIndex = 0 // start from the first stage (typically for NotStarted or Started states).
klog.V(2).InfoS("Initialized the updateRun", "updateRun", runObjRef)
} else {
klog.V(2).InfoS("The updateRun is initialized", "updateRun", runObjRef)
@@ -134,6 +149,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
}
var validateErr error
// Validate the updateRun status to ensure the update can be continued and get the updating stage index and cluster indices.
// For Stopped → Started transition, this will resume from where it left off.
if updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings, validateErr = r.validate(ctx, updateRun); validateErr != nil {
// errStagedUpdatedAborted cannot be retried.
if errors.Is(validateErr, errStagedUpdatedAborted) {
@@ -151,28 +167,32 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
}

// Execute the updateRun.
klog.V(2).InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, execErr.Error())
}
if state == placementv1beta1.StateStarted {
klog.V(2).InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, execErr.Error())
}

if finished {
klog.V(2).InfoS("The updateRun is completed", "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
}
if finished {
klog.V(2).InfoS("The updateRun is completed", "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
}

// The execution is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
if execErr != nil {
return runtime.Result{}, execErr
// The execution is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
if execErr != nil {
return runtime.Result{}, execErr
}
return runtime.Result{RequeueAfter: waitTime}, nil
}
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
klog.V(2).InfoS("The updateRun is not started, waiting to be started", "state", state, "updateRun", runObjRef)
return runtime.Result{}, nil
}
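
Taken together, the branches above form a small state machine for the updateRun; a summary in comment form, inferred from this diff alone:

// State handling in Reconcile (inferred from this diff, not confirmed elsewhere):
//
//	NotStarted -> initialized and validated but not executed; the run waits
//	              until its state is set to Started.
//	Started    -> initialized once, validated, then executed; requeued with
//	              waitTime until finished.
//	Stopped    -> Progressing recorded as False/Paused; flipping back to
//	              Started resumes from the stage that validate() reports.
//	Abandoned  -> Progressing and Succeeded recorded as False/Abandoned; terminal.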

// handleDelete handles the deletion of the updateRun object.
@@ -265,6 +285,50 @@ func (r *Reconciler) recordUpdateRunFailed(ctx context.Context, updateRun placem
return nil
}

// recordUpdateRunPaused records the progressing condition as paused in the updateRun status.
func (r *Reconciler) recordUpdateRunPaused(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error {
updateRunStatus := updateRun.GetUpdateRunStatus()
meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{
Type: string(placementv1beta1.StagedUpdateRunConditionProgressing),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.GetGeneration(),
Reason: condition.UpdateRunPausedReason,
Message: "The update run is paused",
})
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the updateRun status as paused", "updateRun", klog.KObj(updateRun))
// updateErr can be retried.
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}

// recordUpdateRunAbandoned records the succeeded and progressing conditions as abandoned in the updateRun status.
func (r *Reconciler) recordUpdateRunAbandoned(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error {
updateRunStatus := updateRun.GetUpdateRunStatus()
meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{
Type: string(placementv1beta1.StagedUpdateRunConditionProgressing),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.GetGeneration(),
Reason: condition.UpdateRunAbandonedReason,
Message: "The stages are aborted due to abandonment",
})
meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{
Type: string(placementv1beta1.StagedUpdateRunConditionSucceeded),
Status: metav1.ConditionFalse,
ObservedGeneration: updateRun.GetGeneration(),
Reason: condition.UpdateRunAbandonedReason,
Message: "The update run has been abandoned",
})

if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
klog.ErrorS(updateErr, "Failed to update the updateRun status as abandoned", "updateRun", klog.KObj(updateRun))
// updateErr can be retried.
return controller.NewUpdateIgnoreConflictError(updateErr)
}
return nil
}
Comment on lines +289 to +330
Contributor:
We need to update the status of each cluster as they could be in the middle of upgrading.
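
A minimal sketch of one way to address this, not the PR's implementation: walk the per-stage cluster statuses and give every cluster that started but never finished a terminal condition. markClustersAbandoned is a hypothetical helper; it assumes the same imports as this file, plus status fields (StagesStatus, Clusters, Conditions) and condition types matching how they are used elsewhere in this controller.

// markClustersAbandoned marks every in-flight cluster as not succeeded with
// the abandoned reason, so per-cluster status is not left dangling mid-update.
func markClustersAbandoned(updateRun placementv1beta1.UpdateRunObj) {
	updateRunStatus := updateRun.GetUpdateRunStatus()
	for i := range updateRunStatus.StagesStatus {
		stageStatus := &updateRunStatus.StagesStatus[i]
		for j := range stageStatus.Clusters {
			clusterStatus := &stageStatus.Clusters[j]
			started := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
			succeeded := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
			// Untouched and already-finished clusters keep their status as-is;
			// only clusters caught mid-update need a terminal condition.
			if started == nil || succeeded != nil {
				continue
			}
			meta.SetStatusCondition(&clusterStatus.Conditions, metav1.Condition{
				Type:               string(placementv1beta1.ClusterUpdatingConditionSucceeded),
				Status:             metav1.ConditionFalse,
				ObservedGeneration: updateRun.GetGeneration(),
				Reason:             condition.UpdateRunAbandonedReason,
				Message:            "The cluster update was interrupted because the update run was abandoned",
			})
		}
	}
}

recordUpdateRunAbandoned could call such a helper before its Status().Update so the per-cluster conditions land in the same status write.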


// recordUpdateRunStatus records the updateRun status.
func (r *Reconciler) recordUpdateRunStatus(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error {
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
54 changes: 38 additions & 16 deletions pkg/controllers/updaterun/controller_integration_test.go
@@ -272,6 +272,16 @@ func generateMetricsLabels(
}
}

func generateInitializationSucceededMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionInitialized),
string(metav1.ConditionTrue), condition.UpdateRunInitializeSucceededReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateInitializationFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionInitialized),
@@ -312,6 +322,26 @@ func generateStuckMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *pr
}
}

func generatePausedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
string(metav1.ConditionFalse), condition.UpdateRunPausedReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateAbandonedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
string(metav1.ConditionFalse), condition.UpdateRunAbandonedReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
@@ -341,6 +371,7 @@ func generateTestClusterStagedUpdateRun() *placementv1beta1.ClusterStagedUpdateR
PlacementName: testCRPName,
ResourceSnapshotIndex: testResourceSnapshotIndex,
StagedUpdateStrategyName: testUpdateStrategyName,
State: placementv1beta1.StateStarted,
},
}
}
@@ -796,23 +827,14 @@ func generateFalseCondition(obj client.Object, condType any) metav1.Condition {
}
}

func generateFalseProgressingCondition(obj client.Object, condType any, succeeded bool) metav1.Condition {
func generateFalseProgressingCondition(obj client.Object, condType any, reason string) metav1.Condition {
falseCond := generateFalseCondition(obj, condType)
falseCond.Reason = reason
return falseCond
}

func generateFalseSucceededCondition(obj client.Object, condType any, reason string) metav1.Condition {
falseCond := generateFalseCondition(obj, condType)
reason := ""
switch condType {
case placementv1beta1.StagedUpdateRunConditionProgressing:
if succeeded {
reason = condition.UpdateRunSucceededReason
} else {
reason = condition.UpdateRunFailedReason
}
case placementv1beta1.StageUpdatingConditionProgressing:
if succeeded {
reason = condition.StageUpdatingSucceededReason
} else {
reason = condition.StageUpdatingFailedReason
}
}
falseCond.Reason = reason
return falseCond
}
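
With the reason now passed in explicitly, call sites can express each paused or terminal outcome directly instead of deriving it from a boolean; a hypothetical usage sketch, with condition types and reasons as used elsewhere in these tests:

// In a test, the expected paused/abandoned conditions can be built directly:
wantConditions := []metav1.Condition{
	generateFalseProgressingCondition(updateRun, placementv1beta1.StagedUpdateRunConditionProgressing, condition.UpdateRunPausedReason),
	generateFalseSucceededCondition(updateRun, placementv1beta1.StagedUpdateRunConditionSucceeded, condition.UpdateRunAbandonedReason),
}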
58 changes: 37 additions & 21 deletions pkg/controllers/updaterun/execution.go
@@ -241,31 +241,46 @@ func (r *Reconciler) executeUpdatingStage(
}

if finishedClusterCount == len(updatingStageStatus.Clusters) {
// All the clusters in the stage have been updated.
markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
markStageUpdatingWaiting(updatingStageStatus, updateRun.GetGeneration())
klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
// Check if the after stage tasks are ready.
approved, waitTime, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun)
if err != nil {
return 0, err
}
if approved {
markUpdateRunProgressing(updateRun)
markStageUpdatingSucceeded(updatingStageStatus, updateRun.GetGeneration())
// No need to wait to get to the next stage.
return 0, nil
}
// The after stage tasks are not ready yet.
if waitTime < 0 {
waitTime = stageUpdatingWaitTime
}
return waitTime, nil
return r.handleStageCompletion(ctx, updatingStageIndex, updateRun, updatingStageStatus)
}

// Some clusters are still updating.
return clusterUpdatingWaitTime, nil
}

// handleStageCompletion handles the completion logic when all clusters in a stage are finished.
// Returns the wait time and any error encountered.
func (r *Reconciler) handleStageCompletion(
ctx context.Context,
updatingStageIndex int,
updateRun placementv1beta1.UpdateRunObj,
updatingStageStatus *placementv1beta1.StageUpdatingStatus,
) (time.Duration, error) {
updateRunRef := klog.KObj(updateRun)

// All the clusters in the stage have been updated.
markUpdateRunWaiting(updateRun, updatingStageStatus.StageName)
markStageUpdatingWaiting(updatingStageStatus, updateRun.GetGeneration())
klog.V(2).InfoS("The stage has finished all cluster updating", "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)

// Check if the after stage tasks are ready.
approved, waitTime, err := r.checkAfterStageTasksStatus(ctx, updatingStageIndex, updateRun)
if err != nil {
return 0, err
}
if approved {
markUpdateRunProgressing(updateRun)
markStageUpdatingSucceeded(updatingStageStatus, updateRun.GetGeneration())
// No need to wait to get to the next stage.
return 0, nil
}
// The after stage tasks are not ready yet.
if waitTime < 0 {
waitTime = stageUpdatingWaitTime
}
return waitTime, nil
}

// executeDeleteStage executes the delete stage by deleting the bindings.
func (r *Reconciler) executeDeleteStage(
ctx context.Context,
@@ -293,7 +308,8 @@ func (r *Reconciler) executeDeleteStage(
// In validation, we already check the binding must exist in the status.
delete(existingDeleteStageClusterMap, bindingSpec.TargetCluster)
// Make sure the cluster is not marked as deleted as the binding is still there.
if condition.IsConditionStatusTrue(meta.FindStatusCondition(curCluster.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded)), updateRun.GetGeneration()) {
clusterDeleteSucceededCond := meta.FindStatusCondition(curCluster.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
if clusterDeleteSucceededCond != nil && clusterDeleteSucceededCond.Status == metav1.ConditionTrue {
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("the deleted cluster `%s` in the deleting stage still has a binding", bindingSpec.TargetCluster))
klog.ErrorS(unexpectedErr, "The cluster in the deleting stage is not removed yet but marked as deleted", "cluster", curCluster.ClusterName, "updateRun", updateRunRef)
return false, fmt.Errorf("%w: %s", errStagedUpdatedAborted, unexpectedErr.Error())