diff --git a/internal/app/replication.go b/internal/app/replication.go index 70d2fa4b..29488c39 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -5,6 +5,7 @@ import ( "time" "github.com/yandex/mysync/internal/mysql" + "github.com/yandex/mysync/internal/mysql/gtids" ) type RepairReplicationAlgorithm func(app *App, node *mysql.Node, master string, channel string) error @@ -17,8 +18,9 @@ const ( ) type ReplicationRepairState struct { - LastAttempt time.Time - History map[ReplicationRepairAlgorithmType]int + LastAttempt time.Time + History map[ReplicationRepairAlgorithmType]int + LastGTIDExecuted string } func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) { @@ -31,12 +33,26 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) { } if replState.cooldownPassed(app.config.ReplicationRepairCooldown) { - delete(app.replRepairState, key) + status, err := node.ReplicaStatusWithTimeout(app.config.DBTimeout, channel) + if err != nil { + return + } + + newGtidSet := gtids.ParseGtidSet(status.GetExecutedGtidSet()) + oldGtidSet := gtids.ParseGtidSet(replState.LastGTIDExecuted) + + if !isGTIDLessOrEqual(newGtidSet, oldGtidSet) { + delete(app.replRepairState, key) + } } } func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) { - replState := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel)) + replState, err := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel), node.Host(), channel) + if err != nil { + app.logger.Errorf("repair error: host %s, %v", node.Host(), err) + return + } if !replState.cooldownPassed(app.config.ReplicationRepairCooldown) { return @@ -138,29 +154,40 @@ func (state *ReplicationRepairState) cooldownPassed(replicationRepairCooldown ti return state.LastAttempt.Before(cooldown) } -func (app *App) getOrCreateHostRepairState(stateKey string) *ReplicationRepairState { +func (app *App) getOrCreateHostRepairState(stateKey, hostname, channel string) (*ReplicationRepairState, error) { var replState *ReplicationRepairState if state, ok := app.replRepairState[stateKey]; ok { replState = state } else { - replState = app.createRepairState() + var err error + replState, err = app.createRepairState(hostname, channel) + if err != nil { + return nil, err + } + app.replRepairState[stateKey] = replState } - return replState + return replState, nil } -func (app *App) createRepairState() *ReplicationRepairState { +func (app *App) createRepairState(hostname, channel string) (*ReplicationRepairState, error) { + status, err := app.cluster.Get(hostname).ReplicaStatusWithTimeout(app.config.DBTimeout, channel) + if err != nil { + return nil, err + } + result := ReplicationRepairState{ - LastAttempt: time.Now(), - History: make(map[ReplicationRepairAlgorithmType]int), + LastAttempt: time.Now(), + History: make(map[ReplicationRepairAlgorithmType]int), + LastGTIDExecuted: status.GetExecutedGtidSet(), } for i := range app.getAlgorithmOrder() { result.History[ReplicationRepairAlgorithmType(i)] = 0 } - return &result + return &result, nil } var defaultOrder = []ReplicationRepairAlgorithmType{