Skip to content

Commit

Permalink
Incremental Config Updates on Relationship Tables
Browse files Browse the repository at this point in the history
  • Loading branch information
oxzi committed May 8, 2024
1 parent 4c482cf commit fe367d4
Show file tree
Hide file tree
Showing 10 changed files with 299 additions and 107 deletions.
65 changes: 47 additions & 18 deletions internal/config/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,43 @@ import (
"context"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/internal/utils"
"github.com/icinga/icingadb/pkg/types"
dbutils "github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"slices"
)

func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error {
var groupPtr *recipient.Group
changedAt := r.pendingLastChange[dbutils.TableName(groupPtr)]
type ContactgroupMember struct {
GroupId int64 `db:"contactgroup_id"`
ContactId int64 `db:"contact_id"`

ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"`
IsDeleted types.Bool `db:"deleted" json:"deleted"`
}

var (
groupPtr *recipient.Group
memberPtr *ContactgroupMember

groupChangedAt = r.pendingLastChange[dbutils.TableName(groupPtr)]
memberChangedAt = r.pendingLastChange[dbutils.TableName(memberPtr)]
)

stmt := r.buildSelectStmtWhereChangedAt(groupPtr)
r.logger.Debugw("Executing query to fetch groups",
zap.String("query", stmt), zap.Time("changed_at_after", changedAt.Time()))
zap.String("query", stmt), zap.Time("changed_at_after", groupChangedAt.Time()))

