diff --git a/common/component/redis/redis.go b/common/component/redis/redis.go index f08d3c7c1e..f7d701f817 100644 --- a/common/component/redis/redis.go +++ b/common/component/redis/redis.go @@ -82,7 +82,7 @@ type RedisClient interface { ConfigurationSubscribe(ctx context.Context, args *ConfigurationSubscribeArgs) SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error) EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error) - XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) + XAdd(ctx context.Context, stream string, maxLenApprox int64, streamTTL string, values map[string]interface{}) (string, error) XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error XAck(ctx context.Context, stream string, group string, messageID string) error XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error) diff --git a/common/component/redis/settings.go b/common/component/redis/settings.go index e154982050..d031f1c23b 100644 --- a/common/component/redis/settings.go +++ b/common/component/redis/settings.go @@ -102,6 +102,9 @@ type Settings struct { // The max len of stream MaxLenApprox int64 `mapstructure:"maxLenApprox" mdonly:"pubsub"` + // The TTL of stream entries + StreamTTL time.Duration `mapstructure:"streamTTL" mdonly:"pubsub"` + // EntraID / AzureAD Authentication based on the shared code which essentially uses the DefaultAzureCredential // from the official Azure Identity SDK for Go UseEntraID bool `mapstructure:"useEntraID" mapstructurealiases:"useAzureAD"` @@ -127,6 +130,15 @@ func (s *Settings) SetCertificate(fn func(cert *tls.Certificate)) error { return nil } +func (s *Settings) GetMinID(now time.Time) string { + // If StreamTTL is not set, return empty string (no trimming) + if s.StreamTTL == 0 { + return "" + } + + return fmt.Sprintf("%d-1", now.Add(-s.StreamTTL).UnixMilli()) +} + type Duration time.Duration func (r *Duration) DecodeString(value string) error { diff --git a/common/component/redis/settings_test.go b/common/component/redis/settings_test.go index 28b673a488..9b6ebc291c 100644 --- a/common/component/redis/settings_test.go +++ b/common/component/redis/settings_test.go @@ -3,6 +3,7 @@ package redis import ( "crypto/tls" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -39,4 +40,34 @@ func TestSettings(t *testing.T) { require.NoError(t, err) require.NotNil(t, c) }) + + t.Run("stream TTL", func(t *testing.T) { + fixedTime := time.Date(2025, 3, 14, 0o1, 59, 26, 0, time.UTC) + + tests := []struct { + name string + streamTTL time.Duration + want string + }{ + { + name: "with one hour TTL", + streamTTL: time.Hour, + want: "1741913966000-1", + }, + { + name: "with zero TTL", + streamTTL: 0, + want: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + settings := &Settings{ + StreamTTL: tt.streamTTL, + } + require.Equal(t, tt.want, settings.GetMinID(fixedTime)) + }) + } + }) } diff --git a/common/component/redis/v8client.go b/common/component/redis/v8client.go index 16571489f5..c74a057012 100644 --- a/common/component/redis/v8client.go +++ b/common/component/redis/v8client.go @@ -161,7 +161,7 @@ func (c v8Client) SetNX(ctx context.Context, key string, value interface{}, expi return &val, nx.Err() } -func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) { +func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) { var writeCtx context.Context if c.writeTimeout > 0 { timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) @@ -171,9 +171,11 @@ func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v writeCtx = ctx } return c.client.XAdd(writeCtx, &v8.XAddArgs{ - Stream: stream, - Values: values, - MaxLenApprox: maxLenApprox, + Stream: stream, + Values: values, + MaxLen: maxLenApprox, + MinID: minIDApprox, + Approx: true, }).Result() } diff --git a/common/component/redis/v9client.go b/common/component/redis/v9client.go index f1e2d318b2..8399e1e7ad 100644 --- a/common/component/redis/v9client.go +++ b/common/component/redis/v9client.go @@ -161,7 +161,7 @@ func (c v9Client) SetNX(ctx context.Context, key string, value interface{}, expi return &val, nx.Err() } -func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) { +func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) { var writeCtx context.Context if c.writeTimeout > 0 { timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout)) @@ -174,6 +174,7 @@ func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v Stream: stream, Values: values, MaxLen: maxLenApprox, + MinID: minIDApprox, Approx: true, }).Result() } diff --git a/pubsub/redis/metadata.yaml b/pubsub/redis/metadata.yaml index 9bafdb5f7d..dae86bd624 100644 --- a/pubsub/redis/metadata.yaml +++ b/pubsub/redis/metadata.yaml @@ -165,7 +165,7 @@ metadata: - name: failover required: false description: | - Property to enabled failover configuration. Needs sentinalMasterName to be set. Defaults to "false" + Property to enabled failover configuration. Needs sentinelMasterName to be set. Defaults to "false" example: "false" type: bool - name: sentinelMasterName @@ -175,9 +175,19 @@ metadata: type: string - name: maxLenApprox required: false - description: Maximum number of items inside a stream.The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited. + description: Maximum number of items inside a stream. The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited. example: "10000" type: number + - name: streamTTL + required: false + description: | + TTL duration for stream entries. Entries older than this duration will be evicted. + This is an approximate value, as it's implemented using Redis stream's MINID trimming with the '~' modifier. + The actual retention may include slightly more entries than strictly defined by the TTL, + as Redis optimizes the trimming operation for efficiency by potentially keeping some additional entries. + example: "30d" + type: duration + builtinAuthenticationProfiles: - name: "azuread" metadata: diff --git a/pubsub/redis/redis.go b/pubsub/redis/redis.go index 7df54777f8..5f88468fe5 100644 --- a/pubsub/redis/redis.go +++ b/pubsub/redis/redis.go @@ -38,6 +38,7 @@ const ( queueDepth = "queueDepth" concurrency = "concurrency" maxLenApprox = "maxLenApprox" + streamTTL = "streamTTL" ) // redisStreams handles consuming from a Redis stream using @@ -112,7 +113,7 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest) redisPayload["metadata"] = serializedMetadata } - _, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, redisPayload) + _, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, r.clientSettings.GetMinID(time.Now()), redisPayload) if err != nil { return fmt.Errorf("redis streams: error from publish: %s", err) } diff --git a/pubsub/redis/redis_test.go b/pubsub/redis/redis_test.go index 15008e577c..c2e442c1dc 100644 --- a/pubsub/redis/redis_test.go +++ b/pubsub/redis/redis_test.go @@ -19,6 +19,7 @@ import ( "strconv" "sync" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -35,6 +36,7 @@ func getFakeProperties() map[string]string { consumerID: "fakeConsumer", enableTLS: "true", maxLenApprox: "1000", + streamTTL: "1h", } } @@ -54,6 +56,7 @@ func TestParseRedisMetadata(t *testing.T) { require.NoError(t, err) assert.Equal(t, fakeProperties[consumerID], m.ConsumerID) assert.Equal(t, int64(1000), m.MaxLenApprox) + assert.Equal(t, 1*time.Hour, m.StreamTTL) }) // TODO: fix the code to return the error for the missing property to make this test work