Skip to content

Commit

Permalink
fix(backend): ro slave 迁移升级的参数处理 #8363
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq authored and iSecloud committed Dec 3, 2024
1 parent 0d072cf commit 02a0589
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const (
// Ins tance TODO
type Ins struct {
native.Instance
dbConn *native.DbWorker `json:"-"`
}

// Proxies TODO
Expand Down Expand Up @@ -202,7 +201,7 @@ func (m *MySQLClusterDetail) CheckAltSlaveMasterAddr() (err error) {
m.MasterIns.Addr(),
)
logger.Error(msg)
return fmt.Errorf(msg)
return errors.New(msg)
}
return err
}
Expand Down Expand Up @@ -257,7 +256,7 @@ func (c *MasterInfo) LockTablesPreCheck(backupUser string) (err error) {

// FlushTablesWithReadLock 执行flush table with read lock
func (c *MasterInfo) FlushTablesWithReadLock() (err error) {
if _, err := c.lockConn.ExecContext(context.Background(), "set lock_wait_timeout = 10;"); err != nil {
if _, err = c.lockConn.ExecContext(context.Background(), "set lock_wait_timeout = 10;"); err != nil {
return err
}
err = util.Retry(
Expand Down Expand Up @@ -302,7 +301,7 @@ func (c *MasterInfo) KillBackupUserProcesslist(backupUser string) (err error) {
if err != nil {
return err
}
if len(processLists) <= 0 {
if len(processLists) == 0 {
logger.Info("没有发现关于备份用户[%s]相关的processlist~", backupUser)
return nil
}
Expand All @@ -321,11 +320,11 @@ func (c *MasterInfo) FindLongQuery() (err error) {
if err != nil {
return err
}
if len(activeProcessLists) <= 0 {
if len(activeProcessLists) == 0 {
return nil
}
var errs []error
errs = []error{errors.New("active processlist exist:\n")}
errs = []error{errors.New("active processlist exist")}
for _, p := range activeProcessLists {
errs = append(
errs, fmt.Errorf(
Expand Down Expand Up @@ -364,7 +363,7 @@ func realVal(v sql.NullString) string {
// return
// }

// RecordBinPos 记录切换时候的bin postion
// RecordBinPos 记录切换时候的bin position
func (s *AltSlaveInfo) RecordBinPos() (binPosJsonStr string, err error) {
pos, _ := s.dbConn.ShowMasterStatus()
logger.Info("show master status on %s,detail: File:%s,Pos:%d", s.Addr(), pos.File, pos.Position)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"dbm-services/mysql/db-tools/dbactuator/pkg/util"
)

// CutOverParam TODO
// CutOverParam cutover 参数
type CutOverParam struct {
Host string `json:"host" validate:"required,ip"`
Cluster *MySQLClusterDetail `json:"cluster"`
Expand Down Expand Up @@ -182,11 +182,12 @@ func (m *CutOverToSlaveComp) Example() interface{} {
func (m *CutOverToSlaveComp) PreCheck() (err error) {
// 以下是强制检查的内容
// 检查下proxy backend 是不是 源Master
if err := m.cluster.CheckBackends(m.cluster.MasterIns.Host, m.cluster.MasterIns.Port); err != nil {
if err = m.cluster.CheckBackends(m.cluster.MasterIns.Host, m.cluster.MasterIns.Port); err != nil {
logger.Error("proxy backend is not %s:%d", m.cluster.MasterIns.Host, m.cluster.MasterIns.Port)
return err
}
// 检查alt Slave repl 的地址不是 cluster.MasterIns
if err := m.cluster.CheckAltSlaveMasterAddr(); err != nil {
if err = m.cluster.CheckAltSlaveMasterAddr(); err != nil {
return err
}

Expand All @@ -203,10 +204,10 @@ func (m *CutOverToSlaveComp) PreCheck() (err error) {
}
// 客户端连接检查
if m.Params.ClientConnCheck {
prcsls, err := m.cluster.AltSlaveIns.dbConn.ShowApplicationProcesslist(m.sysUsers)
if err != nil {
logger.Error("show processlist failed %s", err.Error())
return err
prcsls, errx := m.cluster.AltSlaveIns.dbConn.ShowApplicationProcesslist(m.sysUsers)
if errx != nil {
logger.Error("show processlist failed %s", errx.Error())
return errx
}
if len(prcsls) > 0 {
return fmt.Errorf("there is a connection for non system users %v", prcsls)
Expand All @@ -224,14 +225,16 @@ func (m *CutOverToSlaveComp) PreCheck() (err error) {
if err = m.cluster.AltSlaveIns.dbConn.CheckSlaveReplStatus(func() (resp native.ShowSlaveStatusResp, err error) {
return m.cluster.AltSlaveIns.dbConn.ShowSlaveStatus()
}); err != nil {
logger.Error("检查主从同步状态出错: %s", err.Error())
logger.Error("检查从实例[%s:%d]同步状态出错: %s", m.cluster.AltSlaveIns.Host, m.cluster.AltSlaveIns.Port, err.Error())
return err
}

if m.isCutOverPair {
if err = m.cluster.AltSlaveIns.Slave.dbConn.CheckSlaveReplStatus(func() (resp native.ShowSlaveStatusResp, err error) {
return m.cluster.AltSlaveIns.Slave.dbConn.ShowSlaveStatus()
}); err != nil {
logger.Error("检查从实例[%s:%d]同步状态出错: %s", m.cluster.AltSlaveIns.Slave.Host, m.cluster.AltSlaveIns.Slave.Port,
err.Error())
return err
}
}
Expand Down Expand Up @@ -259,7 +262,10 @@ func (m *CutOverToSlaveComp) PreCheck() (err error) {
func (m *CutOverToSlaveComp) CutOver() (binPos string, err error) {
defer func() {
if m.Params.LockedSwitch {
m.cluster.MasterIns.UnlockTables()
errx := m.cluster.MasterIns.UnlockTables()
if errx != nil {
logger.Error("unlock tables failed %s", errx.Error())
}
}
if err != nil {
e := m.cluster.UpdateProxiesBackend(m.cluster.MasterIns.Host, m.cluster.MasterIns.Port)
Expand Down
36 changes: 24 additions & 12 deletions dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_ha_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ def tendbha_cluster_upgrade_subflow(
bk_host_ids = [new_slave["bk_host_id"]]
old_ro_slave_ip = old_ro_slave["ip"]
old_ro_slave_ips.append(old_ro_slave_ip)
db_config = get_instance_config(cluster_cls.bk_cloud_id, old_ro_slave_ip, ports=ports)
origin_config = get_instance_config(cluster_cls.bk_cloud_id, old_ro_slave_ip, ports=ports)
db_config = deal_mycnf(pkg.name, db_version, origin_config)
install_ro_slave_sub_pipeline = build_install_slave_sub_pipeline(
uid,
root_id,
Expand Down Expand Up @@ -458,17 +459,8 @@ def tendbha_cluster_upgrade_subflow(
ms_sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data)
bk_host_ids = [new_master["bk_host_id"], new_slave["bk_host_id"]]
master = cluster_cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
db_config = get_instance_config(cluster_cls.bk_cloud_id, master.machine.ip, ports)
if mysql_version_parse(db_version) >= mysql_version_parse("5.7.0"):
will_del_keys = ["slave_parallel_type", "replica_parallel_type"]
# 如果不是tmysql的话,需要删除一些配置
if "tmysql" not in pkg.name:
will_del_keys.append("log_bin_compress")
will_del_keys.append("relay_log_uncompress")
for port in db_config:
for key in will_del_keys:
if db_config[port].get(key):
del db_config[port][key]
origin_config = get_instance_config(cluster_cls.bk_cloud_id, master.machine.ip, ports)
db_config = deal_mycnf(pkg.name, db_version, origin_config)
install_ms_pair_subflow = build_install_ms_pair_sub_pipeline(
uid=uid,
root_id=root_id,
Expand Down Expand Up @@ -583,6 +575,26 @@ def tendbha_cluster_upgrade_subflow(
return sub_pipeline.build_sub_process(sub_name=_("{}:整体迁移升级").format(cluster_cls.immute_domain))


def deal_mycnf(pkg_name, db_version: str, db_config: dict):
if mysql_version_parse(db_version) >= ("5.7.0"):
will_del_keys = ["slave_parallel_type", "replica_parallel_type"]
# 如果不是tmysql的话,需要删除一些配置
if "tmysql" not in pkg_name:
will_del_keys.append("log_bin_compress")
will_del_keys.append("relay_log_uncompress")
for port in db_config:
for key in will_del_keys:
if db_config[port].get(key):
del db_config[port][key]
if mysql_version_parse(db_version) >= ("8.0.0"):
will_del_keys = ["innodb_large_prefix"]
for port in db_config:
for key in will_del_keys:
if db_config[port].get(key):
del db_config[port][key]
return db_config


def non_standby_slaves_upgrade_subflow(
uid: str,
root_id: str,
Expand Down

0 comments on commit 02a0589

Please sign in to comment.