Skip to content
64 changes: 64 additions & 0 deletions api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 30 additions & 1 deletion docs/eventbus/jetstream.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,36 @@ For Jetstream, TLS is turned on for all client-server communication as well as b

## How it works under the hood

Jetstream has the concept of a Stream, and Subjects (i.e. topics) which are used on a Stream. From the documentation: “Each Stream defines how messages are stored and what the limits (duration, size, interest) of the retention are.” For Argo Events, we have one Stream called "default" with a single set of settings, but we have multiple subjects, each of which is named `default.<eventsourcename>.<eventname>`. Sensors subscribe to the subjects they need using durable consumers.
Jetstream has the concept of a Stream, and Subjects (i.e. topics) which are used on a Stream. From the documentation: "Each Stream defines how messages are stored and what the limits (duration, size, interest) of the retention are." For Argo Events, we have one Stream called "default" with a single set of settings, but we have multiple subjects, each of which is named `default.<eventsourcename>.<eventname>`. Sensors subscribe to the subjects they need using durable consumers.

### Sensor Deliver Policy Configuration

When using JetStream as the EventBus, you can configure the deliver policy for each sensor dependency using the `jetStream` field in the `EventDependency` specification. This allows you to control how messages are delivered to your sensor:

- **`all`**: Start receiving from the earliest available message in the stream
- **`last`**: Start with the last message added to the stream, or the last message matching the consumer's filter subject if defined
- **`new`**: Start receiving messages created after the consumer was created

