diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index d5fe38f1a..5fa54071c 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -325,7 +325,10 @@ func run() int { cancelHactx() case <-hactx.Done(): - // Nothing to do here, surrounding loop will terminate now. + if ctx.Err() != nil { + logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) + } + // Otherwise, there is nothing to do here, surrounding loop will terminate now. case <-ha.Done(): if err := ha.Err(); err != nil { logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error")) @@ -337,8 +340,6 @@ func run() int { cancelHactx() return ExitFailure - case <-ctx.Done(): - logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) case s := <-sig: logger.Infow("Exiting due to signal", zap.String("signal", s.String())) cancelHactx() diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 6460ac32d..ad1f752db 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -170,7 +170,7 @@ func (h *HA) controller() { } tt := t.Time() if tt.After(now.Add(1 * time.Second)) { - h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) + h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt)) } if tt.Before(now.Add(-1 * peerTimeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) @@ -218,7 +218,7 @@ func (h *HA) controller() { // Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time. realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) - err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) + err = h.realize(realizeCtx, s, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time") @@ -264,11 +264,16 @@ func (h *HA) controller() { // realize a HA cycle triggered by a heartbeat event. // +// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly +// enforced to abort the realization logic the moment the context expires. +// // shouldLogRoutineEvents indicates if recurrent events should be logged. +// +// The internal, retryable function always fetches the last received heartbeat's timestamp instead of reusing the one +// from the calling controller loop. Doing so results in inserting a more accurate timestamp if a retry happens. func (h *HA) realize( ctx context.Context, s *icingaredisv1.IcingaStatus, - t *types.UnixMilli, envId types.Binary, shouldLogRoutineEvents bool, ) error { @@ -300,6 +305,7 @@ func (h *HA) realize( if errBegin != nil { return errors.Wrap(errBegin, "can't start transaction") } + defer func() { _ = tx.Rollback() }() query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+ "WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock @@ -350,7 +356,7 @@ func (h *HA) realize( EnvironmentMeta: v1.EnvironmentMeta{ EnvironmentId: envId, }, - Heartbeat: *t, + Heartbeat: types.UnixMilli(time.UnixMilli(h.heartbeat.LastMessageTime())), Responsible: types.Bool{Bool: takeover != "" || h.responsible, Valid: true}, EndpointId: s.EndpointId, Icinga2Version: s.Version, @@ -370,15 +376,51 @@ func (h *HA) realize( if takeover != "" { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") - _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) + if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil { + return database.CantPerformQuery(err, stmt) + } - if err != nil { + // Insert the environment after each heartbeat takeover if it does not already exist in the database + // as the environment may have changed, although this is likely to happen very rarely. + stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment) + if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil { return database.CantPerformQuery(err, stmt) } } - if err := tx.Commit(); err != nil { - return errors.Wrap(err, "can't commit transaction") + // In general, cancellation does not work for COMMIT and ROLLBACK. Some database drivers may support a + // context-based abort, but only if the DBMS allows it. This was also discussed in the initial issue about + // context support to Go's sql package: https://github.com/golang/go/issues/15123#issuecomment-245882486 + // + // This paragraph is implementation knowledge, not covered by the API specification. Go's sql.Tx.Commit() - + // which is not being overridden by sqlx.Tx - performs a preflight check on the context before handing over + // to the driver's Commit() method. Drivers may behave differently. For example, the used + // github.com/go-sql-driver/mysql package calls its internal exec() method with a COMMIT query, writing and + // reading packets without honoring the context. + // + // In a nutshell, one cannot expect a Tx.Commit() call to be covered by the transaction context. For this + // reason, the following Commit() call has been moved to its own goroutine, which communicates back via a + // channel selected along with the context. If the context ends before Commit(), this retryable function + // returns with a non-retryable error. + // + // However, while the COMMIT continues in the background, it may still succeed. In this case, the state of + // the database does not match the state of Icinga DB, specifically the database says that this instance is + // active while this instance thinks otherwise. Fortunately, this mismatch is not critical because when this + // function is re-entered, the initial SELECT query would be empty for this Icinga DB node and imply the + // presence of another active instance for the other node. Effectively, this could result in a single HA + // cycle with no active node. Afterwards, either this instance takes over due to the false impression that + // no other node is active, or the other instances does so as the inserted heartbeat has already expired. + // Not great, not terrible. + commitErrCh := make(chan error, 1) + go func() { commitErrCh <- tx.Commit() }() + + select { + case err := <-commitErrCh: + if err != nil { + return errors.Wrap(err, "can't commit transaction") + } + case <-ctx.Done(): + return ctx.Err() } return nil @@ -420,12 +462,6 @@ func (h *HA) realize( } if takeover != "" { - // Insert the environment after each heartbeat takeover if it does not already exist in the database - // as the environment may have changed, although this is likely to happen very rarely. - if err := h.insertEnvironment(); err != nil { - return errors.Wrap(err, "can't insert environment") - } - h.signalTakeover(takeover) } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { @@ -445,18 +481,6 @@ func (h *HA) realizeLostHeartbeat() { } } -// insertEnvironment inserts the environment from the specified state into the database if it does not already exist. -func (h *HA) insertEnvironment() error { - // Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does. - stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment) - - if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil { - return database.CantPerformQuery(err, stmt) - } - - return nil -} - func (h *HA) removeInstance(ctx context.Context) { h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId))) // Intentionally not using h.ctx here as it's already cancelled. diff --git a/pkg/icingaredis/heartbeat.go b/pkg/icingaredis/heartbeat.go index 840445a23..5f7a03c07 100644 --- a/pkg/icingaredis/heartbeat.go +++ b/pkg/icingaredis/heartbeat.go @@ -25,6 +25,7 @@ type Heartbeat struct { active bool events chan *HeartbeatMessage lastReceivedMs int64 + lastMessageMs atomic.Int64 cancelCtx context.CancelFunc client *redis.Client done chan struct{} @@ -62,6 +63,11 @@ func (h *Heartbeat) LastReceived() int64 { return atomic.LoadInt64(&h.lastReceivedMs) } +// LastMessageTime returns the last message's time in ms. +func (h *Heartbeat) LastMessageTime() int64 { + return h.lastMessageMs.Load() +} + // Close stops the heartbeat controller loop, waits for it to finish, and returns an error if any. // Implements the io.Closer interface. func (h *Heartbeat) Close() error { @@ -139,6 +145,15 @@ func (h *Heartbeat) controller(ctx context.Context) { } atomic.StoreInt64(&h.lastReceivedMs, m.received.UnixMilli()) + + statsT, err := m.stats.Time() + if err != nil { + h.logger.Warnw("Received Icinga heartbeat with invalid stats time", zap.Error(err)) + h.lastMessageMs.Store(0) + } else { + h.lastMessageMs.Store(statsT.Time().UnixMilli()) + } + h.sendEvent(m) case <-time.After(Timeout): if h.active { @@ -150,6 +165,7 @@ func (h *Heartbeat) controller(ctx context.Context) { } atomic.StoreInt64(&h.lastReceivedMs, 0) + h.lastMessageMs.Store(0) case <-ctx.Done(): return ctx.Err() }