diff --git a/cmd/prometheus/main.go b/cmd/prometheus/main.go
index 95a581a74..300f49f39 100644
--- a/cmd/prometheus/main.go
+++ b/cmd/prometheus/main.go
@@ -136,6 +136,7 @@ type flagConfig struct {
forGracePeriod model.Duration
outageTolerance model.Duration
resendDelay model.Duration
+ maxConcurrentEvals int64
web web.Options
scrape scrape.Options
tsdb tsdbOptions
@@ -154,6 +155,7 @@ type flagConfig struct {
enableNewSDManager bool
enablePerStepStats bool
enableAutoGOMAXPROCS bool
+ enableConcurrentRuleEval bool
prometheusURL string
corsRegexString string
@@ -197,6 +199,9 @@ func (c *flagConfig) setFeatureListOptions(logger log.Logger) error {
case "auto-gomaxprocs":
c.enableAutoGOMAXPROCS = true
level.Info(logger).Log("msg", "Automatically set GOMAXPROCS to match Linux container CPU quota")
+ case "concurrent-rule-eval":
+ c.enableConcurrentRuleEval = true
+ level.Info(logger).Log("msg", "Experimental concurrent rule evaluation enabled.")
case "no-default-scrape-port":
c.scrape.NoDefaultPort = true
level.Info(logger).Log("msg", "No default port will be appended to scrape targets' addresses.")
@@ -402,6 +407,9 @@ func main() {
serverOnlyFlag(a, "rules.alert.resend-delay", "Minimum amount of time to wait before resending an alert to Alertmanager.").
Default("1m").SetValue(&cfg.resendDelay)
+ serverOnlyFlag(a, "rules.max-concurrent-evals", "Global concurrency limit for independent rules which can run concurrently.").
+ Default("4").Int64Var(&cfg.maxConcurrentEvals)
+
a.Flag("scrape.adjust-timestamps", "Adjust scrape timestamps by up to `scrape.timestamp-tolerance` to align them to the intended schedule. See https://github.com/prometheus/prometheus/issues/7846 for more context. Experimental. This flag will be removed in a future release.").
Hidden().Default("true").BoolVar(&scrape.AlignScrapeTimestamps)
@@ -714,17 +722,19 @@ func main() {
queryEngine = promql.NewEngine(opts)
ruleManager = rules.NewManager(&rules.ManagerOptions{
- Appendable: fanoutStorage,
- Queryable: localStorage,
- QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
- NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
- Context: ctxRule,
- ExternalURL: cfg.web.ExternalURL,
- Registerer: prometheus.DefaultRegisterer,
- Logger: log.With(logger, "component", "rule manager"),
- OutageTolerance: time.Duration(cfg.outageTolerance),
- ForGracePeriod: time.Duration(cfg.forGracePeriod),
- ResendDelay: time.Duration(cfg.resendDelay),
+ Appendable: fanoutStorage,
+ Queryable: localStorage,
+ QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
+ NotifyFunc: rules.SendAlerts(notifierManager, cfg.web.ExternalURL.String()),
+ Context: ctxRule,
+ ExternalURL: cfg.web.ExternalURL,
+ Registerer: prometheus.DefaultRegisterer,
+ Logger: log.With(logger, "component", "rule manager"),
+ OutageTolerance: time.Duration(cfg.outageTolerance),
+ ForGracePeriod: time.Duration(cfg.forGracePeriod),
+ ResendDelay: time.Duration(cfg.resendDelay),
+ MaxConcurrentEvals: cfg.maxConcurrentEvals,
+ ConcurrentEvalsEnabled: cfg.enableConcurrentRuleEval,
})
}
diff --git a/docs/command-line/prometheus.md b/docs/command-line/prometheus.md
index cd6dac555..68c61c2bc 100644
--- a/docs/command-line/prometheus.md
+++ b/docs/command-line/prometheus.md
@@ -47,6 +47,7 @@ The Prometheus monitoring server
| --rules.alert.for-outage-tolerance
| Max time to tolerate prometheus outage for restoring "for" state of alert. Use with server mode only. | `1h` |
| --rules.alert.for-grace-period
| Minimum duration between alert and restored "for" state. This is maintained only for alerts with configured "for" time greater than grace period. Use with server mode only. | `10m` |
| --rules.alert.resend-delay
| Minimum amount of time to wait before resending an alert to Alertmanager. Use with server mode only. | `1m` |
+| --rules.max-concurrent-evals
| Global concurrency limit for independent rules which can run concurrently. Use with server mode only. | `4` |
| --alertmanager.notification-queue-capacity
| The capacity of the queue for pending Alertmanager notifications. Use with server mode only. | `10000` |
| --query.lookback-delta
| The maximum lookback duration for retrieving metrics during expression evaluations and federation. Use with server mode only. | `5m` |
| --query.timeout
| Maximum time a query may take before being aborted. Use with server mode only. | `2m` |
diff --git a/docs/feature_flags.md b/docs/feature_flags.md
index bcf8309b5..341539f20 100644
--- a/docs/feature_flags.md
+++ b/docs/feature_flags.md
@@ -204,3 +204,13 @@ Enables ingestion of created timestamp. Created timestamps are injected as 0 val
Currently Prometheus supports created timestamps only on the traditional Prometheus Protobuf protocol (WIP for other protocols). As a result, when enabling this feature, the Prometheus protobuf scrape protocol will be prioritized (See `scrape_config.scrape_protocols` settings for more details).
Besides enabling this feature in Prometheus, created timestamps need to be exposed by the application being scraped.
+
+## Concurrent evaluation of independent rules
+
+`--enable-feature=concurrent-rule-eval`
+
+Rule groups execute concurrently, but the rules within a group execute sequentially; this is because rules can use the
+output of a preceding rule as its input. However, if there is no detectable relationship between rules then there is no
+reason to run them sequentially. This can improve rule reliability at the expense of adding more concurrent query
load. The number of concurrent rule evaluations can be configured with `--rules.max-concurrent-evals` which is set
+to `4` by default.
diff --git a/rules/alerting.go b/rules/alerting.go
index 6602d2dff..90f822668 100644
--- a/rules/alerting.go
+++ b/rules/alerting.go
@@ -323,8 +323,8 @@ const resolvedRetention = 15 * time.Minute
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
-func (r *AlertingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int) (promql.Vector, error) {
- ctx = NewOriginContext(ctx, NewRuleDetail(r))
+func (r *AlertingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, externalURL *url.URL, limit int, independent bool) (promql.Vector, error) {
+ ctx = NewOriginContext(ctx, NewRuleDetail(r, independent))
res, err := query(ctx, r.vector.String(), ts.Add(-evalDelay))
if err != nil {
diff --git a/rules/alerting_test.go b/rules/alerting_test.go
index f13015f32..69fae1454 100644
--- a/rules/alerting_test.go
+++ b/rules/alerting_test.go
@@ -16,6 +16,7 @@ package rules
import (
"context"
"errors"
+ "fmt"
"testing"
"time"
@@ -166,7 +167,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
- res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -183,7 +184,7 @@ func TestAlertingRuleLabelsUpdate(t *testing.T) {
require.Equal(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
- res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -251,7 +252,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalLabels.Eval(
- context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+ context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -265,7 +266,7 @@ func TestAlertingRuleExternalLabelsInTemplate(t *testing.T) {
}
res, err = ruleWithExternalLabels.Eval(
- context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+ context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -344,7 +345,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := ruleWithoutExternalURL.Eval(
- context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+ context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -358,7 +359,7 @@ func TestAlertingRuleExternalURLInTemplate(t *testing.T) {
}
res, err = ruleWithExternalURL.Eval(
- context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+ context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -413,7 +414,7 @@ func TestAlertingRuleEmptyLabelFromTemplate(t *testing.T) {
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
res, err := rule.Eval(
- context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0,
+ context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false,
)
require.NoError(t, err)
for _, smpl := range res {
@@ -480,7 +481,7 @@ instance: {{ $v.Labels.instance }}, value: {{ printf "%.0f" $v.Value }};
close(getDoneCh)
}()
_, err = ruleWithQueryInTemplate.Eval(
- context.TODO(), 0, evalTime, slowQueryFunc, nil, 0,
+ context.TODO(), 0, evalTime, slowQueryFunc, nil, 0, false,
)
require.NoError(t, err)
}
@@ -532,7 +533,7 @@ func TestAlertingRuleDuplicate(t *testing.T) {
"",
true, log.NewNopLogger(),
)
- _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
+ _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0, false)
require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying alert labels")
}
@@ -580,7 +581,7 @@ func TestAlertingRuleLimit(t *testing.T) {
evalTime := time.Unix(0, 0)
for _, test := range tests {
- switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
+ switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit, false); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":
@@ -809,7 +810,7 @@ func TestKeepFiringFor(t *testing.T) {
t.Logf("case %d", i)
evalTime := baseTime.Add(time.Duration(i) * time.Minute)
result[0].T = timestamp.FromTime(evalTime)
- res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -826,7 +827,7 @@ func TestKeepFiringFor(t *testing.T) {
require.Equal(t, result, filteredRes)
}
evalTime := baseTime.Add(time.Duration(len(results)) * time.Minute)
- res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -863,7 +864,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
baseTime := time.Unix(0, 0)
result.T = timestamp.FromTime(baseTime)
- res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ res, err := rule.Eval(context.TODO(), 0, baseTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
require.Len(t, res, 2)
@@ -878,7 +879,7 @@ func TestPendingAndKeepFiringFor(t *testing.T) {
}
evalTime := baseTime.Add(time.Minute)
- res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ res, err = rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
require.Empty(t, res)
}
@@ -912,11 +913,15 @@ func TestAlertingEvalWithOrigin(t *testing.T) {
true, log.NewNopLogger(),
)
- _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
- detail = FromOriginContext(ctx)
- return nil, nil
- }, nil, 0)
+ for _, independent := range []bool{true, false} {
+ t.Run(fmt.Sprintf("independent = %t", independent), func(t *testing.T) {
+ _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
+ detail = FromOriginContext(ctx)
+ return nil, nil
+ }, nil, 0, independent)
- require.NoError(t, err)
- require.Equal(t, detail, NewRuleDetail(rule))
+ require.NoError(t, err)
+ require.Equal(t, detail, NewRuleDetail(rule, independent))
+ })
+ }
}
diff --git a/rules/fixtures/rules_dependencies.yaml b/rules/fixtures/rules_dependencies.yaml
new file mode 100644
index 000000000..31d2c6176
--- /dev/null
+++ b/rules/fixtures/rules_dependencies.yaml
@@ -0,0 +1,7 @@
+groups:
+ - name: test
+ rules:
+ - record: job:http_requests:rate5m
+ expr: sum by (job)(rate(http_requests_total[5m]))
+ - record: HighRequestRate
+ expr: job:http_requests:rate5m > 100
diff --git a/rules/fixtures/rules_multiple.yaml b/rules/fixtures/rules_multiple.yaml
new file mode 100644
index 000000000..db57bede1
--- /dev/null
+++ b/rules/fixtures/rules_multiple.yaml
@@ -0,0 +1,14 @@
+groups:
+ - name: test
+ rules:
+ # independents
+ - record: job:http_requests:rate1m
+ expr: sum by (job)(rate(http_requests_total[1m]))
+ - record: job:http_requests:rate5m
+ expr: sum by (job)(rate(http_requests_total[5m]))
+
+ # dependents
+ - record: job:http_requests:rate15m
+ expr: sum by (job)(rate(http_requests_total[15m]))
+ - record: TooManyRequests
+ expr: job:http_requests:rate15m > 100
diff --git a/rules/fixtures/rules_multiple_groups.yaml b/rules/fixtures/rules_multiple_groups.yaml
new file mode 100644
index 000000000..87f31a6ca
--- /dev/null
+++ b/rules/fixtures/rules_multiple_groups.yaml
@@ -0,0 +1,28 @@
+groups:
+ - name: http
+ rules:
+ # independents
+ - record: job:http_requests:rate1m
+ expr: sum by (job)(rate(http_requests_total[1m]))
+ - record: job:http_requests:rate5m
+ expr: sum by (job)(rate(http_requests_total[5m]))
+
+ # dependents
+ - record: job:http_requests:rate15m
+ expr: sum by (job)(rate(http_requests_total[15m]))
+ - record: TooManyHTTPRequests
+ expr: job:http_requests:rate15m > 100
+
+ - name: grpc
+ rules:
+ # independents
+ - record: job:grpc_requests:rate1m
+ expr: sum by (job)(rate(grpc_requests_total[1m]))
+ - record: job:grpc_requests:rate5m
+ expr: sum by (job)(rate(grpc_requests_total[5m]))
+
+ # dependents
+ - record: job:grpc_requests:rate15m
+ expr: sum by (job)(rate(grpc_requests_total[15m]))
+ - record: TooManyGRPCRequests
+ expr: job:grpc_requests:rate15m > 100
diff --git a/rules/fixtures/rules_multiple_independent.yaml b/rules/fixtures/rules_multiple_independent.yaml
new file mode 100644
index 000000000..e071be3ef
--- /dev/null
+++ b/rules/fixtures/rules_multiple_independent.yaml
@@ -0,0 +1,15 @@
+groups:
+ - name: independents
+ rules:
+ - record: job:http_requests:rate1m
+ expr: sum by (job)(rate(http_requests_total[1m]))
+ - record: job:http_requests:rate5m
+ expr: sum by (job)(rate(http_requests_total[5m]))
+ - record: job:http_requests:rate15m
+ expr: sum by (job)(rate(http_requests_total[15m]))
+ - record: job:http_requests:rate30m
+ expr: sum by (job)(rate(http_requests_total[30m]))
+ - record: job:http_requests:rate1h
+ expr: sum by (job)(rate(http_requests_total[1h]))
+ - record: job:http_requests:rate2h
+ expr: sum by (job)(rate(http_requests_total[2h]))
diff --git a/rules/group.go b/rules/group.go
index 29afb5e5b..600715253 100644
--- a/rules/group.go
+++ b/rules/group.go
@@ -22,8 +22,11 @@ import (
"sync"
"time"
+ "go.uber.org/atomic"
"golang.org/x/exp/slices"
+ "github.com/prometheus/prometheus/promql/parser"
+
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@@ -437,9 +440,14 @@ func (g *Group) CopyState(from *Group) {
}
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
+// Rules can be evaluated concurrently if the `concurrent-rule-eval` feature flag is enabled.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
- var samplesTotal float64
- evaluationDelay := g.EvaluationDelay()
+ var (
+ samplesTotal atomic.Float64
+ wg sync.WaitGroup
+ evaluationDelay = g.EvaluationDelay()
+ )
+
for i, rule := range g.rules {
select {
case <-g.done:
@@ -447,7 +455,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
default:
}
- func(i int, rule Rule) {
+ eval := func(i int, rule Rule, independent, async bool) {
+ defer func() {
+ if async {
+ wg.Done()
+ g.opts.RuleConcurrencyController.Done()
+ }
+ }()
+
logger := log.WithPrefix(g.logger, "name", rule.Name(), "index", i)
ctx, sp := otel.Tracer("").Start(ctx, "rule")
sp.SetAttributes(attribute.String("name", rule.Name()))
@@ -466,7 +481,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
- vector, err := rule.Eval(ctx, evaluationDelay, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit())
+ vector, err := rule.Eval(ctx, evaluationDelay, ts, g.opts.QueryFunc, g.opts.ExternalURL, g.Limit(), independent)
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
@@ -483,7 +498,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
rule.SetHealth(HealthGood)
rule.SetLastError(nil)
- samplesTotal += float64(len(vector))
+ samplesTotal.Add(float64(len(vector)))
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
@@ -572,10 +587,23 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
}
}
}
- }(i, rule)
+ }
+
+ // If the rule has no dependencies, it can run concurrently because no other rules in this group depend on its output.
+ independent := g.opts.RuleDependencyController.IsRuleIndependent(g, rule)
+
+ // Try to run concurrently if there are slots available.
+ if ctrl := g.opts.RuleConcurrencyController; independent && ctrl != nil && ctrl.Allow() {
+ wg.Add(1)
+ go eval(i, rule, independent, true)
+ } else {
+ eval(i, rule, independent, false)
+ }
}
+
+ wg.Wait()
if g.metrics != nil {
- g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
+ g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal.Load())
}
g.cleanupStaleSeries(ctx, ts)
}
@@ -921,3 +949,113 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics {
return m
}
+
+// dependencyMap is a data-structure which contains the relationships between rules within a group.
+// It is used to describe the dependency associations between rules in a group whereby one rule uses the
+// output metric produced by another rule in its expression (i.e. as its "input").
+type dependencyMap map[Rule][]Rule
+
+// dependents returns the count of rules which use the output of the given rule as one of their inputs.
+func (m dependencyMap) dependents(r Rule) int {
+ return len(m[r])
+}
+
+// dependencies returns the count of rules on which the given rule is dependent for input.
+func (m dependencyMap) dependencies(r Rule) int {
+ if len(m) == 0 {
+ return 0
+ }
+
+ var count int
+ for _, children := range m {
+ for _, child := range children {
+ if child == r {
+ count++
+ }
+ }
+ }
+
+ return count
+}
+
+// isIndependent determines whether the given rule is not dependent on another rule for its input, nor is any other rule
+// dependent on its output.
+func (m dependencyMap) isIndependent(r Rule) bool {
+ if m == nil {
+ return false
+ }
+
+ return m.dependents(r)+m.dependencies(r) == 0
+}
+
+// buildDependencyMap builds a data-structure which contains the relationships between rules within a group.
+//
+// Alert rules, by definition, cannot have any dependents - but they can have dependencies. Any recording rule on whose
+// output an Alert rule depends will not be able to run concurrently.
+//
+// There is a class of rule expressions which are considered "indeterminate", because either relationships cannot be
+// inferred, or concurrent evaluation of rules depending on these series would produce undefined/unexpected behaviour:
+// - wildcard queriers like {cluster="prod1"} which would match every series with that label selector
+// - any "meta" series (series produced by Prometheus itself) like ALERTS, ALERTS_FOR_STATE
+//
+// Rules which are independent can run concurrently with no side-effects.
+func buildDependencyMap(rules []Rule) dependencyMap {
+ dependencies := make(dependencyMap)
+
+ if len(rules) <= 1 {
+ // No relationships if group has 1 or fewer rules.
+ return nil
+ }
+
+ inputs := make(map[string][]Rule, len(rules))
+ outputs := make(map[string][]Rule, len(rules))
+
+ var indeterminate bool
+
+ for _, rule := range rules {
+ rule := rule
+
+ name := rule.Name()
+ outputs[name] = append(outputs[name], rule)
+
+ parser.Inspect(rule.Query(), func(node parser.Node, path []parser.Node) error {
+ if n, ok := node.(*parser.VectorSelector); ok {
+ // A wildcard metric expression means we cannot reliably determine if this rule depends on any other,
+ // which means we cannot safely run any rules concurrently.
+ if n.Name == "" && len(n.LabelMatchers) > 0 {
+ indeterminate = true
+ return nil
+ }
+
+ // Rules which depend on "meta-metrics" like ALERTS and ALERTS_FOR_STATE will have undefined behaviour
+ // if they run concurrently.
+ if n.Name == alertMetricName || n.Name == alertForStateMetricName {
+ indeterminate = true
+ return nil
+ }
+
+ inputs[n.Name] = append(inputs[n.Name], rule)
+ }
+ return nil
+ })
+ }
+
+ if indeterminate {
+ return nil
+ }
+
+ if len(inputs) == 0 || len(outputs) == 0 {
+ // No relationships can be inferred.
+ return nil
+ }
+
+ for output, outRules := range outputs {
+ for _, outRule := range outRules {
+ if rs, found := inputs[output]; found && len(rs) > 0 {
+ dependencies[outRule] = append(dependencies[outRule], rs...)
+ }
+ }
+ }
+
+ return dependencies
+}
diff --git a/rules/manager.go b/rules/manager.go
index 5ed34be2b..b6e938563 100644
--- a/rules/manager.go
+++ b/rules/manager.go
@@ -26,6 +26,7 @@ import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/exp/slices"
+ "golang.org/x/sync/semaphore"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/rulefmt"
@@ -105,22 +106,28 @@ type ContextWrapFunc func(ctx context.Context, g *Group) context.Context
// ManagerOptions bundles options for the Manager.
type ManagerOptions struct {
- ExternalURL *url.URL
- QueryFunc QueryFunc
- NotifyFunc NotifyFunc
- Context context.Context
+ ExternalURL *url.URL
+ QueryFunc QueryFunc
+ NotifyFunc NotifyFunc
+ Context context.Context
+ Appendable storage.Appendable
+ Queryable storage.Queryable
+ Logger log.Logger
+ Registerer prometheus.Registerer
+ OutageTolerance time.Duration
+ ForGracePeriod time.Duration
+ ResendDelay time.Duration
+ GroupLoader GroupLoader
+ MaxConcurrentEvals int64
+ ConcurrentEvalsEnabled bool
+ RuleDependencyController RuleDependencyController
+ RuleConcurrencyController RuleConcurrencyController
+
+ DefaultEvaluationDelay func() time.Duration
+
// GroupEvaluationContextFunc will be called to wrap Context based on the group being evaluated.
// Will be skipped if nil.
GroupEvaluationContextFunc ContextWrapFunc
- Appendable storage.Appendable
- Queryable storage.Queryable
- Logger log.Logger
- Registerer prometheus.Registerer
- OutageTolerance time.Duration
- ForGracePeriod time.Duration
- ResendDelay time.Duration
- GroupLoader GroupLoader
- DefaultEvaluationDelay func() time.Duration
// AlwaysRestoreAlertState forces all new or changed groups in calls to Update to restore.
// Useful when you know you will be adding alerting rules after the manager has already started.
@@ -140,6 +147,14 @@ func NewManager(o *ManagerOptions) *Manager {
o.GroupLoader = FileLoader{}
}
+ if o.RuleDependencyController == nil {
+ o.RuleDependencyController = NewRuleDependencyController()
+ }
+
+ if o.RuleConcurrencyController == nil {
+ o.RuleConcurrencyController = newRuleConcurrencyController(o.ConcurrentEvalsEnabled, o.MaxConcurrentEvals)
+ }
+
m := &Manager{
groups: map[string]*Group{},
opts: o,
@@ -186,6 +201,10 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
m.mtx.Lock()
defer m.mtx.Unlock()
+ if m.opts.RuleDependencyController != nil {
+ m.opts.RuleDependencyController.Invalidate()
+ }
+
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, groupEvalIterationFunc, files...)
if errs != nil {
@@ -421,3 +440,85 @@ func SendAlerts(s Sender, externalURL string) NotifyFunc {
}
}
}
+
+// RuleDependencyController determines whether a rule is independent of all other rules
+// within its group, and therefore safe to evaluate concurrently.
+type RuleDependencyController interface {
+	// IsRuleIndependent reports whether rule r in group g neither depends on, nor is depended on by, any other rule in g.
+	IsRuleIndependent(g *Group, r Rule) bool
+
+	// Invalidate discards any memoized dependency information; it must be called whenever rule groups may have changed.
+	Invalidate()
+}
+
+// ruleDependencyController memoizes a dependency map per group, rebuilding it lazily after Invalidate. TODO: unit test.
+type ruleDependencyController struct {
+ depMapsMu sync.Mutex
+ depMaps map[*Group]dependencyMap
+}
+
+func NewRuleDependencyController() RuleDependencyController {
+ return &ruleDependencyController{
+ depMaps: map[*Group]dependencyMap{},
+ }
+}
+
+func (c *ruleDependencyController) IsRuleIndependent(g *Group, r Rule) bool {
+ c.depMapsMu.Lock()
+ defer c.depMapsMu.Unlock()
+
+ depMap, found := c.depMaps[g]
+ if !found {
+ depMap = buildDependencyMap(g.rules)
+ c.depMaps[g] = depMap
+ }
+
+ return depMap.isIndependent(r)
+}
+
+func (c *ruleDependencyController) Invalidate() {
+ c.depMapsMu.Lock()
+ defer c.depMapsMu.Unlock()
+
+ // Clear out the memoized dependency maps because some or all groups may have been updated.
+ c.depMaps = map[*Group]dependencyMap{}
+}
+
+// RuleConcurrencyController controls whether rules can be evaluated concurrently. Its purpose is to bound the amount
+// of concurrency in rule evaluations, to not overwhelm the Prometheus server with additional query load.
+// Concurrency is controlled globally, not on a per-group basis.
+type RuleConcurrencyController interface {
+ // Allow determines whether any concurrent evaluation slots are available.
+ Allow() bool
+
+ // Done releases a concurrent evaluation slot.
+ Done()
+}
+
+func newRuleConcurrencyController(enabled bool, maxConcurrency int64) RuleConcurrencyController {
+ return &concurrentRuleEvalController{
+ enabled: enabled,
+ sema: semaphore.NewWeighted(maxConcurrency),
+ }
+}
+
+// concurrentRuleEvalController holds a weighted semaphore which controls the concurrent evaluation of rules.
+type concurrentRuleEvalController struct {
+ enabled bool
+ sema *semaphore.Weighted
+}
+
+func (c *concurrentRuleEvalController) Allow() bool {
+ if !c.enabled {
+ return false
+ }
+
+ return c.sema.TryAcquire(1)
+}
+
+func (c *concurrentRuleEvalController) Done() {
+ if !c.enabled {
+ return
+ }
+
+ c.sema.Release(1)
+}
diff --git a/rules/manager_test.go b/rules/manager_test.go
index 40c3b5b4c..6786ff892 100644
--- a/rules/manager_test.go
+++ b/rules/manager_test.go
@@ -21,11 +21,13 @@ import (
"os"
"path"
"sort"
+ "sync"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
+ "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
@@ -41,11 +43,11 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/teststorage"
- "github.com/prometheus/prometheus/util/testutil"
+ promtestutil "github.com/prometheus/prometheus/util/testutil"
)
func TestMain(m *testing.M) {
- testutil.TolerantVerifyLeak(m)
+ promtestutil.TolerantVerifyLeak(m)
}
func TestAlertingRule(t *testing.T) {
@@ -159,7 +161,7 @@ func TestAlertingRule(t *testing.T) {
evalTime := baseTime.Add(test.time)
- res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ res, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS_FOR_STATE' samples.
@@ -307,7 +309,7 @@ func TestForStateAddSamples(t *testing.T) {
forState = float64(value.StaleNaN)
}
- res, err := rule.Eval(context.TODO(), evalDelay, evalTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ res, err := rule.Eval(context.TODO(), evalDelay, evalTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
var filteredRes promql.Vector // After removing 'ALERTS' samples.
@@ -369,6 +371,8 @@ func TestForStateRestore(t *testing.T) {
NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) {},
OutageTolerance: 30 * time.Minute,
ForGracePeriod: 10 * time.Minute,
+
+ RuleDependencyController: NewRuleDependencyController(),
}
alertForDuration := 25 * time.Minute
@@ -541,6 +545,8 @@ func TestStaleness(t *testing.T) {
Queryable: st,
Context: context.Background(),
Logger: log.NewNopLogger(),
+
+ RuleDependencyController: NewRuleDependencyController(),
}
expr, err := parser.ParseExpr("a + 1")
@@ -733,6 +739,8 @@ func TestUpdate(t *testing.T) {
QueryFunc: EngineQueryFunc(engine, st),
Context: context.Background(),
Logger: log.NewNopLogger(),
+
+ RuleDependencyController: NewRuleDependencyController(),
})
ruleManager.start()
defer ruleManager.Stop()
@@ -810,11 +818,12 @@ func TestUpdate_AlwaysRestore(t *testing.T) {
defer st.Close()
ruleManager := NewManager(&ManagerOptions{
- Appendable: st,
- Queryable: st,
- Context: context.Background(),
- Logger: log.NewNopLogger(),
- AlwaysRestoreAlertState: true,
+ Appendable: st,
+ Queryable: st,
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ AlwaysRestoreAlertState: true,
+ RuleDependencyController: NewRuleDependencyController(),
})
ruleManager.start()
defer ruleManager.Stop()
@@ -842,11 +851,12 @@ func TestUpdate_AlwaysRestoreDoesntAffectUnchangedGroups(t *testing.T) {
defer st.Close()
ruleManager := NewManager(&ManagerOptions{
- Appendable: st,
- Queryable: st,
- Context: context.Background(),
- Logger: log.NewNopLogger(),
- AlwaysRestoreAlertState: true,
+ Appendable: st,
+ Queryable: st,
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ AlwaysRestoreAlertState: true,
+ RuleDependencyController: NewRuleDependencyController(),
})
ruleManager.start()
defer ruleManager.Stop()
@@ -880,11 +890,12 @@ func TestUpdateSetsSourceTenants(t *testing.T) {
}
engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{
- Appendable: st,
- Queryable: st,
- QueryFunc: EngineQueryFunc(engine, st),
- Context: context.Background(),
- Logger: log.NewNopLogger(),
+ Appendable: st,
+ Queryable: st,
+ QueryFunc: EngineQueryFunc(engine, st),
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ RuleDependencyController: NewRuleDependencyController(),
})
ruleManager.start()
defer ruleManager.Stop()
@@ -927,6 +938,8 @@ func TestAlignEvaluationTimeOnInterval(t *testing.T) {
QueryFunc: EngineQueryFunc(engine, st),
Context: context.Background(),
Logger: log.NewNopLogger(),
+
+ RuleDependencyController: NewRuleDependencyController(),
})
ruleManager.start()
defer ruleManager.Stop()
@@ -1000,6 +1013,7 @@ func TestGroupEvaluationContextFuncIsCalledWhenSupplied(t *testing.T) {
Context: context.Background(),
Logger: log.NewNopLogger(),
GroupEvaluationContextFunc: mockContextWrapFunc,
+ RuleDependencyController: NewRuleDependencyController(),
})
rgs, errs := rulefmt.ParseFile("fixtures/rules_with_source_tenants.yaml")
@@ -1114,6 +1128,8 @@ func TestNotify(t *testing.T) {
Logger: log.NewNopLogger(),
NotifyFunc: notifyFunc,
ResendDelay: 2 * time.Second,
+
+ RuleDependencyController: NewRuleDependencyController(),
}
expr, err := parser.ParseExpr("a > 1")
@@ -1184,6 +1200,8 @@ func TestMetricsUpdate(t *testing.T) {
Context: context.Background(),
Logger: log.NewNopLogger(),
Registerer: registry,
+
+ RuleDependencyController: NewRuleDependencyController(),
})
ruleManager.start()
defer ruleManager.Stop()
@@ -1257,6 +1275,8 @@ func TestGroupStalenessOnRemoval(t *testing.T) {
QueryFunc: EngineQueryFunc(engine, storage),
Context: context.Background(),
Logger: log.NewNopLogger(),
+
+ RuleDependencyController: NewRuleDependencyController(),
})
var stopped bool
ruleManager.start()
@@ -1334,6 +1354,8 @@ func TestMetricsStalenessOnManagerShutdown(t *testing.T) {
QueryFunc: EngineQueryFunc(engine, storage),
Context: context.Background(),
Logger: log.NewNopLogger(),
+
+ RuleDependencyController: NewRuleDependencyController(),
})
var stopped bool
ruleManager.start()
@@ -1436,6 +1458,8 @@ func TestRuleHealthUpdates(t *testing.T) {
Queryable: st,
Context: context.Background(),
Logger: log.NewNopLogger(),
+
+ RuleDependencyController: NewRuleDependencyController(),
}
expr, err := parser.ParseExpr("a + 1")
@@ -1751,6 +1775,8 @@ func TestRuleGroupEvalIterationFunc(t *testing.T) {
NotifyFunc: func(ctx context.Context, expr string, alerts ...*Alert) {},
OutageTolerance: 30 * time.Minute,
ForGracePeriod: 10 * time.Minute,
+
+ RuleDependencyController: NewRuleDependencyController(),
}
activeAlert := &Alert{
@@ -1831,6 +1857,8 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
Queryable: storage,
Context: context.Background(),
Logger: log.NewNopLogger(),
+
+ RuleDependencyController: NewRuleDependencyController(),
}
expr, err := parser.ParseExpr("sum(histogram_metric)")
@@ -1868,3 +1896,550 @@ func TestNativeHistogramsInRecordingRules(t *testing.T) {
require.Equal(t, expHist, fh)
require.Equal(t, chunkenc.ValNone, it.Next())
}
+
+func TestDependencyMap(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ expr, err = parser.ParseExpr("user:requests:rate1m <= 0")
+ require.NoError(t, err)
+ rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger())
+
+ expr, err = parser.ParseExpr("sum by (user) (rate(requests[5m]))")
+ require.NoError(t, err)
+ rule3 := NewRecordingRule("user:requests:rate5m", expr, labels.Labels{})
+
+ expr, err = parser.ParseExpr("increase(user:requests:rate1m[1h])")
+ require.NoError(t, err)
+ rule4 := NewRecordingRule("user:requests:increase1h", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule, rule2, rule3, rule4},
+ Opts: opts,
+ })
+
+ depMap := buildDependencyMap(group.rules)
+
+ require.Zero(t, depMap.dependencies(rule))
+ require.Equal(t, 2, depMap.dependents(rule))
+ require.False(t, depMap.isIndependent(rule))
+
+ require.Zero(t, depMap.dependents(rule2))
+ require.Equal(t, 1, depMap.dependencies(rule2))
+ require.False(t, depMap.isIndependent(rule2))
+
+ require.Zero(t, depMap.dependents(rule3))
+ require.Zero(t, depMap.dependencies(rule3))
+ require.True(t, depMap.isIndependent(rule3))
+
+ require.Zero(t, depMap.dependents(rule4))
+ require.Equal(t, 1, depMap.dependencies(rule4))
+ require.False(t, depMap.isIndependent(rule4))
+}
+
+func TestNoDependency(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule},
+ Opts: opts,
+ })
+
+ depMap := buildDependencyMap(group.rules)
+ // A group with only one rule cannot have dependencies.
+ require.Empty(t, depMap)
+}
+
+func TestDependenciesEdgeCases(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ t.Run("empty group", func(t *testing.T) {
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{}, // empty group
+ Opts: opts,
+ })
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ depMap := buildDependencyMap(group.rules)
+ // A group with no rules has no dependency map, but doesn't panic if the map is queried.
+ require.Nil(t, depMap)
+ require.False(t, depMap.isIndependent(rule))
+ })
+
+ t.Run("rules which reference no series", func(t *testing.T) {
+ expr, err := parser.ParseExpr("one")
+ require.NoError(t, err)
+ rule1 := NewRecordingRule("1", expr, labels.Labels{})
+
+ expr, err = parser.ParseExpr("two")
+ require.NoError(t, err)
+ rule2 := NewRecordingRule("2", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule1, rule2},
+ Opts: opts,
+ })
+
+ depMap := buildDependencyMap(group.rules)
+ // A group with rules which reference no series will still produce a dependency map.
+ require.True(t, depMap.isIndependent(rule1))
+ require.True(t, depMap.isIndependent(rule2))
+ })
+}
+
+func TestNoMetricSelector(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ expr, err = parser.ParseExpr(`count({user="bob"})`)
+ require.NoError(t, err)
+ rule2 := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule, rule2},
+ Opts: opts,
+ })
+
+ depMap := buildDependencyMap(group.rules)
+ // A rule with no metric selector cannot be reliably determined to have no dependencies on other rules, and therefore
+ // all rules are not considered independent.
+ require.False(t, depMap.isIndependent(rule))
+ require.False(t, depMap.isIndependent(rule2))
+}
+
+func TestDependentRulesWithNonMetricExpression(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ expr, err := parser.ParseExpr("sum by (user) (rate(requests[1m]))")
+ require.NoError(t, err)
+ rule := NewRecordingRule("user:requests:rate1m", expr, labels.Labels{})
+
+ expr, err = parser.ParseExpr("user:requests:rate1m <= 0")
+ require.NoError(t, err)
+ rule2 := NewAlertingRule("ZeroRequests", expr, 0, 0, labels.Labels{}, labels.Labels{}, labels.EmptyLabels(), "", true, log.NewNopLogger())
+
+ expr, err = parser.ParseExpr("3")
+ require.NoError(t, err)
+ rule3 := NewRecordingRule("three", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule, rule2, rule3},
+ Opts: opts,
+ })
+
+ depMap := buildDependencyMap(group.rules)
+ require.False(t, depMap.isIndependent(rule))
+ require.False(t, depMap.isIndependent(rule2))
+ require.True(t, depMap.isIndependent(rule3))
+}
+
+func TestRulesDependentOnMetaMetrics(t *testing.T) {
+ ctx := context.Background()
+ opts := &ManagerOptions{
+ Context: ctx,
+ Logger: log.NewNopLogger(),
+ }
+
+ // This rule is not dependent on any other rules in its group but it does depend on `ALERTS`, which is produced by
+ // the rule engine, and is therefore not independent.
+ expr, err := parser.ParseExpr("count(ALERTS)")
+ require.NoError(t, err)
+ rule := NewRecordingRule("alert_count", expr, labels.Labels{})
+
+ // Create another rule so a dependency map is built (no map is built if a group contains one or fewer rules).
+ expr, err = parser.ParseExpr("1")
+ require.NoError(t, err)
+ rule2 := NewRecordingRule("one", expr, labels.Labels{})
+
+ group := NewGroup(GroupOptions{
+ Name: "rule_group",
+ Interval: time.Second,
+ Rules: []Rule{rule, rule2},
+ Opts: opts,
+ })
+
+ depMap := buildDependencyMap(group.rules)
+ require.False(t, depMap.isIndependent(rule))
+}
+
+func TestDependencyMapUpdatesOnGroupUpdate(t *testing.T) {
+ files := []string{"fixtures/rules.yaml"}
+ ruleManager := NewManager(&ManagerOptions{
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ })
+
+ ruleManager.start()
+ defer ruleManager.Stop()
+
+ err := ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+ require.NoError(t, err)
+ require.Greater(t, len(ruleManager.groups), 0, "expected non-empty rule groups")
+
+ orig := make(map[string]dependencyMap, len(ruleManager.groups))
+ for _, g := range ruleManager.groups {
+ depMap := buildDependencyMap(g.rules)
+ // No dependency map is expected because there is only one rule in the group.
+ require.Empty(t, depMap)
+ orig[g.Name()] = depMap
+ }
+
+ // Update once without changing groups.
+ err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+ require.NoError(t, err)
+ for h, g := range ruleManager.groups {
+ depMap := buildDependencyMap(g.rules)
+ // Dependency maps are the same because of no updates.
+ require.Equal(t, orig[h], depMap)
+ }
+
+ // Groups will be recreated when updated.
+ files[0] = "fixtures/rules_dependencies.yaml"
+ err = ruleManager.Update(10*time.Second, files, labels.EmptyLabels(), "", nil)
+ require.NoError(t, err)
+
+ for h, g := range ruleManager.groups {
+ const ruleName = "job:http_requests:rate5m"
+ var rr *RecordingRule
+
+ for _, r := range g.rules {
+ if r.Name() == ruleName {
+ rr = r.(*RecordingRule)
+ }
+ }
+
+ require.NotEmptyf(t, rr, "expected to find %q recording rule in fixture", ruleName)
+
+ depMap := buildDependencyMap(g.rules)
+ // Dependency maps must change because the groups would've been updated.
+ require.NotEqual(t, orig[h], depMap)
+ // We expect there to be some dependencies since the new rule group contains a dependency.
+ require.Greater(t, len(depMap), 0)
+ require.Equal(t, 1, depMap.dependents(rr))
+ require.Zero(t, depMap.dependencies(rr))
+ }
+}
+
+func TestAsyncRuleEvaluation(t *testing.T) {
+ storage := teststorage.New(t)
+ t.Cleanup(func() { storage.Close() })
+
+ const artificialDelay = 500 * time.Millisecond
+
+ var (
+ inflightQueries atomic.Int32
+ maxInflight atomic.Int32
+ )
+
+ optsFactory := func() *ManagerOptions {
+ return &ManagerOptions{
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ Appendable: storage,
+ QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
+ inflightQueries.Add(1)
+ defer func() {
+ inflightQueries.Add(-1)
+ }()
+
+ // Artificially delay all query executions to highlight concurrent execution improvement.
+ time.Sleep(artificialDelay)
+
+ // Return a stub sample.
+ return promql.Vector{
+ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
+ }, nil
+ },
+ RuleDependencyController: NewRuleDependencyController(),
+ }
+ }
+
+ inflightTracker := func(ctx context.Context) {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ highWatermark := maxInflight.Load()
+ current := inflightQueries.Load()
+ if current > highWatermark {
+ maxInflight.Store(current)
+ }
+
+ time.Sleep(time.Millisecond)
+ }
+ }
+ }
+
+ t.Run("synchronous evaluation with independent rules", func(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+
+ ruleManager := NewManager(optsFactory())
+ groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...)
+ require.Empty(t, errs)
+ require.Len(t, groups, 1)
+
+ ruleCount := 4
+
+ for _, group := range groups {
+ require.Len(t, group.rules, ruleCount)
+
+ start := time.Now()
+
+ // Never expect more than 1 inflight query at a time.
+ go inflightTracker(ctx)
+
+ group.Eval(ctx, start)
+
+ require.EqualValues(t, 1, maxInflight.Load())
+ // Each rule should take at least artificialDelay to execute sequentially.
+ require.GreaterOrEqual(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
+ // Each rule produces one vector.
+ require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+ }
+
+ cancel()
+ })
+
+ t.Run("asynchronous evaluation with independent and dependent rules", func(t *testing.T) {
+ // Reset.
+ inflightQueries.Store(0)
+ maxInflight.Store(0)
+ ctx, cancel := context.WithCancel(context.Background())
+
+ ruleCount := 4
+ opts := optsFactory()
+
+ // Configure concurrency settings.
+ opts.ConcurrentEvalsEnabled = true
+ opts.MaxConcurrentEvals = 2
+ opts.RuleConcurrencyController = nil
+ ruleManager := NewManager(opts)
+
+ groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple.yaml"}...)
+ require.Empty(t, errs)
+ require.Len(t, groups, 1)
+
+ for _, group := range groups {
+ require.Len(t, group.rules, ruleCount)
+
+ start := time.Now()
+
+ go inflightTracker(ctx)
+
+ group.Eval(ctx, start)
+
+ // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
+ require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
+ // Some rules should execute concurrently so should complete quicker.
+ require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
+ // Each rule produces one vector.
+ require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+ }
+
+ cancel()
+ })
+
+ t.Run("asynchronous evaluation of all independent rules, insufficient concurrency", func(t *testing.T) {
+ // Reset.
+ inflightQueries.Store(0)
+ maxInflight.Store(0)
+ ctx, cancel := context.WithCancel(context.Background())
+
+ ruleCount := 6
+ opts := optsFactory()
+
+ // Configure concurrency settings.
+ opts.ConcurrentEvalsEnabled = true
+ opts.MaxConcurrentEvals = 2
+ opts.RuleConcurrencyController = nil
+ ruleManager := NewManager(opts)
+
+ groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...)
+ require.Empty(t, errs)
+ require.Len(t, groups, 1)
+
+ for _, group := range groups {
+ require.Len(t, group.rules, ruleCount)
+
+ start := time.Now()
+
+ go inflightTracker(ctx)
+
+ group.Eval(ctx, start)
+
+ // Max inflight can be 1 synchronous eval and up to MaxConcurrentEvals concurrent evals.
+ require.EqualValues(t, opts.MaxConcurrentEvals+1, maxInflight.Load())
+ // Some rules should execute concurrently so should complete quicker.
+ require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
+ // Each rule produces one vector.
+ require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+
+ }
+
+ cancel()
+ })
+
+ t.Run("asynchronous evaluation of all independent rules, sufficient concurrency", func(t *testing.T) {
+ // Reset.
+ inflightQueries.Store(0)
+ maxInflight.Store(0)
+ ctx, cancel := context.WithCancel(context.Background())
+
+ ruleCount := 6
+ opts := optsFactory()
+
+ // Configure concurrency settings.
+ opts.ConcurrentEvalsEnabled = true
+ opts.MaxConcurrentEvals = int64(ruleCount) * 2
+ opts.RuleConcurrencyController = nil
+ ruleManager := NewManager(opts)
+
+ groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, []string{"fixtures/rules_multiple_independent.yaml"}...)
+ require.Empty(t, errs)
+ require.Len(t, groups, 1)
+
+ for _, group := range groups {
+ require.Len(t, group.rules, ruleCount)
+
+ start := time.Now()
+
+ go inflightTracker(ctx)
+
+ group.Eval(ctx, start)
+
+ // Max inflight can be up to MaxConcurrentEvals concurrent evals, since there is sufficient concurrency to run all rules at once.
+ require.LessOrEqual(t, int64(maxInflight.Load()), opts.MaxConcurrentEvals)
+ // Some rules should execute concurrently so should complete quicker.
+ require.Less(t, time.Since(start).Seconds(), (time.Duration(ruleCount) * artificialDelay).Seconds())
+ // Each rule produces one vector.
+ require.EqualValues(t, ruleCount, testutil.ToFloat64(group.metrics.GroupSamples))
+ }
+
+ cancel()
+ })
+}
+
+func TestBoundedRuleEvalConcurrency(t *testing.T) {
+ storage := teststorage.New(t)
+ t.Cleanup(func() { storage.Close() })
+
+ const artificialDelay = time.Millisecond * 100
+
+ var (
+ inflightQueries atomic.Int32
+ maxInflight atomic.Int32
+ maxConcurrency int64 = 3
+ groupCount = 2
+ )
+
+ files := []string{"fixtures/rules_multiple_groups.yaml"}
+ ruleManager := NewManager(&ManagerOptions{
+ Context: context.Background(),
+ Logger: log.NewNopLogger(),
+ Appendable: storage,
+ ConcurrentEvalsEnabled: true,
+ MaxConcurrentEvals: maxConcurrency,
+ QueryFunc: func(ctx context.Context, q string, ts time.Time) (promql.Vector, error) {
+ inflightQueries.Add(1)
+ defer func() {
+ inflightQueries.Add(-1)
+ }()
+
+ // Artificially delay all query executions to highlight concurrent execution improvement.
+ time.Sleep(artificialDelay)
+
+ // Return a stub sample.
+ return promql.Vector{
+ promql.Sample{Metric: labels.FromStrings("__name__", "test"), T: ts.UnixMilli(), F: 12345},
+ }, nil
+ },
+ RuleDependencyController: NewRuleDependencyController(),
+ })
+
+ groups, errs := ruleManager.LoadGroups(time.Second, labels.EmptyLabels(), "", nil, files...)
+ require.Empty(t, errs)
+ require.Len(t, groups, groupCount)
+
+ ctx, cancel := context.WithCancel(context.Background())
+
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ highWatermark := maxInflight.Load()
+ current := inflightQueries.Load()
+ if current > highWatermark {
+ maxInflight.Store(current)
+ }
+
+ time.Sleep(time.Millisecond)
+ }
+ }
+ }()
+
+ // Evaluate groups concurrently (like they normally do).
+ var wg sync.WaitGroup
+ for _, group := range groups {
+ group := group
+
+ wg.Add(1)
+ go func() {
+ group.Eval(ctx, time.Now())
+ wg.Done()
+ }()
+ }
+
+ wg.Wait()
+ cancel()
+
+ // Synchronous queries also count towards inflight, so at most we can have maxConcurrency+groupCount inflight evaluations.
+ require.EqualValues(t, maxInflight.Load(), int32(maxConcurrency)+int32(groupCount))
+}
diff --git a/rules/origin.go b/rules/origin.go
index 996538767..9c1a12536 100644
--- a/rules/origin.go
+++ b/rules/origin.go
@@ -28,6 +28,10 @@ type RuleDetail struct {
Query string
Labels labels.Labels
Kind string
+
+ // Independent indicates whether this rule is independent of other rules
+ // within the same rule group (it neither depends on nor is depended on by them).
+ Independent bool
}
const (
@@ -36,7 +40,7 @@ const (
)
// NewRuleDetail creates a RuleDetail from a given Rule.
-func NewRuleDetail(r Rule) RuleDetail {
+func NewRuleDetail(r Rule, independent bool) RuleDetail {
var kind string
switch r.(type) {
case *AlertingRule:
@@ -48,10 +52,11 @@ func NewRuleDetail(r Rule) RuleDetail {
}
return RuleDetail{
- Name: r.Name(),
- Query: r.Query().String(),
- Labels: r.Labels(),
- Kind: kind,
+ Name: r.Name(),
+ Query: r.Query().String(),
+ Labels: r.Labels(),
+ Kind: kind,
+ Independent: independent,
}
}
diff --git a/rules/origin_test.go b/rules/origin_test.go
index eda5f9247..ce21b3607 100644
--- a/rules/origin_test.go
+++ b/rules/origin_test.go
@@ -30,7 +30,7 @@ type unknownRule struct{}
func (u unknownRule) Name() string { return "" }
func (u unknownRule) Labels() labels.Labels { return labels.EmptyLabels() }
-func (u unknownRule) Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error) {
+func (u unknownRule) Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int, bool) (promql.Vector, error) {
return nil, nil
}
func (u unknownRule) String() string { return "" }
@@ -46,6 +46,16 @@ func (u unknownRule) GetEvaluationTimestamp() time.Time { return time.Time{}
func TestNewRuleDetailPanics(t *testing.T) {
require.PanicsWithValue(t, `unknown rule type "rules.unknownRule"`, func() {
- NewRuleDetail(unknownRule{})
+ NewRuleDetail(unknownRule{}, false)
+ })
+}
+
+func TestFromOriginContext(t *testing.T) {
+ t.Run("should return zero value if RuleDetail is missing in the context", func(t *testing.T) {
+ detail := FromOriginContext(context.Background())
+ require.Zero(t, detail)
+
+ // The zero value for the Independent rule must be the most conservative option.
+ require.False(t, detail.Independent)
})
}
diff --git a/rules/recording.go b/rules/recording.go
index 8abf8fa5f..739fa7fc9 100644
--- a/rules/recording.go
+++ b/rules/recording.go
@@ -72,8 +72,8 @@ func (rule *RecordingRule) Labels() labels.Labels {
}
// Eval evaluates the rule and then overrides the metric names and labels accordingly.
-func (rule *RecordingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int) (promql.Vector, error) {
- ctx = NewOriginContext(ctx, NewRuleDetail(rule))
+func (rule *RecordingRule) Eval(ctx context.Context, evalDelay time.Duration, ts time.Time, query QueryFunc, _ *url.URL, limit int, independent bool) (promql.Vector, error) {
+ ctx = NewOriginContext(ctx, NewRuleDetail(rule, independent))
vector, err := query(ctx, rule.vector.String(), ts.Add(-evalDelay))
if err != nil {
diff --git a/rules/recording_test.go b/rules/recording_test.go
index 984151307..98d609130 100644
--- a/rules/recording_test.go
+++ b/rules/recording_test.go
@@ -15,6 +15,7 @@ package rules
import (
"context"
+ "fmt"
"testing"
"time"
@@ -124,7 +125,7 @@ func TestRuleEval(t *testing.T) {
for _, scenario := range ruleEvalTestScenarios {
t.Run(scenario.name, func(t *testing.T) {
rule := NewRecordingRule("test_rule", scenario.expr, scenario.ruleLabels)
- result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ result, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
require.NoError(t, err)
require.Equal(t, scenario.expected, result)
})
@@ -142,7 +143,7 @@ func BenchmarkRuleEval(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- _, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0)
+ _, err := rule.Eval(context.TODO(), 0, ruleEvaluationTime, EngineQueryFunc(testEngine, storage), nil, 0, false)
if err != nil {
require.NoError(b, err)
}
@@ -171,7 +172,7 @@ func TestRuleEvalDuplicate(t *testing.T) {
expr, _ := parser.ParseExpr(`vector(0) or label_replace(vector(0),"test","x","","")`)
rule := NewRecordingRule("foo", expr, labels.FromStrings("test", "test"))
- _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0)
+ _, err := rule.Eval(ctx, 0, now, EngineQueryFunc(engine, storage), nil, 0, false)
require.Error(t, err)
require.EqualError(t, err, "vector contains metrics with the same labelset after applying rule labels")
}
@@ -213,7 +214,7 @@ func TestRecordingRuleLimit(t *testing.T) {
evalTime := time.Unix(0, 0)
for _, test := range tests {
- switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit); {
+ switch _, err := rule.Eval(context.TODO(), 0, evalTime, EngineQueryFunc(testEngine, storage), nil, test.limit, false); {
case err != nil:
require.EqualError(t, err, test.err)
case test.err != "":
@@ -240,12 +241,16 @@ func TestRecordingEvalWithOrigin(t *testing.T) {
expr, err := parser.ParseExpr(query)
require.NoError(t, err)
- rule := NewRecordingRule(name, expr, lbs)
- _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
- detail = FromOriginContext(ctx)
- return nil, nil
- }, nil, 0)
+ for _, independent := range []bool{true, false} {
+ t.Run(fmt.Sprintf("independent = %t", independent), func(t *testing.T) {
+ rule := NewRecordingRule(name, expr, lbs)
+ _, err = rule.Eval(ctx, 0, now, func(ctx context.Context, qs string, _ time.Time) (promql.Vector, error) {
+ detail = FromOriginContext(ctx)
+ return nil, nil
+ }, nil, 0, independent)
- require.NoError(t, err)
- require.Equal(t, detail, NewRuleDetail(rule))
+ require.NoError(t, err)
+ require.Equal(t, detail, NewRuleDetail(rule, independent))
+ })
+ }
}
diff --git a/rules/rule.go b/rules/rule.go
index 42e882918..336605d8a 100644
--- a/rules/rule.go
+++ b/rules/rule.go
@@ -41,7 +41,7 @@ type Rule interface {
Labels() labels.Labels
// Eval evaluates the rule, including any associated recording or alerting actions.
// The duration passed is the evaluation delay.
- Eval(context.Context, time.Duration, time.Time, QueryFunc, *url.URL, int) (promql.Vector, error)
+ Eval(_ context.Context, evalDelay time.Duration, ts time.Time, _ QueryFunc, externalURL *url.URL, limit int, independent bool) (promql.Vector, error)
// String returns a human-readable string representation of the rule.
String() string
// Query returns the rule query expression.
diff --git a/web/api/v1/api_test.go b/web/api/v1/api_test.go
index c9ab84087..59a841f40 100644
--- a/web/api/v1/api_test.go
+++ b/web/api/v1/api_test.go
@@ -293,6 +293,8 @@ func (m *rulesRetrieverMock) CreateRuleGroups() {
Context: context.Background(),
Logger: log.NewNopLogger(),
NotifyFunc: func(ctx context.Context, expr string, alerts ...*rules.Alert) {},
+
+ RuleDependencyController: rules.NewRuleDependencyController(),
}
var r []rules.Rule