Skip to content

Commit c160724

Browse files
committed
Querier: Support configuring optimizers and XFunctions
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 4c0957a commit c160724

File tree

9 files changed

+207
-33
lines changed

9 files changed

+207
-33
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
1919
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
2020
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
21+
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
2122
* [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
2223
* [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859
2324
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715

docs/blocks-storage/querier.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,22 @@ querier:
252252
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
253253
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
254254

255-
# Experimental. Use Thanos promql engine
256-
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
257-
# engine.
258-
# CLI flag: -querier.thanos-engine
259-
[thanos_engine: <boolean> | default = false]
255+
thanos_engine:
256+
# Experimental. Use Thanos promql engine
257+
# https://github.com/thanos-io/promql-engine rather than the Prometheus
258+
# promql engine.
259+
# CLI flag: -querier.thanos-engine
260+
[enable_thanos_engine: <boolean> | default = false]
261+
262+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
263+
# CLI flag: -querier.enable-x-functions
264+
[enable_x_functions: <boolean> | default = false]
265+
266+
# Logical plan optimizers. Multiple optimizers can be provided as a
267+
# comma-separated list. Supported values: default, all, propagate-matchers,
268+
# sort-matchers, merge-selects, detect-histogram-stats
269+
# CLI flag: -querier.optimizers
270+
[optimizers: <string> | default = "default"]
260271

261272
# If enabled, ignore max query length check at Querier select method. Users
262273
# can choose to ignore it since the validation can be done before Querier

docs/configuration/config-file-reference.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4302,11 +4302,22 @@ store_gateway_client:
43024302
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
43034303
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
43044304

4305-
# Experimental. Use Thanos promql engine
4306-
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
4307-
# engine.
4308-
# CLI flag: -querier.thanos-engine
4309-
[thanos_engine: <boolean> | default = false]
4305+
thanos_engine:
4306+
# Experimental. Use Thanos promql engine
4307+
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
4308+
# engine.
4309+
# CLI flag: -querier.thanos-engine
4310+
[enable_thanos_engine: <boolean> | default = false]
4311+
4312+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
4313+
# CLI flag: -querier.enable-x-functions
4314+
[enable_x_functions: <boolean> | default = false]
4315+
4316+
# Logical plan optimizers. Multiple optimizers can be provided as a
4317+
# comma-separated list. Supported values: default, all, propagate-matchers,
4318+
# sort-matchers, merge-selects, detect-histogram-stats
4319+
# CLI flag: -querier.optimizers
4320+
[optimizers: <string> | default = "default"]
43104321

43114322
# If enabled, ignore max query length check at Querier select method. Users can
43124323
# choose to ignore it since the validation can be done before Querier evaluation

integration/querier_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/prometheus/prometheus/prompb"
2020
"github.com/stretchr/testify/assert"
2121
"github.com/stretchr/testify/require"
22+
"github.com/thanos-io/promql-engine/execution/parse"
2223

2324
"github.com/cortexproject/cortex/integration/e2e"
2425
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
@@ -416,6 +417,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
416417
"-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
417418
"-querier.query-store-for-labels-enabled": "true",
418419
"-querier.thanos-engine": strconv.FormatBool(thanosEngine),
420+
"-querier.enable-x-functions": strconv.FormatBool(thanosEngine),
419421
// Ingester.
420422
"-ring.store": "consul",
421423
"-consul.hostname": consul.NetworkHTTPEndpoint(),
@@ -1310,3 +1312,67 @@ func TestQuerierMaxSamplesLimit(t *testing.T) {
13101312
Error: "query processing would load too many samples into memory in query execution",
13111313
})
13121314
}
1315+
1316+
func TestQuerierEngineConfigs(t *testing.T) {
1317+
const blockRangePeriod = 5 * time.Second
1318+
1319+
s, err := e2e.NewScenario(networkName)
1320+
require.NoError(t, err)
1321+
defer s.Close()
1322+
1323+
// Configure the blocks storage to frequently compact TSDB head
1324+
// and ship blocks to the storage.
1325+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
1326+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
1327+
"-blocks-storage.tsdb.ship-interval": "1s",
1328+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
1329+
"-querier.thanos_engine": "true",
1330+
"-querier.enable-x-functions": "true",
1331+
"-querier.optimizers": "all",
1332+
})
1333+
1334+
// Start dependencies.
1335+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
1336+
consul := e2edb.NewConsul()
1337+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1338+
1339+
// Start Cortex components for the write path.
1340+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1341+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1342+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1343+
1344+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
1345+
require.NoError(t, s.Start(queryFrontend))
1346+
1347+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1348+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1349+
}), "")
1350+
require.NoError(t, s.StartAndWaitReady(querier))
1351+
1352+
// Wait until the distributor and querier has updated the ring.
1353+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1354+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1355+
1356+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
1357+
require.NoError(t, err)
1358+
1359+
// Push some series to Cortex.
1360+
series1Timestamp := time.Now()
1361+
series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
1362+
series2, _ := generateSeries("series_2", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
1363+
1364+
res, err := c.Push(series1)
1365+
require.NoError(t, err)
1366+
require.Equal(t, 200, res.StatusCode)
1367+
res, err = c.Push(series2)
1368+
require.NoError(t, err)
1369+
require.Equal(t, 200, res.StatusCode)
1370+
1371+
for xFunc, _ := range parse.XFunctions {
1372+
// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
1373+
result, err := c.Query(fmt.Sprintf(`%s({job="test"}[1m])`, xFunc), series1Timestamp)
1374+
require.NoError(t, err)
1375+
require.Equal(t, model.ValVector, result.Type())
1376+
}
1377+
1378+
}

