diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 5f4679b9d4d..a13f35e6a9d 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -533,12 +533,21 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro prometheusCodec, shardedPrometheusCodec, t.Cfg.Querier.LookbackDelta, + t.Cfg.Querier.DefaultEvaluationInterval, + t.Cfg.Frontend.DistributedExecEnabled, ) if err != nil { return nil, err } - instantQueryMiddlewares, err := instantquery.Middlewares(util_log.Logger, t.Overrides, instantQueryCodec, queryAnalyzer, t.Cfg.Querier.LookbackDelta) + instantQueryMiddlewares, err := instantquery.Middlewares( + util_log.Logger, + t.Overrides, + instantQueryCodec, + queryAnalyzer, + t.Cfg.Querier.LookbackDelta, + t.Cfg.Querier.DefaultEvaluationInterval, + t.Cfg.Frontend.DistributedExecEnabled) if err != nil { return nil, err } diff --git a/pkg/frontend/config.go b/pkg/frontend/config.go index 03dff13980e..a1109f213ad 100644 --- a/pkg/frontend/config.go +++ b/pkg/frontend/config.go @@ -20,7 +20,8 @@ type CombinedFrontendConfig struct { FrontendV1 v1.Config `yaml:",inline"` FrontendV2 v2.Config `yaml:",inline"` - DownstreamURL string `yaml:"downstream_url"` + DownstreamURL string `yaml:"downstream_url"` + DistributedExecEnabled bool `yaml:"distributed_exec_enabled" doc:"hidden"` } func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) { @@ -29,6 +30,7 @@ func (cfg *CombinedFrontendConfig) RegisterFlags(f *flag.FlagSet) { cfg.FrontendV2.RegisterFlags(f) f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.") + f.BoolVar(&cfg.DistributedExecEnabled, "frontend.distributed-exec-enabled", false, "Experimental: Enables distributed execution of queries by passing logical query plan fragments to downstream components.") } // InitFrontend initializes frontend (either V1 -- without scheduler, or V2 -- with scheduler) or no frontend at diff --git 
a/pkg/querier/tripperware/distributed_query.go b/pkg/querier/tripperware/distributed_query.go
new file mode 100644
index 00000000000..02a0692153d
--- /dev/null
+++ b/pkg/querier/tripperware/distributed_query.go
@@ -0,0 +1,105 @@
+package tripperware
+
+import (
+	"context"
+	"net/http"
+	"time"
+
+	"github.com/prometheus/prometheus/promql/parser"
+	"github.com/thanos-io/promql-engine/logicalplan"
+	"github.com/thanos-io/promql-engine/query"
+	"github.com/weaveworks/common/httpgrpc"
+)
+
+const (
+	// stepBatch is the StepsBatch value set in the query options used to build logical plans.
+	stepBatch = 10
+)
+
+// DistributedQueryMiddleware returns a Middleware that builds an optimized
+// logical plan from the query of each PrometheusRequest and stores it on the
+// request before handing the request to the next handler.
+func DistributedQueryMiddleware(defaultEvaluationInterval time.Duration, lookbackDelta time.Duration) Middleware {
+	return MiddlewareFunc(func(next Handler) Handler {
+		return distributedQueryMiddleware{
+			next:                      next,
+			lookbackDelta:             lookbackDelta,
+			defaultEvaluationInterval: defaultEvaluationInterval,
+		}
+	})
+}
+
+// getStartAndEnd returns the time window to plan for. A zero step is treated
+// as an instant query, so the window collapses to the start timestamp.
+func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
+	if step == 0 {
+		return start, start
+	}
+	return start, end
+}
+
+// distributedQueryMiddleware attaches a logical plan to every request it handles.
+type distributedQueryMiddleware struct {
+	next                      Handler
+	defaultEvaluationInterval time.Duration
+	lookbackDelta             time.Duration
+}
+
+// newLogicalPlan parses qs and returns the optimized logical plan for the
+// given time window. A query that fails to parse is reported as a 400 so the
+// client sees a bad-request error, like the invalid-request case in Do.
+func (d distributedQueryMiddleware) newLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, error) {
+	start, end = getStartAndEnd(start, end, step)
+
+	qOpts := query.Options{
+		Start:      start,
+		End:        end,
+		Step:       step,
+		StepsBatch: stepBatch,
+		NoStepSubqueryIntervalFn: func(time.Duration) time.Duration {
+			return d.defaultEvaluationInterval
+		},
+		// Hardcoded value for execution-time-params that will be re-populated again in the querier stage
+		LookbackDelta:      d.lookbackDelta,
+		EnablePerStepStats: false,
+	}
+
+	expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
+	if err != nil {
+		return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
+	}
+
+	planOpts := logicalplan.PlanOptions{
+		DisableDuplicateLabelCheck: false,
+	}
+
+	logicalPlan := logicalplan.NewFromAST(expr, &qOpts, planOpts)
+	// The second value returned by Optimize is intentionally discarded here.
+	optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
+
+	return &optimizedPlan, nil
+}
+
+// Do builds the logical plan for r, stores it on the request and forwards the
+// request to the next handler. Only *PrometheusRequest is accepted; any other
+// request type is rejected with a 400.
+func (d distributedQueryMiddleware) Do(ctx context.Context, r Request) (Response, error) {
+	promReq, ok := r.(*PrometheusRequest)
+	if !ok {
+		return nil, httpgrpc.Errorf(http.StatusBadRequest, "invalid request format")
+	}
+
+	// Start, End and Step are carried on the request in milliseconds.
+	startTime := time.UnixMilli(promReq.Start)
+	endTime := time.UnixMilli(promReq.End)
+	step := time.Duration(promReq.Step) * time.Millisecond
+
+	newLogicalPlan, err := d.newLogicalPlan(promReq.Query, startTime, endTime, step)
+	if err != nil {
+		return nil, err
+	}
+
+	promReq.LogicalPlan = *newLogicalPlan
+
+	return d.next.Do(ctx, r)
+}
diff --git a/pkg/querier/tripperware/distributed_query_test.go b/pkg/querier/tripperware/distributed_query_test.go
new file mode 100644
index 00000000000..d11a3dfbbaa
--- /dev/null
+++ b/pkg/querier/tripperware/distributed_query_test.go
@@ -0,0 +1,146 @@
+package tripperware
+
+import (
+	"context"
+	"strconv"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestLogicalPlanGeneration(t *testing.T) {
+	testCases := []struct {
+		name      string
+		queryType string // "instant" or "range"
+		input     *PrometheusRequest
+		err       error
+	}{
+		// instant query test cases
+		{
+			name:      "instant - rate vector selector",
+			queryType: "instant",
+			input: &PrometheusRequest{
+				Start: 100000,
+				End:   100000,
+				Query: "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])",
+			},
+		},
+		{
+			name:      "instant - memory usage expression",
+			queryType: "instant",
+			input: &PrometheusRequest{
+				Start: 100000,
+				End:   100000,
+				Query: "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))",
+			},
+		},
+		{
+			name:      "instant - scalar only query",
+			queryType: "instant",
+			input: &PrometheusRequest{
+				Start: 100000,
+				End:   100000,
+				Query: "42",
+			},
+		},
+		{
+			name:      "instant - vector arithmetic",
+			queryType: "instant",
+			input: 
&PrometheusRequest{ + Start: 100000, + End: 100000, + Query: "node_load1 / ignoring(cpu) node_cpu_seconds_total", + }, + }, + { + name: "instant - avg_over_time with nested rate", + queryType: "instant", + input: &PrometheusRequest{ + Start: 100000, + End: 100000, + Query: "avg_over_time(rate(http_requests_total[5m])[30m:5m])", + }, + }, + + // query range test cases + { + name: "range - rate vector over time", + queryType: "range", + input: &PrometheusRequest{ + Start: 100000, + End: 200000, + Step: 15000, + Query: "rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])", + }, + }, + { + name: "range - memory usage ratio", + queryType: "range", + input: &PrometheusRequest{ + Start: 100000, + End: 200000, + Step: 30000, + Query: "100 * (1 - (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))", + }, + }, + { + name: "range - avg_over_time function", + queryType: "range", + input: &PrometheusRequest{ + Start: 100000, + End: 200000, + Step: 60000, + Query: "avg_over_time(http_requests_total[5m])", + }, + }, + { + name: "range - vector arithmetic with range", + queryType: "range", + input: &PrometheusRequest{ + Start: 100000, + End: 200000, + Step: 10000, + Query: "rate(node_network_receive_bytes_total[1m]) / rate(node_network_transmit_bytes_total[1m])", + }, + }, + { + name: "range - simple scalar operation", + queryType: "range", + input: &PrometheusRequest{ + Start: 100000, + End: 200000, + Step: 15000, + Query: "2 + 2", + }, + }, + } + + for i, tc := range testCases { + tc := tc + t.Run(strconv.Itoa(i)+"_"+tc.name, func(t *testing.T) { + t.Parallel() + + middleware := DistributedQueryMiddleware(time.Minute, 5*time.Minute) + + handler := middleware.Wrap(HandlerFunc(func(_ context.Context, req Request) (Response, error) { + return nil, nil + })) + + // additional validation on the test cases based on query type + if tc.queryType == "range" { + require.NotZero(t, tc.input.Step, "range query should have non-zero step") + require.NotEqual(t, tc.input.Start, 
tc.input.End, "range query should have different start and end times")
+			} else {
+				require.Equal(t, tc.input.Start, tc.input.End, "instant query should have equal start and end times")
+				require.Zero(t, tc.input.Step, "instant query should have zero step")
+			}
+
+			// test: execute middleware to populate the logical plan
+			_, err := handler.Do(context.Background(), tc.input)
+			require.NoError(t, err)
+			require.NotEmpty(t, tc.input.LogicalPlan, "logical plan should be populated")
+
+		})
+	}
+}
diff --git a/pkg/querier/tripperware/instantquery/instant_query.go b/pkg/querier/tripperware/instantquery/instant_query.go
index 54fe4aeba0d..a3977207199 100644
--- a/pkg/querier/tripperware/instantquery/instant_query.go
+++ b/pkg/querier/tripperware/instantquery/instant_query.go
@@ -23,6 +23,8 @@ import (
 	"github.com/cortexproject/cortex/pkg/util"
 	"github.com/cortexproject/cortex/pkg/util/limiter"
 	"github.com/cortexproject/cortex/pkg/util/spanlogger"
+
+	"github.com/thanos-io/promql-engine/logicalplan"
 )
 
 var (
@@ -141,6 +143,19 @@ func (c instantQueryCodec) DecodeResponse(ctx context.Context, r *http.Response,
 	return &resp, nil
 }
 
+// getSerializedBody returns the marshalled root of the request's logical plan,
+// or a nil slice when the request carries no logical plan.
+func (c instantQueryCodec) getSerializedBody(promReq *tripperware.PrometheusRequest) ([]byte, error) {
+	if promReq.LogicalPlan == nil {
+		return nil, nil
+	}
+	serialized, err := logicalplan.Marshal(promReq.LogicalPlan.Root())
+	if err != nil {
+		return nil, err
+	}
+	return serialized, nil
+}
+
 func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) {
 	promReq, ok := r.(*tripperware.PrometheusRequest)
 	if !ok {
@@ -168,17 +183,24 @@ func (c instantQueryCodec) EncodeRequest(ctx context.Context, r tripperware.Requ
 		}
 	}
 
+	h.Add("Content-Type", "application/json")
+
 	isSourceRuler := strings.Contains(h.Get("User-Agent"), tripperware.RulerUserAgent)
 	if !isSourceRuler {
 		// When the source is the Ruler, skip set header
 		tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression)
 	}
 
