Skip to content

Commit edea965

Browse files
committed
Refactor engine config
Signed-off-by: 🌲 Harry 🌊 John 🏔 <[email protected]>
1 parent 4c0957a commit edea965

File tree

8 files changed

+205
-33
lines changed

8 files changed

+205
-33
lines changed

docs/blocks-storage/querier.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,22 @@ querier:
252252
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
253253
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
254254

255-
# Experimental. Use Thanos promql engine
256-
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
257-
# engine.
258-
# CLI flag: -querier.thanos-engine
259-
[thanos_engine: <boolean> | default = false]
255+
thanos_engine:
256+
# Experimental. Use Thanos promql engine
257+
# https://github.com/thanos-io/promql-engine rather than the Prometheus
258+
# promql engine.
259+
# CLI flag: -querier.thanos-engine
260+
[enable_thanos_engine: <boolean> | default = false]
261+
262+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
263+
# CLI flag: -querier.enable-x-functions
264+
[enable_x_functions: <boolean> | default = false]
265+
266+
# Logical plan optimizers. Multiple optimizers can be provided as a
267+
# comma-separated list. Supported values: default, all, propagate-matchers,
268+
# sort-matchers, merge-selects, detect-histogram-stats
269+
# CLI flag: -querier.optimizers
270+
[optimizers: <string> | default = "default"]
260271

261272
# If enabled, ignore max query length check at Querier select method. Users
262273
# can choose to ignore it since the validation can be done before Querier

docs/configuration/config-file-reference.md

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4302,11 +4302,22 @@ store_gateway_client:
43024302
# CLI flag: -querier.shuffle-sharding-ingesters-lookback-period
43034303
[shuffle_sharding_ingesters_lookback_period: <duration> | default = 0s]
43044304

4305-
# Experimental. Use Thanos promql engine
4306-
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
4307-
# engine.
4308-
# CLI flag: -querier.thanos-engine
4309-
[thanos_engine: <boolean> | default = false]
4305+
thanos_engine:
4306+
# Experimental. Use Thanos promql engine
4307+
# https://github.com/thanos-io/promql-engine rather than the Prometheus promql
4308+
# engine.
4309+
# CLI flag: -querier.thanos-engine
4310+
[enable_thanos_engine: <boolean> | default = false]
4311+
4312+
# Enable xincrease, xdelta, xrate etc from Thanos engine.
4313+
# CLI flag: -querier.enable-x-functions
4314+
[enable_x_functions: <boolean> | default = false]
4315+
4316+
# Logical plan optimizers. Multiple optimizers can be provided as a
4317+
# comma-separated list. Supported values: default, all, propagate-matchers,
4318+
# sort-matchers, merge-selects, detect-histogram-stats
4319+
# CLI flag: -querier.optimizers
4320+
[optimizers: <string> | default = "default"]
43104321

43114322
# If enabled, ignore max query length check at Querier select method. Users can
43124323
# choose to ignore it since the validation can be done before Querier evaluation

integration/querier_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/prometheus/prometheus/prompb"
2020
"github.com/stretchr/testify/assert"
2121
"github.com/stretchr/testify/require"
22+
"github.com/thanos-io/promql-engine/execution/parse"
2223

2324
"github.com/cortexproject/cortex/integration/e2e"
2425
e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
@@ -416,6 +417,7 @@ func TestQuerierWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
416417
"-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
417418
"-querier.query-store-for-labels-enabled": "true",
418419
"-querier.thanos-engine": strconv.FormatBool(thanosEngine),
420+
"-querier.enable-x-functions": strconv.FormatBool(thanosEngine),
419421
// Ingester.
420422
"-ring.store": "consul",
421423
"-consul.hostname": consul.NetworkHTTPEndpoint(),
@@ -1310,3 +1312,67 @@ func TestQuerierMaxSamplesLimit(t *testing.T) {
13101312
Error: "query processing would load too many samples into memory in query execution",
13111313
})
13121314
}
1315+
1316+
func TestQuerierEngineConfigs(t *testing.T) {
1317+
const blockRangePeriod = 5 * time.Second
1318+
1319+
s, err := e2e.NewScenario(networkName)
1320+
require.NoError(t, err)
1321+
defer s.Close()
1322+
1323+
// Configure the blocks storage to frequently compact TSDB head
1324+
// and ship blocks to the storage.
1325+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
1326+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
1327+
"-blocks-storage.tsdb.ship-interval": "1s",
1328+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
1329+
"-querier.thanos_engine": "true",
1330+
"-querier.enable-x-functions": "true",
1331+
"-querier.optimizers": "all",
1332+
})
1333+
1334+
// Start dependencies.
1335+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
1336+
consul := e2edb.NewConsul()
1337+
require.NoError(t, s.StartAndWaitReady(consul, minio))
1338+
1339+
// Start Cortex components for the write path.
1340+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1341+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
1342+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
1343+
1344+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
1345+
require.NoError(t, s.Start(queryFrontend))
1346+
1347+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
1348+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
1349+
}), "")
1350+
require.NoError(t, s.StartAndWaitReady(querier))
1351+
1352+
// Wait until the distributor and querier has updated the ring.
1353+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1354+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
1355+
1356+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), queryFrontend.HTTPEndpoint(), "", "", "user-1")
1357+
require.NoError(t, err)
1358+
1359+
// Push some series to Cortex.
1360+
series1Timestamp := time.Now()
1361+
series1, _ := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
1362+
series2, _ := generateSeries("series_2", series1Timestamp, prompb.Label{Name: "job", Value: "test"})
1363+
1364+
res, err := c.Push(series1)
1365+
require.NoError(t, err)
1366+
require.Equal(t, 200, res.StatusCode)
1367+
res, err = c.Push(series2)
1368+
require.NoError(t, err)
1369+
require.Equal(t, 200, res.StatusCode)
1370+
1371+
for xFunc, _ := range parse.XFunctions {
1372+
// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
1373+
result, err := c.Query(fmt.Sprintf(`%s({job="test"}[1m])`, xFunc), series1Timestamp)
1374+
require.NoError(t, err)
1375+
require.Equal(t, model.ValVector, result.Type())
1376+
}
1377+
1378+
}

