Skip to content

Commit 287d2af

Browse files
committed
add feature flag to querier api
1 parent abf8236 commit 287d2af

File tree

4 files changed

+76
-53
lines changed

4 files changed

+76
-53
lines changed

pkg/api/api.go

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ type Config struct {
7777
buildInfoEnabled bool `yaml:"build_info_enabled"`
7878

7979
QuerierDefaultCodec string `yaml:"querier_default_codec"`
80+
81+
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
8082
}
8183

8284
var (
@@ -89,6 +91,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
8991
f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
9092
f.BoolVar(&cfg.buildInfoEnabled, "api.build-info-enabled", false, "If enabled, build Info API will be served by query frontend or querier.")
9193
f.StringVar(&cfg.QuerierDefaultCodec, "api.querier-default-codec", "json", "Choose default codec for querier response serialization. Supports 'json' and 'protobuf'.")
94+
f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
9295
cfg.RegisterFlagsWithPrefix("", f)
9396
}
9497

@@ -126,14 +129,15 @@ func compileCORSRegexString(s string) (*regexp.Regexp, error) {
126129
}
127130

128131
type API struct {
129-
AuthMiddleware middleware.Interface
130-
cfg Config
131-
server *server.Server
132-
logger log.Logger
133-
sourceIPs *middleware.SourceIPExtractor
134-
indexPage *IndexPageContent
135-
HTTPHeaderMiddleware *HTTPHeaderMiddleware
136-
corsOrigin *regexp.Regexp
132+
AuthMiddleware middleware.Interface
133+
cfg Config
134+
server *server.Server
135+
logger log.Logger
136+
sourceIPs *middleware.SourceIPExtractor
137+
indexPage *IndexPageContent
138+
HTTPHeaderMiddleware *HTTPHeaderMiddleware
139+
corsOrigin *regexp.Regexp
140+
distributedExecEnabled bool
137141
}
138142

139143
func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logger) (*API, error) {
@@ -156,13 +160,14 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
156160
}
157161

158162
api := &API{
159-
cfg: cfg,
160-
AuthMiddleware: cfg.HTTPAuthMiddleware,
161-
server: s,
162-
logger: logger,
163-
sourceIPs: sourceIPs,
164-
indexPage: newIndexPageContent(),
165-
corsOrigin: corsOrigin,
163+
cfg: cfg,
164+
AuthMiddleware: cfg.HTTPAuthMiddleware,
165+
server: s,
166+
logger: logger,
167+
sourceIPs: sourceIPs,
168+
indexPage: newIndexPageContent(),
169+
corsOrigin: corsOrigin,
170+
distributedExecEnabled: cfg.DistributedExecEnabled,
166171
}
167172

168173
// If no authentication middleware is present in the config, use the default authentication middleware.

pkg/api/handlers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ func NewQuerierHandler(
280280
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
281281
api.Register(legacyPromRouter)
282282

283-
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin)
283+
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, cfg.DistributedExecEnabled)
284284

285285
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
286286
// https://github.com/prometheus/prometheus/pull/7125/files

pkg/api/queryapi/query_api.go

Lines changed: 51 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,14 @@ import (
2626
)
2727

2828
type QueryAPI struct {
29-
queryable storage.SampleAndChunkQueryable
30-
queryEngine engine.BaseEngine
31-
now func() time.Time
32-
statsRenderer v1.StatsRenderer
33-
logger log.Logger
34-
codecs []v1.Codec
35-
CORSOrigin *regexp.Regexp
29+
queryable storage.SampleAndChunkQueryable
30+
queryEngine engine.BaseEngine
31+
now func() time.Time
32+
statsRenderer v1.StatsRenderer
33+
logger log.Logger
34+
codecs []v1.Codec
35+
CORSOrigin *regexp.Regexp
36+
distributedExecEnabled bool
3637
}
3738

