Skip to content

Commit

Permalink
repl_mon fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
suetin committed May 27, 2024
1 parent 51782c9 commit 6f9a218
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 26 deletions.
2 changes: 1 addition & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2330,7 +2330,7 @@ func (app *App) Run() int {
if app.config.ExternalReplicationType != util.Disabled {
go app.externalCAFileChecker(ctx)
}
if app.config.ASync {
if app.config.ReplMon {
go app.replMonWriter(ctx)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/app/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

func (app *App) CheckAsyncSwitchAllowed(node *mysql.Node, switchover *Switchover) bool {
if app.config.ASync && switchover.Cause == CauseAuto {
if app.config.ASync && switchover.Cause == CauseAuto && app.config.AsyncAllowedLag > 0 {
app.logger.Infof("async mode is active and this is auto switch so we checking new master delay")
ts, err := app.GetReplMonTS()
if err != nil {
Expand Down
24 changes: 17 additions & 7 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,11 @@ func (n *Node) queryRowMogrifyWithTimeout(queryName string, arg map[string]inter
return err
}

func (n *Node) queryRowMogrify(queryName string, arg map[string]interface{}, result interface{}) error {
return n.queryRowMogrifyWithTimeout(queryName, arg, result, n.config.DBTimeout)

}

Check failure on line 350 in internal/mysql/node.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary trailing newline (whitespace)

// IsRunning checks if daemon process is running
func (n *Node) IsRunning() (bool, error) {
if !n.IsLocal() {
Expand Down Expand Up @@ -1007,23 +1012,28 @@ func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error {

func (n *Node) GetReplMonTS(replMonTable string) (string, error) {
result := new(ReplMonTS)
err := n.queryRow(strings.ReplaceAll(queryGetReplMonTS, "replMonTable", replMonTable), nil, result)
err := n.queryRowMogrify(queryGetReplMonTS,
map[string]interface{}{"replMonTable": replMonTable},
result)
return result.Timestamp, err
}

func (n *Node) CalcReplMonTSDelay(replMonTable string, ts string) (int64, error) {
result := new(ReplMonTSDelay)
err := n.queryRow(strings.ReplaceAll(queryCalcReplMonTSDelay, "replMonTable", replMonTable),
map[string]interface{}{"ts": ts}, result)
err := n.queryRowMogrify(queryCalcReplMonTSDelay,
map[string]interface{}{"ts": ts, "replMonTable": replMonTable},
result)
return result.Delay, err
}

func (n *Node) CreateReplMonTable(replMonTable string) error {
err := n.exec(strings.ReplaceAll(queryCreateReplMonTable, "replMonTable", replMonTable), nil)
return err
return n.execMogrify(queryCreateReplMonTable, map[string]interface{}{
"replMonTable": replMonTable,
})
}

func (n *Node) UpdateReplMonTable(replMonTable string) error {
err := n.exec(strings.ReplaceAll(queryUpdateReplMon, "replMonTable", replMonTable), nil)
return err
return n.execMogrify(queryUpdateReplMon, map[string]interface{}{
"replMonTable": replMonTable,
})
}
8 changes: 4 additions & 4 deletions internal/mysql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,14 @@ var DefaultQueries = map[string]string{
querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`,
queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`,
querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`,
queryGetReplMonTS: `SELECT UNIX_TIMESTAMP(ts) AS ts FROM replMonTable`,
queryCalcReplMonTSDelay: `SELECT FLOOR(CAST(:ts AS DECIMAL(20,3)) - UNIX_TIMESTAMP(ts)) AS delay FROM replMonTable`,
queryCreateReplMonTable: `CREATE TABLE IF NOT EXISTS replMonTable(
queryGetReplMonTS: `SELECT UNIX_TIMESTAMP(ts) AS ts FROM :replMonTable`,
queryCalcReplMonTSDelay: `SELECT FLOOR(CAST(:ts AS DECIMAL(20,3)) - UNIX_TIMESTAMP(ts)) AS delay FROM :replMonTable`,
queryCreateReplMonTable: `CREATE TABLE IF NOT EXISTS :replMonTable(
id INT NOT NULL PRIMARY KEY,
ts TIMESTAMP(3)
)
ENGINE=INNODB`,
queryUpdateReplMon: `INSERT INTO replMonTable(id, ts)
queryUpdateReplMon: `INSERT INTO :replMonTable(id, ts)
(
SELECT 1, CURRENT_TIMESTAMP(3)
WHERE @@read_only = 0
Expand Down
26 changes: 13 additions & 13 deletions tests/features/async_setting.feature
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Feature: mysync async mode tests
"""
MYSYNC_SEMISYNC=false
MYSYNC_ASYNC=true
ASYNC_ALLOWED_LAG=70
ASYNC_ALLOWED_LAG=90
MYSYNC_FAILOVER=true
MYSYNC_FAILOVER_DELAY=0s
MYSYNC_FAILOVER_COOLDOWN=0s
Expand Down Expand Up @@ -38,16 +38,16 @@ Feature: mysync async mode tests
And I run SQL on mysql host "mysql2"
"""
STOP REPLICA FOR CHANNEL '';
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 50;
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 60;
START REPLICA FOR CHANNEL '';
"""
And I run SQL on mysql host "mysql3"
"""
STOP REPLICA FOR CHANNEL '';
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 90;
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 120;
START REPLICA FOR CHANNEL '';
"""
And I wait for "120" seconds
And I wait for "150" seconds
And I run SQL on mysql host "mysql1"
"""
INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F")
Expand Down Expand Up @@ -204,7 +204,7 @@ Feature: mysync async mode tests
"""
MYSYNC_SEMISYNC=false
MYSYNC_ASYNC=true
ASYNC_ALLOWED_LAG=70
ASYNC_ALLOWED_LAG=90
MYSYNC_FAILOVER=true
MYSYNC_FAILOVER_DELAY=0s
MYSYNC_FAILOVER_COOLDOWN=0s
Expand Down Expand Up @@ -237,16 +237,16 @@ Feature: mysync async mode tests
And I run SQL on mysql host "mysql2"
"""
STOP REPLICA FOR CHANNEL '';
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 50;
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 60;
START REPLICA FOR CHANNEL '';
"""
And I run SQL on mysql host "mysql3"
"""
STOP REPLICA FOR CHANNEL '';
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 90;
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 120;
START REPLICA FOR CHANNEL '';
"""
And I wait for "120" seconds
And I wait for "150" seconds
And I run SQL on mysql host "mysql1"
"""
INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F")
Expand Down Expand Up @@ -296,7 +296,7 @@ Feature: mysync async mode tests
"""
[{"val":"A,B,C,D,E,F"}]
"""
When I wait for "120" seconds
When I wait for "150" seconds
And I run SQL on mysql host "mysql3"
"""
SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t
Expand Down Expand Up @@ -344,16 +344,16 @@ Feature: mysync async mode tests
And I run SQL on mysql host "mysql2"
"""
STOP REPLICA FOR CHANNEL '';
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 70;
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 60;
START REPLICA FOR CHANNEL '';
"""
And I run SQL on mysql host "mysql3"
"""
STOP REPLICA FOR CHANNEL '';
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 110;
CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 120;
START REPLICA FOR CHANNEL '';
"""
And I wait for "120" seconds
And I wait for "150" seconds
And I run SQL on mysql host "mysql1"
"""
INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F")
Expand Down Expand Up @@ -392,7 +392,7 @@ Feature: mysync async mode tests
"""
[{"val":"A,B,C"}]
"""
When I wait for "120" seconds
When I wait for "150" seconds
And I run SQL on mysql host "mysql3"
"""
SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t
Expand Down

0 comments on commit 6f9a218

Please sign in to comment.