From 1ee68fefabcfc9cf6a0fefb469411bc7bd05847c Mon Sep 17 00:00:00 2001 From: cycker Date: Mon, 21 Oct 2024 17:35:48 +0800 Subject: [PATCH] feat(mongodb): pit_restore #7478 --- .../db_services/mongodb/restore/handlers.py | 12 +- dbm-ui/backend/flow/consts.py | 2 + .../scene/mongodb/mongodb_install_dbmon.py | 9 +- .../scene/mongodb/mongodb_pitr_restore.py | 290 ++++++++++++++---- .../mongodb/sub_task/install_dbmon_sub.py | 5 +- .../scene/mongodb/sub_task/instance_op.py | 102 ++++++ .../mongodb/sub_task/pitr_rebuild_sub.py | 51 +++ .../mongodb/prepare_instance_info.py | 4 - .../flow/utils/mongodb/mongodb_repo.py | 232 ++++++++++---- 9 files changed, 578 insertions(+), 129 deletions(-) create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/instance_op.py create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/pitr_rebuild_sub.py diff --git a/dbm-ui/backend/db_services/mongodb/restore/handlers.py b/dbm-ui/backend/db_services/mongodb/restore/handlers.py index 6b8ed7f3c1..b996149792 100644 --- a/dbm-ui/backend/db_services/mongodb/restore/handlers.py +++ b/dbm-ui/backend/db_services/mongodb/restore/handlers.py @@ -33,7 +33,8 @@ class MongoDBRestoreHandler(object): """mongodb定点构造函数封装""" def __init__(self, cluster_id: int): - self.cluster = Cluster.objects.get(id=cluster_id) + self.cluster_id = cluster_id + # self.cluster = Cluster.objects.get(id=cluster_id) @staticmethod def _get_log_from_bklog(collector: str, start_time: datetime, end_time: datetime, query_string="*") -> List[Dict]: @@ -52,7 +53,8 @@ def _query_latest_log_and_index(self, rollback_time: datetime, query_string: str query_string=query_string, ) if not backup_logs: - raise AppBaseException(_("距离回档时间点7天内没有备份日志 {} {}").format(query_string, rollback_time)) + raise AppBaseException(_("距离回档时间点{}天内没有备份日志 query_string: {} from {} to {}").format( + BACKUP_LOG_RANGE_DAYS, query_string, start_time, end_time)) # 获取距离回档时间最近的全备日志 backup_logs.sort(key=lambda x: x[time_key]) @@ -60,7 
+62,7 @@ def _query_latest_log_and_index(self, rollback_time: datetime, query_string: str try: latest_backup_log_index = find_nearby_time(time_keys, rollback_time, flag) except IndexError: - raise AppBaseException(_("无法找到时间点{}附近的全备日志记录 query_string:{} ").format(rollback_time, query_string)) + raise AppBaseException(_("无法找到时间点{}附近的全备日志记录 query_string:{}").format(rollback_time, query_string)) return backup_logs, latest_backup_log_index @@ -71,7 +73,7 @@ def query_latest_backup_log(self, rollback_time: datetime, set_name: str = None) @param set_name: 指定SetName. cluster_type为ReplicaSet时,只有一个set_name, 可以为空. """ # 获取距离回档时间最近的全备日志 - query_string = f"cluster_id: {self.cluster.id} AND pitr_file_type: {PitrFillType.FULL}" + query_string = f"cluster_id: {self.cluster_id} AND pitr_file_type: {PitrFillType.FULL}" if set_name is not None: query_string += f" AND set_name: {set_name}" full_backup_logs, full_latest_index = self._query_latest_log_and_index( @@ -82,7 +84,7 @@ def query_latest_backup_log(self, rollback_time: datetime, set_name: str = None) # 找到与全备日志pitr_fullname相同的增量备份日志 pitr_fullname = latest_full_backup_log["pitr_fullname"] query_string = ( - f"cluster_id: {self.cluster.id} AND pitr_file_type: {PitrFillType.INCR} AND pitr_fullname: {pitr_fullname}" + f"cluster_id: {self.cluster_id} AND pitr_file_type: {PitrFillType.INCR} AND pitr_fullname: {pitr_fullname}" ) if set_name is not None: query_string += f" AND set_name: {set_name}" diff --git a/dbm-ui/backend/flow/consts.py b/dbm-ui/backend/flow/consts.py index 55744c5861..7553fc1189 100644 --- a/dbm-ui/backend/flow/consts.py +++ b/dbm-ui/backend/flow/consts.py @@ -514,6 +514,8 @@ class MongoDBActuatorActionEnum(str, StructuredEnum): InstallDBMon = EnumField("install_dbmon", _("install_dbmon")) MongoStart = EnumField("mongo_start", _("mongo_start")) MongoHello = EnumField("mongodb_hello", _("mongodb_hello")) + MongoPitrRebuild = EnumField("mongodb_pitr_rebuild", _("mongodb_pitr_rebuild")) + MongoInstanceOp = 
EnumField("mongodb_instance_op", _("mongodb_instance_op")) class EsActuatorActionEnum(str, StructuredEnum): diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install_dbmon.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install_dbmon.py index b2b525ff34..6085c95623 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install_dbmon.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_install_dbmon.py @@ -37,8 +37,8 @@ def get_pkg_info(): # repo_version 如果REPO_VERSION_FOR_DEV有值,则使用REPO_VERSION_FOR_DEV,否则使用最新版本 # 正式环境中,REPO_VERSION_FOR_DEV为空 # 个人测试环境中,REPO_VERSION_FOR_DEV 按需配置 - repo_version = env.REPO_VERSION_FOR_DEV if env.REPO_VERSION_FOR_DEV else MediumEnum.Latest - + dev_env = str(env.REPO_VERSION_FOR_DEV) + repo_version = dev_env if dev_env != "" else MediumEnum.Latest actuator_pkg = Package.get_latest_package( version=repo_version, pkg_type=MediumEnum.DBActuator, db_type=DBType.MongoDB ) @@ -47,7 +47,8 @@ def get_pkg_info(): toolkit_pkg = Package.get_latest_package( version=MediumEnum.Latest, pkg_type="mongo-toolkit", db_type=DBType.MongoDB ) - dbmon_pkg = Package.get_latest_package(version=MediumEnum.Latest, pkg_type="dbmon", db_type=DBType.MongoDB) + dbmon_pkg = Package.get_latest_package(version=MediumEnum.Latest + , pkg_type="dbmon", db_type=DBType.MongoDB) return { "actuator_pkg": actuator_pkg, "dbmon_pkg": dbmon_pkg, @@ -103,7 +104,7 @@ def add_install_dbmon(root_id, flow_data, pipeline, iplist, bk_cloud_id, allow_e bk_host_list.extend(sub_bk_host_list) sub_pipelines.append(sub_pl.build_sub_process(_("dbmon-{}").format(ip))) - # 介质下发,包括actuator+dbmon+dbtools 如果文件没有变化,不会占用带宽 + # 介质下发,包括actuator+dbmon+dbtools pipeline.add_act( **SendMedia.act( act_name=_("CpFile: actuator+dbmon+dbtools+toolkit"), diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_pitr_restore.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_pitr_restore.py index 2e15054ee4..ebeae41862 100644 --- 
a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_pitr_restore.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/mongodb_pitr_restore.py @@ -23,6 +23,8 @@ from backend.flow.engine.bamboo.scene.mongodb.sub_task.exec_shell_script import ExecShellScript from backend.flow.engine.bamboo.scene.mongodb.sub_task.fetch_backup_record_subtask import FetchBackupRecordSubTask from backend.flow.engine.bamboo.scene.mongodb.sub_task.hello_sub import HelloSubTask +from backend.flow.engine.bamboo.scene.mongodb.sub_task.instance_op import InstanceOpSubTask +from backend.flow.engine.bamboo.scene.mongodb.sub_task.pitr_rebuild_sub import PitrRebuildSubTask from backend.flow.engine.bamboo.scene.mongodb.sub_task.pitr_restore_sub import PitrRestoreSubTask from backend.flow.engine.bamboo.scene.mongodb.sub_task.send_media import SendMedia from backend.flow.plugins.components.collections.mongodb.exec_actuator_job2 import ExecJobComponent2 @@ -48,7 +50,6 @@ class MongoPitrRestoreFlow(MongoBaseFlow): class Serializer(serializers.Serializer): class DataRow(serializers.Serializer): task_ids = BsTask.Serializer(many=True, required=False) - src_cluster_id = serializers.IntegerField() dst_cluster_id = serializers.IntegerField() dst_cluster_type = serializers.CharField() dst_time = serializers.CharField() @@ -84,7 +85,7 @@ def start(self): MongoPitrRestoreFlow 流程 """ logger.debug("MongoPitrRestoreFlow start, payload", self.payload) - # actuator_workdir 提前创建好的,在部署的时候就创建好了. + # actuator_workdir 在部署的时候就创建好的 actuator_workdir = MongoUtil().get_mongodb_os_conf()["file_path"] file_list = GetFileList(db_type=DBType.MongoDB).get_db_actuator_package() @@ -96,7 +97,11 @@ def start(self): # 4. 
执行回档任务 # 所有涉及的cluster - cluster_id_list = [row["dst_cluster_id"] for row in self.payload["infos"]] + cluster_id_list = [] + for row in self.payload["infos"]: + cluster_id_list.append(row["dst_cluster_id"]) + if row["src_cluster_id"] > 0: + cluster_id_list.append(row["src_cluster_id"]) self.check_cluster_id_list(cluster_id_list) clusters = MongoRepository.fetch_many_cluster_dict(id__in=cluster_id_list) dest_dir = str(DirEnum.MONGO_RECOVER_DIR.value) @@ -115,6 +120,7 @@ def start(self): dst_cluster_id = row["dst_cluster_id"] cluster = clusters[dst_cluster_id] self.check_cluster_valid(cluster, self.payload) + # todo check src_cluster dst_cluster has same topology except Exception as e: logger.exception("check_cluster_valid fail") raise Exception("check_cluster_valid fail cluster_id:{} {}".format(row["cluster_id"], e)) @@ -129,13 +135,31 @@ def start(self): pipeline = Builder(root_id=self.root_id, data=self.payload) cluster_pipes = [] for row in self.payload["infos"]: + # src_cluster_id 如果 > 0 读取 src_cluster_id + # src_cluster_id 如果 = 0 读取 row["src_cluster"] 这个一般用于测试或者src_cluster_id已不存在 + if row["src_cluster_id"] > 0: + src_cluster = clusters[row["src_cluster_id"]] + else: + src_cluster = MongoRepository.new_cluster_from_conf(row["src_cluster"]) cluster = clusters[row["dst_cluster_id"]] logger.debug("sub_pipline start row", row) - logger.debug("sub_pipline start cluster", cluster) + # check src_cluster dst_cluster has same topology + if src_cluster.is_sharded_cluster() != cluster.is_sharded_cluster(): + raise Exception("src_cluster and dst_cluster has different topology") + + if src_cluster.is_sharded_cluster(): + if len(src_cluster.get_shards()) != len(cluster.get_shards()): + raise Exception("src_cluster and dst_cluster has different shards") + if len(src_cluster.get_config().members) <= 0: + raise Exception("src_cluster config has no member") + else: + if src_cluster.get_shards() is None: + raise Exception("src_cluster has no shard") + cluster_sb = 
self.process_cluster( - row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir + row=row, src_cluster=src_cluster, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir ) - cluster_pipes.append(cluster_sb.build_sub_process(_("cluster {}").format(cluster.name))) + cluster_pipes.append(cluster_sb.build_sub_process(_("pitr cluster {}").format(cluster.name))) # 1. 统一预处理 # 2. 统一下发文件 @@ -186,91 +210,121 @@ def set_exec_node(row: Dict, cluster: MongoDBCluster) -> list[MongoNode]: exec_node_list.append(exec_node) return exec_node_list - def process_cluster(self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str) -> SubBuilder: + def process_cluster(self, row: Dict, src_cluster: MongoDBCluster, + cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str) -> SubBuilder: """ - cluster pitr_restore_flow + pitr_restore_flow - 兼容 ShardedCluster和ReplicaSet """ cluster_sb = SubBuilder(root_id=self.root_id, data=self.payload) shard_pipes = [] - if cluster.is_sharded_cluster(): - self.check_empty_cluster( - row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb - ) + """ 测试需要 """ + self.start_all_shardsvr( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + ) + + self.check_empty_cluster( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + ) + + self.stop_dbmon(row=row, cluster=cluster, actuator_workdir=actuator_workdir, + dest_dir=dest_dir, cluster_sb=cluster_sb) + if cluster.is_sharded_cluster(): self.stop_mongos( row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb ) - # 准备主节点: - # 让第1个节点成为Primary,remove掉其它成员,start as standalone mode - # self.remove_none_exec_node - # restart exec_node as standalone node - - self.stop_not_exec_node( + self.remove_not_exec_node_from_rs( row=row, cluster=cluster, 
actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb ) - self.restart_as_standalone( + + self.stop_not_exec_node( row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb ) - # 为每个Shard执行回档,包括configsvr - restore_sb = SubBuilder(root_id=self.root_id, data=self.payload) - for shard in cluster.get_shards(with_config=True): + src_shards = src_cluster.get_shards(with_config=True, sort_by_set_name=True) + dst_shards = cluster.get_shards(with_config=True, sort_by_set_name=True) + + if len(src_shards) != len(dst_shards): + raise Exception("src_shards and dst_shards has different shards") + + for idx in range(len(src_shards)): + src_shard = src_shards[idx] + dst_shard = dst_shards[idx] shard_sb = SubBuilder(root_id=self.root_id, data=self.payload) self.process_shard( row=row, + src_cluster=src_cluster, + src_shard=src_shard, cluster=cluster, - shard=shard, + shard=dst_shard, actuator_workdir=actuator_workdir, dest_dir=dest_dir, shard_sub=shard_sb, ) - shard_pipes.append(shard_sb.build_sub_process(_("{} {}").format(shard.set_type, shard.set_name))) + shard_pipes.append( + shard_sb.build_sub_process(_("{} restore {} to {}").format( + src_shard.set_type, src_shard.set_name, dst_shard.set_name))) + # 为每个Shard执行回档,包括configsvr + restore_sb = SubBuilder(root_id=self.root_id, data=self.payload) restore_sb.add_parallel_sub_pipeline(sub_flow_list=shard_pipes) - cluster_sb.add_sub_pipeline(sub_flow=restore_sb.build_sub_process("restore_by_shard")) + cluster_sb.add_sub_pipeline(sub_flow=restore_sb.build_sub_process("restore_shards")) # restore_sb end if cluster.is_sharded_cluster(): - # if sharded_cluster - # 处理各个分片和configsvr的关系. 
- # start as clusterRole: shardsvr + """ + step1: configsvr: + - update config.shards + - stop balancer + - fetchClusterId config.version.findOne({"clusterId":{$exists:true}}).clusterId + step2: shardsvr: + - start as standalone + - insert { "_id" : "shardIdentity”} to admin.system.version + """ self.rebuild_cluster( - row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + row=row, actuator_workdir=actuator_workdir, dest_dir=dest_dir, + src_cluster=src_cluster, dst_cluster=cluster, + cluster_sb=cluster_sb ) - # todo restart as auth mode && and re add secondary + self.start_all_mongos( + row=row, cluster=cluster, actuator_workdir=actuator_workdir, dest_dir=dest_dir, cluster_sb=cluster_sb + ) + # todo start another node return cluster_sb def check_empty_cluster( - self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder ): - exec_node = cluster.get_mongos()[0] - HelloSubTask.process_node( + exec_node = cluster.get_connect_node() + InstanceOpSubTask.process_node( root_id=self.root_id, ticket_data=self.payload, sub_ticket_data=row, sub_pipeline=cluster_sb, exec_node=exec_node, file_path=actuator_workdir, - act_name=_("空闲检查"), + act_name=_("CheckEmptyData"), + op="check_empty_data" ) return - def process_shard(self, row: Dict, cluster, shard, actuator_workdir: str, dest_dir: str, shard_sub: SubBuilder): + def process_shard(self, row: Dict, src_cluster, src_shard, + cluster, shard, actuator_workdir: str, dest_dir: str, shard_sub: SubBuilder): """ - pitr_restore_flow one shard + pitr_restore_flow one shard from src_cluster/src_shard to cluster/shard """ # FetchBackupRecordSubTask 根据 sub_ticket_data中的src_cluster_id, dst_time 获得备份文件列表. 
FetchBackupRecordSubTask.process_shard( root_id=self.root_id, ticket_data=self.payload, sub_ticket_data=row, - cluster=cluster, - shard=shard, + cluster=src_cluster, + shard=src_shard, ) exec_node = row["__exec_node"][shard.set_name] @@ -300,8 +354,33 @@ def process_shard(self, row: Dict, cluster, shard, actuator_workdir: str, dest_d return + def stop_dbmon( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + ): + + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + + for ip in cluster.get_iplist(): + acts_list.append( + { + "act_name": _("stop_dbmon {}".format(ip)), + "act_component_code": ExecJobComponent2.code, + "kwargs": InstanceOpSubTask.make_node_kwargs( + ip=ip, file_path=actuator_workdir, bk_cloud_id=cluster.bk_cloud_id, op="stop_dbmon"), + + } + ) + + # 可能会存在mongos列表为空的情况 + if len(acts_list) == 0: + return + + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("stop_dbmon")) + def stop_mongos( - self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder ): acts_list = [] @@ -311,33 +390,101 @@ def stop_mongos( { "act_name": _("stop_mongos {}:{}".format(mongos.ip, mongos.port)), "act_component_code": ExecJobComponent2.code, - "kwargs": HelloSubTask.make_kwargs(exec_node=mongos, file_path=actuator_workdir), + "kwargs": InstanceOpSubTask.make_kwargs(exec_node=mongos, file_path=actuator_workdir, op="stop"), } ) - # 可能会存在mongos列表为空的情况吗? 
+ # 可能会存在mongos列表为空的情况 if len(acts_list) == 0: return sb.add_parallel_acts(acts_list=acts_list) cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("stop_mongos")) - def stop_not_exec_node( - self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + def remove_not_exec_node_from_rs( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder ): + """ remove_not_exec_node_from_rs remove other nodes""" + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + for shard in cluster.get_shards(with_config=True): + for m in shard.members: + # 每个分片都有一个exec_node,这个exec_node是用于导入数据的 + if m.equal(row["__exec_node"][shard.set_name]): + acts_list.append( + { + "act_name": _("rs_remove_others {}:{}".format(m.ip, m.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": InstanceOpSubTask.make_kwargs( + exec_node=m, file_path=actuator_workdir, op="rs_remove_other_node"), + } + ) + + if len(acts_list) == 0: + return + + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("rs_remove_other_node")) + def start_all_shardsvr( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + ): acts_list = [] sb = SubBuilder(root_id=self.root_id, data=self.payload) - for mongos in cluster.get_mongos(): + for shard in cluster.get_shards(with_config=True): + for m in shard.members: + acts_list.append( + { + "act_name": _("start {}:{}".format(m.ip, m.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": InstanceOpSubTask.make_kwargs(exec_node=m, file_path=actuator_workdir, op="start"), + } + ) + + if len(acts_list) == 0: + return + + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("start_mongo")) + + def start_all_mongos( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, 
cluster_sb: SubBuilder + ): + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + for m in cluster.get_mongos(): acts_list.append( { - "act_name": _("stop {}:{}".format(mongos.ip, mongos.port)), + "act_name": _("start {}:{}".format(m.ip, m.port)), "act_component_code": ExecJobComponent2.code, - "kwargs": HelloSubTask.make_kwargs(exec_node=mongos, file_path=actuator_workdir), - } - ) + "kwargs": InstanceOpSubTask.make_kwargs(exec_node=m, file_path=actuator_workdir, op="start"), + }) + + if len(acts_list) == 0: + return + + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("start_mongos")) + + def stop_not_exec_node( + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + ): + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + for shard in cluster.get_shards(with_config=True): + for m in shard.members: + # 每个分片都有一个exec_node,这个exec_node是用于导入数据的 + if m.equal(row["__exec_node"][shard.set_name]): + continue + + acts_list.append( + { + "act_name": _("stop {}:{}".format(m.ip, m.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": InstanceOpSubTask.make_kwargs(exec_node=m, file_path=actuator_workdir, op="stop"), + } + ) - # 可能会存在mongos列表为空的情况吗? 
if len(acts_list) == 0: return @@ -345,7 +492,7 @@ def stop_not_exec_node( cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("stop_not_exec_node")) def restart_as_standalone( - self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder ): acts_list = [] @@ -367,23 +514,44 @@ def restart_as_standalone( cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("restart_as_standalone")) def rebuild_cluster( - self, row: Dict, cluster: MongoDBCluster, actuator_workdir: str, dest_dir: str, cluster_sb: SubBuilder + self, row: Dict, actuator_workdir: str, dest_dir: str, + src_cluster: MongoDBCluster, dst_cluster: MongoDBCluster, + cluster_sb: SubBuilder ): + src_configsvr = src_cluster.get_config() + dst_configsvr = dst_cluster.get_config() + src_shards = src_cluster.get_shards(with_config=False, sort_by_set_name=True) + dst_shards = dst_cluster.get_shards(with_config=False, sort_by_set_name=True) acts_list = [] sb = SubBuilder(root_id=self.root_id, data=self.payload) - for mongos in cluster.get_mongos(): + exec_node = row["__exec_node"][dst_configsvr.set_name] + acts_list.append( + { + "act_name": _("rebuild {} {}:{}".format(dst_configsvr.set_name, exec_node.ip, exec_node.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": PitrRebuildSubTask.make_kwargs(exec_node=exec_node, file_path=actuator_workdir, + src_shard=src_configsvr, dst_shard=dst_configsvr, + src_cluster=src_cluster, dst_cluster=dst_cluster), + } + ) + sb.add_parallel_acts(acts_list=acts_list) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("rebuild_cluster-configsvr")) + + acts_list = [] + sb = SubBuilder(root_id=self.root_id, data=self.payload) + for i in range(len(src_shards)): + src_shard = src_shards[i] + dst_shard = dst_shards[i] + exec_node = row["__exec_node"][dst_shard.set_name] acts_list.append( { - "act_name": 
_("rebuild {}:{}".format(mongos.ip, mongos.port)), + "act_name": _("rebuild {} {}:{}".format(dst_shard.set_name, exec_node.ip, exec_node.port)), "act_component_code": ExecJobComponent2.code, - "kwargs": HelloSubTask.make_kwargs(exec_node=mongos, file_path=actuator_workdir), - } - ) - - # 可能会存在mongos列表为空的情况吗? - if len(acts_list) == 0: - return + "kwargs": PitrRebuildSubTask.make_kwargs(exec_node=exec_node, file_path=actuator_workdir, + src_shard=src_shard, dst_shard=dst_shard, + src_cluster=src_cluster, dst_cluster=dst_cluster), + }) sb.add_parallel_acts(acts_list=acts_list) - cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("rebuild_cluster")) + cluster_sb.add_sub_pipeline(sub_flow=sb.build_sub_process("rebuild_cluster-shardsvr")) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/install_dbmon_sub.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/install_dbmon_sub.py index e4cda2d8e8..9ed6c89c97 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/install_dbmon_sub.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/install_dbmon_sub.py @@ -53,7 +53,7 @@ def make_kwargs( "payload_func": payload_func, "db_act_template": { "exec_ip": ip, # 用于 payload_func - "action": MongoDBActuatorActionEnum.InstallDBMon, + "action": MongoDBActuatorActionEnum.InstallDBMon.value, "file_path": file_path, "exec_account": "root", "sudo_account": "mysql", @@ -124,7 +124,7 @@ def make_kwargs( "bk_cloud_id": bk_cloud_id, }, "db_act_template": { - "action": MongoDBActuatorActionEnum.InstallDBMon, + "action": MongoDBActuatorActionEnum.InstallDBMon.value, "file_path": file_path, "exec_account": "root", "sudo_account": "mysql", @@ -169,3 +169,4 @@ def process_iplist( "act_component_code": ExecPrepareInstanceInfoOperationComponent.code, "kwargs": kwargs, } + \ No newline at end of file diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/instance_op.py 
b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/instance_op.py new file mode 100644 index 0000000000..58cf277f0a --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/instance_op.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.flow.consts import MongoDBActuatorActionEnum +from backend.flow.engine.bamboo.scene.common.builder import SubBuilder +from backend.flow.engine.bamboo.scene.mongodb.sub_task.base_subtask import BaseSubTask +from backend.flow.plugins.components.collections.mongodb.exec_actuator_job2 import ExecJobComponent2 +from backend.flow.utils.mongodb.mongodb_dataclass import CommonContext +from backend.flow.utils.mongodb.mongodb_repo import MongoNode +from backend.flow.utils.mongodb.mongodb_util import MongoUtil + + +# InstanceOpSubTask 对mongod/mongos实例做一些简单的操作. 
start/stop 等 +class InstanceOpSubTask(BaseSubTask): + """ + payload: 整体的ticket_data + sub_payload: 这个子任务的ticket_data + rs: + """ + + @classmethod + def make_kwargs(cls, file_path, exec_node: MongoNode, op: str) -> dict: + dba_user, dba_pwd = MongoUtil.get_dba_user_password(exec_node.ip, exec_node.port, exec_node.bk_cloud_id) + return { + "set_trans_data_dataclass": CommonContext.__name__, + "get_trans_data_ip_var": None, + "bk_cloud_id": exec_node.bk_cloud_id, + "exec_ip": exec_node.ip, + "db_act_template": { + "action": MongoDBActuatorActionEnum.MongoInstanceOp, + "file_path": file_path, + "exec_account": "mysql", + "sudo_account": "mysql", + "payload": { + "ip": exec_node.ip, + "port": int(exec_node.port), + "adminUsername": dba_user, + "adminPassword": dba_pwd, + "op": op, + }, + }, + } + + @classmethod + def make_node_kwargs(cls, file_path, ip: str, bk_cloud_id: int, op: str) -> dict: + # 按IP为单位执行任务 stop_dbmon/start_dbmon + return { + "set_trans_data_dataclass": CommonContext.__name__, + "get_trans_data_ip_var": None, + "bk_cloud_id": bk_cloud_id, + "exec_ip": ip, + "db_act_template": { + "action": MongoDBActuatorActionEnum.MongoInstanceOp, + "file_path": file_path, + "exec_account": "mysql", + "sudo_account": "mysql", + "payload": { + "ip": ip, + "op": op, + }, + }, + } + + @classmethod + def process_node( + cls, + root_id: str, + ticket_data: Optional[Dict], + sub_ticket_data: Optional[Dict], + file_path, + exec_node: MongoNode, + sub_pipeline: SubBuilder, + act_name: str, + op: str, + ) -> SubBuilder: + """ + cluster can be a ReplicaSet or a ShardedCluster + """ + + # 创建子流程 + if sub_pipeline is None: + sub_pipeline = SubBuilder(root_id=root_id, data=ticket_data) + + kwargs = cls.make_kwargs(file_path, exec_node, op) + act = { + "act_name": _("{} {}:{}".format(act_name, exec_node.ip, exec_node.port)), + "act_component_code": ExecJobComponent2.code, + "kwargs": kwargs, + } + sub_pipeline.add_act(**act) + return sub_pipeline diff --git 
a/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/pitr_rebuild_sub.py b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/pitr_rebuild_sub.py new file mode 100644 index 0000000000..bc4c8a6b7e --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/mongodb/sub_task/pitr_rebuild_sub.py @@ -0,0 +1,51 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. 
+""" + +from backend.flow.consts import MongoDBActuatorActionEnum +from backend.flow.engine.bamboo.scene.mongodb.sub_task.base_subtask import BaseSubTask +from backend.flow.utils.mongodb.mongodb_dataclass import CommonContext +from backend.flow.utils.mongodb.mongodb_repo import MongoNode +from backend.flow.utils.mongodb.mongodb_util import MongoUtil + + +# PitrRebuildSubTask 重新构建集群 +class PitrRebuildSubTask(BaseSubTask): + """ + payload: 整体的ticket_data + sub_payload: 这个子任务的ticket_data + rs: + """ + + @classmethod + def make_kwargs(cls, file_path, exec_node: MongoNode, src_shard, dst_shard, src_cluster, dst_cluster) -> dict: + dba_user, dba_pwd = MongoUtil.get_dba_user_password(exec_node.ip, exec_node.port, exec_node.bk_cloud_id) + return { + "set_trans_data_dataclass": CommonContext.__name__, + "get_trans_data_ip_var": None, + "bk_cloud_id": exec_node.bk_cloud_id, + "exec_ip": exec_node.ip, + "db_act_template": { + "action": MongoDBActuatorActionEnum.MongoPitrRebuild, + "file_path": file_path, + "exec_account": "root", + "sudo_account": "mysql", + "payload": { + "ip": exec_node.ip, + "port": int(exec_node.port), + "adminUsername": dba_user, + "adminPassword": dba_pwd, + "src_cluster": src_cluster.__json__(), + "dst_cluster": dst_cluster.__json__(), + "src_shard": src_shard.__json__(), + "dst_shard": dst_shard.__json__(), + }, + }, + } diff --git a/dbm-ui/backend/flow/plugins/components/collections/mongodb/prepare_instance_info.py b/dbm-ui/backend/flow/plugins/components/collections/mongodb/prepare_instance_info.py index c8abfffb87..1ef20fd209 100644 --- a/dbm-ui/backend/flow/plugins/components/collections/mongodb/prepare_instance_info.py +++ b/dbm-ui/backend/flow/plugins/components/collections/mongodb/prepare_instance_info.py @@ -35,11 +35,7 @@ def _execute(self, data, parent_data) -> bool: kwargs 私有变量 """ - # trans_data = data.get_one_of_inputs("trans_data") kwargs = data.get_one_of_inputs("kwargs") - - # if trans_data is None or trans_data == "${trans_data}": - # 
表示没有加载上下文内容,则在此添加 trans_data = getattr(flow_context, kwargs["set_trans_data_dataclass"])() iplist = kwargs["trans_data_var"]["iplist"] diff --git a/dbm-ui/backend/flow/utils/mongodb/mongodb_repo.py b/dbm-ui/backend/flow/utils/mongodb/mongodb_repo.py index 7fc11038e2..391a9b3bc7 100644 --- a/dbm-ui/backend/flow/utils/mongodb/mongodb_repo.py +++ b/dbm-ui/backend/flow/utils/mongodb/mongodb_repo.py @@ -1,3 +1,4 @@ +import re from abc import abstractmethod from typing import List, Union @@ -12,6 +13,7 @@ from backend.flow.utils.mongodb import mongodb_password from backend.ticket.constants import InstanceType + # entities # Node -> ReplicaSet -> Cluster[Rs,ShardedCluster] # MongoNodeWithLabel @@ -19,9 +21,9 @@ class MongoNode: - def __init__(self, ip, port, role, bk_cloud_id, mtype, domain=None): + def __init__(self, ip: str, port: int, role: str, bk_cloud_id: int, mtype: str, domain: str = None): self.ip: str = ip - self.port: str = port + self.port: int = port self.role: str = role self.bk_cloud_id: int = bk_cloud_id self.machine_type = mtype @@ -37,10 +39,28 @@ def from_instance(cls, s: Union[ProxyInstance, StorageInstance], with_domain: bo if with_domain: domain = s.bind_entry.first().entry node = MongoNode( - s.ip_port.split(":")[0], str(s.port), meta_role, s.machine.bk_cloud_id, s.machine_type, domain + s.ip_port.split(":")[0], s.port, meta_role, s.machine.bk_cloud_id, s.machine_type, domain ) return node + def equal(self, other: 'MongoNode') -> bool: + return self.ip == other.ip and self.port == other.port and self.bk_cloud_id == other.bk_cloud_id + + @classmethod + def from_conf(cls, conf) -> 'MongoNode': + """from dict""" + return MongoNode(conf["ip"], int(conf["port"]), conf["role"], int(conf["bk_cloud_id"]), "") + + def __json__(self): + return { + "ip": self.ip, + "port": self.port, + "role": self.role, + "bk_cloud_id": self.bk_cloud_id, + "machine_type": self.machine_type, + "domain": self.domain, + } + class ReplicaSet: set_name: str @@ -52,6 +72,20 @@ def 
class ReplicaSet:
    """A named replica set (set_name) with its member nodes and role type.

    NOTE(review): reconstructed from a whitespace-mangled patch; the
    get_backup_node() method is truncated in the visible hunk and is
    intentionally not reproduced here.
    """

    set_name: str

    def __init__(self, set_type: str, set_name: str = None, members: List["MongoNode"] = None):
        self.set_type = set_type
        self.set_name = set_name
        self.members = members

    @classmethod
    def from_conf(cls, conf, set_type: str = None) -> "ReplicaSet":
        """Build a ReplicaSet from a plain dict.

        When set_type is not given it falls back to conf["set_type"]; raises
        if neither source provides it.

        Fix: the original had a duplicated nested `if set_type is None` check,
        which made the conf["set_type"] fallback unreachable (it always raised
        when set_type was not passed). The annotation `set_type: None` is also
        corrected to `set_type: str = None`.
        """
        if set_type is None:
            set_type = conf.get("set_type")
            if set_type is None:
                raise Exception("conf.set_type is None")
        return cls(
            set_type,
            conf["set_name"],
            [MongoNode.from_conf(m) for m in conf["members"]],
        )

    def __json__(self):
        """Dict form used when serializing cluster topology."""
        return {
            "set_name": self.set_name,
            "set_type": self.set_type,
            "members": [m.__json__() for m in self.members],
        }
class ReplicaSetCluster(MongoDBCluster):
    """MongoDBCluster backed by a single replica set (no mongos, no configsvr).

    NOTE(review): reconstructed from a whitespace-mangled patch; __init__ and
    get_mongos() are truncated in the visible hunks and are intentionally not
    reproduced here.
    """

    shard: ReplicaSet  # the one and only storage replica set

    def get_connect_node(self) -> MongoNode:
        # A replica set has no mongos; connect through its first member.
        return self.shard.members[0]

    def get_shards(self, with_config: bool = False, sort_by_set_name: bool = False) -> List[ReplicaSet]:
        # with_config / sort_by_set_name exist for interface parity with
        # ShardedCluster; they are no-ops here because there is one shard.
        return [self.shard]

    def get_config(self) -> ReplicaSet:
        """Not Implemented"""
        return None

    def __json__(self):
        """Dict form used when serializing cluster topology."""
        return {
            "bk_cloud_id": self.bk_cloud_id,
            "cluster_id": self.cluster_id,
            "name": self.name,
            "cluster_type": self.cluster_type,
            "major_version": self.major_version,
            "bk_biz_id": self.bk_biz_id,
            "immute_domain": self.immute_domain,
            "app": self.app,
            "region": self.region,
            "shard": self.shard.__json__(),
        }
class ShardedCluster(MongoDBCluster):
    """MongoDBCluster with mongos proxies, shard replica sets and a configsvr.

    NOTE(review): reconstructed from a whitespace-mangled patch; __init__ and
    get_config() are truncated in the visible hunks and are intentionally not
    reproduced here (the constructor stores configsvr on self.config).
    """

    shards: List[ReplicaSet]  # storages
    mongos: List[MongoNode]  # proxies
    configsvr: ReplicaSet  # configs

    def get_connect_node(self) -> MongoNode:
        # Sharded clusters are always reached through a mongos.
        return self.get_mongos()[0]

    def get_shards(self, with_config: bool = False, sort_by_set_name: bool = False) -> List[ReplicaSet]:
        """Return the shard list, optionally prepending the configsvr and/or
        sorting by the numeric suffix of each set_name (e.g. "s1" < "s10")."""

        def _shard_index(set_name: str) -> int:
            # Trailing digits decide the order; names without digits sort as 0.
            tail = re.findall("[0-9]+$", set_name)
            return int(tail[-1]) if tail else 0

        result = [self.config] if with_config else []
        result.extend(self.shards)
        if sort_by_set_name:
            result.sort(key=lambda s: _shard_index(s.set_name))
        return result

    def get_mongos(self) -> List[MongoNode]:
        return self.mongos

    def __json__(self):
        """Dict form used when serializing cluster topology."""
        return {
            "bk_cloud_id": self.bk_cloud_id,
            "cluster_id": self.cluster_id,
            "name": self.name,
            "cluster_type": self.cluster_type,
            "major_version": self.major_version,
            "bk_biz_id": self.bk_biz_id,
            "immute_domain": self.immute_domain,
            "app": self.app,
            "region": self.region,
            "mongos": [m.__json__() for m in self.mongos],
            "shards": [s.__json__() for s in self.shards],
            "configsvr": self.config.__json__(),
        }
@staticmethod + def new_cluster_from_conf(conf) -> MongoDBCluster: + """ + NewCluster 根据conf创建一个MongoDBCluster, 它可能在cmdb中不存在了. 但是我们仍然可以创建一个MongoDBCluster + 此处不会检测各个数据的合法性,请在调用前检查 + """ + if conf["cluster_type"] == ClusterType.MongoReplicaSet.value: + return ReplicaSetCluster( + bk_cloud_id=conf["bk_cloud_id"], + cluster_id=conf["cluster_id"], + name=conf["name"], + major_version=conf["major_version"], + bk_biz_id=conf["bk_biz_id"], + immute_domain=conf["immute_domain"], + app=conf["app"], + region=conf["region"], + shard=ReplicaSet.from_conf(conf["shard"], set_type=MongoDBClusterRole.Replicaset.value), + ) + elif conf["cluster_type"] == ClusterType.MongoShardedCluster.value: + return ShardedCluster( + bk_cloud_id=conf["bk_cloud_id"], + cluster_id=conf["cluster_id"], + name=conf["name"], + major_version=conf["major_version"], + bk_biz_id=conf["bk_biz_id"], + immute_domain=conf["immute_domain"], + app=conf["app"], + region=conf["region"], + mongos=[MongoNode.from_conf(m) for m in conf["mongos"]], + configsvr=ReplicaSet.from_conf(conf["configsvr"], set_type=MongoDBClusterRole.ConfigSvr.value), + shards=[ReplicaSet.from_conf(m, set_type=MongoDBClusterRole.ShardSvr.value) for m in conf["shards"]], + ) + else: + raise Exception("bad cluster_type {}".format(conf["cluster_type"])) + @classmethod def fetch_many_cluster(cls, with_domain: bool, **kwargs): # with_domain 是否: 获取复制集和mongos的域名,赋值在MongoNode的domain属性上 @@ -284,7 +412,6 @@ def fetch_many_cluster(cls, with_domain: bool, **kwargs): if m.instance.machine_type == machine_type.MachineType.MONOG_CONFIG.value: shard = ReplicaSet(MongoDBClusterRole.ConfigSvr.value, set_name=m.seg_range, members=members) configsvr = shard - # shardsvr else: shard = ReplicaSet(MongoDBClusterRole.ShardSvr.value, set_name=m.seg_range, members=members) @@ -303,7 +430,6 @@ def fetch_many_cluster(cls, with_domain: bool, **kwargs): configsvr=configsvr, region=i.region, ) - rows.append(row) return rows @@ -400,18 +526,18 @@ def is_partial(cls, 
payload: dict) -> bool: return False else: if ( - payload["db_patterns"] is None - and payload["ignore_dbs"] is None - and payload["table_patterns"] is None - and payload["ignore_tables"] is None + payload["db_patterns"] is None + and payload["ignore_dbs"] is None + and payload["table_patterns"] is None + and payload["ignore_tables"] is None ): return False if ( - payload["db_patterns"] is None - or payload["ignore_dbs"] is None - or payload["table_patterns"] is None - or payload["ignore_tables"] is None + payload["db_patterns"] is None + or payload["ignore_dbs"] is None + or payload["table_patterns"] is None + or payload["ignore_tables"] is None ): raise Exception("bad nsFilter {}".format(payload)) return True @@ -574,7 +700,7 @@ def from_hosts(iplist: List, bk_cloud_id: int) -> List: return instance_list @staticmethod - def append_password(nodes: List, username: str): + def append_password(nodes: List, username: str, allow_empty_password: bool = False): """ 为每个节点添加密码 """