pkg/cortex/modules.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
78
"log/slog"
89
"net/http"
910
"runtime"
@@ -18,8 +19,6 @@ import (
1819
"github.com/prometheus/prometheus/rules"
1920
prom_storage "github.com/prometheus/prometheus/storage"
2021
"github.com/thanos-io/objstore"
21-
"github.com/thanos-io/promql-engine/engine"
22-
"github.com/thanos-io/promql-engine/logicalplan"
2322
"github.com/thanos-io/thanos/pkg/discovery/dns"
2423
"github.com/thanos-io/thanos/pkg/querysharding"
2524
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
@@ -32,6 +31,7 @@ import (
3231
configAPI "github.com/cortexproject/cortex/pkg/configs/api"
3332
"github.com/cortexproject/cortex/pkg/configs/db"
3433
"github.com/cortexproject/cortex/pkg/distributor"
34+
"github.com/cortexproject/cortex/pkg/engine"
3535
"github.com/cortexproject/cortex/pkg/flusher"
3636
"github.com/cortexproject/cortex/pkg/frontend"
3737
"github.com/cortexproject/cortex/pkg/frontend/transport"
@@ -634,7 +634,6 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
634634
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
635635
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
636636

637-
var queryEngine promql.QueryEngine
638637
opts := promql.EngineOpts{
639638
Logger: util_log.SLogger,
640639
Reg: rulerRegisterer,
@@ -649,15 +648,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
649648
return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds()
650649
},
651650
}
652-
if t.Cfg.Querier.ThanosEngine {
653-
queryEngine = engine.New(engine.Opts{
654-
EngineOpts: opts,
655-
LogicalOptimizers: logicalplan.AllOptimizers,
656-
EnableAnalysis: true,
657-
})
658-
} else {
659-
queryEngine = promql.NewEngine(opts)
660-
}
651+
queryEngine := engine.New(opts, t.Cfg.Querier.Engine, rulerRegisterer)
661652

662653
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
663654
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)

pkg/engine/config.go

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package engine
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/thanos-io/promql-engine/logicalplan"
9+
)
10+
11+
var supportedOptimizers = []string{"default", "all", "propagate-matchers", "sort-matchers", "merge-selects", "detect-histogram-stats"}
12+
13+
// Config contains the configuration to create engine
14+
type Config struct {
15+
EnableThanosEngine bool `yaml:"enable_thanos_engine"`
16+
EnableXFunctions bool `yaml:"enable_x_functions"`
17+
Optimizers string `yaml:"optimizers"`
18+
LogicalOptimizers []logicalplan.Optimizer `yaml:"-"`
19+
}
20+
21+
// RegisterFlags adds the flags required to config this to the given FlagSet.
22+
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
23+
f.BoolVar(&cfg.EnableThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
24+
f.BoolVar(&cfg.EnableXFunctions, "querier.enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.")
25+
f.StringVar(&cfg.Optimizers, "querier.optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", "))
26+
}
27+
28+
func (cfg *Config) Validate() error {
29+
splitOptimizers := strings.Split(cfg.Optimizers, ",")
30+
31+
for _, optimizer := range splitOptimizers {
32+
if optimizer == "all" || optimizer == "default" {
33+
if len(splitOptimizers) > 1 {
34+
return fmt.Errorf("special optimizer %s cannot be combined with other optimizers", optimizer)
35+
}
36+
}
37+
optimizers, err := getOptimizer(optimizer)
38+
if err != nil {
39+
return err
40+
}
41+
cfg.LogicalOptimizers = append(cfg.LogicalOptimizers, optimizers...)
42+
}
43+
44+
return nil
45+
}
46+
47+
func getOptimizer(name string) ([]logicalplan.Optimizer, error) {
48+
switch name {
49+
case "default":
50+
return logicalplan.DefaultOptimizers, nil
51+
case "all":
52+
return logicalplan.AllOptimizers, nil
53+
case "propagate-matchers":
54+
return []logicalplan.Optimizer{logicalplan.PropagateMatchersOptimizer{}}, nil
55+
case "sort-matchers":
56+
return []logicalplan.Optimizer{logicalplan.SortMatchers{}}, nil
57+
case "merge-selects":
58+
return []logicalplan.Optimizer{logicalplan.MergeSelectsOptimizer{}}, nil
59+
case "detect-histogram-stats":
60+
return []logicalplan.Optimizer{logicalplan.DetectHistogramStatsOptimizer{}}, nil
61+
default:
62+
return nil, fmt.Errorf("unknown optimizer %s", name)
63+
}
64+
}

pkg/engine/engine.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/prometheus/prometheus/promql"
1111
"github.com/prometheus/prometheus/storage"
1212
thanosengine "github.com/thanos-io/promql-engine/engine"
13-
"github.com/thanos-io/promql-engine/logicalplan"
1413
)
1514

