Skip to content

Commit d5ce6f4

Browse files
authored
Upload blocks no compaction marks to the global location and introduce a new cortex_bucket_blocks_marked_for_no_compaction_count metric (#4729)
* Uploading no compact markers to the global marker index * Creating new Metric cortex_compactor_blocks_marked_for_no_compaction_on_storage_total * Fix Lint * Update MigrateBlockDeletionMarksToGlobalLocation to also migrate non compact markers * Not saving the NonCompactionMark information on the index yet * Update Changelog and rename the metric to be in the same pattern of cortex_bucket_blocks_marked_for_deletion_count * rename var names * Sharing the MarkersMap var
1 parent fce7bbd commit d5ce6f4

14 files changed

+291
-103
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
* [BUGFIX] Distributor: update defaultReplicationStrategy to not fail with extend-write when a single instance is unhealthy. #4636
3737
* [BUGFIX] Distributor: Fix race condition on `/series` introduced by #4683. #4716
3838
* [BUGFIX] Distributor: Fix a memory leak in distributor due to the cluster label. #4739
39+
* [ENHANCEMENT] Compactor: upload block no-compaction marks to the global markers location and introduce a new metric. #4729
40+
* `cortex_bucket_blocks_marked_for_no_compaction_count`: Total number of blocks marked for no compaction in the bucket.
3941

4042
## 1.11.0 2021-11-25
4143

pkg/compactor/blocks_cleaner.go

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,18 @@ type BlocksCleaner struct {
4545
lastOwnedUsers []string
4646

4747
// Metrics.
48-
runsStarted prometheus.Counter
49-
runsCompleted prometheus.Counter
50-
runsFailed prometheus.Counter
51-
runsLastSuccess prometheus.Gauge
52-
blocksCleanedTotal prometheus.Counter
53-
blocksFailedTotal prometheus.Counter
54-
blocksMarkedForDeletion prometheus.Counter
55-
tenantBlocks *prometheus.GaugeVec
56-
tenantMarkedBlocks *prometheus.GaugeVec
57-
tenantPartialBlocks *prometheus.GaugeVec
58-
tenantBucketIndexLastUpdate *prometheus.GaugeVec
48+
runsStarted prometheus.Counter
49+
runsCompleted prometheus.Counter
50+
runsFailed prometheus.Counter
51+
runsLastSuccess prometheus.Gauge
52+
blocksCleanedTotal prometheus.Counter
53+
blocksFailedTotal prometheus.Counter
54+
blocksMarkedForDeletion prometheus.Counter
55+
tenantBlocks *prometheus.GaugeVec
56+
tenantBlocksMarkedForDelete *prometheus.GaugeVec
57+
tenantBlocksMarkedForNoCompaction *prometheus.GaugeVec
58+
tenantPartialBlocks *prometheus.GaugeVec
59+
tenantBucketIndexLastUpdate *prometheus.GaugeVec
5960
}
6061

6162
func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, usersScanner *cortex_tsdb.UsersScanner, cfgProvider ConfigProvider, logger log.Logger, reg prometheus.Registerer) *BlocksCleaner {
@@ -102,10 +103,14 @@ func NewBlocksCleaner(cfg BlocksCleanerConfig, bucketClient objstore.Bucket, use
102103
Name: "cortex_bucket_blocks_count",
103104
Help: "Total number of blocks in the bucket. Includes blocks marked for deletion, but not partial blocks.",
104105
}, []string{"user"}),
105-
tenantMarkedBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
106+
tenantBlocksMarkedForDelete: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
106107
Name: "cortex_bucket_blocks_marked_for_deletion_count",
107108
Help: "Total number of blocks marked for deletion in the bucket.",
108109
}, []string{"user"}),
110+
tenantBlocksMarkedForNoCompaction: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
111+
Name: "cortex_bucket_blocks_marked_for_no_compaction_count",
112+
Help: "Total number of blocks marked for no compaction in the bucket.",
113+
}, []string{"user"}),
109114
tenantPartialBlocks: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
110115
Name: "cortex_bucket_blocks_partials_count",
111116
Help: "Total number of partial blocks.",
@@ -168,7 +173,8 @@ func (c *BlocksCleaner) cleanUsers(ctx context.Context, firstRun bool) error {
168173
for _, userID := range c.lastOwnedUsers {
169174
if !isActive[userID] && !isDeleted[userID] {
170175
c.tenantBlocks.DeleteLabelValues(userID)
171-
c.tenantMarkedBlocks.DeleteLabelValues(userID)
176+
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
177+
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
172178
c.tenantPartialBlocks.DeleteLabelValues(userID)
173179
c.tenantBucketIndexLastUpdate.DeleteLabelValues(userID)
174180
}
@@ -231,15 +237,16 @@ func (c *BlocksCleaner) deleteUserMarkedForDeletion(ctx context.Context, userID
231237
// to delete. We also consider them all marked for deletion given the next run will try
232238
// to delete them again.
233239
c.tenantBlocks.WithLabelValues(userID).Set(float64(failed))
234-
c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(failed))
240+
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(failed))
235241
c.tenantPartialBlocks.WithLabelValues(userID).Set(0)
236242

237243
return errors.Errorf("failed to delete %d blocks", failed)
238244
}
239245

