Skip to content

Commit 28142a5

Browse files
committed
adding gs
1 parent da779f2 commit 28142a5

File tree

11 files changed

+237
-81
lines changed

11 files changed

+237
-81
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -326,17 +326,40 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
326326
}
327327
}
328328

329+
// Reading bucket index sync stats
330+
idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, userLogger)
331+
332+
if err != nil {
333+
level.Warn(userLogger).Log("msg", "error reading the bucket index status", "err", err)
334+
idxs = bucketindex.Status{Version: bucketindex.SyncStatusFileVersion, NonQueryableReason: bucketindex.Unknown}
335+
}
336+
337+
idxs.Status = bucketindex.Ok
338+
idxs.SyncTime = time.Now().Unix()
339+
329340
// Read the bucket index.
330341
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)
342+
343+
defer func() {
344+
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
345+
}()
346+
331347
if errors.Is(err, bucketindex.ErrIndexCorrupted) {
332348
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
333349
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
334350
// Give up cleaning if we get access denied
335351
level.Warn(userLogger).Log("msg", "customer manager key access denied", "err", err)
336-
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.CustomerManagedKeyError, userLogger)
352+
idxs.Status = bucketindex.CustomerManagedKeyError
353+
// Making the tenant non queryable until 2x the cleanup interval to give time to compactors and storegateways
354+
// to reload the bucket index in case the key access is re-granted
355+
idxs.NonQueryableUntil = time.Now().Add(2 * c.cfg.CleanupInterval).Unix()
356+
idxs.NonQueryableReason = bucketindex.CustomerManagedKeyError
357+
358+
// Update the bucket index update time
359+
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
337360
return nil
338361
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
339-
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.GenericError, userLogger)
362+
idxs.Status = bucketindex.GenericError
340363
return err
341364
}
342365

@@ -355,7 +378,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
355378
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
356379
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
357380
if err != nil {
358-
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.GenericError, userLogger)
381+
idxs.Status = bucketindex.GenericError
359382
return err
360383
}
361384

@@ -406,7 +429,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
406429
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
407430
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
408431
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
409-
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.Ok, userLogger)
410432
return nil
411433
}
412434

pkg/compactor/blocks_cleaner_test.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"crypto/rand"
66
"fmt"
7-
"os"
87
"path"
98
"strings"
109
"testing"
@@ -58,14 +57,14 @@ func TestBlocksCleaner(t *testing.T) {
5857
func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
5958
const userID = "user-1"
6059

61-
bucketClient, dir := cortex_testutil.PrepareFilesystemBucket(t)
62-
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
60+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
61+
bkt = bucketindex.BucketWithGlobalMarkers(bkt)
6362

6463
// Create blocks.
6564
ctx := context.Background()
6665
deletionDelay := 12 * time.Hour
67-
bucketClient = &cortex_testutil.MockBucketFailure{
68-
Bucket: bucketClient,
66+
mbucket := &cortex_testutil.MockBucketFailure{
67+
Bucket: bkt,
6968
GetFailures: map[string]error{
7069
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
7170
},
@@ -78,18 +77,37 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
7877
}
7978

8079
logger := log.NewNopLogger()
81-
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
80+
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
8281
cfgProvider := newMockConfigProvider()
8382

84-
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
85-
err := cleaner.cleanUser(ctx, userID, true)
83+
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)
84+
85+
// Clean User with no error
86+
cleaner.bucketClient = bkt
87+
err := cleaner.cleanUser(ctx, userID, false)
8688
require.NoError(t, err)
87-
stat, err := os.Stat(path.Join(dir, userID, bucketindex.SyncStatusFile))
89+
s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
8890
require.NoError(t, err)
89-
require.True(t, stat.Size() > 0)
90-
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
91+
require.Equal(t, bucketindex.Ok, s.Status)
92+
require.Equal(t, int64(0), s.NonQueryableUntil)
93+
94+
// Clean with cmk error
95+
cleaner.bucketClient = mbucket
96+
err = cleaner.cleanUser(ctx, userID, false)
97+
require.NoError(t, err)
98+
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
99+
require.NoError(t, err)
100+
require.Equal(t, bucketindex.CustomerManagedKeyError, s.Status)
101+
require.Less(t, int64(0), s.NonQueryableUntil)
102+
103+
// Re grant access to the key
104+
cleaner.bucketClient = bkt
105+
err = cleaner.cleanUser(ctx, userID, false)
106+
require.NoError(t, err)
107+
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
91108
require.NoError(t, err)
92-
require.Equal(t, bucketindex.CustomerManagedKeyError, s)
109+
require.Equal(t, bucketindex.Ok, s.Status)
110+
require.Less(t, int64(0), s.NonQueryableUntil)
93111
}
94112

