diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index c037f7a0d..b5aa227ca 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -1,59 +1,11 @@ package incident import ( - "context" - "fmt" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/icinga/icinga-notifications/internal/utils" - "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" - "github.com/jmoiron/sqlx" ) -type IncidentRow struct { - ID int64 `db:"id"` - ObjectID types.Binary `db:"object_id"` - StartedAt types.UnixMilli `db:"started_at"` - RecoveredAt types.UnixMilli `db:"recovered_at"` - Severity event.Severity `db:"severity"` -} - -// TableName implements the contracts.TableNamer interface. -func (i *IncidentRow) TableName() string { - return "incident" -} - -// Upsert implements the contracts.Upserter interface. -func (i *IncidentRow) Upsert() interface{} { - return &struct { - Severity event.Severity `db:"severity"` - RecoveredAt types.UnixMilli `db:"recovered_at"` - }{Severity: i.Severity, RecoveredAt: i.RecoveredAt} -} - -// Sync synchronizes incidents to the database. -// Fetches the last inserted incident id and modifies this incident's id. -// Returns an error on database failure. -func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error { - if upsert { - stmt, _ := db.BuildUpsertStmt(i) - _, err := tx.NamedExecContext(ctx, stmt, i) - if err != nil { - return fmt.Errorf("failed to upsert incident: %s", err) - } - } else { - incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i) - if err != nil { - return err - } - - i.ID = incidentId - } - - return nil -} - // EventRow represents a single incident event database entry. type EventRow struct { IncidentID int64 `db:"incident_id"` diff --git a/internal/incident/incident.go b/internal/incident/incident.go index b0d60aa64..8ba9326e6 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -24,17 +24,18 @@ type ruleID = int64 type escalationID = int64 type Incident struct { - Object *object.Object - StartedAt time.Time - RecoveredAt time.Time - Severity event.Severity + Id int64 `db:"id"` + ObjectID types.Binary `db:"object_id"` + StartedAt types.UnixMilli `db:"started_at"` + RecoveredAt types.UnixMilli `db:"recovered_at"` + Severity event.Severity `db:"severity"` + + Object *object.Object `db:"-"` EscalationState map[escalationID]*EscalationState Rules map[ruleID]struct{} Recipients map[recipient.Key]*RecipientState - incidentRowID int64 - // timer calls RetriggerEscalations the next time any escalation could be reached on the incident. // // For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident @@ -73,11 +74,11 @@ func (i *Incident) SeverityString() string { } func (i *Incident) String() string { - return fmt.Sprintf("#%d", i.incidentRowID) + return fmt.Sprintf("#%d", i.ID()) } func (i *Incident) ID() int64 { - return i.incidentRowID + return i.Id } func (i *Incident) HasManager() bool { @@ -190,7 +191,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { i.runtimeConfig.RLock() defer i.runtimeConfig.RUnlock() - if !i.RecoveredAt.IsZero() { + if !i.RecoveredAt.Time().IsZero() { // Incident is recovered in the meantime. return } @@ -276,14 +277,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, causedByHistoryId = historyId if newSeverity == event.SeverityOK { - i.RecoveredAt = time.Now() + i.RecoveredAt = types.UnixMilli(time.Now()) i.logger.Info("All sources recovered, closing incident") RemoveCurrent(i.Object) history := &HistoryRow{ EventID: utils.ToDBInt(ev.ID), - Time: types.UnixMilli(i.RecoveredAt), + Time: i.RecoveredAt, Type: Closed, } @@ -310,7 +311,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, } func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - i.StartedAt = ev.Time + i.StartedAt = types.UnixMilli(ev.Time) i.Severity = ev.Severity if err := i.Sync(ctx, tx); err != nil { i.logger.Errorw("Can't insert incident to the database", zap.Error(err)) @@ -410,7 +411,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, i.timer = nil } - filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity} + filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt.Time()), IncidentSeverity: i.Severity} var escalations []*rule.Escalation retryAfter := rule.RetryNever @@ -466,7 +467,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, i.RetriggerEscalations(&event.Event{ Type: event.TypeInternal, Time: nextEvalAt, - Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)), + Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())), }) }) } diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 56d0b1663..4863ba8b9 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -2,15 +2,13 @@ package incident import ( "context" - "database/sql" - "errors" "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" - "github.com/icinga/icingadb/pkg/types" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "sync" "time" ) @@ -25,34 +23,116 @@ var ( func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig) error { logger.Info("Loading all active incidents from database") - var objectIDs []types.Binary - err := db.SelectContext(ctx, &objectIDs, `SELECT object_id FROM incident WHERE "recovered_at" IS NULL`) - if err != nil { - logger.Errorw("Failed to load active incidents from database", zap.Error(err)) + incidents := make(chan *Incident, 1) + g, _ := errgroup.WithContext(ctx) - return errors.New("failed to fetch open incidents") - } + g.Go(func() error { + defer close(incidents) + + query := ` + SELECT incident.id, started_at, severity, incident.object_id, object.source_id, object.name, object.url, oit.tag, oit.value, oet.tag, oet.value + FROM incident + INNER JOIN object on object.id = incident.object_id + INNER JOIN public.object_id_tag oit on object.id = oit.object_id + LEFT JOIN public.object_extra_tag oet on object.id = oet.object_id + WHERE "recovered_at" IS NULL + GROUP BY incident.id, object.id, oet.tag, oet.value, oit.tag, oit.value` - for _, objectID := range objectIDs { - obj, err := object.LoadFromDB(ctx, db, objectID) + rows, err := db.QueryxContext(ctx, query) if err != nil { - logger.Errorw("Failed to retrieve incident object from database", zap.Error(err)) - continue + return err } + defer func() { _ = rows.Close() }() + + var extraTag, extraTagValue, idTag, idTagValue string + for rows.Next() { + ev := &event.Event{Tags: make(map[string]string), ExtraTags: make(map[string]string)} + i := NewIncident(db, object.NewObject(db, ev), runtimeConfig, nil) + // Note: We have to specify the fields the SQL row is to be scanned into in exactly + // the same order as in the SELECT statement above. + err := rows.Scan(&i.Id, &i.StartedAt, &i.Severity, &i.ObjectID, &i.Object.SourceID, &i.Object.Name, &i.Object.URL, &idTag, &idTagValue, &extraTag, &extraTagValue) + if err != nil { + return err + } - incident, _, err := GetCurrent(ctx, db, obj, logger, runtimeConfig, false) - if err != nil { - continue + i.Object.ID = i.ObjectID + i.Object.Tags[idTag] = idTagValue + i.Object.ExtraTags[extraTag] = extraTagValue + + select { + case incidents <- i: + case <-ctx.Done(): + return ctx.Err() + } } - incident.RetriggerEscalations(&event.Event{ - Time: time.Now(), - Type: event.TypeInternal, - Message: "Incident reevaluation at daemon startup", - }) - } + return nil + }) + + g.Go(func() error { + cacheIncident := func(i *Incident) { + currentIncidentsMu.Lock() + defer currentIncidentsMu.Unlock() + + i.logger = logger.With(zap.String("object", i.Object.DisplayName()), zap.String("incident", i.String())) + if i.restoreEscalationsState(ctx) != nil { + // Error is already logged within the function, and apart from that we don't want + // to abort loading the remaining incidents just because we fail to restore the + // escalations of some incidents. + return + } + + i.RetriggerEscalations(&event.Event{ + Time: time.Now(), + Type: event.TypeInternal, + Message: "Incident reevaluation at daemon startup", + }) + + object.Cache(i.Object) + + currentIncidents[i.Object] = i + } - return nil + var prevIncident *Incident + for { + select { + case <-ctx.Done(): + return ctx.Err() + case i, ok := <-incidents: + if !ok { + if prevIncident != nil { + // There are no more results to stream, so add the last received incident to the cache + cacheIncident(prevIncident) + } + + return nil + } + + if prevIncident == nil || i.ID() != prevIncident.ID() { + if prevIncident != nil { + // Having already stored an incident in `prevIncident` means that there are no results to + // stream referring to the previously received incident ID. So add the previous incident + // to the cache before overwriting it with the current one. + cacheIncident(prevIncident) + } + + prevIncident = i + continue + } + + // Copy the just consumed incident (extra) tags into the previous one, as both share the same ID. + for tag, val := range i.Object.Tags { + prevIncident.Object.Tags[tag] = val + } + + for tag, val := range i.Object.ExtraTags { + prevIncident.Object.ExtraTags[tag] = val + } + } + } + }) + + return g.Wait() } func GetCurrent( @@ -65,37 +145,14 @@ func GetCurrent( created := false currentIncident := currentIncidents[obj] - if currentIncident == nil { - ir := &IncidentRow{} - incidentLogger := logger.With(zap.String("object", obj.DisplayName())) - incident := NewIncident(db, obj, runtimeConfig, incidentLogger) + if currentIncident == nil && create { + created = true - err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - logger.Errorw("Failed to load incident from database", zap.String("object", obj.DisplayName()), zap.Error(err)) - - return nil, false, errors.New("failed to load incident from database") - } else if err == nil { - incident.incidentRowID = ir.ID - incident.StartedAt = ir.StartedAt.Time() - incident.Severity = ir.Severity - incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String())) - - if err := incident.restoreEscalationsState(ctx); err != nil { - return nil, false, err - } - - currentIncident = incident - } - - if create && currentIncident == nil { - created = true - currentIncident = incident - } + incidentLogger := logger.With(zap.String("object", obj.DisplayName())) + currentIncident = NewIncident(db, obj, runtimeConfig, incidentLogger) + currentIncident.ObjectID = obj.ID - if currentIncident != nil { - currentIncidents[obj] = currentIncident - } + currentIncidents[obj] = currentIncident } if !created && currentIncident != nil { @@ -128,7 +185,7 @@ func GetCurrentIncidents() map[int64]*Incident { m := make(map[int64]*Incident) for _, incident := range currentIncidents { - m[incident.incidentRowID] = incident + m[incident.ID()] = incident } return m } diff --git a/internal/incident/sync.go b/internal/incident/sync.go index b29969dc9..ea13a5a4f 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -3,6 +3,7 @@ package incident import ( "context" "errors" + "fmt" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" @@ -13,30 +14,44 @@ import ( "time" ) +// TableName implements the contracts.TableNamer interface. +func (i *Incident) TableName() string { + return "incident" +} + +// Upsert implements the contracts.Upserter interface. +func (i *Incident) Upsert() interface{} { + return &struct { + Severity event.Severity `db:"severity"` + RecoveredAt types.UnixMilli `db:"recovered_at"` + }{Severity: i.Severity, RecoveredAt: i.RecoveredAt} +} + // Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database. // Before syncing any incident related database entries, this method should be called at least once. // Returns an error on db failure. func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { - incidentRow := &IncidentRow{ - ID: i.incidentRowID, - ObjectID: i.Object.ID, - StartedAt: types.UnixMilli(i.StartedAt), - RecoveredAt: types.UnixMilli(i.RecoveredAt), - Severity: i.Severity, - } + if i.ID() != 0 { + stmt, _ := i.db.BuildUpsertStmt(i) + _, err := tx.NamedExecContext(ctx, stmt, i) + if err != nil { + return fmt.Errorf("failed to upsert incident: %s", err) + } + } else { + stmt := utils.BuildInsertStmtWithout(i.db, i, "id") + incidentId, err := utils.InsertAndFetchId(ctx, tx, stmt, i) + if err != nil { + return err + } - err := incidentRow.Sync(ctx, tx, i.db, i.incidentRowID != 0) - if err != nil { - return err + i.Id = incidentId } - i.incidentRowID = incidentRow.ID - return nil } func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) { - historyRow.IncidentID = i.incidentRowID + historyRow.IncidentID = i.ID() stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id") if fetchId { @@ -57,7 +72,7 @@ func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *Hist } func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error { - state.IncidentID = i.incidentRowID + state.IncidentID = i.ID() stmt, _ := i.db.BuildUpsertStmt(state) _, err := tx.NamedExecContext(ctx, stmt, state) @@ -67,7 +82,7 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat // AddEvent Inserts incident history record to the database and returns an error on db failure. func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID} + ie := &EventRow{IncidentID: i.ID(), EventID: ev.ID} stmt, _ := i.db.BuildInsertStmt(ie) _, err := tx.NamedExecContext(ctx, stmt, ie) @@ -84,7 +99,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru for _, escalationRecipient := range escalation.Recipients { r := escalationRecipient.Recipient - cr := &ContactRow{IncidentID: i.incidentRowID, Role: newRole} + cr := &ContactRow{IncidentID: i.ID(), Role: newRole} recipientKey := recipient.ToKey(r) cr.Key = recipientKey @@ -100,7 +115,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru i.logger.Infof("Contact %q role changed from %s to %s", r, state.Role.String(), newRole.String()) hr := &HistoryRow{ - IncidentID: i.incidentRowID, + IncidentID: i.ID(), EventID: utils.ToDBInt(eventId), Key: cr.Key, Time: types.UnixMilli(time.Now()), @@ -140,7 +155,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru // AddRuleMatched syncs the given *rule.Rule to the database. // Returns an error on database failure. func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error { - rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID} + rr := &RuleRow{IncidentID: i.ID(), RuleID: r.ID} stmt, _ := i.db.BuildUpsertStmt(rr) _, err := tx.NamedExecContext(ctx, stmt, rr) diff --git a/internal/object/db_types.go b/internal/object/db_types.go index 36c669c5c..4f46a5a25 100644 --- a/internal/object/db_types.go +++ b/internal/object/db_types.go @@ -1,9 +1,7 @@ package object import ( - "context" "fmt" - "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" ) @@ -30,76 +28,26 @@ func (e *IdTagRow) TableName() string { return "object_id_tag" } -type ObjectRow struct { - ID types.Binary `db:"id"` - SourceID int64 `db:"source_id"` - Name string `db:"name"` - URL types.String `db:"url"` -} - // TableName implements the contracts.TableNamer interface. -func (or *ObjectRow) TableName() string { +func (o *Object) TableName() string { return "object" } // Upsert implements the contracts.Upserter interface. -func (or *ObjectRow) Upsert() interface{} { +func (o *Object) Upsert() interface{} { return struct { Name string `db:"name"` URL types.String `db:"url"` }{} } -// LoadFromDB loads objects from the database matching the given id. -// This is only used to load the objects at daemon startup before the listener becomes ready, -// therefore it doesn't lock the objects cache mutex and panics when the given object ID is already -// in the cache. Otherwise, loads all the required data and returns error on database failure. -func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, error) { - if obj, ok := cache[id.String()]; ok { +// Cache adds the given object to the global object cache store. +// This is only used after loading the objects at daemon startup before the listener becomes ready, therefore it +// doesn't lock the objects cache mutex and panics when the given object is already in the cache. +func Cache(obj *Object) { + if obj, ok := cache[obj.ID.String()]; ok { panic(fmt.Sprintf("Object %s is already in cache", obj.DisplayName())) } - objectRow := &ObjectRow{ID: id} - err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow) - if err != nil { - return nil, fmt.Errorf("failed to fetch object: %w", err) - } - - var idTagRows []*IdTagRow - err = db.SelectContext( - ctx, &idTagRows, - db.Rebind(db.BuildSelectStmt(&IdTagRow{}, &IdTagRow{})+` WHERE "object_id" = ?`), id, - ) - if err != nil { - return nil, fmt.Errorf("failed to fetch object id tags: %w", err) - } - idTags := map[string]string{} - for _, idTag := range idTagRows { - idTags[idTag.Tag] = idTag.Value - } - - var extraTagRows []*ExtraTagRow - err = db.SelectContext( - ctx, &extraTagRows, - db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id, - ) - if err != nil { - return nil, fmt.Errorf("failed to fetch object extra tags: %w", err) - } - extraTags := map[string]string{} - for _, extraTag := range extraTagRows { - extraTags[extraTag.Tag] = extraTag.Value - } - - obj := &Object{ - db: db, - ID: id, - Name: objectRow.Name, - URL: objectRow.URL.String, - Tags: idTags, - ExtraTags: extraTags, - } - cache[id.String()] = obj - - return obj, nil + cache[obj.ID.String()] = obj } diff --git a/internal/object/object.go b/internal/object/object.go index 4b1ec42d5..3cda5b79e 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -24,12 +24,12 @@ var ( ) type Object struct { - ID types.Binary - SourceId int64 - Name string - Tags map[string]string - URL string + ID types.Binary `db:"id"` + SourceID int64 `db:"source_id"` + Name string `db:"name"` + URL types.String `db:"url"` + Tags map[string]string ExtraTags map[string]string db *icingadb.DB @@ -37,10 +37,10 @@ type Object struct { func NewObject(db *icingadb.DB, ev *event.Event) *Object { return &Object{ - SourceId: ev.SourceId, + SourceID: ev.SourceId, Name: ev.Name, db: db, - URL: ev.URL, + URL: utils.ToDBString(ev.URL), Tags: ev.Tags, ExtraTags: ev.ExtraTags, } @@ -68,15 +68,8 @@ func FromEvent(ctx context.Context, db *icingadb.DB, ev *event.Event) (*Object, } defer func() { _ = tx.Rollback() }() - dbObj := &ObjectRow{ - ID: object.ID, - SourceID: ev.SourceId, - Name: ev.Name, - URL: utils.ToDBString(ev.URL), - } - - stmt, _ := object.db.BuildUpsertStmt(&ObjectRow{}) - _, err = tx.NamedExecContext(ctx, stmt, dbObj) + stmt, _ := object.db.BuildUpsertStmt(&Object{}) + _, err = tx.NamedExecContext(ctx, stmt, object) if err != nil { return nil, fmt.Errorf("failed to insert object: %w", err) } @@ -105,11 +98,6 @@ func FromEvent(ctx context.Context, db *icingadb.DB, ev *event.Event) (*Object, return nil, fmt.Errorf("can't commit object database transaction: %w", err) } - object.ExtraTags = ev.ExtraTags - object.Tags = ev.Tags - object.Name = ev.Name - object.URL = ev.URL - return object, nil } @@ -137,9 +125,9 @@ func (o *Object) String() string { _, _ = fmt.Fprintf(&b, "\n") } - _, _ = fmt.Fprintf(&b, " Source %d:\n", o.SourceId) + _, _ = fmt.Fprintf(&b, " Source %d:\n", o.SourceID) _, _ = fmt.Fprintf(&b, " Name: %q\n", o.Name) - _, _ = fmt.Fprintf(&b, " URL: %q\n", o.URL) + _, _ = fmt.Fprintf(&b, " URL: %q\n", o.URL.String) _, _ = fmt.Fprintf(&b, " Extra Tags:\n") for tag, value := range o.ExtraTags {