240246
// Given all blocks have been deleted, we can also remove the metrics.
241247
c.tenantBlocks.DeleteLabelValues(userID)
242-
c.tenantMarkedBlocks.DeleteLabelValues(userID)
248+
c.tenantBlocksMarkedForDelete.DeleteLabelValues(userID)
249+
c.tenantBlocksMarkedForNoCompaction.DeleteLabelValues(userID)
243250
c.tenantPartialBlocks.DeleteLabelValues(userID)
244251

245252
if deletedBlocks > 0 {
@@ -330,7 +337,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
330337

331338
// Generate an updated in-memory version of the bucket index.
332339
w := bucketindex.NewUpdater(c.bucketClient, userID, c.cfgProvider, c.logger)
333-
idx, partials, err := w.UpdateIndex(ctx, idx)
340+
idx, partials, totalBlocksBlocksMarkedForNoCompaction, err := w.UpdateIndex(ctx, idx)
334341
if err != nil {
335342
return err
336343
}
@@ -367,9 +374,10 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userID string, firstRun b
367374
}
368375

369376
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
370-
c.tenantMarkedBlocks.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
371-
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
377+
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
378+
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(float64(totalBlocksBlocksMarkedForNoCompaction))
372379
c.tenantBucketIndexLastUpdate.WithLabelValues(userID).SetToCurrentTime()
380+
c.tenantPartialBlocks.WithLabelValues(userID).Set(float64(len(partials)))
373381

374382
return nil
375383
}

pkg/compactor/blocks_cleaner_test.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,11 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
9898
user4DebugMetaFile := path.Join("user-4", block.DebugMetas, "meta.json")
9999
require.NoError(t, bucketClient.Upload(context.Background(), user4DebugMetaFile, strings.NewReader("some random content here")))
100100

