Skip to content

Implementing Bucket index sync status #33

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
if err := bucketindex.DeleteIndex(ctx, c.bucketClient, userID, c.cfgProvider); err != nil {
return err
}

// Delete the bucket sync status
if err := bucketindex.DeleteIndexSyncStatus(ctx, c.bucketClient, userID); err != nil {
return err
}
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)

var deletedBlocks, failed int
Expand Down Expand Up @@ -321,15 +326,40 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
}
}

// Reading bucket index sync stats
idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, userLogger)

if err != nil {
level.Warn(userLogger).Log("msg", "error reading the bucket index status", "err", err)
idxs = bucketindex.Status{Version: bucketindex.SyncStatusFileVersion, NonQueryableReason: bucketindex.Unknown}
}

idxs.Status = bucketindex.Ok
idxs.SyncTime = time.Now().Unix()

// Read the bucket index.
idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger)

defer func() {
bucketindex.WriteSyncStatus(ctx, c.bucketClient, userID, idxs, userLogger)
}()

if errors.Is(err, bucketindex.ErrIndexCorrupted) {
level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it")
} else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) {
// Give up cleaning if we get access denied
level.Warn(userLogger).Log("msg", err.Error())
level.Warn(userLogger).Log("msg", "customer manager key access denied", "err", err)
idxs.Status = bucketindex.CustomerManagedKeyError
// Making the tenant non queryable until 2x the cleanup interval to give time to compactors and storegateways
// to reload the bucket index in case the key access is re-granted
idxs.NonQueryableUntil = time.Now().Add(2 * c.cfg.CleanupInterval).Unix()
idxs.NonQueryableReason = bucketindex.CustomerManagedKeyError

// Update the bucket index update time
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
return nil
} else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) {
idxs.Status = bucketindex.GenericError
return err
}

Expand All @@ -348,6 +378,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
if err != nil {
idxs.Status = bucketindex.GenericError
return err
}

Expand Down Expand Up @@ -398,7 +429,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))

return nil
}

