diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index 5a2ff5250..4584ccaf8 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -162,8 +162,8 @@ func run() int { hactx, cancelHactx := context.WithCancel(ctx) for hactx.Err() == nil { select { - case <-ha.Takeover(): - logger.Info("Taking over") + case takeoverReason := <-ha.Takeover(): + logger.Infow("Taking over", zap.String("reason", takeoverReason)) go func() { for hactx.Err() == nil { @@ -324,8 +324,8 @@ func run() int { } } }() - case <-ha.Handover(): - logger.Warn("Handing over") + case handoverReason := <-ha.Handover(): + logger.Warnw("Handing over", zap.String("reason", handoverReason)) cancelHactx() case <-hactx.Done(): diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 74d3b3234..fa43911f9 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -43,8 +43,8 @@ type HA struct { heartbeat *icingaredis.Heartbeat logger *logging.Logger responsible bool - handover chan struct{} - takeover chan struct{} + handover chan string + takeover chan string done chan struct{} errOnce sync.Once errMu sync.Mutex @@ -64,8 +64,8 @@ func NewHA(ctx context.Context, db *DB, heartbeat *icingaredis.Heartbeat, logger db: db, heartbeat: heartbeat, logger: logger, - handover: make(chan struct{}), - takeover: make(chan struct{}), + handover: make(chan string), + takeover: make(chan string), done: make(chan struct{}), } @@ -107,13 +107,13 @@ func (h *HA) Err() error { return h.err } -// Handover returns a channel with which handovers are signaled. -func (h *HA) Handover() chan struct{} { +// Handover returns a channel with which handovers and their reasons are signaled. +func (h *HA) Handover() chan string { return h.handover } -// Takeover returns a channel with which takeovers are signaled. -func (h *HA) Takeover() chan struct{} { +// Takeover returns a channel with which takeovers and their reasons are signaled. +func (h *HA) Takeover() chan string { return h.takeover } @@ -141,10 +141,6 @@ func (h *HA) controller() { oldInstancesRemoved := false - logTicker := time.NewTicker(time.Second * 60) - defer logTicker.Stop() - shouldLog := true - for { select { case m := <-h.heartbeat.Events(): @@ -160,7 +156,7 @@ func (h *HA) controller() { } if tt.Before(now.Add(-1 * timeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) - h.signalHandover() + h.signalHandover("received heartbeat from the past") h.realizeLostHeartbeat() continue } @@ -196,12 +192,6 @@ func (h *HA) controller() { h.environmentMu.Unlock() } - select { - case <-logTicker.C: - shouldLog = true - default: - } - var realizeCtx context.Context var cancelRealizeCtx context.CancelFunc if h.responsible { @@ -209,10 +199,10 @@ func (h *HA) controller() { } else { realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx) } - err = h.realize(realizeCtx, s, t, envId, shouldLog) + err = h.realize(realizeCtx, s, t, envId) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { - h.signalHandover() + h.signalHandover("context deadline exceeded") continue } if err != nil { @@ -223,11 +213,9 @@ func (h *HA) controller() { go h.removeOldInstances(s, envId) oldInstancesRemoved = true } - - shouldLog = false } else { h.logger.Error("Lost heartbeat") - h.signalHandover() + h.signalHandover("lost heartbeat") h.realizeLostHeartbeat() } case <-h.heartbeat.Done(): @@ -240,7 +228,8 @@ func (h *HA) controller() { } } -func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary, shouldLog bool) error { +// realize a HA cycle triggered by a heartbeat event. +func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *types.UnixMilli, envId types.Binary) error { var takeover, otherResponsible bool err := retry.WithBackoff( @@ -271,18 +260,20 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type errQuery := tx.QueryRowxContext( ctx, query, envId, "y", h.instanceId, time.Now().Add(-1*timeout).UnixMilli(), ).StructScan(instance) - switch errQuery { - case nil: + switch { + case errQuery == nil: otherResponsible = true - if shouldLog { - h.logger.Infow("Another instance is active", - zap.String("instance_id", instance.Id.String()), - zap.String("environment", envId.String()), - "heartbeat", instance.Heartbeat, - zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) - } - case sql.ErrNoRows: + h.logger.Debugw("Another instance is active", + zap.String("instance_id", instance.Id.String()), + zap.String("environment", envId.String()), + zap.Time("heartbeat", instance.Heartbeat.Time()), + zap.Duration("heartbeat_age", time.Since(instance.Heartbeat.Time()))) + case errors.Is(errQuery, sql.ErrNoRows): takeover = true + h.logger.Debugw("No other instance is active, consider myself as active", + zap.Bool("responsible", h.responsible), + zap.String("instance_id", h.instanceId.String()), + zap.String("environment", envId.String())) default: return internal.CantPerformQuery(errQuery, query) } @@ -355,7 +346,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return errors.Wrap(err, "can't insert environment") } - h.signalTakeover() + h.signalTakeover("no other instance is active") } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { state.otherResponsible = true @@ -366,6 +357,7 @@ func (h *HA) realize(ctx context.Context, s *icingaredisv1.IcingaStatus, t *type return nil } +// realizeLostHeartbeat updates "responsible = n" for this HA into the database. func (h *HA) realizeLostHeartbeat() { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE id = ?") if _, err := h.db.ExecContext(h.ctx, stmt, "n", h.instanceId); err != nil && !utils.IsContextCanceled(err) { @@ -421,7 +413,8 @@ func (h *HA) removeOldInstances(s *icingaredisv1.IcingaStatus, envId types.Binar } } -func (h *HA) signalHandover() { +// signalHandover gives up HA.responsible and notifies the HA.Handover chan. +func (h *HA) signalHandover(reason string) { if h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -430,7 +423,7 @@ func (h *HA) signalHandover() { }) select { - case h.handover <- struct{}{}: + case h.handover <- reason: h.responsible = false case <-h.ctx.Done(): // Noop @@ -438,7 +431,8 @@ func (h *HA) signalHandover() { } } -func (h *HA) signalTakeover() { +// signalTakeover claims HA.responsible and notifies the HA.Takeover chan. +func (h *HA) signalTakeover(reason string) { if !h.responsible { h.state.Store(haState{ responsibleTsMilli: time.Now().UnixMilli(), @@ -447,7 +441,7 @@ func (h *HA) signalTakeover() { }) select { - case h.takeover <- struct{}{}: + case h.takeover <- reason: h.responsible = true case <-h.ctx.Done(): // Noop