Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize replication settings when the replica is offline #34

Merged
merged 4 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,6 +1382,7 @@ func (app *App) getMasterHost(clusterState map[string]*NodeState) (string, error
}

func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master string) {
masterNode := app.cluster.Get(master)
for host, state := range clusterState {
if !state.PingOk {
continue
Expand All @@ -1390,7 +1391,7 @@ func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master str
if host == master {
app.repairMasterOfflineMode(host, node, state)
} else {
app.repairSlaveOfflineMode(host, node, state, clusterState[master])
app.repairSlaveOfflineMode(host, node, state, masterNode, clusterState[master])
}
}
}
Expand All @@ -1409,7 +1410,7 @@ func (app *App) repairMasterOfflineMode(host string, node *mysql.Node, state *No
}
}

func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *NodeState, masterState *NodeState) {
func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *NodeState, masterNode *mysql.Node, masterState *NodeState) {
if state.SlaveState != nil && state.SlaveState.ReplicationLag != nil {
replPermBroken, _ := state.IsReplicationPermanentlyBroken()
if state.IsOffline && *state.SlaveState.ReplicationLag <= app.config.OfflineModeDisableLag.Seconds() {
Expand All @@ -1431,7 +1432,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod
app.logger.Errorf("repair: should not turn slave to online until get actual resetup status")
return
}

err = node.SetDefaultReplicationSettings(masterNode)
if err != nil {
app.logger.Errorf("repair: failed to set default replication settings on slave %s: %s", host, err)
}
err = node.SetOnline()
if err != nil {
app.logger.Errorf("repair: failed to set slave %s online: %s", host, err)
Expand All @@ -1447,6 +1451,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod
} else {
app.logger.Infof("repair: slave %s set offline, because ReplicationLag (%f s) >= OfflineModeEnableLag (%v)",
host, *state.SlaveState.ReplicationLag, app.config.OfflineModeEnableLag)
err = node.OptimizeReplication()
if err != nil {
app.logger.Errorf("repair: failed to set optimize replication settings on slave %s: %s", host, err)
}
}
}
// gradual transfer of permanently broken nodes to offline
Expand Down
42 changes: 42 additions & 0 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ var queryOnliner = regexp.MustCompile(`\r?\n\s*`)
var mogrifyRegex = regexp.MustCompile(`:\w+`)
var ErrNotLocalNode = errors.New("this method should be run on local node only")

// Values that OptimizeReplication applies to a node in order to reduce its
// replication lag. They are meant to be temporary; SetDefaultReplicationSettings
// replaces them with the values read from the master.
const (
// optimalSyncBinlogValue is bound to :sync_binlog when OptimizeReplication
// runs the querySetSyncBinlog query (SET GLOBAL sync_binlog).
optimalSyncBinlogValue = 1000
// optimalInnodbFlushLogAtTrxCommitValue is bound to :level when
// OptimizeReplication runs the querySetInnodbFlushLogAtTrxCommit query
// (SET GLOBAL innodb_flush_log_at_trx_commit).
optimalInnodbFlushLogAtTrxCommitValue = 2
)

// NewNode returns new Node
func NewNode(config *config.Config, logger *log.Logger, host string) (*Node, error) {
addr := util.JoinHostPort(host, config.MySQL.Port)
Expand Down Expand Up @@ -1060,3 +1065,40 @@ func (n *Node) SaveCAFile(data string, path string) error {
}
return nil
}

// OptimizeReplication applies the settings that reduce replication lag on this
// node: it sets innodb_flush_log_at_trx_commit to
// optimalInnodbFlushLogAtTrxCommitValue and then sync_binlog to
// optimalSyncBinlogValue.
//
// The first failing statement aborts the call and its error is returned as is.
// These settings must not be left in place permanently, because the replica is
// unstable while they are active.
func (n *Node) OptimizeReplication() error {
	flushLogArgs := map[string]interface{}{"level": optimalInnodbFlushLogAtTrxCommitValue}
	if err := n.exec(querySetInnodbFlushLogAtTrxCommit, flushLogArgs); err != nil {
		return err
	}

	syncBinlogArgs := map[string]interface{}{"sync_binlog": optimalSyncBinlogValue}
	return n.exec(querySetSyncBinlog, syncBinlogArgs)
}

// ReplicationSettings is the row shape returned by queryGetReplicationSettings.
// The db tags match the column aliases used in that query.
type ReplicationSettings struct {
// InnodbFlushLogAtTrxCommit holds the selected @@innodb_flush_log_at_trx_commit value.
InnodbFlushLogAtTrxCommit int `db:"InnodbFlushLogAtTrxCommit"`
// SyncBinlog holds the selected @@sync_binlog value.
SyncBinlog int `db:"SyncBinlog"`
}

// SetDefaultReplicationSettings sets default values for replication on this
// node based on the values currently configured on the master.
//
// It reads innodb_flush_log_at_trx_commit and sync_binlog from masterNode via
// queryGetReplicationSettings and applies them to n, in that order. The first
// failure is returned as is and the remaining steps are skipped, so a failure
// of the second SET leaves the first one already applied.
//
// masterNode must not be nil; a nil masterNode yields an error instead of
// being queried.
func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error {
	// NOTE(review): callers obtain masterNode from a cluster lookup; guard here
	// so that a missing master surfaces as an error the caller can log.
	if masterNode == nil {
		return errors.New("master node is not specified")
	}

	var rs ReplicationSettings
	if err := masterNode.queryRow(queryGetReplicationSettings, nil, &rs); err != nil {
		return err
	}

	flushLogArgs := map[string]interface{}{"level": rs.InnodbFlushLogAtTrxCommit}
	if err := n.exec(querySetInnodbFlushLogAtTrxCommit, flushLogArgs); err != nil {
		return err
	}

	syncBinlogArgs := map[string]interface{}{"sync_binlog": rs.SyncBinlog}
	return n.exec(querySetSyncBinlog, syncBinlogArgs)
}
8 changes: 7 additions & 1 deletion internal/mysql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ const (
queryStopReplica = "stop_replica"
queryStartReplica = "start_replica"
queryIgnoreDB = "ignore_db"
querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit"
querySetSyncBinlog = "set_sync_binlog"
queryGetReplicationSettings = "get_replication_settings"
)

var DefaultQueries = map[string]string{
Expand Down Expand Up @@ -116,5 +119,8 @@ var DefaultQueries = map[string]string{
SOURCE_RETRY_COUNT = :retryCount,
SOURCE_DELAY = :sourceDelay
FOR CHANNEL :channel`,
queryIgnoreDB: `CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (:ignoreList) FOR CHANNEl :channel`,
queryIgnoreDB: `CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (:ignoreList) FOR CHANNEl :channel`,
querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`,
queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`,
querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`,
}