diff --git a/internal/app/app.go b/internal/app/app.go index fec74e66..5ad39d70 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2330,7 +2330,7 @@ func (app *App) Run() int { if app.config.ExternalReplicationType != util.Disabled { go app.externalCAFileChecker(ctx) } - if app.config.ASync { + if app.config.ReplMon { go app.replMonWriter(ctx) } diff --git a/internal/app/async.go b/internal/app/async.go index d6cd1118..cf0d39b3 100644 --- a/internal/app/async.go +++ b/internal/app/async.go @@ -6,7 +6,7 @@ import ( ) func (app *App) CheckAsyncSwitchAllowed(node *mysql.Node, switchover *Switchover) bool { - if app.config.ASync && switchover.Cause == CauseAuto { + if app.config.ASync && switchover.Cause == CauseAuto && app.config.AsyncAllowedLag > 0 { app.logger.Infof("async mode is active and this is auto switch so we checking new master delay") ts, err := app.GetReplMonTS() if err != nil { diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 36fd0c98..4c65cb78 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -344,6 +344,11 @@ func (n *Node) queryRowMogrifyWithTimeout(queryName string, arg map[string]inter return err } +func (n *Node) queryRowMogrify(queryName string, arg map[string]interface{}, result interface{}) error { + return n.queryRowMogrifyWithTimeout(queryName, arg, result, n.config.DBTimeout) + +} + // IsRunning checks if daemon process is running func (n *Node) IsRunning() (bool, error) { if !n.IsLocal() { @@ -1007,23 +1012,28 @@ func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error { func (n *Node) GetReplMonTS(replMonTable string) (string, error) { result := new(ReplMonTS) - err := n.queryRow(strings.ReplaceAll(queryGetReplMonTS, "replMonTable", replMonTable), nil, result) + err := n.queryRowMogrify(queryGetReplMonTS, + map[string]interface{}{"replMonTable": replMonTable}, + result) return result.Timestamp, err } func (n *Node) CalcReplMonTSDelay(replMonTable string, ts string) (int64, error) { result := new(ReplMonTSDelay) - err := n.queryRow(strings.ReplaceAll(queryCalcReplMonTSDelay, "replMonTable", replMonTable), - map[string]interface{}{"ts": ts}, result) + err := n.queryRowMogrify(queryCalcReplMonTSDelay, + map[string]interface{}{"ts": ts, "replMonTable": replMonTable}, + result) return result.Delay, err } func (n *Node) CreateReplMonTable(replMonTable string) error { - err := n.exec(strings.ReplaceAll(queryCreateReplMonTable, "replMonTable", replMonTable), nil) - return err + return n.execMogrify(queryCreateReplMonTable, map[string]interface{}{ + "replMonTable": replMonTable, + }) } func (n *Node) UpdateReplMonTable(replMonTable string) error { - err := n.exec(strings.ReplaceAll(queryUpdateReplMon, "replMonTable", replMonTable), nil) - return err + return n.execMogrify(queryUpdateReplMon, map[string]interface{}{ + "replMonTable": replMonTable, + }) } \ No newline at end of file diff --git a/internal/mysql/queries.go b/internal/mysql/queries.go index 61a0b3a4..c896d83a 100644 --- a/internal/mysql/queries.go +++ b/internal/mysql/queries.go @@ -129,14 +129,14 @@ var DefaultQueries = map[string]string{ querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`, queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`, querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`, - queryGetReplMonTS: `SELECT UNIX_TIMESTAMP(ts) AS ts FROM replMonTable`, - queryCalcReplMonTSDelay: `SELECT FLOOR(CAST(:ts AS DECIMAL(20,3)) - UNIX_TIMESTAMP(ts)) AS delay FROM replMonTable`, - queryCreateReplMonTable: `CREATE TABLE IF NOT EXISTS replMonTable( + queryGetReplMonTS: `SELECT UNIX_TIMESTAMP(ts) AS ts FROM :replMonTable`, + queryCalcReplMonTSDelay: `SELECT FLOOR(CAST(:ts AS DECIMAL(20,3)) - UNIX_TIMESTAMP(ts)) AS delay FROM :replMonTable`, + queryCreateReplMonTable: `CREATE TABLE IF NOT EXISTS :replMonTable( id INT NOT NULL PRIMARY KEY, ts TIMESTAMP(3) ) ENGINE=INNODB`, - queryUpdateReplMon: `INSERT INTO replMonTable(id, ts) + queryUpdateReplMon: `INSERT INTO :replMonTable(id, ts) ( SELECT 1, CURRENT_TIMESTAMP(3) WHERE @@read_only = 0 diff --git a/tests/features/async_setting.feature b/tests/features/async_setting.feature index 3001876a..c73a7382 100644 --- a/tests/features/async_setting.feature +++ b/tests/features/async_setting.feature @@ -5,7 +5,7 @@ Feature: mysync async mode tests """ MYSYNC_SEMISYNC=false MYSYNC_ASYNC=true - ASYNC_ALLOWED_LAG=70 + ASYNC_ALLOWED_LAG=90 MYSYNC_FAILOVER=true MYSYNC_FAILOVER_DELAY=0s MYSYNC_FAILOVER_COOLDOWN=0s @@ -38,16 +38,16 @@ Feature: mysync async mode tests And I run SQL on mysql host "mysql2" """ STOP REPLICA FOR CHANNEL ''; - CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 50; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 60; START REPLICA FOR CHANNEL ''; """ And I run SQL on mysql host "mysql3" """ STOP REPLICA FOR CHANNEL ''; - CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 90; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 120; START REPLICA FOR CHANNEL ''; """ - And I wait for "120" seconds + And I wait for "150" seconds And I run SQL on mysql host "mysql1" """ INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F") @@ -204,7 +204,7 @@ Feature: mysync async mode tests """ MYSYNC_SEMISYNC=false MYSYNC_ASYNC=true - ASYNC_ALLOWED_LAG=70 + ASYNC_ALLOWED_LAG=90 MYSYNC_FAILOVER=true MYSYNC_FAILOVER_DELAY=0s MYSYNC_FAILOVER_COOLDOWN=0s @@ -237,16 +237,16 @@ Feature: mysync async mode tests And I run SQL on mysql host "mysql2" """ STOP REPLICA FOR CHANNEL ''; - CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 50; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 60; START REPLICA FOR CHANNEL ''; """ And I run SQL on mysql host "mysql3" """ STOP REPLICA FOR CHANNEL ''; - CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 90; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 120; START REPLICA FOR CHANNEL ''; """ - And I wait for "120" seconds + And I wait for "150" seconds And I run SQL on mysql host "mysql1" """ INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F") @@ -296,7 +296,7 @@ Feature: mysync async mode tests """ [{"val":"A,B,C,D,E,F"}] """ - When I wait for "120" seconds + When I wait for "150" seconds And I run SQL on mysql host "mysql3" """ SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t @@ -344,16 +344,16 @@ Feature: mysync async mode tests And I run SQL on mysql host "mysql2" """ STOP REPLICA FOR CHANNEL ''; - CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 70; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 60; START REPLICA FOR CHANNEL ''; """ And I run SQL on mysql host "mysql3" """ STOP REPLICA FOR CHANNEL ''; - CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 110; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 120; START REPLICA FOR CHANNEL ''; """ - And I wait for "120" seconds + And I wait for "150" seconds And I run SQL on mysql host "mysql1" """ INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F") @@ -392,7 +392,7 @@ Feature: mysync async mode tests """ [{"val":"A,B,C"}] """ - When I wait for "120" seconds + When I wait for "150" seconds And I run SQL on mysql host "mysql3" """ SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t