95113
func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
@@ -241,7 +259,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
241259
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
242260
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger)
243261
require.NoError(t, err)
244-
require.Equal(t, bucketindex.Ok, s)
262+
require.Equal(t, bucketindex.Ok, s.Status)
245263
}
246264

247265
assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
@@ -397,7 +415,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
397415
assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
398416
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
399417
require.NoError(t, err)
400-
require.Equal(t, bucketindex.Ok, s)
418+
require.Equal(t, bucketindex.Ok, s.Status)
401419
}
402420

403421
func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {

pkg/compactor/compactor.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,14 @@ func (c *Compactor) compactUsers(ctx context.Context) {
675675
continue
676676
}
677677

678+
if idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, util_log.WithUserID(userID, c.logger)); err == nil {
679+
if idxs.Status == bucketindex.CustomerManagedKeyError {
680+
c.compactionRunSkippedTenants.Inc()
681+
level.Info(c.logger).Log("msg", "skipping compactUser due CustomerManagedKeyError", "user", userID)
682+
continue
683+
}
684+
}
685+
678686
ownedUsers[userID] = struct{}{}
679687

680688
if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil {

pkg/compactor/compactor_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,38 @@ func TestConfig_Validate(t *testing.T) {
151151
}
152152
}
153153

154+
func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
155+
t.Parallel()
156+
userID := "user-1"
157+
158+
ss := bucketindex.Status{Status: bucketindex.CustomerManagedKeyError, Version: bucketindex.SyncStatusFileVersion}
159+
content, err := json.Marshal(ss)
160+
require.NoError(t, err)
161+
162+
// No user blocks stored in the bucket.
163+
bucketClient := &bucket.ClientMock{}
164+
bucketClient.MockIter("", []string{userID}, nil)
165+
bucketClient.MockIter(userID+"/", []string{}, nil)
166+
bucketClient.MockIter(userID+"/markers/", nil, nil)
167+
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil)
168+
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
169+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
170+
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
171+
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
172+
173+
cfg := prepareConfig()
174+
c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil)
175+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
176+
177+
// Wait until a run has completed.
178+
cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} {
179+
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
180+
})
181+
182+
require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
183+
assert.Contains(t, strings.Split(strings.TrimSpace(logs.String()), "\n"), `level=info component=compactor msg="skipping compactUser due CustomerManagedKeyError" user=user-1`)
184+
}
185+
154186
func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
155187
t.Parallel()
156188

@@ -535,6 +567,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
535567
bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", "", nil)
536568
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
537569
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
570+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
571+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
538572
bucketClient.MockIter("user-1/markers/", nil, nil)
539573
bucketClient.MockIter("user-2/markers/", nil, nil)
540574
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
@@ -676,6 +710,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
676710
bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
677711
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
678712
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
713+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
679714
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
680715
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
681716

@@ -799,6 +834,8 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
799834

800835
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
801836
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
837+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
838+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
802839
bucketClient.MockIter("user-1/markers/", nil, nil)
803840
bucketClient.MockIter("user-2/markers/", nil, nil)
804841
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
@@ -1031,6 +1068,8 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
10311068
bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", nil)
10321069
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
10331070
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
1071+
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
1072+
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
10341073
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
10351074
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
10361075
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

pkg/ingester/ingester.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/prometheus/prometheus/config"
1717
"github.com/prometheus/prometheus/tsdb/chunks"
1818

19+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
20+
1921
"github.com/go-kit/log"
2022
"github.com/go-kit/log/level"
2123
"github.com/gogo/status"
@@ -2289,6 +2291,13 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants)
22892291
}
22902292
defer userDB.casState(activeShipping, active)
22912293

2294+
if idxs, err := bucketindex.ReadSyncStatus(ctx, i.TSDBState.bucket, userID, logutil.WithContext(ctx, i.logger)); err == nil {
2295+
if idxs.Status == bucketindex.CustomerManagedKeyError {
2296+
level.Info(logutil.WithContext(ctx, i.logger)).Log("msg", "skipping shipping blocks due CustomerManagedKeyError", "user", userID)
2297+
return nil
2298+
}
2299+
}
2300+
22922301
uploaded, err := userDB.shipper.Sync(ctx)
22932302
if err != nil {
22942303
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)

pkg/ingester/ingester_test.go