101+
// Create a user with two blocks, one of which is marked for no compaction.
102+
createTSDBBlock(t, bucketClient, "user-5", 10, 30, nil)
103+
block12 := createTSDBBlock(t, bucketClient, "user-5", 30, 50, nil)
104+
createNoCompactionMark(t, bucketClient, "user-5", block12)
105+
101106
// The fixtures have been created. If the bucket client wasn't wrapped to write
102107
// deletion marks to the global location too, then this is the right time to do it.
103108
if options.markersMigrationEnabled {
@@ -202,17 +207,26 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
202207
# TYPE cortex_bucket_blocks_count gauge
203208
cortex_bucket_blocks_count{user="user-1"} 2
204209
cortex_bucket_blocks_count{user="user-2"} 1
210+
cortex_bucket_blocks_count{user="user-5"} 2
205211
# HELP cortex_bucket_blocks_marked_for_deletion_count Total number of blocks marked for deletion in the bucket.
206212
# TYPE cortex_bucket_blocks_marked_for_deletion_count gauge
207213
cortex_bucket_blocks_marked_for_deletion_count{user="user-1"} 1
208214
cortex_bucket_blocks_marked_for_deletion_count{user="user-2"} 0
215+
cortex_bucket_blocks_marked_for_deletion_count{user="user-5"} 0
216+
# HELP cortex_bucket_blocks_marked_for_no_compaction_count Total number of blocks marked for no compaction in the bucket.
217+
# TYPE cortex_bucket_blocks_marked_for_no_compaction_count gauge
218+
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-1"} 0
219+
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-2"} 0
220+
cortex_bucket_blocks_marked_for_no_compaction_count{user="user-5"} 1
209221
# HELP cortex_bucket_blocks_partials_count Total number of partial blocks.
210222
# TYPE cortex_bucket_blocks_partials_count gauge
211223
cortex_bucket_blocks_partials_count{user="user-1"} 2
212224
cortex_bucket_blocks_partials_count{user="user-2"} 0
225+
cortex_bucket_blocks_partials_count{user="user-5"} 0
213226
`),
214227
"cortex_bucket_blocks_count",
215228
"cortex_bucket_blocks_marked_for_deletion_count",
229+
"cortex_bucket_blocks_marked_for_no_compaction_count",
216230
"cortex_bucket_blocks_partials_count",
217231
))
218232
}
@@ -421,7 +435,7 @@ func TestBlocksCleaner_ListBlocksOutsideRetentionPeriod(t *testing.T) {
421435
id3 := createTSDBBlock(t, bucketClient, "user-1", 7000, 8000, nil)
422436

423437
w := bucketindex.NewUpdater(bucketClient, "user-1", nil, logger)
424-
idx, _, err := w.UpdateIndex(ctx, nil)
438+
idx, _, _, err := w.UpdateIndex(ctx, nil)
425439
require.NoError(t, err)
426440

427441
assert.ElementsMatch(t, []ulid.ULID{id1, id2, id3}, idx.Blocks.GetULIDs())

pkg/compactor/compactor_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1378,6 +1378,14 @@ func createDeletionMark(t *testing.T, bkt objstore.Bucket, userID string, blockI
13781378
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
13791379
}
13801380

1381+
func createNoCompactionMark(t *testing.T, bkt objstore.Bucket, userID string, blockID ulid.ULID) {
1382+
content := mockNoCompactBlockJSON(blockID.String())
1383+
blockPath := path.Join(userID, blockID.String())
1384+
markPath := path.Join(blockPath, metadata.NoCompactMarkFilename)
1385+
1386+
require.NoError(t, bkt.Upload(context.Background(), markPath, strings.NewReader(content)))
1387+
}
1388+
13811389
func findCompactorByUserID(compactors []*Compactor, logs []*concurrency.SyncBuffer, userID string) (*Compactor, *concurrency.SyncBuffer, error) {
13821390
var compactor *Compactor
13831391
var log *concurrency.SyncBuffer

pkg/storage/tsdb/bucketindex/markers.go

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,25 @@ const (
2121
MarkersPathname = "markers"
2222
)
2323

24+
var (
25+
MarkersMap = map[string]func(ulid.ULID) string{
26+
metadata.DeletionMarkFilename: BlockDeletionMarkFilepath,
27+
metadata.NoCompactMarkFilename: NoCompactMarkFilenameMarkFilepath,
28+
}
29+
)
30+
2431
// BlockDeletionMarkFilepath returns the path, relative to the tenant's bucket location,
2532
// of a block deletion mark in the bucket markers location.
2633
func BlockDeletionMarkFilepath(blockID ulid.ULID) string {
2734
return fmt.Sprintf("%s/%s-%s", MarkersPathname, blockID.String(), metadata.DeletionMarkFilename)
2835
}
2936

37+
// NoCompactMarkFilenameMarkFilepath returns the path, relative to the tenant's bucket location,
38+
// of a block no compact mark in the bucket markers location.
39+
func NoCompactMarkFilenameMarkFilepath(blockID ulid.ULID) string {
40+
return fmt.Sprintf("%s/%s-%s", MarkersPathname, blockID.String(), metadata.NoCompactMarkFilename)
41+
}
42+
3043
// IsBlockDeletionMarkFilename returns whether the input filename matches the expected pattern
3144
// of block deletion markers stored in the markers location.
3245
func IsBlockDeletionMarkFilename(name string) (ulid.ULID, bool) {
@@ -45,6 +58,24 @@ func IsBlockDeletionMarkFilename(name string) (ulid.ULID, bool) {
4558
return id, err == nil
4659
}
4760

61+
// IsBlockNoCompactMarkFilename returns whether the input filename matches the expected pattern
62+
// of block no compact markers stored in the markers location.
63+
func IsBlockNoCompactMarkFilename(name string) (ulid.ULID, bool) {
64+
parts := strings.SplitN(name, "-", 2)
65+
if len(parts) != 2 {
66+
return ulid.ULID{}, false
67+
}
68+
69+
// Ensure the 2nd part matches the block no compact mark filename.
70+
if parts[1] != metadata.NoCompactMarkFilename {
71+
return ulid.ULID{}, false
72+
}
73+
74+
// Ensure the 1st part is a valid block ID.
75+
id, err := ulid.Parse(filepath.Base(parts[0]))
76+
return id, err == nil
77+
}
78+
4879
// MigrateBlockDeletionMarksToGlobalLocation lists all tenant's blocks and, for each of them, looks for
4980
// deletion and no compact marks in the block location. Found marks are copied to the global markers location.
5081
// The migration continues on error and returns once all blocks have been checked.
@@ -67,22 +98,24 @@ func MigrateBlockDeletionMarksToGlobalLocation(ctx context.Context, bkt objstore
6798
errs := tsdb_errors.NewMulti()
6899

69100
for _, blockID := range blocks {
70-
// Look up the deletion mark (if any).
71-
reader, err := userBucket.Get(ctx, path.Join(blockID.String(), metadata.DeletionMarkFilename))
72-
if userBucket.IsObjNotFoundErr(err) {
73-
continue
74-
} else if err != nil {
75-
errs.Add(err)
76-
continue
77-
}
101+
for mark, globalFilePath := range MarkersMap {
102+
// Look up mark (if any).
103+
reader, err := userBucket.Get(ctx, path.Join(blockID.String(), mark))
104+
if userBucket.IsObjNotFoundErr(err) {
105+
continue
106+
} else if err != nil {
107+
errs.Add(err)
108+
continue
109+
}
78110

79-
// Upload it to the global markers location.
80-
uploadErr := userBucket.Upload(ctx, BlockDeletionMarkFilepath(blockID), reader)
81-
if closeErr := reader.Close(); closeErr != nil {
82-
errs.Add(closeErr)
83-
}
84-
if uploadErr != nil {
85-
errs.Add(uploadErr)
111+
// Upload it to the global markers location.
112+
uploadErr := userBucket.Upload(ctx, globalFilePath(blockID), reader)
113+
if closeErr := reader.Close(); closeErr != nil {
114+
errs.Add(closeErr)
115+
}
116+
if uploadErr != nil {
117+
errs.Add(uploadErr)
118+
}
86119
}
87120
}
88121

pkg/storage/tsdb/bucketindex/markers_bucket_client.go

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,7 @@ import (
77
"io/ioutil"
88
"path"
99

10-
"github.com/oklog/ulid"
1110
"github.com/thanos-io/thanos/pkg/block"
12-
"github.com/thanos-io/thanos/pkg/block/metadata"
1311
"github.com/thanos-io/thanos/pkg/objstore"
1412
)
1513

@@ -29,7 +27,7 @@ func BucketWithGlobalMarkers(b objstore.Bucket) objstore.Bucket {
2927

3028
// Upload implements objstore.Bucket.
3129
func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Reader) error {
32-
blockID, ok := b.isBlockDeletionMark(name)
30+
globalMarkPath, ok := b.isMark(name)
3331
if !ok {
3432
return b.parent.Upload(ctx, name, r)
3533
}
@@ -46,7 +44,6 @@ func (b *globalMarkersBucket) Upload(ctx context.Context, name string, r io.Read
4644
}
4745

4846
// Upload it to the global markers location too.
49-
globalMarkPath := path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
5047
return b.parent.Upload(ctx, globalMarkPath, bytes.NewBuffer(body))
5148
}
5249

@@ -58,8 +55,7 @@ func (b *globalMarkersBucket) Delete(ctx context.Context, name string) error {
5855
}
5956

6057
// Delete the marker in the global markers location too.
61-
if blockID, ok := b.isBlockDeletionMark(name); ok {
62-
globalMarkPath := path.Clean(path.Join(path.Dir(name), "../", BlockDeletionMarkFilepath(blockID)))
58+
if globalMarkPath, ok := b.isMark(name); ok {
6359
if err := b.parent.Delete(ctx, globalMarkPath); err != nil {
6460
if !b.parent.IsObjNotFoundErr(err) {
6561
return err
@@ -128,12 +124,21 @@ func (b *globalMarkersBucket) ReaderWithExpectedErrs(fn objstore.IsOpFailureExpe
128124
return b
129125
}
130126

131-
func (b *globalMarkersBucket) isBlockDeletionMark(name string) (ulid.ULID, bool) {
132-
if path.Base(name) != metadata.DeletionMarkFilename {
133-
return ulid.ULID{}, false
127+
func (b *globalMarkersBucket) isMark(name string) (string, bool) {
128+
129+
for mark, globalFilePath := range MarkersMap {
130+
if path.Base(name) == mark {
131+
// Parse the block ID in the path. If there's no block ID, then it's not a per-block
132+
// marker.
133+
id, ok := block.IsBlockDir(path.Dir(name))
134+
135+
if ok {
136+
return path.Clean(path.Join(path.Dir(name), "../", globalFilePath(id))), ok
137+
}
138+
139+
return "", ok
140+
}
134141
}
135142

136-
// Parse the block ID in the path. If there's not block ID, then it's not the per-block
137-
// deletion mark.
138-
return block.IsBlockDir(path.Dir(name))
143+
return "", false
139144
}

0 commit comments

Comments
 (0)