diff --git a/common/common.go b/common/common.go index 3f6cf3e909..a9c834c4a3 100644 --- a/common/common.go +++ b/common/common.go @@ -103,6 +103,25 @@ const ( LabelSensorName = "sensor-name" ) +// Backpressure constants for quota-based flow control +const ( + // EnvVarResourceQuotaName is the ResourceQuota name to check before fetching messages + EnvVarResourceQuotaName = "RESOURCE_QUOTA_NAME" + // EnvVarBackpressureResourceName is the resource name in the quota + EnvVarBackpressureResourceName = "BACKPRESSURE_RESOURCE_NAME" + // EnvVarBackpressureCapacityRatio is the capacity threshold ratio + EnvVarBackpressureCapacityRatio = "BACKPRESSURE_CAPACITY_RATIO" + // EnvVarBackpressurePollInterval is the poll interval in seconds when blocked + EnvVarBackpressurePollInterval = "BACKPRESSURE_POLL_INTERVAL" + + // DefaultBackpressureResourceName is the default resource to check in quota + DefaultBackpressureResourceName = "count/workflows.argoproj.io" + // DefaultBackpressureCapacityRatio is the default threshold (0.95 = 5% buffer) + DefaultBackpressureCapacityRatio = 0.95 + // DefaultBackpressurePollInterval is the default poll interval in seconds + DefaultBackpressurePollInterval = 30 +) + // EventSource const ( // EnvVarEventSourceObject refers to the env of based64 encoded eventsource spec diff --git a/eventbus/jetstream/sensor/backpressure.go b/eventbus/jetstream/sensor/backpressure.go new file mode 100644 index 0000000000..5db9c44c9d --- /dev/null +++ b/eventbus/jetstream/sensor/backpressure.go @@ -0,0 +1,167 @@ +package sensor + +import ( + "context" + "time" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + "github.com/argoproj/argo-events/metrics" +) + +// BackpressureWaiter checks ResourceQuota before allowing message fetch. +// This prevents fetching messages when downstream workflow capacity is exhausted, +// keeping messages safe in JetStream during backpressure conditions. +type BackpressureWaiter struct { + kubeClient kubernetes.Interface + namespace string + quotaName string + resourceName string // e.g., "count/workflows.argoproj.io" + capacityRatio float64 // e.g., 0.95 for 5% buffer + pollInterval time.Duration // How often to poll when blocked + logger *zap.SugaredLogger + metrics *metrics.Metrics + sensorName string + triggerName string +} + +// BackpressureConfig holds configuration for BackpressureWaiter +type BackpressureConfig struct { + QuotaName string + ResourceName string // Default: "count/workflows.argoproj.io" + CapacityRatio float64 // Default: 0.95 (5% buffer) + PollInterval time.Duration // Default: 30s + SensorName string + TriggerName string +} + +// NewBackpressureWaiter creates a new BackpressureWaiter +func NewBackpressureWaiter( + kubeClient kubernetes.Interface, + namespace string, + config BackpressureConfig, + m *metrics.Metrics, + logger *zap.SugaredLogger, +) *BackpressureWaiter { + // Apply defaults + if config.ResourceName == "" { + config.ResourceName = "count/workflows.argoproj.io" + } + if config.CapacityRatio <= 0 || config.CapacityRatio > 1 { + config.CapacityRatio = 0.95 // 5% buffer by default + } + if config.PollInterval <= 0 { + config.PollInterval = 30 * time.Second + } + + return &BackpressureWaiter{ + kubeClient: kubeClient, + namespace: namespace, + quotaName: config.QuotaName, + resourceName: config.ResourceName, + capacityRatio: config.CapacityRatio, + pollInterval: config.PollInterval, + logger: logger, + metrics: m, + sensorName: config.SensorName, + triggerName: config.TriggerName, + } +} + +// HasCapacity checks if there's capacity to process more workflows. +// Returns true if used < (hard * capacityRatio), false otherwise. +func (b *BackpressureWaiter) HasCapacity(ctx context.Context) (bool, error) { + quota, err := b.kubeClient.CoreV1().ResourceQuotas(b.namespace).Get(ctx, b.quotaName, metav1.GetOptions{}) + if err != nil { + return false, err + } + + resourceName := corev1.ResourceName(b.resourceName) + hard := quota.Status.Hard[resourceName] + used := quota.Status.Used[resourceName] + + hardVal := hard.Value() + usedVal := used.Value() + threshold := int64(float64(hardVal) * b.capacityRatio) + + hasCapacity := usedVal < threshold + + // Debug logging - only visible when debug level is enabled + b.logger.Debugw("Quota check performed", + "quotaName", b.quotaName, + "resourceName", b.resourceName, + "hard", hardVal, + "used", usedVal, + "threshold", threshold, + "hasCapacity", hasCapacity, + ) + + return hasCapacity, nil +} + +// WaitForCapacity blocks until there's capacity available or context is cancelled. +// This should be called BEFORE fetching messages from JetStream. +func (b *BackpressureWaiter) WaitForCapacity(ctx context.Context) error { + wasBlocked := false + + for { + hasCapacity, err := b.HasCapacity(ctx) + if err != nil { + // Fail-closed: if we can't verify quota, don't fetch + // This catches config errors (typos, RBAC issues) early + if !wasBlocked { + wasBlocked = true + if b.metrics != nil { + b.metrics.SetSensorQuotaBlocked(b.sensorName, b.triggerName, true) + } + } + b.logger.Errorw("Failed to check quota, will retry (message stays safe in JetStream)", + "error", err, + "quotaName", b.quotaName, + "pollInterval", b.pollInterval, + ) + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(b.pollInterval): + continue // Retry checking quota + } + } + + if hasCapacity { + // Clear blocked metric if we were blocked + if wasBlocked { + b.logger.Infow("Capacity available, resuming message fetch", + "quotaName", b.quotaName, + ) + if b.metrics != nil { + b.metrics.SetSensorQuotaBlocked(b.sensorName, b.triggerName, false) + } + } + return nil + } + + // Mark as blocked on first iteration without capacity + if !wasBlocked { + wasBlocked = true + b.logger.Infow("Backpressure active, blocking message fetch until capacity available", + "quotaName", b.quotaName, + "pollInterval", b.pollInterval, + ) + if b.metrics != nil { + b.metrics.SetSensorQuotaBlocked(b.sensorName, b.triggerName, true) + } + } + + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(b.pollInterval): + // Continue checking + } + } +} + diff --git a/eventbus/jetstream/sensor/backpressure_test.go b/eventbus/jetstream/sensor/backpressure_test.go new file mode 100644 index 0000000000..35290dd19b --- /dev/null +++ b/eventbus/jetstream/sensor/backpressure_test.go @@ -0,0 +1,312 @@ +package sensor + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func TestNewBackpressureWaiter(t *testing.T) { + kubeClient := fake.NewSimpleClientset() + logger := zap.NewNop().Sugar() + + t.Run("applies defaults for empty config", func(t *testing.T) { + config := BackpressureConfig{ + QuotaName: "test-quota", + } + waiter := NewBackpressureWaiter(kubeClient, "default", config, nil, logger) + + assert.Equal(t, "test-quota", waiter.quotaName) + assert.Equal(t, "count/workflows.argoproj.io", waiter.resourceName) + assert.Equal(t, 0.95, waiter.capacityRatio) + assert.Equal(t, 30*time.Second, waiter.pollInterval) + }) + + t.Run("uses provided config values", func(t *testing.T) { + config := BackpressureConfig{ + QuotaName: "custom-quota", + ResourceName: "count/pods", + CapacityRatio: 0.8, + PollInterval: 10 * time.Second, + } + waiter := NewBackpressureWaiter(kubeClient, "test-ns", config, nil, logger) + + assert.Equal(t, "custom-quota", waiter.quotaName) + assert.Equal(t, "count/pods", waiter.resourceName) + assert.Equal(t, 0.8, waiter.capacityRatio) + assert.Equal(t, 10*time.Second, waiter.pollInterval) + }) + + t.Run("corrects invalid capacity ratio", func(t *testing.T) { + config := BackpressureConfig{ + QuotaName: "test-quota", + CapacityRatio: 1.5, // Invalid - greater than 1 + } + waiter := NewBackpressureWaiter(kubeClient, "default", config, nil, logger) + assert.Equal(t, 0.95, waiter.capacityRatio) + + config.CapacityRatio = -0.5 // Invalid - negative + waiter = NewBackpressureWaiter(kubeClient, "default", config, nil, logger) + assert.Equal(t, 0.95, waiter.capacityRatio) + + config.CapacityRatio = 0 // Invalid - zero + waiter = NewBackpressureWaiter(kubeClient, "default", config, nil, logger) + assert.Equal(t, 0.95, waiter.capacityRatio) + }) +} + +func TestHasCapacity(t *testing.T) { + logger := zap.NewNop().Sugar() + ctx := context.Background() + + t.Run("returns true when under threshold", func(t *testing.T) { + quota := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-quota", + Namespace: "default", + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("100"), + }, + Used: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("90"), // 90% used, threshold is 95% + }, + }, + } + kubeClient := fake.NewSimpleClientset(quota) + + waiter := NewBackpressureWaiter(kubeClient, "default", BackpressureConfig{ + QuotaName: "test-quota", + CapacityRatio: 0.95, + }, nil, logger) + + hasCapacity, err := waiter.HasCapacity(ctx) + assert.NoError(t, err) + assert.True(t, hasCapacity) + }) + + t.Run("returns false when at threshold", func(t *testing.T) { + quota := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-quota", + Namespace: "default", + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("100"), + }, + Used: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("95"), // At threshold + }, + }, + } + kubeClient := fake.NewSimpleClientset(quota) + + waiter := NewBackpressureWaiter(kubeClient, "default", BackpressureConfig{ + QuotaName: "test-quota", + CapacityRatio: 0.95, + }, nil, logger) + + hasCapacity, err := waiter.HasCapacity(ctx) + assert.NoError(t, err) + assert.False(t, hasCapacity) + }) + + t.Run("returns false when over threshold", func(t *testing.T) { + quota := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-quota", + Namespace: "default", + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("100"), + }, + Used: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("98"), + }, + }, + } + kubeClient := fake.NewSimpleClientset(quota) + + waiter := NewBackpressureWaiter(kubeClient, "default", BackpressureConfig{ + QuotaName: "test-quota", + CapacityRatio: 0.95, + }, nil, logger) + + hasCapacity, err := waiter.HasCapacity(ctx) + assert.NoError(t, err) + assert.False(t, hasCapacity) + }) + + t.Run("returns error when quota not found", func(t *testing.T) { + kubeClient := fake.NewSimpleClientset() // No quota + + waiter := NewBackpressureWaiter(kubeClient, "default", BackpressureConfig{ + QuotaName: "nonexistent-quota", + }, nil, logger) + + hasCapacity, err := waiter.HasCapacity(ctx) + assert.Error(t, err) + assert.False(t, hasCapacity) + }) +} + +func TestWaitForCapacity(t *testing.T) { + logger := zap.NewNop().Sugar() + + t.Run("returns immediately when capacity available", func(t *testing.T) { + quota := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-quota", + Namespace: "default", + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("100"), + }, + Used: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("50"), + }, + }, + } + kubeClient := fake.NewSimpleClientset(quota) + + waiter := NewBackpressureWaiter(kubeClient, "default", BackpressureConfig{ + QuotaName: "test-quota", + PollInterval: 100 * time.Millisecond, + }, nil, logger) + + ctx := context.Background() + start := time.Now() + err := waiter.WaitForCapacity(ctx) + elapsed := time.Since(start) + + assert.NoError(t, err) + assert.Less(t, elapsed, 50*time.Millisecond) // Should return almost immediately + }) + + t.Run("respects context cancellation", func(t *testing.T) { + quota := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-quota", + Namespace: "default", + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("100"), + }, + Used: corev1.ResourceList{ + "count/workflows.argoproj.io": resource.MustParse("100"), // Full + }, + }, + } + kubeClient := fake.NewSimpleClientset(quota) + + waiter := NewBackpressureWaiter(kubeClient, "default", BackpressureConfig{ + QuotaName: "test-quota", + PollInterval: 1 * time.Second, + }, nil, logger) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + err := waiter.WaitForCapacity(ctx) + assert.Error(t, err) + assert.Equal(t, context.DeadlineExceeded, err) + }) +} + +func TestThresholdCalculation(t *testing.T) { + logger := zap.NewNop().Sugar() + ctx := context.Background() + + tests := []struct { + name string + hard int64 + used int64 + capacityRatio float64 + expectCapacity bool + }{ + { + name: "150 quota, 0.95 ratio, 140 used = has capacity", + hard: 150, + used: 140, + capacityRatio: 0.95, + expectCapacity: true, // threshold = 142, used < threshold + }, + { + name: "150 quota, 0.95 ratio, 143 used = no capacity", + hard: 150, + used: 143, + capacityRatio: 0.95, + expectCapacity: false, // threshold = 142, used >= threshold + }, + { + name: "5 quota, 0.95 ratio, 4 used = no capacity", + hard: 5, + used: 4, + capacityRatio: 0.95, + expectCapacity: false, // threshold = 4, used >= threshold + }, + { + name: "5 quota, 0.95 ratio, 3 used = has capacity", + hard: 5, + used: 3, + capacityRatio: 0.95, + expectCapacity: true, // threshold = 4, used < threshold + }, + { + name: "100 quota, 0.80 ratio, 79 used = has capacity", + hard: 100, + used: 79, + capacityRatio: 0.80, + expectCapacity: true, // threshold = 80, used < threshold + }, + { + name: "100 quota, 0.80 ratio, 80 used = no capacity", + hard: 100, + used: 80, + capacityRatio: 0.80, + expectCapacity: false, // threshold = 80, used >= threshold + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + quota := &corev1.ResourceQuota{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-quota", + Namespace: "default", + }, + Status: corev1.ResourceQuotaStatus{ + Hard: corev1.ResourceList{ + "count/workflows.argoproj.io": *resource.NewQuantity(tt.hard, resource.DecimalSI), + }, + Used: corev1.ResourceList{ + "count/workflows.argoproj.io": *resource.NewQuantity(tt.used, resource.DecimalSI), + }, + }, + } + kubeClient := fake.NewSimpleClientset(quota) + + waiter := NewBackpressureWaiter(kubeClient, "default", BackpressureConfig{ + QuotaName: "test-quota", + CapacityRatio: tt.capacityRatio, + }, nil, logger) + + hasCapacity, err := waiter.HasCapacity(ctx) + assert.NoError(t, err) + assert.Equal(t, tt.expectCapacity, hasCapacity) + }) + } +} + diff --git a/eventbus/jetstream/sensor/trigger_conn.go b/eventbus/jetstream/sensor/trigger_conn.go index e05b759ee7..5ec1eceedd 100644 --- a/eventbus/jetstream/sensor/trigger_conn.go +++ b/eventbus/jetstream/sensor/trigger_conn.go @@ -30,6 +30,7 @@ type JetstreamTriggerConn struct { sourceDepMap map[string][]string // maps EventSource and EventName to dependency name recentMsgsByID map[string]*msg // prevent re-processing the same message as before (map of msg ID to time) recentMsgsByTime []*msg + backpressureWaiter *BackpressureWaiter // Optional: blocks fetch when quota is near capacity } type msg struct { @@ -102,6 +103,12 @@ func (conn *JetstreamTriggerConn) String() string { return fmt.Sprintf("JetstreamTriggerConn{Sensor:%s,Trigger:%s}", conn.sensorName, conn.triggerName) } +// SetBackpressureWaiter sets the backpressure waiter for quota-based flow control. +// When set, pullSubscribe will wait for capacity before fetching messages. +func (conn *JetstreamTriggerConn) SetBackpressureWaiter(waiter *BackpressureWaiter) { + conn.backpressureWaiter = waiter +} + type jsDeliverConfig struct { policy sensorv1alpha1.JetStreamDeliverPolicy } @@ -203,7 +210,7 @@ func (conn *JetstreamTriggerConn) Subscribe(ctx context.Context, } pullSubscribeCloseCh[subject] = make(chan struct{}) - go conn.pullSubscribe(subscriptions[subscriptionIndex], ch, pullSubscribeCloseCh[subject], &wg) + go conn.pullSubscribe(ctx, subscriptions[subscriptionIndex], ch, pullSubscribeCloseCh[subject], &wg) wg.Add(1) log.Debug("adding 1 to WaitGroup (pullSubscribe)") @@ -241,6 +248,7 @@ func (conn *JetstreamTriggerConn) shutdownSubscriptions(processMsgsCloseCh chan } func (conn *JetstreamTriggerConn) pullSubscribe( + ctx context.Context, subscription *nats.Subscription, msgChannel chan<- *nats.Msg, closeCh <-chan struct{}, @@ -249,6 +257,30 @@ func (conn *JetstreamTriggerConn) pullSubscribe( var previousErrTime time.Time for { + // If backpressure is configured, wait for capacity before fetching + // This keeps messages safe in JetStream when quota is near capacity + if conn.backpressureWaiter != nil { + if err := conn.backpressureWaiter.WaitForCapacity(ctx); err != nil { + conn.Logger.Warnw("Backpressure wait cancelled", "error", err) + wg.Done() + conn.Logger.Debug("wg.Done(): pullSubscribe (backpressure cancelled)") + return + } + // Check if close was requested during backpressure wait. + // When blocked in WaitForCapacity() (sleeping for up to 30 seconds waiting for quota), + // if the EventBus connection dies, the reconnection logic sends a signal to closeCh + // to close the old subscription. But since we were blocked inside WaitForCapacity(), + // we don't see the signal until the sleep ends. By then, the old subscription is stale. + // This check ensures we exit cleanly if connection dropped during the wait. + select { + case <-closeCh: + conn.Logger.Info("Close requested after backpressure wait, exiting pullSubscribe") + wg.Done() + return + default: + } + } + // call Fetch with timeout msgs, fetchErr := subscription.Fetch(1, nats.MaxWait(time.Second*1)) if fetchErr != nil && !errors.Is(fetchErr, nats.ErrTimeout) { diff --git a/metrics/metrics.go b/metrics/metrics.go index e3e33a0d23..067b8e811f 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -49,6 +49,7 @@ type Metrics struct { actionFailed *prometheus.CounterVec actionRetriesFailed *prometheus.CounterVec actionDuration *prometheus.SummaryVec + sensorQuotaBlocked *prometheus.GaugeVec } // NewMetrics returns a Metrics instance @@ -143,6 +144,14 @@ func NewMetrics(namespace string) *Metrics { labelNamespace: namespace, }, }, []string{labelSensorName, labelTriggerName}), + sensorQuotaBlocked: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: prefix, + Name: "sensor_quota_blocked", + Help: "Current state of sensor quota-based backpressure: 1 if blocked waiting for quota capacity, 0 if processing normally. https://argoproj.github.io/argo-events/metrics/#argo_events_sensor_quota_blocked", + ConstLabels: prometheus.Labels{ + labelNamespace: namespace, + }, + }, []string{labelSensorName, labelTriggerName}), } } @@ -158,6 +167,7 @@ func (m *Metrics) Collect(ch chan<- prometheus.Metric) { m.actionFailed.Collect(ch) m.actionRetriesFailed.Collect(ch) m.actionDuration.Collect(ch) + m.sensorQuotaBlocked.Collect(ch) } func (m *Metrics) Describe(ch chan<- *prometheus.Desc) { @@ -172,6 +182,7 @@ func (m *Metrics) Describe(ch chan<- *prometheus.Desc) { m.actionFailed.Describe(ch) m.actionRetriesFailed.Describe(ch) m.actionDuration.Describe(ch) + m.sensorQuotaBlocked.Describe(ch) } func (m *Metrics) IncRunningServices(eventSourceName string) { @@ -226,6 +237,15 @@ func (m *Metrics) ActionDuration(sensorName, triggerName string, num float64) { m.actionDuration.WithLabelValues(sensorName, triggerName).Observe(num) } +// SetSensorQuotaBlocked sets the sensor quota blocked state for backpressure monitoring +func (m *Metrics) SetSensorQuotaBlocked(sensorName, triggerName string, isBlocked bool) { + if isBlocked { + m.sensorQuotaBlocked.WithLabelValues(sensorName, triggerName).Set(1) + } else { + m.sensorQuotaBlocked.WithLabelValues(sensorName, triggerName).Set(0) + } +} + // Run starts a metrics server func (m *Metrics) Run(ctx context.Context, addr string) { log := logging.FromContext(ctx) diff --git a/sensors/listener.go b/sensors/listener.go index 5d0cc7add5..6c04988e22 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -19,6 +19,8 @@ package sensors import ( "context" "fmt" + "os" + "strconv" "strings" "sync" "sync/atomic" @@ -31,6 +33,7 @@ import ( "github.com/argoproj/argo-events/common/logging" "github.com/argoproj/argo-events/eventbus" eventbuscommon "github.com/argoproj/argo-events/eventbus/common" + jetstreamsensor "github.com/argoproj/argo-events/eventbus/jetstream/sensor" apicommon "github.com/argoproj/argo-events/pkg/apis/common" "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1" sensordependencies "github.com/argoproj/argo-events/sensors/dependencies" @@ -173,6 +176,9 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { } defer conn.Close() + // Configure backpressure if enabled + sensorCtx.setupBackpressure(conn, sensor.Name, trigger.Template.Name, triggerLogger) + transformFunc := func(depName string, event cloudevents.Event) (*cloudevents.Event, error) { dep, ok := depMapping[depName] if !ok { @@ -335,6 +341,9 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error { } triggerLogger.Infow("reconnected to EventBus.", zap.Any("connection", conn)) + // Re-setup backpressure on new connection + sensorCtx.setupBackpressure(conn, sensor.Name, trigger.Template.Name, triggerLogger) + if atomic.LoadUint32(&subLock) == 1 { triggerLogger.Debug("acquired sublock, instructing trigger to shutdown subscription") closeSubCh <- struct{}{} @@ -543,3 +552,77 @@ func unique(stringSlice []string) []string { } return list } + +// getBackpressureConfig returns backpressure configuration from environment variables. +// Returns nil if backpressure is not configured. +// Environment variables: +// - RESOURCE_QUOTA_NAME: Name of the ResourceQuota to check (required) +// - BACKPRESSURE_RESOURCE_NAME: Resource name in quota (default: count/workflows.argoproj.io) +// - BACKPRESSURE_CAPACITY_RATIO: Ratio of quota to use (default: 0.95 = 5% buffer) +// - BACKPRESSURE_POLL_INTERVAL: Interval to poll quota in seconds (default: 30) +func (sensorCtx *SensorContext) getBackpressureConfig() *jetstreamsensor.BackpressureConfig { + quotaName := os.Getenv(common.EnvVarResourceQuotaName) + if quotaName == "" { + return nil // Backpressure not configured + } + + resourceName := os.Getenv(common.EnvVarBackpressureResourceName) + if resourceName == "" { + resourceName = common.DefaultBackpressureResourceName + } + + capacityRatio := common.DefaultBackpressureCapacityRatio + if ratioStr := os.Getenv(common.EnvVarBackpressureCapacityRatio); ratioStr != "" { + if parsed, err := strconv.ParseFloat(ratioStr, 64); err == nil { + capacityRatio = parsed + } + } + + pollInterval := time.Duration(common.DefaultBackpressurePollInterval) * time.Second + if intervalStr := os.Getenv(common.EnvVarBackpressurePollInterval); intervalStr != "" { + if parsed, err := strconv.Atoi(intervalStr); err == nil { + pollInterval = time.Duration(parsed) * time.Second + } + } + + return &jetstreamsensor.BackpressureConfig{ + QuotaName: quotaName, + ResourceName: resourceName, + CapacityRatio: capacityRatio, + PollInterval: pollInterval, + } +} + +// setupBackpressure configures backpressure on a JetStream connection if enabled. +// Called from both initial connection and reconnection paths. +func (sensorCtx *SensorContext) setupBackpressure( + conn eventbuscommon.TriggerConnection, + sensorName string, + triggerName string, + logger *zap.SugaredLogger, +) { + jsConn, ok := conn.(*jetstreamsensor.JetstreamTriggerConn) + if !ok { + return // Not a JetStream connection + } + + backpressureCfg := sensorCtx.getBackpressureConfig() + if backpressureCfg == nil { + return // Backpressure not configured + } + + backpressureCfg.SensorName = sensorName + backpressureCfg.TriggerName = triggerName + waiter := jetstreamsensor.NewBackpressureWaiter( + sensorCtx.kubeClient, + sensorCtx.sensor.Namespace, + *backpressureCfg, + sensorCtx.metrics, + logger, + ) + jsConn.SetBackpressureWaiter(waiter) + logger.Infow("Backpressure enabled", + "quotaName", backpressureCfg.QuotaName, + "capacityRatio", backpressureCfg.CapacityRatio, + ) +}