Skip to content

Commit 422b1f2

Browse files
committed
move distributed exec feature flag to querier & refactor request handler code
Signed-off-by: rubywtl <[email protected]>
1 parent 4c56a2b commit 422b1f2

File tree

9 files changed

+88
-107
lines changed

9 files changed

+88
-107
lines changed

pkg/api/api.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,6 @@ type Config struct {
7777
buildInfoEnabled bool `yaml:"build_info_enabled"`
7878

7979
QuerierDefaultCodec string `yaml:"querier_default_codec"`
80-
81-
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
8280
}
8381

8482
var (
@@ -91,7 +89,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
9189
f.Var(&cfg.HTTPRequestHeadersToLog, "api.http-request-headers-to-log", "Which HTTP Request headers to add to logs")
9290
f.BoolVar(&cfg.buildInfoEnabled, "api.build-info-enabled", false, "If enabled, build Info API will be served by query frontend or querier.")
9391
f.StringVar(&cfg.QuerierDefaultCodec, "api.querier-default-codec", "json", "Choose default codec for querier response serialization. Supports 'json' and 'protobuf'.")
94-
f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
9592
cfg.RegisterFlagsWithPrefix("", f)
9693
}
9794

@@ -129,15 +126,14 @@ func compileCORSRegexString(s string) (*regexp.Regexp, error) {
129126
}
130127

131128
type API struct {
132-
AuthMiddleware middleware.Interface
133-
cfg Config
134-
server *server.Server
135-
logger log.Logger
136-
sourceIPs *middleware.SourceIPExtractor
137-
indexPage *IndexPageContent
138-
HTTPHeaderMiddleware *HTTPHeaderMiddleware
139-
corsOrigin *regexp.Regexp
140-
distributedExecEnabled bool
129+
AuthMiddleware middleware.Interface
130+
cfg Config
131+
server *server.Server
132+
logger log.Logger
133+
sourceIPs *middleware.SourceIPExtractor
134+
indexPage *IndexPageContent
135+
HTTPHeaderMiddleware *HTTPHeaderMiddleware
136+
corsOrigin *regexp.Regexp
141137
}
142138

143139
func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logger) (*API, error) {
@@ -160,14 +156,13 @@ func New(cfg Config, serverCfg server.Config, s *server.Server, logger log.Logge
160156
}
161157

162158
api := &API{
163-
cfg: cfg,
164-
AuthMiddleware: cfg.HTTPAuthMiddleware,
165-
server: s,
166-
logger: logger,
167-
sourceIPs: sourceIPs,
168-
indexPage: newIndexPageContent(),
169-
corsOrigin: corsOrigin,
170-
distributedExecEnabled: cfg.DistributedExecEnabled,
159+
cfg: cfg,
160+
AuthMiddleware: cfg.HTTPAuthMiddleware,
161+
server: s,
162+
logger: logger,
163+
sourceIPs: sourceIPs,
164+
indexPage: newIndexPageContent(),
165+
corsOrigin: corsOrigin,
171166
}
172167

173168
// If no authentication middleware is present in the config, use the default authentication middleware.

pkg/api/handlers.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,10 +163,11 @@ func NewQuerierHandler(
163163
cfg Config,
164164
queryable storage.SampleAndChunkQueryable,
165165
exemplarQueryable storage.ExemplarQueryable,
166-
engine engine.BaseEngine,
166+
engine engine.QueryEngine,
167167
metadataQuerier querier.MetadataQuerier,
168168
reg prometheus.Registerer,
169169
logger log.Logger,
170+
distributedExecEnabled bool,
170171
) http.Handler {
171172
// Prometheus histograms for requests to the querier.
172173
querierRequestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
@@ -280,7 +281,7 @@ func NewQuerierHandler(
280281
legacyPromRouter := route.New().WithPrefix(path.Join(legacyPrefix, "/api/v1"))
281282
api.Register(legacyPromRouter)
282283

283-
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, cfg.DistributedExecEnabled)
284+
queryAPI := queryapi.NewQueryAPI(engine, translateSampleAndChunkQueryable, statsRenderer, logger, codecs, corsOrigin, distributedExecEnabled)
284285

285286
// TODO(gotjosh): This custom handler is temporary until we're able to vendor the changes in:
286287
// https://github.com/prometheus/prometheus/pull/7125/files

pkg/api/handlers_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func TestBuildInfoAPI(t *testing.T) {
232232
version.Version = tc.version
233233
version.Branch = tc.branch
234234
version.Revision = tc.revision
235-
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, &FakeLogger{})
235+
handler := NewQuerierHandler(cfg, nil, nil, nil, nil, nil, &FakeLogger{}, false)
236236
writer := httptest.NewRecorder()
237237
req := httptest.NewRequest("GET", "/api/v1/status/buildinfo", nil)
238238
req = req.WithContext(user.InjectOrgID(req.Context(), "test"))

pkg/api/queryapi/query_api.go

Lines changed: 60 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727

2828
type QueryAPI struct {
2929
queryable storage.SampleAndChunkQueryable
30-
queryEngine engine.BaseEngine
30+
queryEngine engine.QueryEngine
3131
now func() time.Time
3232
statsRenderer v1.StatsRenderer
3333
logger log.Logger
@@ -37,7 +37,7 @@ type QueryAPI struct {
3737
}
3838

3939
func NewQueryAPI(
40-
qe engine.BaseEngine,
40+
qe engine.QueryEngine,
4141
q storage.SampleAndChunkQueryable,
4242
statsRenderer v1.StatsRenderer,
4343
logger log.Logger,
@@ -57,69 +57,6 @@ func NewQueryAPI(
5757
}
5858
}
5959

60-
func (q *QueryAPI) createQueryFromPlan(query string, byteLP []byte, start time.Time, end time.Time, step time.Duration, ctx context.Context, opts promql.QueryOpts) (promql.Query, apiFuncResult, error) {
61-
if step == 0 {
62-
// instant query logic (step is 0)
63-
if len(byteLP) != 0 {
64-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
65-
if err != nil {
66-
return nil, apiFuncResult{
67-
nil,
68-
&apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)},
69-
nil,
70-
nil,
71-
}, err
72-
}
73-
qry, err := q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, start, query)
74-
if err != nil {
75-
return nil, apiFuncResult{
76-
nil,
77-
&apiError{errorBadData, fmt.Errorf("failed to create instant query from logical plan: %v", err)},
78-
nil,
79-
nil,
80-
}, err
81-
}
82-
return qry, apiFuncResult{}, nil
83-
} else {
84-
// fallback to query string for instant query
85-
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, query, start)
86-
if err != nil {
87-
return nil, invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query"), err
88-
}
89-
return qry, apiFuncResult{}, nil
90-
}
91-
}
92-
93-
// range query logic (step is non-zero)
94-
if len(byteLP) != 0 {
95-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
96-
if err != nil {
97-
return nil, apiFuncResult{
98-
nil,
99-
&apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)},
100-
nil,
101-
nil,
102-
}, err
103-
}
104-
qry, err := q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, start, end, step, query)
105-
if err != nil {
106-
return nil, apiFuncResult{
107-
nil,
108-
&apiError{errorBadData, fmt.Errorf("failed to create range query from logical plan: %v", err)},
109-
nil,
110-
nil,
111-
}, err
112-
}
113-
return qry, apiFuncResult{}, nil
114-
} else {
115-
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, query, start, end, step)
116-
if err != nil {
117-
return nil, invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query"), err
118-
}
119-
return qry, apiFuncResult{}, nil
120-
}
121-
}
122-
12360
func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
12461
// TODO(Sungjin1212): Change to emit basic error (not gRPC)
12562
start, err := util.ParseTime(r.FormValue("start"))
@@ -170,18 +107,42 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
170107
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
171108

