From 55dd18bf1e5daed187dd4358356ca1081bbd7554 Mon Sep 17 00:00:00 2001
From: suetin
Date: Thu, 14 Mar 2024 17:15:55 +0300
Subject: [PATCH] support async replication

---
Review notes (this section is not part of the commit message):

 * Line breaks were restored. Indentation, column alignment and blank
   context lines are reconstructed and may differ from the target files;
   apply with whitespace-insensitive matching (git apply --ignore-whitespace).
   The "index" blob ids are those of the original commit.
 * waitForCatchUp: lookup errors no longer `continue` past time.Sleep and
   the deadline check, so a persistent DCS/MySQL error cannot spin the loop
   or outlive the timeout.
 * waitForCatchUp: the int64 lag values are logged with %d instead of %s.
 * SetMdbReplMonTs: uses errors.Is like GetMdbReplMonTs and returns the
   result of Set directly.
 * updateMdbReplMonTs: wraps the underlying error with %w.
 * app_dcs.go now ends with a newline; new identifiers got doc comments.
 * NOTE(review): calc_mdb_repl_mon_ts_delay subtracts the stored master
   timestamp from the node's clock and does not read the node's own
   mdb_repl_mon row -- confirm this is the intended lag measure.
 * NOTE(review): stateManager calls updateMdbReplMonTs even when a_sync is
   off -- confirm mysql.mdb_repl_mon exists on every installation.

 internal/app/app.go       | 33 ++++++++++++++++++++++++++++++++-
 internal/app/app_dcs.go   | 21 +++++++++++++++++++++
 internal/app/data.go      |  3 +++
 internal/config/config.go |  7 +++++++
 internal/mysql/data.go    | 10 ++++++++++
 internal/mysql/node.go    | 14 ++++++++++++++
 internal/mysql/queries.go |  4 ++++
 7 files changed, 91 insertions(+), 1 deletion(-)

diff --git a/internal/app/app.go b/internal/app/app.go
index 66f29f17..0cba1c1c 100644
--- a/internal/app/app.go
+++ b/internal/app/app.go
@@ -700,6 +700,11 @@ func (app *App) stateManager() appState {
 		app.logger.Errorf("failed to update active nodes in dcs: %v", err)
 	}
 
+	err = app.updateMdbReplMonTs(master)
+	if err != nil {
+		app.logger.Errorf("failed to update mdb_repl_mon timestamp: %v", err)
+	}
+
 	return stateManager
 }
 
