diff --git a/cmd/icinga-notifications-daemon/main.go b/cmd/icinga-notifications-daemon/main.go index f9243ffb..b0f8f438 100644 --- a/cmd/icinga-notifications-daemon/main.go +++ b/cmd/icinga-notifications-daemon/main.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icinga-notifications/internal/channel" "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" + "github.com/icinga/icinga-notifications/internal/incident" "github.com/icinga/icinga-notifications/internal/listener" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" @@ -91,6 +92,11 @@ func main() { go runtimeConfig.PeriodicUpdates(ctx, 1*time.Second) + err = incident.LoadOpenIncidents(ctx, db, logs.GetChildLogger("incident"), runtimeConfig) + if err != nil { + logger.Fatalw("Can't load incidents from database", zap.Error(err)) + } + if err := listener.NewListener(db, runtimeConfig, logs).Run(ctx); err != nil { logger.Errorw("Listener has finished with an error", zap.Error(err)) } else { diff --git a/go.mod b/go.mod index 8d2f9b48..fd2e5068 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/icinga/icinga-notifications -go 1.20 +go 1.21 require ( github.com/creasty/defaults v1.7.0 diff --git a/internal/event/event.go b/internal/event/event.go index dd9eee0f..48b39fbf 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -31,6 +31,7 @@ type Event struct { const ( TypeState = "state" TypeAcknowledgement = "acknowledgement" + TypeInternal = "internal" ) func (e *Event) String() string { diff --git a/internal/filter/contracts.go b/internal/filter/contracts.go index 6ccbb6be..12543dd0 100644 --- a/internal/filter/contracts.go +++ b/internal/filter/contracts.go @@ -12,4 +12,5 @@ type Filterable interface { // Filter is implemented by every filter chains and filter conditions. type Filter interface { Eval(filterable Filterable) (bool, error) + ExtractConditions() []*Condition } diff --git a/internal/filter/parser.go b/internal/filter/parser.go index 1ce642bf..71dfe6b4 100644 --- a/internal/filter/parser.go +++ b/internal/filter/parser.go @@ -15,7 +15,7 @@ type Parser struct { func Parse(expression string) (Filter, error) { parser := &Parser{tag: expression, length: len(expression)} if parser.length == 0 { - return &All{}, nil + return &Chain{op: All}, nil } return parser.readFilter(0, "", nil) @@ -97,7 +97,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( if operator != "!" && (next == "&" || next == "|") { if operator == "&" { if len(rules) > 1 { - rules = []Filter{&All{rules: rules}} + rules = []Filter{&Chain{op: All, rules: rules}} } operator = next @@ -122,7 +122,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( } else { if negate { negate = false - rules = append(rules, &None{rules: []Filter{condition}}) + rules = append(rules, &Chain{op: None, rules: []Filter{condition}}) } else { rules = append(rules, condition) } @@ -154,7 +154,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( if next == "&" || next == "|" { if operator == "" || operator == "&" { if operator == "&" && len(rules) > 1 { - all := &All{rules: rules} + all := &Chain{op: All, rules: rules} rules = []Filter{all} } @@ -195,18 +195,18 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( var chain Filter switch operator { case "&": - chain = &All{rules: rules} + chain = &Chain{op: All, rules: rules} case "|": - chain = &Any{rules: rules} + chain = &Chain{op: Any, rules: rules} case "!": - chain = &None{rules: rules} + chain = &Chain{op: None, rules: rules} case "": if nestingLevel == 0 && rules != nil { // There is only one filter tag, no chain return rules[0], nil } - chain = &All{rules: rules} + chain = &Chain{op: All, rules: rules} default: return nil, p.parseError(operator, "") } @@ -257,24 +257,24 @@ func (p *Parser) createCondition(column string, operator string, value string) ( switch operator { case "=": if strings.Contains(value, "*") { - return &Like{column: column, value: value}, nil + return &Condition{op: Like, column: column, value: value}, nil } - return &Equal{column: column, value: value}, nil + return &Condition{op: Equal, column: column, value: value}, nil case "!=": if strings.Contains(value, "*") { - return &Unlike{column: column, value: value}, nil + return &Condition{op: UnLike, column: column, value: value}, nil } - return &UnEqual{column: column, value: value}, nil + return &Condition{op: UnEqual, column: column, value: value}, nil case ">": - return &GreaterThan{column: column, value: value}, nil + return &Condition{op: GreaterThan, column: column, value: value}, nil case ">=": - return &GreaterThanOrEqual{column: column, value: value}, nil + return &Condition{op: GreaterThanEqual, column: column, value: value}, nil case "<": - return &LessThan{column: column, value: value}, nil + return &Condition{op: LessThan, column: column, value: value}, nil case "<=": - return &LessThanOrEqual{column: column, value: value}, nil + return &Condition{op: LessThanEqual, column: column, value: value}, nil default: return nil, fmt.Errorf("invalid operator %s provided", operator) } diff --git a/internal/filter/parser_test.go b/internal/filter/parser_test.go index ebbba4c7..b23a7955 100644 --- a/internal/filter/parser_test.go +++ b/internal/filter/parser_test.go @@ -62,101 +62,113 @@ func TestFilter(t *testing.T) { t.Run("ParserIdentifiesAllKindOfFilters", func(t *testing.T) { rule, err := Parse("foo=bar") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &Equal{}, rule) + expected := &Condition{op: Equal, column: "foo", value: "bar"} + assert.Equal(t, expected, rule) rule, err = Parse("foo!=bar") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &UnEqual{}, rule) + expected = &Condition{op: UnEqual, column: "foo", value: "bar"} + assert.Equal(t, expected, rule) rule, err = Parse("foo=bar*") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &Like{}, rule) + expected = &Condition{op: Like, column: "foo", value: "bar*"} + assert.Equal(t, expected, rule) rule, err = Parse("foo!=bar*") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &Unlike{}, rule) + expected = &Condition{op: UnLike, column: "foo", value: "bar*"} + assert.Equal(t, expected, rule) rule, err = Parse("foobar") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &GreaterThan{}, rule) + expected = &Condition{op: GreaterThan, column: "foo", value: "bar"} + assert.Equal(t, expected, rule) rule, err = Parse("foo>=bar") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &GreaterThanOrEqual{}, rule) + expected = &Condition{op: GreaterThanEqual, column: "foo", value: "bar"} + assert.Equal(t, expected, rule) rule, err = Parse("foo=bar&bar=foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &All{}, rule) + assert.IsType(t, &Chain{}, rule) rule, err = Parse("foo=bar|bar=foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &Any{}, rule) + assert.IsType(t, &Chain{}, rule) rule, err = Parse("!(foo=bar|bar=foo)") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &None{}, rule) + assert.IsType(t, &Chain{}, rule) rule, err = Parse("!foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - - assert.Equal(t, &None{rules: []Filter{NewExists("foo")}}, rule) + assert.Equal(t, &Chain{op: None, rules: []Filter{NewExists("foo")}}, rule) rule, err = Parse("foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.Equal(t, NewExists("foo"), rule) + assert.Equal(t, &Exists{column: "foo"}, rule) rule, err = Parse("!(foo=bar|bar=foo)&(foo=bar|bar=foo)") assert.Nil(t, err, "There should be no errors but got: %s", err) - expected := &All{rules: []Filter{ - &None{rules: []Filter{ - &Equal{column: "foo", value: "bar"}, - &Equal{column: "bar", value: "foo"}, + expectedChain := &Chain{op: All, rules: []Filter{ + &Chain{op: None, rules: []Filter{ + &Condition{op: Equal, column: "foo", value: "bar"}, + &Condition{op: Equal, column: "bar", value: "foo"}, }}, - &Any{rules: []Filter{ - &Equal{column: "foo", value: "bar"}, - &Equal{column: "bar", value: "foo"}, + &Chain{op: Any, rules: []Filter{ + &Condition{op: Equal, column: "foo", value: "bar"}, + &Condition{op: Equal, column: "bar", value: "foo"}, }}, }} - assert.Equal(t, expected, rule) + assert.Equal(t, expectedChain, rule) }) t.Run("ParserIdentifiesSingleCondition", func(t *testing.T) { rule, err := Parse("foo=bar") assert.Nil(t, err, "There should be no errors but got: %s", err) - expected := &Equal{column: "foo", value: "bar"} + expected := &Condition{op: Equal, column: "foo", value: "bar"} assert.Equal(t, expected, rule, "Parser doesn't parse single condition correctly") }) t.Run("UrlEncodedFilterExpression", func(t *testing.T) { rule, err := Parse("col%3Cumnval%28ue") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.Equal(t, &GreaterThan{column: "col(umn", value: "val(ue"}, rule) + expected = &Condition{op: GreaterThan, column: "col(umn", value: "val(ue"} + assert.Equal(t, expected, rule) rule, err = Parse("col%29umn>=val%29ue") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.Equal(t, &GreaterThanOrEqual{column: "col)umn", value: "val)ue"}, rule) + expected = &Condition{op: GreaterThanEqual, column: "col)umn", value: "val)ue"} + assert.Equal(t, expected, rule) }) } diff --git a/internal/filter/types.go b/internal/filter/types.go index 3153cf33..4c1104d0 100644 --- a/internal/filter/types.go +++ b/internal/filter/types.go @@ -1,188 +1,211 @@ package filter -// All represents a filter chain type that matches when all of its Rules matches. -type All struct { - rules []Filter -} - -func (a *All) Eval(filterable Filterable) (bool, error) { - for _, rule := range a.rules { - matched, err := rule.Eval(filterable) - if err != nil { - return false, err - } +import ( + "fmt" +) - if !matched { - return false, nil - } - } +// LogicalOp is a type used for grouping the logical operators of a filter string. +type LogicalOp string - return true, nil -} +const ( + // None represents a filter chain type that matches when none of its ruleset matches. + None LogicalOp = "!" + // All represents a filter chain type that matches when all of its ruleset matches. + All LogicalOp = "&" + // Any represents a filter chain type that matches when at least one of its ruleset matches. + Any LogicalOp = "|" +) -// Any represents a filter chain type that matches when at least one of its Rules matches. -type Any struct { +// Chain is a filter type that wraps other filter rules and itself. +// Therefore, it implements the Filter interface to allow it to be part of its ruleset. +// It supports also adding and popping filter rules individually. +type Chain struct { + op LogicalOp // The filter chain operator to be used to evaluate the rules rules []Filter } -func (a *Any) Eval(filterable Filterable) (bool, error) { - for _, rule := range a.rules { - matched, err := rule.Eval(filterable) - if err != nil { - return false, err +// Eval evaluates the filter rule sets recursively based on their operator type. +func (c *Chain) Eval(filterable Filterable) (bool, error) { + switch c.op { + case None: + for _, rule := range c.rules { + matched, err := rule.Eval(filterable) + if err != nil { + return false, err + } + + if matched { + return false, nil + } } - if matched { - return true, nil + return true, nil + case All: + for _, rule := range c.rules { + matched, err := rule.Eval(filterable) + if err != nil { + return false, err + } + + if !matched { + return false, nil + } } - } - return false, nil -} + return true, nil + case Any: + for _, rule := range c.rules { + matched, err := rule.Eval(filterable) + if err != nil { + return false, err + } + + if matched { + return true, nil + } + } -// None represents a filter chain type that matches when none of its Rules matches. -type None struct { - rules []Filter + return false, nil + default: + return false, fmt.Errorf("invalid logical operator provided: %q", c.op) + } } -func (n *None) Eval(filterable Filterable) (bool, error) { - for _, rule := range n.rules { - matched, err := rule.Eval(filterable) - if err != nil { - return false, err - } - - if matched { - return false, nil - } +func (c *Chain) ExtractConditions() []*Condition { + var conditions []*Condition + for _, rule := range c.rules { + conditions = append(conditions, rule.ExtractConditions()...) } - return true, nil + return conditions } +// CompOperator is a type used for grouping the individual comparison operators of a filter string. +type CompOperator string + +// List of the supported comparison operators. +const ( + Equal CompOperator = "=" + UnEqual CompOperator = "!=" + Like CompOperator = "~" + UnLike CompOperator = "!~" + LessThan CompOperator = "<" + LessThanEqual CompOperator = "<=" + GreaterThan CompOperator = ">" + GreaterThanEqual CompOperator = ">=" +) + // Condition represents a single filter condition. +// It provides an implementation of the Filter interface for each of the supported CompOperator. +// All it's fields are read-only and aren't supposed to change at runtime. For read access, you can +// check the available exported methods. type Condition struct { + op CompOperator column string value string } -func NewCondition(column string, value string) *Condition { - return &Condition{ - column: column, - value: value, +// Eval evaluates this Condition based on its operator. +// Returns true when the filter evaluates to true false otherwise. +func (c *Condition) Eval(filterable Filterable) (bool, error) { + if !filterable.EvalExists(c.column) { + return false, nil } -} - -type Exists Condition - -func NewExists(column string) *Exists { - return &Exists{column: column} -} -func (e *Exists) Eval(filterable Filterable) (bool, error) { - return filterable.EvalExists(e.column), nil -} + switch c.op { + case Equal: + match, err := filterable.EvalEqual(c.column, c.value) + if err != nil { + return false, err + } -type Equal Condition + return match, nil + case UnEqual: + match, err := filterable.EvalEqual(c.column, c.value) + if err != nil { + return false, err + } -func (e *Equal) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalEqual(e.column, e.value) - if err != nil { - return false, err - } + return !match, nil + case Like: + match, err := filterable.EvalLike(c.column, c.value) + if err != nil { + return false, err + } - return match, nil -} + return match, nil + case UnLike: + match, err := filterable.EvalLike(c.column, c.value) + if err != nil { + return false, err + } -type UnEqual Condition + return !match, nil + case LessThan: + match, err := filterable.EvalLess(c.column, c.value) + if err != nil { + return false, err + } -func (u *UnEqual) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalEqual(u.column, u.value) - if err != nil { - return false, err - } + return match, nil + case LessThanEqual: + match, err := filterable.EvalLessOrEqual(c.column, c.value) + if err != nil { + return false, err + } - return filterable.EvalExists(u.column) && !match, nil -} + return match, nil + case GreaterThan: + match, err := filterable.EvalLessOrEqual(c.column, c.value) + if err != nil { + return false, err + } -type Like Condition + return !match, nil + case GreaterThanEqual: + match, err := filterable.EvalLess(c.column, c.value) + if err != nil { + return false, err + } -func (l *Like) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLike(l.column, l.value) - if err != nil { - return false, err + return !match, nil + default: + return false, fmt.Errorf("invalid comparison operator provided: %q", c.op) } - - return match, nil } -type Unlike Condition - -func (u *Unlike) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLike(u.column, u.value) - if err != nil { - return false, err - } - - return filterable.EvalExists(u.column) && !match, nil +func (c *Condition) ExtractConditions() []*Condition { + return []*Condition{c} } -type LessThan Condition - -func (less *LessThan) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLess(less.column, less.value) - if err != nil { - return false, err - } - - return match, nil +// Column returns the column of this Condition. +func (c *Condition) Column() string { + return c.column } -type LessThanOrEqual Condition - -func (loe *LessThanOrEqual) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLessOrEqual(loe.column, loe.value) - if err != nil { - return false, err - } - - return match, nil +// Value returns the value of this Condition. +func (c *Condition) Value() string { + return c.value } -type GreaterThan Condition - -func (g *GreaterThan) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLessOrEqual(g.column, g.value) - if err != nil { - return false, err - } - - return filterable.EvalExists(g.column) && !match, nil +type Exists struct { + column string } -type GreaterThanOrEqual Condition +func (e *Exists) ExtractConditions() []*Condition { + return nil +} -func (goe *GreaterThanOrEqual) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLess(goe.column, goe.value) - if err != nil { - return false, err - } +func NewExists(column string) *Exists { + return &Exists{column: column} +} - return filterable.EvalExists(goe.column) && !match, nil +func (e *Exists) Eval(filterable Filterable) (bool, error) { + return filterable.EvalExists(e.column), nil } var ( - _ Filter = (*All)(nil) - _ Filter = (*Any)(nil) - _ Filter = (*None)(nil) - + _ Filter = (*Chain)(nil) _ Filter = (*Exists)(nil) - _ Filter = (*Equal)(nil) - _ Filter = (*UnEqual)(nil) - _ Filter = (*Like)(nil) - _ Filter = (*Unlike)(nil) - _ Filter = (*LessThan)(nil) - _ Filter = (*LessThanOrEqual)(nil) - _ Filter = (*GreaterThan)(nil) - _ Filter = (*GreaterThanOrEqual)(nil) + _ Filter = (*Condition)(nil) ) diff --git a/internal/incident/incident.go b/internal/incident/incident.go index dc00b2a3..c708030a 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -35,6 +35,14 @@ type Incident struct { incidentRowID int64 + // timer calls RetriggerEscalations the next time any escalation could be reached on the incident. + // + // For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident + // is less than an hour old, timer will fire 1h after incident start, if the incident is between 1h and 2h + // old, timer will fire after 2h, and if the incident is already older than 2h, no future escalations can + // be reached solely based on the incident aging, so no more timer is necessary and timer stores nil. + timer *time.Timer + db *icingadb.DB logger *zap.SugaredLogger runtimeConfig *config.RuntimeConfig @@ -42,6 +50,20 @@ type Incident struct { sync.Mutex } +func NewIncident( + db *icingadb.DB, obj *object.Object, runtimeConfig *config.RuntimeConfig, logger *zap.SugaredLogger, +) *Incident { + return &Incident{ + db: db, + Object: obj, + logger: logger, + runtimeConfig: runtimeConfig, + EscalationState: map[escalationID]*EscalationState{}, + Rules: map[ruleID]struct{}{}, + Recipients: map[recipient.Key]*RecipientState{}, + } +} + func (i *Incident) ObjectDisplayName() string { return i.Object.DisplayName() } @@ -133,7 +155,12 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo } // Re-evaluate escalations based on the newly evaluated rules. - if err := i.evaluateEscalations(ctx, tx, ev, causedBy); err != nil { + escalations, err := i.evaluateEscalations(ev) + if err != nil { + return err + } + + if err := i.triggerEscalations(ctx, tx, ev, causedBy, escalations); err != nil { return err } @@ -151,6 +178,68 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo return i.notifyContacts(ctx, ev, notifications) } +// RetriggerEscalations tries to re-evaluate the escalations and notify contacts. +func (i *Incident) RetriggerEscalations(ev *event.Event) { + i.Lock() + defer i.Unlock() + + i.runtimeConfig.RLock() + defer i.runtimeConfig.RUnlock() + + if !i.RecoveredAt.IsZero() { + // Incident is recovered in the meantime. + return + } + + if !time.Now().After(ev.Time) { + i.logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev)) + return + } + + escalations, err := i.evaluateEscalations(ev) + if err != nil { + i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + return + } + + if len(escalations) == 0 { + i.logger.Debug("Reevaluated escalations, no new escalations triggered") + return + } + + var notifications []*NotificationEntry + ctx := context.Background() + err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { + err := ev.Sync(ctx, tx, i.db, i.Object.ID) + if err != nil { + return err + } + + if err = i.triggerEscalations(ctx, tx, ev, types.Int{}, escalations); err != nil { + return err + } + + channels := make(contactChannels) + for _, escalation := range escalations { + channels.loadEscalationRecipientsChannel(escalation, i, ev.Time) + } + + notifications, err = i.addPendingNotifications(ctx, tx, ev, channels, types.Int{}) + + return err + }) + if err != nil { + i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + } else { + if err = i.notifyContacts(ctx, ev, notifications); err != nil { + i.logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) + return + } + + i.logger.Info("Successfully reevaluated time-based escalations") + } +} + func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) (types.Int, error) { var causedByHistoryId types.Int oldSeverity := i.Severity @@ -200,6 +289,10 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, return types.Int{}, errors.New("can't insert incident closed history to the database") } + + if i.timer != nil { + i.timer.Stop() + } } i.Severity = newSeverity @@ -298,13 +391,26 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 return causedBy, nil } -// evaluateEscalations evaluates this incidents rule escalations if they aren't already. -// Returns error on database failure. -func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error { +// evaluateEscalations evaluates this incidents rule escalations to be triggered if they aren't already. +// Returns the newly evaluated escalations to be triggered or an error on database failure. +func (i *Incident) evaluateEscalations(e *event.Event) ([]*rule.Escalation, error) { if i.EscalationState == nil { i.EscalationState = make(map[int64]*EscalationState) } + // Escalations are reevaluated now, reset any existing timer, if there might be future time-based escalations, + // this function will start a new timer. + if i.timer != nil { + i.logger.Info("Stopping reevaluate timer due to escalation evaluation") + i.timer.Stop() + i.timer = nil + } + + filterContext := &rule.EscalationFilter{IncidentAge: e.Time.Sub(i.StartedAt), IncidentSeverity: i.Severity} + + var escalations []*rule.Escalation + retryAfter := rule.RetryNever + for rID := range i.Rules { r := i.runtimeConfig.Rules[rID] @@ -320,13 +426,8 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve if escalation.Condition == nil { matched = true } else { - cond := &rule.EscalationFilter{ - IncidentAge: time.Since(i.StartedAt), - IncidentSeverity: i.Severity, - } - var err error - matched, err = escalation.Condition.Eval(cond) + matched, err = escalation.Condition.Eval(filterContext) if err != nil { i.logger.Warnw( "Failed to evaluate escalation condition", zap.String("rule", r.Name), @@ -334,47 +435,81 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve ) matched = false + } else if !matched { + incidentAgeFilter := filterContext.ReevaluateAfter(escalation.Condition) + retryAfter = min(retryAfter, incidentAgeFilter) } } if matched { - state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} - i.EscalationState[escalation.ID] = state + escalations = append(escalations, escalation) + } + } + } + } - i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName()) + if retryAfter != rule.RetryNever { + // The retryAfter duration is relative to the incident duration represented by the escalation filter, + // i.e. if an incident is 15m old and an escalation rule evaluates incident_age>=1h the retryAfter would + // contain 45m (1h - incident age (15m)). Therefore, we have to use the event time instead of the incident + // start time here. + nextEvalAt := e.Time.Add(retryAfter) - if err := i.AddEscalationTriggered(ctx, tx, state); err != nil { - i.logger.Errorw( - "Failed to upsert escalation state", zap.String("rule", r.Name), - zap.String("escalation", escalation.DisplayName()), zap.Error(err), - ) + i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) + i.timer = time.AfterFunc(retryAfter, func() { + i.logger.Info("Reevaluating escalations") - return errors.New("failed to upsert escalation state") - } + i.RetriggerEscalations(&event.Event{ + Type: event.TypeInternal, + Time: nextEvalAt, + Severity: e.Severity, + Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)), + }) + }) + } - history := &HistoryRow{ - Time: state.TriggeredAt, - EventID: utils.ToDBInt(ev.ID), - RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), - RuleID: utils.ToDBInt(r.ID), - Type: EscalationTriggered, - CausedByIncidentHistoryID: causedBy, - } + return escalations, nil +} - if _, err := i.AddHistory(ctx, tx, history, false); err != nil { - i.logger.Errorw( - "Failed to insert escalation triggered incident history", zap.String("rule", r.Name), - zap.String("escalation", escalation.DisplayName()), zap.Error(err), - ) +// triggerEscalations triggers the given escalations and generates incident history items for each of them. +// Returns an error on database failure. +func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int, escalations []*rule.Escalation) error { + for _, escalation := range escalations { + r := i.runtimeConfig.Rules[escalation.RuleID] + i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName()) - return errors.New("failed to insert escalation triggered incident history") - } + state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} + i.EscalationState[escalation.ID] = state - if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { - return err - } - } - } + if err := i.AddEscalationTriggered(ctx, tx, state); err != nil { + i.logger.Errorw( + "Failed to upsert escalation state", zap.String("rule", r.Name), + zap.String("escalation", escalation.DisplayName()), zap.Error(err), + ) + + return errors.New("failed to upsert escalation state") + } + + history := &HistoryRow{ + Time: state.TriggeredAt, + EventID: utils.ToDBInt(ev.ID), + RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), + RuleID: utils.ToDBInt(r.ID), + Type: EscalationTriggered, + CausedByIncidentHistoryID: causedBy, + } + + if _, err := i.AddHistory(ctx, tx, history, false); err != nil { + i.logger.Errorw( + "Failed to insert escalation triggered incident history", zap.String("rule", r.Name), + zap.String("escalation", escalation.DisplayName()), zap.Error(err), + ) + + return errors.New("failed to insert escalation triggered incident history") + } + + if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { + return err } } @@ -542,6 +677,52 @@ func (i *Incident) getRecipientsChannel(t time.Time) contactChannels { return contactChs } +// restoreRecipients reloads the current incident recipients from the database. +// Returns error on database failure. +func (i *Incident) restoreRecipients(ctx context.Context) error { + contact := &ContactRow{} + var contacts []*ContactRow + err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID()) + if err != nil { + i.logger.Errorw( + "Failed to restore incident recipients from the database", zap.String("object", i.ObjectDisplayName()), + zap.String("incident", i.String()), zap.Error(err), + ) + + return errors.New("failed to restore incident recipients") + } + + recipients := make(map[recipient.Key]*RecipientState) + for _, contact := range contacts { + recipients[contact.Key] = &RecipientState{Role: contact.Role} + } + + i.Recipients = recipients + + return nil +} + +// restoreEscalationsState restores all escalation states matching the current incident id from the database. +// Returns error on database failure. +func (i *Incident) restoreEscalationsState(ctx context.Context) error { + state := &EscalationState{} + var states []*EscalationState + err := i.db.SelectContext(ctx, &states, i.db.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.ID()) + if err != nil { + i.logger.Errorw("Failed to restore incident rule escalation states", zap.Error(err)) + + return errors.New("failed to restore incident rule escalation states") + } + + for _, state := range states { + i.EscalationState[state.RuleEscalationID] = state + } + + i.RestoreEscalationStateRules(states) + + return nil +} + type EscalationState struct { IncidentID int64 `db:"incident_id"` RuleEscalationID int64 `db:"rule_escalation_id"` diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 0d1bbe5d..02721765 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -5,12 +5,14 @@ import ( "database/sql" "errors" "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" - "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" "go.uber.org/zap" "sync" + "time" ) var ( @@ -18,6 +20,42 @@ var ( currentIncidentsMu sync.Mutex ) +// LoadOpenIncidents loads all active (not yet closed) incidents from the database and restores all their states. +// Returns error ony database failure. +func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig) error { + logger.Info("Loading all active incidents from database") + + var objectIDs []types.Binary + err := db.SelectContext(ctx, &objectIDs, `SELECT object_id FROM incident WHERE "recovered_at" IS NULL`) + if err != nil { + logger.Errorw("Failed to load active incidents from database", zap.Error(err)) + + return errors.New("failed to fetch open incidents") + } + + for _, objectID := range objectIDs { + obj, err := object.LoadFromDB(ctx, db, objectID) + if err != nil { + logger.Errorw("Failed to retrieve incident object from database", zap.Error(err)) + continue + } + + incident, _, err := GetCurrent(ctx, db, obj, logger, runtimeConfig, false) + if err != nil { + continue + } + + incident.RetriggerEscalations(&event.Event{ + Time: time.Now(), + Type: event.TypeInternal, + Severity: incident.Severity, + Message: "Incident reevaluation at daemon startup", + }) + } + + return nil +} + func GetCurrent( ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool, @@ -30,15 +68,8 @@ func GetCurrent( if currentIncident == nil { ir := &IncidentRow{} - incident := &Incident{ - Object: obj, - db: db, - logger: logger.With(zap.String("object", obj.DisplayName())), - runtimeConfig: runtimeConfig, - Recipients: map[recipient.Key]*RecipientState{}, - EscalationState: map[escalationID]*EscalationState{}, - Rules: map[ruleID]struct{}{}, - } + incidentLogger := logger.With(zap.String("object", obj.DisplayName())) + incident := NewIncident(db, obj, runtimeConfig, incidentLogger) err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) if err != nil && !errors.Is(err, sql.ErrNoRows) { @@ -48,23 +79,13 @@ func GetCurrent( } else if err == nil { incident.incidentRowID = ir.ID incident.StartedAt = ir.StartedAt.Time() + incident.Severity = ir.Severity incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String())) - state := &EscalationState{} - var states []*EscalationState - err = db.SelectContext(ctx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID) - if err != nil { - incident.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err)) - - return nil, false, errors.New("failed to load incident rule escalation states") - } - - for _, state := range states { - incident.EscalationState[state.RuleEscalationID] = state + if err := incident.restoreEscalationsState(ctx); err != nil { + return nil, false, err } - incident.RestoreEscalationStateRules(states) - currentIncident = incident } @@ -82,21 +103,9 @@ func GetCurrent( currentIncident.Lock() defer currentIncident.Unlock() - contact := &ContactRow{} - var contacts []*ContactRow - err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID()) - if err != nil { - currentIncident.logger.Errorw("Failed to reload incident recipients", zap.Error(err)) - - return nil, false, errors.New("failed to load incident recipients") + if err := currentIncident.restoreRecipients(ctx); err != nil { + return nil, false, err } - - recipients := make(map[recipient.Key]*RecipientState) - for _, contact := range contacts { - recipients[contact.Key] = &RecipientState{Role: contact.Role} - } - - currentIncident.Recipients = recipients } return currentIncident, created, nil diff --git a/internal/object/db_types.go b/internal/object/db_types.go index c3a8a496..e8e6eec0 100644 --- a/internal/object/db_types.go +++ b/internal/object/db_types.go @@ -1,6 +1,9 @@ package object import ( + "context" + "fmt" + "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" ) @@ -37,3 +40,50 @@ func (or *ObjectRow) Upsert() interface{} { URL types.String `db:"url"` }{} } + +// LoadFromDB loads objects from the database matching the given id. +// This is only used to load the objects at daemon startup before the listener becomes ready, +// therefore it doesn't lock the objects cache mutex and panics when the given object ID is already +// in the cache. Otherwise, loads all the required data and returns error on database failure. +func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, error) { + if obj, ok := cache[id.String()]; ok { + panic(fmt.Sprintf("Object %s is already in cache", obj.DisplayName())) + } + + objectRow := &ObjectRow{ID: id} + err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow) + if err != nil { + return nil, fmt.Errorf("failed to fetch object: %w", err) + } + + tags := map[string]string{"host": objectRow.Host} + if objectRow.Service.Valid { + tags["service"] = objectRow.Service.String + } + + var extraTagRows []*ExtraTagRow + err = db.SelectContext( + ctx, &extraTagRows, + db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id, + ) + if err != nil { + return nil, fmt.Errorf("failed to fetch object extra tags: %w", err) + } + + extraTags := map[string]string{} + for _, extraTag := range extraTagRows { + extraTags[extraTag.Tag] = extraTag.Value + } + + obj := &Object{ + db: db, + ID: id, + Name: objectRow.Name, + URL: objectRow.URL.String, + Tags: tags, + ExtraTags: extraTags, + } + cache[id.String()] = obj + + return obj, nil +} diff --git a/internal/rule/condition.go b/internal/rule/condition.go index 77e005f9..9bf4db3b 100644 --- a/internal/rule/condition.go +++ b/internal/rule/condition.go @@ -3,15 +3,45 @@ package rule import ( "fmt" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/filter" + "math" "time" ) +// RetryNever indicates that an escalation condition should never be retried once it has been evaluated. +const RetryNever = time.Duration(math.MaxInt64) + type EscalationFilter struct { IncidentAge time.Duration IncidentSeverity event.Severity } -func (c *EscalationFilter) EvalEqual(key string, value string) (bool, error) { +// ReevaluateAfter returns the duration after which escalationCond should be reevaluated the +// next time on the incident represented by e. +// +// escalationCond must correspond to an escalation that did not trigger on the incident +// represented by e before. If nothing in the incident changes apart from time passing by, +// the escalation is guaranteed to not trigger within the returned duration. After that +// duration, the escalation should be reevaluated, and it may or may not trigger. If anything +// else changes, for example due to an external event, the escalation must be reevaluated as +// well. +func (e *EscalationFilter) ReevaluateAfter(escalationCond filter.Filter) time.Duration { + retryAfter := RetryNever + for _, condition := range escalationCond.ExtractConditions() { + if condition.Column() == "incident_age" { + v, err := time.ParseDuration(condition.Value()) + if err == nil && v > e.IncidentAge { + // The incident age is compared with a value in the future. Once that age is + // reached, the escalation could trigger, so consider that time for reevaluation. + retryAfter = min(retryAfter, v-e.IncidentAge) + } + } + } + + return retryAfter +} + +func (e *EscalationFilter) EvalEqual(key string, value string) (bool, error) { switch key { case "incident_age": age, err := time.ParseDuration(value) @@ -19,20 +49,20 @@ func (c *EscalationFilter) EvalEqual(key string, value string) (bool, error) { return false, err } - return c.IncidentAge == age, nil + return e.IncidentAge == age, nil case "incident_severity": severity, err := event.GetSeverityByName(value) if err != nil { return false, err } - return c.IncidentSeverity == severity, nil + return e.IncidentSeverity == severity, nil default: return false, nil } } -func (c *EscalationFilter) EvalLess(key string, value string) (bool, error) { +func (e *EscalationFilter) EvalLess(key string, value string) (bool, error) { switch key { case "incident_age": age, err := time.ParseDuration(value) @@ -40,24 +70,24 @@ func (c *EscalationFilter) EvalLess(key string, value string) (bool, error) { return false, err } - return c.IncidentAge < age, nil + return e.IncidentAge < age, nil case "incident_severity": severity, err := event.GetSeverityByName(value) if err != nil { return false, err } - return c.IncidentSeverity < severity, nil + return e.IncidentSeverity < severity, nil default: return false, nil } } -func (c *EscalationFilter) EvalLike(key string, value string) (bool, error) { +func (e *EscalationFilter) EvalLike(key string, value string) (bool, error) { return false, fmt.Errorf("escalation filter doesn't support wildcard matches") } -func (c *EscalationFilter) EvalLessOrEqual(key string, value string) (bool, error) { +func (e *EscalationFilter) EvalLessOrEqual(key string, value string) (bool, error) { switch key { case "incident_age": age, err := time.ParseDuration(value) @@ -65,20 +95,20 @@ func (c *EscalationFilter) EvalLessOrEqual(key string, value string) (bool, erro return false, err } - return c.IncidentAge <= age, nil + return e.IncidentAge <= age, nil case "incident_severity": severity, err := event.GetSeverityByName(value) if err != nil { return false, err } - return c.IncidentSeverity <= severity, nil + return e.IncidentSeverity <= severity, nil default: return false, nil } } -func (c *EscalationFilter) EvalExists(key string) bool { +func (e *EscalationFilter) EvalExists(key string) bool { switch key { case "incident_age": fallthrough diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 7bf6b53b..a809bbe3 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -30,6 +30,26 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str ) } +// RunInTx allows running a function in a database transaction without requiring manual transaction handling. +// +// A new transaction is started on db which is then passed to fn. After fn returns, the transaction is +// committed unless an error was returned. If fn returns an error, that error is returned, otherwise an +// error is returned if a database operation fails. +func RunInTx(ctx context.Context, db *icingadb.DB, fn func(tx *sqlx.Tx) error) error { + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + + err = fn(tx) + if err != nil { + return err + } + + return tx.Commit() +} + // InsertAndFetchId executes the given query and fetches the last inserted ID. func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) (int64, error) { var lastInsertId int64