Optimize replication lag on a replica before switchover #158

Open
wants to merge 28 commits into master
Changes from all commits (28 commits)
c5c36c2
rebase
noname0443 Mar 5, 2025
788925f
Fix linter && reject switchover only if deadline exceeded
noname0443 Feb 13, 2025
2b3623c
Change replica calculation algorithm && add logic for async replica
noname0443 Feb 14, 2025
7e94e18
Add async tests && add options to docker env
noname0443 Feb 14, 2025
cd4bb8b
fix linter
noname0443 Feb 14, 2025
5523024
Refactoring
noname0443 Feb 17, 2025
6b4bfe8
fix
noname0443 Feb 19, 2025
b15335a
fix linter
noname0443 Feb 19, 2025
4bce72f
refactor the code
noname0443 Feb 20, 2025
00188f2
Take in account async replication
noname0443 Feb 21, 2025
7e7ed5e
Add separate option for async optimization
noname0443 Feb 21, 2025
af112dd
fix tests
noname0443 Feb 21, 2025
faf2dac
fix tests
noname0443 Feb 21, 2025
4c3be46
Fix the feature
noname0443 Feb 21, 2025
4d1fb4e
fix tests
noname0443 Feb 21, 2025
dee765b
fix tests
noname0443 Feb 21, 2025
9d56e16
fix tests
noname0443 Feb 24, 2025
eddd2af
oh, let's debug it
noname0443 Feb 24, 2025
9fad6b7
tests
noname0443 Feb 24, 2025
efd6e65
Linter fix
noname0443 Feb 24, 2025
f98ed66
Don't apply optimization when replicas are async
noname0443 Feb 27, 2025
98f1616
Delegate responsability of applying turbo mode to ISwitchHelper
noname0443 Feb 27, 2025
faeb034
fix lint
noname0443 Feb 27, 2025
9616c65
handle network disruption case
noname0443 Mar 5, 2025
50becd5
after rebase aftermath
noname0443 Mar 5, 2025
855d5cd
Forget to remove variable
noname0443 Mar 5, 2025
9601ea8
fix linter
noname0443 Mar 5, 2025
6b0c62d
Merge branch 'master' into add-turbo-mode-before-switchover
teem0n Mar 6, 2025
156 changes: 152 additions & 4 deletions internal/app/app.go
@@ -1201,10 +1201,16 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node
}

// and finally enlarge HA-group, if needed
for _, host := range becomeActive {
err := app.enableSemiSyncOnSlave(host, clusterState[host], masterState)
for _, hostname := range becomeActive {
err := app.enableSemiSyncOnSlave(hostname, clusterState[hostname], masterState)
if err != nil {
app.logger.Errorf("failed to enable semi-sync on slave %s: %v", host, err)
app.logger.Errorf("failed to enable semi-sync on slave %s: %v", hostname, err)
}

host := app.cluster.Get(hostname)
err = host.SetDefaultReplicationSettings(masterNode)
if err != nil {
app.logger.Errorf("failed to set default replication settings %s: %v", hostname, err)
}
}
if waitSlaveCount < oldWaitSlaveCount {
@@ -1304,6 +1310,105 @@ func (app *App) disableSemiSyncIfNonNeeded(node *mysql.Node, state *NodeState) {
}
}

func (app *App) optimizeReplicaWithSmallestLag(
replicas []string,
masterHost string,
optionalDesirableReplica string,
) error {
hostnameToOptimize, err := app.chooseReplicaToOptimize(optionalDesirableReplica, replicas)
if err != nil {
return err
}

replicaToOptimize := app.cluster.Get(hostnameToOptimize)
isOptimized, err := app.isReplicationLagUnderThreshold(replicaToOptimize)
if err != nil {
return err
}
if isOptimized {
return nil
}

err = replicaToOptimize.OptimizeReplication()
if err != nil {
return err
}
defer func() {
masterNode := app.cluster.Get(masterHost)
err = replicaToOptimize.SetDefaultReplicationSettings(masterNode)
if err != nil {
app.logger.Error("can't set default replication settings")
}
}()

return app.waitReplicaToConverge(replicaToOptimize)
}

func (app *App) chooseReplicaToOptimize(
optionalDesirableReplica string,
replicas []string,
) (string, error) {
if len(optionalDesirableReplica) > 0 {
return optionalDesirableReplica, nil
}

positions, err := app.getNodePositions(replicas)
if err != nil {
return "", err
}

hostnameToOptimize, err := app.getMostDesirableReplicaToOptimize(positions)
if err != nil {
return "", err
}
app.logger.Infof("replica optimization: the replica is '%s'", hostnameToOptimize)

return hostnameToOptimize, nil
}

func (app *App) getMostDesirableReplicaToOptimize(positions []nodePosition) (string, error) {
lagThreshold := app.config.OptimizeReplicationLagThreshold
return getMostDesirableNode(app.logger, positions, lagThreshold)
}

func (app *App) waitReplicaToConverge(
replica *mysql.Node,
) error {
timer := time.NewTimer(app.config.OptimizeReplicationConvergenceTimeout)
for {
select {
case <-timer.C:
return errors.New(DeadlineExceeded)
default:
lagUnderThreshold, err := app.isReplicationLagUnderThreshold(replica)
if err != nil {
app.logger.Infof("can't check replication status: %s", err.Error())
}
if lagUnderThreshold {
return nil
}
time.Sleep(time.Second)
}
}
}

func (app *App) isReplicationLagUnderThreshold(
replica *mysql.Node,
) (bool, error) {
status, err := replica.GetReplicaStatus()
if err != nil {
return false, err
}

lag := status.GetReplicationLag().Float64
lagThreshold := app.config.OptimizeReplicationLagThreshold.Seconds()

if !app.config.ASync && lag < lagThreshold {
return true, nil
}
return false, nil
}

// nolint: gocyclo, funlen
func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNodes []string, switchover *Switchover, oldMaster string) error {
if switchover.To != "" {
@@ -1323,6 +1428,11 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
activeNodes = filterOut(activeNodes, []string{oldMaster})
}

err := app.optimizationPhase(activeNodes, switchover, oldMaster)
if err != nil {
return err
}

// set read only everywhere (all HA-nodes) and stop replication
app.logger.Info("switchover: phase 1: enter read only")
errs := util.RunParallel(func(host string) error {
@@ -1404,7 +1514,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
frozenActiveNodes = append(frozenActiveNodes, host)
}
}
err := app.switchHelper.CheckFailoverQuorum(activeNodesWithOldMaster, len(frozenActiveNodes))
err = app.switchHelper.CheckFailoverQuorum(activeNodesWithOldMaster, len(frozenActiveNodes))
if err != nil {
return err
}
@@ -1583,6 +1693,44 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
return nil
}

func (app *App) optimizationPhase(activeNodes []string, switchover *Switchover, oldMaster string) error {
if !app.switchHelper.IsOptimizationPhaseAllowed() {
app.logger.Info("switchover: phase 0: turbo mode is skipped")
return nil
}

appropriateReplicas := filterOut(activeNodes, []string{oldMaster, switchover.From})
desirableReplica := switchover.To

app.logger.Infof(
"switchover: phase 0: enter turbo mode; replicas: %v, oldMaster: '%s', desirable replica: '%s'",
appropriateReplicas,
oldMaster,
desirableReplica,
)
err := app.optimizeReplicaWithSmallestLag(
appropriateReplicas,
oldMaster,
desirableReplica,
)
if err != nil && err.Error() == DeadlineExceeded {
app.logger.Infof("switchover: phase 0: turbo mode failed: %v", err)
switchErr := app.FinishSwitchover(switchover, fmt.Errorf("turbo mode exceeded deadline"))
if switchErr != nil {
return fmt.Errorf("switchover: failed to reject switchover %s", switchErr)
}
app.logger.Info("switchover: rejected")
return err
}

// Conceptually, we should only reject the switchover if we encounter a DeadlineExceeded error.
// This indicates that the replica with the freshest data is too far from convergence,
// and we can't optimize it within a limited time frame.
// Other cases can be handled in subsequent steps, so no special action is needed here.
app.logger.Info("switchover: phase 0: turbo mode is complete")
return nil
}

func (app *App) getCurrentMaster(clusterState map[string]*NodeState) (string, error) {
master, err := app.GetMasterHostFromDcs()
if master != "" && err == nil {
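For orientation, below is a minimal, self-contained sketch (not part of the patch) of the convergence-wait pattern that optimizeReplicaWithSmallestLag and waitReplicaToConverge implement: poll the replica's lag until it drops below a threshold or a deadline expires, and surface the deadline case as a string sentinel, as the patch does with DeadlineExceeded. The getLag function and the durations used here are placeholders, not values from the PR.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// deadlineExceeded mirrors the string sentinel added in internal/app/util.go.
const deadlineExceeded = "deadline exceeded"

// waitUnderThreshold polls getLag until it reports a value below threshold
// or the timeout expires. This is a simplified stand-in for waitReplicaToConverge.
func waitUnderThreshold(getLag func() (time.Duration, error), threshold, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	for {
		select {
		case <-timer.C:
			return errors.New(deadlineExceeded)
		default:
			lag, err := getLag()
			if err == nil && lag < threshold {
				return nil
			}
			// Keep polling once per second, as the patch does.
			time.Sleep(time.Second)
		}
	}
}

func main() {
	// Fake lag source: starts at 5s and shrinks by 1s per call.
	lag := 5 * time.Second
	getLag := func() (time.Duration, error) {
		lag -= time.Second
		return lag, nil
	}

	err := waitUnderThreshold(getLag, 3*time.Second, 10*time.Second)
	if err != nil && err.Error() == deadlineExceeded {
		// In the patch, optimizationPhase rejects the switchover in this case.
		fmt.Println("turbo mode exceeded deadline")
		return
	}
	fmt.Println("replica converged:", err == nil)
}
```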
4 changes: 4 additions & 0 deletions internal/app/util.go
@@ -10,6 +10,10 @@ import (
"github.com/yandex/mysync/internal/util"
)

const (
DeadlineExceeded = "deadline exceeded"
)

type nodePosition struct {
host string
gtidset gtids.GTIDSet
6 changes: 5 additions & 1 deletion internal/config/config.go
@@ -99,9 +99,11 @@ type Config struct {
ReplMonErrorWaitInterval time.Duration `config:"repl_mon_error_wait_interval" yaml:"repl_mon_error_wait_interval"`
ReplMonSlaveWaitInterval time.Duration `config:"repl_mon_slave_wait_interval" yaml:"repl_mon_slave_wait_interval"`
ShowOnlyGTIDDiff bool `config:"show_only_gtid_diff" yaml:"show_only_gtid_diff"`
ManagerSwitchover bool `config:"manager_switchover" yaml:"manager_switchover"`
ForceSwitchover bool `config:"force_switchover" yaml:"force_switchover"` // TODO: Remove when we will be sure it's right way to do switchover
DSNSettings string `config:"dsn_settings" yaml:"dsn_settings"`
ManagerSwitchover bool `config:"manager_switchover" yaml:"manager_switchover"`
OptimizeReplicationLagThreshold time.Duration `config:"optimize_replication_lag_threshold" yaml:"optimize_replication_lag_threshold"`
OptimizeReplicationConvergenceTimeout time.Duration `config:"optimize_replication_convergence_timeout" yaml:"optimize_replication_convergence_timeout"`
}

// DefaultConfig returns default configuration for MySync
@@ -192,6 +194,8 @@ func DefaultConfig() (Config, error) {
ManagerSwitchover: false,
ForceSwitchover: false,
DSNSettings: "?autocommit=1&sql_log_off=1",
OptimizeReplicationLagThreshold: 60 * time.Second,
OptimizeReplicationConvergenceTimeout: 300 * time.Second,
}
return config, nil
}
7 changes: 7 additions & 0 deletions internal/mysql/switch_helper.go
@@ -12,12 +12,14 @@ type ISwitchHelper interface {
GetRequiredWaitSlaveCount([]string) int
GetFailoverQuorum([]string) int
CheckFailoverQuorum([]string, int) error
IsOptimizationPhaseAllowed() bool
}

type SwitchHelper struct {
priorityChoiceMaxLag time.Duration
rplSemiSyncMasterWaitForSlaveCount int
SemiSync bool
ASync bool
}

func NewSwitchHelper(config *config.Config) ISwitchHelper {
@@ -31,6 +33,7 @@ func NewSwitchHelper(config *config.Config) ISwitchHelper {
priorityChoiceMaxLag: priorityChoiceMaxLag,
rplSemiSyncMasterWaitForSlaveCount: config.RplSemiSyncMasterWaitForSlaveCount,
SemiSync: config.SemiSync,
ASync: config.ASync,
}
}

@@ -71,3 +74,7 @@ func (sh *SwitchHelper) CheckFailoverQuorum(activeNodes []string, permissibleSla
}
return nil
}

func (sh *SwitchHelper) IsOptimizationPhaseAllowed() bool {
return !sh.ASync
}
2 changes: 2 additions & 0 deletions tests/images/mysql/mysync.yaml
@@ -66,3 +66,5 @@ force_switchover: ${FORCE_SWITCHOVER:-false}
manager_switchover: ${MANAGER_SWITCHOVER:-true}
manager_election_delay_after_quorum_loss: ${MANAGER_ELECTION_DELAY_AFTER_QUORUM_LOSS:-15s}
manager_lock_acquire_delay_after_quorum_loss: ${MANAGER_LOCK_ACQUIRE_DELAY_AFTER_QUORUM_LOSS:-30s}
optimize_replication_lag_threshold: 60s
optimize_replication_convergence_timeout: 300s