Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support async replication #88

Merged
merged 43 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
55dd18b
support async replication
Mar 14, 2024
4dfafc3
fix a_sync->async, add async in tests yaml
Mar 17, 2024
afefe5c
fix mysync.yaml syntax err
Mar 18, 2024
3a4dcb0
fix docker compose cfg
Mar 18, 2024
62a430a
fix waitForCatchUp return in async mode
Mar 19, 2024
71f4403
set master online on switchover: phase 5
Mar 19, 2024
091024c
fix PriorityChoiceMaxLag in async mode
Mar 20, 2024
b685481
fix queryCalcMdbReplMonTsDelay query
Mar 22, 2024
2bfb126
async mode: add tests, fix linters
Mar 27, 2024
e88784d
async mode: add tests, fix linters
Mar 27, 2024
a155b32
async mode: add tests, fix linters
Mar 28, 2024
27b7834
async replication refactoring
May 6, 2024
00bdd06
async replication test fix
May 6, 2024
81a0f60
async replication test fix
May 6, 2024
3f44fc7
async replication test fix
May 7, 2024
c741698
Merge branch 'master' into async-replication
teem0n May 7, 2024
9b8d7a3
async replication test fix
May 14, 2024
302ff31
Merge branch 'async-replication' of github.com:yandex/mysync into asy…
May 14, 2024
b8a90db
async replication test fix
May 14, 2024
7425ad9
add mysync-repl-mon feature
May 22, 2024
3481208
add refactor mdb_repl_mon table name to custom configuring name
May 22, 2024
51782c9
add refactor mdb_repl_mon table name to custom configuring name
May 22, 2024
6f9a218
repl_mon fixes
May 27, 2024
5efe22e
repl_mon fixes
May 28, 2024
928e140
fix typo
May 28, 2024
de6a36e
add repl_mon tests
May 28, 2024
26efe42
fix async tests
May 28, 2024
9e9a083
fix async tests
May 29, 2024
7a878b3
fix async tests, add repl_mon.feature launch
May 29, 2024
4faacd9
fix async tests
May 29, 2024
428b272
fix async tests, fix repl_mon tests
May 30, 2024
31cfffb
add switch_helper
Jun 3, 2024
b7b0b32
linters fix
Jun 3, 2024
c5e08fb
fix " too many arguments "
Jun 3, 2024
68c8a96
linters fix
Jun 3, 2024
2de37a7
linters fix
Jun 3, 2024
24f2840
linters fix
Jun 3, 2024
7fddbdd
Merge branch 'master' into async-replication
teem0n Jun 10, 2024
d9ba04f
Merge branch 'master' into async-replication
teem0n Jun 11, 2024
9fd2bd6
Merge branch 'master' into async-replication
teem0n Jun 11, 2024
723d4db
fix async tests
Jun 13, 2024
1468bb5
fix async tests
Jun 13, 2024
6937a8b
refactor switch_helper
Jun 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,11 @@
app.logger.Errorf("failed to update active nodes in dcs: %v", err)
}

err = app.updateMdbReplMonTs(master)
if err != nil {
app.logger.Errorf("failed to update mdb_repl_mon timestamp: %v", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should explicitly mention "in dcs"

}

return stateManager
}

