Skip to content

Commit

Permalink
[WIP] support async replication (#88)
Browse files Browse the repository at this point in the history
* support async replication

* fix a_sync->async, add async in tests yaml

* fix mysync.yaml syntax err

* fix docker compose cfg

* fix waitForCatchUp return in async mode

* set master online on switchover: phase 5

* fix PriorityChoiceMaxLag in async mode

* fix queryCalcMdbReplMonTsDelay query

* async mode: add tests, fix linters

* async mode: add tests, fix linters

* async mode: add tests, fix linters

* async replication refactoring

* async replication test fix

* async replication test fix

* async replication test fix

* async replication test fix

* async replication test fix

* add mysync-repl-mon feature

* refactor: make the mdb_repl_mon table name configurable

* refactor: make the mdb_repl_mon table name configurable

* repl_mon fixes

* repl_mon fixes

* fix typo

* add repl_mon tests

* fix async tests

* fix async tests

* fix async tests, add repl_mon.feature launch

* fix async tests

* fix async tests, fix repl_mon tests

* add switch_helper

* linters fix

* fix "too many arguments"

* linters fix

* linters fix

* linters fix

* fix async tests

* fix async tests

* refactor switch_helper

---------

Co-authored-by: suetin <[email protected]>
Co-authored-by: teem0n <[email protected]>
  • Loading branch information
3 people authored Jun 17, 2024
1 parent 77f1af9 commit 2243d70
Show file tree
Hide file tree
Showing 18 changed files with 735 additions and 8 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/docker-tests-8.0.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
command:
- 'VERSION=8.0 GODOG_FEATURE=active_nodes.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=async.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=async_setting.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=cascade_replicas.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=CLI.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=crash_recovery.feature make test'
Expand All @@ -57,6 +58,7 @@ jobs:
- 'VERSION=8.0 GODOG_FEATURE=readonly_filesystem.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=recovery.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=repair.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=repl_mon.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=statefile.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=switchover_from.feature make test'
- 'VERSION=8.0 GODOG_FEATURE=switchover_to.feature make test'
Expand Down
9 changes: 5 additions & 4 deletions .github/workflows/docker-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ jobs:
matrix:
command:
- 'GODOG_FEATURE=active_nodes.feature make test'
- 'GODOG_FEATURE=async.feature make test'
- 'GODOG_FEATURE=cascade_replicas.feature make test'
- 'GODOG_FEATURE=async.feature make test'
- 'GODOG_FEATURE=cascade_replicas.feature make test'
- 'GODOG_FEATURE=CLI.feature make test'
- 'GODOG_FEATURE=crash_recovery.feature make test'
- 'GODOG_FEATURE=events_reenable.feature make test'
Expand All @@ -55,8 +55,9 @@ jobs:
- 'GODOG_FEATURE=priority.feature make test'
- 'GODOG_FEATURE=readonly_filesystem.feature make test'
- 'GODOG_FEATURE=recovery.feature make test'
- 'GODOG_FEATURE=repair.feature make test'
- 'GODOG_FEATURE=statefile.feature make test'
- 'GODOG_FEATURE=repair.feature make test'
- 'GODOG_FEATURE=repl_mon.feature make test'
- 'GODOG_FEATURE=statefile.feature make test'
- 'GODOG_FEATURE=switchover_from.feature make test'
- 'GODOG_FEATURE=switchover_to.feature make test'
- 'GODOG_FEATURE=zk_failure.feature make test'
Expand Down
68 changes: 66 additions & 2 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type App struct {
daemonMutex sync.Mutex
replRepairState map[string]*ReplicationRepairState
externalReplication mysql.IExternalReplication
switchHelper mysql.ISwitchHelper
}

// NewApp returns new App. Suddenly.
Expand All @@ -62,6 +63,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
if err != nil {
return nil, err
}
switchHelper := mysql.NewSwitchHelper(config)
app := &App{
state: stateFirstRun,
config: config,
Expand All @@ -70,6 +72,7 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) {
streamFromFailedAt: make(map[string]time.Time),
replRepairState: make(map[string]*ReplicationRepairState),
externalReplication: externalReplication,
switchHelper: switchHelper,
}
return app, nil
}
Expand Down Expand Up @@ -256,6 +259,51 @@ func (app *App) externalCAFileChecker(ctx context.Context) {
}
}

