Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
oxzi committed Jun 19, 2024
1 parent fb1264c commit 845faae
Show file tree
Hide file tree
Showing 19 changed files with 923 additions and 559 deletions.
30 changes: 30 additions & 0 deletions internal/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"errors"
"fmt"
"github.com/icinga/icinga-go-library/types"
"github.com/icinga/icinga-notifications/internal/contracts"
"github.com/icinga/icinga-notifications/internal/event"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/icinga/icinga-notifications/pkg/plugin"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"net/url"
)

Expand All @@ -18,6 +20,9 @@ type Channel struct {
Type string `db:"type"`
Config string `db:"config" json:"-"` // excluded from JSON config dump as this may contain sensitive information

ChangedAt types.UnixMilli `db:"changed_at" json:"changed_at"`
Deleted types.Bool `db:"deleted" json:"deleted"`

Logger *zap.SugaredLogger `db:"-"`

restartCh chan newConfig
Expand All @@ -27,6 +32,31 @@ type Channel struct {
pluginCtxCancel func()
}

func (c *Channel) MarshalLogObject(encoder zapcore.ObjectEncoder) error {
encoder.AddInt64("id", c.ID)
encoder.AddString("name", c.Name)
encoder.AddString("type", c.Type)
encoder.AddTime("changed_at", c.ChangedAt.Time())
encoder.AddBool("deleted", c.IsDeleted())
return nil
}

func (c *Channel) GetID() int64 {
return c.ID
}

func (c *Channel) GetChangedAt() types.UnixMilli {
return c.ChangedAt
}

func (c *Channel) IsDeleted() bool {
return c.Deleted.Valid && c.Deleted.Bool
}

func (c *Channel) Validate() error {
return ValidateType(c.Type)
}

// newConfig helps to store the channel's updated properties
type newConfig struct {
ctype string
Expand Down
90 changes: 18 additions & 72 deletions internal/config/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,79 +8,25 @@ import (
)

func (r *RuntimeConfig) fetchChannels(ctx context.Context, tx *sqlx.Tx) error {
var channelPtr *channel.Channel
stmt := r.db.BuildSelectStmt(channelPtr, channelPtr)
r.logger.Debugf("Executing query %q", stmt)

var channels []*channel.Channel
if err := tx.SelectContext(ctx, &channels, stmt); err != nil {
r.logger.Errorln(err)
return err
}

channelsById := make(map[int64]*channel.Channel)
for _, c := range channels {
channelLogger := r.logger.With(
zap.Int64("id", c.ID),
zap.String("name", c.Name),
zap.String("type", c.Type),
)
if channelsById[c.ID] != nil {
channelLogger.Warnw("ignoring duplicate config for channel type")
} else if err := channel.ValidateType(c.Type); err != nil {
channelLogger.Errorw("Cannot load channel config", zap.Error(err))
} else {
channelsById[c.ID] = c

channelLogger.Debugw("loaded channel config")
}
}

if r.Channels != nil {
// mark no longer existing channels for deletion
for id := range r.Channels {
if _, ok := channelsById[id]; !ok {
channelsById[id] = nil
}
}
}

r.pending.Channels = channelsById

return nil
return incrementalFetch(ctx, r, tx, &r.pending.Channels)
}

func (r *RuntimeConfig) applyPendingChannels() {
if r.Channels == nil {
r.Channels = make(map[int64]*channel.Channel)
}

for id, pendingChannel := range r.pending.Channels {
if pendingChannel == nil {
r.Channels[id].Logger.Info("Channel has been removed")
r.Channels[id].Stop()

delete(r.Channels, id)
} else if currentChannel := r.Channels[id]; currentChannel != nil {
// Currently, the whole config is reloaded from the database frequently, replacing everything.
// Prevent restarting the plugin processes every time by explicitly checking for config changes.
// The if condition should no longer be necessary when https://github.com/Icinga/icinga-notifications/issues/5
// is solved properly.
if currentChannel.Type != pendingChannel.Type || currentChannel.Name != pendingChannel.Name || currentChannel.Config != pendingChannel.Config {
currentChannel.Type = pendingChannel.Type
currentChannel.Name = pendingChannel.Name
currentChannel.Config = pendingChannel.Config

currentChannel.Restart()
}
} else {
pendingChannel.Start(context.TODO(), r.logs.GetChildLogger("channel").With(
zap.Int64("id", pendingChannel.ID),
zap.String("name", pendingChannel.Name)))

r.Channels[id] = pendingChannel
}
}

r.pending.Channels = nil
incrementalApplyPending(
r,
&r.Channels, &r.pending.Channels,
func(element *channel.Channel) {
element.Start(context.TODO(), r.logs.GetChildLogger("channel").With(
zap.Int64("id", element.ID),
zap.String("name", element.Name)))
},
func(element, update *channel.Channel) {
element.Name = update.Name
element.Type = update.Type
element.Config = update.Config
element.Restart()
},
func(element *channel.Channel) {
element.Stop()
})
}
60 changes: 11 additions & 49 deletions internal/config/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,21 @@ import (
"context"
"github.com/icinga/icinga-notifications/internal/recipient"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)

func (r *RuntimeConfig) fetchContacts(ctx context.Context, tx *sqlx.Tx) error {
var contactPtr *recipient.Contact
stmt := r.db.BuildSelectStmt(contactPtr, contactPtr)
r.logger.Debugf("Executing query %q", stmt)

var contacts []*recipient.Contact
if err := tx.SelectContext(ctx, &contacts, stmt); err != nil {
r.logger.Errorln(err)
return err
}

contactsByID := make(map[int64]*recipient.Contact)
for _, c := range contacts {
contactsByID[c.ID] = c

r.logger.Debugw("loaded contact config",
zap.Int64("id", c.ID),
zap.String("name", c.FullName))
}

if r.Contacts != nil {
// mark no longer existing contacts for deletion
for id := range r.Contacts {
if _, ok := contactsByID[id]; !ok {
contactsByID[id] = nil
}
}
}

r.pending.Contacts = contactsByID

return nil
return incrementalFetch(ctx, r, tx, &r.pending.Contacts)
}

func (r *RuntimeConfig) applyPendingContacts() {
if r.Contacts == nil {
r.Contacts = make(map[int64]*recipient.Contact)
}

for id, pendingContact := range r.pending.Contacts {
if pendingContact == nil {
delete(r.Contacts, id)
} else if currentContact := r.Contacts[id]; currentContact != nil {
currentContact.FullName = pendingContact.FullName
currentContact.Username = pendingContact.Username
currentContact.DefaultChannelID = pendingContact.DefaultChannelID
} else {
r.Contacts[id] = pendingContact
}
}

r.pending.Contacts = nil
incrementalApplyPending(
r,
&r.Contacts, &r.pending.Contacts,
nil,
func(element, update *recipient.Contact) {
element.FullName = update.FullName
element.Username = update.Username
element.DefaultChannelID = update.DefaultChannelID
},
nil)
}
57 changes: 7 additions & 50 deletions internal/config/contact_address.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,59 +9,16 @@ import (
)

func (r *RuntimeConfig) fetchContactAddresses(ctx context.Context, tx *sqlx.Tx) error {
var addressPtr *recipient.Address
stmt := r.db.BuildSelectStmt(addressPtr, addressPtr)
r.logger.Debugf("Executing query %q", stmt)

var addresses []*recipient.Address
if err := tx.SelectContext(ctx, &addresses, stmt); err != nil {
r.logger.Errorln(err)
return err
}

addressesById := make(map[int64]*recipient.Address)
for _, a := range addresses {
addressesById[a.ID] = a
r.logger.Debugw("loaded contact_address config",
zap.Int64("id", a.ID),
zap.Int64("contact_id", a.ContactID),
zap.String("type", a.Type),
zap.String("address", a.Address),
)
}

if r.ContactAddresses != nil {
// mark no longer existing contacts for deletion
for id := range r.ContactAddresses {
if _, ok := addressesById[id]; !ok {
addressesById[id] = nil
}
}
}

r.pending.ContactAddresses = addressesById

return nil
return incrementalFetch(ctx, r, tx, &r.pending.ContactAddresses)
}

func (r *RuntimeConfig) applyPendingContactAddresses() {
if r.ContactAddresses == nil {
r.ContactAddresses = make(map[int64]*recipient.Address)
}

for id, pendingAddress := range r.pending.ContactAddresses {
currentAddress := r.ContactAddresses[id]

if pendingAddress == nil {
r.removeContactAddress(currentAddress)
} else if currentAddress != nil {
r.updateContactAddress(currentAddress, pendingAddress)
} else {
r.addContactAddress(pendingAddress)
}
}

r.pending.ContactAddresses = nil
incrementalApplyPending(
r,
&r.ContactAddresses, &r.pending.ContactAddresses,
r.addContactAddress,
r.updateContactAddress,
r.removeContactAddress)
}

func (r *RuntimeConfig) addContactAddress(addr *recipient.Address) {
Expand Down
Loading

0 comments on commit 845faae

Please sign in to comment.