From 5bb7474caeb7927a7ce7ad0e8ef195eb650e2d3a Mon Sep 17 00:00:00 2001 From: noname0443 Date: Mon, 25 Sep 2023 11:37:43 +0300 Subject: [PATCH] External replication refactoring (#39) * Added IExternalReplication interface * Added logging of information about the activation of external replication * Useless goroutine fix --- internal/app/app.go | 60 +++++----- internal/app/replication.go | 2 +- internal/config/config.go | 108 ++++++++--------- internal/mysql/node.go | 153 +----------------------- internal/mysql/replication.go | 207 +++++++++++++++++++++++++++++++++ internal/util/consts.go | 8 ++ tests/images/mysql/mysync.yaml | 1 + 7 files changed, 312 insertions(+), 227 deletions(-) create mode 100644 internal/mysql/replication.go create mode 100644 internal/util/consts.go diff --git a/internal/app/app.go b/internal/app/app.go index db82d228..66f29f17 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -26,17 +26,18 @@ import ( // App is main application structure type App struct { - state appState - logger *log.Logger - config *config.Config - dcs dcs.DCS - cluster *mysql.Cluster - filelock *flock.Flock - nodeFailedAt map[string]time.Time - streamFromFailedAt map[string]time.Time - daemonState *DaemonState - daemonMutex sync.Mutex - replRepairState map[string]*ReplicationRepairState + state appState + logger *log.Logger + config *config.Config + dcs dcs.DCS + cluster *mysql.Cluster + filelock *flock.Flock + nodeFailedAt map[string]time.Time + streamFromFailedAt map[string]time.Time + daemonState *DaemonState + daemonMutex sync.Mutex + replRepairState map[string]*ReplicationRepairState + externalReplication mysql.IExternalReplication } // NewApp returns new App. Suddenly. @@ -57,13 +58,18 @@ func NewApp(configFile, logLevel string, interactive bool) (*App, error) { if logPath != "" { logger.ReOpenOnSignal(syscall.SIGUSR2) } + externalReplication, err := mysql.NewExternalReplication(config.ExternalReplicationType, logger) + if err != nil { + return nil, err + } app := &App{ - state: stateFirstRun, - config: config, - logger: logger, - nodeFailedAt: make(map[string]time.Time), - streamFromFailedAt: make(map[string]time.Time), - replRepairState: make(map[string]*ReplicationRepairState), + state: stateFirstRun, + config: config, + logger: logger, + nodeFailedAt: make(map[string]time.Time), + streamFromFailedAt: make(map[string]time.Time), + replRepairState: make(map[string]*ReplicationRepairState), + externalReplication: externalReplication, } return app, nil } @@ -229,7 +235,7 @@ func (app *App) externalCAFileChecker(ctx context.Context) { select { case <-ticker.C: localNode := app.cluster.Local() - replicaStatus, err := localNode.GetExternalReplicaStatus() + replicaStatus, err := app.externalReplication.GetReplicaStatus(localNode) if err != nil { if !mysql.IsErrorChannelDoesNotExists(err) { app.logger.Errorf("external CA file checker: host %s failed to get external replica status %v", localNode.Host(), err) @@ -1145,7 +1151,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode oldMasterNode := app.cluster.Get(oldMaster) if clusterState[oldMaster].PingOk { - err := oldMasterNode.StopExternalReplication() + err := app.externalReplication.Stop(oldMasterNode) if err != nil { return fmt.Errorf("got error: %s while stopping external replication on old master: %s", err, oldMaster) } @@ -1290,7 +1296,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode } } else { app.logger.Infof("switchover: old master %s does not need recovery", oldMaster) - err = oldMasterNode.ResetExternalReplicationAll() + err = app.externalReplication.Reset(oldMasterNode) if err != nil { return fmt.Errorf("got error: %s while reseting external replication on old master: %s", err, oldMaster) } @@ -1332,7 +1338,7 @@ func (app *App) performSwitchover(clusterState map[string]*NodeState, activeNode } // enable external replication - err = newMasterNode.SetExternalReplication() + err = app.externalReplication.Set(newMasterNode) if err != nil { app.logger.Errorf("failed to set external replication on new master") } @@ -1615,11 +1621,11 @@ func (app *App) repairSlaveNode(node *mysql.Node, clusterState map[string]*NodeS if err != nil { app.logger.Errorf("repair: %s", err) } - err = node.StopExternalReplication() + err = app.externalReplication.Stop(node) if err != nil { app.logger.Errorf("repair: %s", err) } - err = node.ResetExternalReplicationAll() + err = app.externalReplication.Reset(node) if err != nil { app.logger.Errorf("repair: %s", err) } @@ -1790,7 +1796,7 @@ func (app *App) repairCascadeNode(node *mysql.Node, clusterState map[string]*Nod } func (app *App) repairExternalReplication(masterNode *mysql.Node) { - extReplStatus, err := masterNode.GetExternalReplicaStatus() + extReplStatus, err := app.externalReplication.GetReplicaStatus(masterNode) if err != nil { if !mysql.IsErrorChannelDoesNotExists(err) { app.logger.Errorf("repair (external): host %s failed to get external replica status %v", masterNode.Host(), err) @@ -1802,7 +1808,7 @@ func (app *App) repairExternalReplication(masterNode *mysql.Node) { return } - if masterNode.IsExternalReplicationRunningByUser() && !extReplStatus.ReplicationRunning() { + if app.externalReplication.IsRunningByUser(masterNode) && !extReplStatus.ReplicationRunning() { // TODO: remove "". Master is not needed for external replication now app.TryRepairReplication(masterNode, "", app.config.ExternalReplicationChannel) } @@ -2252,7 +2258,9 @@ func (app *App) Run() int { go app.healthChecker(ctx) go app.recoveryChecker(ctx) go app.stateFileHandler(ctx) - go app.externalCAFileChecker(ctx) + if app.config.ExternalReplicationType != util.Disabled { + go app.externalCAFileChecker(ctx) + } handlers := map[appState](func() appState){ stateFirstRun: app.stateFirstRun, diff --git a/internal/app/replication.go b/internal/app/replication.go index 4a519b1f..70d2fa4b 100644 --- a/internal/app/replication.go +++ b/internal/app/replication.go @@ -68,7 +68,7 @@ func (app *App) makeReplStateKey(node *mysql.Node, channel string) string { func StartSlaveAlgorithm(app *App, node *mysql.Node, _ string, channel string) error { app.logger.Infof("repair: trying to repair replication using StartSlaveAlgorithm...") if channel == app.config.ExternalReplicationChannel { - return node.StartExternalReplication() + return app.externalReplication.Start(node) } return node.StartSlave() } diff --git a/internal/config/config.go b/internal/config/config.go index c1ac7034..2e9c2168 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -34,59 +34,60 @@ type MySQLConfig struct { // Config contains all mysync configuration type Config struct { - DevMode bool `config:"dev_mode" yaml:"dev_mode"` - SemiSync bool `config:"semi_sync" yaml:"semi_sync"` - SemiSyncEnableLag int64 `config:"semi_sync_enable_lag" yaml:"semi_sync_enable_lag"` - Failover bool `config:"failover" yaml:"failover"` - FailoverCooldown time.Duration `config:"failover_cooldown" yaml:"failover_cooldown"` - FailoverDelay time.Duration `config:"failover_delay" yaml:"failover_delay"` - InactivationDelay time.Duration `config:"inactivation_delay" yaml:"inactivation_delay"` - CriticalDiskUsage float64 `config:"critical_disk_usage" yaml:"critical_disk_usage"` - NotCriticalDiskUsage float64 `config:"not_critical_disk_usage" yaml:"not_critical_disk_usage"` - LogLevel string `config:"loglevel"` - Log string `config:"log"` - Hostname string `config:"hostname"` - Lockfile string `config:"lockfile"` - InfoFile string `config:"info_file" yaml:"info_file"` - Emergefile string `config:"emergefile"` - Resetupfile string `config:"resetupfile"` - Maintenancefile string `config:"maintenancefile"` - MySQL MySQLConfig `config:"mysql"` - Queries map[string]string `config:"queries"` - Commands map[string]string `config:"commands"` - Zookeeper dcs.ZookeeperConfig `config:"zookeeper"` - DcsWaitTimeout time.Duration `config:"dcs_wait_timeout" yaml:"dcs_wait_timeout"` - DBTimeout time.Duration `config:"db_timeout" yaml:"db_timeout"` - DBLostCheckTimeout time.Duration `config:"db_lost_check_timeout" yaml:"db_lost_check_timeout"` - DBSetRoTimeout time.Duration `config:"db_set_ro_timeout" yaml:"db_set_ro_timeout"` - DBSetRoForceTimeout time.Duration `config:"db_set_ro_force_timeout" yaml:"db_set_ro_force_timeout"` - DBStopSlaveSQLThreadTimeout time.Duration `config:"db_stop_slave_sql_thread_timeout" yaml:"db_stop_slave_sql_thread_timeout"` - TickInterval time.Duration `config:"tick_interval" yaml:"tick_interval"` - HealthCheckInterval time.Duration `config:"healthcheck_interval" yaml:"healthcheck_interval"` - InfoFileHandlerInterval time.Duration `config:"info_file_handler_interval" yaml:"info_file_handler_interval"` - RecoveryCheckInterval time.Duration `config:"recoverycheck_interval" yaml:"recoverycheck_interval"` - ExternalCAFileCheckInterval time.Duration `config:"external_ca_file_check_interval" yaml:"external_ca_file_check_interval"` - MaxAcceptableLag float64 `config:"max_acceptable_lag" yaml:"max_acceptable_lag"` - SlaveCatchUpTimeout time.Duration `config:"slave_catch_up_timeout" yaml:"slave_catch_up_timeout"` - DisableSemiSyncReplicationOnMaintenance bool `config:"disable_semi_sync_replication_on_maintenance" yaml:"disable_semi_sync_replication_on_maintenance"` - KeepSuperWritableOnCriticalDiskUsage bool `config:"keep_super_writable_on_critical_disk_usage" yaml:"keep_super_writable_on_critical_disk_usage"` - ExcludeUsers []string `config:"exclude_users" yaml:"exclude_users"` - OfflineModeEnableInterval time.Duration `config:"offline_mode_enable_interval" yaml:"offline_mode_enable_interval"` - OfflineModeEnableLag time.Duration `config:"offline_mode_enable_lag" yaml:"offline_mode_enable_lag"` - OfflineModeDisableLag time.Duration `config:"offline_mode_disable_lag" yaml:"offline_mode_disable_lag"` - DisableSetReadonlyOnLost bool `config:"disable_set_readonly_on_lost" yaml:"disable_set_readonly_on_lost"` - ResetupCrashedHosts bool `config:"resetup_crashed_hosts" yaml:"resetup_crashed_hosts"` - StreamFromReasonableLag time.Duration `config:"stream_from_reasonable_lag" yaml:"stream_from_reasonable_lag"` - PriorityChoiceMaxLag time.Duration `config:"priority_choice_max_lag" yaml:"priority_choice_max_lag"` - TestDiskUsageFile string `config:"test_disk_usage_file" yaml:"test_disk_usage_file"` - RplSemiSyncMasterWaitForSlaveCount int `config:"rpl_semi_sync_master_wait_for_slave_count" yaml:"rpl_semi_sync_master_wait_for_slave_count"` - WaitReplicationStartTimeout time.Duration `config:"wait_start_replication_timeout" yaml:"wait_start_replication_timeout"` - ReplicationRepairAggressiveMode bool `config:"replication_repair_aggressive_mode" yaml:"replication_repair_aggressive_mode"` - ReplicationRepairCooldown time.Duration `config:"replication_repair_cooldown" yaml:"replication_repair_cooldown"` - ReplicationRepairMaxAttempts int `config:"replication_repair_max_attempts" yaml:"replication_repair_max_attempts"` - TestFilesystemReadonlyFile string `config:"test_filesystem_readonly_file" yaml:"test_filesystem_readonly_file"` - ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"` - ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"` + DevMode bool `config:"dev_mode" yaml:"dev_mode"` + SemiSync bool `config:"semi_sync" yaml:"semi_sync"` + SemiSyncEnableLag int64 `config:"semi_sync_enable_lag" yaml:"semi_sync_enable_lag"` + Failover bool `config:"failover" yaml:"failover"` + FailoverCooldown time.Duration `config:"failover_cooldown" yaml:"failover_cooldown"` + FailoverDelay time.Duration `config:"failover_delay" yaml:"failover_delay"` + InactivationDelay time.Duration `config:"inactivation_delay" yaml:"inactivation_delay"` + CriticalDiskUsage float64 `config:"critical_disk_usage" yaml:"critical_disk_usage"` + NotCriticalDiskUsage float64 `config:"not_critical_disk_usage" yaml:"not_critical_disk_usage"` + LogLevel string `config:"loglevel"` + Log string `config:"log"` + Hostname string `config:"hostname"` + Lockfile string `config:"lockfile"` + InfoFile string `config:"info_file" yaml:"info_file"` + Emergefile string `config:"emergefile"` + Resetupfile string `config:"resetupfile"` + Maintenancefile string `config:"maintenancefile"` + MySQL MySQLConfig `config:"mysql"` + Queries map[string]string `config:"queries"` + Commands map[string]string `config:"commands"` + Zookeeper dcs.ZookeeperConfig `config:"zookeeper"` + DcsWaitTimeout time.Duration `config:"dcs_wait_timeout" yaml:"dcs_wait_timeout"` + DBTimeout time.Duration `config:"db_timeout" yaml:"db_timeout"` + DBLostCheckTimeout time.Duration `config:"db_lost_check_timeout" yaml:"db_lost_check_timeout"` + DBSetRoTimeout time.Duration `config:"db_set_ro_timeout" yaml:"db_set_ro_timeout"` + DBSetRoForceTimeout time.Duration `config:"db_set_ro_force_timeout" yaml:"db_set_ro_force_timeout"` + DBStopSlaveSQLThreadTimeout time.Duration `config:"db_stop_slave_sql_thread_timeout" yaml:"db_stop_slave_sql_thread_timeout"` + TickInterval time.Duration `config:"tick_interval" yaml:"tick_interval"` + HealthCheckInterval time.Duration `config:"healthcheck_interval" yaml:"healthcheck_interval"` + InfoFileHandlerInterval time.Duration `config:"info_file_handler_interval" yaml:"info_file_handler_interval"` + RecoveryCheckInterval time.Duration `config:"recoverycheck_interval" yaml:"recoverycheck_interval"` + ExternalCAFileCheckInterval time.Duration `config:"external_ca_file_check_interval" yaml:"external_ca_file_check_interval"` + MaxAcceptableLag float64 `config:"max_acceptable_lag" yaml:"max_acceptable_lag"` + SlaveCatchUpTimeout time.Duration `config:"slave_catch_up_timeout" yaml:"slave_catch_up_timeout"` + DisableSemiSyncReplicationOnMaintenance bool `config:"disable_semi_sync_replication_on_maintenance" yaml:"disable_semi_sync_replication_on_maintenance"` + KeepSuperWritableOnCriticalDiskUsage bool `config:"keep_super_writable_on_critical_disk_usage" yaml:"keep_super_writable_on_critical_disk_usage"` + ExcludeUsers []string `config:"exclude_users" yaml:"exclude_users"` + OfflineModeEnableInterval time.Duration `config:"offline_mode_enable_interval" yaml:"offline_mode_enable_interval"` + OfflineModeEnableLag time.Duration `config:"offline_mode_enable_lag" yaml:"offline_mode_enable_lag"` + OfflineModeDisableLag time.Duration `config:"offline_mode_disable_lag" yaml:"offline_mode_disable_lag"` + DisableSetReadonlyOnLost bool `config:"disable_set_readonly_on_lost" yaml:"disable_set_readonly_on_lost"` + ResetupCrashedHosts bool `config:"resetup_crashed_hosts" yaml:"resetup_crashed_hosts"` + StreamFromReasonableLag time.Duration `config:"stream_from_reasonable_lag" yaml:"stream_from_reasonable_lag"` + PriorityChoiceMaxLag time.Duration `config:"priority_choice_max_lag" yaml:"priority_choice_max_lag"` + TestDiskUsageFile string `config:"test_disk_usage_file" yaml:"test_disk_usage_file"` + RplSemiSyncMasterWaitForSlaveCount int `config:"rpl_semi_sync_master_wait_for_slave_count" yaml:"rpl_semi_sync_master_wait_for_slave_count"` + WaitReplicationStartTimeout time.Duration `config:"wait_start_replication_timeout" yaml:"wait_start_replication_timeout"` + ReplicationRepairAggressiveMode bool `config:"replication_repair_aggressive_mode" yaml:"replication_repair_aggressive_mode"` + ReplicationRepairCooldown time.Duration `config:"replication_repair_cooldown" yaml:"replication_repair_cooldown"` + ReplicationRepairMaxAttempts int `config:"replication_repair_max_attempts" yaml:"replication_repair_max_attempts"` + TestFilesystemReadonlyFile string `config:"test_filesystem_readonly_file" yaml:"test_filesystem_readonly_file"` + ReplicationChannel string `config:"replication_channel" yaml:"replication_channel"` + ExternalReplicationChannel string `config:"external_replication_channel" yaml:"external_replication_channel"` + ExternalReplicationType util.ExternalReplicationType `config:"external_replication_type" yaml:"external_replication_type"` } // DefaultConfig returns default configuration for MySync @@ -162,6 +163,7 @@ func DefaultConfig() (Config, error) { TestFilesystemReadonlyFile: "", // fake readonly status, only for docker tests ReplicationChannel: "", ExternalReplicationChannel: "external", + ExternalReplicationType: util.Disabled, } return config, nil } diff --git a/internal/mysql/node.go b/internal/mysql/node.go index 8c22157b..e6343880 100644 --- a/internal/mysql/node.go +++ b/internal/mysql/node.go @@ -36,9 +36,11 @@ type Node struct { version *Version } -var queryOnliner = regexp.MustCompile(`\r?\n\s*`) -var mogrifyRegex = regexp.MustCompile(`:\w+`) -var ErrNotLocalNode = errors.New("this method should be run on local node only") +var ( + queryOnliner = regexp.MustCompile(`\r?\n\s*`) + mogrifyRegex = regexp.MustCompile(`:\w+`) + ErrNotLocalNode = errors.New("this method should be run on local node only") +) const ( optimalSyncBinlogValue = 1000 @@ -248,7 +250,6 @@ func (n *Node) getRunningQueryIds(excludeUsers []string, timeout time.Duration) for rows.Next() { var currid int err := rows.Scan(&currid) - if err != nil { n.traceQuery(bquery, nil, ret, err) return nil, err @@ -362,7 +363,7 @@ func (n *Node) GetDiskUsage() (used uint64, total uint64, err error) { err = syscall.Statfs(n.config.MySQL.DataDir, &stat) total = uint64(stat.Bsize) * stat.Blocks // on FreeBSD stat.Bavail may be negative - var bavail = stat.Bavail + bavail := stat.Bavail // nolint: staticcheck if bavail < 0 { bavail = 0 @@ -492,19 +493,6 @@ func (n *Node) GetReplicaStatus() (ReplicaStatus, error) { return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ReplicationChannel) } -// GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel -func (n *Node) GetExternalReplicaStatus() (ReplicaStatus, error) { - checked, err := n.IsExternalReplicationSupported() - if err != nil { - return nil, err - } - if !(checked) { - return nil, nil - } - - return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel) -} - func (n *Node) ReplicaStatusWithTimeout(timeout time.Duration, channel string) (ReplicaStatus, error) { query, status, err := n.GetVersionSlaveStatusQuery() if err != nil { @@ -744,57 +732,6 @@ func (n *Node) ResetSlaveAll() error { }) } -// StartExternalReplication starts external replication -func (n *Node) StartExternalReplication() error { - checked, err := n.IsExternalReplicationSupported() - if err != nil { - return err - } - if checked { - err := n.execMogrify(queryStartReplica, map[string]interface{}{ - "channel": n.config.ExternalReplicationChannel, - }) - if err != nil { - return err - } - } - return nil -} - -// StopExternalReplication stops external replication -func (n *Node) StopExternalReplication() error { - checked, err := n.IsExternalReplicationSupported() - if err != nil { - return err - } - if checked { - err := n.execMogrify(queryStopReplica, map[string]interface{}{ - "channel": n.config.ExternalReplicationChannel, - }) - if err != nil && !IsErrorChannelDoesNotExists(err) { - return err - } - } - return nil -} - -// ResetExternalReplicationAll resets external replication -func (n *Node) ResetExternalReplicationAll() error { - checked, err := n.IsExternalReplicationSupported() - if err != nil { - return err - } - if checked { - err := n.execMogrify(queryResetReplicaAll, map[string]interface{}{ - "channel": n.config.ExternalReplicationChannel, - }) - if err != nil && !IsErrorChannelDoesNotExists(err) { - return err - } - } - return nil -} - // SemiSyncStatus returns semi sync status func (n *Node) SemiSyncStatus() (*SemiSyncStatus, error) { status := new(SemiSyncStatus) @@ -944,84 +881,6 @@ func (n *Node) GetStartupTime() (time.Time, error) { return time.Unix(int64(startupTime.LastStartup), 0), nil } -func (n *Node) IsExternalReplicationSupported() (bool, error) { - version, err := n.GetVersion() - if err != nil { - return false, err - } - return version.CheckIfExternalReplicationSupported(), nil -} - -func (n *Node) SetExternalReplication() error { - var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) - if err != nil { - // If no table in scheme then we consider external replication not existing so we do nothing - if IsErrorTableDoesNotExists(err) { - return nil - } - // If there is no rows in table for external replication - do nothing - if err == sql.ErrNoRows { - n.logger.Infof("no external replication records found in replication table on host %s", n.host) - return nil - } - return err - } - useSsl := 0 - sslCa := "" - if replSettings.SourceSslCa != "" && n.config.MySQL.ExternalReplicationSslCA != "" { - useSsl = 1 - sslCa = n.config.MySQL.ExternalReplicationSslCA - } - err = n.StopExternalReplication() - if err != nil { - return err - } - err = n.ResetExternalReplicationAll() - if err != nil { - return err - } - err = n.execMogrify(queryChangeSource, map[string]interface{}{ - "host": replSettings.SourceHost, - "port": replSettings.SourcePort, - "user": replSettings.SourceUser, - "password": replSettings.SourcePassword, - "ssl": useSsl, - "sslCa": sslCa, - "sourceDelay": replSettings.SourceDelay, - "retryCount": n.config.MySQL.ReplicationRetryCount, - "connectRetry": n.config.MySQL.ReplicationConnectRetry, - "heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod, - "channel": "external", - }) - if err != nil { - return err - } - err = n.execMogrify(queryIgnoreDB, map[string]interface{}{ - "ignoreList": schemaname("mysql"), - "channel": "external", - }) - if err != nil { - return err - } - if replSettings.ShouldBeRunning() { - return n.StartExternalReplication() - } - return nil -} - -func (n *Node) IsExternalReplicationRunningByUser() bool { - var replSettings replicationSettings - err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) - if err != nil { - return false - } - if replSettings.ShouldBeRunning() { - return true - } - return false -} - func (n *Node) UpdateExternalCAFile() error { var replSettings replicationSettings err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) diff --git a/internal/mysql/replication.go b/internal/mysql/replication.go new file mode 100644 index 00000000..d5908536 --- /dev/null +++ b/internal/mysql/replication.go @@ -0,0 +1,207 @@ +package mysql + +import ( + "database/sql" + + "github.com/yandex/mysync/internal/log" + "github.com/yandex/mysync/internal/util" +) + +type IExternalReplication interface { + IsSupported(*Node) (bool, error) + Set(*Node) error + Reset(*Node) error + Start(*Node) error + GetReplicaStatus(*Node) (ReplicaStatus, error) + Stop(*Node) error + IsRunningByUser(*Node) bool +} + +type UnimplementedExternalReplication struct{} + +func (d *UnimplementedExternalReplication) IsSupported(*Node) (bool, error) { + return false, nil +} + +func (d *UnimplementedExternalReplication) IsRunningByUser(*Node) bool { + return false +} + +func (d *UnimplementedExternalReplication) Set(*Node) error { + return nil +} + +func (d *UnimplementedExternalReplication) Reset(*Node) error { + return nil +} + +func (d *UnimplementedExternalReplication) Start(*Node) error { + return nil +} + +func (d *UnimplementedExternalReplication) GetReplicaStatus(*Node) (ReplicaStatus, error) { + return nil, nil +} + +func (d *UnimplementedExternalReplication) Stop(*Node) error { + return nil +} + +type ExternalReplication struct { + logger *log.Logger +} + +func NewExternalReplication(replicationType util.ExternalReplicationType, logger *log.Logger) (IExternalReplication, error) { + switch replicationType { + case util.MyExternalReplication: + logger.Info("external replication is enabled") + return &ExternalReplication{ + logger: logger, + }, nil + default: + logger.Info("external replication is disabled") + return &UnimplementedExternalReplication{}, nil + } +} + +func (er *ExternalReplication) IsSupported(n *Node) (bool, error) { + version, err := n.GetVersion() + if err != nil { + return false, err + } + return version.CheckIfExternalReplicationSupported(), nil +} + +func (er *ExternalReplication) Set(n *Node) error { + var replSettings replicationSettings + err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) + if err != nil { + // If no table in scheme then we consider external replication not existing so we do nothing + if IsErrorTableDoesNotExists(err) { + return nil + } + // If there is no rows in table for external replication - do nothing + if err == sql.ErrNoRows { + n.logger.Infof("no external replication records found in replication table on host %s", n.host) + return nil + } + return err + } + useSsl := 0 + sslCa := "" + if replSettings.SourceSslCa != "" && n.config.MySQL.ExternalReplicationSslCA != "" { + useSsl = 1 + sslCa = n.config.MySQL.ExternalReplicationSslCA + } + err = er.Stop(n) + if err != nil { + return err + } + err = er.Reset(n) + if err != nil { + return err + } + err = n.execMogrify(queryChangeSource, map[string]interface{}{ + "host": replSettings.SourceHost, + "port": replSettings.SourcePort, + "user": replSettings.SourceUser, + "password": replSettings.SourcePassword, + "ssl": useSsl, + "sslCa": sslCa, + "sourceDelay": replSettings.SourceDelay, + "retryCount": n.config.MySQL.ReplicationRetryCount, + "connectRetry": n.config.MySQL.ReplicationConnectRetry, + "heartbeatPeriod": n.config.MySQL.ReplicationHeartbeatPeriod, + "channel": "external", + }) + if err != nil { + return err + } + err = n.execMogrify(queryIgnoreDB, map[string]interface{}{ + "ignoreList": schemaname("mysql"), + "channel": "external", + }) + if err != nil { + return err + } + if replSettings.ShouldBeRunning() { + return er.Start(n) + } + return nil +} + +func (er *ExternalReplication) IsRunningByUser(n *Node) bool { + var replSettings replicationSettings + err := n.queryRow(queryGetExternalReplicationSettings, nil, &replSettings) + if err != nil { + return false + } + if replSettings.ShouldBeRunning() { + return true + } + return false +} + +// GetExternalReplicaStatus returns slave/replica status or nil if node is master for external channel +func (er *ExternalReplication) GetReplicaStatus(n *Node) (ReplicaStatus, error) { + checked, err := er.IsSupported(n) + if err != nil { + return nil, err + } + if !(checked) { + return nil, nil + } + + return n.ReplicaStatusWithTimeout(n.config.DBTimeout, n.config.ExternalReplicationChannel) +} + +// StartExternalReplication starts external replication +func (er *ExternalReplication) Start(n *Node) error { + checked, err := er.IsSupported(n) + if err != nil { + return err + } + if checked { + err := n.execMogrify(queryStartReplica, map[string]interface{}{ + "channel": n.config.ExternalReplicationChannel, + }) + if err != nil { + return err + } + } + return nil +} + +// StopExternalReplication stops external replication +func (er *ExternalReplication) Stop(n *Node) error { + checked, err := er.IsSupported(n) + if err != nil { + return err + } + if checked { + err := n.execMogrify(queryStopReplica, map[string]interface{}{ + "channel": n.config.ExternalReplicationChannel, + }) + if err != nil && !IsErrorChannelDoesNotExists(err) { + return err + } + } + return nil +} + +// ResetExternalReplicationAll resets external replication +func (er *ExternalReplication) Reset(n *Node) error { + checked, err := er.IsSupported(n) + if err != nil { + return err + } + if checked { + err := n.execMogrify(queryResetReplicaAll, map[string]interface{}{ + "channel": n.config.ExternalReplicationChannel, + }) + if err != nil && !IsErrorChannelDoesNotExists(err) { + return err + } + } + return nil +} diff --git a/internal/util/consts.go b/internal/util/consts.go new file mode 100644 index 00000000..ef421eb7 --- /dev/null +++ b/internal/util/consts.go @@ -0,0 +1,8 @@ +package util + +type ExternalReplicationType string + +const ( + Disabled ExternalReplicationType = "" + MyExternalReplication ExternalReplicationType = "external" +) diff --git a/tests/images/mysql/mysync.yaml b/tests/images/mysql/mysync.yaml index 71db629d..6fac222d 100644 --- a/tests/images/mysql/mysync.yaml +++ b/tests/images/mysql/mysync.yaml @@ -55,3 +55,4 @@ replication_repair_cooldown: 10s replication_repair_aggressive_mode: ${MYSYNC_REPLICATION_REPAIR_AGGRESSIVE_MODE:-false} test_filesystem_readonly_file: /tmp/readonly replication_channel: '' +external_replication_type: 'external'