From a244a637935eb8a058c0cdbd91d6eee086fd8754 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 25 Apr 2024 11:43:59 +0200 Subject: [PATCH 1/6] Add escalation/routing differentiation --- internal/config/evaluable_config.go | 10 +-- internal/config/evaluable_config_test.go | 26 +++---- internal/config/rule.go | 30 +++---- internal/config/runtime.go | 26 +++---- internal/config/verify.go | 56 ++++++------- internal/incident/db_types.go | 2 +- internal/incident/incident.go | 34 ++++---- internal/incident/incidents.go | 2 +- internal/incident/sync.go | 4 +- internal/rule/{escalation.go => entry.go} | 49 ++++++------ internal/rule/rule.go | 16 ++-- internal/rule/rule_type.go | 56 +++++++++++++ schema/mysql/schema.sql | 95 +++++++++++++---------- schema/mysql/upgrades/01.sql | 88 +++++++++++++++++++++ schema/pgsql/schema.sql | 84 ++++++++++++-------- schema/pgsql/upgrades/01.sql | 68 ++++++++++++++++ 16 files changed, 446 insertions(+), 200 deletions(-) rename internal/rule/{escalation.go => entry.go} (69%) create mode 100644 internal/rule/rule_type.go create mode 100644 schema/mysql/upgrades/01.sql create mode 100644 schema/pgsql/upgrades/01.sql diff --git a/internal/config/evaluable_config.go b/internal/config/evaluable_config.go index a75378e8..5c753e3c 100644 --- a/internal/config/evaluable_config.go +++ b/internal/config/evaluable_config.go @@ -40,15 +40,15 @@ type EvalOptions[T, U any] struct { // Evaluable manages an evaluable config types in a centralised and structured way. // An evaluable config is a config type that allows to evaluate filter expressions in some way. type Evaluable struct { - Rules map[int64]bool `db:"-"` - RuleEntries map[int64]*rule.Escalation `db:"-" json:"-"` + Rules map[int64]bool `db:"-"` + RuleEntries map[int64]*rule.Entry `db:"-" json:"-"` } // NewEvaluable returns a fully initialised and ready to use Evaluable type. func NewEvaluable() *Evaluable { return &Evaluable{ Rules: make(map[int64]bool), - RuleEntries: make(map[int64]*rule.Escalation), + RuleEntries: make(map[int64]*rule.Entry), } } @@ -95,7 +95,7 @@ func (e *Evaluable) EvaluateRules(r *RuntimeConfig, filterable filter.Filterable // possible special cases. // // Returns an error if any of the provided callbacks return an error, otherwise always nil. 
-func (e *Evaluable) EvaluateRuleEntries(r *RuntimeConfig, filterable filter.Filterable, options EvalOptions[*rule.Escalation, any]) error { +func (e *Evaluable) EvaluateRuleEntries(r *RuntimeConfig, filterable filter.Filterable, options EvalOptions[*rule.Entry, any]) error { retryAfter := rule.RetryNever for ruleID := range e.Rules { @@ -105,7 +105,7 @@ func (e *Evaluable) EvaluateRuleEntries(r *RuntimeConfig, filterable filter.Filt continue } - for _, entry := range ru.Escalations { + for _, entry := range ru.Entries { if options.OnPreEvaluate != nil && !options.OnPreEvaluate(entry) { continue } diff --git a/internal/config/evaluable_config_test.go b/internal/config/evaluable_config_test.go index ed8ee6e4..59a4a9a4 100644 --- a/internal/config/evaluable_config_test.go +++ b/internal/config/evaluable_config_test.go @@ -103,7 +103,7 @@ func TestEvaluableConfig(t *testing.T) { runtime.Rules = maps.Clone(runtimeConfigTest.Rules) e := NewEvaluable() - options := EvalOptions[*rule.Escalation, any]{} + options := EvalOptions[*rule.Entry, any]{} expectedLen := 0 filterContext := &rule.EscalationFilter{IncidentSeverity: 9} // Event severity "emergency" @@ -114,7 +114,7 @@ func TestEvaluableConfig(t *testing.T) { require.NoError(t, e.EvaluateRuleEntries(runtime, filterContext, options)) } require.Len(t, e.RuleEntries, *expectedLen) - e.RuleEntries = make(map[int64]*rule.Escalation) + e.RuleEntries = make(map[int64]*rule.Entry) } assertEntries(&expectedLen, false) @@ -125,7 +125,7 @@ func TestEvaluableConfig(t *testing.T) { // Drop some random rules from the runtime config to simulate a runtime config deletion! maps.DeleteFunc(runtime.Rules, func(ruleID int64, _ *rule.Rule) bool { return ruleID > 35 && ruleID%defaultDivisor == 0 }) - options.OnPreEvaluate = func(re *rule.Escalation) bool { + options.OnPreEvaluate = func(re *rule.Entry) bool { if re.RuleID > 35 && re.RuleID%defaultDivisor == 0 { // Those rules are deleted from our runtime config. 
require.Failf(t, "OnPreEvaluate() shouldn't have been called", "rule %d was deleted from runtime config", re.RuleID) } @@ -133,19 +133,19 @@ func TestEvaluableConfig(t *testing.T) { require.Nilf(t, e.RuleEntries[re.ID], "EvaluateRuleEntries() shouldn't evaluate entry %d twice", re.ID) return true } - options.OnError = func(re *rule.Escalation, err error) bool { + options.OnError = func(re *rule.Entry, err error) bool { require.EqualError(t, err, `unknown severity "evaluable"`) require.Truef(t, re.RuleID%defaultDivisor == 0, "evaluating rule entry %d should not fail", re.ID) return true } - options.OnFilterMatch = func(re *rule.Escalation) error { + options.OnFilterMatch = func(re *rule.Entry) error { require.Nilf(t, e.RuleEntries[re.ID], "OnPreEvaluate() shouldn't evaluate %d twice", re.ID) return nil } assertEntries(&expectedLen, false) lenBeforeError := new(int) - options.OnError = func(re *rule.Escalation, err error) bool { + options.OnError = func(re *rule.Entry, err error) bool { if *lenBeforeError != 0 { require.Fail(t, "OnError() shouldn't have been called again") } @@ -160,7 +160,7 @@ func TestEvaluableConfig(t *testing.T) { *lenBeforeError = 0 options.OnError = nil - options.OnFilterMatch = func(re *rule.Escalation) error { + options.OnFilterMatch = func(re *rule.Entry) error { if *lenBeforeError != 0 { require.Fail(t, "OnFilterMatch() shouldn't have been called again") } @@ -175,7 +175,7 @@ func TestEvaluableConfig(t *testing.T) { filterContext.IncidentAge = 5 * time.Minute options.OnFilterMatch = nil - options.OnPreEvaluate = func(re *rule.Escalation) bool { return re.RuleID < 5 } + options.OnPreEvaluate = func(re *rule.Entry) bool { return re.RuleID < 5 } options.OnAllConfigEvaluated = func(result any) { retryAfter := result.(time.Duration) // The filter string of the escalation condition is incident_age>=10m and the actual incident age is 5m. @@ -189,12 +189,12 @@ func makeRule(t *testing.T, i int) *rule.Rule { r := new(rule.Rule) r.ID = int64(i) r.Name = fmt.Sprintf("rule-%d", i) - r.Escalations = make(map[int64]*rule.Escalation) + r.Entries = make(map[int64]*rule.Entry) invalidSeverity, err := filter.Parse("incident_severity=evaluable") require.NoError(t, err, "parsing incident_severity=evaluable shouldn't fail") - redundant := new(rule.Escalation) + redundant := new(rule.Entry) redundant.ID = r.ID * 150 // It must be large enough to avoid colliding with others! 
redundant.RuleID = r.ID redundant.Condition = invalidSeverity @@ -202,7 +202,7 @@ func makeRule(t *testing.T, i int) *rule.Rule { nonexistent, err := filter.Parse("nonexistent=evaluable") require.NoError(t, err, "parsing nonexistent=evaluable shouldn't fail") - r.Escalations[redundant.ID] = redundant + r.Entries[redundant.ID] = redundant r.ObjectFilter = nonexistent if i%defaultDivisor == 0 { objCond, err := filter.Parse("host=evaluable") @@ -211,13 +211,13 @@ func makeRule(t *testing.T, i int) *rule.Rule { escalationCond, err := filter.Parse("incident_severity>warning||incident_age>=10m") require.NoError(t, err, "parsing incident_severity>=ok shouldn't fail") - entry := new(rule.Escalation) + entry := new(rule.Entry) entry.ID = r.ID * 2 entry.RuleID = r.ID entry.Condition = escalationCond r.ObjectFilter = objCond - r.Escalations[entry.ID] = entry + r.Entries[entry.ID] = entry } return r diff --git a/internal/config/rule.go b/internal/config/rule.go index 59b69c94..ee8cc023 100644 --- a/internal/config/rule.go +++ b/internal/config/rule.go @@ -20,7 +20,7 @@ func (r *RuntimeConfig) applyPendingRules() { newElement.TimePeriod = tp } - newElement.Escalations = make(map[int64]*rule.Escalation) + newElement.Entries = make(map[int64]*rule.Entry) return nil }, func(curElement, update *rule.Rule) error { @@ -48,17 +48,17 @@ func (r *RuntimeConfig) applyPendingRules() { incrementalApplyPending( r, - &r.ruleEscalations, &r.configChange.ruleEscalations, - func(newElement *rule.Escalation) error { + &r.ruleEntries, &r.configChange.ruleEntries, + func(newElement *rule.Entry) error { elementRule, ok := r.Rules[newElement.RuleID] if !ok { return fmt.Errorf("rule escalation refers unknown rule %d", newElement.RuleID) } - elementRule.Escalations[newElement.ID] = newElement + elementRule.Entries[newElement.ID] = newElement return nil }, - func(curElement, update *rule.Escalation) error { + func(curElement, update *rule.Entry) error { if curElement.RuleID != update.RuleID { return errRemoveAndAddInstead } @@ -72,42 +72,42 @@ func (r *RuntimeConfig) applyPendingRules() { return nil }, - func(delElement *rule.Escalation) error { + func(delElement *rule.Entry) error { elementRule, ok := r.Rules[delElement.RuleID] if !ok { return nil } - delete(elementRule.Escalations, delElement.ID) + delete(elementRule.Entries, delElement.ID) return nil }) incrementalApplyPending( r, - &r.ruleEscalationRecipients, &r.configChange.ruleEscalationRecipients, - func(newElement *rule.EscalationRecipient) error { + &r.ruleEntryRecipients, &r.configChange.ruleEntryRecipients, + func(newElement *rule.EntryRecipient) error { newElement.Recipient = r.GetRecipient(newElement.Key) if newElement.Recipient == nil { return fmt.Errorf("rule escalation recipient is missing or unknown") } - escalation := r.GetRuleEscalation(newElement.EscalationID) + escalation := r.GetRuleEntry(newElement.EntryID) if escalation == nil { - return fmt.Errorf("rule escalation recipient refers to unknown escalation %d", newElement.EscalationID) + return fmt.Errorf("rule escalation recipient refers to unknown escalation %d", newElement.EntryID) } escalation.Recipients = append(escalation.Recipients, newElement) return nil }, nil, - func(delElement *rule.EscalationRecipient) error { - escalation := r.GetRuleEscalation(delElement.EscalationID) + func(delElement *rule.EntryRecipient) error { + escalation := r.GetRuleEntry(delElement.EntryID) if escalation == nil { return nil } - escalation.Recipients = slices.DeleteFunc(escalation.Recipients, func(recipient 
*rule.EscalationRecipient) bool { - return recipient.EscalationID == delElement.EscalationID + escalation.Recipients = slices.DeleteFunc(escalation.Recipients, func(recipient *rule.EntryRecipient) bool { + return recipient.EntryID == delElement.EntryID }) return nil }) diff --git a/internal/config/runtime.go b/internal/config/runtime.go index a3d26781..34587960 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -73,12 +73,12 @@ type ConfigSet struct { // The following fields contain intermediate values, necessary for the incremental config synchronization. // Furthermore, they allow accessing intermediate tables as everything is referred by pointers. - groupMembers map[recipient.GroupMemberKey]*recipient.GroupMember - timePeriodEntries map[int64]*timeperiod.Entry - scheduleRotations map[int64]*recipient.Rotation - scheduleRotationMembers map[int64]*recipient.RotationMember - ruleEscalations map[int64]*rule.Escalation - ruleEscalationRecipients map[int64]*rule.EscalationRecipient + groupMembers map[recipient.GroupMemberKey]*recipient.GroupMember + timePeriodEntries map[int64]*timeperiod.Entry + scheduleRotations map[int64]*recipient.Rotation + scheduleRotationMembers map[int64]*recipient.RotationMember + ruleEntries map[int64]*rule.Entry + ruleEntryRecipients map[int64]*rule.EntryRecipient } func (r *RuntimeConfig) UpdateFromDatabase(ctx context.Context) error { @@ -156,13 +156,13 @@ func (r *RuntimeConfig) GetRecipient(k recipient.Key) recipient.Recipient { return nil } -// GetRuleEscalation returns a *rule.Escalation by the given id. -// Returns nil if there is no rule escalation with given id. -func (r *RuntimeConfig) GetRuleEscalation(escalationID int64) *rule.Escalation { +// GetRuleEntry returns a *rule.Entry by the given id. +// Returns nil if there is no rule entry with given id. 
+func (r *RuntimeConfig) GetRuleEntry(entryID int64) *rule.Entry { for _, r := range r.Rules { - escalation, ok := r.Escalations[escalationID] + entry, ok := r.Entries[entryID] if ok { - return escalation + return entry } } @@ -248,8 +248,8 @@ func (r *RuntimeConfig) fetchFromDatabase(ctx context.Context) error { func() error { return incrementalFetch(ctx, tx, r, &r.configChange.TimePeriods) }, func() error { return incrementalFetch(ctx, tx, r, &r.configChange.timePeriodEntries) }, func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Rules) }, - func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ruleEscalations) }, - func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ruleEscalationRecipients) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ruleEntries) }, + func() error { return incrementalFetch(ctx, tx, r, &r.configChange.ruleEntryRecipients) }, func() error { return incrementalFetch(ctx, tx, r, &r.configChange.Sources) }, } for _, f := range fetchFns { diff --git a/internal/config/verify.go b/internal/config/verify.go index 6f1107d3..dc2c8026 100644 --- a/internal/config/verify.go +++ b/internal/config/verify.go @@ -252,70 +252,70 @@ func (r *RuntimeConfig) debugVerifyRule(id int64, rule *rule.Rule) error { return fmt.Errorf("rule has a ObjectFilterExpr but ObjectFilter is nil") } - for escalationID, escalation := range rule.Escalations { - if escalation == nil { - return fmt.Errorf("Escalations[%d] is nil", escalationID) + for entryID, entry := range rule.Entries { + if entry == nil { + return fmt.Errorf("Entries[%d] is nil", entryID) } - if escalation.ID != escalationID { - return fmt.Errorf("Escalations[%d]: ecalation has ID %d but is referenced as %d", - escalationID, escalation.ID, escalationID) + if entry.ID != entryID { + return fmt.Errorf("Entries[%d]: ecalation has ID %d but is referenced as %d", + entryID, entry.ID, entryID) } - if escalation.RuleID != rule.ID { - return fmt.Errorf("Escalations[%d] (ID=%d) has RuleID = %d while being referenced from rule %d", - escalationID, escalation.ID, escalation.RuleID, rule.ID) + if entry.RuleID != rule.ID { + return fmt.Errorf("Entries[%d] (ID=%d) has RuleID = %d while being referenced from rule %d", + entryID, entry.ID, entry.RuleID, rule.ID) } - if escalation.ConditionExpr.Valid && escalation.Condition == nil { - return fmt.Errorf("Escalations[%d] (ID=%d) has ConditionExpr but Condition is nil", escalationID, escalation.ID) + if entry.ConditionExpr.Valid && entry.Condition == nil { + return fmt.Errorf("Entries[%d] (ID=%d) has ConditionExpr but Condition is nil", entryID, entry.ID) } // TODO: verify fallback - for i, escalationRecpient := range escalation.Recipients { - if escalationRecpient == nil { - return fmt.Errorf("Escalations[%d].Recipients[%d] is nil", escalationID, i) + for i, entryRecipient := range entry.Recipients { + if entryRecipient == nil { + return fmt.Errorf("Entries[%d].Recipients[%d] is nil", entryID, i) } - if escalationRecpient.EscalationID != escalation.ID { - return fmt.Errorf("Escalation[%d].Recipients[%d].EscalationID = %d does not match Escalations[%d].ID = %d", - escalationID, i, escalationRecpient.EscalationID, escalationID, escalation.ID) + if entryRecipient.EntryID != entry.ID { + return fmt.Errorf("Entry[%d].Recipients[%d].EntryID = %d does not match Entries[%d].ID = %d", + entryID, i, entryRecipient.EntryID, entryID, entry.ID) } - switch rec := escalationRecpient.Recipient.(type) { + switch rec := entryRecipient.Recipient.(type) { case 
*recipient.Contact: if rec == nil { - return fmt.Errorf("Escalations[%d].Recipients[%d].Recipient (Contact) is nil", escalationID, i) + return fmt.Errorf("Entries[%d].Recipients[%d].Recipient (Contact) is nil", entryID, i) } - err := r.debugVerifyContact(escalationRecpient.ContactID.Int64, rec) + err := r.debugVerifyContact(entryRecipient.ContactID.Int64, rec) if err != nil { - return fmt.Errorf("Escalations[%d].Recipients[%d].Recipient (Contact): %w", escalationID, i, err) + return fmt.Errorf("Entries[%d].Recipients[%d].Recipient (Contact): %w", entryID, i, err) } case *recipient.Group: if rec == nil { - return fmt.Errorf("Escalations[%d].Recipients[%d].Recipient (Group) is nil", escalationID, i) + return fmt.Errorf("Entries[%d].Recipients[%d].Recipient (Group) is nil", entryID, i) } - err := r.debugVerifyGroup(escalationRecpient.GroupID.Int64, rec) + err := r.debugVerifyGroup(entryRecipient.GroupID.Int64, rec) if err != nil { - return fmt.Errorf("Escalations[%d].Recipients[%d].Recipient (Group): %w", escalationID, i, err) + return fmt.Errorf("Entries[%d].Recipients[%d].Recipient (Group): %w", entryID, i, err) } case *recipient.Schedule: if rec == nil { - return fmt.Errorf("Escalations[%d].Recipients[%d].Recipient (Schedule) is nil", escalationID, i) + return fmt.Errorf("Entries[%d].Recipients[%d].Recipient (Schedule) is nil", entryID, i) } - err := r.debugVerifySchedule(escalationRecpient.ScheduleID.Int64, rec) + err := r.debugVerifySchedule(entryRecipient.ScheduleID.Int64, rec) if err != nil { - return fmt.Errorf("Escalations[%d].Recipients[%d].Recipient (Schedule): %w", escalationID, i, err) + return fmt.Errorf("Entries[%d].Recipients[%d].Recipient (Schedule): %w", entryID, i, err) } default: - return fmt.Errorf("Escalations[%d].Recipients[%d].Recipient has invalid type %T", escalationID, i, rec) + return fmt.Errorf("Entries[%d].Recipients[%d].Recipient has invalid type %T", entryID, i, rec) } } } diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index 594a3b39..d9224430 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -68,7 +68,7 @@ func (r *RuleRow) TableName() string { type HistoryRow struct { ID int64 `db:"id"` IncidentID int64 `db:"incident_id"` - RuleEscalationID types.Int `db:"rule_escalation_id"` + RuleEntryID types.Int `db:"rule_entry_id"` EventID types.Int `db:"event_id"` recipient.Key `db:",inline"` RuleID types.Int `db:"rule_id"` diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 002cad06..4600f871 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -170,7 +170,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { // Check if any (additional) rules match this object. Incident filter rules are stateful, which means that // once they have been matched, they remain effective for the ongoing incident and never need to be rechecked. err := i.EvaluateRules(i.runtimeConfig, i.Object, config.EvalOptions[*rule.Rule, any]{ - OnPreEvaluate: func(r *rule.Rule) bool { return true }, // This might change in the future! 
+ OnPreEvaluate: func(r *rule.Rule) bool { return r.Type == rule.TypeEscalation }, OnFilterMatch: func(r *rule.Rule) error { return i.onFilterRuleMatch(ctx, r, tx, ev) }, OnError: func(r *rule.Rule, err error) bool { i.logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) @@ -185,7 +185,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { // Reset the evaluated escalations when leaving this function while holding the incident lock, // otherwise the pointers could be invalidated in the meantime and lead to unexpected behaviour. - defer func() { i.RuleEntries = make(map[int64]*rule.Escalation) }() + defer func() { i.RuleEntries = make(map[int64]*rule.Entry) }() // Re-evaluate escalations based on the newly evaluated rules. i.evaluateEscalations(ev.Time) @@ -242,7 +242,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { // Reset the evaluated escalations when leaving this function while holding the incident lock, // otherwise the pointers could be invalidated in the meantime and lead to unexpected behaviour. - defer func() { i.RuleEntries = make(map[int64]*rule.Escalation) }() + defer func() { i.RuleEntries = make(map[int64]*rule.Entry) }() i.evaluateEscalations(ev.Time) if len(i.RuleEntries) == 0 { @@ -268,7 +268,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { channels := make(rule.ContactChannels) for _, escalation := range i.RuleEntries { - channels.LoadFromEscalationRecipients(escalation, ev.Time, i.isRecipientNotifiable) + channels.LoadFromEntryRecipients(escalation, ev.Time, i.isRecipientNotifiable) } notifications, err = i.generateNotifications(ctx, tx, ev, channels) @@ -444,10 +444,10 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) { // EvaluateRuleEntries only returns an error if one of the provided callback hooks returns // an error or the OnError handler returns false, and since none of our callbacks return an // error nor false, we can safely discard the return value here. - _ = i.EvaluateRuleEntries(i.runtimeConfig, filterContext, config.EvalOptions[*rule.Escalation, any]{ + _ = i.EvaluateRuleEntries(i.runtimeConfig, filterContext, config.EvalOptions[*rule.Entry, any]{ // Prevent reevaluation of an already triggered escalation via the pre run hook. 
- OnPreEvaluate: func(escalation *rule.Escalation) bool { return i.EscalationState[escalation.ID] == nil }, - OnError: func(escalation *rule.Escalation, err error) bool { + OnPreEvaluate: func(escalation *rule.Entry) bool { return i.EscalationState[escalation.ID] == nil }, + OnError: func(escalation *rule.Entry, err error) bool { r := i.runtimeConfig.Rules[escalation.RuleID] i.logger.Warnw("Failed to evaluate escalation condition", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err)) @@ -501,12 +501,12 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even } hr := &HistoryRow{ - IncidentID: i.ID, - Time: state.TriggeredAt, - EventID: utils.ToDBInt(ev.ID), - RuleEscalationID: utils.ToDBInt(state.RuleEscalationID), - RuleID: utils.ToDBInt(r.ID), - Type: EscalationTriggered, + IncidentID: i.ID, + Time: state.TriggeredAt, + EventID: utils.ToDBInt(ev.ID), + RuleEntryID: utils.ToDBInt(state.RuleEscalationID), + RuleID: utils.ToDBInt(r.ID), + Type: EscalationTriggered, } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -682,13 +682,13 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { contactChs := make(rule.ContactChannels) // Load all escalations recipients channels for escalationID := range i.EscalationState { - escalation := i.runtimeConfig.GetRuleEscalation(escalationID) + escalation := i.runtimeConfig.GetRuleEntry(escalationID) if escalation == nil { i.logger.Debugw("Incident refers unknown escalation, might got deleted", zap.Int64("escalation_id", escalationID)) continue } - contactChs.LoadFromEscalationRecipients(escalation, t, i.isRecipientNotifiable) + contactChs.LoadFromEntryRecipients(escalation, t, i.isRecipientNotifiable) } // Check whether all the incident recipients do have an appropriate contact channel configured. @@ -758,13 +758,13 @@ func (i *Incident) isRecipientNotifiable(key recipient.Key) bool { type EscalationState struct { IncidentID int64 `db:"incident_id"` - RuleEscalationID int64 `db:"rule_escalation_id"` + RuleEscalationID int64 `db:"rule_entry_id"` TriggeredAt types.UnixMilli `db:"triggered_at"` } // TableName implements the contracts.TableNamer interface. func (e *EscalationState) TableName() string { - return "incident_rule_escalation_state" + return "incident_rule_entry_state" } type RecipientState struct { diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 56edfdc8..13b16ad5 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -101,7 +101,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log i.runtimeConfig.RLock() defer i.runtimeConfig.RUnlock() - escalation := i.runtimeConfig.GetRuleEscalation(state.RuleEscalationID) + escalation := i.runtimeConfig.GetRuleEntry(state.RuleEscalationID) if escalation != nil { i.Rules[escalation.RuleID] = true } diff --git a/internal/incident/sync.go b/internal/incident/sync.go index f712a029..e74495c1 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -62,9 +62,9 @@ func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) e return err } -// AddRecipient adds recipient from the given *rule.Escalation to this incident. +// AddRecipient adds recipient from the given *rule.Entry to this incident. // Syncs also all the recipients with the database and returns an error on db failure. 
-func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Escalation, eventId int64) error { +func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Entry, eventId int64) error { newRole := RoleRecipient if i.HasManager() { newRole = RoleSubscriber diff --git a/internal/rule/escalation.go b/internal/rule/entry.go similarity index 69% rename from internal/rule/escalation.go rename to internal/rule/entry.go index 66ce4e76..452be90a 100644 --- a/internal/rule/escalation.go +++ b/internal/rule/entry.go @@ -11,21 +11,20 @@ import ( "time" ) -type Escalation struct { +type Entry struct { baseconf.IncrementalPkDbEntry[int64] `db:",inline"` - - RuleID int64 `db:"rule_id"` - NameRaw sql.NullString `db:"name"` - Condition filter.Filter `db:"-"` - ConditionExpr sql.NullString `db:"condition"` - FallbackForID sql.NullInt64 `db:"fallback_for"` - Fallbacks []*Escalation `db:"-"` - - Recipients []*EscalationRecipient `db:"-"` + RuleID int64 `db:"rule_id"` + NameRaw sql.NullString `db:"name"` + Condition filter.Filter `db:"-"` + ConditionExpr sql.NullString `db:"condition"` + FallbackForID sql.NullInt64 `db:"fallback_for"` + + Fallbacks []*Entry `db:"-"` + Recipients []*EntryRecipient `db:"-"` } // IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. -func (e *Escalation) IncrementalInitAndValidate() error { +func (e *Entry) IncrementalInitAndValidate() error { if e.ConditionExpr.Valid { cond, err := filter.Parse(e.ConditionExpr.String) if err != nil { @@ -48,7 +47,7 @@ func (e *Escalation) IncrementalInitAndValidate() error { // This allows us to use `zap.Inline(escalation)` or `zap.Object("rule_escalation", escalation)` wherever // fine-grained logging context is needed, without having to add all the individual fields ourselves each time. // https://pkg.go.dev/go.uber.org/zap/zapcore#ObjectMarshaler -func (e *Escalation) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (e *Entry) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", e.ID) encoder.AddInt64("rule_id", e.RuleID) encoder.AddString("name", e.DisplayName()) @@ -63,9 +62,9 @@ func (e *Escalation) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } -// Eval evaluates the configured escalation filter for the provided filter. -// Returns always true if there are no configured escalation conditions. -func (e *Escalation) Eval(filterable filter.Filterable) (bool, error) { +// Eval evaluates the configured escalation/routing filter for the provided filter. +// Returns always true if there are no configured escalation/routing conditions. 
+func (e *Entry) Eval(filterable filter.Filterable) (bool, error) { if e.Condition == nil { return true, nil } @@ -73,7 +72,7 @@ func (e *Escalation) Eval(filterable filter.Filterable) (bool, error) { return e.Condition.Eval(filterable) } -func (e *Escalation) DisplayName() string { +func (e *Entry) DisplayName() string { if e.NameRaw.Valid && e.NameRaw.String != "" { return e.NameRaw.String } @@ -98,7 +97,7 @@ func (e *Escalation) DisplayName() string { return strings.Join(recipients, ", ") } -func (e *Escalation) GetContactsAt(t time.Time) []ContactChannelPair { +func (e *Entry) GetContactsAt(t time.Time) []ContactChannelPair { var pairs []ContactChannelPair for _, r := range e.Recipients { @@ -110,31 +109,31 @@ func (e *Escalation) GetContactsAt(t time.Time) []ContactChannelPair { return pairs } -func (e *Escalation) TableName() string { - return "rule_escalation" +func (e *Entry) TableName() string { + return "rule_entry" } -type EscalationRecipient struct { +type EntryRecipient struct { baseconf.IncrementalPkDbEntry[int64] `db:",inline"` - EscalationID int64 `db:"rule_escalation_id"` + EntryID int64 `db:"rule_entry_id"` ChannelID sql.NullInt64 `db:"channel_id"` recipient.Key `db:",inline"` Recipient recipient.Recipient `db:"-"` } // MarshalLogObject implements the zapcore.ObjectMarshaler interface. -func (r *EscalationRecipient) MarshalLogObject(encoder zapcore.ObjectEncoder) error { +func (r *EntryRecipient) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", r.ID) - encoder.AddInt64("rule_escalation_id", r.EscalationID) + encoder.AddInt64("rule_escalation_id", r.EntryID) if r.ChannelID.Valid { encoder.AddInt64("channel_id", r.ChannelID.Int64) } return r.Key.MarshalLogObject(encoder) } -func (r *EscalationRecipient) TableName() string { - return "rule_escalation_recipient" +func (r *EntryRecipient) TableName() string { + return "rule_entry_recipient" } type ContactChannelPair struct { diff --git a/internal/rule/rule.go b/internal/rule/rule.go index 8b1fbcef..adda542a 100644 --- a/internal/rule/rule.go +++ b/internal/rule/rule.go @@ -14,11 +14,12 @@ type Rule struct { baseconf.IncrementalPkDbEntry[int64] `db:",inline"` Name string `db:"name"` + Type Type `db:"type"` TimePeriod *timeperiod.TimePeriod `db:"-"` TimePeriodID types.Int `db:"timeperiod_id"` ObjectFilter filter.Filter `db:"-"` ObjectFilterExpr types.String `db:"object_filter"` - Escalations map[int64]*Escalation `db:"-"` + Entries map[int64]*Entry `db:"-"` } // IncrementalInitAndValidate implements the config.IncrementalConfigurableInitAndValidatable interface. @@ -39,6 +40,7 @@ func (r *Rule) IncrementalInitAndValidate() error { func (r *Rule) MarshalLogObject(encoder zapcore.ObjectEncoder) error { encoder.AddInt64("id", r.ID) encoder.AddString("name", r.Name) + encoder.AddString("type", r.Type.String()) if r.TimePeriodID.Valid && r.TimePeriodID.Int64 != 0 { encoder.AddInt64("timeperiod_id", r.TimePeriodID.Int64) @@ -63,19 +65,19 @@ func (r *Rule) Eval(filterable filter.Filterable) (bool, error) { // ContactChannels stores a set of channel IDs for each set of individual contacts. type ContactChannels map[*recipient.Contact]map[int64]bool -// LoadFromEscalationRecipients loads recipients channel of the specified escalation to the current map. +// LoadFromEntryRecipients loads recipients channel of the specified rule.Entry to the current map. 
// You can provide this method a callback to control whether the channel of a specific contact should // be loaded, and it will skip those for whom the callback returns false. Pass AlwaysNotifiable for default actions. -func (ch ContactChannels) LoadFromEscalationRecipients(escalation *Escalation, t time.Time, isNotifiable func(recipient.Key) bool) { - for _, escalationRecipient := range escalation.Recipients { - ch.LoadRecipientChannel(escalationRecipient, t, isNotifiable) +func (ch ContactChannels) LoadFromEntryRecipients(entry *Entry, t time.Time, isNotifiable func(recipient.Key) bool) { + for _, entryRecipient := range entry.Recipients { + ch.LoadRecipientChannel(entryRecipient, t, isNotifiable) } } // LoadRecipientChannel loads recipient channel to the current map. // You can provide this method a callback to control whether the channel of a specific contact should // be loaded, and it will skip those for whom the callback returns false. Pass AlwaysNotifiable for default actions. -func (ch ContactChannels) LoadRecipientChannel(er *EscalationRecipient, t time.Time, isNotifiable func(recipient.Key) bool) { +func (ch ContactChannels) LoadRecipientChannel(er *EntryRecipient, t time.Time, isNotifiable func(recipient.Key) bool) { if isNotifiable(er.Key) { for _, c := range er.Recipient.GetContactsAt(t) { if ch[c] == nil { @@ -91,7 +93,7 @@ func (ch ContactChannels) LoadRecipientChannel(er *EscalationRecipient, t time.T } // AlwaysNotifiable (checks) whether the given recipient is notifiable and returns always true. -// This function is usually passed as an argument to ContactChannels.LoadFromEscalationRecipients whenever you do +// This function is usually passed as an argument to ContactChannels.LoadFromEntryRecipients whenever you do // not want to perform any custom actions. func AlwaysNotifiable(_ recipient.Key) bool { return true diff --git a/internal/rule/rule_type.go b/internal/rule/rule_type.go new file mode 100644 index 00000000..60ed0cf3 --- /dev/null +++ b/internal/rule/rule_type.go @@ -0,0 +1,56 @@ +package rule + +import ( + "database/sql/driver" + "fmt" +) + +type Type int + +const ( + TypeEscalation Type = iota + TypeRouting +) + +var typeByName = map[string]Type{ + "escalation": TypeEscalation, + "routing": TypeRouting, +} + +var typeToName = func() map[Type]string { + types := make(map[Type]string) + for name, eventType := range typeByName { + types[eventType] = name + } + return types +}() + +// Scan implements the sql.Scanner interface. 
+func (t *Type) Scan(src any) error { + var name string + switch val := src.(type) { + case string: + name = val + case []byte: + name = string(val) + default: + return fmt.Errorf("unable to scan type %T into rule.Type", src) + } + + ruleType, ok := typeByName[name] + if !ok { + return fmt.Errorf("unknown rule type %q", name) + } + + *t = ruleType + + return nil +} + +func (t Type) Value() (driver.Value, error) { + return t.String(), nil +} + +func (t *Type) String() string { + return typeToName[*t] +} diff --git a/schema/mysql/schema.sql b/schema/mysql/schema.sql index 27f1ef5c..31fc58e0 100644 --- a/schema/mysql/schema.sql +++ b/schema/mysql/schema.sql @@ -291,6 +291,7 @@ CREATE TABLE rule ( name text NOT NULL COLLATE utf8mb4_unicode_ci, timeperiod_id bigint, object_filter text, + type enum('escalation', 'routing') NOT NULL, changed_at bigint NOT NULL, deleted enum('n', 'y') NOT NULL DEFAULT 'n', @@ -301,7 +302,7 @@ CREATE TABLE rule ( CREATE INDEX idx_rule_changed_at ON rule(changed_at); -CREATE TABLE rule_escalation ( +CREATE TABLE rule_entry ( id bigint NOT NULL AUTO_INCREMENT, rule_id bigint NOT NULL, position integer, @@ -312,23 +313,23 @@ CREATE TABLE rule_escalation ( changed_at bigint NOT NULL, deleted enum('n', 'y') NOT NULL DEFAULT 'n', - CONSTRAINT pk_rule_escalation PRIMARY KEY (id), + CONSTRAINT pk_rule_entry PRIMARY KEY (id), -- Each position in an escalation can only be used once. -- Column position must be NULLed for deletion via "deleted = 'y'" - CONSTRAINT uk_rule_escalation_rule_id_position UNIQUE (rule_id, position), + CONSTRAINT uk_rule_entry_rule_id_position UNIQUE (rule_id, position), - CONSTRAINT ck_rule_escalation_not_both_condition_and_fallback_for CHECK (NOT (`condition` IS NOT NULL AND fallback_for IS NOT NULL)), - CONSTRAINT ck_rule_escalation_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL), - CONSTRAINT fk_rule_escalation_rule FOREIGN KEY (rule_id) REFERENCES rule(id), - CONSTRAINT fk_rule_escalation_rule_escalation FOREIGN KEY (fallback_for) REFERENCES rule_escalation(id) + CONSTRAINT ck_rule_entry_not_both_condition_and_fallback_for CHECK (NOT (`condition` IS NOT NULL AND fallback_for IS NOT NULL)), + CONSTRAINT ck_rule_entry_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL), + CONSTRAINT fk_rule_entry_rule FOREIGN KEY (rule_id) REFERENCES rule(id), + CONSTRAINT fk_rule_entry_rule_entry FOREIGN KEY (fallback_for) REFERENCES rule_entry(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE INDEX idx_rule_escalation_changed_at ON rule_escalation(changed_at); +CREATE INDEX idx_rule_entry_changed_at ON rule_entry(changed_at); -CREATE TABLE rule_escalation_recipient ( +CREATE TABLE rule_entry_recipient ( id bigint NOT NULL AUTO_INCREMENT, - rule_escalation_id bigint NOT NULL, + rule_entry_id bigint NOT NULL, contact_id bigint, contactgroup_id bigint, schedule_id bigint, @@ -337,16 +338,16 @@ CREATE TABLE rule_escalation_recipient ( changed_at bigint NOT NULL, deleted enum('n', 'y') NOT NULL DEFAULT 'n', - CONSTRAINT pk_rule_escalation_recipient PRIMARY KEY (id), - CONSTRAINT ck_rule_escalation_recipient_has_exactly_one_recipient CHECK (if(contact_id IS NULL, 0, 1) + if(contactgroup_id IS NULL, 0, 1) + if(schedule_id IS NULL, 0, 1) = 1), - CONSTRAINT fk_rule_escalation_recipient_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id), - CONSTRAINT fk_rule_escalation_recipient_contact FOREIGN KEY (contact_id) REFERENCES contact(id), - CONSTRAINT 
fk_rule_escalation_recipient_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), - CONSTRAINT fk_rule_escalation_recipient_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), - CONSTRAINT fk_rule_escalation_recipient_channel FOREIGN KEY (channel_id) REFERENCES channel(id) + CONSTRAINT pk_rule_entry_recipient PRIMARY KEY (id), + CONSTRAINT ck_rule_entry_recipient_has_exactly_one_recipient CHECK (if(contact_id IS NULL, 0, 1) + if(contactgroup_id IS NULL, 0, 1) + if(schedule_id IS NULL, 0, 1) = 1), + CONSTRAINT fk_rule_entry_recipient_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), + CONSTRAINT fk_rule_entry_recipient_contact FOREIGN KEY (contact_id) REFERENCES contact(id), + CONSTRAINT fk_rule_entry_recipient_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), + CONSTRAINT fk_rule_entry_recipient_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), + CONSTRAINT fk_rule_entry_recipient_channel FOREIGN KEY (channel_id) REFERENCES channel(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE INDEX idx_rule_escalation_recipient_changed_at ON rule_escalation_recipient(changed_at); +CREATE INDEX idx_rule_entry_recipient_changed_at ON rule_entry_recipient(changed_at); CREATE TABLE incident ( id bigint NOT NULL AUTO_INCREMENT, @@ -361,15 +362,6 @@ CREATE TABLE incident ( CONSTRAINT fk_incident_object FOREIGN KEY (object_id) REFERENCES object(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE incident_event ( - incident_id bigint NOT NULL, - event_id bigint NOT NULL, - - CONSTRAINT pk_incident_event PRIMARY KEY (incident_id, event_id), - CONSTRAINT fk_incident_event_incident FOREIGN KEY (incident_id) REFERENCES incident(id), - CONSTRAINT fk_incident_event_event FOREIGN KEY (event_id) REFERENCES event(id) -) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; - CREATE TABLE incident_contact ( incident_id bigint NOT NULL, contact_id bigint, @@ -398,28 +390,50 @@ CREATE TABLE incident_rule ( CONSTRAINT fk_incident_rule_rule FOREIGN KEY (rule_id) REFERENCES rule(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE incident_rule_escalation_state ( +CREATE TABLE incident_rule_entry_state ( incident_id bigint NOT NULL, - rule_escalation_id bigint NOT NULL, + rule_entry_id bigint NOT NULL, triggered_at bigint NOT NULL, - CONSTRAINT pk_incident_rule_escalation_state PRIMARY KEY (incident_id, rule_escalation_id), - CONSTRAINT fk_incident_rule_escalation_state_incident FOREIGN KEY (incident_id) REFERENCES incident(id), - CONSTRAINT fk_incident_rule_escalation_state_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id) + CONSTRAINT pk_incident_rule_entry_state PRIMARY KEY (incident_id, rule_entry_id), + CONSTRAINT fk_incident_rule_entry_state_incident FOREIGN KEY (incident_id) REFERENCES incident(id), + CONSTRAINT fk_incident_rule_entry_state_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; -CREATE TABLE incident_history ( +CREATE TABLE notification_history ( id bigint NOT NULL AUTO_INCREMENT, + incident_id bigint, + rule_entry_id bigint, + contact_id bigint, + contactgroup_id bigint, + schedule_id bigint, + channel_id bigint, + time bigint NOT NULL, + notification_state enum('suppressed', 'pending', 'sent', 'failed'), + sent_at bigint, + message mediumtext, + + CONSTRAINT pk_notification_history PRIMARY KEY (id), + CONSTRAINT fk_notification_history_incident 
FOREIGN KEY (incident_id) REFERENCES incident(id), + CONSTRAINT fk_notification_history_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), + CONSTRAINT fk_notification_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id), + CONSTRAINT fk_notification_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), + CONSTRAINT fk_notification_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), + CONSTRAINT fk_notification_history_channel FOREIGN KEY (channel_id) REFERENCES channel(id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; + +CREATE TABLE incident_history ( + id bigint NOT NULL AUTO_INCREMENT PRIMARY KEY, incident_id bigint NOT NULL, - rule_escalation_id bigint, + rule_entry_id bigint, event_id bigint, contact_id bigint, contactgroup_id bigint, schedule_id bigint, rule_id bigint, - channel_id bigint, + notification_history_id bigint, time bigint NOT NULL, - message mediumtext, + message mediumtext, -- Contains the (un)mute reason of an incident and is only set if type is also set to `mute`. -- Order to be honored for events with identical millisecond timestamps. -- NOT NULL is enforced via CHECK not to default to 'opened' type enum('opened', 'muted', 'unmuted', 'incident_severity_changed', 'rule_matched', 'escalation_triggered', 'recipient_role_changed', 'closed', 'notified'), @@ -427,20 +441,17 @@ CREATE TABLE incident_history ( old_severity enum('ok', 'debug', 'info', 'notice', 'warning', 'err', 'crit', 'alert', 'emerg'), new_recipient_role enum('recipient', 'subscriber', 'manager'), old_recipient_role enum('recipient', 'subscriber', 'manager'), - notification_state enum('suppressed', 'pending', 'sent', 'failed'), - sent_at bigint, - CONSTRAINT pk_incident_history PRIMARY KEY (id), CONSTRAINT ck_incident_history_type_notnull CHECK (type IS NOT NULL), - CONSTRAINT fk_incident_history_incident_rule_escalation_state FOREIGN KEY (incident_id, rule_escalation_id) REFERENCES incident_rule_escalation_state(incident_id, rule_escalation_id), + CONSTRAINT fk_incident_history_incident_rule_entry_state FOREIGN KEY (incident_id, rule_entry_id) REFERENCES incident_rule_entry_state(incident_id, rule_entry_id), CONSTRAINT fk_incident_history_incident FOREIGN KEY (incident_id) REFERENCES incident(id), - CONSTRAINT fk_incident_history_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id), + CONSTRAINT fk_incident_history_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), CONSTRAINT fk_incident_history_event FOREIGN KEY (event_id) REFERENCES event(id), CONSTRAINT fk_incident_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id), CONSTRAINT fk_incident_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), CONSTRAINT fk_incident_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), CONSTRAINT fk_incident_history_rule FOREIGN KEY (rule_id) REFERENCES rule(id), - CONSTRAINT fk_incident_history_channel FOREIGN KEY (channel_id) REFERENCES channel(id) + CONSTRAINT fk_incident_history_notification_history FOREIGN KEY (notification_history_id) REFERENCES notification_history(id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin; CREATE INDEX idx_incident_history_time_type ON incident_history(time, type) COMMENT 'Incident History ordered by time/type'; diff --git a/schema/mysql/upgrades/01.sql b/schema/mysql/upgrades/01.sql new file mode 100644 index 00000000..7a63c0e9 --- /dev/null +++ b/schema/mysql/upgrades/01.sql @@ -0,0 +1,88 @@ 
+ALTER TABLE rule ADD COLUMN type enum('escalation', 'routing'); +UPDATE rule SET type = 'escalation'; +ALTER TABLE rule MODIFY COLUMN type enum('escalation', 'routing') NOT NULL; + +ALTER TABLE rule_escalation RENAME TO rule_entry; +ALTER TABLE rule_entry + DROP CONSTRAINT uk_rule_escalation_rule_id_position, + DROP CONSTRAINT ck_rule_escalation_not_both_condition_and_fallback_for, + DROP CONSTRAINT ck_rule_escalation_non_deleted_needs_position, + DROP CONSTRAINT fk_rule_escalation_rule, + DROP CONSTRAINT fk_rule_escalation_rule_escalation, + DROP INDEX idx_rule_escalation_changed_at; + +ALTER TABLE rule_entry + ADD CONSTRAINT uk_rule_entry_rule_id_position UNIQUE (rule_id, position), + ADD CONSTRAINT ck_rule_entry_not_both_condition_and_fallback_for CHECK (NOT (`condition` IS NOT NULL AND fallback_for IS NOT NULL)), + ADD CONSTRAINT ck_rule_entry_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL), + ADD CONSTRAINT fk_rule_entry_rule FOREIGN KEY (rule_id) REFERENCES rule(id), + ADD CONSTRAINT fk_rule_entry_rule_entry FOREIGN KEY (fallback_for) REFERENCES rule_entry(id), + ADD INDEX idx_rule_entry_changed_at (changed_at); + +ALTER TABLE rule_escalation_recipient RENAME TO rule_entry_recipient; +ALTER TABLE rule_entry_recipient RENAME COLUMN rule_escalation_id TO rule_entry_id; +ALTER TABLE rule_entry_recipient + DROP CONSTRAINT ck_rule_escalation_recipient_has_exactly_one_recipient, + DROP CONSTRAINT fk_rule_escalation_recipient_rule_escalation, + DROP CONSTRAINT fk_rule_escalation_recipient_contact, + DROP CONSTRAINT fk_rule_escalation_recipient_contactgroup, + DROP CONSTRAINT fk_rule_escalation_recipient_schedule, + DROP CONSTRAINT fk_rule_escalation_recipient_channel, + DROP INDEX idx_rule_escalation_recipient_changed_at; + +ALTER TABLE rule_entry_recipient + ADD CONSTRAINT ck_rule_entry_recipient_has_exactly_one_recipient CHECK (if(contact_id IS NULL, 0, 1) + if(contactgroup_id IS NULL, 0, 1) + if(schedule_id IS NULL, 0, 1) = 1), + ADD CONSTRAINT fk_rule_entry_recipient_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), + ADD CONSTRAINT fk_rule_entry_recipient_contact FOREIGN KEY (contact_id) REFERENCES contact(id), + ADD CONSTRAINT fk_rule_entry_recipient_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), + ADD CONSTRAINT fk_rule_entry_recipient_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), + ADD CONSTRAINT fk_rule_entry_recipient_channel FOREIGN KEY (channel_id) REFERENCES channel(id), + ADD INDEX idx_rule_entry_recipient_changed_at (changed_at); + +ALTER TABLE incident_rule_escalation_state RENAME TO incident_rule_entry_state; +ALTER TABLE incident_rule_entry_state RENAME COLUMN rule_escalation_id TO rule_entry_id; +ALTER TABLE incident_rule_entry_state + DROP CONSTRAINT fk_incident_rule_escalation_state_incident, + DROP CONSTRAINT fk_incident_rule_escalation_state_rule_escalation; + +ALTER TABLE incident_rule_entry_state + Add CONSTRAINT fk_incident_rule_entry_state_incident FOREIGN KEY (incident_id) REFERENCES incident(id), + Add CONSTRAINT fk_incident_rule_entry_state_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id); + +CREATE TABLE notification_history ( + id bigint NOT NULL AUTO_INCREMENT PRIMARY KEY, + incident_id bigint, + rule_entry_id bigint, + contact_id bigint, + contactgroup_id bigint, + schedule_id bigint, + channel_id bigint, + time bigint NOT NULL, + notification_state enum('suppressed', 'pending', 'sent', 'failed'), + sent_at bigint, + message mediumtext, + + CONSTRAINT 
fk_notification_history_incident FOREIGN KEY (incident_id) REFERENCES incident(id), + CONSTRAINT fk_notification_history_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), + CONSTRAINT fk_notification_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id), + CONSTRAINT fk_notification_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), + CONSTRAINT fk_notification_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), + CONSTRAINT fk_notification_history_channel FOREIGN KEY (channel_id) REFERENCES channel(id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;; + +ALTER TABLE incident_history RENAME COLUMN rule_escalation_id TO rule_entry_id; +ALTER TABLE incident_history + ADD COLUMN notification_history_id bigint AFTER rule_id, + DROP CONSTRAINT fk_incident_history_channel, + DROP COLUMN channel_id, + DROP COLUMN notification_state, + DROP COLUMN sent_at, + DROP CONSTRAINT fk_incident_history_incident_rule_escalation_state, + DROP CONSTRAINT fk_incident_history_rule_escalation; + +ALTER TABLE incident_history + Add CONSTRAINT fk_incident_history_incident_rule_entry_state FOREIGN KEY (incident_id, rule_entry_id) REFERENCES incident_rule_entry_state(incident_id, rule_entry_id), + Add CONSTRAINT fk_incident_history_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), + ADD CONSTRAINT fk_incident_history_notification_history FOREIGN KEY (notification_history_id) REFERENCES notification_history(id); + +DROP TABLE incident_event; diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index 5ddb12e7..69e0e3fb 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -15,6 +15,7 @@ CREATE TYPE incident_history_event_type AS ENUM ( ); CREATE TYPE rotation_type AS ENUM ( '24-7', 'partial', 'multi' ); CREATE TYPE notification_state_type AS ENUM ( 'suppressed', 'pending', 'sent', 'failed' ); +CREATE TYPE rule_type AS ENUM ('escalation', 'routing'); -- IPL ORM renders SQL queries with LIKE operators for all suggestions in the search bar, -- which fails for numeric and enum types on PostgreSQL. Just like in Icinga DB Web. @@ -338,6 +339,7 @@ CREATE TABLE rule ( name citext NOT NULL, timeperiod_id bigint, object_filter text, + type rule_type NOT NULL, changed_at bigint NOT NULL, deleted boolenum NOT NULL DEFAULT 'n', @@ -348,7 +350,7 @@ CREATE TABLE rule ( CREATE INDEX idx_rule_changed_at ON rule(changed_at); -CREATE TABLE rule_escalation ( +CREATE TABLE rule_entry ( id bigserial, rule_id bigint NOT NULL, position integer, @@ -359,23 +361,23 @@ CREATE TABLE rule_escalation ( changed_at bigint NOT NULL, deleted boolenum NOT NULL DEFAULT 'n', - CONSTRAINT pk_rule_escalation PRIMARY KEY (id), + CONSTRAINT pk_rule_entry PRIMARY KEY (id), -- Each position in an escalation can only be used once. 
-- Column position must be NULLed for deletion via "deleted = 'y'" - CONSTRAINT uk_rule_escalation_rule_id_position UNIQUE (rule_id, position), + CONSTRAINT uk_rule_entry_rule_id_position UNIQUE (rule_id, position), - CONSTRAINT ck_rule_escalation_not_both_condition_and_fallback_for CHECK (NOT (condition IS NOT NULL AND fallback_for IS NOT NULL)), - CONSTRAINT ck_rule_escalation_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL), - CONSTRAINT fk_rule_escalation_rule FOREIGN KEY (rule_id) REFERENCES rule(id), - CONSTRAINT fk_rule_escalation_rule_escalation FOREIGN KEY (fallback_for) REFERENCES rule_escalation(id) + CONSTRAINT ck_rule_entry_not_both_condition_and_fallback_for CHECK (NOT (condition IS NOT NULL AND fallback_for IS NOT NULL)), + CONSTRAINT ck_rule_entry_non_deleted_needs_position CHECK (deleted = 'y' OR position IS NOT NULL), + CONSTRAINT fk_rule_entry_rule FOREIGN KEY (rule_id) REFERENCES rule(id), + CONSTRAINT fk_rule_entry_rule_entry FOREIGN KEY (fallback_for) REFERENCES rule_entry(id) ); -CREATE INDEX idx_rule_escalation_changed_at ON rule_escalation(changed_at); +CREATE INDEX idx_rule_entry_changed_at ON rule_entry(changed_at); -CREATE TABLE rule_escalation_recipient ( +CREATE TABLE rule_entry_recipient ( id bigserial, - rule_escalation_id bigint NOT NULL, + rule_entry_id bigint NOT NULL, contact_id bigint, contactgroup_id bigint, schedule_id bigint, @@ -384,16 +386,16 @@ CREATE TABLE rule_escalation_recipient ( changed_at bigint NOT NULL, deleted boolenum NOT NULL DEFAULT 'n', - CONSTRAINT pk_rule_escalation_recipient PRIMARY KEY (id), - CONSTRAINT ck_rule_escalation_recipient_has_exactly_one_recipient CHECK (num_nonnulls(contact_id, contactgroup_id, schedule_id) = 1), - CONSTRAINT fk_rule_escalation_recipient_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id), - CONSTRAINT fk_rule_escalation_recipient_contact FOREIGN KEY (contact_id) REFERENCES contact(id), - CONSTRAINT fk_rule_escalation_recipient_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), - CONSTRAINT fk_rule_escalation_recipient_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), - CONSTRAINT fk_rule_escalation_recipient_channel FOREIGN KEY (channel_id) REFERENCES channel(id) + CONSTRAINT pk_rule_entry_recipient PRIMARY KEY (id), + CONSTRAINT ck_rule_entry_recipient_has_exactly_one_recipient CHECK (num_nonnulls(contact_id, contactgroup_id, schedule_id) = 1), + CONSTRAINT fk_rule_entry_recipient_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), + CONSTRAINT fk_rule_entry_recipient_contact FOREIGN KEY (contact_id) REFERENCES contact(id), + CONSTRAINT fk_rule_entry_recipient_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), + CONSTRAINT fk_rule_entry_recipient_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), + CONSTRAINT fk_rule_entry_recipient_channel FOREIGN KEY (channel_id) REFERENCES channel(id) ); -CREATE INDEX idx_rule_escalation_recipient_changed_at ON rule_escalation_recipient(changed_at); +CREATE INDEX idx_rule_entry_recipient_changed_at ON rule_entry_recipient(changed_at); CREATE TABLE incident ( id bigserial, @@ -445,46 +447,66 @@ CREATE TABLE incident_rule ( CONSTRAINT fk_incident_rule_rule FOREIGN KEY (rule_id) REFERENCES rule(id) ); -CREATE TABLE incident_rule_escalation_state ( +CREATE TABLE incident_rule_entry_state ( incident_id bigint NOT NULL, - rule_escalation_id bigint NOT NULL, + rule_entry_id bigint NOT NULL, triggered_at bigint NOT NULL, - CONSTRAINT 
pk_incident_rule_escalation_state PRIMARY KEY (incident_id, rule_escalation_id), - CONSTRAINT fk_incident_rule_escalation_state_incident FOREIGN KEY (incident_id) REFERENCES incident(id), - CONSTRAINT fk_incident_rule_escalation_state_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id) + CONSTRAINT pk_incident_rule_entry_state PRIMARY KEY (incident_id, rule_entry_id), + CONSTRAINT fk_incident_rule_entry_state_incident FOREIGN KEY (incident_id) REFERENCES incident(id), + CONSTRAINT fk_incident_rule_entry_state_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id) +); + +CREATE TABLE notification_history ( + id bigserial, + incident_id bigint, + rule_entry_id bigint, + contact_id bigint, + contactgroup_id bigint, + schedule_id bigint, + channel_id bigint, + time bigint NOT NULL, + notification_state notification_state_type, + sent_at bigint, + message text, + + CONSTRAINT pk_notification_history PRIMARY KEY (id), + CONSTRAINT fk_notification_history_incident FOREIGN KEY (incident_id) REFERENCES incident(id), + CONSTRAINT fk_notification_history_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), + CONSTRAINT fk_notification_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id), + CONSTRAINT fk_notification_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), + CONSTRAINT fk_notification_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), + CONSTRAINT fk_notification_history_channel FOREIGN KEY (channel_id) REFERENCES channel(id) ); CREATE TABLE incident_history ( id bigserial, incident_id bigint NOT NULL, - rule_escalation_id bigint, + rule_entry_id bigint, event_id bigint, contact_id bigint, contactgroup_id bigint, schedule_id bigint, rule_id bigint, - channel_id bigint, + notification_history_id bigint, time bigint NOT NULL, - message text, + message text, -- Contains the (un)mute reason of an incident and is only set if type is also set to `mute`. 
type incident_history_event_type NOT NULL, new_severity severity, old_severity severity, new_recipient_role incident_contact_role, old_recipient_role incident_contact_role, - notification_state notification_state_type, - sent_at bigint, CONSTRAINT pk_incident_history PRIMARY KEY (id), - CONSTRAINT fk_incident_history_incident_rule_escalation_state FOREIGN KEY (incident_id, rule_escalation_id) REFERENCES incident_rule_escalation_state(incident_id, rule_escalation_id), + CONSTRAINT fk_incident_history_incident_rule_entry_state FOREIGN KEY (incident_id, rule_entry_id) REFERENCES incident_rule_entry_state(incident_id, rule_entry_id), CONSTRAINT fk_incident_history_incident FOREIGN KEY (incident_id) REFERENCES incident(id), - CONSTRAINT fk_incident_history_rule_escalation FOREIGN KEY (rule_escalation_id) REFERENCES rule_escalation(id), + CONSTRAINT fk_incident_history_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), CONSTRAINT fk_incident_history_event FOREIGN KEY (event_id) REFERENCES event(id), CONSTRAINT fk_incident_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id), CONSTRAINT fk_incident_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), CONSTRAINT fk_incident_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), CONSTRAINT fk_incident_history_rule FOREIGN KEY (rule_id) REFERENCES rule(id), - CONSTRAINT fk_incident_history_channel FOREIGN KEY (channel_id) REFERENCES channel(id) + CONSTRAINT fk_incident_history_notification_history FOREIGN KEY (notification_history_id) REFERENCES notification_history(id) ); CREATE INDEX idx_incident_history_time_type ON incident_history(time, type); diff --git a/schema/pgsql/upgrades/01.sql b/schema/pgsql/upgrades/01.sql new file mode 100644 index 00000000..607e2211 --- /dev/null +++ b/schema/pgsql/upgrades/01.sql @@ -0,0 +1,68 @@ +CREATE TYPE rule_type AS ENUM ('escalation', 'routing'); + +ALTER TABLE rule ADD COLUMN type rule_type; +UPDATE rule SET type = 'escalation'; +ALTER TABLE rule ALTER COLUMN type SET NOT NULL; + +ALTER TABLE rule_escalation RENAME TO rule_entry; +ALTER SEQUENCE rule_escalation_id_seq RENAME TO rule_entry_id_seq; +ALTER TABLE rule_entry RENAME CONSTRAINT pk_rule_escalation TO pk_rule_entry; +ALTER TABLE rule_entry RENAME CONSTRAINT uk_rule_escalation_rule_id_position TO uk_rule_entry_rule_id_position; +ALTER TABLE rule_entry RENAME CONSTRAINT ck_rule_escalation_not_both_condition_and_fallback_for TO ck_rule_entry_not_both_condition_and_fallback_for; +ALTER TABLE rule_entry RENAME CONSTRAINT ck_rule_escalation_non_deleted_needs_position TO ck_rule_entry_non_deleted_needs_position; +ALTER TABLE rule_entry RENAME CONSTRAINT fk_rule_escalation_rule TO fk_rule_entry_rule; +ALTER TABLE rule_entry RENAME CONSTRAINT fk_rule_escalation_rule_escalation TO fk_rule_entry_rule_entry; + +ALTER INDEX idx_rule_escalation_changed_at RENAME TO idx_rule_entry_changed_at; + +ALTER TABLE rule_escalation_recipient RENAME TO rule_entry_recipient; +ALTER TABLE rule_entry_recipient RENAME COLUMN rule_escalation_id TO rule_entry_id; +ALTER SEQUENCE rule_escalation_recipient_id_seq RENAME TO rule_entry_recipient_id_seq; +ALTER TABLE rule_entry_recipient RENAME CONSTRAINT pk_rule_escalation_recipient TO pk_rule_entry_recipient; +ALTER TABLE rule_entry_recipient RENAME CONSTRAINT ck_rule_escalation_recipient_has_exactly_one_recipient TO ck_rule_entry_recipient_has_exactly_one_recipient; +ALTER TABLE rule_entry_recipient RENAME CONSTRAINT fk_rule_escalation_recipient_rule_escalation TO 
fk_rule_entry_recipient_rule_entry; +ALTER TABLE rule_entry_recipient RENAME CONSTRAINT fk_rule_escalation_recipient_contact TO fk_rule_entry_recipient_contact; +ALTER TABLE rule_entry_recipient RENAME CONSTRAINT fk_rule_escalation_recipient_contactgroup TO fk_rule_entry_recipient_contactgroup; +ALTER TABLE rule_entry_recipient RENAME CONSTRAINT fk_rule_escalation_recipient_schedule TO fk_rule_entry_recipient_schedule; +ALTER TABLE rule_entry_recipient RENAME CONSTRAINT fk_rule_escalation_recipient_channel TO fk_rule_entry_recipient_channel; + +ALTER INDEX idx_rule_escalation_recipient_changed_at RENAME TO idx_rule_entry_recipient_changed_at; + +ALTER TABLE incident_rule_escalation_state RENAME TO incident_rule_entry_state; +ALTER TABLE incident_rule_entry_state RENAME COLUMN rule_escalation_id TO rule_entry_id; +ALTER TABLE incident_rule_entry_state RENAME CONSTRAINT pk_incident_rule_escalation_state TO pk_incident_rule_entry_state; +ALTER TABLE incident_rule_entry_state RENAME CONSTRAINT fk_incident_rule_escalation_state_incident TO fk_incident_rule_entry_state_incident; +ALTER TABLE incident_rule_entry_state RENAME CONSTRAINT fk_incident_rule_escalation_state_rule_escalation TO fk_incident_rule_entry_state_rule_entry; + +CREATE TABLE notification_history ( + id bigserial, + incident_id bigint, + rule_entry_id bigint, + contact_id bigint, + contactgroup_id bigint, + schedule_id bigint, + channel_id bigint, + time bigint NOT NULL, + notification_state notification_state_type, + sent_at bigint, + message text, + + CONSTRAINT pk_notification_history PRIMARY KEY (id), + CONSTRAINT fk_notification_history_incident FOREIGN KEY (incident_id) REFERENCES incident(id), + CONSTRAINT fk_notification_history_rule_entry FOREIGN KEY (rule_entry_id) REFERENCES rule_entry(id), + CONSTRAINT fk_notification_history_contact FOREIGN KEY (contact_id) REFERENCES contact(id), + CONSTRAINT fk_notification_history_contactgroup FOREIGN KEY (contactgroup_id) REFERENCES contactgroup(id), + CONSTRAINT fk_notification_history_schedule FOREIGN KEY (schedule_id) REFERENCES schedule(id), + CONSTRAINT fk_notification_history_channel FOREIGN KEY (channel_id) REFERENCES channel(id) +); + +ALTER TABLE incident_history RENAME COLUMN rule_escalation_id TO rule_entry_id; +ALTER TABLE incident_history + ADD COLUMN notification_history_id bigint, + DROP COLUMN channel_id, + DROP COLUMN notification_state, + DROP COLUMN sent_at, + ADD CONSTRAINT fk_incident_history_notification_history FOREIGN KEY (notification_history_id) REFERENCES notification_history(id); + +ALTER TABLE incident_history RENAME CONSTRAINT fk_incident_history_incident_rule_escalation_state TO fk_incident_history_incident_rule_entry_state; +ALTER TABLE incident_history RENAME CONSTRAINT fk_incident_history_rule_escalation TO fk_incident_history_rule_entry; From 69feca438fef846290dab025f9b65d05bcec24d3 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 17 May 2024 11:35:22 +0200 Subject: [PATCH 2/6] Introduce type `rule.RoutingFilter` --- internal/rule/condition.go | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/internal/rule/condition.go b/internal/rule/condition.go index 97368e44..b413daf2 100644 --- a/internal/rule/condition.go +++ b/internal/rule/condition.go @@ -83,7 +83,7 @@ func (e *EscalationFilter) EvalLess(key string, value string) (bool, error) { } } -func (e *EscalationFilter) EvalLike(key string, value string) (bool, error) { +func (e *EscalationFilter) EvalLike(_, _ string) (bool, error) { 
return false, fmt.Errorf("escalation filter does not support wildcard matches") } @@ -118,3 +118,34 @@ func (e *EscalationFilter) EvalExists(key string) bool { return false } } + +// RoutingFilter is used to evaluate non-state events (routing) conditions. +// Currently, it only implements the equal operator for the "event_type" filter key. +type RoutingFilter struct { + EventType string +} + +func (rf *RoutingFilter) EvalEqual(key, value string) (bool, error) { + switch key { + case "event_type": + return rf.EventType == value, nil + default: + return false, fmt.Errorf("unsupported rule routing filter option %q", key) + } +} + +func (rf *RoutingFilter) EvalLess(_, _ string) (bool, error) { + return false, fmt.Errorf("rule routing filter does not support '<' operator") +} + +func (rf *RoutingFilter) EvalLike(_, _ string) (bool, error) { + return false, fmt.Errorf("rule routing filter does not support wildcard matches") +} + +func (rf *RoutingFilter) EvalLessOrEqual(key, value string) (bool, error) { + return rf.EvalEqual(key, value) +} + +func (rf *RoutingFilter) EvalExists(key string) bool { + return key == "event_type" +} From 309e6a6326170611bff1b6d4c78051703a63dcec Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Sat, 18 May 2024 13:03:00 +0200 Subject: [PATCH 3/6] Drop `incident_event` table --- internal/incident/incident.go | 9 --------- internal/incident/sync.go | 9 --------- schema/pgsql/schema.sql | 9 --------- schema/pgsql/upgrades/01.sql | 2 ++ 4 files changed, 2 insertions(+), 27 deletions(-) diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 4600f871..09b1fb0e 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -149,11 +149,6 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { i.logger = i.logger.With(zap.String("incident", i.String())) } - if err = i.AddEvent(ctx, tx, ev); err != nil { - i.logger.Errorw("Cannot insert incident event to the database", zap.Error(err)) - return err - } - if err := i.handleMuteUnmute(ctx, tx, ev); err != nil { i.logger.Errorw("Cannot insert incident muted history", zap.String("event", ev.String()), zap.Error(err)) return err @@ -258,10 +253,6 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return err } - if err = i.AddEvent(ctx, tx, ev); err != nil { - return fmt.Errorf("cannot insert incident event to the database: %w", err) - } - if err = i.triggerEscalations(ctx, tx, ev); err != nil { return err } diff --git a/internal/incident/sync.go b/internal/incident/sync.go index e74495c1..a1d376a6 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -53,15 +53,6 @@ func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, stat return err } -// AddEvent Inserts incident history record to the database and returns an error on db failure. -func (i *Incident) AddEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - ie := &EventRow{IncidentID: i.ID, EventID: ev.ID} - stmt, _ := i.db.BuildInsertStmt(ie) - _, err := tx.NamedExecContext(ctx, stmt, ie) - - return err -} - // AddRecipient adds recipient from the given *rule.Entry to this incident. // Syncs also all the recipients with the database and returns an error on db failure. 
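As an aside to the condition.go hunk above: the new rule.RoutingFilter only understands the "event_type" key and, in effect, only equality. A minimal sketch of its behaviour; the package main wrapper and the sample event type are illustrative only, real callers go through the filter expression machinery:

package main

import (
	"fmt"

	"github.com/icinga/icinga-notifications/internal/rule"
)

func main() {
	rf := &rule.RoutingFilter{EventType: "acknowledgement"}

	// Equality on "event_type" is the only supported comparison.
	matched, err := rf.EvalEqual("event_type", "acknowledgement")
	fmt.Println(matched, err) // true <nil>

	// Anything else is rejected, e.g. wildcard matches.
	_, err = rf.EvalLike("event_type", "ack*")
	fmt.Println(err) // rule routing filter does not support wildcard matches
}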
func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *rule.Entry, eventId int64) error { diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index 69e0e3fb..7a2f6b9a 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -408,15 +408,6 @@ CREATE TABLE incident ( CONSTRAINT fk_incident_object FOREIGN KEY (object_id) REFERENCES object(id) ); -CREATE TABLE incident_event ( - incident_id bigint NOT NULL, - event_id bigint NOT NULL, - - CONSTRAINT pk_incident_event PRIMARY KEY (incident_id, event_id), - CONSTRAINT fk_incident_event_incident FOREIGN KEY (incident_id) REFERENCES incident(id), - CONSTRAINT fk_incident_event_event FOREIGN KEY (event_id) REFERENCES event(id) -); - CREATE TYPE incident_contact_role AS ENUM ('recipient', 'subscriber', 'manager'); CREATE TABLE incident_contact ( diff --git a/schema/pgsql/upgrades/01.sql b/schema/pgsql/upgrades/01.sql index 607e2211..4e8d6583 100644 --- a/schema/pgsql/upgrades/01.sql +++ b/schema/pgsql/upgrades/01.sql @@ -66,3 +66,5 @@ ALTER TABLE incident_history ALTER TABLE incident_history RENAME CONSTRAINT fk_incident_history_incident_rule_escalation_state TO fk_incident_history_incident_rule_entry_state; ALTER TABLE incident_history RENAME CONSTRAINT fk_incident_history_rule_escalation TO fk_incident_history_rule_entry; + +DROP TABLE incident_event; From d63d1c26de22fab0331d01ead38540711f886654 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Fri, 17 May 2024 14:37:56 +0200 Subject: [PATCH 4/6] Introduce and use `notification` package --- internal/incident/db_types.go | 49 ++++------- internal/incident/incident.go | 43 ++++------ internal/incident/notification_state.go | 71 ---------------- internal/incident/sync.go | 53 ++++++------ internal/notification/history.go | 104 ++++++++++++++++++++++++ internal/notification/state.go | 71 ++++++++++++++++ 6 files changed, 233 insertions(+), 158 deletions(-) delete mode 100644 internal/incident/notification_state.go create mode 100644 internal/notification/history.go create mode 100644 internal/notification/state.go diff --git a/internal/incident/db_types.go b/internal/incident/db_types.go index d9224430..12648640 100644 --- a/internal/incident/db_types.go +++ b/internal/incident/db_types.go @@ -66,22 +66,20 @@ func (r *RuleRow) TableName() string { // HistoryRow represents a single incident history database entry. 
type HistoryRow struct { - ID int64 `db:"id"` - IncidentID int64 `db:"incident_id"` - RuleEntryID types.Int `db:"rule_entry_id"` - EventID types.Int `db:"event_id"` - recipient.Key `db:",inline"` - RuleID types.Int `db:"rule_id"` - Time types.UnixMilli `db:"time"` - Type HistoryEventType `db:"type"` - ChannelID types.Int `db:"channel_id"` - NewSeverity event.Severity `db:"new_severity"` - OldSeverity event.Severity `db:"old_severity"` - NewRecipientRole ContactRole `db:"new_recipient_role"` - OldRecipientRole ContactRole `db:"old_recipient_role"` - Message types.String `db:"message"` - NotificationState NotificationState `db:"notification_state"` - SentAt types.UnixMilli `db:"sent_at"` + ID int64 `db:"id"` + IncidentID int64 `db:"incident_id"` + RuleEntryID types.Int `db:"rule_entry_id"` + EventID types.Int `db:"event_id"` + recipient.Key `db:",inline"` + RuleID types.Int `db:"rule_id"` + NotificationHistoryID types.Int `db:"notification_history_id"` + Time types.UnixMilli `db:"time"` + Type HistoryEventType `db:"type"` + NewSeverity event.Severity `db:"new_severity"` + OldSeverity event.Severity `db:"old_severity"` + NewRecipientRole ContactRole `db:"new_recipient_role"` + OldRecipientRole ContactRole `db:"old_recipient_role"` + Message types.String `db:"message"` // Is only used to store Incident (un)mute reason. } // TableName implements the contracts.TableNamer interface. @@ -101,22 +99,3 @@ func (h *HistoryRow) Sync(ctx context.Context, db *database.DB, tx *sqlx.Tx) err return nil } - -// NotificationEntry is used to cache a set of incident history fields of type Notified. -// -// The event processing workflow is performed in a separate transaction before trying to send the actual -// notifications. Thus, all resulting notification entries are marked as pending, and it creates a reference -// to them of this type. The cached entries are then used to actually notify the contacts and mark the pending -// notification entries as either NotificationStateSent or NotificationStateFailed. -type NotificationEntry struct { - HistoryRowID int64 `db:"id"` - ContactID int64 `db:"-"` - ChannelID int64 `db:"-"` - State NotificationState `db:"notification_state"` - SentAt types.UnixMilli `db:"sent_at"` -} - -// TableName implements the contracts.TableNamer interface. 
-func (h *NotificationEntry) TableName() string { - return "incident_history" -} diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 09b1fb0e..421ad78c 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -9,6 +9,7 @@ import ( "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/notification" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" @@ -200,8 +201,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } } - var notifications []*NotificationEntry - notifications, err = i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time)) + notifications, err := i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time)) if err != nil { return err } @@ -245,7 +245,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return } - var notifications []*NotificationEntry + notifications := make(notification.PendingNotifications) ctx := context.Background() err := utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { err := ev.Sync(ctx, tx, i.db, i.Object.ID) @@ -294,7 +294,6 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, Type: IncidentSeverityChanged, NewSeverity: newSeverity, OldSeverity: oldSeverity, - Message: utils.ToDBString(ev.Message), } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -350,7 +349,6 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, Time: types.UnixMilli(ev.Time), EventID: utils.ToDBInt(ev.ID), NewSeverity: i.Severity, - Message: utils.ToDBString(ev.Message), } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -518,7 +516,7 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even // notifyContacts executes all the given pending notifications of the current incident. // Returns error on database failure or if the provided context is cancelled. 
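Before the signature change below: notifyContacts now receives the per-contact map type added in internal/notification/history.go further down in this patch. A rough sketch of that shape, with the helper name and all IDs invented for illustration:

package example

import (
	"github.com/icinga/icinga-notifications/internal/notification"
	"github.com/icinga/icinga-notifications/internal/recipient"
)

// pendingFor shows the map layout: one pending Entry per contact/channel pair,
// each referencing the notification_history row inserted beforehand.
func pendingFor(contact *recipient.Contact, historyRowID, channelID int64) notification.PendingNotifications {
	return notification.PendingNotifications{
		contact: {
			{
				HistoryRowID: historyRowID,
				ContactID:    contact.ID,
				ChannelID:    channelID,
				State:        notification.StatePending,
			},
		},
	}
}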
-func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifications []*NotificationEntry) error { +func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notificationHistories notification.PendingNotifications) error { baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) if err != nil { i.logger.Errorw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) @@ -548,26 +546,20 @@ func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notifica }, } - for _, notification := range notifications { - contact := i.runtimeConfig.Contacts[notification.ContactID] - if contact == nil { - i.logger.Debugw("Incident refers unknown contact, might got deleted", zap.Int64("contact_id", notification.ContactID)) - continue - } - - if i.notifyContact(contact, req, notification.ChannelID) != nil { - notification.State = NotificationStateFailed - } else { - notification.State = NotificationStateSent - } + for contact, histories := range notificationHistories { + for _, history := range histories { + if i.notifyContact(contact, req, history.ChannelID) != nil { + history.State = notification.StateFailed + } else { + history.State = notification.StateSent + } - notification.SentAt = types.UnixMilli(time.Now()) - stmt, _ := i.db.BuildUpdateStmt(notification) - if _, err := i.db.NamedExecContext(ctx, stmt, notification); err != nil { - i.logger.Errorw( - "Failed to update contact notified incident history", zap.String("contact", contact.String()), - zap.Error(err), - ) + history.SentAt = types.UnixMilli(time.Now()) + stmt, _ := i.db.BuildUpdateStmt(history) + if _, err := i.db.NamedExecContext(ctx, stmt, history); err != nil { + i.logger.Errorw("Failed to update contact notified history", + zap.String("contact", contact.String()), zap.Error(err)) + } } if err := ctx.Err(); err != nil { @@ -648,7 +640,6 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, Time: types.UnixMilli(time.Now()), NewRecipientRole: newRole, OldRecipientRole: oldRole, - Message: utils.ToDBString(ev.Message), } if err := hr.Sync(ctx, i.db, tx); err != nil { diff --git a/internal/incident/notification_state.go b/internal/incident/notification_state.go deleted file mode 100644 index 36196839..00000000 --- a/internal/incident/notification_state.go +++ /dev/null @@ -1,71 +0,0 @@ -package incident - -import ( - "database/sql/driver" - "fmt" -) - -type NotificationState int - -const ( - NotificationStateNull NotificationState = iota - NotificationStateSuppressed - NotificationStatePending - NotificationStateSent - NotificationStateFailed -) - -var notificationStatTypeByName = map[string]NotificationState{ - "suppressed": NotificationStateSuppressed, - "pending": NotificationStatePending, - "sent": NotificationStateSent, - "failed": NotificationStateFailed, -} - -var notificationStateTypeToName = func() map[NotificationState]string { - stateTypes := make(map[NotificationState]string) - for name, eventType := range notificationStatTypeByName { - stateTypes[eventType] = name - } - return stateTypes -}() - -// Scan implements the sql.Scanner interface. -// Supports SQL NULL. 
-func (n *NotificationState) Scan(src any) error { - if src == nil { - *n = NotificationStateNull - return nil - } - - var name string - switch val := src.(type) { - case string: - name = val - case []byte: - name = string(val) - default: - return fmt.Errorf("unable to scan type %T into NotificationState", src) - } - - historyType, ok := notificationStatTypeByName[name] - if !ok { - return fmt.Errorf("unknown notification state type %q", name) - } - - *n = historyType - - return nil -} - -func (n NotificationState) Value() (driver.Value, error) { - if n == NotificationStateNull { - return nil, nil - } - - return n.String(), nil -} - -func (n *NotificationState) String() string { - return notificationStateTypeToName[*n] -} diff --git a/internal/incident/sync.go b/internal/incident/sync.go index a1d376a6..c0621e57 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/notification" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/utils" @@ -125,28 +126,33 @@ func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule // generateNotifications generates incident notification histories of the given recipients. // -// This function will just insert NotificationStateSuppressed incident histories and return an empty slice if +// This function will just insert notification.StateSuppressed incident histories and return an empty slice if // the current Object is muted, otherwise a slice of pending *NotificationEntry(ies) that can be used to update // the corresponding histories after the actual notifications have been sent out. 
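generateNotifications below delegates to notification.AddNotifications, which this patch adds in internal/notification/history.go. A hypothetical sketch of how a non-incident caller could reuse the same helper, overriding only the fields it cares about; the function name and its parameters are invented:

package example

import (
	"context"

	"github.com/icinga/icinga-go-library/database"
	"github.com/icinga/icinga-notifications/internal/notification"
	"github.com/icinga/icinga-notifications/internal/rule"
	"github.com/icinga/icinga-notifications/internal/utils"
	"github.com/jmoiron/sqlx"
)

// addRoutingNotifications persists pending notification_history rows for the
// given contact/channel pairs, tagging them with a rule entry ID and message
// while leaving the incident reference unset.
func addRoutingNotifications(
	ctx context.Context, db *database.DB, tx *sqlx.Tx,
	channels rule.ContactChannels, entryID int64, message string,
) (notification.PendingNotifications, error) {
	return notification.AddNotifications(ctx, db, tx, channels, func(h *notification.History) {
		h.RuleEntryID = utils.ToDBInt(entryID)
		h.Message = utils.ToDBString(message)
	})
}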
func (i *Incident) generateNotifications( ctx context.Context, tx *sqlx.Tx, ev *event.Event, contactChannels rule.ContactChannels, -) ([]*NotificationEntry, error) { - var notifications []*NotificationEntry - suppress := i.isMuted && i.Object.IsMuted() - for contact, channels := range contactChannels { - for chID := range channels { +) (notification.PendingNotifications, error) { + notifications, err := notification.AddNotifications(ctx, i.db, tx, contactChannels, func(n *notification.History) { + n.IncidentID = utils.ToDBInt(i.ID) + n.Message = utils.ToDBString(ev.Message) + if i.isMuted && i.Object.IsMuted() { + n.NotificationState = notification.StateSuppressed + } + }) + if err != nil { + i.logger.Errorw("Failed to add pending notification histories", zap.Error(err)) + return nil, err + } + + for contact, entries := range notifications { + for _, entry := range entries { hr := &HistoryRow{ - IncidentID: i.ID, - Key: recipient.ToKey(contact), - EventID: utils.ToDBInt(ev.ID), - Time: types.UnixMilli(time.Now()), - Type: Notified, - ChannelID: utils.ToDBInt(chID), - NotificationState: NotificationStatePending, - Message: utils.ToDBString(ev.Message), - } - if suppress { - hr.NotificationState = NotificationStateSuppressed + IncidentID: i.ID, + Key: recipient.ToKey(contact), + EventID: utils.ToDBInt(ev.ID), + Time: types.UnixMilli(time.Now()), + Type: Notified, + NotificationHistoryID: utils.ToDBInt(entry.HistoryRowID), } if err := hr.Sync(ctx, i.db, tx); err != nil { @@ -155,17 +161,12 @@ func (i *Incident) generateNotifications( zap.Error(err)) return nil, err } - - if !suppress { - notifications = append(notifications, &NotificationEntry{ - HistoryRowID: hr.ID, - ContactID: contact.ID, - State: NotificationStatePending, - ChannelID: chID, - }) - } } } + if i.isMuted && i.Object.IsMuted() { + notifications = nil + } + return notifications, nil } diff --git a/internal/notification/history.go b/internal/notification/history.go new file mode 100644 index 00000000..fb252d3b --- /dev/null +++ b/internal/notification/history.go @@ -0,0 +1,104 @@ +package notification + +import ( + "context" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icinga-notifications/internal/rule" + "github.com/icinga/icinga-notifications/internal/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "time" +) + +// History represents a single history database entry. +type History struct { + ID int64 `db:"id"` + IncidentID types.Int `db:"incident_id"` // Is only set for incident related notifications + RuleEntryID types.Int `db:"rule_entry_id"` + recipient.Key `db:",inline"` + Time types.UnixMilli `db:"time"` + ChannelID int64 `db:"channel_id"` + Message types.String `db:"message"` + NotificationState State `db:"notification_state"` + SentAt types.UnixMilli `db:"sent_at"` +} + +// Sync persists the current state of this history to the database and retrieves the just inserted history ID. +// Returns error when failed to execute the query. +func (h *History) Sync(ctx context.Context, db *database.DB, tx *sqlx.Tx) error { + historyId, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, h, "id"), h) + if err != nil { + return err + } + + h.ID = historyId + + return nil +} + +// TableName implements the contracts.TableNamer interface. 
+func (h *History) TableName() string { + return "notification_history" +} + +// Entry is used to cache a set of incident history fields of type Notified. +// +// The event processing workflow is performed in a separate transaction before trying to send the actual +// notifications. Thus, all resulting notification entries are marked as pending, and it creates a reference +// to them of this type. The cached entries are then used to actually notify the contacts and mark the pending +// notification entries as either StateSent or StateFailed. +type Entry struct { + HistoryRowID int64 `db:"id"` + ContactID int64 + ChannelID int64 + State State `db:"notification_state"` + SentAt types.UnixMilli `db:"sent_at"` +} + +// TableName implements the contracts.TableNamer interface. +func (h *Entry) TableName() string { + return "notification_history" +} + +// InitHistoryFunc is used to additionally initialise a History entry before persisting it to the database. +type InitHistoryFunc func(*History) + +// PendingNotifications is a map of per recipient.Contact pending notifications. +// Is just a short/readable form of the actual map. +type PendingNotifications map[*recipient.Contact][]*Entry + +// AddNotifications inserts by default pending notification histories into the global notification History table. +// If you need to set/override some additional fields of the History type, you can specify an InitHistoryFunc +// as an argument that is called prior to persisting the history entry to the database. +// +// Returns on success PendingNotifications referencing the just inserted entries and error on any database failure. +func AddNotifications( + ctx context.Context, db *database.DB, tx *sqlx.Tx, contactChannels rule.ContactChannels, initializer InitHistoryFunc, +) (PendingNotifications, error) { + notifications := make(PendingNotifications) + for contact, channels := range contactChannels { + for chID := range channels { + nh := &History{Key: recipient.ToKey(contact), Time: types.UnixMilli(time.Now()), ChannelID: chID} + nh.NotificationState = StatePending + if initializer != nil { + // Might be used to initialise some context specific fields like "incident_id", "rule_entry_id" etc. + initializer(nh) + } + + if err := nh.Sync(ctx, db, tx); err != nil { + return nil, errors.Wrapf(err, "cannot insert pending notification history for %q", contact.String()) + } + + notifications[contact] = append(notifications[contact], &Entry{ + HistoryRowID: nh.ID, + ContactID: contact.ID, + ChannelID: chID, + State: nh.NotificationState, + }) + } + } + + return notifications, nil +} diff --git a/internal/notification/state.go b/internal/notification/state.go new file mode 100644 index 00000000..b5f081c2 --- /dev/null +++ b/internal/notification/state.go @@ -0,0 +1,71 @@ +package notification + +import ( + "database/sql/driver" + "fmt" +) + +type State int + +const ( + StateNull State = iota + StateSuppressed + StatePending + StateSent + StateFailed +) + +var statTypeByName = map[string]State{ + "suppressed": StateSuppressed, + "pending": StatePending, + "sent": StateSent, + "failed": StateFailed, +} + +var stateTypeToName = func() map[State]string { + stateTypes := make(map[State]string) + for name, eventType := range statTypeByName { + stateTypes[eventType] = name + } + return stateTypes +}() + +// Scan implements the sql.Scanner interface. +// Supports SQL NULL. 
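The Scan and Value implementations that follow round-trip states through their lowercase names, with StateNull mapping to SQL NULL. A small sketch of that behaviour; the package main wrapper is illustrative only:

package main

import (
	"fmt"

	"github.com/icinga/icinga-notifications/internal/notification"
)

func main() {
	var s notification.State

	// Scan accepts the textual value coming back from the database.
	if err := s.Scan([]byte("sent")); err != nil {
		panic(err)
	}
	fmt.Println(s.String()) // sent

	// StateNull is stored as SQL NULL.
	v, _ := notification.StateNull.Value()
	fmt.Println(v == nil) // true
}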
+func (n *State) Scan(src any) error { + if src == nil { + *n = StateNull + return nil + } + + var name string + switch val := src.(type) { + case string: + name = val + case []byte: + name = string(val) + default: + return fmt.Errorf("unable to scan type %T into NotificationState", src) + } + + historyType, ok := statTypeByName[name] + if !ok { + return fmt.Errorf("unknown notification state type %q", name) + } + + *n = historyType + + return nil +} + +func (n State) Value() (driver.Value, error) { + if n == StateNull { + return nil, nil + } + + return n.String(), nil +} + +func (n *State) String() string { + return stateTypeToName[*n] +} From 4f947bbeaa700c553a5b5b0e2a9ee5199efb447b Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 22 May 2024 10:13:11 +0200 Subject: [PATCH 5/6] Extract notify recipients process from incident package --- internal/incident/incident.go | 242 ++++++++++-------------------- internal/incident/incidents.go | 8 +- internal/incident/sync.go | 26 ++-- internal/incident/utils.go | 32 ++++ internal/notification/notifier.go | 103 +++++++++++++ 5 files changed, 228 insertions(+), 183 deletions(-) create mode 100644 internal/incident/utils.go create mode 100644 internal/notification/notifier.go diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 421ad78c..5a399fc8 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -7,17 +7,14 @@ import ( "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/config" - "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/notification" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/utils" - "github.com/icinga/icinga-notifications/pkg/plugin" "github.com/jmoiron/sqlx" "go.uber.org/zap" - "net/url" "sync" "time" ) @@ -48,9 +45,9 @@ type Incident struct { // This prevents us from generating multiple muted histories when receiving several events that mute our Object. isMuted bool - db *database.DB - logger *zap.SugaredLogger - runtimeConfig *config.RuntimeConfig + // notification.Notifier is a helper type used to send notifications. + // It is embedded to allow direct access to its members, such as logger, DB etc. + notification.Notifier // config.Evaluable encapsulates all evaluable configuration types, such as rule.Rule, rule.Entry etc. // It is embedded to enable direct access to its members. 
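The incident.go hunk above swaps the db/logger/runtimeConfig fields for an embedded notification.Notifier (added later in this patch), so existing call sites keep their short field accesses through Go's field promotion. A tiny sketch of that mechanism, with the holder type invented for illustration:

package example

import (
	"github.com/icinga/icinga-go-library/database"
	"github.com/icinga/icinga-notifications/internal/config"
	"github.com/icinga/icinga-notifications/internal/notification"
	"go.uber.org/zap"
)

// holder stands in for Incident: embedding Notifier promotes DB, RuntimeConfig
// and Logger onto the outer type.
type holder struct {
	notification.Notifier
}

func newHolder(db *database.DB, rc *config.RuntimeConfig, logger *zap.SugaredLogger) *holder {
	h := &holder{Notifier: notification.Notifier{DB: db, RuntimeConfig: rc, Logger: logger}}
	_ = h.DB // resolves to h.Notifier.DB via promotion
	return h
}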
@@ -63,11 +60,9 @@ func NewIncident( db *database.DB, obj *object.Object, runtimeConfig *config.RuntimeConfig, logger *zap.SugaredLogger, ) *Incident { i := &Incident{ - db: db, Object: obj, - logger: logger, - runtimeConfig: runtimeConfig, Evaluable: config.NewEvaluable(), + Notifier: notification.Notifier{DB: db, RuntimeConfig: runtimeConfig, Logger: logger}, EscalationState: map[escalationID]*EscalationState{}, Recipients: map[recipient.Key]*RecipientState{}, } @@ -85,8 +80,8 @@ func (i *Incident) String() string { func (i *Incident) HasManager() bool { for recipientKey, state := range i.Recipients { - if i.runtimeConfig.GetRecipient(recipientKey) == nil { - i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) + if i.RuntimeConfig.GetRecipient(recipientKey) == nil { + i.Logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue } if state.Role == RoleManager { @@ -114,29 +109,29 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { i.Lock() defer i.Unlock() - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() // These event types are not like the others used to mute an object/incident, such as DowntimeStart, which // uniquely identify themselves why an incident is being muted, but are rather super generic types, and as // such, we are ignoring superfluous ones that don't have any effect on that incident. if i.isMuted && ev.Type == event.TypeMute { - i.logger.Debugw("Ignoring superfluous mute event", zap.String("event", ev.String())) + i.Logger.Debugw("Ignoring superfluous mute event", zap.String("event", ev.String())) return event.ErrSuperfluousMuteUnmuteEvent } else if !i.isMuted && ev.Type == event.TypeUnmute { - i.logger.Debugw("Ignoring superfluous unmute event", zap.String("event", ev.String())) + i.Logger.Debugw("Ignoring superfluous unmute event", zap.String("event", ev.String())) return event.ErrSuperfluousMuteUnmuteEvent } - tx, err := i.db.BeginTxx(ctx, nil) + tx, err := i.DB.BeginTxx(ctx, nil) if err != nil { - i.logger.Errorw("Cannot start a db transaction", zap.Error(err)) + i.Logger.Errorw("Cannot start a db transaction", zap.Error(err)) return err } defer func() { _ = tx.Rollback() }() - if err = ev.Sync(ctx, tx, i.db, i.Object.ID); err != nil { - i.logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) + if err = ev.Sync(ctx, tx, i.DB, i.Object.ID); err != nil { + i.Logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) return err } @@ -147,11 +142,11 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { return err } - i.logger = i.logger.With(zap.String("incident", i.String())) + i.Logger = i.Logger.With(zap.String("incident", i.String())) } if err := i.handleMuteUnmute(ctx, tx, ev); err != nil { - i.logger.Errorw("Cannot insert incident muted history", zap.String("event", ev.String()), zap.Error(err)) + i.Logger.Errorw("Cannot insert incident muted history", zap.String("event", ev.String()), zap.Error(err)) return err } @@ -165,11 +160,11 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { // Check if any (additional) rules match this object. Incident filter rules are stateful, which means that // once they have been matched, they remain effective for the ongoing incident and never need to be rechecked. 
- err := i.EvaluateRules(i.runtimeConfig, i.Object, config.EvalOptions[*rule.Rule, any]{ + err := i.EvaluateRules(i.RuntimeConfig, i.Object, config.EvalOptions[*rule.Rule, any]{ OnPreEvaluate: func(r *rule.Rule) bool { return r.Type == rule.TypeEscalation }, OnFilterMatch: func(r *rule.Rule) error { return i.onFilterRuleMatch(ctx, r, tx, ev) }, OnError: func(r *rule.Rule, err error) bool { - i.logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) + i.Logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) // We don't want to stop evaluating the remaining rules just because one of them couldn't be evaluated. return true @@ -207,14 +202,14 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } if err = tx.Commit(); err != nil { - i.logger.Errorw("Cannot commit db transaction", zap.Error(err)) + i.Logger.Errorw("Cannot commit db transaction", zap.Error(err)) return err } // We've just committed the DB transaction and can safely update the incident muted flag. i.isMuted = i.Object.IsMuted() - return i.notifyContacts(ctx, ev, notifications) + return i.NotifyContacts(ctx, i.makeNotificationRequest(ev), notifications) } // RetriggerEscalations tries to re-evaluate the escalations and notify contacts. @@ -222,8 +217,8 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { i.Lock() defer i.Unlock() - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() if !i.RecoveredAt.Time().IsZero() { // Incident is recovered in the meantime. @@ -231,7 +226,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { } if !time.Now().After(ev.Time) { - i.logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev)) + i.Logger.DPanicw("Event from the future", zap.Time("event_time", ev.Time), zap.Any("event", ev)) return } @@ -241,14 +236,14 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { i.evaluateEscalations(ev.Time) if len(i.RuleEntries) == 0 { - i.logger.Debug("Reevaluated escalations, no new escalations triggered") + i.Logger.Debug("Reevaluated escalations, no new escalations triggered") return } notifications := make(notification.PendingNotifications) ctx := context.Background() - err := utils.RunInTx(ctx, i.db, func(tx *sqlx.Tx) error { - err := ev.Sync(ctx, tx, i.db, i.Object.ID) + err := utils.RunInTx(ctx, i.DB, func(tx *sqlx.Tx) error { + err := ev.Sync(ctx, tx, i.DB, i.Object.ID) if err != nil { return err } @@ -266,14 +261,14 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return err }) if err != nil { - i.logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) + i.Logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) } else { - if err = i.notifyContacts(ctx, ev, notifications); err != nil { - i.logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) + if err = i.NotifyContacts(ctx, i.makeNotificationRequest(ev), notifications); err != nil { + i.Logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) return } - i.logger.Info("Successfully reevaluated time-based escalations") + i.Logger.Info("Successfully reevaluated time-based escalations") } } @@ -281,11 +276,11 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, oldSeverity := i.Severity newSeverity := ev.Severity if oldSeverity == newSeverity { - i.logger.Debugw("Ignoring superfluous severity 
change event", zap.Int64("source_id", ev.SourceId), zap.Stringer("event", ev)) + i.Logger.Debugw("Ignoring superfluous severity change event", zap.Int64("source_id", ev.SourceId), zap.Stringer("event", ev)) return event.ErrSuperfluousStateChange } - i.logger.Infof("Incident severity changed from %s to %s", oldSeverity.String(), newSeverity.String()) + i.Logger.Infof("Incident severity changed from %s to %s", oldSeverity.String(), newSeverity.String()) hr := &HistoryRow{ IncidentID: i.ID, @@ -296,14 +291,14 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, OldSeverity: oldSeverity, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Failed to insert incident severity changed history", zap.Error(err)) return err } if newSeverity == event.SeverityOK { i.RecoveredAt = types.UnixMilli(time.Now()) - i.logger.Info("All sources recovered, closing incident") + i.Logger.Info("All sources recovered, closing incident") RemoveCurrent(i.Object) @@ -314,8 +309,8 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, Type: Closed, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Cannot insert incident closed history to the database", zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Cannot insert incident closed history to the database", zap.Error(err)) return err } @@ -326,7 +321,7 @@ func (i *Incident) processSeverityChangedEvent(ctx context.Context, tx *sqlx.Tx, i.Severity = newSeverity if err := i.Sync(ctx, tx); err != nil { - i.logger.Errorw("Failed to update incident severity", zap.Error(err)) + i.Logger.Errorw("Failed to update incident severity", zap.Error(err)) return err } @@ -337,11 +332,11 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, i.StartedAt = types.UnixMilli(ev.Time) i.Severity = ev.Severity if err := i.Sync(ctx, tx); err != nil { - i.logger.Errorw("Cannot insert incident to the database", zap.Error(err)) + i.Logger.Errorw("Cannot insert incident to the database", zap.Error(err)) return err } - i.logger.Infow(fmt.Sprintf("Source %d opened incident at severity %q", ev.SourceId, i.Severity.String()), zap.String("message", ev.Message)) + i.Logger.Infow(fmt.Sprintf("Source %d opened incident at severity %q", ev.SourceId, i.Severity.String()), zap.String("message", ev.Message)) hr := &HistoryRow{ IncidentID: i.ID, @@ -351,8 +346,8 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, NewSeverity: i.Severity, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Cannot insert incident opened history event", zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Cannot insert incident opened history event", zap.Error(err)) return err } @@ -367,7 +362,7 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. } hr := &HistoryRow{IncidentID: i.ID, EventID: utils.ToDBInt(ev.ID), Time: types.UnixMilli(time.Now())} - logger := i.logger.With(zap.String("event", ev.String())) + logger := i.Logger.With(zap.String("event", ev.String())) if i.Object.IsMuted() { hr.Type = Muted // Since the object may have already been muted with previous events before this incident even @@ -381,7 +376,7 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. 
logger.Infow("Unmuting incident", zap.String("reason", ev.MuteReason)) } - return hr.Sync(ctx, i.db, tx) + return hr.Sync(ctx, i.DB, tx) } // onFilterRuleMatch records a database entry in the `incident_rule` table that refers to the specified rule.Rule. @@ -392,10 +387,10 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. // // Returns an error if it fails to persist the database entries. func (i *Incident) onFilterRuleMatch(ctx context.Context, r *rule.Rule, tx *sqlx.Tx, ev *event.Event) error { - i.logger.Infow("Rule matches", zap.Object("rule", r)) + i.Logger.Infow("Rule matches", zap.Object("rule", r)) if err := i.AddRuleMatched(ctx, tx, r); err != nil { - i.logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) + i.Logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) return err } @@ -406,8 +401,8 @@ func (i *Incident) onFilterRuleMatch(ctx context.Context, r *rule.Rule, tx *sqlx RuleID: utils.ToDBInt(r.ID), Type: RuleMatched, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) return err } @@ -423,7 +418,7 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) { // Escalations are reevaluated now, reset any existing timer, if there might be future time-based escalations, // this function will start a new timer. if i.timer != nil { - i.logger.Info("Stopping reevaluate timer due to escalation evaluation") + i.Logger.Info("Stopping reevaluate timer due to escalation evaluation") i.timer.Stop() i.timer = nil } @@ -433,12 +428,12 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) { // EvaluateRuleEntries only returns an error if one of the provided callback hooks returns // an error or the OnError handler returns false, and since none of our callbacks return an // error nor false, we can safely discard the return value here. - _ = i.EvaluateRuleEntries(i.runtimeConfig, filterContext, config.EvalOptions[*rule.Entry, any]{ + _ = i.EvaluateRuleEntries(i.RuntimeConfig, filterContext, config.EvalOptions[*rule.Entry, any]{ // Prevent reevaluation of an already triggered escalation via the pre run hook. OnPreEvaluate: func(escalation *rule.Entry) bool { return i.EscalationState[escalation.ID] == nil }, OnError: func(escalation *rule.Entry, err error) bool { - r := i.runtimeConfig.Rules[escalation.RuleID] - i.logger.Warnw("Failed to evaluate escalation condition", zap.Object("rule", r), + r := i.RuntimeConfig.Rules[escalation.RuleID] + i.Logger.Warnw("Failed to evaluate escalation condition", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err)) return true }, @@ -451,9 +446,9 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) { // the incident start time here. 
nextEvalAt := eventTime.Add(retryAfter) - i.logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) + i.Logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) i.timer = time.AfterFunc(retryAfter, func() { - i.logger.Info("Reevaluating escalations") + i.Logger.Info("Reevaluating escalations") i.RetriggerEscalations(&event.Event{ Time: nextEvalAt, @@ -470,19 +465,19 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) { // Returns an error on database failure. func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { for _, escalation := range i.RuleEntries { - r := i.runtimeConfig.Rules[escalation.RuleID] + r := i.RuntimeConfig.Rules[escalation.RuleID] if r == nil { - i.logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) + i.Logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) continue } - i.logger.Infow("Rule reached escalation", zap.Object("rule", r), zap.Object("escalation", escalation)) + i.Logger.Infow("Rule reached escalation", zap.Object("rule", r), zap.Object("escalation", escalation)) state := &EscalationState{RuleEscalationID: escalation.ID, TriggeredAt: types.UnixMilli(time.Now())} i.EscalationState[escalation.ID] = state if err := i.AddEscalationTriggered(ctx, tx, state); err != nil { - i.logger.Errorw( + i.Logger.Errorw( "Failed to upsert escalation state", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), ) @@ -498,8 +493,8 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even Type: EscalationTriggered, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw( + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw( "Failed to insert escalation triggered incident history", zap.Object("rule", r), zap.Object("escalation", escalation), zap.Error(err), ) @@ -514,91 +509,6 @@ func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *even return nil } -// notifyContacts executes all the given pending notifications of the current incident. -// Returns error on database failure or if the provided context is cancelled. 
-func (i *Incident) notifyContacts(ctx context.Context, ev *event.Event, notificationHistories notification.PendingNotifications) error { - baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) - if err != nil { - i.logger.Errorw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) - return err - } - - incidentUrl := baseUrl.JoinPath("/notifications/incident") - incidentUrl.RawQuery = fmt.Sprintf("id=%d", i.ID) - - req := &plugin.NotificationRequest{ - Object: &plugin.Object{ - Name: i.Object.DisplayName(), - Url: ev.URL, - Tags: i.Object.Tags, - ExtraTags: i.Object.ExtraTags, - }, - Incident: &plugin.Incident{ - Id: i.ID, - Url: incidentUrl.String(), - Severity: i.Severity.String(), - }, - Event: &plugin.Event{ - Time: ev.Time, - Type: ev.Type, - Username: ev.Username, - Message: ev.Message, - }, - } - - for contact, histories := range notificationHistories { - for _, history := range histories { - if i.notifyContact(contact, req, history.ChannelID) != nil { - history.State = notification.StateFailed - } else { - history.State = notification.StateSent - } - - history.SentAt = types.UnixMilli(time.Now()) - stmt, _ := i.db.BuildUpdateStmt(history) - if _, err := i.db.NamedExecContext(ctx, stmt, history); err != nil { - i.logger.Errorw("Failed to update contact notified history", - zap.String("contact", contact.String()), zap.Error(err)) - } - } - - if err := ctx.Err(); err != nil { - return err - } - } - - return nil -} - -// notifyContact notifies the given recipient via a channel matching the given ID. -func (i *Incident) notifyContact(contact *recipient.Contact, req *plugin.NotificationRequest, chID int64) error { - ch := i.runtimeConfig.Channels[chID] - if ch == nil { - i.logger.Errorw("Could not find config for channel", zap.Int64("channel_id", chID)) - - return fmt.Errorf("could not find config for channel ID: %d", chID) - } - - i.logger.Infow(fmt.Sprintf("Notify contact %q via %q of type %q", contact.FullName, ch.Name, ch.Type), - zap.Int64("channel_id", chID), zap.String("event_type", req.Event.Type)) - - contactStruct := &plugin.Contact{FullName: contact.FullName} - for _, addr := range contact.Addresses { - contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) - } - req.Contact = contactStruct - - if err := ch.Notify(req); err != nil { - i.logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) - return err - } - - i.logger.Infow("Successfully sent a notification via channel plugin", zap.String("type", ch.Type), - zap.String("contact", contact.FullName), zap.String("event_type", req.Event.Type)) - - return nil -} - // errSuperfluousAckEvent is returned when the same ack author submits two successive ack set events on an incident. // This is error is going to be used only within this incident package. var errSuperfluousAckEvent = errors.New("superfluous acknowledgement set event, author is already a manager") @@ -607,9 +517,9 @@ var errSuperfluousAckEvent = errors.New("superfluous acknowledgement set event, // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. 
func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { - contact := i.runtimeConfig.GetContact(ev.Username) + contact := i.RuntimeConfig.GetContact(ev.Username) if contact == nil { - i.logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username)) + i.Logger.Warnw("Ignoring acknowledgement event from an unknown author", zap.String("author", ev.Username)) return fmt.Errorf("unknown acknowledgment author %q", ev.Username) } @@ -623,14 +533,14 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, if oldRole == RoleManager { // The user is already a manager - i.logger.Debugw("Ignoring acknowledgement-set event, author is already a manager", zap.String("author", ev.Username)) + i.Logger.Debugw("Ignoring acknowledgement-set event, author is already a manager", zap.String("author", ev.Username)) return errSuperfluousAckEvent } } else { i.Recipients[recipientKey] = &RecipientState{Role: newRole} } - i.logger.Infof("Contact %q role changed from %s to %s", contact.String(), oldRole.String(), newRole.String()) + i.Logger.Infof("Contact %q role changed from %s to %s", contact.String(), oldRole.String(), newRole.String()) hr := &HistoryRow{ IncidentID: i.ID, @@ -642,17 +552,17 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, OldRecipientRole: oldRole, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw("Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err)) + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Failed to add recipient role changed history", zap.String("recipient", contact.String()), zap.Error(err)) return err } cr := &ContactRow{IncidentID: hr.IncidentID, Key: recipientKey, Role: newRole} - stmt, _ := i.db.BuildUpsertStmt(cr) + stmt, _ := i.DB.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { - i.logger.Errorw("Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err)) + i.Logger.Errorw("Failed to upsert incident contact", zap.String("contact", contact.String()), zap.Error(err)) return err } @@ -664,9 +574,9 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { contactChs := make(rule.ContactChannels) // Load all escalations recipients channels for escalationID := range i.EscalationState { - escalation := i.runtimeConfig.GetRuleEntry(escalationID) + escalation := i.RuntimeConfig.GetRuleEntry(escalationID) if escalation == nil { - i.logger.Debugw("Incident refers unknown escalation, might got deleted", zap.Int64("escalation_id", escalationID)) + i.Logger.Debugw("Incident refers unknown escalation, might got deleted", zap.Int64("escalation_id", escalationID)) continue } @@ -677,16 +587,16 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { // When a recipient has subscribed/managed this incident via the UI or using an ACK, fallback // to the default contact channel. 
for recipientKey, state := range i.Recipients { - r := i.runtimeConfig.GetRecipient(recipientKey) + r := i.RuntimeConfig.GetRecipient(recipientKey) if r == nil { - i.logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) + i.Logger.Debugw("Incident refers unknown recipient key, might got deleted", zap.Inline(recipientKey)) continue } if i.IsNotifiable(state.Role) { contacts := r.GetContactsAt(t) if len(contacts) > 0 { - i.logger.Debugw("Expanded recipient to contacts", + i.Logger.Debugw("Expanded recipient to contacts", zap.Object("recipient", r), zap.Objects("contacts", contacts)) @@ -697,7 +607,7 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { } } } else { - i.logger.Warnw("Recipient expanded to no contacts", zap.Object("recipient", r)) + i.Logger.Warnw("Recipient expanded to no contacts", zap.Object("recipient", r)) } } } @@ -710,9 +620,9 @@ func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { func (i *Incident) restoreRecipients(ctx context.Context) error { contact := &ContactRow{} var contacts []*ContactRow - err := i.db.SelectContext(ctx, &contacts, i.db.Rebind(i.db.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID) + err := i.DB.SelectContext(ctx, &contacts, i.DB.Rebind(i.DB.BuildSelectStmt(contact, contact)+` WHERE "incident_id" = ?`), i.ID) if err != nil { - i.logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err)) + i.Logger.Errorw("Failed to restore incident recipients from the database", zap.Error(err)) return err } diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 13b16ad5..061d1cfe 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -98,10 +98,10 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log i.EscalationState[state.RuleEscalationID] = state // Restore the incident rule matching the current escalation state if any. - i.runtimeConfig.RLock() - defer i.runtimeConfig.RUnlock() + i.RuntimeConfig.RLock() + defer i.RuntimeConfig.RUnlock() - escalation := i.runtimeConfig.GetRuleEntry(state.RuleEscalationID) + escalation := i.RuntimeConfig.GetRuleEntry(state.RuleEscalationID) if escalation != nil { i.Rules[escalation.RuleID] = true } @@ -121,7 +121,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log for _, i := range incidentsById { i.Object = object.GetFromCache(i.ObjectID) i.isMuted = i.Object.IsMuted() - i.logger = logger.With(zap.String("object", i.Object.DisplayName()), + i.Logger = logger.With(zap.String("object", i.Object.DisplayName()), zap.String("incident", i.String())) currentIncidentsMu.Lock() diff --git a/internal/incident/sync.go b/internal/incident/sync.go index c0621e57..ccbb59ee 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -27,13 +27,13 @@ func (i *Incident) Upsert() interface{} { // Returns an error on db failure. 
func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { if i.ID != 0 { - stmt, _ := i.db.BuildUpsertStmt(i) + stmt, _ := i.DB.BuildUpsertStmt(i) _, err := tx.NamedExecContext(ctx, stmt, i) if err != nil { return fmt.Errorf("failed to upsert incident: %w", err) } } else { - stmt := utils.BuildInsertStmtWithout(i.db, i, "id") + stmt := utils.BuildInsertStmtWithout(i.DB, i, "id") incidentId, err := utils.InsertAndFetchId(ctx, tx, stmt, i) if err != nil { return err @@ -48,7 +48,7 @@ func (i *Incident) Sync(ctx context.Context, tx *sqlx.Tx) error { func (i *Incident) AddEscalationTriggered(ctx context.Context, tx *sqlx.Tx, state *EscalationState) error { state.IncidentID = i.ID - stmt, _ := i.db.BuildUpsertStmt(state) + stmt, _ := i.DB.BuildUpsertStmt(state) _, err := tx.NamedExecContext(ctx, stmt, state) return err @@ -77,7 +77,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru oldRole := state.Role state.Role = newRole - i.logger.Infof("Contact %q role changed from %s to %s", r, state.Role.String(), newRole.String()) + i.Logger.Infof("Contact %q role changed from %s to %s", r, state.Role.String(), newRole.String()) hr := &HistoryRow{ IncidentID: i.ID, @@ -89,8 +89,8 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru OldRecipientRole: oldRole, } - if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw( + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw( "Failed to insert recipient role changed incident history", zap.Object("escalation", escalation), zap.String("recipients", r.String()), zap.Error(err), ) @@ -100,10 +100,10 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru cr.Role = state.Role } - stmt, _ := i.db.BuildUpsertStmt(cr) + stmt, _ := i.DB.BuildUpsertStmt(cr) _, err := tx.NamedExecContext(ctx, stmt, cr) if err != nil { - i.logger.Errorw( + i.Logger.Errorw( "Failed to upsert incident recipient", zap.Object("escalation", escalation), zap.String("recipient", r.String()), zap.Error(err), ) @@ -118,7 +118,7 @@ func (i *Incident) AddRecipient(ctx context.Context, tx *sqlx.Tx, escalation *ru // Returns an error on database failure. 
 func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule) error {
 	rr := &RuleRow{IncidentID: i.ID, RuleID: r.ID}
-	stmt, _ := i.db.BuildUpsertStmt(rr)
+	stmt, _ := i.DB.BuildUpsertStmt(rr)
 	_, err := tx.NamedExecContext(ctx, stmt, rr)
 
 	return err
@@ -132,7 +132,7 @@ func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule
 func (i *Incident) generateNotifications(
 	ctx context.Context, tx *sqlx.Tx, ev *event.Event, contactChannels rule.ContactChannels,
 ) (notification.PendingNotifications, error) {
-	notifications, err := notification.AddNotifications(ctx, i.db, tx, contactChannels, func(n *notification.History) {
+	notifications, err := notification.AddNotifications(ctx, i.DB, tx, contactChannels, func(n *notification.History) {
 		n.IncidentID = utils.ToDBInt(i.ID)
 		n.Message = utils.ToDBString(ev.Message)
 		if i.isMuted && i.Object.IsMuted() {
@@ -140,7 +140,7 @@ func (i *Incident) generateNotifications(
 		}
 	})
 	if err != nil {
-		i.logger.Errorw("Failed to add pending notification histories", zap.Error(err))
+		i.Logger.Errorw("Failed to add pending notification histories", zap.Error(err))
 		return nil, err
 	}
 
@@ -155,8 +155,8 @@ func (i *Incident) generateNotifications(
 			NotificationHistoryID: utils.ToDBInt(entry.HistoryRowID),
 		}
 
-		if err := hr.Sync(ctx, i.db, tx); err != nil {
-			i.logger.Errorw("Failed to insert incident notification history",
+		if err := hr.Sync(ctx, i.DB, tx); err != nil {
+			i.Logger.Errorw("Failed to insert incident notification history",
 				zap.String("contact", contact.FullName), zap.Bool("incident_muted", i.Object.IsMuted()),
 				zap.Error(err))
 			return nil, err
diff --git a/internal/incident/utils.go b/internal/incident/utils.go
new file mode 100644
index 00000000..357aa0e2
--- /dev/null
+++ b/internal/incident/utils.go
@@ -0,0 +1,32 @@
+package incident
+
+import (
+	"fmt"
+	"github.com/icinga/icinga-notifications/internal/daemon"
+	"github.com/icinga/icinga-notifications/internal/event"
+	"github.com/icinga/icinga-notifications/internal/notification"
+	"github.com/icinga/icinga-notifications/pkg/plugin"
+	"go.uber.org/zap"
+	"net/url"
+)
+
+// makeNotificationRequest generates a *plugin.NotificationRequest for the provided event.
+// Fails fatally when it fails to parse the Icinga Web 2 URL.
+func (i *Incident) makeNotificationRequest(ev *event.Event) *plugin.NotificationRequest { + baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) + if err != nil { + i.Logger.Panicw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) + } + + incidentUrl := baseUrl.JoinPath("/notifications/incident") + incidentUrl.RawQuery = fmt.Sprintf("id=%d", i.ID) + + req := notification.NewPluginRequest(i.Object, ev) + req.Incident = &plugin.Incident{ + Id: i.ID, + Url: incidentUrl.String(), + Severity: i.Severity.String(), + } + + return req +} diff --git a/internal/notification/notifier.go b/internal/notification/notifier.go new file mode 100644 index 00000000..79a32cea --- /dev/null +++ b/internal/notification/notifier.go @@ -0,0 +1,103 @@ +package notification + +import ( + "context" + "fmt" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icinga-notifications/pkg/plugin" + "go.uber.org/zap" + "time" +) + +// Notifier is helper type used to send notifications requests to their recipients. +type Notifier struct { + DB *database.DB `db:"-" json:"-"` + RuntimeConfig *config.RuntimeConfig `db:"-" json:"-"` + Logger *zap.SugaredLogger `db:"-" json:"-"` +} + +// NotifyContacts delivers all the provided pending notifications to their corresponding contacts. +// +// Each of the given notifications will either be marked as StateSent or StateFailed in the database. +// When a specific notification fails to be sent, it won't interrupt the subsequent notifications, instead +// it will simply log the error and continue sending the remaining ones. +// +// Returns an error if the specified context is cancelled, otherwise always nil. +func (n *Notifier) NotifyContacts(ctx context.Context, req *plugin.NotificationRequest, notifications PendingNotifications) error { + for contact, entries := range notifications { + for _, notification := range entries { + if n.NotifyContact(contact, req, notification.ChannelID) != nil { + notification.State = StateFailed + } else { + notification.State = StateSent + } + notification.SentAt = types.UnixMilli(time.Now()) + + stmt, _ := n.DB.BuildUpdateStmt(notification) + if _, err := n.DB.NamedExecContext(ctx, stmt, notification); err != nil { + n.Logger.Errorw("Failed to update contact notified history", + zap.String("contact", contact.String()), zap.Error(err)) + } + } + + if err := ctx.Err(); err != nil { + return err + } + } + + return nil +} + +// NotifyContact notifies the given recipient via a channel matching the given ID. +// +// Please make sure to call this method while holding the config.RuntimeConfig lock. +// Returns an error if unable to find a channel with the specified ID or fails to send the notification. 
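+//
+// A rough usage sketch; the variable names are illustrative and the channel ID would normally
+// come from a pending notification entry, as done in NotifyContacts above:
+//
+//	runtimeConfig.RLock()
+//	err := notifier.NotifyContact(contact, req, entry.ChannelID)
+//	runtimeConfig.RUnlock()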
+func (n *Notifier) NotifyContact(c *recipient.Contact, req *plugin.NotificationRequest, chID int64) error { + ch := n.RuntimeConfig.Channels[chID] + if ch == nil { + n.Logger.Errorw("Cannot not find config for channel", zap.Int64("channel_id", chID)) + return fmt.Errorf("cannot not find config for channel ID '%d'", chID) + } + + n.Logger.Infow(fmt.Sprintf("Notify contact %q via %q of type %q", c, ch.Name, ch.Type), + zap.Int64("channel_id", chID), zap.String("event_tye", req.Event.Type)) + + contactStruct := &plugin.Contact{FullName: c.FullName} + for _, addr := range c.Addresses { + contactStruct.Addresses = append(contactStruct.Addresses, &plugin.Address{Type: addr.Type, Address: addr.Address}) + } + req.Contact = contactStruct + + if err := ch.Notify(req); err != nil { + n.Logger.Errorw("Failed to send notification via channel plugin", zap.String("type", ch.Type), zap.Error(err)) + return err + } + + n.Logger.Infow("Successfully sent a notification via channel plugin", zap.String("type", ch.Type), + zap.String("contact", c.String()), zap.String("event_type", req.Event.Type)) + + return nil +} + +// NewPluginRequest returns a new plugin.NotificationRequest from the given arguments. +func NewPluginRequest(obj *object.Object, ev *event.Event) *plugin.NotificationRequest { + return &plugin.NotificationRequest{ + Object: &plugin.Object{ + Name: obj.DisplayName(), + Url: ev.URL, + Tags: obj.Tags, + ExtraTags: obj.ExtraTags, + }, + Event: &plugin.Event{ + Time: ev.Time, + Type: ev.Type, + Username: ev.Username, + Message: ev.Message, + }, + } +} From 35d83ce9259092a9dbb2403a91a5e9883057fbfe Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Mon, 5 Aug 2024 15:10:15 +0200 Subject: [PATCH 6/6] Support sending non-state notifications --- internal/daemon/config.go | 17 +++ internal/events/events.go | 29 +++++ internal/events/events_test.go | 156 +++++++++++++++++++++++ internal/events/router.go | 225 +++++++++++++++++++++++++++++++++ internal/icinga2/launcher.go | 4 +- internal/incident/incident.go | 67 ++++------ internal/incident/incidents.go | 65 ---------- internal/incident/sync.go | 4 +- internal/incident/utils.go | 4 +- internal/listener/listener.go | 3 +- 10 files changed, 458 insertions(+), 116 deletions(-) create mode 100644 internal/events/events.go create mode 100644 internal/events/events_test.go create mode 100644 internal/events/router.go diff --git a/internal/daemon/config.go b/internal/daemon/config.go index 72e9742d..ea188020 100644 --- a/internal/daemon/config.go +++ b/internal/daemon/config.go @@ -101,3 +101,20 @@ func ParseFlagsAndConfig() { utils.PrintErrorThenExit(err, ExitFailure) } } + +// InitTestConfig initialises the global daemon config instance and applies the defaults. +// This should be used for unit tests only. 
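+//
+// A typical test would call it before touching daemon.Config(), e.g.:
+//
+//	require.NoError(t, daemon.InitTestConfig(), "mocking daemon.Config should not fail")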
+func InitTestConfig() error { + daemonConfig = new(ConfigFile) + if err := defaults.Set(daemonConfig); err != nil { + return err + } + if err := defaults.Set(&daemonConfig.Database); err != nil { + return err + } + if err := defaults.Set(&daemonConfig.Logging); err != nil { + return err + } + + return nil +} diff --git a/internal/events/events.go b/internal/events/events.go new file mode 100644 index 00000000..33803fdb --- /dev/null +++ b/internal/events/events.go @@ -0,0 +1,29 @@ +package events + +import ( + "context" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/notification" +) + +// Process processes the specified event.Event. +// +// Please note that this function is the only way to access the internal events.router type. +// +// The returned error might be wrapped around event.ErrSuperfluousStateChange. +func Process(ctx context.Context, db *database.DB, logs *logging.Logging, rc *config.RuntimeConfig, ev *event.Event) error { + r := &router{ + logs: logs, + Evaluable: config.NewEvaluable(), + Notifier: notification.Notifier{ + DB: db, + RuntimeConfig: rc, + Logger: logs.GetChildLogger("routing").SugaredLogger, + }, + } + + return r.route(ctx, ev) +} diff --git a/internal/events/events_test.go b/internal/events/events_test.go new file mode 100644 index 00000000..6526fd21 --- /dev/null +++ b/internal/events/events_test.go @@ -0,0 +1,156 @@ +package events + +import ( + "context" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/daemon" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icinga-notifications/internal/testutils" + "github.com/icinga/icinga-notifications/internal/utils" + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" + "testing" + "time" +) + +func TestProcess(t *testing.T) { + ctx := context.Background() + db := testutils.GetTestDB(ctx, t) + + require.NoError(t, daemon.InitTestConfig(), "mocking daemon.Config should not fail") + + // Insert a dummy source for our test cases! 
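+	// The test events created below reference this source by its ID, so the row is inserted up front.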
+ source := config.Source{Type: "notifications", Name: "Icinga Notifications", Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}} + source.ChangedAt = types.UnixMilli(time.Now()) + source.Deleted = types.Bool{Bool: false, Valid: true} + + err := utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { + id, err := utils.InsertAndFetchId(ctx, tx, utils.BuildInsertStmtWithout(db, source, "id"), source) + require.NoError(t, err, "populating source table should not fail") + + source.ID = id + return nil + }) + require.NoError(t, err, "utils.RunInTx() should not fail") + + logs, err := logging.NewLogging("events-router", zapcore.DebugLevel, "console", nil, time.Hour) + require.NoError(t, err, "logging initialisation should not fail") + + runtimeConfig := new(config.RuntimeConfig) + + t.Run("InvalidEvents", func(t *testing.T) { + assert.Nil(t, Process(ctx, db, logs, runtimeConfig, makeEvent(t, source.ID, event.TypeState, event.SeverityNone))) + assert.ErrorIs(t, Process(ctx, db, logs, runtimeConfig, makeEvent(t, source.ID, event.TypeState, event.SeverityOK)), event.ErrSuperfluousStateChange) + assert.ErrorIs(t, Process(ctx, db, logs, runtimeConfig, makeEvent(t, source.ID, event.TypeAcknowledgementSet, event.SeverityOK)), event.ErrSuperfluousStateChange) + assert.ErrorIs(t, Process(ctx, db, logs, runtimeConfig, makeEvent(t, source.ID, event.TypeAcknowledgementCleared, event.SeverityOK)), event.ErrSuperfluousStateChange) + }) + + t.Run("StateChangeEvents", func(t *testing.T) { + states := map[string]*event.Event{ + "crit": makeEvent(t, source.ID, event.TypeState, event.SeverityCrit), + "warn": makeEvent(t, source.ID, event.TypeState, event.SeverityWarning), + "err": makeEvent(t, source.ID, event.TypeState, event.SeverityErr), + "alert": makeEvent(t, source.ID, event.TypeState, event.SeverityAlert), + } + + for severity, ev := range states { + assert.NoErrorf(t, Process(ctx, db, logs, runtimeConfig, ev), "state event with severity %q should open an incident", severity) + assert.ErrorIsf(t, Process(ctx, db, logs, runtimeConfig, ev), event.ErrSuperfluousStateChange, + "superfluous state event %q should be ignored", severity) + + obj := object.GetFromCache(object.ID(source.ID, ev.Tags)) + require.NotNil(t, obj, "there should be a cached object") + + i, err := incident.GetCurrent(ctx, db, obj, logs.GetLogger(), runtimeConfig, false) + require.NoError(t, err, "retrieving current incident should not fail") + require.NotNil(t, i, "there should be a cached incident") + assert.Equal(t, ev.Severity, i.Severity, "severities should be equal") + } + + reloadIncidents := func(ctx context.Context) { + object.ClearCache() + + // Remove all existing incidents from the cache, as they are indexed with the + // pointer of their object, which is going to change! + for _, i := range incident.GetCurrentIncidents() { + incident.RemoveCurrent(i.Object) + } + + // The incident loading process may hang due to unknown bugs or semaphore lock waits. + // Therefore, give it maximum time of 10s to finish normally, otherwise give up and fail. 
+ ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) + defer cancelFunc() + + err := incident.LoadOpenIncidents(ctx, db, logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour), runtimeConfig) + require.NoError(t, err, "loading active incidents should not fail") + } + reloadIncidents(ctx) + + for severity, ev := range states { + obj, err := object.FromEvent(ctx, db, ev) + assert.NoError(t, err) + + i, err := incident.GetCurrent(ctx, db, obj, logs.GetLogger(), runtimeConfig, false) + assert.NoErrorf(t, err, "incident for event severity %q should be in cache", severity) + + assert.Equal(t, obj, i.Object, "incident and event object should be the same") + assert.Equal(t, i.Severity, ev.Severity, "incident and event severity should be the same") + } + + // Recover the incidents + for _, ev := range states { + ev.Time = time.Now() + ev.Severity = event.SeverityOK + + assert.NoErrorf(t, Process(ctx, db, logs, runtimeConfig, ev), "state event with severity %q should close an incident", "ok") + } + reloadIncidents(ctx) + assert.Len(t, incident.GetCurrentIncidents(), 0, "there should be no cached incidents") + }) + + t.Run("NonStateEvents", func(t *testing.T) { + events := []*event.Event{ + makeEvent(t, source.ID, event.TypeDowntimeStart, event.SeverityNone), + makeEvent(t, source.ID, event.TypeDowntimeEnd, event.SeverityNone), + makeEvent(t, source.ID, event.TypeDowntimeRemoved, event.SeverityNone), + makeEvent(t, source.ID, event.TypeCustom, event.SeverityNone), + makeEvent(t, source.ID, event.TypeFlappingStart, event.SeverityNone), + makeEvent(t, source.ID, event.TypeFlappingEnd, event.SeverityNone), + } + + for _, ev := range events { + assert.NoErrorf(t, Process(ctx, db, logs, runtimeConfig, ev), "processing non-state event %q should not fail", ev.Type) + assert.Lenf(t, incident.GetCurrentIncidents(), 0, "non-state event %q should not open an incident", ev.Type) + require.NotNil(t, object.GetFromCache(object.ID(source.ID, ev.Tags)), "there should be a cached object") + } + }) +} + +// makeEvent creates a fully initialised event.Event of the given type and severity. 
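+// The event name and the Host/Service tags are randomised, so every call refers to a distinct object.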
+func makeEvent(t *testing.T, sourceID int64, typ string, severity event.Severity) *event.Event { + return &event.Event{ + SourceId: sourceID, + Name: testutils.MakeRandomString(t), + URL: "https://localhost/icingaweb2/icingadb", + Type: typ, + Time: time.Now(), + Severity: severity, + Username: "icingaadmin", + Message: "You will contract a rare disease :(", + Tags: map[string]string{ + "Host": testutils.MakeRandomString(t), + "Service": testutils.MakeRandomString(t), + }, + ExtraTags: map[string]string{ + "hostgroup/database-server": "", + "servicegroup/webserver": "", + }, + } +} diff --git a/internal/events/router.go b/internal/events/router.go new file mode 100644 index 00000000..ce1dde08 --- /dev/null +++ b/internal/events/router.go @@ -0,0 +1,225 @@ +package events + +import ( + "context" + "fmt" + "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/notification" + "github.com/icinga/icinga-notifications/internal/object" + "github.com/icinga/icinga-notifications/internal/rule" + "github.com/icinga/icinga-notifications/internal/utils" + "github.com/jmoiron/sqlx" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// router dispatches all incoming events to their corresponding handlers and provides a default one if there is none. +// +// You should always use this type to handle events properly and shouldn't try to bypass it +// by accessing other handlers directly. +type router struct { + // notification.Notifier is a helper type used to send notifications. + // It is embedded to allow direct access to its members, such as logger, DB etc. + notification.Notifier + + // config.Evaluable encapsulates all evaluable configuration types, such as rule.Rule, rule.Entry etc. + // It is embedded to enable direct access to its members. + *config.Evaluable + + logs *logging.Logging +} + +// route routes the specified event.Event to its corresponding handler. +// +// This function first constructs the target object.Object and its incident.Incident from the provided event.Event. +// After some safety checks have been carried out, the event is then handed over to the process method. +// +// Returns an error if it fails to successfully route/process the provided event. 
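+//
+// The rough sequence is: remember whether a previously cached object was muted, sync the object
+// from the event, create or load the current incident (a new one is only opened for state events
+// whose severity is neither OK nor none), and finally hand everything over to process.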
+func (r *router) route(ctx context.Context, ev *event.Event) error { + var wasObjectMuted bool + if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil { + wasObjectMuted = obj.IsMuted() + } + + obj, err := object.FromEvent(ctx, r.DB, ev) + if err != nil { + r.Logger.Errorw("Failed to generate object from event", zap.Stringer("event", ev), zap.Error(err)) + return err + } + + r.Logger = r.Logger.With(zap.String("object", obj.DisplayName()), zap.Stringer("event", ev)) + + createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK + currentIncident, err := incident.GetCurrent(ctx, r.DB, obj, r.logs.GetChildLogger("incident"), r.RuntimeConfig, createIncident) + if err != nil { + r.Logger.Errorw("Failed to create/determine an incident", zap.Error(err)) + return err + } + + if currentIncident == nil { + switch { + case ev.Severity == event.SeverityNone: + // We need to ignore superfluous mute and unmute events here, as would be the case with an existing + // incident, otherwise the event stream catch-up phase will generate useless events after each + // Icinga 2 reload and overwhelm the database with the very same mute/unmute events. + if wasObjectMuted && ev.Type == event.TypeMute { + return event.ErrSuperfluousMuteUnmuteEvent + } + if !wasObjectMuted && ev.Type == event.TypeUnmute { + return event.ErrSuperfluousMuteUnmuteEvent + } + case ev.Severity == event.SeverityOK: + r.Logger.Debugw("Cannot process OK state event", zap.Int64("source_id", ev.SourceId)) + return errors.Wrapf(event.ErrSuperfluousStateChange, "OK state event from source %d", ev.SourceId) + default: + panic(fmt.Sprintf("cannot process event %v with a non-OK state %v without a known incident", ev, ev.Severity)) + } + } + + return r.process(ctx, obj, ev, currentIncident, wasObjectMuted) +} + +// process processes the provided event and notifies the recipients of the resulting notifications in a non-blocking manner. +// You should be aware, though, that this method might block competing events that refer to the same incident.Incident. +// +// process processes the specified event in an own transaction and rolls back any changes made to the database +// if it returns with an error. However, it should be noted that notifications are triggered outside a database +// transaction initiated after successful event processing and will not undo the changes made by the event processing +// tx if sending the notifications fails. +// +// Returns an error in case of internal processing errors. +func (r *router) process(ctx context.Context, obj *object.Object, ev *event.Event, currentIncident *incident.Incident, wasObjMuted bool) error { + tx, err := r.DB.BeginTxx(ctx, nil) + if err != nil { + r.Logger.Errorw("Failed to start a database transaction", zap.Error(err)) + return err + } + defer func() { _ = tx.Rollback() }() + + if err := ev.Sync(ctx, tx, r.DB, obj.ID); err != nil { + r.Logger.Errorw("Failed to sync an event to the database", zap.Error(err)) + return err + } + + r.RuntimeConfig.RLock() + defer r.RuntimeConfig.RUnlock() + + if currentIncident != nil { + currentIncident.Lock() + defer currentIncident.Unlock() + + if err := currentIncident.ProcessEvent(ctx, tx, ev); err != nil { + return err + } + } + + // EvaluateRules only returns an error if one of the provided callback hooks returns + // an error or the OnError handler returns false, and since none of our callbacks return + // an error nor false, we can safely discard the return value here. 
+ _ = r.EvaluateRules(r.RuntimeConfig, obj, config.EvalOptions[*rule.Rule, any]{ + OnPreEvaluate: func(r *rule.Rule) bool { return r.Type == rule.TypeRouting }, + OnFilterMatch: func(ru *rule.Rule) error { + r.Logger.Infow("Rule matches", zap.Object("rule", ru)) + return nil + }, + OnError: func(ru *rule.Rule, err error) bool { + r.Logger.Warnw("Failed to evaluate non-state rule condition", zap.Object("rule", ru), zap.Error(err)) + return true + }, + }) + + filterContext := &rule.RoutingFilter{EventType: ev.Type} + // EvaluateRuleEntries only returns an error if one of the provided callback hooks returns + // an error or the OnError handler returns false, and since none of our callbacks return an + // error nor false, we can safely discard the return value here. + _ = r.EvaluateRuleEntries(r.RuntimeConfig, filterContext, config.EvalOptions[*rule.Entry, any]{ + OnFilterMatch: func(route *rule.Entry) error { + ru := r.RuntimeConfig.Rules[route.RuleID] + r.Logger.Debugw("Routing condition matches", zap.Object("rule", ru), zap.Object("rule_routing", route)) + return nil + }, + OnError: func(route *rule.Entry, err error) bool { + ru := r.RuntimeConfig.Rules[route.RuleID] + r.Logger.Warnw("Failed to evaluate routing condition", + zap.Object("rule", ru), + zap.Object("rule_routing", route), + zap.Error(err)) + return true + }, + }) + + var incidentID int64 + notifications := make(notification.PendingNotifications) + if currentIncident != nil { + incidentID = currentIncident.ID + notifications, err = currentIncident.GenerateNotifications(ctx, tx, ev, currentIncident.GetRecipientsChannel(ev.Time)) + if err != nil { + r.Logger.Errorw("Failed to generate incident notifications", zap.Error(err)) + return err + } + } + if err := r.generateNotifications(ctx, tx, ev, wasObjMuted && obj.IsMuted(), incidentID, notifications); err != nil { + return err + } + + if err = tx.Commit(); err != nil { + r.Logger.Errorw("Cannot commit database transaction", zap.Error(err)) + return err + } + + if currentIncident != nil { + // We've just committed the DB transaction and can safely update the incident muted flag. + currentIncident.RefreshIsMuted() + return currentIncident.NotifyContacts(ctx, currentIncident.MakeNotificationRequest(ev), notifications) + } + + if err := r.NotifyContacts(ctx, notification.NewPluginRequest(obj, ev), notifications); err != nil { + r.Logger.Errorw("Failed to send all pending notifications", zap.Error(err)) + return err + } + + return nil +} + +// generateNotifications generates non-state notifications and loads them into the provided map. +// +// Returns an error if it fails to persist the generated pending/suppressed notifications to the database. 
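+//
+// The provided map is filled in place; the caller (process) later hands it to NotifyContacts,
+// while suppressed notifications are only recorded in the history and never dispatched.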
+func (r *router) generateNotifications( + ctx context.Context, tx *sqlx.Tx, ev *event.Event, suppressed bool, incidentID int64, + notifications notification.PendingNotifications, +) error { + for _, route := range r.RuleEntries { + channels := make(rule.ContactChannels) + channels.LoadFromEntryRecipients(route, ev.Time, rule.AlwaysNotifiable) + if len(channels) == 0 { + r.Logger.Warnw("Rule routing expanded to no contacts", + zap.Object("rule_routing", route)) + continue + } + + histories, err := notification.AddNotifications(ctx, r.DB, tx, channels, func(h *notification.History) { + h.RuleEntryID = utils.ToDBInt(route.ID) + h.IncidentID = utils.ToDBInt(incidentID) + h.Message = utils.ToDBString(ev.Message) + if suppressed { + h.NotificationState = notification.StateSuppressed + } + }) + if err != nil { + r.Logger.Errorw("Failed to insert pending notification histories", + zap.Inline(route), zap.Bool("suppressed", suppressed), zap.Error(err)) + return err + } + + if !suppressed { + for contact, entries := range histories { + notifications[contact] = append(notifications[contact], entries...) + } + } + } + + return nil +} diff --git a/internal/icinga2/launcher.go b/internal/icinga2/launcher.go index becc9226..54849160 100644 --- a/internal/icinga2/launcher.go +++ b/internal/icinga2/launcher.go @@ -13,7 +13,7 @@ import ( "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" - "github.com/icinga/icinga-notifications/internal/incident" + "github.com/icinga/icinga-notifications/internal/events" "go.uber.org/zap" "net/http" "sync" @@ -129,7 +129,7 @@ func (launcher *Launcher) launch(src *config.Source) { CallbackFn: func(ev *event.Event) { l := logger.With(zap.Stringer("event", ev)) - err := incident.ProcessEvent(subCtx, launcher.Db, launcher.Logs, launcher.RuntimeConfig, ev) + err := events.Process(subCtx, launcher.Db, launcher.Logs, launcher.RuntimeConfig, ev) switch { case errors.Is(err, event.ErrSuperfluousStateChange): l.Debugw("Stopped processing event with superfluous state change", zap.Error(err)) diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 5a399fc8..ade37205 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -78,6 +78,12 @@ func (i *Incident) String() string { return fmt.Sprintf("#%d", i.ID) } +// RefreshIsMuted refreshes the current incident isMuted flag. +// Please note that you always have to call this method while holding the incident lock. +func (i *Incident) RefreshIsMuted() { + i.isMuted = i.Object.IsMuted() +} + func (i *Incident) HasManager() bool { for recipientKey, state := range i.Recipients { if i.RuntimeConfig.GetRecipient(recipientKey) == nil { @@ -104,14 +110,16 @@ func (i *Incident) IsNotifiable(role ContactRole) bool { return role > RoleRecipient } -// ProcessEvent processes the given event for the current incident in an own transaction. -func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { - i.Lock() - defer i.Unlock() - - i.RuntimeConfig.RLock() - defer i.RuntimeConfig.RUnlock() - +// ProcessEvent processes the given event for the current incident. +// +// ProcessEvent will perform all the necessary actions for the current incident and execute any database queries +// within the provided transaction. However, this method does not trigger any notifications by itself and must be +// generated/triggered manually via the GenerateNotifications method. 
+// +// Please note that you always have to call this method while holding the incident and config.RuntimeConfig lock. +// +// Returns an error when it fails to successfully process the specified event. +func (i *Incident) ProcessEvent(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { // These event types are not like the others used to mute an object/incident, such as DowntimeStart, which // uniquely identify themselves why an incident is being muted, but are rather super generic types, and as // such, we are ignoring superfluous ones that don't have any effect on that incident. @@ -123,22 +131,9 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { return event.ErrSuperfluousMuteUnmuteEvent } - tx, err := i.DB.BeginTxx(ctx, nil) - if err != nil { - i.Logger.Errorw("Cannot start a db transaction", zap.Error(err)) - return err - } - defer func() { _ = tx.Rollback() }() - - if err = ev.Sync(ctx, tx, i.DB, i.Object.ID); err != nil { - i.Logger.Errorw("Failed to insert event and fetch its ID", zap.String("event", ev.String()), zap.Error(err)) - return err - } - isNew := i.StartedAt.Time().IsZero() if isNew { - err = i.processIncidentOpenedEvent(ctx, tx, ev) - if err != nil { + if err := i.processIncidentOpenedEvent(ctx, tx, ev); err != nil { return err } @@ -180,10 +175,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { // Re-evaluate escalations based on the newly evaluated rules. i.evaluateEscalations(ev.Time) - - if err := i.triggerEscalations(ctx, tx, ev); err != nil { - return err - } + return i.triggerEscalations(ctx, tx, ev) case event.TypeAcknowledgementSet: if err := i.processAcknowledgementEvent(ctx, tx, ev); err != nil { if errors.Is(err, errSuperfluousAckEvent) { @@ -196,20 +188,7 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } } - notifications, err := i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time)) - if err != nil { - return err - } - - if err = tx.Commit(); err != nil { - i.Logger.Errorw("Cannot commit db transaction", zap.Error(err)) - return err - } - - // We've just committed the DB transaction and can safely update the incident muted flag. - i.isMuted = i.Object.IsMuted() - - return i.NotifyContacts(ctx, i.makeNotificationRequest(ev), notifications) + return nil } // RetriggerEscalations tries to re-evaluate the escalations and notify contacts. @@ -257,13 +236,13 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { channels.LoadFromEntryRecipients(escalation, ev.Time, i.isRecipientNotifiable) } - notifications, err = i.generateNotifications(ctx, tx, ev, channels) + notifications, err = i.GenerateNotifications(ctx, tx, ev, channels) return err }) if err != nil { i.Logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) } else { - if err = i.NotifyContacts(ctx, i.makeNotificationRequest(ev), notifications); err != nil { + if err = i.NotifyContacts(ctx, i.MakeNotificationRequest(ev), notifications); err != nil { i.Logger.Errorw("Failed to notify reevaluated escalation recipients", zap.Error(err)) return } @@ -569,8 +548,8 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, return nil } -// getRecipientsChannel returns all the configured channels of the current incident and escalation recipients. -func (i *Incident) getRecipientsChannel(t time.Time) rule.ContactChannels { +// GetRecipientsChannel returns all the configured channels of the current incident and escalation recipients. 
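+//
+// The result is typically fed straight into GenerateNotifications, as the event router does:
+//
+//	notifications, err := currentIncident.GenerateNotifications(ctx, tx, ev, currentIncident.GetRecipientsChannel(ev.Time))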
+func (i *Incident) GetRecipientsChannel(t time.Time) rule.ContactChannels { contactChs := make(rule.ContactChannels) // Load all escalations recipients channels for escalationID := range i.EscalationState { diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 061d1cfe..cef99f6c 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -11,7 +11,6 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/utils" - "github.com/jmoiron/sqlx" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -196,67 +195,3 @@ func GetCurrentIncidents() map[int64]*Incident { } return m } - -// ProcessEvent from an event.Event. -// -// This function first gets this Event's object.Object and its incident.Incident. Then, after performing some safety -// checks, it calls the Incident.ProcessEvent method. -// -// The returned error might be wrapped around event.ErrSuperfluousStateChange. -func ProcessEvent( - ctx context.Context, - db *database.DB, - logs *logging.Logging, - runtimeConfig *config.RuntimeConfig, - ev *event.Event, -) error { - var wasObjectMuted bool - if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil { - wasObjectMuted = obj.IsMuted() - } - - obj, err := object.FromEvent(ctx, db, ev) - if err != nil { - return fmt.Errorf("cannot sync event object: %w", err) - } - - createIncident := ev.Severity != event.SeverityNone && ev.Severity != event.SeverityOK - currentIncident, err := GetCurrent( - ctx, - db, - obj, - logs.GetChildLogger("incident"), - runtimeConfig, - createIncident) - if err != nil { - return fmt.Errorf("cannot get current incident for %q: %w", obj.DisplayName(), err) - } - - if currentIncident == nil { - switch { - case ev.Severity == event.SeverityNone: - // We need to ignore superfluous mute and unmute events here, as would be the case with an existing - // incident, otherwise the event stream catch-up phase will generate useless events after each - // Icinga 2 reload and overwhelm the database with the very same mute/unmute events. - if wasObjectMuted && ev.Type == event.TypeMute { - return event.ErrSuperfluousMuteUnmuteEvent - } else if !wasObjectMuted && ev.Type == event.TypeUnmute { - return event.ErrSuperfluousMuteUnmuteEvent - } - - // There is no active incident, but the event appears to be relevant, so try to persist it in the DB. - err = utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) }) - if err != nil { - return errors.New("cannot sync non-state event to the database") - } - - return nil - case ev.Severity != event.SeverityOK: - panic(fmt.Sprintf("cannot process event %v with a non-OK state %v without a known incident", ev, ev.Severity)) - default: - return fmt.Errorf("%w: ok state event from source %d", event.ErrSuperfluousStateChange, ev.SourceId) - } - } - - return currentIncident.ProcessEvent(ctx, ev) -} diff --git a/internal/incident/sync.go b/internal/incident/sync.go index ccbb59ee..830fd9e3 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -124,12 +124,12 @@ func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule return err } -// generateNotifications generates incident notification histories of the given recipients. +// GenerateNotifications generates incident notification histories of the given recipients. 
// // This function will just insert notification.StateSuppressed incident histories and return an empty slice if // the current Object is muted, otherwise a slice of pending *NotificationEntry(ies) that can be used to update // the corresponding histories after the actual notifications have been sent out. -func (i *Incident) generateNotifications( +func (i *Incident) GenerateNotifications( ctx context.Context, tx *sqlx.Tx, ev *event.Event, contactChannels rule.ContactChannels, ) (notification.PendingNotifications, error) { notifications, err := notification.AddNotifications(ctx, i.DB, tx, contactChannels, func(n *notification.History) { diff --git a/internal/incident/utils.go b/internal/incident/utils.go index 357aa0e2..a75ee997 100644 --- a/internal/incident/utils.go +++ b/internal/incident/utils.go @@ -10,9 +10,9 @@ import ( "net/url" ) -// makeNotificationRequest generates a *plugin.NotificationRequest for the provided event. +// MakeNotificationRequest generates a *plugin.NotificationRequest for the provided event. // Fails fatally when fails to parse the Icinga Web 2 url. -func (i *Incident) makeNotificationRequest(ev *event.Event) *plugin.NotificationRequest { +func (i *Incident) MakeNotificationRequest(ev *event.Event) *plugin.NotificationRequest { baseUrl, err := url.Parse(daemon.Config().Icingaweb2URL) if err != nil { i.Logger.Panicw("Failed to parse Icinga Web 2 URL", zap.String("url", daemon.Config().Icingaweb2URL), zap.Error(err)) diff --git a/internal/listener/listener.go b/internal/listener/listener.go index 7429f6e1..9b63d088 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -12,6 +12,7 @@ import ( "github.com/icinga/icinga-notifications/internal/config" "github.com/icinga/icinga-notifications/internal/daemon" "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/events" "github.com/icinga/icinga-notifications/internal/incident" "go.uber.org/zap" "net/http" @@ -133,7 +134,7 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { } l.logger.Infow("Processing event", zap.String("event", ev.String())) - err = incident.ProcessEvent(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) + err = events.Process(context.Background(), l.db, l.logs, l.runtimeConfig, &ev) if errors.Is(err, event.ErrSuperfluousStateChange) || errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent) { abort(http.StatusNotAcceptable, &ev, "%v", err) return