diff --git a/internal/app/app.go b/internal/app/app.go index 33d32df9..a11cbda5 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -1382,6 +1382,7 @@ func (app *App) getMasterHost(clusterState map[string]*NodeState) (string, error } func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master string) { + masterNode := app.cluster.Get(master) for host, state := range clusterState { if !state.PingOk { continue @@ -1390,7 +1391,7 @@ func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master str if host == master { app.repairMasterOfflineMode(host, node, state) } else { - app.repairSlaveOfflineMode(host, node, state, clusterState[master]) + app.repairSlaveOfflineMode(host, node, state, masterNode, clusterState[master]) } } } @@ -1409,7 +1410,7 @@ func (app *App) repairMasterOfflineMode(host string, node *mysql.Node, state *No } } -func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *NodeState, masterState *NodeState) { +func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *NodeState, masterNode *mysql.Node, masterState *NodeState) { if state.SlaveState != nil && state.SlaveState.ReplicationLag != nil { replPermBroken, _ := state.IsReplicationPermanentlyBroken() if state.IsOffline && *state.SlaveState.ReplicationLag <= app.config.OfflineModeDisableLag.Seconds() { @@ -1431,7 +1432,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod app.logger.Errorf("repair: should not turn slave to online until get actual resetup status") return } - + err = node.SetDefaultReplicationSettings(masterNode) + if err != nil { + app.logger.Errorf("repair: failed to set default replication settings on slave %s: %s", host, err) + } err = node.SetOnline() if err != nil { app.logger.Errorf("repair: failed to set slave %s online: %s", host, err) @@ -1447,6 +1451,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod } else { app.logger.Infof("repair: slave %s set offline, because ReplicationLag (%f s) >= OfflineModeEnableLag (%v)", host, *state.SlaveState.ReplicationLag, app.config.OfflineModeEnableLag) + err = node.OptimizeReplication() + if err != nil { + app.logger.Errorf("repair: failed to set optimize replication settings on slave %s: %s", host, err) + } } } // gradual transfer of permanently broken nodes to offline diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 9bac4eaa..5f43e477 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -40,6 +40,11 @@ var queryOnliner = regexp.MustCompile(`\r?\n\s*`) var mogrifyRegex = regexp.MustCompile(`:\w+`) var ErrNotLocalNode = errors.New("this method should be run on local node only") +const ( + optimalSyncBinlogValue = 1000 + optimalInnodbFlushLogAtTrxCommitValue = 2 +) + // NewNode returns new Node func NewNode(config *config.Config, logger *log.Logger, host string) (*Node, error) { addr := util.JoinHostPort(host, config.MySQL.Port) @@ -1060,3 +1065,40 @@ func (n *Node) SaveCAFile(data string, path string) error { } return nil } + +// OptimizeReplication sets the optimal settings for replication, which reduces the replication lag. +// Cannot be used permanently due to the instability of the replica in this state +func (n *Node) OptimizeReplication() error { + err := n.exec(querySetInnodbFlushLogAtTrxCommit, map[string]interface{}{"level": optimalInnodbFlushLogAtTrxCommitValue}) + if err != nil { + return err + } + err = n.exec(querySetSyncBinlog, map[string]interface{}{"sync_binlog": optimalSyncBinlogValue}) + if err != nil { + return err + } + return nil +} + +type ReplicationSettings struct { + InnodbFlushLogAtTrxCommit int `db:"InnodbFlushLogAtTrxCommit"` + SyncBinlog int `db:"SyncBinlog"` +} + +// SetDefaultReplicationSettings sets default values for replication based on the value on the master +func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error { + var rs ReplicationSettings + err := masterNode.queryRow(queryGetReplicationSettings, nil, &rs) + if err != nil { + return err + } + err = n.exec(querySetInnodbFlushLogAtTrxCommit, map[string]interface{}{"level": rs.InnodbFlushLogAtTrxCommit}) + if err != nil { + return err + } + err = n.exec(querySetSyncBinlog, map[string]interface{}{"sync_binlog": rs.SyncBinlog}) + if err != nil { + return err + } + return nil +} diff --git a/internal/mysql/queries.go b/internal/mysql/queries.go index 4da48fb5..9e1d6abe 100644 --- a/internal/mysql/queries.go +++ b/internal/mysql/queries.go @@ -42,6 +42,9 @@ const ( queryStopReplica = "stop_replica" queryStartReplica = "start_replica" queryIgnoreDB = "ignore_db" + querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit" + querySetSyncBinlog = "set_sync_binlog" + queryGetReplicationSettings = "get_replication_settings" ) var DefaultQueries = map[string]string{ @@ -116,5 +119,8 @@ var DefaultQueries = map[string]string{ SOURCE_RETRY_COUNT = :retryCount, SOURCE_DELAY = :sourceDelay FOR CHANNEL :channel`, - queryIgnoreDB: `CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (:ignoreList) FOR CHANNEl :channel`, + queryIgnoreDB: `CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (:ignoreList) FOR CHANNEl :channel`, + querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`, + queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`, + querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`, }