Skip to content

Commit

Permalink
Restore active incidents from DB gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Apr 15, 2024
1 parent 97e0114 commit 5e25990
Show file tree
Hide file tree
Showing 6 changed files with 285 additions and 257 deletions.
48 changes: 0 additions & 48 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,11 @@
package incident

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
)

type IncidentRow struct {
ID int64 `db:"id"`
ObjectID types.Binary `db:"object_id"`
StartedAt types.UnixMilli `db:"started_at"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
Severity event.Severity `db:"severity"`
}

// TableName implements the contracts.TableNamer interface.
func (i *IncidentRow) TableName() string {
return "incident"
}

// Upsert implements the contracts.Upserter interface.
func (i *IncidentRow) Upsert() interface{} {
return &struct {
Severity event.Severity `db:"severity"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
}{Severity: i.Severity, RecoveredAt: i.RecoveredAt}
}

// Sync synchronizes incidents to the database.
// Fetches the last inserted incident id and modifies this incident's id.
// Returns an error on database failure.
func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
if upsert {
stmt, _ := db.BuildUpsertStmt(i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
if err != nil {
return err
}

i.ID = incidentId
}

return nil
}

// EventRow represents a single incident event database entry.
type EventRow struct {
IncidentID int64 `db:"incident_id"`
Expand Down
61 changes: 15 additions & 46 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ type ruleID = int64
type escalationID = int64

type Incident struct {
Object *object.Object
StartedAt time.Time
RecoveredAt time.Time
Severity event.Severity
Id int64 `db:"id"`
ObjectID types.Binary `db:"object_id"`
StartedAt types.UnixMilli `db:"started_at"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
Severity event.Severity `db:"severity"`

Object *object.Object `db:"-"`

EscalationState map[escalationID]*EscalationState
Rules map[ruleID]struct{}
Recipients map[recipient.Key]*RecipientState

incidentRowID int64

// timer calls RetriggerEscalations the next time any escalation could be reached on the incident.
//
// For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident
Expand Down Expand Up @@ -73,11 +74,11 @@ func (i *Incident) SeverityString() string {
}

func (i *Incident) String() string {
return fmt.Sprintf("#%d", i.incidentRowID)
return fmt.Sprintf("#%d", i.ID())
}

func (i *Incident) ID() int64 {
return i.incidentRowID
return i.Id
}

func (i *Incident) HasManager() bool {
Expand Down Expand Up @@ -200,7 +201,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

if !i.RecoveredAt.IsZero() {
if !i.RecoveredAt.Time().IsZero() {
// Incident is recovered in the meantime.
return
}
Expand Down Expand Up @@ -284,14 +285,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
causedByHistoryId = historyId

if newSeverity == event.SeverityOK {
i.RecoveredAt = time.Now()
i.RecoveredAt = types.UnixMilli(time.Now())
i.logger.Info("All sources recovered, closing incident")

RemoveCurrent(i.Object)

history := &HistoryRow{
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(i.RecoveredAt),
Time: i.RecoveredAt,
Type: Closed,
}

Expand All @@ -318,7 +319,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
}

func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error {
i.StartedAt = ev.Time
i.StartedAt = types.UnixMilli(ev.Time)
i.Severity = ev.Severity
if err := i.Sync(ctx, tx); err != nil {
i.logger.Errorw("Can't insert incident to the database", zap.Error(err))
Expand Down Expand Up @@ -418,7 +419,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation,
i.timer = nil
}

filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity}
filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt.Time()), IncidentSeverity: i.Severity}

var escalations []*rule.Escalation
retryAfter := rule.RetryNever
Expand Down Expand Up @@ -474,7 +475,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation,
i.RetriggerEscalations(&event.Event{
Type: event.TypeInternal,
Time: nextEvalAt,
Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)),
Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())),
})
})
}
Expand Down Expand Up @@ -642,17 +643,6 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx,
return nil
}

// RestoreEscalationStateRules restores this incident's rules based on the given escalation states.
func (i *Incident) RestoreEscalationStateRules(states []*EscalationState) {
i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

for _, state := range states {
escalation := i.runtimeConfig.GetRuleEscalation(state.RuleEscalationID)
i.Rules[escalation.RuleID] = struct{}{}
}
}

// getRecipientsChannel returns all the configured channels of the current incident and escalation recipients.
func (i *Incident) getRecipientsChannel(t time.Time) contactChannels {
contactChs := make(contactChannels)
Expand Down Expand Up @@ -713,27 +703,6 @@ func (i *Incident) restoreRecipients(ctx context.Context) error {
return nil
}

// restoreEscalationsState restores all escalation states matching the current incident id from the database.
// Returns error on database failure.
func (i *Incident) restoreEscalationsState(ctx context.Context) error {
state := &EscalationState{}
var states []*EscalationState
err := i.db.SelectContext(ctx, &states, i.db.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.ID())
if err != nil {
i.logger.Errorw("Failed to restore incident rule escalation states", zap.Error(err))

return errors.New("failed to restore incident rule escalation states")
}

for _, state := range states {
i.EscalationState[state.RuleEscalationID] = state
}

i.RestoreEscalationStateRules(states)

return nil
}

type EscalationState struct {
IncidentID int64 `db:"incident_id"`
RuleEscalationID int64 `db:"rule_escalation_id"`
Expand Down
Loading

0 comments on commit 5e25990

Please sign in to comment.