diff --git a/api/sensor.md b/api/sensor.md
index ed6570cf98..112a2cf5d8 100644
--- a/api/sensor.md
+++ b/api/sensor.md
@@ -1220,6 +1220,19 @@ blank treated as and (&&).
+
+
+jetStream
+ JetStreamConsumerConfig
+
+ |
+
+
+(Optional)
+JetStream configuration for this dependency. Allows configuring the deliver policy for JetStream consumers.
+
+ |
+
@@ -1692,6 +1705,57 @@ always use the first URL, while push will use all of them.
+
+JetStreamConsumerConfig
+
+
+(Appears on:
+EventDependency)
+
+
+
+JetStreamConsumerConfig holds the JetStream consumer configuration
+
+
+
+
+
+|
+Field
+ |
+
+Description
+ |
+
+
+
+
+
+deliverPolicy
+ JetStreamDeliverPolicy
+
+ |
+
+
+(Optional)
+DeliverPolicy specifies the JetStream deliver policy. Available values: "all" (default - start from earliest available message), "last" (start with last message), "new" (start with messages created after consumer). Defaults to "new".
+
+ |
+
+
+
+
+JetStreamDeliverPolicy
+
+
+(Appears on:
+JetStreamConsumerConfig)
+
+
+
+JetStreamDeliverPolicy refers to the JetStream deliver policy
+
+
HTTPTrigger
diff --git a/docs/eventbus/jetstream.md b/docs/eventbus/jetstream.md
index 60d7e46a26..8edbd54b93 100644
--- a/docs/eventbus/jetstream.md
+++ b/docs/eventbus/jetstream.md
@@ -67,7 +67,36 @@ For Jetstream, TLS is turned on for all client-server communication as well as b
## How it works under the hood
-Jetstream has the concept of a Stream, and Subjects (i.e. topics) which are used on a Stream. From the documentation: “Each Stream defines how messages are stored and what the limits (duration, size, interest) of the retention are.” For Argo Events, we have one Stream called "default" with a single set of settings, but we have multiple subjects, each of which is named `default..`. Sensors subscribe to the subjects they need using durable consumers.
+Jetstream has the concept of a Stream, and Subjects (i.e. topics) which are used on a Stream. From the documentation: "Each Stream defines how messages are stored and what the limits (duration, size, interest) of the retention are." For Argo Events, we have one Stream called "default" with a single set of settings, but we have multiple subjects, each of which is named `default..`. Sensors subscribe to the subjects they need using durable consumers.
+
+### Sensor Deliver Policy Configuration
+
+When using JetStream as the EventBus, you can configure the deliver policy for each sensor dependency using the `jetStream` field in the `EventDependency` specification. This allows you to control how messages are delivered to your sensor:
+
+- **`all`**: Start receiving from the earliest available message in the stream
+- **`last`**: Start with the last message added to the stream, or the last message matching the consumer's filter subject if defined
+- **`new`**: Start receiving messages created after the consumer was created
+
+Example sensor configuration:
+
+```yaml
+apiVersion: argoproj.io/v1alpha1
+kind: Sensor
+metadata:
+ name: example
+spec:
+ dependencies:
+ - name: test-dep
+ eventSourceName: webhook
+ eventName: example
+ jetStream:
+ deliverPolicy: "all" # Start from earliest available message
+ triggers:
+ - template:
+ name: http-trigger
+ http:
+ url: http://example.com/webhook
+```
### Exotic
diff --git a/eventbus/common/structs.go b/eventbus/common/structs.go
index c19dcd4707..9934367340 100644
--- a/eventbus/common/structs.go
+++ b/eventbus/common/structs.go
@@ -2,6 +2,7 @@ package common
import (
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
+ sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)
// Auth contains the auth infor for event bus
@@ -33,4 +34,5 @@ type Dependency struct {
Name string
EventSourceName string
EventName string
+ JetStream *sensorv1alpha1.JetStreamConsumerConfig
}
diff --git a/eventbus/jetstream/sensor/trigger_conn.go b/eventbus/jetstream/sensor/trigger_conn.go
index bc97e3404f..e05b759ee7 100644
--- a/eventbus/jetstream/sensor/trigger_conn.go
+++ b/eventbus/jetstream/sensor/trigger_conn.go
@@ -15,6 +15,7 @@ import (
eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
jetstreambase "github.com/argoproj/argo-events/eventbus/jetstream/base"
+ sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)
type JetstreamTriggerConn struct {
@@ -101,6 +102,53 @@ func (conn *JetstreamTriggerConn) String() string {
return fmt.Sprintf("JetstreamTriggerConn{Sensor:%s,Trigger:%s}", conn.sensorName, conn.triggerName)
}
+type jsDeliverConfig struct {
+ policy sensorv1alpha1.JetStreamDeliverPolicy
+}
+
+func (conn *JetstreamTriggerConn) consumerOptionsForDependency(dep eventbuscommon.Dependency) []nats.SubOpt {
+ options := []nats.SubOpt{nats.AckExplicit()}
+ config, warning := conn.resolveJetStreamDeliverConfig(dep)
+ if warning != "" && conn.Logger != nil {
+ conn.Logger.Warn(warning)
+ }
+
+ // Note: The policies included here are the ones directly supported by the NATS Go client
+ // Advanced policies like DeliverByStartSequence and DeliverByStartTime are not supported
+ // by PullSubscribe options and would require consumer configuration
+ switch config.policy {
+ case sensorv1alpha1.JetStreamDeliverAll:
+ return append(options, nats.DeliverAll())
+ case sensorv1alpha1.JetStreamDeliverLast:
+ return append(options, nats.DeliverLast())
+ case sensorv1alpha1.JetStreamDeliverNew:
+ return append(options, nats.DeliverNew())
+ default:
+ return append(options, nats.DeliverNew())
+ }
+}
+
+func (conn *JetstreamTriggerConn) resolveJetStreamDeliverConfig(dep eventbuscommon.Dependency) (jsDeliverConfig, string) {
+ cfg := dep.JetStream
+ if cfg == nil || cfg.DeliverPolicy == "" {
+ return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverNew}, ""
+ }
+
+ // Note: Only the deliver policies directly supported by NATS Go client are handled here
+ // Advanced policies like DeliverByStartSequence and DeliverByStartTime are not supported
+ // and will fall through to the default case
+ switch cfg.DeliverPolicy {
+ case sensorv1alpha1.JetStreamDeliverAll:
+ return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverAll}, ""
+ case sensorv1alpha1.JetStreamDeliverLast:
+ return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverLast}, ""
+ case sensorv1alpha1.JetStreamDeliverNew:
+ return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverNew}, ""
+ default:
+ return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverNew}, fmt.Sprintf("Unrecognized or unsupported JetStream deliver policy %q for dependency %s; defaulting to DeliverNew", cfg.DeliverPolicy, dep.Name)
+ }
+}
+
func (conn *JetstreamTriggerConn) Subscribe(ctx context.Context,
closeCh <-chan struct{},
resetConditionsCh <-chan struct{},
@@ -144,7 +192,8 @@ func (conn *JetstreamTriggerConn) Subscribe(ctx context.Context,
conn.Logger.Debugf("durable name for sensor='%s', trigger='%s', dep='%s': '%s'", conn.sensorName, conn.triggerName, dependency.Name, durableName)
log.Infof("Subscribing to subject %s with durable name %s", subject, durableName)
- subscriptions[subscriptionIndex], err = conn.JSContext.PullSubscribe(subject, durableName, nats.AckExplicit(), nats.DeliverNew())
+ opts := conn.consumerOptionsForDependency(dependency)
+ subscriptions[subscriptionIndex], err = conn.JSContext.PullSubscribe(subject, durableName, opts...)
if err != nil {
errorStr := fmt.Sprintf("Failed to subscribe to subject %s using group %s: %v", subject, durableName, err)
log.Error(errorStr)
diff --git a/eventbus/jetstream/sensor/trigger_conn_test.go b/eventbus/jetstream/sensor/trigger_conn_test.go
new file mode 100644
index 0000000000..9bf39c2dd7
--- /dev/null
+++ b/eventbus/jetstream/sensor/trigger_conn_test.go
@@ -0,0 +1,146 @@
+package sensor
+
+import (
+ "testing"
+
+ eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
+ sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
+ "github.com/stretchr/testify/assert"
+)
+
+func TestConsumerOptionsForDependency(t *testing.T) {
+ conn := &JetstreamTriggerConn{}
+
+ tests := []struct {
+ name string
+ dependency eventbuscommon.Dependency
+ }{
+ {
+ name: "DeliverAll policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverAll},
+ },
+ },
+ {
+ name: "DeliverLast policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverLast},
+ },
+ },
+ {
+ name: "DeliverNew policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverNew},
+ },
+ },
+ {
+ name: "No JetStream config",
+ dependency: eventbuscommon.Dependency{Name: "test-dep"},
+ },
+ {
+ name: "Empty deliver policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: ""},
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ opts := conn.consumerOptionsForDependency(tt.dependency)
+ assert.NotEmpty(t, opts)
+ assert.GreaterOrEqual(t, len(opts), 1)
+ })
+ }
+}
+
+func TestResolveJetStreamDeliverConfig(t *testing.T) {
+ conn := &JetstreamTriggerConn{}
+
+ tests := []struct {
+ name string
+ dependency eventbuscommon.Dependency
+ expectedPolicy sensorv1alpha1.JetStreamDeliverPolicy
+ expectWarning bool
+ }{
+ {
+ name: "DeliverAll policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverAll},
+ },
+ expectedPolicy: sensorv1alpha1.JetStreamDeliverAll,
+ },
+ {
+ name: "DeliverLast policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverLast},
+ },
+ expectedPolicy: sensorv1alpha1.JetStreamDeliverLast,
+ },
+ {
+ name: "DeliverNew policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverNew},
+ },
+ expectedPolicy: sensorv1alpha1.JetStreamDeliverNew,
+ },
+ {
+ name: "No JetStream config",
+ dependency: eventbuscommon.Dependency{Name: "test-dep"},
+ expectedPolicy: sensorv1alpha1.JetStreamDeliverNew,
+ },
+ {
+ name: "Empty deliver policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: ""},
+ },
+ expectedPolicy: sensorv1alpha1.JetStreamDeliverNew,
+ },
+ {
+ name: "Unsupported policy",
+ dependency: eventbuscommon.Dependency{
+ Name: "test-dep",
+ JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: "unsupported"},
+ },
+ expectedPolicy: sensorv1alpha1.JetStreamDeliverNew,
+ expectWarning: true,
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ config, warning := conn.resolveJetStreamDeliverConfig(tt.dependency)
+ assert.Equal(t, tt.expectedPolicy, config.policy)
+
+ if tt.expectWarning {
+ assert.NotEmpty(t, warning)
+ } else {
+ assert.Empty(t, warning)
+ }
+ })
+ }
+}
+
+func TestJetStreamDeliverPolicyConstants(t *testing.T) {
+ assert.Equal(t, "all", string(sensorv1alpha1.JetStreamDeliverAll))
+ assert.Equal(t, "last", string(sensorv1alpha1.JetStreamDeliverLast))
+ assert.Equal(t, "new", string(sensorv1alpha1.JetStreamDeliverNew))
+}
+
+func TestJetStreamConsumerConfig(t *testing.T) {
+ config := &sensorv1alpha1.JetStreamConsumerConfig{
+ DeliverPolicy: sensorv1alpha1.JetStreamDeliverAll,
+ }
+ assert.Equal(t, sensorv1alpha1.JetStreamDeliverAll, config.DeliverPolicy)
+
+ emptyConfig := &sensorv1alpha1.JetStreamConsumerConfig{}
+ assert.Equal(t, sensorv1alpha1.JetStreamDeliverPolicy(""), emptyConfig.DeliverPolicy)
+}
diff --git a/examples/sensors/jetstream-deliver-policy.yaml b/examples/sensors/jetstream-deliver-policy.yaml
new file mode 100644
index 0000000000..5f837eb2cf
--- /dev/null
+++ b/examples/sensors/jetstream-deliver-policy.yaml
@@ -0,0 +1,51 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Sensor
+metadata:
+ name: jetstream-deliver-policy-example
+spec:
+ dependencies:
+ # Example with DeliverAll policy - starts from earliest available message
+ - name: all-messages-dep
+ eventSourceName: webhook
+ eventName: example
+ jetStream:
+ deliverPolicy: "all"
+ # Example with DeliverLast policy - starts with last message in stream
+ - name: last-message-dep
+ eventSourceName: webhook
+ eventName: example
+ jetStream:
+ deliverPolicy: "last"
+ # Example with DeliverNew policy - starts with messages created after consumer
+ - name: new-messages-dep
+ eventSourceName: webhook
+ eventName: example
+ jetStream:
+ deliverPolicy: "new"
+ # Example without JetStream config - defaults to "new"
+ - name: default-dep
+ eventSourceName: webhook
+ eventName: example
+ triggers:
+ - template:
+ name: http-trigger
+ http:
+ url: http://http-server.argo-events.svc:8090/hello
+ method: POST
+ payload:
+ - src:
+ dependencyName: all-messages-dep
+ dataKey: body.message
+ dest: allMessage
+ - src:
+ dependencyName: last-message-dep
+ dataKey: body.message
+ dest: lastMessage
+ - src:
+ dependencyName: new-messages-dep
+ dataKey: body.message
+ dest: newMessage
+ - src:
+ dependencyName: default-dep
+ dataKey: body.message
+ dest: defaultMessage
diff --git a/pkg/apis/sensor/v1alpha1/types.go b/pkg/apis/sensor/v1alpha1/types.go
index e5557f7290..376d5c34bf 100644
--- a/pkg/apis/sensor/v1alpha1/types.go
+++ b/pkg/apis/sensor/v1alpha1/types.go
@@ -208,6 +208,9 @@ type EventDependency struct {
// Available values: and (&&), or (||)
// Is optional and if left blank treated as and (&&).
FiltersLogicalOperator LogicalOperator `json:"filtersLogicalOperator,omitempty" protobuf:"bytes,6,opt,name=filtersLogicalOperator,casttype=LogicalOperator"`
+ // JetStream consumer configuration for this dependency
+ // +optional
+ JetStream *JetStreamConsumerConfig `json:"jetStream,omitempty" protobuf:"bytes,7,opt,name=jetStream"`
}
// EventDependencyTransformer transforms the event
@@ -1108,3 +1111,22 @@ func (e EventContext) String() string {
func (a *ArtifactLocation) HasLocation() bool {
return a.S3 != nil || a.Inline != nil || a.File != nil || a.URL != nil
}
+
+// JetStreamDeliverPolicy refers to the JetStream deliver policy
+// Note: Only the deliver policies directly supported by the NATS Go client are included here
+// Advanced policies like DeliverByStartSequence and DeliverByStartTime are not supported
+// by PullSubscribe options and would require consumer configuration
+type JetStreamDeliverPolicy string
+
+const (
+ JetStreamDeliverAll JetStreamDeliverPolicy = "all"
+ JetStreamDeliverLast JetStreamDeliverPolicy = "last"
+ JetStreamDeliverNew JetStreamDeliverPolicy = "new"
+)
+
+// JetStreamConsumerConfig holds the JetStream consumer configuration
+type JetStreamConsumerConfig struct {
+ // DeliverPolicy specifies the JetStream deliver policy
+ // +optional
+ DeliverPolicy JetStreamDeliverPolicy `json:"deliverPolicy,omitempty" protobuf:"bytes,1,opt,name=deliverPolicy"`
+}
diff --git a/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go
index b8f7e6e886..9a9e629cbb 100644
--- a/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go
+++ b/pkg/apis/sensor/v1alpha1/zz_generated.deepcopy.go
@@ -425,6 +425,11 @@ func (in *EventDependency) DeepCopyInto(out *EventDependency) {
*out = new(EventDependencyTransformer)
**out = **in
}
+ if in.JetStream != nil {
+ in, out := &in.JetStream, &out.JetStream
+ *out = new(JetStreamConsumerConfig)
+ **out = **in
+ }
return
}
@@ -667,6 +672,22 @@ func (in *HTTPTrigger) DeepCopy() *HTTPTrigger {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *JetStreamConsumerConfig) DeepCopyInto(out *JetStreamConsumerConfig) {
+ *out = *in
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JetStreamConsumerConfig.
+func (in *JetStreamConsumerConfig) DeepCopy() *JetStreamConsumerConfig {
+ if in == nil {
+ return nil
+ }
+ out := new(JetStreamConsumerConfig)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *K8SResourcePolicy) DeepCopyInto(out *K8SResourcePolicy) {
*out = *in
diff --git a/sensors/listener.go b/sensors/listener.go
index b88e48979c..5d0cc7add5 100644
--- a/sensors/listener.go
+++ b/sensors/listener.go
@@ -155,6 +155,7 @@ func (sensorCtx *SensorContext) listenEvents(ctx context.Context) error {
Name: dep.Name,
EventSourceName: dep.EventSourceName,
EventName: dep.EventName,
+ JetStream: dep.JetStream,
}
deps = append(deps, d)
}
diff --git a/sensors/listener_test.go b/sensors/listener_test.go
index 2390c3ce2c..605ea8c556 100644
--- a/sensors/listener_test.go
+++ b/sensors/listener_test.go
@@ -146,3 +146,56 @@ func TestGetDependencyExpression(t *testing.T) {
assert.NoError(t, err)
})
}
+
+func TestDependencyMapping(t *testing.T) {
+ t.Run("JetStream config is mapped correctly", func(t *testing.T) {
+ obj := sensorObj.DeepCopy()
+ obj.Spec.Dependencies = []v1alpha1.EventDependency{
+ {
+ Name: "dep1",
+ EventSourceName: "webhook",
+ EventName: "example-1",
+ JetStream: &v1alpha1.JetStreamConsumerConfig{
+ DeliverPolicy: v1alpha1.JetStreamDeliverAll,
+ },
+ },
+ {
+ Name: "dep2",
+ EventSourceName: "webhook2",
+ EventName: "example-2",
+ JetStream: &v1alpha1.JetStreamConsumerConfig{
+ DeliverPolicy: v1alpha1.JetStreamDeliverLast,
+ },
+ },
+ {
+ Name: "dep3",
+ EventSourceName: "webhook3",
+ EventName: "example-3",
+ // No JetStream config - should be nil
+ },
+ }
+
+ // Create dependency mapping like the listener does
+ depMapping := make(map[string]v1alpha1.EventDependency)
+ for _, d := range obj.Spec.Dependencies {
+ depMapping[d.Name] = d
+ }
+
+ // Test that each dependency preserves JetStream config
+ for _, depName := range []string{"dep1", "dep2", "dep3"} {
+ dep, ok := depMapping[depName]
+ assert.True(t, ok, "Dependency %s should exist", depName)
+
+ // Verify the JetStream config is accessible
+ if depName == "dep1" {
+ assert.NotNil(t, dep.JetStream, "dep1 should have JetStream config")
+ assert.Equal(t, v1alpha1.JetStreamDeliverAll, dep.JetStream.DeliverPolicy)
+ } else if depName == "dep2" {
+ assert.NotNil(t, dep.JetStream, "dep2 should have JetStream config")
+ assert.Equal(t, v1alpha1.JetStreamDeliverLast, dep.JetStream.DeliverPolicy)
+ } else if depName == "dep3" {
+ assert.Nil(t, dep.JetStream, "dep3 should not have JetStream config")
+ }
+ }
+ })
+}