Skip to content

Commit 461062d

Browse files
authored
feat: Improve refs garbage collection for Redis using TTL (#983)
* feat: add merge request handler to garbage collect closed MR refs
* refactor: simplify the code, since we already have a deleteRef function
* fix: if no pipeline is found, check for a merged results pipeline
* fix: add missing return case when no pipeline is found
* feat: add TTL on Redis fields; only Redis 7.4 supports expiration on hash fields, so I had to make a workaround using keys
* fix: continue if key has expired
* chore: better debug logging for garbage collection
* fix: debug log reporting wrong progress
* fix: a couple of issues with the rebase
1 parent e6417c7 commit 461062d

File tree

12 files changed

+219
-40
lines changed

12 files changed

+219
-40
lines changed

pkg/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/creasty/defaults"
78
"github.com/go-playground/validator/v10"
@@ -131,6 +132,10 @@ type Redis struct {
131132
// URL used to connect onto the redis endpoint
132133
// format: redis[s]://[:password@]host[:port][/db-number][?option=value])
133134
URL string `yaml:"url"`
135+
136+
ProjectTTL time.Duration `default:"168h" yaml:"project_ttl"`
137+
RefTTL time.Duration `default:"1h" yaml:"ref_ttl"`
138+
MetricTTL time.Duration `default:"1h" yaml:"metric_ttl"`
134139
}
135140

136141
// Pull ..

pkg/config/config_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package config
22

33
import (
44
"testing"
5+
"time"
56

67
log "github.com/sirupsen/logrus"
78
"github.com/stretchr/testify/assert"
@@ -74,6 +75,10 @@ func TestNew(t *testing.T) {
7475
c.ProjectDefaults.Pull.Pipeline.Jobs.RunnerDescription.AggregationRegexp = `shared-runners-manager-(\d*)\.gitlab\.com`
7576
c.ProjectDefaults.Pull.Pipeline.Variables.Regexp = `.*`
7677

78+
c.Redis.ProjectTTL = 168 * time.Hour
79+
c.Redis.RefTTL = 1 * time.Hour
80+
c.Redis.MetricTTL = 1 * time.Hour
81+
7782
assert.Equal(t, c, New())
7883
}
7984

pkg/controller/controller.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,23 @@ func New(ctx context.Context, cfg config.Config, version string) (c Controller,
4747
return
4848
}
4949

50-
if err = c.configureRedis(ctx, cfg.Redis.URL); err != nil {
50+
if err = c.configureRedis(ctx, &cfg.Redis); err != nil {
5151
return
5252
}
5353

5454
c.TaskController = NewTaskController(ctx, c.Redis, cfg.Gitlab.MaximumJobsQueueSize)
5555
c.registerTasks()
5656

57-
c.Store = store.New(ctx, c.Redis, c.Config.Projects)
57+
var redisStore *store.Redis
58+
if c.Redis != nil {
59+
redisStore = store.NewRedisStore(c.Redis, store.WithTTLConfig(&store.RedisTTLConfig{
60+
Project: cfg.Redis.ProjectTTL,
61+
Ref: cfg.Redis.RefTTL,
62+
Metric: cfg.Redis.MetricTTL,
63+
}))
64+
}
65+
66+
c.Store = store.New(ctx, redisStore, c.Config.Projects)
5867

5968
if err = c.configureGitlab(cfg.Gitlab, version); err != nil {
6069
return
@@ -169,11 +178,11 @@ func (c *Controller) configureGitlab(cfg config.Gitlab, version string) (err err
169178
return
170179
}
171180

172-
func (c *Controller) configureRedis(ctx context.Context, url string) (err error) {
181+
func (c *Controller) configureRedis(ctx context.Context, config *config.Redis) (err error) {
173182
ctx, span := otel.Tracer(tracerName).Start(ctx, "controller:configureRedis")
174183
defer span.End()
175184

176-
if len(url) <= 0 {
185+
if len(config.URL) <= 0 {
177186
log.Debug("redis url is not configured, skipping configuration & using local driver")
178187

179188
return
@@ -183,7 +192,7 @@ func (c *Controller) configureRedis(ctx context.Context, url string) (err error)
183192

184193
var opt *redis.Options
185194

186-
if opt, err = redis.ParseURL(url); err != nil {
195+
if opt, err = redis.ParseURL(config.URL); err != nil {
187196
return
188197
}
189198

pkg/controller/garbage_collector.go

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,20 @@ func (c *Controller) GarbageCollectRefs(ctx context.Context) error {
168168
return err
169169
}
170170

171+
storedRefsLen := len(storedRefs)
172+
var i int
171173
for _, ref := range storedRefs {
174+
i++
175+
log.WithFields(log.Fields{"progress": i, "total": storedRefsLen}).Debug("ongoing 'refs' garbage collection")
176+
if c.Store.HasRefExpired(ctx, ref.Key()) {
177+
if err = deleteRef(ctx, c.Store, ref, "expired"); err != nil {
178+
return err
179+
}
180+
181+
continue
182+
}
183+
184+
// Check Project Still Exist
172185
projectExists, err := c.Store.ProjectExists(ctx, ref.Project.Key())
173186
if err != nil {
174187
return err
@@ -275,23 +288,28 @@ func (c *Controller) GarbageCollectMetrics(ctx context.Context) error {
275288
return err
276289
}
277290

291+
storedMetricsLen := len(storedMetrics)
292+
var i int
278293
for k, m := range storedMetrics {
294+
i++
295+
log.WithFields(log.Fields{"progress": i, "total": storedMetricsLen}).Debug("ongoing 'metrics' garbage collection")
296+
if c.Store.HasMetricExpired(ctx, m.Key()) {
297+
if err = deleteMetric(ctx, c.Store, m, "expired"); err != nil {
298+
return err
299+
}
300+
301+
continue
302+
}
279303
// In order to save some memory space we chose to have to recompose
280304
// the Ref the metric belongs to
281305
metricLabelProject, metricLabelProjectExists := m.Labels["project"]
282306
metricLabelRef, metricLabelRefExists := m.Labels["ref"]
283307
metricLabelEnvironment, metricLabelEnvironmentExists := m.Labels["environment"]
284308

285309
if !metricLabelProjectExists || (!metricLabelRefExists && !metricLabelEnvironmentExists) {
286-
if err = c.Store.DelMetric(ctx, k); err != nil {
310+
if err = deleteMetric(ctx, c.Store, m, "project-or-ref-and-environment-label-undefined"); err != nil {
287311
return err
288312
}
289-
290-
log.WithFields(log.Fields{
291-
"metric-kind": m.Kind,
292-
"metric-labels": m.Labels,
293-
"reason": "project-or-ref-and-environment-label-undefined",
294-
}).Info("deleted metric from the store")
295313
}
296314

297315
if metricLabelRefExists && !metricLabelEnvironmentExists {
@@ -374,33 +392,21 @@ func (c *Controller) GarbageCollectMetrics(ctx context.Context) error {
374392

375393
// If the ref does not exist anymore, delete the metric
376394
if !envExists {
377-
if err = c.Store.DelMetric(ctx, k); err != nil {
395+
if err = deleteMetric(ctx, c.Store, m, "non-existent-environment"); err != nil {
378396
return err
379397
}
380398

381-
log.WithFields(log.Fields{
382-
"metric-kind": m.Kind,
383-
"metric-labels": m.Labels,
384-
"reason": "non-existent-environment",
385-
}).Info("deleted metric from the store")
386-
387399
continue
388400
}
389401

390402
// Check if 'output sparse statuses metrics' has been enabled
391403
switch m.Kind {
392404
case schemas.MetricKindEnvironmentDeploymentStatus:
393405
if env.OutputSparseStatusMetrics && m.Value != 1 {
394-
if err = c.Store.DelMetric(ctx, k); err != nil {
406+
if err = deleteMetric(ctx, c.Store, m, "output-sparse-metrics-enabled-on-environment"); err != nil {
395407
return err
396408
}
397409

398-
log.WithFields(log.Fields{
399-
"metric-kind": m.Kind,
400-
"metric-labels": m.Labels,
401-
"reason": "output-sparse-metrics-enabled-on-environment",
402-
}).Info("deleted metric from the store")
403-
404410
continue
405411
}
406412
}
@@ -438,3 +444,17 @@ func deleteRef(ctx context.Context, s store.Store, ref schemas.Ref, reason strin
438444

439445
return
440446
}
447+
448+
func deleteMetric(ctx context.Context, s store.Store, m schemas.Metric, reason string) (err error) {
449+
if err = s.DelMetric(ctx, m.Key()); err != nil {
450+
return
451+
}
452+
453+
log.WithFields(log.Fields{
454+
"metric-kind": m.Kind,
455+
"metric-labels": m.Labels,
456+
"reason": reason,
457+
}).Info("deleted metric from the store")
458+
459+
return
460+
}

pkg/controller/pipelines.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,21 @@ func (c *Controller) PullRefMetrics(ctx context.Context, ref schemas.Ref) error
5353
return fmt.Errorf("error fetching project pipelines for %s: %v", ref.Project.Name, err)
5454
}
5555

56+
if len(pipelines) == 0 && ref.Kind == schemas.RefKindMergeRequest {
57+
refName = fmt.Sprintf("refs/merge-requests/%s/merge", ref.Name)
58+
pipelines, _, err = c.Gitlab.GetProjectPipelines(ctx, ref.Project.Name, &goGitlab.ListProjectPipelinesOptions{
59+
// We only need the most recent pipeline
60+
ListOptions: goGitlab.ListOptions{
61+
PerPage: 1,
62+
Page: 1,
63+
},
64+
Ref: &refName,
65+
})
66+
if err != nil {
67+
return fmt.Errorf("error fetching project pipelines for %s: %v", ref.Project.Name, err)
68+
}
69+
}
70+
5671
if len(pipelines) == 0 {
5772
log.WithFields(logFields).Debug("could not find any pipeline for the ref")
5873

pkg/controller/webhooks.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,16 +133,28 @@ func (c *Controller) processMergeEvent(ctx context.Context, e goGitlab.MergeEven
133133

134134
switch e.ObjectAttributes.Action {
135135
case "close":
136-
_ = deleteRef(ctx, c.Store, ref, "received merge request close event from webhook")
136+
c.triggerRefDeletion(ctx, ref)
137137
case "merge":
138-
_ = deleteRef(ctx, c.Store, ref, "received merge request merge event from webhook")
138+
c.triggerRefDeletion(ctx, ref)
139139
default:
140140
log.
141141
WithField("merge-request-event-type", e.ObjectAttributes.Action).
142142
Debug("received a non supported merge-request event type as a webhook")
143143
}
144144
}
145145

146+
func (c *Controller) triggerRefDeletion(ctx context.Context, ref schemas.Ref) {
147+
err := c.Store.DelRef(ctx, ref.Key())
148+
if err != nil {
149+
log.WithContext(ctx).
150+
WithFields(log.Fields{
151+
"project-name": ref.Project.Name,
152+
"ref": ref.Name,
153+
}).
154+
Error("failed deleting ref")
155+
}
156+
}
157+
146158
func (c *Controller) triggerRefMetricsPull(ctx context.Context, ref schemas.Ref) {
147159
logFields := log.Fields{
148160
"project-name": ref.Project.Name,

pkg/schemas/ref.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
const (
13-
mergeRequestRegexp string = `^((\d+)|refs/merge-requests/(\d+)/head)$`
13+
mergeRequestRegexp string = `^((\d+)|refs/merge-requests/(\d+)/(?:head|merge))$`
1414

1515
// RefKindBranch refers to a branch.
1616
RefKindBranch RefKind = "branch"

pkg/store/local.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,21 @@ type Local struct {
2626
executedTasksCount uint64
2727
}
2828

29+
// HasProjectExpired always reports false: the local store never flags a project as expired.
30+
func (l *Local) HasProjectExpired(ctx context.Context, key schemas.ProjectKey) bool {
31+
return false
32+
}
33+
34+
// HasRefExpired always reports false: the local store never flags a ref as expired.
35+
func (l *Local) HasRefExpired(ctx context.Context, key schemas.RefKey) bool {
36+
return false
37+
}
38+
39+
// HasMetricExpired always reports false: the local store never flags a metric as expired.
40+
func (l *Local) HasMetricExpired(ctx context.Context, key schemas.MetricKey) bool {
41+
return false
42+
}
43+
2944
// SetProject ..
3045
func (l *Local) SetProject(_ context.Context, p schemas.Project) error {
3146
l.projectsMutex.Lock()

pkg/store/redis.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,29 @@ const (
2525
// Redis is a store that embeds a *redis.Client and carries a StoreConfig tuning its behaviour.
2626
type Redis struct {
2727
*redis.Client
28+
StoreConfig *RedisStoreConfig
2829
}
2930

31+
// RedisStoreConfig allows fine-tuning the behaviour of the Redis store.
type RedisStoreConfig struct {
	// TTLConfig holds the TTL values used by the store; nil means that no
	// TTL configuration has been provided.
	TTLConfig *RedisTTLConfig
}

// RedisTTLConfig holds the TTL values for the various kinds of entries
// tracked by the store.
type RedisTTLConfig struct {
	// Project is the TTL applied to projects.
	Project time.Duration
	// Ref is the TTL applied to refs.
	Ref time.Duration
	// Metric is the TTL applied to metrics.
	Metric time.Duration
}

// RedisStoreOptions is a functional option mutating a RedisStoreConfig.
type RedisStoreOptions func(opts *RedisStoreConfig)

// WithTTLConfig returns an option which sets the TTL configuration of a
// RedisStoreConfig to opt. The pointer is stored as is (not copied), so a nil
// opt clears any previously set TTL configuration.
func WithTTLConfig(opt *RedisTTLConfig) RedisStoreOptions {
	return func(cfg *RedisStoreConfig) {
		cfg.TTLConfig = opt
	}
}
50+
3051
// SetProject ..
3152
func (r *Redis) SetProject(ctx context.Context, p schemas.Project) error {
3253
marshalledProject, err := msgpack.Marshal(p)
@@ -35,6 +56,13 @@ func (r *Redis) SetProject(ctx context.Context, p schemas.Project) error {
3556
}
3657

3758
_, err = r.HSet(ctx, redisProjectsKey, string(p.Key()), marshalledProject).Result()
59+
if err != nil {
60+
return err
61+
}
62+
63+
if r.StoreConfig.TTLConfig != nil {
64+
_, err = r.Set(ctx, getTTLProjectKey(p.Key()), true, r.StoreConfig.TTLConfig.Project).Result()
65+
}
3866

3967
return err
4068
}
@@ -183,6 +211,13 @@ func (r *Redis) SetRef(ctx context.Context, ref schemas.Ref) error {
183211
}
184212

185213
_, err = r.HSet(ctx, redisRefsKey, string(ref.Key()), marshalledRef).Result()
214+
if err != nil {
215+
return err
216+
}
217+
218+
if r.StoreConfig.TTLConfig != nil {
219+
_, err = r.Set(ctx, getTTLRefKey(ref.Key()), true, r.StoreConfig.TTLConfig.Ref).Result()
220+
}
186221

187222
return err
188223
}
@@ -257,6 +292,13 @@ func (r *Redis) SetMetric(ctx context.Context, m schemas.Metric) error {
257292
}
258293

259294
_, err = r.HSet(ctx, redisMetricsKey, string(m.Key()), marshalledMetric).Result()
295+
if err != nil {
296+
return err
297+
}
298+
299+
if r.StoreConfig.TTLConfig != nil {
300+
_, err = r.Set(ctx, getTTLMetricKey(m.Key()), true, r.StoreConfig.TTLConfig.Metric).Result()
301+
}
260302

261303
return err
262304
}
@@ -418,3 +460,45 @@ func (r *Redis) ExecutedTasksCount(ctx context.Context) (uint64, error) {
418460

419461
return uint64(c), err
420462
}
463+
464+
// HasProjectExpired ..
465+
func (r *Redis) HasProjectExpired(ctx context.Context, key schemas.ProjectKey) bool {
466+
reply, err := r.Exists(ctx, getTTLProjectKey(key)).Result()
467+
if err != nil {
468+
return false
469+
}
470+
471+
return reply > 0
472+
}
473+
474+
func getTTLProjectKey(key schemas.ProjectKey) string {
475+
return fmt.Sprintf("%s:%s", redisProjectsKey, key)
476+
}
477+
478+
// HasRefExpired ..
479+
func (r *Redis) HasRefExpired(ctx context.Context, key schemas.RefKey) bool {
480+
reply, err := r.Exists(ctx, getTTLRefKey(key)).Result()
481+
if err != nil {
482+
return false
483+
}
484+
485+
return reply > 0
486+
}
487+
488+
func getTTLRefKey(key schemas.RefKey) string {
489+
return fmt.Sprintf("%s:%s", redisRefsKey, key)
490+
}
491+
492+
// HasMetricExpired ..
493+
func (r *Redis) HasMetricExpired(ctx context.Context, key schemas.MetricKey) bool {
494+
reply, err := r.Exists(ctx, getTTLMetricKey(key)).Result()
495+
if err != nil {
496+
return false
497+
}
498+
499+
return reply > 0
500+
}
501+
502+
func getTTLMetricKey(key schemas.MetricKey) string {
503+
return fmt.Sprintf("%s:%s", redisMetricsKey, key)
504+
}

0 commit comments

Comments
 (0)