From 83c27b4d6cedc97725c578b9ba5b69a3d11517f6 Mon Sep 17 00:00:00 2001 From: Alan Protasio Date: Thu, 6 Jul 2023 17:11:23 -0700 Subject: [PATCH] no encrypting bucket index --- pkg/compactor/blocks_cleaner.go | 4 -- pkg/querier/blocks_finder_bucket_index.go | 6 --- pkg/storage/bucket/client.go | 2 - pkg/storage/tsdb/bucketindex/loader.go | 4 +- pkg/storage/tsdb/bucketindex/loader_test.go | 50 ----------------- pkg/storage/tsdb/bucketindex/storage.go | 6 --- pkg/storage/tsdb/bucketindex/storage_test.go | 16 ------ pkg/storage/tsdb/bucketindex/updater.go | 3 +- .../bucket_index_metadata_fetcher.go | 13 +---- .../bucket_index_metadata_fetcher_test.go | 48 ----------------- pkg/storegateway/bucket_stores.go | 54 ++----------------- pkg/storegateway/bucket_stores_test.go | 18 +------ 12 files changed, 8 insertions(+), 216 deletions(-) diff --git a/pkg/compactor/blocks_cleaner.go b/pkg/compactor/blocks_cleaner.go index 920155b2f37..3259944987c 100644 --- a/pkg/compactor/blocks_cleaner.go +++ b/pkg/compactor/blocks_cleaner.go @@ -325,10 +325,6 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b idx, err := bucketindex.ReadIndex(ctx, c.bucketClient, userID, c.cfgProvider, c.logger) if errors.Is(err, bucketindex.ErrIndexCorrupted) { level.Warn(userLogger).Log("msg", "found a corrupted bucket index, recreating it") - } else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { - // Give up cleaning if we get access denied - level.Warn(userLogger).Log("msg", err.Error()) - return nil } else if err != nil && !errors.Is(err, bucketindex.ErrIndexNotFound) { return err } diff --git a/pkg/querier/blocks_finder_bucket_index.go b/pkg/querier/blocks_finder_bucket_index.go index ccf9e7b7728..3e819e1ff82 100644 --- a/pkg/querier/blocks_finder_bucket_index.go +++ b/pkg/querier/blocks_finder_bucket_index.go @@ -10,8 +10,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/thanos-io/objstore" - "github.com/cortexproject/cortex/pkg/util/validation" - "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/util/services" @@ -65,10 +63,6 @@ func (f *BucketIndexBlocksFinder) GetBlocks(ctx context.Context, userID string, return nil, nil, nil } - if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { - return nil, nil, validation.AccessDeniedError(err.Error()) - } - if err != nil { return nil, nil, err } diff --git a/pkg/storage/bucket/client.go b/pkg/storage/bucket/client.go index 852b4a7cc66..aee3296dc7d 100644 --- a/pkg/storage/bucket/client.go +++ b/pkg/storage/bucket/client.go @@ -40,8 +40,6 @@ var ( SupportedBackends = []string{S3, GCS, Azure, Swift, Filesystem} ErrUnsupportedStorageBackend = errors.New("unsupported storage backend") - - ErrCustomerManagedKeyAccessDenied = errors.New("access denied: customer key") ) // Config holds configuration for accessing long-term storage. diff --git a/pkg/storage/tsdb/bucketindex/loader.go b/pkg/storage/tsdb/bucketindex/loader.go index 2961b864057..0d2ab7cce33 100644 --- a/pkg/storage/tsdb/bucketindex/loader.go +++ b/pkg/storage/tsdb/bucketindex/loader.go @@ -115,8 +115,6 @@ func (l *Loader) GetIndex(ctx context.Context, userID string) (*Index, error) { if errors.Is(err, ErrIndexNotFound) { level.Warn(l.logger).Log("msg", "bucket index not found", "user", userID) - } else if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { - level.Warn(l.logger).Log("msg", "key access denied when reading bucket index", "user", userID) } else { // We don't track ErrIndexNotFound as failure because it's a legit case (eg. a tenant just // started to remote write and its blocks haven't uploaded to storage yet). @@ -198,7 +196,7 @@ func (l *Loader) updateCachedIndex(ctx context.Context, userID string) { l.loadAttempts.Inc() startTime := time.Now() idx, err := ReadIndex(readCtx, l.bkt, userID, l.cfgProvider, l.logger) - if err != nil && !errors.Is(err, ErrIndexNotFound) && !errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { + if err != nil && !errors.Is(err, ErrIndexNotFound) { l.loadFailures.Inc() level.Warn(l.logger).Log("msg", "unable to update bucket index", "user", userID, "err", err) return diff --git a/pkg/storage/tsdb/bucketindex/loader_test.go b/pkg/storage/tsdb/bucketindex/loader_test.go index e73ca0de1e9..d5c6b3788ad 100644 --- a/pkg/storage/tsdb/bucketindex/loader_test.go +++ b/pkg/storage/tsdb/bucketindex/loader_test.go @@ -10,14 +10,11 @@ import ( "github.com/go-kit/log" "github.com/oklog/ulid" - "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/cortexproject/cortex/pkg/storage/bucket" - cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/test" @@ -602,9 +599,6 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) }) - _, err := loader.GetIndex(ctx, user) - require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) - // Check cached require.NoError(t, loader.checkCachedIndexes(ctx)) @@ -646,50 +640,6 @@ func TestLoader_ShouldUpdateIndexInBackgroundOnPreviousKeyAcessDenied(t *testing )) } -func TestLoader_GetIndex_ShouldCacheKeyDeniedErrors(t *testing.T) { - user := "user-1" - ctx := context.Background() - reg := prometheus.NewPedanticRegistry() - bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) - - bkt = &cortex_testutil.MockBucketFailure{ - Bucket: bkt, - GetFailures: map[string]error{ - path.Join(user, "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, - }, - } - - // Create the loader. - loader := NewLoader(prepareLoaderConfig(), bkt, nil, log.NewNopLogger(), reg) - require.NoError(t, services.StartAndAwaitRunning(ctx, loader)) - t.Cleanup(func() { - require.NoError(t, services.StopAndAwaitTerminated(ctx, loader)) - }) - - // Request the index multiple times. - for i := 0; i < 10; i++ { - _, err := loader.GetIndex(ctx, "user-1") - require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) - } - - // Ensure metrics have been updated accordingly. - assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP cortex_bucket_index_load_failures_total Total number of bucket index loading failures. - # TYPE cortex_bucket_index_load_failures_total counter - cortex_bucket_index_load_failures_total 0 - # HELP cortex_bucket_index_loaded Number of bucket indexes currently loaded in-memory. - # TYPE cortex_bucket_index_loaded gauge - cortex_bucket_index_loaded 0 - # HELP cortex_bucket_index_loads_total Total number of bucket index loading attempts. - # TYPE cortex_bucket_index_loads_total counter - cortex_bucket_index_loads_total 1 - `), - "cortex_bucket_index_loads_total", - "cortex_bucket_index_load_failures_total", - "cortex_bucket_index_loaded", - )) -} - func prepareLoaderConfig() LoaderConfig { return LoaderConfig{ CheckInterval: time.Minute, diff --git a/pkg/storage/tsdb/bucketindex/storage.go b/pkg/storage/tsdb/bucketindex/storage.go index 5e66a1357bc..426d22e2622 100644 --- a/pkg/storage/tsdb/bucketindex/storage.go +++ b/pkg/storage/tsdb/bucketindex/storage.go @@ -13,7 +13,6 @@ import ( "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/bucket" - cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" "github.com/cortexproject/cortex/pkg/util/runutil" ) @@ -32,11 +31,6 @@ func ReadIndex(ctx context.Context, bkt objstore.Bucket, userID string, cfgProvi if userBkt.IsObjNotFoundErr(err) { return nil, ErrIndexNotFound } - - if userBkt.IsCustomerManagedKeyError(err) { - return nil, cortex_errors.WithCause(bucket.ErrCustomerManagedKeyAccessDenied, err) - } - return nil, errors.Wrap(err, "read bucket index") } defer runutil.CloseWithLogOnErr(logger, reader, "close bucket index reader") diff --git a/pkg/storage/tsdb/bucketindex/storage_test.go b/pkg/storage/tsdb/bucketindex/storage_test.go index a3f28d8dbfd..c27e37aac4a 100644 --- a/pkg/storage/tsdb/bucketindex/storage_test.go +++ b/pkg/storage/tsdb/bucketindex/storage_test.go @@ -2,7 +2,6 @@ package bucketindex import ( "context" - "errors" "path" "strings" "testing" @@ -11,8 +10,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/cortexproject/cortex/pkg/storage/bucket" - "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil" ) @@ -39,19 +36,6 @@ func TestReadIndex_ShouldReturnErrorIfIndexIsCorrupted(t *testing.T) { require.Nil(t, idx) } -func TestReadIndex_ShouldReturnErrorIfKeyAccessDeniedErr(t *testing.T) { - bkt, _ := cortex_testutil.PrepareFilesystemBucket(t) - bkt = &cortex_testutil.MockBucketFailure{ - Bucket: bkt, - GetFailures: map[string]error{ - path.Join("user-1", "bucket-index.json.gz"): cortex_testutil.ErrKeyAccessDeniedError, - }, - } - idx, err := ReadIndex(context.Background(), bkt, "user-1", nil, log.NewNopLogger()) - require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) - require.Nil(t, idx) -} - func TestReadIndex_ShouldReturnTheParsedIndexOnSuccess(t *testing.T) { const userID = "user-1" diff --git a/pkg/storage/tsdb/bucketindex/updater.go b/pkg/storage/tsdb/bucketindex/updater.go index 64c84f06efc..07a1b3994b8 100644 --- a/pkg/storage/tsdb/bucketindex/updater.go +++ b/pkg/storage/tsdb/bucketindex/updater.go @@ -113,8 +113,7 @@ func (w *Updater) updateBlocks(ctx context.Context, old []*Block) (blocks []*Blo continue } if errors.Is(err, errBlockMetaKeyAccessDeniedErr) { - partials[id] = err - level.Warn(w.logger).Log("msg", "skipped partial block when updating bucket index due key permission", "block", id.String()) + level.Warn(w.logger).Log("msg", "skipped block when updating bucket index due key permission", "block", id.String()) continue } if errors.Is(err, ErrBlockMetaCorrupted) { diff --git a/pkg/storegateway/bucket_index_metadata_fetcher.go b/pkg/storegateway/bucket_index_metadata_fetcher.go index 7dc1ea9048d..3e9d6db2a00 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher.go @@ -19,7 +19,6 @@ import ( const ( corruptedBucketIndex = "corrupted-bucket-index" - keyAccessDenied = "key-access-denied" noBucketIndex = "no-bucket-index" ) @@ -68,7 +67,7 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. start := time.Now() defer func() { f.metrics.SyncDuration.Observe(time.Since(start).Seconds()) - if err != nil && !errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { + if err != nil { f.metrics.SyncFailures.Inc() } }() @@ -95,16 +94,6 @@ func (f *BucketIndexMetadataFetcher) Fetch(ctx context.Context) (metas map[ulid. return nil, nil, nil } - if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { - // stop the job and return the error - // this error should be used to return Access Denied to the caller - level.Error(f.logger).Log("msg", "bucket index key permission revoked", "user", f.userID, "err", err) - f.metrics.Synced.WithLabelValues(keyAccessDenied).Set(1) - f.metrics.Submit() - - return nil, nil, err - } - if err != nil { f.metrics.Synced.WithLabelValues(block.FailedMeta).Set(1) f.metrics.Submit() diff --git a/pkg/storegateway/bucket_index_metadata_fetcher_test.go b/pkg/storegateway/bucket_index_metadata_fetcher_test.go index 129bd0c39f9..b7279fb018c 100644 --- a/pkg/storegateway/bucket_index_metadata_fetcher_test.go +++ b/pkg/storegateway/bucket_index_metadata_fetcher_test.go @@ -3,7 +3,6 @@ package storegateway import ( "bytes" "context" - "errors" "path" "strings" "testing" @@ -100,53 +99,6 @@ func TestBucketIndexMetadataFetcher_Fetch(t *testing.T) { )) } -func TestBucketIndexMetadataFetcher_Fetch_KeyPermissionDenied(t *testing.T) { - const userID = "user-1" - - bkt := &bucket.ClientMock{} - reg := prometheus.NewPedanticRegistry() - ctx := context.Background() - - bkt.MockGet(userID+"/bucket-index.json.gz", "c", bucket.ErrCustomerManagedKeyAccessDenied) - - fetcher := NewBucketIndexMetadataFetcher(userID, bkt, NewNoShardingStrategy(), nil, log.NewNopLogger(), reg, nil) - metas, _, err := fetcher.Fetch(ctx) - require.True(t, errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied)) - assert.Empty(t, metas) - - assert.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(` - # HELP blocks_meta_modified Number of blocks whose metadata changed - # TYPE blocks_meta_modified gauge - blocks_meta_modified{modified="replica-label-removed"} 0 - # HELP blocks_meta_sync_failures_total Total blocks metadata synchronization failures - # TYPE blocks_meta_sync_failures_total counter - blocks_meta_sync_failures_total 0 - # HELP blocks_meta_synced Number of block metadata synced - # TYPE blocks_meta_synced gauge - blocks_meta_synced{state="corrupted-bucket-index"} 0 - blocks_meta_synced{state="corrupted-meta-json"} 0 - blocks_meta_synced{state="duplicate"} 0 - blocks_meta_synced{state="failed"} 0 - blocks_meta_synced{state="key-access-denied"} 1 - blocks_meta_synced{state="label-excluded"} 0 - blocks_meta_synced{state="loaded"} 0 - blocks_meta_synced{state="marked-for-deletion"} 0 - blocks_meta_synced{state="marked-for-no-compact"} 0 - blocks_meta_synced{state="no-bucket-index"} 0 - blocks_meta_synced{state="no-meta-json"} 0 - blocks_meta_synced{state="time-excluded"} 0 - blocks_meta_synced{state="too-fresh"} 0 - # HELP blocks_meta_syncs_total Total blocks metadata synchronization attempts - # TYPE blocks_meta_syncs_total counter - blocks_meta_syncs_total 1 - `), - "blocks_meta_modified", - "blocks_meta_sync_failures_total", - "blocks_meta_synced", - "blocks_meta_syncs_total", - )) -} - func TestBucketIndexMetadataFetcher_Fetch_NoBucketIndex(t *testing.T) { t.Parallel() const userID = "user-1" diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index 1a3b791445c..bb665da9bc3 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -29,13 +29,11 @@ import ( "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/logging" - "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "github.com/cortexproject/cortex/pkg/storage/bucket" "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/util/backoff" - cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/cortexproject/cortex/pkg/util/validation" @@ -68,10 +66,6 @@ type BucketStores struct { storesMu sync.RWMutex stores map[string]*store.BucketStore - // Keeps the last sync error for the bucket store for each tenant. - storesErrorsMu sync.RWMutex - storesErrors map[string]error - // Metrics. syncTimes prometheus.Histogram syncLastSuccess prometheus.Gauge @@ -101,7 +95,6 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra bucket: cachingBucket, shardingStrategy: shardingStrategy, stores: map[string]*store.BucketStore{}, - storesErrors: map[string]error{}, logLevel: logLevel, bucketStoreMetrics: NewBucketStoreMetrics(), metaFetcherMetrics: NewMetadataFetcherMetrics(), @@ -233,19 +226,9 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte for job := range jobs { if err := f(ctx, job.store); err != nil { - if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { - u.storesErrorsMu.Lock() - u.storesErrors[job.userID] = err - u.storesErrorsMu.Unlock() - } else { - errsMx.Lock() - errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID)) - errsMx.Unlock() - } - } else { - u.storesErrorsMu.Lock() - delete(u.storesErrors, job.userID) - u.storesErrorsMu.Unlock() + errsMx.Lock() + errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID)) + errsMx.Unlock() } } }() @@ -299,23 +282,14 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri } store := u.getStore(userID) - userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) if store == nil { return nil } - err := u.getStoreError(userID) - - if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) { - return httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) - } - - err = store.Series(req, spanSeriesServer{ + return store.Series(req, spanSeriesServer{ Store_SeriesServer: srv, ctx: spanCtx, }) - - return err } // LabelNames implements the Storegateway proto service. @@ -329,17 +303,10 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe } store := u.getStore(userID) - userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) if store == nil { return &storepb.LabelNamesResponse{}, nil } - err := u.getStoreError(userID) - - if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) { - return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) - } - resp, err := store.LabelNames(ctx, req) return resp, err @@ -356,17 +323,10 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues } store := u.getStore(userID) - userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) if store == nil { return &storepb.LabelValuesResponse{}, nil } - err := u.getStoreError(userID) - - if err != nil && cortex_errors.ErrorIs(err, userBkt.IsCustomerManagedKeyError) { - return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) - } - resp, err := store.LabelValues(ctx, req) return resp, err @@ -394,12 +354,6 @@ func (u *BucketStores) getStore(userID string) *store.BucketStore { return u.stores[userID] } -func (u *BucketStores) getStoreError(userID string) error { - u.storesErrorsMu.RLock() - defer u.storesErrorsMu.RUnlock() - return u.storesErrors[userID] -} - var ( errBucketStoreNotEmpty = errors.New("bucket store not empty") errBucketStoreNotFound = errors.New("bucket store not found") diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 4e82b885253..25ed399aadd 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -100,13 +100,7 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { mockInitialSync bool GetFailures map[string]error }{ - "should return ResourceExhausted when fail to get bucket index": { - mockInitialSync: true, - GetFailures: map[string]error{ - "user-1/bucket-index.json.gz": cortex_testutil.ErrKeyAccessDeniedError, - }, - }, - "should return ResourceExhausted when fail to block index": { + "should return PermissionDenied when fail to block index": { mockInitialSync: false, GetFailures: map[string]error{ "user-1/" + bucketIndexes["user-1"].Blocks[0].ID.String() + "/index": cortex_testutil.ErrKeyAccessDeniedError, @@ -131,15 +125,7 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { // Should set the error on user-1 require.NoError(t, stores.InitialSync(ctx)) - if tc.mockInitialSync { - require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) - } require.NoError(t, stores.SyncBlocks(context.Background())) - if tc.mockInitialSync { - require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyAccessDenied) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) - } mBucket.GetFailures = tc.GetFailures @@ -164,8 +150,6 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { // Cleaning the error mBucket.GetFailures = map[string]error{} require.NoError(t, stores.SyncBlocks(context.Background())) - require.ErrorIs(t, stores.storesErrors["user-1"], nil) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) _, _, err = querySeries(stores, "user-1", "series", 0, 100) require.NoError(t, err) _, _, err = querySeries(stores, "user-2", "series", 0, 100)