Skip to content

Commit 0c64f1b

Browse files
committed
support vertical sharding for parquet queryable
Signed-off-by: yeya24 <[email protected]>
1 parent 960b372 commit 0c64f1b

File tree

10 files changed

+365
-67
lines changed

10 files changed

+365
-67
lines changed

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,3 +325,5 @@ replace github.com/thanos-io/objstore => github.com/thanos-io/objstore v0.0.0-20
325325

326326
// v3.3.1 with https://github.com/prometheus/prometheus/pull/16252. (same as thanos)
327327
replace github.com/prometheus/prometheus => github.com/thanos-io/thanos-prometheus v0.0.0-20250610133519-082594458a88
328+
329+
replace github.com/prometheus-community/parquet-common => github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -814,8 +814,6 @@ github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndr
814814
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
815815
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
816816
github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P1IAHhP5U=
817-
github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643 h1:XoOXq+q+CcY8MZqAVoPtdG3R6o84aeZpZFDM+C9DJXg=
818-
github.com/prometheus-community/parquet-common v0.0.0-20250710090957-8fdc99f06643/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
819817
github.com/prometheus-community/prom-label-proxy v0.11.1 h1:jX+m+BQCNM0z3/P6V6jVxbiDKgugvk91SaICD6bVhT4=
820818
github.com/prometheus-community/prom-label-proxy v0.11.1/go.mod h1:uTeQW+wZ/VPV1LL3IPfvUE++wR2nPLex+Y4RE38Cpis=
821819
github.com/prometheus/alertmanager v0.28.1 h1:BK5pCoAtaKg01BYRUJhEDV1tqJMEtYBGzPw8QdvnnvA=
@@ -955,6 +953,8 @@ github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8
955953
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
956954
github.com/xyproto/randomstring v1.0.5 h1:YtlWPoRdgMu3NZtP45drfy1GKoojuR7hmRcnhZqKjWU=
957955
github.com/xyproto/randomstring v1.0.5/go.mod h1:rgmS5DeNXLivK7YprL0pY+lTuhNQW3iGxZ18UQApw/E=
956+
github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee h1:bjvKyYMZvukXL7/vqcL0jNxo5JNtSAZlQHAIWeFyzuc=
957+
github.com/yeya24/parquet-common v0.0.0-20250714023615-b9170df584ee/go.mod h1:zJNGzMKctJoOESjRVaNTlPis3C9VcY3cRzNxj6ll3Is=
958958
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 h1:ilQV1hzziu+LLM3zUTJ0trRztfwgjqKnBWNtSRkbmwM=
959959
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78/go.mod h1:aL8wCCfTfSfmXjznFBSZNN13rSJjlIOI1fUNAtF7rmI=
960960
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=