// replMonWriter refreshes the repl_mon table on the local node every
// ReplMonWriteInterval until ctx is cancelled.
//
// A write is attempted only when the local node reports no replica status and
// is not read-only. When a check fails, or the node is not eligible to write,
// the loop backs off for the matching configured interval before the next
// attempt. The back-off is interrupted as soon as ctx is cancelled.
func (app *App) replMonWriter(ctx context.Context) {
	ticker := time.NewTicker(app.config.ReplMonWriteInterval)
	// Release the ticker's resources when the writer exits.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			pause := app.replMonWriteOnce()
			if pause > 0 && !replMonSleep(ctx, pause) {
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

// replMonWriteOnce performs a single repl_mon write attempt on the local node
// and returns how long the caller should back off before the next attempt
// (zero means "no extra back-off").
func (app *App) replMonWriteOnce() time.Duration {
	localNode := app.cluster.Local()
	sstatus, err := localNode.GetReplicaStatus()
	if err != nil {
		app.logger.Errorf("repl mon writer: got error %v while checking replica status", err)
		return app.config.ReplMonErrorWaitInterval
	}
	if sstatus != nil {
		app.logger.Infof("repl mon writer: host is replica")
		return app.config.ReplMonSlaveWaitInterval
	}
	readOnly, _, err := localNode.IsReadOnly()
	if err != nil {
		app.logger.Errorf("repl mon writer: got error %v while checking read only status", err)
		return app.config.ReplMonErrorWaitInterval
	}
	if readOnly {
		app.logger.Infof("repl mon writer: host is read only")
		return app.config.ReplMonSlaveWaitInterval
	}
	err = localNode.UpdateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
	if err == nil {
		return 0
	}
	if mysql.IsErrorTableDoesNotExists(err) {
		// The table is created lazily; the row itself is written on the next tick.
		err = localNode.CreateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
		if err != nil {
			app.logger.Errorf("repl mon writer: got error %v while creating repl mon table", err)
		}
		return 0
	}
	app.logger.Errorf("repl mon writer: got error %v while writing in repl mon table", err)
	return 0
}

// replMonSleep waits for d or until ctx is cancelled, whichever comes first.
// It reports true when the full duration elapsed and false on cancellation.
func replMonSleep(ctx context.Context, d time.Duration) bool {
	timer := time.NewTimer(d)
	defer timer.Stop()
	select {
	case <-timer.C:
		return true
	case <-ctx.Done():
		return false
	}
}

func (app *App) SetResetupStatus() {
err := app.setResetupStatus(app.cluster.Local().Host(), app.doesResetupFileExist())
if err != nil {
Expand Down Expand Up @@ -710,6 +758,11 @@ func (app *App) stateManager() appState {
app.logger.Errorf("failed to update active nodes in dcs: %v", err)
}

err = app.updateReplMonTS(master)
if err != nil {
app.logger.Errorf("failed to update repl_mon timestamp: %v", err)
}

return stateManager
}

Expand Down Expand Up @@ -1246,7 +1299,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode
} else if switchover.From != "" {
positions2 := filterOutNodeFromPositions(positions, switchover.From)
// we ignore splitbrain flag as it should be handled during searching most recent host
newMaster, err = getMostDesirableNode(app.logger, positions2, app.config.PriorityChoiceMaxLag)
newMaster, err = getMostDesirableNode(app.logger, positions2, app.switchHelper.GetPriorityChoiceMaxLag())
if err != nil {
return fmt.Errorf("switchover: error while looking for highest priority node: %s", switchover.From)
}
Expand Down Expand Up @@ -1297,6 +1350,10 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode

// turn slaves to the new master
app.logger.Info("switchover: phase 5: turn to the new master")
err = app.cluster.Get(newMaster).SetOnline()
if err != nil {
return fmt.Errorf("got error on setting new master %s online %v", newMaster, err)
}
errs = util.RunParallel(func(host string) error {
if host == newMaster || !clusterState[host].PingOk {
return nil
Expand Down Expand Up @@ -2145,9 +2202,13 @@ func (app *App) waitForCatchUp(node *mysql.Node, gtidset gtids.GTIDSet, timeout
if gtidExecuted.Contain(gtidset) {
return true, nil
}
if app.dcs.Get(pathCurrentSwitch, new(Switchover)) == dcs.ErrNotFound {
switchover := new(Switchover)
if app.dcs.Get(pathCurrentSwitch, switchover) == dcs.ErrNotFound {
return false, nil
}
if app.CheckAsyncSwitchAllowed(node, switchover) {
return true, nil
}
time.Sleep(sleep)
if time.Now().After(deadline) {
break
Expand Down Expand Up @@ -2288,6 +2349,9 @@ func (app *App) Run() int {
if app.config.ExternalReplicationType != util.Disabled {
go app.externalCAFileChecker(ctx)
}
if app.config.ReplMon {
go app.replMonWriter(ctx)
}

handlers := map[appState](func() appState){
stateFirstRun: app.stateFirstRun,
Expand Down
21 changes: 21 additions & 0 deletions internal/app/app_dcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,24 @@ func (app *App) GetMasterHostFromDcs() (string, error) {
}
return "", nil
}

// SetReplMonTS stores ts under pathMasterReplMonTS in DCS.
//
// It first tries to create the key and tolerates dcs.ErrExists, then sets the
// value. Any other error from either call is returned to the caller.
func (app *App) SetReplMonTS(ts string) error {
	err := app.dcs.Create(pathMasterReplMonTS, ts)
	// Use errors.Is so a wrapped ErrExists is tolerated too, matching how
	// GetReplMonTS checks dcs.ErrNotFound.
	if err != nil && !errors.Is(err, dcs.ErrExists) {
		return err
	}
	return app.dcs.Set(pathMasterReplMonTS, ts)
}

// GetReplMonTS reads the value stored under pathMasterReplMonTS in DCS.
// A missing key is not treated as an error: it yields an empty string and a
// nil error. Any other DCS error is returned as is.
func (app *App) GetReplMonTS() (string, error) {
	var stored string
	if err := app.dcs.Get(pathMasterReplMonTS, &stored); err != nil {
		if errors.Is(err, dcs.ErrNotFound) {
			return "", nil
		}
		return stored, err
	}
	return stored, nil
}
38 changes: 38 additions & 0 deletions internal/app/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package app

import (
"fmt"

"github.com/yandex/mysync/internal/mysql"
)

// CheckAsyncSwitchAllowed reports whether, in async mode, node is close enough
// to the last repl_mon timestamp published in DCS that waiting for it to catch
// up can be skipped.
//
// It returns true only when all of the following hold:
//   - async mode is enabled and AsyncAllowedLag is positive;
//   - the switchover was caused automatically (CauseAuto);
//   - a repl_mon timestamp is present in DCS;
//   - the delay computed on node against that timestamp is below
//     AsyncAllowedLag.
//
// Any error while fetching the timestamp or computing the delay is logged and
// results in false.
func (app *App) CheckAsyncSwitchAllowed(node *mysql.Node, switchover *Switchover) bool {
	if !app.config.ASync || switchover.Cause != CauseAuto || app.config.AsyncAllowedLag <= 0 {
		return false
	}
	app.logger.Infof("async mode is active and this is auto switch so we checking new master delay")
	ts, err := app.GetReplMonTS()
	if err != nil {
		app.logger.Errorf("failed to get mdb repl mon ts: %v", err)
		return false
	}
	if ts == "" {
		// GetReplMonTS returns an empty string when nothing has been published
		// in DCS yet. Without a reference timestamp the lag cannot be
		// evaluated, so the shortcut must not be taken.
		app.logger.Infof("repl mon timestamp is not found in dcs, so we can't check lag on host %s", node.Host())
		return false
	}
	delay, err := node.CalcReplMonTSDelay(app.config.ReplMonSchemeName, app.config.ReplMonTableName, ts)
	if err != nil {
		app.logger.Errorf("failed to calc mdb repl mon ts: %v", err)
		return false
	}
	if delay >= app.config.AsyncAllowedLag {
		return false
	}
	app.logger.Infof("async allowed lag is %d and current lag on host %s is %d, so we don't wait for catch up any more",
		app.config.AsyncAllowedLag, node.Host(), delay)
	return true
}

// updateReplMonTS reads the repl_mon timestamp from the given master node and
// publishes it to DCS via SetReplMonTS.
//
// A failure to read the timestamp is returned wrapped, so callers can still
// inspect the underlying error with errors.Is / errors.As.
func (app *App) updateReplMonTS(master string) error {
	masterNode := app.cluster.Get(master)
	ts, err := masterNode.GetReplMonTS(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
	if err != nil {
		return fmt.Errorf("failed to get master repl_mon timestamp: %w", err)
	}
	return app.SetReplMonTS(ts)
}
2 changes: 1 addition & 1 deletion internal/app/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func (app *App) CliSwitch(switchFrom, switchTo string, waitTimeout time.Duration
app.logger.Errorf(err.Error())
return 1
}
toHost, err = getMostDesirableNode(app.logger, positions, app.config.PriorityChoiceMaxLag)
toHost, err = getMostDesirableNode(app.logger, positions, app.switchHelper.GetPriorityChoiceMaxLag())
if err != nil {
app.logger.Errorf(err.Error())
return 1
Expand Down
3 changes: 3 additions & 0 deletions internal/app/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ const (
pathResetupStatus = "resetup_status"

pathLastShutdownNodeTime = "last_shutdown_node_time"

// last known timestamp from repl_mon table
pathMasterReplMonTS = "master_repl_mon_ts"
)

var (
Expand Down
19 changes: 19 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,14 @@ type Config struct {
ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"`
ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"`
ExternalReplicationType util.ExternalReplicationType `config:"external_replication_type" yaml:"external_replication_type"`
ASync bool `config:"async" yaml:"async"`
AsyncAllowedLag int64 `config:"async_allowed_lag" yaml:"async_allowed_lag"`
ReplMon bool `config:"repl_mon" yaml:"repl_mon"`
ReplMonSchemeName string `config:"repl_mon_scheme_name" yaml:"repl_mon_scheme_name"`
ReplMonTableName string `config:"repl_mon_table_name" yaml:"repl_mon_table_name"`
ReplMonWriteInterval time.Duration `config:"repl_mon_write_interval" yaml:"repl_mon_write_interval"`
ReplMonErrorWaitInterval time.Duration `config:"repl_mon_error_wait_interval" yaml:"repl_mon_error_wait_interval"`
ReplMonSlaveWaitInterval time.Duration `config:"repl_mon_slave_wait_interval" yaml:"repl_mon_slave_wait_interval"`
}

// DefaultConfig returns default configuration for MySync
Expand Down Expand Up @@ -164,6 +172,14 @@ func DefaultConfig() (Config, error) {
ReplicationChannel: "",
ExternalReplicationChannel: "external",
ExternalReplicationType: util.Disabled,
ASync: false,
AsyncAllowedLag: 0,
ReplMon: false,
ReplMonSchemeName: "mysql",
ReplMonTableName: "mysync_repl_mon",
ReplMonWriteInterval: 1 * time.Second,
ReplMonErrorWaitInterval: 10 * time.Second,
ReplMonSlaveWaitInterval: 10 * time.Second,
}
return config, nil
}
Expand Down Expand Up @@ -205,5 +221,8 @@ func (cfg *Config) Validate() error {
if cfg.NotCriticalDiskUsage > cfg.CriticalDiskUsage {
return fmt.Errorf("not_critical_disk_usage should be <= critical_disk_usage")
}
if cfg.SemiSync && cfg.ASync {
return fmt.Errorf("can't run in both semisync and async mode")
}
return nil
}
8 changes: 8 additions & 0 deletions internal/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,14 @@ type Version struct {
PatchVersion int `db:"PatchVersion"`
}

// ReplMonTS is the scan target for a query that returns a single `ts` column,
// such as the one issued by Node.GetReplMonTS.
type ReplMonTS struct {
	// Timestamp holds the `ts` column as a string.
	Timestamp string `db:"ts"`
}

// ReplMonTSDelay is the scan target for a query that returns a single `delay`
// column, such as the one issued by Node.CalcReplMonTSDelay.
type ReplMonTSDelay struct {
	// Delay holds the `delay` column as an integer.
	Delay int64 `db:"delay"`
}

const (
Version80Major = 8
Version80Minor = 0
Expand Down
41 changes: 41 additions & 0 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ func (n *Node) queryRowMogrifyWithTimeout(queryName string, arg map[string]inter
return err
}

// queryRowMogrify runs the named single-row query with the node's default
// DBTimeout and scans the row into result.
func (n *Node) queryRowMogrify(queryName string, arg map[string]interface{}, result interface{}) error {
	timeout := n.config.DBTimeout
	return n.queryRowMogrifyWithTimeout(queryName, arg, result, timeout)
}

// IsRunning checks if daemon process is running
func (n *Node) IsRunning() (bool, error) {
if !n.IsLocal() {
Expand Down Expand Up @@ -1013,3 +1017,40 @@ func (n *Node) SetDefaultReplicationSettings(masterNode *Node) error {
}
return nil
}

// GetReplMonTS runs queryGetReplMonTS against the given scheme and table on
// this node and returns the value of the resulting `ts` column together with
// any query error.
func (n *Node) GetReplMonTS(replMonSchemeName string, replMonTable string) (string, error) {
	args := map[string]interface{}{
		"replMonSchemeName": schemaname(replMonSchemeName),
		"replMonTable":      schemaname(replMonTable),
	}
	var row ReplMonTS
	err := n.queryRowMogrify(queryGetReplMonTS, args, &row)
	return row.Timestamp, err
}

// CalcReplMonTSDelay runs queryCalcReplMonTSDelay on this node, passing ts and
// the given scheme and table, and returns the resulting `delay` column
// together with any query error.
func (n *Node) CalcReplMonTSDelay(replMonSchemeName string, replMonTable string, ts string) (int64, error) {
	args := map[string]interface{}{
		"ts":                ts,
		"replMonSchemeName": schemaname(replMonSchemeName),
		"replMonTable":      schemaname(replMonTable),
	}
	var row ReplMonTSDelay
	err := n.queryRowMogrify(queryCalcReplMonTSDelay, args, &row)
	return row.Delay, err
}

// CreateReplMonTable executes queryCreateReplMonTable for the given scheme and
// table on this node.
func (n *Node) CreateReplMonTable(replMonSchemeName string, replMonTable string) error {
	args := map[string]interface{}{
		"replMonSchemeName": schemaname(replMonSchemeName),
		"replMonTable":      schemaname(replMonTable),
	}
	return n.execMogrify(queryCreateReplMonTable, args)
}

// UpdateReplMonTable executes queryUpdateReplMon for the given scheme and
// table on this node.
func (n *Node) UpdateReplMonTable(replMonSchemeName string, replMonTable string) error {
	args := map[string]interface{}{
		"replMonSchemeName": schemaname(replMonSchemeName),
		"replMonTable":      schemaname(replMonTable),
	}
	return n.execMogrify(queryUpdateReplMon, args)
}
17 changes: 17 additions & 0 deletions internal/mysql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ const (
querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit"
querySetSyncBinlog = "set_sync_binlog"
queryGetReplicationSettings = "get_replication_settings"
queryGetReplMonTS = "get_repl_mon_ts"
queryCalcReplMonTSDelay = "calc_repl_mon_ts_delay"
queryCreateReplMonTable = "create_repl_mon_table"
queryUpdateReplMon = "update_repl_mon"
)

var DefaultQueries = map[string]string{
Expand Down Expand Up @@ -125,4 +129,17 @@ var DefaultQueries = map[string]string{
querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`,
queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`,
querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`,
queryGetReplMonTS: `SELECT UNIX_TIMESTAMP(ts) AS ts FROM :replMonSchemeName.:replMonTable`,
queryCalcReplMonTSDelay: `SELECT FLOOR(CAST(:ts AS DECIMAL(20,3)) - UNIX_TIMESTAMP(ts)) AS delay FROM :replMonSchemeName.:replMonTable`,
queryCreateReplMonTable: `CREATE TABLE IF NOT EXISTS :replMonSchemeName.:replMonTable(
id INT NOT NULL PRIMARY KEY,
ts TIMESTAMP(3)
)
ENGINE=INNODB`,
queryUpdateReplMon: `INSERT INTO :replMonSchemeName.:replMonTable(id, ts)
(
SELECT 1, CURRENT_TIMESTAMP(3)
WHERE @@read_only = 0
)
ON DUPLICATE KEY UPDATE ts = CURRENT_TIMESTAMP(3)`,
}
Loading

0 comments on commit 2243d70

Please sign in to comment.