Skip to content

Commit e8c9f1d

Browse files
committed
Fix data race on the per-tenant bucket store sync error
Signed-off-by: Alan Protasio <[email protected]>
1 parent 7ee5d1a commit e8c9f1d

File tree

2 files changed

+47
-23
lines changed

2 files changed

+47
-23
lines changed

pkg/storegateway/bucket_stores.go

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,11 @@ type BucketStores struct {
6565

6666
// Keeps a bucket store for each tenant.
6767
storesMu sync.RWMutex
68-
stores map[string]*BucketStoreWithLastError
68+
stores map[string]*store.BucketStore
69+
70+
// Keeps the last sync error for the bucket store for each tenant.
71+
storesErrorsMu sync.RWMutex
72+
storesErrors map[string]error
6973

7074
// Metrics.
7175
syncTimes prometheus.Histogram
@@ -100,7 +104,8 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra
100104
limits: limits,
101105
bucket: cachingBucket,
102106
shardingStrategy: shardingStrategy,
103-
stores: map[string]*BucketStoreWithLastError{},
107+
stores: map[string]*store.BucketStore{},
108+
storesErrors: map[string]error{},
104109
logLevel: logLevel,
105110
bucketStoreMetrics: NewBucketStoreMetrics(),
106111
metaFetcherMetrics: NewMetadataFetcherMetrics(),
@@ -198,7 +203,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
198203

199204
type job struct {
200205
userID string
201-
store *BucketStoreWithLastError
206+
store *store.BucketStore
202207
}
203208

204209
wg := &sync.WaitGroup{}
@@ -231,16 +236,20 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte
231236
defer wg.Done()
232237

233238
for job := range jobs {
234-
if err := f(ctx, job.store.BucketStore); err != nil {
239+
if err := f(ctx, job.store); err != nil {
235240
if errors.Is(err, bucket.ErrCustomerManagedKeyError) {
236-
job.store.err = err
241+
u.storesErrorsMu.Lock()
242+
u.storesErrors[job.userID] = err
243+
u.storesErrorsMu.Unlock()
237244
} else {
238245
errsMx.Lock()
239246
errs.Add(errors.Wrapf(err, "failed to synchronize TSDB blocks for user %s", job.userID))
240247
errsMx.Unlock()
241248
}
242249
} else {
243-
job.store.err = nil
250+
u.storesErrorsMu.Lock()
251+
delete(u.storesErrors, job.userID)
252+
u.storesErrorsMu.Unlock()
244253
}
245254
}
246255
}()
@@ -298,11 +307,13 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri
298307
return nil
299308
}
300309

