diff --git a/cmd/icingadb/main.go b/cmd/icingadb/main.go index d5fe38f1a..5fa54071c 100644 --- a/cmd/icingadb/main.go +++ b/cmd/icingadb/main.go @@ -325,7 +325,10 @@ func run() int { cancelHactx() case <-hactx.Done(): - // Nothing to do here, surrounding loop will terminate now. + if ctx.Err() != nil { + logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) + } + // Otherwise, there is nothing to do here, surrounding loop will terminate now. case <-ha.Done(): if err := ha.Err(); err != nil { logger.Fatalf("%+v", errors.Wrap(err, "HA exited with an error")) @@ -337,8 +340,6 @@ func run() int { cancelHactx() return ExitFailure - case <-ctx.Done(): - logger.Fatalf("%+v", errors.New("main context closed unexpectedly")) case s := <-sig: logger.Infow("Exiting due to signal", zap.String("signal", s.String())) cancelHactx() diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 6460ac32d..ea0404601 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -170,7 +170,7 @@ func (h *HA) controller() { } tt := t.Time() if tt.After(now.Add(1 * time.Second)) { - h.logger.Debugw("Received heartbeat from the future", zap.Time("time", tt)) + h.logger.Warnw("Received heartbeat from the future", zap.Time("time", tt)) } if tt.Before(now.Add(-1 * peerTimeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) @@ -264,6 +264,9 @@ func (h *HA) controller() { // realize a HA cycle triggered by a heartbeat event. // +// The context passed is expected to have a deadline, otherwise the method will panic. This deadline is strictly +// enforced to abort the realization logic the moment the context expires. +// // shouldLogRoutineEvents indicates if recurrent events should be logged. func (h *HA) realize( ctx context.Context, @@ -300,6 +303,7 @@ func (h *HA) realize( if errBegin != nil { return errors.Wrap(errBegin, "can't start transaction") } + defer func() { _ = tx.Rollback() }() query := h.db.Rebind("SELECT id, heartbeat FROM icingadb_instance "+ "WHERE environment_id = ? AND responsible = ? AND id <> ?") + selectLock @@ -370,9 +374,14 @@ func (h *HA) realize( if takeover != "" { stmt := h.db.Rebind("UPDATE icingadb_instance SET responsible = ? WHERE environment_id = ? AND id <> ?") - _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId) + if _, err := tx.ExecContext(ctx, stmt, "n", envId, h.instanceId); err != nil { + return database.CantPerformQuery(err, stmt) + } - if err != nil { + // Insert the environment after each heartbeat takeover if it does not already exist in the database + // as the environment may have changed, although this is likely to happen very rarely. + stmt, _ = h.db.BuildInsertIgnoreStmt(h.environment) + if _, err := h.db.NamedExecContext(ctx, stmt, h.environment); err != nil { return database.CantPerformQuery(err, stmt) } } @@ -386,7 +395,7 @@ func (h *HA) realize( retry.Retryable, backoff.NewExponentialWithJitter(256*time.Millisecond, 3*time.Second), retry.Settings{ - // Intentionally no timeout is set, as we use a context with a deadline. + // Intentionally, no timeout is set because a context with a deadline is used and QuickContextExit is set. OnRetryableError: func(_ time.Duration, attempt uint64, err, lastErr error) { if lastErr == nil || err.Error() != lastErr.Error() { log := h.logger.Debugw @@ -413,6 +422,7 @@ func (h *HA) realize( zap.NamedError("recovered_error", lastErr)) } }, + QuickContextExit: true, }, ) if err != nil { @@ -420,12 +430,6 @@ func (h *HA) realize( } if takeover != "" { - // Insert the environment after each heartbeat takeover if it does not already exist in the database - // as the environment may have changed, although this is likely to happen very rarely. - if err := h.insertEnvironment(); err != nil { - return errors.Wrap(err, "can't insert environment") - } - h.signalTakeover(takeover) } else if otherResponsible { if state, _ := h.state.Load(); !state.otherResponsible { @@ -445,18 +449,6 @@ func (h *HA) realizeLostHeartbeat() { } } -// insertEnvironment inserts the environment from the specified state into the database if it does not already exist. -func (h *HA) insertEnvironment() error { - // Instead of checking whether the environment already exists, use an INSERT statement that does nothing if it does. - stmt, _ := h.db.BuildInsertIgnoreStmt(h.environment) - - if _, err := h.db.NamedExecContext(h.ctx, stmt, h.environment); err != nil { - return database.CantPerformQuery(err, stmt) - } - - return nil -} - func (h *HA) removeInstance(ctx context.Context) { h.logger.Debugw("Removing our row from icingadb_instance", zap.String("instance_id", hex.EncodeToString(h.instanceId))) // Intentionally not using h.ctx here as it's already cancelled.