From 09c1643beee743f8a3d64ca0b44ef3f4f2be922d Mon Sep 17 00:00:00 2001 From: yuanruji Date: Fri, 11 Oct 2024 11:37:02 +0800 Subject: [PATCH] stage save 3 --- .../mysql/mysql_non_standby_slaves_upgrade.py | 130 +++++++++++++++++- 1 file changed, 124 insertions(+), 6 deletions(-) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py index 4e81675f41..30180deefc 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mysql/mysql_non_standby_slaves_upgrade.py @@ -293,7 +293,13 @@ def more_slaves_tendbha_cluster_upgrade_subflow( charset, db_version = get_version_and_charset( cluster_cls.bk_biz_id, db_module_id=new_db_module_id, cluster_type=cluster_cls.cluster_type ) - + # 确定要迁移的主节点,从节点. + master_model = cluster_cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) + slave = cluster_cls.storageinstance_set.filter( + instance_inner_role=InstanceInnerRole.SLAVE.value, is_stand_by=True + ).first() + old_master_ip = master_model.machine.ip + old_slave_ip = slave.machine.ip parent_global_data = { "uid": uid, "root_id": root_id, @@ -304,6 +310,8 @@ def more_slaves_tendbha_cluster_upgrade_subflow( "cluster_type": cluster_cls.cluster_type, "created_by": created_by, "package": pkg.name, + "master_ip": old_master_ip, + "old_slave_ip": old_slave_ip, "ports": ports, "charset": charset, "db_version": db_version, @@ -312,9 +320,79 @@ def more_slaves_tendbha_cluster_upgrade_subflow( sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) if len(ro_slaves) > 0: + ro_sub_piplelines = [] + ro_switch_ro_sub_piplelines = [] for ro_slave in ro_slaves: - print("ro_slave", ro_slave) + ro_sub_pipleline = SubBuilder(root_id=root_id, data=parent_global_data) + old_slave = ro_slave["old_slave"] + new_slave = ro_slave["new_slave"] + new_ro_slave_ip = new_slave["ip"] + bk_host_ids = [new_slave["bk_host_id"]] + old_slave_ip = old_slave["ip"] + db_config = get_instance_config(cluster_cls.bk_cloud_id, old_slave_ip, ports=ports) + install_ro_slave_sub_pipeline = build_install_slave_sub_pipeline( + uid, + root_id, + parent_global_data, + cluster_cls, + new_ro_slave_ip, + ports, + bk_host_ids, + db_config, + pkg_id, + pkg.name, + cluster_ids, + new_db_module_id, + ) + ro_sub_pipleline.add_sub_pipeline(sub_flow=install_ro_slave_sub_pipeline) + # 恢复主从数据 + local_backup = False + if backup_source == MySQLBackupSource.LOCAL: + local_backup = True + sync_data_sub_pipeline_list = build_sync_data_sub_pipelines( + root_id, parent_global_data, cluster_ids, new_ro_slave_ip, local_backup, charset + ) + ro_sub_pipleline.add_parallel_sub_pipeline(sub_flow=sync_data_sub_pipeline_list) + ro_sub_piplelines.append(ro_sub_pipleline.build_sub_process(sub_name=_("安装新从节点并数据同步"))) + # 切换换subpipeline + ro_switch_ro_sub_pipleline = SubBuilder(root_id=root_id, data=parent_global_data) + switch_sub_pipeline_list = build_switch_sub_pipelines( + root_id, parent_global_data, cluster_ids, old_slave_ip, new_ro_slave_ip + ) + ro_switch_ro_sub_pipleline.add_parallel_sub_pipeline(switch_sub_pipeline_list) + ro_switch_ro_sub_pipleline.add_act( + act_name=_("更新[NewSlave]{} db module id".format(new_ro_slave_ip)), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.update_upgrade_slaves_dbmodule.__name__, + is_update_trans_data=True, + cluster={ + "db_module_id": new_db_module_id, + "new_slave_ip": new_ro_slave_ip, + }, + ) + ), + ) + + # 解除old从节点和集群的元数据的关系 + ro_switch_ro_sub_pipleline.add_act( + act_name=_("解除[OldSlave]{}相关从实例和集群的元数据的关系".format(old_slave_ip)), + act_component_code=MySQLDBMetaComponent.code, + kwargs=asdict( + DBMetaOPKwargs( + db_meta_class_func=MySQLDBMeta.dissolve_master_slave_relationship.__name__, + is_update_trans_data=True, + cluster={ + "cluster_ids": cluster_ids, + "old_slave_ip": old_slave_ip, + }, + ) + ), + ) + ro_switch_ro_sub_piplelines.append(ro_switch_ro_sub_pipleline.build_sub_process(sub_name=_("切换从节点"))) # 安装mysql + ms_sub_pipeline = SubBuilder(root_id=root_id, data=parent_global_data) bk_host_ids = [new_master["bk_host_id"], new_slave["bk_host_id"]] master = cluster_cls.storageinstance_set.get(instance_inner_role=InstanceInnerRole.MASTER.value) db_config = get_instance_config(cluster_cls.bk_cloud_id, master.machine.ip, ports) @@ -331,9 +409,47 @@ def more_slaves_tendbha_cluster_upgrade_subflow( db_config=db_config, new_db_module_id=new_db_module_id, ) + ms_sub_pipeline.add_sub_pipeline(sub_flow=install_ms_pair_subflow) + new_master_ip = new_master["ip"] + new_slave_ip = new_slave["ip"] + sync_data_sub_pipeline_list = build_ms_pair_sync_data_sub_pipelines( + root_id, parent_global_data, cluster_ids, new_master_ip, new_slave_ip, local_backup, charset + ) + ms_sub_pipeline.add_parallel_sub_pipeline(sub_flow=sync_data_sub_pipeline_list) + ms_process = ms_sub_pipeline.build_sub_process(sub_name=_("安装主从节点,并同步数据")) + if len(ro_slaves) > 0: + sub_pipeline.add_parallel_sub_pipeline(sub_flow=[ms_process] + ro_sub_piplelines) + else: + sub_pipeline.add_parallel_sub_pipeline(sub_flow=[ms_process]) + # 切换主从 + sub_pipeline.add_act(act_name=_("人工确认切换"), act_component_code=PauseComponent.code, kwargs={}) + # 先切ro slaves + if len(ro_slaves) > 0: + sub_pipeline.add_parallel_sub_pipeline(sub_flow=ro_switch_ro_sub_piplelines) + ms_switch_subflows = build_ms_pair_switch_sub_pipelines( + uid, + root_id, + parent_global_data, + cluster_ids, + old_master_ip, + old_slave_ip, + new_master_ip, + new_slave_ip, + ) + sub_pipeline.add_parallel_sub_pipeline(sub_flow=ms_switch_subflows) + sub_pipeline.add_act(act_name=_("人工确认下架旧节点"), act_component_code=PauseComponent.code, kwargs={}) - sub_pipeline.add_sub_pipeline(sub_flow=install_ms_pair_subflow) - + uninsatll_flows = [] + if len(ro_slaves) > 0: + for ro_slave in ro_slaves: + old_slave = ro_slave["old_slave"] + old_slave_ip = old_slave["ip"] + uninsatll_flows.append( + build_uninstall_ro_slave_sub_pipeline( + root_id, parent_global_data, old_slave_ip, cluster_ids, cluster_cls.bk_cloud_id, ports + ) + ) + sub_pipeline.add_parallel_sub_pipeline(sub_flow=uninsatll_flows) return sub_pipeline.build_sub_process(sub_name=_("{}:迁移升级到").format("")) @@ -448,7 +564,7 @@ def non_standby_slaves_upgrade_subflow( # 切换完成后,确认卸载旧的从节点 sub_pipeline.add_act(act_name=_("确认卸载旧实例"), act_component_code=PauseComponent.code, kwargs={}) # 卸载旧从节点 - uninstall_svr_sub_pipeline = build_uninstall_sub_pipeline( + uninstall_svr_sub_pipeline = build_uninstall_ro_slave_sub_pipeline( root_id, parent_global_data, old_slave_ip, relation_cluster_ids, cluster_cls.bk_cloud_id, ports ) sub_pipeline.add_sub_pipeline(sub_flow=uninstall_svr_sub_pipeline) @@ -637,7 +753,9 @@ def build_switch_sub_pipelines(root_id, parent_global_data, relation_cluster_ids return switch_sub_pipeline_list -def build_uninstall_sub_pipeline(root_id, parent_global_data, old_slave_ip, relation_cluster_ids, bk_cloud_id, ports): +def build_uninstall_ro_slave_sub_pipeline( + root_id, parent_global_data, old_slave_ip, relation_cluster_ids, bk_cloud_id, ports +): uninstall_svr_sub_pipeline = SubBuilder(root_id=root_id, data=copy.deepcopy(parent_global_data)) cluster_info = {"uninstall_ip": old_slave_ip, "cluster_ids": relation_cluster_ids}