var groups []*recipient.Group
if err := tx.SelectContext(ctx, &groups, stmt, changedAt); err != nil {
if err := tx.SelectContext(ctx, &groups, stmt, groupChangedAt); err != nil {
r.logger.Errorln(err)
return err
}

groupsById := make(map[int64]*recipient.Group)
for _, g := range groups {
changedAt = g.ChangedAt
groupChangedAt = g.ChangedAt

groupLogger := r.logger.With(
zap.Int64("id", g.ID),
Expand All @@ -40,46 +56,59 @@ func (r *RuntimeConfig) fetchGroups(ctx context.Context, tx *sqlx.Tx) error {
}
}

type ContactgroupMember struct {
GroupId int64 `db:"contactgroup_id"`
ContactId int64 `db:"contact_id"`
}

var members []*ContactgroupMember
err := r.selectRelationshipTableEntries(
ctx,
tx,
new(ContactgroupMember),
memberPtr,
"contactgroup_id",
utils.MapKeys(groupsById),
r.Groups == nil,
memberChangedAt,
&members)
if err != nil {
r.logger.Errorln(err)
return err
}

for _, m := range members {
memberChangedAt = m.ChangedAt

memberLogger := r.logger.With(
zap.Int64("contact_id", m.ContactId),
zap.Int64("contactgroup_id", m.GroupId),
)

g, ok := groupsById[m.GroupId]
if !ok {
group, newGroupOk := groupsById[m.GroupId]
cachedGroup, cachedGroupOk := r.Groups[m.GroupId]
if !newGroupOk && !cachedGroupOk {
memberLogger.Warn("Ignoring member for unknown contactgroup_id")
continue
} else if g == nil {
} else if !newGroupOk {
groupsById[m.ContactId] = cachedGroup
group = cachedGroup
}

if group == nil {
memberLogger.Debug("Skipping deleted member for unknown contactgroup_id")
continue
}

g.MemberIDs = append(g.MemberIDs, m.ContactId)
memberLogger.Info("Loaded contact group member", zap.String("contactgroup_name", g.Name))
chkFn := func(id int64) bool { return id == m.ContactId }
if m.IsDeleted.Valid && m.IsDeleted.Bool {
group.MemberIDs = slices.DeleteFunc(group.MemberIDs, chkFn)
memberLogger.Debug("Deleted contact group member")
} else if i := slices.IndexFunc(group.MemberIDs, chkFn); i >= 0 {
group.MemberIDs[i] = m.ContactId
memberLogger.Debug("Updated contact group member")
} else {
group.MemberIDs = append(group.MemberIDs, m.ContactId)
memberLogger.Debug("Loaded contact group member")
}
}

r.pending.Groups = groupsById
r.pendingLastChange[dbutils.TableName(groupPtr)] = changedAt
r.pendingLastChange[dbutils.TableName(groupPtr)] = groupChangedAt
r.pendingLastChange[dbutils.TableName(memberPtr)] = memberChangedAt

return nil
}
Expand Down
96 changes: 79 additions & 17 deletions internal/config/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,34 @@ import (
dbutils "github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
"slices"
)

func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
var rulePtr *rule.Rule
changedAt := r.pendingLastChange[dbutils.TableName(rulePtr)]
var (
rulePtr *rule.Rule
escalationPtr *rule.Escalation
recipientPtr *rule.EscalationRecipient

ruleChangedAt = r.pendingLastChange[dbutils.TableName(rulePtr)]
escalationChangedAt = r.pendingLastChange[dbutils.TableName(escalationPtr)]
recipientChangedAt = r.pendingLastChange[dbutils.TableName(recipientPtr)]
)

stmt := r.buildSelectStmtWhereChangedAt(rulePtr)
r.logger.Debugw("Executing query to fetch rules",
zap.String("query", stmt),
zap.Time("changed_at_after", changedAt.Time()))
zap.Time("changed_at_after", ruleChangedAt.Time()))

var rules []*rule.Rule
if err := tx.SelectContext(ctx, &rules, stmt, changedAt); err != nil {
if err := tx.SelectContext(ctx, &rules, stmt, ruleChangedAt); err != nil {
r.logger.Errorln(err)
return err
}

rulesByID := make(map[int64]*rule.Rule)
for _, ru := range rules {
changedAt = ru.ChangedAt
ruleChangedAt = ru.ChangedAt

ruleLogger := r.logger.With(
zap.Int64("id", ru.ID),
Expand Down Expand Up @@ -61,10 +70,10 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
err := r.selectRelationshipTableEntries(
ctx,
tx,
new(rule.Escalation),
escalationPtr,
"rule_id",
utils.MapKeys(rulesByID),
r.Rules == nil,
escalationChangedAt,
&escalations)
if err != nil {
r.logger.Errorln(err)
Expand All @@ -73,6 +82,8 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {

escalationsByID := make(map[int64]*rule.Escalation)
for _, escalation := range escalations {
escalationChangedAt = escalation.ChangedAt

escalationLogger := r.logger.With(
zap.Int64("id", escalation.ID),
zap.Int64("rule_id", escalation.RuleID),
Expand All @@ -81,15 +92,28 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
zap.Int64("fallback_for", escalation.FallbackForID.Int64),
)

r, ok := rulesByID[escalation.RuleID]
if !ok {
rule, newRuleOk := rulesByID[escalation.RuleID]
cachedRule, cachedRuleOk := r.Rules[escalation.RuleID]
if !newRuleOk && !cachedRuleOk {
escalationLogger.Warn("Ignoring escalation for unknown rule_id")
continue
} else if r == nil {
} else if !newRuleOk {
rulesByID[escalation.RuleID] = cachedRule
rule = cachedRule
}

if rule == nil {
escalationLogger.Debug("Skipping deleted escalation for unknown rule_id")
continue
}

if escalation.IsDeleted.Valid && escalation.IsDeleted.Bool {
rule.Escalations[escalation.ID] = nil
escalationsByID[escalation.ID] = nil
escalationLogger.Info("Deleted escalation config")
continue
}

if escalation.ConditionExpr.Valid {
cond, err := filter.Parse(escalation.ConditionExpr.String)
if err != nil {
Expand All @@ -110,43 +134,81 @@ func (r *RuntimeConfig) fetchRules(ctx context.Context, tx *sqlx.Tx) error {
escalation.Name = escalation.NameRaw.String
}

r.Escalations[escalation.ID] = escalation
rule.Escalations[escalation.ID] = escalation
escalationsByID[escalation.ID] = escalation
escalationLogger.Debugw("loaded escalation config")
escalationLogger.Debug("Loaded escalation config")
}

var recipients []*rule.EscalationRecipient
err = r.selectRelationshipTableEntries(
ctx,
tx,
new(rule.EscalationRecipient),
recipientPtr,
"rule_escalation_id",
utils.MapKeys(escalationsByID),
r.Rules == nil,
recipientChangedAt,
&recipients)
if err != nil {
r.logger.Errorln(err)
return err
}

for _, recipient := range recipients {
recipientChangedAt = recipient.ChangedAt

recipientLogger := r.logger.With(
zap.Int64("id", recipient.ID),
zap.Int64("escalation_id", recipient.EscalationID),
zap.Int64("channel_id", recipient.ChannelID.Int64))

escalation := escalationsByID[recipient.EscalationID]
// In contrary to similar code snippets, this should not be able to contain nil elements.
escalation, escalationOk := escalationsByID[recipient.EscalationID]
if !escalationOk {
for _, cachedRule := range r.Rules {
if ruleEscalation, ok := cachedRule.Escalations[recipient.EscalationID]; ok {
rule, ruleOk := rulesByID[cachedRule.ID]
if !ruleOk {
// Rule does not exist from this run; load from previous run.
rulesByID[cachedRule.ID] = cachedRule
rule = cachedRule
} else {
// Populate current rule with cached escalation.
rule.Escalations[ruleEscalation.ID] = ruleEscalation
}

escalationsByID[ruleEscalation.ID] = ruleEscalation
escalation = ruleEscalation
break
}
}

if escalation == nil {
recipientLogger.Warn("Failed to load escalation")
continue
}
}

if escalation == nil {
recipientLogger.Warn("Ignoring recipient for unknown escalation")
continue
}

chkFn := func(cmpRecipient *rule.EscalationRecipient) bool { return cmpRecipient.ID == recipient.ID }
if recipient.IsDeleted.Valid && recipient.IsDeleted.Bool {
escalation.Recipients = slices.DeleteFunc(escalation.Recipients, chkFn)
recipientLogger.Debug("Removed escalation recipient config")
} else if i := slices.IndexFunc(escalation.Recipients, chkFn); i >= 0 {
escalation.Recipients[i] = recipient
recipientLogger.Debug("Replaced escalation recipient config")
} else {
escalation.Recipients = append(escalation.Recipients, recipient)
recipientLogger.Debug("Loaded escalation recipient config")
}
}

r.pending.Rules = rulesByID
r.pendingLastChange[dbutils.TableName(rulePtr)] = changedAt
r.pendingLastChange[dbutils.TableName(rulePtr)] = ruleChangedAt
r.pendingLastChange[dbutils.TableName(escalationPtr)] = escalationChangedAt
r.pendingLastChange[dbutils.TableName(recipientPtr)] = recipientChangedAt

return nil
}
Expand Down
Loading

0 comments on commit fe367d4

Please sign in to comment.