Skip to content

Commit

Permalink
bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
noname0443 committed Dec 28, 2023
1 parent 3e66b2c commit b169948
Showing 1 changed file with 31 additions and 8 deletions.
39 changes: 31 additions & 8 deletions internal/app/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ const (
)

type ReplicationRepairState struct {
LastAttempt time.Time
History map[ReplicationRepairAlgorithmType]int
LastAttempt time.Time
History map[ReplicationRepairAlgorithmType]int
BeforeRepairGTID *mysql.GTIDExecuted
}

func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
Expand All @@ -30,13 +31,22 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) {
return
}

if replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
gtid, err := node.GTIDExecuted()
if err != nil {
return
}
if replState.cooldownPassed(app.config.ReplicationRepairCooldown) &&
replState.isGtidExecuted(gtid) {
delete(app.replRepairState, key)
}
}

func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) {
replState := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel))
replState, err := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel), node.Host())
if err != nil {
app.logger.Errorf("repair error: host %s, %v", node.Host(), err)
return
}

if !replState.cooldownPassed(app.config.ReplicationRepairCooldown) {
return
Expand Down Expand Up @@ -138,22 +148,35 @@ func (state *ReplicationRepairState) cooldownPassed(replicationRepairCooldown ti
return state.LastAttempt.Before(cooldown)
}

func (app *App) getOrCreateHostRepairState(stateKey string) *ReplicationRepairState {
func (state *ReplicationRepairState) isGtidExecuted(currentGtid *mysql.GTIDExecuted) bool {
if state.BeforeRepairGTID.ExecutedGtidSet == currentGtid.ExecutedGtidSet {
return false
}
return true
}

func (app *App) getOrCreateHostRepairState(stateKey, hostname string) (*ReplicationRepairState, error) {
var replState *ReplicationRepairState
if state, ok := app.replRepairState[stateKey]; ok {
replState = state
} else {
replState = app.createRepairState()
executedGtid, err := app.cluster.Get(hostname).GTIDExecuted()
if err != nil {
return nil, err
}
replState.BeforeRepairGTID = executedGtid
app.replRepairState[stateKey] = replState
}

return replState
return replState, nil
}

func (app *App) createRepairState() *ReplicationRepairState {
result := ReplicationRepairState{
LastAttempt: time.Now(),
History: make(map[ReplicationRepairAlgorithmType]int),
LastAttempt: time.Now(),
History: make(map[ReplicationRepairAlgorithmType]int),
BeforeRepairGTID: &mysql.GTIDExecuted{},
}

for i := range app.getAlgorithmOrder() {
Expand Down

0 comments on commit b169948

Please sign in to comment.