Lines changed: 49 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ import (
4040
"golang.org/x/sync/errgroup"
4141
"google.golang.org/grpc"
4242

43+
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
44+
4345
"github.com/cortexproject/cortex/pkg/chunk"
4446
"github.com/cortexproject/cortex/pkg/chunk/encoding"
4547
"github.com/cortexproject/cortex/pkg/cortexpb"
@@ -2642,40 +2644,60 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) {
26422644
}
26432645

26442646
func TestIngester_shipBlocks(t *testing.T) {
2645-
cfg := defaultIngesterTestConfig(t)
2646-
cfg.LifecyclerConfig.JoinAfter = 0
2647-
cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2
2647+
testCases := map[string]struct {
2648+
ss bucketindex.Status
2649+
expectetNumberOfCall int
2650+
}{
2651+
"should ship blocks if status ok": {
2652+
ss: bucketindex.Status{Version: bucketindex.IndexVersion1, Status: bucketindex.Ok},
2653+
expectetNumberOfCall: 1,
2654+
},
2655+
"should not ship on cmk errors": {
2656+
ss: bucketindex.Status{Version: bucketindex.IndexVersion1, Status: bucketindex.CustomerManagedKeyError},
2657+
expectetNumberOfCall: 0,
2658+
},
2659+
}
26482660

2649-
// Create ingester
2650-
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
2651-
require.NoError(t, err)
2652-
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
2653-
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
2661+
for name, tc := range testCases {
2662+
t.Run(name, func(t *testing.T) {
26542663

2655-
// Wait until it's ACTIVE
2656-
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
2657-
return i.lifecycler.GetState()
2658-
})
2664+
cfg := defaultIngesterTestConfig(t)
2665+
cfg.LifecyclerConfig.JoinAfter = 0
2666+
cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2
26592667

2660-
// Create the TSDB for 3 users and then replace the shipper with the mocked one
2661-
mocks := []*shipperMock{}
2662-
for _, userID := range []string{"user-1", "user-2", "user-3"} {
2663-
userDB, err := i.getOrCreateTSDB(userID, false)
2664-
require.NoError(t, err)
2665-
require.NotNil(t, userDB)
2668+
// Create ingester
2669+
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
2670+
require.NoError(t, err)
2671+
require.NoError(t, services.StartAndAwaitRunning(context.Background(), i))
2672+
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
26662673

2667-
m := &shipperMock{}
2668-
m.On("Sync", mock.Anything).Return(0, nil)
2669-
mocks = append(mocks, m)
2674+
// Wait until it's ACTIVE
2675+
test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} {
2676+
return i.lifecycler.GetState()
2677+
})
26702678

2671-
userDB.shipper = m
2672-
}
2679+
// Create the TSDB for 3 users and then replace the shipper with the mocked one
2680+
mocks := []*shipperMock{}
2681+
for _, userID := range []string{"user-1", "user-2", "user-3"} {
2682+
bucketindex.WriteSyncStatus(context.Background(), i.TSDBState.bucket, userID, tc.ss, log.NewNopLogger())
2683+
userDB, err := i.getOrCreateTSDB(userID, false)
2684+
require.NoError(t, err)
2685+
require.NotNil(t, userDB)
26732686

2674-
// Ship blocks and assert on the mocked shipper
2675-
i.shipBlocks(context.Background(), nil)
2687+
m := &shipperMock{}
2688+
m.On("Sync", mock.Anything).Return(0, nil)
2689+
mocks = append(mocks, m)
26762690

2677-
for _, m := range mocks {
2678-
m.AssertNumberOfCalls(t, "Sync", 1)
2691+
userDB.shipper = m
2692+
}
2693+
2694+
// Ship blocks and assert on the mocked shipper
2695+
i.shipBlocks(context.Background(), nil)
2696+
2697+
for _, m := range mocks {
2698+
m.AssertNumberOfCalls(t, "Sync", tc.expectetNumberOfCall)
2699+
}
2700+
})
26792701
}
26802702
}
26812703

pkg/querier/blocks_finder_bucket_index.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,12 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string,
6363
// This is a legit edge case, happening when a new tenant has not shipped blocks to the storage yet
6464
// so the bucket index hasn't been created yet.
6565
return nil, nil, nil
66-
}
67-
68-
if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
66+
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
6967
return nil, nil, validation.AccessDeniedError(err.Error())
7068
}
7169

72-
if ss == bucketindex.CustomerManagedKeyError {
70+
// Short circuit when bucket failed to be updated due CMK errors recently
71+
if time.Since(ss.GetNonQueryableUntil()) < 0 && ss.NonQueryableReason == bucketindex.CustomerManagedKeyError {
7372
return nil, nil, validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error())
7473
}
7574

0 commit comments

Comments
 (0)