Skip to content

Commit

Permalink
refactor(dbm-services): 优化本地升级流程 #8543
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Dec 12, 2024
1 parent af3f501 commit 0090a71
Show file tree
Hide file tree
Showing 5 changed files with 142 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,17 @@ func (d *UpgradeMySQLAct) Run() (err error) {
FunName: "前置检查",
Func: d.Service.PreCheck,
},
{
FunName: "升级检查",
Func: d.Service.MysqlUpgradeCheck,
},
}
if d.Service.Params.Run {
steps = append(steps, subcmd.StepFunc{
FunName: "升级MySQL",
Func: d.Service.Upgrade,
})
} else {
steps = append(steps, subcmd.StepFunc{
FunName: "升级检查",
Func: d.Service.MysqlUpgradeCheck,
})
}

if err := steps.Run(); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"time"

"github.com/samber/lo"
"github.com/spf13/cast"

"dbm-services/common/go-pubpkg/cmutil"
Expand Down Expand Up @@ -120,6 +121,41 @@ cd %s && %s ./bin/mysqld_safe --defaults-file=%s --user=mysql `, mediaDir, numaS
)
}

// StartMysqlInsSpecialErrlog 指定错误日志启动
func (p *StartMySQLParam) StartMysqlInsSpecialErrlog(errLog string) (pid int, err error) {
var (
mediaDir = p.MediaDir
numaStr = osutil.GetNumaStr()
myCnfName = p.MyCnfName
startCmd = fmt.Sprintf(
`ulimit -n 204800;
cd %s && %s ./bin/mysqld_safe --defaults-file=%s --user=mysql `, mediaDir, numaStr, myCnfName,
)
)
if p.SkipSlaveFlag {
startCmd += "--skip-slave-start "
}
if p.SkipGrantTables {
startCmd += " --skip-grant-tables "
}
if lo.IsNotEmpty(errLog) {
startCmd += fmt.Sprintf(" --log-error=%s ", errLog)
}
startCmd += " &"
logger.Info(fmt.Sprintf("execute mysqld_safe: [%s]", startCmd))
pid, err = osutil.RunInBG(false, startCmd)
if err != nil {
logger.Error(fmt.Sprintf("start mysqld_safe failed:%v", err))
return pid, err
}
return pid, util.Retry(
util.RetryConfig{
Times: 40,
DelayTime: 5 * time.Second,
}, func() error { return p.CheckMysqlProcess() },
)
}

// CheckMysqlProcess TODO
func (p *StartMySQLParam) CheckMysqlProcess() (err error) {
if p.MyCnfName == "" {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,12 @@ func (m *MysqlUpgradeComp) MysqlUpgradeCheck() (err error) {
}
}
// table check
if err = conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion); err != nil {
logger.Error("check table upgrade failed %s", err.Error())
return err
errs := conn.CheckTableUpgrade(currentVer.MysqlVersion, m.newVersion.MysqlVersion)
if len(errs) > 0 {
for _, err := range errs {
logger.Error("port:[%d]: check table upgrade error: %s", port, err.Error())
}
return fmt.Errorf("check table upgrade failed, port: %d, errors: %v", port, errs)
}
}
return
Expand Down Expand Up @@ -283,7 +286,7 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) {
MediaDir: cst.MysqldInstallPath,
}
logger.Info("start mysql for %d", port)
pid, err := start.StartMysqlInstance()
pid, err := start.StartMysqlInsSpecialErrlog(fmt.Sprintf("relink-media-firsrt-start-%d.log", port))
if err != nil {
logger.Error("start mysql %d failed %s", err.Error())
return err
Expand All @@ -308,6 +311,26 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) {
logger.Info("Upgrading to MySQL version>=8.0.16, remaining upgrade procedure is skipped.")
return nil
}
// 处理分区表升级
if m.newVersion.MysqlVersion >= native.MYSQL_5P70 && m.newVersion.MysqlVersion < native.MYSQL_8P0 {
// logger.Info("Upgrading to MySQL version>=5.7.0, remaining upgrade procedure is skipped.")
pdata, errx := dbConn.GetPartitionSchema()
if errx != nil {
logger.Error("get partition schema failed %s", errx.Error())
return errx
}
if len(pdata) > 0 {
for _, p := range pdata {
usql := fmt.Sprintf("ALTER TABLE `%s`.`%s` UPGRADE PARTITIONING", p.TableSchema, p.TableName)
logger.Info("upgrade partition sql: %s", usql)
_, err = dbConn.Exec(usql)
if err != nil {
logger.Error("upgrade partition table %s.%s failed %s", p.TableSchema, p.TableName, err.Error())
return err
}
}
}
}
logger.Info("do mysqlcheck for %d", port)
if err = m.mysqlCheck(dbConn, port); err != nil {
logger.Error("do %d mysqlcheck failed %s", port, err.Error())
Expand All @@ -323,6 +346,13 @@ func (m *MysqlUpgradeComp) Upgrade() (err error) {
logger.Error("do %d additionalActions failed %s", port, err.Error())
return err
}
logger.Info("do mysql restart for %d", port)
pid, err = start.RestartMysqlInstance()
if err != nil {
logger.Error("restart mysql %d failed %s", port, err.Error())
return err
}
logger.Info("restart mysql %d success,pid is %d", port, pid)
}
return nil
}
Expand Down
61 changes: 41 additions & 20 deletions dbm-services/mysql/db-tools/dbactuator/pkg/native/upgrade_tool.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,13 @@ type TableInfo struct {
RowFormat string `db:"ROW_FORMAT"`
}

// CheckTableUpgrade TODO
func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err error) {
// TableNameIsValid TODO
func (h *DbWorker) TableNameIsValid(currentVersion, newVersion uint64) (errs []error) {
type checkFunc struct {
fn func(currentVersion, newVersion uint64) error
desc string
}
var err error
// 库表名关键字检查
fns := []checkFunc{}
fns = append(fns, checkFunc{
Expand Down Expand Up @@ -257,14 +258,20 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(currentVersion, newVersion); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
type checkFuncNoparam struct {
fn func() error
desc string
}
// 非法字符检查
return errs
}

type checkFuncNoparam struct {
fn func() error
desc string
}

// IllegalCharacterCheck TODO
func (h *DbWorker) IllegalCharacterCheck() (errs []error) {
var err error
fnns := []checkFuncNoparam{}
fnns = append(fnns, checkFuncNoparam{
fn: h.tableNameAsciiCodeCheck,
Expand Down Expand Up @@ -294,9 +301,19 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
return errs
}

// CheckTableUpgrade 检查表是否满足升级条件
func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (errs []error) {
var err error
errs = append(errs, h.TableNameIsValid(currentVersion, newVersion)...)
// 非法字符检查
errs = append(errs, h.IllegalCharacterCheck()...)

switch {
// 当准备升级到8.0版本
case newVersion >= MYSQL_8P0 && currentVersion < MYSQL_8P0:
Expand Down Expand Up @@ -330,7 +347,7 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
// 当准备升级到5.7版本
Expand Down Expand Up @@ -358,19 +375,19 @@ func (h *DbWorker) CheckTableUpgrade(currentVersion, newVersion uint64) (err err
logger.Info("start check %s ...", f.desc)
if err = f.fn(); err != nil {
logger.Error("when check %s,failed %s", f.desc, err.Error())
return err
errs = append(errs, fmt.Errorf("[%s]:%v", f.desc, err.Error()))
}
}
// 当准备升级到5.6版本
case newVersion >= MYSQL_5P60 && currentVersion < MYSQL_5P60:
// per-4.1 password check
logger.Info("准备升级到MySQL5.6 需要做这些额外的检查...")
if err = h.passwordCheck(); err != nil {
return err
errs = append(errs, fmt.Errorf("%v", err.Error()))
}

}
return nil
return errs
}

func (h *DbWorker) getKeyWords(currentVersion, newVersion uint64) []string {
Expand Down Expand Up @@ -765,6 +782,15 @@ func (h *DbWorker) passwordCheck() (err error) {
// handler to use the native partitioning handler instead, run mysql_upgrade.
func (h *DbWorker) partitionCheck() (err error) {
var data []TableInfo
data, err = h.GetPartitionSchema()
if len(data) > 0 {
return fmt.Errorf("%v found partition name,but it is not allowed", data)
}
return nil
}

// GetPartitionSchema 获取分区表
func (h *DbWorker) GetPartitionSchema() (data []TableInfo, err error) {
q := `
select TABLE_SCHEMA,
TABLE_NAME,
Expand All @@ -774,13 +800,8 @@ where PARTITION_NAME is not NULL
group by 1,
2;
`
if err = h.Queryx(&data, q); err != nil {
return err
}
if len(data) > 0 {
return fmt.Errorf("%v found partition name,but it is not allowed", data)
}
return nil
err = h.Queryx(&data, q)
return data, err
}

func (h *DbWorker) tokudbEngineCheck() (err error) {
Expand Down
40 changes: 26 additions & 14 deletions dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from backend.flow.consts import InstanceStatus, MediumEnum
from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder
from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList
from backend.flow.engine.bamboo.scene.mysql.common.common_sub_flow import build_surrounding_apps_sub_flow
from backend.flow.engine.bamboo.scene.mysql.common.master_and_slave_switch import master_and_slave_switch
from backend.flow.engine.bamboo.scene.mysql.mysql_migrate_cluster_remote_flow import MySQLMigrateClusterRemoteFlow
from backend.flow.plugins.components.collections.common.pause import PauseComponent
Expand Down Expand Up @@ -196,13 +197,6 @@ def __get_clusters_master_instance(self, cluster_ids: list):
)
return instances

def __get_tendbsingle_instance(self, cluster_ids: list):
clusters = Cluster.objects.filter(id__in=cluster_ids)
instances = StorageInstance.objects.filter(
cluster__in=clusters, machine_type=MachineType.SINGLE, instance_role=InstanceRole.ORPHAN
)
return instances

def __get_pkg_name_by_pkg_id(self, pkg_id: int) -> str:
# 获取大版本的最新的包名
mysql_pkg = Package.objects.get(id=pkg_id, pkg_type=MediumEnum.MySQL, db_type=DBType.MySQL)
Expand All @@ -212,6 +206,9 @@ def upgrade_mysql_flow(self):
mysql_upgrade_pipeline = Builder(root_id=self.root_id, data=self.data)
sub_pipelines = []
cluster_ids = []
reinstall_ip_list = []
cluster_type = None
bk_cloud_id = 0
# 声明子流程
for upgrade_info in self.upgrade_cluster_list:
sub_flow_context = copy.deepcopy(self.data)
Expand All @@ -224,8 +221,6 @@ def upgrade_mysql_flow(self):
sub_pipeline = SubBuilder(
root_id=self.root_id, data=copy.deepcopy(sub_flow_context), need_random_pass_cluster_ids=cluster_ids
)
# 取集群列表中的第一个集群类型
cluster_type = None
first_cluster = Cluster.objects.filter(id__in=cluster_ids).first()
if first_cluster:
cluster_type = first_cluster.cluster_type
Expand All @@ -246,6 +241,7 @@ def upgrade_mysql_flow(self):
raise DBMetaException(message=_("集群的master应该同属于一个机器,当前分布在{}").format(list(set(master_ip_list))))

master_ip = master_ip_list[0]
reinstall_ip_list.append(master_ip)
port_map = defaultdict(list)
for slave_instance in slave_instances:
port_map[slave_instance.machine.ip].append(slave_instance.port)
Expand Down Expand Up @@ -345,33 +341,49 @@ def upgrade_mysql_flow(self):

# tendbsingle 本地升级
elif cluster_type == ClusterType.TenDBSingle:
instances = self.__get_tendbsingle_instance(cluster_ids)
clusters = Cluster.objects.filter(id__in=cluster_ids)
instances = StorageInstance.objects.filter(
cluster__in=clusters, machine_type=MachineType.SINGLE, instance_role=InstanceRole.ORPHAN
)
ipList = []
ports = []
for instance in instances:
ports.append(instance.port)
ipList.append(instance.machine.ip)
upgrade_version_check(instance.version, new_mysql_pkg_name)

bk_cloud_id = clusters[0].bk_cloud_id
if len(list(set(ipList))) != 1:
raise DBMetaException(message=_("集群的master应该同属于一个机器,当前分布在{}").format(list(set(ipList))))

proxy_ip = ipList[0]
host_ip = ipList[0]
reinstall_ip_list.append(host_ip)
sub_pipeline.add_sub_pipeline(
sub_flow=self.upgrade_mysql_subflow(
bk_cloud_id=bk_cloud_id,
ip=proxy_ip,
ip=host_ip,
mysql_ports=ports,
mysql_pkg_name=new_mysql_pkg_name,
pkg_id=pkg_id,
)
)

sub_pipelines.append(sub_pipeline.build_sub_process(sub_name=_("[TendbSingle]本地升级MySQL版本")))
else:
raise DBMetaException(message=_("不支持的集群类型"))

mysql_upgrade_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines)

# 升级周边应用
mysql_upgrade_pipeline.add_sub_pipeline(
sub_flow=build_surrounding_apps_sub_flow(
bk_cloud_id=int(bk_cloud_id),
master_ip_list=reinstall_ip_list,
root_id=self.root_id,
parent_global_data=copy.deepcopy(sub_flow_context),
is_init=True,
collect_sysinfo=False,
cluster_type=cluster_type,
)
)
mysql_upgrade_pipeline.run_pipeline(is_drop_random_user=True)

return
Expand Down

0 comments on commit 0090a71

Please sign in to comment.