Skip to content

Commit

Permalink
refactor(dbm-services): 优化本地升级流程 #8543
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Dec 12, 2024
1 parent af3f501 commit f92a28d
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,17 @@ func (d *UpgradeMySQLAct) Run() (err error) {
FunName: "前置检查",
Func: d.Service.PreCheck,
},
{
FunName: "升级检查",
Func: d.Service.MysqlUpgradeCheck,
},
}
if d.Service.Params.Run {
steps = append(steps, subcmd.StepFunc{
FunName: "升级MySQL",
Func: d.Service.Upgrade,
})
} else {
steps = append(steps, subcmd.StepFunc{
FunName: "升级检查",
Func: d.Service.MysqlUpgradeCheck,
})
}

if err := steps.Run(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"time"

"github.com/samber/lo"
"github.com/spf13/cast"

"dbm-services/common/go-pubpkg/cmutil"
Expand Down Expand Up @@ -120,6 +121,41 @@ cd %s && %s ./bin/mysqld_safe --defaults-file=%s --user=mysql `, mediaDir, numaS
)
}

// StartMysqlInsSpecialErrlog 指定错误日志启动
func (p *StartMySQLParam) StartMysqlInsSpecialErrlog(errLog string) (pid int, err error) {
var (
mediaDir = p.MediaDir
numaStr = osutil.GetNumaStr()
myCnfName = p.MyCnfName
startCmd = fmt.Sprintf(
`ulimit -n 204800;
cd %s && %s ./bin/mysqld_safe --defaults-file=%s --user=mysql `, mediaDir, numaStr, myCnfName,
)
)
if p.SkipSlaveFlag {
startCmd += "--skip-slave-start "
}
if p.SkipGrantTables {
startCmd += " --skip-grant-tables "
}
if lo.IsNotEmpty(errLog) {
startCmd += fmt.Sprintf(" --log-error=%s ", errLog)
}
startCmd += " &"
logger.Info(fmt.Sprintf("execute mysqld_safe: [%s]", startCmd))
pid, err = osutil.RunInBG(false, startCmd)
if err != nil {
logger.Error(fmt.Sprintf("start mysqld_safe failed:%v", err))
return pid, err
}
return pid, util.Retry(
util.RetryConfig{
Times: 40,
DelayTime: 5 * time.Second,
}, func() error { return p.CheckMysqlProcess() },
)
}

// CheckMysqlProcess TODO
func (p *StartMySQLParam) CheckMysqlProcess() (err error) {
if p.MyCnfName == "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,12 @@ func (m *MysqlUpgradeComp) MysqlUpgradeCheck() (err error) {
}
}
// table check
if err = conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion); err != nil {
logger.Error("check table upgrade failed %s", err.Error())
return err
errs := conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion)
if len(errs) > 0 {
for _, err := range errs {
logger.Error("port:[%d]: check table upgrade error: %s", port, err.Error())
}
return fmt.Errorf("check table upgrade failed, port: %d, errors: %v", port, errs)
}
}
return
Expand Down Expand Up @@ -283,7 +286,7 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) {
MediaDir: cst.MysqldInstallPath,
}
logger.Info("start mysql for %d", port)
pid, err := start.StartMysqlInstance()
pid, err := start.StartMysqlInsSpecialErrlog(fmt.Sprintf("relink-media-firsrt-start-%d.log", port))
if err != nil {
logger.Error("start mysql %d failed %s", err.Error())
return err
Expand All @@ -308,6 +311,26 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) {
logger.Info("Upgrading to MySQL version>=8.0.16, remaining upgrade procedure is skipped.")
return nil
}
// 处理分区表升级
if m.newVersion.MysqlVersion >= native.MYSQL_5P70 && m.newVersion.MysqlVersion < native.MYSQL_8P0 {
// logger.Info("Upgrading to MySQL version>=5.7.0, remaining upgrade procedure is skipped.")
pdata, errx := dbConn.GetPartitionSchema()
if errx != nil {
logger.Error("get partition schema failed %s", errx.Error())
return errx
}
if len(pdata) > 0 {
for _, p := range pdata {
usql := fmt.Sprintf("ALTER TABLE `%s`.`%s` UPGRADE PARTITIONING", p.TableSchema, p.TableName)
logger.Info("upgrade partition sql: %s", usql)
_, err = dbConn.Exec(usql)
if err != nil {
logger.Error("upgrade partition table %s.%s failed %s", p.TableSchema, p.TableName, err.Error())
return err
}
}
}
}
logger.Info("do mysqlcheck for %d", port)
if err = m.mysqlCheck(dbConn, port); err != nil {
logger.Error("do %d mysqlcheck failed %s", port, err.Error())
Expand All @@ -323,6 +346,13 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) {
logger.Error("do %d additionalActions failed %s", port, err.Error())
return err
}
logger.Info("do mysql restart for %d", port)
pid, err = start.RestartMysqlInstance()
if err != nil {
logger.Error("restart mysql %d failed %s", port, err.Error())
return err
}
logger.Info("restart mysql %d success,pid is %d", port, pid)
}
return nil
}
Expand Down
61 changes: 41 additions & 20 deletions dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,13 @@ type TableInfo struct {
RowFormat string `db:"ROW_FORMAT"`
}

// CheckTableUpgrade TODO
func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err error) {
// TableNameIsValid TODO
func (h *DbWorker) TableNameIsValid(currentVersion, newVersion uint64) (errs []error) {
type checkFunc struct {
fn func(currentVersion, newVersion uint64) error
desc string
}
var err error
// 库表名关键字检查
fns := []checkFunc{}
fns = append(fns, checkFunc{
Expand Down Expand Up @@ -257,14 +258,20 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(currentVersion, newVersion); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
type checkFuncNoparam struct {
fn func() error
desc string
}
// 非法字符检查
return errs
}

type checkFuncNoparam struct {
fn func() error
desc string
}

// IllegalCharacterCheck TODO
func (h *DbWorker) IllegalCharacterCheck() (errs []error) {
var err error
fnns := []checkFuncNoparam{}
fnns = append(fnns, checkFuncNoparam{
fn: h.tableNameAsciiCodeCheck,
Expand Down Expand Up @@ -294,9 +301,19 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
return errs
}

// CheckTableUpgrade 检查表是否满足升级条件
func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (errs []error) {
var err error
errs = append(errs, h.TableNameIsValid(currentVersion, newVersion)...)
// 非法字符检查
errs = append(errs, h.IllegalCharacterCheck()...)

switch {
// 当准备升级到8.0版本
case newVersion >= MYSQL_8P0 && currentVersion < MYSQL_8P0:
Expand Down Expand Up @@ -330,7 +347,7 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
// 当准备升级到5.7版本
Expand Down Expand Up @@ -358,19 +375,19 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
// 当准备升级到5.6版本
case newVersion >= MYSQL_5P60 && currentVersion < MYSQL_5P60:
// per-4.1 password check
logger.Info("准备升级到MySQL5.6 需要做这些额外的检查...")
if err = h.passwordCheck(); err != nil {
return err
errs = append(errs, fmt.Errorf("%v", err.Error()))
}

}
return nil
return errs
}

func (h *DbWorker) getKeyWords(currentVersion, newVersion uint64) []string {
Expand Down Expand Up @@ -765,6 +782,15 @@ func (h *DbWorker) passwordCheck() (err error) {
// handler to use the native partitioning handler instead, run mysql_upgrade.
func (h *DbWorker) partitionCheck() (err error) {
var data []TableInfo
data, err = h.GetPartitionSchema()
if len(data) > 0 {
return fmt.Errorf("%v found partition name,but it is not allowed", data)
}
return nil
}

// GetPartitionSchema 获取分区表
func (h *DbWorker) GetPartitionSchema() (data []TableInfo, err error) {
q := `
select TABLE_SCHEMA,
TABLE_NAME,
Expand All @@ -774,13 +800,8 @@ where PARTITION_NAME is not NULL
group by 1,
2;
`
if err = h.Queryx(&data, q); err != nil {
return err
}
if len(data) > 0 {
return fmt.Errorf("%v found partition name,but it is not allowed", data)
}
return nil
err = h.Queryx(&data, q)
return data, err
}

func (h *DbWorker) tokudbEngineCheck() (err error) {
Expand Down

0 comments on commit f92a28d

Please sign in to comment.