diff --git a/internal/app/app.go b/internal/app/app.go index 33d32df9..6ca55970 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -375,7 +375,7 @@ func (app *App) checkCrashRecovery() { func (app *App) checkHAReplicasRunning(local *mysql.Node) bool { checker := func(host string) error { node := app.cluster.Get(host) - status, err := node.ReplicaStatusWithTimeout(app.config.DBLostCheckTimeout) + status, err := node.ReplicaStatusWithTimeout(app.config.DBLostCheckTimeout, app.config.ReplicationChannel) if err != nil { return err } @@ -1574,6 +1574,8 @@ func (app *App) repairMasterNode(masterNode *mysql.Node, clusterState, clusterSt // enter read-only if disk is full app.repairReadOnlyOnMaster(masterNode, masterState, clusterStateDcs) + app.repairExternalReplication(masterNode) + events, err := masterNode.ReenableEvents() if err != nil { app.logger.Errorf("repair: failed to reenable slaveside disabled events on %s: %s", host, err) @@ -1661,10 +1663,10 @@ func (app *App) repairSlaveNode(node *mysql.Node, clusterState map[string]*NodeS if result, code := state.IsReplicationPermanentlyBroken(); result { app.logger.Warnf("repair: replication on host %v is permanently broken, error code: %d", host, code) } else { - app.TryRepairReplication(node, master) + app.TryRepairReplication(node, master, app.config.ReplicationChannel) } } else { - app.MarkReplicationRunning(node) + app.MarkReplicationRunning(node, app.config.ReplicationChannel) } } } @@ -1779,6 +1781,25 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod } } +func (app *App) repairExternalReplication(masterNode *mysql.Node) { + extReplStatus, err := masterNode.GetExternalReplicaStatus() + if extReplStatus == nil { + // external replication is not supported + return + } + if err != nil { + if !mysql.IsErrorChannelDoesNotExists(err) { + app.logger.Errorf("repair (external): host %s failed to get external replica status %v", masterNode.Host(), err) + } + return + } + + if extReplStatus.ReplicationState() == mysql.ReplicationError { + // TODO: remove "". Master is not needed for external replication now + app.TryRepairReplication(masterNode, "", app.config.ExternalReplicationChannel) + } +} + func (app *App) findBestStreamFrom(node *mysql.Node, clusterState map[string]*NodeState, master string, cascadeTopology map[string]mysql.CascadeNodeConfiguration) string { var loopDetector []string loopDetector = append(loopDetector, node.Host()) @@ -1949,14 +1970,7 @@ func (app *App) getNodeState(host string) *NodeState { if slaveStatus != nil { nodeState.IsMaster = false nodeState.SlaveState = new(SlaveState) - nodeState.SlaveState.ExecutedGtidSet = slaveStatus.GetExecutedGtidSet() - nodeState.SlaveState.RetrievedGtidSet = slaveStatus.GetRetrievedGtidSet() - nodeState.SlaveState.MasterHost = slaveStatus.GetMasterHost() - nodeState.SlaveState.ReplicationState = slaveStatus.ReplicationState() - nodeState.SlaveState.MasterLogFile = slaveStatus.GetMasterLogFile() - nodeState.SlaveState.MasterLogPos = slaveStatus.GetReadMasterLogPos() - nodeState.SlaveState.LastIOErrno = slaveStatus.GetLastIOErrno() - nodeState.SlaveState.LastSQLErrno = slaveStatus.GetLastSQLErrno() + nodeState.SlaveState.FromReplicaStatus(slaveStatus) lag, err2 := node.ReplicationLag(slaveStatus) if err2 != nil { return err2 diff --git a/internal/app/data.go b/internal/app/data.go index 4d5e1a2e..86b98982 100644 --- a/internal/app/data.go +++ b/internal/app/data.go @@ -7,6 +7,8 @@ import ( "strconv" "strings" "time" + + "github.com/yandex/mysync/internal/mysql" ) type appState string @@ -218,6 +220,17 @@ type SlaveState struct { LastSQLErrno int `json:"last_sql_errno"` } +func (ns *SlaveState) FromReplicaStatus(replStatus mysql.ReplicaStatus) { + ns.ExecutedGtidSet = replStatus.GetExecutedGtidSet() + ns.RetrievedGtidSet = replStatus.GetRetrievedGtidSet() + ns.MasterHost = replStatus.GetMasterHost() + ns.ReplicationState = replStatus.ReplicationState() + ns.MasterLogFile = replStatus.GetMasterLogFile() + ns.MasterLogPos = replStatus.GetReadMasterLogPos() + ns.LastIOErrno = replStatus.GetLastIOErrno() + ns.LastSQLErrno = replStatus.GetLastSQLErrno() +} + // SemiSyncState contains semi sync host settings type SemiSyncState struct { MasterEnabled bool `json:"master_enabled"` diff --git a/internal/app/replication.go b/internal/app/replication.go index 83981246..9167f983 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -7,7 +7,7 @@ import ( "github.com/yandex/mysync/internal/mysql" ) -type RepairReplicationAlgorithm func(app *App, node *mysql.Node, master string) error +type RepairReplicationAlgorithm func(app *App, node *mysql.Node, master string, channel string) error type ReplicationRepairAlgorithmType int @@ -21,21 +21,22 @@ type ReplicationRepairState struct { History map[ReplicationRepairAlgorithmType]int } -func (app *App) MarkReplicationRunning(node *mysql.Node) { +func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) { var replState *ReplicationRepairState - if state, ok := app.replRepairState[node.Host()]; ok { + key := app.makeReplStateKey(node, channel) + if state, ok := app.replRepairState[key]; ok { replState = state } else { return } if replState.cooldownPassed(app.config.ReplicationRepairCooldown) { - delete(app.replRepairState, node.Host()) + delete(app.replRepairState, key) } } -func (app *App) TryRepairReplication(node *mysql.Node, master string) { - replState := app.getOrCreateHostRepairState(node.Host()) +func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) { + replState := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel)) if !replState.cooldownPassed(app.config.ReplicationRepairCooldown) { return @@ -48,7 +49,7 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string) { } algorithm := getRepairAlgorithm(algorithmType) - err = algorithm(app, node, master) + err = algorithm(app, node, master, channel) if err != nil { app.logger.Errorf("repair error: %v", err) } @@ -57,12 +58,27 @@ func (app *App) TryRepairReplication(node *mysql.Node, master string) { replState.LastAttempt = time.Now() } -func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string) error { +func (app *App) makeReplStateKey(node *mysql.Node, channel string) string { + if channel == app.config.ExternalReplicationChannel { + return fmt.Sprintf("%s-%s", node.Host(), channel) + } + return node.Host() +} + +func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string, channel string) error { app.logger.Infof("repair: trying to repair replication using StartSlaveAlgorithm...") + if channel == app.config.ExternalReplicationChannel { + return node.StartExternalReplication() + } return node.StartSlave() } -func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string) error { +func ResetSlaveAlgorithm(app *App, node *mysql.Node, master string, channel string) error { + // TODO we don't want reset slave on external replication + if channel == app.config.ExternalReplicationChannel { + app.logger.Infof("external repair: don't want to use ResetSlaveAlgorithm, leaving") + return nil + } app.logger.Infof("repair: trying to repair replication using ResetSlaveAlgorithm...") app.logger.Infof("repair: executing set slave offline") err := node.SetOffline() diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 9bac4eaa..0e8e878d 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -484,21 +484,7 @@ func (n *Node) Ping() (bool, error) { // GetReplicaStatus returns slave/replica status or nil if node is master func (n *Node) GetReplicaStatus() (ReplicaStatus, error) { - return n.ReplicaStatusWithTimeout(n.config.DBTimeout) -} - -func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration) (ReplicaStatus, error) { - query, status, err := n.GetVersionSlaveStatusQuery() - if err != nil { - return nil, nil - } - err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{ - "channel": n.config.ReplicationChannel, - }, status, timeout) - if err == sql.ErrNoRows { - return nil, nil - } - return status, err + return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ReplicationChannel) } // GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel @@ -510,10 +496,18 @@ func (n *Node) GetExternalReplicaStatus() (ReplicaStatus, error) { if !(checked) { return nil, nil } - status := new(ReplicaStatusStruct) - err = n.queryRowMogrifyWithTimeout(queryReplicaStatus, map[string]interface{}{ - "channel": n.config.ExternalReplicationChannel, - }, status, n.config.DBTimeout) + + return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel) +} + +func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration, channel string) (ReplicaStatus, error) { + query, status, err := n.GetVersionSlaveStatusQuery() + if err != nil { + return nil, nil + } + err = n.queryRowMogrifyWithTimeout(query, map[string]interface{}{ + "channel": channel, + }, status, timeout) if err == sql.ErrNoRows { return nil, nil }