Skip to content

Allow 2h block conversion if it has no compact mark #6865

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
* [ENHANCEMENT] Parquet Converter: Allow 2h block conversion if the block has a no-compact-mark.json file. #6865
* [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
* [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715
Expand Down
12 changes: 9 additions & 3 deletions pkg/compactor/blocks_cleaner.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type BlocksCleanerConfig struct {
ShardingStrategy string
CompactionStrategy string
BlockRanges []int64
NoCompactMarkCheckAfter time.Duration
}

type BlocksCleaner struct {
Expand Down Expand Up @@ -752,7 +753,12 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
}
level.Info(userLogger).Log("msg", "finish writing new index", "duration", time.Since(begin), "duration_ms", time.Since(begin).Milliseconds())
}
c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction))

noCompactMarkCheckFunc := func(blockID ulid.ULID) bool {
return cortex_parquet.ExistBlockNoCompact(ctx, userBucket, userLogger, blockID)
}

c.updateBucketMetrics(userID, parquetEnabled, idx, float64(len(partials)), float64(totalBlocksBlocksMarkedForNoCompaction), noCompactMarkCheckFunc)

if c.cfg.ShardingStrategy == util.ShardingStrategyShuffle && c.cfg.CompactionStrategy == util.CompactionStrategyPartitioning {
begin = time.Now()
Expand All @@ -762,7 +768,7 @@ func (c *BlocksCleaner) cleanUser(ctx context.Context, userLogger log.Logger, us
return nil
}

func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64) {
func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool, idx *bucketindex.Index, partials, totalBlocksBlocksMarkedForNoCompaction float64, noCompactMarkCheckFunc cortex_parquet.NoCompactMarkCheckFunc) {
c.tenantBlocks.WithLabelValues(userID).Set(float64(len(idx.Blocks)))
c.tenantBlocksMarkedForDelete.WithLabelValues(userID).Set(float64(len(idx.BlockDeletionMarks)))
c.tenantBlocksMarkedForNoCompaction.WithLabelValues(userID).Set(totalBlocksBlocksMarkedForNoCompaction)
Expand All @@ -772,7 +778,7 @@ func (c *BlocksCleaner) updateBucketMetrics(userID string, parquetEnabled bool,
c.tenantParquetBlocks.WithLabelValues(userID).Set(float64(len(idx.ParquetBlocks())))
remainingBlocksToConvert := 0
for _, b := range idx.NonParquetBlocks() {
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.BlockRanges) {
if cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.NoCompactMarkCheckAfter.Milliseconds(), c.cfg.BlockRanges, b.ID, noCompactMarkCheckFunc) {
remainingBlocksToConvert++
}
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/compactor/blocks_cleaner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func testBlocksCleanerWithOptions(t *testing.T, options testBlocksCleanerOptions
cortex_bucket_parquet_blocks_count{user="user-6"} 1
# HELP cortex_bucket_parquet_unconverted_blocks_count Total number of unconverted parquet blocks in the bucket. Blocks marked for deletion are included.
# TYPE cortex_bucket_parquet_unconverted_blocks_count gauge
cortex_bucket_parquet_unconverted_blocks_count{user="user-5"} 0
cortex_bucket_parquet_unconverted_blocks_count{user="user-5"} 1
cortex_bucket_parquet_unconverted_blocks_count{user="user-6"} 0
`),
"cortex_bucket_blocks_count",
Expand Down Expand Up @@ -1109,8 +1109,12 @@ func TestBlocksCleaner_ParquetMetrics(t *testing.T) {
},
}

mockNoCompactMarkCheckFunc := func(blockID ulid.ULID) bool {
return false
}

// Update metrics
cleaner.updateBucketMetrics("user1", true, idx, 0, 0)
cleaner.updateBucketMetrics("user1", true, idx, 0, 0, mockNoCompactMarkCheckFunc)

// Verify metrics
require.NoError(t, prom_testutil.CollectAndCompare(cleaner.tenantParquetBlocks, strings.NewReader(`
Expand Down
4 changes: 4 additions & 0 deletions pkg/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ type Config struct {
AcceptMalformedIndex bool `yaml:"accept_malformed_index"`
CachingBucketEnabled bool `yaml:"caching_bucket_enabled"`
CleanerCachingBucketEnabled bool `yaml:"cleaner_caching_bucket_enabled"`

// Injected internally
NoCompactMarkCheckAfter time.Duration `yaml:"-"`
}

// RegisterFlags registers the Compactor flags.
Expand Down Expand Up @@ -753,6 +756,7 @@ func (c *Compactor) starting(ctx context.Context) error {
ShardingStrategy: c.compactorCfg.ShardingStrategy,
CompactionStrategy: c.compactorCfg.CompactionStrategy,
BlockRanges: c.compactorCfg.BlockRanges.ToMilliseconds(),
NoCompactMarkCheckAfter: c.compactorCfg.NoCompactMarkCheckAfter,
}, cleanerBucketClient, cleanerUsersScanner, c.compactorCfg.CompactionVisitMarkerTimeout, c.limits, c.parentLogger, cleanerRingLifecyclerID, c.registerer, c.compactorCfg.CleanerVisitMarkerTimeout, c.compactorCfg.CleanerVisitMarkerFileUpdateInterval,
c.compactorMetrics.syncerBlocksMarkedForDeletion, c.compactorMetrics.remainingPlannedCompactions)

Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,7 @@ func (t *Cortex) initParquetConverter() (serv services.Service, err error) {
func (t *Cortex) initCompactor() (serv services.Service, err error) {
t.Cfg.Compactor.ShardingRing.ListenPort = t.Cfg.Server.GRPCListenPort
ingestionReplicationFactor := t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor
t.Cfg.Compactor.NoCompactMarkCheckAfter = t.Cfg.ParquetConverter.NoCompactMarkCheckAfter

t.Compactor, err = compactor.NewCompactor(t.Cfg.Compactor, t.Cfg.BlocksStorage, util_log.Logger, prometheus.DefaultRegisterer, t.Overrides, ingestionReplicationFactor)
if err != nil {
Expand Down
19 changes: 14 additions & 5 deletions pkg/parquetconverter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid/v2"
"github.com/parquet-go/parquet-go"
"github.com/pkg/errors"
"github.com/prometheus-community/parquet-common/convert"
Expand Down Expand Up @@ -53,11 +54,14 @@ const (

var RingOp = ring.NewOp([]ring.InstanceState{ring.ACTIVE}, nil)

type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool

type Config struct {
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
MetaSyncConcurrency int `yaml:"meta_sync_concurrency"`
ConversionInterval time.Duration `yaml:"conversion_interval"`
MaxRowsPerRowGroup int `yaml:"max_rows_per_row_group"`
FileBufferEnabled bool `yaml:"file_buffer_enabled"`
NoCompactMarkCheckAfter time.Duration `yaml:"no_compact_mark_check_after"`

DataDir string `yaml:"data_dir"`

Expand Down Expand Up @@ -109,6 +113,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxRowsPerRowGroup, "parquet-converter.max-rows-per-row-group", 1e6, "Max number of rows per parquet row group.")
f.DurationVar(&cfg.ConversionInterval, "parquet-converter.conversion-interval", time.Minute, "The frequency at which the conversion job runs.")
f.BoolVar(&cfg.FileBufferEnabled, "parquet-converter.file-buffer-enabled", true, "Whether to enable buffering the writes in disk to reduce memory utilization.")
f.DurationVar(&cfg.NoCompactMarkCheckAfter, "parquet-converter.no-compact-mark-check-after", time.Hour*13, "The time after which a `no-compact-mark.json` file should be checked.")
}

func NewConverter(cfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, blockRanges []int64, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Converter, error) {
Expand Down Expand Up @@ -392,7 +397,11 @@ func (c *Converter) convertUser(ctx context.Context, logger log.Logger, ring rin
continue
}

if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.blockRanges) {
noCompactMarkCheckFunc := func(bId ulid.ULID) bool {
return cortex_parquet.ExistBlockNoCompact(ctx, uBucket, logger, b.ULID)
}

if !cortex_parquet.ShouldConvertBlockToParquet(b.MinTime, b.MaxTime, c.cfg.NoCompactMarkCheckAfter.Milliseconds(), c.blockRanges, b.ULID, noCompactMarkCheckFunc) {
continue
}

Expand Down
108 changes: 92 additions & 16 deletions pkg/parquetconverter/converter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ import (
"github.com/go-kit/log"
"github.com/oklog/ulid/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/thanos-io/objstore/providers/filesystem"
Expand Down Expand Up @@ -232,6 +234,76 @@ func TestConverter_CleanupMetricsForNotOwnedUser(t *testing.T) {
assert.Equal(t, 0.0, testutil.ToFloat64(converter.metrics.convertBlockFailures.WithLabelValues(userID)))
}

// TestConverter_ShouldConvertNoCompactMarkBlocks verifies that 2h TSDB blocks
// carrying a no-compact-mark.json are picked up for parquet conversion: it
// uploads two 2h blocks together with their no-compact marks, runs the
// converter for the user, and asserts the converted-blocks metric equals the
// number of uploaded blocks.
func TestConverter_ShouldConvertNoCompactMarkBlocks(t *testing.T) {
	cfg := prepareConfig()
	userID := "user"
	ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil)
	t.Cleanup(func() { assert.NoError(t, closer.Close()) })
	dir := t.TempDir()

	cfg.Ring.InstanceID = "parquet-converter-1"
	cfg.Ring.InstanceAddr = "1.2.3.4"
	cfg.Ring.KVStore.Mock = ringStore
	bucketClient, err := filesystem.NewBucket(t.TempDir())
	require.NoError(t, err)
	userBucket := bucket.NewPrefixedBucketClient(bucketClient, userID)
	limits := &validation.Limits{}
	flagext.DefaultValues(limits)
	limits.ParquetConverterEnabled = true

	c, logger, _ := prepare(t, cfg, objstore.WithNoopInstr(bucketClient), limits)
	// The lifecycler address must match the mocked ring instance below so the
	// converter considers itself the owner of the user's blocks.
	c.ringLifecycler = &ring.Lifecycler{
		Addr: "1.2.3.4",
	}

	ctx := context.Background()

	lbls := labels.Labels{labels.Label{
		Name:  "__name__",
		Value: "test",
	}}

	blocks := []ulid.ULID{}
	numBlocks := 2
	// mint of 13h lines up with the converter's default
	// no-compact-mark-check-after threshold, so the mark lookup is performed.
	mint := time.Hour * 13
	maxt := time.Hour * 15
	// Create 2h blocks
	for i := 0; i < numBlocks; i++ {
		rnd := rand.New(rand.NewSource(time.Now().Unix()))
		id, err := e2e.CreateBlock(ctx, rnd, dir, []labels.Labels{lbls}, 2, mint.Milliseconds(), maxt.Milliseconds(), time.Minute.Milliseconds(), 10)
		require.NoError(t, err)
		blocks = append(blocks, id)
	}

	for _, bId := range blocks {
		blockDir := fmt.Sprintf("%s/%s", dir, bId.String())
		b, err := tsdb.OpenBlock(nil, blockDir, nil, nil)
		require.NoError(t, err)
		// upload block
		err = block.Upload(ctx, logger, userBucket, b.Dir(), metadata.NoneFunc)
		require.NoError(t, err)

		// upload no-compact-mark.json
		err = block.MarkForNoCompact(ctx, logger, userBucket, bId, metadata.ManualNoCompactReason, "", promauto.With(nil).NewCounter(prometheus.CounterOpts{}))
		require.NoError(t, err)
	}

	// Mocked ring always resolves to this instance's address, so every block
	// is owned by the converter under test.
	ringMock := &ring.RingMock{}
	ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{
		Instances: []ring.InstanceDesc{
			{
				Addr: "1.2.3.4",
			},
		},
	}, nil)

	err = c.convertUser(ctx, log.NewNopLogger(), ringMock, userID)
	require.NoError(t, err)

	// Verify the converted metric was incremented
	assert.Equal(t, float64(numBlocks), testutil.ToFloat64(c.metrics.convertedBlocks.WithLabelValues(userID)))
}

func TestConverter_BlockConversionFailure(t *testing.T) {
// Create a new registry for testing
reg := prometheus.NewRegistry()
Expand Down Expand Up @@ -283,7 +355,16 @@ func TestConverter_BlockConversionFailure(t *testing.T) {
Addr: "1.2.3.4",
}

err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID)
ringMock := &ring.RingMock{}
ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{
Addr: "1.2.3.4",
},
},
}, nil)

err = converter.convertUser(context.Background(), logger, ringMock, userID)
require.NoError(t, err)

// Verify the failure metric was incremented
Expand Down Expand Up @@ -352,7 +433,16 @@ func TestConverter_ShouldNotFailOnAccessDenyError(t *testing.T) {
Addr: "1.2.3.4",
}

err = converter.convertUser(context.Background(), logger, &RingMock{ReadRing: &ring.Ring{}}, userID)
ringMock := &ring.RingMock{}
ringMock.On("Get", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{
Addr: "1.2.3.4",
},
},
}, nil)

