
External replication repair
Aleksandr Shevchuk committed Sep 11, 2023
1 parent 4807b86 commit 4f24e11
Showing 4 changed files with 70 additions and 39 deletions.
30 changes: 19 additions & 11 deletions internal/app/app.go
@@ -375,7 +375,7 @@ func (app *App) checkCrashRecovery() {
func (app *App) checkHAReplicasRunning(local *mysql.Node) bool {
checker := func(host string) error {
node := app.cluster.Get(host)
status, err := node.ReplicaStatusWithTimeout(app.config.DBLostCheckTimeout)
status, err := node.ReplicaStatusWithTimeout(app.config.DBLostCheckTimeout, app.config.ReplicationChannel)
if err != nil {
return err
}
@@ -1574,6 +1574,8 @@ func (app *App) repairMasterNode(masterNode *mysql.Node, clusterState, clusterSt
// enter read-only if disk is full
app.repairReadOnlyOnMaster(masterNode, masterState, clusterStateDcs)

app.repairExternalReplication(masterNode)

events, err := masterNode.ReenableEvents()
if err != nil {
app.logger.Errorf("repair: failed to reenable slaveside disabled events on %s: %s", host, err)
@@ -1661,10 +1663,10 @@ func (app *App) repairSlaveNode(node *mysql.Node, clusterState map[string]*NodeS
if result, code := state.IsReplicationPermanentlyBroken(); result {
app.logger.Warnf("repair: replication on host %v is permanently broken, error code: %d", host, code)
} else {
app.TryRepairReplication(node, master)
app.TryRepairReplication(node, master, app.config.ReplicationChannel)
}
} else {
app.MarkReplicationRunning(node)
app.MarkReplicationRunning(node, app.config.ReplicationChannel)
}
}
}
@@ -1779,6 +1781,19 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod
}
}

func (app *App) repairExternalReplication(masterNode *mysql.Node) {
extReplStatus, err := masterNode.GetExternalReplicaStatus()
if err != nil {
if !mysql.IsErrorChannelDoesNotExists(err) {
app.logger.Errorf("repair (external): host %s failed to get external replica status %v", masterNode.Host(), err)
}
return
}

if extReplStatus.ReplicationState() == mysql.ReplicationError {

Check failure on line 1793 in internal/app/app.go (GitHub Actions / lint): empty-block: this block is empty, you can remove it (revive)
}
}
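One way to fill the empty branch flagged by the lint failure above — a sketch only, not part of this commit, assuming the intended repair is simply to restart the external replication channel on the master (the log messages and the choice of StartExternalReplication as the repair action are assumptions; all identifiers already appear elsewhere in this diff):

if extReplStatus.ReplicationState() == mysql.ReplicationError {
	// hypothetical repair action: restart the external channel and log the outcome
	app.logger.Warnf("repair (external): replication on host %s is broken, restarting external replication", masterNode.Host())
	if err := masterNode.StartExternalReplication(); err != nil {
		app.logger.Errorf("repair (external): failed to restart external replication on %s: %v", masterNode.Host(), err)
	}
}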

func (app *App) findBestStreamFrom(node *mysql.Node, clusterState map[string]*NodeState, master string, cascadeTopology map[string]mysql.CascadeNodeConfiguration) string {
var loopDetector []string
loopDetector = append(loopDetector, node.Host())
@@ -1949,14 +1964,7 @@ func (app *App) getNodeState(host string) *NodeState {
if slaveStatus != nil {
nodeState.IsMaster = false
nodeState.SlaveState = new(SlaveState)
nodeState.SlaveState.ExecutedGtidSet = slaveStatus.GetExecutedGtidSet()
nodeState.SlaveState.RetrievedGtidSet = slaveStatus.GetRetrievedGtidSet()
nodeState.SlaveState.MasterHost = slaveStatus.GetMasterHost()
nodeState.SlaveState.ReplicationState = slaveStatus.ReplicationState()
nodeState.SlaveState.MasterLogFile = slaveStatus.GetMasterLogFile()
nodeState.SlaveState.MasterLogPos = slaveStatus.GetReadMasterLogPos()
nodeState.SlaveState.LastIOErrno = slaveStatus.GetLastIOErrno()
nodeState.SlaveState.LastSQLErrno = slaveStatus.GetLastSQLErrno()
nodeState.SlaveState.FromReplicaStatus(slaveStatus)
lag, err2 := node.ReplicationLag(slaveStatus)
if err2 != nil {
return err2
13 changes: 13 additions & 0 deletions internal/app/data.go
@@ -7,6 +7,8 @@ import (
"strconv"
"strings"
"time"

"github.com/yandex/mysync/internal/mysql"
)

type appState string
@@ -218,6 +220,17 @@ type SlaveState struct {
LastSQLErrno int `json:"last_sql_errno"`
}

func (ns *SlaveState) FromReplicaStatus(replStatus mysql.ReplicaStatus) {
ns.ExecutedGtidSet = replStatus.GetExecutedGtidSet()
ns.RetrievedGtidSet = replStatus.GetRetrievedGtidSet()
ns.MasterHost = replStatus.GetMasterHost()
ns.ReplicationState = replStatus.ReplicationState()
ns.MasterLogFile = replStatus.GetMasterLogFile()
ns.MasterLogPos = replStatus.GetReadMasterLogPos()
ns.LastIOErrno = replStatus.GetLastIOErrno()
ns.LastSQLErrno = replStatus.GetLastSQLErrno()
}

// SemiSyncState contains semi sync host settings
type SemiSyncState struct {
MasterEnabled bool `json:"master_enabled"`
34 changes: 25 additions & 9 deletions internal/app/replication.go
@@ -7,7 +7,7 @@ import (
"github.com/yandex/mysync/internal/mysql"
)

type RepairReplicationAlgorithm func(app *App, node *mysql.Node, master string) error
type RepairReplicationAlgorithm func(app *App, node *mysql.Node, master string, channel string) error

type ReplicationRepairAlgorithmType int

@@ -21,21 +21,22 @@ type ReplicationRepairState struct {
History map[ReplicationRepairAlgorithmType]int
}

func (app *App) MarkReplicationRunning(node *mysql.Node) {
func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
var replState *ReplicationRepairState
if state, ok := app.replRepairState[node.Host()]; ok {
key := app.makeReplStateKey(node, channel)
if state, ok := app.replRepairState[key]; ok {
replState = state
} else {
return
}

if replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
delete(app.replRepairState, node.Host())
delete(app.replRepairState, key)
}
}

func (app *App) TryRepairReplication(node *mysql.Node, master string) {
replState := app.getOrCreateHostRepairState(node.Host())
func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) {
replState := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel))

if !replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
return
@@ -48,7 +49,7 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string) {
}

algorithm := getRepairAlgorithm(algorithmType)
err = algorithm(app, node, master)
err = algorithm(app, node, master, channel)
if err != nil {
app.logger.Errorf("repair error: %v", err)
}
@@ -57,12 +58,27 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string) {
replState.LastAttempt = time.Now()
}

func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string) error {
func (app *App) makeReplStateKey(node *mysql.Node, channel string) string {
if channel == app.config.ExternalReplicationChannel {
return fmt.Sprintf("%s-%s", node.Host(), channel)
}
return node.Host()
}

func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string, channel string) error {
app.logger.Infof("repair: trying to repair replication using StartSlaveAlgorithm...")
if channel == app.config.ExternalReplicationChannel {
node.StartExternalReplication()

Check failure on line 71 in internal/app/replication.go (GitHub Actions / lint): Error return value of `node.StartExternalReplication` is not checked (errcheck)
}
return node.StartSlave()
}
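One way to address the errcheck failure above — a sketch only, assuming a failed start of the external channel should be surfaced the same way a failed StartSlave is (whether to propagate or merely log the error is an assumption):

if channel == app.config.ExternalReplicationChannel {
	// check the error instead of discarding it
	if err := node.StartExternalReplication(); err != nil {
		return err
	}
}
return node.StartSlave()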

func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string) error {
func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel string) error {
// TODO we don't want reset slave on external replication
if channel == app.config.ExternalReplicationChannel {
app.logger.Infof("external repair: don't want to use ResetSlaveAlgorithm, leaving")
return nil
}
app.logger.Infof("repair: trying to repair replication using ResetSlaveAlgorithm...")
app.logger.Infof("repair: executing set slave offline")
err := node.SetOffline()
32 changes: 13 additions & 19 deletions internal/mysql/node.go
@@ -484,21 +484,7 @@ func (n *Node) Ping() (bool, error) {

// GetReplicaStatus returns slave/replica status or nil if node is master
func (n *Node) GetReplicaStatus() (ReplicaStatus, error) {
return n.ReplicaStatusWithTimeout(n.config.DBTimeout)
}

func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration) (ReplicaStatus, error) {
query, status, err := n.GetVersionSlaveStatusQuery()
if err != nil {
return nil, nil
}
err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{
"channel": n.config.ReplicationChannel,
}, status, timeout)
if err == sql.ErrNoRows {
return nil, nil
}
return status, err
return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ReplicationChannel)
}

// GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel
@@ -510,10 +496,18 @@ func (n *Node) GetExternalReplicaStatus() (ReplicaStatus, error) {
if !(checked) {
return nil, nil
}
status := new(ReplicaStatusStruct)
err = n.queryRowMogrifyWithTimeout(queryReplicaStatus, map[string]interface{}{
"channel": n.config.ExternalReplicationChannel,
}, status, n.config.DBTimeout)

return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel)
}

func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration, channel string) (ReplicaStatus, error) {
query, status, err := n.GetVersionSlaveStatusQuery()
if err != nil {
return nil, nil
}
err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{
"channel": channel,
}, status, timeout)
if err == sql.ErrNoRows {
return nil, nil
}
