From da779f232d8a4ec70b15710e9a1ad662b1ba6721 Mon Sep 17 00:00:00 2001
From: Alan Protasio
Date: Mon, 3 Jul 2023 20:15:14 -0700
Subject: [PATCH 1/2] Implementing Bucket index sync status

---
 pkg/compactor/blocks_cleaner.go             |  12 ++-
 pkg/compactor/blocks_cleaner_test.go        |  15 ++-
 pkg/compactor/compactor_test.go             |  11 ++
 pkg/querier/blocks_finder_bucket_index.go   |   6 +-
 .../blocks_finder_bucket_index_test.go      |  24 +++++
 pkg/storage/tsdb/bucketindex/loader.go      |  34 +++---
 pkg/storage/tsdb/bucketindex/loader_test.go |  48 ++++-----
 pkg/storage/tsdb/bucketindex/storage.go     | 102 ++++++++++++++++++
 pkg/storage/tsdb/caching_bucket.go          |   6 +-
 pkg/storage/tsdb/caching_bucket_test.go     |  11 +-
 10 files changed, 220 insertions(+), 49 deletions(-)

diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go
index 920155b2f37..5f519e235e9 100644
--- a/pkg/compactor/blocks_cleaner.go
+++ b/pkg/compactor/blocks_cleaner.go
@@ -206,6 +206,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
 	if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
 		return err
 	}
+
+	// Delete the bucket index sync status
+	if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
+		return err
+	}
 	c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
 
 	var deletedBlocks, failed int
@@ -327,9 +332,11 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 		level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
 	} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
 		// Give up cleaning if we get access denied
-		level.Warn(userLogger).Log("msg", err.Error())
+		level.Warn(userLogger).Log("msg", "customer managed key access denied", "err", err)
+		bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.CustomerManagedKeyError, userLogger)
 		return nil
 	} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
+		bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.GenericError, userLogger)
 		return err
 	}
 
@@ -348,6 +355,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 	w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
 	idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
 	if err != nil {
+		bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.GenericError, userLogger)
 		return err
 	}
 
@@ -398,7 +406,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 	c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
 	c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
 	c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
-
+	bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.Ok, userLogger)
 	return nil
 }
 
diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go
index 707d5f36ab6..ce50882238f 100644
--- a/pkg/compactor/blocks_cleaner_test.go
+++ b/pkg/compactor/blocks_cleaner_test.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"crypto/rand"
 	"fmt"
+	"os"
 	"path"
 	"strings"
 	"testing"
@@ -57,7 +58,7 @@ func TestBlocksCleaner(t *testing.T) {
 func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
 	const userID = "user-1"
 
-	bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
+	bucketClient, dir := cortex_testutil.PrepareFilesystemBucket(t)
 	bucketClient =
bucketindex.BucketWithGlobalMarkers(bucketClient) // Create blocks. @@ -83,6 +84,12 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) { cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil) err := cleaner.cleanUser(ctx, userID, true) require.NoError(t, err) + stat, err := os.Stat(path.Join(dir, userID, bucketindex.SyncStatusFile)) + require.NoError(t, err) + require.True(t, stat.Size() > 0) + s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger) + require.NoError(t, err) + require.Equal(t, bucketindex.CustomerManagedKeyError, s) } func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) { @@ -232,6 +239,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions require.NoError(t, err) assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs()) assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs()) + s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger) + require.NoError(t, err) + require.Equal(t, bucketindex.Ok, s) } assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(` @@ -385,6 +395,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) { require.NoError(t, err) assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs()) assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs()) + s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger) + require.NoError(t, err) + require.Equal(t, bucketindex.Ok, s) } func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) { diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go index c4622e4da7e..6b4721c130c 100644 --- a/pkg/compactor/compactor_test.go +++ b/pkg/compactor/compactor_test.go @@ -473,6 +473,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant( bucketClient.MockUpload(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/visit-mark.json", nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) + bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil) c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil) tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan")) @@ -538,6 +539,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) { bucketClient.MockIter("user-2/markers/", nil, nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) + bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil) c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil) @@ -674,6 +677,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) { bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil) @@ -799,6 +803,8 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) { bucketClient.MockIter("user-2/markers/", nil, nil) 
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) + bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil) c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil) @@ -850,6 +856,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil) bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil) bucketClient.MockDelete("user-1/bucket-index.json.gz", nil) + bucketClient.MockDelete("user-1/bucket-index-sync-status.json", nil) c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil) @@ -1026,6 +1033,8 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) + bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) + bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil) ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) t.Cleanup(func() { assert.NoError(t, closer.Close()) }) @@ -1107,6 +1116,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) + bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil) } // Create a shared KV Store @@ -1230,6 +1240,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil) bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil) bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil) + bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil) } // Create a shared KV Store diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index ccf9e7b7728..260ca792d81 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -58,7 +58,7 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, } // Get the bucket index for this user. - idx, err := f.loader.GetIndex(ctx, userID) + idx, ss, err := f.loader.GetIndex(ctx, userID) if errors.Is(err, bucketindex.ErrIndexNotFound) { // This is a legit edge case, happening when a new tenant has not shipped blocks to the storage yet // so the bucket index hasn't been created yet. 
@@ -69,6 +69,10 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, return nil, nil, validation.AccessDeniedError(err.Error()) } + if ss == bucketindex.CustomerManagedKeyError { + return nil, nil, validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error()) + } + if err != nil { return nil, nil, err } diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go index 68523816b83..8cff9a23014 100644 --- a/pkg/querier/blocks_finder_bucket_index_test.go +++ b/pkg/querier/blocks_finder_bucket_index_test.go @@ -13,6 +13,8 @@ import ( "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/util/validation" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" @@ -221,6 +223,28 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOld(t *testing.T) { require.Equal(t, errBucketIndexTooOld, err) } +func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOldWithCustomerKeyError(t *testing.T) { + t.Parallel() + + const userID = "user-1" + + ctx := context.Background() + bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) + finder := prepareBucketIndexBlocksFinder(t, bkt) + + require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{ + Version: bucketindex.IndexVersion1, + Blocks: bucketindex.Blocks{}, + BlockDeletionMarks: bucketindex.BlockDeletionMarks{}, + UpdatedAt: time.Now().Add(-2 * time.Hour).Unix(), + })) + + bucketindex.WriteSyncStatus(ctx, bkt, userID, bucketindex.CustomerManagedKeyError, log.NewNopLogger()) + + _, _, err := finder.GetBlocks(ctx, userID, 10, 20) + require.Equal(t, validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error()), err) +} + func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIndexBlocksFinder { ctx := context.Background() cfg := BucketIndexBlocksFinderConfig{ diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go index 2961b864057..f3296df8dad 100644 --- a/pkg/storage/tsdb/bucketindex/loader.go +++ b/pkg/storage/tsdb/bucketindex/loader.go @@ -91,7 +91,7 @@ func NewLoader(cfg LoaderConfig, bucketClient objstore.Bucket, cfgProvider bucke // GetIndex returns the bucket index for the given user. It returns the in-memory cached // index if available, or load it from the bucket otherwise. -func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { +func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, SyncStatus, error) { l.indexesMx.RLock() if entry := l.indexes[userID]; entry != nil { idx := entry.index @@ -101,7 +101,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { // We don't check if the index is stale because it's the responsibility // of the background job to keep it updated. entry.requestedAt.Store(time.Now().Unix()) - return idx, err + return idx, entry.syncStatus, err } l.indexesMx.RUnlock() @@ -111,7 +111,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { if err != nil { // Cache the error, to avoid hammering the object store in case of persistent issues // (eg. corrupted bucket index or not existing). 
- l.cacheIndex(userID, nil, err) + l.cacheIndex(userID, nil, Unknown, err) if errors.Is(err, ErrIndexNotFound) { level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID) @@ -124,25 +124,31 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { level.Error(l.logger).Log("msg", "unable to load bucket index", "user", userID, "err", err) } - return nil, err + return nil, Unknown, err + } + + ss, err := ReadSyncStatus(ctx, l.bkt, userID, l.logger) + + if err != nil { + level.Warn(l.logger).Log("msg", "unable to read bucket index status", "user", userID, "err", err) } // Cache the index. - l.cacheIndex(userID, idx, nil) + l.cacheIndex(userID, idx, ss, nil) elapsedTime := time.Since(startTime) l.loadDuration.Observe(elapsedTime.Seconds()) level.Info(l.logger).Log("msg", "loaded bucket index", "user", userID, "duration", elapsedTime) - return idx, nil + return idx, ss, nil } -func (l *Loader) cacheIndex(userID string, idx *Index, err error) { +func (l *Loader) cacheIndex(userID string, idx *Index, ss SyncStatus, err error) { l.indexesMx.Lock() defer l.indexesMx.Unlock() // Not an issue if, due to concurrency, another index was already cached // and we overwrite it: last will win. - l.indexes[userID] = newCachedIndex(idx, err) + l.indexes[userID] = newCachedIndex(idx, ss, err) } // checkCachedIndexes checks all cached indexes and, for each of them, does two things: @@ -240,8 +246,9 @@ func (l *Loader) countLoadedIndexesMetric() float64 { type cachedIndex struct { // We cache either the index or the error occurred while fetching it. They're // mutually exclusive. - index *Index - err error + index *Index + syncStatus SyncStatus + err error // Unix timestamp (seconds) of when the index has been updated from the storage the last time. updatedAt atomic.Int64 @@ -250,10 +257,11 @@ type cachedIndex struct { requestedAt atomic.Int64 } -func newCachedIndex(idx *Index, err error) *cachedIndex { +func newCachedIndex(idx *Index, ss SyncStatus, err error) *cachedIndex { entry := &cachedIndex{ - index: idx, - err: err, + index: idx, + err: err, + syncStatus: ss, } now := time.Now() diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index e73ca0de1e9..1cd9af0022a 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -65,7 +65,7 @@ func TestLoader_GetIndex_ShouldLazyLoadBucketIndex(t *testing.T) { // Request the index multiple times. for i := 0; i < 10; i++ { - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) } @@ -105,7 +105,7 @@ func TestLoader_GetIndex_ShouldCacheError(t *testing.T) { // Request the index multiple times. for i := 0; i < 10; i++ { - _, err := loader.GetIndex(ctx, "user-1") + _, _, err := loader.GetIndex(ctx, "user-1") require.Equal(t, ErrIndexCorrupted, err) } @@ -141,7 +141,7 @@ func TestLoader_GetIndex_ShouldCacheIndexNotFoundError(t *testing.T) { // Request the index multiple times. 
for i := 0; i < 10; i++ { - _, err := loader.GetIndex(ctx, "user-1") + _, _, err := loader.GetIndex(ctx, "user-1") require.Equal(t, ErrIndexNotFound, err) } @@ -193,7 +193,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -203,14 +203,14 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadSuccess(t *testing.T) // Wait until the index has been updated in background. test.Poll(t, 3*time.Second, 2, func() interface{} { - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") if err != nil { return 0 } return len(actualIdx.Blocks) }) - actualIdx, err = loader.GetIndex(ctx, "user-1") + actualIdx, _, err = loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -250,7 +250,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T) require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - _, err := loader.GetIndex(ctx, "user-1") + _, _, err := loader.GetIndex(ctx, "user-1") assert.Equal(t, ErrIndexCorrupted, err) // Upload the bucket index. @@ -266,11 +266,11 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousLoadFailure(t *testing.T) // Wait until the index has been updated in background. test.Poll(t, 3*time.Second, nil, func() interface{} { - _, err := loader.GetIndex(ctx, "user-1") + _, _, err := loader.GetIndex(ctx, "user-1") return err }) - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -303,7 +303,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousIndexNotFound(t *testing. require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - _, err := loader.GetIndex(ctx, "user-1") + _, _, err := loader.GetIndex(ctx, "user-1") assert.Equal(t, ErrIndexNotFound, err) // Upload the bucket index. @@ -319,11 +319,11 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousIndexNotFound(t *testing. // Wait until the index has been updated in background. 
test.Poll(t, 3*time.Second, nil, func() interface{} { - _, err := loader.GetIndex(ctx, "user-1") + _, _, err := loader.GetIndex(ctx, "user-1") return err }) - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -367,7 +367,7 @@ func TestLoader_ShouldNotCacheCriticalErrorOnBackgroundUpdates(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -379,7 +379,7 @@ func TestLoader_ShouldNotCacheCriticalErrorOnBackgroundUpdates(t *testing.T) { return testutil.ToFloat64(loader.loadFailures) > 0 }) - actualIdx, err = loader.GetIndex(ctx, "user-1") + actualIdx, _, err = loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -423,7 +423,7 @@ func TestLoader_ShouldCacheIndexNotFoundOnBackgroundUpdates(t *testing.T) { require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -447,7 +447,7 @@ func TestLoader_ShouldCacheIndexNotFoundOnBackgroundUpdates(t *testing.T) { // Try to get the index again. We expect no load attempt because the error has been cached. prevLoads = testutil.ToFloat64(loader.loadAttempts) - actualIdx, err = loader.GetIndex(ctx, "user-1") + actualIdx, _, err = loader.GetIndex(ctx, "user-1") assert.Equal(t, ErrIndexNotFound, err) assert.Nil(t, actualIdx) assert.Equal(t, prevLoads, testutil.ToFloat64(loader.loadAttempts)) @@ -483,7 +483,7 @@ func TestLoader_ShouldOffloadIndexIfNotFoundDuringBackgroundUpdates(t *testing.T require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -495,7 +495,7 @@ func TestLoader_ShouldOffloadIndexIfNotFoundDuringBackgroundUpdates(t *testing.T return testutil.ToFloat64(loader.loaded) }) - _, err = loader.GetIndex(ctx, "user-1") + _, _, err = loader.GetIndex(ctx, "user-1") require.Equal(t, ErrIndexNotFound, err) // Ensure metrics have been updated accordingly. @@ -538,7 +538,7 @@ func TestLoader_ShouldOffloadIndexIfIdleTimeoutIsReachedDuringBackgroundUpdates( require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - actualIdx, err := loader.GetIndex(ctx, "user-1") + actualIdx, _, err := loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -561,7 +561,7 @@ func TestLoader_ShouldOffloadIndexIfIdleTimeoutIsReachedDuringBackgroundUpdates( )) // Load it again. - actualIdx, err = loader.GetIndex(ctx, "user-1") + actualIdx, _, err = loader.GetIndex(ctx, "user-1") require.NoError(t, err) assert.Equal(t, idx, actualIdx) @@ -602,7 +602,7 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - _, err := loader.GetIndex(ctx, user) + _, _, err := loader.GetIndex(ctx, user) require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) // Check cached @@ -623,13 +623,13 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing // Wait until the index has been updated in background. 
 	test.Poll(t, 3*time.Second, nil, func() interface{} {
-		_, err := loader.GetIndex(ctx, "user-1")
+		_, _, err := loader.GetIndex(ctx, "user-1")
 		// Check cached
 		require.NoError(t, loader.checkCachedIndexes(ctx))
 		return err
 	})
 
-	actualIdx, err := loader.GetIndex(ctx, "user-1")
+	actualIdx, _, err := loader.GetIndex(ctx, "user-1")
 	require.NoError(t, err)
 	assert.Equal(t, idx, actualIdx)
 
@@ -668,7 +668,7 @@ func TestLoader_GetIndex_ShouldCacheKeyDeniedErrors(t *testing.T) {
 
 	// Request the index multiple times.
 	for i := 0; i < 10; i++ {
-		_, err := loader.GetIndex(ctx, "user-1")
+		_, _, err := loader.GetIndex(ctx, "user-1")
 		require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied))
 	}
 
diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go
index 5e66a1357bc..9484393ec84 100644
--- a/pkg/storage/tsdb/bucketindex/storage.go
+++ b/pkg/storage/tsdb/bucketindex/storage.go
@@ -5,8 +5,11 @@ import (
 	"compress/gzip"
 	"context"
 	"encoding/json"
+	"io"
+	"time"
 
 	"github.com/go-kit/log"
+	"github.com/go-kit/log/level"
 	"github.com/pkg/errors"
 	"github.com/thanos-io/objstore"
 
@@ -17,11 +20,38 @@ import (
 	"github.com/cortexproject/cortex/pkg/util/runutil"
 )
 
+// SyncStatus is an enum for the possible sync statuses.
+type SyncStatus string
+
+// Possible MatchTypes.
+const (
+	Ok                      SyncStatus = "Ok"
+	GenericError            SyncStatus = "GenericError"
+	CustomerManagedKeyError SyncStatus = "CustomerManagedKeyError"
+	Unknown                 SyncStatus = "Unknown"
+)
+
+const (
+	// SyncStatusFile is the known json filename for representing the most recent bucket index sync.
+	SyncStatusFile = "bucket-index-sync-status.json"
+	// SyncStatusFileVersion is the current supported version of bucket-index-sync-status.json file.
+	SyncStatusFileVersion = 1
+)
+
 var (
 	ErrIndexNotFound  = errors.New("bucket index not found")
 	ErrIndexCorrupted = errors.New("bucket index corrupted")
 )
 
+type status struct {
+	// SyncTime is a unix timestamp of when the bucket index was synced
+	SyncTime int64 `json:"syncTime"`
+	// Version of the file.
+	Version int `json:"version"`
+	// Last Sync status
+	Status SyncStatus `json:"status"`
+}
+
 // ReadIndex reads, parses and returns a bucket index from the bucket.
 func ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvider bucket.TenantConfigProvider, logger log.Logger) (*Index, error) {
 	userBkt := bucket.NewUserBucketClient(userID, bkt, cfgProvider)
@@ -99,3 +129,75 @@ func DeleteIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgPro
 	}
 	return nil
 }
+
+// DeleteIndexSyncStatus deletes the bucket index sync status file from the storage. No error is returned if the file
+// does not exist.
+func DeleteIndexSyncStatus(ctx context.Context, bkt objstore.Bucket, userID string) error {
+	// Inject the user/tenant prefix.
+	bkt = bucket.NewPrefixedBucketClient(bkt, userID)
+
+	err := bkt.Delete(ctx, SyncStatusFile)
+	if err != nil && !bkt.IsObjNotFoundErr(err) {
+		return errors.Wrap(err, "delete bucket index sync status")
+	}
+	return nil
+}
+
+// WriteSyncStatus uploads the sync status file with the corresponding SyncStatus
+// This file is not encrypted using the CMK configuration
+func WriteSyncStatus(ctx context.Context, bkt objstore.Bucket, userID string, ss SyncStatus, logger log.Logger) {
+	// Inject the user/tenant prefix.
+	bkt = bucket.NewPrefixedBucketClient(bkt, userID)
+
+	s := status{
+		SyncTime: time.Now().Unix(),
+		Status:   ss,
+		Version:  SyncStatusFileVersion,
+	}
+
+	// Marshal the sync status.
+	content, err := json.Marshal(s)
+	if err != nil {
+		level.Warn(logger).Log("msg", "failed to write bucket index status", "err", err)
+		return
+	}
+
+	// Upload the sync status.
+	if err := bkt.Upload(ctx, SyncStatusFile, bytes.NewReader(content)); err != nil {
+		level.Warn(logger).Log("msg", "failed to upload index sync status", "err", err)
+	}
+}
+
+// ReadSyncStatus retrieves the SyncStatus from the sync status file
+// If the file is not found, it returns `Unknown`
+func ReadSyncStatus(ctx context.Context, b objstore.Bucket, userID string, logger log.Logger) (SyncStatus, error) {
+	// Inject the user/tenant prefix.
+	bkt := bucket.NewPrefixedBucketClient(b, userID)
+
+	reader, err := bkt.WithExpectedErrs(bkt.IsObjNotFoundErr).Get(ctx, SyncStatusFile)
+
+	if err != nil {
+		if bkt.IsObjNotFoundErr(err) {
+			return Unknown, nil
+		}
+		return Unknown, err
+	}
+
+	defer runutil.CloseWithLogOnErr(logger, reader, "close sync status reader")
+
+	content, err := io.ReadAll(reader)
+
+	if err != nil {
+		return Unknown, err
+	}
+
+	s := status{}
+	if err = json.Unmarshal(content, &s); err != nil {
+		return Unknown, errors.Wrap(err, "error unmarshalling sync status")
+	}
+	if s.Version != SyncStatusFileVersion {
+		return Unknown, errors.New("bucket index sync version mismatch")
+	}
+
+	return s.Status, nil
+}
diff --git a/pkg/storage/tsdb/caching_bucket.go b/pkg/storage/tsdb/caching_bucket.go
index b468e31d533..293fe92dc48 100644
--- a/pkg/storage/tsdb/caching_bucket.go
+++ b/pkg/storage/tsdb/caching_bucket.go
@@ -136,7 +136,7 @@ func CreateCachingBucket(chunksConfig ChunksCacheConfig, metadataConfig Metadata
 	cfg.CacheGet("metafile", metadataCache, isMetaFile, metadataConfig.MetafileMaxSize, metadataConfig.MetafileContentTTL, metadataConfig.MetafileExistsTTL, metadataConfig.MetafileDoesntExistTTL)
 	cfg.CacheAttributes("metafile", metadataCache, isMetaFile, metadataConfig.MetafileAttributesTTL)
 	cfg.CacheAttributes("block-index", metadataCache, isBlockIndexFile, metadataConfig.BlockIndexAttributesTTL)
-	cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFile, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)
+	cfg.CacheGet("bucket-index", metadataCache, isBucketIndexFiles, metadataConfig.BucketIndexMaxSize, metadataConfig.BucketIndexContentTTL /* do not cache exist / not exist: */, 0, 0)
 
 	codec := snappyIterCodec{storecache.JSONIterCodec{}}
 	cfg.CacheIter("tenants-iter", metadataCache, isTenantsDir, metadataConfig.TenantsListTTL, codec)
@@ -196,9 +196,9 @@ func isBlockIndexFile(name string) bool {
 	return err == nil
 }
 
-func isBucketIndexFile(name string) bool {
+func isBucketIndexFiles(name string) bool {
 	// TODO can't reference bucketindex because of a circular dependency. To be fixed.
-	return strings.HasSuffix(name, "/bucket-index.json.gz")
+	return strings.HasSuffix(name, "/bucket-index.json.gz") || strings.HasSuffix(name, "/bucket-index-sync-status.json")
 }
 
 func isTenantsDir(name string) bool {
diff --git a/pkg/storage/tsdb/caching_bucket_test.go b/pkg/storage/tsdb/caching_bucket_test.go
index 0ce401991c2..e2fdc395a60 100644
--- a/pkg/storage/tsdb/caching_bucket_test.go
+++ b/pkg/storage/tsdb/caching_bucket_test.go
@@ -17,11 +17,12 @@ func TestIsTenantDir(t *testing.T) {
 }
 
 func TestIsBucketIndexFile(t *testing.T) {
-	assert.False(t, isBucketIndexFile(""))
-	assert.False(t, isBucketIndexFile("test"))
-	assert.False(t, isBucketIndexFile("test/block"))
-	assert.False(t, isBucketIndexFile("test/block/chunks"))
-	assert.True(t, isBucketIndexFile("test/bucket-index.json.gz"))
+	assert.False(t, isBucketIndexFiles(""))
+	assert.False(t, isBucketIndexFiles("test"))
+	assert.False(t, isBucketIndexFiles("test/block"))
+	assert.False(t, isBucketIndexFiles("test/block/chunks"))
+	assert.True(t, isBucketIndexFiles("test/bucket-index.json.gz"))
+	assert.True(t, isBucketIndexFiles("test/bucket-index-sync-status.json"))
 }
 
 func TestIsBlockIndexFile(t *testing.T) {

From 28142a5147335adc546cdc7a3a285afe80cbcca6 Mon Sep 17 00:00:00 2001
From: Alan Protasio
Date: Fri, 7 Jul 2023 11:06:09 -0700
Subject: [PATCH 2/2] adding gs

---
 pkg/compactor/blocks_cleaner.go             | 30 +++++++-
 pkg/compactor/blocks_cleaner_test.go        | 46 +++++++----
 pkg/compactor/compactor.go                  |  8 ++
 pkg/compactor/compactor_test.go             | 39 ++++++++++
 pkg/ingester/ingester.go                    |  9 +++
 pkg/ingester/ingester_test.go               | 76 ++++++++++++-------
 pkg/querier/blocks_finder_bucket_index.go   |  7 +-
 .../blocks_finder_bucket_index_test.go      | 41 ++++++++--
 pkg/storage/tsdb/bucketindex/loader.go      | 14 ++--
 pkg/storage/tsdb/bucketindex/storage.go     | 46 ++++++-----
 pkg/storage/tsdb/testutil/objstore.go       |  2 +-
 11 files changed, 237 insertions(+), 81 deletions(-)

diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go
index 5f519e235e9..80287667660 100644
--- a/pkg/compactor/blocks_cleaner.go
+++ b/pkg/compactor/blocks_cleaner.go
@@ -326,17 +326,40 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 		}
 	}
 
+	// Read the bucket index sync status
+	idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, userLogger)
+
+	if err != nil {
+		level.Warn(userLogger).Log("msg", "error reading the bucket index sync status", "err", err)
+		idxs = bucketindex.Status{Version: bucketindex.SyncStatusFileVersion, NonQueryableReason: bucketindex.Unknown}
+	}
+
+	idxs.Status = bucketindex.Ok
+	idxs.SyncTime = time.Now().Unix()
+
 	// Read the bucket index.
 	idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)
+
+	defer func() {
+		bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
+	}()
+
 	if errors.Is(err, bucketindex.ErrIndexCorrupted) {
 		level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
 	} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
 		// Give up cleaning if we get access denied
 		level.Warn(userLogger).Log("msg", "customer managed key access denied", "err", err)
-		bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.CustomerManagedKeyError, userLogger)
+		idxs.Status = bucketindex.CustomerManagedKeyError
+		// Making the tenant non-queryable until 2x the cleanup interval to give time to compactors and store-gateways
+		// to reload the bucket index in case the key access is re-granted
+		idxs.NonQueryableUntil = time.Now().Add(2 * c.cfg.CleanupInterval).Unix()
+		idxs.NonQueryableReason = bucketindex.CustomerManagedKeyError
+
+		// Update the bucket index last-update metric
+		c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
 		return nil
 	} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
-		bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.GenericError, userLogger)
+		idxs.Status = bucketindex.GenericError
 		return err
 	}
 
@@ -355,7 +378,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 	w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
 	idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
 	if err != nil {
-		bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.GenericError, userLogger)
+		idxs.Status = bucketindex.GenericError
 		return err
 	}
 
@@ -406,7 +429,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
 	c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
 	c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
 	c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
-	bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, bucketindex.Ok, userLogger)
 	return nil
 }
 
diff --git a/pkg/compactor/blocks_cleaner_test.go b/pkg/compactor/blocks_cleaner_test.go
index ce50882238f..6391e243bbe 100644
--- a/pkg/compactor/blocks_cleaner_test.go
+++ b/pkg/compactor/blocks_cleaner_test.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"crypto/rand"
 	"fmt"
-	"os"
 	"path"
 	"strings"
 	"testing"
@@ -57,14 +57,14 @@ func TestBlocksCleaner(t *testing.T) {
 func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
 	const userID = "user-1"
 
-	bucketClient, dir := cortex_testutil.PrepareFilesystemBucket(t)
-	bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
+	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
+	bkt = bucketindex.BucketWithGlobalMarkers(bkt)
 
 	// Create blocks.
 	ctx := context.Background()
 	deletionDelay := 12 * time.Hour
-	bucketClient = &cortex_testutil.MockBucketFailure{
-		Bucket: bucketClient,
+	mbucket := &cortex_testutil.MockBucketFailure{
+		Bucket: bkt,
 		GetFailures: map[string]error{
 			path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
 		},
@@ -78,18 +77,37 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
 	}
 
 	logger := log.NewNopLogger()
-	scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
+	scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
 	cfgProvider := newMockConfigProvider()
-	cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
-	err := cleaner.cleanUser(ctx, userID, true)
+	cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)
+
+	// Clean the user with no error
+	cleaner.bucketClient = bkt
+	err := cleaner.cleanUser(ctx, userID, false)
 	require.NoError(t, err)
-	stat, err := os.Stat(path.Join(dir, userID, bucketindex.SyncStatusFile))
+	s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
 	require.NoError(t, err)
-	require.True(t, stat.Size() > 0)
-	s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
+	require.Equal(t, bucketindex.Ok, s.Status)
+	require.Equal(t, int64(0), s.NonQueryableUntil)
+
+	// Clean with CMK error
+	cleaner.bucketClient = mbucket
+	err = cleaner.cleanUser(ctx, userID, false)
+	require.NoError(t, err)
+	s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
+	require.NoError(t, err)
+	require.Equal(t, bucketindex.CustomerManagedKeyError, s.Status)
+	require.Less(t, int64(0), s.NonQueryableUntil)
+
+	// Re-grant access to the key
+	cleaner.bucketClient = bkt
+	err = cleaner.cleanUser(ctx, userID, false)
+	require.NoError(t, err)
+	s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
 	require.NoError(t, err)
-	require.Equal(t, bucketindex.CustomerManagedKeyError, s)
+	require.Equal(t, bucketindex.Ok, s.Status)
+	require.Less(t, int64(0), s.NonQueryableUntil)
 }
 
 func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
@@ -241,7 +259,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
 			assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
 			s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger)
 			require.NoError(t, err)
-			require.Equal(t, bucketindex.Ok, s)
+			require.Equal(t, bucketindex.Ok, s.Status)
 		}
 
 		assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
@@ -397,7 +415,7 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
 	assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
 	s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
 	require.NoError(t, err)
-	require.Equal(t, bucketindex.Ok, s)
+	require.Equal(t, bucketindex.Ok, s.Status)
 }
 
 func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {
diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go
index 2dab64860db..edf884f4935 100644
--- a/pkg/compactor/compactor.go
+++ b/pkg/compactor/compactor.go
@@ -675,6 +675,14 @@ func (c *Compactor) compactUsers(ctx context.Context) {
 			continue
 		}
 
+		if idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, util_log.WithUserID(userID, c.logger)); err == nil {
+			if idxs.Status == bucketindex.CustomerManagedKeyError {
+				c.compactionRunSkippedTenants.Inc()
+				level.Info(c.logger).Log("msg", "skipping compactUser due to CustomerManagedKeyError", "user", userID)
+				continue
+			}
+		}
+
 		ownedUsers[userID] = struct{}{}
 
 		if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil {
diff --git a/pkg/compactor/compactor_test.go b/pkg/compactor/compactor_test.go
index 6b4721c130c..8f82c27bb24 100644
--- a/pkg/compactor/compactor_test.go
+++ b/pkg/compactor/compactor_test.go
@@ -151,6 +151,38 @@ func TestConfig_Validate(t *testing.T) {
 	}
 }
 
+func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
+	t.Parallel()
+	userID := "user-1"
+
+	ss := bucketindex.Status{Status: bucketindex.CustomerManagedKeyError, Version: bucketindex.SyncStatusFileVersion}
+	content, err := json.Marshal(ss)
+	require.NoError(t, err)
+
+	// No user blocks stored in the bucket.
+	bucketClient := &bucket.ClientMock{}
+	bucketClient.MockIter("", []string{userID}, nil)
+	bucketClient.MockIter(userID+"/", []string{}, nil)
+	bucketClient.MockIter(userID+"/markers/", nil, nil)
+	bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil)
+	bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
+	bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
+	bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
+	bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
+
+	cfg := prepareConfig()
+	c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil)
+	require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))
+
+	// Wait until a run has completed.
+	cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} {
+		return prom_testutil.ToFloat64(c.compactionRunsCompleted)
+	})
+
+	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
+	assert.Contains(t, strings.Split(strings.TrimSpace(logs.String()), "\n"), `level=info component=compactor msg="skipping compactUser due to CustomerManagedKeyError" user=user-1`)
+}
+
 func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
 	t.Parallel()
 
@@ -535,6 +567,8 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
 	bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", "", nil)
 	bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
 	bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
+	bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
+	bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
 	bucketClient.MockIter("user-1/markers/", nil, nil)
 	bucketClient.MockIter("user-2/markers/", nil, nil)
 	bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
@@ -676,6 +710,7 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
 	bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
 	bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
 	bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
+	bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
 	bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
 	bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
 
@@ -799,6 +834,8 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {
 	bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
 	bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
+	bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
+	bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockIter("user-1/markers/", nil, nil) bucketClient.MockIter("user-2/markers/", nil, nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) @@ -1031,6 +1068,8 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", nil) bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil) bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil) + bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil) + bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil) bucketClient.MockUpload("user-1/bucket-index.json.gz", nil) bucketClient.MockUpload("user-2/bucket-index.json.gz", nil) bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index d1fbb91a269..bb504d41c27 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -16,6 +16,8 @@ import ( "github.com/prometheus/prometheus/config" "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/status" @@ -2289,6 +2291,13 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants) } defer userDB.casState(activeShipping, active) + if idxs, err := bucketindex.ReadSyncStatus(ctx, i.TSDBState.bucket, userID, logutil.WithContext(ctx, i.logger)); err == nil { + if idxs.Status == bucketindex.CustomerManagedKeyError { + level.Info(logutil.WithContext(ctx, i.logger)).Log("msg", "skipping shipping blocks due CustomerManagedKeyError", "user", userID) + return nil + } + } + uploaded, err := userDB.shipper.Sync(ctx) if err != nil { level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err) diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 21d7a5f35f4..2b6f7e02019 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -40,6 +40,8 @@ import ( "golang.org/x/sync/errgroup" "google.golang.org/grpc" + "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" + "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/encoding" "github.com/cortexproject/cortex/pkg/cortexpb" @@ -2642,40 +2644,60 @@ func TestIngester_OpenExistingTSDBOnStartup(t *testing.T) { } func TestIngester_shipBlocks(t *testing.T) { - cfg := defaultIngesterTestConfig(t) - cfg.LifecyclerConfig.JoinAfter = 0 - cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2 + testCases := map[string]struct { + ss bucketindex.Status + expectetNumberOfCall int + }{ + "should ship blocks if status ok": { + ss: bucketindex.Status{Version: bucketindex.IndexVersion1, Status: bucketindex.Ok}, + expectetNumberOfCall: 1, + }, + "should not ship on cmk errors": { + ss: bucketindex.Status{Version: bucketindex.IndexVersion1, Status: bucketindex.CustomerManagedKeyError}, + expectetNumberOfCall: 0, + }, + } - // Create ingester - i, err := prepareIngesterWithBlocksStorage(t, cfg, nil) - require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) - defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { - // Wait until it's ACTIVE - test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { - return 
i.lifecycler.GetState() - }) + cfg := defaultIngesterTestConfig(t) + cfg.LifecyclerConfig.JoinAfter = 0 + cfg.BlocksStorageConfig.TSDB.ShipConcurrency = 2 - // Create the TSDB for 3 users and then replace the shipper with the mocked one - mocks := []*shipperMock{} - for _, userID := range []string{"user-1", "user-2", "user-3"} { - userDB, err := i.getOrCreateTSDB(userID, false) - require.NoError(t, err) - require.NotNil(t, userDB) + // Create ingester + i, err := prepareIngesterWithBlocksStorage(t, cfg, nil) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), i)) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck - m := &shipperMock{} - m.On("Sync", mock.Anything).Return(0, nil) - mocks = append(mocks, m) + // Wait until it's ACTIVE + test.Poll(t, 1*time.Second, ring.ACTIVE, func() interface{} { + return i.lifecycler.GetState() + }) - userDB.shipper = m - } + // Create the TSDB for 3 users and then replace the shipper with the mocked one + mocks := []*shipperMock{} + for _, userID := range []string{"user-1", "user-2", "user-3"} { + bucketindex.WriteSyncStatus(context.Background(), i.TSDBState.bucket, userID, tc.ss, log.NewNopLogger()) + userDB, err := i.getOrCreateTSDB(userID, false) + require.NoError(t, err) + require.NotNil(t, userDB) - // Ship blocks and assert on the mocked shipper - i.shipBlocks(context.Background(), nil) + m := &shipperMock{} + m.On("Sync", mock.Anything).Return(0, nil) + mocks = append(mocks, m) - for _, m := range mocks { - m.AssertNumberOfCalls(t, "Sync", 1) + userDB.shipper = m + } + + // Ship blocks and assert on the mocked shipper + i.shipBlocks(context.Background(), nil) + + for _, m := range mocks { + m.AssertNumberOfCalls(t, "Sync", tc.expectetNumberOfCall) + } + }) } } diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index 260ca792d81..ca7c9a38c8b 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -63,13 +63,12 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, // This is a legit edge case, happening when a new tenant has not shipped blocks to the storage yet // so the bucket index hasn't been created yet. 
 		return nil, nil, nil
-	}
-
-	if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
+	} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
 		return nil, nil, validation.AccessDeniedError(err.Error())
 	}
 
-	if ss == bucketindex.CustomerManagedKeyError {
+	// Short circuit when the bucket index failed to be updated due to CMK errors recently
+	if time.Since(ss.GetNonQueryableUntil()) < 0 && ss.NonQueryableReason == bucketindex.CustomerManagedKeyError {
 		return nil, nil, validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error())
 	}
 
diff --git a/pkg/querier/blocks_finder_bucket_index_test.go b/pkg/querier/blocks_finder_bucket_index_test.go
index 8cff9a23014..c9fac029a11 100644
--- a/pkg/querier/blocks_finder_bucket_index_test.go
+++ b/pkg/querier/blocks_finder_bucket_index_test.go
@@ -230,19 +230,50 @@ func TestBucketIndexBlocksFinder_GetBlocks_BucketIndexIsTooOldWithCustomerKeyErr
 
 	ctx := context.Background()
 	bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
-	finder := prepareBucketIndexBlocksFinder(t, bkt)
 
 	require.NoError(t, bucketindex.WriteIndex(ctx, bkt, userID, nil, &bucketindex.Index{
 		Version:            bucketindex.IndexVersion1,
 		Blocks:             bucketindex.Blocks{},
 		BlockDeletionMarks: bucketindex.BlockDeletionMarks{},
-		UpdatedAt:          time.Now().Add(-2 * time.Hour).Unix(),
+		UpdatedAt:          time.Now().Unix(),
 	}))
 
-	bucketindex.WriteSyncStatus(ctx, bkt, userID, bucketindex.CustomerManagedKeyError, log.NewNopLogger())
+	testCases := map[string]struct {
+		err error
+		ss  bucketindex.Status
+	}{
+		"should return AccessDeniedError when CustomerManagedKeyError and still not queryable": {
+			err: validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error()),
+			ss: bucketindex.Status{
+				Version:            bucketindex.SyncStatusFileVersion,
+				SyncTime:           time.Now().Unix(),
+				Status:             bucketindex.Ok,
+				NonQueryableReason: bucketindex.CustomerManagedKeyError,
+				NonQueryableUntil:  time.Now().Add(time.Minute * 10).Unix(),
+			},
+		},
+		"should not return error after NonQueryableUntil": {
+			ss: bucketindex.Status{
+				Version:            bucketindex.SyncStatusFileVersion,
+				SyncTime:           time.Now().Unix(),
+				Status:             bucketindex.Ok,
+				NonQueryableReason: bucketindex.CustomerManagedKeyError,
+				NonQueryableUntil:  time.Now().Add(-time.Minute * 10).Unix(),
+			},
+		},
+		"should not return error when UnknownStatus": {
+			ss: bucketindex.UnknownStatus,
+		},
+	}
 
-	_, _, err := finder.GetBlocks(ctx, userID, 10, 20)
-	require.Equal(t, validation.AccessDeniedError(bucket.ErrCustomerManagedKeyAccessDenied.Error()), err)
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			bucketindex.WriteSyncStatus(ctx, bkt, userID, tc.ss, log.NewNopLogger())
+			finder := prepareBucketIndexBlocksFinder(t, bkt)
+			_, _, err := finder.GetBlocks(ctx, userID, 10, 20)
+			require.Equal(t, tc.err, err)
+		})
+	}
 }
 
 func prepareBucketIndexBlocksFinder(t testing.TB, bkt objstore.Bucket) *BucketIndexBlocksFinder {
diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go
index f3296df8dad..085ef46298c 100644
--- a/pkg/storage/tsdb/bucketindex/loader.go
+++ b/pkg/storage/tsdb/bucketindex/loader.go
@@ -91,7 +91,7 @@ func NewLoader(cfg LoaderConfig, bucketClient objstore.Bucket, cfgProvider bucke
 
 // GetIndex returns the bucket index for the given user. It returns the in-memory cached
 // index if available, or load it from the bucket otherwise.
-func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, SyncStatus, error) {
+func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, Status, error) {
 	l.indexesMx.RLock()
 	if entry := l.indexes[userID]; entry != nil {
 		idx := entry.index
@@ -101,7 +101,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, SyncStatu
 		// We don't check if the index is stale because it's the responsibility
 		// of the background job to keep it updated.
 		entry.requestedAt.Store(time.Now().Unix())
-		return idx, entry.syncStatus, err
+		return idx, UnknownStatus, err
 	}
 	l.indexesMx.RUnlock()
 
@@ -111,7 +111,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, SyncStatu
 	if err != nil {
 		// Cache the error, to avoid hammering the object store in case of persistent issues
 		// (eg. corrupted bucket index or not existing).
-		l.cacheIndex(userID, nil, Unknown, err)
+		l.cacheIndex(userID, nil, UnknownStatus, err)
 
 		if errors.Is(err, ErrIndexNotFound) {
 			level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID)
@@ -124,7 +124,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, SyncStatu
 			level.Error(l.logger).Log("msg", "unable to load bucket index", "user", userID, "err", err)
 		}
 
-		return nil, Unknown, err
+		return nil, UnknownStatus, err
 	}
 
 	ss, err := ReadSyncStatus(ctx, l.bkt, userID, l.logger)
@@ -142,7 +142,7 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, SyncStatu
 	return idx, ss, nil
 }
 
-func (l *Loader) cacheIndex(userID string, idx *Index, ss SyncStatus, err error) {
+func (l *Loader) cacheIndex(userID string, idx *Index, ss Status, err error) {
 	l.indexesMx.Lock()
 	defer l.indexesMx.Unlock()
 
@@ -247,7 +247,7 @@ type cachedIndex struct {
 	// We cache either the index or the error occurred while fetching it. They're
 	// mutually exclusive.
 	index      *Index
-	syncStatus SyncStatus
+	syncStatus Status
 	err        error
 
 	// Unix timestamp (seconds) of when the index has been updated from the storage the last time.
@@ -257,7 +257,7 @@ type cachedIndex struct {
 	requestedAt atomic.Int64
 }
 
-func newCachedIndex(idx *Index, ss SyncStatus, err error) *cachedIndex {
+func newCachedIndex(idx *Index, ss Status, err error) *cachedIndex {
 	entry := &cachedIndex{
 		index:      idx,
 		err:        err,
diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go
index 9484393ec84..aeff2f8f63c 100644
--- a/pkg/storage/tsdb/bucketindex/storage.go
+++ b/pkg/storage/tsdb/bucketindex/storage.go
@@ -23,7 +23,7 @@ import (
 // SyncStatus is an enum for the possible sync statuses.
 type SyncStatus string
 
-// Possible MatchTypes.
+// Possible SyncStatus values.
 const (
@@ -41,15 +41,29 @@
 var (
 	ErrIndexNotFound  = errors.New("bucket index not found")
 	ErrIndexCorrupted = errors.New("bucket index corrupted")
+
+	UnknownStatus = Status{
+		Version:            SyncStatusFileVersion,
+		Status:             Unknown,
+		NonQueryableReason: Unknown,
+	}
 )
 
-type status struct {
+type Status struct {
 	// SyncTime is a unix timestamp of when the bucket index was synced
-	SyncTime int64 `json:"syncTime"`
+	SyncTime int64 `json:"sync_time"`
 	// Version of the file.
 	Version int `json:"version"`
 	// Last Sync status
 	Status SyncStatus `json:"status"`
+	// Should not allow queries until this time
+	NonQueryableUntil int64 `json:"non_queryable_until"`
+	// Reason why the tenant is not queryable
+	NonQueryableReason SyncStatus `json:"non_queryable_reason"`
+}
+
+func (s *Status) GetNonQueryableUntil() time.Time {
+	return time.Unix(s.NonQueryableUntil, 0)
 }
 
 // ReadIndex reads, parses and returns a bucket index from the bucket.
@@ -145,18 +159,12 @@ func DeleteIndexSyncStatus(ctx context.Context, bkt objstore.Bucket, userID stri
 
 // WriteSyncStatus uploads the sync status file with the corresponding SyncStatus
 // This file is not encrypted using the CMK configuration
-func WriteSyncStatus(ctx context.Context, bkt objstore.Bucket, userID string, ss SyncStatus, logger log.Logger) {
+func WriteSyncStatus(ctx context.Context, bkt objstore.Bucket, userID string, ss Status, logger log.Logger) {
 	// Inject the user/tenant prefix.
 	bkt = bucket.NewPrefixedBucketClient(bkt, userID)
 
-	s := status{
-		SyncTime: time.Now().Unix(),
-		Status:   ss,
-		Version:  SyncStatusFileVersion,
-	}
-
 	// Marshal the sync status.
-	content, err := json.Marshal(s)
+	content, err := json.Marshal(ss)
 	if err != nil {
 		level.Warn(logger).Log("msg", "failed to write bucket index status", "err", err)
 		return
@@ -170,7 +178,7 @@ func WriteSyncStatus(ctx context.Context, bkt objstore.Bucket, userID string, ss
 
 // ReadSyncStatus retrieves the SyncStatus from the sync status file
 // If the file is not found, it returns `Unknown`
-func ReadSyncStatus(ctx context.Context, b objstore.Bucket, userID string, logger log.Logger) (SyncStatus, error) {
+func ReadSyncStatus(ctx context.Context, b objstore.Bucket, userID string, logger log.Logger) (Status, error) {
 	// Inject the user/tenant prefix.
 	bkt := bucket.NewPrefixedBucketClient(b, userID)
 
@@ -178,9 +186,9 @@ func ReadSyncStatus(ctx context.Context, b objstore.Bucket, userID string, logge
 
 	if err != nil {
 		if bkt.IsObjNotFoundErr(err) {
-			return Unknown, nil
+			return UnknownStatus, nil
 		}
-		return Unknown, err
+		return UnknownStatus, err
 	}
 
 	defer runutil.CloseWithLogOnErr(logger, reader, "close sync status reader")
@@ -188,16 +196,16 @@ func ReadSyncStatus(ctx context.Context, b objstore.Bucket, userID string, logge
 	content, err := io.ReadAll(reader)
 
 	if err != nil {
-		return Unknown, err
+		return UnknownStatus, err
 	}
 
-	s := status{}
+	s := Status{}
 	if err = json.Unmarshal(content, &s); err != nil {
-		return Unknown, errors.Wrap(err, "error unmarshalling sync status")
+		return UnknownStatus, errors.Wrap(err, "error unmarshalling sync status")
 	}
 	if s.Version != SyncStatusFileVersion {
-		return Unknown, errors.New("bucket index sync version mismatch")
+		return UnknownStatus, errors.New("bucket index sync version mismatch")
 	}
 
-	return s.Status, nil
+	return s, nil
 }
diff --git a/pkg/storage/tsdb/testutil/objstore.go b/pkg/storage/tsdb/testutil/objstore.go
index 353fba0f10e..115bba18853 100644
--- a/pkg/storage/tsdb/testutil/objstore.go
+++ b/pkg/storage/tsdb/testutil/objstore.go
@@ -110,5 +110,5 @@ func (m *MockBucketFailure) ReaderWithExpectedErrs(expectedFunc objstore.IsOpFai
 }
 
 func (m *MockBucketFailure) IsCustomerManagedKeyError(err error) bool {
-	return errors.Is(errors.Cause(err), ErrKeyAccessDeniedError)
+	return err == ErrKeyAccessDeniedError
 }
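Note for reviewers: the snippet below is a minimal, illustrative consumer of the sync-status API introduced by this series; it is not part of either commit. The filesystem bucket and the /tmp/cortex-blocks path are assumptions made for the example only; the bucketindex calls and Status fields are the post-PATCH-2/2 ones added above.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-kit/log"
	"github.com/thanos-io/objstore/providers/filesystem"

	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
)

func main() {
	ctx := context.Background()
	logger := log.NewNopLogger()

	// Illustrative local bucket; any objstore.Bucket implementation works.
	bkt, err := filesystem.NewBucket("/tmp/cortex-blocks")
	if err != nil {
		panic(err)
	}

	// Read user-1/bucket-index-sync-status.json, written by the blocks cleaner.
	// A missing file yields UnknownStatus with a nil error rather than failing.
	ss, err := bucketindex.ReadSyncStatus(ctx, bkt, "user-1", logger)
	if err != nil {
		panic(err)
	}

	// Mirror the querier-side gate added in PATCH 2/2: while NonQueryableUntil
	// is still in the future and the reason is a CMK error, queries for the
	// tenant are rejected with an access-denied error.
	if time.Now().Before(ss.GetNonQueryableUntil()) &&
		ss.NonQueryableReason == bucketindex.CustomerManagedKeyError {
		fmt.Println("tenant temporarily non-queryable: customer managed key access denied")
		return
	}

	fmt.Printf("last sync: %s at %s\n", ss.Status, time.Unix(ss.SyncTime, 0).UTC())
}

The payload this reads is plain JSON, e.g. {"sync_time":1688750000,"version":1,"status":"Ok","non_queryable_until":0,"non_queryable_reason":"Unknown"}, and it is deliberately written through a non-CMK-encrypted path so it stays readable even while the tenant's customer-managed key access is revoked.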