301-
if store.err != nil && errors.Is(store.err, bucket.ErrCustomerManagedKeyError) {
302-
return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", store.err)
310+
err := u.getStoreError(userID)
311+
312+
if err != nil && errors.Is(err, bucket.ErrCustomerManagedKeyError) {
313+
return httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", err)
303314
}
304315

305-
err := store.Series(req, spanSeriesServer{
316+
err = store.Series(req, spanSeriesServer{
306317
Store_SeriesServer: srv,
307318
ctx: spanCtx,
308319
})
@@ -366,12 +377,18 @@ func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) {
366377
return users, err
367378
}
368379

369-
func (u *BucketStores) getStore(userID string) *BucketStoreWithLastError {
380+
func (u *BucketStores) getStore(userID string) *store.BucketStore {
370381
u.storesMu.RLock()
371382
defer u.storesMu.RUnlock()
372383
return u.stores[userID]
373384
}
374385

386+
func (u *BucketStores) getStoreError(userID string) error {
387+
u.storesErrorsMu.RLock()
388+
defer u.storesErrorsMu.RUnlock()
389+
return u.storesErrors[userID]
390+
}
391+
375392
var (
376393
errBucketStoreNotEmpty = errors.New("bucket store not empty")
377394
errBucketStoreNotFound = errors.New("bucket store not found")
@@ -409,7 +426,7 @@ func (u *BucketStores) closeEmptyBucketStore(userID string) error {
409426
return bs.Close()
410427
}
411428

412-
func isEmptyBucketStore(bs *BucketStoreWithLastError) bool {
429+
func isEmptyBucketStore(bs *store.BucketStore) bool {
413430
min, max := bs.TimeRange()
414431
return min == math.MaxInt64 && max == math.MinInt64
415432
}
@@ -418,7 +435,7 @@ func (u *BucketStores) syncDirForUser(userID string) string {
418435
return filepath.Join(u.cfg.BucketStore.SyncDir, userID)
419436
}
420437

421-
func (u *BucketStores) getOrCreateStore(userID string) (*BucketStoreWithLastError, error) {
438+
func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) {
422439
// Check if the store already exists.
423440
bs := u.getStore(userID)
424441
if bs != nil {
@@ -522,7 +539,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStoreWithLastErro
522539
bucketStoreOpts = append(bucketStoreOpts, store.WithDebugLogging())
523540
}
524541

525-
s, err := store.NewBucketStore(
542+
bs, err := store.NewBucketStore(
526543
userBkt,
527544
fetcher,
528545
u.syncDirForUser(userID),
@@ -542,10 +559,6 @@ func (u *BucketStores) getOrCreateStore(userID string) (*BucketStoreWithLastErro
542559
return nil, err
543560
}
544561

545-
bs = &BucketStoreWithLastError{
546-
BucketStore: s,
547-
}
548-
549562
u.stores[userID] = bs
550563
u.metaFetcherMetrics.AddUserRegistry(userID, fetcherReg)
551564
u.bucketStoreMetrics.AddUserRegistry(userID, bucketStoreReg)

pkg/storegateway/bucket_stores_test.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,10 @@ import (
2929
"github.com/thanos-io/thanos/pkg/store"
3030
"github.com/thanos-io/thanos/pkg/store/labelpb"
3131
"github.com/thanos-io/thanos/pkg/store/storepb"
32+
"github.com/weaveworks/common/httpgrpc"
3233
"github.com/weaveworks/common/logging"
3334
"go.uber.org/atomic"
35+
"google.golang.org/grpc/codes"
3436
"google.golang.org/grpc/metadata"
3537

3638
cortex_testutil "github.com/cortexproject/cortex/pkg/storage/tsdb/testutil"
@@ -74,17 +76,26 @@ func TestBucketStores_CustomerKeyError(t *testing.T) {
7476

7577
// Should set the error on user-1
7678
require.NoError(t, stores.InitialSync(ctx))
77-
require.ErrorIs(t, stores.stores["user-1"].err, bucket.ErrCustomerManagedKeyError)
78-
require.ErrorIs(t, stores.stores["user-2"].err, nil)
79+
require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyError)
80+
require.ErrorIs(t, stores.storesErrors["user-2"], nil)
7981
require.NoError(t, stores.SyncBlocks(context.Background()))
80-
require.ErrorIs(t, stores.stores["user-1"].err, bucket.ErrCustomerManagedKeyError)
81-
require.ErrorIs(t, stores.stores["user-2"].err, nil)
82+
require.ErrorIs(t, stores.storesErrors["user-1"], bucket.ErrCustomerManagedKeyError)
83+
require.ErrorIs(t, stores.storesErrors["user-2"], nil)
84+
85+
_, _, err = querySeries(stores, "user-1", "anything", 0, 100)
86+
require.Equal(t, err, httpgrpc.Errorf(int(codes.ResourceExhausted), "store error: %s", bucket.ErrCustomerManagedKeyError))
87+
_, _, err = querySeries(stores, "user-2", "anything", 0, 100)
88+
require.NoError(t, err)
8289

8390
// Cleaning the error
8491
mBucket.GetFailures = map[string]error{}
8592
require.NoError(t, stores.SyncBlocks(context.Background()))
86-
require.ErrorIs(t, stores.stores["user-1"].err, nil)
87-
require.ErrorIs(t, stores.stores["user-2"].err, nil)
93+
require.ErrorIs(t, stores.storesErrors["user-1"], nil)
94+
require.ErrorIs(t, stores.storesErrors["user-2"], nil)
95+
_, _, err = querySeries(stores, "user-1", "anything", 0, 100)
96+
require.NoError(t, err)
97+
_, _, err = querySeries(stores, "user-2", "anything", 0, 100)
98+
require.NoError(t, err)
8899
}
89100

90101
func TestBucketStores_InitialSync(t *testing.T) {

0 commit comments

Comments
 (0)