Skip to content

Commit

Permalink
stage save 3
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Oct 11, 2024
1 parent dbd55d5 commit 09c1643
Showing 1 changed file with 124 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,13 @@ def more_slaves_tendbha_cluster_upgrade_subflow(
charset, db_version = get_version_and_charset(
cluster_cls.bk_biz_id, db_module_id=new_db_module_id, cluster_type=cluster_cls.cluster_type
)

# 确定要迁移的主节点,从节点.
master_model = cluster_cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
slave = cluster_cls.storageinstance_set.filter(
instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True
).first()
old_master_ip = master_model.machine.ip
old_slave_ip = slave.machine.ip
parent_global_data = {
"uid": uid,
"root_id": root_id,
Expand All @@ -304,6 +310,8 @@ def more_slaves_tendbha_cluster_upgrade_subflow(
"cluster_type": cluster_cls.cluster_type,
"created_by": created_by,
"package": pkg.name,
"master_ip": old_master_ip,
"old_slave_ip": old_slave_ip,
"ports": ports,
"charset": charset,
"db_version": db_version,
Expand All @@ -312,9 +320,79 @@ def more_slaves_tendbha_cluster_upgrade_subflow(
sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data)

if len(ro_slaves) > 0:
ro_sub_piplelines = []
ro_switch_ro_sub_piplelines = []
for ro_slave in ro_slaves:
print("ro_slave", ro_slave)
ro_sub_pipleline = SubBuilder(root_id=root_id, data=parent_global_data)
old_slave = ro_slave["old_slave"]
new_slave = ro_slave["new_slave"]
new_ro_slave_ip = new_slave["ip"]
bk_host_ids = [new_slave["bk_host_id"]]
old_slave_ip = old_slave["ip"]
db_config = get_instance_config(cluster_cls.bk_cloud_id, old_slave_ip, ports=ports)
install_ro_slave_sub_pipeline = build_install_slave_sub_pipeline(
uid,
root_id,
parent_global_data,
cluster_cls,
new_ro_slave_ip,
ports,
bk_host_ids,
db_config,
pkg_id,
pkg.name,
cluster_ids,
new_db_module_id,
)
ro_sub_pipleline.add_sub_pipeline(sub_flow=install_ro_slave_sub_pipeline)
# 恢复主从数据
local_backup = False
if backup_source == MySQLBackupSource.LOCAL:
local_backup = True
sync_data_sub_pipeline_list = build_sync_data_sub_pipelines(
root_id, parent_global_data, cluster_ids, new_ro_slave_ip, local_backup, charset
)
ro_sub_pipleline.add_parallel_sub_pipeline(sub_flow=sync_data_sub_pipeline_list)
ro_sub_piplelines.append(ro_sub_pipleline.build_sub_process(sub_name=_("安装新从节点并数据同步")))
# 切换换subpipeline
ro_switch_ro_sub_pipleline = SubBuilder(root_id=root_id, data=parent_global_data)
switch_sub_pipeline_list = build_switch_sub_pipelines(
root_id, parent_global_data, cluster_ids, old_slave_ip, new_ro_slave_ip
)
ro_switch_ro_sub_pipleline.add_parallel_sub_pipeline(switch_sub_pipeline_list)
ro_switch_ro_sub_pipleline.add_act(
act_name=_("更新[NewSlave]{} db module id".format(new_ro_slave_ip)),
act_component_code=MySQLDBMetaComponent.code,
kwargs=asdict(
DBMetaOPKwargs(
db_meta_class_func=MySQLDBMeta.update_upgrade_slaves_dbmodule.__name__,
is_update_trans_data=True,
cluster={
"db_module_id": new_db_module_id,
"new_slave_ip": new_ro_slave_ip,
},
)
),
)

# 解除old从节点和集群的元数据的关系
ro_switch_ro_sub_pipleline.add_act(
act_name=_("解除[OldSlave]{}相关从实例和集群的元数据的关系".format(old_slave_ip)),
act_component_code=MySQLDBMetaComponent.code,
kwargs=asdict(
DBMetaOPKwargs(
db_meta_class_func=MySQLDBMeta.dissolve_master_slave_relationship.__name__,
is_update_trans_data=True,
cluster={
"cluster_ids": cluster_ids,
"old_slave_ip": old_slave_ip,
},
)
),
)
ro_switch_ro_sub_piplelines.append(ro_switch_ro_sub_pipleline.build_sub_process(sub_name=_("切换从节点")))
# 安装mysql
ms_sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data)
bk_host_ids = [new_master["bk_host_id"], new_slave["bk_host_id"]]
master = cluster_cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value)
db_config = get_instance_config(cluster_cls.bk_cloud_id, master.machine.ip, ports)
Expand All @@ -331,9 +409,47 @@ def more_slaves_tendbha_cluster_upgrade_subflow(
db_config=db_config,
new_db_module_id=new_db_module_id,
)
ms_sub_pipeline.add_sub_pipeline(sub_flow=install_ms_pair_subflow)
new_master_ip = new_master["ip"]
new_slave_ip = new_slave["ip"]
sync_data_sub_pipeline_list = build_ms_pair_sync_data_sub_pipelines(
root_id, parent_global_data, cluster_ids, new_master_ip, new_slave_ip, local_backup, charset
)
ms_sub_pipeline.add_parallel_sub_pipeline(sub_flow=sync_data_sub_pipeline_list)
ms_process = ms_sub_pipeline.build_sub_process(sub_name=_("安装主从节点,并同步数据"))
if len(ro_slaves) > 0:
sub_pipeline.add_parallel_sub_pipeline(sub_flow=[ms_process] + ro_sub_piplelines)
else:
sub_pipeline.add_parallel_sub_pipeline(sub_flow=[ms_process])
# 切换主从
sub_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={})
# 先切ro slaves
if len(ro_slaves) > 0:
sub_pipeline.add_parallel_sub_pipeline(sub_flow=ro_switch_ro_sub_piplelines)
ms_switch_subflows = build_ms_pair_switch_sub_pipelines(
uid,
root_id,
parent_global_data,
cluster_ids,
old_master_ip,
old_slave_ip,
new_master_ip,
new_slave_ip,
)
sub_pipeline.add_parallel_sub_pipeline(sub_flow=ms_switch_subflows)
sub_pipeline.add_act(act_name=_("人工确认下架旧节点"), act_component_code=PauseComponent.code, kwargs={})

