From 39ba43bf3ad60d9362e14e5de52225c18ae5e0aa Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Mon, 17 Jun 2024 18:21:33 +0200 Subject: [PATCH] WIP --- internal/channel/channel.go | 30 ++++ internal/config/channel.go | 90 ++-------- internal/config/contact.go | 60 ++----- internal/config/contact_address.go | 57 +----- internal/config/group.go | 114 +++++------- internal/config/incremental_sync.go | 169 ++++++++++++++++++ internal/config/rule.go | 266 +++++++++++----------------- internal/config/runtime.go | 26 ++- internal/config/schedule.go | 246 ++++++++++++------------- internal/recipient/contact.go | 45 +++++ internal/recipient/group.go | 62 ++++++- internal/recipient/schedule.go | 58 ++++++ internal/rule/escalation.go | 55 ++++++ internal/rule/rule.go | 28 +++ internal/utils/utils.go | 6 + 15 files changed, 756 insertions(+), 556 deletions(-) create mode 100644 internal/config/incremental_sync.go diff --git a/internal/channel/channel.go b/internal/channel/channel.go index a318ac69f..dd0ba154e 100644 --- a/internal/channel/channel.go +++ b/internal/channel/channel.go @@ -4,11 +4,13 @@ import ( "context" "errors" "fmt" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/contracts" "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/pkg/plugin" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "net/url" ) @@ -18,6 +20,9 @@ type Channel struct { Type string `db:"type"` Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` + Logger *zap.SugaredLogger `db:"-"` restartCh chan newConfig @@ -27,6 +32,31 @@ type Channel struct { pluginCtxCancel func() } +func (c *Channel) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", c.ID) + encoder.AddString("name", c.Name) + encoder.AddString("type", c.Type) + encoder.AddTime("changed_at", c.ChangedAt.Time()) + encoder.AddBool("deleted", c.IsDeleted()) + return nil +} + +func (c *Channel) GetID() int64 { + return c.ID +} + +func (c *Channel) GetChangedAt() types.UnixMilli { + return c.ChangedAt +} + +func (c *Channel) IsDeleted() bool { + return c.Deleted.Valid && c.Deleted.Bool +} + +func (c *Channel) Validate() error { + return ValidateType(c.Type) +} + // newConfig helps to store the channel's updated properties type newConfig struct { ctype string diff --git a/internal/config/channel.go b/internal/config/channel.go index 769a919f3..f7105ab8f 100644 --- a/internal/config/channel.go +++ b/internal/config/channel.go @@ -8,79 +8,25 @@ import ( ) func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error { - var channelPtr *channel.Channel - stmt := r.db.BuildSelectStmt(channelPtr, channelPtr) - r.logger.Debugf("Executing query %q", stmt) - - var channels []*channel.Channel - if err := tx.SelectContext(ctx, &channels, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - channelsById := make(map[int64]*channel.Channel) - for _, c := range channels { - channelLogger := r.logger.With( - zap.Int64("id", c.ID), - zap.String("name", c.Name), - zap.String("type", c.Type), - ) - if channelsById[c.ID] != nil { - channelLogger.Warnw("ignoring duplicate config for channel type") - } else if err := channel.ValidateType(c.Type); err != nil { - channelLogger.Errorw("Cannot load channel config", zap.Error(err)) - } else { - channelsById[c.ID] = c - - channelLogger.Debugw("loaded channel config") - } - } - - if r.Channels != nil { - // mark no longer existing channels for deletion - for id := range r.Channels { - if _, ok := channelsById[id]; !ok { - channelsById[id] = nil - } - } - } - - r.pending.Channels = channelsById - - return nil + return incrementalFetch(ctx, r, tx, &r.pending.Channels) } func (r *RuntimeConfig) applyPendingChannels() { - if r.Channels == nil { - r.Channels = make(map[int64]*channel.Channel) - } - - for id, pendingChannel := range r.pending.Channels { - if pendingChannel == nil { - r.Channels[id].Logger.Info("Channel has been removed") - r.Channels[id].Stop() - - delete(r.Channels, id) - } else if currentChannel := r.Channels[id]; currentChannel != nil { - // Currently, the whole config is reloaded from the database frequently, replacing everything. - // Prevent restarting the plugin processes every time by explicitly checking for config changes. - // The if condition should no longer be necessary when https://github.com/Icinga/icinga-notifications/issues/5 - // is solved properly. - if currentChannel.Type != pendingChannel.Type || currentChannel.Name != pendingChannel.Name || currentChannel.Config != pendingChannel.Config { - currentChannel.Type = pendingChannel.Type - currentChannel.Name = pendingChannel.Name - currentChannel.Config = pendingChannel.Config - - currentChannel.Restart() - } - } else { - pendingChannel.Start(context.TODO(), r.logs.GetChildLogger("channel").With( - zap.Int64("id", pendingChannel.ID), - zap.String("name", pendingChannel.Name))) - - r.Channels[id] = pendingChannel - } - } - - r.pending.Channels = nil + incrementalApplyPending( + r, + &r.Channels, &r.pending.Channels, + func(element *channel.Channel) { + element.Start(context.TODO(), r.logs.GetChildLogger("channel").With( + zap.Int64("id", element.ID), + zap.String("name", element.Name))) + }, + func(element, update *channel.Channel) { + element.Name = update.Name + element.Type = update.Type + element.Config = update.Config + element.Restart() + }, + func(element *channel.Channel) { + element.Stop() + }) } diff --git a/internal/config/contact.go b/internal/config/contact.go index bb72efad4..86a03f18e 100644 --- a/internal/config/contact.go +++ b/internal/config/contact.go @@ -4,59 +4,21 @@ import ( "context" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/jmoiron/sqlx" - "go.uber.org/zap" ) func (r *RuntimeConfig) fetchContacts(ctx context.Context, tx *sqlx.Tx) error { - var contactPtr *recipient.Contact - stmt := r.db.BuildSelectStmt(contactPtr, contactPtr) - r.logger.Debugf("Executing query %q", stmt) - - var contacts []*recipient.Contact - if err := tx.SelectContext(ctx, &contacts, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - contactsByID := make(map[int64]*recipient.Contact) - for _, c := range contacts { - contactsByID[c.ID] = c - - r.logger.Debugw("loaded contact config", - zap.Int64("id", c.ID), - zap.String("name", c.FullName)) - } - - if r.Contacts != nil { - // mark no longer existing contacts for deletion - for id := range r.Contacts { - if _, ok := contactsByID[id]; !ok { - contactsByID[id] = nil - } - } - } - - r.pending.Contacts = contactsByID - - return nil + return incrementalFetch(ctx, r, tx, &r.pending.Contacts) } func (r *RuntimeConfig) applyPendingContacts() { - if r.Contacts == nil { - r.Contacts = make(map[int64]*recipient.Contact) - } - - for id, pendingContact := range r.pending.Contacts { - if pendingContact == nil { - delete(r.Contacts, id) - } else if currentContact := r.Contacts[id]; currentContact != nil { - currentContact.FullName = pendingContact.FullName - currentContact.Username = pendingContact.Username - currentContact.DefaultChannelID = pendingContact.DefaultChannelID - } else { - r.Contacts[id] = pendingContact - } - } - - r.pending.Contacts = nil + incrementalApplyPending( + r, + &r.Contacts, &r.pending.Contacts, + nil, + func(element, update *recipient.Contact) { + element.FullName = update.FullName + element.Username = update.Username + element.DefaultChannelID = update.DefaultChannelID + }, + nil) } diff --git a/internal/config/contact_address.go b/internal/config/contact_address.go index f89f82f0a..dbf123979 100644 --- a/internal/config/contact_address.go +++ b/internal/config/contact_address.go @@ -9,59 +9,16 @@ import ( ) func (r *RuntimeConfig) fetchContactAddresses(ctx context.Context, tx *sqlx.Tx) error { - var addressPtr *recipient.Address - stmt := r.db.BuildSelectStmt(addressPtr, addressPtr) - r.logger.Debugf("Executing query %q", stmt) - - var addresses []*recipient.Address - if err := tx.SelectContext(ctx, &addresses, stmt); err != nil { - r.logger.Errorln(err) - return err - } - - addressesById := make(map[int64]*recipient.Address) - for _, a := range addresses { - addressesById[a.ID] = a - r.logger.Debugw("loaded contact_address config", - zap.Int64("id", a.ID), - zap.Int64("contact_id", a.ContactID), - zap.String("type", a.Type), - zap.String("address", a.Address), - ) - } - - if r.ContactAddresses != nil { - // mark no longer existing contacts for deletion - for id := range r.ContactAddresses { - if _, ok := addressesById[id]; !ok { - addressesById[id] = nil - } - } - } - - r.pending.ContactAddresses = addressesById - - return nil + return incrementalFetch(ctx, r, tx, &r.pending.ContactAddresses) } func (r *RuntimeConfig) applyPendingContactAddresses() { - if r.ContactAddresses == nil { - r.ContactAddresses = make(map[int64]*recipient.Address) - } - - for id, pendingAddress := range r.pending.ContactAddresses { - currentAddress := r.ContactAddresses[id] - - if pendingAddress == nil { - r.removeContactAddress(currentAddress) - } else if currentAddress != nil { - r.updateContactAddress(currentAddress, pendingAddress) - } else { - r.addContactAddress(pendingAddress) - } - } - - r.pending.ContactAddresses = nil + incrementalApplyPending( + r, + &r.ContactAddresses, &r.pending.ContactAddresses, + r.addContactAddress, + r.updateContactAddress, + r.removeContactAddress) } func (r *RuntimeConfig) addContactAddress(addr *recipient.Address) { diff --git a/internal/config/group.go b/internal/config/group.go index 433162aaf..dd3b6cadd 100644 --- a/internal/config/group.go +++ b/internal/config/group.go @@ -5,96 +5,62 @@ import ( "github.com/icinga/icinga-notifications/internal/recipient" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" ) func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { - var groupPtr *recipient.Group - stmt := r.db.BuildSelectStmt(groupPtr, groupPtr) - r.logger.Debugf("Executing query %q", stmt) - - var groups []*recipient.Group - if err := tx.SelectContext(ctx, &groups, stmt); err != nil { - r.logger.Errorln(err) + err := incrementalFetch(ctx, r, tx, &r.pending.Groups) + if err != nil { return err } - groupsById := make(map[int64]*recipient.Group) - for _, g := range groups { - groupsById[g.ID] = g - - r.logger.Debugw("loaded group config", - zap.Int64("id", g.ID), - zap.String("name", g.Name)) - } - - type ContactgroupMember struct { - GroupId int64 `db:"contactgroup_id"` - ContactId int64 `db:"contact_id"` - } - - var memberPtr *ContactgroupMember - stmt = r.db.BuildSelectStmt(memberPtr, memberPtr) - r.logger.Debugf("Executing query %q", stmt) - - var members []*ContactgroupMember - if err := tx.SelectContext(ctx, &members, stmt); err != nil { - r.logger.Errorln(err) + err = incrementalFetch(ctx, r, tx, &r.pending.GroupMembers) + if err != nil { return err } - for _, m := range members { - memberLogger := r.logger.With( - zap.Int64("contact_id", m.ContactId), - zap.Int64("contactgroup_id", m.GroupId), - ) - - if g := groupsById[m.GroupId]; g == nil { - memberLogger.Warnw("ignoring member for unknown contactgroup_id") - } else { - g.MemberIDs = append(g.MemberIDs, m.ContactId) - - memberLogger.Debugw("loaded contact group member", - zap.String("contactgroup_name", g.Name)) - } - } - - if r.Groups != nil { - // mark no longer existing groups for deletion - for id := range r.Groups { - if _, ok := groupsById[id]; !ok { - groupsById[id] = nil - } - } - } - - r.pending.Groups = groupsById - return nil } func (r *RuntimeConfig) applyPendingGroups() { - if r.Groups == nil { - r.Groups = make(map[int64]*recipient.Group) - } + incrementalApplyPending( + r, + &r.Groups, &r.pending.Groups, + nil, + nil, // TODO: sync fields + nil) + + incrementalApplyPending( + r, + &r.GroupMembers, &r.pending.GroupMembers, + func(element *recipient.GroupMember) { + group, ok := r.Groups[element.GroupId] + if !ok { + r.logger.Errorw("Cannot update group membership for unknown group", zap.Inline(element)) + return + } - for id, pendingGroup := range r.pending.Groups { - if pendingGroup == nil { - delete(r.Groups, id) - } else { - pendingGroup.Members = make([]*recipient.Contact, 0, len(pendingGroup.MemberIDs)) - for _, contactID := range pendingGroup.MemberIDs { - if c := r.Contacts[contactID]; c != nil { - pendingGroup.Members = append(pendingGroup.Members, c) - } + contact, ok := r.Contacts[element.ContactId] + if !ok { + r.logger.Errorw("Cannot update group membership for unknown contact", zap.Inline(element)) + return } - if currentGroup := r.Groups[id]; currentGroup != nil { - *currentGroup = *pendingGroup - } else { - r.Groups[id] = pendingGroup + group.Members = append(group.Members, contact) + }, + func(element, update *recipient.GroupMember) { + r.logger.Warnf("Group memberships shouldn't change, from %+v to %+v", element, update) + }, + func(element *recipient.GroupMember) { + group, ok := r.Groups[element.GroupId] + if !ok { + r.logger.Errorw("Cannot update group membership for unknown group", zap.Inline(element)) + return } - } - } - r.pending.Groups = nil + group.Members = slices.DeleteFunc(group.Members, func(contact *recipient.Contact) bool { + return contact.ID == element.ContactId + }) + }) + } diff --git a/internal/config/incremental_sync.go b/internal/config/incremental_sync.go new file mode 100644 index 000000000..018d94dbd --- /dev/null +++ b/internal/config/incremental_sync.go @@ -0,0 +1,169 @@ +package config + +import ( + "context" + "github.com/icinga/icinga-go-library/database" + "github.com/icinga/icinga-go-library/types" + "github.com/jmoiron/sqlx" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// IncrementalConfigurable specifies Getter methods required for types supporting incremental configuration loading. +type IncrementalConfigurable[T comparable] interface { + zapcore.ObjectMarshaler + + // GetID returns the primary key value. + GetID() T + + // GetChangedAt returns the changed_at value. + GetChangedAt() types.UnixMilli + + // IsDeleted returns if this entry was marked as deleted. + IsDeleted() bool + + // Validate and optionally return an error, resulting in aborting this entry. + Validate() error +} + +// buildSelectStmtWhereChangedAt creates a SQL SELECT for an incremental configuration synchronization. +// +// The query, which will be a prepared statement, expects a types.UnixMilli parameter to be compared against. This +// parameter might be NULL, being COALESCEd to a numeric zero, returning all rows. +// +// The rows will be ordered by `changed_at`, allowing to update the last change timestamp when iterating over it. +func (r *RuntimeConfig) buildSelectStmtWhereChangedAt(typePtr interface{}) string { + return r.db.Rebind(r.db.BuildSelectStmt(typePtr, typePtr) + + ` WHERE "changed_at" > COALESCE(?, CAST(0 AS BIGINT)) ORDER BY "changed_at"`) +} + +func incrementalFetch[ + BaseT any, + CompT comparable, + T interface { + *BaseT + IncrementalConfigurable[CompT] + }, +](ctx context.Context, r *RuntimeConfig, tx *sqlx.Tx, pendingConfigSetField *map[CompT]T) error { + var typePtr T + + tableName := database.TableName(typePtr) + changedAt := r.pendingChangeTimestamps[tableName] + stmt := r.buildSelectStmtWhereChangedAt(typePtr) + + stmtLogger := r.logger.With( + zap.String("table", tableName), + zap.String("query", stmt), + zap.Time("changed_at", changedAt.Time())) + + stmtLogger.Debug("Executing query to fetch incremental config updates") + + var ts []T + if err := tx.SelectContext(ctx, &ts, stmt, changedAt); err != nil { + stmtLogger.Errorw("Cannot execute query to fetch incremental config updates", zap.Error(err)) + return err + } + + tsById := make(map[CompT]T) + countDel, countErr, countLoad := 0, 0, 0 + for _, t := range ts { + changedAt = t.GetChangedAt() + + logger := r.logger.With(zap.String("table", tableName), zap.Inline(t)) + if t.IsDeleted() { + countDel++ + logger.Debug("Marking entry as deleted") + tsById[t.GetID()] = nil + } else if err := t.Validate(); err != nil { + countErr++ + logger.Errorw("Cannot validate entry", zap.Error(err)) + } else { + countLoad++ + logger.Debug("Loaded entry") + tsById[t.GetID()] = t + } + } + + if countDel+countErr+countLoad > 0 { + r.logger.Debugw("Fetched incremental configuration updates", + zap.String("table", tableName), + zap.Int("deleted-elements", countDel), + zap.Int("faulty-elements", countErr), + zap.Int("loaded-elements", countLoad)) + } else { + r.logger.Debugw("No configuration updates are available", zap.String("table", tableName)) + } + + *pendingConfigSetField = tsById + r.pendingChangeTimestamps[tableName] = changedAt + + return nil +} + +func incrementalApplyPending[ + BaseT any, + CompT comparable, + T interface { + *BaseT + IncrementalConfigurable[CompT] + }, +]( + r *RuntimeConfig, + configSetField, pendingConfigSetField *map[CompT]T, + createFn func(element T), + updateFn func(element, update T), + deleteFn func(element T), +) { + if *configSetField == nil { + *configSetField = make(map[CompT]T) + } + + tableName := database.TableName(new(T)) + countDelSkip, countDel, countUpdate, countNew := 0, 0, 0, 0 + for id, newT := range *pendingConfigSetField { + oldT, oldExists := (*configSetField)[id] + + logger := r.logger.With( + zap.String("table", tableName), + zap.Any("id", id)) + + if newT == nil && !oldExists { + countDelSkip++ + logger.Warn("Skipping unknown element marked as deleted") + continue + } else if newT == nil { + countDel++ + logger.Debug("Deleting element") + if deleteFn != nil { + deleteFn(oldT) + } + delete(*configSetField, id) + } else if oldExists { + countUpdate++ + logger.Debug("Updating known element") + if updateFn != nil { + updateFn(oldT, newT) + } + } else { + countNew++ + logger.Debug("Creating new element") + if createFn != nil { + createFn(newT) + } + (*configSetField)[id] = newT + } + } + + if countDelSkip+countDel+countUpdate+countNew > 0 { + r.logger.Infow("Applied configuration updates", + zap.String("table", tableName), + zap.Int("deleted-unknown-elements", countDelSkip), + zap.Int("deleted-elements", countDel), + zap.Int("updated-elements", countUpdate), + zap.Int("new-elements", countNew)) + } else { + r.logger.Debugw("No configuration updates available to be applied", zap.String("table", tableName)) + } + + *pendingConfigSetField = nil +} diff --git a/internal/config/rule.go b/internal/config/rule.go index 10012cb58..556bf9eee 100644 --- a/internal/config/rule.go +++ b/internal/config/rule.go @@ -2,7 +2,6 @@ package config import ( "context" - "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/rule" "github.com/jmoiron/sqlx" "go.uber.org/zap" @@ -10,186 +9,131 @@ import ( ) func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { - var rulePtr *rule.Rule - stmt := r.db.BuildSelectStmt(rulePtr, rulePtr) - r.logger.Debugf("Executing query %q", stmt) - - var rules []*rule.Rule - if err := tx.SelectContext(ctx, &rules, stmt); err != nil { - r.logger.Errorln(err) + err := incrementalFetch(ctx, r, tx, &r.pending.Rules) + if err != nil { return err } - rulesByID := make(map[int64]*rule.Rule) - for _, ru := range rules { - ruleLogger := r.logger.With(zap.Inline(ru)) - - if ru.ObjectFilterExpr.Valid { - f, err := filter.Parse(ru.ObjectFilterExpr.String) - if err != nil { - ruleLogger.Warnw("ignoring rule as parsing object_filter failed", zap.Error(err)) - continue - } - - ru.ObjectFilter = f - } - - ru.Escalations = make(map[int64]*rule.Escalation) - - rulesByID[ru.ID] = ru - ruleLogger.Debugw("loaded rule config") - } - - var escalationPtr *rule.Escalation - stmt = r.db.BuildSelectStmt(escalationPtr, escalationPtr) - r.logger.Debugf("Executing query %q", stmt) - - var escalations []*rule.Escalation - if err := tx.SelectContext(ctx, &escalations, stmt); err != nil { - r.logger.Errorln(err) + err = incrementalFetch(ctx, r, tx, &r.pending.RuleEscalations) + if err != nil { return err } - escalationsByID := make(map[int64]*rule.Escalation) - for _, escalation := range escalations { - escalationLogger := r.logger.With(zap.Inline(escalation)) - - rule := rulesByID[escalation.RuleID] - if rule == nil { - escalationLogger.Warnw("ignoring escalation for unknown rule_id") - continue - } - - if escalation.ConditionExpr.Valid { - cond, err := filter.Parse(escalation.ConditionExpr.String) - if err != nil { - escalationLogger.Warnw("ignoring escalation, failed to parse condition", zap.Error(err)) - continue - } - - escalation.Condition = cond - } - - if escalation.FallbackForID.Valid { - // TODO: implement fallbacks (needs extra validation: mismatching rule_id, cycles) - escalationLogger.Warnw("ignoring fallback escalation (not yet implemented)") - continue - } - - rule.Escalations[escalation.ID] = escalation - escalationsByID[escalation.ID] = escalation - escalationLogger.Debugw("loaded escalation config") - } - - var recipientPtr *rule.EscalationRecipient - stmt = r.db.BuildSelectStmt(recipientPtr, recipientPtr) - r.logger.Debugf("Executing query %q", stmt) - - var recipients []*rule.EscalationRecipient - if err := tx.SelectContext(ctx, &recipients, stmt); err != nil { - r.logger.Errorln(err) + err = incrementalFetch(ctx, r, tx, &r.pending.RuleEscalationRecipients) + if err != nil { return err } - for _, recipient := range recipients { - recipientLogger := r.logger.With( - zap.Int64("id", recipient.ID), - zap.Int64("escalation_id", recipient.EscalationID), - zap.Int64("channel_id", recipient.ChannelID.Int64)) - - escalation := escalationsByID[recipient.EscalationID] - if escalation == nil { - recipientLogger.Warnw("ignoring recipient for unknown escalation") - } else { - escalation.Recipients = append(escalation.Recipients, recipient) - recipientLogger.Debugw("loaded escalation recipient config") - } - } - - if r.Rules != nil { - // mark no longer existing rules for deletion - for id := range r.Rules { - if _, ok := rulesByID[id]; !ok { - rulesByID[id] = nil - } - } - } - - r.pending.Rules = rulesByID - return nil } func (r *RuntimeConfig) applyPendingRules() { - if r.Rules == nil { - r.Rules = make(map[int64]*rule.Rule) - } - - for id, pendingRule := range r.pending.Rules { - if pendingRule == nil { - delete(r.Rules, id) - } else { - ruleLogger := r.logger.With(zap.Inline(pendingRule)) - - if pendingRule.TimePeriodID.Valid { - if p := r.TimePeriods[pendingRule.TimePeriodID.Int64]; p == nil { - ruleLogger.Warnw("ignoring rule with unknown timeperiod_id") - continue - } else { - pendingRule.TimePeriod = p + incrementalApplyPending( + r, + &r.Rules, &r.pending.Rules, + func(element *rule.Rule) { + if element.TimePeriodID.Valid { + timePeriod, ok := r.TimePeriods[element.TimePeriodID.Int64] + if !ok { + r.logger.Errorw("Cannot load rule with unknown time period", zap.Inline(element)) + return } + + element.TimePeriod = timePeriod + } + + element.Escalations = make(map[int64]*rule.Escalation) + }, + func(element, update *rule.Rule) { + element.IsActive = update.IsActive + element.Name = update.Name + element.ObjectFilter = update.ObjectFilter + }, + nil) + + incrementalApplyPending( + r, + &r.RuleEscalations, &r.pending.RuleEscalations, + func(element *rule.Escalation) { + elementRule, ok := r.Rules[element.RuleID] + if !ok { + r.logger.Errorw("Cannot update rule membership for unknown rule escalation", zap.Inline(element)) + return } - for _, escalation := range pendingRule.Escalations { - for i, recipient := range escalation.Recipients { - recipientLogger := r.logger.With( - zap.Int64("id", recipient.ID), - zap.Int64("escalation_id", recipient.EscalationID), - zap.Int64("channel_id", recipient.ChannelID.Int64), - zap.Inline(recipient.Key)) - - if recipient.ContactID.Valid { - id := recipient.ContactID.Int64 - if c := r.Contacts[id]; c != nil { - recipient.Recipient = c - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else if recipient.GroupID.Valid { - id := recipient.GroupID.Int64 - if g := r.Groups[id]; g != nil { - recipient.Recipient = g - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else if recipient.ScheduleID.Valid { - id := recipient.ScheduleID.Int64 - if s := r.Schedules[id]; s != nil { - recipient.Recipient = s - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } - } else { - recipientLogger.Warnw("ignoring unknown escalation recipient") - escalation.Recipients[i] = nil - } + elementRule.Escalations[element.ID] = element + }, + func(element, update *rule.Escalation) { + element.NameRaw = update.NameRaw + element.Condition = update.Condition + // TODO: Synchronize FallbackForID/Fallback when implemented + }, + func(element *rule.Escalation) { + elementRule, ok := r.Rules[element.RuleID] + if !ok { + r.logger.Errorw("Cannot update rule membership for unknown rule escalation", zap.Inline(element)) + return + } + + delete(elementRule.Escalations, element.ID) + }) + + incrementalApplyPending( + r, + &r.RuleEscalationRecipients, &r.pending.RuleEscalationRecipients, + func(element *rule.EscalationRecipient) { + ok := false + if element.ContactID.Valid { + element.Recipient, ok = r.Contacts[element.ContactID.Int64] + } else if element.GroupID.Valid { + element.Recipient, ok = r.Groups[element.GroupID.Int64] + } else if element.ScheduleID.Valid { + element.Recipient, ok = r.Schedules[element.ScheduleID.Int64] + } + if !ok { + r.logger.Errorw("Rule escalation recipient is missing or unknown", zap.Inline(element)) + return + } + + ruleFound := false + for id, elementRule := range r.Rules { + _, ok := elementRule.Escalations[element.EscalationID] + if ok { + element.RuleID = id + ruleFound = true + break } + } + if !ruleFound { + r.logger.Errorw("Rule escalation recipient cannot be mapped to a rule", zap.Inline(element)) + return + } - escalation.Recipients = slices.DeleteFunc(escalation.Recipients, func(r *rule.EscalationRecipient) bool { - return r == nil - }) + escalation := r.Rules[element.RuleID].Escalations[element.EscalationID] + escalation.Recipients = append(escalation.Recipients, element) + }, + func(element, update *rule.EscalationRecipient) { + element.ChannelID = update.ChannelID + element.ContactID = update.ContactID + element.GroupID = update.ScheduleID + element.ScheduleID = update.ScheduleID + // TODO: re-add + }, + func(element *rule.EscalationRecipient) { + elementRule, ok := r.Rules[element.RuleID] + if !ok { + r.logger.Errorw("Cannot resolve rule for rule escalation recipient", zap.Inline(element)) + return } - if currentRule := r.Rules[id]; currentRule != nil { - *currentRule = *pendingRule - } else { - r.Rules[id] = pendingRule + escalation, ok := elementRule.Escalations[element.EscalationID] + if !ok { + r.logger.Errorw("Cannot resolve escalation for rule escalation recipient", zap.Inline(element)) + return } - } - } - r.pending.Rules = nil + escalation.Recipients = slices.DeleteFunc(escalation.Recipients, func(recipient *rule.EscalationRecipient) bool { + return recipient.EscalationID == element.EscalationID + }) + }) } diff --git a/internal/config/runtime.go b/internal/config/runtime.go index 69b38188d..f0033758d 100644 --- a/internal/config/runtime.go +++ b/internal/config/runtime.go @@ -6,10 +6,12 @@ import ( "errors" "github.com/icinga/icinga-go-library/database" "github.com/icinga/icinga-go-library/logging" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/channel" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/rule" "github.com/icinga/icinga-notifications/internal/timeperiod" + "github.com/icinga/icinga-notifications/internal/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" "golang.org/x/crypto/bcrypt" @@ -30,7 +32,8 @@ type RuntimeConfig struct { EventStreamLaunchFunc func(source *Source) // pending contains changes to config objects that are to be applied to the embedded live config. - pending ConfigSet + pending ConfigSet + pendingChangeTimestamps map[string]types.UnixMilli logs *logging.Logging logger *logging.Logger @@ -55,14 +58,19 @@ func NewRuntimeConfig( } type ConfigSet struct { - Channels map[int64]*channel.Channel - Contacts map[int64]*recipient.Contact - ContactAddresses map[int64]*recipient.Address - Groups map[int64]*recipient.Group - TimePeriods map[int64]*timeperiod.TimePeriod - Schedules map[int64]*recipient.Schedule - Rules map[int64]*rule.Rule - Sources map[int64]*Source + Channels map[int64]*channel.Channel + Contacts map[int64]*recipient.Contact + ContactAddresses map[int64]*recipient.Address + GroupMembers map[utils.CompTuple[int64, int64]]*recipient.GroupMember + Groups map[int64]*recipient.Group + TimePeriods map[int64]*timeperiod.TimePeriod + Schedules map[int64]*recipient.Schedule + ScheduleRotations map[int64]*recipient.Rotation + ScheduleRotationMembers map[int64]*recipient.RotationMember + Rules map[int64]*rule.Rule + RuleEscalations map[int64]*rule.Escalation + RuleEscalationRecipients map[int64]*rule.EscalationRecipient + Sources map[int64]*Source } func (r *RuntimeConfig) UpdateFromDatabase(ctx context.Context) error { diff --git a/internal/config/schedule.go b/internal/config/schedule.go index af3c7557b..da6e989f7 100644 --- a/internal/config/schedule.go +++ b/internal/config/schedule.go @@ -6,167 +6,139 @@ import ( "github.com/icinga/icinga-notifications/internal/timeperiod" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" ) func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error { - var schedulePtr *recipient.Schedule - stmt := r.db.BuildSelectStmt(schedulePtr, schedulePtr) - r.logger.Debugf("Executing query %q", stmt) - - var schedules []*recipient.Schedule - if err := tx.SelectContext(ctx, &schedules, stmt); err != nil { - r.logger.Errorln(err) + err := incrementalFetch(ctx, r, tx, &r.pending.Schedules) + if err != nil { return err } - schedulesById := make(map[int64]*recipient.Schedule) - for _, g := range schedules { - schedulesById[g.ID] = g - - r.logger.Debugw("loaded schedule config", - zap.Int64("id", g.ID), - zap.String("name", g.Name)) - } - - var rotationPtr *recipient.Rotation - stmt = r.db.BuildSelectStmt(rotationPtr, rotationPtr) - r.logger.Debugf("Executing query %q", stmt) - - var rotations []*recipient.Rotation - if err := tx.SelectContext(ctx, &rotations, stmt); err != nil { - r.logger.Errorln(err) + err = incrementalFetch(ctx, r, tx, &r.pending.ScheduleRotations) + if err != nil { return err } - rotationsById := make(map[int64]*recipient.Rotation) - for _, rotation := range rotations { - rotationLogger := r.logger.With(zap.Object("rotation", rotation)) - - if schedule := schedulesById[rotation.ScheduleID]; schedule == nil { - rotationLogger.Warnw("ignoring schedule rotation for unknown schedule_id") - } else { - rotationsById[rotation.ID] = rotation - schedule.Rotations = append(schedule.Rotations, rotation) - - rotationLogger.Debugw("loaded schedule rotation") - } - } - - var rotationMemberPtr *recipient.RotationMember - stmt = r.db.BuildSelectStmt(rotationMemberPtr, rotationMemberPtr) - r.logger.Debugf("Executing query %q", stmt) - - var members []*recipient.RotationMember - if err := tx.SelectContext(ctx, &members, stmt); err != nil { - r.logger.Errorln(err) + err = incrementalFetch(ctx, r, tx, &r.pending.ScheduleRotationMembers) + if err != nil { return err } - rotationMembersById := make(map[int64]*recipient.RotationMember) - for _, member := range members { - memberLogger := r.logger.With(zap.Object("rotation_member", member)) + return nil +} - if rotation := rotationsById[member.RotationID]; rotation == nil { - memberLogger.Warnw("ignoring rotation member for unknown rotation_member_id") - } else { - member.TimePeriodEntries = make(map[int64]*timeperiod.Entry) - rotation.Members = append(rotation.Members, member) - rotationMembersById[member.ID] = member +func (r *RuntimeConfig) applyPendingSchedules() { + incrementalApplyPending( + r, + &r.Schedules, &r.pending.Schedules, + nil, + func(element, update *recipient.Schedule) { + element.Name = update.Name + }, + nil) + + incrementalApplyPending( + r, + &r.ScheduleRotations, &r.pending.ScheduleRotations, + func(element *recipient.Rotation) { + schedule, ok := r.Schedules[element.ScheduleID] + if !ok { + r.logger.Errorw("Cannot resolve schedule for rotation", zap.Inline(element)) + return + } - memberLogger.Debugw("loaded schedule rotation member") - } - } + schedule.Rotations = append(schedule.Rotations, element) + }, + func(element, update *recipient.Rotation) { + element.ActualHandoff = update.ActualHandoff + element.Priority = update.Priority + element.Name = update.Name + }, + func(element *recipient.Rotation) { + schedule, ok := r.Schedules[element.ScheduleID] + if !ok { + r.logger.Errorw("Cannot resolve schedule for rotation", zap.Inline(element)) + return + } - var entryPtr *timeperiod.Entry - stmt = r.db.BuildSelectStmt(entryPtr, entryPtr) + " WHERE rotation_member_id IS NOT NULL" - r.logger.Debugf("Executing query %q", stmt) + schedule.Rotations = slices.DeleteFunc(schedule.Rotations, func(rotation *recipient.Rotation) bool { + return rotation.ID == element.ID + }) + }) + + incrementalApplyPending( + r, + &r.ScheduleRotationMembers, &r.pending.ScheduleRotationMembers, + func(element *recipient.RotationMember) { + element.TimePeriodEntries = make(map[int64]*timeperiod.Entry) + + scheduleFound := false + for id, schedule := range r.Schedules { + ok := slices.ContainsFunc(schedule.Rotations, func(rotation *recipient.Rotation) bool { + return rotation.ID == element.RotationID + }) + if ok { + element.ScheduleID = id + scheduleFound = true + break + } + } + if !scheduleFound { + r.logger.Errorw("Schedule rotation member cannot be mapped to a schedule", zap.Inline(element)) + return + } - var entries []*timeperiod.Entry - if err := tx.SelectContext(ctx, &entries, stmt); err != nil { - r.logger.Errorln(err) - return err - } + rotations := r.Schedules[element.ScheduleID].Rotations[element.RotationID] + rotations.Members = append(rotations.Members, element) - for _, entry := range entries { - var member *recipient.RotationMember - if entry.RotationMemberID.Valid { - member = rotationMembersById[entry.RotationMemberID.Int64] - } - - if member == nil { - r.logger.Warnw("ignoring entry for unknown rotation_member_id", - zap.Int64("timeperiod_entry_id", entry.ID), - zap.Int64("timeperiod_id", entry.TimePeriodID)) - continue - } - - err := entry.Init() - if err != nil { - r.logger.Warnw("ignoring time period entry", - zap.Object("entry", entry), - zap.Error(err)) - continue - } - - member.TimePeriodEntries[entry.ID] = entry - } + if element.ContactID.Valid { + var ok bool + element.Contact, ok = r.Contacts[element.ContactID.Int64] + if !ok { + r.logger.Errorw("Schedule rotation member has invalid contact", zap.Inline(element)) + return + } + } - for _, schedule := range schedulesById { - schedule.RefreshRotations() - } + if element.ContactGroupID.Valid { + var ok bool + element.ContactGroup, ok = r.Groups[element.ContactGroupID.Int64] + if !ok { + r.logger.Errorw("Schedule rotation member has invalid contact group", zap.Inline(element)) + return + } + } + }, + func(element, update *recipient.RotationMember) { + // TODO readd + }, + func(element *recipient.RotationMember) { + schedule, ok := r.Schedules[element.ScheduleID] + if !ok { + r.logger.Errorw("Cannot find schedule rotation member's schedule", zap.Inline(element)) + return + } - if r.Schedules != nil { - // mark no longer existing schedules for deletion - for id := range r.Schedules { - if _, ok := schedulesById[id]; !ok { - schedulesById[id] = nil + rotationId := slices.IndexFunc(schedule.Rotations, func(rotation *recipient.Rotation) bool { + return rotation.ID == element.RotationID + }) + if rotationId < 0 { + r.logger.Errorw("Cannot find schedule rotation member's rotation", zap.Inline(element)) + return } - } - } - r.pending.Schedules = schedulesById + rotation := schedule.Rotations[rotationId] + rotation.Members = slices.DeleteFunc(rotation.Members, func(member *recipient.RotationMember) bool { + return member.ID == element.ID + }) + }) - return nil -} + for _, schedule := range r.Schedules { + schedule.RefreshRotations() + } -func (r *RuntimeConfig) applyPendingSchedules() { if r.Schedules == nil { r.Schedules = make(map[int64]*recipient.Schedule) } - - for id, pendingSchedule := range r.pending.Schedules { - if pendingSchedule == nil { - delete(r.Schedules, id) - } else { - for _, rotation := range pendingSchedule.Rotations { - for _, member := range rotation.Members { - memberLogger := r.logger.With( - zap.Object("rotation", rotation), - zap.Object("rotation_member", member)) - - if member.ContactID.Valid { - member.Contact = r.Contacts[member.ContactID.Int64] - if member.Contact == nil { - memberLogger.Warnw("rotation member has an unknown contact_id") - } - } - - if member.ContactGroupID.Valid { - member.ContactGroup = r.Groups[member.ContactGroupID.Int64] - if member.ContactGroup == nil { - memberLogger.Warnw("rotation member has an unknown contactgroup_id") - } - } - } - } - - if currentSchedule := r.Schedules[id]; currentSchedule != nil { - *currentSchedule = *pendingSchedule - } else { - r.Schedules[id] = pendingSchedule - } - } - } - - r.pending.Schedules = nil } diff --git a/internal/recipient/contact.go b/internal/recipient/contact.go index 82732f1f6..a6d9d5e4c 100644 --- a/internal/recipient/contact.go +++ b/internal/recipient/contact.go @@ -2,6 +2,7 @@ package recipient import ( "database/sql" + "github.com/icinga/icinga-go-library/types" "go.uber.org/zap/zapcore" "time" ) @@ -12,6 +13,25 @@ type Contact struct { Username sql.NullString `db:"username"` DefaultChannelID int64 `db:"default_channel_id"` Addresses []*Address `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (c *Contact) GetID() int64 { + return c.ID +} + +func (c *Contact) GetChangedAt() types.UnixMilli { + return c.ChangedAt +} + +func (c *Contact) IsDeleted() bool { + return c.Deleted.Valid && c.Deleted.Bool +} + +func (c *Contact) Validate() error { + return nil } func (c *Contact) String() string { @@ -37,6 +57,31 @@ type Address struct { ContactID int64 `db:"contact_id"` Type string `db:"type"` Address string `db:"address"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (a *Address) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("id", a.ID) + encoder.AddInt64("contact_id", a.ContactID) + return nil +} + +func (a *Address) GetID() int64 { + return a.ID +} + +func (a *Address) GetChangedAt() types.UnixMilli { + return a.ChangedAt +} + +func (a *Address) IsDeleted() bool { + return a.Deleted.Valid && a.Deleted.Bool +} + +func (a *Address) Validate() error { + return nil } func (a *Address) TableName() string { diff --git a/internal/recipient/group.go b/internal/recipient/group.go index 243dde7b9..2742765d3 100644 --- a/internal/recipient/group.go +++ b/internal/recipient/group.go @@ -1,15 +1,35 @@ package recipient import ( + "github.com/icinga/icinga-go-library/types" + "github.com/icinga/icinga-notifications/internal/utils" "go.uber.org/zap/zapcore" "time" ) type Group struct { - ID int64 `db:"id"` - Name string `db:"name"` - Members []*Contact `db:"-"` - MemberIDs []int64 `db:"-"` + ID int64 `db:"id"` + Name string `db:"name"` + Members []*Contact `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (g *Group) GetID() int64 { + return g.ID +} + +func (g *Group) GetChangedAt() types.UnixMilli { + return g.ChangedAt +} + +func (g *Group) IsDeleted() bool { + return g.Deleted.Valid && g.Deleted.Bool +} + +func (g *Group) Validate() error { + return nil } func (g *Group) GetContactsAt(t time.Time) []*Contact { @@ -32,4 +52,38 @@ func (g *Group) MarshalLogObject(encoder zapcore.ObjectEncoder) error { return nil } +type GroupMember struct { + GroupId int64 `db:"contactgroup_id"` + ContactId int64 `db:"contact_id"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (g *GroupMember) TableName() string { + return "contactgroup_member" +} + +func (g *GroupMember) MarshalLogObject(encoder zapcore.ObjectEncoder) error { + encoder.AddInt64("group_id", g.GroupId) + encoder.AddInt64("contact_id", g.ContactId) + return nil +} + +func (g *GroupMember) GetID() utils.CompTuple[int64, int64] { + return utils.CompTuple[int64, int64]{g.GroupId, g.ContactId} +} + +func (g *GroupMember) GetChangedAt() types.UnixMilli { + return g.ChangedAt +} + +func (g *GroupMember) IsDeleted() bool { + return g.Deleted.Valid && g.Deleted.Bool +} + +func (g *GroupMember) Validate() error { + return nil +} + var _ Recipient = (*Group)(nil) diff --git a/internal/recipient/schedule.go b/internal/recipient/schedule.go index d66ef027d..f7a9b136e 100644 --- a/internal/recipient/schedule.go +++ b/internal/recipient/schedule.go @@ -12,10 +12,29 @@ type Schedule struct { ID int64 `db:"id"` Name string `db:"name"` + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` + Rotations []*Rotation `db:"-"` rotationResolver rotationResolver } +func (s *Schedule) GetID() int64 { + return s.ID +} + +func (s *Schedule) GetChangedAt() types.UnixMilli { + return s.ChangedAt +} + +func (s *Schedule) IsDeleted() bool { + return s.Deleted.Valid && s.Deleted.Bool +} + +func (s *Schedule) Validate() error { + return nil +} + // RefreshRotations updates the internally cached rotations. // // This must be called after the Rotations member was updated for the change to become active. @@ -38,6 +57,25 @@ type Rotation struct { Priority int32 `db:"priority"` Name string `db:"name"` Members []*RotationMember `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (r *Rotation) GetID() int64 { + return r.ID +} + +func (r *Rotation) GetChangedAt() types.UnixMilli { + return r.ChangedAt +} + +func (r *Rotation) IsDeleted() bool { + return r.Deleted.Valid && r.Deleted.Bool +} + +func (r *Rotation) Validate() error { + return nil } // MarshalLogObject implements the zapcore.ObjectMarshaler interface. @@ -52,11 +90,31 @@ func (r *Rotation) MarshalLogObject(encoder zapcore.ObjectEncoder) error { type RotationMember struct { ID int64 `db:"id"` RotationID int64 `db:"rotation_id"` + ScheduleID int64 `db:"-"` Contact *Contact `db:"-"` ContactID sql.NullInt64 `db:"contact_id"` ContactGroup *Group `db:"-"` ContactGroupID sql.NullInt64 `db:"contactgroup_id"` TimePeriodEntries map[int64]*timeperiod.Entry `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (r *RotationMember) GetID() int64 { + return r.ID +} + +func (r *RotationMember) GetChangedAt() types.UnixMilli { + return r.ChangedAt +} + +func (r *RotationMember) IsDeleted() bool { + return r.Deleted.Valid && r.Deleted.Bool +} + +func (r *RotationMember) Validate() error { + return nil } func (r *RotationMember) MarshalLogObject(encoder zapcore.ObjectEncoder) error { diff --git a/internal/rule/escalation.go b/internal/rule/escalation.go index 823648cf7..592a61e51 100644 --- a/internal/rule/escalation.go +++ b/internal/rule/escalation.go @@ -2,6 +2,8 @@ package rule import ( "database/sql" + "fmt" + "github.com/icinga/icinga-go-library/types" "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/recipient" "go.uber.org/zap/zapcore" @@ -19,6 +21,39 @@ type Escalation struct { Fallbacks []*Escalation `db:"-"` Recipients []*EscalationRecipient `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (e *Escalation) GetID() int64 { + return e.ID +} + +func (e *Escalation) GetChangedAt() types.UnixMilli { + return e.ChangedAt +} + +func (e *Escalation) IsDeleted() bool { + return e.Deleted.Valid && e.Deleted.Bool +} + +func (e *Escalation) Validate() error { + if e.ConditionExpr.Valid { + cond, err := filter.Parse(e.ConditionExpr.String) + if err != nil { + return err + } + + e.Condition = cond + } + + if e.FallbackForID.Valid { + // TODO: implement fallbacks (needs extra validation: mismatching rule_id, cycles) + return fmt.Errorf("ignoring fallback escalation (not yet implemented)") + } + + return nil } // MarshalLogObject implements the zapcore.ObjectMarshaler interface. @@ -95,9 +130,29 @@ func (e *Escalation) TableName() string { type EscalationRecipient struct { ID int64 `db:"id"` EscalationID int64 `db:"rule_escalation_id"` + RuleID int64 `db:"-"` ChannelID sql.NullInt64 `db:"channel_id"` recipient.Key `db:",inline"` Recipient recipient.Recipient `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (r *EscalationRecipient) GetID() int64 { + return r.ID +} + +func (r *EscalationRecipient) GetChangedAt() types.UnixMilli { + return r.ChangedAt +} + +func (r *EscalationRecipient) IsDeleted() bool { + return r.Deleted.Valid && r.Deleted.Bool +} + +func (r *EscalationRecipient) Validate() error { + return nil } func (r *EscalationRecipient) TableName() string { diff --git a/internal/rule/rule.go b/internal/rule/rule.go index 22b3c0500..977fcb13b 100644 --- a/internal/rule/rule.go +++ b/internal/rule/rule.go @@ -18,6 +18,34 @@ type Rule struct { ObjectFilter filter.Filter `db:"-"` ObjectFilterExpr types.String `db:"object_filter"` Escalations map[int64]*Escalation `db:"-"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + Deleted types.Bool `db:"deleted" json:"deleted"` +} + +func (r *Rule) GetID() int64 { + return r.ID +} + +func (r *Rule) GetChangedAt() types.UnixMilli { + return r.ChangedAt +} + +func (r *Rule) IsDeleted() bool { + return r.Deleted.Valid && r.Deleted.Bool +} + +func (r *Rule) Validate() error { + if r.ObjectFilterExpr.Valid { + f, err := filter.Parse(r.ObjectFilterExpr.String) + if err != nil { + return err + } + + r.ObjectFilter = f + } + + return nil } // MarshalLogObject implements the zapcore.ObjectMarshaler interface. diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 7d05c736b..fc4d8cac8 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -162,3 +162,9 @@ func IterateOrderedMap[K cmp.Ordered, V any](m map[K]V) func(func(K, V) bool) { } } } + +// CompTuple is a simple tuple struct of two independent comparable types. +type CompTuple[TA, TB comparable] struct { + A TA + B TB +}