err = converter.convertUser(context.Background(), logger, ringMock, userID)
require.Error(t, err)

// Verify the failure metric was not incremented
Expand All @@ -379,17 +469,3 @@ func (m *mockBucket) Get(ctx context.Context, name string) (io.ReadCloser, error
}
return m.Bucket.Get(ctx, name)
}

type RingMock struct {
ring.ReadRing
}

func (r *RingMock) Get(key uint32, op ring.Operation, bufDescs []ring.InstanceDesc, bufHosts []string, bufZones map[string]int) (ring.ReplicationSet, error) {
return ring.ReplicationSet{
Instances: []ring.InstanceDesc{
{
Addr: "1.2.3.4",
},
},
}, nil
}
11 changes: 11 additions & 0 deletions pkg/storage/parquet/converter_marker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (

"github.com/efficientgo/core/errors"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/oklog/ulid/v2"
"github.com/thanos-io/objstore"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/runutil"

"github.com/cortexproject/cortex/pkg/storage/tsdb"
Expand All @@ -26,6 +28,15 @@ type ConverterMark struct {
Version int `json:"version"`
}

// ExistBlockNoCompact reports whether the given block has a no-compact mark
// file (metadata.NoCompactMarkFilename) in the user bucket.
// On a storage error it logs a warning and returns false, treating the block
// as not marked (best-effort check).
func ExistBlockNoCompact(ctx context.Context, userBkt objstore.InstrumentedBucket, logger log.Logger, blockID ulid.ULID) bool {
	noCompactMarkerExists, err := userBkt.Exists(ctx, path.Join(blockID.String(), metadata.NoCompactMarkFilename))
	if err != nil {
		// Include the error itself; without it failed existence checks are
		// impossible to diagnose from the logs.
		level.Warn(logger).Log("msg", "unable to get stats of no-compact-mark.json for block", "block", blockID.String(), "err", err)
		return false
	}
	return noCompactMarkerExists
}

func ReadConverterMark(ctx context.Context, id ulid.ULID, userBkt objstore.InstrumentedBucket, logger log.Logger) (*ConverterMark, error) {
markerPath := path.Join(id.String(), ConverterMarkerFileName)
reader, err := userBkt.WithExpectedErrs(tsdb.IsOneOfTheExpectedErrors(userBkt.IsAccessDeniedErr, userBkt.IsObjNotFoundErr)).Get(ctx, markerPath)
Expand Down
24 changes: 22 additions & 2 deletions pkg/storage/parquet/util.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,28 @@
package parquet

func ShouldConvertBlockToParquet(mint, maxt int64, timeRanges []int64) bool {
import (
"github.com/oklog/ulid/v2"
)

// NoCompactMarkCheckFunc reports whether the block with the given ID has a
// no-compact mark in object storage.
type NoCompactMarkCheckFunc = func(bId ulid.ULID) bool

// ShouldConvertBlockToParquet returns true when a TSDB block should be
// converted to parquet format.
//
// Blocks spanning more than the smallest configured time range
// (timeRanges[0], assumed to be the 2h TSDB block duration) are always
// converted. A 2h block is converted only if it carries a no-compact mark,
// since such a block will never be compacted into a larger one. checkFunc is
// only invoked when mint >= noCompactMarkCheckAfter, to limit object storage
// calls.
//
// NOTE(review): mint is a timestamp in milliseconds since epoch while
// noCompactMarkCheckAfter is a duration in milliseconds (13h by default), so
// for real blocks the guard is effectively always true — confirm whether the
// intended comparison is against a now-relative age instead.
func ShouldConvertBlockToParquet(mint, maxt, noCompactMarkCheckAfter int64, timeRanges []int64, bId ulid.ULID, checkFunc NoCompactMarkCheckFunc) bool {
	blockTimeRange := getBlockTimeRange(mint, maxt, timeRanges)
	if blockTimeRange > timeRanges[0] {
		return true
	}
	// 2h blocks marked no-compact will never become 12h blocks, so convert
	// them directly; skip the remote mark lookup for blocks before the
	// configured threshold to reduce object storage calls.
	return mint >= noCompactMarkCheckAfter && blockTimeRange == timeRanges[0] && checkFunc(bId)
}

func getBlockTimeRange(mint, maxt int64, timeRanges []int64) int64 {
Expand Down
Loading
Loading