Skip to content

Commit 0ed10a5

Browse files
elena-kolevskacicoylenelson-parenteyaron2berndverst
authored andcommitted
Adds Redis stream trimming by time(stream ID) (dapr#3710)
Signed-off-by: Elena Kolevska <[email protected]> Co-authored-by: Cassie Coyle <[email protected]> Co-authored-by: Nelson Parente <[email protected]> Co-authored-by: Yaron Schneider <[email protected]> Co-authored-by: Bernd Verst <[email protected]>
1 parent 5deb249 commit 0ed10a5

File tree

8 files changed

+69
-9
lines changed

8 files changed

+69
-9
lines changed

common/component/redis/redis.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ type RedisClient interface {
8282
ConfigurationSubscribe(ctx context.Context, args *ConfigurationSubscribeArgs)
8383
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error)
8484
EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error)
85-
XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error)
85+
XAdd(ctx context.Context, stream string, maxLenApprox int64, streamTTL string, values map[string]interface{}) (string, error)
8686
XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error
8787
XAck(ctx context.Context, stream string, group string, messageID string) error
8888
XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error)

common/component/redis/settings.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@ type Settings struct {
102102
// The max len of stream
103103
MaxLenApprox int64 `mapstructure:"maxLenApprox" mdonly:"pubsub"`
104104

105+
// The TTL of stream entries
106+
StreamTTL time.Duration `mapstructure:"streamTTL" mdonly:"pubsub"`
107+
105108
// EntraID / AzureAD Authentication based on the shared code which essentially uses the DefaultAzureCredential
106109
// from the official Azure Identity SDK for Go
107110
UseEntraID bool `mapstructure:"useEntraID" mapstructurealiases:"useAzureAD"`
@@ -127,6 +130,15 @@ func (s *Settings) SetCertificate(fn func(cert *tls.Certificate)) error {
127130
return nil
128131
}
129132

133+
func (s *Settings) GetMinID(now time.Time) string {
134+
// If StreamTTL is not set, return empty string (no trimming)
135+
if s.StreamTTL == 0 {
136+
return ""
137+
}
138+
139+
return fmt.Sprintf("%d-1", now.Add(-s.StreamTTL).UnixMilli())
140+
}
141+
130142
type Duration time.Duration
131143

132144
func (r *Duration) DecodeString(value string) error {

common/component/redis/settings_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package redis
33
import (
44
"crypto/tls"
55
"testing"
6+
"time"
67

78
"github.com/stretchr/testify/require"
89
)
@@ -39,4 +40,34 @@ func TestSettings(t *testing.T) {
3940
require.NoError(t, err)
4041
require.NotNil(t, c)
4142
})
43+
44+
t.Run("stream TTL", func(t *testing.T) {
45+
fixedTime := time.Date(2025, 3, 14, 0o1, 59, 26, 0, time.UTC)
46+
47+
tests := []struct {
48+
name string
49+
streamTTL time.Duration
50+
want string
51+
}{
52+
{
53+
name: "with one hour TTL",
54+
streamTTL: time.Hour,
55+
want: "1741913966000-1",
56+
},
57+
{
58+
name: "with zero TTL",
59+
streamTTL: 0,
60+
want: "",
61+
},
62+
}
63+
64+
for _, tt := range tests {
65+
t.Run(tt.name, func(t *testing.T) {
66+
settings := &Settings{
67+
StreamTTL: tt.streamTTL,
68+
}
69+
require.Equal(t, tt.want, settings.GetMinID(fixedTime))
70+
})
71+
}
72+
})
4273
}

common/component/redis/v8client.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (c v8Client) SetNX(ctx context.Context, key string, value interface{}, expi
161161
return &val, nx.Err()
162162
}
163163

164-
func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
164+
func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) {
165165
var writeCtx context.Context
166166
if c.writeTimeout > 0 {
167167
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
@@ -171,9 +171,11 @@ func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v
171171
writeCtx = ctx
172172
}
173173
return c.client.XAdd(writeCtx, &v8.XAddArgs{
174-
Stream: stream,
175-
Values: values,
176-
MaxLenApprox: maxLenApprox,
174+
Stream: stream,
175+
Values: values,
176+
MaxLen: maxLenApprox,
177+
MinID: minIDApprox,
178+
Approx: true,
177179
}).Result()
178180
}
179181

common/component/redis/v9client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (c v9Client) SetNX(ctx context.Context, key string, value interface{}, expi
161161
return &val, nx.Err()
162162
}
163163

164-
func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
164+
func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) {
165165
var writeCtx context.Context
166166
if c.writeTimeout > 0 {
167167
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
@@ -174,6 +174,7 @@ func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v
174174
Stream: stream,
175175
Values: values,
176176
MaxLen: maxLenApprox,
177+
MinID: minIDApprox,
177178
Approx: true,
178179
}).Result()
179180
}

pubsub/redis/metadata.yaml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ metadata:
165165
- name: failover
166166
required: false
167167
description: |
168-
Property to enabled failover configuration. Needs sentinalMasterName to be set. Defaults to "false"
168+
Property to enabled failover configuration. Needs sentinelMasterName to be set. Defaults to "false"
169169
example: "false"
170170
type: bool
171171
- name: sentinelMasterName
@@ -175,9 +175,19 @@ metadata:
175175
type: string
176176
- name: maxLenApprox
177177
required: false
178-
description: Maximum number of items inside a stream.The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
178+
description: Maximum number of items inside a stream. The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
179179
example: "10000"
180180
type: number
181+
- name: streamTTL
182+
required: false
183+
description: |
184+
TTL duration for stream entries. Entries older than this duration will be evicted.
185+
This is an approximate value, as it's implemented using Redis stream's MINID trimming with the '~' modifier.
186+
The actual retention may include slightly more entries than strictly defined by the TTL,
187+
as Redis optimizes the trimming operation for efficiency by potentially keeping some additional entries.
188+
example: "30d"
189+
type: duration
190+
181191
builtinAuthenticationProfiles:
182192
- name: "azuread"
183193
metadata:

pubsub/redis/redis.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ const (
3838
queueDepth = "queueDepth"
3939
concurrency = "concurrency"
4040
maxLenApprox = "maxLenApprox"
41+
streamTTL = "streamTTL"
4142
)
4243

4344
// redisStreams handles consuming from a Redis stream using
@@ -112,7 +113,7 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest)
112113
redisPayload["metadata"] = serializedMetadata
113114
}
114115

115-
_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, redisPayload)
116+
_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, r.clientSettings.GetMinID(time.Now()), redisPayload)
116117
if err != nil {
117118
return fmt.Errorf("redis streams: error from publish: %s", err)
118119
}

pubsub/redis/redis_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"strconv"
2020
"sync"
2121
"testing"
22+
"time"
2223

2324
"github.com/stretchr/testify/assert"
2425
"github.com/stretchr/testify/require"
@@ -35,6 +36,7 @@ func getFakeProperties() map[string]string {
3536
consumerID: "fakeConsumer",
3637
enableTLS: "true",
3738
maxLenApprox: "1000",
39+
streamTTL: "1h",
3840
}
3941
}
4042

@@ -54,6 +56,7 @@ func TestParseRedisMetadata(t *testing.T) {
5456
require.NoError(t, err)
5557
assert.Equal(t, fakeProperties[consumerID], m.ConsumerID)
5658
assert.Equal(t, int64(1000), m.MaxLenApprox)
59+
assert.Equal(t, 1*time.Hour, m.StreamTTL)
5760
})
5861

5962
// TODO: fix the code to return the error for the missing property to make this test work

0 commit comments

Comments
 (0)