Skip to content

Commit 369dbf3

Browse files
committed
Allow 2h block conversion if it has a no-compact mark
Signed-off-by: SungJin1212 <[email protected]>
1 parent e4b0f1b commit 369dbf3

File tree

10 files changed

+211
-32
lines changed

10 files changed

+211
-32
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
1919
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
2020
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
21+
* [ENHANCEMENT] Parquet Converter: Allow conversion of 2h blocks if the block has a `no-compact-mark.json` file. #6865
2122
* [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
2223
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
2324
* [ENHANCEMENT] Querier: Support query parameters to metadata api (/api/v1/metadata) to allow user to limit metadata to return. Add a `-ingester.return-all-metadata` flag to make the metadata API run when the deployment. Please set this flag to `false` to use the metadata API with the limits later. #6681 #6744

pkg/compactor/blocks_cleaner.go

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -45,6 +45,7 @@ type BlocksCleanerConfig struct {
4545
ShardingStrategy string
4646
CompactionStrategy string
4747
BlockRanges []int64
48+
NoCompactMarkCheckAfter time.Duration
4849
}
4950

5051
type BlocksCleaner struct {
@@ -752,7 +753,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
752753
}
753754
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
754755
}
755-
c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction))
756+
757+
noCompactMarkCheckFunc := func(blockID ulid.ULID) bool {
758+
return cortex_parquet.ExistBlockNoCompact(ctx, userBucket, userLogger, blockID)
759+
}
760+
761+
c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction), noCompactMarkCheckFunc)
756762

757763
if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
758764
begin = time.Now()
@@ -762,7 +768,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
762768
return nil
763769
}
764770

765-
func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64) {
771+
func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64, noCompactMarkCheckFunc cortex_parquet.NoCompactMarkCheckFunc) {
766772
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
767773
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
768774
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(totalBlocksBlocksMarkedForNoCompaction)
@@ -772,7 +778,7 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
772778
c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
773779
remainingBlocksToConvert := 0
774780
for _, b := range idx.NonParquetBlocks() {
775-
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges) {
781+
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.NoCompactMarkCheckAfter.Milliseconds(), c.cfg.BlockRanges, b.ID, noCompactMarkCheckFunc) {
776782
remainingBlocksToConvert++
777783
}
778784
}

pkg/compactor/blocks_cleaner_test.go

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -353,7 +353,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
353353
cortex_bucket_parquet_blocks_count{user="user-6"} 1
354354
# HELP cortex_bucket_parquet_unconverted_blocks_count Total number of unconverted parquet blocks in the bucket. Blocks marked for deletion are included.
355355
# TYPE cortex_bucket_parquet_unconverted_blocks_count gauge
356-
cortex_bucket_parquet_unconverted_blocks_count{user="user-5"} 0
356+
cortex_bucket_parquet_unconverted_blocks_count{user="user-5"} 1
357357
cortex_bucket_parquet_unconverted_blocks_count{user="user-6"} 0
358358
`),
359359
"cortex_bucket_blocks_count",
@@ -1109,8 +1109,12 @@ func TestBlocksCleaner_ParquetMetrics(t *testing.T) {
11091109
},
11101110
}
11111111

1112+
mockNoCompactMarkCheckFunc := func(blockID ulid.ULID) bool {
1113+
return false
1114+
}
1115+
11121116
// Update metrics
1113-
cleaner.updateBucketMetrics("user1", true, idx, 0, 0)
1117+
cleaner.updateBucketMetrics("user1", true, idx, 0, 0, mockNoCompactMarkCheckFunc)
11141118

11151119
// Verify metrics
11161120
require.NoError(t, prom_testutil.CollectAndCompare(cleaner.tenantParquetBlocks, strings.NewReader(`

pkg/compactor/compactor.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -304,6 +304,9 @@ type Config struct {
304304
AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
305305
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
306306
CleanerCachingBucketEnabled bool `yaml:"cleaner_caching_bucket_enabled"`
307+
308+
// Injected internally
309+
NoCompactMarkCheckAfter time.Duration `yaml:"-"`
307310
}
308311

309312
// RegisterFlags registers the Compactor flags.
@@ -753,6 +756,7 @@ func (c *Compactor) starting(ctx context.Context) error {
753756
ShardingStrategy: c.compactorCfg.ShardingStrategy,
754757
CompactionStrategy: c.compactorCfg.CompactionStrategy,
755758
BlockRanges: c.compactorCfg.BlockRanges.ToMilliseconds(),
759+
NoCompactMarkCheckAfter: c.compactorCfg.NoCompactMarkCheckAfter,
756760
}, cleanerBucketClient, cleanerUsersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
757761
c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)
758762

