Skip to content

Commit

Permalink
Restore active incidents from DB gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Dec 1, 2023
1 parent f4123ad commit c05571c
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 216 deletions.
48 changes: 0 additions & 48 deletions internal/incident/db_types.go
Original file line number Diff line number Diff line change
@@ -1,59 +1,11 @@
package incident

import (
"context"
"fmt"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
)

type IncidentRow struct {
ID int64 `db:"id"`
ObjectID types.Binary `db:"object_id"`
StartedAt types.UnixMilli `db:"started_at"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
Severity event.Severity `db:"severity"`
}

// TableName implements the contracts.TableNamer interface.
func (i *IncidentRow) TableName() string {
return "incident"
}

// Upsert implements the contracts.Upserter interface.
func (i *IncidentRow) Upsert() interface{} {
return &struct {
Severity event.Severity `db:"severity"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
}{Severity: i.Severity, RecoveredAt: i.RecoveredAt}
}

// Sync synchronizes incidents to the database.
// Fetches the last inserted incident id and modifies this incident's id.
// Returns an error on database failure.
func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error {
if upsert {
stmt, _ := db.BuildUpsertStmt(i)
_, err := tx.NamedExecContext(ctx, stmt, i)
if err != nil {
return fmt.Errorf("failed to upsert incident: %s", err)
}
} else {
incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i)
if err != nil {
return err
}

i.ID = incidentId
}

return nil
}

// EventRow represents a single incident event database entry.
type EventRow struct {
IncidentID int64 `db:"incident_id"`
Expand Down
29 changes: 15 additions & 14 deletions internal/incident/incident.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,18 @@ type ruleID = int64
type escalationID = int64

type Incident struct {
Object *object.Object
StartedAt time.Time
RecoveredAt time.Time
Severity event.Severity
Id int64 `db:"id"`
ObjectID types.Binary `db:"object_id"`
StartedAt types.UnixMilli `db:"started_at"`
RecoveredAt types.UnixMilli `db:"recovered_at"`
Severity event.Severity `db:"severity"`

Object *object.Object `db:"-"`

EscalationState map[escalationID]*EscalationState
Rules map[ruleID]struct{}
Recipients map[recipient.Key]*RecipientState

incidentRowID int64

// timer calls RetriggerEscalations the next time any escalation could be reached on the incident.
//
// For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident
Expand Down Expand Up @@ -73,11 +74,11 @@ func (i *Incident) SeverityString() string {
}

func (i *Incident) String() string {
return fmt.Sprintf("#%d", i.incidentRowID)
return fmt.Sprintf("#%d", i.ID())
}

func (i *Incident) ID() int64 {
return i.incidentRowID
return i.Id
}

func (i *Incident) HasManager() bool {
Expand Down Expand Up @@ -190,7 +191,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) {
i.runtimeConfig.RLock()
defer i.runtimeConfig.RUnlock()

if !i.RecoveredAt.IsZero() {
if !i.RecoveredAt.Time().IsZero() {
// Incident is recovered in the meantime.
return
}
Expand Down Expand Up @@ -276,14 +277,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
causedByHistoryId = historyId

if newSeverity == event.SeverityOK {
i.RecoveredAt = time.Now()
i.RecoveredAt = types.UnixMilli(time.Now())
i.logger.Info("All sources recovered, closing incident")

RemoveCurrent(i.Object)

history := &HistoryRow{
EventID: utils.ToDBInt(ev.ID),
Time: types.UnixMilli(i.RecoveredAt),
Time: i.RecoveredAt,
Type: Closed,
}

Expand All @@ -310,7 +311,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx,
}

func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error {
i.StartedAt = ev.Time
i.StartedAt = types.UnixMilli(ev.Time)
i.Severity = ev.Severity
if err := i.Sync(ctx, tx); err != nil {
i.logger.Errorw("Can't insert incident to the database", zap.Error(err))
Expand Down Expand Up @@ -410,7 +411,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation,
i.timer = nil
}

filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity}
filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt.Time()), IncidentSeverity: i.Severity}

var escalations []*rule.Escalation
retryAfter := rule.RetryNever
Expand Down Expand Up @@ -466,7 +467,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation,
i.RetriggerEscalations(&event.Event{
Type: event.TypeInternal,
Time: nextEvalAt,
Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)),
Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())),
})
})
}
Expand Down
163 changes: 110 additions & 53 deletions internal/incident/incidents.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package incident

