Skip to content

Commit

Permalink
Create Event Stream Clients from Source
Browse files Browse the repository at this point in the history
After #132 got merged and each Source's state is now within the
database, the Event Stream's configuration could go there, too.

This resulted in some level of refactoring as the data flow logic was
now reversed at some points. Especially Golang's non-cyclic imports and
the omnipresence of the RuntimeConfig made the "hack" of the
eventstream.Launcher necessary to not have an importing edge from config
to eventstream.
  • Loading branch information
oxzi committed Dec 11, 2023
1 parent 619ecff commit 97824ac
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 165 deletions.
19 changes: 4 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,11 @@ It is required that you have created a new database and imported the [schema](sc
Additionally, it also requires you to manually insert items into the **source** table before starting the daemon.
```sql
INSERT INTO source (id, type, name, listener_password_hash)
VALUES (1, 'icinga2', 'Icinga 2', '$2y$10$QU8bJ7cpW1SmoVQ/RndX5O2J5L1PJF7NZ2dlIW7Rv3zUEcbUFg3z2');
INSERT INTO source
(id, type, name, icinga2_base_url, icinga2_auth_user, icinga2_auth_pass, icinga2_insecure_tls)
VALUES
(1, 'icinga2', 'Local Icinga 2', 'https://localhost:5665', 'root', 'icinga', 'y');
```
The `listener_password_hash` is a [PHP `password_hash`](https://www.php.net/manual/en/function.password-hash.php) with the `PASSWORD_DEFAULT` algorithm, currently bcrypt.
In the example above, this is "correct horse battery staple".
This mimics Icinga Web 2's behavior, as stated in [its documentation](https://icinga.com/docs/icinga-web/latest/doc/20-Advanced-Topics/#manual-user-creation-for-database-authentication-backend).

Currently, there are two ways how notifications get communicated between Icinga 2 and Icinga Notifications.
Please select only one, whereby the first is recommended:

* Icinga Notifications can pull those from the Icinga 2 API when being configured in the YAML configuration file.
For each `source`, as inserted in the database above, an `icinga2-apis` endpoint must be defined.
* Otherwise, Icinga 2 can push the notifications to the Icinga Notification daemon.
Therefore, you need to copy the [Icinga 2 config](icinga2.conf) to `/etc/icinga2/features-enabled` on your master node(s) and restart the Icinga 2 service.
At the top of this file, you will find multiple configurations options that can be set in `/etc/icinga2/constants.conf`.
There are also Icinga2 `EventCommand` definitions in this file that will automatically match all your **checkables**, which may not work properly if the configuration already uses event commands for something else.

Then, you can launch the daemon with the following command.
```go
Expand Down
21 changes: 12 additions & 9 deletions cmd/icinga-notifications-daemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,26 +86,29 @@ func main() {
ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer cancel()

runtimeConfig := config.NewRuntimeConfig(db, logs)
esLauncher := &eventstream.Launcher{
Ctx: ctx,
Logs: logs,
Db: db,
RuntimeConfig: nil, // Will be set below as it is interconnected..
}

runtimeConfig := config.NewRuntimeConfig(esLauncher.Launch, logs, db)
if err := runtimeConfig.UpdateFromDatabase(ctx); err != nil {
logger.Fatalw("failed to load config from database", zap.Error(err))
}

esLauncher.RuntimeConfig = runtimeConfig

go runtimeConfig.PeriodicUpdates(ctx, 1*time.Second)

err = incident.LoadOpenIncidents(ctx, db, logs.GetChildLogger("incident"), runtimeConfig)
if err != nil {
logger.Fatalw("Can't load incidents from database", zap.Error(err))
}

esClients, err := eventstream.NewClientsFromConfig(ctx, logs, db, runtimeConfig, conf)
if err != nil {
logger.Fatalw("cannot prepare Event Stream API Clients form config", zap.Error(err))
}
for _, esClient := range esClients {
go esClient.Process()
}

// Wait to load open incidents from the database before either starting Event Stream Clients or starting the Listener.
esLauncher.Ready()
if err := listener.NewListener(db, runtimeConfig, logs).Run(ctx); err != nil {
logger.Errorw("Listener has finished with an error", zap.Error(err))
} else {
Expand Down
10 changes: 0 additions & 10 deletions config.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,6 @@
icingaweb2-url: http://localhost/icingaweb2/
channel-plugin-dir: /usr/libexec/icinga-notifications/channel

icinga2-apis:
- notifications-event-source-id: 1
host: https://localhost:5665
auth-user: root
auth-pass: icinga
# The Icinga 2 API CA must either be in the system's CA store, be passed as
# icinga-ca-file or certificate verification can be disabled.
# icinga-ca-file: /path/to/icinga-ca.crt
# insecure-tls: true

database:
type: pgsql
host: /run/postgresql
Expand Down
25 changes: 22 additions & 3 deletions internal/config/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type RuntimeConfig struct {
// Accessing it requires a lock that is obtained with RLock() and released with RUnlock().
ConfigSet

// EventStreamLaunchFunc is a callback to launch an Event Stream API Client.
// This became necessary due to circular imports, either with the incident or eventstream package.
EventStreamLaunchFunc func(source *Source)

// pending contains changes to config objects that are to be applied to the embedded live config.
pending ConfigSet

Expand All @@ -36,8 +40,18 @@ type RuntimeConfig struct {
mu sync.RWMutex
}

func NewRuntimeConfig(db *icingadb.DB, logs *logging.Logging) *RuntimeConfig {
return &RuntimeConfig{db: db, logs: logs, logger: logs.GetChildLogger("runtime-updates")}
func NewRuntimeConfig(
esLaunch func(source *Source),
logs *logging.Logging,
db *icingadb.DB,
) *RuntimeConfig {
return &RuntimeConfig{
EventStreamLaunchFunc: esLaunch,

logs: logs,
logger: logs.GetChildLogger("runtime-updates"),
db: db,
}
}

type ConfigSet struct {
Expand Down Expand Up @@ -167,9 +181,14 @@ func (r *RuntimeConfig) GetSourceFromCredentials(user, pass string, logger *logg
return nil
}

if !source.ListenerPasswordHash.Valid {
logger.Debugw("Cannot check credentials for source without a listener_password_hash", zap.Int64("id", sourceId))
return nil
}

// If either PHP's PASSWORD_DEFAULT changes or Icinga Web 2 starts using something else, e.g., Argon2id, this will
// return a descriptive error as the identifier does no longer match the bcrypt "$2y$".
err = bcrypt.CompareHashAndPassword([]byte(source.ListenerPasswordHash), []byte(pass))
err = bcrypt.CompareHashAndPassword([]byte(source.ListenerPasswordHash.String), []byte(pass))
if errors.Is(err, bcrypt.ErrMismatchedHashAndPassword) {
logger.Debugw("Invalid password for this source", zap.Int64("id", sourceId))
return nil
Expand Down
78 changes: 66 additions & 12 deletions internal/config/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,46 @@ package config

import (
"context"
"github.com/icinga/icingadb/pkg/types"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)

// SourceTypeIcinga2 represents the "icinga2" Source Type for Event Stream API sources.
const SourceTypeIcinga2 = "icinga2"

// Source entry within the ConfigSet to describe a source.
type Source struct {
ID int64 `db:"id"`
Type string `db:"type"`
Name string `db:"name"`

ListenerPasswordHash string `db:"listener_password_hash"`
ListenerPasswordHash types.String `db:"listener_password_hash"`

Icinga2BaseURL types.String `db:"icinga2_base_url"`
Icinga2AuthUser types.String `db:"icinga2_auth_user"`
Icinga2AuthPass types.String `db:"icinga2_auth_pass"`
Icinga2CAPem types.String `db:"icinga2_ca_pem"`
Icinga2InsecureTLS types.Bool `db:"icinga2_insecure_tls"`

// Icinga2SourceConf for Event Stream API sources, only if Source.Type == SourceTypeIcinga2.
Icinga2SourceCancel context.CancelFunc `db:"-" json:"-"`
}

// FieldEquals checks if this Source's database fields are equal to those of another Source.
func (source *Source) FieldEquals(other *Source) bool {
boolEq := func(a, b types.Bool) bool { return (!a.Valid && !b.Valid) || (a.Bool == b.Bool) }
stringEq := func(a, b types.String) bool { return (!a.Valid && !b.Valid) || (a.String == b.String) }

return source.ID == other.ID &&
source.Type == other.Type &&
source.Name == other.Name &&
stringEq(source.ListenerPasswordHash, other.ListenerPasswordHash) &&
stringEq(source.Icinga2BaseURL, other.Icinga2BaseURL) &&
stringEq(source.Icinga2AuthUser, other.Icinga2AuthUser) &&
stringEq(source.Icinga2AuthPass, other.Icinga2AuthPass) &&
stringEq(source.Icinga2CAPem, other.Icinga2CAPem) &&
boolEq(source.Icinga2InsecureTLS, other.Icinga2InsecureTLS)
}

func (r *RuntimeConfig) fetchSources(ctx context.Context, tx *sqlx.Tx) error {
Expand All @@ -34,12 +63,12 @@ func (r *RuntimeConfig) fetchSources(ctx context.Context, tx *sqlx.Tx) error {
zap.String("type", s.Type),
)
if sourcesById[s.ID] != nil {
sourceLogger.Warnw("ignoring duplicate config for source ID")
} else {
sourcesById[s.ID] = s

sourceLogger.Debugw("loaded source config")
sourceLogger.Error("Ignoring duplicate config for source ID")
continue
}

sourcesById[s.ID] = s
sourceLogger.Debug("loaded source config")
}

if r.Sources != nil {
Expand All @@ -62,16 +91,41 @@ func (r *RuntimeConfig) applyPendingSources() {
}

for id, pendingSource := range r.pending.Sources {
if pendingSource == nil {
r.logger.Infow("Source has been removed",
zap.Int64("id", r.Sources[id].ID),
zap.String("name", r.Sources[id].Name),
zap.String("type", r.Sources[id].Type))
logger := r.logger.With(zap.Int64("id", id))
currentSource := r.Sources[id]

// Compare the pending source with an optional existing source; instruct the Event Source Client, if necessary.
if pendingSource == nil && currentSource != nil {
logger.Info("Source has been removed")

if currentSource.Type == SourceTypeIcinga2 {
currentSource.Icinga2SourceCancel()
}
delete(r.Sources, id)
continue
} else if pendingSource != nil && currentSource != nil {
if currentSource.FieldEquals(pendingSource) {
continue
}

logger.Info("Source has been updated")

if currentSource.Type == SourceTypeIcinga2 {
currentSource.Icinga2SourceCancel()
}
} else if pendingSource != nil && currentSource == nil {
logger.Info("Source has been added")
} else {
r.Sources[id] = pendingSource
// Neither an active nor a pending source?
logger.Error("Cannot applying pending configuration: neither an active nor a pending source")
continue
}

if pendingSource.Type == SourceTypeIcinga2 {
r.EventStreamLaunchFunc(pendingSource)
}

r.Sources[id] = pendingSource
}

r.pending.Sources = nil
Expand Down
10 changes: 0 additions & 10 deletions internal/daemon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,11 @@ import (
"os"
)

type Icinga2ApiConfig struct {
NotificationsEventSourceId int64 `yaml:"notifications-event-source-id"`
Host string `yaml:"host"`
AuthUser string `yaml:"auth-user"`
AuthPass string `yaml:"auth-pass"`
IcingaCaFile string `yaml:"icinga-ca-file"`
InsecureTls bool `yaml:"insecure-tls"`
}

type ConfigFile struct {
Listen string `yaml:"listen" default:"localhost:5680"`
DebugPassword string `yaml:"debug-password"`
ChannelPluginDir string `yaml:"channel-plugin-dir" default:"/usr/libexec/icinga-notifications/channel"`
Icingaweb2URL string `yaml:"icingaweb2-url"`
Icinga2Apis []Icinga2ApiConfig `yaml:"icinga2-apis"`
Database icingadbConfig.Database `yaml:"database"`
Logging icingadbConfig.Logging `yaml:"logging"`
}
Expand Down
Loading

0 comments on commit 97824ac

Please sign in to comment.