Expand Down
45 changes: 38 additions & 7 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func TestBlocksCleaner(t *testing.T) {
func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
const userID = "user-1"

bucketClient, _ := cortex_testutil.PrepareFilesystemBucket(t)
bucketClient = bucketindex.BucketWithGlobalMarkers(bucketClient)
bkt, _ := cortex_testutil.PrepareFilesystemBucket(t)
bkt = bucketindex.BucketWithGlobalMarkers(bkt)

// Create blocks.
ctx := context.Background()
deletionDelay := 12 * time.Hour
bucketClient = &cortex_testutil.MockBucketFailure{
Bucket: bucketClient,
mbucket := &cortex_testutil.MockBucketFailure{
Bucket: bkt,
GetFailures: map[string]error{
path.Join(userID, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError,
},
Expand All @@ -77,12 +77,37 @@ func TestBlockCleaner_KeyPermissionDenied(t *testing.T) {
}

logger := log.NewNopLogger()
scanner := tsdb.NewUsersScanner(bucketClient, tsdb.AllUsers, logger)
scanner := tsdb.NewUsersScanner(mbucket, tsdb.AllUsers, logger)
cfgProvider := newMockConfigProvider()

cleaner := NewBlocksCleaner(cfg, bucketClient, scanner, cfgProvider, logger, nil)
err := cleaner.cleanUser(ctx, userID, true)
cleaner := NewBlocksCleaner(cfg, mbucket, scanner, cfgProvider, logger, nil)

// Clean User with no error
cleaner.bucketClient = bkt
err := cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err := bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
require.Equal(t, int64(0), s.NonQueryableUntil)

// Clean with cmk error
cleaner.bucketClient = mbucket
err = cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.CustomerManagedKeyError, s.Status)
require.Less(t, int64(0), s.NonQueryableUntil)

// Re grant access to the key
cleaner.bucketClient = bkt
err = cleaner.cleanUser(ctx, userID, false)
require.NoError(t, err)
s, err = bucketindex.ReadSyncStatus(ctx, bkt, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
require.Less(t, int64(0), s.NonQueryableUntil)
}

func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions) {
Expand Down Expand Up @@ -232,6 +257,9 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
require.NoError(t, err)
assert.ElementsMatch(t, tc.expectedBlocks, idx.Blocks.GetULIDs())
assert.ElementsMatch(t, tc.expectedMarks, idx.BlockDeletionMarks.GetULIDs())
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, tc.userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
}

assert.NoError(t, prom_testutil.GatherAndCompare(reg, strings.NewReader(`
Expand Down Expand Up @@ -385,6 +413,9 @@ func TestBlocksCleaner_ShouldRebuildBucketIndexOnCorruptedOne(t *testing.T) {
require.NoError(t, err)
assert.ElementsMatch(t, []ulid.ULID{block1, block3}, idx.Blocks.GetULIDs())
assert.ElementsMatch(t, []ulid.ULID{block3}, idx.BlockDeletionMarks.GetULIDs())
s, err := bucketindex.ReadSyncStatus(ctx, bucketClient, userID, logger)
require.NoError(t, err)
require.Equal(t, bucketindex.Ok, s.Status)
}

func TestBlocksCleaner_ShouldRemoveMetricsForTenantsNotBelongingAnymoreToTheShard(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,14 @@ func (c *Compactor) compactUsers(ctx context.Context) {
continue
}

if idxs, err := bucketindex.ReadSyncStatus(ctx, c.bucketClient, userID, util_log.WithUserID(userID, c.logger)); err == nil {
if idxs.Status == bucketindex.CustomerManagedKeyError {
c.compactionRunSkippedTenants.Inc()
level.Info(c.logger).Log("msg", "skipping compactUser due CustomerManagedKeyError", "user", userID)
continue
}
}

ownedUsers[userID] = struct{}{}

if markedForDeletion, err := cortex_tsdb.TenantDeletionMarkExists(ctx, c.bucketClient, userID); err != nil {
Expand Down
50 changes: 50 additions & 0 deletions pkg/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,38 @@ func TestConfig_Validate(t *testing.T) {
}
}

func TestCompactor_SkipCompactionWhenCmkError(t *testing.T) {
t.Parallel()
userID := "user-1"

ss := bucketindex.Status{Status: bucketindex.CustomerManagedKeyError, Version: bucketindex.SyncStatusFileVersion}
content, err := json.Marshal(ss)
require.NoError(t, err)

// No user blocks stored in the bucket.
bucketClient := &bucket.ClientMock{}
bucketClient.MockIter("", []string{userID}, nil)
bucketClient.MockIter(userID+"/", []string{}, nil)
bucketClient.MockIter(userID+"/markers/", nil, nil)
bucketClient.MockGet(userID+"/bucket-index-sync-status.json", string(content), nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)

cfg := prepareConfig()
c, _, _, logs, _ := prepare(t, cfg, bucketClient, nil)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), c))

// Wait until a run has completed.
cortex_testutil.Poll(t, time.Second, 1.0, func() interface{} {
return prom_testutil.ToFloat64(c.compactionRunsCompleted)
})

require.NoError(t, services.StopAndAwaitTerminated(context.Background(), c))
assert.Contains(t, strings.Split(strings.TrimSpace(logs.String()), "\n"), `level=info component=compactor msg="skipping compactUser due CustomerManagedKeyError" user=user-1`)
}

func TestCompactor_ShouldDoNothingOnNoUserBlocks(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -473,6 +505,7 @@ func TestCompactor_ShouldIncrementCompactionErrorIfFailedToCompactASingleTenant(
bucketClient.MockUpload(userID+"/01FN6CDF3PNEWWRY5MPGJPE3EX/visit-mark.json", nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)

c, _, tsdbPlannerMock, _, registry := prepare(t, prepareConfig(), bucketClient, nil)
tsdbPlannerMock.On("Plan", mock.Anything, mock.Anything).Return([]*metadata.Meta{}, errors.New("Failed to plan"))
Expand Down Expand Up @@ -534,10 +567,14 @@ func TestCompactor_ShouldIterateOverUsersAndRunCompaction(t *testing.T) {
bucketClient.MockGet("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", "", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockIter("user-2/markers/", nil, nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, prepareConfig(), bucketClient, nil)

Expand Down Expand Up @@ -673,7 +710,9 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForDeletion(t *testing.T) {
bucketClient.MockDelete("user-1/markers/01DTW0ZCPDDNV4BV83Q2SV4QAZ-deletion-mark.json", nil)
bucketClient.MockDelete("user-1/01DTW0ZCPDDNV4BV83Q2SV4QAZ", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)

Expand Down Expand Up @@ -795,10 +834,14 @@ func TestCompactor_ShouldNotCompactBlocksMarkedForSkipCompact(t *testing.T) {

bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockIter("user-1/markers/", nil, nil)
bucketClient.MockIter("user-2/markers/", nil, nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, _, registry := prepare(t, prepareConfig(), bucketClient, nil)

Expand Down Expand Up @@ -850,6 +893,7 @@ func TestCompactor_ShouldNotCompactBlocksForUsersMarkedForDeletion(t *testing.T)
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/meta.json", nil)
bucketClient.MockDelete("user-1/01DTVP434PA9VFXSW2JKB3392D/index", nil)
bucketClient.MockDelete("user-1/bucket-index.json.gz", nil)
bucketClient.MockDelete("user-1/bucket-index-sync-status.json", nil)

c, _, tsdbPlanner, logs, registry := prepare(t, cfg, bucketClient, nil)

Expand Down Expand Up @@ -1024,8 +1068,12 @@ func TestCompactor_ShouldCompactAllUsersOnShardingEnabledButOnlyOneInstanceRunni
bucketClient.MockUpload("user-2/01FN3V83ABR9992RF8WRJZ76ZQ/visit-mark.json", nil)
bucketClient.MockGet("user-1/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-2/bucket-index.json.gz", "", nil)
bucketClient.MockGet("user-1/bucket-index-sync-status.json", "", nil)
bucketClient.MockGet("user-2/bucket-index-sync-status.json", "", nil)
bucketClient.MockUpload("user-1/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-2/bucket-index.json.gz", nil)
bucketClient.MockUpload("user-1/bucket-index-sync-status.json", nil)
bucketClient.MockUpload("user-2/bucket-index-sync-status.json", nil)

ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
Expand Down Expand Up @@ -1107,6 +1155,7 @@ func TestCompactor_ShouldCompactOnlyUsersOwnedByTheInstanceOnShardingEnabledAndM
bucketClient.MockUpload(userID+"/01DTVP434PA9VFXSW2JKB3392D/visit-mark.json", nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
}

// Create a shared KV Store
Expand Down Expand Up @@ -1230,6 +1279,7 @@ func TestCompactor_ShouldCompactOnlyShardsOwnedByTheInstanceOnShardingEnabledWit
bucketClient.MockExists(path.Join(userID, cortex_tsdb.TenantDeletionMarkPath), false, nil)
bucketClient.MockGet(userID+"/bucket-index.json.gz", "", nil)
bucketClient.MockUpload(userID+"/bucket-index.json.gz", nil)
bucketClient.MockUpload(userID+"/bucket-index-sync-status.json", nil)
}

// Create a shared KV Store
Expand Down
9 changes: 9 additions & 0 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/tsdb/chunks"

"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/status"
Expand Down Expand Up @@ -2289,6 +2291,13 @@ func (i *Ingester) shipBlocks(ctx context.Context, allowed *util.AllowedTenants)
}
defer userDB.casState(activeShipping, active)

if idxs, err := bucketindex.ReadSyncStatus(ctx, i.TSDBState.bucket, userID, logutil.WithContext(ctx, i.logger)); err == nil {
if idxs.Status == bucketindex.CustomerManagedKeyError {
level.Info(logutil.WithContext(ctx, i.logger)).Log("msg", "skipping shipping blocks due CustomerManagedKeyError", "user", userID)
return nil
}
}

uploaded, err := userDB.shipper.Sync(ctx)
if err != nil {
level.Warn(logutil.WithContext(ctx, i.logger)).Log("msg", "shipper failed to synchronize TSDB blocks with the storage", "user", userID, "uploaded", uploaded, "err", err)
Expand Down
Loading