From 02a0589ae5bc29934578d85bda7002eb34d42942 Mon Sep 17 00:00:00 2001 From: yuanruji Date: Tue, 3 Dec 2024 09:37:52 +0800 Subject: [PATCH] =?UTF-8?q?fix(backend):=20ro=20slave=20=E8=BF=81=E7=A7=BB?= =?UTF-8?q?=E5=8D=87=E7=BA=A7=E7=9A=84=E5=8F=82=E6=95=B0=E5=A4=84=E7=90=86?= =?UTF-8?q?=20#8363?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../pkg/components/mysql/cutover/base.go | 13 ++++--- .../pkg/components/mysql/cutover/cutover.go | 24 ++++++++----- .../bamboo/scene/mysql/mysql_ha_upgrade.py | 36 ++++++++++++------- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/cutover/base.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/cutover/base.go index eb71953ccc..51e0d61720 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/cutover/base.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/cutover/base.go @@ -43,7 +43,6 @@ const ( // Ins tance TODO type Ins struct { native.Instance - dbConn *native.DbWorker `json:"-"` } // Proxies TODO @@ -202,7 +201,7 @@ func (m *MySQLClusterDetail) CheckAltSlaveMasterAddr() (err error) { m.MasterIns.Addr(), ) logger.Error(msg) - return fmt.Errorf(msg) + return errors.New(msg) } return err } @@ -257,7 +256,7 @@ func (c *MasterInfo) LockTablesPreCheck(backupUser string) (err error) { // FlushTablesWithReadLock 执行flush table with read lock func (c *MasterInfo) FlushTablesWithReadLock() (err error) { - if _, err := c.lockConn.ExecContext(context.Background(), "set lock_wait_timeout = 10;"); err != nil { + if _, err = c.lockConn.ExecContext(context.Background(), "set lock_wait_timeout = 10;"); err != nil { return err } err = util.Retry( @@ -302,7 +301,7 @@ func (c *MasterInfo) KillBackupUserProcesslist(backupUser string) (err error) { if err != nil { return err } - if len(processLists) <= 0 { + if len(processLists) == 0 { logger.Info("没有发现关于备份用户[%s]相关的processlist~", backupUser) return nil } @@ -321,11 +320,11 @@ func (c *MasterInfo) FindLongQuery() (err error) { if err != nil { return err } - if len(activeProcessLists) <= 0 { + if len(activeProcessLists) == 0 { return nil } var errs []error - errs = []error{errors.New("active processlist exist:\n")} + errs = []error{errors.New("active processlist exist")} for _, p := range activeProcessLists { errs = append( errs, fmt.Errorf( @@ -364,7 +363,7 @@ func realVal(v sql.NullString) string { // return // } -// RecordBinPos 记录切换时候的bin postion +// RecordBinPos 记录切换时候的bin position func (s *AltSlaveInfo) RecordBinPos() (binPosJsonStr string, err error) { pos, _ := s.dbConn.ShowMasterStatus() logger.Info("show master status on %s,detail: File:%s,Pos:%d", s.Addr(), pos.File, pos.Position) diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/cutover/cutover.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/cutover/cutover.go index 820213d662..4363265f72 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/cutover/cutover.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/cutover/cutover.go @@ -23,7 +23,7 @@ import ( "dbm-services/mysql/db-tools/dbactuator/pkg/util" ) -// CutOverParam TODO +// CutOverParam cutover 参数 type CutOverParam struct { Host string `json:"host" validate:"required,ip"` Cluster *MySQLClusterDetail `json:"cluster"` @@ -182,11 +182,12 @@ func (m *CutOverToSlaveComp) Example() interface{} { func (m *CutOverToSlaveComp) PreCheck() (err error) { // 以下是强制检查的内容 // 检查下proxy backend 是不是 源Master - if err := m.cluster.CheckBackends(m.cluster.MasterIns.Host, m.cluster.MasterIns.Port); err != nil { + if err = m.cluster.CheckBackends(m.cluster.MasterIns.Host, m.cluster.MasterIns.Port); err != nil { + logger.Error("proxy backend is not %s:%d", m.cluster.MasterIns.Host, m.cluster.MasterIns.Port) return err } // 检查alt Slave repl 的地址不是 cluster.MasterIns - if err := m.cluster.CheckAltSlaveMasterAddr(); err != nil { + if err = m.cluster.CheckAltSlaveMasterAddr(); err != nil { return err } @@ -203,10 +204,10 @@ func (m *CutOverToSlaveComp) PreCheck() (err error) { } // 客户端连接检查 if m.Params.ClientConnCheck { - prcsls, err := m.cluster.AltSlaveIns.dbConn.ShowApplicationProcesslist(m.sysUsers) - if err != nil { - logger.Error("show processlist failed %s", err.Error()) - return err + prcsls, errx := m.cluster.AltSlaveIns.dbConn.ShowApplicationProcesslist(m.sysUsers) + if errx != nil { + logger.Error("show processlist failed %s", errx.Error()) + return errx } if len(prcsls) > 0 { return fmt.Errorf("there is a connection for non system users %v", prcsls) @@ -224,7 +225,7 @@ func (m *CutOverToSlaveComp) PreCheck() (err error) { if err = m.cluster.AltSlaveIns.dbConn.CheckSlaveReplStatus(func() (resp native.ShowSlaveStatusResp, err error) { return m.cluster.AltSlaveIns.dbConn.ShowSlaveStatus() }); err != nil { - logger.Error("检查主从同步状态出错: %s", err.Error()) + logger.Error("检查从实例[%s:%d]同步状态出错: %s", m.cluster.AltSlaveIns.Host, m.cluster.AltSlaveIns.Port, err.Error()) return err } @@ -232,6 +233,8 @@ func (m *CutOverToSlaveComp) PreCheck() (err error) { if err = m.cluster.AltSlaveIns.Slave.dbConn.CheckSlaveReplStatus(func() (resp native.ShowSlaveStatusResp, err error) { return m.cluster.AltSlaveIns.Slave.dbConn.ShowSlaveStatus() }); err != nil { + logger.Error("检查从实例[%s:%d]同步状态出错: %s", m.cluster.AltSlaveIns.Slave.Host, m.cluster.AltSlaveIns.Slave.Port, + err.Error()) return err } } @@ -259,7 +262,10 @@ func (m *CutOverToSlaveComp) PreCheck() (err error) { func (m *CutOverToSlaveComp) CutOver() (binPos string, err error) { defer func() { if m.Params.LockedSwitch { - m.cluster.MasterIns.UnlockTables() + errx := m.cluster.MasterIns.UnlockTables() + if errx != nil { + logger.Error("unlock tables failed %s", errx.Error()) + } } if err != nil { e := m.cluster.UpdateProxiesBackend(m.cluster.MasterIns.Host, m.cluster.MasterIns.Port) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_ha_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_ha_upgrade.py index a9cd6d1b99..25335888c1 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_ha_upgrade.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_ha_upgrade.py @@ -381,7 +381,8 @@ def tendbha_cluster_upgrade_subflow( bk_host_ids = [new_slave["bk_host_id"]] old_ro_slave_ip = old_ro_slave["ip"] old_ro_slave_ips.append(old_ro_slave_ip) - db_config = get_instance_config(cluster_cls.bk_cloud_id, old_ro_slave_ip, ports=ports) + origin_config = get_instance_config(cluster_cls.bk_cloud_id, old_ro_slave_ip, ports=ports) + db_config = deal_mycnf(pkg.name, db_version, origin_config) install_ro_slave_sub_pipeline = build_install_slave_sub_pipeline( uid, root_id, @@ -458,17 +459,8 @@ def tendbha_cluster_upgrade_subflow( ms_sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) bk_host_ids = [new_master["bk_host_id"], new_slave["bk_host_id"]] master = cluster_cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) - db_config = get_instance_config(cluster_cls.bk_cloud_id, master.machine.ip, ports) - if mysql_version_parse(db_version) >= mysql_version_parse("5.7.0"): - will_del_keys = ["slave_parallel_type", "replica_parallel_type"] - # 如果不是tmysql的话,需要删除一些配置 - if "tmysql" not in pkg.name: - will_del_keys.append("log_bin_compress") - will_del_keys.append("relay_log_uncompress") - for port in db_config: - for key in will_del_keys: - if db_config[port].get(key): - del db_config[port][key] + origin_config = get_instance_config(cluster_cls.bk_cloud_id, master.machine.ip, ports) + db_config = deal_mycnf(pkg.name, db_version, origin_config) install_ms_pair_subflow = build_install_ms_pair_sub_pipeline( uid=uid, root_id=root_id, @@ -583,6 +575,26 @@ def tendbha_cluster_upgrade_subflow( return sub_pipeline.build_sub_process(sub_name=_("{}:整体迁移升级").format(cluster_cls.immute_domain)) +def deal_mycnf(pkg_name, db_version: str, db_config: dict): + if mysql_version_parse(db_version) >= ("5.7.0"): + will_del_keys = ["slave_parallel_type", "replica_parallel_type"] + # 如果不是tmysql的话,需要删除一些配置 + if "tmysql" not in pkg_name: + will_del_keys.append("log_bin_compress") + will_del_keys.append("relay_log_uncompress") + for port in db_config: + for key in will_del_keys: + if db_config[port].get(key): + del db_config[port][key] + if mysql_version_parse(db_version) >= ("8.0.0"): + will_del_keys = ["innodb_large_prefix"] + for port in db_config: + for key in will_del_keys: + if db_config[port].get(key): + del db_config[port][key] + return db_config + + def non_standby_slaves_upgrade_subflow( uid: str, root_id: str,