Skip to content

Commit

Permalink
Restore active incidents from DB gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Nov 21, 2023
1 parent c5ac8b0 commit eef0004
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 188 deletions.
48 changes: 0 additions & 48 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,11 @@
package incident

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
)

type IncidentRow struct {
ID int64 `db:"id"`
ObjectID types.Binary `db:"object_id"`
StartedAt types.UnixMilli `db:"started_at"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
Severity event.Severity `db:"severity"`
}

// TableName implements the contracts.TableNamer interface.
func (i *IncidentRow) TableName() string {
return "incident"
}

// Upsert implements the contracts.Upserter interface.
func (i *IncidentRow) Upsert() interface{} {
return &struct {
Severity event.Severity `db:"severity"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
}{Severity: i.Severity, RecoveredAt: i.RecoveredAt}
}

// Sync synchronizes incidents to the database.
// Fetches the last inserted incident id and modifies this incident's id.
// Returns an error on database failure.
func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
if upsert {
stmt, _ := db.BuildUpsertStmt(i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
if err != nil {
return err
}

i.ID = incidentId
}

return nil
}

// EventRow represents a single incident event database entry.
type EventRow struct {
IncidentID int64 `db:"incident_id"`
Expand Down
29 changes: 15 additions & 14 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ type ruleID = int64
type escalationID = int64

type Incident struct {
Object *object.Object
StartedAt time.Time
RecoveredAt time.Time
Severity event.Severity
Id int64 `db:"id"`
ObjectID types.Binary `db:"object_id"`
StartedAt types.UnixMilli `db:"started_at"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
Severity event.Severity `db:"severity"`

Object *object.Object `db:"-"`

EscalationState map[escalationID]*EscalationState
Rules map[ruleID]struct{}
Recipients map[recipient.Key]*RecipientState

incidentRowID int64

// timer calls RetriggerEscalations the next time any escalation could be reached on the incident.
//
// For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident
Expand Down Expand Up @@ -69,11 +70,11 @@ func (i *Incident) ObjectDisplayName() string {
}

func (i *Incident) String() string {
return fmt.Sprintf("#%d", i.incidentRowID)
return fmt.Sprintf("#%d", i.ID())
}

func (i *Incident) ID() int64 {
return i.incidentRowID
return i.Id
}

func (i *Incident) HasManager() bool {
Expand Down Expand Up @@ -186,7 +187,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

if !i.RecoveredAt.IsZero() {
if !i.RecoveredAt.Time().IsZero() {
// Incident is recovered in the meantime.
return
}
Expand Down Expand Up @@ -272,14 +273,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
causedByHistoryId = historyId

if newSeverity == event.SeverityOK {
i.RecoveredAt = time.Now()
i.RecoveredAt = types.UnixMilli(time.Now())
i.logger.Info("All sources recovered, closing incident")

RemoveCurrent(i.Object)

history := &HistoryRow{
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(i.RecoveredAt),
Time: i.RecoveredAt,
Type: Closed,
}

Expand All @@ -306,7 +307,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
}

func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error {
i.StartedAt = ev.Time
i.StartedAt = types.UnixMilli(ev.Time)
i.Severity = ev.Severity
if err := i.Sync(ctx, tx); err != nil {
i.logger.Errorw("Can't insert incident to the database", zap.Error(err))
Expand Down Expand Up @@ -406,7 +407,7 @@ func (i *Incident) evaluateEscalations(e *event.Event) ([]*rule.Escalation, erro
i.timer = nil
}

filterContext := &rule.EscalationFilter{IncidentAge: e.Time.Sub(i.StartedAt), IncidentSeverity: i.Severity}
filterContext := &rule.EscalationFilter{IncidentAge: e.Time.Sub(i.StartedAt.Time()), IncidentSeverity: i.Severity}

var escalations []*rule.Escalation
retryAfter := rule.RetryNever
Expand Down Expand Up @@ -463,7 +464,7 @@ func (i *Incident) evaluateEscalations(e *event.Event) ([]*rule.Escalation, erro
Type: event.TypeInternal,
Time: nextEvalAt,
Severity: e.Severity,
Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)),
Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())),
})
})
}
Expand Down
78 changes: 37 additions & 41 deletions internal/incident/incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package incident

import (
"context"
"database/sql"
"errors"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/object"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"go.uber.org/zap"
"sync"
"time"
Expand All @@ -25,32 +23,53 @@ var (
func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig) error {
logger.Info("Loading all active incidents from database")

var objectIDs []types.Binary
err := db.SelectContext(ctx, &objectIDs, `SELECT object_id FROM incident WHERE "recovered_at" IS NULL`)
query := `
SELECT incident.id, started_at, severity, object_id, object.id, object.source_id, object.name, object.url
FROM incident
INNER JOIN object on object.id = incident.object_id
WHERE "recovered_at" IS NULL`

rows, err := db.QueryxContext(ctx, query)
if err != nil {
logger.Errorw("Failed to load active incidents from database", zap.Error(err))

return errors.New("failed to fetch open incidents")
}
defer func() { _ = rows.Close() }()

for _, objectID := range objectIDs {
obj, err := object.LoadFromDB(ctx, db, objectID)
currentIncidentsMu.Lock()
defer currentIncidentsMu.Unlock()

ev := &event.Event{Tags: make(map[string]string), ExtraTags: make(map[string]string)}
for rows.Next() {
i := NewIncident(db, object.NewObject(db, ev), runtimeConfig, nil)
err := rows.Scan(&i.Id, &i.StartedAt, &i.Severity, &i.ObjectID, &i.Object.ID, &i.Object.SourceID, &i.Object.Name, &i.Object.URL)
if err != nil {
logger.Errorw("Failed to retrieve incident object from database", zap.Error(err))
return err
}

i.logger = logger.With(zap.String("object", i.ObjectDisplayName()), zap.String("incident", i.String()))

if err := i.Object.LoadMetadata(ctx); err != nil {
i.logger.Errorw("Failed to retrieve incident object meta data from database", zap.Error(err))
continue
}

incident, _, err := GetCurrent(ctx, db, obj, logger, runtimeConfig, false)
if err != nil {
if i.restoreEscalationsState(ctx) != nil {
// Error is already logged within the function
continue
}

incident.RetriggerEscalations(&event.Event{
object.Cache(i.Object)

i.RetriggerEscalations(&event.Event{
Time: time.Now(),
Type: event.TypeInternal,
Severity: incident.Severity,
Severity: i.Severity,
Message: "Incident reevaluation at daemon startup",
})

currentIncidents[i.Object] = i
}

return nil
Expand All @@ -66,37 +85,14 @@ func GetCurrent(
created := false
currentIncident := currentIncidents[obj]

if currentIncident == nil {
ir := &IncidentRow{}
incidentLogger := logger.With(zap.String("object", obj.DisplayName()))
incident := NewIncident(db, obj, runtimeConfig, incidentLogger)

err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
logger.Errorw("Failed to load incident from database", zap.String("object", obj.DisplayName()), zap.Error(err))

return nil, false, errors.New("failed to load incident from database")
} else if err == nil {
incident.incidentRowID = ir.ID
incident.StartedAt = ir.StartedAt.Time()
incident.Severity = ir.Severity
incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String()))
if currentIncident == nil && create {
created = true

if err := incident.restoreEscalationsState(ctx); err != nil {
return nil, false, err
}

currentIncident = incident
}

if create && currentIncident == nil {
created = true
currentIncident = incident
}
incidentLogger := logger.With(zap.String("object", obj.DisplayName()))
currentIncident = NewIncident(db, obj, runtimeConfig, incidentLogger)
currentIncident.ObjectID = obj.ID

if currentIncident != nil {
currentIncidents[obj] = currentIncident
}
currentIncidents[obj] = currentIncident
}

if !created && currentIncident != nil {
Expand Down Expand Up @@ -129,7 +125,7 @@ func GetCurrentIncidents() map[int64]*Incident {

m := make(map[int64]*Incident)
for _, incident := range currentIncidents {
m[incident.incidentRowID] = incident
m[incident.ID()] = incident
}
return m
}
51 changes: 33 additions & 18 deletions internal/incident/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package incident
import (
"context"
"errors"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/rule"
Expand All @@ -13,30 +14,44 @@ import (
"time"
)

// TableName implements the contracts.TableNamer interface.
func (i *Incident) TableName() string {
return "incident"
}

// Upsert implements the contracts.Upserter interface.
func (i *Incident) Upsert() interface{} {
return &struct {
Severity event.Severity `db:"severity"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
}{Severity: i.Severity, RecoveredAt: i.RecoveredAt}
}

// Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database.
// Before syncing any incident related database entries, this method should be called at least once.
// Returns an error on db failure.
func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error {
incidentRow := &IncidentRow{
ID: i.incidentRowID,
ObjectID: i.Object.ID,
StartedAt: types.UnixMilli(i.StartedAt),
RecoveredAt: types.UnixMilli(i.RecoveredAt),
Severity: i.Severity,
}
if i.ID() != 0 {
stmt, _ := i.db.BuildUpsertStmt(i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
stmt := utils.BuildInsertStmtWithout(i.db, i, "id")
incidentId, err := utils.InsertAndFetchId(ctx, tx, stmt, i)
if err != nil {
return err
}

err := incidentRow.Sync(ctx, tx, i.db, i.incidentRowID != 0)
if err != nil {
return err
i.Id = incidentId
}

i.incidentRowID = incidentRow.ID

return nil
}

func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) {
historyRow.IncidentID = i.incidentRowID
historyRow.IncidentID = i.ID()

stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id")
if fetchId {
Expand All @@ -57,7 +72,7 @@ func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *Hist
}

func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error {
state.IncidentID = i.incidentRowID
state.IncidentID = i.ID()

stmt, _ := i.db.BuildUpsertStmt(state)
_, err := tx.NamedExecContext(ctx, stmt, state)
Expand All @@ -67,7 +82,7 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat

// AddEvent Inserts incident history record to the database and returns an error on db failure.
func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error {
ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID}
ie := &EventRow{IncidentID: i.ID(), EventID: ev.ID}
stmt, _ := i.db.BuildInsertStmt(ie)
_, err := tx.NamedExecContext(ctx, stmt, ie)

Expand All @@ -84,7 +99,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru

for _, escalationRecipient := range escalation.Recipients {
r := escalationRecipient.Recipient
cr := &ContactRow{IncidentID: i.incidentRowID, Role: newRole}
cr := &ContactRow{IncidentID: i.ID(), Role: newRole}

recipientKey := recipient.ToKey(r)
cr.Key = recipientKey
Expand All @@ -100,7 +115,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru
i.logger.Infof("Contact %q role changed from %s to %s", r, state.Role.String(), newRole.String())

hr := &HistoryRow{
IncidentID: i.incidentRowID,
IncidentID: i.ID(),
EventID: utils.ToDBInt(eventId),
Key: cr.Key,
Time: types.UnixMilli(time.Now()),
Expand Down Expand Up @@ -140,7 +155,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru
// AddRuleMatched syncs the given *rule.Rule to the database.
// Returns an error on database failure.
func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error {
rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID}
rr := &RuleRow{IncidentID: i.ID(), RuleID: r.ID}
stmt, _ := i.db.BuildUpsertStmt(rr)
_, err := tx.NamedExecContext(ctx, stmt, rr)

Expand Down
Loading

0 comments on commit eef0004

Please sign in to comment.