Skip to content

Commit f72aff0

Browse files
committed
Implementing Bucket index sync status
1 parent e2e5bcf commit f72aff0

File tree

10 files changed

+220
-49
lines changed

10 files changed

+220
-49
lines changed

pkg/compactor/blocks_cleaner.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
206206
if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
207207
return err
208208
}
209+
210+
// Delete the bucket index sync status
211+
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
212+
return err
213+
}
209214
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
210215

211216
var deletedBlocks, failed int
@@ -327,9 +332,11 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
327332
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
328333
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
329334
// Give up cleaning if we get access denied
330-
level.Warn(userLogger).Log("msg", err.Error())
335+
level.Warn(userLogger).Log("msg", "customer manager key access denied", "err", err)
336+
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.CustomerManagedKeyError, userLogger)
331337
return nil
332338
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
339+
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.GenericError, userLogger)
333340
return err
334341
}
335342

@@ -348,6 +355,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
348355
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
349356
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
350357
if err != nil {
358+
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.GenericError, userLogger)
351359
return err
352360
}
353361

@@ -398,7 +406,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
398406
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
399407
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
400408
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
401-
409+
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.Ok, userLogger)
402410
return nil
403411
}
404412

pkg/compactor/blocks_cleaner_test.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"crypto/rand"
66
"fmt"
7+
"os"
78
"path"
89
"strings"
910
"testing"
@@ -57,7 +58,7 @@ func TestBlocksCleaner(t *testing.T) {
5758
func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
5859
const userID = "user-1"
5960

60-
bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
61+
bucketClient, dir := cortex_testutil.PrepareFilesystemBucket(t)
6162
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
6263

6364
// Create blocks.
@@ -83,6 +84,12 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
8384
cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
8485
err := cleaner.cleanUser(ctx, userID, true)
8586
require.NoError(t, err)
87+
stat, err := os.Stat(path.Join(dir, userID, bucketindex.SyncStatusFile))
88+
require.NoError(t, err)
89+
require.True(t, stat.Size() > 0)
90+
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
91+
require.NoError(t, err)
92+
require.Equal(t, bucketindex.CustomerManagedKeyError, s)
8693
}
8794

8895
func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
@@ -232,6 +239,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
232239
require.NoError(t, err)
233240
assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs())
234241
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
242+
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger)
243+
require.NoError(t, err)
244+
require.Equal(t, bucketindex.Ok, s)
235245
}
236246

237247
assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
@@ -385,6 +395,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
385395
require.NoError(t, err)
386396
assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs())
387397
assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
398+
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
399+
require.NoError(t, err)
400+
require.Equal(t, bucketindex.Ok, s)
388401
}
389402

390403
func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {

pkg/compactor/compactor_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
473473
bucketClient.MockUpload(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/visit-mark.json", nil)
474474
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
475475
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
476+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
476477

477478
c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
478479
tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan"))
@@ -538,6 +539,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
538539
bucketClient.MockIter("user-2/markers/", nil, nil)
539540
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
540541
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
542+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
543+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
541544

542545
c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil)
543546

@@ -674,6 +677,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
674677
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
675678
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
676679
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
680+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
677681

678682
c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)
679683

@@ -799,6 +803,8 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
799803
bucketClient.MockIter("user-2/markers/", nil, nil)
800804
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
801805
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
806+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
807+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
802808

803809
c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
804810

@@ -850,6 +856,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
850856
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil)
851857
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil)
852858
bucketClient.MockDelete("user-1/bucket-index.json.gz", nil)
859+
bucketClient.MockDelete("user-1/bucket-index-sync-status.json", nil)
853860

854861
c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)
855862

@@ -1026,6 +1033,8 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
10261033
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
10271034
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
10281035
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
1036+
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
1037+
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)
10291038

10301039
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
10311040
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
@@ -1107,6 +1116,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
11071116
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
11081117
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
11091118
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
1119+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
11101120
}
11111121

11121122
// Create a shared KV Store
@@ -1230,6 +1240,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
12301240
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
12311241
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
12321242
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
1243+
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
12331244
}
12341245

12351246
// Create a shared KV Store

pkg/querier/blocks_finder_bucket_index.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string,
5858
}
5959

6060
// Get the bucket index for this user.
61-
idx, err := f.loader.GetIndex(ctx, userID)
61+
idx, ss, err := f.loader.GetIndex(ctx, userID)
6262
if errors.Is(err, bucketindex.ErrIndexNotFound) {
6363
// This is a legit edge case, happening when a new tenant has not shipped blocks to the storage yet
6464
// so the bucket index hasn't been created yet.
@@ -69,6 +69,10 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string,
6969
return nil, nil, validation.AccessDeniedError(err.Error())
7070
}
7171

72+
if ss == bucketindex.CustomerManagedKeyError {
73+
return nil, nil, validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error())
74+
}
75+
7276
if err != nil {
7377
return nil, nil, err
7478
}

pkg/querier/blocks_finder_bucket_index_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/stretchr/testify/require"
1414
"github.com/thanos-io/objstore"
1515

