From b8917eb5a62593b42aa250fc40a73f598777de0e Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 11 May 2023 17:08:39 +0200 Subject: [PATCH 01/12] Load all open incidents from DB on daemon startup --- cmd/icinga-notifications-daemon/main.go | 6 ++ internal/event/event.go | 1 + internal/incident/incident.go | 78 ++++++++++++++++-- internal/incident/incidents.go | 105 +++++++++++++++--------- internal/object/db_types.go | 43 ++++++++++ 5 files changed, 189 insertions(+), 44 deletions(-) diff --git a/cmd/icinga-notifications-daemon/main.go b/cmd/icinga-notifications-daemon/main.go index f9243ffb..b0f8f438 100644 --- a/cmd/icinga-notifications-daemon/main.go +++ b/cmd/icinga-notifications-daemon/main.go @@ -8,6 +8,7 @@ import ( "github.com/icinga/icinga-notifications/internal/channel" "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" + "github.com/icinga/icinga-notifications/internal/incident" "github.com/icinga/icinga-notifications/internal/listener" "github.com/icinga/icingadb/pkg/logging" "github.com/icinga/icingadb/pkg/utils" @@ -91,6 +92,11 @@ func main() { go runtimeConfig.PeriodicUpdates(ctx, 1*time.Second) + err = incident.LoadOpenIncidents(ctx, db, logs.GetChildLogger("incident"), runtimeConfig) + if err != nil { + logger.Fatalw("Can't load incidents from database", zap.Error(err)) + } + if err := listener.NewListener(db, runtimeConfig, logs).Run(ctx); err != nil { logger.Errorw("Listener has finished with an error", zap.Error(err)) } else { diff --git a/internal/event/event.go b/internal/event/event.go index dd9eee0f..48b39fbf 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -31,6 +31,7 @@ type Event struct { const ( TypeState = "state" TypeAcknowledgement = "acknowledgement" + TypeInternal = "internal" ) func (e *Event) String() string { diff --git a/internal/incident/incident.go b/internal/incident/incident.go index dc00b2a3..661e1ff7 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -42,6 +42,20 @@ type Incident struct { sync.Mutex } +func NewIncident( + db *icingadb.DB, obj *object.Object, runtimeConfig *config.RuntimeConfig, logger *zap.SugaredLogger, +) *Incident { + return &Incident{ + db: db, + Object: obj, + logger: logger, + runtimeConfig: runtimeConfig, + EscalationState: map[escalationID]*EscalationState{}, + Rules: map[ruleID]struct{}{}, + Recipients: map[recipient.Key]*RecipientState{}, + } +} + func (i *Incident) ObjectDisplayName() string { return i.Object.DisplayName() } @@ -133,7 +147,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo } // Re-evaluate escalations based on the newly evaluated rules. - if err := i.evaluateEscalations(ctx, tx, ev, causedBy); err != nil { + if _, err := i.evaluateEscalations(ctx, tx, ev, causedBy); err != nil { return err } @@ -299,12 +313,14 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 } // evaluateEscalations evaluates this incidents rule escalations if they aren't already. -// Returns error on database failure. -func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) error { +// Returns whether a new escalation triggered or an error on database failure. +func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) (bool, error) { if i.EscalationState == nil { i.EscalationState = make(map[int64]*EscalationState) } + newEscalationMatched := false + for rID := range i.Rules { r := i.runtimeConfig.Rules[rID] @@ -338,6 +354,8 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve } if matched { + newEscalationMatched = true + state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} i.EscalationState[escalation.ID] = state @@ -349,7 +367,7 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve zap.String("escalation", escalation.DisplayName()), zap.Error(err), ) - return errors.New("failed to upsert escalation state") + return false, errors.New("failed to upsert escalation state") } history := &HistoryRow{ @@ -367,18 +385,18 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve zap.String("escalation", escalation.DisplayName()), zap.Error(err), ) - return errors.New("failed to insert escalation triggered incident history") + return false, errors.New("failed to insert escalation triggered incident history") } if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { - return err + return false, err } } } } } - return nil + return newEscalationMatched, nil } // notifyContacts executes all the given pending notifications of the current incident. @@ -542,6 +560,52 @@ func (i *Incident) getRecipientsChannel(t time.Time) contactChannels { return contactChs } +// restoreRecipients reloads the current incident recipients from the database. +// Returns error on database failure. +func (i *Incident) restoreRecipients(ctx context.Context) error { + contact := &ContactRow{} + var contacts []*ContactRow + err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID()) + if err != nil { + i.logger.Errorw( + "Failed to restore incident recipients from the database", zap.String("object", i.ObjectDisplayName()), + zap.String("incident", i.String()), zap.Error(err), + ) + + return errors.New("failed to restore incident recipients") + } + + recipients := make(map[recipient.Key]*RecipientState) + for _, contact := range contacts { + recipients[contact.Key] = &RecipientState{Role: contact.Role} + } + + i.Recipients = recipients + + return nil +} + +// restoreEscalationsState restores all escalation states matching the current incident id from the database. +// Returns error on database failure. +func (i *Incident) restoreEscalationsState(ctx context.Context) error { + state := &EscalationState{} + var states []*EscalationState + err := i.db.SelectContext(ctx, &states, i.db.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.ID()) + if err != nil { + i.logger.Errorw("Failed to restore incident rule escalation states", zap.Error(err)) + + return errors.New("failed to restore incident rule escalation states") + } + + for _, state := range states { + i.EscalationState[state.RuleEscalationID] = state + } + + i.RestoreEscalationStateRules(states) + + return nil +} + type EscalationState struct { IncidentID int64 `db:"incident_id"` RuleEscalationID int64 `db:"rule_escalation_id"` diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 0d1bbe5d..640ef5a2 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -5,12 +5,14 @@ import ( "database/sql" "errors" "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" - "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" "go.uber.org/zap" "sync" + "time" ) var ( @@ -18,6 +20,64 @@ var ( currentIncidentsMu sync.Mutex ) +// LoadOpenIncidents loads all active (not yet closed) incidents from the database and restores all their states. +// Returns error ony database failure. +func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig) error { + logger.Info("Loading all active incidents from database") + + var objectIDs []types.Binary + err := db.SelectContext(ctx, &objectIDs, `SELECT object_id FROM incident WHERE "recovered_at" IS NULL`) + if err != nil { + logger.Errorw("Failed to load active incidents from database", zap.Error(err)) + + return errors.New("failed to fetch open incidents") + } + + for _, objectID := range objectIDs { + obj, err := object.LoadFromDB(ctx, db, objectID) + if err != nil { + logger.Errorw("Failed to retrieve incident object from database", zap.Error(err)) + continue + } + + incident, _, err := GetCurrent(ctx, db, obj, logger, runtimeConfig, false) + if err != nil { + continue + } + + evaluateRulesAndEscalations := func(ctx context.Context) error { + ev := &event.Event{Time: time.Now(), Type: event.TypeEscalation} + if !incident.evaluateEscalations() { + return nil + } + + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + + err = incident.notifyContacts(ctx, tx, ev, types.Int{}) + if err != nil { + return err + } + + if err = tx.Commit(); err != nil { + incident.logger.Errorw("Failed to commit database transaction", zap.Error(err)) + return err + } + + return nil + } + + if evaluateRulesAndEscalations(ctx) != nil { + continue + } + } + + return nil +} + func GetCurrent( ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, create bool, @@ -30,15 +90,8 @@ func GetCurrent( if currentIncident == nil { ir := &IncidentRow{} - incident := &Incident{ - Object: obj, - db: db, - logger: logger.With(zap.String("object", obj.DisplayName())), - runtimeConfig: runtimeConfig, - Recipients: map[recipient.Key]*RecipientState{}, - EscalationState: map[escalationID]*EscalationState{}, - Rules: map[ruleID]struct{}{}, - } + incidentLogger := logger.With(zap.String("object", obj.DisplayName())) + incident := NewIncident(db, obj, runtimeConfig, incidentLogger) err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) if err != nil && !errors.Is(err, sql.ErrNoRows) { @@ -48,23 +101,13 @@ func GetCurrent( } else if err == nil { incident.incidentRowID = ir.ID incident.StartedAt = ir.StartedAt.Time() + incident.Severity = ir.Severity incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String())) - state := &EscalationState{} - var states []*EscalationState - err = db.SelectContext(ctx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID) - if err != nil { - incident.logger.Errorw("Failed to load incident rule escalation states", zap.Error(err)) - - return nil, false, errors.New("failed to load incident rule escalation states") - } - - for _, state := range states { - incident.EscalationState[state.RuleEscalationID] = state + if err := incident.restoreEscalationsState(ctx); err != nil { + return nil, false, err } - incident.RestoreEscalationStateRules(states) - currentIncident = incident } @@ -82,21 +125,9 @@ func GetCurrent( currentIncident.Lock() defer currentIncident.Unlock() - contact := &ContactRow{} - var contacts []*ContactRow - err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID()) - if err != nil { - currentIncident.logger.Errorw("Failed to reload incident recipients", zap.Error(err)) - - return nil, false, errors.New("failed to load incident recipients") - } - - recipients := make(map[recipient.Key]*RecipientState) - for _, contact := range contacts { - recipients[contact.Key] = &RecipientState{Role: contact.Role} + if err := currentIncident.restoreRecipients(ctx); err != nil { + return nil, false, err } - - currentIncident.Recipients = recipients } return currentIncident, created, nil diff --git a/internal/object/db_types.go b/internal/object/db_types.go index c3a8a496..09327201 100644 --- a/internal/object/db_types.go +++ b/internal/object/db_types.go @@ -1,6 +1,9 @@ package object import ( + "context" + "fmt" + "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" ) @@ -37,3 +40,43 @@ func (or *ObjectRow) Upsert() interface{} { URL types.String `db:"url"` }{} } + +// LoadFromDB loads objects from the database matching the given id. +// This is only used to load the objects at daemon startup before the listener becomes ready, +// therefore it doesn't lock the objects cache mutex and panics when the given object ID is already +// in the cache. Otherwise, loads all the required data and returns error on database failure. +func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, error) { + if obj, ok := cache[id.String()]; ok { + panic(fmt.Sprintf("Object %s is already in cache", obj.DisplayName())) + } + + objectRow := &ObjectRow{ID: id} + err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow) + if err != nil { + return nil, fmt.Errorf("failed to fetch object: %w", err) + } + + tags := map[string]string{"host": objectRow.Host} + if objectRow.Service.Valid { + tags["service"] = objectRow.Service.String + } + + var extraTagRows []*ExtraTagRow + err = db.SelectContext( + ctx, &extraTagRows, + db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id, + ) + if err != nil { + return nil, fmt.Errorf("failed to fetch object extra tags: %w", err) + } + + extraTags := map[string]string{} + for _, extraTag := range extraTagRows { + extraTags[extraTag.Tag] = extraTag.Value + } + + obj := &Object{db: db, ID: id, Tags: tags, ExtraTags: extraTags} + cache[id.String()] = obj + + return obj, nil +} From 5b378515ce60876249e8446c1a712c36da967e82 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Mon, 23 Oct 2023 12:11:25 +0200 Subject: [PATCH 02/12] Merge `All,Any,None` filter types into a single `Chain` type --- internal/filter/parser.go | 16 ++--- internal/filter/parser_test.go | 17 +++--- internal/filter/types.go | 106 ++++++++++++++++++--------------- 3 files changed, 73 insertions(+), 66 deletions(-) diff --git a/internal/filter/parser.go b/internal/filter/parser.go index 1ce642bf..3a3c1642 100644 --- a/internal/filter/parser.go +++ b/internal/filter/parser.go @@ -15,7 +15,7 @@ type Parser struct { func Parse(expression string) (Filter, error) { parser := &Parser{tag: expression, length: len(expression)} if parser.length == 0 { - return &All{}, nil + return &Chain{op: ALL}, nil } return parser.readFilter(0, "", nil) @@ -97,7 +97,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( if operator != "!" && (next == "&" || next == "|") { if operator == "&" { if len(rules) > 1 { - rules = []Filter{&All{rules: rules}} + rules = []Filter{&Chain{op: ALL, rules: rules}} } operator = next @@ -122,7 +122,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( } else { if negate { negate = false - rules = append(rules, &None{rules: []Filter{condition}}) + rules = append(rules, &Chain{op: NONE, rules: []Filter{condition}}) } else { rules = append(rules, condition) } @@ -154,7 +154,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( if next == "&" || next == "|" { if operator == "" || operator == "&" { if operator == "&" && len(rules) > 1 { - all := &All{rules: rules} + all := &Chain{op: ALL, rules: rules} rules = []Filter{all} } @@ -195,18 +195,18 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( var chain Filter switch operator { case "&": - chain = &All{rules: rules} + chain = &Chain{op: ALL, rules: rules} case "|": - chain = &Any{rules: rules} + chain = &Chain{op: ANY, rules: rules} case "!": - chain = &None{rules: rules} + chain = &Chain{op: NONE, rules: rules} case "": if nestingLevel == 0 && rules != nil { // There is only one filter tag, no chain return rules[0], nil } - chain = &All{rules: rules} + chain = &Chain{op: ALL, rules: rules} default: return nil, p.parseError(operator, "") } diff --git a/internal/filter/parser_test.go b/internal/filter/parser_test.go index ebbba4c7..2dfce534 100644 --- a/internal/filter/parser_test.go +++ b/internal/filter/parser_test.go @@ -94,34 +94,33 @@ func TestFilter(t *testing.T) { rule, err = Parse("foo=bar&bar=foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &All{}, rule) + assert.IsType(t, &Chain{}, rule) rule, err = Parse("foo=bar|bar=foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &Any{}, rule) + assert.IsType(t, &Chain{}, rule) rule, err = Parse("!(foo=bar|bar=foo)") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &None{}, rule) + assert.IsType(t, &Chain{}, rule) rule, err = Parse("!foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - - assert.Equal(t, &None{rules: []Filter{NewExists("foo")}}, rule) + assert.Equal(t, &Chain{op: NONE, rules: []Filter{NewExists("foo")}}, rule) rule, err = Parse("foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.Equal(t, NewExists("foo"), rule) + assert.Equal(t, &Exists{column: "foo"}, rule) rule, err = Parse("!(foo=bar|bar=foo)&(foo=bar|bar=foo)") assert.Nil(t, err, "There should be no errors but got: %s", err) - expected := &All{rules: []Filter{ - &None{rules: []Filter{ + expected := &Chain{op: ALL, rules: []Filter{ + &Chain{op: NONE, rules: []Filter{ &Equal{column: "foo", value: "bar"}, &Equal{column: "bar", value: "foo"}, }}, - &Any{rules: []Filter{ + &Chain{op: ANY, rules: []Filter{ &Equal{column: "foo", value: "bar"}, &Equal{column: "bar", value: "foo"}, }}, diff --git a/internal/filter/types.go b/internal/filter/types.go index 3153cf33..53de449a 100644 --- a/internal/filter/types.go +++ b/internal/filter/types.go @@ -1,63 +1,74 @@ package filter -// All represents a filter chain type that matches when all of its Rules matches. -type All struct { - rules []Filter -} - -func (a *All) Eval(filterable Filterable) (bool, error) { - for _, rule := range a.rules { - matched, err := rule.Eval(filterable) - if err != nil { - return false, err - } +import ( + "fmt" +) - if !matched { - return false, nil - } - } +// LogicalOp is a type used for grouping the logical operators of a filter string. +type LogicalOp string - return true, nil -} +const ( + // NONE represents a filter chain type that matches when none of its ruleset matches. + NONE LogicalOp = "!" + // ALL represents a filter chain type that matches when all of its ruleset matches. + ALL LogicalOp = "&" + // ANY represents a filter chain type that matches when at least one of its ruleset matches. + ANY LogicalOp = "|" +) -// Any represents a filter chain type that matches when at least one of its Rules matches. -type Any struct { +// Chain is a filter type that wraps other filter rules and itself. +// Therefore, it implements the Filter interface to allow it to be part of its ruleset. +// It supports also adding and popping filter rules individually. +type Chain struct { + op LogicalOp // The filter chain operator to be used to evaluate the rules rules []Filter } -func (a *Any) Eval(filterable Filterable) (bool, error) { - for _, rule := range a.rules { - matched, err := rule.Eval(filterable) - if err != nil { - return false, err - } +// Eval evaluates the filter rule sets recursively based on their operator type. +func (c *Chain) Eval(filterable Filterable) (bool, error) { + switch c.op { + case NONE: + for _, rule := range c.rules { + matched, err := rule.Eval(filterable) + if err != nil { + return false, err + } - if matched { - return true, nil + if matched { + return false, nil + } } - } - - return false, nil -} -// None represents a filter chain type that matches when none of its Rules matches. -type None struct { - rules []Filter -} - -func (n *None) Eval(filterable Filterable) (bool, error) { - for _, rule := range n.rules { - matched, err := rule.Eval(filterable) - if err != nil { - return false, err + return true, nil + case ALL: + for _, rule := range c.rules { + matched, err := rule.Eval(filterable) + if err != nil { + return false, err + } + + if !matched { + return false, nil + } } - if matched { - return false, nil + return true, nil + case ANY: + for _, rule := range c.rules { + matched, err := rule.Eval(filterable) + if err != nil { + return false, err + } + + if matched { + return true, nil + } } - } - return true, nil + return false, nil + default: + return false, fmt.Errorf("invalid logical operator provided: %q", c.op) + } } // Condition represents a single filter condition. @@ -172,10 +183,7 @@ func (goe *GreaterThanOrEqual) Eval(filterable Filterable) (bool, error) { } var ( - _ Filter = (*All)(nil) - _ Filter = (*Any)(nil) - _ Filter = (*None)(nil) - + _ Filter = (*Chain)(nil) _ Filter = (*Exists)(nil) _ Filter = (*Equal)(nil) _ Filter = (*UnEqual)(nil) From b678b2e52c4f2952f6cd7739c6bdeb15e80431ff Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 26 Oct 2023 10:58:29 +0200 Subject: [PATCH 03/12] Merge all filter condition types into `Condition` type --- internal/filter/parser.go | 16 +-- internal/filter/parser_test.go | 53 ++++++---- internal/filter/types.go | 178 ++++++++++++++++----------------- 3 files changed, 129 insertions(+), 118 deletions(-) diff --git a/internal/filter/parser.go b/internal/filter/parser.go index 3a3c1642..934f3298 100644 --- a/internal/filter/parser.go +++ b/internal/filter/parser.go @@ -257,24 +257,24 @@ func (p *Parser) createCondition(column string, operator string, value string) ( switch operator { case "=": if strings.Contains(value, "*") { - return &Like{column: column, value: value}, nil + return &Condition{op: Like, column: column, value: value}, nil } - return &Equal{column: column, value: value}, nil + return &Condition{op: Equal, column: column, value: value}, nil case "!=": if strings.Contains(value, "*") { - return &Unlike{column: column, value: value}, nil + return &Condition{op: UnLike, column: column, value: value}, nil } - return &UnEqual{column: column, value: value}, nil + return &Condition{op: UnEqual, column: column, value: value}, nil case ">": - return &GreaterThan{column: column, value: value}, nil + return &Condition{op: GreaterThan, column: column, value: value}, nil case ">=": - return &GreaterThanOrEqual{column: column, value: value}, nil + return &Condition{op: GreaterThanEqual, column: column, value: value}, nil case "<": - return &LessThan{column: column, value: value}, nil + return &Condition{op: LessThan, column: column, value: value}, nil case "<=": - return &LessThanOrEqual{column: column, value: value}, nil + return &Condition{op: LessThanEqual, column: column, value: value}, nil default: return nil, fmt.Errorf("invalid operator %s provided", operator) } diff --git a/internal/filter/parser_test.go b/internal/filter/parser_test.go index 2dfce534..82b2ff61 100644 --- a/internal/filter/parser_test.go +++ b/internal/filter/parser_test.go @@ -62,35 +62,43 @@ func TestFilter(t *testing.T) { t.Run("ParserIdentifiesAllKindOfFilters", func(t *testing.T) { rule, err := Parse("foo=bar") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &Equal{}, rule) + expected := &Condition{op: Equal, column: "foo", value: "bar"} + assert.Equal(t, expected, rule) rule, err = Parse("foo!=bar") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &UnEqual{}, rule) + expected = &Condition{op: UnEqual, column: "foo", value: "bar"} + assert.Equal(t, expected, rule) rule, err = Parse("foo=bar*") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &Like{}, rule) + expected = &Condition{op: Like, column: "foo", value: "bar*"} + assert.Equal(t, expected, rule) rule, err = Parse("foo!=bar*") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &Unlike{}, rule) + expected = &Condition{op: UnLike, column: "foo", value: "bar*"} + assert.Equal(t, expected, rule) rule, err = Parse("foobar") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &GreaterThan{}, rule) + expected = &Condition{op: GreaterThan, column: "foo", value: "bar"} + assert.Equal(t, expected, rule) rule, err = Parse("foo>=bar") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.IsType(t, &GreaterThanOrEqual{}, rule) + expected = &Condition{op: GreaterThanEqual, column: "foo", value: "bar"} + assert.Equal(t, expected, rule) rule, err = Parse("foo=bar&bar=foo") assert.Nil(t, err, "There should be no errors but got: %s", err) @@ -115,47 +123,52 @@ func TestFilter(t *testing.T) { rule, err = Parse("!(foo=bar|bar=foo)&(foo=bar|bar=foo)") assert.Nil(t, err, "There should be no errors but got: %s", err) - expected := &Chain{op: ALL, rules: []Filter{ + expectedChain := &Chain{op: ALL, rules: []Filter{ &Chain{op: NONE, rules: []Filter{ - &Equal{column: "foo", value: "bar"}, - &Equal{column: "bar", value: "foo"}, + &Condition{op: Equal, column: "foo", value: "bar"}, + &Condition{op: Equal, column: "bar", value: "foo"}, }}, &Chain{op: ANY, rules: []Filter{ - &Equal{column: "foo", value: "bar"}, - &Equal{column: "bar", value: "foo"}, + &Condition{op: Equal, column: "foo", value: "bar"}, + &Condition{op: Equal, column: "bar", value: "foo"}, }}, }} - assert.Equal(t, expected, rule) + assert.Equal(t, expectedChain, rule) }) t.Run("ParserIdentifiesSingleCondition", func(t *testing.T) { rule, err := Parse("foo=bar") assert.Nil(t, err, "There should be no errors but got: %s", err) - expected := &Equal{column: "foo", value: "bar"} + expected := &Condition{op: Equal, column: "foo", value: "bar"} assert.Equal(t, expected, rule, "Parser doesn't parse single condition correctly") }) t.Run("UrlEncodedFilterExpression", func(t *testing.T) { rule, err := Parse("col%3Cumnval%28ue") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.Equal(t, &GreaterThan{column: "col(umn", value: "val(ue"}, rule) + expected = &Condition{op: GreaterThan, column: "col(umn", value: "val(ue"} + assert.Equal(t, expected, rule) rule, err = Parse("col%29umn>=val%29ue") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.Equal(t, &GreaterThanOrEqual{column: "col)umn", value: "val)ue"}, rule) + expected = &Condition{op: GreaterThanEqual, column: "col)umn", value: "val)ue"} + assert.Equal(t, expected, rule) }) } diff --git a/internal/filter/types.go b/internal/filter/types.go index 53de449a..93a4e199 100644 --- a/internal/filter/types.go +++ b/internal/filter/types.go @@ -71,126 +71,124 @@ func (c *Chain) Eval(filterable Filterable) (bool, error) { } } +// CompOperator is a type used for grouping the individual comparison operators of a filter string. +type CompOperator string + +// List of the supported comparison operators. +const ( + Equal CompOperator = "=" + UnEqual CompOperator = "!=" + Like CompOperator = "~" + UnLike CompOperator = "!~" + LessThan CompOperator = "<" + LessThanEqual CompOperator = "<=" + GreaterThan CompOperator = ">" + GreaterThanEqual CompOperator = ">=" +) + // Condition represents a single filter condition. +// It provides an implementation of the Filter interface for each of the supported CompOperator. +// All it's fields are read-only and aren't supposed to change at runtime. For read access, you can +// check the available exported methods. type Condition struct { + op CompOperator column string value string } -func NewCondition(column string, value string) *Condition { - return &Condition{ - column: column, - value: value, +// Eval evaluates this Condition based on its operator. +// Returns true when the filter evaluates to true false otherwise. +func (c *Condition) Eval(filterable Filterable) (bool, error) { + if !filterable.EvalExists(c.column) { + return false, nil } -} - -type Exists Condition -func NewExists(column string) *Exists { - return &Exists{column: column} -} - -func (e *Exists) Eval(filterable Filterable) (bool, error) { - return filterable.EvalExists(e.column), nil -} + switch c.op { + case Equal: + match, err := filterable.EvalEqual(c.column, c.value) + if err != nil { + return false, err + } -type Equal Condition + return match, nil + case UnEqual: + match, err := filterable.EvalEqual(c.column, c.value) + if err != nil { + return false, err + } -func (e *Equal) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalEqual(e.column, e.value) - if err != nil { - return false, err - } + return !match, nil + case Like: + match, err := filterable.EvalLike(c.column, c.value) + if err != nil { + return false, err + } - return match, nil -} + return match, nil + case UnLike: + match, err := filterable.EvalLike(c.column, c.value) + if err != nil { + return false, err + } -type UnEqual Condition + return !match, nil + case LessThan: + match, err := filterable.EvalLess(c.column, c.value) + if err != nil { + return false, err + } -func (u *UnEqual) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalEqual(u.column, u.value) - if err != nil { - return false, err - } + return match, nil + case LessThanEqual: + match, err := filterable.EvalLessOrEqual(c.column, c.value) + if err != nil { + return false, err + } - return filterable.EvalExists(u.column) && !match, nil -} + return match, nil + case GreaterThan: + match, err := filterable.EvalLessOrEqual(c.column, c.value) + if err != nil { + return false, err + } -type Like Condition + return !match, nil + case GreaterThanEqual: + match, err := filterable.EvalLess(c.column, c.value) + if err != nil { + return false, err + } -func (l *Like) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLike(l.column, l.value) - if err != nil { - return false, err + return !match, nil + default: + return false, fmt.Errorf("invalid comparison operator provided: %q", c.op) } - - return match, nil } -type Unlike Condition - -func (u *Unlike) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLike(u.column, u.value) - if err != nil { - return false, err - } - - return filterable.EvalExists(u.column) && !match, nil +// Column returns the column of this Condition. +func (c *Condition) Column() string { + return c.column } -type LessThan Condition - -func (less *LessThan) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLess(less.column, less.value) - if err != nil { - return false, err - } - - return match, nil +// Value returns the value of this Condition. +func (c *Condition) Value() string { + return c.value } -type LessThanOrEqual Condition - -func (loe *LessThanOrEqual) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLessOrEqual(loe.column, loe.value) - if err != nil { - return false, err - } - - return match, nil +type Exists struct { + column string } -type GreaterThan Condition - -func (g *GreaterThan) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLessOrEqual(g.column, g.value) - if err != nil { - return false, err - } - - return filterable.EvalExists(g.column) && !match, nil +func NewExists(column string) *Exists { + return &Exists{column: column} } -type GreaterThanOrEqual Condition - -func (goe *GreaterThanOrEqual) Eval(filterable Filterable) (bool, error) { - match, err := filterable.EvalLess(goe.column, goe.value) - if err != nil { - return false, err - } - - return filterable.EvalExists(goe.column) && !match, nil +func (e *Exists) Eval(filterable Filterable) (bool, error) { + return filterable.EvalExists(e.column), nil } var ( _ Filter = (*Chain)(nil) _ Filter = (*Exists)(nil) - _ Filter = (*Equal)(nil) - _ Filter = (*UnEqual)(nil) - _ Filter = (*Like)(nil) - _ Filter = (*Unlike)(nil) - _ Filter = (*LessThan)(nil) - _ Filter = (*LessThanOrEqual)(nil) - _ Filter = (*GreaterThan)(nil) - _ Filter = (*GreaterThanOrEqual)(nil) + _ Filter = (*Condition)(nil) ) From 5054d13053a4f4ab3209446b6b1398fd7bb4f936 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 27 Oct 2023 11:39:17 +0200 Subject: [PATCH 04/12] Allow extracting all individual conditions/comparisons from a filter expresion --- internal/filter/contracts.go | 1 + internal/filter/types.go | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/internal/filter/contracts.go b/internal/filter/contracts.go index 6ccbb6be..12543dd0 100644 --- a/internal/filter/contracts.go +++ b/internal/filter/contracts.go @@ -12,4 +12,5 @@ type Filterable interface { // Filter is implemented by every filter chains and filter conditions. type Filter interface { Eval(filterable Filterable) (bool, error) + ExtractConditions() []*Condition } diff --git a/internal/filter/types.go b/internal/filter/types.go index 93a4e199..bcc15e5f 100644 --- a/internal/filter/types.go +++ b/internal/filter/types.go @@ -71,6 +71,19 @@ func (c *Chain) Eval(filterable Filterable) (bool, error) { } } +func (c *Chain) ExtractConditions() []*Condition { + var conditions []*Condition + for _, rule := range c.rules { + if _, ok := rule.(*Exists); ok { + continue + } + + conditions = append(conditions, rule.ExtractConditions()...) + } + + return conditions +} + // CompOperator is a type used for grouping the individual comparison operators of a filter string. type CompOperator string @@ -165,6 +178,10 @@ func (c *Condition) Eval(filterable Filterable) (bool, error) { } } +func (c *Condition) ExtractConditions() []*Condition { + return []*Condition{c} +} + // Column returns the column of this Condition. func (c *Condition) Column() string { return c.column @@ -179,6 +196,10 @@ type Exists struct { column string } +func (e *Exists) ExtractConditions() []*Condition { + panic("filter exists doesn't support extract conditions") +} + func NewExists(column string) *Exists { return &Exists{column: column} } From c117c49304180c458f20815783da1b10717895ca Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 12 May 2023 13:13:29 +0200 Subject: [PATCH 05/12] Rename EscalationFilter receiver variable name --- internal/rule/condition.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/rule/condition.go b/internal/rule/condition.go index 77e005f9..6c539178 100644 --- a/internal/rule/condition.go +++ b/internal/rule/condition.go @@ -11,7 +11,7 @@ type EscalationFilter struct { IncidentSeverity event.Severity } -func (c *EscalationFilter) EvalEqual(key string, value string) (bool, error) { +func (e *EscalationFilter) EvalEqual(key string, value string) (bool, error) { switch key { case "incident_age": age, err := time.ParseDuration(value) @@ -19,20 +19,20 @@ func (c *EscalationFilter) EvalEqual(key string, value string) (bool, error) { return false, err } - return c.IncidentAge == age, nil + return e.IncidentAge == age, nil case "incident_severity": severity, err := event.GetSeverityByName(value) if err != nil { return false, err } - return c.IncidentSeverity == severity, nil + return e.IncidentSeverity == severity, nil default: return false, nil } } -func (c *EscalationFilter) EvalLess(key string, value string) (bool, error) { +func (e *EscalationFilter) EvalLess(key string, value string) (bool, error) { switch key { case "incident_age": age, err := time.ParseDuration(value) @@ -40,24 +40,24 @@ func (c *EscalationFilter) EvalLess(key string, value string) (bool, error) { return false, err } - return c.IncidentAge < age, nil + return e.IncidentAge < age, nil case "incident_severity": severity, err := event.GetSeverityByName(value) if err != nil { return false, err } - return c.IncidentSeverity < severity, nil + return e.IncidentSeverity < severity, nil default: return false, nil } } -func (c *EscalationFilter) EvalLike(key string, value string) (bool, error) { +func (e *EscalationFilter) EvalLike(key string, value string) (bool, error) { return false, fmt.Errorf("escalation filter doesn't support wildcard matches") } -func (c *EscalationFilter) EvalLessOrEqual(key string, value string) (bool, error) { +func (e *EscalationFilter) EvalLessOrEqual(key string, value string) (bool, error) { switch key { case "incident_age": age, err := time.ParseDuration(value) @@ -65,20 +65,20 @@ func (c *EscalationFilter) EvalLessOrEqual(key string, value string) (bool, erro return false, err } - return c.IncidentAge <= age, nil + return e.IncidentAge <= age, nil case "incident_severity": severity, err := event.GetSeverityByName(value) if err != nil { return false, err } - return c.IncidentSeverity <= severity, nil + return e.IncidentSeverity <= severity, nil default: return false, nil } } -func (c *EscalationFilter) EvalExists(key string) bool { +func (e *EscalationFilter) EvalExists(key string) bool { switch key { case "incident_age": fallthrough From 209e8c9c6aaf0efc2d5e90104eb69e8020372328 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 25 Oct 2023 11:29:02 +0200 Subject: [PATCH 06/12] EscalationFilter: add ReevaluateAfter() Co-Authored-By: Julian Brost --- internal/rule/condition.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/internal/rule/condition.go b/internal/rule/condition.go index 6c539178..9bf4db3b 100644 --- a/internal/rule/condition.go +++ b/internal/rule/condition.go @@ -3,14 +3,44 @@ package rule import ( "fmt" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/filter" + "math" "time" ) +// RetryNever indicates that an escalation condition should never be retried once it has been evaluated. +const RetryNever = time.Duration(math.MaxInt64) + type EscalationFilter struct { IncidentAge time.Duration IncidentSeverity event.Severity } +// ReevaluateAfter returns the duration after which escalationCond should be reevaluated the +// next time on the incident represented by e. +// +// escalationCond must correspond to an escalation that did not trigger on the incident +// represented by e before. If nothing in the incident changes apart from time passing by, +// the escalation is guaranteed to not trigger within the returned duration. After that +// duration, the escalation should be reevaluated, and it may or may not trigger. If anything +// else changes, for example due to an external event, the escalation must be reevaluated as +// well. +func (e *EscalationFilter) ReevaluateAfter(escalationCond filter.Filter) time.Duration { + retryAfter := RetryNever + for _, condition := range escalationCond.ExtractConditions() { + if condition.Column() == "incident_age" { + v, err := time.ParseDuration(condition.Value()) + if err == nil && v > e.IncidentAge { + // The incident age is compared with a value in the future. Once that age is + // reached, the escalation could trigger, so consider that time for reevaluation. + retryAfter = min(retryAfter, v-e.IncidentAge) + } + } + } + + return retryAfter +} + func (e *EscalationFilter) EvalEqual(key string, value string) (bool, error) { switch key { case "incident_age": From e4a78ad2901ef70c39968585a976795ba8778ac0 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 27 Oct 2023 16:59:36 +0200 Subject: [PATCH 07/12] Make all logical op names CamelCase --- internal/filter/parser.go | 16 ++++++++-------- internal/filter/parser_test.go | 8 ++++---- internal/filter/types.go | 18 +++++++++--------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/internal/filter/parser.go b/internal/filter/parser.go index 934f3298..71dfe6b4 100644 --- a/internal/filter/parser.go +++ b/internal/filter/parser.go @@ -15,7 +15,7 @@ type Parser struct { func Parse(expression string) (Filter, error) { parser := &Parser{tag: expression, length: len(expression)} if parser.length == 0 { - return &Chain{op: ALL}, nil + return &Chain{op: All}, nil } return parser.readFilter(0, "", nil) @@ -97,7 +97,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( if operator != "!" && (next == "&" || next == "|") { if operator == "&" { if len(rules) > 1 { - rules = []Filter{&Chain{op: ALL, rules: rules}} + rules = []Filter{&Chain{op: All, rules: rules}} } operator = next @@ -122,7 +122,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( } else { if negate { negate = false - rules = append(rules, &Chain{op: NONE, rules: []Filter{condition}}) + rules = append(rules, &Chain{op: None, rules: []Filter{condition}}) } else { rules = append(rules, condition) } @@ -154,7 +154,7 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( if next == "&" || next == "|" { if operator == "" || operator == "&" { if operator == "&" && len(rules) > 1 { - all := &Chain{op: ALL, rules: rules} + all := &Chain{op: All, rules: rules} rules = []Filter{all} } @@ -195,18 +195,18 @@ func (p *Parser) readFilter(nestingLevel int, operator string, rules []Filter) ( var chain Filter switch operator { case "&": - chain = &Chain{op: ALL, rules: rules} + chain = &Chain{op: All, rules: rules} case "|": - chain = &Chain{op: ANY, rules: rules} + chain = &Chain{op: Any, rules: rules} case "!": - chain = &Chain{op: NONE, rules: rules} + chain = &Chain{op: None, rules: rules} case "": if nestingLevel == 0 && rules != nil { // There is only one filter tag, no chain return rules[0], nil } - chain = &Chain{op: ALL, rules: rules} + chain = &Chain{op: All, rules: rules} default: return nil, p.parseError(operator, "") } diff --git a/internal/filter/parser_test.go b/internal/filter/parser_test.go index 82b2ff61..b23a7955 100644 --- a/internal/filter/parser_test.go +++ b/internal/filter/parser_test.go @@ -114,7 +114,7 @@ func TestFilter(t *testing.T) { rule, err = Parse("!foo") assert.Nil(t, err, "There should be no errors but got: %s", err) - assert.Equal(t, &Chain{op: NONE, rules: []Filter{NewExists("foo")}}, rule) + assert.Equal(t, &Chain{op: None, rules: []Filter{NewExists("foo")}}, rule) rule, err = Parse("foo") assert.Nil(t, err, "There should be no errors but got: %s", err) @@ -123,12 +123,12 @@ func TestFilter(t *testing.T) { rule, err = Parse("!(foo=bar|bar=foo)&(foo=bar|bar=foo)") assert.Nil(t, err, "There should be no errors but got: %s", err) - expectedChain := &Chain{op: ALL, rules: []Filter{ - &Chain{op: NONE, rules: []Filter{ + expectedChain := &Chain{op: All, rules: []Filter{ + &Chain{op: None, rules: []Filter{ &Condition{op: Equal, column: "foo", value: "bar"}, &Condition{op: Equal, column: "bar", value: "foo"}, }}, - &Chain{op: ANY, rules: []Filter{ + &Chain{op: Any, rules: []Filter{ &Condition{op: Equal, column: "foo", value: "bar"}, &Condition{op: Equal, column: "bar", value: "foo"}, }}, diff --git a/internal/filter/types.go b/internal/filter/types.go index bcc15e5f..beca7aac 100644 --- a/internal/filter/types.go +++ b/internal/filter/types.go @@ -8,12 +8,12 @@ import ( type LogicalOp string const ( - // NONE represents a filter chain type that matches when none of its ruleset matches. - NONE LogicalOp = "!" - // ALL represents a filter chain type that matches when all of its ruleset matches. - ALL LogicalOp = "&" - // ANY represents a filter chain type that matches when at least one of its ruleset matches. - ANY LogicalOp = "|" + // None represents a filter chain type that matches when none of its ruleset matches. + None LogicalOp = "!" + // All represents a filter chain type that matches when all of its ruleset matches. + All LogicalOp = "&" + // Any represents a filter chain type that matches when at least one of its ruleset matches. + Any LogicalOp = "|" ) // Chain is a filter type that wraps other filter rules and itself. @@ -27,7 +27,7 @@ type Chain struct { // Eval evaluates the filter rule sets recursively based on their operator type. func (c *Chain) Eval(filterable Filterable) (bool, error) { switch c.op { - case NONE: + case None: for _, rule := range c.rules { matched, err := rule.Eval(filterable) if err != nil { @@ -40,7 +40,7 @@ func (c *Chain) Eval(filterable Filterable) (bool, error) { } return true, nil - case ALL: + case All: for _, rule := range c.rules { matched, err := rule.Eval(filterable) if err != nil { @@ -53,7 +53,7 @@ func (c *Chain) Eval(filterable Filterable) (bool, error) { } return true, nil - case ANY: + case Any: for _, rule := range c.rules { matched, err := rule.Eval(filterable) if err != nil { From c873703ae96419df7781d135adb3e856589bdb09 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 25 Oct 2023 11:34:12 +0200 Subject: [PATCH 08/12] Trigger time based escalations Co-authored-by: Yonas Habteab --- go.mod | 2 +- internal/incident/incident.go | 202 ++++++++++++++++++++++++++------- internal/incident/incidents.go | 33 +----- internal/utils/utils.go | 20 ++++ 4 files changed, 185 insertions(+), 72 deletions(-) diff --git a/go.mod b/go.mod index 8d2f9b48..fd2e5068 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/icinga/icinga-notifications -go 1.20 +go 1.21 require ( github.com/creasty/defaults v1.7.0 diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 661e1ff7..c98cb2f5 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -35,6 +35,14 @@ type Incident struct { incidentRowID int64 + // timer calls RetriggerEscalations the next time any escalation could be reached on the incident. + // + // For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident + // is less than an hour old, timer will fire 1h after incident start, if the incident is between 1h and 2h + // old, timer will fire after 2h, and if the incident is already older than 2h, no future escalations can + // be reached solely based on the incident aging, so no more timer is necessary and timer stores nil. + timer *time.Timer + db *icingadb.DB logger *zap.SugaredLogger runtimeConfig *config.RuntimeConfig @@ -147,7 +155,12 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo } // Re-evaluate escalations based on the newly evaluated rules. - if _, err := i.evaluateEscalations(ctx, tx, ev, causedBy); err != nil { + escalations, err := i.evaluateEscalations(ev.Time) + if err != nil { + return err + } + + if err := i.triggerEscalations(ctx, tx, ev, causedBy, escalations); err != nil { return err } @@ -165,6 +178,68 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo return i.notifyContacts(ctx, ev, notifications) } +// RetriggerEscalations tries to re-evaluate the escalations and notify contacts. +func (i *Incident) RetriggerEscalations(ev *event.Event) { + i.Lock() + defer i.Unlock() + + i.runtimeConfig.RLock() + defer i.runtimeConfig.RUnlock() + + if !i.RecoveredAt.IsZero() { + // Incident is recovered in the meantime. + return + } + + if !time.Now().After(ev.Time) { + i.logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev)) + return + } + + escalations, err := i.evaluateEscalations(ev.Time) + if err != nil { + i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + return + } + + if len(escalations) == 0 { + i.logger.Debug("Reevaluated escalations, no new escalations triggered") + return + } + + var notifications []*NotificationEntry + ctx := context.Background() + err = utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { + err := ev.Sync(ctx, tx, i.db, i.Object.ID) + if err != nil { + return err + } + + if err = i.triggerEscalations(ctx, tx, ev, types.Int{}, escalations); err != nil { + return err + } + + channels := make(contactChannels) + for _, escalation := range escalations { + channels.loadEscalationRecipientsChannel(escalation, i, ev.Time) + } + + notifications, err = i.addPendingNotifications(ctx, tx, ev, channels, types.Int{}) + + return err + }) + if err != nil { + i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + } else { + if err = i.notifyContacts(ctx, ev, notifications); err != nil { + i.logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) + return + } + + i.logger.Info("Successfully reevaluated time-based escalations") + } +} + func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) (types.Int, error) { var causedByHistoryId types.Int oldSeverity := i.Severity @@ -214,6 +289,10 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, return types.Int{}, errors.New("can't insert incident closed history to the database") } + + if i.timer != nil { + i.timer.Stop() + } } i.Severity = newSeverity @@ -312,14 +391,25 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 return causedBy, nil } -// evaluateEscalations evaluates this incidents rule escalations if they aren't already. -// Returns whether a new escalation triggered or an error on database failure. -func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int) (bool, error) { +// evaluateEscalations evaluates this incidents rule escalations to be triggered if they aren't already. +// Returns the newly evaluated escalations to be triggered or an error on database failure. +func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, error) { if i.EscalationState == nil { i.EscalationState = make(map[int64]*EscalationState) } - newEscalationMatched := false + // Escalations are reevaluated now, reset any existing timer, if there might be future time-based escalations, + // this function will start a new timer. + if i.timer != nil { + i.logger.Info("Stopping reevaluate timer due to escalation evaluation") + i.timer.Stop() + i.timer = nil + } + + filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity} + + var escalations []*rule.Escalation + retryAfter := rule.RetryNever for rID := range i.Rules { r := i.runtimeConfig.Rules[rID] @@ -336,13 +426,8 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve if escalation.Condition == nil { matched = true } else { - cond := &rule.EscalationFilter{ - IncidentAge: time.Since(i.StartedAt), - IncidentSeverity: i.Severity, - } - var err error - matched, err = escalation.Condition.Eval(cond) + matched, err = escalation.Condition.Eval(filterContext) if err != nil { i.logger.Warnw( "Failed to evaluate escalation condition", zap.String("rule", r.Name), @@ -350,53 +435,84 @@ func (i *Incident) evaluateEscalations(ctx context.Context, tx *sqlx.Tx, ev *eve ) matched = false + } else if !matched { + incidentAgeFilter := filterContext.ReevaluateAfter(escalation.Condition) + retryAfter = min(retryAfter, incidentAgeFilter) } } if matched { - newEscalationMatched = true + escalations = append(escalations, escalation) + } + } + } + } - state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} - i.EscalationState[escalation.ID] = state + if retryAfter != rule.RetryNever { + // The retryAfter duration is relative to the incident duration represented by the escalation filter, + // i.e. if an incident is 15m old and an escalation rule evaluates incident_age>=1h the retryAfter would + // contain 45m (1h - incident age (15m)). Therefore, we have to use the event time instead of the incident + // start time here. + nextEvalAt := eventTime.Add(retryAfter) - i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName()) + i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) + i.timer = time.AfterFunc(retryAfter, func() { + i.logger.Info("Reevaluating escalations") - if err := i.AddEscalationTriggered(ctx, tx, state); err != nil { - i.logger.Errorw( - "Failed to upsert escalation state", zap.String("rule", r.Name), - zap.String("escalation", escalation.DisplayName()), zap.Error(err), - ) + i.RetriggerEscalations(&event.Event{ + Type: event.TypeInternal, + Time: nextEvalAt, + Message: fmt.Sprintf("Incident reached age %v", retryAfter), + }) + }) + } - return false, errors.New("failed to upsert escalation state") - } + return escalations, nil +} - history := &HistoryRow{ - Time: state.TriggeredAt, - EventID: utils.ToDBInt(ev.ID), - RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), - RuleID: utils.ToDBInt(r.ID), - Type: EscalationTriggered, - CausedByIncidentHistoryID: causedBy, - } +// triggerEscalations triggers the given escalations and generates incident history items for each of them. +// Returns an error on database failure. +func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, causedBy types.Int, escalations []*rule.Escalation) error { + for _, escalation := range escalations { + r := i.runtimeConfig.Rules[escalation.RuleID] + i.logger.Infof("Rule %q reached escalation %q", r.Name, escalation.DisplayName()) - if _, err := i.AddHistory(ctx, tx, history, false); err != nil { - i.logger.Errorw( - "Failed to insert escalation triggered incident history", zap.String("rule", r.Name), - zap.String("escalation", escalation.DisplayName()), zap.Error(err), - ) + state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} + i.EscalationState[escalation.ID] = state - return false, errors.New("failed to insert escalation triggered incident history") - } + if err := i.AddEscalationTriggered(ctx, tx, state); err != nil { + i.logger.Errorw( + "Failed to upsert escalation state", zap.String("rule", r.Name), + zap.String("escalation", escalation.DisplayName()), zap.Error(err), + ) - if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { - return false, err - } - } - } + return errors.New("failed to upsert escalation state") + } + + history := &HistoryRow{ + Time: state.TriggeredAt, + EventID: utils.ToDBInt(ev.ID), + RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), + RuleID: utils.ToDBInt(r.ID), + Type: EscalationTriggered, + CausedByIncidentHistoryID: causedBy, + } + + if _, err := i.AddHistory(ctx, tx, history, false); err != nil { + i.logger.Errorw( + "Failed to insert escalation triggered incident history", zap.String("rule", r.Name), + zap.String("escalation", escalation.DisplayName()), zap.Error(err), + ) + + return errors.New("failed to insert escalation triggered incident history") + } + + if err := i.AddRecipient(ctx, tx, escalation, ev.ID); err != nil { + return err } } - return newEscalationMatched, nil + return nil } // notifyContacts executes all the given pending notifications of the current incident. diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 640ef5a2..56d0b166 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -45,34 +45,11 @@ func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Log continue } - evaluateRulesAndEscalations := func(ctx context.Context) error { - ev := &event.Event{Time: time.Now(), Type: event.TypeEscalation} - if !incident.evaluateEscalations() { - return nil - } - - tx, err := db.BeginTxx(ctx, nil) - if err != nil { - return err - } - defer func() { _ = tx.Rollback() }() - - err = incident.notifyContacts(ctx, tx, ev, types.Int{}) - if err != nil { - return err - } - - if err = tx.Commit(); err != nil { - incident.logger.Errorw("Failed to commit database transaction", zap.Error(err)) - return err - } - - return nil - } - - if evaluateRulesAndEscalations(ctx) != nil { - continue - } + incident.RetriggerEscalations(&event.Event{ + Time: time.Now(), + Type: event.TypeInternal, + Message: "Incident reevaluation at daemon startup", + }) } return nil diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 7bf6b53b..a809bbe3 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -30,6 +30,26 @@ func BuildInsertStmtWithout(db *icingadb.DB, into interface{}, withoutColumn str ) } +// RunInTx allows running a function in a database transaction without requiring manual transaction handling. +// +// A new transaction is started on db which is then passed to fn. After fn returns, the transaction is +// committed unless an error was returned. If fn returns an error, that error is returned, otherwise an +// error is returned if a database operation fails. +func RunInTx(ctx context.Context, db *icingadb.DB, fn func(tx *sqlx.Tx) error) error { + tx, err := db.BeginTxx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + + err = fn(tx) + if err != nil { + return err + } + + return tx.Commit() +} + // InsertAndFetchId executes the given query and fetches the last inserted ID. func InsertAndFetchId(ctx context.Context, tx *sqlx.Tx, stmt string, args any) (int64, error) { var lastInsertId int64 From bbb9edc1930f0f07b925cb6b31fd8da94e6cf5f6 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 10 Nov 2023 10:17:30 +0100 Subject: [PATCH 09/12] Don't panic in `Exists#ExtractConditions()` & drop superfluous type assertion --- internal/filter/types.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/internal/filter/types.go b/internal/filter/types.go index beca7aac..4c1104d0 100644 --- a/internal/filter/types.go +++ b/internal/filter/types.go @@ -74,10 +74,6 @@ func (c *Chain) Eval(filterable Filterable) (bool, error) { func (c *Chain) ExtractConditions() []*Condition { var conditions []*Condition for _, rule := range c.rules { - if _, ok := rule.(*Exists); ok { - continue - } - conditions = append(conditions, rule.ExtractConditions()...) } @@ -197,7 +193,7 @@ type Exists struct { } func (e *Exists) ExtractConditions() []*Condition { - panic("filter exists doesn't support extract conditions") + return nil } func NewExists(column string) *Exists { From 584cf6fa077aa159096c504076fb317586092810 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 17 Nov 2023 09:06:30 +0100 Subject: [PATCH 10/12] Fix incorrect duration used in the dummy event message --- internal/incident/incident.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/incident/incident.go b/internal/incident/incident.go index c98cb2f5..af328941 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -462,7 +462,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, i.RetriggerEscalations(&event.Event{ Type: event.TypeInternal, Time: nextEvalAt, - Message: fmt.Sprintf("Incident reached age %v", retryAfter), + Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)), }) }) } From 95d048c24fb0f5df4c2a34ad4f0baabd25ccfe93 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 17 Nov 2023 09:34:20 +0100 Subject: [PATCH 11/12] Fix objects `name` & `url` aren't set at daemon startup --- internal/object/db_types.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/internal/object/db_types.go b/internal/object/db_types.go index 09327201..e8e6eec0 100644 --- a/internal/object/db_types.go +++ b/internal/object/db_types.go @@ -75,7 +75,14 @@ func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, extraTags[extraTag.Tag] = extraTag.Value } - obj := &Object{db: db, ID: id, Tags: tags, ExtraTags: extraTags} + obj := &Object{ + db: db, + ID: id, + Name: objectRow.Name, + URL: objectRow.URL.String, + Tags: tags, + ExtraTags: extraTags, + } cache[id.String()] = obj return obj, nil From f45fe6fa18e2c834ce5a082351acb1f106fa3a4f Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 17 Nov 2023 09:34:59 +0100 Subject: [PATCH 12/12] Set `severity` field of the dummy events Otherwise, the notification subject is looking odd, since this field is used to build the notification messages. --- internal/incident/incident.go | 17 +++++++++-------- internal/incident/incidents.go | 7 ++++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/internal/incident/incident.go b/internal/incident/incident.go index af328941..c708030a 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -155,7 +155,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event, created bo } // Re-evaluate escalations based on the newly evaluated rules. - escalations, err := i.evaluateEscalations(ev.Time) + escalations, err := i.evaluateEscalations(ev) if err != nil { return err } @@ -196,7 +196,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return } - escalations, err := i.evaluateEscalations(ev.Time) + escalations, err := i.evaluateEscalations(ev) if err != nil { i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) return @@ -393,7 +393,7 @@ func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64 // evaluateEscalations evaluates this incidents rule escalations to be triggered if they aren't already. // Returns the newly evaluated escalations to be triggered or an error on database failure. -func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, error) { +func (i *Incident) evaluateEscalations(e *event.Event) ([]*rule.Escalation, error) { if i.EscalationState == nil { i.EscalationState = make(map[int64]*EscalationState) } @@ -406,7 +406,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, i.timer = nil } - filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity} + filterContext := &rule.EscalationFilter{IncidentAge: e.Time.Sub(i.StartedAt), IncidentSeverity: i.Severity} var escalations []*rule.Escalation retryAfter := rule.RetryNever @@ -453,16 +453,17 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, // i.e. if an incident is 15m old and an escalation rule evaluates incident_age>=1h the retryAfter would // contain 45m (1h - incident age (15m)). Therefore, we have to use the event time instead of the incident // start time here. - nextEvalAt := eventTime.Add(retryAfter) + nextEvalAt := e.Time.Add(retryAfter) i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) i.timer = time.AfterFunc(retryAfter, func() { i.logger.Info("Reevaluating escalations") i.RetriggerEscalations(&event.Event{ - Type: event.TypeInternal, - Time: nextEvalAt, - Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)), + Type: event.TypeInternal, + Time: nextEvalAt, + Severity: e.Severity, + Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)), }) }) } diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 56d0b166..02721765 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -46,9 +46,10 @@ func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Log } incident.RetriggerEscalations(&event.Event{ - Time: time.Now(), - Type: event.TypeInternal, - Message: "Incident reevaluation at daemon startup", + Time: time.Now(), + Type: event.TypeInternal, + Severity: incident.Severity, + Message: "Incident reevaluation at daemon startup", }) }