Skip to content

Commit efb09e0

Browse files
committed
disable vertical sharding for query containing without metric name
Signed-off-by: yeya24 <[email protected]>
1 parent 4fe5612 commit efb09e0

File tree

4 files changed

+47
-5
lines changed

4 files changed

+47
-5
lines changed

integration/parquet_querier_test.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,16 +131,20 @@ func TestParquetFuzz(t *testing.T) {
131131
// Wait until we convert the blocks
132132
cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
133133
found := false
134+
foundBucketIndex := false
134135

135136
err := bkt.Iter(context.Background(), "", func(name string) error {
136137
fmt.Println(name)
137138
if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
138139
found = true
139140
}
141+
if name == "bucket-index.json.gz" {
142+
foundBucketIndex = true
143+
}
140144
return nil
141145
}, objstore.WithRecursiveIter())
142146
require.NoError(t, err)
143-
return found
147+
return found && foundBucketIndex
144148
})
145149

146150
att, err := bkt.Attributes(context.Background(), "bucket-index.json.gz")
@@ -179,7 +183,7 @@ func TestParquetFuzz(t *testing.T) {
179183
}
180184
ps := promqlsmith.New(rnd, lbls, opts...)
181185

182-
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 500, false)
186+
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
183187

184188
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Greater(0), []string{"cortex_parquet_queryable_blocks_queried_total"}, e2e.WithLabelMatchers(
185189
labels.MustNewMatcher(labels.MatchEqual, "type", "parquet"))))

integration/query_fuzz_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,7 @@ func TestVerticalShardingFuzz(t *testing.T) {
799799
}
800800
ps := promqlsmith.New(rnd, lbls, opts...)
801801

802-
runQueryFuzzTestCases(t, ps, c1, c2, now, start, end, scrapeInterval, 1000, false)
802+
runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000, false)
803803
}
804804

805805
func TestProtobufCodecFuzz(t *testing.T) {
@@ -1838,7 +1838,7 @@ func runQueryFuzzTestCases(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2
18381838
failures++
18391839
}
18401840
} else if !cmp.Equal(tc.res1, tc.res2, comparer) {
1841-
t.Logf("case %d results mismatch.\n%s: %s\nres1: %s\nres2: %s\n", i, qt, tc.query, tc.res1.String(), tc.res2.String())
1841+
t.Logf("case %d results mismatch.\n%s: %s\nres1 len: %d data: %s\nres2 len: %d data: %s\n", i, qt, tc.query, resultLength(tc.res1), tc.res1.String(), resultLength(tc.res2), tc.res2.String())
18421842
failures++
18431843
}
18441844
}
@@ -1872,3 +1872,17 @@ func isValidQuery(generatedQuery parser.Expr, skipStdAggregations bool) bool {
18721872
}
18731873
return isValid
18741874
}
1875+
1876+
func resultLength(x model.Value) int {
1877+
vx, xvec := x.(model.Vector)
1878+
if xvec {
1879+
return vx.Len()
1880+
}
1881+
1882+
mx, xMatrix := x.(model.Matrix)
1883+
if xMatrix {
1884+
return mx.Len()
1885+
}
1886+
// Other type, return 0
1887+
return 0
1888+
}

pkg/cortex/modules.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,7 @@ func (t *Cortex) initFlusher() (serv services.Service, err error) {
511511
// initQueryFrontendTripperware instantiates the tripperware used by the query frontend
512512
// to optimize Prometheus query requests.
513513
func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err error) {
514-
queryAnalyzer := querysharding.NewQueryAnalyzer()
514+
queryAnalyzer := tripperware.NewDisableWithoutNameAnalyzer(querysharding.NewQueryAnalyzer())
515515
// PrometheusCodec is a codec to encode and decode Prometheus query range requests and responses.
516516
prometheusCodec := queryrange.NewPrometheusCodec(false, t.Cfg.Querier.ResponseCompression, t.Cfg.API.QuerierDefaultCodec)
517517
// ShardedPrometheusCodec is same as PrometheusCodec but to be used on the sharded queries (it sum up the stats)

pkg/querier/tripperware/shard_by.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package tripperware
22

33
import (
44
"context"
5+
"github.com/prometheus/prometheus/model/labels"
56
"net/http"
7+
"slices"
68

79
"github.com/go-kit/log"
810
"github.com/go-kit/log/level"
@@ -124,3 +126,25 @@ func VerticalShardSizeFromContext(ctx context.Context) (int, bool) {
124126
func InjectVerticalShardSizeToContext(ctx context.Context, verticalShardSize int) context.Context {
125127
return context.WithValue(ctx, verticalShardsKey{}, verticalShardSize)
126128
}
129+
130+
type disableWithoutNameAnalyzer struct {
131+
analyzer querysharding.Analyzer
132+
}
133+
134+
func NewDisableWithoutNameAnalyzer(analyzer querysharding.Analyzer) *disableWithoutNameAnalyzer {
135+
return &disableWithoutNameAnalyzer{analyzer: analyzer}
136+
}
137+
138+
func (d *disableWithoutNameAnalyzer) Analyze(query string) (querysharding.QueryAnalysis, error) {
139+
analysis, err := d.analyzer.Analyze(query)
140+
if err != nil || !analysis.IsShardable() || analysis.ShardBy() {
141+
return analysis, err
142+
}
143+
144+
// We are only interested in not shard by case.
145+
if slices.Contains(analysis.ShardingLabels(), labels.MetricName) {
146+
// Mark as not shardable.
147+
return querysharding.QueryAnalysis{}, nil
148+
}
149+
return analysis, nil
150+
}

0 commit comments

Comments
 (0)