Skip to content

Commit d58d52f

Browse files
Uses a dynamic MinID for trimming
Signed-off-by: Elena Kolevska <[email protected]>
1 parent ce8f070 commit d58d52f

File tree

6 files changed

+56
-15
lines changed

6 files changed

+56
-15
lines changed

common/component/redis/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type RedisClient interface {
8282
ConfigurationSubscribe(ctx context.Context, args *ConfigurationSubscribeArgs)
8383
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error)
8484
EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error)
85-
XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error)
85+
XAdd(ctx context.Context, stream string, maxLenApprox int64, streamTTL string, values map[string]interface{}) (string, error)
8686
XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error
8787
XAck(ctx context.Context, stream string, group string, messageID string) error
8888
XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error)

common/component/redis/settings.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ type Settings struct {
102102
// The max len of stream
103103
MaxLenApprox int64 `mapstructure:"maxLenApprox" mdonly:"pubsub"`
104104

105-
// Min ID of a stream (stream entry IDs lower than this threshold will be evicted)
106-
MinIDApprox string `mapstructure:"minIDApprox" mdonly:"pubsub"`
105+
// The TTL of stream entries
106+
StreamTTL time.Duration `mapstructure:"streamTTL" mdonly:"pubsub"`
107107

108108
// EntraID / AzureAD Authentication based on the shared code which essentially uses the DefaultAzureCredential
109109
// from the official Azure Identity SDK for Go
@@ -130,6 +130,15 @@ func (s *Settings) SetCertificate(fn func(cert *tls.Certificate)) error {
130130
return nil
131131
}
132132

133+
func (s *Settings) GetMinID(now time.Time) string {
134+
// If StreamTTL is not set, return empty string (no trimming)
135+
if s.StreamTTL == 0 {
136+
return ""
137+
}
138+
139+
return fmt.Sprintf("%d-1", now.Add(-s.StreamTTL).UnixMilli())
140+
}
141+
133142
type Duration time.Duration
134143

135144
func (r *Duration) DecodeString(value string) error {

common/component/redis/settings_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"crypto/tls"
55
"testing"
6+
"time"
67

78
"github.com/stretchr/testify/require"
89
)
@@ -39,4 +40,34 @@ func TestSettings(t *testing.T) {
3940
require.NoError(t, err)
4041
require.NotNil(t, c)
4142
})
43+
44+
t.Run("stream TTL", func(t *testing.T) {
45+
fixedTime := time.Date(2025, 3, 14, 0o1, 59, 26, 0, time.UTC)
46+
47+
tests := []struct {
48+
name string
49+
streamTTL time.Duration
50+
want string
51+
}{
52+
{
53+
name: "with one hour TTL",
54+
streamTTL: time.Hour,
55+
want: "1741913966000-1",
56+
},
57+
{
58+
name: "with zero TTL",
59+
streamTTL: 0,
60+
want: "",
61+
},
62+
}
63+
64+
for _, tt := range tests {
65+
t.Run(tt.name, func(t *testing.T) {
66+
settings := &Settings{
67+
StreamTTL: tt.streamTTL,
68+
}
69+
require.Equal(t, tt.want, settings.GetMinID(fixedTime))
70+
})
71+
}
72+
})
4273
}

pubsub/redis/metadata.yaml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -175,18 +175,18 @@ metadata:
175175
type: string
176176
- name: maxLenApprox
177177
required: false
178-
description: Maximum number of items inside a stream.The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
178+
description: Maximum number of items inside a stream. The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
179179
example: "10000"
180180
type: number
181-
- name: minIDApprox
181+
- name: streamTTL
182182
required: false
183183
description: |
184-
Minimum ID threshold for the stream. Entries with IDs smaller than the specified threshold are evicted,
185-
keeping the stream trimmed to only contain entries with IDs greater than or equal to this value.
186-
This can be used for time-based eviction since Redis stream IDs are in the format {timestamp-sequence},
187-
where timestamp is in milliseconds since epoch. Defaults to none.
188-
example: "1742397526022-0"
189-
type: string
184+
TTL duration for stream entries. Entries older than this duration will be evicted.
185+
This is an approximate value, as it's implemented using Redis stream's MINID trimming with the '~' modifier.
186+
The actual retention may include slightly more entries than strictly defined by the TTL,
187+
as Redis optimizes the trimming operation for efficiency by potentially keeping some additional entries.
188+
example: "30d"
189+
type: duration
190190

191191
builtinAuthenticationProfiles:
192192
- name: "azuread"

pubsub/redis/redis.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ const (
3838
queueDepth = "queueDepth"
3939
concurrency = "concurrency"
4040
maxLenApprox = "maxLenApprox"
41-
minIDApprox = "minIDApprox"
41+
streamTTL = "streamTTL"
4242
)
4343

4444
// redisStreams handles consuming from a Redis stream using
@@ -113,7 +113,7 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest)
113113
redisPayload["metadata"] = serializedMetadata
114114
}
115115

116-
_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, r.clientSettings.MinIDApprox, redisPayload)
116+
_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, r.clientSettings.GetMinID(time.Now()), redisPayload)
117117
if err != nil {
118118
return fmt.Errorf("redis streams: error from publish: %s", err)
119119
}

pubsub/redis/redis_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"strconv"
2020
"sync"
2121
"testing"
22+
"time"
2223

2324
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/require"
@@ -35,7 +36,7 @@ func getFakeProperties() map[string]string {
3536
consumerID: "fakeConsumer",
3637
enableTLS: "true",
3738
maxLenApprox: "1000",
38-
minIDApprox: "1742397526022-0",
39+
streamTTL: "1h",
3940
}
4041
}
4142

@@ -55,7 +56,7 @@ func TestParseRedisMetadata(t *testing.T) {
5556
require.NoError(t, err)
5657
assert.Equal(t, fakeProperties[consumerID], m.ConsumerID)
5758
assert.Equal(t, int64(1000), m.MaxLenApprox)
58-
assert.Equal(t, "1742397526022-0", m.MinIDApprox)
59+
assert.Equal(t, 1*time.Hour, m.StreamTTL)
5960
})
6061

6162
// TODO: fix the code to return the error for the missing property to make this test work

0 commit comments

Comments
 (0)