Skip to content

Commit 3631c26

Browse files
committed
frontend: Implement logical query plan middleware under new distributed execution feature flag
Signed-off-by: rubywtl <[email protected]>
1 parent cc48e77 commit 3631c26

19 files changed

+1045
-18
lines changed

pkg/cortex/modules.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,12 +533,16 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
533533
prometheusCodec,
534534
shardedPrometheusCodec,
535535
t.Cfg.Querier.LookbackDelta,
536+
t.Cfg.Querier.EnablePerStepStats,
537+
t.Cfg.Frontend.DistributedExecEnabled,
538+
t.Cfg.Frontend.DisableDuplicateLabelChecks,
536539
)
537540
if err != nil {
538541
return nil, err
539542
}
540543

541-
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta)
544+
instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta, t.Cfg.Querier.EnablePerStepStats,
545+
t.Cfg.Frontend.DistributedExecEnabled, t.Cfg.Frontend.DisableDuplicateLabelChecks)
542546
if err != nil {
543547
return nil, err
544548
}

pkg/frontend/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ type CombinedFrontendConfig struct {
2020
FrontendV1 v1.Config `yaml:",inline"`
2121
FrontendV2 v2.Config `yaml:",inline"`
2222

23-
DownstreamURL string `yaml:"downstream_url"`
23+
DownstreamURL string `yaml:"downstream_url"`
24+
DistributedExecEnabled bool `yaml:"distributed_exec_enabled"`
25+
DisableDuplicateLabelChecks bool `yaml:"disable_duplicate_label_checks"`
2426
}
2527

2628
func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
@@ -29,6 +31,8 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
2931
cfg.FrontendV2.RegisterFlags(f)
3032

3133
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
34+
f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed_exec_enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
35+
f.BoolVar(&cfg.DisableDuplicateLabelChecks, "frontend.disable_duplicate_label_checks", false, "Experimental: Disables duplicate label checks during logical plan generation")
3236
}
3337

3438
// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package instantquery
2+
3+
import (
4+
"context"
5+
"github.com/cortexproject/cortex/pkg/querier/tripperware"
6+
"github.com/prometheus/prometheus/promql/parser"
7+
"github.com/thanos-io/promql-engine/logicalplan"
8+
"github.com/thanos-io/promql-engine/query"
9+
"github.com/weaveworks/common/httpgrpc"
10+
"net/http"
11+
"time"
12+
)
13+
14+
const (
15+
stepBatch = 10
16+
)
17+
18+
func InstantLogicalPlanGenMiddleware(lookbackDelta time.Duration, enablePerStepStats bool, disableDuplicateLabelChecks bool) tripperware.Middleware {
19+
return tripperware.MiddlewareFunc(func(next tripperware.Handler) tripperware.Handler {
20+
return instantLogicalPlanGen{
21+
lookbackDelta: lookbackDelta,
22+
enabledPerStepStats: enablePerStepStats,
23+
disableDuplicateLabelChecks: disableDuplicateLabelChecks,
24+
next: next,
25+
}
26+
})
27+
}
28+
29+
type instantLogicalPlanGen struct {
30+
next tripperware.Handler
31+
lookbackDelta time.Duration
32+
enabledPerStepStats bool
33+
disableDuplicateLabelChecks bool
34+
}
35+
36+
func (l instantLogicalPlanGen) NewInstantLogicalPlan(qs string, ts time.Time) ([]byte, error) {
37+
38+
qOpts := query.Options{
39+
Start: ts,
40+
End: ts,
41+
Step: 0,
42+
StepsBatch: stepBatch,
43+
LookbackDelta: l.lookbackDelta,
44+
EnablePerStepStats: l.disableDuplicateLabelChecks,
45+
}
46+
47+
expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
48+
if err != nil {
49+
return nil, err
50+
}
51+
52+
planOpts := logicalplan.PlanOptions{
53+
DisableDuplicateLabelCheck: l.disableDuplicateLabelChecks,
54+
}
55+
56+
logicalPlan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
57+
optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
58+
59+
// TODO: Add distributed optimizer for remote node insertion
60+
61+
byteLP, err := logicalplan.Marshal(optimizedPlan.Root())
62+
if err != nil {
63+
return nil, err
64+
}
65+
66+
return byteLP, nil
67+
}
68+
69+
func (l instantLogicalPlanGen) Do(ctx context.Context, r tripperware.Request) (tripperware.Response, error) {
70+
promReq, ok := r.(*tripperware.PrometheusRequest)
71+
if !ok {
72+
return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
73+
}
74+
75+
instantTime := time.Unix(0, promReq.Start*int64(time.Millisecond))
76+
77+
byteLP, err := l.NewInstantLogicalPlan(promReq.Query, instantTime)
78+
if err != nil {
79+
return nil, err
80+
}
81+
promReq.LogicalPlan = byteLP
82+
83+
return l.next.Do(ctx, r)
84+
}

0 commit comments

Comments
 (0)