1615
type engineKeyType struct{}
@@ -52,15 +51,16 @@ type Engine struct {
5251
engineSwitchQueriesTotal *prometheus.CounterVec
5352
}
5453

55-
func New(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *Engine {
54+
func New(opts promql.EngineOpts, engineCfg Config, reg prometheus.Registerer) *Engine {
5655
prometheusEngine := promql.NewEngine(opts)
5756

5857
var thanosEngine *thanosengine.Engine
59-
if enableThanosEngine {
58+
if engineCfg.EnableThanosEngine {
6059
thanosEngine = thanosengine.New(thanosengine.Opts{
6160
EngineOpts: opts,
62-
LogicalOptimizers: logicalplan.DefaultOptimizers,
61+
LogicalOptimizers: engineCfg.LogicalOptimizers,
6362
EnableAnalysis: true,
63+
EnableXFunctions: engineCfg.EnableXFunctions,
6464
})
6565
}
6666

pkg/engine/engine_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package engine
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"net/http"
78
"testing"
89
"time"
@@ -14,6 +15,7 @@ import (
1415
"github.com/prometheus/prometheus/promql/parser"
1516
"github.com/prometheus/prometheus/promql/promqltest"
1617
"github.com/stretchr/testify/require"
18+
"github.com/thanos-io/promql-engine/execution/parse"
1719

1820
utillog "github.com/cortexproject/cortex/pkg/util/log"
1921
)
@@ -37,7 +39,7 @@ func TestEngine_Fallback(t *testing.T) {
3739
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
3840
Reg: reg,
3941
}
40-
queryEngine := New(opts, true, reg)
42+
queryEngine := New(opts, Config{EnableThanosEngine: true}, reg)
4143

4244
// instant query, should go to fallback
4345
_, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now)
@@ -68,7 +70,7 @@ func TestEngine_Switch(t *testing.T) {
6870
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
6971
Reg: reg,
7072
}
71-
queryEngine := New(opts, true, reg)
73+
queryEngine := New(opts, Config{EnableThanosEngine: true}, reg)
7274

7375
// Query Prometheus engine
7476
r := &http.Request{Header: http.Header{}}
@@ -96,3 +98,28 @@ func TestEngine_Switch(t *testing.T) {
9698
cortex_engine_switch_queries_total{engine_type="thanos"} 2
9799
`), "cortex_engine_switch_queries_total"))
98100
}
101+
102+
func TestEngine_XFunctions(t *testing.T) {
103+
ctx := context.Background()
104+
reg := prometheus.NewRegistry()
105+
106+
now := time.Now()
107+
start := time.Now().Add(-time.Minute * 5)
108+
step := time.Minute
109+
queryable := promqltest.LoadedStorage(t, "")
110+
opts := promql.EngineOpts{
111+
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
112+
Reg: reg,
113+
}
114+
queryEngine := New(opts, Config{EnableThanosEngine: true, EnableXFunctions: true}, reg)
115+
116+
for name := range parse.XFunctions {
117+
t.Run(name, func(t *testing.T) {
118+
_, err := queryEngine.NewInstantQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), now)
119+
require.NoError(t, err)
120+
121+
_, err = queryEngine.NewRangeQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), start, now, step)
122+
require.NoError(t, err)
123+
})
124+
}
125+
}

0 commit comments

Comments
 (0)