@@ -3,22 +3,27 @@ package tripperware
3
3
import (
4
4
"context"
5
5
"net/http"
6
- "slices"
7
6
8
7
"github.com/go-kit/log"
9
8
"github.com/go-kit/log/level"
10
- "github.com/prometheus/prometheus/model/labels"
9
+ "github.com/pkg/errors"
10
+ promqlparser "github.com/prometheus/prometheus/promql/parser"
11
11
"github.com/thanos-io/thanos/pkg/querysharding"
12
12
"github.com/thanos-io/thanos/pkg/store/storepb"
13
13
"github.com/weaveworks/common/httpgrpc"
14
14
15
+ "github.com/cortexproject/cortex/pkg/parser"
15
16
querier_stats "github.com/cortexproject/cortex/pkg/querier/stats"
16
17
cquerysharding "github.com/cortexproject/cortex/pkg/querysharding"
17
18
"github.com/cortexproject/cortex/pkg/tenant"
18
19
util_log "github.com/cortexproject/cortex/pkg/util/log"
19
20
"github.com/cortexproject/cortex/pkg/util/validation"
20
21
)
21
22
23
+ var (
24
+ stop = errors .New ("stop" )
25
+ )
26
+
22
27
func ShardByMiddleware (logger log.Logger , limits Limits , merger Merger , queryAnalyzer querysharding.Analyzer ) Middleware {
23
28
return MiddlewareFunc (func (next Handler ) Handler {
24
29
return shardBy {
@@ -127,22 +132,33 @@ func InjectVerticalShardSizeToContext(ctx context.Context, verticalShardSize int
127
132
return context .WithValue (ctx , verticalShardsKey {}, verticalShardSize )
128
133
}
129
134
130
- type disableWithoutNameAnalyzer struct {
135
+ type disableBinaryExpressionAnalyzer struct {
131
136
analyzer querysharding.Analyzer
132
137
}
133
138
134
- func NewDisableWithoutNameAnalyzer (analyzer querysharding.Analyzer ) * disableWithoutNameAnalyzer {
135
- return & disableWithoutNameAnalyzer {analyzer : analyzer }
139
+ func NewDisableBinaryExpressionAnalyzer (analyzer querysharding.Analyzer ) * disableBinaryExpressionAnalyzer {
140
+ return & disableBinaryExpressionAnalyzer {analyzer : analyzer }
136
141
}
137
142
138
- func (d * disableWithoutNameAnalyzer ) Analyze (query string ) (querysharding.QueryAnalysis , error ) {
143
+ func (d * disableBinaryExpressionAnalyzer ) Analyze (query string ) (querysharding.QueryAnalysis , error ) {
139
144
analysis , err := d .analyzer .Analyze (query )
140
- if err != nil || ! analysis .IsShardable () || analysis . ShardBy () {
145
+ if err != nil || ! analysis .IsShardable () {
141
146
return analysis , err
142
147
}
143
148
144
- // We are only interested in not shard by case.
145
- if slices .Contains (analysis .ShardingLabels (), labels .MetricName ) {
149
+ expr , _ := parser .ParseExpr (query )
150
+ isShardable := true
151
+ promqlparser .Inspect (expr , func (node promqlparser.Node , nodes []promqlparser.Node ) error {
152
+ switch n := node .(type ) {
153
+ case * promqlparser.BinaryExpr :
154
+ if ! n .VectorMatching .On && len (n .VectorMatching .MatchingLabels ) == 0 {
155
+ isShardable = false
156
+ return stop
157
+ }
158
+ }
159
+ return nil
160
+ })
161
+ if ! isShardable {
146
162
// Mark as not shardable.
147
163
return querysharding.QueryAnalysis {}, nil
148
164
}
0 commit comments