From 1eb8a4ceadf64c3581018cc651afbda9657c041d Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 21 Nov 2023 14:58:57 +0100 Subject: [PATCH] Restore active incidents from DB gracefully --- internal/incident/db_types.go | 48 ------- internal/incident/incident.go | 54 +++---- internal/incident/incidents.go | 256 ++++++++++++++++++++++++++------- internal/incident/sync.go | 51 ++++--- internal/object/db_types.go | 68 +-------- internal/object/object.go | 53 +++---- 6 files changed, 283 insertions(+), 247 deletions(-) diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index c037f7a0d..b5aa227ca 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -1,59 +1,11 @@ package incident import ( - "context" - "fmt" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" - "github.com/icinga/icinga-notifications/internal/utils" - "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" - "github.com/jmoiron/sqlx" ) -type IncidentRow struct { - ID int64 `db:"id"` - ObjectID types.Binary `db:"object_id"` - StartedAt types.UnixMilli `db:"started_at"` - RecoveredAt types.UnixMilli `db:"recovered_at"` - Severity event.Severity `db:"severity"` -} - -// TableName implements the contracts.TableNamer interface. -func (i *IncidentRow) TableName() string { - return "incident" -} - -// Upsert implements the contracts.Upserter interface. -func (i *IncidentRow) Upsert() interface{} { - return &struct { - Severity event.Severity `db:"severity"` - RecoveredAt types.UnixMilli `db:"recovered_at"` - }{Severity: i.Severity, RecoveredAt: i.RecoveredAt} -} - -// Sync synchronizes incidents to the database. -// Fetches the last inserted incident id and modifies this incident's id. -// Returns an error on database failure. -func (i *IncidentRow) Sync(ctx context.Context, tx *sqlx.Tx, db *icingadb.DB, upsert bool) error { - if upsert { - stmt, _ := db.BuildUpsertStmt(i) - _, err := tx.NamedExecContext(ctx, stmt, i) - if err != nil { - return fmt.Errorf("failed to upsert incident: %s", err) - } - } else { - incidentId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, i, "id"), i) - if err != nil { - return err - } - - i.ID = incidentId - } - - return nil -} - // EventRow represents a single incident event database entry. type EventRow struct { IncidentID int64 `db:"incident_id"` diff --git a/internal/incident/incident.go b/internal/incident/incident.go index b0d60aa64..e041d3999 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -24,17 +24,18 @@ type ruleID = int64 type escalationID = int64 type Incident struct { - Object *object.Object - StartedAt time.Time - RecoveredAt time.Time - Severity event.Severity + Id int64 `db:"id"` + ObjectID types.Binary `db:"object_id"` + StartedAt types.UnixMilli `db:"started_at"` + RecoveredAt types.UnixMilli `db:"recovered_at"` + Severity event.Severity `db:"severity"` + + Object *object.Object `db:"-"` EscalationState map[escalationID]*EscalationState Rules map[ruleID]struct{} Recipients map[recipient.Key]*RecipientState - incidentRowID int64 - // timer calls RetriggerEscalations the next time any escalation could be reached on the incident. // // For example, if there are escalations configured for incident_age>=1h and incident_age>=2h, if the incident @@ -73,11 +74,11 @@ func (i *Incident) SeverityString() string { } func (i *Incident) String() string { - return fmt.Sprintf("#%d", i.incidentRowID) + return fmt.Sprintf("#%d", i.ID()) } func (i *Incident) ID() int64 { - return i.incidentRowID + return i.Id } func (i *Incident) HasManager() bool { @@ -190,7 +191,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { i.runtimeConfig.RLock() defer i.runtimeConfig.RUnlock() - if !i.RecoveredAt.IsZero() { + if !i.RecoveredAt.Time().IsZero() { // Incident is recovered in the meantime. return } @@ -276,14 +277,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, causedByHistoryId = historyId if newSeverity == event.SeverityOK { - i.RecoveredAt = time.Now() + i.RecoveredAt = types.UnixMilli(time.Now()) i.logger.Info("All sources recovered, closing incident") RemoveCurrent(i.Object) history := &HistoryRow{ EventID: utils.ToDBInt(ev.ID), - Time: types.UnixMilli(i.RecoveredAt), + Time: i.RecoveredAt, Type: Closed, } @@ -310,7 +311,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, } func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - i.StartedAt = ev.Time + i.StartedAt = types.UnixMilli(ev.Time) i.Severity = ev.Severity if err := i.Sync(ctx, tx); err != nil { i.logger.Errorw("Can't insert incident to the database", zap.Error(err)) @@ -410,7 +411,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, i.timer = nil } - filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt), IncidentSeverity: i.Severity} + filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt.Time()), IncidentSeverity: i.Severity} var escalations []*rule.Escalation retryAfter := rule.RetryNever @@ -466,7 +467,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Escalation, i.RetriggerEscalations(&event.Event{ Type: event.TypeInternal, Time: nextEvalAt, - Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt)), + Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())), }) }) } @@ -635,11 +636,11 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, } // RestoreEscalationStateRules restores this incident's rules based on the given escalation states. -func (i *Incident) RestoreEscalationStateRules(states []*EscalationState) { +func (i *Incident) RestoreEscalationStateRules() { i.runtimeConfig.RLock() defer i.runtimeConfig.RUnlock() - for _, state := range states { + for _, state := range i.EscalationState { escalation := i.runtimeConfig.GetRuleEscalation(state.RuleEscalationID) i.Rules[escalation.RuleID] = struct{}{} } @@ -705,27 +706,6 @@ func (i *Incident) restoreRecipients(ctx context.Context) error { return nil } -// restoreEscalationsState restores all escalation states matching the current incident id from the database. -// Returns error on database failure. -func (i *Incident) restoreEscalationsState(ctx context.Context) error { - state := &EscalationState{} - var states []*EscalationState - err := i.db.SelectContext(ctx, &states, i.db.Rebind(i.db.BuildSelectStmt(state, state)+` WHERE "incident_id" = ?`), i.ID()) - if err != nil { - i.logger.Errorw("Failed to restore incident rule escalation states", zap.Error(err)) - - return errors.New("failed to restore incident rule escalation states") - } - - for _, state := range states { - i.EscalationState[state.RuleEscalationID] = state - } - - i.RestoreEscalationStateRules(states) - - return nil -} - type EscalationState struct { IncidentID int64 `db:"incident_id"` RuleEscalationID int64 `db:"rule_escalation_id"` diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 56d0b1663..1746e21a4 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -2,15 +2,17 @@ package incident import ( "context" - "database/sql" - "errors" + "fmt" "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icingadb/pkg/com" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" - "github.com/icinga/icingadb/pkg/types" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "sync" "time" ) @@ -25,34 +27,205 @@ var ( func LoadOpenIncidents(ctx context.Context, db *icingadb.DB, logger *logging.Logger, runtimeConfig *config.RuntimeConfig) error { logger.Info("Loading all active incidents from database") - var objectIDs []types.Binary - err := db.SelectContext(ctx, &objectIDs, `SELECT object_id FROM incident WHERE "recovered_at" IS NULL`) - if err != nil { - logger.Errorw("Failed to load active incidents from database", zap.Error(err)) + g, childCtx := errgroup.WithContext(ctx) - return errors.New("failed to fetch open incidents") - } + incidents := make(chan *Incident) + g.Go(func() error { + defer close(incidents) - for _, objectID := range objectIDs { - obj, err := object.LoadFromDB(ctx, db, objectID) + rows, err := db.QueryxContext(childCtx, db.BuildSelectStmt(&Incident{}, &Incident{})+` WHERE "recovered_at" IS NULL`) if err != nil { - logger.Errorw("Failed to retrieve incident object from database", zap.Error(err)) - continue + return err } + defer func() { _ = rows.Close() }() - incident, _, err := GetCurrent(ctx, db, obj, logger, runtimeConfig, false) - if err != nil { - continue + for rows.Next() { + i := NewIncident(db, nil, runtimeConfig, nil) + if err := rows.StructScan(i); err != nil { + return err + } + + select { + case incidents <- i: + case <-childCtx.Done(): + return childCtx.Err() + } } - incident.RetriggerEscalations(&event.Event{ - Time: time.Now(), - Type: event.TypeInternal, - Message: "Incident reevaluation at daemon startup", - }) - } + return nil + }) + + g.Go(func() error { + bulks := com.Bulk(childCtx, incidents, db.Options.MaxPlaceholdersPerStatement, com.NeverSplit[*Incident]) + + expandArgs := func(subject any, bindVals []any, column string) (string, []any, error) { + query := fmt.Sprintf("%s WHERE %q IN(?)", db.BuildSelectStmt(subject, subject), column) + stmt, args, err := sqlx.In(query, bindVals) + if err != nil { + return "", nil, errors.Wrapf(err, "cannot build placeholders for %q", query) + } + + return stmt, args, nil + } + + for { + select { + case <-childCtx.Done(): + return childCtx.Err() + case bulk, ok := <-bulks: + if !ok { + return nil + } + + g.Go(func() error { + objectIds := make([]any, len(bulk)) + incidentIds := make([]any, len(bulk)) + incidentsByObjId := make(map[string]*Incident) + + for i := range bulk { + objectIds[i] = bulk[i].ObjectID + incidentIds[i] = bulk[i].Id + incidentsByObjId[bulk[i].ObjectID.String()] = bulk[i] + } + + stmt, args, err := expandArgs(new(object.Object), objectIds, "id") + if err != nil { + return err + } + + objRows, err := db.QueryxContext(childCtx, db.Rebind(stmt), args...) + if err != nil { + return errors.Wrap(err, "cannot fetch incident objects") + } + defer func() { _ = objRows.Close() }() + + for objRows.Next() { + obj := object.New(db, &event.Event{Tags: make(map[string]string), ExtraTags: make(map[string]string)}) + if err = objRows.StructScan(obj); err != nil { + return err + } + + incidentsByObjId[obj.ID.String()].Object = obj + } + + // Object ID tags... + stmt, args, err = expandArgs(new(object.IdTagRow), objectIds, "object_id") + if err != nil { + return err + } + + idRows, err := db.QueryxContext(childCtx, db.Rebind(stmt), args...) + if err != nil { + return errors.Wrap(err, "cannot fetch object id tags") + } + defer func() { _ = idRows.Close() }() + + for idRows.Next() { + idtag := new(object.IdTagRow) + if err = idRows.StructScan(idtag); err != nil { + return err + } + + incidentsByObjId[idtag.ObjectId.String()].Object.Tags[idtag.Tag] = idtag.Value + } + + // Object extra tags... + stmt, args, err = expandArgs(new(object.ExtraTagRow), objectIds, "object_id") + if err != nil { + return err + } + + extraTagRows, err := db.QueryxContext(childCtx, db.Rebind(stmt), args...) + if err != nil { + return errors.Wrap(err, "cannot fetch object extra tags") + } + defer func() { _ = extraTagRows.Close() }() + + for extraTagRows.Next() { + extraTag := new(object.ExtraTagRow) + if err = extraTagRows.StructScan(extraTag); err != nil { + return err + } + + incidentsByObjId[extraTag.ObjectId.String()].Object.ExtraTags[extraTag.Tag] = extraTag.Value + } - return nil + // Restore all escalation states matching the current incident ids. + stmt, args, err = expandArgs(new(EscalationState), incidentIds, "incident_id") + if err != nil { + return err + } + + statesRows, err := db.QueryxContext(childCtx, db.Rebind(stmt), args...) + if err != nil { + return errors.Wrap(err, "cannot restore incident rule escalation states") + } + defer func() { _ = statesRows.Close() }() + + for statesRows.Next() { + state := new(EscalationState) + if err = statesRows.StructScan(state); err != nil { + return err + } + + for _, i := range incidentsByObjId { + if i.ID() == state.IncidentID { + i.EscalationState[state.RuleEscalationID] = state + break + } + } + } + + // Restore incident recipients... + stmt, args, err = expandArgs(new(ContactRow), incidentIds, "incident_id") + if err != nil { + return err + } + + recipientRows, err := db.QueryxContext(childCtx, db.Rebind(stmt), args...) + if err != nil { + return errors.Wrap(err, "cannot restore incident recipients") + } + defer func() { _ = recipientRows.Close() }() + + for recipientRows.Next() { + contact := new(ContactRow) + if err = recipientRows.StructScan(contact); err != nil { + return err + } + + for _, i := range incidentsByObjId { + if i.ID() == contact.IncidentID { + i.Recipients[contact.Key] = &RecipientState{Role: contact.Role} + break + } + } + } + + for _, i := range incidentsByObjId { + i.logger = logger.With(zap.String("object", i.Object.DisplayName()), zap.String("incident", i.String())) + + i.RestoreEscalationStateRules() + i.RetriggerEscalations(&event.Event{ + Time: time.Now(), + Type: event.TypeInternal, + Message: "Incident reevaluation at daemon startup", + }) + + object.Cache(i.Object) + + currentIncidentsMu.Lock() + currentIncidents[i.Object] = i + currentIncidentsMu.Unlock() + } + + return nil + }) + } + } + }) + + return g.Wait() } func GetCurrent( @@ -65,37 +238,14 @@ func GetCurrent( created := false currentIncident := currentIncidents[obj] - if currentIncident == nil { - ir := &IncidentRow{} - incidentLogger := logger.With(zap.String("object", obj.DisplayName())) - incident := NewIncident(db, obj, runtimeConfig, incidentLogger) - - err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(ir, ir)+` WHERE "object_id" = ? AND "recovered_at" IS NULL`), obj.ID).StructScan(ir) - if err != nil && !errors.Is(err, sql.ErrNoRows) { - logger.Errorw("Failed to load incident from database", zap.String("object", obj.DisplayName()), zap.Error(err)) - - return nil, false, errors.New("failed to load incident from database") - } else if err == nil { - incident.incidentRowID = ir.ID - incident.StartedAt = ir.StartedAt.Time() - incident.Severity = ir.Severity - incident.logger = logger.With(zap.String("object", obj.DisplayName()), zap.String("incident", incident.String())) - - if err := incident.restoreEscalationsState(ctx); err != nil { - return nil, false, err - } - - currentIncident = incident - } + if currentIncident == nil && create { + created = true - if create && currentIncident == nil { - created = true - currentIncident = incident - } + incidentLogger := logger.With(zap.String("object", obj.DisplayName())) + currentIncident = NewIncident(db, obj, runtimeConfig, incidentLogger) + currentIncident.ObjectID = obj.ID - if currentIncident != nil { - currentIncidents[obj] = currentIncident - } + currentIncidents[obj] = currentIncident } if !created && currentIncident != nil { @@ -128,7 +278,7 @@ func GetCurrentIncidents() map[int64]*Incident { m := make(map[int64]*Incident) for _, incident := range currentIncidents { - m[incident.incidentRowID] = incident + m[incident.ID()] = incident } return m } diff --git a/internal/incident/sync.go b/internal/incident/sync.go index b29969dc9..ea13a5a4f 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -3,6 +3,7 @@ package incident import ( "context" "errors" + "fmt" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" @@ -13,30 +14,44 @@ import ( "time" ) +// TableName implements the contracts.TableNamer interface. +func (i *Incident) TableName() string { + return "incident" +} + +// Upsert implements the contracts.Upserter interface. +func (i *Incident) Upsert() interface{} { + return &struct { + Severity event.Severity `db:"severity"` + RecoveredAt types.UnixMilli `db:"recovered_at"` + }{Severity: i.Severity, RecoveredAt: i.RecoveredAt} +} + // Sync initiates an *incident.IncidentRow from the current incident state and syncs it with the database. // Before syncing any incident related database entries, this method should be called at least once. // Returns an error on db failure. func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { - incidentRow := &IncidentRow{ - ID: i.incidentRowID, - ObjectID: i.Object.ID, - StartedAt: types.UnixMilli(i.StartedAt), - RecoveredAt: types.UnixMilli(i.RecoveredAt), - Severity: i.Severity, - } + if i.ID() != 0 { + stmt, _ := i.db.BuildUpsertStmt(i) + _, err := tx.NamedExecContext(ctx, stmt, i) + if err != nil { + return fmt.Errorf("failed to upsert incident: %s", err) + } + } else { + stmt := utils.BuildInsertStmtWithout(i.db, i, "id") + incidentId, err := utils.InsertAndFetchId(ctx, tx, stmt, i) + if err != nil { + return err + } - err := incidentRow.Sync(ctx, tx, i.db, i.incidentRowID != 0) - if err != nil { - return err + i.Id = incidentId } - i.incidentRowID = incidentRow.ID - return nil } func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *HistoryRow, fetchId bool) (types.Int, error) { - historyRow.IncidentID = i.incidentRowID + historyRow.IncidentID = i.ID() stmt := utils.BuildInsertStmtWithout(i.db, historyRow, "id") if fetchId { @@ -57,7 +72,7 @@ func (i *Incident) AddHistory(ctx context.Context, tx *sqlx.Tx, historyRow *Hist } func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error { - state.IncidentID = i.incidentRowID + state.IncidentID = i.ID() stmt, _ := i.db.BuildUpsertStmt(state) _, err := tx.NamedExecContext(ctx, stmt, state) @@ -67,7 +82,7 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat // AddEvent Inserts incident history record to the database and returns an error on db failure. func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - ie := &EventRow{IncidentID: i.incidentRowID, EventID: ev.ID} + ie := &EventRow{IncidentID: i.ID(), EventID: ev.ID} stmt, _ := i.db.BuildInsertStmt(ie) _, err := tx.NamedExecContext(ctx, stmt, ie) @@ -84,7 +99,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru for _, escalationRecipient := range escalation.Recipients { r := escalationRecipient.Recipient - cr := &ContactRow{IncidentID: i.incidentRowID, Role: newRole} + cr := &ContactRow{IncidentID: i.ID(), Role: newRole} recipientKey := recipient.ToKey(r) cr.Key = recipientKey @@ -100,7 +115,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru i.logger.Infof("Contact %q role changed from %s to %s", r, state.Role.String(), newRole.String()) hr := &HistoryRow{ - IncidentID: i.incidentRowID, + IncidentID: i.ID(), EventID: utils.ToDBInt(eventId), Key: cr.Key, Time: types.UnixMilli(time.Now()), @@ -140,7 +155,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru // AddRuleMatched syncs the given *rule.Rule to the database. // Returns an error on database failure. func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error { - rr := &RuleRow{IncidentID: i.incidentRowID, RuleID: r.ID} + rr := &RuleRow{IncidentID: i.ID(), RuleID: r.ID} stmt, _ := i.db.BuildUpsertStmt(rr) _, err := tx.NamedExecContext(ctx, stmt, rr) diff --git a/internal/object/db_types.go b/internal/object/db_types.go index 36c669c5c..354f88f0c 100644 --- a/internal/object/db_types.go +++ b/internal/object/db_types.go @@ -1,9 +1,6 @@ package object import ( - "context" - "fmt" - "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/types" ) @@ -30,76 +27,15 @@ func (e *IdTagRow) TableName() string { return "object_id_tag" } -type ObjectRow struct { - ID types.Binary `db:"id"` - SourceID int64 `db:"source_id"` - Name string `db:"name"` - URL types.String `db:"url"` -} - // TableName implements the contracts.TableNamer interface. -func (or *ObjectRow) TableName() string { +func (o *Object) TableName() string { return "object" } // Upsert implements the contracts.Upserter interface. -func (or *ObjectRow) Upsert() interface{} { +func (o *Object) Upsert() interface{} { return struct { Name string `db:"name"` URL types.String `db:"url"` }{} } - -// LoadFromDB loads objects from the database matching the given id. -// This is only used to load the objects at daemon startup before the listener becomes ready, -// therefore it doesn't lock the objects cache mutex and panics when the given object ID is already -// in the cache. Otherwise, loads all the required data and returns error on database failure. -func LoadFromDB(ctx context.Context, db *icingadb.DB, id types.Binary) (*Object, error) { - if obj, ok := cache[id.String()]; ok { - panic(fmt.Sprintf("Object %s is already in cache", obj.DisplayName())) - } - - objectRow := &ObjectRow{ID: id} - err := db.QueryRowxContext(ctx, db.Rebind(db.BuildSelectStmt(objectRow, objectRow)+` WHERE "id" = ?`), objectRow.ID).StructScan(objectRow) - if err != nil { - return nil, fmt.Errorf("failed to fetch object: %w", err) - } - - var idTagRows []*IdTagRow - err = db.SelectContext( - ctx, &idTagRows, - db.Rebind(db.BuildSelectStmt(&IdTagRow{}, &IdTagRow{})+` WHERE "object_id" = ?`), id, - ) - if err != nil { - return nil, fmt.Errorf("failed to fetch object id tags: %w", err) - } - idTags := map[string]string{} - for _, idTag := range idTagRows { - idTags[idTag.Tag] = idTag.Value - } - - var extraTagRows []*ExtraTagRow - err = db.SelectContext( - ctx, &extraTagRows, - db.Rebind(db.BuildSelectStmt(&ExtraTagRow{}, &ExtraTagRow{})+` WHERE "object_id" = ?`), id, - ) - if err != nil { - return nil, fmt.Errorf("failed to fetch object extra tags: %w", err) - } - extraTags := map[string]string{} - for _, extraTag := range extraTagRows { - extraTags[extraTag.Tag] = extraTag.Value - } - - obj := &Object{ - db: db, - ID: id, - Name: objectRow.Name, - URL: objectRow.URL.String, - Tags: idTags, - ExtraTags: extraTags, - } - cache[id.String()] = obj - - return obj, nil -} diff --git a/internal/object/object.go b/internal/object/object.go index 4b1ec42d5..f3abad2fc 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -24,28 +24,43 @@ var ( ) type Object struct { - ID types.Binary - SourceId int64 - Name string - Tags map[string]string - URL string + ID types.Binary `db:"id"` + SourceID int64 `db:"source_id"` + Name string `db:"name"` + URL types.String `db:"url"` + Tags map[string]string ExtraTags map[string]string db *icingadb.DB } -func NewObject(db *icingadb.DB, ev *event.Event) *Object { +// New creates a new object from the given event. +func New(db *icingadb.DB, ev *event.Event) *Object { return &Object{ - SourceId: ev.SourceId, + SourceID: ev.SourceId, Name: ev.Name, db: db, - URL: ev.URL, + URL: utils.ToDBString(ev.URL), Tags: ev.Tags, ExtraTags: ev.ExtraTags, } } +// Cache adds the given object to the global object cache store. +// This is only used when loading the incident objects at daemon startup before the listener becomes ready. +// Panics when the given object is already in the cache store. +func Cache(obj *Object) { + cacheMu.Lock() + defer cacheMu.Unlock() + + if obj, ok := cache[obj.ID.String()]; ok { + panic(fmt.Sprintf("Object %q is already in cache", obj.DisplayName())) + } + + cache[obj.ID.String()] = obj +} + // FromEvent creates an object from the provided event tags if it's not in the cache // and syncs all object related types with the database. // Returns error on any database failure @@ -57,7 +72,7 @@ func FromEvent(ctx context.Context, db *icingadb.DB, ev *event.Event) (*Object, object, ok := cache[id.String()] if !ok { - object = NewObject(db, ev) + object = New(db, ev) object.ID = id cache[id.String()] = object } @@ -68,15 +83,8 @@ func FromEvent(ctx context.Context, db *icingadb.DB, ev *event.Event) (*Object, } defer func() { _ = tx.Rollback() }() - dbObj := &ObjectRow{ - ID: object.ID, - SourceID: ev.SourceId, - Name: ev.Name, - URL: utils.ToDBString(ev.URL), - } - - stmt, _ := object.db.BuildUpsertStmt(&ObjectRow{}) - _, err = tx.NamedExecContext(ctx, stmt, dbObj) + stmt, _ := object.db.BuildUpsertStmt(&Object{}) + _, err = tx.NamedExecContext(ctx, stmt, object) if err != nil { return nil, fmt.Errorf("failed to insert object: %w", err) } @@ -105,11 +113,6 @@ func FromEvent(ctx context.Context, db *icingadb.DB, ev *event.Event) (*Object, return nil, fmt.Errorf("can't commit object database transaction: %w", err) } - object.ExtraTags = ev.ExtraTags - object.Tags = ev.Tags - object.Name = ev.Name - object.URL = ev.URL - return object, nil } @@ -137,9 +140,9 @@ func (o *Object) String() string { _, _ = fmt.Fprintf(&b, "\n") } - _, _ = fmt.Fprintf(&b, " Source %d:\n", o.SourceId) + _, _ = fmt.Fprintf(&b, " Source %d:\n", o.SourceID) _, _ = fmt.Fprintf(&b, " Name: %q\n", o.Name) - _, _ = fmt.Fprintf(&b, " URL: %q\n", o.URL) + _, _ = fmt.Fprintf(&b, " URL: %q\n", o.URL.String) _, _ = fmt.Fprintf(&b, " Extra Tags:\n") for tag, value := range o.ExtraTags {