97 changes: 64 additions & 33 deletions rules/group.go
@@ -74,6 +74,8 @@ type Group struct {
// defaults to DefaultEvalIterationFunc.
evalIterationFunc GroupEvalIterationFunc

operatorControllableErrorClassifier OperatorControllableErrorClassifier

appOpts *storage.AppendOptions
alignEvaluationTimeOnInterval bool
}
@@ -85,18 +87,32 @@ type Group struct {
// DefaultEvalIterationFunc is the default implementation.
type GroupEvalIterationFunc func(ctx context.Context, g *Group, evalTimestamp time.Time)

// OperatorControllableErrorClassifier classifies whether rule evaluation errors are operator-controllable.
type OperatorControllableErrorClassifier interface {
IsOperatorControllable(error) bool
}

type GroupOptions struct {
Name, File string
Interval time.Duration
Limit int
Rules []Rule
SourceTenants []string
ShouldRestore bool
Opts *ManagerOptions
QueryOffset *time.Duration
done chan struct{}
EvalIterationFunc GroupEvalIterationFunc
AlignEvaluationTimeOnInterval bool
Name, File string
Interval time.Duration
Limit int
Rules []Rule
SourceTenants []string
ShouldRestore bool
Opts *ManagerOptions
QueryOffset *time.Duration
done chan struct{}
EvalIterationFunc GroupEvalIterationFunc
AlignEvaluationTimeOnInterval bool
OperatorControllableErrorClassifier OperatorControllableErrorClassifier
}

// DefaultOperatorControllableErrorClassifier is the default implementation of
// OperatorControllableErrorClassifier that classifies no errors as operator-controllable.
type DefaultOperatorControllableErrorClassifier struct{}

func (*DefaultOperatorControllableErrorClassifier) IsOperatorControllable(_ error) bool {
return false
}

// NewGroup makes a new Group with the given name, options, and rules.
@@ -114,7 +130,8 @@ func NewGroup(o GroupOptions) *Group {
metrics.IterationsMissed.WithLabelValues(key)
metrics.IterationsScheduled.WithLabelValues(key)
metrics.EvalTotal.WithLabelValues(key)
metrics.EvalFailures.WithLabelValues(key)
metrics.EvalFailures.WithLabelValues(key, "user")
metrics.EvalFailures.WithLabelValues(key, "operator")
metrics.GroupLastEvalTime.WithLabelValues(key)
metrics.GroupLastDuration.WithLabelValues(key)
metrics.GroupLastRuleDurationSum.WithLabelValues(key)
@@ -127,29 +144,35 @@ func NewGroup(o GroupOptions) *Group {
evalIterationFunc = DefaultEvalIterationFunc
}

operatorControllableErrorClassifier := o.OperatorControllableErrorClassifier
if operatorControllableErrorClassifier == nil {
operatorControllableErrorClassifier = &DefaultOperatorControllableErrorClassifier{}
}

if opts.Logger == nil {
opts.Logger = promslog.NewNopLogger()
}

return &Group{
name: o.Name,
file: o.File,
interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: opts,
sourceTenants: o.SourceTenants,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: opts.Logger.With("file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
alignEvaluationTimeOnInterval: o.AlignEvaluationTimeOnInterval,
name: o.Name,
file: o.File,
interval: o.Interval,
queryOffset: o.QueryOffset,
limit: o.Limit,
rules: o.Rules,
shouldRestore: o.ShouldRestore,
opts: opts,
sourceTenants: o.SourceTenants,
seriesInPreviousEval: make([]map[string]labels.Labels, len(o.Rules)),
done: make(chan struct{}),
managerDone: o.done,
terminated: make(chan struct{}),
logger: opts.Logger.With("file", o.File, "group", o.Name),
metrics: metrics,
evalIterationFunc: evalIterationFunc,
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
alignEvaluationTimeOnInterval: o.AlignEvaluationTimeOnInterval,
operatorControllableErrorClassifier: operatorControllableErrorClassifier,
}
}

@@ -546,7 +569,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
sp.SetStatus(codes.Error, err.Error())
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
g.incrementEvalFailures(err)

// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
@@ -576,7 +599,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
sp.SetStatus(codes.Error, err.Error())
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
g.incrementEvalFailures(err)

logger.Warn("Rule sample appending failed", "err", err)
return
@@ -701,6 +724,14 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
g.cleanupStaleSeries(ctx, ts)
}

func (g *Group) incrementEvalFailures(err error) {
reason := "user"
if g.operatorControllableErrorClassifier != nil && g.operatorControllableErrorClassifier.IsOperatorControllable(err) {
reason = "operator"
}
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name()), reason).Inc()
}

func (g *Group) QueryOffset() time.Duration {
if g.queryOffset != nil {
return *g.queryOffset
@@ -1010,7 +1041,7 @@ func NewGroupMetrics(reg prometheus.Registerer) *Metrics {
Name: "rule_evaluation_failures_total",
Help: "The total number of rule evaluation failures.",
},
[]string{"rule_group"},
[]string{"rule_group", "reason"},
),
GroupInterval: prometheus.NewGaugeVec(
prometheus.GaugeOpts{
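As a reading aid before the test file: a minimal sketch of how an embedding project could supply its own classifier through the new GroupOptions.OperatorControllableErrorClassifier field. The rulerext package name, the deadlineClassifier type, and the policy of treating context.DeadlineExceeded as operator-controllable are illustrative assumptions, not part of this change.

```go
// Hypothetical downstream package; only the rules interface used below is added by this PR.
package rulerext

import (
	"context"
	"errors"

	"github.com/prometheus/prometheus/rules"
)

// deadlineClassifier treats query timeouts as operator-controllable, on the
// assumption that deadlines are usually exceeded because of the query backend
// rather than the rule expression itself (an illustrative policy, not a rule).
type deadlineClassifier struct{}

func (deadlineClassifier) IsOperatorControllable(err error) bool {
	return errors.Is(err, context.DeadlineExceeded)
}

// Compile-time check that the sketch satisfies the interface added in group.go.
var _ rules.OperatorControllableErrorClassifier = deadlineClassifier{}
```

Such a classifier would be passed as OperatorControllableErrorClassifier: deadlineClassifier{} when building GroupOptions, as the test below does with its HTTP-status classifier. Groups built without setting the field fall back to DefaultOperatorControllableErrorClassifier, so every evaluation failure is counted under reason="user".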
157 changes: 157 additions & 0 deletions rules/group_test.go
@@ -14,13 +14,20 @@
package rules

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"

"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/util/teststorage"
)

func TestGroup_Equals(t *testing.T) {
@@ -248,6 +255,156 @@ func TestGroup_Equals(t *testing.T) {
}
}

// HTTPStatusOperatorControllableErrorClassifier is a test classifier that treats errors
// mentioning HTTP 429 or 50x status codes as operator-controllable.
type HTTPStatusOperatorControllableErrorClassifier struct{}

func (*HTTPStatusOperatorControllableErrorClassifier) IsOperatorControllable(err error) bool {
if err == nil {
return false
}
errMsg := err.Error()
return strings.Contains(errMsg, "429") || strings.Contains(errMsg, "50")
}

func TestEvalOperatorControllableFailures(t *testing.T) {
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })

expr, err := parser.ParseExpr("up")
require.NoError(t, err)
rule := NewRecordingRule("test_rule", expr, labels.EmptyLabels())

customClassifier := &HTTPStatusOperatorControllableErrorClassifier{}

testCases := []struct {
name string
errorMessage string
classifier OperatorControllableErrorClassifier
expectOperatorControllable bool
}{
{"default classifier", "any error", nil, false},
{"custom 429 classified as operator controllable", "HTTP 429 Too Many Requests", customClassifier, true},
{"custom 500 classified as operator controllable", "HTTP 500 Internal Server Error", customClassifier, true},
{"custom 502 classified as operator controllable", "HTTP 502 Bad Gateway", customClassifier, true},
{"custom 400 not operator controllable", "HTTP 400 Bad Request", customClassifier, false},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
errorQueryFunc := func(_ context.Context, _ string, _ time.Time) (promql.Vector, error) {
return nil, fmt.Errorf("%s", tc.errorMessage)
}

opts := &ManagerOptions{
Context: context.Background(),
QueryFunc: errorQueryFunc,
Appendable: storage,
Queryable: storage,
Logger: promslog.NewNopLogger(),
}

group := NewGroup(GroupOptions{
Name: "test_group",
File: "test.yml",
Interval: time.Second,
Rules: []Rule{rule},
Opts: opts,
OperatorControllableErrorClassifier: tc.classifier,
})

group.Eval(context.Background(), time.Now())

groupKey := GroupKey("test.yml", "test_group")
evalUserFailures := testutil.ToFloat64(group.metrics.EvalFailures.WithLabelValues(groupKey, "user"))
evalOperatorFailures := testutil.ToFloat64(group.metrics.EvalFailures.WithLabelValues(groupKey, "operator"))

if tc.expectOperatorControllable {
require.Equal(t, float64(0), evalUserFailures)
require.Equal(t, float64(1), evalOperatorFailures)
} else {
require.Equal(t, float64(1), evalUserFailures)
require.Equal(t, float64(0), evalOperatorFailures)
}
})
}
}

func TestEvalDiscardedSamplesDoNotIncrementFailureMetrics(t *testing.T) {
testCases := []struct {
name string
setupStorage func(storage *teststorage.TestStorage)
offsetMs int64 // milliseconds offset from evaluation time
}{
{
name: "out of order samples",
setupStorage: func(s *teststorage.TestStorage) {
app := s.Appender(context.Background())
app.Append(0, labels.FromStrings("__name__", "test_metric", "job", "test"), time.Now().UnixMilli(), 1.0)
app.Commit()
},
offsetMs: -10000, // 10 seconds in past
},
{
name: "too old samples",
setupStorage: func(_ *teststorage.TestStorage) {},
offsetMs: -86400000, // 24 hours in past
},
{
name: "duplicate samples",
setupStorage: func(s *teststorage.TestStorage) {
app := s.Appender(context.Background())
app.Append(0, labels.FromStrings("__name__", "test_metric", "job", "test"), time.Now().UnixMilli(), 1.0)
app.Commit()
},
offsetMs: 0, // Same timestamp, different value
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
storage := teststorage.New(t)
t.Cleanup(func() { storage.Close() })

tc.setupStorage(storage)

expr, err := parser.ParseExpr("up")
require.NoError(t, err)
rule := NewRecordingRule("test_rule", expr, labels.EmptyLabels())

queryFunc := func(_ context.Context, _ string, ts time.Time) (promql.Vector, error) {
return promql.Vector{
promql.Sample{
Metric: labels.FromStrings("__name__", "test_metric", "job", "test"),
T: ts.UnixMilli() + tc.offsetMs,
F: 2.0, // Different value for duplicate case
},
}, nil
}

group := NewGroup(GroupOptions{
Name: "test_group",
File: "test.yml",
Interval: time.Second,
Rules: []Rule{rule},
Opts: &ManagerOptions{
Context: context.Background(),
QueryFunc: queryFunc,
Appendable: storage,
Queryable: storage,
Logger: promslog.NewNopLogger(),
},
})

group.Eval(context.Background(), time.Now())

groupKey := GroupKey("test.yml", "test_group")
require.Equal(t, float64(0), testutil.ToFloat64(group.metrics.EvalFailures.WithLabelValues(groupKey, "user")))
require.Equal(t, float64(0), testutil.ToFloat64(group.metrics.EvalFailures.WithLabelValues(groupKey, "operator")))
})
}
}

func pointerOf[T any](value T) *T {
return &value
}
3 changes: 2 additions & 1 deletion rules/manager.go
@@ -295,7 +295,8 @@ func (m *Manager) Update(interval time.Duration, files []string, externalLabels
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n, "user")
m.EvalFailures.DeleteLabelValues(n, "operator")
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
6 changes: 3 additions & 3 deletions rules/manager_test.go
@@ -1196,19 +1196,19 @@ func TestMetricsUpdate(t *testing.T) {
}{
{
files: files,
metrics: 12,
metrics: 14,
},
{
files: files[:1],
metrics: 6,
metrics: 7,
},
{
files: files[:0],
metrics: 0,
},
{
files: files[1:],
metrics: 6,
metrics: 7,
},
}
