Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: optimize replication settings when the replica is offline #34

Merged
merged 4 commits into from
Sep 12, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
feat: optimize replication settings when the replica is offline
Fizic committed Sep 7, 2023
commit 7d8d9e03f75ad2af51dcdaa0d4e2669b07d2436e
18 changes: 15 additions & 3 deletions internal/app/app.go
Original file line number Diff line number Diff line change
@@ -1382,6 +1382,7 @@ func (app *App) getMasterHost(clusterState map[string]*NodeState) (string, error
}

func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master string) {
masterNode := app.cluster.Get(master)
for host, state := range clusterState {
if !state.PingOk {
continue
@@ -1390,7 +1391,7 @@ func (app *App) repairOfflineMode(clusterState map[string]*NodeState, master str
if host == master {
app.repairMasterOfflineMode(host, node, state)
} else {
app.repairSlaveOfflineMode(host, node, state, clusterState[master])
app.repairSlaveOfflineMode(host, node, state, masterNode, clusterState[master])
}
}
}
@@ -1409,7 +1410,7 @@ func (app *App) repairMasterOfflineMode(host string, node *mysql.Node, state *No
}
}

func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *NodeState, masterState *NodeState) {
func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *NodeState, masterNode *mysql.Node, masterState *NodeState) {
if state.SlaveState != nil && state.SlaveState.ReplicationLag != nil {
replPermBroken, _ := state.IsReplicationPermanentlyBroken()
if state.IsOffline && *state.SlaveState.ReplicationLag <= app.config.OfflineModeDisableLag.Seconds() {
@@ -1431,7 +1432,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod
app.logger.Errorf("repair: should not turn slave to online until get actual resetup status")
return
}

err = node.SetDefaultReplicationSettings(masterNode)
if err != nil {
app.logger.Errorf("repair: failed to set optimise replication settings on slave %s: %s", host, err)
}
err = node.SetOnline()
if err != nil {
app.logger.Errorf("repair: failed to set slave %s online: %s", host, err)
@@ -1447,6 +1451,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod
} else {
app.logger.Infof("repair: slave %s set offline, because ReplicationLag (%f s) >= OfflineModeEnableLag (%v)",
host, *state.SlaveState.ReplicationLag, app.config.OfflineModeEnableLag)
err = node.OptimizeReplication()
if err != nil {
app.logger.Errorf("repair: failed to set optimise replication settings on slave %s: %s", host, err)
}
}
}
// gradual transfer of permanently broken nodes to offline
@@ -1467,6 +1475,10 @@ func (app *App) repairSlaveOfflineMode(host string, node *mysql.Node, state *Nod
} else {
app.logger.Infof("repair: slave %s set offline, because replication permanently broken", host)
}
err = node.OptimizeReplication()
if err != nil {
app.logger.Errorf("repair: failed to set optimise replication settings on slave %s: %s", host, err)
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

replPermBroken is true. Does it make sense to change replication settings in this case?

}
}
}
42 changes: 42 additions & 0 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
@@ -40,6 +40,11 @@ var queryOnliner = regexp.MustCompile(`\r?\n\s*`)
var mogrifyRegex = regexp.MustCompile(`:\w+`)
var ErrNotLocalNode = errors.New("this method should be run on local node only")

const (
optimalSyncBinlogValue = 1000
optimalInnodbFlushLogAtTrxCommitValue = 2
)

// NewNode returns new Node
func NewNode(config *config.Config, logger *log.Logger, host string) (*Node, error) {
addr := util.JoinHostPort(host, config.MySQL.Port)
@@ -1060,3 +1065,40 @@ func (n *Node) SaveCAFile(data string, path string) error {
}
return nil
}

// OptimizeReplication sets the optimal settings for replication, which reduces the replication lag.
// Cannot be used permanently due to the instability of the replica in this state
func (n *Node) OptimizeReplication() error {
err := n.exec(querySetInnodbFlushLogAtTrxCommit, map[string]interface{}{"level": optimalInnodbFlushLogAtTrxCommitValue})
if err != nil {
return err
}
err = n.exec(querySetSyncBinlog, map[string]interface{}{"sync_binlog": optimalSyncBinlogValue})
if err != nil {
return err
}
return nil
}

type ReplicationSettings struct {
InnodbFlushLogAtTrxCommit int `db:"InnodbFlushLogAtTrxCommit"`
SyncBinlog int `db:"SyncBinlog"`
}

// SetDefaultReplicationSettings sets default values for replication based on the value on the master
func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error {
var rs ReplicationSettings
err := masterNode.queryRow(queryGetReplicationSettings, nil, &rs)
if err != nil {
return err
}
err = n.exec(querySetInnodbFlushLogAtTrxCommit, map[string]interface{}{"level": rs.InnodbFlushLogAtTrxCommit})
if err != nil {
return err
}
err = n.exec(querySetSyncBinlog, map[string]interface{}{"sync_binlog": rs.SyncBinlog})
if err != nil {
return err
}
return nil
}
8 changes: 7 additions & 1 deletion internal/mysql/queries.go
Original file line number Diff line number Diff line change
@@ -42,6 +42,9 @@ const (
queryStopReplica = "stop_replica"
queryStartReplica = "start_replica"
queryIgnoreDB = "ignore_db"
querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit"
querySetSyncBinlog = "set_sync_binlog"
queryGetReplicationSettings = "get_replication_settings"
)

var DefaultQueries = map[string]string{
@@ -116,5 +119,8 @@ var DefaultQueries = map[string]string{
SOURCE_RETRY_COUNT = :retryCount,
SOURCE_DELAY = :sourceDelay
FOR CHANNEL :channel`,
queryIgnoreDB: `CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (:ignoreList) FOR CHANNEl :channel`,
queryIgnoreDB: `CHANGE REPLICATION FILTER REPLICATE_IGNORE_DB = (:ignoreList) FOR CHANNEl :channel`,
querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`,
queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`,
querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`,
}