Skip to content

Commit

Permalink
Write create & delete sla events when initially syncing configs
Browse files Browse the repository at this point in the history
  • Loading branch information
yhabteab committed Mar 3, 2023
1 parent 15c8ddc commit 8492326
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 2 deletions.
137 changes: 137 additions & 0 deletions pkg/icingadb/sla.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
package icingadb

import (
"context"
"github.com/icinga/icingadb/pkg/contracts"
v1 "github.com/icinga/icingadb/pkg/icingadb/v1"
"github.com/icinga/icingadb/pkg/types"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"time"
)

// SlaHistoryTrail represents a single create or delete event of a checkable (host or service).
// Its JSON field names mirror the columns of the sla_history_trail table.
type SlaHistoryTrail struct {
// NOTE(review): the db:"-" tag presumably excludes Id from column mapping,
// since the schema lets the database generate it - confirm.
Id types.Int `json:"id" db:"-"`
v1.EnvironmentMeta `json:",inline"`
HostId types.Binary `json:"host_id"`
// ServiceId is left unset for host events; the schema allows NULL for this column.
ServiceId types.Binary `json:"service_id"`
// EventType is the kind of event; the callers in Sync.ApplyDelta pass "create" or "delete".
EventType string `json:"event_type"`
// EventTime is the time the event was recorded.
EventTime types.UnixMilli `json:"event_time"`
}

// Fingerprint implements the contracts.Fingerprinter interface.
// The trail entry acts as its own fingerprint, i.e. the receiver is returned as is.
func (sht SlaHistoryTrail) Fingerprint() contracts.Fingerprinter {
	var fingerprint contracts.Fingerprinter = sht

	return fingerprint
}

// ID implements part of the contracts.IDer interface.
// It returns the value of the Id field.
func (sht SlaHistoryTrail) ID() contracts.ID {
	var id contracts.ID = sht.Id

	return id
}

// SetID implements part of the contracts.IDer interface.
// The given id must be a types.Int, otherwise the type assertion panics.
func (sht *SlaHistoryTrail) SetID(id contracts.ID) {
	intId := id.(types.Int)
	sht.Id = intId
}

// SlaServiceHistoryTrailColumns names the service columns (the embedded entity fields plus host_id)
// that Sync.ApplyDelta passes to BuildSelectStmt when it selects the services that are about to be deleted.
type SlaServiceHistoryTrailColumns struct {
v1.EntityWithoutChecksum `json:",inline"`
HostId types.Binary `json:"host_id"`
}