16+
"github.com/cortexproject/cortex/pkg/storage/bucket"
17+
1618
"github.com/cortexproject/cortex/pkg/util/validation"
1719

1820
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
@@ -221,6 +223,28 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) {
221223
require.Equal(t, errBucketIndexTooOld, err)
222224
}
223225

226+
func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOldWithCustomerKeyError(t *testing.T) {
227+
t.Parallel()
228+
229+
const userID = "user-1"
230+
231+
ctx := context.Background()
232+
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
233+
finder := prepareBucketIndexBlocksFinder(t, bkt)
234+
235+
require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{
236+
Version: bucketindex.IndexVersion1,
237+
Blocks: bucketindex.Blocks{},
238+
BlockDeletionMarks: bucketindex.BlockDeletionMarks{},
239+
UpdatedAt: time.Now().Add(-2 * time.Hour).Unix(),
240+
}))
241+
242+
bucketindex.WriteSyncStatus(ctx, bkt, userID, bucketindex.CustomerManagedKeyError, log.NewNopLogger())
243+
244+
_, _, err := finder.GetBlocks(ctx, userID, 10, 20)
245+
require.Equal(t, validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error()), err)
246+
}
247+
224248
func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIndexBlocksFinder {
225249
ctx := context.Background()
226250
cfg := BucketIndexBlocksFinderConfig{

pkg/storage/tsdb/bucketindex/loader.go

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ func NewLoader(cfg LoaderConfig, bucketClient objstore.Bucket, cfgProvider bucke
9191

9292
// GetIndex returns the bucket index for the given user. It returns the in-memory cached
9393
// index if available, or load it from the bucket otherwise.
94-
func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {
94+
func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, SyncStatus, error) {
9595
l.indexesMx.RLock()
9696
if entry := l.indexes[userID]; entry != nil {
9797
idx := entry.index
@@ -101,7 +101,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {
101101
// We don't check if the index is stale because it's the responsibility
102102
// of the background job to keep it updated.
103103
entry.requestedAt.Store(time.Now().Unix())
104-
return idx, err
104+
return idx, entry.syncStatus, err
105105
}
106106
l.indexesMx.RUnlock()
107107

@@ -111,7 +111,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {
111111
if err != nil {
112112
// Cache the error, to avoid hammering the object store in case of persistent issues
113113
// (eg. corrupted bucket index or not existing).
114-
l.cacheIndex(userID, nil, err)
114+
l.cacheIndex(userID, nil, Unknown, err)
115115

116116
if errors.Is(err, ErrIndexNotFound) {
117117
level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID)
@@ -124,25 +124,31 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) {
124124
level.Error(l.logger).Log("msg", "unable to load bucket index", "user", userID, "err", err)
125125
}
126126

127-
return nil, err
127+
return nil, Unknown, err
128+
}
129+
130+
ss, err := ReadSyncStatus(ctx, l.bkt, userID, l.logger)
131+
132+
if err != nil {
133+
level.Warn(l.logger).Log("msg", "unable to read bucket index status", "user", userID, "err", err)
128134
}
129135

130136
// Cache the index.
131-
l.cacheIndex(userID, idx, nil)
137+
l.cacheIndex(userID, idx, ss, nil)
132138

133139
elapsedTime := time.Since(startTime)
134140
l.loadDuration.Observe(elapsedTime.Seconds())
135141
level.Info(l.logger).Log("msg", "loaded bucket index", "user", userID, "duration", elapsedTime)
136-
return idx, nil
142+
return idx, ss, nil
137143
}
138144

139-
func (l *Loader) cacheIndex(userID string, idx *Index, err error) {
145+
func (l *Loader) cacheIndex(userID string, idx *Index, ss SyncStatus, err error) {
140146
l.indexesMx.Lock()
141147
defer l.indexesMx.Unlock()
142148

143149
// Not an issue if, due to concurrency, another index was already cached
144150
// and we overwrite it: last will win.
145-
l.indexes[userID] = newCachedIndex(idx, err)
151+
l.indexes[userID] = newCachedIndex(idx, ss, err)
146152
}
147153

148154
// checkCachedIndexes checks all cached indexes and, for each of them, does two things:
@@ -240,8 +246,9 @@ func (l *Loader) countLoadedIndexesMetric() float64 {
240246
type cachedIndex struct {
241247
// We cache either the index or the error that occurred while fetching it. They're
242248
// mutually exclusive.
243-
index *Index
244-
err error
249+
index *Index
250+
syncStatus SyncStatus
251+
err error
245252

246253
// Unix timestamp (seconds) of the last time the index was updated from the storage.
247254
updatedAt atomic.Int64
@@ -250,10 +257,11 @@ type cachedIndex struct {
250257
requestedAt atomic.Int64
251258
}
252259

253-
func newCachedIndex(idx *Index, err error) *cachedIndex {
260+
func newCachedIndex(idx *Index, ss SyncStatus, err error) *cachedIndex {
254261
entry := &cachedIndex{
255-
index: idx,
256-
err: err,
262+
index: idx,
263+
err: err,
264+
syncStatus: ss,
257265
}
258266

259267
now := time.Now()

0 commit comments

Comments
 (0)