diff --git a/common/retry.go b/common/retry.go
index f7284fa461..3083d6bf3d 100644
--- a/common/retry.go
+++ b/common/retry.go
@@ -18,6 +18,7 @@ package common
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	apierr "k8s.io/apimachinery/pkg/api/errors"
@@ -48,6 +49,13 @@ func IsRetryableKubeAPIError(err error) bool {
 	return true
 }
 
+// IsResourceConstraintError reports whether err is a Kubernetes Forbidden API
+// error whose message contains "exceeded quota". It returns false for any
+// other error, including nil.
+func IsResourceConstraintError(err error) bool {
+	return apierr.IsForbidden(err) && strings.Contains(err.Error(), "exceeded quota")
+}
+
 // Convert2WaitBackoff converts to a wait backoff option
 func Convert2WaitBackoff(backoff *apicommon.Backoff) (*wait.Backoff, error) {
 	result := wait.Backoff{}
@@ -113,3 +121,46 @@ func DoWithRetry(backoff *apicommon.Backoff, f func() error) error {
 	}
 	return nil
 }
+
+// DoWithResourceAwareRetry runs f and, if it fails, retries it with a backoff
+// chosen from the error returned by that first attempt: resourceBackoff when
+// it is non-nil and the error satisfies IsResourceConstraintError, otherwise
+// defaultBackoff (DefaultBackoff when nil). The strategy is picked once and is
+// not re-evaluated on later failures.
+//
+// f is called once up front and then once per backoff step, waiting one
+// backoff interval before every retry. It returns nil as soon as f succeeds,
+// an error if the selected backoff cannot be converted, or the last error from
+// f wrapped with "failed after retries".
+func DoWithResourceAwareRetry(defaultBackoff *apicommon.Backoff, resourceBackoff *apicommon.Backoff, f func() error) error {
+	if defaultBackoff == nil {
+		defaultBackoff = &DefaultBackoff
+	}
+
+	// The first attempt doubles as the probe that decides which strategy applies.
+	err := f()
+	if err == nil {
+		return nil
+	}
+
+	strategy := defaultBackoff
+	if resourceBackoff != nil && IsResourceConstraintError(err) {
+		strategy = resourceBackoff
+	}
+
+	b, convErr := Convert2WaitBackoff(strategy)
+	if convErr != nil {
+		return fmt.Errorf("invalid backoff configuration, %w", convErr)
+	}
+
+	// wait.ExponentialBackoff calls its condition before the first sleep, which
+	// re-ran f immediately after the failed probe. Step through the backoff by
+	// hand so that every retry, including the first, waits for its interval.
+	for b.Steps > 0 {
+		time.Sleep(b.Step())
+		if err = f(); err == nil {
+			return nil
+		}
+	}
+	return fmt.Errorf("failed after retries: %w", err)
+}
diff --git a/common/retry_test.go b/common/retry_test.go
index 05abc81995..22548ff558 100644
--- a/common/retry_test.go
+++ b/common/retry_test.go
@@ -138,3 +138,110 @@ func TestConvert2WaitBackoff(t *testing.T) {
 		Steps: 2,
 	}, *waitBackoff)
 }