// CheckableToSlaTrailEntities transforms the checkables (hosts and services) received from the passed
// channel into SlaHistoryTrail entities of the given event type and streams them into the returned channel.
//
// The transformation runs in a goroutine started on g. All produced entities share a single event time
// that is captured once before the first checkable is processed. The returned channel has a buffer of one
// and is closed as soon as the goroutine returns, i.e. when checkables is closed, ctx is done, or an error
// occurs.
//
// The goroutine returns an error if the environment can't be obtained from ctx, if ctx is done
// (ctx.Err()), or if a received entity is neither a *v1.Host nor a *v1.Service.
func CheckableToSlaTrailEntities(ctx context.Context, g *errgroup.Group, checkables <-chan contracts.Entity, eventType string) <-chan contracts.Entity {
	entities := make(chan contracts.Entity, 1)

	g.Go(func() error {
		defer close(entities)

		env, err := getEnvironmentId(ctx)
		if err != nil {
			return err
		}

		// Use the same event time for all checkables.
		now := time.Now()

		for {
			select {
			case checkable, ok := <-checkables:
				if !ok {
					return nil
				}

				entity := &SlaHistoryTrail{
					EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: env},
					EventType:       eventType,
					EventTime:       types.UnixMilli(now),
				}

				switch ptr := checkable.(type) {
				case *v1.Host:
					entity.HostId = ptr.Id
				case *v1.Service:
					entity.HostId = ptr.HostId
					entity.ServiceId = ptr.Id
				default:
					// Without a host id the trail entry is unusable (host_id is NOT NULL in the schema),
					// so fail early with a descriptive error instead of emitting an incomplete entity.
					return errors.Errorf("can't create sla history trail for entity of type %T", checkable)
				}

				// Don't block forever on the send if the consumer has already stopped:
				// a cancelled context must be able to terminate this goroutine, as g.Wait() waits for it.
				select {
				case entities <- entity:
				case <-ctx.Done():
					return ctx.Err()
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	return entities
}

// HostIdsToSlaHistoryTrail transforms the host IDs received from the passed channel into SlaHistoryTrail
// entities of the given event type and streams them into the returned channel.
//
// The transformation runs in a goroutine started on g. All produced entities share a single event time
// that is captured once before the first ID is processed. The returned channel has a buffer of one and is
// closed as soon as the goroutine returns, i.e. when ids is closed, ctx is done, or an error occurs.
//
// The goroutine returns an error if the environment can't be obtained from ctx, if ctx is done
// (ctx.Err()), or if a received ID is not a types.Binary.
func HostIdsToSlaHistoryTrail(ctx context.Context, g *errgroup.Group, ids <-chan any, eventType string) <-chan contracts.Entity {
	entities := make(chan contracts.Entity, 1)

	g.Go(func() error {
		defer close(entities)

		env, err := getEnvironmentId(ctx)
		if err != nil {
			return err
		}

		// Use the same event time for all hosts.
		now := time.Now()

		for {
			select {
			case id, ok := <-ids:
				if !ok {
					return nil
				}

				// Report an unexpected ID type as an error rather than panicking inside the errgroup goroutine.
				hostId, ok := id.(types.Binary)
				if !ok {
					return errors.Errorf("can't create sla history trail for host id of type %T", id)
				}

				entity := &SlaHistoryTrail{
					EnvironmentMeta: v1.EnvironmentMeta{EnvironmentId: env},
					HostId:          hostId,
					EventType:       eventType,
					EventTime:       types.UnixMilli(now),
				}

				// Don't block forever on the send if the consumer has already stopped:
				// a cancelled context must be able to terminate this goroutine, as g.Wait() waits for it.
				select {
				case entities <- entity:
				case <-ctx.Done():
					return ctx.Err()
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	return entities
}

// getEnvironmentId returns the ID of the environment that v1.EnvironmentFromContext
// reports for the given context. It returns an error if no environment is available.
func getEnvironmentId(ctx context.Context) (types.Binary, error) {
	if env, ok := v1.EnvironmentFromContext(ctx); ok {
		return env.Id, nil
	}

	return nil, errors.New("can't get environment from context")
}

// Compile-time check that *SlaHistoryTrail satisfies contracts.Entity.
var _ contracts.Entity = (*SlaHistoryTrail)(nil)
68 changes: 66 additions & 2 deletions pkg/icingadb/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"runtime"
"strings"
"time"
)

Expand Down Expand Up @@ -129,9 +130,29 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
entities = delta.Create.Entities(ctx)
}

var createdEntities chan contracts.Entity
onSuccessHandlers := []OnSuccess[contracts.Entity]{OnSuccessIncrement[contracts.Entity](stat)}

switch delta.Subject.Entity().(type) {
case *v1.Host, *v1.Service:
createdEntities = make(chan contracts.Entity)
onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[contracts.Entity](createdEntities))
}

g.Go(func() error {
return s.db.CreateStreamed(ctx, entities, OnSuccessIncrement[contracts.Entity](stat))
if createdEntities != nil {
defer close(createdEntities)
}

return s.db.CreateStreamed(ctx, entities, onSuccessHandlers...)
})

if createdEntities != nil {
s.logger.Infof("Inserting %d items of type %s sla history trails of type create", len(delta.Create), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
g.Go(func() error {
return s.db.CreateStreamed(ctx, CheckableToSlaTrailEntities(ctx, g, createdEntities, "create"))
})
}
}

// Update
Expand Down Expand Up @@ -161,9 +182,52 @@ func (s Sync) ApplyDelta(ctx context.Context, delta *Delta) error {
// Delete
if len(delta.Delete) > 0 {
s.logger.Infof("Deleting %d items of type %s", len(delta.Delete), utils.Key(utils.Name(delta.Subject.Entity()), ' '))
entity := delta.Subject.Entity()
if _, ok := entity.(*v1.Service); ok {
s.logger.Infof("Inserting %d items of type service sla history trails of type delete", len(delta.Delete))
g.Go(func() error {
columns := &SlaServiceHistoryTrailColumns{}
query := s.db.BuildSelectStmt(entity, columns)
if len(delta.Delete) == 1 {
query += ` WHERE id = ?`
} else {
var placeholders []string
for i := 0; i < len(delta.Delete); i++ {
placeholders = append(placeholders, "?")
}

query += fmt.Sprintf(` WHERE id IN (%s)`, strings.Join(placeholders, `, `))
}
entities, err := s.db.YieldAll(ctx, delta.Subject.Factory(), query, false, delta.Delete.IDs()...)
com.ErrgroupReceive(g, err)

return s.db.CreateStreamed(ctx, entities)
})
}

var hostIds chan any
onSuccessHandlers := []OnSuccess[any]{OnSuccessIncrement[any](stat)}

_, isHost := entity.(*v1.Host)
if isHost {
hostIds = make(chan any, 1)
onSuccessHandlers = append(onSuccessHandlers, OnSuccessSendTo[any](hostIds))
}

g.Go(func() error {
return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), OnSuccessIncrement[any](stat))
if isHost {
defer close(hostIds)
}

return s.db.Delete(ctx, delta.Subject.Entity(), delta.Delete.IDs(), onSuccessHandlers...)
})

if isHost {
s.logger.Infof("Inserting %d items of type host sla history trails of type delete", len(delta.Delete))
g.Go(func() error {
return s.db.CreateStreamed(ctx, HostIdsToSlaHistoryTrail(ctx, g, hostIds, "delete"))
})
}
}

return g.Wait()
Expand Down
12 changes: 12 additions & 0 deletions schema/mysql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,18 @@ CREATE TABLE sla_history_downtime (
INDEX idx_sla_history_downtime_env_downtime_end (environment_id, downtime_end) COMMENT 'Filter for sla history retention'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

-- sla_history_trail records create and delete events of hosts and services.
-- service_id defaults to NULL, which is what host events carry.
-- NOTE(review): the Go side fills event_time from types.UnixMilli, so the value is presumably in milliseconds - confirm.
CREATE TABLE sla_history_trail (
id bigint NOT NULL AUTO_INCREMENT,
environment_id binary(20) NOT NULL COMMENT 'environment.id',
host_id binary(20) NOT NULL COMMENT 'host.id (may reference already deleted hosts)',
service_id binary(20) DEFAULT NULL COMMENT 'service.id (may reference already deleted services)',

event_type enum('delete', 'create') NOT NULL,
event_time bigint unsigned NOT NULL COMMENT 'unix timestamp the event occurred',

PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin ROW_FORMAT=DYNAMIC;

CREATE TABLE icingadb_schema (
id int unsigned NOT NULL AUTO_INCREMENT,
version smallint unsigned NOT NULL,
Expand Down
18 changes: 18 additions & 0 deletions schema/pgsql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ CREATE TYPE boolenum AS ENUM ( 'n', 'y' );
CREATE TYPE acked AS ENUM ( 'n', 'y', 'sticky' );
CREATE TYPE state_type AS ENUM ( 'hard', 'soft' );
CREATE TYPE checkable_type AS ENUM ( 'host', 'service' );
CREATE TYPE sla_trail_event_type AS ENUM ( 'create', 'delete' );
CREATE TYPE comment_type AS ENUM ( 'comment', 'ack' );
CREATE TYPE notification_type AS ENUM ( 'downtime_start', 'downtime_end', 'downtime_removed', 'custom', 'acknowledgement', 'problem', 'recovery', 'flapping_start', 'flapping_end' );
CREATE TYPE history_type AS ENUM ( 'notification', 'state_change', 'downtime_start', 'downtime_end', 'comment_add', 'comment_remove', 'flapping_start', 'flapping_end', 'ack_set', 'ack_clear' );
Expand Down Expand Up @@ -2147,6 +2148,23 @@ COMMENT ON COLUMN sla_history_downtime.downtime_id IS 'downtime.id (may referenc
COMMENT ON COLUMN sla_history_downtime.downtime_start IS 'start time of the downtime';
COMMENT ON COLUMN sla_history_downtime.downtime_end IS 'end time of the downtime';

-- sla_history_trail records create and delete events of hosts and services.
-- service_id defaults to NULL, which is what host events carry.
-- NOTE(review): the Go side fills event_time from types.UnixMilli, so the value is presumably in milliseconds - confirm.
CREATE TABLE sla_history_trail (
id bigserial NOT NULL,
environment_id bytea20 NOT NULL,
host_id bytea20 NOT NULL,
service_id bytea20 DEFAULT NULL,

event_type sla_trail_event_type NOT NULL,
event_time biguint NOT NULL,

CONSTRAINT pk_sla_history_trail PRIMARY KEY (id)
);

COMMENT ON COLUMN sla_history_trail.environment_id IS 'environment.id';
COMMENT ON COLUMN sla_history_trail.host_id IS 'host.id (may reference already deleted hosts)';
COMMENT ON COLUMN sla_history_trail.service_id IS 'service.id (may reference already deleted services)';
COMMENT ON COLUMN sla_history_trail.event_time IS 'unix timestamp the event occurred';

CREATE SEQUENCE icingadb_schema_id_seq;

CREATE TABLE icingadb_schema (
Expand Down

0 comments on commit 8492326

Please sign in to comment.