Skip to content

Commit ca36512

Browse files
authored
Querier: Support configuring optimizers and XFunctions (#6873)
* Querier: Support configuring optimizers and XFunctions Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> * Address comments Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]> --------- Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 956ce98 commit ca36512

33 files changed

+298
-79
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
* [FEATURE] Distributor/Ingester: Implemented experimental feature to use gRPC stream connection for push requests. This can be enabled by setting `-distributor.use-stream-push=true`. #6580
2020
* [FEATURE] Compactor: Add support for percentage based sharding for compactors. #6738
2121
* [FEATURE] Querier: Allow choosing PromQL engine via header. #6777
22+
* [FEATURE] Querier: Support for configuring query optimizers and enabling XFunctions in the Thanos engine. #6873
2223
* [ENHANCEMENT] Tenant Federation: Add a # of query result limit logic when the `-tenant-federation.regex-matcher-enabled` is enabled. #6845
2324
* [ENHANCEMENT] Query Frontend: Add a `cortex_slow_queries_total` metric to track # of slow queries per user. #6859
2425
* [ENHANCEMENT] Query Frontend: Change to return 400 when the tenant resolving fail. #6715

docs/blocks-storage/querier.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,22 @@ querier:
252252
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
253253
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
254254

255-
# Experimental. Use Thanos promql engine
256-
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
257-
# engine.
258-
# CLI flag: -querier.thanos-engine
259-
[thanos_engine: <boolean> | default = false]
255+
thanos_engine:
256+
# Experimental. Use Thanos promql engine
257+
# https://github.com/thanos-io/promql-engine rather than the Prometheus
258+
# promql engine.
259+
# CLI flag: -querier.thanos-engine
260+
[enabled: <boolean> | default = false]
261+
262+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
263+
# CLI flag: -querier.enable-x-functions
264+
[enable_x_functions: <boolean> | default = false]
265+
266+
# Logical plan optimizers. Multiple optimizers can be provided as a
267+
# comma-separated list. Supported values: default, all, propagate-matchers,
268+
# sort-matchers, merge-selects, detect-histogram-stats
269+
# CLI flag: -querier.optimizers
270+
[optimizers: <string> | default = "default"]
260271

261272
# If enabled, ignore max query length check at Querier select method. Users
262273
# can choose to ignore it since the validation can be done before Querier

docs/configuration/config-file-reference.md

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4302,11 +4302,22 @@ store_gateway_client:
43024302
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
43034303
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
43044304

4305-
# Experimental. Use Thanos promql engine
4306-
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
4307-
# engine.
4308-
# CLI flag: -querier.thanos-engine
4309-
[thanos_engine: <boolean> | default = false]
4305+
thanos_engine:
4306+
# Experimental. Use Thanos promql engine
4307+
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
4308+
# engine.
4309+
# CLI flag: -querier.thanos-engine
4310+
[enabled: <boolean> | default = false]
4311+
4312+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
4313+
# CLI flag: -querier.enable-x-functions
4314+
[enable_x_functions: <boolean> | default = false]
4315+
4316+
# Logical plan optimizers. Multiple optimizers can be provided as a
4317+
# comma-separated list. Supported values: default, all, propagate-matchers,
4318+
# sort-matchers, merge-selects, detect-histogram-stats
4319+
# CLI flag: -querier.optimizers
4320+
[optimizers: <string> | default = "default"]
43104321