172109
var qry promql.Query
173-
var funcRes apiFuncResult
110+
startTime := convertMsToTime(start)
111+
endTime := convertMsToTime(end)
112+
stepDuration := convertMsToDuration(step)
113+
114+
byteLP, err := io.ReadAll(r.Body)
174115
if q.distributedExecEnabled {
175-
byteLP, err := io.ReadAll(r.Body)
176116
if err != nil {
177117
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
178118
}
179-
qry, funcRes, err = q.createQueryFromPlan(r.FormValue("query"), byteLP, convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step), ctx, opts)
180-
if err != nil {
181-
return funcRes
119+
if len(byteLP) != 0 {
120+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
121+
if err != nil {
122+
return apiFuncResult{
123+
nil,
124+
&apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)},
125+
nil,
126+
nil,
127+
}
128+
}
129+
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query"))
130+
if err != nil {
131+
return apiFuncResult{
132+
nil,
133+
&apiError{errorBadData, fmt.Errorf("failed to create range query from logical plan: %v", err)},
134+
nil,
135+
nil,
136+
}
137+
}
138+
} else { // if there is logical plan field is empty, fall back
139+
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration)
140+
if err != nil {
141+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
142+
}
182143
}
183144
} else {
184-
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
145+
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration)
185146
if err != nil {
186147
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
187148
}
@@ -241,18 +202,41 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
241202
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
242203

243204
var qry promql.Query
244-
var funcRes apiFuncResult
205+
tsTime := convertMsToTime(ts)
206+
245207
if q.distributedExecEnabled {
246208
byteLP, err := io.ReadAll(r.Body)
247209
if err != nil {
248210
return apiFuncResult{nil, &apiError{errorBadData, err}, nil, nil}
249211
}
250-
qry, funcRes, err = q.createQueryFromPlan(r.FormValue("query"), byteLP, convertMsToTime(ts), convertMsToTime(ts), convertMsToDuration(0), ctx, opts)
251-
if err != nil {
252-
return funcRes
212+
if len(byteLP) != 0 {
213+
logicalPlan, err := logicalplan.Unmarshal(byteLP)
214+
if err != nil {
215+
return apiFuncResult{
216+
nil,
217+
&apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)},
218+
nil,
219+
nil,
220+
}
221+
}
222+
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
223+
if err != nil {
224+
return apiFuncResult{
225+
nil,
226+
&apiError{errorBadData, fmt.Errorf("failed to create instant query from logical plan: %v", err)},
227+
nil,
228+
nil,
229+
}
230+
}
231+
} else {
232+
// fallback to query string for instant query
233+
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime)
234+
if err != nil {
235+
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
236+
}
253237
}
254238
} else {
255-
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
239+
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime)
256240
if err != nil {
257241
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
258242
}