pkg/cortex/modules.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -737,6 +737,7 @@ func (t *Cortex) initParquetConverter() (serv services.Service, err error) {
737737
func (t *Cortex) initCompactor() (serv services.Service, err error) {
738738
t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
739739
ingestionReplicationFactor := t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor
740+
t.Cfg.Compactor.NoCompactMarkCheckAfter = t.Cfg.ParquetConverter.NoCompactMarkCheckAfter
740741

741742
t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides, ingestionReplicationFactor)
742743
if err != nil {

pkg/parquetconverter/converter.go

Lines changed: 14 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"github.com/go-kit/log"
1717
"github.com/go-kit/log/level"
18+
"github.com/oklog/ulid/v2"
1819
"github.com/parquet-go/parquet-go"
1920
"github.com/pkg/errors"
2021
"github.com/prometheus-community/parquet-common/convert"
@@ -50,11 +51,14 @@ const (
5051

5152
var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)
5253

54+
type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool
55+
5356
type Config struct {
54-
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
55-
ConversionInterval time.Duration `yaml:"conversion_interval"`
56-
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
57-
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
57+
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
58+
ConversionInterval time.Duration `yaml:"conversion_interval"`
59+
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
60+
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
61+
NoCompactMarkCheckAfter time.Duration `yaml:"no_compact_mark_check_after"`
5862

5963
DataDir string `yaml:"data_dir"`
6064

@@ -106,6 +110,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
106110
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.")
107111
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
108112
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.")
113+
f.DurationVar(&cfg.NoCompactMarkCheckAfter, "parquet-converter.no-compact-mark-check-after", time.Hour*13, "The time after which a `no-compact-mark.json` file should be checked.")
109114
}
110115

111116
func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
@@ -385,7 +390,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
385390
continue
386391
}
387392

388-
if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges) {
393+
noCompactMarkCheckFunc := func(bId ulid.ULID) bool {
394+
return cortex_parquet.ExistBlockNoCompact(ctx, uBucket, logger, b.ULID)
395+
}
396+
397+
if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.NoCompactMarkCheckAfter.Milliseconds(), c.blockRanges, b.ULID, noCompactMarkCheckFunc) {
389398
continue
390399
}
391400

pkg/parquetconverter/converter_test.go

Lines changed: 82 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -12,10 +12,12 @@ import (
1212
"github.com/go-kit/log"
1313
"github.com/oklog/ulid/v2"
1414
"github.com/prometheus/client_golang/prometheus"
15+
"github.com/prometheus/client_golang/prometheus/promauto"
1516
"github.com/prometheus/client_golang/prometheus/testutil"
1617
"github.com/prometheus/prometheus/model/labels"
1718
"github.com/prometheus/prometheus/tsdb"
1819
"github.com/stretchr/testify/assert"
20+
"github.com/stretchr/testify/mock"
1921
"github.com/stretchr/testify/require"
2022
"github.com/thanos-io/objstore"
2123
"github.com/thanos-io/objstore/providers/filesystem"
@@ -227,6 +229,76 @@ func TestConverter_CleanupMetricsForNotOwnedUser(t *testing.T) {
227229
assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID)))
228230
}
229231

232+
func TestConverter_ShouldConvertNoCompactMarkBlocks(t *testing.T) {
233+
cfg := prepareConfig()
234+
userID := "user"
235+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
236+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
237+
dir := t.TempDir()
238+
239+
cfg.Ring.InstanceID = "parquet-converter-1"
240+
cfg.Ring.InstanceAddr = "1.2.3.4"
241+
cfg.Ring.KVStore.Mock = ringStore
242+
bucketClient, err := filesystem.NewBucket(t.TempDir())
243+
require.NoError(t, err)
244+
userBucket := bucket.NewPrefixedBucketClient(bucketClient, userID)
245+
limits := &validation.Limits{}
246+
flagext.DefaultValues(limits)
247+
limits.ParquetConverterEnabled = true
248+
249+
c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)
250+
c.ringLifecycler = &ring.Lifecycler{
251+
Addr: "1.2.3.4",
252+
}
253+
254+
ctx := context.Background()
255+
256+
lbls := labels.Labels{labels.Label{
257+
Name: "__name__",
258+
Value: "test",
259+
}}
260+
261+
blocks := []ulid.ULID{}
262+
numBlocks := 2
263+
mint := time.Hour * 13
264+
maxt := time.Hour * 14
265+
// Create 2h blocks
266+
for i := 0; i < numBlocks; i++ {
267+
rnd := rand.New(rand.NewSource(time.Now().Unix()))
268+
id, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, mint.Milliseconds(), maxt.Milliseconds(), time.Minute.Milliseconds(), 10)
269+
require.NoError(t, err)
270+
blocks = append(blocks, id)
271+
}
272+
273+
for _, bId := range blocks {
274+
blockDir := fmt.Sprintf("%s/%s", dir, bId.String())
275+
b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
276+
require.NoError(t, err)
277+
// upload block
278+
err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
279+
require.NoError(t, err)
280+
281+
// upload no-compact-mark.json
282+
err = block.MarkForNoCompact(ctx, logger, userBucket, bId, metadata.ManualNoCompactReason, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))
283+
require.NoError(t, err)
284+
}
285+
286+
ringMock := &ring.RingMock{}
287+
ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{
288+
Instances: []ring.InstanceDesc{
289+
{
290+
Addr: "1.2.3.4",
291+
},
292+
},
293+
}, nil)
294+
295+
err = c.convertUser(ctx, log.NewNopLogger(), ringMock, userID)
296+
require.NoError(t, err)
297+
298+
// Verify the converted metric was incremented
299+
assert.Equal(t, 2.0, testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(userID)))
300+
}
301+
230302
func TestConverter_BlockConversionFailure(t *testing.T) {
231303
// Create a new registry for testing
232304
reg := prometheus.NewRegistry()
@@ -277,7 +349,16 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
277349
Addr: "1.2.3.4",
278350
}
279351