import (
"context"
"database/sql"
"errors"
"github.com/icinga/icinga-notifications/internal/config"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/object"
"github.com/icinga/icingadb/pkg/icingadb"
"github.com/icinga/icingadb/pkg/logging"
"github.com/icinga/icingadb/pkg/types"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"sync"
"time"
)
Expand All @@ -25,34 +23,116 @@ var (
func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig) error {
logger.Info("Loading all active incidents from database")

var objectIDs []types.Binary
err := db.SelectContext(ctx, &objectIDs, `SELECT object_id FROM incident WHERE "recovered_at" IS NULL`)
if err != nil {
logger.Errorw("Failed to load active incidents from database", zap.Error(err))
incidents := make(chan *Incident, 1)
g, _ := errgroup.WithContext(ctx)

return errors.New("failed to fetch open incidents")
}
g.Go(func() error {
defer close(incidents)

query := `
SELECT incident.id, started_at, severity, incident.object_id, object.source_id, object.name, object.url, oit.tag, oit.value, oet.tag, oet.value
FROM incident
INNER JOIN object on object.id = incident.object_id
INNER JOIN public.object_id_tag oit on object.id = oit.object_id
LEFT JOIN public.object_extra_tag oet on object.id = oet.object_id
WHERE "recovered_at" IS NULL
GROUP BY incident.id, object.id, oet.tag, oet.value, oit.tag, oit.value`

for _, objectID := range objectIDs {
obj, err := object.LoadFromDB(ctx, db, objectID)
rows, err := db.QueryxContext(ctx, query)
if err != nil {
logger.Errorw("Failed to retrieve incident object from database", zap.Error(err))
continue
return err
}
defer func() { _ = rows.Close() }()

var extraTag, extraTagValue, idTag, idTagValue string
for rows.Next() {
ev := &event.Event{Tags: make(map[string]string), ExtraTags: make(map[string]string)}
i := NewIncident(db, object.NewObject(db, ev), runtimeConfig, nil)
// Note: We have to specify the fields the SQL row is to be scanned into in exactly
// the same order as in the SELECT statement above.
err := rows.Scan(&i.Id, &i.StartedAt, &i.Severity, &i.ObjectID, &i.Object.SourceID, &i.Object.Name, &i.Object.URL, &idTag, &idTagValue, &extraTag, &extraTagValue)
if err != nil {
return err
}

incident, _, err := GetCurrent(ctx, db, obj, logger, runtimeConfig, false)
if err != nil {
continue
i.Object.ID = i.ObjectID
i.Object.Tags[idTag] = idTagValue
i.Object.ExtraTags[extraTag] = extraTagValue

select {
case incidents <- i:
case <-ctx.Done():
return ctx.Err()
}
}

incident.RetriggerEscalations(&event.Event{
Time: time.Now(),
Type: event.TypeInternal,
Message: "Incident reevaluation at daemon startup",
})
}
return nil
})

g.Go(func() error {
cacheIncident := func(i *Incident) {
currentIncidentsMu.Lock()
defer currentIncidentsMu.Unlock()

i.logger = logger.With(zap.String("object", i.Object.DisplayName()), zap.String("incident", i.String()))
if i.restoreEscalationsState(ctx) != nil {
// Error is already logged within the function, and apart from that we don't want
// to abort loading the remaining incidents just because we fail to restore the
// escalations of some incidents.
return
}

i.RetriggerEscalations(&event.Event{
Time: time.Now(),
Type: event.TypeInternal,
Message: "Incident reevaluation at daemon startup",
})

object.Cache(i.Object)

currentIncidents[i.Object] = i
}

return nil
var prevIncident *Incident
for {
select {
case <-ctx.Done():
return ctx.Err()
case i, ok := <-incidents:
if !ok {
if prevIncident != nil {
// There are no more results to stream, so add the last received incident to the cache
cacheIncident(prevIncident)
}

return nil
}

if prevIncident == nil || i.ID() != prevIncident.ID() {
if prevIncident != nil {
// Having already stored an incident in `prevIncident` means that there are no results to
// stream referring to the previously received incident ID. So add the previous incident
// to the cache before overwriting it with the current one.
cacheIncident(prevIncident)
}

prevIncident = i
continue
}

// Copy the just consumed incident (extra) tags into the previous one, as both share the same ID.
for tag, val := range i.Object.Tags {
prevIncident.Object.Tags[tag] = val
}

for tag, val := range i.Object.ExtraTags {
prevIncident.Object.ExtraTags[tag] = val
}
}
}
})

return g.Wait()
}

func GetCurrent(
Expand All @@ -65,37 +145,14 @@ func GetCurrent(
created := false
currentIncident := currentIncidents[obj]

if currentIncident == nil {
ir := &IncidentRow{}
incidentLogger := logger.With(zap.String("object", obj.DisplayName()))
incident := NewIncident(db, obj, runtimeConfig, incidentLogger)
if currentIncident == nil && create {
created = true

err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
logger.Errorw("Failed to load incident from database", zap.String("object", obj.DisplayName()), zap.Error(err))

return nil, false, errors.New("failed to load incident from database")
} else if err == nil {
incident.incidentRowID = ir.ID
incident.StartedAt = ir.StartedAt.Time()
incident.Severity = ir.Severity
incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String()))

if err := incident.restoreEscalationsState(ctx); err != nil {
return nil, false, err
}

currentIncident = incident
}

if create && currentIncident == nil {
created = true
currentIncident = incident
}
incidentLogger := logger.With(zap.String("object", obj.DisplayName()))
currentIncident = NewIncident(db, obj, runtimeConfig, incidentLogger)
currentIncident.ObjectID = obj.ID

if currentIncident != nil {
currentIncidents[obj] = currentIncident
}
currentIncidents[obj] = currentIncident
}

if !created && currentIncident != nil {
Expand Down Expand Up @@ -128,7 +185,7 @@ func GetCurrentIncidents() map[int64]*Incident {

m := make(map[int64]*Incident)
for _, incident := range currentIncidents {
m[incident.incidentRowID] = incident
m[incident.ID()] = incident
}
return m
}
Loading

0 comments on commit c05571c

Please sign in to comment.