Skip to content

Commit 399d3c2

Browse files
committed
Load all open incidents from DB on daemon startup
1 parent afa8711 commit 399d3c2

File tree

5 files changed

+202
-19
lines changed

5 files changed

+202
-19
lines changed

cmd/icinga-notifications-daemon/main.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"flag"
66
"fmt"
77
"github.com/icinga/icinga-notifications/internal/config"
8+
"github.com/icinga/icinga-notifications/internal/incident"
89
"github.com/icinga/icinga-notifications/internal/listener"
910
"github.com/icinga/icingadb/pkg/logging"
1011
"github.com/icinga/icingadb/pkg/utils"
@@ -62,6 +63,11 @@ func main() {
6263

6364
go runtimeConfig.PeriodicUpdates(context.TODO(), 1*time.Second)
6465

66+
err = incident.LoadOpenIncidents(context.TODO(), db, logs.GetChildLogger("incident"), runtimeConfig, conf)
67+
if err != nil {
68+
logger.Fatalw("Can't load incidents from database", zap.Error(err))
69+
}
70+
6571
if err := listener.NewListener(db, conf, runtimeConfig, logs).Run(); err != nil {
6672
panic(err)
6773
}

internal/incident/incident.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,29 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
572572
return nil
573573
}
574574

575+
// ReloadRecipients reloads the current incident recipients from the database.
576+
// Returns error on database failure.
577+
func (i *Incident) ReloadRecipients(ctx context.Context, tx *sqlx.Tx) error {
578+
i.Lock()
579+
defer i.Unlock()
580+
581+
contact := &ContactRow{}
582+
var contacts []*ContactRow
583+
err := tx.SelectContext(ctx, &contacts, tx.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID())
584+
if err != nil {
585+
return fmt.Errorf("failed to fetch incident recipients: %w", err)
586+
}
587+
588+
recipients := make(map[recipient.Key]*RecipientState)
589+
for _, contact := range contacts {
590+
recipients[contact.Key] = &RecipientState{Role: contact.Role}
591+
}
592+
593+
i.Recipients = recipients
594+
595+
return nil
596+
}
597+
575598
type EscalationState struct {
576599
IncidentID int64 `db:"incident_id"`
577600
RuleEscalationID int64 `db:"rule_escalation_id"`

internal/incident/incidents.go

Lines changed: 122 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/icinga/icinga-notifications/internal/recipient"
1111
"github.com/icinga/icingadb/pkg/icingadb"
1212
"github.com/icinga/icingadb/pkg/logging"
13+
"github.com/jmoiron/sqlx"
1314
"golang.org/x/sync/errgroup"
1415
"sync"
1516
)
@@ -19,8 +20,123 @@ var (
1920
currentIncidentsMu sync.Mutex
2021
)
2122

