Skip to content

Commit

Permalink
bug fix: full gtid replaced with channel gtid
Browse files Browse the repository at this point in the history
  • Loading branch information
noname0443 committed Dec 29, 2023
1 parent b169948 commit 27fab78
Showing 1 changed file with 10 additions and 13 deletions.
23 changes: 10 additions & 13 deletions internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
type ReplicationRepairState struct {
LastAttempt time.Time
History map[ReplicationRepairAlgorithmType]int
BeforeRepairGTID *mysql.GTIDExecuted
BeforeRepairGTID string
}

func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
Expand All @@ -31,18 +31,18 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
return
}

gtid, err := node.GTIDExecuted()
status, err := node.ReplicaStatusWithTimeout(app.config.DBTimeout, channel)
if err != nil {
return
}
if replState.cooldownPassed(app.config.ReplicationRepairCooldown) &&
replState.isGtidExecuted(gtid) {
replState.isGtidExecuted(status.GetExecutedGtidSet()) {
delete(app.replRepairState, key)
}
}

func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) {
replState, err := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel), node.Host())
replState, err := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel), node.Host(), channel)
if err != nil {
app.logger.Errorf("repair error: host %s, %v", node.Host(), err)
return
Expand Down Expand Up @@ -148,24 +148,21 @@ func (state *ReplicationRepairState) cooldownPassed(replicationRepairCooldown ti
return state.LastAttempt.Before(cooldown)
}

func (state *ReplicationRepairState) isGtidExecuted(currentGtid *mysql.GTIDExecuted) bool {
if state.BeforeRepairGTID.ExecutedGtidSet == currentGtid.ExecutedGtidSet {
return false
}
return true
func (state *ReplicationRepairState) isGtidExecuted(currentGtid string) bool {
return state.BeforeRepairGTID != currentGtid
}

func (app *App) getOrCreateHostRepairState(stateKey, hostname string) (*ReplicationRepairState, error) {
func (app *App) getOrCreateHostRepairState(stateKey, hostname, channel string) (*ReplicationRepairState, error) {
var replState *ReplicationRepairState
if state, ok := app.replRepairState[stateKey]; ok {
replState = state
} else {
replState = app.createRepairState()
executedGtid, err := app.cluster.Get(hostname).GTIDExecuted()
status, err := app.cluster.Get(hostname).ReplicaStatusWithTimeout(app.config.DBTimeout, channel)
if err != nil {
return nil, err
}
replState.BeforeRepairGTID = executedGtid
replState.BeforeRepairGTID = status.GetExecutedGtidSet()
app.replRepairState[stateKey] = replState
}

Expand All @@ -176,7 +173,7 @@ func (app *App) createRepairState() *ReplicationRepairState {
result := ReplicationRepairState{
LastAttempt: time.Now(),
History: make(map[ReplicationRepairAlgorithmType]int),
BeforeRepairGTID: &mysql.GTIDExecuted{},
BeforeRepairGTID: "",
}

for i := range app.getAlgorithmOrder() {
Expand Down

0 comments on commit 27fab78

Please sign in to comment.