+
+func TestIsResourceConstraintError(t *testing.T) { // table-driven: only a Forbidden error mentioning "exceeded quota" is expected to match
+	tests := []struct {
+		name     string
+		err      error
+		expected bool
+	}{
+		{
+			name:     "nil error",
+			err:      nil, // a nil error must be rejected before err.Error() is reached
+			expected: false,
+		},
+		{
+			name:     "quota exceeded forbidden error",
+			err:      errors.NewForbidden(v1alpha1.Resource("workflows"), "test", fmt.Errorf("exceeded quota: workflow-limit")),
+			expected: true,
+		},
+		{
+			name:     "regular forbidden error",
+			err:      errors.NewForbidden(v1alpha1.Resource("sensor"), "test", fmt.Errorf("access denied")), // Forbidden, but no quota text
+			expected: false,
+		},
+		{
+			name:     "not found error",
+			err:      errors.NewNotFound(v1alpha1.Resource("sensor"), "test"),
+			expected: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := IsResourceConstraintError(tt.err)
+			assert.Equal(t, tt.expected, result)
+		})
+	}
+}
+
+func TestDoWithResourceAwareRetry(t *testing.T) { // each subtest counts calls to f to observe how many attempts were made
+	t.Run("successful execution", func(t *testing.T) {
+		callCount := 0
+		err := DoWithResourceAwareRetry(nil, nil, func() error {
+			callCount++
+			return nil
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, 1, callCount) // success on the first attempt means no retry
+	})
+
+	t.Run("regular error uses default backoff", func(t *testing.T) {
+		callCount := 0
+		defaultBackoff := &apicommon.Backoff{Steps: 2}
+		err := DoWithResourceAwareRetry(defaultBackoff, nil, func() error {
+			callCount++
+			if callCount < 2 {
+				return fmt.Errorf("regular error")
+			}
+			return nil
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, 2, callCount)
+	})
+
+	t.Run("resource constraint error uses resource backoff", func(t *testing.T) {
+		callCount := 0
+		defaultBackoff := &apicommon.Backoff{Steps: 5}
+		resourceBackoff := &apicommon.Backoff{Steps: 2} // NOTE(review): both backoffs allow the single retry needed here, so this subtest does not show which one was selected
+
+		err := DoWithResourceAwareRetry(defaultBackoff, resourceBackoff, func() error {
+			callCount++
+			if callCount < 2 {
+				return errors.NewForbidden(v1alpha1.Resource("pods"), "test", fmt.Errorf("exceeded quota"))
+			}
+			return nil
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, 2, callCount)
+	})
+
+	t.Run("resource constraint error without resource backoff uses default", func(t *testing.T) {
+		callCount := 0
+		defaultBackoff := &apicommon.Backoff{Steps: 2}
+
+		err := DoWithResourceAwareRetry(defaultBackoff, nil, func() error {
+			callCount++
+			if callCount < 2 {
+				return errors.NewForbidden(v1alpha1.Resource("pods"), "test", fmt.Errorf("exceeded quota"))
+			}
+			return nil
+		})
+		assert.NoError(t, err)
+		assert.Equal(t, 2, callCount)
+	})
+
+	t.Run("all retries exhausted", func(t *testing.T) {
+		callCount := 0
+		defaultBackoff := &apicommon.Backoff{Steps: 2}
+
+		err := DoWithResourceAwareRetry(defaultBackoff, nil, func() error {
+			callCount++
+			return fmt.Errorf("persistent error")
+		})
+		assert.Error(t, err)
+		assert.Contains(t, err.Error(), "failed after retries")
+		assert.Contains(t, err.Error(), "persistent error")
+		assert.Equal(t, 3, callCount) // the initial attempt plus one retry per backoff step (Steps: 2)
+	})
+}
diff --git a/pkg/apis/sensor/v1alpha1/types.go b/pkg/apis/sensor/v1alpha1/types.go
index 9678245fbe..ecaee5c4a4 100644
--- a/pkg/apis/sensor/v1alpha1/types.go
+++ b/pkg/apis/sensor/v1alpha1/types.go
@@ -330,6 +330,9 @@ type Trigger struct {
 	// Retry strategy, defaults to no retry
 	// +optional
 	RetryStrategy *apicommon.Backoff `json:"retryStrategy,omitempty" protobuf:"bytes,4,opt,name=retryStrategy"`
+	// Retry strategy used when the first attempt fails with a resource quota error, defaults to retryStrategy
+	// +optional
+	ResourceRetryStrategy *apicommon.Backoff `json:"resourceRetryStrategy,omitempty" protobuf:"bytes,8,opt,name=resourceRetryStrategy"`
 	// Rate limit, default unit is Second
 	// +optional
 	RateLimit *RateLimit `json:"rateLimit,omitempty" protobuf:"bytes,5,opt,name=rateLimit"`
diff --git a/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go
index 9eaf1b4e21..d73166f469 100644
--- a/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go
@@ -1262,6 +1262,11 @@ func (in *Trigger) DeepCopyInto(out *Trigger) {
 		*out = new(common.Backoff)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.ResourceRetryStrategy != nil {
+		in, out := &in.ResourceRetryStrategy, &out.ResourceRetryStrategy
+		*out = new(common.Backoff)
+		(*in).DeepCopyInto(*out)
+	}
 	if in.RateLimit != nil {
 		in, out := &in.RateLimit, &out.RateLimit
 		*out = new(RateLimit)
diff --git a/sensors/listener.go b/sensors/listener.go
index ec14df6ea3..2252b8e641 100644
--- a/sensors/listener.go
+++ b/sensors/listener.go
@@ -358,11 +358,33 @@ func (sensorCtx *SensorContext) triggerWithRateLimit(ctx context.Context, sensor
 	}
 
 	log := logging.FromContext(ctx)
-	if err := sensorCtx.triggerOne(ctx, sensor, trigger, eventsMapping, depNames, eventIDs, log); err != nil {
+
+	// Run triggerOne through DoWithResourceAwareRetry so a quota error on the first attempt can switch to trigger.ResourceRetryStrategy
+	retryStrategy := trigger.RetryStrategy
+	if retryStrategy == nil {
+		retryStrategy = &apicommon.Backoff{Steps: 1} // NOTE(review): DoWithResourceAwareRetry calls f once before applying the backoff, so Steps: 1 still allows a retry; confirm this matches "defaults to no retry"
+	}
+	resourceRetryStrategy := trigger.ResourceRetryStrategy // may be nil, in which case retryStrategy is used for every error
+
+	err := common.DoWithResourceAwareRetry(retryStrategy, resourceRetryStrategy, func() error {
+		return sensorCtx.triggerOne(ctx, sensor, trigger, eventsMapping, depNames, eventIDs, log)
+	})
+
+	if err != nil {
 		// Log the error, and let it continue
 		log.Errorw("Failed to execute a trigger", zap.Error(err), zap.String(logging.LabelTriggerName, trigger.Template.Name),
 			zap.Any("triggeredBy", depNames), zap.Any("triggeredByEvents", eventIDs))
 		sensorCtx.metrics.ActionFailed(sensor.Name, trigger.Template.Name)
+
+		// The trigger still failed after retrying; hand the same events to the DLQ trigger when one is configured
+		if trigger.DlqTrigger != nil {
+			log.Debugf("All retries exhausted, invoking DLQ trigger")
+			dlqErr := sensorCtx.triggerOne(ctx, sensor, *trigger.DlqTrigger, eventsMapping, depNames, eventIDs, log) // called once, outside DoWithResourceAwareRetry
+			if dlqErr != nil {
+				log.Errorw("Failed to execute DLQ trigger", zap.Error(dlqErr), zap.String(logging.LabelTriggerName, trigger.DlqTrigger.Template.Name))
+				sensorCtx.metrics.ActionFailed(sensor.Name, trigger.DlqTrigger.Template.Name) // a DLQ failure is counted under the DLQ trigger's own name
+			}
+		}
 	} else {
 		sensorCtx.metrics.ActionTriggered(sensor.Name, trigger.Template.Name)
 	}