Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/api/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"github.com/prometheus/common/route"
"github.com/prometheus/common/version"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/storage"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/weaveworks/common/instrument"
"github.com/weaveworks/common/middleware"

"github.com/cortexproject/cortex/pkg/api/queryapi"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/querier/codec"
"github.com/cortexproject/cortex/pkg/querier/stats"
Expand Down Expand Up @@ -163,7 +163,7 @@ func NewQuerierHandler(
cfg Config,
queryable storage.SampleAndChunkQueryable,
exemplarQueryable storage.ExemplarQueryable,
engine promql.QueryEngine,
engine engine.QueryEngine,
metadataQuerier querier.MetadataQuerier,
reg prometheus.Registerer,
logger log.Logger,
Expand Down
52 changes: 44 additions & 8 deletions pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/httputil"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/engine"
Expand All @@ -25,7 +26,7 @@ import (

type QueryAPI struct {
queryable storage.SampleAndChunkQueryable
queryEngine promql.QueryEngine
queryEngine engine.QueryEngine
now func() time.Time
statsRenderer v1.StatsRenderer
logger log.Logger
Expand All @@ -34,7 +35,7 @@ type QueryAPI struct {
}

func NewQueryAPI(
qe promql.QueryEngine,
qe engine.QueryEngine,
q storage.SampleAndChunkQueryable,
statsRenderer v1.StatsRenderer,
logger log.Logger,
Expand Down Expand Up @@ -100,10 +101,29 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(start), convertMsToTime(end), convertMsToDuration(step))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")

var qry promql.Query
startTime := convertMsToTime(start)
endTime := convertMsToTime(end)
stepDuration := convertMsToDuration(step)

byteLP := []byte(r.PostFormValue("plan"))
if len(byteLP) != 0 {
logicalPlan, err := logicalplan.Unmarshal(byteLP)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
}
qry, err = q.queryEngine.MakeRangeQueryFromPlan(ctx, q.queryable, opts, logicalPlan, startTime, endTime, stepDuration, r.FormValue("query"))
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil}
}
} else { // if there is logical plan field is empty, fall back
qry, err = q.queryEngine.NewRangeQuery(ctx, q.queryable, opts, r.FormValue("query"), startTime, endTime, stepDuration)
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
}
}

// From now on, we must only return with a finalizer in the result (to
// be called by the caller) or call qry.Close ourselves (which is
// required in the case of a panic).
Expand Down Expand Up @@ -156,9 +176,25 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {

ctx = engine.AddEngineTypeToContext(ctx, r)
ctx = querier.AddBlockStoreTypeToContext(ctx, r.Header.Get(querier.BlockStoreTypeHeader))
qry, err := q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), convertMsToTime(ts))
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")

var qry promql.Query
tsTime := convertMsToTime(ts)

byteLP := []byte(r.PostFormValue("plan"))
if len(byteLP) != 0 {
logicalPlan, err := logicalplan.Unmarshal(byteLP)
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
}
qry, err = q.queryEngine.MakeInstantQueryFromPlan(ctx, q.queryable, opts, logicalPlan, tsTime, r.FormValue("query"))
if err != nil {
return apiFuncResult{nil, &apiError{errorBadData, fmt.Errorf("failed to create range query from logical plan: %v", err)}, nil, nil}
}
} else { // if there is logical plan field is empty, fall back
qry, err = q.queryEngine.NewInstantQuery(ctx, q.queryable, opts, r.FormValue("query"), tsTime)
if err != nil {
return invalidParamError(httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error()), "query")
}
}

// From now on, we must only return with a finalizer in the result (to
Expand Down
Loading
Loading