Skip to content

Commit 74cab07

Browse files
committed
Use sqlx.Tx for event processing
1 parent 6dc9d8a commit 74cab07

File tree

7 files changed

+155
-155
lines changed

7 files changed

+155
-155
lines changed

internal/event/event.go

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"bytes"
55
"fmt"
66
"github.com/icinga/icinga-notifications/internal/utils"
7-
"github.com/icinga/icingadb/pkg/icingadb"
87
"github.com/icinga/icingadb/pkg/types"
98
"time"
109
)
@@ -73,30 +72,6 @@ func (e *Event) FullString() string {
7372
return b.String()
7473
}
7574

76-
// Sync transforms this event to *event.EventRow and calls its sync method.
77-
func (e *Event) Sync(db *icingadb.DB, objectId types.Binary) error {
78-
if e.ID != 0 {
79-
return nil
80-
}
81-
82-
eventRow := &EventRow{
83-
Time: types.UnixMilli(e.Time),
84-
SourceID: e.SourceId,
85-
ObjectID: objectId,
86-
Type: utils.ToDBString(e.Type),
87-
Severity: e.Severity,
88-
Username: utils.ToDBString(e.Username),
89-
Message: utils.ToDBString(e.Message),
90-
}
91-
92-
err := eventRow.Sync(db)
93-
if err == nil {
94-
e.ID = eventRow.ID
95-
}
96-
97-
return err
98-
}
99-
10075
// EventRow represents a single event database row and isn't an in-memory representation of an event.
10176
type EventRow struct {
10277
ID int64 `db:"id"`
@@ -114,15 +89,14 @@ func (er *EventRow) TableName() string {
11489
return "event"
11590
}
11691

117-
// Sync synchronizes this types data to the database.
118-
// Returns an error when any of the database operation fails.
119-
func (er *EventRow) Sync(db *icingadb.DB) error {
120-
eventId, err := utils.InsertAndFetchId(db, utils.BuildInsertStmtWithout(db, er, "id"), er)
121-
if err != nil {
122-
return err
92+
func NewEventRow(e Event, objectId types.Binary) *EventRow {
93+
return &EventRow{
94+
Time: types.UnixMilli(e.Time),
95+
SourceID: e.SourceId,
96+
ObjectID: objectId,
97+
Type: utils.ToDBString(e.Type),
98+
Severity: e.Severity,
99+
Username: utils.ToDBString(e.Username),
100+
Message: utils.ToDBString(e.Message),
123101
}
124-
125-
er.ID = eventId
126-
127-
return nil
128102
}

internal/incident/db_types.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/icinga/icinga-notifications/internal/utils"
88
"github.com/icinga/icingadb/pkg/icingadb"
99
"github.com/icinga/icingadb/pkg/types"
10+
"github.com/jmoiron/sqlx"
1011
)
1112

1213
type IncidentRow struct {
@@ -32,15 +33,15 @@ func (i *IncidentRow) Upsert() interface{} {
3233
// Sync synchronizes incidents to the database.
3334
// Fetches the last inserted incident id and modifies this incident's id.
3435
// Returns an error on database failure.
35-
func (i *IncidentRow) Sync(db *icingadb.DB, upsert bool) error {
36+
func (i *IncidentRow) Sync(tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
3637
if upsert {
3738
stmt, _ := db.BuildUpsertStmt(i)
38-
_, err := db.NamedExec(stmt, i)
39+
_, err := tx.NamedExec(stmt, i)
3940
if err != nil {
4041
return fmt.Errorf("failed to upsert incident: %s", err)
4142
}
4243
} else {
43-
incidentId, err := utils.InsertAndFetchId(db, utils.BuildInsertStmtWithout(db, i, "id"), i)
44+
incidentId, err := utils.InsertAndFetchId(tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
4445
if err != nil {
4546
return err
4647
}

internal/incident/history.go

Lines changed: 23 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@ import (
77
"github.com/icinga/icinga-notifications/internal/rule"
88
"github.com/icinga/icinga-notifications/internal/utils"
99
"github.com/icinga/icingadb/pkg/types"
10+
"github.com/jmoiron/sqlx"
1011
"time"
1112
)
1213

1314
// Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database.
1415
// Before syncing any incident related database entries, this method should be called at least once.
1516
// Returns an error on db failure.
16-
func (i *Incident) Sync() error {
17+
func (i *Incident) Sync(tx *sqlx.Tx) error {
1718
incidentRow := &IncidentRow{
1819
ID: i.incidentRowID,
1920
ObjectID: i.Object.ID,
@@ -22,7 +23,7 @@ func (i *Incident) Sync() error {
2223
Severity: i.Severity(),
2324
}
2425

25-
err := incidentRow.Sync(i.db, i.incidentRowID != 0)
26+
err := incidentRow.Sync(tx, i.db, i.incidentRowID != 0)
2627
if err != nil {
2728
return err
2829
}
@@ -32,54 +33,51 @@ func (i *Incident) Sync() error {
3233
return nil
3334
}
3435

35-
func (i *Incident) AddHistory(historyRow *HistoryRow, fetchId bool) (types.Int, error) {
36+
func (i *Incident) AddHistory(tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) {
3637
historyRow.IncidentID = i.incidentRowID
3738

3839
stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id")
3940
if fetchId {
40-
historyId, err := utils.InsertAndFetchId(i.db, stmt, historyRow)
41+
historyId, err := utils.InsertAndFetchId(tx, stmt, historyRow)
4142
if err != nil {
4243
return types.Int{}, err
4344
}
4445

4546
return utils.ToDBInt(historyId), nil
4647
} else {
47-
_, err := i.db.NamedExec(stmt, historyRow)
48+
_, err := tx.NamedExec(stmt, historyRow)
4849
if err != nil {
49-
return types.Int{}, fmt.Errorf("failed to insert incident history: %w", err)
50+
return types.Int{}, err
5051
}
5152
}
5253

5354
return types.Int{}, nil
5455
}
5556

56-
func (i *Incident) AddEscalationTriggered(state *EscalationState, hr *HistoryRow) (types.Int, error) {
57+
func (i *Incident) AddEscalationTriggered(tx *sqlx.Tx, state *EscalationState, hr *HistoryRow) (types.Int, error) {
5758
state.IncidentID = i.incidentRowID
5859

5960
stmt, _ := i.db.BuildUpsertStmt(state)
60-
_, err := i.db.NamedExec(stmt, state)
61+
_, err := tx.NamedExec(stmt, state)
6162
if err != nil {
62-
return types.Int{}, fmt.Errorf("failed to insert incident rule escalation state: %w", err)
63+
return types.Int{}, err
6364
}
6465

65-
return i.AddHistory(hr, true)
66+
return i.AddHistory(tx, hr, true)
6667
}
6768

6869
// AddEvent Inserts incident history record to the database and returns an error on db failure.
69-
func (i *Incident) AddEvent(ev *event.Event) error {
70+
func (i *Incident) AddEvent(tx *sqlx.Tx, ev *event.Event) error {
7071
ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID}
7172
stmt, _ := i.db.BuildInsertStmt(ie)
72-
_, err := i.db.NamedExec(stmt, ie)
73-
if err != nil {
74-
return fmt.Errorf("failed to insert incident event: %w", err)
75-
}
73+
_, err := tx.NamedExec(stmt, ie)
7674

77-
return nil
75+
return err
7876
}
7977

8078
// AddRecipient adds recipient from the given *rule.Escalation to this incident.
8179
// Syncs also all the recipients with the database and returns an error on db failure.
82-
func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) error {
80+
func (i *Incident) AddRecipient(tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error {
8381
newRole := RoleRecipient
8482
if i.HasManager() {
8583
newRole = RoleSubscriber
@@ -112,7 +110,7 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro
112110
OldRecipientRole: oldRole,
113111
}
114112

115-
_, err := i.AddHistory(hr, false)
113+
_, err := i.AddHistory(tx, hr, false)
116114
if err != nil {
117115
return err
118116
}
@@ -121,7 +119,7 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro
121119
}
122120

123121
stmt, _ := i.db.BuildUpsertStmt(cr)
124-
_, err := i.db.NamedExec(stmt, cr)
122+
_, err := tx.NamedExec(stmt, cr)
125123
if err != nil {
126124
return fmt.Errorf("failed to upsert incident contact %s: %w", r, err)
127125
}
@@ -132,18 +130,18 @@ func (i *Incident) AddRecipient(escalation *rule.Escalation, eventId int64) erro
132130

133131
// AddRuleMatchedHistory syncs the given *rule.Rule and history entry to the database.
134132
// Returns an error on database failure.
135-
func (i *Incident) AddRuleMatchedHistory(r *rule.Rule, hr *HistoryRow) (types.Int, error) {
133+
func (i *Incident) AddRuleMatchedHistory(tx *sqlx.Tx, r *rule.Rule, hr *HistoryRow) (types.Int, error) {
136134
rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID}
137135
stmt, _ := i.db.BuildUpsertStmt(rr)
138-
_, err := i.db.NamedExec(stmt, rr)
136+
_, err := tx.NamedExec(stmt, rr)
139137
if err != nil {
140-
return types.Int{}, fmt.Errorf("failed to insert incident rule: %w", err)
138+
return types.Int{}, err
141139
}
142140

143-
return i.AddHistory(hr, true)
141+
return i.AddHistory(tx, hr, true)
144142
}
145143

146-
func (i *Incident) AddSourceSeverity(severity event.Severity, sourceID int64) error {
144+
func (i *Incident) AddSourceSeverity(tx *sqlx.Tx, severity event.Severity, sourceID int64) error {
147145
i.SeverityBySource[sourceID] = severity
148146

149147
sourceSeverity := &SourceSeverity{
@@ -153,7 +151,7 @@ func (i *Incident) AddSourceSeverity(severity event.Severity, sourceID int64) er
153151
}
154152

155153
stmt, _ := i.db.BuildUpsertStmt(sourceSeverity)
156-
_, err := i.db.NamedExec(stmt, sourceSeverity)
154+
_, err := tx.NamedExec(stmt, sourceSeverity)
157155

158156
return err
159157
}

0 commit comments

Comments
 (0)