Skip to content

Adds Redis stream trimming by time(stream ID) #3710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
May 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/component/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type RedisClient interface {
ConfigurationSubscribe(ctx context.Context, args *ConfigurationSubscribeArgs)
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error)
EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error)
XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error)
XAdd(ctx context.Context, stream string, maxLenApprox int64, streamTTL string, values map[string]interface{}) (string, error)
XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error
XAck(ctx context.Context, stream string, group string, messageID string) error
XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error)
Expand Down
12 changes: 12 additions & 0 deletions common/component/redis/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ type Settings struct {
// The max len of stream
MaxLenApprox int64 `mapstructure:"maxLenApprox" mdonly:"pubsub"`

// The TTL of stream entries
StreamTTL time.Duration `mapstructure:"streamTTL" mdonly:"pubsub"`

// EntraID / AzureAD Authentication based on the shared code which essentially uses the DefaultAzureCredential
// from the official Azure Identity SDK for Go
UseEntraID bool `mapstructure:"useEntraID" mapstructurealiases:"useAzureAD"`
Expand All @@ -127,6 +130,15 @@ func (s *Settings) SetCertificate(fn func(cert *tls.Certificate)) error {
return nil
}

func (s *Settings) GetMinID(now time.Time) string {
// If StreamTTL is not set, return empty string (no trimming)
if s.StreamTTL == 0 {
return ""
}
Comment on lines +135 to +137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if s.StreamTTL == 0 {
return ""
}
if s.StreamTTL <= 0 {
return ""
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we shouldn't fail silently, and it would be too late to return the error here (we use this on message send). We should probably catch it on init.


return fmt.Sprintf("%d-1", now.Add(-s.StreamTTL).UnixMilli())
}

type Duration time.Duration

func (r *Duration) DecodeString(value string) error {
Expand Down
31 changes: 31 additions & 0 deletions common/component/redis/settings_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package redis
import (
"crypto/tls"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -39,4 +40,34 @@ func TestSettings(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, c)
})

t.Run("stream TTL", func(t *testing.T) {
fixedTime := time.Date(2025, 3, 14, 0o1, 59, 26, 0, time.UTC)

tests := []struct {
name string
streamTTL time.Duration
want string
}{
{
name: "with one hour TTL",
streamTTL: time.Hour,
want: "1741913966000-1",
},
{
name: "with zero TTL",
streamTTL: 0,
want: "",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
settings := &Settings{
StreamTTL: tt.streamTTL,
}
require.Equal(t, tt.want, settings.GetMinID(fixedTime))
})
}
})
}
10 changes: 6 additions & 4 deletions common/component/redis/v8client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c v8Client) SetNX(ctx context.Context, key string, value interface{}, expi
return &val, nx.Err()
}

func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) {
var writeCtx context.Context
if c.writeTimeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
Expand All @@ -171,9 +171,11 @@ func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v
writeCtx = ctx
}
return c.client.XAdd(writeCtx, &v8.XAddArgs{
Stream: stream,
Values: values,
MaxLenApprox: maxLenApprox,
Stream: stream,
Values: values,
MaxLen: maxLenApprox,
MinID: minIDApprox,
Approx: true,
}).Result()
}

Expand Down
3 changes: 2 additions & 1 deletion common/component/redis/v9client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (c v9Client) SetNX(ctx context.Context, key string, value interface{}, expi
return &val, nx.Err()
}

func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) {
var writeCtx context.Context
if c.writeTimeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
Expand All @@ -174,6 +174,7 @@ func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v
Stream: stream,
Values: values,
MaxLen: maxLenApprox,
MinID: minIDApprox,
Approx: true,
}).Result()
}
Expand Down
14 changes: 12 additions & 2 deletions pubsub/redis/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ metadata:
- name: failover
required: false
description: |
Property to enabled failover configuration. Needs sentinalMasterName to be set. Defaults to "false"
Property to enabled failover configuration. Needs sentinelMasterName to be set. Defaults to "false"
example: "false"
type: bool
- name: sentinelMasterName
Expand All @@ -175,9 +175,19 @@ metadata:
type: string
- name: maxLenApprox
required: false
description: Maximum number of items inside a stream.The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
description: Maximum number of items inside a stream. The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
example: "10000"
type: number
- name: streamTTL
required: false
description: |
TTL duration for stream entries. Entries older than this duration will be evicted.
This is an approximate value, as it's implemented using Redis stream's MINID trimming with the '~' modifier.
The actual retention may include slightly more entries than strictly defined by the TTL,
as Redis optimizes the trimming operation for efficiency by potentially keeping some additional entries.
example: "30d"
type: duration

builtinAuthenticationProfiles:
- name: "azuread"
metadata:
Expand Down
3 changes: 2 additions & 1 deletion pubsub/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ const (
queueDepth = "queueDepth"
concurrency = "concurrency"
maxLenApprox = "maxLenApprox"
streamTTL = "streamTTL"
)

// redisStreams handles consuming from a Redis stream using
Expand Down Expand Up @@ -112,7 +113,7 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest)
redisPayload["metadata"] = serializedMetadata
}

_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, redisPayload)
_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, r.clientSettings.GetMinID(time.Now()), redisPayload)
if err != nil {
return fmt.Errorf("redis streams: error from publish: %s", err)
}
Expand Down
3 changes: 3 additions & 0 deletions pubsub/redis/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"strconv"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -35,6 +36,7 @@ func getFakeProperties() map[string]string {
consumerID: "fakeConsumer",
enableTLS: "true",
maxLenApprox: "1000",
streamTTL: "1h",
}
}

Expand All @@ -54,6 +56,7 @@ func TestParseRedisMetadata(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, fakeProperties[consumerID], m.ConsumerID)
assert.Equal(t, int64(1000), m.MaxLenApprox)
assert.Equal(t, 1*time.Hour, m.StreamTTL)
})

// TODO: fix the code to return the error for the missing property to make this test work
Expand Down