pkg/cortex/modules.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
78
"log/slog"
89
"net/http"
910
"runtime"
@@ -18,8 +19,6 @@ import (
1819
"github.com/prometheus/prometheus/rules"
1920
prom_storage "github.com/prometheus/prometheus/storage"
2021
"github.com/thanos-io/objstore"
21-
"github.com/thanos-io/promql-engine/engine"
22-
"github.com/thanos-io/promql-engine/logicalplan"
2322
"github.com/thanos-io/thanos/pkg/discovery/dns"
2423
"github.com/thanos-io/thanos/pkg/querysharding"
2524
httpgrpc_server "github.com/weaveworks/common/httpgrpc/server"
@@ -32,6 +31,7 @@ import (
3231
configAPI "github.com/cortexproject/cortex/pkg/configs/api"
3332
"github.com/cortexproject/cortex/pkg/configs/db"
3433
"github.com/cortexproject/cortex/pkg/distributor"
34+
"github.com/cortexproject/cortex/pkg/engine"
3535
"github.com/cortexproject/cortex/pkg/flusher"
3636
"github.com/cortexproject/cortex/pkg/frontend"
3737
"github.com/cortexproject/cortex/pkg/frontend/transport"
@@ -634,7 +634,6 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
634634
if t.Cfg.ExternalPusher != nil && t.Cfg.ExternalQueryable != nil {
635635
rulerRegisterer := prometheus.WrapRegistererWith(prometheus.Labels{"engine": "ruler"}, prometheus.DefaultRegisterer)
636636

637-
var queryEngine promql.QueryEngine
638637
opts := promql.EngineOpts{
639638
Logger: util_log.SLogger,
640639
Reg: rulerRegisterer,
@@ -649,15 +648,7 @@ func (t *Cortex) initRuler() (serv services.Service, err error) {
649648
return t.Cfg.Querier.DefaultEvaluationInterval.Milliseconds()
650649
},
651650
}
652-
if t.Cfg.Querier.ThanosEngine {
653-
queryEngine = engine.New(engine.Opts{
654-
EngineOpts: opts,
655-
LogicalOptimizers: logicalplan.AllOptimizers,
656-
EnableAnalysis: true,
657-
})
658-
} else {
659-
queryEngine = promql.NewEngine(opts)
660-
}
651+
queryEngine := engine.New(opts, t.Cfg.Querier.Engine, rulerRegisterer)
661652

662653
managerFactory := ruler.DefaultTenantManagerFactory(t.Cfg.Ruler, t.Cfg.ExternalPusher, t.Cfg.ExternalQueryable, queryEngine, t.Overrides, metrics, prometheus.DefaultRegisterer)
663654
manager, err = ruler.NewDefaultMultiTenantManager(t.Cfg.Ruler, t.Overrides, managerFactory, metrics, prometheus.DefaultRegisterer, util_log.Logger)

pkg/engine/config.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package engine
2+
3+
import (
4+
"flag"
5+
"fmt"
6+
"github.com/thanos-io/promql-engine/logicalplan"
7+
"strings"
8+
)
9+
10+
// supportedOptimizers lists the optimizer names advertised in the
// -querier.optimizers flag help text.
var supportedOptimizers = []string{
	"default",
	"all",
	"propagate-matchers",
	"sort-matchers",
	"merge-selects",
	"detect-histogram-stats",
}
11+
12+
// Config contains the configuration to create engine
13+
type Config struct {
14+
EnableThanosEngine bool `yaml:"enable_thanos_engine"`
15+
EnableXFunctions bool `yaml:"enable_x_functions"`
16+
Optimizers string `yaml:"optimizers"`
17+
LogicalOptimizers []logicalplan.Optimizer `yaml:"-"`
18+
}
19+
20+
// RegisterFlags adds the flags required to config this to the given FlagSet.
21+
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
22+
f.BoolVar(&cfg.EnableThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
23+
f.BoolVar(&cfg.EnableXFunctions, "querier.enable-x-functions", false, "Enable xincrease, xdelta, xrate etc from Thanos engine.")
24+
f.StringVar(&cfg.Optimizers, "querier.optimizers", "default", "Logical plan optimizers. Multiple optimizers can be provided as a comma-separated list. Supported values: "+strings.Join(supportedOptimizers, ", "))
25+
}
26+
27+
func (cfg *Config) Validate() error {
28+
splitOptimizers := strings.Split(cfg.Optimizers, ",")
29+
30+
for _, optimizer := range splitOptimizers {
31+
if optimizer == "all" || optimizer == "default" {
32+
if len(splitOptimizers) > 1 {
33+
return fmt.Errorf("special optimizer %s cannot be combined with other optimizers", optimizer)
34+
}
35+
}
36+
optimizers, err := getOptimizer(optimizer)
37+
if err != nil {
38+
return err
39+
}
40+
cfg.LogicalOptimizers = append(cfg.LogicalOptimizers, optimizers...)
41+
}
42+
43+
return nil
44+
}
45+
46+
func getOptimizer(name string) ([]logicalplan.Optimizer, error) {
47+
switch name {
48+
case "default":
49+
return logicalplan.DefaultOptimizers, nil
50+
case "all":
51+
return logicalplan.AllOptimizers, nil
52+
case "propagate-matchers":
53+
return []logicalplan.Optimizer{logicalplan.PropagateMatchersOptimizer{}}, nil
54+
case "sort-matchers":
55+
return []logicalplan.Optimizer{logicalplan.SortMatchers{}}, nil
56+
case "merge-selects":
57+
return []logicalplan.Optimizer{logicalplan.MergeSelectsOptimizer{}}, nil
58+
case "detect-histogram-stats":
59+
return []logicalplan.Optimizer{logicalplan.DetectHistogramStatsOptimizer{}}, nil
60+
default:
61+
return nil, fmt.Errorf("unknown optimizer %s", name)
62+
}
63+
}

pkg/engine/engine.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/prometheus/prometheus/promql"
1111
"github.com/prometheus/prometheus/storage"
1212
thanosengine "github.com/thanos-io/promql-engine/engine"
13-
"github.com/thanos-io/promql-engine/logicalplan"
1413
)
1514

1615
type engineKeyType struct{}
@@ -52,15 +51,16 @@ type Engine struct {
5251
engineSwitchQueriesTotal *prometheus.CounterVec
5352
}
5453

55-
func New(opts promql.EngineOpts, enableThanosEngine bool, reg prometheus.Registerer) *Engine {
54+
func New(opts promql.EngineOpts, engineCfg Config, reg prometheus.Registerer) *Engine {
5655
prometheusEngine := promql.NewEngine(opts)
5756

5857
var thanosEngine *thanosengine.Engine
59-
if enableThanosEngine {
58+
if engineCfg.EnableThanosEngine {
6059
thanosEngine = thanosengine.New(thanosengine.Opts{
6160
EngineOpts: opts,
62-
LogicalOptimizers: logicalplan.DefaultOptimizers,
61+
LogicalOptimizers: engineCfg.LogicalOptimizers,
6362
EnableAnalysis: true,
63+
EnableXFunctions: engineCfg.EnableXFunctions,
6464
})
6565
}
6666

pkg/engine/engine_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package engine
33
import (
44
"bytes"
55
"context"
6+
"fmt"
67
"net/http"
78
"testing"
89
"time"
@@ -14,6 +15,7 @@ import (
1415
"github.com/prometheus/prometheus/promql/parser"
1516
"github.com/prometheus/prometheus/promql/promqltest"
1617
"github.com/stretchr/testify/require"
18+
"github.com/thanos-io/promql-engine/execution/parse"
1719

1820
utillog "github.com/cortexproject/cortex/pkg/util/log"
1921
)
@@ -37,7 +39,7 @@ func TestEngine_Fallback(t *testing.T) {
3739
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
3840
Reg: reg,
3941
}
40-
queryEngine := New(opts, true, reg)
42+
queryEngine := New(opts, Config{EnableThanosEngine: true}, reg)
4143

4244
// instant query, should go to fallback
4345
_, _ = queryEngine.NewInstantQuery(ctx, queryable, nil, "unimplemented(foo)", now)
@@ -68,7 +70,7 @@ func TestEngine_Switch(t *testing.T) {
6870
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
6971
Reg: reg,
7072
}
71-
queryEngine := New(opts, true, reg)
73+
queryEngine := New(opts, Config{EnableThanosEngine: true}, reg)
7274

7375
// Query Prometheus engine
7476
r := &http.Request{Header: http.Header{}}
@@ -96,3 +98,28 @@ func TestEngine_Switch(t *testing.T) {
9698
cortex_engine_switch_queries_total{engine_type="thanos"} 2
9799
`), "cortex_engine_switch_queries_total"))
98100
}
101+
102+
func TestEngine_XFunctions(t *testing.T) {
103+
ctx := context.Background()
104+
reg := prometheus.NewRegistry()
105+
106+
now := time.Now()
107+
start := time.Now().Add(-time.Minute * 5)
108+
step := time.Minute
109+
queryable := promqltest.LoadedStorage(t, "")
110+
opts := promql.EngineOpts{
111+
Logger: utillog.GoKitLogToSlog(log.NewNopLogger()),
112+
Reg: reg,
113+
}
114+
queryEngine := New(opts, Config{EnableThanosEngine: true, EnableXFunctions: true}, reg)
115+
116+
for name := range parse.XFunctions {
117+
t.Run(name, func(t *testing.T) {
118+
_, err := queryEngine.NewInstantQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), now)
119+
require.NoError(t, err)
120+
121+
_, err = queryEngine.NewRangeQuery(ctx, queryable, nil, fmt.Sprintf("%s(foo[1m])", name), start, now, step)
122+
require.NoError(t, err)
123+
})
124+
}
125+
}

pkg/querier/querier.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,7 @@ type Config struct {
8585

8686
ShuffleShardingIngestersLookbackPeriod time.Duration `yaml:"shuffle_sharding_ingesters_lookback_period"`
8787

88-
// Experimental. Use https://github.com/thanos-io/promql-engine rather than
89-
// the Prometheus query engine.
90-
ThanosEngine bool `yaml:"thanos_engine"`
88+
Engine engine.Config `yaml:"thanos_engine"`
9189

9290
// Ignore max query length check at Querier.
9391
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
@@ -139,13 +137,14 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
139137
f.IntVar(&cfg.IngesterQueryMaxAttempts, "querier.ingester-query-max-attempts", 1, "The maximum number of times we attempt fetching data from ingesters for retryable errors (ex. partial data returned).")
140138
f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
141139
f.DurationVar(&cfg.ShuffleShardingIngestersLookbackPeriod, "querier.shuffle-sharding-ingesters-lookback-period", 0, "When distributor's sharding strategy is shuffle-sharding and this setting is > 0, queriers fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since 'now - lookback period'. The lookback period should be greater or equal than the configured 'query store after' and 'query ingesters within'. If this setting is 0, queriers always query all ingesters (ingesters shuffle sharding on read path is disabled).")
142-
f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
143140
f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
144141
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
145142
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
146143
f.BoolVar(&cfg.EnableParquetQueryable, "querier.enable-parquet-queryable", false, "[Experimental] If true, querier will try to query the parquet files if available.")
147144
f.IntVar(&cfg.ParquetQueryableShardCacheSize, "querier.parquet-queryable-shard-cache-size", 512, "[Experimental] [Experimental] Maximum size of the Parquet queryable shard cache. 0 to disable.")
148145
f.StringVar(&cfg.ParquetQueryableDefaultBlockStore, "querier.parquet-queryable-default-block-store", string(parquetBlockStore), "Parquet queryable's default block store to query. Valid options are tsdb and parquet. If it is set to tsdb, parquet queryable always fallback to store gateway.")
146+
147+
cfg.Engine.RegisterFlags(f)
149148
}
150149

151150
// Validate the config
@@ -181,6 +180,10 @@ func (cfg *Config) Validate() error {
181180
}
182181
}
183182

183+
if err := cfg.Engine.Validate(); err != nil {
184+
return err
185+
}
186+
184187
return nil
185188
}
186189

@@ -246,7 +249,7 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
246249
return cfg.DefaultEvaluationInterval.Milliseconds()
247250
},
248251
}
249-
queryEngine := engine.New(opts, cfg.ThanosEngine, reg)
252+
queryEngine := engine.New(opts, cfg.Engine, reg)
250253
return NewSampleAndChunkQueryable(lazyQueryable), exemplarQueryable, queryEngine
251254
}
252255

0 commit comments

Comments
 (0)