diff --git a/pkg/config/redis.go b/pkg/config/redis.go index dbb21a709..ad8b31a60 100644 --- a/pkg/config/redis.go +++ b/pkg/config/redis.go @@ -85,16 +85,16 @@ func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), retry.Settings{ - Timeout: 5 * time.Minute, - OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + Timeout: retry.DefaultTimeout, + OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) { if lastErr == nil || err.Error() != lastErr.Error() { logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err)) } }, OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { - if attempt > 0 { + if attempt > 1 { logger.Infow("Reconnected to Redis", - zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) + zap.Duration("after", elapsed), zap.Uint64("attempts", attempt)) } }, }, diff --git a/pkg/icingadb/cleanup.go b/pkg/icingadb/cleanup.go index 15d36e86f..22bf02d6e 100644 --- a/pkg/icingadb/cleanup.go +++ b/pkg/icingadb/cleanup.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "github.com/icinga/icingadb/internal" + "github.com/icinga/icingadb/pkg/backoff" "github.com/icinga/icingadb/pkg/com" + "github.com/icinga/icingadb/pkg/retry" "github.com/icinga/icingadb/pkg/types" "time" ) @@ -40,32 +42,46 @@ func (db *DB) CleanupOlderThan( count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}], ) (uint64, error) { var counter com.Counter - defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop() + + q := db.Rebind(stmt.Build(db.DriverName(), count)) + + defer db.log(ctx, q, &counter).Stop() for { - q := db.Rebind(stmt.Build(db.DriverName(), count)) - rs, err := db.NamedExecContext(ctx, q, cleanupWhere{ - EnvironmentId: envId, - Time: types.UnixMilli(olderThan), - }) - if err != nil { - return 0, internal.CantPerformQuery(err, q) - } + var rowsDeleted int64 + + err := retry.WithBackoff( + ctx, + 
func(ctx context.Context) error { + rs, err := db.NamedExecContext(ctx, q, cleanupWhere{ + EnvironmentId: envId, + Time: types.UnixMilli(olderThan), + }) + if err != nil { + return internal.CantPerformQuery(err, q) + } + + rowsDeleted, err = rs.RowsAffected() - n, err := rs.RowsAffected() + return err + }, + retry.Retryable, + backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), + db.getDefaultRetrySettings(), + ) if err != nil { return 0, err } - counter.Add(uint64(n)) + counter.Add(uint64(rowsDeleted)) for _, onSuccess := range onSuccess { - if err := onSuccess(ctx, make([]struct{}, n)); err != nil { + if err := onSuccess(ctx, make([]struct{}, rowsDeleted)); err != nil { return 0, err } } - if n < int64(count) { + if rowsDeleted < int64(count) { break } } diff --git a/pkg/icingadb/db.go b/pkg/icingadb/db.go index 9bb480ba2..4c575e3ba 100644 --- a/pkg/icingadb/db.go +++ b/pkg/icingadb/db.go @@ -13,6 +13,7 @@ import ( "github.com/icinga/icingadb/pkg/utils" "github.com/jmoiron/sqlx" "github.com/pkg/errors" + "go.uber.org/zap" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "reflect" @@ -346,7 +347,7 @@ func (db *DB) BulkExec( }, retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), - retry.Settings{}, + db.getDefaultRetrySettings(), ) } }(b)) @@ -411,7 +412,7 @@ func (db *DB) NamedBulkExec( }, retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), - retry.Settings{}, + db.getDefaultRetrySettings(), ) } }(b)) @@ -484,7 +485,7 @@ func (db *DB) NamedBulkExecTx( }, retry.Retryable, backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second), - retry.Settings{}, + db.getDefaultRetrySettings(), ) } }(b)) @@ -670,6 +671,25 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted { } } +func (db *DB) getDefaultRetrySettings() retry.Settings { + return retry.Settings{ + Timeout: retry.DefaultTimeout, + OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr 
error) { + if lastErr == nil || err.Error() != lastErr.Error() { + db.logger.Warnw("Can't execute query. Retrying", zap.Error(err)) + } + }, + OnSuccess: func(elapsed time.Duration, attempt uint64, lastErr error) { + if attempt > 1 { + db.logger.Infow("Query retried successfully after error", + zap.Duration("after", elapsed), + zap.Uint64("attempts", attempt), + zap.NamedError("recovered_error", lastErr)) + } + }, + } +} + func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper { return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) { if count := counter.Reset(); count > 0 { diff --git a/pkg/icingadb/driver.go b/pkg/icingadb/driver.go index e2712ca83..ac5af7e1d 100644 --- a/pkg/icingadb/driver.go +++ b/pkg/icingadb/driver.go @@ -55,8 +55,8 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { shouldRetry, backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1), retry.Settings{ - Timeout: time.Minute * 5, - OnError: func(_ time.Duration, _ uint64, err, lastErr error) { + Timeout: retry.DefaultTimeout, + OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) { telemetry.UpdateCurrentDbConnErr(err) if lastErr == nil || err.Error() != lastErr.Error() { @@ -66,9 +66,9 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) { OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) { telemetry.UpdateCurrentDbConnErr(nil) - if attempt > 0 { + if attempt > 1 { c.logger.Infow("Reconnected to database", - zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1)) + zap.Duration("after", elapsed), zap.Uint64("attempts", attempt)) } }, }, diff --git a/pkg/icingadb/ha.go b/pkg/icingadb/ha.go index 7e821c76a..68a2a68df 100644 --- a/pkg/icingadb/ha.go +++ b/pkg/icingadb/ha.go @@ -148,8 +148,19 @@ func (h *HA) controller() { defer routineLogTicker.Stop() shouldLogRoutineEvents := true + // The retry logic in HA is twofold: + // + // 1) 
Updating or inserting the instance row based on the current heartbeat must be done within the heartbeat's + // expiration time. Therefore, we use a deadline ctx to retry.WithBackoff() in realize() which expires earlier + // than our default timeout. + // 2) Since we do not want to exit before our default timeout expires, we have to repeat step 1 until it does. + retryTimeout := time.NewTimer(retry.DefaultTimeout) + defer retryTimeout.Stop() + for { select { + case <-retryTimeout.C: + h.abort(errors.New("retry deadline exceeded")) case m := <-h.heartbeat.Events(): if m != nil { now := time.Now() @@ -163,8 +174,13 @@ func (h *HA) controller() { } if tt.Before(now.Add(-1 * peerTimeout)) { h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt)) + h.signalHandover("received heartbeat from the past") h.realizeLostHeartbeat() + + // Reset retry timeout so that the next iterations have the full amount of time available again. + retry.ResetTimeout(retryTimeout, retry.DefaultTimeout) + continue } s, err := m.Stats().IcingaStatus() @@ -200,17 +216,17 @@ func (h *HA) controller() { default: } - var realizeCtx context.Context - var cancelRealizeCtx context.CancelFunc - if h.responsible { - realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime()) - } else { - realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx) - } + // Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time. + realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime()) err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents) cancelRealizeCtx() if errors.Is(err, context.DeadlineExceeded) { - h.signalHandover("context deadline exceeded") + h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time") + + // Instance insert/update was not completed by the expiration time of the current heartbeat. 
+ // Pass control back to the loop to try again with the next heartbeat, + // or exit the loop when the retry timeout has expired. Therefore, + // retry timeout is **not** reset here so that retries continue until the timeout has expired. continue } if err != nil { @@ -228,6 +244,14 @@ func (h *HA) controller() { h.signalHandover("lost heartbeat") h.realizeLostHeartbeat() } + + // Reset retry timeout so that the next iterations have the full amount of time available again. + // Don't be surprised by the location of the code, + // as it is obvious that the timer is also reset after an error that ends the loop anyway. + // But this is the best place to catch all scenarios where the timeout needs to be reset. + // And since HA needs quite a bit of refactoring anyway to e.g. return immediately after calling h.abort(), + // it's fine to have it here for now. + retry.ResetTimeout(retryTimeout, retry.DefaultTimeout) case <-h.heartbeat.Done(): if err := h.heartbeat.Err(); err != nil { h.abort(err) @@ -253,6 +277,10 @@ func (h *HA) realize( otherResponsible bool ) + if _, ok := ctx.Deadline(); !ok { + panic("can't use context w/o deadline in realize()") + } + err := retry.WithBackoff( ctx, func(ctx context.Context) error { @@ -358,14 +386,31 @@ func (h *HA) realize( retry.Retryable, backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3), retry.Settings{ - OnError: func(_ time.Duration, attempt uint64, err, lastErr error) { + // Intentionally no timeout is set, as we use a context with a deadline. + OnRetryableError: func(_ time.Duration, attempt uint64, err, lastErr error) { if lastErr == nil || err.Error() != lastErr.Error() { log := h.logger.Debugw - if attempt > 2 { + if attempt > 3 { + log = h.logger.Infow + } + + log("Can't update or insert instance. 
Retrying", zap.Error(err)) + } + }, + OnSuccess: func(elapsed time.Duration, attempt uint64, lastErr error) { + if attempt > 1 { + log := h.logger.Debugw + + if attempt > 4 { + // We log errors with severity info starting from the fourth attempt, (see above) + // so we need to log success with severity info from the fifth attempt. log = h.logger.Infow } - log("Can't update or insert instance. Retrying", zap.Error(err), zap.Uint64("retry count", attempt)) + log("Instance updated/inserted successfully after error", + zap.Duration("after", elapsed), + zap.Uint64("attempts", attempt), + zap.NamedError("recovered_error", lastErr)) } }, }, diff --git a/pkg/retry/retry.go b/pkg/retry/retry.go index 32d3a615e..fd17eefc4 100644 --- a/pkg/retry/retry.go +++ b/pkg/retry/retry.go @@ -8,11 +8,13 @@ import ( "github.com/lib/pq" "github.com/pkg/errors" "net" - "strings" "syscall" "time" ) +// DefaultTimeout is our opinionated default timeout for retrying database and Redis operations. +const DefaultTimeout = 5 * time.Minute + // RetryableFunc is a retryable function. type RetryableFunc func(context.Context) error @@ -21,10 +23,15 @@ type IsRetryable func(error) bool // Settings aggregates optional settings for WithBackoff. type Settings struct { - // Timeout lets WithBackoff give up once elapsed (if >0). + // If >0, Timeout lets WithBackoff stop retrying gracefully once elapsed based on the following criteria: + // * If the execution of RetryableFunc has taken longer than Timeout, no further attempts are made. + // * If Timeout elapses during the sleep phase between retries, one final retry is attempted. + // * RetryableFunc is always granted its full execution time and is not canceled if it exceeds Timeout. + // This means that WithBackoff may not stop exactly after Timeout expires, + // or may not retry at all if the first execution of RetryableFunc already takes longer than Timeout. Timeout time.Duration - // OnError is called if an error occurs. 
- OnError func(elapsed time.Duration, attempt uint64, err, lastErr error) + // OnRetryableError is called if a retryable error occurs. + OnRetryableError func(elapsed time.Duration, attempt uint64, err, lastErr error) // OnSuccess is called once the operation succeeds. OnSuccess func(elapsed time.Duration, attempt uint64, lastErr error) } @@ -34,16 +41,19 @@ type Settings struct { func WithBackoff( ctx context.Context, retryableFunc RetryableFunc, retryable IsRetryable, b backoff.Backoff, settings Settings, ) (err error) { - parentCtx := ctx + // Channel for retry deadline, which is set to the channel of NewTimer() if a timeout is configured, + // otherwise nil, so that it blocks forever if there is no timeout. + var timeout <-chan time.Time if settings.Timeout > 0 { - var cancelCtx context.CancelFunc - ctx, cancelCtx = context.WithTimeout(ctx, settings.Timeout) - defer cancelCtx() + t := time.NewTimer(settings.Timeout) + defer t.Stop() + timeout = t.C } start := time.Now() - for attempt := uint64(0); ; /* true */ attempt++ { + timedOut := false + for attempt := uint64(1); ; /* true */ attempt++ { prevErr := err if err = retryableFunc(ctx); err == nil { @@ -54,43 +64,77 @@ func WithBackoff( return } - if settings.OnError != nil { - settings.OnError(time.Since(start), attempt, err, prevErr) + // Retryable function may have exited prematurely due to context errors. + // We explicitly check the context error here, as the error returned by the retryable function can pass the + // error.Is() checks even though it is not a real context error, e.g.
+ // https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/net/net.go;l=422 + // https://cs.opensource.google/go/go/+/refs/tags/go1.22.2:src/net/net.go;l=601 + if errors.Is(ctx.Err(), context.DeadlineExceeded) || errors.Is(ctx.Err(), context.Canceled) { + if prevErr != nil { + err = errors.Wrap(err, prevErr.Error()) + } + + return } - isRetryable := retryable(err) + if !retryable(err) { + err = errors.Wrap(err, "can't retry") - if prevErr != nil && (errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled)) { - err = prevErr + return } - if !isRetryable { - err = errors.Wrap(err, "can't retry") + select { + case <-timeout: + // Stop retrying immediately if executing the retryable function took longer than the timeout. + timedOut = true + default: + } + + if timedOut { + err = errors.Wrap(err, "retry deadline exceeded") return } - sleep := b(attempt) + if settings.OnRetryableError != nil { + settings.OnRetryableError(time.Since(start), attempt, err, prevErr) + } + select { + case <-time.After(b(attempt)): + case <-timeout: + // Do not stop retrying immediately, but start one last attempt to mitigate timing issues where + // the timeout expires while waiting for the next attempt and + // therefore no retries have happened during this possibly long period. + timedOut = true case <-ctx.Done(): - if outerErr := parentCtx.Err(); outerErr != nil { - err = errors.Wrap(outerErr, "outer context canceled") - } else { - if err == nil { - err = ctx.Err() - } - err = errors.Wrap(err, "can't retry") - } + err = errors.Wrap(ctx.Err(), err.Error()) return - case <-time.After(sleep): } } } +// ResetTimeout changes the possibly expired timer t to expire after duration d. +// +// If the timer has already expired and nothing has been received from its channel, +// it is automatically drained as if the timer had never expired. +// It is also safe to call if the value has already been received from the timer's channel.
+func ResetTimeout(t *time.Timer, d time.Duration) { + if !t.Stop() { + // Non-blocking drain, as a plain receive would block forever if the value was already received. + select { + case <-t.C: + default: + } + } + + t.Reset(d) +} + // Retryable returns true for common errors that are considered retryable, // i.e. temporary, timeout, DNS, connection refused and reset, host down and unreachable and -// network down and unreachable errors. +// network down and unreachable errors. In addition, any database error is considered retryable. func Retryable(err error) bool { var temporary interface { Temporary() bool @@ -144,43 +188,10 @@ func Retryable(err error) bool { return true } - var e *mysql.MySQLError - if errors.As(err, &e) { - switch e.Number { - case 1053, 1205, 1213, 2006: - // 1053: Server shutdown in progress - // 1205: Lock wait timeout - // 1213: Deadlock found when trying to get lock - // 2006: MySQL server has gone away - return true - default: - return false - } - } - - var pe *pq.Error - if errors.As(err, &pe) { - switch pe.Code { - case "08000", // connection_exception - "08006", // connection_failure - "08001", // sqlclient_unable_to_establish_sqlconnection - "08004", // sqlserver_rejected_establishment_of_sqlconnection - "40001", // serialization_failure - "40P01", // deadlock_detected - "54000", // program_limit_exceeded - "55006", // object_in_use - "55P03", // lock_not_available - "57P01", // admin_shutdown - "57P02", // crash_shutdown - "57P03", // cannot_connect_now - "58000", // system_error - "58030", // io_error - "XX000": // internal_error - return true - default: - // Class 53 - Insufficient Resources - return strings.HasPrefix(string(pe.Code), "53") - } + var mye *mysql.MySQLError + var pqe *pq.Error + if errors.As(err, &mye) || errors.As(err, &pqe) { + return true } return false