pkg/cortex/cortex.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ type Cortex struct {
322322
QuerierQueryable prom_storage.SampleAndChunkQueryable
323323
ExemplarQueryable prom_storage.ExemplarQueryable
324324
MetadataQuerier querier.MetadataQuerier
325-
QuerierEngine engine.BaseEngine
325+
QuerierEngine engine.QueryEngine
326326
QueryFrontendTripperware tripperware.Tripperware
327327
ResourceMonitor *resource.Monitor
328328

pkg/cortex/modules.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ func (t *Cortex) initQuerier() (serv services.Service, err error) {
370370
t.MetadataQuerier,
371371
prometheus.DefaultRegisterer,
372372
util_log.Logger,
373+
t.Cfg.Querier.DistributedExecEnabled,
373374
)
374375

375376
// If the querier is running standalone without the query-frontend or query-scheduler, we must register it's internal
@@ -534,7 +535,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
534535
shardedPrometheusCodec,
535536
t.Cfg.Querier.LookbackDelta,
536537
t.Cfg.Querier.DefaultEvaluationInterval,
537-
t.Cfg.Frontend.DistributedExecEnabled,
538+
t.Cfg.Querier.DistributedExecEnabled,
538539
)
539540
if err != nil {
540541
return nil, err
@@ -547,7 +548,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
547548
queryAnalyzer,
548549
t.Cfg.Querier.LookbackDelta,
549550
t.Cfg.Querier.DefaultEvaluationInterval,
550-
t.Cfg.Frontend.DistributedExecEnabled)
551+
t.Cfg.Querier.DistributedExecEnabled)
551552
if err != nil {
552553
return nil, err
553554
}

pkg/engine/engine.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func GetEngineType(ctx context.Context) Type {
4444
return None
4545
}
4646

47-
type BaseEngine interface {
47+
type QueryEngine interface {
4848
NewInstantQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, ts time.Time) (promql.Query, error)
4949
NewRangeQuery(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, qs string, start, end time.Time, interval time.Duration) (promql.Query, error)
5050
MakeInstantQueryFromPlan(ctx context.Context, q storage.Queryable, opts promql.QueryOpts, root logicalplan.Node, ts time.Time, qs string) (promql.Query, error)

pkg/frontend/config.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ type CombinedFrontendConfig struct {
2020
FrontendV1 v1.Config `yaml:",inline"`
2121
FrontendV2 v2.Config `yaml:",inline"`
2222

23-
DownstreamURL string `yaml:"downstream_url"`
24-
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
23+
DownstreamURL string `yaml:"downstream_url"`
2524
}
2625

2726
func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
@@ -30,7 +29,6 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) {
3029
cfg.FrontendV2.RegisterFlags(f)
3130

3231
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
33-
f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
3432
}
3533

3634
// InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at

pkg/querier/querier.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ type Config struct {
9797
EnableParquetQueryable bool `yaml:"enable_parquet_queryable" doc:"hidden"`
9898
ParquetQueryableShardCacheSize int `yaml:"parquet_queryable_shard_cache_size" doc:"hidden"`
9999
ParquetQueryableDefaultBlockStore string `yaml:"parquet_queryable_default_block_store" doc:"hidden"`
100+
DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"`
100101
}
101102

102103
var (
@@ -146,6 +147,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
146147
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
147148
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
148149
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
150+
f.BoolVar(&cfg.DistributedExecEnabled, "querier.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.")
149151
}
150152

151153
// Validate the config
@@ -197,7 +199,7 @@ func getChunksIteratorFunction(_ Config) chunkIteratorFunc {
197199
}
198200

199201
// New builds a queryable and promql engine.
200-
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.BaseEngine) {
202+
func New(cfg Config, limits *validation.Overrides, distributor Distributor, stores []QueryableWithFilter, reg prometheus.Registerer, logger log.Logger, isPartialDataEnabled partialdata.IsCfgEnabledFunc) (storage.SampleAndChunkQueryable, storage.ExemplarQueryable, engine.QueryEngine) {
201203
iteratorFunc := getChunksIteratorFunction(cfg)
202204

203205
distributorQueryable := newDistributorQueryable(distributor, cfg.IngesterMetadataStreaming, cfg.IngesterLabelNamesWithMatchers, iteratorFunc, cfg.QueryIngestersWithin, isPartialDataEnabled, cfg.IngesterQueryMaxAttempts)

0 commit comments

Comments
 (0)