Skip to content

Commit

Permalink
feat: optimize replication settings when the replica is offline (#34)
Browse files Browse the repository at this point in the history
* feat: optimize replication settings when the replica is offline

* fix: misspell

* fix: issues
  • Loading branch information
Fizic authored Sep 12, 2023
1 parent 4807b86 commit 4f79361
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 4 deletions.
14 changes: 11 additions & 3 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,7 @@ func (app *App) getMasterHost(clusterState map[string]*NodeState) (string, error
}

func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master string) {
masterNode := app.cluster.Get(master)
for host, state := range clusterState {
if !state.PingOk {
continue
Expand All @@ -1390,7 +1391,7 @@ func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master str
if host == master {
app.repairMasterOfflineMode(host, node, state)
} else {
app.repairSlaveOfflineMode(host, node, state, clusterState[master])
app.repairSlaveOfflineMode(host, node, state, masterNode, clusterState[master])
}
}
}
Expand All @@ -1409,7 +1410,7 @@ func (app *App) repairMasterOfflineMode(host string, node *mysql.Node, state *No
}
}

func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *NodeState, masterState *NodeState) {
func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *NodeState, masterNode *mysql.Node, masterState *NodeState) {
if state.SlaveState != nil && state.SlaveState.ReplicationLag != nil {
replPermBroken, _ := state.IsReplicationPermanentlyBroken()
if state.IsOffline && *state.SlaveState.ReplicationLag <= app.config.OfflineModeDisableLag.Seconds() {
Expand All @@ -1431,7 +1432,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod
app.logger.Errorf("repair: should not turn slave to online until get actual resetup status")
return
}

err = node.SetDefaultReplicationSettings(masterNode)
if err != nil {
app.logger.Errorf("repair: failed to set default replication settings on slave %s: %s", host, err)
}
err = node.SetOnline()
if err != nil {
app.logger.Errorf("repair: failed to set slave %s online: %s", host, err)
Expand All @@ -1447,6 +1451,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod
} else {
app.logger.Infof("repair: slave %s set offline, because ReplicationLag (%f s) >= OfflineModeEnableLag (%v)",
host, *state.SlaveState.ReplicationLag, app.config.OfflineModeEnableLag)
err = node.OptimizeReplication()
if err != nil {
app.logger.Errorf("repair: failed to set optimize replication settings on slave %s: %s", host, err)
}
}
}
// gradual transfer of permanently broken nodes to offline
Expand Down
42 changes: 42 additions & 0 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ var queryOnliner = regexp.MustCompile(`\r?\n\s*`)
var mogrifyRegex = regexp.MustCompile(`:\w+`)
var ErrNotLocalNode = errors.New("this method should be run on local node only")

const (
optimalSyncBinlogValue = 1000
optimalInnodbFlushLogAtTrxCommitValue = 2
)

// NewNode returns new Node
func NewNode(config *config.Config, logger *log.Logger, host string) (*Node, error) {
addr := util.JoinHostPort(host, config.MySQL.Port)
Expand Down Expand Up @@ -1060,3 +1065,40 @@ func (n *Node) SaveCAFile(data string, path string) error {
}
return nil
}

// OptimizeReplication sets the optimal settings for replication, which reduces the replication lag.
// Cannot be used permanently due to the instability of the replica in this state
func (n *Node) OptimizeReplication() error {
err := n.exec(querySetInnodbFlushLogAtTrxCommit, map[string]interface{}{"level": optimalInnodbFlushLogAtTrxCommitValue})
if err != nil {
return err
}
err = n.exec(querySetSyncBinlog, map[string]interface{}{"sync_binlog": optimalSyncBinlogValue})
if err != nil {
return err
}
return nil
}

type ReplicationSettings struct {
InnodbFlushLogAtTrxCommit int `db:"InnodbFlushLogAtTrxCommit"`
SyncBinlog int `db:"SyncBinlog"`
}

// SetDefaultReplicationSettings sets default values for replication based on the value on the master
func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error {
var rs ReplicationSettings
err := masterNode.queryRow(queryGetReplicationSettings, nil, &rs)
if err != nil {
return err
}
err = n.exec(querySetInnodbFlushLogAtTrxCommit, map[string]interface{}{"level": rs.InnodbFlushLogAtTrxCommit})
if err != nil {
return err
}
err = n.exec(querySetSyncBinlog, map[string]interface{}{"sync_binlog": rs.SyncBinlog})
if err != nil {
return err
}
return nil
}
8 changes: 7 additions & 1 deletion internal/mysql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const (
queryStopReplica = "stop_replica"
queryStartReplica = "start_replica"
queryIgnoreDB = "ignore_db"
querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit"
querySetSyncBinlog = "set_sync_binlog"
queryGetReplicationSettings = "get_replication_settings"
)

var DefaultQueries = map[string]string{
Expand Down Expand Up @@ -116,5 +119,8 @@ var DefaultQueries = map[string]string{
SOURCE_RETRY_COUNT = :retryCount,
SOURCE_DELAY = :sourceDelay
FOR CHANNEL :channel`,
queryIgnoreDB: `CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (:ignoreList) FOR CHANNEl :channel`,
queryIgnoreDB: `CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (:ignoreList) FOR CHANNEl :channel`,
querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`,
queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`,
querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`,
}

0 comments on commit 4f79361

Please sign in to comment.