Skip to content

Commit

Permalink
External replication repair (#36)
Browse files Browse the repository at this point in the history
* External replication repair

* fixes

---------

Co-authored-by: Aleksandr Shevchuk <[email protected]>
  • Loading branch information
teem0n and Aleksandr Shevchuk authored Sep 13, 2023
1 parent 4f79361 commit dd7c960
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 42 deletions.
36 changes: 25 additions & 11 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func (app *App) checkCrashRecovery() {
func (app *App) checkHAReplicasRunning(local *mysql.Node) bool {
checker := func(host string) error {
node := app.cluster.Get(host)
status, err := node.ReplicaStatusWithTimeout(app.config.DBLostCheckTimeout)
status, err := node.ReplicaStatusWithTimeout(app.config.DBLostCheckTimeout, app.config.ReplicationChannel)
if err != nil {
return err
}
Expand Down Expand Up @@ -1582,6 +1582,8 @@ func (app *App) repairMasterNode(masterNode *mysql.Node, clusterState, clusterSt
// enter read-only if disk is full
app.repairReadOnlyOnMaster(masterNode, masterState, clusterStateDcs)

app.repairExternalReplication(masterNode)

events, err := masterNode.ReenableEvents()
if err != nil {
app.logger.Errorf("repair: failed to reenable slaveside disabled events on %s: %s", host, err)
Expand Down Expand Up @@ -1669,10 +1671,10 @@ func (app *App) repairSlaveNode(node *mysql.Node, clusterState map[string]*NodeS
if result, code := state.IsReplicationPermanentlyBroken(); result {
app.logger.Warnf("repair: replication on host %v is permanently broken, error code: %d", host, code)
} else {
app.TryRepairReplication(node, master)
app.TryRepairReplication(node, master, app.config.ReplicationChannel)
}
} else {
app.MarkReplicationRunning(node)
app.MarkReplicationRunning(node, app.config.ReplicationChannel)
}
}
}
Expand Down Expand Up @@ -1787,6 +1789,25 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod
}
}

func (app *App) repairExternalReplication(masterNode *mysql.Node) {
extReplStatus, err := masterNode.GetExternalReplicaStatus()
if err != nil {
if !mysql.IsErrorChannelDoesNotExists(err) {
app.logger.Errorf("repair (external): host %s failed to get external replica status %v", masterNode.Host(), err)
}
return
}
if extReplStatus == nil {
// external replication is not supported
return
}

if extReplStatus.ReplicationState() == mysql.ReplicationError {
// TODO: remove "". Master is not needed for external replication now
app.TryRepairReplication(masterNode, "", app.config.ExternalReplicationChannel)
}
}

func (app *App) findBestStreamFrom(node *mysql.Node, clusterState map[string]*NodeState, master string, cascadeTopology map[string]mysql.CascadeNodeConfiguration) string {
var loopDetector []string
loopDetector = append(loopDetector, node.Host())
Expand Down Expand Up @@ -1957,14 +1978,7 @@ func (app *App) getNodeState(host string) *NodeState {
if slaveStatus != nil {
nodeState.IsMaster = false
nodeState.SlaveState = new(SlaveState)
nodeState.SlaveState.ExecutedGtidSet = slaveStatus.GetExecutedGtidSet()
nodeState.SlaveState.RetrievedGtidSet = slaveStatus.GetRetrievedGtidSet()
nodeState.SlaveState.MasterHost = slaveStatus.GetMasterHost()
nodeState.SlaveState.ReplicationState = slaveStatus.ReplicationState()
nodeState.SlaveState.MasterLogFile = slaveStatus.GetMasterLogFile()
nodeState.SlaveState.MasterLogPos = slaveStatus.GetReadMasterLogPos()
nodeState.SlaveState.LastIOErrno = slaveStatus.GetLastIOErrno()
nodeState.SlaveState.LastSQLErrno = slaveStatus.GetLastSQLErrno()
nodeState.SlaveState.FromReplicaStatus(slaveStatus)
lag, err2 := node.ReplicationLag(slaveStatus)
if err2 != nil {
return err2
Expand Down
13 changes: 13 additions & 0 deletions internal/app/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"strconv"
"strings"
"time"

"github.com/yandex/mysync/internal/mysql"
)

type appState string
Expand Down Expand Up @@ -218,6 +220,17 @@ type SlaveState struct {
LastSQLErrno int `json:"last_sql_errno"`
}

func (ns *SlaveState) FromReplicaStatus(replStatus mysql.ReplicaStatus) {
ns.ExecutedGtidSet = replStatus.GetExecutedGtidSet()
ns.RetrievedGtidSet = replStatus.GetRetrievedGtidSet()
ns.MasterHost = replStatus.GetMasterHost()
ns.ReplicationState = replStatus.ReplicationState()
ns.MasterLogFile = replStatus.GetMasterLogFile()
ns.MasterLogPos = replStatus.GetReadMasterLogPos()
ns.LastIOErrno = replStatus.GetLastIOErrno()
ns.LastSQLErrno = replStatus.GetLastSQLErrno()
}

// SemiSyncState contains semi sync host settings
type SemiSyncState struct {
MasterEnabled bool `json:"master_enabled"`
Expand Down
41 changes: 29 additions & 12 deletions internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/yandex/mysync/internal/mysql"
)

type RepairReplicationAlgorithm func(app *App, node *mysql.Node, master string) error
type RepairReplicationAlgorithm func(app *App, node *mysql.Node, master string, channel string) error

type ReplicationRepairAlgorithmType int

Expand All @@ -21,21 +21,22 @@ type ReplicationRepairState struct {
History map[ReplicationRepairAlgorithmType]int
}

func (app *App) MarkReplicationRunning(node *mysql.Node) {
func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
var replState *ReplicationRepairState
if state, ok := app.replRepairState[node.Host()]; ok {
key := app.makeReplStateKey(node, channel)
if state, ok := app.replRepairState[key]; ok {
replState = state
} else {
return
}

if replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
delete(app.replRepairState, node.Host())
delete(app.replRepairState, key)
}
}

func (app *App) TryRepairReplication(node *mysql.Node, master string) {
replState := app.getOrCreateHostRepairState(node.Host())
func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) {
replState := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel))

if !replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
return
Expand All @@ -48,7 +49,7 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string) {
}

algorithm := getRepairAlgorithm(algorithmType)
err = algorithm(app, node, master)
err = algorithm(app, node, master, channel)
if err != nil {
app.logger.Errorf("repair error: %v", err)
}
Expand All @@ -57,12 +58,28 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string) {
replState.LastAttempt = time.Now()
}

func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string) error {
func (app *App) makeReplStateKey(node *mysql.Node, channel string) string {
if channel == app.config.ExternalReplicationChannel {
return fmt.Sprintf("%s-%s", node.Host(), channel)
}
return node.Host()
}

func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string, channel string) error {
app.logger.Infof("repair: trying to repair replication using StartSlaveAlgorithm...")
if channel == app.config.ExternalReplicationChannel {
return node.StartExternalReplication()
}
return node.StartSlave()
}

func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string) error {
func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel string) error {
// TODO we don't want reset slave on external replication
// May be we should split algorithms by channel type (ext/int)
if channel == app.config.ExternalReplicationChannel {
app.logger.Infof("external repair: don't want to use ResetSlaveAlgorithm, leaving")
return nil
}
app.logger.Infof("repair: trying to repair replication using ResetSlaveAlgorithm...")
app.logger.Infof("repair: executing set slave offline")
err := node.SetOffline()
Expand Down Expand Up @@ -121,13 +138,13 @@ func (state *ReplicationRepairState) cooldownPassed(replicationRepairCooldown ti
return state.LastAttempt.Before(cooldown)
}

func (app *App) getOrCreateHostRepairState(host string) *ReplicationRepairState {
func (app *App) getOrCreateHostRepairState(stateKey string) *ReplicationRepairState {
var replState *ReplicationRepairState
if state, ok := app.replRepairState[host]; ok {
if state, ok := app.replRepairState[stateKey]; ok {
replState = state
} else {
replState = app.createRepairState()
app.replRepairState[host] = replState
app.replRepairState[stateKey] = replState
}

return replState
Expand Down
32 changes: 13 additions & 19 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,21 +489,7 @@ func (n *Node) Ping() (bool, error) {

// GetReplicaStatus returns slave/replica status or nil if node is master
func (n *Node) GetReplicaStatus() (ReplicaStatus, error) {
return n.ReplicaStatusWithTimeout(n.config.DBTimeout)
}

func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration) (ReplicaStatus, error) {
query, status, err := n.GetVersionSlaveStatusQuery()
if err != nil {
return nil, nil
}
err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{
"channel": n.config.ReplicationChannel,
}, status, timeout)
if err == sql.ErrNoRows {
return nil, nil
}
return status, err
return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ReplicationChannel)
}

// GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel
Expand All @@ -515,10 +501,18 @@ func (n *Node) GetExternalReplicaStatus() (ReplicaStatus, error) {
if !(checked) {
return nil, nil
}
status := new(ReplicaStatusStruct)
err = n.queryRowMogrifyWithTimeout(queryReplicaStatus, map[string]interface{}{
"channel": n.config.ExternalReplicationChannel,
}, status, n.config.DBTimeout)

return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel)
}

func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration, channel string) (ReplicaStatus, error) {
query, status, err := n.GetVersionSlaveStatusQuery()
if err != nil {
return nil, err
}
err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{
"channel": channel,
}, status, timeout)
if err == sql.ErrNoRows {
return nil, nil
}
Expand Down

0 comments on commit dd7c960

Please sign in to comment.