280-
err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID)
352+
ringMock := &ring.RingMock{}
353+
ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{
354+
Instances: []ring.InstanceDesc{
355+
{
356+
Addr: "1.2.3.4",
357+
},
358+
},
359+
}, nil)
360+
361+
err = converter.convertUser(context.Background(), logger, ringMock, userID)
281362
require.NoError(t, err)
282363

283364
// Verify the failure metric was incremented
@@ -292,17 +373,3 @@ type mockBucket struct {
292373
func (m *mockBucket) Upload(ctx context.Context, name string, r io.Reader) error {
293374
return fmt.Errorf("mock upload failure")
294375
}
295-
296-
type RingMock struct {
297-
ring.ReadRing
298-
}
299-
300-
func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDesc, bufHosts []string, bufZones map[string]int) (ring.ReplicationSet, error) {
301-
return ring.ReplicationSet{
302-
Instances: []ring.InstanceDesc{
303-
{
304-
Addr: "1.2.3.4",
305-
},
306-
},
307-
}, nil
308-
}

pkg/storage/parquet/converter_marker.go

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -9,8 +9,10 @@ import (
99

1010
"github.com/efficientgo/core/errors"
1111
"github.com/go-kit/log"
12+
"github.com/go-kit/log/level"
1213
"github.com/oklog/ulid/v2"
1314
"github.com/thanos-io/objstore"
15+
"github.com/thanos-io/thanos/pkg/block/metadata"
1416
"github.com/thanos-io/thanos/pkg/runutil"
1517

1618
"github.com/cortexproject/cortex/pkg/storage/tsdb"
@@ -26,6 +28,15 @@ type ConverterMark struct {
2628
Version int `json:"version"`
2729
}
2830

31+
func ExistBlockNoCompact(ctx context.Context, userBkt objstore.InstrumentedBucket, logger log.Logger, blockID ulid.ULID) bool {
32+
noCompactMarkerExists, err := userBkt.Exists(ctx, path.Join(blockID.String(), metadata.NoCompactMarkFilename))
33+
if err != nil {
34+
level.Warn(logger).Log("msg", "unable to get stats of no-compact-mark.json for block", "block", blockID.String())
35+
return false
36+
}
37+
return noCompactMarkerExists
38+
}
39+
2940
func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*ConverterMark, error) {
3041
markerPath := path.Join(id.String(), ConverterMarkerFileName)
3142
reader, err := userBkt.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(userBkt.IsAccessDeniedErr, userBkt.IsObjNotFoundErr)).Get(ctx, markerPath)

pkg/storage/parquet/util.go

Lines changed: 22 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,28 @@
11
package parquet
22

3-
func ShouldConvertBlockToParquet(mint, maxt int64, timeRanges []int64) bool {
3+
import (
4+
"github.com/oklog/ulid/v2"
5+
)
6+
7+
type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool
8+
9+
func ShouldConvertBlockToParquet(mint, maxt, noCompactMarkCheckAfter int64, timeRanges []int64, bId ulid.ULID, checkFunc NoCompactMarkCheckFunc) bool {
410
// We assume timeRanges[0] is the TSDB block duration (2h), and we don't convert them.
5-
return getBlockTimeRange(mint, maxt, timeRanges) > timeRanges[0]
11+
blockTimeRange := getBlockTimeRange(mint, maxt, timeRanges)
12+
if blockTimeRange > timeRanges[0] {
13+
return true
14+
}
15+
16+
// We should check if 2h blocks have a `no-compact-mark.json` file
17+
// since these will never be compacted into a 12h block.
18+
// We check if the `no-compact-mark.json` file exists only for blocks
19+
// after the noCompactMarkCheckAfter to reduce calls to `checkFunc`.
20+
if mint >= noCompactMarkCheckAfter {
21+
if blockTimeRange == timeRanges[0] && checkFunc(bId) {
22+
return true
23+
}
24+
}
25+
return false
626
}
727

828
func getBlockTimeRange(mint, maxt int64, timeRanges []int64) int64 {

0 commit comments

Comments
 (0)