diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 1c22afc79..14de96c10 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -161,8 +161,8 @@ func run() int { hactx, cancelHactx := context.WithCancel(ctx) for hactx.Err() == nil { select { - case <-ha.Takeover(): - logger.Info("Taking over") + case takeoverReason := <-ha.Takeover(): + logger.Infow("Taking over", zap.String("reason", takeoverReason)) go func() { for hactx.Err() == nil { @@ -323,8 +323,8 @@ func run() int { } } }() - case <-ha.Handover(): - logger.Warn("Handing over") + case handoverReason := <-ha.Handover(): + logger.Warnw("Handing over", zap.String("reason", handoverReason)) cancelHactx() case <-hactx.Done(): diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 74d3b3234..aa4fcb2c6 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -43,8 +43,8 @@ type HA struct { heartbeat *icingaredis.Heartbeat logger *logging.Logger responsible bool - handover chan struct{} - takeover chan struct{} + handover chan string + takeover chan string done chan struct{} errOnce sync.Once errMu sync.Mutex @@ -64,8 +64,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger db: db, heartbeat: heartbeat, logger: logger, - handover: make(chan struct{}), - takeover: make(chan struct{}), + handover: make(chan string), + takeover: make(chan string), done: make(chan struct{}), } @@ -107,13 +107,13 @@ func (h *HA) Err() error { return h.err } -// Handover returns a channel with which handovers are signaled. -func (h *HA) Handover() chan struct{} { +// Handover returns a channel with which handovers and their reasons are signaled. +func (h *HA) Handover() chan string { return h.handover } -// Takeover returns a channel with which takeovers are signaled. -func (h *HA) Takeover() chan struct{} { +// Takeover returns a channel with which takeovers and their reasons are signaled. +func (h *HA) Takeover() chan string { return h.takeover } @@ -141,9 +141,10 @@ func (h *HA) controller() { oldInstancesRemoved := false - logTicker := time.NewTicker(time.Second * 60) - defer logTicker.Stop() - shouldLog := true + // Suppress recurring log messages in the realize method to be only logged this often. + routineLogTicker := time.NewTicker(time.Second * 60) + defer routineLogTicker.Stop() + shouldLogRoutineEvents := true for { select { @@ -160,7 +161,7 @@ func (h *HA) controller() { } if tt.Before(now.Add(-1 * timeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) - h.signalHandover() + h.signalHandover("received heartbeat from the past") h.realizeLostHeartbeat() continue } @@ -197,8 +198,8 @@ func (h *HA) controller() { } select { - case <-logTicker.C: - shouldLog = true + case <-routineLogTicker.C: + shouldLogRoutineEvents = true default: } @@ -209,10 +210,10 @@ func (h *HA) controller() { } else { realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx) } - err = h.realize(realizeCtx, s, t, envId, shouldLog) + err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { - h.signalHandover() + h.signalHandover("context deadline exceeded") continue } if err != nil { @@ -224,10 +225,10 @@ func (h *HA) controller() { oldInstancesRemoved = true } - shouldLog = false + shouldLogRoutineEvents = false } else { h.logger.Error("Lost heartbeat") - h.signalHandover() + h.signalHandover("lost heartbeat") h.realizeLostHeartbeat() } case <-h.heartbeat.Done(): @@ -240,13 +241,25 @@ func (h *HA) controller() { } } -func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { - var takeover, otherResponsible bool +// realize a HA cycle triggered by a heartbeat event. +// +// shouldLogRoutineEvents indicates if recurrent events should be logged. +func (h *HA) realize( + ctx context.Context, + s *icingaredisv1.IcingaStatus, + t *types.UnixMilli, + envId types.Binary, + shouldLogRoutineEvents bool, +) error { + var ( + takeover string + otherResponsible bool + ) err := retry.WithBackoff( ctx, func(ctx context.Context) error { - takeover = false + takeover = "" otherResponsible = false isoLvl := sql.LevelSerializable selectLock := "" @@ -274,15 +287,25 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type switch errQuery { case nil: otherResponsible = true - if shouldLog { + if shouldLogRoutineEvents { h.logger.Infow("Another instance is active", zap.String("instance_id", instance.Id.String()), zap.String("environment", envId.String()), - "heartbeat", instance.Heartbeat, + zap.Time("heartbeat", instance.Heartbeat.Time()), zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) } case sql.ErrNoRows: - takeover = true + fields := []any{ + zap.String("instance_id", instance.Id.String()), + zap.String("environment", envId.String()), + zap.Duration("heartbeat_timeout", timeout)} + + if !h.responsible { + h.logger.Infow("Preparing to take over HA as no instance is active", fields...) + } else if h.responsible && shouldLogRoutineEvents { + h.logger.Infow("Continuing being the active instance", fields...) + } + takeover = "no other instance is active" default: return internal.CantPerformQuery(errQuery, query) } @@ -297,7 +320,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type EnvironmentId: envId, }, Heartbeat: *t, - Responsible: types.Bool{Bool: takeover || h.responsible, Valid: true}, + Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true}, EndpointId: s.EndpointId, Icinga2Version: s.Version, Icinga2StartTime: s.ProgramStart, @@ -314,7 +337,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return internal.CantPerformQuery(err, stmt) } - if takeover { + if takeover != "" { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) @@ -348,14 +371,14 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return err } - if takeover { + if takeover != "" { // Insert the environment after each heartbeat takeover if it does not already exist in the database // as the environment may have changed, although this is likely to happen very rarely. if err := h.insertEnvironment(); err != nil { return errors.Wrap(err, "can't insert environment") } - h.signalTakeover() + h.signalTakeover(takeover) } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { state.otherResponsible = true @@ -366,6 +389,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return nil } +// realizeLostHeartbeat updates "responsible = n" for this HA into the database. func (h *HA) realizeLostHeartbeat() { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?") if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) { @@ -421,7 +445,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar } } -func (h *HA) signalHandover() { +// signalHandover gives up HA.responsible and notifies the HA.Handover chan. +func (h *HA) signalHandover(reason string) { if h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -430,7 +455,7 @@ func (h *HA) signalHandover() { }) select { - case h.handover <- struct{}{}: + case h.handover <- reason: h.responsible = false case <-h.ctx.Done(): // Noop @@ -438,7 +463,8 @@ func (h *HA) signalHandover() { } } -func (h *HA) signalTakeover() { +// signalTakeover claims HA.responsible and notifies the HA.Takeover chan. +func (h *HA) signalTakeover(reason string) { if !h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -447,7 +473,7 @@ func (h *HA) signalTakeover() { }) select { - case h.takeover <- struct{}{}: + case h.takeover <- reason: h.responsible = true case <-h.ctx.Done(): // Noop