Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package common

import (
"fmt"
"strings"
"time"

apierr "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -48,6 +49,14 @@ func IsRetryableKubeAPIError(err error) bool {
return true
}

// IsResourceConstraintError returns true if the error is a ResourceQuota error
func IsResourceConstraintError(err error) bool {
if apierr.IsForbidden(err) {
return strings.Contains(err.Error(), "exceeded quota")
}
return false
}

// Convert2WaitBackoff converts to a wait backoff option
func Convert2WaitBackoff(backoff *apicommon.Backoff) (*wait.Backoff, error) {
result := wait.Backoff{}
Expand Down Expand Up @@ -113,3 +122,42 @@ func DoWithRetry(backoff *apicommon.Backoff, f func() error) error {
}
return nil
}

// DoWithResourceAwareRetry performs retry with different strategies based on error type
// Uses resourceBackoff for resource constraint errors (quota, limits), defaultBackoff for others
func DoWithResourceAwareRetry(defaultBackoff *apicommon.Backoff, resourceBackoff *apicommon.Backoff, f func() error) error {
if defaultBackoff == nil {
defaultBackoff = &DefaultBackoff
}

// Try the function once to determine error type
err := f()
if err == nil {
return nil
}

// Select retry strategy based on error type
strategy := defaultBackoff
if IsResourceConstraintError(err) && resourceBackoff != nil {
strategy = resourceBackoff
}

// Convert to wait backoff
b, convErr := Convert2WaitBackoff(strategy)
if convErr != nil {
return fmt.Errorf("invalid backoff configuration, %w", convErr)
}

// Perform retry
_ = wait.ExponentialBackoff(*b, func() (bool, error) {
if err = f(); err != nil {
return false, nil
}
return true, nil
})

if err != nil {
return fmt.Errorf("failed after retries: %w", err)
}
return nil
}
107 changes: 107 additions & 0 deletions common/retry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,110 @@ func TestConvert2WaitBackoff(t *testing.T) {
Steps: 2,
}, *waitBackoff)
}

func TestIsResourceConstraintError(t *testing.T) {
tests := []struct {
name string
err error
expected bool
}{
{
name: "nil error",
err: nil,
expected: false,
},
{
name: "quota exceeded forbidden error",
err: errors.NewForbidden(v1alpha1.Resource("workflows"), "test", fmt.Errorf("exceeded quota: workflow-limit")),
expected: true,
},
{
name: "regular forbidden error",
err: errors.NewForbidden(v1alpha1.Resource("sensor"), "test", fmt.Errorf("access denied")),
expected: false,
},
{
name: "not found error",
err: errors.NewNotFound(v1alpha1.Resource("sensor"), "test"),
expected: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := IsResourceConstraintError(tt.err)
assert.Equal(t, tt.expected, result)
})
}
}

