From 2243d70919c133636a78368969312478f9cd8e36 Mon Sep 17 00:00:00 2001 From: moridin26 Date: Mon, 17 Jun 2024 11:00:49 +0300 Subject: [PATCH] [WIP] support async replication (#88) * support async replication * fix a_sync->async, add async in tests yaml * fix mysync.yaml syntax err * fix docker compose cfg * fix waitForCatchUp return in async mode * set master online on switchover: phase 5 * fix PriorityChoiceMaxLag in async mode * fix queryCalcMdbReplMonTsDelay query * async mode: add tests, fix linters * async mode: add tests, fix linters * async mode: add tests, fix linters * async replication refactoring * async replication test fix * async replication test fix * async replication test fix * async replication test fix * async replication test fix * add mysync-repl-mon feature * add refactor mdb_repl_mon table name to custom configuring name * add refactor mdb_repl_mon table name to custom configuring name * repl_mon fixes * repl_mon fixes * fix typo * add repl_mon tests * fix async tests * fix async tests * fix async tests, add repl_mon.feature launch * fix async tests * fix async tests, fix repl_mon tests * add switch_helper * linters fix * fix " too many arguments " * linters fix * linters fix * linters fix * fix async tests * fix async tests * refactor switch_helper --------- Co-authored-by: suetin Co-authored-by: teem0n --- .github/workflows/docker-tests-8.0.yml | 2 + .github/workflows/docker-tests.yml | 9 +- internal/app/app.go | 68 ++++- internal/app/app_dcs.go | 21 ++ internal/app/async.go | 38 +++ internal/app/cli.go | 2 +- internal/app/data.go | 3 + internal/config/config.go | 19 ++ internal/mysql/data.go | 8 + internal/mysql/node.go | 41 +++ internal/mysql/queries.go | 17 ++ internal/mysql/switch_helper.go | 32 ++ tests/features/async_setting.feature | 407 +++++++++++++++++++++++++ tests/features/repl_mon.feature | 53 ++++ tests/images/docker-compose.yaml | 12 + tests/images/mysql/mysync.yaml | 5 + tests/mysync_test.go | 4 + tests/testutil/docker_composer.go | 2 +- 18 files changed, 735 insertions(+), 8 deletions(-) create mode 100644 internal/app/async.go create mode 100644 internal/mysql/switch_helper.go create mode 100644 tests/features/async_setting.feature create mode 100644 tests/features/repl_mon.feature diff --git a/.github/workflows/docker-tests-8.0.yml b/.github/workflows/docker-tests-8.0.yml index 4d317567..c17449fa 100644 --- a/.github/workflows/docker-tests-8.0.yml +++ b/.github/workflows/docker-tests-8.0.yml @@ -42,6 +42,7 @@ jobs: command: - 'VERSION=8.0 GODOG_FEATURE=active_nodes.feature make test' - 'VERSION=8.0 GODOG_FEATURE=async.feature make test' + - 'VERSION=8.0 GODOG_FEATURE=async_setting.feature make test' - 'VERSION=8.0 GODOG_FEATURE=cascade_replicas.feature make test' - 'VERSION=8.0 GODOG_FEATURE=CLI.feature make test' - 'VERSION=8.0 GODOG_FEATURE=crash_recovery.feature make test' @@ -57,6 +58,7 @@ jobs: - 'VERSION=8.0 GODOG_FEATURE=readonly_filesystem.feature make test' - 'VERSION=8.0 GODOG_FEATURE=recovery.feature make test' - 'VERSION=8.0 GODOG_FEATURE=repair.feature make test' + - 'VERSION=8.0 GODOG_FEATURE=repl_mon.feature make test' - 'VERSION=8.0 GODOG_FEATURE=statefile.feature make test' - 'VERSION=8.0 GODOG_FEATURE=switchover_from.feature make test' - 'VERSION=8.0 GODOG_FEATURE=switchover_to.feature make test' diff --git a/.github/workflows/docker-tests.yml b/.github/workflows/docker-tests.yml index c7ab8257..db70c05a 100644 --- a/.github/workflows/docker-tests.yml +++ b/.github/workflows/docker-tests.yml @@ -41,8 +41,8 @@ jobs: matrix: command: - 'GODOG_FEATURE=active_nodes.feature make test' - - 'GODOG_FEATURE=async.feature make test' - - 'GODOG_FEATURE=cascade_replicas.feature make test' + - 'GODOG_FEATURE=async.feature make test' + - 'GODOG_FEATURE=cascade_replicas.feature make test' - 'GODOG_FEATURE=CLI.feature make test' - 'GODOG_FEATURE=crash_recovery.feature make test' - 'GODOG_FEATURE=events_reenable.feature make test' @@ -55,8 +55,9 @@ jobs: - 'GODOG_FEATURE=priority.feature make test' - 'GODOG_FEATURE=readonly_filesystem.feature make test' - 'GODOG_FEATURE=recovery.feature make test' - - 'GODOG_FEATURE=repair.feature make test' - - 'GODOG_FEATURE=statefile.feature make test' + - 'GODOG_FEATURE=repair.feature make test' + - 'GODOG_FEATURE=repl_mon.feature make test' + - 'GODOG_FEATURE=statefile.feature make test' - 'GODOG_FEATURE=switchover_from.feature make test' - 'GODOG_FEATURE=switchover_to.feature make test' - 'GODOG_FEATURE=zk_failure.feature make test' diff --git a/internal/app/app.go b/internal/app/app.go index 2fcb6948..b71a199b 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -38,6 +38,7 @@ type App struct { daemonMutex sync.Mutex replRepairState map[string]*ReplicationRepairState externalReplication mysql.IExternalReplication + switchHelper mysql.ISwitchHelper } // NewApp returns new App. Suddenly. @@ -62,6 +63,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) { if err != nil { return nil, err } + switchHelper := mysql.NewSwitchHelper(config) app := &App{ state: stateFirstRun, config: config, @@ -70,6 +72,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) { streamFromFailedAt: make(map[string]time.Time), replRepairState: make(map[string]*ReplicationRepairState), externalReplication: externalReplication, + switchHelper: switchHelper, } return app, nil } @@ -256,6 +259,51 @@ func (app *App) externalCAFileChecker(ctx context.Context) { } } +func (app *App) replMonWriter(ctx context.Context) { + ticker := time.NewTicker(app.config.ReplMonWriteInterval) + for { + select { + case <-ticker.C: + localNode := app.cluster.Local() + sstatus, err := localNode.GetReplicaStatus() + if err != nil { + app.logger.Errorf("repl mon writer: got error %v while checking replica status", err) + time.Sleep(app.config.ReplMonErrorWaitInterval) + continue + } + if sstatus != nil { + app.logger.Infof("repl mon writer: host is replica") + time.Sleep(app.config.ReplMonSlaveWaitInterval) + continue + } + readOnly, _, err := localNode.IsReadOnly() + if err != nil { + app.logger.Errorf("repl mon writer: got error %v while checking read only status", err) + time.Sleep(app.config.ReplMonErrorWaitInterval) + continue + } + if readOnly { + app.logger.Infof("repl mon writer: host is read only") + time.Sleep(app.config.ReplMonSlaveWaitInterval) + continue + } + err = localNode.UpdateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName) + if err != nil { + if mysql.IsErrorTableDoesNotExists(err) { + err = localNode.CreateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName) + if err != nil { + app.logger.Errorf("repl mon writer: got error %v while creating repl mon table", err) + } + continue + } + app.logger.Errorf("repl mon writer: got error %v while writing in repl mon table", err) + } + case <-ctx.Done(): + return + } + } +} + func (app *App) SetResetupStatus() { err := app.setResetupStatus(app.cluster.Local().Host(), app.doesResetupFileExist()) if err != nil { @@ -710,6 +758,11 @@ func (app *App) stateManager() appState { app.logger.Errorf("failed to update active nodes in dcs: %v", err) } + err = app.updateReplMonTS(master) + if err != nil { + app.logger.Errorf("failed to update repl_mon timestamp: %v", err) + } + return stateManager } @@ -1246,7 +1299,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode } else if switchover.From != "" { positions2 := filterOutNodeFromPositions(positions, switchover.From) // we ignore splitbrain flag as it should be handled during searching most recent host - newMaster, err = getMostDesirableNode(app.logger, positions2, app.config.PriorityChoiceMaxLag) + newMaster, err = getMostDesirableNode(app.logger, positions2, app.switchHelper.GetPriorityChoiceMaxLag()) if err != nil { return fmt.Errorf("switchover: error while looking for highest priority node: %s", switchover.From) } @@ -1297,6 +1350,10 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode // turn slaves to the new master app.logger.Info("switchover: phase 5: turn to the new master") + err = app.cluster.Get(newMaster).SetOnline() + if err != nil { + return fmt.Errorf("got error on setting new master %s online %v", newMaster, err) + } errs = util.RunParallel(func(host string) error { if host == newMaster || !clusterState[host].PingOk { return nil @@ -2145,9 +2202,13 @@ func (app *App) waitForCatchUp(node *mysql.Node, gtidset gtids.GTIDSet, timeout if gtidExecuted.Contain(gtidset) { return true, nil } - if app.dcs.Get(pathCurrentSwitch, new(Switchover)) == dcs.ErrNotFound { + switchover := new(Switchover) + if app.dcs.Get(pathCurrentSwitch, switchover) == dcs.ErrNotFound { return false, nil } + if app.CheckAsyncSwitchAllowed(node, switchover) { + return true, nil + } time.Sleep(sleep) if time.Now().After(deadline) { break @@ -2288,6 +2349,9 @@ func (app *App) Run() int { if app.config.ExternalReplicationType != util.Disabled { go app.externalCAFileChecker(ctx) } + if app.config.ReplMon { + go app.replMonWriter(ctx) + } handlers := map[appState](func() appState){ stateFirstRun: app.stateFirstRun, diff --git a/internal/app/app_dcs.go b/internal/app/app_dcs.go index df8c07e8..ecfccff6 100644 --- a/internal/app/app_dcs.go +++ b/internal/app/app_dcs.go @@ -220,3 +220,24 @@ func (app *App) GetMasterHostFromDcs() (string, error) { } return "", nil } + +func (app *App) SetReplMonTS(ts string) error { + err := app.dcs.Create(pathMasterReplMonTS, ts) + if err != nil && err != dcs.ErrExists { + return err + } + err = app.dcs.Set(pathMasterReplMonTS, ts) + if err != nil { + return err + } + return nil +} + +func (app *App) GetReplMonTS() (string, error) { + var ts string + err := app.dcs.Get(pathMasterReplMonTS, &ts) + if errors.Is(err, dcs.ErrNotFound) { + return "", nil + } + return ts, err +} diff --git a/internal/app/async.go b/internal/app/async.go new file mode 100644 index 00000000..43dc2bd8 --- /dev/null +++ b/internal/app/async.go @@ -0,0 +1,38 @@ +package app + +import ( + "fmt" + + "github.com/yandex/mysync/internal/mysql" +) + +func (app *App) CheckAsyncSwitchAllowed(node *mysql.Node, switchover *Switchover) bool { + if app.config.ASync && switchover.Cause == CauseAuto && app.config.AsyncAllowedLag > 0 { + app.logger.Infof("async mode is active and this is auto switch so we checking new master delay") + ts, err := app.GetReplMonTS() + if err != nil { + app.logger.Errorf("failed to get mdb repl mon ts: %v", err) + return false + } + delay, err := node.CalcReplMonTSDelay(app.config.ReplMonSchemeName, app.config.ReplMonTableName, ts) + if err != nil { + app.logger.Errorf("failed to calc mdb repl mon ts: %v", err) + return false + } + if delay < app.config.AsyncAllowedLag { + app.logger.Infof("async allowed lag is %d and current lag on host %s is %d, so we don't wait for catch up any more", + app.config.AsyncAllowedLag, node.Host(), delay) + return true + } + } + return false +} + +func (app *App) updateReplMonTS(master string) error { + masterNode := app.cluster.Get(master) + ts, err := masterNode.GetReplMonTS(app.config.ReplMonSchemeName, app.config.ReplMonTableName) + if err != nil { + return fmt.Errorf("failed to get master repl_mon timestamp: %v", err) + } + return app.SetReplMonTS(ts) +} diff --git a/internal/app/cli.go b/internal/app/cli.go index 6b107ffe..6756f73f 100644 --- a/internal/app/cli.go +++ b/internal/app/cli.go @@ -280,7 +280,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration app.logger.Errorf(err.Error()) return 1 } - toHost, err = getMostDesirableNode(app.logger, positions, app.config.PriorityChoiceMaxLag) + toHost, err = getMostDesirableNode(app.logger, positions, app.switchHelper.GetPriorityChoiceMaxLag()) if err != nil { app.logger.Errorf(err.Error()) return 1 diff --git a/internal/app/data.go b/internal/app/data.go index 86b98982..30db8d10 100644 --- a/internal/app/data.go +++ b/internal/app/data.go @@ -67,6 +67,9 @@ const ( pathResetupStatus = "resetup_status" pathLastShutdownNodeTime = "last_shutdown_node_time" + + // last known timestamp from repl_mon table + pathMasterReplMonTS = "master_repl_mon_ts" ) var ( diff --git a/internal/config/config.go b/internal/config/config.go index 2e9c2168..b27bc3c6 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -88,6 +88,14 @@ type Config struct { ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"` ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"` ExternalReplicationType util.ExternalReplicationType `config:"external_replication_type" yaml:"external_replication_type"` + ASync bool `config:"async" yaml:"async"` + AsyncAllowedLag int64 `config:"async_allowed_lag" yaml:"async_allowed_lag"` + ReplMon bool `config:"repl_mon" yaml:"repl_mon"` + ReplMonSchemeName string `config:"repl_mon_scheme_name" yaml:"repl_mon_scheme_name"` + ReplMonTableName string `config:"repl_mon_table_name" yaml:"repl_mon_table_name"` + ReplMonWriteInterval time.Duration `config:"repl_mon_write_interval" yaml:"repl_mon_write_interval"` + ReplMonErrorWaitInterval time.Duration `config:"repl_mon_error_wait_interval" yaml:"repl_mon_error_wait_interval"` + ReplMonSlaveWaitInterval time.Duration `config:"repl_mon_slave_wait_interval" yaml:"repl_mon_slave_wait_interval"` } // DefaultConfig returns default configuration for MySync @@ -164,6 +172,14 @@ func DefaultConfig() (Config, error) { ReplicationChannel: "", ExternalReplicationChannel: "external", ExternalReplicationType: util.Disabled, + ASync: false, + AsyncAllowedLag: 0, + ReplMon: false, + ReplMonSchemeName: "mysql", + ReplMonTableName: "mysync_repl_mon", + ReplMonWriteInterval: 1 * time.Second, + ReplMonErrorWaitInterval: 10 * time.Second, + ReplMonSlaveWaitInterval: 10 * time.Second, } return config, nil } @@ -205,5 +221,8 @@ func (cfg *Config) Validate() error { if cfg.NotCriticalDiskUsage > cfg.CriticalDiskUsage { return fmt.Errorf("not_critical_disk_usage should be <= critical_disk_usage") } + if cfg.SemiSync && cfg.ASync { + return fmt.Errorf("can't run in both semisync and async mode") + } return nil } diff --git a/internal/mysql/data.go b/internal/mysql/data.go index 854a5e06..c4a4c9ce 100644 --- a/internal/mysql/data.go +++ b/internal/mysql/data.go @@ -292,6 +292,14 @@ type Version struct { PatchVersion int `db:"PatchVersion"` } +type ReplMonTS struct { + Timestamp string `db:"ts"` +} + +type ReplMonTSDelay struct { + Delay int64 `db:"delay"` +} + const ( Version80Major = 8 Version80Minor = 0 diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 642095d7..4475e56a 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -344,6 +344,10 @@ func (n *Node) queryRowMogrifyWithTimeout(queryName string, arg map[string]inter return err } +func (n *Node) queryRowMogrify(queryName string, arg map[string]interface{}, result interface{}) error { + return n.queryRowMogrifyWithTimeout(queryName, arg, result, n.config.DBTimeout) +} + // IsRunning checks if daemon process is running func (n *Node) IsRunning() (bool, error) { if !n.IsLocal() { @@ -1013,3 +1017,40 @@ func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error { } return nil } + +func (n *Node) GetReplMonTS(replMonSchemeName string, replMonTable string) (string, error) { + result := new(ReplMonTS) + err := n.queryRowMogrify(queryGetReplMonTS, + map[string]interface{}{ + "replMonSchemeName": schemaname(replMonSchemeName), + "replMonTable": schemaname(replMonTable), + }, + result) + return result.Timestamp, err +} + +func (n *Node) CalcReplMonTSDelay(replMonSchemeName string, replMonTable string, ts string) (int64, error) { + result := new(ReplMonTSDelay) + err := n.queryRowMogrify(queryCalcReplMonTSDelay, + map[string]interface{}{ + "ts": ts, + "replMonSchemeName": schemaname(replMonSchemeName), + "replMonTable": schemaname(replMonTable), + }, + result) + return result.Delay, err +} + +func (n *Node) CreateReplMonTable(replMonSchemeName string, replMonTable string) error { + return n.execMogrify(queryCreateReplMonTable, map[string]interface{}{ + "replMonSchemeName": schemaname(replMonSchemeName), + "replMonTable": schemaname(replMonTable), + }) +} + +func (n *Node) UpdateReplMonTable(replMonSchemeName string, replMonTable string) error { + return n.execMogrify(queryUpdateReplMon, map[string]interface{}{ + "replMonSchemeName": schemaname(replMonSchemeName), + "replMonTable": schemaname(replMonTable), + }) +} diff --git a/internal/mysql/queries.go b/internal/mysql/queries.go index f5c596ba..d2f3cafc 100644 --- a/internal/mysql/queries.go +++ b/internal/mysql/queries.go @@ -46,6 +46,10 @@ const ( querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit" querySetSyncBinlog = "set_sync_binlog" queryGetReplicationSettings = "get_replication_settings" + queryGetReplMonTS = "get_repl_mon_ts" + queryCalcReplMonTSDelay = "calc_repl_mon_ts_delay" + queryCreateReplMonTable = "create_repl_mon_table" + queryUpdateReplMon = "update_repl_mon" ) var DefaultQueries = map[string]string{ @@ -125,4 +129,17 @@ var DefaultQueries = map[string]string{ querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`, queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`, querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`, + queryGetReplMonTS: `SELECT UNIX_TIMESTAMP(ts) AS ts FROM :replMonSchemeName.:replMonTable`, + queryCalcReplMonTSDelay: `SELECT FLOOR(CAST(:ts AS DECIMAL(20,3)) - UNIX_TIMESTAMP(ts)) AS delay FROM :replMonSchemeName.:replMonTable`, + queryCreateReplMonTable: `CREATE TABLE IF NOT EXISTS :replMonSchemeName.:replMonTable( + id INT NOT NULL PRIMARY KEY, + ts TIMESTAMP(3) + ) + ENGINE=INNODB`, + queryUpdateReplMon: `INSERT INTO :replMonSchemeName.:replMonTable(id, ts) + ( + SELECT 1, CURRENT_TIMESTAMP(3) + WHERE @@read_only = 0 + ) + ON DUPLICATE KEY UPDATE ts = CURRENT_TIMESTAMP(3)`, } diff --git a/internal/mysql/switch_helper.go b/internal/mysql/switch_helper.go new file mode 100644 index 00000000..d6d342ba --- /dev/null +++ b/internal/mysql/switch_helper.go @@ -0,0 +1,32 @@ +package mysql + +import ( + "time" + + "github.com/yandex/mysync/internal/config" +) + +type ISwitchHelper interface { + GetPriorityChoiceMaxLag() time.Duration +} + +type SwitchHelper struct { + priorityChoiceMaxLag time.Duration +} + +func NewSwitchHelper(config *config.Config) ISwitchHelper { + priorityChoiceMaxLag := config.PriorityChoiceMaxLag + if config.ASync { + AsyncAllowedLagTime := time.Duration(config.AsyncAllowedLag) * time.Second + if AsyncAllowedLagTime > config.PriorityChoiceMaxLag { + priorityChoiceMaxLag = AsyncAllowedLagTime + } + } + return &SwitchHelper{ + priorityChoiceMaxLag: priorityChoiceMaxLag, + } +} + +func (sh *SwitchHelper) GetPriorityChoiceMaxLag() time.Duration { + return sh.priorityChoiceMaxLag +} diff --git a/tests/features/async_setting.feature b/tests/features/async_setting.feature new file mode 100644 index 00000000..a37c8855 --- /dev/null +++ b/tests/features/async_setting.feature @@ -0,0 +1,407 @@ +Feature: mysync async mode tests + + Scenario: failover with lag less then allowed + Given cluster environment is + """ + MYSYNC_SEMISYNC=false + MYSYNC_ASYNC=true + ASYNC_ALLOWED_LAG=120 + MYSYNC_REPLICATION_LAG_QUERY="SELECT UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) - UNIX_TIMESTAMP(ts) AS Seconds_Behind_Master FROM mysql.mysync_repl_mon" + MYSYNC_FAILOVER=true + MYSYNC_FAILOVER_DELAY=0s + MYSYNC_FAILOVER_COOLDOWN=0s + REPL_MON=true + """ + Given cluster is up and running + When I wait for "10" seconds + Then zookeeper node "/test/active_nodes" should match json_exactly within "20" seconds + """ + ["mysql1","mysql2","mysql3"] + """ + And mysql host "mysql1" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql1" should have variable "rpl_semi_sync_slave_enabled" set to "0" + And mysql host "mysql2" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql2" should have variable "rpl_semi_sync_slave_enabled" set to "0" + And mysql host "mysql3" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql3" should have variable "rpl_semi_sync_slave_enabled" set to "0" + + And I wait for "2" seconds + And I run SQL on mysql host "mysql1" + """ + CREATE TABLE IF NOT EXISTS mysql.test_table1 ( + value VARCHAR(30) + ) + """ + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.test_table1 VALUES ("A"), ("B"), ("C") + """ + And I run SQL on mysql host "mysql2" + """ + STOP REPLICA FOR CHANNEL ''; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 90; + START REPLICA FOR CHANNEL ''; + """ + And I run SQL on mysql host "mysql3" + """ + STOP REPLICA FOR CHANNEL ''; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 120; + START REPLICA FOR CHANNEL ''; + """ + And I wait for "150" seconds + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F") + """ + And I wait for "40" seconds + When I run SQL on mysql host "mysql2" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ + When host "mysql1" is stopped + Then mysql host "mysql1" should become unavailable within "10" seconds + Then zookeeper node "/test/manager" should match regexp within "10" seconds + """ + .*mysql[23].* + """ + Then zookeeper node "/test/last_switch" should match json within "40" seconds + """ + { + "cause": "auto", + "from": "mysql1", + "result": { + "ok": true + } + } + """ + And I wait for "2" seconds + When I run SQL on mysql host "mysql2" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ + When I run SQL on mysql host "mysql3" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ + + Scenario: failover with lag greater then allowed + Given cluster environment is + """ + MYSYNC_SEMISYNC=false + MYSYNC_ASYNC=true + ASYNC_ALLOWED_LAG=60 + MYSYNC_REPLICATION_LAG_QUERY="SELECT UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) - UNIX_TIMESTAMP(ts) AS Seconds_Behind_Master FROM mysql.mysync_repl_mon" + MYSYNC_FAILOVER=true + MYSYNC_FAILOVER_DELAY=0s + MYSYNC_FAILOVER_COOLDOWN=0s + REPL_MON=true + """ + Given cluster is up and running + When I wait for "10" seconds + Then zookeeper node "/test/active_nodes" should match json_exactly within "20" seconds + """ + ["mysql1","mysql2","mysql3"] + """ + And mysql host "mysql1" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql1" should have variable "rpl_semi_sync_slave_enabled" set to "0" + And mysql host "mysql2" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql2" should have variable "rpl_semi_sync_slave_enabled" set to "0" + And mysql host "mysql3" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql3" should have variable "rpl_semi_sync_slave_enabled" set to "0" + + And I wait for "2" seconds + And I run SQL on mysql host "mysql1" + """ + CREATE TABLE IF NOT EXISTS mysql.test_table1 ( + value VARCHAR(30) + ) + """ + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.test_table1 VALUES ("A"), ("B"), ("C") + """ + And I run SQL on mysql host "mysql2" + """ + STOP REPLICA FOR CHANNEL ''; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 90; + START REPLICA FOR CHANNEL ''; + """ + And I run SQL on mysql host "mysql3" + """ + STOP REPLICA FOR CHANNEL ''; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 110; + START REPLICA FOR CHANNEL ''; + """ + And I wait for "120" seconds + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F") + """ + And I wait for "70" seconds + When I run SQL on mysql host "mysql2" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ + When host "mysql1" is stopped + Then mysql host "mysql1" should become unavailable within "10" seconds + Then zookeeper node "/test/manager" should match regexp within "10" seconds + """ + .*mysql[23].* + """ + Then zookeeper node "/test/last_switch" should match json within "90" seconds + """ + { + "cause": "auto", + "from": "mysql1", + "result": { + "ok": true + } + } + """ + And I wait for "2" seconds + When I run SQL on mysql host "mysql2" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C,D,E,F"}] + """ + When I run SQL on mysql host "mysql3" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ + And I wait for "150" seconds + When I run SQL on mysql host "mysql3" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C,D,E,F"}] + """ + + Scenario: manual switchover ignores async + Given cluster environment is + """ + MYSYNC_SEMISYNC=false + MYSYNC_ASYNC=true + ASYNC_ALLOWED_LAG=120 + MYSYNC_REPLICATION_LAG_QUERY="SELECT UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) - UNIX_TIMESTAMP(ts) AS Seconds_Behind_Master FROM mysql.mysync_repl_mon" + MYSYNC_FAILOVER=true + MYSYNC_FAILOVER_DELAY=0s + MYSYNC_FAILOVER_COOLDOWN=0s + REPL_MON=true + """ + Given cluster is up and running + When I wait for "10" seconds + Then zookeeper node "/test/active_nodes" should match json_exactly within "20" seconds + """ + ["mysql1","mysql2","mysql3"] + """ + And mysql host "mysql1" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql1" should have variable "rpl_semi_sync_slave_enabled" set to "0" + And mysql host "mysql2" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql2" should have variable "rpl_semi_sync_slave_enabled" set to "0" + And mysql host "mysql3" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql3" should have variable "rpl_semi_sync_slave_enabled" set to "0" + + And I wait for "2" seconds + And I run SQL on mysql host "mysql1" + """ + CREATE TABLE IF NOT EXISTS mysql.test_table1 ( + value VARCHAR(30) + ) + """ + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.test_table1 VALUES ("A"), ("B"), ("C") + """ + And I run SQL on mysql host "mysql2" + """ + STOP REPLICA FOR CHANNEL ''; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 90; + START REPLICA FOR CHANNEL ''; + """ + And I run SQL on mysql host "mysql3" + """ + STOP REPLICA FOR CHANNEL ''; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 150; + START REPLICA FOR CHANNEL ''; + """ + And I wait for "180" seconds + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F") + """ + And I wait for "2" seconds + When I run SQL on mysql host "mysql2" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ + When I run command on host "mysql1" + """ + mysync switch --from mysql1 --wait=0s + """ + Then command return code should be "0" + And command output should match regexp + """ + switchover scheduled + """ + And zookeeper node "/test/switch" should match json + """ + { + "from": "mysql1" + } + """ + Then zookeeper node "/test/last_switch" should match json within "100" seconds + """ + { + "from": "mysql1", + "to": "", + "cause": "manual", + "initiated_by": "mysql1", + "result": { + "ok": true + } + } + """ + And I wait for "2" seconds + When I run SQL on mysql host "mysql2" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C,D,E,F"}] + """ + When I wait for "180" seconds + And I run SQL on mysql host "mysql3" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C,D,E,F"}] + """ + + Scenario: failover with lag less then allowed and less then default PriorityChoiceMaxLag + Given cluster environment is + """ + MYSYNC_SEMISYNC=false + MYSYNC_ASYNC=true + ASYNC_ALLOWED_LAG=50 + MYSYNC_REPLICATION_LAG_QUERY="SELECT UNIX_TIMESTAMP(CURRENT_TIMESTAMP(3)) - UNIX_TIMESTAMP(ts) AS Seconds_Behind_Master FROM mysql.mysync_repl_mon" + MYSYNC_FAILOVER=true + MYSYNC_FAILOVER_DELAY=0s + MYSYNC_FAILOVER_COOLDOWN=0s + REPL_MON=true + """ + Given cluster is up and running + When I wait for "10" seconds + Then zookeeper node "/test/active_nodes" should match json_exactly within "20" seconds + """ + ["mysql1","mysql2","mysql3"] + """ + And mysql host "mysql1" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql1" should have variable "rpl_semi_sync_slave_enabled" set to "0" + And mysql host "mysql2" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql2" should have variable "rpl_semi_sync_slave_enabled" set to "0" + And mysql host "mysql3" should have variable "rpl_semi_sync_master_enabled" set to "0" + And mysql host "mysql3" should have variable "rpl_semi_sync_slave_enabled" set to "0" + + And I wait for "2" seconds + And I run SQL on mysql host "mysql1" + """ + CREATE TABLE IF NOT EXISTS mysql.test_table1 ( + value VARCHAR(30) + ) + """ + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.test_table1 VALUES ("A"), ("B"), ("C") + """ + And I run SQL on mysql host "mysql2" + """ + STOP REPLICA FOR CHANNEL ''; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 50; + START REPLICA FOR CHANNEL ''; + """ + And I run SQL on mysql host "mysql3" + """ + STOP REPLICA FOR CHANNEL ''; + CHANGE REPLICATION SOURCE TO SOURCE_DELAY = 80; + START REPLICA FOR CHANNEL ''; + """ + And I wait for "100" seconds + And I run SQL on mysql host "mysql1" + """ + INSERT INTO mysql.test_table1 VALUES ("D"), ("E"), ("F") + """ + And I wait for "2" seconds + When I run SQL on mysql host "mysql2" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ + When host "mysql1" is stopped + Then mysql host "mysql1" should become unavailable within "10" seconds + Then zookeeper node "/test/manager" should match regexp within "10" seconds + """ + .*mysql[23].* + """ + Then zookeeper node "/test/last_switch" should match json within "40" seconds + """ + { + "cause": "auto", + "from": "mysql1", + "result": { + "ok": true + } + } + """ + And I wait for "2" seconds + When I run SQL on mysql host "mysql2" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ + When I wait for "100" seconds + And I run SQL on mysql host "mysql3" + """ + SELECT GROUP_CONCAT(value) as val from (SELECT value from mysql.test_table1 order by value) as t + """ + Then SQL result should match json + """ + [{"val":"A,B,C"}] + """ \ No newline at end of file diff --git a/tests/features/repl_mon.feature b/tests/features/repl_mon.feature new file mode 100644 index 00000000..bb941820 --- /dev/null +++ b/tests/features/repl_mon.feature @@ -0,0 +1,53 @@ +Feature: repl_mon tests + + Scenario: repl_mon enabled + Given cluster environment is + """ + REPL_MON=true + """ + Given cluster is up and running + When I wait for "10" seconds + Then zookeeper node "/test/active_nodes" should match json_exactly within "20" seconds + """ + ["mysql1","mysql2","mysql3"] + """ + And I wait for "5" seconds + And I run SQL on mysql host "mysql1" expecting error on number "1050" + """ + CREATE TABLE mysql.mysync_repl_mon( + ts TIMESTAMP(3) + ) ENGINE=INNODB; + """ + And I run SQL on mysql host "mysql1" + """ + SELECT (CURRENT_TIMESTAMP(3) - ts) < 2 as res FROM mysql.mysync_repl_mon + """ + Then SQL result should match json + """ + [{"res":1}] + """ + + Scenario: repl_mon disabled + Given cluster environment is + """ + REPL_MON=false + """ + Given cluster is up and running + When I wait for "10" seconds + Then zookeeper node "/test/active_nodes" should match json_exactly within "20" seconds + """ + ["mysql1","mysql2","mysql3"] + """ + And I wait for "5" seconds + And I run SQL on mysql host "mysql1" expecting error on number "1146" + """ + SELECT ts FROM mysql.mysync_repl_mon + """ + And I run SQL on mysql host "mysql2" expecting error on number "1146" + """ + SELECT ts FROM mysql.mysync_repl_mon + """ + And I run SQL on mysql host "mysql3" expecting error on number "1146" + """ + SELECT ts FROM mysql.mysync_repl_mon + """ diff --git a/tests/images/docker-compose.yaml b/tests/images/docker-compose.yaml index d6a15769..b47f1fb0 100644 --- a/tests/images/docker-compose.yaml +++ b/tests/images/docker-compose.yaml @@ -86,6 +86,8 @@ services: MYSQL_PORT: 3306 MYSYNC_DISABLE_REPLICATION_ON_MAINT: "true" MYSYNC_SEMISYNC: + MYSYNC_ASYNC: + ASYNC_ALLOWED_LAG: MYSYNC_CRITICAL_DISK_USAGE: MYSYNC_KEEP_SUPER_WRITABLE_ON_CRITICAL_DISK_USAGE: MYSYNC_WAIT_FOR_SLAVE_COUNT: @@ -93,6 +95,8 @@ services: MYSYNC_RESETUP_CRASHED_HOSTS: MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE: MYSYNC_SET_RO_TIMEOUT: + MYSYNC_REPLICATION_LAG_QUERY: + REPL_MON: healthcheck: test: "mysql --user=admin --password=admin_pwd -e 'SELECT 1'" start_period: 30s @@ -128,6 +132,8 @@ services: MYSQL_PORT: 3306 MYSYNC_DISABLE_REPLICATION_ON_MAINT: "true" MYSYNC_SEMISYNC: + MYSYNC_ASYNC: + ASYNC_ALLOWED_LAG: MYSYNC_CRITICAL_DISK_USAGE: MYSYNC_KEEP_SUPER_WRITABLE_ON_CRITICAL_DISK_USAGE: MYSYNC_WAIT_FOR_SLAVE_COUNT: @@ -135,6 +141,8 @@ services: MYSYNC_RESETUP_CRASHED_HOSTS: MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE: MYSYNC_SET_RO_TIMEOUT: + MYSYNC_REPLICATION_LAG_QUERY: + REPL_MON: depends_on: mysql1: condition: service_healthy @@ -164,6 +172,8 @@ services: MYSQL_PORT: 3306 MYSYNC_DISABLE_REPLICATION_ON_MAINT: "true" MYSYNC_SEMISYNC: + MYSYNC_ASYNC: + ASYNC_ALLOWED_LAG: MYSYNC_CRITICAL_DISK_USAGE: MYSYNC_KEEP_SUPER_WRITABLE_ON_CRITICAL_DISK_USAGE: MYSYNC_WAIT_FOR_SLAVE_COUNT: @@ -171,6 +181,8 @@ services: MYSYNC_RESETUP_CRASHED_HOSTS: MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE: MYSYNC_SET_RO_TIMEOUT: + MYSYNC_REPLICATION_LAG_QUERY: + REPL_MON: depends_on: mysql1: condition: service_healthy diff --git a/tests/images/mysql/mysync.yaml b/tests/images/mysql/mysync.yaml index 6fac222d..96a126d5 100644 --- a/tests/images/mysql/mysync.yaml +++ b/tests/images/mysql/mysync.yaml @@ -10,6 +10,8 @@ failover_cooldown: ${MYSYNC_FAILOVER_COOLDOWN:-60m} failover_delay: ${MYSYNC_FAILOVER_DELAY:-0s} inactivation_delay: ${MYSYNC_INACTIVATION_DELAY:-5s} semi_sync: ${MYSYNC_SEMISYNC:-true} +async: ${MYSYNC_ASYNC:-false} +async_allowed_lag: ${ASYNC_ALLOWED_LAG:-0} resetupfile: /tmp/mysync.resetup resetup_crashed_hosts: ${MYSYNC_RESETUP_CRASHED_HOSTS:-false} zookeeper: @@ -33,6 +35,8 @@ mysql: external_replication_ssl_ca: /etc/mysql/ssl/external_CA.pem pid_file: /tmp/mysqld.pid error_log: /var/log/mysql/error.log +queries: + replication_lag: $MYSYNC_REPLICATION_LAG_QUERY disable_semi_sync_replication_on_maintenance: ${MYSYNC_DISABLE_REPLICATION_ON_MAINT:-false} rpl_semi_sync_master_wait_for_slave_count: ${MYSYNC_WAIT_FOR_SLAVE_COUNT:-1} critical_disk_usage: ${MYSYNC_CRITICAL_DISK_USAGE:-100} @@ -56,3 +60,4 @@ replication_repair_aggressive_mode: ${MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE: test_filesystem_readonly_file: /tmp/readonly replication_channel: '' external_replication_type: 'external' +repl_mon: ${REPL_MON:-false} diff --git a/tests/mysync_test.go b/tests/mysync_test.go index 84b26958..64184aec 100644 --- a/tests/mysync_test.go +++ b/tests/mysync_test.go @@ -826,6 +826,10 @@ func (tctx *testContext) stepIRunSQLOnHostExpectingErrorOfNumber(host string, er query := strings.TrimSpace(body.Content) _, err := tctx.queryMysql(host, query, struct{}{}) mysqlErr, ok := err.(*mysql.MySQLError) + if mysqlErr == nil { + err = fmt.Errorf("there is no expected sql error") + return err + } if !ok { return err } diff --git a/tests/testutil/docker_composer.go b/tests/testutil/docker_composer.go index 94635e1f..cc4ddca8 100644 --- a/tests/testutil/docker_composer.go +++ b/tests/testutil/docker_composer.go @@ -99,7 +99,7 @@ func (dc *DockerComposer) runCompose(args []string, env []string) error { cmd.Env = append(os.Environ(), env...) out, err := cmd.CombinedOutput() if err != nil { - return fmt.Errorf("failed to run 'docker compose %s': %s\n%s", strings.Join(args2, " "), err, out) + return fmt.Errorf("failed to run 'docker %s': %s\n%s", strings.Join(args2, " "), err, out) } return nil }