43114322
# If enabled, ignore max query length check at Querier select method. Users can
43124323
# choose to ignore it since the validation can be done before Querier evaluation
@@ -5023,6 +5034,23 @@ ring:
50235034
# ruler.enable-ha-evaluation is true.
50245035
# CLI flag: -ruler.liveness-check-timeout
50255036
[liveness_check_timeout: <duration> | default = 1s]
5037+
5038+
thanos_engine:
5039+
# Experimental. Use Thanos promql engine
5040+
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
5041+
# engine.
5042+
# CLI flag: -ruler.thanos-engine
5043+
[enabled: <boolean> | default = false]
5044+
5045+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
5046+
# CLI flag: -ruler.enable-x-functions
5047+
[enable_x_functions: <boolean> | default = false]
5048+
5049+
# Logical plan optimizers. Multiple optimizers can be provided as a
5050+
# comma-separated list. Supported values: default, all, propagate-matchers,
5051+
# sort-matchers, merge-selects, detect-histogram-stats
5052+
# CLI flag: -ruler.optimizers
5053+
[optimizers: <string> | default = "default"]
50265054
```
50275055
50285056
### `ruler_storage_config`

integration/querier_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/prometheus/prometheus/prompb"
2020
"github.com/stretchr/testify/assert"
2121
"github.com/stretchr/testify/require"
22+
"github.com/thanos-io/promql-engine/execution/parse"
2223

2324
"github.com/cortexproject/cortex/integration/e2e"
2425
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
@@ -416,6 +417,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
416417
"-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
417418
"-querier.query-store-for-labels-enabled": "true",
418419
"-querier.thanos-engine": strconv.FormatBool(thanosEngine),
420+
"-querier.enable-x-functions": strconv.FormatBool(thanosEngine),
419421
// Ingester.
420422
"-ring.store": "consul",
421423
"-consul.hostname": consul.NetworkHTTPEndpoint(),
@@ -1310,3 +1312,66 @@ func TestQuerierMaxSamplesLimit(t *testing.T) {
13101312
Error: "query processing would load too many samples into memory in query execution",
13111313
})
13121314
}
1315+
1316+
func TestQuerierEngineConfigs(t *testing.T) {
1317+
const blockRangePeriod = 5 * time.Second
1318+
1319+
s, err := e2e.NewScenario(networkName)
1320+
require.NoError(t, err)
1321+
defer s.Close()
1322+
1323+
// Configure the blocks storage to frequently compact TSDB head
1324+
// and ship blocks to the storage.
1325+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
1326+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
1327+
"-blocks-storage.tsdb.ship-interval": "1s",
1328+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
1329+
"-querier.thanos-engine": "true",
1330+
"-querier.enable-x-functions": "true",
1331+
"-querier.optimizers": "all",
1332+
})
1333+
1334+
// Start dependencies.
1335+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
1336+
consul := e2edb.NewConsul()
1337+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1338+
1339+
// Start Cortex components for the write path.
1340+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1341+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1342+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1343+
1344+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
1345+
require.NoError(t, s.Start(queryFrontend))
1346+
1347+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1348+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1349+
}), "")
1350+
require.NoError(t, s.StartAndWaitReady(querier))
1351+
1352+
// Wait until the distributor and querier has updated the ring.
1353+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1354+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1355+
1356+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
1357+
require.NoError(t, err)
1358+
1359+
// Push some series to Cortex.
1360+
series1Timestamp := time.Now()
1361+
series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
1362+
series2, _ := generateSeries("series_2", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
1363+
1364+
res, err := c.Push(series1)
1365+
require.NoError(t, err)
1366+
require.Equal(t, 200, res.StatusCode)
1367+
res, err = c.Push(series2)
1368+
require.NoError(t, err)
1369+
require.Equal(t, 200, res.StatusCode)
1370+
1371+
for xFunc := range parse.XFunctions {
1372+
result, err := c.Query(fmt.Sprintf(`%s(series_1{job="test"}[1m])`, xFunc), series1Timestamp)
1373+
require.NoError(t, err)
1374+
require.Equal(t, model.ValVector, result.Type())
1375+
}
1376+
1377+
}

pkg/configs/userconfig/config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,10 @@ import (
99
"github.com/pkg/errors"
1010
"github.com/prometheus/prometheus/model/labels"
1111
"github.com/prometheus/prometheus/model/rulefmt"
12-
"github.com/prometheus/prometheus/promql/parser"
1312
"github.com/prometheus/prometheus/rules"
1413
"gopkg.in/yaml.v3"
1514

15+
"github.com/cortexproject/cortex/pkg/parser"
1616
util_log "github.com/cortexproject/cortex/pkg/util/log"
1717
)
1818

pkg/configs/userconfig/config_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,12 @@ import (
1212
"github.com/prometheus/common/model"
1313
"github.com/prometheus/prometheus/model/labels"
1414
"github.com/prometheus/prometheus/model/rulefmt"
15-
"github.com/prometheus/prometheus/promql/parser"
1615
"github.com/prometheus/prometheus/rules"
1716
"github.com/stretchr/testify/assert"
1817
"github.com/stretchr/testify/require"
1918
"gopkg.in/yaml.v3"
2019

20+
"github.com/cortexproject/cortex/pkg/parser"
2121
util_log "github.com/cortexproject/cortex/pkg/util/log"
2222
)
2323

pkg/cortex/cortex.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,7 @@ func New(cfg Config) (*Cortex, error) {
379379
return nil, err
380380
}
381381

382+
cortex.setupPromQLFunctions()
382383
return cortex, nil
383384
}
384385

@@ -537,3 +538,9 @@ func (t *Cortex) readyHandler(sm *services.Manager) http.HandlerFunc {
537538
util.WriteTextResponse(w, "ready")
538539
}
539540
}
541+
542+
func (t *Cortex) setupPromQLFunctions() {
543+
// The holt_winters function is renamed to double_exponential_smoothing and has been experimental since Prometheus v3. (https://github.com/prometheus/prometheus/pull/14930)
544+
// The cortex supports holt_winters for users using this function.
545+
querier.EnableExperimentalPromQLFunctions(t.Cfg.Querier.EnablePromQLExperimentalFunctions, true)
546+
}

pkg/cortex/modules.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
78
"log/slog"
89
"net/http"
910
"runtime"
@@ -18,8 +19,6 @@ import (
1819
"github.com/prometheus/prometheus/rules"
1920
prom_storage "github.com/prometheus/prometheus/storage"
2021
"github.com/thanos-io/objstore"
21-
"github.com/thanos-io/promql-engine/engine"
22-
"github.com/thanos-io/promql-engine/logicalplan"
2322
"github.com/thanos-io/thanos/pkg/discovery/dns"
2423
"github.com/thanos-io/thanos/pkg/querysharding"
2524
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
@@ -32,6 +31,7 @@ import (
3231
configAPI "github.com/cortexproject/cortex/pkg/configs/api"
3332
"github.com/cortexproject/cortex/pkg/configs/db"
3433
"github.com/cortexproject/cortex/pkg/distributor"
34+
"github.com/cortexproject/cortex/pkg/engine"
3535
"github.com/cortexproject/cortex/pkg/flusher"
3636
"github.com/cortexproject/cortex/pkg/frontend"
3737
"github.com/cortexproject/cortex/pkg/frontend/transport"
@@ -564,7 +564,6 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
564564
t.Cfg.Querier.DefaultEvaluationInterval,
565565
t.Cfg.Querier.MaxSubQuerySteps,
566566
t.Cfg.Querier.LookbackDelta,
567-
t.Cfg.Querier.EnablePromQLExperimentalFunctions,
568567
)
569568

570569
return services.NewIdleService(nil, func(_ error) error {
@@ -643,7 +642,6 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
643642
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
644643
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
645644

646-
var queryEngine promql.QueryEngine
647645
opts := promql.EngineOpts{
648646
Logger: util_log.SLogger,
649647
Reg: rulerRegisterer,
@@ -658,15 +656,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
658656
return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds()
659657
},
660658
}
661-
if t.Cfg.Querier.ThanosEngine {
662-
queryEngine = engine.New(engine.Opts{
663-
EngineOpts: opts,
664-
LogicalOptimizers: logicalplan.AllOptimizers,
665-
EnableAnalysis: true,
666-
})
667-
} else {
668-
queryEngine = promql.NewEngine(opts)
669-
}
659+
queryEngine := engine.New(opts, t.Cfg.Ruler.ThanosEngine, rulerRegisterer)
670660

671661
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
672662
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)

pkg/engine/config.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package engine
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"strings"
7+
8+
"github.com/thanos-io/promql-engine/logicalplan"
9+
)
10+
11+
var supportedOptimizers = []string{"default", "all", "propagate-matchers", "sort-matchers", "merge-selects", "detect-histogram-stats"}
12+
13+
// ThanosEngineConfig contains the configuration to create engine
14+
type ThanosEngineConfig struct {
15+
Enabled bool `yaml:"enabled"`
16+
EnableXFunctions bool `yaml:"enable_x_functions"`
17+
Optimizers string `yaml:"optimizers"`
18+
LogicalOptimizers []logicalplan.Optimizer `yaml:"-"`
19+
}
20+
21+
func (cfg *ThanosEngineConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
22+
f.BoolVar(&cfg.Enabled, prefix+"thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
23+
f.BoolVar(&cfg.EnableXFunctions, prefix+"enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.")
24+
f.StringVar(&cfg.Optimizers, prefix+"optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", "))
25+
}
26+
27+
func (cfg *ThanosEngineConfig) Validate() error {
28+
splitOptimizers := strings.Split(cfg.Optimizers, ",")
29+
30+
for _, optimizer := range splitOptimizers {
31+
if optimizer == "all" || optimizer == "default" {
32+
if len(splitOptimizers) > 1 {
33+
return fmt.Errorf("special optimizer %s cannot be combined with other optimizers", optimizer)
34+
}
35+
}
36+
optimizers, err := getOptimizer(optimizer)
37+
if err != nil {
38+
return err
39+
}
40+
cfg.LogicalOptimizers = append(cfg.LogicalOptimizers, optimizers...)
41+
}
42+
43+
return nil
44+
}
45+
46+
func getOptimizer(name string) ([]logicalplan.Optimizer, error) {
47+
switch name {
48+
case "default":
49+
return logicalplan.DefaultOptimizers, nil
50+
case "all":
51+
return logicalplan.AllOptimizers, nil
52+
case "propagate-matchers":
53+
return []logicalplan.Optimizer{logicalplan.PropagateMatchersOptimizer{}}, nil
54+
case "sort-matchers":
55+
return []logicalplan.Optimizer{logicalplan.SortMatchers{}}, nil
56+
case "merge-selects":
57+
return []logicalplan.Optimizer{logicalplan.MergeSelectsOptimizer{}}, nil
58+
case "detect-histogram-stats":
59+
return []logicalplan.Optimizer{logicalplan.DetectHistogramStatsOptimizer{}}, nil
60+
default:
61+
return nil, fmt.Errorf("unknown optimizer %s", name)
62+
}
63+
}

pkg/engine/engine.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/prometheus/prometheus/promql"
1111
"github.com/prometheus/prometheus/storage"
1212
thanosengine "github.com/thanos-io/promql-engine/engine"
13-
"github.com/thanos-io/promql-engine/logicalplan"
1413
)
1514

1615
type engineKeyType struct{}
@@ -52,15 +51,16 @@ type Engine struct {
5251
engineSwitchQueriesTotal *prometheus.CounterVec
5352
}
5453

55-
func New(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *Engine {
54+
func New(opts promql.EngineOpts, thanosEngineCfg ThanosEngineConfig, reg prometheus.Registerer) *Engine {
5655
prometheusEngine := promql.NewEngine(opts)
5756

5857
var thanosEngine *thanosengine.Engine
59-
if enableThanosEngine {
58+
if thanosEngineCfg.Enabled {
6059
thanosEngine = thanosengine.New(thanosengine.Opts{
6160
EngineOpts: opts,
62-
LogicalOptimizers: logicalplan.DefaultOptimizers,
61+
LogicalOptimizers: thanosEngineCfg.LogicalOptimizers,
6362
EnableAnalysis: true,
63+
EnableXFunctions: thanosEngineCfg.EnableXFunctions,
6464
})
6565
}
6666

0 commit comments

Comments
 (0)