Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

restart only replicas ahead of the master #113

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 25 additions & 11 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -940,7 +940,7 @@ func (app *App) calcActiveNodes(clusterState, clusterStateDcs map[string]*NodeSt
continue
}
sgtids := gtids.ParseGtidSet(sstatus.ExecutedGtidSet)
if sstatus.ReplicationState != mysql.ReplicationRunning || isSplitBrained(sgtids, mgtids, muuid) {
if sstatus.ReplicationState != mysql.ReplicationRunning || gtids.IsSplitBrained(sgtids, mgtids, muuid) {
app.logger.Errorf("calc active nodes: %s is not replicating or splitbrained, deleting from active...", host)
continue
}
Expand Down Expand Up @@ -1069,7 +1069,7 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node

// first, shrink HA-group, if needed
if waitSlaveCount > oldWaitSlaveCount {
err := app.adjustSemiSyncOnMaster(masterNode, clusterState[master], waitSlaveCount)
err := app.adjustSemiSyncOnMaster(masterNode, masterState, waitSlaveCount)
if err != nil {
app.logger.Errorf("failed to adjust semi-sync on master %s to %d: %v", masterNode.Host(), waitSlaveCount, err)
return err
Expand All @@ -1092,13 +1092,13 @@ func (app *App) updateActiveNodes(clusterState, clusterStateDcs map[string]*Node

// and finally enlarge HA-group, if needed
for _, host := range becomeActive {
err := app.enableSemiSyncOnSlave(host)
err := app.enableSemiSyncOnSlave(host, clusterState[host], masterState)
if err != nil {
app.logger.Errorf("failed to enable semi-sync on slave %s: %v", host, err)
}
}
if waitSlaveCount < oldWaitSlaveCount {
err := app.adjustSemiSyncOnMaster(masterNode, clusterState[master], waitSlaveCount)
err := app.adjustSemiSyncOnMaster(masterNode, masterState, waitSlaveCount)
if err != nil {
app.logger.Errorf("failed to adjust semi-sync on master %s to %d: %v", masterNode.Host(), waitSlaveCount, err)
}
Expand Down Expand Up @@ -1136,18 +1136,31 @@ func (app *App) adjustSemiSyncOnMaster(node *mysql.Node, state *NodeState, waitS
return nil
}

func (app *App) enableSemiSyncOnSlave(host string) error {
func (app *App) enableSemiSyncOnSlave(host string, slaveState, masterState *NodeState) error {
node := app.cluster.Get(host)
err := node.SemiSyncSetSlave()
if err != nil {
app.logger.Errorf("failed to enable semi_sync_slave on %s: %s", host, err)
return err
}
err = node.RestartReplica()
if err != nil {
app.logger.Errorf("failed restart replication after set semi_sync_slave on %s: %s", host, err)
return err
masterGtidSet := gtids.ParseGtidSet(masterState.MasterState.ExecutedGtidSet)
slaveGtidSet := gtids.ParseGtidSet(slaveState.SlaveState.ExecutedGtidSet)

if gtids.IsSlaveAhead(slaveGtidSet, masterGtidSet) {
// we should restart only replicas ahead of the master
err = node.RestartReplica()
if err != nil {
app.logger.Errorf("failed restart replication after set semi_sync_slave on %s: %s", host, err)
return err
}
} else {
err = node.RestartSlaveIOThread()
if err != nil {
app.logger.Errorf("failed restart slave io thread after set semi_sync_slave on %s: %s", host, err)
return err
}
}

return nil
}

Expand Down Expand Up @@ -1859,12 +1872,13 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod
}
app.logger.Debugf("repair: %s GTID set = %v, new stream_from GTID set is %v", host, myGITIDs, candidateGTIDs)

if !isGTIDLessOrEqual(myGITIDs, candidateGTIDs) && !isGTIDLessOrEqual(candidateGTIDs, myGITIDs) {
// TODO: replace with IsSplitBrained
if gtids.IsSlaveAhead(myGITIDs, candidateGTIDs) && gtids.IsSlaveAhead(candidateGTIDs, myGITIDs) {
app.logger.Errorf("repair: %s and %s are splitbrained...", host, upstreamCandidate)
app.writeEmergeFile("cascade replica splitbain detected")
return
}
if isGTIDLessOrEqual(myGITIDs, candidateGTIDs) {
if gtids.IsSlaveBehindOrEqual(myGITIDs, candidateGTIDs) {
app.logger.Infof("repair: new stream_from host GTID set is superset of our GTID set. Switching Master_Host, Starting replication")
err = app.performChangeMaster(host, upstreamCandidate)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
newGtidSet := gtids.ParseGtidSet(status.GetExecutedGtidSet())
oldGtidSet := gtids.ParseGtidSet(replState.LastGTIDExecuted)

if !isGTIDLessOrEqual(newGtidSet, oldGtidSet) {
if !gtids.IsSlaveBehindOrEqual(newGtidSet, oldGtidSet) {
teem0n marked this conversation as resolved.
Show resolved Hide resolved
delete(app.replRepairState, key)
}
}
Expand Down
31 changes: 1 addition & 30 deletions internal/app/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"fmt"
"time"

gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/google/uuid"
"github.com/yandex/mysync/internal/log"
"github.com/yandex/mysync/internal/mysql"
"github.com/yandex/mysync/internal/mysql/gtids"
Expand Down Expand Up @@ -228,34 +226,7 @@ func isSlavePermanentlyLost(sstatus mysql.ReplicaStatus, masterGtidSet gtids.GTI
return true
}
slaveGtidSet := gtids.ParseGtidSet(sstatus.GetExecutedGtidSet())
return !isGTIDLessOrEqual(slaveGtidSet, masterGtidSet)
}

func isGTIDLessOrEqual(slaveGtidSet, masterGtidSet gtids.GTIDSet) bool {
return masterGtidSet.Contain(slaveGtidSet) || masterGtidSet.Equal(slaveGtidSet)
}

func isSplitBrained(slaveGtidSet, masterGtidSet gtids.GTIDSet, masterUUID uuid.UUID) bool {
mysqlSlaveGtidSet := slaveGtidSet.(*gomysql.MysqlGTIDSet)
mysqlMasterGtidSet := masterGtidSet.(*gomysql.MysqlGTIDSet)
for _, slaveSet := range mysqlSlaveGtidSet.Sets {
masterSet, ok := mysqlMasterGtidSet.Sets[slaveSet.SID.String()]
if !ok {
return true
}

if masterSet.Contain(slaveSet) {
continue
}

if masterSet.SID == masterUUID {
continue
}

return true
}

return false
return !gtids.IsSlaveBehindOrEqual(slaveGtidSet, masterGtidSet)
teem0n marked this conversation as resolved.
Show resolved Hide resolved
}

func validatePriority(priority *int64) error {
Expand Down
13 changes: 7 additions & 6 deletions internal/app/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/yandex/mysync/internal/log"
"github.com/yandex/mysync/internal/mysql"
"github.com/yandex/mysync/internal/mysql/gtids"
)

func mustGTIDSet(s string) gomysql.GTIDSet {
Expand Down Expand Up @@ -338,41 +339,41 @@ func TestIsSplitBrained(t *testing.T) {
slaveGTID := mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100")
ok := isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok := gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.False(t, ok)

// the replica is lagging behind the master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-99," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.False(t, ok)

// the replica is lagging behind the new master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.False(t, ok)

// the replica applied the transaction from the master before the master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-101," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.False(t, ok)

// the replica applied a transaction not from the master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-100," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-101")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.True(t, ok)

// the replica applied a new transaction not from the master
slaveGTID = mustGTIDSet("6DBC0B04-4B09-43DC-86CC-9AF852DED919:1-101," +
"09978591-5754-4710-BF67-062880ABE1B4:1-100," +
"AA6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100," +
"BB6890C8-69F8-4BC4-B3A5-5D3FEA8C28CF:1-100")
ok = isSplitBrained(slaveGTID, masterGTID, masterUUID)
ok = gtids.IsSplitBrained(slaveGTID, masterGTID, masterUUID)
require.True(t, ok)
}
37 changes: 37 additions & 0 deletions internal/mysql/gtids/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package gtids

import (
gomysql "github.com/go-mysql-org/go-mysql/mysql"
"github.com/google/uuid"
)

// IsSlaveBehindOrEqual reports whether masterGtidSet covers slaveGtidSet,
// i.e. masterGtidSet either contains slaveGtidSet or is equal to it.
func IsSlaveBehindOrEqual(slaveGtidSet, masterGtidSet GTIDSet) bool {
	// Containment is checked first; equality is only consulted as a fallback.
	if masterGtidSet.Contain(slaveGtidSet) {
		return true
	}
	return masterGtidSet.Equal(slaveGtidSet)
}

// IsSlaveAhead reports whether slaveGtidSet is NOT covered by masterGtidSet.
// It is the exact negation of IsSlaveBehindOrEqual, so it is true whenever
// masterGtidSet neither contains nor equals slaveGtidSet. That includes sets
// that have diverged from each other, not only a replica strictly ahead.
func IsSlaveAhead(slaveGtidSet, masterGtidSet GTIDSet) bool {
	covered := IsSlaveBehindOrEqual(slaveGtidSet, masterGtidSet)
	return !covered
}

// IsSplitBrained reports whether slaveGtidSet holds transactions that
// masterGtidSet cannot account for.
//
// Every per-UUID set on the slave side is compared with the master's set for
// the same UUID:
//   - the master has no set for that UUID: split brain;
//   - the master's set contains the slave's set: fine;
//   - the slave's set is not contained, but the UUID is masterUUID: tolerated
//     (the slave holds extra transactions attributed to the master itself);
//   - otherwise: split brain.
//
// Both arguments must be *gomysql.MysqlGTIDSet; the type assertions are
// unchecked and panic for any other implementation.
func IsSplitBrained(slaveGtidSet, masterGtidSet GTIDSet, masterUUID uuid.UUID) bool {
	replica := slaveGtidSet.(*gomysql.MysqlGTIDSet)
	source := masterGtidSet.(*gomysql.MysqlGTIDSet)

	for _, replicaSet := range replica.Sets {
		sourceSet, known := source.Sets[replicaSet.SID.String()]
		switch {
		case !known:
			// The slave has transactions from a server the master never saw.
			return true
		case sourceSet.Contain(replicaSet):
			continue
		case sourceSet.SID == masterUUID:
			// Extra transactions under the master's own UUID are tolerated.
			continue
		default:
			return true
		}
	}

	return false
}
2 changes: 1 addition & 1 deletion internal/mysql/gtids/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

type GTIDSet = mysql.GTIDSet

func ParseGtidSet(gtidset string) mysql.GTIDSet {
func ParseGtidSet(gtidset string) GTIDSet {
parsed, err := mysql.ParseGTIDSet(mysql.MySQLFlavor, gtidset)
if err != nil {
panic(err)
Expand Down
Loading