3839
func NewQueryAPI(
@@ -42,15 +43,17 @@ func NewQueryAPI(
4243
logger log.Logger,
4344
codecs []v1.Codec,
4445
CORSOrigin *regexp.Regexp,
46+
distributedExecEnabled bool,
4547
) *QueryAPI {
4648
return &QueryAPI{
47-
queryEngine: qe,
48-
queryable: q,
49-
statsRenderer: statsRenderer,
50-
logger: logger,
51-
codecs: codecs,
52-
CORSOrigin: CORSOrigin,
53-
now: time.Now,
49+
queryEngine: qe,
50+
queryable: q,
51+
statsRenderer: statsRenderer,
52+
logger: logger,
53+
codecs: codecs,
54+
CORSOrigin: CORSOrigin,
55+
now: time.Now,
56+
distributedExecEnabled: distributedExecEnabled,
5457
}
5558
}
5659

@@ -104,19 +107,27 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
104107
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
105108

106109
var qry promql.Query
107-
byteLP, err := io.ReadAll(r.Body)
108-
if err != nil {
109-
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
110-
}
111110

112-
if len(byteLP) != 0 {
113-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
111+
if q.distributedExecEnabled {
112+
byteLP, err := io.ReadAll(r.Body)
114113
if err != nil {
115114
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
116115
}
117-
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step), r.FormValue("query"))
118-
if err != nil {
119-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
116+
117+
if len(byteLP) != 0 {
118+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
119+
if err != nil {
120+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
121+
}
122+
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step), r.FormValue("query"))
123+
if err != nil {
124+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
125+
}
126+
} else {
127+
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
128+
if err != nil {
129+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
130+
}
120131
}
121132
} else {
122133
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
@@ -179,19 +190,26 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
179190
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
180191

181192
var qry promql.Query
182-
byteLP, err := io.ReadAll(r.Body)
183-
if err != nil {
184-
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
185-
}
186-
187-
if len(byteLP) != 0 {
188-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
193+
if q.distributedExecEnabled {
194+
byteLP, err := io.ReadAll(r.Body)
189195
if err != nil {
190196
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
191197
}
192-
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, convertMsToTime(ts), r.FormValue("query"))
193-
if err != nil {
194-
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
198+
199+
if len(byteLP) != 0 {
200+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
201+
if err != nil {
202+
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
203+
}
204+
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, convertMsToTime(ts), r.FormValue("query"))
205+
if err != nil {
206+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
207+
}
208+
} else {
209+
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
210+
if err != nil {
211+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
212+
}
195213
}
196214
} else {
197215
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))

pkg/api/queryapi/query_api_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func Test_CustomAPI(t *testing.T) {
182182

183183
for _, test := range tests {
184184
t.Run(test.name, func(t *testing.T) {
185-
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
185+
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
186186

187187
router := mux.NewRouter()
188188
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
@@ -243,7 +243,7 @@ func Test_InvalidCodec(t *testing.T) {
243243
},
244244
}
245245

246-
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"))
246+
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{&mockCodec{}}, regexp.MustCompile(".*"), false)
247247
router := mux.NewRouter()
248248
router.Path("/api/v1/query").Methods("POST").Handler(queryAPI.Wrap(queryAPI.InstantQueryHandler))
249249

@@ -284,7 +284,7 @@ func Test_CustomAPI_StatsRenderer(t *testing.T) {
284284
},
285285
}
286286

287-
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
287+
queryAPI := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), false)
288288

289289
router := mux.NewRouter()
290290
router.Path("/api/v1/query_range").Methods("POST").Handler(queryAPI.Wrap(queryAPI.RangeQueryHandler))
@@ -440,7 +440,7 @@ func Test_Logicalplan_Requests(t *testing.T) {
440440

441441
for _, tt := range tests {
442442
t.Run(tt.name, func(t *testing.T) {
443-
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"))
443+
c := NewQueryAPI(engine, mockQueryable, querier.StatsRenderer, log.NewNopLogger(), []v1.Codec{v1.JSONCodec{}}, regexp.MustCompile(".*"), true)
444444
router := mux.NewRouter()
445445
router.Path("/api/v1/query").Methods("POST").Handler(c.Wrap(c.InstantQueryHandler))
446446
router.Path("/api/v1/query_range").Methods("POST").Handler(c.Wrap(c.RangeQueryHandler))

0 commit comments

Comments
 (0)