23+
func LoadOpenIncidents(
24+
ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile,
25+
) error {
26+
tx, err := db.BeginTxx(ctx, nil)
27+
if err != nil {
28+
return err
29+
}
30+
defer func() { _ = tx.Rollback() }()
31+
32+
var incidentRows []*IncidentRow
33+
err = tx.SelectContext(ctx, &incidentRows, db.BuildSelectStmt(&IncidentRow{}, &IncidentRow{})+` WHERE "recovered_at" IS NULL`)
34+
if err != nil {
35+
return fmt.Errorf("failed to fetch open incidents: %w", err)
36+
}
37+
38+
incidents := make(map[*object.Object]*Incident)
39+
g, childCtx := errgroup.WithContext(ctx)
40+
for _, incidentRow := range incidentRows {
41+
incident := &Incident{
42+
db: db,
43+
logger: logger,
44+
runtimeConfig: runtimeConfig,
45+
StartedAt: incidentRow.StartedAt.Time(),
46+
incidentRowID: incidentRow.ID,
47+
configFile: configFile,
48+
SeverityBySource: map[int64]event.Severity{},
49+
EscalationState: map[escalationID]*EscalationState{},
50+
Rules: map[ruleID]struct{}{},
51+
Recipients: map[recipient.Key]*RecipientState{},
52+
}
53+
54+
obj, err := object.FromDB(ctx, db, tx, incidentRow.ObjectID)
55+
if err != nil {
56+
return err
57+
}
58+
59+
incident.Object = obj
60+
61+
g.Go(func() error {
62+
sourceSeverity := &SourceSeverity{IncidentID: incidentRow.ID}
63+
var sources []SourceSeverity
64+
err = db.SelectContext(
65+
childCtx, &sources,
66+
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
67+
incidentRow.ID, event.SeverityOK,
68+
)
69+
if err != nil {
70+
return fmt.Errorf("failed to fetch incident sources Severity: %w", err)
71+
}
72+
73+
for _, source := range sources {
74+
incident.SeverityBySource[source.SourceID] = source.Severity
75+
}
76+
77+
return childCtx.Err()
78+
})
79+
80+
g.Go(func() error {
81+
state := &EscalationState{}
82+
var states []*EscalationState
83+
err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), incidentRow.ID)
84+
if err != nil {
85+
return fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
86+
}
87+
88+
for _, state := range states {
89+
incident.EscalationState[state.RuleEscalationID] = state
90+
}
91+
92+
return childCtx.Err()
93+
})
94+
95+
g.Go(func() error {
96+
return incident.ReloadRecipients(childCtx, tx)
97+
})
98+
99+
g.Go(func() error {
100+
incident.Lock()
101+
incident.runtimeConfig.RLock()
102+
defer func() {
103+
defer incident.runtimeConfig.RUnlock()
104+
defer incident.Unlock()
105+
}()
106+
107+
err = incident.evaluateRules(childCtx, tx, 0)
108+
if err != nil {
109+
return err
110+
}
111+
112+
err = incident.evaluateEscalations()
113+
if err != nil {
114+
return err
115+
}
116+
117+
return childCtx.Err()
118+
})
119+
120+
incidents[obj] = incident
121+
}
122+
123+
if err = g.Wait(); err != nil {
124+
return err
125+
}
126+
if err = tx.Commit(); err != nil {
127+
return err
128+
}
129+
130+
currentIncidentsMu.Lock()
131+
defer currentIncidentsMu.Unlock()
132+
133+
currentIncidents = incidents
134+
135+
return nil
136+
}
137+
22138
func GetCurrent(
23-
ctx context.Context, db *icingadb.DB, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile, create bool,
139+
ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, obj *object.Object, logger *logging.Logger, runtimeConfig *config.RuntimeConfig, configFile *config.ConfigFile, create bool,
24140
) (*Incident, bool, error) {
25141
currentIncidentsMu.Lock()
26142
defer currentIncidentsMu.Unlock()
@@ -35,7 +151,7 @@ func GetCurrent(
35151
incident.EscalationState = make(map[escalationID]*EscalationState)
36152
incident.Recipients = make(map[recipient.Key]*RecipientState)
37153

38-
err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
154+
err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
39155
if err != nil && err != sql.ErrNoRows {
40156
return nil, false, fmt.Errorf("incident query failed with: %w", err)
41157
} else if err == nil {
@@ -46,7 +162,7 @@ func GetCurrent(
46162
g.Go(func() error {
47163
sourceSeverity := &SourceSeverity{IncidentID: ir.ID}
48164
var sources []SourceSeverity
49-
err := db.SelectContext(
165+
err := tx.SelectContext(
50166
childCtx, &sources,
51167
db.Rebind(db.BuildSelectStmt(sourceSeverity, sourceSeverity)+` WHERE "incident_id" = ? AND "severity" != ?`),
52168
ir.ID, event.SeverityOK,
@@ -65,7 +181,7 @@ func GetCurrent(
65181
g.Go(func() error {
66182
state := &EscalationState{}
67183
var states []*EscalationState
68-
err = db.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
184+
err = tx.SelectContext(childCtx, &states, db.Rebind(db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), ir.ID)
69185
if err != nil {
70186
return fmt.Errorf("failed to fetch incident rule escalation state: %w", err)
71187
}
@@ -95,22 +211,10 @@ func GetCurrent(
95211
}
96212

97213
if !created && currentIncident != nil {
98-
currentIncident.Lock()
99-
defer currentIncident.Unlock()
100-
101-
contact := &ContactRow{}
102-
var contacts []*ContactRow
103-
err := db.SelectContext(ctx, &contacts, db.Rebind(db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), currentIncident.ID())
214+
err := currentIncident.ReloadRecipients(ctx, tx)
104215
if err != nil {
105-
return nil, false, fmt.Errorf("failed to fetch incident recipients: %w", err)
216+
return nil, false, err
106217
}
107-
108-
recipients := make(map[recipient.Key]*RecipientState)
109-
for _, contact := range contacts {
110-
recipients[contact.Key] = &RecipientState{Role: contact.Role}
111-
}
112-
113-
currentIncident.Recipients = recipients
114218
}
115219

116220
return currentIncident, created, nil

internal/listener/listener.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) {
106106
}
107107

108108
createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK
109-
currentIncident, created, err := incident.GetCurrent(ctx, l.db, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident)
109+
currentIncident, created, err := incident.GetCurrent(ctx, l.db, tx, obj, l.logs.GetChildLogger("incident"), l.runtimeConfig, l.configFile, createIncident)
110110
if err != nil {
111111
w.WriteHeader(http.StatusInternalServerError)
112112
_, _ = fmt.Fprintln(w, err)

internal/object/db_types.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
package object
22

33
import (
4+
"context"
5+
"fmt"
6+
"github.com/icinga/icingadb/pkg/icingadb"
47
"github.com/icinga/icingadb/pkg/types"
8+
"github.com/jmoiron/sqlx"
59
)
610

711
// ExtraTagRow represents a single database object extra tag like `hostgroup/foo: null`.
@@ -27,3 +31,49 @@ type ObjectRow struct {
2731
func (d *ObjectRow) TableName() string {
2832
return "object"
2933
}
34+
35+
func FromDB(ctx context.Context, db *icingadb.DB, tx *sqlx.Tx, id types.Binary) (*Object, error) {
36+
objectRow := &ObjectRow{ID: id}
37+
err := tx.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow)
38+
if err != nil {
39+
return nil, fmt.Errorf("failed to fetch object: %w", err)
40+
}
41+
42+
tags := map[string]string{"host": objectRow.Host}
43+
if objectRow.Service.Valid {
44+
tags["service"] = objectRow.Service.String
45+
}
46+
47+
metadata := make(map[int64]*SourceMetadata)
48+
var sourceMetas []*SourceMetadata
49+
err = tx.SelectContext(ctx, &sourceMetas, db.Rebind(db.BuildSelectStmt(&SourceMetadata{}, &SourceMetadata{})+` WHERE "object_id" = ?`), id)
50+
if err != nil {
51+
return nil, fmt.Errorf("failed to fetch source object: %w", err)
52+
}
53+
54+
var extraTags []*ExtraTagRow
55+
err = tx.SelectContext(
56+
ctx, &extraTags,
57+
db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id,
58+
)
59+
if err != nil {
60+
return nil, fmt.Errorf("failed to fetch object extra tags: %w", err)
61+
}
62+
63+
for _, sourceMeta := range sourceMetas {
64+
sourceMeta.ExtraTags = map[string]string{}
65+
metadata[sourceMeta.SourceId] = sourceMeta
66+
}
67+
68+
for _, extraTag := range extraTags {
69+
sourceMeta, ok := metadata[extraTag.SourceId]
70+
if ok {
71+
sourceMeta.ExtraTags[extraTag.Tag] = extraTag.Value
72+
}
73+
}
74+
75+
obj := &Object{db: db, ID: id, Tags: tags, Metadata: metadata}
76+
obj.UpdateCache()
77+
78+
return obj, nil
79+
}

0 commit comments

Comments
 (0)