Fix failover #396

Merged · 5 commits · Jan 17, 2025

backend/redis/redis.go (3 changes: 1 addition & 2 deletions)
@@ -193,15 +193,14 @@ func (lt *logtracer) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
 		cmdErr := cmd.Err()
 		if cmdErr != nil {
 			vals.span.SetData("error", cmdErr)
-			le = le.Err(cmdErr)
+			le.Err(cmdErr).Msg("failed to execute Redis command")
 			paceRedisCmdFailed.With(prometheus.Labels{
 				"method": cmd.Name(),
 			}).Inc()
 		}
 
-		// do log statement
 		dur := float64(time.Since(vals.startedAt)) / float64(time.Millisecond)
 		le.Float64("duration", dur).Msg("Redis query")
 
 		paceRedisCmdDurationSeconds.With(prometheus.Labels{
 			"method": cmd.Name(),
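
For context on the logging change: the PR stops chaining the command error onto the shared duration event (le = le.Err(cmdErr)) and instead emits a dedicated error event. A minimal sketch of that hook shape, assuming go-redis v9 and zerolog; the logHook type and messages are illustrative, not the actual bricks types:

package example

import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
	"github.com/rs/zerolog/log"
)

type logHook struct{}

// Compile-time assertion that logHook satisfies the go-redis v9 Hook interface.
var _ redis.Hook = logHook{}

func (logHook) DialHook(next redis.DialHook) redis.DialHook { return next }

func (logHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
	return next
}

func (logHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
	return func(ctx context.Context, cmd redis.Cmder) error {
		start := time.Now()
		err := next(ctx, cmd)

		if err != nil {
			// Dedicated error event (the fix above), rather than attaching
			// the error to the event that later logs the query duration.
			log.Ctx(ctx).Error().Err(err).Msg("failed to execute Redis command")
		}

		dur := float64(time.Since(start)) / float64(time.Millisecond)
		log.Ctx(ctx).Debug().Float64("duration", dur).Msg("Redis query")
		return err
	}
}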

maintenance/failover/failover.go (86 changes: 31 additions & 55 deletions)
@@ -18,8 +18,6 @@ import (
 	"github.com/redis/go-redis/v9"
 )
 
-const waitRetry = time.Millisecond * 500
-
 type status int
 
 const (
@@ -34,17 +32,15 @@ const Label = "github.com.pace.bricks.activepassive"
 // to deploy a service multiple times but ony one will accept
 // traffic by using the label selector of kubernetes.
 // In order to determine the active, a lock needs to be hold
-// in redis. Hocks can be passed to handle the case of becoming
+// in redis. Hooks can be passed to handle the case of becoming
 // the active or passive.
 // The readiness probe will report the state (ACTIVE/PASSIVE)
 // of each of the members in the cluster.
 type ActivePassive struct {
-	// OnActive will be called in case the current processes
-	// is elected to be the active one
+	// OnActive will be called in case the current processes is elected to be the active one
 	OnActive func(ctx context.Context)
 
-	// OnPassive will be called in case the current process is
-	// the passive one
+	// OnPassive will be called in case the current process is the passive one
 	OnPassive func(ctx context.Context)
 
 	// OnStop is called after the ActivePassive process stops
@@ -56,41 +52,41 @@ type ActivePassive struct {
 	locker *redislock.Client
 
 	// access to the kubernetes api
-	client *k8sapi.Client
+	k8sClient *k8sapi.Client
 
 	// current status of the failover (to show it in the readiness status)
 	state   status
 	stateMu sync.RWMutex
 }
 
 // NewActivePassive creates a new active passive cluster
-// identified by the name, the time to failover determines
-// the frequency of checks performed against the redis to
+// identified by the name. The time to fail over determines
+// the frequency of checks performed against redis to
 // keep the active state.
-// NOTE: creating multiple ActivePassive in one processes
-// is not working correctly as there is only one readiness
-// probe.
+// NOTE: creating multiple ActivePassive in one process
+// is not working correctly as there is only one readiness probe.
 func NewActivePassive(clusterName string, timeToFailover time.Duration, client *redis.Client) (*ActivePassive, error) {
-	cl, err := k8sapi.NewClient()
+	k8sClient, err := k8sapi.NewClient()
 	if err != nil {
 		return nil, err
 	}
 
-	ap := &ActivePassive{
+	activePassive := &ActivePassive{
 		clusterName:    clusterName,
 		timeToFailover: timeToFailover,
 		locker:         redislock.New(client),
-		client:         cl,
+		k8sClient:      k8sClient,
 	}
-	health.SetCustomReadinessCheck(ap.Handler)
-
-	return ap, nil
+	health.SetCustomReadinessCheck(activePassive.Handler)
+
+	return activePassive, nil
 }

-// Run registers the readiness probe and calls the OnActive
-// and OnPassive callbacks in case the election toke place.
-// Will handle panic safely and therefore can be directly called
-// with go.
+// Run manages distributed lock-based leadership.
+// This method is designed to continually monitor and maintain the leadership status of the calling pod,
+// ensuring only one active instance holds the lock at a time, while transitioning other instances to passive
+// mode. The handler will try to renew its active status by refreshing the lock periodically.
 func (a *ActivePassive) Run(ctx context.Context) error {
 	defer errors.HandleWithCtx(ctx, "activepassive failover handler")

@@ -101,7 +97,6 @@ func (a *ActivePassive) Run(ctx context.Context) error {
 	a.close = make(chan struct{})
 	defer close(a.close)
 
-	// trigger stop handler
 	defer func() {
 		if a.OnStop != nil {
 			a.OnStop()
@@ -110,68 +105,49 @@
 
 	var lock *redislock.Lock
 
-	// t is a ticker that reminds to call refresh if
-	// the token was acquired after half of the remaining ttl time
-	t := time.NewTicker(a.timeToFailover)
+	// Ticker to try to refresh the lock's TTL before it expires
+	tryRefreshLock := time.NewTicker(a.timeToFailover)
 
-	// retry time triggers to check if the look needs to be acquired
-	retry := time.NewTicker(waitRetry)
+	// Ticker to try to acquire the lock if in passive or undefined state
+	tryAcquireLock := time.NewTicker(500 * time.Millisecond)
 
 	for {
-		// allow close or cancel
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-a.close:
 			return nil
-		case <-t.C:
+		case <-tryRefreshLock.C:
 			if a.getState() == ACTIVE {
 				err := lock.Refresh(ctx, a.timeToFailover, &redislock.Options{
 					RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
 				})
 				if err != nil {
-					logger.Debug().Err(err).Msg("failed to refresh")
+					logger.Info().Err(err).Msg("failed to refresh the lock; becoming undefined...")
 					a.becomeUndefined(ctx)
 				}
 			}
-		case <-retry.C:
-			// try to acquire the lock, as we are not the active
+		case <-tryAcquireLock.C:
 			if a.getState() != ACTIVE {
 				var err error
 
 				lock, err = a.locker.Obtain(ctx, lockName, a.timeToFailover, &redislock.Options{
 					RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(a.timeToFailover/3), 3),
 				})
 				if err != nil {
-					// we became passive, trigger callback
 					if a.getState() != PASSIVE {
-						logger.Debug().Err(err).Msg("becoming passive")
+						logger.Info().Err(err).Msg("failed to obtain the lock; becoming passive...")
 						a.becomePassive(ctx)
 					}
 
 					continue
 				}
 
-				// lock acquired
-				logger.Debug().Msg("becoming active")
+				logger.Debug().Msg("lock acquired; becoming active...")
 				a.becomeActive(ctx)
 
-				// we are active, renew if required
-				d, err := lock.TTL(ctx)
-				if err != nil {
-					logger.Debug().Err(err).Msg("failed to get TTL")
-				}
-				if d == 0 {
-					// TTL seems to be expired, retry to get lock or become
-					// passive in next iteration
-					logger.Debug().Msg("ttl expired")
-					a.becomeUndefined(ctx)
-				}
-				refreshTime := d / 2
-
-				logger.Debug().Msgf("set refresh to %v", refreshTime)
-
-				// set to trigger refresh after TTL / 2
-				t.Reset(refreshTime)
+				// Reset the refresh ticker to half of the time to failover
+				tryRefreshLock.Reset(a.timeToFailover / 2)
 			}
 		}
 	}
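
Stripped of the readiness-label bookkeeping, the reworked loop is the standard bsm/redislock obtain/refresh cycle. A standalone sketch of that cycle; the Redis address, lock key, and 5s TTL are illustrative assumptions:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/bsm/redislock"
	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	locker := redislock.New(redis.NewClient(&redis.Options{Addr: "localhost:6379"}))

	ttl := 5 * time.Second // stands in for timeToFailover
	lock, err := locker.Obtain(ctx, "example.lock", ttl, &redislock.Options{
		// Same retry shape as above: linear backoff of ttl/3, capped at 3 tries.
		RetryStrategy: redislock.LimitRetry(redislock.LinearBackoff(ttl/3), 3),
	})
	if errors.Is(err, redislock.ErrNotObtained) {
		fmt.Println("lock held elsewhere; stay passive")
		return
	} else if err != nil {
		panic(err)
	}
	defer func() { _ = lock.Release(ctx) }()

	// Refresh at half the TTL, as the reworked loop does, instead of
	// deriving the interval from a TTL read-back after acquisition.
	for range time.Tick(ttl / 2) {
		if err := lock.Refresh(ctx, ttl, nil); err != nil {
			fmt.Println("lost the lock; become undefined and re-elect")
			return
		}
	}
}
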
@@ -222,7 +198,7 @@ func (a *ActivePassive) becomeUndefined(ctx context.Context) {
 
 // setState returns true if the state was set successfully
 func (a *ActivePassive) setState(ctx context.Context, state status) bool {
-	err := a.client.SetCurrentPodLabel(ctx, Label, a.label(state))
+	err := a.k8sClient.SetCurrentPodLabel(ctx, Label, a.label(state))
 	if err != nil {
 		log.Ctx(ctx).Error().Err(err).Msg("failed to mark pod as undefined")
 		a.stateMu.Lock()
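
End to end, the API as it stands after this PR is used roughly as follows; the import path mirrors the repository layout, and the cluster name, TTL, Redis address, and callback bodies are illustrative assumptions:

package main

import (
	"context"
	"log"
	"time"

	"github.com/pace/bricks/maintenance/failover"
	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	ap, err := failover.NewActivePassive("payment-service", 5*time.Second, rdb)
	if err != nil {
		log.Fatal(err)
	}

	// The replica that wins the Redis lock is labeled ACTIVE and should take
	// traffic; every other replica stays PASSIVE.
	ap.OnActive = func(ctx context.Context) { log.Println("now ACTIVE") }
	ap.OnPassive = func(ctx context.Context) { log.Println("now PASSIVE") }

	// Run blocks and recovers panics internally, so it may also be
	// started in its own goroutine.
	if err := ap.Run(context.Background()); err != nil {
		log.Fatal(err)
	}
}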