Expand Down Expand Up @@ -2118,9 +2123,27 @@
if gtidExecuted.Contain(gtidset) {
return true, nil
}
if app.dcs.Get(pathCurrentSwitch, new(Switchover)) == dcs.ErrNotFound {
switchover := new(Switchover)
if app.dcs.Get(pathCurrentSwitch, switchover) == dcs.ErrNotFound {
return false, nil
}
if app.config.ASync && switchover.Cause == CauseAuto {
ts, err := app.GetMdbReplMonTs()
if err != nil {
app.logger.Errorf("failed to get mdb repl mon ts: %v", err)
continue
}
delay, err := node.CalcMdbReplMonTsDelay(ts)
if err != nil {
app.logger.Errorf("failed to calc mdb repl mon ts: %v", err)
continue
}
if delay < app.config.ASyncAllowedLag {
app.logger.Infof("async allowed lag is %s and current lag on host %s is %s, so we don't wait for catch up any more",

Check failure on line 2142 in internal/app/app.go

View workflow job for this annotation

GitHub Actions / all_unittests

(*github.com/yandex/mysync/internal/log.Logger).Infof format %s has arg app.config.ASyncAllowedLag of wrong type int64

Check failure on line 2142 in internal/app/app.go

View workflow job for this annotation

GitHub Actions / lint

printf: (*github.com/yandex/mysync/internal/log.Logger).Infof format %s has arg app.config.ASyncAllowedLag of wrong type int64 (govet)
app.config.ASyncAllowedLag, node.Host(), delay)
break
}
}
time.Sleep(sleep)
if time.Now().After(deadline) {
break
Expand Down Expand Up @@ -2224,6 +2247,15 @@
return positions, util.CombineErrors(errs)
}

func (app *App) updateMdbReplMonTs(master string) error {

Check failure on line 2250 in internal/app/app.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: method updateMdbReplMonTs should be updateMdbReplMonTS (stylecheck)
masterNode := app.cluster.Get(master)
ts, err := masterNode.GetMdbReplMonTs()
if err != nil {
return fmt.Errorf("failed to get master mdb_repl_mon timestamp: %v", err)
}
return app.SetMdbReplMonTs(ts)
}

/*
Run enters the main application loop
When Run exits mysync process is over
Expand Down
21 changes: 21 additions & 0 deletions internal/app/app_dcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,24 @@
}
return "", nil
}

func (app *App) SetMdbReplMonTs(ts string) error {
err := app.dcs.Create(pathMasterReplMonTs, ts)
if err != nil && err != dcs.ErrExists {
return err
}
err = app.dcs.Set(pathMasterReplMonTs, ts)
if err != nil {
return err
}
return nil
}

func (app *App) GetMdbReplMonTs() (string, error) {
var ts string
err := app.dcs.Get(pathMasterReplMonTs, &ts)
if errors.Is(err, dcs.ErrNotFound) {
return "", nil
}
return ts, err
}

Check failure on line 251 in internal/app/app_dcs.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
3 changes: 3 additions & 0 deletions internal/app/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ const (
pathResetupStatus = "resetup_status"

pathLastShutdownNodeTime = "last_shutdown_node_time"

// last known timestamp from mysql.mdb_repl_mon
pathMasterReplMonTs = "master_repl_mon_ts"
)

var (
Expand Down
7 changes: 7 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@
ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"`
ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"`
ExternalReplicationType util.ExternalReplicationType `config:"external_replication_type" yaml:"external_replication_type"`
ASync bool `config:"a_sync" yaml:"a_sync"`

Check failure on line 91 in internal/config/config.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
ASyncAllowedLag int64 `config:"a_sync_allowed_lag" yaml:"a_sync_allowed_lag"`
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is a_sync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New setting which will allow client to set server in async mode. In async mode in case of failover mysync will promote slave to new master even if it has replication delay, but only if delay less then ASyncAllowedLag

}

// DefaultConfig returns default configuration for MySync
Expand Down Expand Up @@ -164,6 +166,8 @@
ReplicationChannel: "",
ExternalReplicationChannel: "external",
ExternalReplicationType: util.Disabled,
ASync: false,
ASyncAllowedLag: 0,
}
return config, nil
}
Expand Down Expand Up @@ -205,5 +209,8 @@
if cfg.NotCriticalDiskUsage > cfg.CriticalDiskUsage {
return fmt.Errorf("not_critical_disk_usage should be <= critical_disk_usage")
}
if cfg.SemiSync && cfg.ASync {
return fmt.Errorf("can't run in both semisync and async mode")
}
return nil
}
8 changes: 8 additions & 0 deletions internal/mysql/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,14 @@
PatchVersion int `db:"PatchVersion"`
}

type MdbReplMonTs struct {

Check failure on line 271 in internal/mysql/data.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: type MdbReplMonTs should be MdbReplMonTS (stylecheck)
Timestamp string `db:"ts"`
}

type MdbReplMonTsDelay struct {

Check failure on line 275 in internal/mysql/data.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: type MdbReplMonTsDelay should be MdbReplMonTSDelay (stylecheck)
Delay int64 `db:"delay"`
}

const (
Version80Major = 8
Version80Minor = 0
Expand Down
12 changes: 12 additions & 0 deletions internal/mysql/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,3 +985,15 @@
}
return nil
}

func (n *Node) GetMdbReplMonTs() (string, error) {

Check failure on line 989 in internal/mysql/node.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: method GetMdbReplMonTs should be GetMdbReplMonTS (stylecheck)
result := new(MdbReplMonTs)
err := n.queryRow(queryGetMdbReplMonTs, nil, result)
return result.Timestamp, err
}

func (n *Node) CalcMdbReplMonTsDelay(ts string) (int64, error) {

Check failure on line 995 in internal/mysql/node.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: method CalcMdbReplMonTsDelay should be CalcMdbReplMonTSDelay (stylecheck)
result := new(MdbReplMonTsDelay)
err := n.queryRow(queryCalcMdbReplMonTsDelay, map[string]interface{}{"ts": ts}, result)
return result.Delay, err
}
4 changes: 4 additions & 0 deletions internal/mysql/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
querySetInnodbFlushLogAtTrxCommit = "set_innodb_flush_log_at_trx_commit"
querySetSyncBinlog = "set_sync_binlog"
queryGetReplicationSettings = "get_replication_settings"
queryGetMdbReplMonTs = "get_mdb_repl_mon_ts"

Check failure on line 48 in internal/mysql/queries.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofmt`-ed with `-s` (gofmt)
queryCalcMdbReplMonTsDelay = "calc_mdb_repl_mon_ts_delay"

Check failure on line 49 in internal/mysql/queries.go

View workflow job for this annotation

GitHub Actions / lint

ST1003: const queryCalcMdbReplMonTsDelay should be queryCalcMdbReplMonTSDelay (stylecheck)
)

var DefaultQueries = map[string]string{
Expand Down Expand Up @@ -123,4 +125,6 @@
querySetInnodbFlushLogAtTrxCommit: `SET GLOBAL innodb_flush_log_at_trx_commit = :level`,
queryGetReplicationSettings: `SELECT @@innodb_flush_log_at_trx_commit as InnodbFlushLogAtTrxCommit, @@sync_binlog as SyncBinlog`,
querySetSyncBinlog: `SET GLOBAL sync_binlog = :sync_binlog`,
queryGetMdbReplMonTs: `SELECT UNIX_TIMESTAMP(ts) AS ts FROM mysql.mdb_repl_mon`,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one looks very specific to your installation. Probably, we should use data from replication_applier_status_by_worker table OR move mdb_repl_mon to mysync.

queryCalcMdbReplMonTsDelay: `SELECT UNIX_TIMESTAMP(CURRENT_TIMESTAMP(0)) - CAST(:ts AS DECIMAL(20,0)) AS delay`,
}
Loading