Skip to content

Commit 46f063d

Browse files
committed
add logical plan distributed optimizer to query frontend
Signed-off-by: rubywtl <[email protected]>
1 parent 52b9672 commit 46f063d

File tree

9 files changed

+609
-4
lines changed

9 files changed

+609
-4
lines changed

pkg/api/queryapi/query_api.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ import (
1616
"github.com/prometheus/prometheus/util/annotations"
1717
"github.com/prometheus/prometheus/util/httputil"
1818
v1 "github.com/prometheus/prometheus/web/api/v1"
19-
"github.com/thanos-io/promql-engine/logicalplan"
2019
"github.com/weaveworks/common/httpgrpc"
2120

21+
"github.com/cortexproject/cortex/pkg/distributed_execution"
2222
"github.com/cortexproject/cortex/pkg/engine"
2323
"github.com/cortexproject/cortex/pkg/querier"
2424
"github.com/cortexproject/cortex/pkg/util"
@@ -110,7 +110,7 @@ func (q *QueryAPI) RangeQueryHandler(r *http.Request) (result apiFuncResult) {
110110

111111
byteLP := []byte(r.PostFormValue("plan"))
112112
if len(byteLP) != 0 {
113-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
113+
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
114114
if err != nil {
115115
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
116116
}
@@ -183,7 +183,7 @@ func (q *QueryAPI) InstantQueryHandler(r *http.Request) (result apiFuncResult) {
183183

184184
byteLP := []byte(r.PostFormValue("plan"))
185185
if len(byteLP) != 0 {
186-
logicalPlan, err := logicalplan.Unmarshal(byteLP)
186+
logicalPlan, err := distributed_execution.Unmarshal(byteLP)
187187
if err != nil {
188188
return apiFuncResult{nil, &apiError{errorInternal, fmt.Errorf("invalid logical plan: %v", err)}, nil, nil}
189189
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
package distributed_execution
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/prometheus/prometheus/util/annotations"
7+
"github.com/thanos-io/promql-engine/logicalplan"
8+
)
9+
10+
// This is a simplified implementation that only handles binary aggregation cases
11+
// Future versions of the distributed optimizer are expected to:
12+
// - Support more complex query patterns
13+
// - Incorporate diverse optimization strategies
14+
// - Extend support to node types beyond binary operations
15+
16+
type DistributedOptimizer struct{}
17+
18+
func (d *DistributedOptimizer) Optimize(root logicalplan.Node) (logicalplan.Node, annotations.Annotations, error) {
19+
warns := annotations.New()
20+
21+
if root == nil {
22+
return nil, *warns, fmt.Errorf("nil root node")
23+
}
24+
25+
logicalplan.TraverseBottomUp(nil, &root, func(parent, current *logicalplan.Node) bool {
26+
27+
if (*current).Type() == logicalplan.BinaryNode {
28+
ch := (*current).Children()
29+
30+
for _, child := range ch {
31+
temp := (*child).Clone()
32+
*child = NewRemoteNode()
33+
*(*child).Children()[0] = temp
34+
}
35+
}
36+
37+
return false
38+
})
39+
return root, *warns, nil
40+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
package distributed_execution
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/prometheus/prometheus/promql/parser"
8+
"github.com/stretchr/testify/require"
9+
"github.com/thanos-io/promql-engine/logicalplan"
10+
"github.com/thanos-io/promql-engine/query"
11+
)
12+
13+
func TestDistributedOptimizer(t *testing.T) {
14+
now := time.Now()
15+
testCases := []struct {
16+
name string
17+
query string
18+
start time.Time
19+
end time.Time
20+
step time.Duration
21+
remoteExecCount int
22+
}{
23+
{
24+
name: "binary operation with aggregations",
25+
query: "sum(rate(node_cpu_seconds_total{mode!=\"idle\"}[5m])) + sum(rate(node_memory_Active_bytes[5m]))",
26+
start: now,
27+
end: now,
28+
step: time.Minute,
29+
remoteExecCount: 2,
30+
},
31+
{
32+
name: "multiple binary operations with aggregations",
33+
query: "sum(rate(http_requests_total{job=\"api\"}[5m])) + sum(rate(http_requests_total{job=\"web\"}[5m])) - sum(rate(http_requests_total{job=\"cache\"}[5m]))",
34+
start: now,
35+
end: now,
36+
step: time.Minute,
37+
remoteExecCount: 4,
38+
},
39+
{
40+
name: "subquery with aggregation",
41+
query: "sum(rate(container_network_transmit_bytes_total[5m:1m]))",
42+
start: now,
43+
end: now,
44+
step: time.Minute,
45+
remoteExecCount: 0,
46+
},
47+
{
48+
name: "function applied on binary operation",
49+
query: "rate(http_requests_total[5m]) + rate(http_errors_total[5m]) > bool 0",
50+
start: now,
51+
end: now,
52+
step: time.Minute,
53+
remoteExecCount: 4,
54+
},
55+
{
56+
name: "numerical binary query",
57+
query: "(1 + 1) + (1 + 1)",
58+
start: now,
59+
end: now,
60+
step: time.Minute,
61+
remoteExecCount: 0,
62+
},
63+
}
64+
65+
for _, tc := range testCases {
66+
t.Run(tc.name, func(t *testing.T) {
67+
lp, _, err := CreateTestLogicalPlan(tc.query, tc.start, tc.end, tc.step)
68+
require.NoError(t, err)
69+
70+
d := DistributedOptimizer{}
71+
newRoot, _, err := d.Optimize((*lp).Root())
72+
require.NoError(t, err)
73+
74+
remoteNodeCount := 0
75+
logicalplan.TraverseBottomUp(nil, &newRoot, func(parent, current *logicalplan.Node) bool {
76+
if RemoteNode == (*current).Type() {
77+
remoteNodeCount++
78+
}
79+
return false
80+
})
81+
require.Equal(t, tc.remoteExecCount, remoteNodeCount)
82+
})
83+
}
84+
}
85+
86+
func getStartAndEnd(start time.Time, end time.Time, step time.Duration) (time.Time, time.Time) {
87+
if step == 0 {
88+
return start, start
89+
}
90+
return start, end
91+
}
92+
93+
func CreateTestLogicalPlan(qs string, start time.Time, end time.Time, step time.Duration) (*logicalplan.Plan, query.Options, error) {
94+
95+
start, end = getStartAndEnd(start, end, step)
96+
97+
qOpts := query.Options{
98+
Start: start,
99+
End: end,
100+
Step: step,
101+
StepsBatch: 10,
102+
NoStepSubqueryIntervalFn: func(duration time.Duration) time.Duration {
103+
return 0
104+
},
105+
LookbackDelta: 0,
106+
EnablePerStepStats: false,
107+
}
108+
109+
expr, err := parser.NewParser(qs, parser.WithFunctions(parser.Functions)).ParseExpr()
110+
if err != nil {
111+
return nil, qOpts, err
112+
}
113+
114+
planOpts := logicalplan.PlanOptions{
115+
DisableDuplicateLabelCheck: false,
116+
}
117+
118+
logicalPlan, err := logicalplan.NewFromAST(expr, &qOpts, planOpts)
119+
if err != nil {
120+
return nil, qOpts, err
121+
}
122+
optimizedPlan, _ := logicalPlan.Optimize(logicalplan.DefaultOptimizers)
123+
124+
return &optimizedPlan, qOpts, nil
125+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package distributed_execution
2+
3+
type FragmentKey struct {
4+
queryID uint64
5+
fragmentID uint64
6+
}
7+
8+
func MakeFragmentKey(queryID uint64, fragmentID uint64) *FragmentKey {
9+
return &FragmentKey{
10+
queryID: queryID,
11+
fragmentID: fragmentID,
12+
}
13+
}
14+
15+
func (f FragmentKey) GetQueryID() uint64 {
16+
return f.queryID
17+
}
18+
19+
func (f FragmentKey) GetFragmentID() uint64 {
20+
return f.fragmentID
21+
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package distributed_execution
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
7+
"github.com/prometheus/prometheus/promql/parser"
8+
"github.com/thanos-io/promql-engine/logicalplan"
9+
)
10+
11+
type NodeType = logicalplan.NodeType
12+
type Node = logicalplan.Node
13+
14+
const (
15+
RemoteNode = "RemoteNode"
16+
)
17+
18+
// (to verify interface implementations)
19+
var _ logicalplan.Node = (*Remote)(nil)
20+
21+
type Remote struct {
22+
Op parser.ItemType
23+
Expr Node `json:"-"`
24+
25+
FragmentKey FragmentKey
26+
FragmentAddr string
27+
}
28+
29+
func NewRemoteNode() Node {
30+
return &Remote{
31+
// initialize the fragment key pointer first
32+
FragmentKey: FragmentKey{},
33+
}
34+
}
35+
func (r *Remote) Clone() Node {
36+
return &Remote{Op: r.Op, Expr: r.Expr.Clone(), FragmentKey: r.FragmentKey, FragmentAddr: r.FragmentAddr}
37+
}
38+
func (r *Remote) Children() []*Node {
39+
return []*Node{&r.Expr}
40+
}
41+
func (r *Remote) String() string {
42+
return fmt.Sprintf("%s%s", r.Op.String(), r.Expr.String())
43+
}
44+
func (r *Remote) ReturnType() parser.ValueType {
45+
return r.Expr.ReturnType()
46+
}
47+
func (r *Remote) Type() NodeType { return RemoteNode }
48+
49+
type remote struct {
50+
QueryID uint64
51+
FragmentID uint64
52+
FragmentAddr string
53+
}
54+
55+
func (r *Remote) MarshalJSON() ([]byte, error) {
56+
return json.Marshal(remote{
57+
QueryID: r.FragmentKey.queryID,
58+
FragmentID: r.FragmentKey.fragmentID,
59+
FragmentAddr: r.FragmentAddr,
60+
})
61+
}
62+
63+
func (r *Remote) UnmarshalJSON(data []byte) error {
64+
re := remote{}
65+
if err := json.Unmarshal(data, &re); err != nil {
66+
return err
67+
}
68+
69+
r.FragmentKey = *MakeFragmentKey(re.QueryID, re.FragmentID)
70+
r.FragmentAddr = re.FragmentAddr
71+
return nil
72+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package distributed_execution
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
"github.com/prometheus/prometheus/promql/parser"
8+
"github.com/stretchr/testify/require"
9+
"github.com/thanos-io/promql-engine/logicalplan"
10+
)
11+
12+
func TestRemoteNode(t *testing.T) {
13+
t.Run("NewRemoteNode creates valid node", func(t *testing.T) {
14+
node := NewRemoteNode()
15+
require.NotNil(t, node)
16+
require.IsType(t, &Remote{}, node)
17+
require.Equal(t, (&Remote{}).Type(), node.Type())
18+
})
19+
20+
t.Run("Clone creates correct copy", func(t *testing.T) {
21+
original := &Remote{
22+
Op: parser.ADD,
23+
FragmentKey: FragmentKey{queryID: 1, fragmentID: 2},
24+
FragmentAddr: "[IP_ADDRESS]:9090",
25+
Expr: &logicalplan.NumberLiteral{Val: 42},
26+
}
27+
28+
cloned := original.Clone()
29+
require.NotNil(t, cloned)
30+
31+
remote, ok := cloned.(*Remote)
32+
require.True(t, ok)
33+
require.Equal(t, original.Op, remote.Op)
34+
require.Equal(t, original.FragmentKey, remote.FragmentKey)
35+
require.Equal(t, original.FragmentAddr, remote.FragmentAddr)
36+
require.Equal(t, original.Expr.String(), remote.Expr.String())
37+
})
38+
39+
t.Run("JSON marshaling/unmarshaling", func(t *testing.T) {
40+
original := &Remote{
41+
FragmentKey: *MakeFragmentKey(1, 2),
42+
FragmentAddr: "[IP_ADDRESS]:9090",
43+
}
44+
45+
data, err := json.Marshal(original)
46+
require.NoError(t, err)
47+
48+
var unmarshaled Remote
49+
err = json.Unmarshal(data, &unmarshaled)
50+
require.NoError(t, err)
51+
52+
require.Equal(t, original.FragmentKey.queryID, unmarshaled.FragmentKey.queryID)
53+
require.Equal(t, original.FragmentKey.fragmentID, unmarshaled.FragmentKey.fragmentID)
54+
require.Equal(t, original.FragmentAddr, unmarshaled.FragmentAddr)
55+
})
56+
57+
t.Run("Children returns correct nodes", func(t *testing.T) {
58+
expr := &logicalplan.NumberLiteral{Val: 42}
59+
node := &Remote{
60+
Expr: expr,
61+
}
62+
63+
children := node.Children()
64+
require.Len(t, children, 1)
65+
require.Equal(t, expr, *children[0])
66+
})
67+
68+
t.Run("ReturnType matches expression type", func(t *testing.T) {
69+
expr := &logicalplan.NumberLiteral{Val: 42}
70+
node := &Remote{
71+
Expr: expr,
72+
}
73+
74+
require.Equal(t, expr.ReturnType(), node.ReturnType())
75+
})
76+
}

0 commit comments

Comments
 (0)