@@ -2118,9 +2123,24 @@ func (app *App) waitForCatchUp(node *mysql.Node, gtidset gtids.GTIDSet, timeout
 		if gtidExecuted.Contain(gtidset) {
 			return true, nil
 		}
-		if app.dcs.Get(pathCurrentSwitch, new(Switchover)) == dcs.ErrNotFound {
+		switchover := new(Switchover)
+		if app.dcs.Get(pathCurrentSwitch, switchover) == dcs.ErrNotFound {
 			return false, nil
 		}
+		if app.config.ASync && switchover.Cause == CauseAuto {
+			// A failed lookup is only logged: control falls through to the
+			// sleep and deadline check below instead of retrying immediately.
+			ts, err := app.GetMdbReplMonTs()
+			if err != nil {
+				app.logger.Errorf("failed to get mdb repl mon ts: %v", err)
+			} else if delay, err := node.CalcMdbReplMonTsDelay(ts); err != nil {
+				app.logger.Errorf("failed to calc mdb repl mon ts: %v", err)
+			} else if delay < app.config.ASyncAllowedLag {
+				app.logger.Infof("async allowed lag is %d and current lag on host %s is %d, so we don't wait for catch up any more",
+					app.config.ASyncAllowedLag, node.Host(), delay)
+				break
+			}
+		}
 		time.Sleep(sleep)
 		if time.Now().After(deadline) {
 			break
@@ -2224,6 +2244,17 @@ func (app *App) getNodePositions(activeNodes []string) ([]nodePosition, error) {
 	return positions, util.CombineErrors(errs)
 }
 
+// updateMdbReplMonTs reads the mdb_repl_mon timestamp from the master node
+// and stores it in DCS.
+func (app *App) updateMdbReplMonTs(master string) error {
+	masterNode := app.cluster.Get(master)
+	ts, err := masterNode.GetMdbReplMonTs()
+	if err != nil {
+		return fmt.Errorf("failed to get master mdb_repl_mon timestamp: %w", err)
+	}
+	return app.SetMdbReplMonTs(ts)
+}
+
 /*
 Run enters the main application loop
 When Run exits mysync process is over
diff --git a/internal/app/app_dcs.go b/internal/app/app_dcs.go
index 291c9fc2..0fa325fe 100644
--- a/internal/app/app_dcs.go
+++ b/internal/app/app_dcs.go
@@ -228,3 +228,24 @@ func (app *App) GetMasterHostFromDcs() (string, error) {
 	}
 	return "", nil
 }
+
+// SetMdbReplMonTs stores ts under pathMasterReplMonTs in DCS, creating the
+// key first if it does not exist yet.
+func (app *App) SetMdbReplMonTs(ts string) error {
+	err := app.dcs.Create(pathMasterReplMonTs, ts)
+	if err != nil && !errors.Is(err, dcs.ErrExists) {
+		return err
+	}
+	return app.dcs.Set(pathMasterReplMonTs, ts)
+}
+
+// GetMdbReplMonTs returns the timestamp stored under pathMasterReplMonTs in
+// DCS, or an empty string if the key is not found.
+func (app *App) GetMdbReplMonTs() (string, error) {
+	var ts string
+	err := app.dcs.Get(pathMasterReplMonTs, &ts)
+	if errors.Is(err, dcs.ErrNotFound) {
+		return "", nil
+	}
+	return ts, err
+}
diff --git a/internal/app/data.go b/internal/app/data.go
index 86b98982..99a27856 100644
--- a/internal/app/data.go
+++ b/internal/app/data.go
@@ -67,6 +67,9 @@ const (
 	pathResetupStatus = "resetup_status"
 
 	pathLastShutdownNodeTime = "last_shutdown_node_time"
+
+	// last known timestamp from mysql.mdb_repl_mon
+	pathMasterReplMonTs = "master_repl_mon_ts"
 )
 
 var (
diff --git a/internal/config/config.go b/internal/config/config.go
index 2e9c2168..86081d29 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -88,6 +88,8 @@ type Config struct {
 	ReplicationChannel         string                       `config:"replication_channel" yaml:"replication_channel"`
 	ExternalReplicationChannel string                       `config:"external_replication_channel" yaml:"external_replication_channel"`
 	ExternalReplicationType    util.ExternalReplicationType `config:"external_replication_type" yaml:"external_replication_type"`
+	ASync                      bool                         `config:"a_sync" yaml:"a_sync"`
+	ASyncAllowedLag            int64                        `config:"a_sync_allowed_lag" yaml:"a_sync_allowed_lag"`
 }
 
 // DefaultConfig returns default configuration for MySync
@@ -164,6 +166,8 @@ func DefaultConfig() (Config, error) {
 		ReplicationChannel:         "",
 		ExternalReplicationChannel: "external",
 		ExternalReplicationType:    util.Disabled,
+		ASync:                      false,
+		ASyncAllowedLag:            0,
 	}
 	return config, nil
 }
@@ -205,5 +209,8 @@ func (cfg *Config) Validate() error {
 	if cfg.NotCriticalDiskUsage > cfg.CriticalDiskUsage {
 		return fmt.Errorf("not_critical_disk_usage should be <= critical_disk_usage")
 	}
+	if cfg.SemiSync && cfg.ASync {
+		return fmt.Errorf("can't run in both semisync and async mode")
+	}
 	return nil
 }
diff --git a/internal/mysql/data.go b/internal/mysql/data.go
index fedc605c..385e5386 100644
--- a/internal/mysql/data.go
+++ b/internal/mysql/data.go
@@ -268,6 +268,16 @@ type Version struct {
 	PatchVersion int `db:"PatchVersion"`
 }
 
+// MdbReplMonTs is the row returned by the get_mdb_repl_mon_ts query.
+type MdbReplMonTs struct {
+	Timestamp string `db:"ts"`
+}
+
+// MdbReplMonTsDelay is the row returned by the calc_mdb_repl_mon_ts_delay query.
+type MdbReplMonTsDelay struct {
+	Delay int64 `db:"delay"`
+}
+
 const (
 	Version80Major = 8
 	Version80Minor = 0
diff --git a/internal/mysql/node.go b/internal/mysql/node.go
index 5edb5e3f..f8bf95e4 100644
--- a/internal/mysql/node.go
+++ b/internal/mysql/node.go
@@ -985,3 +985,17 @@ func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error {
 	}
 	return nil
 }
+
+// GetMdbReplMonTs returns the "ts" value of the get_mdb_repl_mon_ts query run on this node.
+func (n *Node) GetMdbReplMonTs() (string, error) {
+	result := new(MdbReplMonTs)
+	err := n.queryRow(queryGetMdbReplMonTs, nil, result)
+	return result.Timestamp, err
+}
+
+// CalcMdbReplMonTsDelay returns the "delay" value of the calc_mdb_repl_mon_ts_delay query for ts.
+func (n *Node) CalcMdbReplMonTsDelay(ts string) (int64, error) {
+	result := new(MdbReplMonTsDelay)
+	err := n.queryRow(queryCalcMdbReplMonTsDelay, map[string]interface{}{"ts": ts}, result)
+	return result.Delay, err
+}
diff --git a/internal/mysql/queries.go b/internal/mysql/queries.go
index 4e0288cd..d4641054 100644
--- a/internal/mysql/queries.go
+++ b/internal/mysql/queries.go
@@ -45,6 +45,8 @@ const (
 	querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit"
 	querySetSyncBinlog                = "set_sync_binlog"
 	queryGetReplicationSettings       = "get_replication_settings"
+	queryGetMdbReplMonTs              = "get_mdb_repl_mon_ts"
+	queryCalcMdbReplMonTsDelay        = "calc_mdb_repl_mon_ts_delay"
 )
 
 var DefaultQueries = map[string]string{
@@ -123,4 +125,6 @@ var DefaultQueries = map[string]string{
 	querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`,
 	queryGetReplicationSettings:       `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`,
 	querySetSyncBinlog:                `SET GLOBAL sync_binlog = :sync_binlog`,
+	queryGetMdbReplMonTs:              `SELECT UNIX_TIMESTAMP(ts) AS ts FROM mysql.mdb_repl_mon`,
+	queryCalcMdbReplMonTsDelay:        `SELECT UNIX_TIMESTAMP(CURRENT_TIMESTAMP(0)) - CAST(:ts AS DECIMAL(20,0)) AS delay`,
 }