integration/parquet_querier_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ func TestParquetFuzz(t *testing.T) {
6363
"-store-gateway.sharding-enabled": "false",
6464
"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
6565
// alert manager
66-
"-alertmanager.web.external-url": "http://localhost/alertmanager",
67-
"-frontend.query-vertical-shard-size": "1",
66+
"-alertmanager.web.external-url": "http://localhost/alertmanager",
67+
// Enable vertical sharding.
68+
"-frontend.query-vertical-shard-size": "3",
6869
"-frontend.max-cache-freshness": "1m",
6970
// enable experimental promQL funcs
7071
"-querier.enable-promql-experimental-functions": "true",

pkg/querier/parquet_queryable.go

Lines changed: 77 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,31 +6,32 @@ import (
66
"time"
77

88
"github.com/go-kit/log"
9-
"github.com/go-kit/log/level"
109
lru "github.com/hashicorp/golang-lru/v2"
1110
"github.com/opentracing/opentracing-go"
1211
"github.com/parquet-go/parquet-go"
1312
"github.com/pkg/errors"
1413
"github.com/prometheus-community/parquet-common/queryable"
1514
"github.com/prometheus-community/parquet-common/schema"
15+
"github.com/prometheus-community/parquet-common/search"
1616
parquet_storage "github.com/prometheus-community/parquet-common/storage"
1717
"github.com/prometheus/client_golang/prometheus"
1818
"github.com/prometheus/client_golang/prometheus/promauto"
1919
"github.com/prometheus/prometheus/model/labels"
2020
"github.com/prometheus/prometheus/storage"
2121
"github.com/prometheus/prometheus/tsdb/chunkenc"
2222
"github.com/prometheus/prometheus/util/annotations"
23+
"github.com/thanos-io/thanos/pkg/store/storepb"
2324
"github.com/thanos-io/thanos/pkg/strutil"
2425
"golang.org/x/sync/errgroup"
2526

2627
"github.com/cortexproject/cortex/pkg/cortexpb"
28+
"github.com/cortexproject/cortex/pkg/querysharding"
2729
"github.com/cortexproject/cortex/pkg/storage/bucket"
2830
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
2931
"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
3032
"github.com/cortexproject/cortex/pkg/tenant"
3133
"github.com/cortexproject/cortex/pkg/util"
3234
"github.com/cortexproject/cortex/pkg/util/limiter"
33-
util_log "github.com/cortexproject/cortex/pkg/util/log"
3435
"github.com/cortexproject/cortex/pkg/util/multierror"
3536
"github.com/cortexproject/cortex/pkg/util/services"
3637
"github.com/cortexproject/cortex/pkg/util/validation"
@@ -153,6 +154,7 @@ func NewParquetQueryable(
153154
userID, _ := tenant.TenantID(ctx)
154155
return int64(limits.ParquetMaxFetchedDataBytes(userID))
155156
}),
157+
queryable.WithMaterializedLabelsCallback(materializedLabelsCallback),
156158
queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error {
157159
queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
158160
lbls := make([][]cortexpb.LabelAdapter, 0, len(cs))
@@ -432,17 +434,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
432434
span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.Select")
433435
defer span.Finish()
434436

435-
userID, err := tenant.TenantID(ctx)
437+
newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers)
436438
if err != nil {
437439
return storage.ErrSeriesSet(err)
438440
}
439-
440-
if q.limits.QueryVerticalShardSize(userID) > 1 {
441-
uLogger := util_log.WithUserID(userID, q.logger)
442-
level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")
443-
444-
return q.blocksStoreQuerier.Select(ctx, sortSeries, h, matchers...)
445-
}
441+
defer shardMatcher.Close()
446442

447443
hints := storage.SelectHints{
448444
Start: q.minT,
@@ -483,7 +479,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
483479
go func() {
484480
span, _ := opentracing.StartSpanFromContext(ctx, "parquetQuerier.Select")
485481
defer span.Finish()
486-
p <- q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, &hints, matchers...)
482+
parquetCtx := InjectBlocksIntoContext(ctx, parquet...)
483+
if shardMatcher != nil {
484+
parquetCtx = injectShardMatcherIntoContext(parquetCtx, shardMatcher)
485+
}
486+
p <- q.parquetQuerier.Select(parquetCtx, sortSeries, &hints, newMatchers...)
487487
}()
488488
}
489489

@@ -570,6 +570,56 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining
570570
}
571571
}
572572

573+
func materializedLabelsCallback(ctx context.Context, _ *storage.SelectHints, seriesLabels [][]labels.Label, rr []search.RowRange) ([][]labels.Label, []search.RowRange) {
574+
shardMatcher, exists := extractShardMatcherFromContext(ctx)
575+
if !exists || !shardMatcher.IsSharded() {
576+
return seriesLabels, rr
577+
}
578+
579+
var filteredLabels [][]labels.Label
580+
var filteredRowRanges []search.RowRange
581+
582+
// Track which individual rows match the shard matcher
583+
rowMatches := make([]bool, len(seriesLabels))
584+
for i, lbls := range seriesLabels {
585+
rowMatches[i] = shardMatcher.MatchesLabels(labels.New(lbls...))
586+
if rowMatches[i] {
587+
filteredLabels = append(filteredLabels, lbls)
588+
}
589+
}
590+
591+
// Convert matching rows back into row ranges
592+
currentRange := search.RowRange{}
593+
inRange := false
594+
595+
for i, matches := range rowMatches {
596+
if matches {
597+
if !inRange {
598+
// Start a new range
599+
currentRange.From = int64(i)
600+
currentRange.Count = 1
601+
inRange = true
602+
} else {
603+
// Extend the current range
604+
currentRange.Count++
605+
}
606+
} else {
607+
if inRange {
608+
// End the current range
609+
filteredRowRanges = append(filteredRowRanges, currentRange)
610+
inRange = false
611+
}
612+
}
613+
}
614+
615+
// Don't forget to add the last range if we're still in one
616+
if inRange {
617+
filteredRowRanges = append(filteredRowRanges, currentRange)
618+
}
619+
620+
return filteredLabels, filteredRowRanges
621+
}
622+
573623
type cacheInterface[T any] interface {
574624
Get(path string) T
575625
Set(path string, reader T)
@@ -655,3 +705,19 @@ func (n noopCache[T]) Get(_ string) (r T) {
655705
func (n noopCache[T]) Set(_ string, _ T) {
656706

657707
}
708+
709+
var (
710+
shardMatcherCtxKey contextKey = 1
711+
)
712+
713+
func injectShardMatcherIntoContext(ctx context.Context, sm *storepb.ShardMatcher) context.Context {
714+
return context.WithValue(ctx, shardMatcherCtxKey, sm)
715+
}
716+
717+
func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, bool) {
718+
if sm := ctx.Value(shardMatcherCtxKey); sm != nil {
719+
return sm.(*storepb.ShardMatcher), true
720+
}
721+
722+
return nil, false
723+
}

0 commit comments

Comments
 (0)