+
byteBody, err := c.getSerializedBody(promReq) + if err != nil { + return nil, err + } + req := &http.Request{ - Method: "GET", + Method: "POST", RequestURI: u.String(), // This is what the httpgrpc code looks at. URL: u, - Body: http.NoBody, + Body: io.NopCloser(bytes.NewReader(byteBody)), Header: h, } diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go index 0fd48760480..a74a9ad1479 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_middlewares.go +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares.go @@ -15,10 +15,18 @@ func Middlewares( merger tripperware.Merger, queryAnalyzer querysharding.Analyzer, lookbackDelta time.Duration, + defaultEvaluationInterval time.Duration, + distributedExecEnabled bool, ) ([]tripperware.Middleware, error) { m := []tripperware.Middleware{ NewLimitsMiddleware(limits, lookbackDelta), tripperware.ShardByMiddleware(log, limits, merger, queryAnalyzer), } + + if distributedExecEnabled { + m = append(m, + tripperware.DistributedQueryMiddleware(defaultEvaluationInterval, lookbackDelta)) + } + return m, nil } diff --git a/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go new file mode 100644 index 00000000000..b6d445fe20b --- /dev/null +++ b/pkg/querier/tripperware/instantquery/instant_query_middlewares_test.go @@ -0,0 +1,290 @@ +package instantquery + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + "time" + + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" + "github.com/thanos-io/thanos/pkg/querysharding" + "github.com/weaveworks/common/middleware" + "github.com/weaveworks/common/user" + + "github.com/cortexproject/cortex/pkg/querier/tripperware" + "github.com/cortexproject/cortex/pkg/util/validation" +) + 
+const (
+	queryAll     = "/api/v1/instant_query?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536716898&stats=all"
+	responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}`
+)
+
+func TestRoundTrip(t *testing.T) {
+	s := httptest.NewServer(
+		middleware.AuthenticateUser.Wrap(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				var err error
+				switch r.RequestURI {
+				case queryAll:
+					_, err = w.Write([]byte(responseBody))
+				default:
+					_, err = w.Write([]byte("bar"))
+				}
+				if err != nil {
+					t.Error(err) // not t.Fatal: this handler runs outside the test goroutine
+				}
+			}),
+		),
+	)
+	defer s.Close()
+
+	u, err := url.Parse(s.URL)
+	require.NoError(t, err)
+
+	downstream := singleHostRoundTripper{
+		host: u.Host,
+		next: http.DefaultTransport,
+	}
+
+	qa := querysharding.NewQueryAnalyzer()
+	instantQueryMiddleware, err := Middlewares(
+		log.NewNopLogger(),
+		mockLimitsShard{maxQueryLookback: 2},
+		nil,
+		qa,
+		5*time.Minute,
+		time.Minute,
+		false,
+	)
+	require.NoError(t, err)
+
+	defaultLimits := validation.NewOverrides(validation.Limits{}, nil)
+
+	tw := tripperware.NewQueryTripperware(log.NewNopLogger(),
+		nil,
+		nil,
+		nil,
+		instantQueryMiddleware,
+		testInstantQueryCodec,
+		nil,
+		defaultLimits,
+		qa,
+		time.Minute,
+		0,
+		0,
+		false,
+	)
+
+	for i, tc := range []struct {
+		path, expectedBody string
+	}{
+		{"/foo", "bar"},
+		{queryAll, responseBody},
+	} {
+		t.Run(strconv.Itoa(i), func(t *testing.T) {
+			req, err := http.NewRequest("POST", tc.path, http.NoBody)
+			require.NoError(t, err)
+
+			ctx := user.InjectOrgID(context.Background(), "1")
+			req = req.WithContext(ctx)
+			err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
+			require.NoError(t, err)
+
+			resp, err := tw(downstream).RoundTrip(req)
+			require.NoError(t, err)
+			require.Equal(t, 200, resp.StatusCode)
+
+			bs, err := io.ReadAll(resp.Body)
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedBody, string(bs))
+		})
+	}
+}
+
+func TestRoundTripWithAndWithoutDistributedExec(t *testing.T) {
+	// Common test server setup
+	s := httptest.NewServer(
+		middleware.AuthenticateUser.Wrap(
+			http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+				var err error
+				switch r.RequestURI {
+				case queryAll:
+					_, err = w.Write([]byte(responseBody))
+				default:
+					_, err = w.Write([]byte("bar"))
+				}
+				if err != nil {
+					t.Error(err) // not t.Fatal: this handler runs outside the test goroutine
+				}
+			}),
+		),
+	)
+	defer s.Close()
+
+	u, err := url.Parse(s.URL)
+	require.NoError(t, err)
+
+	downstream := singleHostRoundTripper{
+		host: u.Host,
+		next: http.DefaultTransport,
+	}
+
+	testCases := []struct {
+		name               string
+		distributedEnabled bool
+		pReq               *tripperware.PrometheusRequest
+		expectEmptyBody    bool
+	}{
+		{
+			name:               "With distributed execution",
+			distributedEnabled: true,
+			pReq: &tripperware.PrometheusRequest{
+				Start: 100000,
+				End:   100000,
+				Query: "node_cpu_seconds_total{mode!=\"idle\"}[5m]",
+			},
+			expectEmptyBody: false,
+		},
+		{
+			name:               "Without distributed execution",
+			distributedEnabled: false,
+			pReq: &tripperware.PrometheusRequest{
+				Start: 100000,
+				End:   100000,
+				Query: "node_cpu_seconds_total{mode!=\"idle\"}[5m]",
+			},
+			expectEmptyBody: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			qa := querysharding.NewQueryAnalyzer()
+			instantQueryMiddlewares, err := Middlewares(
+				log.NewNopLogger(),
+				mockLimitsShard{shardSize: 2},
+				nil,
+				qa,
+				5*time.Minute,
+				time.Minute,
+				tc.distributedEnabled,
+			)
+			require.NoError(t, err)
+
+			defaultLimits := validation.NewOverrides(validation.Limits{}, nil)
+
+			tw := tripperware.NewQueryTripperware(log.NewNopLogger(),
+				nil,
+				nil,
+				nil,
+				instantQueryMiddlewares,
+				testInstantQueryCodec,
+				nil,
+				defaultLimits,
+				qa,
+				time.Minute,
+				0,
+				0,
+				false,
+			)
+
+			ctx := user.InjectOrgID(context.Background(), "1")
+
+			// test middlewares
+			for _, mw := range instantQueryMiddlewares {
+				handler := mw.Wrap(tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) {
+					return nil, nil
+				}))
+				_, err := handler.Do(ctx, tc.pReq)
+				require.NoError(t, err)
+			}
+
+			// encode and prepare request
+			req, err := tripperware.Codec.EncodeRequest(testInstantQueryCodec, context.Background(), tc.pReq)
+			require.NoError(t, err)
+			req = req.WithContext(ctx)
+			err = user.InjectOrgIDIntoHTTPRequest(ctx, req)
+			require.NoError(t, err)
+
+			// check request body
+			body, err := io.ReadAll(req.Body)
+			require.NoError(t, err)
+			if tc.expectEmptyBody {
+				require.Empty(t, body)
+			} else {
+				require.NotEmpty(t, body)
+				byteLP, err := logicalplan.Marshal(tc.pReq.LogicalPlan.Root())
+				require.NoError(t, err)
+				require.Equal(t, byteLP, body)
+			}
+
+			// test round trip
+			resp, err := tw(downstream).RoundTrip(req)
+			require.NoError(t, err)
+			require.Equal(t, 200, resp.StatusCode)
+
+			_, err = io.ReadAll(resp.Body)
+			require.NoError(t, err)
+		})
+	}
+}
+
+type mockLimitsShard struct {
+	maxQueryLookback     time.Duration
+	maxQueryLength       time.Duration
+	maxCacheFreshness    time.Duration
+	maxQueryResponseSize int64
+	shardSize            int
+	queryPriority        validation.QueryPriority
+	queryRejection       validation.QueryRejection
+}
+
+func (m mockLimitsShard) MaxQueryLookback(string) time.Duration {
+	return m.maxQueryLookback
+}
+
+func (m mockLimitsShard) MaxQueryLength(string) time.Duration {
+	return m.maxQueryLength
+}
+
+func (mockLimitsShard) MaxQueryParallelism(string) int {
+	return 14 // Flag default.
+} + +func (m mockLimitsShard) MaxCacheFreshness(string) time.Duration { + return m.maxCacheFreshness +} + +func (m mockLimitsShard) MaxQueryResponseSize(string) int64 { + return m.maxQueryResponseSize +} + +func (m mockLimitsShard) QueryVerticalShardSize(userID string) int { + return m.shardSize +} + +func (m mockLimitsShard) QueryPriority(userID string) validation.QueryPriority { + return m.queryPriority +} + +func (m mockLimitsShard) QueryRejection(userID string) validation.QueryRejection { + return m.queryRejection +} + +type singleHostRoundTripper struct { + host string + next http.RoundTripper +} + +func (s singleHostRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { + r.URL.Scheme = "http" + r.URL.Host = s.host + return s.next.RoundTrip(r) +} diff --git a/pkg/querier/tripperware/instantquery/instant_query_test.go b/pkg/querier/tripperware/instantquery/instant_query_test.go index b9dec5bbef9..6aa4e797842 100644 --- a/pkg/querier/tripperware/instantquery/instant_query_test.go +++ b/pkg/querier/tripperware/instantquery/instant_query_test.go @@ -93,7 +93,7 @@ func TestRequest(t *testing.T) { tc := tc t.Run(tc.url, func(t *testing.T) { t.Parallel() - r, err := http.NewRequest("GET", tc.url, nil) + r, err := http.NewRequest("POST", tc.url, http.NoBody) require.NoError(t, err) r.Header.Add("Test-Header", "test") diff --git a/pkg/querier/tripperware/query.go b/pkg/querier/tripperware/query.go index e20ab6e3c4e..42e2d9eebf0 100644 --- a/pkg/querier/tripperware/query.go +++ b/pkg/querier/tripperware/query.go @@ -29,6 +29,8 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/runutil" + + "github.com/thanos-io/promql-engine/logicalplan" ) var ( @@ -96,6 +98,8 @@ type Request interface { GetStep() int64 // GetQuery returns the query of the request. 
GetQuery() string + // GetLogicalPlan returns the logical plan + GetLogicalPlan() logicalplan.Plan // WithStartEnd clone the current request with different start and end timestamp. WithStartEnd(startTime int64, endTime int64) Request // WithQuery clone the current request with a different query. @@ -152,6 +156,7 @@ type PrometheusRequest struct { Headers http.Header Stats string CachingOptions CachingOptions + LogicalPlan logicalplan.Plan } func (m *PrometheusRequest) GetPath() string { @@ -217,6 +222,13 @@ func (m *PrometheusRequest) GetStats() string { return "" } +func (m *PrometheusRequest) GetLogicalPlan() logicalplan.Plan { + if m == nil { + return nil + } + return m.LogicalPlan +} + // WithStartEnd clones the current `PrometheusRequest` with a new `start` and `end` timestamp. func (m *PrometheusRequest) WithStartEnd(start int64, end int64) Request { new := *m diff --git a/pkg/querier/tripperware/queryrange/query_range.go b/pkg/querier/tripperware/queryrange/query_range.go index 9d82031fc0b..df721146f66 100644 --- a/pkg/querier/tripperware/queryrange/query_range.go +++ b/pkg/querier/tripperware/queryrange/query_range.go @@ -14,13 +14,13 @@ import ( "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/common/model" + "github.com/thanos-io/promql-engine/logicalplan" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/api/queryapi" "github.com/cortexproject/cortex/pkg/querier/stats" "github.com/cortexproject/cortex/pkg/querier/tripperware" "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/limiter" "github.com/cortexproject/cortex/pkg/util/spanlogger" ) @@ -151,6 +151,21 @@ func (c prometheusCodec) DecodeRequest(_ context.Context, r *http.Request, forwa return &result, nil } +// getSerializedBody serializes the logical plan from a Prometheus request. +// Returns an empty byte array if the logical plan is nil. 
+func (c prometheusCodec) getSerializedBody(promReq *tripperware.PrometheusRequest) ([]byte, error) { + var byteLP []byte + var err error + + if promReq.LogicalPlan != nil { + byteLP, err = logicalplan.Marshal(promReq.LogicalPlan.Root()) + if err != nil { + return nil, err + } + } + return byteLP, nil +} + func (c prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Request) (*http.Request, error) { promReq, ok := r.(*tripperware.PrometheusRequest) if !ok { @@ -175,13 +190,20 @@ func (c prometheusCodec) EncodeRequest(ctx context.Context, r tripperware.Reques } } + h.Add("Content-Type", "application/json") + tripperware.SetRequestHeaders(h, c.defaultCodecType, c.compression) + bodyBytes, err := c.getSerializedBody(promReq) + if err != nil { + return nil, err + } + req := &http.Request{ - Method: "GET", + Method: "POST", RequestURI: u.String(), // This is what the httpgrpc code looks at. URL: u, - Body: http.NoBody, + Body: io.NopCloser(bytes.NewReader(bodyBytes)), Header: h, } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares.go b/pkg/querier/tripperware/queryrange/query_range_middlewares.go index 69185351bdc..38493f54130 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares.go @@ -102,6 +102,8 @@ func Middlewares( prometheusCodec tripperware.Codec, shardedPrometheusCodec tripperware.Codec, lookbackDelta time.Duration, + defaultEvaluationInterval time.Duration, + distributedExecEnabled bool, ) ([]tripperware.Middleware, cache.Cache, error) { // Metric used to keep track of each middleware execution duration. 
metrics := tripperware.NewInstrumentMiddlewareMetrics(registerer) @@ -137,5 +139,11 @@ func Middlewares( queryRangeMiddleware = append(queryRangeMiddleware, tripperware.InstrumentMiddleware("shardBy", metrics), tripperware.ShardByMiddleware(log, limits, shardedPrometheusCodec, queryAnalyzer)) + if distributedExecEnabled { + queryRangeMiddleware = append(queryRangeMiddleware, + tripperware.InstrumentMiddleware("range_logical_plan_gen", metrics), + tripperware.DistributedQueryMiddleware(defaultEvaluationInterval, lookbackDelta)) + } + return queryRangeMiddleware, c, nil } diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index b5cdef60386..4e19fe84a02 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -12,6 +12,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/require" + "github.com/thanos-io/promql-engine/logicalplan" "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" @@ -31,7 +32,7 @@ func TestRoundTrip(t *testing.T) { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var err error switch r.RequestURI { - case query: + case queryAll: _, err = w.Write([]byte(responseBody)) case queryWithWarnings: _, err = w.Write([]byte(responseBodyWithWarnings)) @@ -66,6 +67,8 @@ func TestRoundTrip(t *testing.T) { PrometheusCodec, ShardedPrometheusCodec, 5*time.Minute, + time.Minute, + false, ) require.NoError(t, err) @@ -90,11 +93,11 @@ func TestRoundTrip(t *testing.T) { path, expectedBody string }{ {"/foo", "bar"}, - {query, responseBody}, + {queryAll, responseBody}, } { t.Run(strconv.Itoa(i), func(t *testing.T) { //parallel testing causes data race - req, err := http.NewRequest("GET", tc.path, http.NoBody) + req, err := http.NewRequest("POST", tc.path, http.NoBody) 
require.NoError(t, err) // query-frontend doesn't actually authenticate requests, we rely on @@ -116,6 +119,142 @@ func TestRoundTrip(t *testing.T) { } } +func TestRoundTripWithAndWithoutDistributedExec(t *testing.T) { + s := httptest.NewServer( + middleware.AuthenticateUser.Wrap( + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var err error + switch r.RequestURI { + case queryAll: + _, err = w.Write([]byte(responseBody)) + case queryWithWarnings: + _, err = w.Write([]byte(responseBodyWithWarnings)) + case queryWithInfos: + _, err = w.Write([]byte(responseBodyWithInfos)) + default: + _, err = w.Write([]byte("bar")) + } + if err != nil { + t.Fatal(err) + } + }), + ), + ) + defer s.Close() + + u, err := url.Parse(s.URL) + require.NoError(t, err) + + downstream := singleHostRoundTripper{ + host: u.Host, + next: http.DefaultTransport, + } + + testCases := []struct { + name string + distributedEnabled bool + pReq *tripperware.PrometheusRequest + expectEmptyBody bool + }{ + { + name: "With distributed execution", + distributedEnabled: true, + pReq: &tripperware.PrometheusRequest{ + Start: 100000, + End: 200000, + Step: 15000, + Query: "node_cpu_seconds_total{mode!=\"idle\"}[5m]", + }, + expectEmptyBody: false, + }, + { + name: "Without distributed execution", + distributedEnabled: false, + pReq: &tripperware.PrometheusRequest{ + Start: 100000, + End: 200000, + Step: 15000, + Query: "node_cpu_seconds_total{mode!=\"idle\"}[5m]", + }, + expectEmptyBody: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + qa := querysharding.NewQueryAnalyzer() + queyrangemiddlewares, _, err := Middlewares(Config{}, + log.NewNopLogger(), + mockLimits{}, + nil, + nil, + qa, + PrometheusCodec, + ShardedPrometheusCodec, + 5*time.Minute, + time.Minute, + tc.distributedEnabled, + ) + require.NoError(t, err) + + defaultLimits := validation.NewOverrides(validation.Limits{}, nil) + + tw := tripperware.NewQueryTripperware(log.NewNopLogger(), + 
nil, + nil, + queyrangemiddlewares, + nil, + PrometheusCodec, + nil, + defaultLimits, + qa, + time.Minute, + 0, + 0, + false, + ) + + ctx := user.InjectOrgID(context.Background(), "1") + + // test middlewares + for _, mw := range queyrangemiddlewares { + handler := mw.Wrap(tripperware.HandlerFunc(func(_ context.Context, req tripperware.Request) (tripperware.Response, error) { + return nil, nil + })) + _, err := handler.Do(ctx, tc.pReq) + require.NoError(t, err) + } + + // encode and prepare request + req, err := tripperware.Codec.EncodeRequest(PrometheusCodec, context.Background(), tc.pReq) + require.NoError(t, err) + req = req.WithContext(ctx) + err = user.InjectOrgIDIntoHTTPRequest(ctx, req) + require.NoError(t, err) + + // check request body + body, err := io.ReadAll(req.Body) + require.NoError(t, err) + if tc.expectEmptyBody { + require.Empty(t, body) + } else { + require.NotEmpty(t, body) + byteLP, err := logicalplan.Marshal(tc.pReq.LogicalPlan.Root()) + require.NoError(t, err) + require.Equal(t, byteLP, body) + } + + // test round trip + resp, err := tw(downstream).RoundTrip(req) + require.NoError(t, err) + require.Equal(t, 200, resp.StatusCode) + + _, err = io.ReadAll(resp.Body) + require.NoError(t, err) + }) + } +} + type singleHostRoundTripper struct { host string next http.RoundTripper diff --git a/pkg/querier/tripperware/queryrange/query_range_test.go b/pkg/querier/tripperware/queryrange/query_range_test.go index 1f3ebb137d8..27fba6b1bab 100644 --- a/pkg/querier/tripperware/queryrange/query_range_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_test.go @@ -43,7 +43,7 @@ func TestRequest(t *testing.T) { expectedErr error }{ { - url: query, + url: queryAll, expected: &parsedRequestWithHeaders, }, { @@ -74,7 +74,7 @@ func TestRequest(t *testing.T) { tc := tc t.Run(tc.url, func(t *testing.T) { t.Parallel() - r, err := http.NewRequest("GET", tc.url, nil) + r, err := http.NewRequest("POST", tc.url, http.NoBody) require.NoError(t, err) 
r.Header.Add("Test-Header", "test") diff --git a/pkg/querier/tripperware/queryrange/results_cache_test.go b/pkg/querier/tripperware/queryrange/results_cache_test.go index b029e85443c..8d4c32cfd25 100644 --- a/pkg/querier/tripperware/queryrange/results_cache_test.go +++ b/pkg/querier/tripperware/queryrange/results_cache_test.go @@ -26,7 +26,7 @@ import ( ) const ( - query = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" + queryAll = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" queryWithWarnings = "/api/v1/query_range?end=1536716898&query=sumsum%28warnings%29&start=1536673680&stats=all&step=120&warnings=true" queryWithInfos = "/api/v1/query_range?end=1536716898&query=rate%28go_gc_gogc_percent%5B5m%5D%29&start=1536673680&stats=all&step=120" responseBody = `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"foo":"bar"},"values":[[1536673680,"137"],[1536673780,"137"]]}]}}` diff --git a/pkg/querier/tripperware/queryrange/split_by_interval_test.go b/pkg/querier/tripperware/queryrange/split_by_interval_test.go index c854959be03..2f219182bdb 100644 --- a/pkg/querier/tripperware/queryrange/split_by_interval_test.go +++ b/pkg/querier/tripperware/queryrange/split_by_interval_test.go @@ -294,7 +294,7 @@ func TestSplitByDay(t *testing.T) { intervalFn IntervalFn }{ { - path: query, + path: queryAll, expectedBody: string(mergedHTTPResponseBody), expectedQueryCount: 2, intervalFn: func(ctx context.Context, _ tripperware.Request) (context.Context, time.Duration, error) { @@ -302,7 +302,7 @@ func TestSplitByDay(t *testing.T) { }, }, { - path: query, + path: queryAll, expectedBody: string(mergedHTTPResponseBody), expectedQueryCount: 2, intervalFn: dynamicIntervalFn(Config{SplitQueriesByInterval: day}, mockLimits{}, querysharding.NewQueryAnalyzer(), lookbackDelta), @@ -344,7 +344,7 @@ func 
TestSplitByDay(t *testing.T) { next: http.DefaultTransport, }, PrometheusCodec, nil, NewLimitsMiddleware(mockLimits{}, 5*time.Minute), SplitByIntervalMiddleware(tc.intervalFn, mockLimits{}, PrometheusCodec, nil, lookbackDelta)) - req, err := http.NewRequest("GET", tc.path, http.NoBody) + req, err := http.NewRequest("POST", tc.path, http.NoBody) require.NoError(t, err) ctx := user.InjectOrgID(context.Background(), "1") diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 0ef5e1519d5..a73623a0b70 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -28,7 +28,7 @@ import ( const ( queryRange = "/api/v1/query_range?end=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29&start=1536673680&stats=all&step=120" - query = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29" + queryAll = "/api/v1/query?time=1536716898&query=sum%28container_memory_rss%29+by+%28namespace%29" queryNonShardable = "/api/v1/query?time=1536716898&query=container_memory_rss" queryExemplar = "/api/v1/query_exemplars?query=test_exemplar_metric_total&start=2020-09-14T15:22:25.479Z&end=2020-09-14T15:23:25.479Z'" querySubqueryStepSizeTooSmall = "/api/v1/query?query=up%5B30d%3A%5D" @@ -232,7 +232,7 @@ cortex_query_frontend_queries_total{op="query_range", source="api", user="1"} 1 `, }, { - path: query, + path: queryAll, expectedBody: instantResponseBody, limits: defaultOverrides, maxSubQuerySteps: 11000, @@ -256,7 +256,7 @@ cortex_query_frontend_queries_total{op="query", source="api", user="1"} 1 `, }, { - path: query, + path: queryAll, expectedBody: instantResponseBody, limits: shardingOverrides, maxSubQuerySteps: 11000, @@ -321,7 +321,7 @@ cortex_query_frontend_queries_total{op="query", source="api", user="1"} 1 } { t.Run(tc.path, func(t *testing.T) { //parallel testing causes data race - req, err := http.NewRequest("GET", tc.path, http.NoBody) + req, 
err := http.NewRequest("POST", tc.path, http.NoBody) require.NoError(t, err) // query-frontend doesn't actually authenticate requests, we rely on diff --git a/pkg/querier/tripperware/test_shard_by_query_utils.go b/pkg/querier/tripperware/test_shard_by_query_utils.go index bb13f3e1438..f03555f7fd4 100644 --- a/pkg/querier/tripperware/test_shard_by_query_utils.go +++ b/pkg/querier/tripperware/test_shard_by_query_utils.go @@ -453,7 +453,7 @@ http_requests_total`, ctx := user.InjectOrgID(context.Background(), "1") - req, err := http.NewRequest("GET", tt.path, http.NoBody) + req, err := http.NewRequest("POST", tt.path, http.NoBody) req = req.WithContext(ctx) require.NoError(t, err)