Example sensor configuration:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: example
spec:
dependencies:
- name: test-dep
eventSourceName: webhook
eventName: example
jetStream:
deliverPolicy: "all" # Start from earliest available message
triggers:
- template:
name: http-trigger
http:
url: http://example.com/webhook
```

### Exotic

Expand Down
2 changes: 2 additions & 0 deletions eventbus/common/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package common

import (
eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

// Auth contains the auth infor for event bus
Expand Down Expand Up @@ -33,4 +34,5 @@ type Dependency struct {
Name string
EventSourceName string
EventName string
JetStream *sensorv1alpha1.JetStreamConsumerConfig
}
51 changes: 50 additions & 1 deletion eventbus/jetstream/sensor/trigger_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
jetstreambase "github.com/argoproj/argo-events/eventbus/jetstream/base"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

type JetstreamTriggerConn struct {
Expand Down Expand Up @@ -101,6 +102,53 @@ func (conn *JetstreamTriggerConn) String() string {
return fmt.Sprintf("JetstreamTriggerConn{Sensor:%s,Trigger:%s}", conn.sensorName, conn.triggerName)
}

type jsDeliverConfig struct {
policy sensorv1alpha1.JetStreamDeliverPolicy
}

func (conn *JetstreamTriggerConn) consumerOptionsForDependency(dep eventbuscommon.Dependency) []nats.SubOpt {
options := []nats.SubOpt{nats.AckExplicit()}
config, warning := conn.resolveJetStreamDeliverConfig(dep)
if warning != "" && conn.Logger != nil {
conn.Logger.Warn(warning)
}

// Note: The policies included here are the ones directly supported by the NATS Go client
// Advanced policies like DeliverByStartSequence and DeliverByStartTime are not supported
// by PullSubscribe options and would require consumer configuration
switch config.policy {
case sensorv1alpha1.JetStreamDeliverAll:
return append(options, nats.DeliverAll())
case sensorv1alpha1.JetStreamDeliverLast:
return append(options, nats.DeliverLast())
case sensorv1alpha1.JetStreamDeliverNew:
return append(options, nats.DeliverNew())
default:
return append(options, nats.DeliverNew())
}
}

func (conn *JetstreamTriggerConn) resolveJetStreamDeliverConfig(dep eventbuscommon.Dependency) (jsDeliverConfig, string) {
cfg := dep.JetStream
if cfg == nil || cfg.DeliverPolicy == "" {
return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverNew}, ""
}

// Note: Only the deliver policies directly supported by NATS Go client are handled here
// Advanced policies like DeliverByStartSequence and DeliverByStartTime are not supported
// and will fall through to the default case
switch cfg.DeliverPolicy {
case sensorv1alpha1.JetStreamDeliverAll:
return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverAll}, ""
case sensorv1alpha1.JetStreamDeliverLast:
return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverLast}, ""
case sensorv1alpha1.JetStreamDeliverNew:
return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverNew}, ""
default:
return jsDeliverConfig{policy: sensorv1alpha1.JetStreamDeliverNew}, fmt.Sprintf("Unrecognized or unsupported JetStream deliver policy %q for dependency %s; defaulting to DeliverNew", cfg.DeliverPolicy, dep.Name)
}
}

func (conn *JetstreamTriggerConn) Subscribe(ctx context.Context,
closeCh <-chan struct{},
resetConditionsCh <-chan struct{},
Expand Down Expand Up @@ -144,7 +192,8 @@ func (conn *JetstreamTriggerConn) Subscribe(ctx context.Context,

conn.Logger.Debugf("durable name for sensor='%s', trigger='%s', dep='%s': '%s'", conn.sensorName, conn.triggerName, dependency.Name, durableName)
log.Infof("Subscribing to subject %s with durable name %s", subject, durableName)
subscriptions[subscriptionIndex], err = conn.JSContext.PullSubscribe(subject, durableName, nats.AckExplicit(), nats.DeliverNew())
opts := conn.consumerOptionsForDependency(dependency)
subscriptions[subscriptionIndex], err = conn.JSContext.PullSubscribe(subject, durableName, opts...)
if err != nil {
errorStr := fmt.Sprintf("Failed to subscribe to subject %s using group %s: %v", subject, durableName, err)
log.Error(errorStr)
Expand Down
146 changes: 146 additions & 0 deletions eventbus/jetstream/sensor/trigger_conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
package sensor

import (
"testing"

eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
"github.com/stretchr/testify/assert"
)

func TestConsumerOptionsForDependency(t *testing.T) {
conn := &JetstreamTriggerConn{}

tests := []struct {
name string
dependency eventbuscommon.Dependency
}{
{
name: "DeliverAll policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverAll},
},
},
{
name: "DeliverLast policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverLast},
},
},
{
name: "DeliverNew policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverNew},
},
},
{
name: "No JetStream config",
dependency: eventbuscommon.Dependency{Name: "test-dep"},
},
{
name: "Empty deliver policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: ""},
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
opts := conn.consumerOptionsForDependency(tt.dependency)
assert.NotEmpty(t, opts)
assert.GreaterOrEqual(t, len(opts), 1)
})
}
}

func TestResolveJetStreamDeliverConfig(t *testing.T) {
conn := &JetstreamTriggerConn{}

tests := []struct {
name string
dependency eventbuscommon.Dependency
expectedPolicy sensorv1alpha1.JetStreamDeliverPolicy
expectWarning bool
}{
{
name: "DeliverAll policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverAll},
},
expectedPolicy: sensorv1alpha1.JetStreamDeliverAll,
},
{
name: "DeliverLast policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverLast},
},
expectedPolicy: sensorv1alpha1.JetStreamDeliverLast,
},
{
name: "DeliverNew policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: sensorv1alpha1.JetStreamDeliverNew},
},
expectedPolicy: sensorv1alpha1.JetStreamDeliverNew,
},
{
name: "No JetStream config",
dependency: eventbuscommon.Dependency{Name: "test-dep"},
expectedPolicy: sensorv1alpha1.JetStreamDeliverNew,
},
{
name: "Empty deliver policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: ""},
},
expectedPolicy: sensorv1alpha1.JetStreamDeliverNew,
},
{
name: "Unsupported policy",
dependency: eventbuscommon.Dependency{
Name: "test-dep",
JetStream: &sensorv1alpha1.JetStreamConsumerConfig{DeliverPolicy: "unsupported"},
},
expectedPolicy: sensorv1alpha1.JetStreamDeliverNew,
expectWarning: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
config, warning := conn.resolveJetStreamDeliverConfig(tt.dependency)
assert.Equal(t, tt.expectedPolicy, config.policy)

if tt.expectWarning {
assert.NotEmpty(t, warning)
} else {
assert.Empty(t, warning)
}
})
}
}

func TestJetStreamDeliverPolicyConstants(t *testing.T) {
assert.Equal(t, "all", string(sensorv1alpha1.JetStreamDeliverAll))
assert.Equal(t, "last", string(sensorv1alpha1.JetStreamDeliverLast))
assert.Equal(t, "new", string(sensorv1alpha1.JetStreamDeliverNew))
}

func TestJetStreamConsumerConfig(t *testing.T) {
config := &sensorv1alpha1.JetStreamConsumerConfig{
DeliverPolicy: sensorv1alpha1.JetStreamDeliverAll,
}
assert.Equal(t, sensorv1alpha1.JetStreamDeliverAll, config.DeliverPolicy)

emptyConfig := &sensorv1alpha1.JetStreamConsumerConfig{}
assert.Equal(t, sensorv1alpha1.JetStreamDeliverPolicy(""), emptyConfig.DeliverPolicy)
}
Loading