Retry **every** database error #698

Merged · 15 commits · Apr 11, 2024
Changes from all commits
8 changes: 4 additions & 4 deletions pkg/config/redis.go
@@ -85,16 +85,16 @@ func dialWithLogging(dialer ctxDialerFunc, logger *logging.Logger) ctxDialerFunc
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
retry.Settings{
Timeout: 5 * time.Minute,
OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
Timeout: retry.DefaultTimeout,
OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) {
if lastErr == nil || err.Error() != lastErr.Error() {
logger.Warnw("Can't connect to Redis. Retrying", zap.Error(err))
}
},
OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
if attempt > 0 {
if attempt > 1 {
logger.Infow("Reconnected to Redis",
zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))
zap.Duration("after", elapsed), zap.Uint64("attempts", attempt))
}
},
},
42 changes: 29 additions & 13 deletions pkg/icingadb/cleanup.go
@@ -4,7 +4,9 @@ import (
"context"
"fmt"
"github.com/icinga/icingadb/internal"
"github.com/icinga/icingadb/pkg/backoff"
"github.com/icinga/icingadb/pkg/com"
"github.com/icinga/icingadb/pkg/retry"
"github.com/icinga/icingadb/pkg/types"
"time"
)
@@ -40,32 +42,46 @@ func (db *DB) CleanupOlderThan(
count uint64, olderThan time.Time, onSuccess ...OnSuccess[struct{}],
) (uint64, error) {
var counter com.Counter
defer db.log(ctx, stmt.Build(db.DriverName(), 0), &counter).Stop()

q := db.Rebind(stmt.Build(db.DriverName(), count))

defer db.log(ctx, q, &counter).Stop()

for {
q := db.Rebind(stmt.Build(db.DriverName(), count))
rs, err := db.NamedExecContext(ctx, q, cleanupWhere{
EnvironmentId: envId,
Time: types.UnixMilli(olderThan),
})
if err != nil {
return 0, internal.CantPerformQuery(err, q)
}
var rowsDeleted int64

err := retry.WithBackoff(
ctx,
func(ctx context.Context) error {
rs, err := db.NamedExecContext(ctx, q, cleanupWhere{
EnvironmentId: envId,
Time: types.UnixMilli(olderThan),
})
if err != nil {
return internal.CantPerformQuery(err, q)
}

rowsDeleted, err = rs.RowsAffected()

n, err := rs.RowsAffected()
return err
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
db.getDefaultRetrySettings(),
)
if err != nil {
return 0, err
}

counter.Add(uint64(n))
counter.Add(uint64(rowsDeleted))

for _, onSuccess := range onSuccess {
if err := onSuccess(ctx, make([]struct{}, n)); err != nil {
if err := onSuccess(ctx, make([]struct{}, rowsDeleted)); err != nil {
return 0, err
}
}

if n < int64(count) {
if rowsDeleted < int64(count) {
break
}
}
26 changes: 23 additions & 3 deletions pkg/icingadb/db.go
@@ -13,6 +13,7 @@ import (
"github.com/icinga/icingadb/pkg/utils"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"reflect"
@@ -346,7 +347,7 @@ func (db *DB) BulkExec(
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
retry.Settings{},
db.getDefaultRetrySettings(),
)
}
}(b))
@@ -411,7 +412,7 @@ func (db *DB) NamedBulkExec(
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
retry.Settings{},
db.getDefaultRetrySettings(),
)
}
}(b))
@@ -484,7 +485,7 @@ func (db *DB) NamedBulkExecTx(
},
retry.Retryable,
backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
retry.Settings{},
db.getDefaultRetrySettings(),
)
}
}(b))
@@ -670,6 +671,25 @@ func (db *DB) GetSemaphoreForTable(table string) *semaphore.Weighted {
}
}

func (db *DB) getDefaultRetrySettings() retry.Settings {
return retry.Settings{
Timeout: retry.DefaultTimeout,
OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) {
if lastErr == nil || err.Error() != lastErr.Error() {
db.logger.Warnw("Can't execute query. Retrying", zap.Error(err))
}
},
OnSuccess: func(elapsed time.Duration, attempt uint64, lastErr error) {
if attempt > 1 {
db.logger.Infow("Query retried successfully after error",
zap.Duration("after", elapsed),
zap.Uint64("attempts", attempt),
zap.NamedError("recovered_error", lastErr))
}
},
}
}

func (db *DB) log(ctx context.Context, query string, counter *com.Counter) periodic.Stopper {
return periodic.Start(ctx, db.logger.Interval(), func(tick periodic.Tick) {
if count := counter.Reset(); count > 0 {
8 changes: 4 additions & 4 deletions pkg/icingadb/driver.go
@@ -55,8 +55,8 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
shouldRetry,
backoff.NewExponentialWithJitter(time.Millisecond*128, time.Minute*1),
Contributor commented:

With this retry.WithBackoff() nested inside the retry loops of the other places that execute SQL queries, and with these backoff settings, some strange effects can be observed: if you stop your database for something like 4m30s, you may see some attempts recovering and others, quite some time after that, failing fatally because they exceeded the 5 minutes. I believe this is due to the maximal backoff of a minute given here, i.e. there may be a minute without an attempt, and after that both the inner and the outer 5-minute timeout are exceeded.

Also, if we're adding retry.WithBackoff(), do we even still need it here? Just for fun, I removed all the retrying from this function (so that it just does Connect() and initConn() with no loops or anything around it) and it seemed to work more reliably; only the logging became very annoying, with queries and stack traces within a single line.
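
A minimal, self-contained sketch of the nesting described above (not code from this PR; it only mirrors the structure of the hunks in this diff and uses an always-retry predicate plus a dummy error purely for illustration):

package main

import (
	"context"
	"errors"
	"time"

	"github.com/icinga/icingadb/pkg/backoff"
	"github.com/icinga/icingadb/pkg/retry"
)

func main() {
	ctx := context.Background()

	// Outer loop: stands in for a query-level retry bounded by the default timeout.
	_ = retry.WithBackoff(
		ctx,
		func(ctx context.Context) error {
			// Inner loop: stands in for the driver-level connect retry. Its backoff grows
			// up to a minute, so a single sleep between attempts may end only after both
			// the inner and the outer timeout have already been exceeded.
			return retry.WithBackoff(
				ctx,
				func(ctx context.Context) error { return errors.New("database still down") },
				func(error) bool { return true }, // always retry, for illustration only
				backoff.NewExponentialWithJitter(128*time.Millisecond, 1*time.Minute),
				retry.Settings{Timeout: retry.DefaultTimeout},
			)
		},
		func(error) bool { return true }, // always retry, for illustration only
		backoff.NewExponentialWithJitter(1*time.Millisecond, 1*time.Second),
		retry.Settings{Timeout: retry.DefaultTimeout},
	)
}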

Member Author commented:

The following changes address your considerations:

HA now uses a Timer instead of a Ticker; the timer is reset properly even if it has already expired (and has not yet been drained): 81085c0. A sketch of such a reset follows below.

If the timeout expires during the sleep phase between attempts, one final retry attempt will be made: 3a3baaf.
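
A minimal sketch of a drain-safe reset (hypothetical; the actual retry.ResetTimeout helper used further down in this diff may be implemented differently):

package main

import "time"

// resetTimeout re-arms t with d. If the timer has already fired and its tick has
// not been consumed, the channel is drained first; otherwise a stale tick could be
// received immediately after the reset. The non-blocking drain also covers the case
// where the tick was already read elsewhere, e.g. in a select.
func resetTimeout(t *time.Timer, d time.Duration) {
	if !t.Stop() {
		select {
		case <-t.C:
		default:
		}
	}

	t.Reset(d)
}

func main() {
	timeout := time.NewTimer(5 * time.Minute)
	defer timeout.Stop()

	// ... after handling an event, give the next iteration the full timeout again:
	resetTimeout(timeout, 5*time.Minute)
}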

> only the logging became very annoying, with queries and stack traces within a single line

I have something in my stash which fixes this: zap adds the errorVerbose field if the error implements fmt.Formatter, which is true for all our wrapped errors. A simple solution would be to replace zap.Error() calls with a custom implementation that returns a "silent" error, e.g.

package logging

import (
	"github.com/pkg/errors"
	"go.uber.org/zap"
)

...

type stackTracer interface {
	StackTrace() errors.StackTrace
}

// errNoStackTrace hides the wrapped error's stack trace from zap: it does not
// implement fmt.Formatter, so no errorVerbose field is emitted.
type errNoStackTrace struct {
	e error
}

func (e errNoStackTrace) Error() string {
	return e.e.Error()
}

// Error is a drop-in replacement for zap.Error that suppresses the stack trace
// of errors wrapped with github.com/pkg/errors.
func Error(e error) zap.Field {
	if _, ok := e.(stackTracer); ok {
		return zap.Error(errNoStackTrace{e})
	}

	return zap.Error(e)
}
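
A hypothetical call site, matching the Redis reconnect log in the hunk above, would then look like this:

logger.Warnw("Can't connect to Redis. Retrying", logging.Error(err))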

Contributor commented:

> only the logging became very annoying, with queries and stack traces within a single line

> I have something in my stash which fixes this: zap adds the errorVerbose field if the error implements fmt.Formatter, which is true for all our wrapped errors. A simple solution would be to replace zap.Error() calls with a custom implementation that returns a "silent" error, e.g.

However, the PR just keeps the nested retrying for now, thereby avoiding this issue.

retry.Settings{
Timeout: time.Minute * 5,
OnError: func(_ time.Duration, _ uint64, err, lastErr error) {
Timeout: retry.DefaultTimeout,
OnRetryableError: func(_ time.Duration, _ uint64, err, lastErr error) {
telemetry.UpdateCurrentDbConnErr(err)

if lastErr == nil || err.Error() != lastErr.Error() {
@@ -66,9 +66,9 @@ func (c RetryConnector) Connect(ctx context.Context) (driver.Conn, error) {
OnSuccess: func(elapsed time.Duration, attempt uint64, _ error) {
telemetry.UpdateCurrentDbConnErr(nil)

if attempt > 0 {
if attempt > 1 {
c.logger.Infow("Reconnected to database",
zap.Duration("after", elapsed), zap.Uint64("attempts", attempt+1))
zap.Duration("after", elapsed), zap.Uint64("attempts", attempt))
}
},
},
67 changes: 56 additions & 11 deletions pkg/icingadb/ha.go
@@ -148,8 +148,19 @@ func (h *HA) controller() {
defer routineLogTicker.Stop()
shouldLogRoutineEvents := true

// The retry logic in HA is twofold:
//
// 1) Updating or inserting the instance row based on the current heartbeat must be done within the heartbeat's
// expiration time. Therefore, we use a deadline ctx to retry.WithBackoff() in realize() which expires earlier
// than our default timeout.
// 2) Since we do not want to exit before our default timeout expires, we have to repeat step 1 until it does.
retryTimeout := time.NewTimer(retry.DefaultTimeout)
defer retryTimeout.Stop()

for {
select {
case <-retryTimeout.C:
h.abort(errors.New("retry deadline exceeded"))
case m := <-h.heartbeat.Events():
if m != nil {
now := time.Now()
@@ -163,8 +174,13 @@ }
}
if tt.Before(now.Add(-1 * peerTimeout)) {
h.logger.Errorw("Received heartbeat from the past", zap.Time("time", tt))

h.signalHandover("received heartbeat from the past")
h.realizeLostHeartbeat()

// Reset retry timeout so that the next iterations have the full amount of time available again.
retry.ResetTimeout(retryTimeout, retry.DefaultTimeout)

continue
}
s, err := m.Stats().IcingaStatus()
@@ -200,17 +216,17 @@ func (h *HA) controller() {
default:
}

var realizeCtx context.Context
var cancelRealizeCtx context.CancelFunc
if h.responsible {
realizeCtx, cancelRealizeCtx = context.WithDeadline(h.ctx, m.ExpiryTime())
} else {
realizeCtx, cancelRealizeCtx = context.WithCancel(h.ctx)
}
// Ensure that updating/inserting the instance row is completed by the current heartbeat's expiry time.
realizeCtx, cancelRealizeCtx := context.WithDeadline(h.ctx, m.ExpiryTime())
err = h.realize(realizeCtx, s, t, envId, shouldLogRoutineEvents)
cancelRealizeCtx()
if errors.Is(err, context.DeadlineExceeded) {
h.signalHandover("context deadline exceeded")
h.signalHandover("instance update/insert deadline exceeded heartbeat expiry time")

// Instance insert/update was not completed by the expiration time of the current heartbeat.
// Pass control back to the loop to try again with the next heartbeat,
// or exit the loop when the retry timeout has expired. Therefore, the
// retry timeout is **not** reset here so that retries continue until the timeout has expired.
continue
}
if err != nil {
Expand All @@ -228,6 +244,14 @@ func (h *HA) controller() {
h.signalHandover("lost heartbeat")
h.realizeLostHeartbeat()
}

// Reset retry timeout so that the next iterations have the full amount of time available again.
// Don't be surprised by the location of this code: admittedly, the timer is also reset
// after an error that ends the loop anyway, which looks redundant.
// But this is the best place to catch all scenarios where the timeout needs to be reset.
// And since HA needs quite a bit of refactoring anyway, e.g. to return immediately after calling h.abort(),
// it's fine to have it here for now.
retry.ResetTimeout(retryTimeout, retry.DefaultTimeout)
case <-h.heartbeat.Done():
if err := h.heartbeat.Err(); err != nil {
h.abort(err)
Expand All @@ -253,6 +277,10 @@ func (h *HA) realize(
otherResponsible bool
)

if _, ok := ctx.Deadline(); !ok {
panic("can't use context w/o deadline in realize()")
}

err := retry.WithBackoff(
ctx,
func(ctx context.Context) error {
@@ -358,14 +386,31 @@
retry.Retryable,
backoff.NewExponentialWithJitter(time.Millisecond*256, time.Second*3),
retry.Settings{
OnError: func(_ time.Duration, attempt uint64, err, lastErr error) {
// Intentionally no timeout is set, as we use a context with a deadline.
OnRetryableError: func(_ time.Duration, attempt uint64, err, lastErr error) {
if lastErr == nil || err.Error() != lastErr.Error() {
log := h.logger.Debugw
if attempt > 2 {
if attempt > 3 {
log = h.logger.Infow
}

log("Can't update or insert instance. Retrying", zap.Error(err))
}
},
OnSuccess: func(elapsed time.Duration, attempt uint64, lastErr error) {
if attempt > 1 {
log := h.logger.Debugw

if attempt > 4 {
// We log errors with severity info starting from the fourth attempt (see above),
// so we need to log success with severity info from the fifth attempt.
log = h.logger.Infow
}

log("Can't update or insert instance. Retrying", zap.Error(err), zap.Uint64("retry count", attempt))
log("Instance updated/inserted successfully after error",
zap.Duration("after", elapsed),
zap.Uint64("attempts", attempt),
zap.NamedError("recovered_error", lastErr))
}
},
},