From b169948d9e44c69f8ad6e3b4aaeab01311dd22d5 Mon Sep 17 00:00:00 2001 From: Evgeniy Date: Thu, 28 Dec 2023 16:54:05 +0300 Subject: [PATCH] bug fix --- internal/app/replication.go | 39 +++++++++++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/internal/app/replication.go b/internal/app/replication.go index 70d2fa4b..89f0e76b 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -17,8 +17,9 @@ const ( ) type ReplicationRepairState struct { - LastAttempt time.Time - History map[ReplicationRepairAlgorithmType]int + LastAttempt time.Time + History map[ReplicationRepairAlgorithmType]int + BeforeRepairGTID *mysql.GTIDExecuted } func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) { @@ -30,13 +31,22 @@ func (app *App) MarkReplicationRunning(node *mysql.Node, channel string) { return } - if replState.cooldownPassed(app.config.ReplicationRepairCooldown) { + gtid, err := node.GTIDExecuted() + if err != nil { + return + } + if replState.cooldownPassed(app.config.ReplicationRepairCooldown) && + replState.isGtidExecuted(gtid) { delete(app.replRepairState, key) } } func (app *App) TryRepairReplication(node *mysql.Node, master string, channel string) { - replState := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel)) + replState, err := app.getOrCreateHostRepairState(app.makeReplStateKey(node, channel), node.Host()) + if err != nil { + app.logger.Errorf("repair error: host %s, %v", node.Host(), err) + return + } if !replState.cooldownPassed(app.config.ReplicationRepairCooldown) { return @@ -138,22 +148,35 @@ func (state *ReplicationRepairState) cooldownPassed(replicationRepairCooldown ti return state.LastAttempt.Before(cooldown) } -func (app *App) getOrCreateHostRepairState(stateKey string) *ReplicationRepairState { +func (state *ReplicationRepairState) isGtidExecuted(currentGtid *mysql.GTIDExecuted) bool { + if state.BeforeRepairGTID.ExecutedGtidSet == currentGtid.ExecutedGtidSet { + return false + } + return true +} + +func (app *App) getOrCreateHostRepairState(stateKey, hostname string) (*ReplicationRepairState, error) { var replState *ReplicationRepairState if state, ok := app.replRepairState[stateKey]; ok { replState = state } else { replState = app.createRepairState() + executedGtid, err := app.cluster.Get(hostname).GTIDExecuted() + if err != nil { + return nil, err + } + replState.BeforeRepairGTID = executedGtid app.replRepairState[stateKey] = replState } - return replState + return replState, nil } func (app *App) createRepairState() *ReplicationRepairState { result := ReplicationRepairState{ - LastAttempt: time.Now(), - History: make(map[ReplicationRepairAlgorithmType]int), + LastAttempt: time.Now(), + History: make(map[ReplicationRepairAlgorithmType]int), + BeforeRepairGTID: &mysql.GTIDExecuted{}, } for i := range app.getAlgorithmOrder() {