From f92a28dac00a9d338760a1653b3ff2b445d124ca Mon Sep 17 00:00:00 2001 From: yuanruji Date: Wed, 11 Dec 2024 16:21:27 +0800 Subject: [PATCH] =?UTF-8?q?refactor(dbm-services):=20=E4=BC=98=E5=8C=96?= =?UTF-8?q?=E6=9C=AC=E5=9C=B0=E5=8D=87=E7=BA=A7=E6=B5=81=E7=A8=8B=20#8543?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../internal/subcmd/mysqlcmd/mysql_upgrade.go | 9 +-- .../pkg/components/computil/mysql_operate.go | 36 +++++++++++ .../pkg/components/mysql/mysql_upgrade.go | 38 ++++++++++-- .../dbactuator/pkg/native/upgrade_tool.go | 61 +++++++++++++------ 4 files changed, 116 insertions(+), 28 deletions(-) diff --git a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go index 8bfb2be241..deb47da3e3 100644 --- a/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go +++ b/dbm-services/mysql/db-tools/dbactuator/internal/subcmd/mysqlcmd/mysql_upgrade.go @@ -73,16 +73,17 @@ func (d *UpgradeMySQLAct) Run() (err error) { FunName: "前置检查", Func: d.Service.PreCheck, }, - { - FunName: "升级检查", - Func: d.Service.MysqlUpgradeCheck, - }, } if d.Service.Params.Run { steps = append(steps, subcmd.StepFunc{ FunName: "升级MySQL", Func: d.Service.Upgrade, }) + } else { + steps = append(steps, subcmd.StepFunc{ + FunName: "升级检查", + Func: d.Service.MysqlUpgradeCheck, + }) } if err := steps.Run(); err != nil { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/computil/mysql_operate.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/computil/mysql_operate.go index a5f7872ceb..dec7054252 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/computil/mysql_operate.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/computil/mysql_operate.go @@ -16,6 +16,7 @@ import ( "strings" "time" + "github.com/samber/lo" "github.com/spf13/cast" "dbm-services/common/go-pubpkg/cmutil" @@ -120,6 +121,41 @@ cd %s && %s ./bin/mysqld_safe --defaults-file=%s --user=mysql `, mediaDir, numaS ) } +// StartMysqlInsSpecialErrlog 指定错误日志启动 +func (p *StartMySQLParam) StartMysqlInsSpecialErrlog(errLog string) (pid int, err error) { + var ( + mediaDir = p.MediaDir + numaStr = osutil.GetNumaStr() + myCnfName = p.MyCnfName + startCmd = fmt.Sprintf( + `ulimit -n 204800; +cd %s && %s ./bin/mysqld_safe --defaults-file=%s --user=mysql `, mediaDir, numaStr, myCnfName, + ) + ) + if p.SkipSlaveFlag { + startCmd += "--skip-slave-start " + } + if p.SkipGrantTables { + startCmd += " --skip-grant-tables " + } + if lo.IsNotEmpty(errLog) { + startCmd += fmt.Sprintf(" --log-error=%s ", errLog) + } + startCmd += " &" + logger.Info(fmt.Sprintf("execute mysqld_safe: [%s]", startCmd)) + pid, err = osutil.RunInBG(false, startCmd) + if err != nil { + logger.Error(fmt.Sprintf("start mysqld_safe failed:%v", err)) + return pid, err + } + return pid, util.Retry( + util.RetryConfig{ + Times: 40, + DelayTime: 5 * time.Second, + }, func() error { return p.CheckMysqlProcess() }, + ) +} + // CheckMysqlProcess TODO func (p *StartMySQLParam) CheckMysqlProcess() (err error) { if p.MyCnfName == "" { diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go index c388c0bdcb..947a576765 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/components/mysql/mysql_upgrade.go @@ -235,9 +235,12 @@ func (m *MysqlUpgradeComp) MysqlUpgradeCheck() (err error) { } } // table check - if err = conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion); err != nil { - logger.Error("check table upgrade failed %s", err.Error()) - return err + errs := conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion) + if len(errs) > 0 { + for _, err := range errs { + logger.Error("port:[%d]: check table upgrade error: %s", port, err.Error()) + } + return fmt.Errorf("check table upgrade failed, port: %d, errors: %v", port, errs) } } return @@ -283,7 +286,7 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) { MediaDir: cst.MysqldInstallPath, } logger.Info("start mysql for %d", port) - pid, err := start.StartMysqlInstance() + pid, err := start.StartMysqlInsSpecialErrlog(fmt.Sprintf("relink-media-firsrt-start-%d.log", port)) if err != nil { logger.Error("start mysql %d failed %s", err.Error()) return err @@ -308,6 +311,26 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) { logger.Info("Upgrading to MySQL version>=8.0.16, remaining upgrade procedure is skipped.") return nil } + // 处理分区表升级 + if m.newVersion.MysqlVersion >= native.MYSQL_5P70 && m.newVersion.MysqlVersion < native.MYSQL_8P0 { + // logger.Info("Upgrading to MySQL version>=5.7.0, remaining upgrade procedure is skipped.") + pdata, errx := dbConn.GetPartitionSchema() + if errx != nil { + logger.Error("get partition schema failed %s", errx.Error()) + return errx + } + if len(pdata) > 0 { + for _, p := range pdata { + usql := fmt.Sprintf("ALTER TABLE `%s`.`%s` UPGRADE PARTITIONING", p.TableSchema, p.TableName) + logger.Info("upgrade partition sql: %s", usql) + _, err = dbConn.Exec(usql) + if err != nil { + logger.Error("upgrade partition table %s.%s failed %s", p.TableSchema, p.TableName, err.Error()) + return err + } + } + } + } logger.Info("do mysqlcheck for %d", port) if err = m.mysqlCheck(dbConn, port); err != nil { logger.Error("do %d mysqlcheck failed %s", port, err.Error()) @@ -323,6 +346,13 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) { logger.Error("do %d additionalActions failed %s", port, err.Error()) return err } + logger.Info("do mysql restart for %d", port) + pid, err = start.RestartMysqlInstance() + if err != nil { + logger.Error("restart mysql %d failed %s", port, err.Error()) + return err + } + logger.Info("restart mysql %d success,pid is %d", port, pid) } return nil } diff --git a/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go b/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go index fc85ec6249..3303a96232 100644 --- a/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go +++ b/dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go @@ -222,12 +222,13 @@ type TableInfo struct { RowFormat string `db:"ROW_FORMAT"` } -// CheckTableUpgrade TODO -func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err error) { +// TableNameIsValid TODO +func (h *DbWorker) TableNameIsValid(currentVersion, newVersion uint64) (errs []error) { type checkFunc struct { fn func(currentVersion, newVersion uint64) error desc string } + var err error // 库表名关键字检查 fns := []checkFunc{} fns = append(fns, checkFunc{ @@ -257,14 +258,20 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err logger.Info("start check %s ...", f.desc) if err = f.fn(currentVersion, newVersion); err != nil { logger.Error("when check %s,failed %s", f.desc, err.Error()) - return err + errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error())) } } - type checkFuncNoparam struct { - fn func() error - desc string - } - // 非法字符检查 + return errs +} + +type checkFuncNoparam struct { + fn func() error + desc string +} + +// IllegalCharacterCheck TODO +func (h *DbWorker) IllegalCharacterCheck() (errs []error) { + var err error fnns := []checkFuncNoparam{} fnns = append(fnns, checkFuncNoparam{ fn: h.tableNameAsciiCodeCheck, @@ -294,9 +301,19 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err logger.Info("start check %s ...", f.desc) if err = f.fn(); err != nil { logger.Error("when check %s,failed %s", f.desc, err.Error()) - return err + errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error())) } } + return errs +} + +// CheckTableUpgrade 检查表是否满足升级条件 +func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (errs []error) { + var err error + errs = append(errs, h.TableNameIsValid(currentVersion, newVersion)...) + // 非法字符检查 + errs = append(errs, h.IllegalCharacterCheck()...) + switch { // 当准备升级到8.0版本 case newVersion >= MYSQL_8P0 && currentVersion < MYSQL_8P0: @@ -330,7 +347,7 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err logger.Info("start check %s ...", f.desc) if err = f.fn(); err != nil { logger.Error("when check %s,failed %s", f.desc, err.Error()) - return err + errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error())) } } // 当准备升级到5.7版本 @@ -358,7 +375,7 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err logger.Info("start check %s ...", f.desc) if err = f.fn(); err != nil { logger.Error("when check %s,failed %s", f.desc, err.Error()) - return err + errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error())) } } // 当准备升级到5.6版本 @@ -366,11 +383,11 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err // per-4.1 password check logger.Info("准备升级到MySQL5.6 需要做这些额外的检查...") if err = h.passwordCheck(); err != nil { - return err + errs = append(errs, fmt.Errorf("%v", err.Error())) } } - return nil + return errs } func (h *DbWorker) getKeyWords(currentVersion, newVersion uint64) []string { @@ -765,6 +782,15 @@ func (h *DbWorker) passwordCheck() (err error) { // handler to use the native partitioning handler instead, run mysql_upgrade. func (h *DbWorker) partitionCheck() (err error) { var data []TableInfo + data, err = h.GetPartitionSchema() + if len(data) > 0 { + return fmt.Errorf("%v found partition name,but it is not allowed", data) + } + return nil +} + +// GetPartitionSchema 获取分区表 +func (h *DbWorker) GetPartitionSchema() (data []TableInfo, err error) { q := ` select TABLE_SCHEMA, TABLE_NAME, @@ -774,13 +800,8 @@ where PARTITION_NAME is not NULL group by 1, 2; ` - if err = h.Queryx(&data, q); err != nil { - return err - } - if len(data) > 0 { - return fmt.Errorf("%v found partition name,but it is not allowed", data) - } - return nil + err = h.Queryx(&data, q) + return data, err } func (h *DbWorker) tokudbEngineCheck() (err error) {