Skip to content

Commit 3881da5

Browse files
authored
fix: abandon the work applier reconciliation loop when the main context exits (#343)
* Minor changes Signed-off-by: michaelawyu <[email protected]> * Minor fixes Signed-off-by: michaelawyu <[email protected]> * Minor fixes Signed-off-by: michaelawyu <[email protected]> --------- Signed-off-by: michaelawyu <[email protected]>
1 parent 6bf6d62 commit 3881da5

File tree

5 files changed

+59
-7
lines changed

5 files changed

+59
-7
lines changed

pkg/controllers/workapplier/availability_tracker.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,8 @@ import (
3535
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
3636
)
3737

38-
// trackInMemberClusterObjAvailability tracks the availability of an applied objects in the member cluster.
39-
func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) {
38+
// trackInMemberClusterObjAvailability tracks the availability of applied objects in the member cluster.
39+
func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bundles []*manifestProcessingBundle, workRef klog.ObjectRef) error {
4040
// Track the availability of all the applied objects in the member cluster in parallel.
4141
//
4242
// This is concurrency-safe as the bundles slice has been pre-allocated.
@@ -83,6 +83,17 @@ func (r *Reconciler) trackInMemberClusterObjAvailability(ctx context.Context, bu
8383

8484
// Run the availability check in parallel.
8585
r.parallelizer.ParallelizeUntil(childCtx, len(bundles), doWork, "trackInMemberClusterObjAvailability")
86+
87+
// Unlike some other steps in the reconciliation loop, the availability checking step does not end
88+
// with a contextual API call; consequently, if the context has been cancelled during this step,
89+
// some checks might not run at all, and passing such bundles to the next step may trigger
90+
// unexpected behaviors. To address this, at the end of this step the work applier checks for context
91+
// cancellation directly.
92+
if err := ctx.Err(); err != nil {
93+
klog.V(2).InfoS("availability checking has been interrupted as the main context has been cancelled")
94+
return fmt.Errorf("availability checking has been interrupted: %w", err)
95+
}
96+
return nil
8697
}
8798

8899
// trackInMemberClusterObjAvailabilityByGVR tracks the availability of an object in the member cluster based

pkg/controllers/workapplier/availability_tracker_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1126,7 +1126,10 @@ func TestTrackInMemberClusterObjAvailability(t *testing.T) {
11261126
parallelizer: parallelizer.NewParallelizer(2),
11271127
}
11281128

1129-
r.trackInMemberClusterObjAvailability(ctx, tc.bundles, workRef)
1129+
if err := r.trackInMemberClusterObjAvailability(ctx, tc.bundles, workRef); err != nil {
1130+
// Normally this would never occur.
1131+
t.Fatalf("trackInMemberClusterObjAvailability() = %v, want no error", err)
1132+
}
11301133

11311134
// A special less func to sort the bundles by their ordinal.
11321135
lessFuncManifestProcessingBundle := func(i, j *manifestProcessingBundle) bool {

pkg/controllers/workapplier/controller.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -479,18 +479,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
479479
// c) report configuration differences if applicable;
480480
// d) check for configuration drifts if applicable;
481481
// e) apply each manifest.
482-
r.processManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef)
482+
if err := r.processManifests(ctx, bundles, work, expectedAppliedWorkOwnerRef); err != nil {
483+
klog.ErrorS(err, "Failed to process the manifests", "work", workRef)
484+
return ctrl.Result{}, err
485+
}
483486

484487
// Track the availability information.
485-
r.trackInMemberClusterObjAvailability(ctx, bundles, workRef)
488+
if err := r.trackInMemberClusterObjAvailability(ctx, bundles, workRef); err != nil {
489+
klog.ErrorS(err, "Failed to check for object availability", "work", workRef)
490+
return ctrl.Result{}, err
491+
}
486492

487493
// Refresh the status of the Work object.
488494
if err := r.refreshWorkStatus(ctx, work, bundles); err != nil {
495+
klog.ErrorS(err, "Failed to refresh work object status", "work", workRef)
489496
return ctrl.Result{}, err
490497
}
491498

492499
// Refresh the status of the AppliedWork object.
493500
if err := r.refreshAppliedWorkStatus(ctx, appliedWork, bundles); err != nil {
501+
klog.ErrorS(err, "Failed to refresh appliedWork object status", "appliedWork", klog.KObj(appliedWork))
494502
return ctrl.Result{}, err
495503
}
496504

pkg/controllers/workapplier/process.go

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func (r *Reconciler) processManifests(
3636
bundles []*manifestProcessingBundle,
3737
work *fleetv1beta1.Work,
3838
expectedAppliedWorkOwnerRef *metav1.OwnerReference,
39-
) {
39+
) error {
4040
// Process all manifests in parallel.
4141
//
4242
// There are cases where certain groups of manifests should not be processed in parallel with
@@ -58,7 +58,17 @@ func (r *Reconciler) processManifests(
5858
}
5959

6060
r.parallelizer.ParallelizeUntil(ctx, len(bundles), doWork, "processingManifestsInReportDiffMode")
61-
return
61+
62+
// Unlike some other steps in the reconciliation loop, the manifest processing step does not end
63+
// with a contextual API call; consequently, if the context has been cancelled during this step,
64+
// some manifests might not get processed at all, and passing such bundles to the next step may trigger
65+
// unexpected behaviors. To address this, at the end of this step the work applier checks for context
66+
// cancellation directly.
67+
if err := ctx.Err(); err != nil {
68+
klog.V(2).InfoS("manifest processing has been interrupted as the main context has been cancelled")
69+
return fmt.Errorf("manifest processing has been interrupted: %w", err)
70+
}
71+
return nil
6272
}
6373

6474
// Organize the bundles into different waves of bundles for parallel processing based on their
@@ -83,7 +93,18 @@ func (r *Reconciler) processManifests(
8393
}
8494

8595
r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, fmt.Sprintf("processingManifestsInWave%d", idx))
96+
97+
// Unlike some other steps in the reconciliation loop, the manifest processing step does not end
98+
// with a contextual API call; consequently, if the context has been cancelled during this step,
99+
// some manifests might not get processed at all, and passing such bundles to the next step may trigger
100+
// unexpected behaviors. To address this, at the end of this step the work applier checks for context
101+
// cancellation directly.
102+
if err := ctx.Err(); err != nil {
103+
klog.V(2).InfoS("manifest processing has been interrupted as the main context has been cancelled")
104+
return fmt.Errorf("manifest processing has been interrupted: %w", err)
105+
}
86106
}
107+
return nil
87108
}
88109

89110
// processOneManifest processes a manifest (in the JSON format) embedded in the Work object.

pkg/utils/parallelizer/parallelizer.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,13 @@ func (p *parallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork
5656
}
5757

5858
workqueue.ParallelizeUntil(ctx, p.numOfWorkers, pieces, doWorkWithLogs)
59+
60+
// Note (chenyu1): the ParallelizeUntil method is essentially a thin wrapper around the
61+
// workqueue.ParallelizeUntil method. Note that the workqueue.ParallelizeUntil method
62+
// right now does not return any error; it returns when the context is cancelled, possibly
63+
// in a willingly manner. Some of the KubeFleet code makes use of this to facilitate a
64+
// fail-fast pattern (i.e., pass in a child context to the parallelizer; if one worker
65+
// has exited, cancel the child context in the worker and consequently the whole parallelization).
66+
// As only the caller knows why a context is cancelled (willingly by a worker or not), we leave it to the
67+
// caller to inspect the context after this method returns rather than trying to do it here.
5968
}

0 commit comments

Comments
 (0)