func TestDoWithResourceAwareRetry(t *testing.T) {
t.Run("successful execution", func(t *testing.T) {
callCount := 0
err := DoWithResourceAwareRetry(nil, nil, func() error {
callCount++
return nil
})
assert.NoError(t, err)
assert.Equal(t, 1, callCount)
})

t.Run("regular error uses default backoff", func(t *testing.T) {
callCount := 0
defaultBackoff := &apicommon.Backoff{Steps: 2}
err := DoWithResourceAwareRetry(defaultBackoff, nil, func() error {
callCount++
if callCount < 2 {
return fmt.Errorf("regular error")
}
return nil
})
assert.NoError(t, err)
assert.Equal(t, 2, callCount)
})

t.Run("resource constraint error uses resource backoff", func(t *testing.T) {
callCount := 0
defaultBackoff := &apicommon.Backoff{Steps: 5}
resourceBackoff := &apicommon.Backoff{Steps: 2}

err := DoWithResourceAwareRetry(defaultBackoff, resourceBackoff, func() error {
callCount++
if callCount < 2 {
return errors.NewForbidden(v1alpha1.Resource("pods"), "test", fmt.Errorf("exceeded quota"))
}
return nil
})
assert.NoError(t, err)
assert.Equal(t, 2, callCount)
})

t.Run("resource constraint error without resource backoff uses default", func(t *testing.T) {
callCount := 0
defaultBackoff := &apicommon.Backoff{Steps: 2}

err := DoWithResourceAwareRetry(defaultBackoff, nil, func() error {
callCount++
if callCount < 2 {
return errors.NewForbidden(v1alpha1.Resource("pods"), "test", fmt.Errorf("exceeded quota"))
}
return nil
})
assert.NoError(t, err)
assert.Equal(t, 2, callCount)
})

t.Run("all retries exhausted", func(t *testing.T) {
callCount := 0
defaultBackoff := &apicommon.Backoff{Steps: 2}

err := DoWithResourceAwareRetry(defaultBackoff, nil, func() error {
callCount++
return fmt.Errorf("persistent error")
})
assert.Error(t, err)
assert.Contains(t, err.Error(), "failed after retries")
assert.Contains(t, err.Error(), "persistent error")
assert.Equal(t, 3, callCount) // Initial attempt + 2 retries
})
}
3 changes: 3 additions & 0 deletions pkg/apis/sensor/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,9 @@ type Trigger struct {
// Retry strategy, defaults to no retry
// +optional
RetryStrategy *apicommon.Backoff `json:"retryStrategy,omitempty" protobuf:"bytes,4,opt,name=retryStrategy"`
// Resource constraint retry strategy (for quota, limits, etc.), defaults to retryStrategy
// +optional
ResourceRetryStrategy *apicommon.Backoff `json:"resourceRetryStrategy,omitempty" protobuf:"bytes,8,opt,name=resourceRetryStrategy"`
// Rate limit, default unit is Second
// +optional
RateLimit *RateLimit `json:"rateLimit,omitempty" protobuf:"bytes,5,opt,name=rateLimit"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 23 additions & 1 deletion sensors/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,33 @@ func (sensorCtx *SensorContext) triggerWithRateLimit(ctx context.Context, sensor
}

log := logging.FromContext(ctx)
if err := sensorCtx.triggerOne(ctx, sensor, trigger, eventsMapping, depNames, eventIDs, log); err != nil {

// Use resource-aware retry: detect quota errors and use different retry strategy
retryStrategy := trigger.RetryStrategy
if retryStrategy == nil {
retryStrategy = &apicommon.Backoff{Steps: 1}
}
resourceRetryStrategy := trigger.ResourceRetryStrategy

err := common.DoWithResourceAwareRetry(retryStrategy, resourceRetryStrategy, func() error {
return sensorCtx.triggerOne(ctx, sensor, trigger, eventsMapping, depNames, eventIDs, log)
})

if err != nil {
// Log the error, and let it continue
log.Errorw("Failed to execute a trigger", zap.Error(err), zap.String(logging.LabelTriggerName, trigger.Template.Name),
zap.Any("triggeredBy", depNames), zap.Any("triggeredByEvents", eventIDs))
sensorCtx.metrics.ActionFailed(sensor.Name, trigger.Template.Name)

// If all retries exhausted and DLQ is configured, invoke DLQ trigger
if trigger.DlqTrigger != nil {
log.Debugf("All retries exhausted, invoking DLQ trigger")
dlqErr := sensorCtx.triggerOne(ctx, sensor, *trigger.DlqTrigger, eventsMapping, depNames, eventIDs, log)
if dlqErr != nil {
log.Errorw("Failed to execute DLQ trigger", zap.Error(dlqErr), zap.String(logging.LabelTriggerName, trigger.DlqTrigger.Template.Name))
sensorCtx.metrics.ActionFailed(sensor.Name, trigger.DlqTrigger.Template.Name)
}
}
} else {
sensorCtx.metrics.ActionTriggered(sensor.Name, trigger.Template.Name)
}
Expand Down