From 3e78b9210efc0a33f0a538cb797f7adb7bd6f24c Mon Sep 17 00:00:00 2001 From: Alvar Penning Date: Mon, 23 Sep 2024 15:12:02 +0200 Subject: [PATCH] Abort HA Realization Logic After Timeout A strange HA behavior was reported in #787, resulting in both instances being active. The logs contained an entry of the previous active instance exiting the HA.realize() method successfully after 1m9s. This, however, should not be possible as the method's context is deadlined to a minute after the heartbeat was received. However, as it turns out, executing COMMIT on a database transaction is not bound to the transaction's context, allowing to survive longer. To mitigate this, another context watch was introduced. Doing so allows directly handing over, while the other instance can now take over due to the expired heartbeat in the database. As a related change, the HA.insertEnvironment() method was inlined into the retryable function to use the deadlined context. Otherwise, this might block afterwards, as it was used within HA.realize(), but without the passed context. Since the retryable HA function may be executed a few times before succeeding, the inserted heartbeat value will be directly outdated. The heartbeat logic was slightly altered to always use the latest heartbeat time value. In addition, the main loop select cases for hactx.Done() and ctx.Done() were unified, as hactx is a derived ctx. A closed ctx case may be lost as the hactx case could have been chosen. --- cmd/icingadb/main.go | 7 ++-- pkg/icingadb/ha.go | 76 ++++++++++++++++++++++++------------ pkg/icingaredis/heartbeat.go | 16 ++++++++ 3 files changed, 70 insertions(+), 29 deletions(-) diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index d5fe38f1a..5fa54071c 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -325,7 +325,10 @@ func run() int { cancelHactx() case <-hactx.Done(): - // Nothing to do here, surrounding loop will terminate now. + if ctx.Err() != nil { + logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) + } + // Otherwise, there is nothing to do here, surrounding loop will terminate now. case <-ha.Done(): if err := ha.Err(); err != nil { logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error")) @@ -337,8 +340,6 @@ func run() int { cancelHactx() return ExitFailure - case <-ctx.Done(): - logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) case s := <-sig: logger.Infow("Exiting due to signal", zap.String("signal", s.String())) cancelHactx() diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 6460ac32d..2775be1ae 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -170,7 +170,7 @@ func (h *HA) controller() { } tt := t.Time() if tt.After(now.Add(1 * time.Second)) { - h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) + h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt)) } if tt.Before(now.Add(-1 * peerTimeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) @@ -218,7 +218,7 @@ func (h *HA) controller() { // Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time. realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) - err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) + err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time") @@ -264,11 +264,16 @@ func (h *HA) controller() { // realize a HA cycle triggered by a heartbeat event. // +// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly +// enforced to abort the realization logic the moment the context expires. +// // shouldLogRoutineEvents indicates if recurrent events should be logged. +// +// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one +// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens. func (h *HA) realize( ctx context.Context, s *icingaredisv1.IcingaStatus, - t *types.UnixMilli, envId types.Binary, shouldLogRoutineEvents bool, ) error { @@ -300,6 +305,7 @@ func (h *HA) realize( if errBegin != nil { return errors.Wrap(errBegin, "can't start transaction") } + defer func() { _ = tx.Rollback() }() query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+ "WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock @@ -350,7 +356,7 @@ func (h *HA) realize( EnvironmentMeta: v1.EnvironmentMeta{ EnvironmentId: envId, }, - Heartbeat: *t, + Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessage())), Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true}, EndpointId: s.EndpointId, Icinga2Version: s.Version, @@ -370,15 +376,51 @@ func (h *HA) realize( if takeover != "" { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") - _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) + if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil { + return database.CantPerformQuery(err, stmt) + } - if err != nil { + // Insert the environment after each heartbeat takeover if it does not already exist in the database + // as the environment may have changed, although this is likely to happen very rarely. + stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment) + if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil { return database.CantPerformQuery(err, stmt) } } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "can't commit transaction") + // In general, cancellation does not work for COMMIT and ROLLBACK. Some database drivers may support a + // context-based abort, but only if the DBMS allows it. This was also discussed in the initial issue about + // context support to Go's sql package: https://github.com/golang/go/issues/15123#issuecomment-245882486 + // + // This paragraph is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() - + // which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over + // to the driver's Commit() method. Drivers may behave differently. For example, the used + // github.com/go-sql-driver/mysql package calls its internal exec() method with a COMMIT query, writing and + // reading packets without honoring the context. + // + // In a nutshell, one cannot expect a Tx.Commit() call to be covered by the transaction context. For this + // reason, the following Commit() call has been moved to its own goroutine, which communicates back via a + // channel selected along with the context. If the context ends before Commit(), this retryable function + // returns with a non-retryable error. + // + // However, while the COMMIT continues in the background, it may still succeed. In this case, the state of + // the database does not match the state of Icinga DB, specifically the database says that this instance is + // active while this instance thinks otherwise. Fortunately, this mismatch is not critical because when this + // function is re-entered, the initial SELECT query would be empty for this Icinga DB node and imply the + // presence of another active instance for the other node. Effectively, this could result in a single HA + // cycle with no active node. Afterwards, either this instance takes over due to the false impression that + // no other node is active, or the other instances does so as the inserted heartbeat has already expired. + // Not great, not terrible. + commitErrCh := make(chan error, 1) + go func() { commitErrCh <- tx.Commit() }() + + select { + case err := <-commitErrCh: + if err != nil { + return errors.Wrap(err, "can't commit transaction") + } + case <-ctx.Done(): + return ctx.Err() } return nil @@ -420,12 +462,6 @@ func (h *HA) realize( } if takeover != "" { - // Insert the environment after each heartbeat takeover if it does not already exist in the database - // as the environment may have changed, although this is likely to happen very rarely. - if err := h.insertEnvironment(); err != nil { - return errors.Wrap(err, "can't insert environment") - } - h.signalTakeover(takeover) } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { @@ -445,18 +481,6 @@ func (h *HA) realizeLostHeartbeat() { } } -// insertEnvironment inserts the environment from the specified state into the database if it does not already exist. -func (h *HA) insertEnvironment() error { - // Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does. - stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment) - - if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil { - return database.CantPerformQuery(err, stmt) - } - - return nil -} - func (h *HA) removeInstance(ctx context.Context) { h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId))) // Intentionally not using h.ctx here as it's already cancelled. diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 840445a23..bcda4d47d 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -25,6 +25,7 @@ type Heartbeat struct { active bool events chan *HeartbeatMessage lastReceivedMs int64 + lastMessageMs int64 cancelCtx context.CancelFunc client *redis.Client done chan struct{} @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 { return atomic.LoadInt64(&h.lastReceivedMs) } +// LastMessage returns the last message's time in ms. +func (h *Heartbeat) LastMessage() int64 { + return atomic.LoadInt64(&h.lastMessageMs) +} + // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. // Implements the io.Closer interface. func (h *Heartbeat) Close() error { @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) { } atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) + + statsT, err := m.stats.Time() + if err != nil { + h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err)) + atomic.StoreInt64(&h.lastMessageMs, 0) + } else { + atomic.StoreInt64(&h.lastMessageMs, statsT.Time().UnixMilli()) + } + h.sendEvent(m) case <-time.After(Timeout): if h.active { @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) { } atomic.StoreInt64(&h.lastReceivedMs, 0) + atomic.StoreInt64(&h.lastMessageMs, 0) case <-ctx.Done(): return ctx.Err() }