diff --git a/integration/parquet_querier_test.go b/integration/parquet_querier_test.go index ca31a019c9a..570b4c0c45a 100644 --- a/integration/parquet_querier_test.go +++ b/integration/parquet_querier_test.go @@ -63,8 +63,9 @@ func TestParquetFuzz(t *testing.T) { "-store-gateway.sharding-enabled": "false", "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways // alert manager - "-alertmanager.web.external-url": "http://localhost/alertmanager", - "-frontend.query-vertical-shard-size": "1", + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // Enable vertical sharding. + "-frontend.query-vertical-shard-size": "3", "-frontend.max-cache-freshness": "1m", // enable experimental promQL funcs "-querier.enable-promql-experimental-functions": "true", @@ -130,16 +131,20 @@ func TestParquetFuzz(t *testing.T) { // Wait until we convert the blocks cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} { found := false + foundBucketIndex := false err := bkt.Iter(context.Background(), "", func(name string) error { fmt.Println(name) if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) { found = true } + if name == "bucket-index.json.gz" { + foundBucketIndex = true + } return nil }, objstore.WithRecursiveIter()) require.NoError(t, err) - return found + return found && foundBucketIndex }) att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz") @@ -178,7 +183,7 @@ func TestParquetFuzz(t *testing.T) { } ps := promqlsmith.New(rnd, lbls, opts...) - runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 500, false) + runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false) require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers( labels.MustNewMatcher(labels.MatchEqual, "type", "parquet")))) diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index d4c501737e3..cc8d272fd2f 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -799,7 +799,7 @@ func TestVerticalShardingFuzz(t *testing.T) { } ps := promqlsmith.New(rnd, lbls, opts...) - runQueryFuzzTestCases(t, ps, c1, c2, now, start, end, scrapeInterval, 1000, false) + runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false) } func TestProtobufCodecFuzz(t *testing.T) { @@ -1838,7 +1838,7 @@ func runQueryFuzzTestCases(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2 failures++ } } else if !cmp.Equal(tc.res1, tc.res2, comparer) { - t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String()) + t.Logf("case %d results mismatch.\n%s: %s\nres1 len: %d data: %s\nres2 len: %d data: %s\n", i, qt, tc.query, resultLength(tc.res1), tc.res1.String(), resultLength(tc.res2), tc.res2.String()) failures++ } } @@ -1872,3 +1872,17 @@ func isValidQuery(generatedQuery parser.Expr, skipStdAggregations bool) bool { } return isValid } + +func resultLength(x model.Value) int { + vx, xvec := x.(model.Vector) + if xvec { + return vx.Len() + } + + mx, xMatrix := x.(model.Matrix) + if xMatrix { + return mx.Len() + } + // Other type, return 0 + return 0 +} diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index e9a51f2c3c6..967f7aba1e3 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -44,6 +44,7 @@ import ( "github.com/cortexproject/cortex/pkg/querier/tripperware/instantquery" "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange" querier_worker "github.com/cortexproject/cortex/pkg/querier/worker" + cortexquerysharding "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv/codec" "github.com/cortexproject/cortex/pkg/ring/kv/memberlist" @@ -511,7 +512,13 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) { // initQueryFrontendTripperware instantiates the tripperware used by the query frontend // to optimize Prometheus query requests. func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) { - queryAnalyzer := querysharding.NewQueryAnalyzer() + var queryAnalyzer querysharding.Analyzer + queryAnalyzer = querysharding.NewQueryAnalyzer() + if t.Cfg.Querier.EnableParquetQueryable { + // Disable vertical sharding for binary expression with ignore for parquet queryable. + queryAnalyzer = cortexquerysharding.NewDisableBinaryExpressionAnalyzer(queryAnalyzer) + } + // PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses. prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec) // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats) diff --git a/pkg/querier/parquet_queryable.go b/pkg/querier/parquet_queryable.go index 8d7fe7152ed..520438c5414 100644 --- a/pkg/querier/parquet_queryable.go +++ b/pkg/querier/parquet_queryable.go @@ -6,13 +6,13 @@ import ( "time" "github.com/go-kit/log" - "github.com/go-kit/log/level" lru "github.com/hashicorp/golang-lru/v2" "github.com/opentracing/opentracing-go" "github.com/parquet-go/parquet-go" "github.com/pkg/errors" "github.com/prometheus-community/parquet-common/queryable" "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/search" parquet_storage "github.com/prometheus-community/parquet-common/storage" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -20,17 +20,18 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/strutil" "golang.org/x/sync/errgroup" "github.com/cortexproject/cortex/pkg/cortexpb" + "github.com/cortexproject/cortex/pkg/querysharding" "github.com/cortexproject/cortex/pkg/storage/bucket" cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex" "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/limiter" - util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/multierror" "github.com/cortexproject/cortex/pkg/util/services" "github.com/cortexproject/cortex/pkg/util/validation" @@ -153,6 +154,7 @@ func NewParquetQueryable( userID, _ := tenant.TenantID(ctx) return int64(limits.ParquetMaxFetchedDataBytes(userID)) }), + queryable.WithMaterializedLabelsFilterCallback(materializedLabelsFilterCallback), queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error { queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx) lbls := make([][]cortexpb.LabelAdapter, 0, len(cs)) @@ -432,17 +434,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.Select") defer span.Finish() - userID, err := tenant.TenantID(ctx) + newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers) if err != nil { return storage.ErrSeriesSet(err) } - - if q.limits.QueryVerticalShardSize(userID) > 1 { - uLogger := util_log.WithUserID(userID, q.logger) - level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage") - - return q.blocksStoreQuerier.Select(ctx, sortSeries, h, matchers...) - } + defer shardMatcher.Close() hints := storage.SelectHints{ Start: q.minT, @@ -483,7 +479,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool go func() { span, _ := opentracing.StartSpanFromContext(ctx, "parquetQuerier.Select") defer span.Finish() - p <- q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, &hints, matchers...) + parquetCtx := InjectBlocksIntoContext(ctx, parquet...) + if shardMatcher != nil { + parquetCtx = injectShardMatcherIntoContext(parquetCtx, shardMatcher) + } + p <- q.parquetQuerier.Select(parquetCtx, sortSeries, &hints, newMatchers...) }() } @@ -570,6 +570,26 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining } } +type shardMatcherLabelsFilter struct { + shardMatcher *storepb.ShardMatcher +} + +func (f *shardMatcherLabelsFilter) Filter(lbls labels.Labels) bool { + return f.shardMatcher.MatchesLabels(lbls) +} + +func (f *shardMatcherLabelsFilter) Close() { + f.shardMatcher.Close() +} + +func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHints) (search.MaterializedLabelsFilter, bool) { + shardMatcher, exists := extractShardMatcherFromContext(ctx) + if !exists || !shardMatcher.IsSharded() { + return nil, false + } + return &shardMatcherLabelsFilter{shardMatcher: shardMatcher}, true +} + type cacheInterface[T any] interface { Get(path string) T Set(path string, reader T) @@ -655,3 +675,19 @@ func (n noopCache[T]) Get(_ string) (r T) { func (n noopCache[T]) Set(_ string, _ T) { } + +var ( + shardMatcherCtxKey contextKey = 1 +) + +func injectShardMatcherIntoContext(ctx context.Context, sm *storepb.ShardMatcher) context.Context { + return context.WithValue(ctx, shardMatcherCtxKey, sm) +} + +func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, bool) { + if sm := ctx.Value(shardMatcherCtxKey); sm != nil { + return sm.(*storepb.ShardMatcher), true + } + + return nil, false +} diff --git a/pkg/querier/parquet_queryable_test.go b/pkg/querier/parquet_queryable_test.go index 13cdde6cd57..73f7c50af21 100644 --- a/pkg/querier/parquet_queryable_test.go +++ b/pkg/querier/parquet_queryable_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "path/filepath" + "sync" "testing" "time" @@ -75,49 +76,6 @@ func TestParquetQueryableFallbackLogic(t *testing.T) { } ctx := user.InjectOrgID(context.Background(), "user-1") - t.Run("should fallback when vertical sharding is enabled", func(t *testing.T) { - finder := &blocksFinderMock{} - stores := createStore() - - q := &blocksStoreQuerier{ - minT: minT, - maxT: maxT, - finder: finder, - stores: stores, - consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil), - logger: log.NewNopLogger(), - metrics: newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()), - limits: &blocksStoreLimitsMock{}, - - storeGatewayConsistencyCheckMaxAttempts: 3, - } - - mParquetQuerier := &mockParquetQuerier{} - pq := &parquetQuerierWithFallback{ - minT: minT, - maxT: maxT, - finder: finder, - blocksStoreQuerier: q, - parquetQuerier: mParquetQuerier, - metrics: newParquetQueryableFallbackMetrics(prometheus.NewRegistry()), - limits: defaultOverrides(t, 4), - logger: log.NewNopLogger(), - defaultBlockStoreType: parquetBlockStore, - } - - finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{ - &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}}, - }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil) - - t.Run("select", func(t *testing.T) { - ss := pq.Select(ctx, true, nil, matchers...) - require.NoError(t, ss.Err()) - require.Len(t, stores.queriedBlocks, 2) - require.Len(t, mParquetQuerier.queriedBlocks, 0) - }) - }) - t.Run("should fallback all blocks", func(t *testing.T) { finder := &blocksFinderMock{} stores := createStore() @@ -671,3 +629,90 @@ func (m *mockParquetQuerier) Reset() { func (mockParquetQuerier) Close() error { return nil } + +func TestMaterializedLabelsFilterCallback(t *testing.T) { + tests := []struct { + name string + setupContext func() context.Context + expectedFilterReturned bool + expectedCallbackReturned bool + }{ + { + name: "no shard matcher in context", + setupContext: func() context.Context { + return context.Background() + }, + expectedFilterReturned: false, + expectedCallbackReturned: false, + }, + { + name: "shard matcher exists but is not sharded", + setupContext: func() context.Context { + // Create a ShardInfo with TotalShards = 0 (not sharded) + shardInfo := &storepb.ShardInfo{ + ShardIndex: 0, + TotalShards: 0, // Not sharded + By: true, + Labels: []string{"__name__"}, + } + + buffers := &sync.Pool{New: func() interface{} { + b := make([]byte, 0, 100) + return &b + }} + shardMatcher := shardInfo.Matcher(buffers) + + return injectShardMatcherIntoContext(context.Background(), shardMatcher) + }, + expectedFilterReturned: false, + expectedCallbackReturned: false, + }, + { + name: "shard matcher exists and is sharded", + setupContext: func() context.Context { + // Create a ShardInfo with TotalShards > 0 (sharded) + shardInfo := &storepb.ShardInfo{ + ShardIndex: 0, + TotalShards: 2, // Sharded + By: true, + Labels: []string{"__name__"}, + } + + buffers := &sync.Pool{New: func() interface{} { + b := make([]byte, 0, 100) + return &b + }} + shardMatcher := shardInfo.Matcher(buffers) + + return injectShardMatcherIntoContext(context.Background(), shardMatcher) + }, + expectedFilterReturned: true, + expectedCallbackReturned: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx := tt.setupContext() + + filter, exists := materializedLabelsFilterCallback(ctx, nil) + + require.Equal(t, tt.expectedCallbackReturned, exists) + + if tt.expectedFilterReturned { + require.NotNil(t, filter) + + // Test that the filter can be used + testLabels := labels.FromStrings("__name__", "test_metric", "label1", "value1") + // We can't easily test the actual filtering logic without knowing the internal + // shard matching implementation, but we can at least verify the filter interface works + _ = filter.Filter(testLabels) + + // Cleanup + filter.Close() + } else { + require.Nil(t, filter) + } + }) + } +} diff --git a/pkg/querysharding/util.go b/pkg/querysharding/util.go index 2b438ce275e..eafc3a71b4f 100644 --- a/pkg/querysharding/util.go +++ b/pkg/querysharding/util.go @@ -4,8 +4,10 @@ import ( "encoding/base64" "sync" + "github.com/pkg/errors" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" + "github.com/thanos-io/thanos/pkg/querysharding" "github.com/thanos-io/thanos/pkg/store/storepb" cortexparser "github.com/cortexproject/cortex/pkg/parser" @@ -20,6 +22,8 @@ var ( b := make([]byte, 0, 100) return &b }} + + stop = errors.New("stop") ) func InjectShardingInfo(query string, shardInfo *storepb.ShardInfo) (string, error) { @@ -77,3 +81,43 @@ func ExtractShardingMatchers(matchers []*labels.Matcher) ([]*labels.Matcher, *st return r, shardInfo.Matcher(&buffers), nil } + +type disableBinaryExpressionAnalyzer struct { + analyzer querysharding.Analyzer +} + +// NewDisableBinaryExpressionAnalyzer is a wrapper around the analyzer that disables binary expressions. +func NewDisableBinaryExpressionAnalyzer(analyzer querysharding.Analyzer) *disableBinaryExpressionAnalyzer { + return &disableBinaryExpressionAnalyzer{analyzer: analyzer} +} + +func (d *disableBinaryExpressionAnalyzer) Analyze(query string) (querysharding.QueryAnalysis, error) { + analysis, err := d.analyzer.Analyze(query) + if err != nil || !analysis.IsShardable() { + return analysis, err + } + + expr, _ := cortexparser.ParseExpr(query) + isShardable := true + parser.Inspect(expr, func(node parser.Node, nodes []parser.Node) error { + switch n := node.(type) { + case *parser.BinaryExpr: + // No vector matching means one operand is not vector. Skip it. + if n.VectorMatching == nil { + return nil + } + // Vector matching ignore will add MetricNameLabel as sharding label. + // Mark this type of query not shardable. + if !n.VectorMatching.On { + isShardable = false + return stop + } + } + return nil + }) + if !isShardable { + // Mark as not shardable. + return querysharding.QueryAnalysis{}, nil + } + return analysis, nil +} diff --git a/pkg/querysharding/util_test.go b/pkg/querysharding/util_test.go new file mode 100644 index 00000000000..cba23190723 --- /dev/null +++ b/pkg/querysharding/util_test.go @@ -0,0 +1,145 @@ +package querysharding + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/querysharding" +) + +func TestDisableBinaryExpressionAnalyzer_Analyze(t *testing.T) { + tests := []struct { + name string + query string + expectShardable bool + expectError bool + description string + }{ + { + name: "binary expression with vector matching on", + query: `up{job="prometheus"} + on(instance) rate(cpu_usage[5m])`, + expectShardable: true, + expectError: false, + description: "Binary expression with 'on' matching should remain shardable", + }, + { + name: "binary expression without explicit vector matching", + query: `up{job="prometheus"} + rate(cpu_usage[5m])`, + expectShardable: false, + expectError: false, + description: "No explicit vector matching means without. Not shardable.", + }, + { + name: "binary expression with vector matching ignoring", + query: `up{job="prometheus"} + ignoring(instance) rate(cpu_usage[5m])`, + expectShardable: false, + expectError: false, + description: "Binary expression with 'ignoring' matching should not be shardable", + }, + { + name: "complex expression with binary expr using on", + query: `sum(rate(http_requests_total[5m])) by (job) + on(job) avg(cpu_usage) by (job)`, + expectShardable: true, + expectError: false, + description: "Complex expression with 'on' matching should remain shardable", + }, + { + name: "complex expression with binary expr using ignoring", + query: `sum(rate(http_requests_total[5m])) by (job) + ignoring(instance) avg(cpu_usage) by (job)`, + expectShardable: false, + expectError: false, + description: "Complex expression with 'ignoring' matching should not be shardable", + }, + { + name: "nested binary expressions with one ignoring", + query: `(up + on(job) rate(cpu[5m])) * ignoring(instance) memory_usage`, + expectShardable: false, + expectError: false, + description: "Nested expressions with any 'ignoring' should not be shardable", + }, + { + name: "aggregation", + query: `sum(rate(http_requests_total[5m])) by (job)`, + expectShardable: true, + expectError: false, + description: "Aggregations should remain shardable", + }, + { + name: "aggregation with binary expression and scalar", + query: `sum(rate(http_requests_total[5m])) by (job) * 100`, + expectShardable: true, + expectError: false, + description: "Aggregations should remain shardable", + }, + { + name: "invalid query", + query: "invalid{query", + expectShardable: false, + expectError: true, + description: "Invalid queries should return error", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create the actual thanos analyzer + thanosAnalyzer := querysharding.NewQueryAnalyzer() + + // Wrap it with our disable binary expression analyzer + analyzer := NewDisableBinaryExpressionAnalyzer(thanosAnalyzer) + + // Test the wrapped analyzer + result, err := analyzer.Analyze(tt.query) + + if tt.expectError { + require.Error(t, err, tt.description) + return + } + + require.NoError(t, err, tt.description) + assert.Equal(t, tt.expectShardable, result.IsShardable(), tt.description) + }) + } +} + +func TestDisableBinaryExpressionAnalyzer_ComparedToOriginal(t *testing.T) { + // Test cases that verify the wrapper correctly modifies behavior + testCases := []struct { + name string + query string + }{ + { + name: "ignoring expression should be disabled", + query: `up + ignoring(instance) rate(cpu[5m])`, + }, + { + name: "nested ignoring expression should be disabled", + query: `(sum(rate(http_requests_total[5m])) by (job)) + ignoring(instance) avg(cpu_usage) by (job)`, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Test with original analyzer + originalAnalyzer := querysharding.NewQueryAnalyzer() + originalResult, err := originalAnalyzer.Analyze(tc.query) + require.NoError(t, err) + + // Test with wrapped analyzer + wrappedAnalyzer := NewDisableBinaryExpressionAnalyzer(originalAnalyzer) + wrappedResult, err := wrappedAnalyzer.Analyze(tc.query) + require.NoError(t, err) + + // The wrapped analyzer should make previously shardable queries non-shardable + // if they contain binary expressions with ignoring + if originalResult.IsShardable() { + assert.False(t, wrappedResult.IsShardable(), + "Wrapped analyzer should disable sharding for queries with ignoring vector matching") + } else { + // If original wasn't shardable, wrapped shouldn't be either + assert.False(t, wrappedResult.IsShardable()) + } + }) + } +}