
Commit ab32284

yeya24 committed
support vertical sharding for parquet queryable
Signed-off-by: yeya24 <[email protected]>
1 parent 186c988 commit ab32284

File tree

3 files changed: +138 -56 lines changed

integration/parquet_querier_test.go

Lines changed: 3 additions & 2 deletions
@@ -63,8 +63,9 @@ func TestParquetFuzz(t *testing.T) {
 		"-store-gateway.sharding-enabled":   "false",
 		"--querier.store-gateway-addresses": "nonExistent", // Make sure we do not call Store gateways
 		// alert manager
-		"-alertmanager.web.external-url":      "http://localhost/alertmanager",
-		"-frontend.query-vertical-shard-size": "1",
+		"-alertmanager.web.external-url":      "http://localhost/alertmanager",
+		// Enable vertical sharding.
+		"-frontend.query-vertical-shard-size": "3",
 		"-frontend.max-cache-freshness": "1m",
 		// enable experimental promQL funcs
 		"-querier.enable-promql-experimental-functions": "true",

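The functional change in this test is the vertical shard size. A minimal sketch of just the touched flags (all other fuzz-test flags omitted; this excerpt is illustrative, not code from the commit):

flags := map[string]string{
	"-alertmanager.web.external-url": "http://localhost/alertmanager",
	// Enable vertical sharding: with a shard size of 3 the query frontend
	// splits shardable queries into three shard queries, so the fuzz test
	// now exercises the parquet queryable's sharding path instead of the
	// old fallback to block storage.
	"-frontend.query-vertical-shard-size": "3",
}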
pkg/querier/parquet_queryable.go

Lines changed: 47 additions & 11 deletions
@@ -6,31 +6,32 @@ import (
 	"time"
 
 	"github.com/go-kit/log"
-	"github.com/go-kit/log/level"
 	lru "github.com/hashicorp/golang-lru/v2"
 	"github.com/opentracing/opentracing-go"
 	"github.com/parquet-go/parquet-go"
 	"github.com/pkg/errors"
 	"github.com/prometheus-community/parquet-common/queryable"
 	"github.com/prometheus-community/parquet-common/schema"
+	"github.com/prometheus-community/parquet-common/search"
 	parquet_storage "github.com/prometheus-community/parquet-common/storage"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb/chunkenc"
 	"github.com/prometheus/prometheus/util/annotations"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"github.com/thanos-io/thanos/pkg/strutil"
 	"golang.org/x/sync/errgroup"
 
 	"github.com/cortexproject/cortex/pkg/cortexpb"
+	"github.com/cortexproject/cortex/pkg/querysharding"
 	"github.com/cortexproject/cortex/pkg/storage/bucket"
 	cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
 	"github.com/cortexproject/cortex/pkg/storage/tsdb/bucketindex"
 	"github.com/cortexproject/cortex/pkg/tenant"
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/limiter"
-	util_log "github.com/cortexproject/cortex/pkg/util/log"
 	"github.com/cortexproject/cortex/pkg/util/multierror"
 	"github.com/cortexproject/cortex/pkg/util/services"
 	"github.com/cortexproject/cortex/pkg/util/validation"
@@ -153,6 +154,7 @@ func NewParquetQueryable(
 			userID, _ := tenant.TenantID(ctx)
 			return int64(limits.ParquetMaxFetchedDataBytes(userID))
 		}),
+		queryable.WithMaterializedLabelsFilterCallback(materializedLabelsFilterCallback),
 		queryable.WithMaterializedSeriesCallback(func(ctx context.Context, cs []storage.ChunkSeries) error {
 			queryLimiter := limiter.QueryLimiterFromContextWithFallback(ctx)
 			lbls := make([][]cortexpb.LabelAdapter, 0, len(cs))
@@ -432,17 +434,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
 	span, ctx := opentracing.StartSpanFromContext(ctx, "parquetQuerierWithFallback.Select")
 	defer span.Finish()
 
-	userID, err := tenant.TenantID(ctx)
+	newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers)
 	if err != nil {
 		return storage.ErrSeriesSet(err)
 	}
-
-	if q.limits.QueryVerticalShardSize(userID) > 1 {
-		uLogger := util_log.WithUserID(userID, q.logger)
-		level.Warn(uLogger).Log("msg", "parquet queryable enabled but vertical sharding > 1. Falling back to the block storage")
-
-		return q.blocksStoreQuerier.Select(ctx, sortSeries, h, matchers...)
-	}
+	defer shardMatcher.Close()
 
 	hints := storage.SelectHints{
 		Start: q.minT,
@@ -483,7 +479,11 @@ func (q *parquetQuerierWithFallback) Select(ctx context.Context, sortSeries bool
 		go func() {
 			span, _ := opentracing.StartSpanFromContext(ctx, "parquetQuerier.Select")
 			defer span.Finish()
-			p <- q.parquetQuerier.Select(InjectBlocksIntoContext(ctx, parquet...), sortSeries, &hints, matchers...)
+			parquetCtx := InjectBlocksIntoContext(ctx, parquet...)
+			if shardMatcher != nil {
+				parquetCtx = injectShardMatcherIntoContext(parquetCtx, shardMatcher)
+			}
+			p <- q.parquetQuerier.Select(parquetCtx, sortSeries, &hints, newMatchers...)
 		}()
 	}
 
@@ -570,6 +570,26 @@ func (q *parquetQuerierWithFallback) incrementOpsMetric(method string, remaining
 	}
 }
 
+type shardMatcherLabelsFilter struct {
+	shardMatcher *storepb.ShardMatcher
+}
+
+func (f *shardMatcherLabelsFilter) Filter(lbls labels.Labels) bool {
+	return f.shardMatcher.MatchesLabels(lbls)
+}
+
+func (f *shardMatcherLabelsFilter) Close() {
+	f.shardMatcher.Close()
+}
+
+func materializedLabelsFilterCallback(ctx context.Context, _ *storage.SelectHints) (search.MaterializedLabelsFilter, bool) {
+	shardMatcher, exists := extractShardMatcherFromContext(ctx)
+	if !exists || !shardMatcher.IsSharded() {
+		return nil, false
+	}
+	return &shardMatcherLabelsFilter{shardMatcher: shardMatcher}, true
+}
+
 type cacheInterface[T any] interface {
 	Get(path string) T
 	Set(path string, reader T)
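The filter type above is what the callback hands back to the parquet-common materializer. The search.MaterializedLabelsFilter interface itself is defined in prometheus-community/parquet-common; inferred only from the Filter and Close methods implemented here, it is assumed to look roughly like:

// Assumed shape of search.MaterializedLabelsFilter, inferred from the
// methods shardMatcherLabelsFilter implements in this diff.
type MaterializedLabelsFilter interface {
	// Filter reports whether a materialized series with the given labels
	// should be kept for this query (here: whether it falls in this shard).
	Filter(lbls labels.Labels) bool
	// Close releases resources held by the filter (here: the pooled
	// buffers inside the Thanos ShardMatcher).
	Close()
}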
@@ -655,3 +675,19 @@ func (n noopCache[T]) Get(_ string) (r T) {
 func (n noopCache[T]) Set(_ string, _ T) {
 
 }
+
+var (
+	shardMatcherCtxKey contextKey = 1
+)
+
+func injectShardMatcherIntoContext(ctx context.Context, sm *storepb.ShardMatcher) context.Context {
+	return context.WithValue(ctx, shardMatcherCtxKey, sm)
+}
+
+func extractShardMatcherFromContext(ctx context.Context) (*storepb.ShardMatcher, bool) {
+	if sm := ctx.Value(shardMatcherCtxKey); sm != nil {
+		return sm.(*storepb.ShardMatcher), true
+	}
+
+	return nil, false
+}
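Putting the pieces together, the sharded Select path now flows roughly as sketched below. This is not code from the commit; it is written as if it lived in pkg/querier and uses only identifiers that appear in this diff (ExtractShardingMatchers, injectShardMatcherIntoContext, materializedLabelsFilterCallback, and the ShardMatcher methods). The function name is hypothetical.

// selectWithVerticalSharding is a hypothetical walkthrough of the flow.
func selectWithVerticalSharding(ctx context.Context, matchers []*labels.Matcher) error {
	// 1. Split the frontend-injected sharding matcher out of the ordinary
	//    label matchers and turn it into a *storepb.ShardMatcher.
	newMatchers, shardMatcher, err := querysharding.ExtractShardingMatchers(matchers)
	if err != nil {
		return err
	}
	defer shardMatcher.Close() // mirrors the unconditional defer in Select above

	// 2. Make the shard matcher available to the parquet queryable via the context.
	if shardMatcher != nil {
		ctx = injectShardMatcherIntoContext(ctx, shardMatcher)
	}

	// 3. During materialization, parquet-common invokes the registered
	//    callback. For unsharded queries it returns (nil, false) and no
	//    per-series filtering happens.
	if filter, ok := materializedLabelsFilterCallback(ctx, nil); ok {
		defer filter.Close()
		// 4. Each materialized series is kept only if its labels hash
		//    into this query's shard.
		_ = filter.Filter(labels.FromStrings("__name__", "up"))
	}

	// The remaining matchers (without the sharding matcher) are what get
	// passed to parquetQuerier.Select.
	_ = newMatchers
	return nil
}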

pkg/querier/parquet_queryable_test.go

Lines changed: 88 additions & 43 deletions
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"math/rand"
 	"path/filepath"
+	"sync"
 	"testing"
 	"time"
 
@@ -75,49 +76,6 @@ func TestParquetQueryableFallbackLogic(t *testing.T) {
 	}
 	ctx := user.InjectOrgID(context.Background(), "user-1")
 
-	t.Run("should fallback when vertical sharding is enabled", func(t *testing.T) {
-		finder := &blocksFinderMock{}
-		stores := createStore()
-
-		q := &blocksStoreQuerier{
-			minT:        minT,
-			maxT:        maxT,
-			finder:      finder,
-			stores:      stores,
-			consistency: NewBlocksConsistencyChecker(0, 0, log.NewNopLogger(), nil),
-			logger:      log.NewNopLogger(),
-			metrics:     newBlocksStoreQueryableMetrics(prometheus.NewPedanticRegistry()),
-			limits:      &blocksStoreLimitsMock{},
-
-			storeGatewayConsistencyCheckMaxAttempts: 3,
-		}
-
-		mParquetQuerier := &mockParquetQuerier{}
-		pq := &parquetQuerierWithFallback{
-			minT:                  minT,
-			maxT:                  maxT,
-			finder:                finder,
-			blocksStoreQuerier:    q,
-			parquetQuerier:        mParquetQuerier,
-			metrics:               newParquetQueryableFallbackMetrics(prometheus.NewRegistry()),
-			limits:                defaultOverrides(t, 4),
-			logger:                log.NewNopLogger(),
-			defaultBlockStoreType: parquetBlockStore,
-		}
-
-		finder.On("GetBlocks", mock.Anything, "user-1", minT, maxT).Return(bucketindex.Blocks{
-			&bucketindex.Block{ID: block1, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
-			&bucketindex.Block{ID: block2, Parquet: &parquet.ConverterMarkMeta{Version: 1}},
-		}, map[ulid.ULID]*bucketindex.BlockDeletionMark(nil), nil)
-
-		t.Run("select", func(t *testing.T) {
-			ss := pq.Select(ctx, true, nil, matchers...)
-			require.NoError(t, ss.Err())
-			require.Len(t, stores.queriedBlocks, 2)
-			require.Len(t, mParquetQuerier.queriedBlocks, 0)
-		})
-	})
-
 	t.Run("should fallback all blocks", func(t *testing.T) {
 		finder := &blocksFinderMock{}
 		stores := createStore()
@@ -671,3 +629,90 @@ func (m *mockParquetQuerier) Reset() {
 func (mockParquetQuerier) Close() error {
 	return nil
 }
+
+func TestMaterializedLabelsFilterCallback(t *testing.T) {
+	tests := []struct {
+		name                     string
+		setupContext             func() context.Context
+		expectedFilterReturned   bool
+		expectedCallbackReturned bool
+	}{
+		{
+			name: "no shard matcher in context",
+			setupContext: func() context.Context {
+				return context.Background()
+			},
+			expectedFilterReturned:   false,
+			expectedCallbackReturned: false,
+		},
+		{
+			name: "shard matcher exists but is not sharded",
+			setupContext: func() context.Context {
+				// Create a ShardInfo with TotalShards = 0 (not sharded)
+				shardInfo := &storepb.ShardInfo{
+					ShardIndex:  0,
+					TotalShards: 0, // Not sharded
+					By:          true,
+					Labels:      []string{"__name__"},
+				}
+
+				buffers := &sync.Pool{New: func() interface{} {
+					b := make([]byte, 0, 100)
+					return &b
+				}}
+				shardMatcher := shardInfo.Matcher(buffers)
+
+				return injectShardMatcherIntoContext(context.Background(), shardMatcher)
+			},
+			expectedFilterReturned:   false,
+			expectedCallbackReturned: false,
+		},
+		{
+			name: "shard matcher exists and is sharded",
+			setupContext: func() context.Context {
+				// Create a ShardInfo with TotalShards > 0 (sharded)
+				shardInfo := &storepb.ShardInfo{
+					ShardIndex:  0,
+					TotalShards: 2, // Sharded
+					By:          true,
+					Labels:      []string{"__name__"},
+				}
+
+				buffers := &sync.Pool{New: func() interface{} {
+					b := make([]byte, 0, 100)
+					return &b
+				}}
+				shardMatcher := shardInfo.Matcher(buffers)
+
+				return injectShardMatcherIntoContext(context.Background(), shardMatcher)
+			},
+			expectedFilterReturned:   true,
+			expectedCallbackReturned: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx := tt.setupContext()
+
+			filter, exists := materializedLabelsFilterCallback(ctx, nil)
+
+			require.Equal(t, tt.expectedCallbackReturned, exists)
+
+			if tt.expectedFilterReturned {
+				require.NotNil(t, filter)
+
+				// Test that the filter can be used
+				testLabels := labels.FromStrings("__name__", "test_metric", "label1", "value1")
+				// We can't easily test the actual filtering logic without knowing the internal
+				// shard matching implementation, but we can at least verify the filter interface works
+				_ = filter.Filter(testLabels)
+
+				// Cleanup
+				filter.Close()
+			} else {
+				require.Nil(t, filter)
+			}
+		})
+	}
+}
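As a standalone illustration of what the new test drives, the Thanos shard matcher can be exercised on its own. This sketch reuses only APIs already present in the test above (ShardInfo.Matcher, IsSharded, MatchesLabels, Close); the function name is hypothetical and the snippet is not part of the commit.

func exampleShardMatcherUsage() {
	shardInfo := &storepb.ShardInfo{
		ShardIndex:  0,
		TotalShards: 2, // sharded
		By:          true,
		Labels:      []string{"__name__"},
	}
	buffers := &sync.Pool{New: func() interface{} {
		b := make([]byte, 0, 100)
		return &b
	}}
	sm := shardInfo.Matcher(buffers)
	defer sm.Close()

	// IsSharded is what materializedLabelsFilterCallback checks before
	// returning a filter; with TotalShards == 2 it reports true.
	_ = sm.IsSharded()

	// A series is kept only when its sharding labels hash into this shard.
	_ = sm.MatchesLabels(labels.FromStrings("__name__", "up"))
}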
