diff --git a/internal/config/group.go b/internal/config/group.go index 75d990b16..7fd3f2d28 100644 --- a/internal/config/group.go +++ b/internal/config/group.go @@ -4,27 +4,43 @@ import ( "context" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/utils" + "github.com/icinga/icingadb/pkg/types" dbutils "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" ) func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { - var groupPtr *recipient.Group - changedAt := r.pendingLastChange[dbutils.TableName(groupPtr)] + type ContactgroupMember struct { + GroupId int64 `db:"contactgroup_id"` + ContactId int64 `db:"contact_id"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` + } + + var ( + groupPtr *recipient.Group + memberPtr *ContactgroupMember + + groupChangedAt = r.pendingLastChange[dbutils.TableName(groupPtr)] + memberChangedAt = r.pendingLastChange[dbutils.TableName(memberPtr)] + ) + stmt := r.buildSelectStmtWhereChangedAt(groupPtr) r.logger.Debugw("Executing query to fetch groups", - zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) + zap.String("query", stmt), zap.Time("changed_at_after", groupChangedAt.Time())) var groups []*recipient.Group - if err := tx.SelectContext(ctx, &groups, stmt, changedAt); err != nil { + if err := tx.SelectContext(ctx, &groups, stmt, groupChangedAt); err != nil { r.logger.Errorln(err) return err } groupsById := make(map[int64]*recipient.Group) for _, g := range groups { - changedAt = g.ChangedAt + groupChangedAt = g.ChangedAt groupLogger := r.logger.With( zap.Int64("id", g.ID), @@ -40,19 +56,14 @@ func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { } } - type ContactgroupMember struct { - GroupId int64 `db:"contactgroup_id"` - ContactId int64 `db:"contact_id"` - } - var members []*ContactgroupMember err := r.selectRelationshipTableEntries( ctx, tx, - new(ContactgroupMember), + memberPtr, "contactgroup_id", utils.MapKeys(groupsById), - r.Groups == nil, + memberChangedAt, &members) if err != nil { r.logger.Errorln(err) @@ -60,26 +71,44 @@ func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error { } for _, m := range members { + memberChangedAt = m.ChangedAt + memberLogger := r.logger.With( zap.Int64("contact_id", m.ContactId), zap.Int64("contactgroup_id", m.GroupId), ) - g, ok := groupsById[m.GroupId] - if !ok { + group, newGroupOk := groupsById[m.GroupId] + cachedGroup, cachedGroupOk := r.Groups[m.GroupId] + if !newGroupOk && !cachedGroupOk { memberLogger.Warn("Ignoring member for unknown contactgroup_id") continue - } else if g == nil { + } else if !newGroupOk { + groupsById[m.ContactId] = cachedGroup + group = cachedGroup + } + + if group == nil { memberLogger.Debug("Skipping deleted member for unknown contactgroup_id") continue } - g.MemberIDs = append(g.MemberIDs, m.ContactId) - memberLogger.Info("Loaded contact group member", zap.String("contactgroup_name", g.Name)) + chkFn := func(id int64) bool { return id == m.ContactId } + if m.IsDeleted.Valid && m.IsDeleted.Bool { + group.MemberIDs = slices.DeleteFunc(group.MemberIDs, chkFn) + memberLogger.Debug("Deleted contact group member") + } else if i := slices.IndexFunc(group.MemberIDs, chkFn); i >= 0 { + group.MemberIDs[i] = m.ContactId + memberLogger.Debug("Updated contact group member") + } else { + group.MemberIDs = append(group.MemberIDs, m.ContactId) + memberLogger.Debug("Loaded contact group member") + } } r.pending.Groups = groupsById - r.pendingLastChange[dbutils.TableName(groupPtr)] = changedAt + r.pendingLastChange[dbutils.TableName(groupPtr)] = groupChangedAt + r.pendingLastChange[dbutils.TableName(memberPtr)] = memberChangedAt return nil } diff --git a/internal/config/rule.go b/internal/config/rule.go index 66aa5d2ab..4c96ddf5a 100644 --- a/internal/config/rule.go +++ b/internal/config/rule.go @@ -8,25 +8,34 @@ import ( dbutils "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" ) func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { - var rulePtr *rule.Rule - changedAt := r.pendingLastChange[dbutils.TableName(rulePtr)] + var ( + rulePtr *rule.Rule + escalationPtr *rule.Escalation + recipientPtr *rule.EscalationRecipient + + ruleChangedAt = r.pendingLastChange[dbutils.TableName(rulePtr)] + escalationChangedAt = r.pendingLastChange[dbutils.TableName(escalationPtr)] + recipientChangedAt = r.pendingLastChange[dbutils.TableName(recipientPtr)] + ) + stmt := r.buildSelectStmtWhereChangedAt(rulePtr) r.logger.Debugw("Executing query to fetch rules", zap.String("query", stmt), - zap.Time("changed_at_after", changedAt.Time())) + zap.Time("changed_at_after", ruleChangedAt.Time())) var rules []*rule.Rule - if err := tx.SelectContext(ctx, &rules, stmt, changedAt); err != nil { + if err := tx.SelectContext(ctx, &rules, stmt, ruleChangedAt); err != nil { r.logger.Errorln(err) return err } rulesByID := make(map[int64]*rule.Rule) for _, ru := range rules { - changedAt = ru.ChangedAt + ruleChangedAt = ru.ChangedAt ruleLogger := r.logger.With( zap.Int64("id", ru.ID), @@ -61,10 +70,10 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { err := r.selectRelationshipTableEntries( ctx, tx, - new(rule.Escalation), + escalationPtr, "rule_id", utils.MapKeys(rulesByID), - r.Rules == nil, + escalationChangedAt, &escalations) if err != nil { r.logger.Errorln(err) @@ -73,6 +82,8 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { escalationsByID := make(map[int64]*rule.Escalation) for _, escalation := range escalations { + escalationChangedAt = escalation.ChangedAt + escalationLogger := r.logger.With( zap.Int64("id", escalation.ID), zap.Int64("rule_id", escalation.RuleID), @@ -81,15 +92,28 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { zap.Int64("fallback_for", escalation.FallbackForID.Int64), ) - r, ok := rulesByID[escalation.RuleID] - if !ok { + rule, newRuleOk := rulesByID[escalation.RuleID] + cachedRule, cachedRuleOk := r.Rules[escalation.RuleID] + if !newRuleOk && !cachedRuleOk { escalationLogger.Warn("Ignoring escalation for unknown rule_id") continue - } else if r == nil { + } else if !newRuleOk { + rulesByID[escalation.RuleID] = cachedRule + rule = cachedRule + } + + if rule == nil { escalationLogger.Debug("Skipping deleted escalation for unknown rule_id") continue } + if escalation.IsDeleted.Valid && escalation.IsDeleted.Bool { + rule.Escalations[escalation.ID] = nil + escalationsByID[escalation.ID] = nil + escalationLogger.Info("Deleted escalation config") + continue + } + if escalation.ConditionExpr.Valid { cond, err := filter.Parse(escalation.ConditionExpr.String) if err != nil { @@ -110,19 +134,19 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { escalation.Name = escalation.NameRaw.String } - r.Escalations[escalation.ID] = escalation + rule.Escalations[escalation.ID] = escalation escalationsByID[escalation.ID] = escalation - escalationLogger.Debugw("loaded escalation config") + escalationLogger.Debug("Loaded escalation config") } var recipients []*rule.EscalationRecipient err = r.selectRelationshipTableEntries( ctx, tx, - new(rule.EscalationRecipient), + recipientPtr, "rule_escalation_id", utils.MapKeys(escalationsByID), - r.Rules == nil, + recipientChangedAt, &recipients) if err != nil { r.logger.Errorln(err) @@ -130,15 +154,51 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { } for _, recipient := range recipients { + recipientChangedAt = recipient.ChangedAt + recipientLogger := r.logger.With( zap.Int64("id", recipient.ID), zap.Int64("escalation_id", recipient.EscalationID), zap.Int64("channel_id", recipient.ChannelID.Int64)) - escalation := escalationsByID[recipient.EscalationID] - // In contrary to similar code snippets, this should not be able to contain nil elements. + escalation, escalationOk := escalationsByID[recipient.EscalationID] + if !escalationOk { + for _, cachedRule := range r.Rules { + if ruleEscalation, ok := cachedRule.Escalations[recipient.EscalationID]; ok { + rule, ruleOk := rulesByID[cachedRule.ID] + if !ruleOk { + // Rule does not exist from this run; load from previous run. + rulesByID[cachedRule.ID] = cachedRule + rule = cachedRule + } else { + // Populate current rule with cached escalation. + rule.Escalations[ruleEscalation.ID] = ruleEscalation + } + + escalationsByID[ruleEscalation.ID] = ruleEscalation + escalation = ruleEscalation + break + } + } + + if escalation == nil { + recipientLogger.Warn("Failed to load escalation") + continue + } + } + if escalation == nil { recipientLogger.Warn("Ignoring recipient for unknown escalation") + continue + } + + chkFn := func(cmpRecipient *rule.EscalationRecipient) bool { return cmpRecipient.ID == recipient.ID } + if recipient.IsDeleted.Valid && recipient.IsDeleted.Bool { + escalation.Recipients = slices.DeleteFunc(escalation.Recipients, chkFn) + recipientLogger.Debug("Removed escalation recipient config") + } else if i := slices.IndexFunc(escalation.Recipients, chkFn); i >= 0 { + escalation.Recipients[i] = recipient + recipientLogger.Debug("Replaced escalation recipient config") } else { escalation.Recipients = append(escalation.Recipients, recipient) recipientLogger.Debug("Loaded escalation recipient config") @@ -146,7 +206,9 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error { } r.pending.Rules = rulesByID - r.pendingLastChange[dbutils.TableName(rulePtr)] = changedAt + r.pendingLastChange[dbutils.TableName(rulePtr)] = ruleChangedAt + r.pendingLastChange[dbutils.TableName(escalationPtr)] = escalationChangedAt + r.pendingLastChange[dbutils.TableName(recipientPtr)] = recipientChangedAt return nil } diff --git a/internal/config/schedule.go b/internal/config/schedule.go index d186637b7..5fc6ec895 100644 --- a/internal/config/schedule.go +++ b/internal/config/schedule.go @@ -2,29 +2,37 @@ package config import ( "context" + "database/sql" "github.com/icinga/icinga-notifications/internal/recipient" "github.com/icinga/icinga-notifications/internal/utils" dbutils "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" ) func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error { - var schedulePtr *recipient.Schedule - changedAt := r.pendingLastChange[dbutils.TableName(schedulePtr)] + var ( + schedulePtr *recipient.Schedule + memberPtr *recipient.ScheduleMemberRow + + scheduleChangedAt = r.pendingLastChange[dbutils.TableName(schedulePtr)] + memberChangedAt = r.pendingLastChange[dbutils.TableName(memberPtr)] + ) + stmt := r.buildSelectStmtWhereChangedAt(schedulePtr) r.logger.Debugw("Executing query to fetch schedule", - zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) + zap.String("query", stmt), zap.Time("changed_at_after", scheduleChangedAt.Time())) var schedules []*recipient.Schedule - if err := tx.SelectContext(ctx, &schedules, stmt, changedAt); err != nil { + if err := tx.SelectContext(ctx, &schedules, stmt, scheduleChangedAt); err != nil { r.logger.Errorln(err) return err } schedulesById := make(map[int64]*recipient.Schedule) for _, s := range schedules { - changedAt = s.ChangedAt + scheduleChangedAt = s.ChangedAt scheduleLogger := r.logger.With( zap.Int64("id", s.ID), @@ -44,10 +52,10 @@ func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error { err := r.selectRelationshipTableEntries( ctx, tx, - new(recipient.ScheduleMemberRow), + memberPtr, "schedule_id", utils.MapKeys(schedulesById), - r.Schedules == nil, + memberChangedAt, &members) if err != nil { r.logger.Errorln(err) @@ -55,23 +63,47 @@ func (r *RuntimeConfig) fetchSchedules(ctx context.Context, tx *sqlx.Tx) error { } for _, member := range members { + memberChangedAt = member.ChangedAt + memberLogger := makeScheduleMemberLogger(r.logger.SugaredLogger, member) - s, ok := schedulesById[member.ScheduleID] - if !ok { + schedule, newScheduleOk := schedulesById[member.ScheduleID] + cachedSchedule, cachedScheduleOk := r.Schedules[member.ScheduleID] + if !newScheduleOk && !cachedScheduleOk { memberLogger.Warn("Ignoring entry for unknown schedule_id") continue - } else if s == nil { + } else if !newScheduleOk { + schedulesById[member.ScheduleID] = cachedSchedule + schedule = cachedSchedule + } + + if schedule == nil { memberLogger.Debug("Skipping deleted entry for unknown schedule_id") continue } - s.MemberRows = append(s.MemberRows, member) - memberLogger.Debug("Member") + chkFn := func(row *recipient.ScheduleMemberRow) bool { + sqlNullInt64Eq := func(a, b sql.NullInt64) bool { return (!a.Valid && !b.Valid) || (a.Int64 == b.Int64) } + return row.ScheduleID == member.ScheduleID && + row.TimePeriodID == member.TimePeriodID && + sqlNullInt64Eq(row.ContactID, member.ContactID) && + sqlNullInt64Eq(row.GroupID, member.GroupID) + } + if member.IsDeleted.Valid && member.IsDeleted.Bool { + schedule.MemberRows = slices.DeleteFunc(schedule.MemberRows, chkFn) + memberLogger.Debug("Deleted schedule member") + } else if i := slices.IndexFunc(schedule.MemberRows, chkFn); i >= 0 { + schedule.MemberRows[i] = member + memberLogger.Debug("Replaced schedule member") + } else { + schedule.MemberRows = append(schedule.MemberRows, member) + memberLogger.Debug("Loaded schedule member") + } } r.pending.Schedules = schedulesById - r.pendingLastChange[dbutils.TableName(schedulePtr)] = changedAt + r.pendingLastChange[dbutils.TableName(schedulePtr)] = scheduleChangedAt + r.pendingLastChange[dbutils.TableName(memberPtr)] = memberChangedAt return nil } diff --git a/internal/config/timeperiod.go b/internal/config/timeperiod.go index e08e7d8e2..0be7623e4 100644 --- a/internal/config/timeperiod.go +++ b/internal/config/timeperiod.go @@ -10,25 +10,45 @@ import ( dbutils "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" + "slices" "time" ) func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error { - var timePeriodPtr *timeperiod.TimePeriod - changedAt := r.pendingLastChange[dbutils.TableName(timePeriodPtr)] + type TimeperiodEntry struct { + ID int64 `db:"id"` + TimePeriodID int64 `db:"timeperiod_id"` + StartTime types.UnixMilli `db:"start_time"` + EndTime types.UnixMilli `db:"end_time"` + Timezone string `db:"timezone"` + RRule sql.NullString `db:"rrule"` + Description sql.NullString `db:"description"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` + } + + var ( + timePeriodPtr *timeperiod.TimePeriod + timeEntryPtr *TimeperiodEntry + + timePeriodChangedAt = r.pendingLastChange[dbutils.TableName(timePeriodPtr)] + timeEntryChangedAt = r.pendingLastChange[dbutils.TableName(timeEntryPtr)] + ) + stmt := r.buildSelectStmtWhereChangedAt(timePeriodPtr) r.logger.Debugw("Executing query to fetch time periods", - zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time())) + zap.String("query", stmt), zap.Time("changed_at_after", timePeriodChangedAt.Time())) var timePeriods []*timeperiod.TimePeriod - if err := tx.SelectContext(ctx, &timePeriods, stmt, changedAt); err != nil { + if err := tx.SelectContext(ctx, &timePeriods, stmt, timePeriodChangedAt); err != nil { r.logger.Errorln(err) return err } timePeriodsById := make(map[int64]*timeperiod.TimePeriod) for _, p := range timePeriods { - changedAt = p.ChangedAt + timePeriodChangedAt = p.ChangedAt timePeriodLogger := r.logger.With( zap.Int64("id", p.ID), @@ -44,24 +64,14 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error } } - type TimeperiodEntry struct { - ID int64 `db:"id"` - TimePeriodID int64 `db:"timeperiod_id"` - StartTime types.UnixMilli `db:"start_time"` - EndTime types.UnixMilli `db:"end_time"` - Timezone string `db:"timezone"` - RRule sql.NullString `db:"rrule"` - Description sql.NullString `db:"description"` - } - var entries []*TimeperiodEntry err := r.selectRelationshipTableEntries( ctx, tx, - new(TimeperiodEntry), + timeEntryPtr, "timeperiod_id", utils.MapKeys(timePeriodsById), - r.TimePeriods == nil, + timeEntryChangedAt, &entries) if err != nil { r.logger.Errorln(err) @@ -69,23 +79,31 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error } for _, row := range entries { + timeEntryChangedAt = row.ChangedAt + entryLogger := r.logger.With( zap.Int64("timeperiod_entry_id", row.ID), zap.Int64("timeperiod_id", row.TimePeriodID)) - p, ok := timePeriodsById[row.TimePeriodID] - if !ok { + period, newPeriodOk := timePeriodsById[row.TimePeriodID] + cachedPeriod, cachedPeriodOk := r.TimePeriods[row.TimePeriodID] + if !newPeriodOk && !cachedPeriodOk { entryLogger.Warn("Ignoring entry for unknown timeperiod_id") continue - } else if p == nil { + } else if !newPeriodOk { + timePeriodsById[row.TimePeriodID] = cachedPeriod + period = cachedPeriod + } + + if period == nil { entryLogger.Debug("Skipping deleted entry for unknown timeperiod_id") continue } - if p.Name == "" { - p.Name = fmt.Sprintf("Time Period #%d", row.TimePeriodID) + if period.Name == "" { + period.Name = fmt.Sprintf("Time Period #%d", row.TimePeriodID) if row.Description.Valid { - p.Name += fmt.Sprintf(" (%s)", row.Description.String) + period.Name += fmt.Sprintf(" (%s)", row.Description.String) } } @@ -98,6 +116,7 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error } entry := &timeperiod.Entry{ + ID: row.ID, Start: row.StartTime.Time().Truncate(time.Second).In(loc), End: row.EndTime.Time().Truncate(time.Second).In(loc), TimeZone: row.Timezone, @@ -112,16 +131,27 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error entryLogger.Warnw("Ignoring time period entry", zap.String("rrule", entry.RecurrenceRule), zap.Error(err)) - continue + if !(row.IsDeleted.Valid && row.IsDeleted.Bool) { + // Still allow deleting invalid entries + continue + } } - p.Entries = append(p.Entries, entry) - - entryLogger.Debugw("Loaded time period entry", - zap.String("timeperiod", p.Name), - zap.Time("start", entry.Start), - zap.Time("end", entry.End), - zap.String("rrule", entry.RecurrenceRule)) + chkFn := func(cmpEntry *timeperiod.Entry) bool { return cmpEntry.ID == entry.ID } + if row.IsDeleted.Valid && row.IsDeleted.Bool { + period.Entries = slices.DeleteFunc(period.Entries, chkFn) + entryLogger.Debug("Deleted time period entry") + } else if i := slices.IndexFunc(period.Entries, chkFn); i >= 0 { + period.Entries[i] = entry + entryLogger.Debug("Replaced time period entry") + } else { + period.Entries = append(period.Entries, entry) + entryLogger.Debugw("Loaded time period entry", + zap.String("timeperiod", period.Name), + zap.Time("start", entry.Start), + zap.Time("end", entry.End), + zap.String("rrule", entry.RecurrenceRule)) + } } for _, p := range timePeriodsById { @@ -135,7 +165,8 @@ func (r *RuntimeConfig) fetchTimePeriods(ctx context.Context, tx *sqlx.Tx) error } r.pending.TimePeriods = timePeriodsById - r.pendingLastChange[dbutils.TableName(timePeriodPtr)] = changedAt + r.pendingLastChange[dbutils.TableName(timePeriodPtr)] = timePeriodChangedAt + r.pendingLastChange[dbutils.TableName(timeEntryPtr)] = timeEntryChangedAt return nil } diff --git a/internal/config/utils.go b/internal/config/utils.go index f9d1789e9..10af57900 100644 --- a/internal/config/utils.go +++ b/internal/config/utils.go @@ -2,6 +2,7 @@ package config import ( "context" + "github.com/icinga/icingadb/pkg/types" "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "go.uber.org/zap" @@ -12,6 +13,8 @@ import ( // // The query, which will be a prepared statement, expects a types.UnixMilli parameter to be compared against. This // parameter might be NULL, being COALESCEd to a numeric zero, returning all rows. +// +// The rows will be ordered by `changed_at`, allowing to update the last change timestamp when iterating over it. func (r *RuntimeConfig) buildSelectStmtWhereChangedAt(typePtr interface{}) string { return r.db.Rebind(r.db.BuildSelectStmt(typePtr, typePtr) + ` WHERE "changed_at" > COALESCE(?, CAST(0 AS BIGINT)) ORDER BY "changed_at"`) @@ -19,44 +22,36 @@ func (r *RuntimeConfig) buildSelectStmtWhereChangedAt(typePtr interface{}) strin // selectRelationshipTableEntries constructs and execute a SELECT query for the config relationship tables. // -// A SELECT query against the relationship tables without the changed_at and deleted fields is generated for either all -// entries at the initial sync (selectAll := true) or a subset specified by `ids` in the `idField`. In case of an -// incremental sync without any `ìds`, the function directly returns without performing any query. +// A SELECT query against the relationship tables is generated for either all entries at the initial sync (`changedAt` +// has a NULL value) or a subset specified by either all `ids` in the `idField` or all changes since `changedAt`. +// +// The rows will be ordered by `changed_at`, allowing to update the last change timestamp when iterating over it. func (r *RuntimeConfig) selectRelationshipTableEntries( ctx context.Context, tx *sqlx.Tx, typePtr interface{}, idField string, ids []int64, - selectAll bool, + changedAt types.UnixMilli, dest interface{}, ) error { - if !selectAll && len(ids) == 0 { - r.logger.Debugw("Skipping query fetching relationship table as no IDs were requested", - zap.String("table", utils.TableName(typePtr))) - return nil - } - - stmt := r.db.BuildSelectStmt(typePtr, typePtr) - if !selectAll { - stmt += ` WHERE "` + idField + `" IN (` - stmt += strings.Repeat("?,", len(ids)) - stmt = stmt[:len(stmt)-1] + ")" + stmt := r.db.BuildSelectStmt(typePtr, typePtr) + ` WHERE "changed_at" > COALESCE(?, CAST(0 AS BIGINT))` + if len(ids) > 0 { + stmt += ` OR "` + idField + `" IN (` + strings.Join(strings.Split(strings.Repeat("?", len(ids)), ""), ",") + `)` } + stmt += ` ORDER BY "changed_at"` stmt = r.db.Rebind(stmt) - args := make([]interface{}, 0, len(ids)) - if !selectAll { - for id := range ids { - args = append(args, id) - } + args := make([]interface{}, 0, 1+len(ids)) + args = append(args, changedAt) + for id := range ids { + args = append(args, id) } r.logger.Debugw("Executing query to fetch relationship table", zap.String("table", utils.TableName(typePtr)), - zap.String("id_field", idField), zap.String("query", stmt), - zap.Bool("select_all", selectAll), + zap.Time("changed_at_after", changedAt.Time()), zap.Int("ids_in", len(ids))) return tx.SelectContext(ctx, dest, stmt, args...) diff --git a/internal/recipient/schedule.go b/internal/recipient/schedule.go index 22e90837f..b04139716 100644 --- a/internal/recipient/schedule.go +++ b/internal/recipient/schedule.go @@ -28,6 +28,9 @@ type ScheduleMemberRow struct { TimePeriodID int64 `db:"timeperiod_id"` ContactID sql.NullInt64 `db:"contact_id"` GroupID sql.NullInt64 `db:"contactgroup_id"` + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } func (s *ScheduleMemberRow) TableName() string { diff --git a/internal/rule/escalation.go b/internal/rule/escalation.go index 231fc4bd1..a3f1dcd91 100644 --- a/internal/rule/escalation.go +++ b/internal/rule/escalation.go @@ -4,6 +4,7 @@ import ( "database/sql" "github.com/icinga/icinga-notifications/internal/filter" "github.com/icinga/icinga-notifications/internal/recipient" + "github.com/icinga/icingadb/pkg/types" "strings" "time" ) @@ -18,6 +19,9 @@ type Escalation struct { FallbackForID sql.NullInt64 `db:"fallback_for"` Fallbacks []*Escalation + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` + Recipients []*EscalationRecipient } @@ -68,6 +72,9 @@ type EscalationRecipient struct { ChannelID sql.NullInt64 `db:"channel_id"` recipient.Key `db:",inline"` Recipient recipient.Recipient + + ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"` + IsDeleted types.Bool `db:"deleted" json:"deleted"` } func (r *EscalationRecipient) TableName() string { diff --git a/internal/timeperiod/timeperiod.go b/internal/timeperiod/timeperiod.go index 4430e65cd..2ddd5e9f9 100644 --- a/internal/timeperiod/timeperiod.go +++ b/internal/timeperiod/timeperiod.go @@ -51,6 +51,9 @@ func (p *TimePeriod) NextTransition(base time.Time) time.Time { } type Entry struct { + // ID references timeperiod_entry.id in the database for both updates and cleanups. + ID int64 + Start, End time.Time // for future use diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index 28f0c7ffa..e10974b12 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -96,11 +96,13 @@ CREATE TABLE contactgroup ( CREATE INDEX idx_contactgroup_changed_at ON contactgroup(changed_at); --- Changes to contactgroup_member should be notified by an updated contactgroup.changed_at. CREATE TABLE contactgroup_member ( contactgroup_id bigint NOT NULL REFERENCES contactgroup(id), contact_id bigint NOT NULL REFERENCES contact(id), + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_contactgroup_member PRIMARY KEY (contactgroup_id, contact_id) ); @@ -128,7 +130,6 @@ CREATE TABLE timeperiod ( CREATE INDEX idx_timeperiod_changed_at ON timeperiod(changed_at); --- Changes to timeperiod_entry should be notified by an updated timeperiod.changed_at. CREATE TABLE timeperiod_entry ( id bigserial, timeperiod_id bigint NOT NULL REFERENCES timeperiod(id), @@ -144,16 +145,21 @@ CREATE TABLE timeperiod_entry ( frequency frequency_type, description text, + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_timeperiod_entry PRIMARY KEY (id) ); --- Changes to schedule_member should be notified by an updated schedule.changed_at. CREATE TABLE schedule_member ( schedule_id bigint NOT NULL REFERENCES schedule(id), timeperiod_id bigint NOT NULL REFERENCES timeperiod(id), contact_id bigint REFERENCES contact(id), contactgroup_id bigint REFERENCES contactgroup(id), + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + -- There is no PRIMARY KEY in that table as either contact_id or contactgroup_id should be allowed to be NULL. -- Instead, there are two UNIQUE constraints that prevent duplicate entries. Multiple NULLs are not considered to -- be duplicates, so rows with a contact_id but no contactgroup_id are basically ignored in the UNIQUE constraint @@ -260,7 +266,6 @@ CREATE TABLE rule ( CREATE INDEX idx_rule_changed_at ON rule(changed_at); --- Changes to rule_escalation should be notified by an updated rule.changed_at. CREATE TABLE rule_escalation ( id bigserial, rule_id bigint NOT NULL REFERENCES rule(id), @@ -269,13 +274,15 @@ CREATE TABLE rule_escalation ( name text, -- if not set, recipients are used as a fallback for display purposes fallback_for bigint REFERENCES rule_escalation(id), + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_rule_escalation PRIMARY KEY (id), UNIQUE (rule_id, position), CHECK (NOT (condition IS NOT NULL AND fallback_for IS NOT NULL)) ); --- Changes to rule_escalation_recipient should be notified by an updated rule.changed_at via rule_escalation.rule_id. CREATE TABLE rule_escalation_recipient ( id bigserial, rule_escalation_id bigint NOT NULL REFERENCES rule_escalation(id), @@ -284,6 +291,9 @@ CREATE TABLE rule_escalation_recipient ( schedule_id bigint REFERENCES schedule(id), channel_id bigint REFERENCES channel(id), + changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + deleted boolenum NOT NULL DEFAULT 'n', + CONSTRAINT pk_rule_escalation_recipient PRIMARY KEY (id), CHECK (num_nonnulls(contact_id, contactgroup_id, schedule_id) = 1) diff --git a/schema/pgsql/upgrades/025.sql b/schema/pgsql/upgrades/025.sql index 22f734626..e0a68a454 100644 --- a/schema/pgsql/upgrades/025.sql +++ b/schema/pgsql/upgrades/025.sql @@ -14,18 +14,38 @@ ALTER TABLE contactgroup ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; +ALTER TABLE contactgroup_member + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + ALTER TABLE timeperiod ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; +ALTER TABLE timeperiod_entry + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + ALTER TABLE schedule ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; +ALTER TABLE schedule_member + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + ALTER TABLE rule ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; +ALTER TABLE rule_escalation + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + +ALTER TABLE rule_escalation_recipient + ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, + ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n'; + ALTER TABLE source ADD COLUMN changed_at bigint NOT NULL DEFAULT EXTRACT(EPOCH FROM NOW()) * 1000, ADD COLUMN deleted boolenum NOT NULL DEFAULT 'n';