Skip to content

Commit 22df406

Browse files
committed
remove unneccessary configs in query frontend + restructure logical-plan-gen middleware helper functions
Signed-off-by: rubywtl <[email protected]>
1 parent e8d1d0b commit 22df406

13 files changed

+143
-141
lines changed

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4451,6 +4451,11 @@ grpc_client_config:
44514451
# URL of downstream Prometheus.
44524452
# CLI flag: -frontend.downstream-url
44534453
[downstream_url: <string> | default = ""]
4454+
4455+
# Experimental: Enables distributed execution of queries by passing logical
4456+
# query plan fragments to downstream components.
4457+
# CLI flag: -frontend.distributed_exec_enabled
4458+
[distributed_exec_enabled: <boolean> | default = false]
44544459
```
44554460

44564461
### `query_range_config`

pkg/cortex/modules.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -533,16 +533,13 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
533533
prometheusCodec,
534534
shardedPrometheusCodec,
535535
t.Cfg.Querier.LookbackDelta,
536-
t.Cfg.Querier.EnablePerStepStats,
537536
t.Cfg.Frontend.DistributedExecEnabled,
538-
t.Cfg.Frontend.DisableDuplicateLabelChecks,
539537
)
540538
if err != nil {
541539
return nil, err
542540
}
543541

544-
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta, t.Cfg.Querier.EnablePerStepStats,
545-
t.Cfg.Frontend.DistributedExecEnabled, t.Cfg.Frontend.DisableDuplicateLabelChecks)
542+
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta, t.Cfg.Frontend.DistributedExecEnabled)
546543
if err != nil {
547544
return nil, err
548545
}

pkg/frontend/config.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@ type CombinedFrontendConfig struct {
2020
FrontendV1 v1.Config `yaml:",inline"`
2121
FrontendV2 v2.Config `yaml:",inline"`
2222

23-
DownstreamURL string `yaml:"downstream_url"`
24-
DistributedExecEnabled bool `yaml:"distributed_exec_enabled"`
25-
DisableDuplicateLabelChecks bool `yaml:"disable_duplicate_label_checks"`
23+
DownstreamURL string `yaml:"downstream_url"`
24+
DistributedExecEnabled bool `yaml:"distributed_exec_enabled"`
2625
}
2726

2827
func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
@@ -32,7 +31,6 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
3231

3332
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
3433
f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed_exec_enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
35-
f.BoolVar(&cfg.DisableDuplicateLabelChecks, "frontend.disable_duplicate_label_checks", false, "Experimental: Disables duplicate label checks during logical plan generation")
3634
}
3735

3836
// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at

pkg/querier/tripperware/instantquery/instant_query.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ import (
2323
"github.com/cortexproject/cortex/pkg/util"
2424
"github.com/cortexproject/cortex/pkg/util/limiter"
2525
"github.com/cortexproject/cortex/pkg/util/spanlogger"
26+
27+
"github.com/thanos-io/promql-engine/logicalplan"
2628
)
2729

2830
var (
@@ -141,6 +143,19 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response,
141143
return &resp, nil
142144
}
143145

146+
func (c instantQueryCodec) getSerializedBody(promReq *tripperware.PrometheusRequest) ([]byte, error) {
147+
var byteLP []byte
148+
var err error
149+
150+
if promReq.LogicalPlan != nil && *promReq.LogicalPlan != nil {
151+
byteLP, err = logicalplan.Marshal((*promReq.LogicalPlan).Root())
152+
if err != nil {
153+
return nil, err
154+
}
155+
}
156+
return byteLP, nil
157+
}
158+
144159
func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) {
145160
promReq, ok := r.(*tripperware.PrometheusRequest)
146161
if !ok {
@@ -176,11 +191,16 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
176191
tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
177192
}
178193

194+
byteBody, err := c.getSerializedBody(promReq)
195+
if err != nil {
196+
return nil, err
197+
}
198+
179199
req := &http.Request{
180200
Method: "POST",
181201
RequestURI: u.String(), // This is what the httpgrpc code looks at.
182202
URL: u,
183-
Body: io.NopCloser(bytes.NewReader(promReq.LogicalPlan)),
203+
Body: io.NopCloser(bytes.NewReader(byteBody)),
184204
Header: h,
185205
}
186206

pkg/querier/tripperware/instantquery/instant_query_middlewares.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@ func Middlewares(
1515
merger tripperware.Merger,
1616
queryAnalyzer querysharding.Analyzer,
1717
lookbackDelta time.Duration,
18-
enablePerStepStats bool,
1918
distributedExecEnabled bool,
20-
disableDuplicateLabelChecks bool,
2119
) ([]tripperware.Middleware, error) {
2220
m := []tripperware.Middleware{
2321
NewLimitsMiddleware(limits, lookbackDelta),
@@ -26,7 +24,7 @@ func Middlewares(
2624

2725
if distributedExecEnabled {
2826
m = append(m,
29-
tripperware.LogicalPlanGenMiddleware(lookbackDelta, enablePerStepStats, disableDuplicateLabelChecks))
27+
tripperware.LogicalPlanGenMiddleware())
3028
}
3129

3230
return m, nil

pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package instantquery
22

33
import (
44
"context"
5+
"github.com/thanos-io/promql-engine/logicalplan"
56
"io"
67
"net/http"
78
"net/http/httptest"
@@ -60,9 +61,7 @@ func TestRoundTrip(t *testing.T) {
6061
nil,
6162
qa,
6263
5*time.Minute,
63-
false,
6464
distributedExecEnabled,
65-
true,
6665
)
6766
require.NoError(t, err)
6867

@@ -147,8 +146,6 @@ func TestRoundTripWithDistributedExec(t *testing.T) {
147146
nil,
148147
qa,
149148
5*time.Minute,
150-
false,
151-
true,
152149
true,
153150
)
154151
require.NoError(t, err)
@@ -195,9 +192,14 @@ func TestRoundTripWithDistributedExec(t *testing.T) {
195192
req = req.WithContext(ctx)
196193
err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
197194
require.NoError(t, err)
195+
198196
body, err := io.ReadAll(req.Body)
199197
require.NotEmpty(t, body)
200-
require.Equal(t, body, tc.pReq.LogicalPlan)
198+
require.NoError(t, err)
199+
200+
byteLP, err := logicalplan.Marshal((*tc.pReq.LogicalPlan).Root())
201+
require.NoError(t, err)
202+
require.Equal(t, byteLP, body)
201203

202204
resp, err := tw(downstream).RoundTrip(req)
203205
require.NoError(t, err)
@@ -244,8 +246,6 @@ func TestRoundTripWithoutDistributedExec(t *testing.T) {
244246
qa,
245247
5*time.Minute,
246248
false,
247-
false,
248-
true,
249249
)
250250
require.NoError(t, err)
251251

pkg/querier/tripperware/instantquery/logical_plan_gen_instant_test.go

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,7 @@ func TestInstantLogicalPlan(t *testing.T) {
7272
t.Run(strconv.Itoa(i), func(t *testing.T) {
7373
t.Parallel()
7474

75-
lpm := tripperware.LogicalPlanGenMiddleware(
76-
time.Duration(1000),
77-
false,
78-
true,
79-
)
75+
lpm := tripperware.LogicalPlanGenMiddleware()
8076
handler := lpm.Wrap(tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) {
8177
return nil, nil
8278
}))
@@ -85,25 +81,34 @@ func TestInstantLogicalPlan(t *testing.T) {
8581
_, _ = handler.Do(context.Background(), tc.input)
8682
require.NotEmpty(t, tc.input.LogicalPlan, "prom request should not be empty")
8783

88-
// Test Group 2: Ensure the logical plan can be deserialized back
89-
qOpts := query.Options{
90-
Start: time.Unix(tc.input.Start, 0),
91-
End: time.Unix(tc.input.End, 0),
92-
Step: time.Duration(1000),
93-
}
94-
planOpts := logicalplan.PlanOptions{DisableDuplicateLabelCheck: true}
95-
_, err := logicalplan.NewFromBytes(tc.input.LogicalPlan, &qOpts, planOpts)
96-
require.NoError(t, err)
97-
98-
// Test 3: Encode the request and validate method and body
84+
// Test 2: Encode the request and validate method and body
9985
httpReq, err := tripperware.Codec.EncodeRequest(testInstantQueryCodec, context.Background(), tc.input)
10086
require.NoError(t, err)
10187
require.Equal(t, httpReq.Method, http.MethodPost, "Method should be POST")
10288

10389
bodyBytes, err := io.ReadAll(httpReq.Body)
10490
require.NoError(t, err)
10591
require.NotEmpty(t, bodyBytes, "HTTP body should not be empty")
106-
require.Equal(t, bodyBytes, tc.input.LogicalPlan, "logical plan in request body does not match expected bytes")
92+
93+
// Test Group 3: Ensure the logical plan can be deserialized back
94+
qOpts := query.Options{
95+
Start: time.Unix(tc.input.Start, 0),
96+
End: time.Unix(tc.input.Start, 0),
97+
Step: 0,
98+
}
99+
100+
qOpts = query.Options{
101+
Start: time.Unix(tc.input.Start, 0),
102+
End: time.Unix(tc.input.Start, 0),
103+
Step: 0,
104+
StepsBatch: 10,
105+
LookbackDelta: 0,
106+
EnablePerStepStats: false,
107+
}
108+
109+
planOpts := logicalplan.PlanOptions{DisableDuplicateLabelCheck: false}
110+
_, err = logicalplan.NewFromBytes(bodyBytes, &qOpts, planOpts)
111+
require.NoError(t, err)
107112
})
108113
}
109114
}

pkg/querier/tripperware/logicalplan_gen.go

Lines changed: 31 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -14,33 +14,39 @@ const (
1414
stepBatch = 10
1515
)
1616

17-
func LogicalPlanGenMiddleware(lookbackDelta time.Duration, enablePerStepStats bool, disableDuplicateLabelChecks bool) Middleware {
17+
func LogicalPlanGenMiddleware() Middleware {
1818
return MiddlewareFunc(func(next Handler) Handler {
1919
return logicalPlanGen{
20-
lookbackDelta: lookbackDelta,
21-
enabledPerStepStats: enablePerStepStats,
22-
next: next,
23-
disableDuplicateLabelChecks: disableDuplicateLabelChecks,
20+
next: next,
2421
}
2522
})
2623
}
2724

2825
type logicalPlanGen struct {
29-
lookbackDelta time.Duration
30-
enabledPerStepStats bool
31-
next Handler
32-
disableDuplicateLabelChecks bool
26+
next Handler
3327
}
3428

35-
func (l logicalPlanGen) NewInstantLogicalPlan(qs string, ts time.Time) ([]byte, error) {
36-
37-
qOpts := query.Options{
38-
Start: ts,
39-
End: ts,
40-
Step: 0,
41-
StepsBatch: stepBatch,
42-
LookbackDelta: l.lookbackDelta,
43-
EnablePerStepStats: l.disableDuplicateLabelChecks,
29+
func (l logicalPlanGen) NewLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, error) {
30+
31+
qOpts := query.Options{}
32+
if step == 0 {
33+
qOpts = query.Options{
34+
Start: start,
35+
End: start,
36+
Step: 0,
37+
StepsBatch: stepBatch,
38+
LookbackDelta: 0,
39+
EnablePerStepStats: false,
40+
}
41+
} else {
42+
qOpts = query.Options{
43+
Start: start,
44+
End: end,
45+
Step: step,
46+
StepsBatch: stepBatch,
47+
LookbackDelta: 0,
48+
EnablePerStepStats: false,
49+
}
4450
}
4551

4652
expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
@@ -49,52 +55,15 @@ func (l logicalPlanGen) NewInstantLogicalPlan(qs string, ts time.Time) ([]byte,
4955
}
5056

5157
planOpts := logicalplan.PlanOptions{
52-
DisableDuplicateLabelCheck: l.disableDuplicateLabelChecks,
58+
DisableDuplicateLabelCheck: false,
5359
}
5460

5561
logicalPlan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
5662
optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
5763

58-
// TODO: Add distributed optimizer for remote node insertion
59-
60-
byteLP, err := logicalplan.Marshal(optimizedPlan.Root())
61-
if err != nil {
62-
return nil, err
63-
}
64+
//TODO: Add distributed query optimizer
6465

65-
return byteLP, nil
66-
}
67-
68-
func (l logicalPlanGen) NewRangeLogicalPlan(qs string, start, end time.Time, interval time.Duration) ([]byte, error) {
69-
70-
qOpts := query.Options{
71-
Start: start,
72-
End: end,
73-
Step: interval,
74-
StepsBatch: stepBatch,
75-
LookbackDelta: l.lookbackDelta,
76-
EnablePerStepStats: l.enabledPerStepStats,
77-
}
78-
79-
expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
80-
if err != nil {
81-
return nil, err
82-
}
83-
84-
planOpts := logicalplan.PlanOptions{
85-
DisableDuplicateLabelCheck: l.disableDuplicateLabelChecks,
86-
}
87-
88-
lPlan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
89-
optimizedPlan, _ := lPlan.Optimize(logicalplan.DefaultOptimizers)
90-
byteLP, err := logicalplan.Marshal(optimizedPlan.Root())
91-
if err != nil {
92-
return nil, err
93-
}
94-
95-
// TODO: Add distributed optimizer for remote node insertion
96-
97-
return byteLP, nil
66+
return &optimizedPlan, nil
9867
}
9968

10069
func (l logicalPlanGen) Do(ctx context.Context, r Request) (Response, error) {
@@ -105,20 +74,16 @@ func (l logicalPlanGen) Do(ctx context.Context, r Request) (Response, error) {
10574

10675
startTime := time.Unix(0, promReq.Start*int64(time.Millisecond))
10776
endTime := time.Unix(0, promReq.End*int64(time.Millisecond))
108-
duration := time.Duration(promReq.Step) * time.Millisecond
77+
step := time.Duration(promReq.Step) * time.Millisecond
10978

110-
var byteLP []byte
11179
var err error
112-
if promReq.Step != 0 {
113-
byteLP, err = l.NewRangeLogicalPlan(promReq.Query, startTime, endTime, duration)
114-
} else {
115-
byteLP, err = l.NewInstantLogicalPlan(promReq.Query, startTime)
116-
}
11780

81+
newLogicalPlan, err := l.NewLogicalPlan(promReq.Query, startTime, endTime, step)
11882
if err != nil {
11983
return nil, err
12084
}
121-
promReq.LogicalPlan = byteLP
85+
86+
promReq.LogicalPlan = newLogicalPlan
12287

12388
return l.next.Do(ctx, r)
12489
}

0 commit comments

Comments
 (0)