sub_pipeline.add_sub_pipeline(sub_flow=install_ms_pair_subflow)

uninsatll_flows = []
if len(ro_slaves) > 0:
for ro_slave in ro_slaves:
old_slave = ro_slave["old_slave"]
old_slave_ip = old_slave["ip"]
uninsatll_flows.append(
build_uninstall_ro_slave_sub_pipeline(
root_id, parent_global_data, old_slave_ip, cluster_ids, cluster_cls.bk_cloud_id, ports
)
)
sub_pipeline.add_parallel_sub_pipeline(sub_flow=uninsatll_flows)
return sub_pipeline.build_sub_process(sub_name=_("{}:迁移升级到").format(""))


Expand Down Expand Up @@ -448,7 +564,7 @@ def non_standby_slaves_upgrade_subflow(
# 切换完成后,确认卸载旧的从节点
sub_pipeline.add_act(act_name=_("确认卸载旧实例"), act_component_code=PauseComponent.code, kwargs={})
# 卸载旧从节点
uninstall_svr_sub_pipeline = build_uninstall_sub_pipeline(
uninstall_svr_sub_pipeline = build_uninstall_ro_slave_sub_pipeline(
root_id, parent_global_data, old_slave_ip, relation_cluster_ids, cluster_cls.bk_cloud_id, ports
)
sub_pipeline.add_sub_pipeline(sub_flow=uninstall_svr_sub_pipeline)
Expand Down Expand Up @@ -637,7 +753,9 @@ def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids
return switch_sub_pipeline_list


def build_uninstall_sub_pipeline(root_id, parent_global_data, old_slave_ip, relation_cluster_ids, bk_cloud_id, ports):
def build_uninstall_ro_slave_sub_pipeline(
root_id, parent_global_data, old_slave_ip, relation_cluster_ids, bk_cloud_id, ports
):
uninstall_svr_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data))
cluster_info = {"uninstall_ip": old_slave_ip, "cluster_ids": relation_cluster_ids}

Expand Down

0 comments on commit 09c1643

Please sign in to comment.