Commit f1b6e20

Support vertical sharding for parquet queryable (#6879)
1 parent 186c988 commit f1b6e20

7 files changed: +357 -61 lines

integration/parquet_querier_test.go

Lines changed: 9 additions & 4 deletions
@@ -63,8 +63,9 @@ func TestParquetFuzz(t *testing.T) {
         "-store-gateway.sharding-enabled":   "false",
         "--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
         // alert manager
-        "-alertmanager.web.external-url":      "http://localhost/alertmanager",
-        "-frontend.query-vertical-shard-size": "1",
+        "-alertmanager.web.external-url": "http://localhost/alertmanager",
+        // Enable vertical sharding.
+        "-frontend.query-vertical-shard-size": "3",
         "-frontend.max-cache-freshness": "1m",
         // enable experimental promQL funcs
         "-querier.enable-promql-experimental-functions": "true",

@@ -130,16 +131,20 @@ func TestParquetFuzz(t *testing.T) {
     // Wait until we convert the blocks
     cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
         found := false
+        foundBucketIndex := false

         err := bkt.Iter(context.Background(), "", func(name string) error {
             fmt.Println(name)
             if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
                 found = true
             }
+            if name == "bucket-index.json.gz" {
+                foundBucketIndex = true
+            }
             return nil
         }, objstore.WithRecursiveIter())
         require.NoError(t, err)
-        return found
+        return found && foundBucketIndex
     })

     att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")

@@ -178,7 +183,7 @@ func TestParquetFuzz(t *testing.T) {
     }
     ps := promqlsmith.New(rnd, lbls, opts...)

-    runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 500, false)
+    runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)

     require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
         labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))

integration/query_fuzz_test.go

Lines changed: 16 additions & 2 deletions
@@ -799,7 +799,7 @@ func TestVerticalShardingFuzz(t *testing.T) {
     }
     ps := promqlsmith.New(rnd, lbls, opts...)

-    runQueryFuzzTestCases(t, ps, c1, c2, now, start, end, scrapeInterval, 1000, false)
+    runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
 }

 func TestProtobufCodecFuzz(t *testing.T) {

@@ -1838,7 +1838,7 @@ func runQueryFuzzTestCases(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2
             failures++
         }
     } else if !cmp.Equal(tc.res1, tc.res2, comparer) {
-        t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
+        t.Logf("case %d results mismatch.\n%s: %s\nres1 len: %d data: %s\nres2 len: %d data: %s\n", i, qt, tc.query, resultLength(tc.res1), tc.res1.String(), resultLength(tc.res2), tc.res2.String())
         failures++
     }
 }

@@ -1872,3 +1872,17 @@ func isValidQuery(generatedQuery parser.Expr, skipStdAggregations bool) bool {
     }
     return isValid
 }
+
+func resultLength(x model.Value) int {
+    vx, xvec := x.(model.Vector)
+    if xvec {
+        return vx.Len()
+    }
+
+    mx, xMatrix := x.(model.Matrix)
+    if xMatrix {
+        return mx.Len()
+    }
+    // Other type, return 0
+    return 0
+}

pkg/cortex/modules.go

Lines changed: 8 additions & 1 deletion
@@ -44,6 +44,7 @@ import (
     "github.com/cortexproject/cortex/pkg/querier/tripperware/instantquery"
     "github.com/cortexproject/cortex/pkg/querier/tripperware/queryrange"
     querier_worker "github.com/cortexproject/cortex/pkg/querier/worker"
+    cortexquerysharding "github.com/cortexproject/cortex/pkg/querysharding"
     "github.com/cortexproject/cortex/pkg/ring"
     "github.com/cortexproject/cortex/pkg/ring/kv/codec"
     "github.com/cortexproject/cortex/pkg/ring/kv/memberlist"

@@ -511,7 +512,13 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) {
 // initQueryFrontendTripperware instantiates the tripperware used by the query frontend
 // to optimize Prometheus query requests.
 func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
-    queryAnalyzer := querysharding.NewQueryAnalyzer()
+    var queryAnalyzer querysharding.Analyzer
+    queryAnalyzer = querysharding.NewQueryAnalyzer()
+    if t.Cfg.Querier.EnableParquetQueryable {
+        // Disable vertical sharding for binary expression with ignore for parquet queryable.
+        queryAnalyzer = cortexquerysharding.NewDisableBinaryExpressionAnalyzer(queryAnalyzer)
+    }
+
     // PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
     prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
     // ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)
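
Note: the DisableBinaryExpressionAnalyzer constructor called here comes from the Cortex pkg/querysharding package, which is part of this commit but not shown in this excerpt. As a rough, hypothetical sketch of the idea only, assuming the Thanos querysharding.Analyzer interface (Analyze(query string) (QueryAnalysis, error)) and that the zero-value QueryAnalysis reports IsShardable() == false, a wrapper could delegate to the inner analyzer and then mark queries containing a binary expression that matches with ignoring(...) as non-shardable; the actual Cortex implementation may differ:

// Hypothetical sketch, not the actual pkg/querysharding implementation.
package querysharding

import (
    "github.com/prometheus/prometheus/promql/parser"
    thanosquerysharding "github.com/thanos-io/thanos/pkg/querysharding"
)

type disableBinaryExpressionAnalyzer struct {
    next thanosquerysharding.Analyzer
}

// NewDisableBinaryExpressionAnalyzer wraps another analyzer and refuses to
// vertically shard queries whose binary expressions match with "ignoring(...)".
func NewDisableBinaryExpressionAnalyzer(next thanosquerysharding.Analyzer) thanosquerysharding.Analyzer {
    return &disableBinaryExpressionAnalyzer{next: next}
}

func (a *disableBinaryExpressionAnalyzer) Analyze(query string) (thanosquerysharding.QueryAnalysis, error) {
    analysis, err := a.next.Analyze(query)
    if err != nil || !analysis.IsShardable() {
        return analysis, err
    }

    expr, err := parser.ParseExpr(query)
    if err != nil {
        return thanosquerysharding.QueryAnalysis{}, err
    }

    shardable := true
    parser.Inspect(expr, func(node parser.Node, _ []parser.Node) error {
        if bin, ok := node.(*parser.BinaryExpr); ok {
            // "ignoring(label, ...)" can drop the shard-by labels on one side of
            // the join, so each vertical shard would see an incomplete result.
            if bin.VectorMatching != nil && !bin.VectorMatching.On && len(bin.VectorMatching.MatchingLabels) > 0 {
                shardable = false
            }
        }
        return nil
    })
    if !shardable {
        // Zero-value QueryAnalysis is treated as non-shardable.
        return thanosquerysharding.QueryAnalysis{}, nil
    }
    return analysis, nil
}

With a wrapper of this shape, a query such as sum(rate(a[5m])) / ignoring(pod) sum(rate(b[5m])) would skip vertical sharding when the parquet queryable is enabled, while ordinary aggregations continue to be sharded.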

pkg/querier/parquet_queryable.go

Lines changed: 47 additions & 11 deletions
@@ -6,31 +6,32 @@ import (
     "time"

     "github.com/go-kit/log"
-    "github.com/go-kit/log/level"
     lru "github.com/hashicorp/golang-lru/v2"
     "github.com/opentracing/opentracing-go"
     "github.com/parquet-go/parquet-go"
     "github.com/pkg/errors"
     "github.com/prometheus-community/parquet-common/queryable"
     "github.com/prometheus-community/parquet-common/schema"
+    "github.com/prometheus-community/parquet-common/search"
     parquet_storage "github.com/prometheus-community/parquet-common/storage"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/prometheus/client_golang/prometheus/promauto"
     "github.com/prometheus/prometheus/model/labels"
     "github.com/prometheus/prometheus/storage"
     "github.com/prometheus/prometheus/tsdb/chunkenc"
     "github.com/prometheus/prometheus/util/annotations"
+    "github.com/thanos-io/thanos/pkg/store/storepb"
     "github.com/thanos-io/thanos/pkg/strutil"
     "golang.org/x/sync/errgroup"

     "github.com/cortexproject/cortex/pkg/cortexpb"
+    "github.com/cortexproject/cortex/pkg/querysharding"
     "github.com/cortexproject/cortex/pkg/storage/bucket"
     cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
     "github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
     "github.com/cortexproject/cortex/pkg/tenant"
     "github.com/cortexproject/cortex/pkg/util"
     "github.com/cortexproject/cortex/pkg/util/limiter"
-    util_log "github.com/cortexproject/cortex/pkg/util/log"
     "github.com/cortexproject/cortex/pkg/util/multierror"
     "github.com/cortexproject/cortex/pkg/util/services"
     "github.com/cortexproject/cortex/pkg/util/validation"

@@ -153,6 +154,7 @@ func NewParquetQueryable(
             userID, _ := tenant.TenantID(ctx)
             return int64(limits.ParquetMaxFetchedDataBytes(userID))
         }),
+        queryable.WithMaterializedLabelsFilterCallback(materializedLabelsFilterCallback),
         queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error {
             queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
             lbls := make([][]cortexpb.LabelAdapter, 0, len(cs))

@@ -432,17 +434,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
     span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.Select")
     defer span.Finish()

-    userID, err := tenant.TenantID(ctx)
+    newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers)
     if err != nil {
         return storage.ErrSeriesSet(err)
     }
-
-    if q.limits.QueryVerticalShardSize(userID) > 1 {
-        uLogger := util_log.WithUserID(userID, q.logger)
-        level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")
-
-        return q.blocksStoreQuerier.Select(ctx, sortSeries, h, matchers...)
-    }
+    defer shardMatcher.Close()

     hints := storage.SelectHints{
         Start: q.minT,

@@ -483,7 +479,11 @@
         go func() {
             span, _ := opentracing.StartSpanFromContext(ctx, "parquetQuerier.Select")
             defer span.Finish()
-            p <- q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, &hints, matchers...)
+            parquetCtx := InjectBlocksIntoContext(ctx, parquet...)
+            if shardMatcher != nil {
+                parquetCtx = injectShardMatcherIntoContext(parquetCtx, shardMatcher)
+            }
+            p <- q.parquetQuerier.Select(parquetCtx, sortSeries, &hints, newMatchers...)
         }()
     }

@@ -570,6 +570,26 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining
     }
 }

+type shardMatcherLabelsFilter struct {
+    shardMatcher *storepb.ShardMatcher
+}
+
+func (f *shardMatcherLabelsFilter) Filter(lbls labels.Labels) bool {
+    return f.shardMatcher.MatchesLabels(lbls)
+}
+
+func (f *shardMatcherLabelsFilter) Close() {
+    f.shardMatcher.Close()
+}
+
+func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHints) (search.MaterializedLabelsFilter, bool) {
+    shardMatcher, exists := extractShardMatcherFromContext(ctx)
+    if !exists || !shardMatcher.IsSharded() {
+        return nil, false
+    }
+    return &shardMatcherLabelsFilter{shardMatcher: shardMatcher}, true
+}
+
 type cacheInterface[T any] interface {
     Get(path string) T
     Set(path string, reader T)

@@ -655,3 +675,19 @@ func (n noopCache[T]) Get(_ string) (r T) {
 func (n noopCache[T]) Set(_ string, _ T) {

 }
+
+var (
+    shardMatcherCtxKey contextKey = 1
+)
+
+func injectShardMatcherIntoContext(ctx context.Context, sm *storepb.ShardMatcher) context.Context {
+    return context.WithValue(ctx, shardMatcherCtxKey, sm)
+}
+
+func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, bool) {
+    if sm := ctx.Value(shardMatcherCtxKey); sm != nil {
+        return sm.(*storepb.ShardMatcher), true
+    }
+
+    return nil, false
+}
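
In the changed Select path above, sharded queries carry the vertical shard information as an extra label matcher: ExtractShardingMatchers strips it from the regular matchers (newMatchers) and yields a storepb.ShardMatcher, which is passed through the context and applied to each materialized series' labels by materializedLabelsFilterCallback. The standalone snippet below is an illustrative sketch, not code from this commit; it uses only the storepb calls that appear in this diff and assumes each series maps to exactly one shard based on a hash of the selected labels:

// Sketch: how the injected storepb.ShardMatcher decides shard membership.
package main

import (
    "fmt"
    "sync"

    "github.com/prometheus/prometheus/model/labels"
    "github.com/thanos-io/thanos/pkg/store/storepb"
)

func main() {
    // Reusable hash buffers, mirroring the sync.Pool used in the new test below.
    buffers := &sync.Pool{New: func() interface{} {
        b := make([]byte, 0, 100)
        return &b
    }}

    // Shard 0 of 3, sharded "by" the metric name.
    sm := (&storepb.ShardInfo{
        ShardIndex:  0,
        TotalShards: 3,
        By:          true,
        Labels:      []string{"__name__"},
    }).Matcher(buffers)
    defer sm.Close() // release the hash buffer back to the pool

    series := labels.FromStrings("__name__", "http_requests_total", "pod", "pod-1")

    // IsSharded is true because TotalShards > 0; MatchesLabels reports whether
    // this particular series hashes onto shard 0.
    fmt.Println("sharded query:", sm.IsSharded())
    fmt.Println("series belongs to shard 0:", sm.MatchesLabels(series))
}

Series that do not hash onto the current shard are filtered out by the labels filter, so each vertically sharded partial query returns only its own slice of the series instead of falling back to the block storage as before.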

pkg/querier/parquet_queryable_test.go

Lines changed: 88 additions & 43 deletions
@@ -5,6 +5,7 @@ import (
     "fmt"
     "math/rand"
     "path/filepath"
+    "sync"
     "testing"
     "time"

@@ -75,49 +76,6 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
     }
     ctx := user.InjectOrgID(context.Background(), "user-1")

-    t.Run("should fallback when vertical sharding is enabled", func(t *testing.T) {
-        finder := &blocksFinderMock{}
-        stores := createStore()
-
-        q := &blocksStoreQuerier{
-            minT:        minT,
-            maxT:        maxT,
-            finder:      finder,
-            stores:      stores,
-            consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
-            logger:      log.NewNopLogger(),
-            metrics:     newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
-            limits:      &blocksStoreLimitsMock{},
-
-            storeGatewayConsistencyCheckMaxAttempts: 3,
-        }
-
-        mParquetQuerier := &mockParquetQuerier{}
-        pq := &parquetQuerierWithFallback{
-            minT:                  minT,
-            maxT:                  maxT,
-            finder:                finder,
-            blocksStoreQuerier:    q,
-            parquetQuerier:        mParquetQuerier,
-            metrics:               newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
-            limits:                defaultOverrides(t, 4),
-            logger:                log.NewNopLogger(),
-            defaultBlockStoreType: parquetBlockStore,
-        }
-
-        finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
-            &bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
-            &bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
-        }, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
-
-        t.Run("select", func(t *testing.T) {
-            ss := pq.Select(ctx, true, nil, matchers...)
-            require.NoError(t, ss.Err())
-            require.Len(t, stores.queriedBlocks, 2)
-            require.Len(t, mParquetQuerier.queriedBlocks, 0)
-        })
-    })
-
     t.Run("should fallback all blocks", func(t *testing.T) {
         finder := &blocksFinderMock{}
         stores := createStore()

@@ -671,3 +629,90 @@ func (m *mockParquetQuerier) Reset() {
 func (mockParquetQuerier) Close() error {
     return nil
 }
+
+func TestMaterializedLabelsFilterCallback(t *testing.T) {
+    tests := []struct {
+        name                     string
+        setupContext             func() context.Context
+        expectedFilterReturned   bool
+        expectedCallbackReturned bool
+    }{
+        {
+            name: "no shard matcher in context",
+            setupContext: func() context.Context {
+                return context.Background()
+            },
+            expectedFilterReturned:   false,
+            expectedCallbackReturned: false,
+        },
+        {
+            name: "shard matcher exists but is not sharded",
+            setupContext: func() context.Context {
+                // Create a ShardInfo with TotalShards = 0 (not sharded)
+                shardInfo := &storepb.ShardInfo{
+                    ShardIndex:  0,
+                    TotalShards: 0, // Not sharded
+                    By:          true,
+                    Labels:      []string{"__name__"},
+                }
+
+                buffers := &sync.Pool{New: func() interface{} {
+                    b := make([]byte, 0, 100)
+                    return &b
+                }}
+                shardMatcher := shardInfo.Matcher(buffers)
+
+                return injectShardMatcherIntoContext(context.Background(), shardMatcher)
+            },
+            expectedFilterReturned:   false,
+            expectedCallbackReturned: false,
+        },
+        {
+            name: "shard matcher exists and is sharded",
+            setupContext: func() context.Context {
+                // Create a ShardInfo with TotalShards > 0 (sharded)
+                shardInfo := &storepb.ShardInfo{
+                    ShardIndex:  0,
+                    TotalShards: 2, // Sharded
+                    By:          true,
+                    Labels:      []string{"__name__"},
+                }
+
+                buffers := &sync.Pool{New: func() interface{} {
+                    b := make([]byte, 0, 100)
+                    return &b
+                }}
+                shardMatcher := shardInfo.Matcher(buffers)
+
+                return injectShardMatcherIntoContext(context.Background(), shardMatcher)
+            },
+            expectedFilterReturned:   true,
+            expectedCallbackReturned: true,
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            ctx := tt.setupContext()
+
+            filter, exists := materializedLabelsFilterCallback(ctx, nil)
+
+            require.Equal(t, tt.expectedCallbackReturned, exists)
+
+            if tt.expectedFilterReturned {
+                require.NotNil(t, filter)
+
+                // Test that the filter can be used
+                testLabels := labels.FromStrings("__name__", "test_metric", "label1", "value1")
+                // We can't easily test the actual filtering logic without knowing the internal
+                // shard matching implementation, but we can at least verify the filter interface works
+                _ = filter.Filter(testLabels)
+
+                // Cleanup
+                filter.Close()
+            } else {
+                require.Nil(t, filter)
+            }
+        })
+    }
+}
