Improve failover mechanism #386

Closed

wants to merge 26 commits

Changes from all commits (26 commits)
ead7df2 (Nov 14, 2024): set log level of some messages to info to have more visibility of the…
c3c2435 (Nov 14, 2024): try refreshing lock TTL well before TTL expiration
72f31af (Nov 14, 2024): adjust the retry strategy for more sanity when trying to obtain the l…
995a3fe (Nov 14, 2024): try immediate lock reacquisition if lock refresh failed
da7b3a3 (Nov 14, 2024): add retry backoff to locker.Obtain() to avoid a retry loop; rename va…
a8c6a30 (Nov 14, 2024): add nil check before trying to refresh a lock
80ff4b2 (Nov 14, 2024): rename ticker t to refresh
8ecf3e0 (Nov 14, 2024): improvements for readability
06c105a (Nov 14, 2024): improve backoff interval and comment
10aecd4 (Nov 15, 2024): improve/add comments; implement safeguard when lock.TTL() returns an …
3b13d0a (Nov 15, 2024): decrease log level for most logs to debug; enforce minimum refresh ti…
a1eba4a (Nov 15, 2024): DEBUG: add Stefan to log messages for testing
4ba1382 (Nov 15, 2024): replace *redislock.Client by an interface, that is, LockClient to all…
a35d417 (Nov 15, 2024): create mocks for redislock.Client and redislock.Lock
75e6f63 (Nov 17, 2024): remove m.Called() from Obtain()
419751c (Nov 18, 2024): add some more log lines
a033516 (Nov 18, 2024): add some more log lines
d9b2a29 (Nov 19, 2024): add some more log lines
7eef210 (Nov 19, 2024): add some more log lines
fc86408 (Nov 19, 2024): call lock.TTL() with a timeout context
e2f8b1a (Nov 19, 2024): pass redis client instead of redislock client to NewActivePassive()
ac90f64 (Nov 19, 2024): delete lock and redislock mocks
7f0039e (Nov 19, 2024): check for lock==nil before calling lock.TTL()
528e9e4 (Nov 20, 2024): do lock operations only if Redis is reachable
333e931 (Nov 20, 2024): add check for lock key existence to prevent interface {} conversion e…
20cf4bd (Nov 20, 2024): recover from panic in lock.TTL() and become undefined in this case
209 changes: 156 additions & 53 deletions maintenance/failover/failover.go
@@ -5,6 +5,7 @@ package failover
import (
"context"
"fmt"
"github.com/rs/zerolog"
"net/http"
"strings"
"sync"
@@ -18,8 +19,6 @@ import (
"github.com/redis/go-redis/v9"
)

const waitRetry = time.Millisecond * 500

type status int

const (
@@ -30,21 +29,19 @@ const (

const Label = "github.com.pace.bricks.activepassive"

// ActivePassive implements a failover mechanism that allows
// ActivePassive implements a fail over mechanism that allows
// to deploy a service multiple times but only one will accept
// traffic by using the label selector of kubernetes.
// In order to determine the active, a lock needs to be held
// in redis. Hocks can be passed to handle the case of becoming
// in redis. Hooks can be passed to handle the case of becoming
// the active or passive.
// The readiness probe will report the state (ACTIVE/PASSIVE)
// of each of the members in the cluster.
type ActivePassive struct {
// OnActive will be called in case the current processes
// is elected to be the active one
// OnActive will be called in case the current process is elected to be the active one
OnActive func()

// OnPassive will be called in case the current process is
// the passive one
// OnPassive will be called in case the current process is the passive one
OnPassive func()

// OnStop is called after the ActivePassive process stops
@@ -54,43 +51,51 @@ type ActivePassive struct {
clusterName string
timeToFailover time.Duration
locker *redislock.Client
redisClient *redis.Client

// access to the kubernetes api
client *k8sapi.Client
k8sClient *k8sapi.Client

// current status of the failover (to show it in the readiness status)
// current status of the fail over (to show it in the readiness status)
state status
stateMu sync.RWMutex
}

// NewActivePassive creates a new active passive cluster
// identified by the name, the time to failover determines
// the frequency of checks performed against the redis to
// identified by the name. The time to fail over determines
// the frequency of checks performed against redis to
// keep the active state.
// NOTE: creating multiple ActivePassive in one processes
// NOTE: creating multiple ActivePassive in one process
// is not working correctly as there is only one readiness
// probe.
func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
cl, err := k8sapi.NewClient()
func NewActivePassive(clusterName string, timeToFailover time.Duration, redisClient *redis.Client) (*ActivePassive, error) {
k8sClient, err := k8sapi.NewClient()
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to initialize the kubernetes client: %w", err)
}

if redisClient == nil {
return nil, fmt.Errorf("redis client is not initialized")
}

ap := &ActivePassive{
clusterName: clusterName,
timeToFailover: timeToFailover,
locker: redislock.New(client),
client: cl,
locker: redislock.New(redisClient),
redisClient: redisClient,
k8sClient: k8sClient,
}

health.SetCustomReadinessCheck(ap.Handler)

return ap, nil
}
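For orientation, a hypothetical caller of this constructor might be wired up as in the following sketch; it is illustrative only and not part of the diff, and the Redis address, cluster name, failover duration, and the import path guessed from this file's location are all assumptions.

package example

import (
	"context"
	"log"
	"time"

	"github.com/pace/bricks/maintenance/failover"
	"github.com/redis/go-redis/v9"
)

func startFailover(ctx context.Context) error {
	// Placeholder Redis client and address.
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	ap, err := failover.NewActivePassive("my-cluster", 10*time.Second, rdb)
	if err != nil {
		return err
	}
	ap.OnActive = func() { log.Println("now ACTIVE: start accepting traffic") }
	ap.OnPassive = func() { log.Println("now PASSIVE: stop accepting traffic") }

	// Run recovers panics itself (errors.HandleWithCtx), so it can be started with go.
	go func() {
		if err := ap.Run(ctx); err != nil {
			log.Printf("failover handler stopped: %v", err)
		}
	}()

	return nil
}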

// Run registers the readiness probe and calls the OnActive
// and OnPassive callbacks in case the election toke place.
// Will handle panic safely and therefore can be directly called
// with go.
// Run manages distributed lock-based leadership.
// This method is designed to continually monitor and maintain the leadership status of the calling pod,
// ensuring only one active instance holds the lock at a time, while transitioning other instances to passive
// mode. The handler will try to renew its active status by refreshing the lock periodically, and attempt
// reacquisition on failure, avoiding potential race conditions for leadership.
func (a *ActivePassive) Run(ctx context.Context) error {
defer errors.HandleWithCtx(ctx, "activepassive failover handler")

@@ -101,7 +106,7 @@ func (a *ActivePassive) Run(ctx context.Context) error {
a.close = make(chan struct{})
defer close(a.close)

// trigger stop handler
// Trigger stop handler
defer func() {
if a.OnStop != nil {
a.OnStop()
@@ -110,68 +115,130 @@ func (a *ActivePassive) Run(ctx context.Context) error {

var lock *redislock.Lock

// t is a ticker that reminds to call refresh if
// the token was acquired after half of the remaining ttl time
t := time.NewTicker(a.timeToFailover)
// Ticker to refresh the lock's TTL before it expires
refreshInterval := a.timeToFailover / 2
refresh := time.NewTicker(refreshInterval)
logger.Debug().Msgf("Stefan: refresh interval: %v", refreshInterval)

// retry time triggers to check if the look needs to be acquired
retry := time.NewTicker(waitRetry)
// Ticker to check if the lock can be acquired if in passive or undefined state
retryInterval := a.timeToFailover / 3
retry := time.NewTicker(retryInterval)
logger.Debug().Msgf("Stefan: retry interval: %v", retryInterval)

for {
// allow close or cancel
// Allow close or cancel
select {
case <-ctx.Done():
logger.Warn().Err(ctx.Err()).Msg("Stefan: context canceled; exiting Run()")
return ctx.Err()
case <-a.close:
logger.Warn().Err(ctx.Err()).Msg("Stefan: closed; exiting Run()")
return nil
case <-t.C:
if a.getState() == ACTIVE {
case <-refresh.C:
logger.Debug().Msgf("Stefan: tick from refresh; state: %v", a.getState())

if a.getState() == ACTIVE && lock != nil {
if !a.isRedisOnline(ctx) {
logger.Debug().Msg("Stefan: redis is offline; skipping lock.Refresh() attempt")
continue
}

err := lock.Refresh(ctx, a.timeToFailover, &redislock.Options{
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(refreshInterval/5), 2),
})
if err != nil {
logger.Debug().Err(err).Msg("failed to refresh")
a.becomeUndefined(ctx)
logger.Warn().Err(err).Msg("Stefan: failed to refresh the redis lock; attempting to reacquire lock")

// Attempt to reacquire the lock immediately with short and limited retries
var errReacquire error

lock, errReacquire = a.locker.Obtain(ctx, lockName, a.timeToFailover, &redislock.Options{
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(100*time.Millisecond), 2),
})
if errReacquire == nil {
// Successfully reacquired the lock, remain active
logger.Debug().Msg("Stefan: redis lock reacquired after refresh failure; remaining active")
a.becomeActive(ctx)
refresh.Reset(refreshInterval)
} else {
// We were active but could neither refresh the lock TTL nor reacquire the lock, so become undefined
logger.Debug().Err(err).Msg("Stefan: failed to reacquire the redis lock; becoming undefined")
a.becomeUndefined(ctx)
}
} else {
logger.Debug().Err(err).Msg("Stefan: redis lock refreshed")
}
}
case <-retry.C:
// try to acquire the lock, as we are not the active
logger.Debug().Msgf("Stefan: tick from retry; state: %v", a.getState())

// Try to acquire the lock as we are not active
if a.getState() != ACTIVE {
logger.Debug().Msgf("Stefan: in retry: trying to acquire the lock...")

var err error

if !a.isRedisOnline(ctx) {
logger.Debug().Msg("Stefan: redis is offline; skipping locker.Obtain() attempt")
continue
}

lock, err = a.locker.Obtain(ctx, lockName, a.timeToFailover, &redislock.Options{
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(retryInterval/2), 2),
})
if err != nil {
// we became passive, trigger callback
logger.Debug().Err(err).Msgf("Stefan: failed to obtain the redis lock; current state: %v", a.getState())

// couldn't obtain the lock; becoming passive
if a.getState() != PASSIVE {
logger.Debug().Err(err).Msg("becoming passive")
logger.Debug().Err(err).Msg("Stefan: couldn't obtain the redis lock; becoming passive")
a.becomePassive(ctx)
}

continue
}

// lock acquired
logger.Debug().Msg("becoming active")
// Lock acquired, transitioning to active
logger.Debug().Msg("Stefan: redis lock acquired; becoming active")
a.becomeActive(ctx)

// we are active, renew if required
d, err := lock.TTL(ctx)
if err != nil {
logger.Debug().Err(err).Msg("failed to get TTL")
logger.Debug().Msg("Stefan: became active")

if !a.isRedisOnline(ctx) || !a.isLockKeyPresent(ctx, lockName) {
logger.Debug().Msg("Stefan: redis is offline or the lock key does not exist; becoming undefined")
a.becomeUndefined(ctx)
continue
}
if d == 0 {
// TTL seems to be expired, retry to get lock or become
// passive in next iteration
logger.Debug().Msg("ttl expired")

// Check TTL of the newly acquired lock and adjust refresh timer
ttl, err := safeGetTTL(ctx, lock, logger)
if err != nil {
// If trying to get the TTL from the lock fails we become undefined and retry acquisition at the next tick.
logger.Debug().Err(err).Msg("Stefan: failed to get TTL from redis lock; becoming undefined")
a.becomeUndefined(ctx)
continue
}
refreshTime := d / 2

logger.Debug().Msgf("set refresh to %v", refreshTime)
logger.Debug().Msgf("Stefan: got TTL from redis lock: %v", ttl.Abs())

if ttl == 0 {
// Since the lock is very fresh with a TTL well > 0 this case is just a safeguard against rare occasions.
logger.Debug().Msg("Stefan: redis lock TTL has expired; becoming undefined")
a.becomeUndefined(ctx)
} else {
logger.Debug().Msg("Stefan: redis lock TTL > 0")

// Enforce a minimum refresh time
minRefreshTime := 2 * time.Second
refreshTime := ttl / 2
if refreshTime < minRefreshTime {
logger.Warn().Msgf("Stefan: calculated refresh time %v is below minimum threshold; using %v instead", refreshTime, minRefreshTime)
refreshTime = minRefreshTime
}

// set to trigger refresh after TTL / 2
t.Reset(refreshTime)
logger.Debug().Msgf("Stefan: redis lock TTL is still valid; set refresh time to %v", refreshTime)
refresh.Reset(refreshTime)
}
}
}
}
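To make the timing relationships in Run concrete, the following illustration evaluates the derived intervals for an assumed timeToFailover of 10 seconds; the numbers are for the arithmetic only.

package example

import (
	"fmt"
	"time"
)

// exampleIntervals shows the intervals Run derives from timeToFailover, evaluated for 10s.
func exampleIntervals() {
	timeToFailover := 10 * time.Second

	refreshInterval := timeToFailover / 2 // 5s: the active instance refreshes the lock well before its TTL expires
	retryInterval := timeToFailover / 3   // ~3.33s: passive/undefined instances retry obtaining the lock

	refreshBackoff := refreshInterval / 5 // 1s linear backoff, at most 2 retries, used for lock.Refresh
	obtainBackoff := retryInterval / 2    // ~1.67s linear backoff, at most 2 retries, used for locker.Obtain

	// After a successful acquisition the refresh ticker is reset to TTL/2,
	// but never below the 2s minimum enforced in Run.
	fmt.Println(refreshInterval, retryInterval, refreshBackoff, obtainBackoff)
}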
@@ -222,7 +289,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {

// setState returns true if the state was set successfully
func (a *ActivePassive) setState(ctx context.Context, state status) bool {
err := a.client.SetCurrentPodLabel(ctx, Label, a.label(state))
err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state))
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
a.stateMu.Lock()
@@ -242,3 +309,39 @@ func (a *ActivePassive) getState() status {
a.stateMu.RUnlock()
return state
}

// isRedisOnline pings the Redis client to check its availability. We want to do lock operations
// only if Redis is reachable.
func (a *ActivePassive) isRedisOnline(ctx context.Context) bool {
err := a.redisClient.Ping(ctx).Err()
if err != nil {
log.Ctx(ctx).Warn().Err(err).Msg("Stefan: redis is offline or unreachable")
return false
}

return true
}

// isLockKeyPresent returns true if the lock's key exists in Redis.
func (a *ActivePassive) isLockKeyPresent(ctx context.Context, lockKey string) bool {
cmd := a.redisClient.Exists(ctx, lockKey)
return cmd.Val() > 0
}

// safeGetTTL tries to get the TTL from the provided redislock and recovers from a panic inside TTL().
func safeGetTTL(ctx context.Context, lock *redislock.Lock, logger zerolog.Logger) (ttl time.Duration, err error) {
	// The results are named so that the deferred recover can propagate the error;
	// with unnamed results a recovered panic would silently return (0, nil).
	defer func() {
		if r := recover(); r != nil {
			logger.Error().Msgf("Recovered from panic in lock.TTL(): %v", r)
			err = fmt.Errorf("panic during lock.TTL(): %v", r)
		}
	}()

	ttl, err = lock.TTL(ctx)
	return ttl, err
}
Loading