Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/api/queryapi/query_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ import (
"github.com/prometheus/prometheus/util/annotations"
"github.com/prometheus/prometheus/util/httputil"
v1 "github.com/prometheus/prometheus/web/api/v1"
"github.com/thanos-io/promql-engine/logicalplan"
"github.com/weaveworks/common/httpgrpc"

"github.com/cortexproject/cortex/pkg/distributed_execution"
"github.com/cortexproject/cortex/pkg/engine"
"github.com/cortexproject/cortex/pkg/querier"
"github.com/cortexproject/cortex/pkg/util"
Expand Down Expand Up @@ -110,7 +110,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {

byteLP := []byte(r.PostFormValue("plan"))
if len(byteLP) != 0 {
logicalPlan, err := logicalplan.Unmarshal(byteLP)
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
}
Expand Down Expand Up @@ -183,7 +183,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {

byteLP := []byte(r.PostFormValue("plan"))
if len(byteLP) != 0 {
logicalPlan, err := logicalplan.Unmarshal(byteLP)
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
if err != nil {
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.DefaultEvaluationInterval,
t.Cfg.Querier.DistributedExecEnabled,
t.Cfg.Querier.ThanosEngine.LogicalOptimizers,
)
if err != nil {
return nil, err
Expand All @@ -553,7 +554,8 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
queryAnalyzer,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.DefaultEvaluationInterval,
t.Cfg.Querier.DistributedExecEnabled)
t.Cfg.Querier.DistributedExecEnabled,
t.Cfg.Querier.ThanosEngine.LogicalOptimizers)
if err != nil {
return nil, err
}
Expand Down
196 changes: 196 additions & 0 deletions pkg/distributed_execution/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package distributed_execution

import (
"bytes"
"encoding/json"
"math"

"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/promql-engine/logicalplan"
)

type jsonNode struct {
Type logicalplan.NodeType `json:"type"`
Data json.RawMessage `json:"data"`
Children []json.RawMessage `json:"children,omitempty"`
}

const (
nanVal = `"NaN"`
infVal = `"+Inf"`
negInfVal = `"-Inf"`
)

// Unmarshal deserializes a logical plan node from JSON data.
// This is a custom implementation for Cortex that is copied from Thanos engine's unmarshaling func
// to support remote nodes. We maintain this separate implementation because Thanos engine's
// logical plan codec currently doesn't support custom node types in its unmarshaling process.
func Unmarshal(data []byte) (logicalplan.Node, error) {
return unmarshalNode(data)
}

func unmarshalNode(data []byte) (logicalplan.Node, error) {
t := jsonNode{}
if err := json.Unmarshal(data, &t); err != nil {
return nil, err
}

switch t.Type {
case logicalplan.VectorSelectorNode:
v := &logicalplan.VectorSelector{}
if err := json.Unmarshal(t.Data, v); err != nil {
return nil, err
}
var err error
for i, m := range v.LabelMatchers {
v.LabelMatchers[i], err = labels.NewMatcher(m.Type, m.Name, m.Value)
if err != nil {
return nil, err
}
}
return v, nil
case logicalplan.MatrixSelectorNode:
m := &logicalplan.MatrixSelector{}
if err := json.Unmarshal(t.Data, m); err != nil {
return nil, err
}
vs, err := unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
m.VectorSelector = vs.(*logicalplan.VectorSelector)
return m, nil
case logicalplan.AggregationNode:
a := &logicalplan.Aggregation{}
if err := json.Unmarshal(t.Data, a); err != nil {
return nil, err
}
var err error
a.Expr, err = unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
if len(t.Children) > 1 {
a.Param, err = unmarshalNode(t.Children[1])
if err != nil {
return nil, err
}
}
return a, nil
case logicalplan.BinaryNode:
b := &logicalplan.Binary{}
if err := json.Unmarshal(t.Data, b); err != nil {
return nil, err
}
var err error
b.LHS, err = unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
b.RHS, err = unmarshalNode(t.Children[1])
if err != nil {
return nil, err
}
return b, nil
case logicalplan.FunctionNode:
f := &logicalplan.FunctionCall{}
if err := json.Unmarshal(t.Data, f); err != nil {
return nil, err
}
for _, c := range t.Children {
child, err := unmarshalNode(c)
if err != nil {
return nil, err
}
f.Args = append(f.Args, child)
}
return f, nil
case logicalplan.NumberLiteralNode:
n := &logicalplan.NumberLiteral{}
if bytes.Equal(t.Data, []byte(infVal)) {
n.Val = math.Inf(1)
} else if bytes.Equal(t.Data, []byte(negInfVal)) {
n.Val = math.Inf(-1)
} else if bytes.Equal(t.Data, []byte(nanVal)) {
n.Val = math.NaN()
} else {
if err := json.Unmarshal(t.Data, n); err != nil {
return nil, err
}
}
return n, nil
case logicalplan.StringLiteralNode:
s := &logicalplan.StringLiteral{}
if err := json.Unmarshal(t.Data, s); err != nil {
return nil, err
}
return s, nil
case logicalplan.SubqueryNode:
s := &logicalplan.Subquery{}
if err := json.Unmarshal(t.Data, s); err != nil {
return nil, err
}
var err error
s.Expr, err = unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
return s, nil
case logicalplan.CheckDuplicateNode:
c := &logicalplan.CheckDuplicateLabels{}
if err := json.Unmarshal(t.Data, c); err != nil {
return nil, err
}
var err error
c.Expr, err = unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
return c, nil
case logicalplan.StepInvariantNode:
s := &logicalplan.StepInvariantExpr{}
if err := json.Unmarshal(t.Data, s); err != nil {
return nil, err
}
var err error
s.Expr, err = unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
return s, nil
case logicalplan.ParensNode:
p := &logicalplan.Parens{}
if err := json.Unmarshal(t.Data, p); err != nil {
return nil, err
}
var err error
p.Expr, err = unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
return p, nil
case logicalplan.UnaryNode:
u := &logicalplan.Unary{}
if err := json.Unmarshal(t.Data, u); err != nil {
return nil, err
}
var err error
u.Expr, err = unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
return u, nil
case RemoteNode:
r := &Remote{}
if err := json.Unmarshal(t.Data, r); err != nil {
return nil, err
}
var err error
r.Expr, err = unmarshalNode(t.Children[0])
if err != nil {
return nil, err
}
return r, nil
}
return nil, nil
}
70 changes: 70 additions & 0 deletions pkg/distributed_execution/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package distributed_execution

import (
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/thanos-io/promql-engine/logicalplan"
)

func TestUnmarshalWithLogicalPlan(t *testing.T) {
t.Run("unmarshal complex query plan", func(t *testing.T) {
start := time.Now()
end := start.Add(1 * time.Hour)
step := 15 * time.Second

testCases := []struct {
name string
query string
}{
{
name: "binary operation",
query: "http_requests_total + rate(node_cpu_seconds_total[5m])",
},
{
name: "aggregation",
query: "sum(rate(http_requests_total[5m])) by (job)",
},
{
name: "complex query",
query: "sum(rate(http_requests_total{job='prometheus'}[5m])) by (job) / sum(rate(node_cpu_seconds_total[5m])) by (job)",
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
plan, _, err := CreateTestLogicalPlan(tc.query, start, end, step)
require.NoError(t, err)
require.NotNil(t, plan)

data, err := logicalplan.Marshal((*plan).Root())
require.NoError(t, err)

node, err := Unmarshal(data)
require.NoError(t, err)
require.NotNil(t, node)

// the logical plan node before and after marshal/unmarshal should be the same
verifyNodeStructure(t, (*plan).Root(), node)
})
}
})
}

func verifyNodeStructure(t *testing.T, expected logicalplan.Node, actual logicalplan.Node) {
require.Equal(t, expected.Type(), actual.Type())
require.Equal(t, expected.String(), actual.String())
require.Equal(t, expected.ReturnType(), actual.ReturnType())

expectedChildren := expected.Children()
actualChildren := actual.Children()

require.Equal(t, len(expectedChildren), len(actualChildren))

for i := 0; i < len(expectedChildren); i++ {
if expectedChildren[i] != nil && actualChildren[i] != nil {
verifyNodeStructure(t, *expectedChildren[i], *actualChildren[i])
}
}
}
49 changes: 49 additions & 0 deletions pkg/distributed_execution/distributed_optimizer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package distributed_execution

import (
"github.com/thanos-io/promql-engine/query"

"github.com/prometheus/prometheus/util/annotations"
"github.com/thanos-io/promql-engine/logicalplan"
)

// This is a simplified implementation that only handles binary aggregation cases
// Future versions of the distributed optimizer are expected to:
// - Support more complex query patterns
// - Incorporate diverse optimization strategies
// - Extend support to node types beyond binary operations

type DistributedOptimizer struct{}

func (d *DistributedOptimizer) Optimize(root logicalplan.Node, opts *query.Options) (logicalplan.Node, annotations.Annotations) {
warns := annotations.New()

logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {

if (*current).Type() == logicalplan.BinaryNode && d.hasAggregation(current) {
ch := (*current).Children()

for _, child := range ch {
temp := (*child).Clone()
*child = NewRemoteNode(temp)
*(*child).Children()[0] = temp
}
}

return false
})

return root, *warns
}

func (d *DistributedOptimizer) hasAggregation(root *logicalplan.Node) bool {
isAggr := false
logicalplan.TraverseBottomUp(nil, root, func(parent, current *logicalplan.Node) bool {
if (*current).Type() == logicalplan.AggregationNode {
isAggr = true
return true
}
return false
})
return isAggr
}
Loading
Loading