From 2e6ee7f7a6f8e82f742f348704e89bfab0473e21 Mon Sep 17 00:00:00 2001 From: OMG-By <504094596@qq.com> Date: Wed, 25 Oct 2023 16:01:00 +0800 Subject: [PATCH] =?UTF-8?q?feat(redis=5Fmigrate):=20redis=20migrate=20?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scene/redis/redis_cluster_apply_flow.py | 2 +- .../redis/redis_cluster_migrate_compair.py | 119 +++++ .../scene/redis/redis_cluster_migrate_load.py | 411 ++++++++++++++++++ .../redis/redis_cluster_migrate_precheck.py | 158 +++++++ .../backend/flow/engine/controller/redis.py | 24 + dbm-ui/backend/flow/urls.py | 6 + .../flow/utils/redis/redis_act_playload.py | 14 +- .../backend/flow/utils/redis/redis_db_meta.py | 134 +++++- dbm-ui/backend/flow/views/redis_cluster.py | 36 ++ dbm-ui/backend/ticket/constants.py | 1 + 10 files changed, 887 insertions(+), 18 deletions(-) create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_compair.py create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_load.py create mode 100644 dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_precheck.py diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_apply_flow.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_apply_flow.py index 98fb474887..f951b229e7 100644 --- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_apply_flow.py +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_apply_flow.py @@ -281,7 +281,7 @@ def deploy_redis_cluster_flow(self): "db_version": self.data["db_version"], "domain_name": self.data["domain_name"], } - if self.data["cluster_type"] != ClusterType.TwemproxyTendisSSDInstance.value: + if self.data["cluster_type"] == ClusterType.TendisTwemproxyRedisInstance.value: act_kwargs.cluster["conf"]["cluster-enabled"] = ClusterStatus.REDIS_CLUSTER_NO act_kwargs.get_redis_payload_func = RedisActPayload.set_redis_config.__name__ diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_compair.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_compair.py new file mode 100644 index 0000000000..ee33db1fe5 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_compair.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging.config +from collections import defaultdict +from typing import Dict, Optional + +from backend.db_meta import api +from backend.db_meta.models import CLBEntryDetail, Cluster + +logger = logging.getLogger("flow") + + +class RedisClusterMigrateCompairFlow(object): + """ + 迁移后数据对比 + """ + + def __init__(self, root_id: str, data: Optional[Dict]): + """ + @param root_id : 任务流程定义的root_id + @param data : 单据传递过来的参数列表,是dict格式 + """ + self.root_id = root_id + self.data = data + + def __check(self, data: dict): + for param in data["clusters"]: + # 处理input + input_seg_dict = defaultdict(dict) + input_proxy_list = [] + for backend in param["backends"]: + input_seg_dict[backend["shard"]] = { + "master": "{}:{}".format(backend["nodes"]["master"]["ip"], backend["nodes"]["master"]["port"]), + "slave": "{}:{}".format(backend["nodes"]["slave"]["ip"], backend["nodes"]["slave"]["port"]), + } + for proxy in param["proxies"]: + input_proxy_list.append("{}:{}".format(proxy["ip"], proxy["port"])) + + # 获取db元数据 + domain = param["clusterinfo"]["immute_domain"] + c = Cluster.objects.get(immute_domain=domain) + cluster = api.cluster.nosqlcomm.other.get_cluster_detail(c.id)[0] + db_proxy_list = cluster["twemproxy_set"] + redis_master_set = cluster["redis_master_set"] + redis_slave_set = cluster["redis_slave_set"] + db_seg_dict = defaultdict(dict) + for node_info in redis_master_set: + ins = str.split(node_info, " ")[0] + seg = str.split(node_info, " ")[1] + db_seg_dict[seg]["master"] = ins + for node_info in redis_slave_set: + ins = str.split(node_info, " ")[0] + seg = str.split(node_info, " ")[1] + db_seg_dict[seg]["slave"] = ins + self.__check_machine(input_proxy_list, input_seg_dict, db_proxy_list, db_seg_dict) + + if param["entry"]["clb"]: + clb_ip = cluster["clusterentry_set"]["clbDns"][0] + clb_info = CLBEntryDetail.objects.filter(clb_ip=clb_ip).values()[0] + db_clb = { + "ip": clb_ip, + "id": clb_info["clb_id"], + "listener_id": clb_info["listener_id"], + "region": clb_info["clb_region"], + } + # clb_domain属于域名检查项,不在这里检查 + param["entry"]["clb"].pop("clb_domain", None) + self.__check_dict_info(param["entry"]["clb"], db_clb, "clb info") + if param["entry"]["polairs"]: + polairs_info = cluster["clusterentry_set"]["polaris"][0] + db_polairs = { + "name": polairs_info["polaris_name"], + "l5": polairs_info["polaris_l5"], + "token": polairs_info["polaris_token"], + } + self.__check_dict_info(param["entry"]["polairs"], db_polairs, "polaris info") + + def __check_machine(self, input_proxy_list, input_seg_dict, db_proxy_list, db_seg_dict): + if len(input_proxy_list) != len(db_proxy_list): + raise Exception("proxy num is diff") + input_proxy_list.sort() + db_proxy_list.sort() + for index, proxy_ins in enumerate(input_proxy_list): + if db_proxy_list[index] != proxy_ins: + raise Exception("{} is not in db".format(proxy_ins)) + for seg, ins in input_seg_dict.items(): + if not db_seg_dict.get(seg): + raise Exception("seg[{}] is not exist in db info]".format(seg)) + if ins["master"] != db_seg_dict[seg]["master"]: + raise Exception( + "seg[{}] master is diff. input[{}],db[{}]".format(seg, ins["master"], db_seg_dict[seg]["master"]) + ) + if ins["slave"] != db_seg_dict[seg]["slave"]: + raise Exception( + "seg[{}] slave is diff. input[{}],db[{}]".format(seg, ins["slave"], db_seg_dict[seg]["slave"]) + ) + + def __check_dict_info(self, input_dict: dict, db_dict: dict, notice): + for k, v in input_dict.items(): + if not db_dict.get(k): + raise Exception("seg[{}] is not exist in db info]".format(k)) + if v != db_dict[k]: + raise Exception("seg[{}] {}. input[{}],db[{}]".format(k, notice, v, db_dict[k])) + + def redis_cluster_migrate_compair(self): + """ + 实例列表、seg + 主从关系 + proxy列表 + """ + self.__check(self.data) diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_load.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_load.py new file mode 100644 index 0000000000..1bd8a14e39 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_load.py @@ -0,0 +1,411 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import copy +import logging.config +from collections import defaultdict +from dataclasses import asdict +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.configuration.constants import DBType +from backend.db_meta.enums import InstanceRole +from backend.db_meta.enums.cluster_type import ClusterType +from backend.db_meta.enums.machine_type import MachineType +from backend.db_meta.models import AppCache, Spec +from backend.flow.consts import ClusterStatus, InstanceStatus +from backend.flow.engine.bamboo.scene.common.builder import Builder, SubBuilder +from backend.flow.engine.bamboo.scene.common.get_file_list import GetFileList +from backend.flow.plugins.components.collections.redis.exec_actuator_script import ExecuteDBActuatorScriptComponent +from backend.flow.plugins.components.collections.redis.get_redis_payload import GetRedisActPayloadComponent +from backend.flow.plugins.components.collections.redis.redis_config import RedisConfigComponent +from backend.flow.plugins.components.collections.redis.redis_db_meta import RedisDBMetaComponent +from backend.flow.plugins.components.collections.redis.trans_flies import TransFileComponent +from backend.flow.utils.redis.redis_act_playload import RedisActPayload +from backend.flow.utils.redis.redis_context_dataclass import ActKwargs, CommonContext +from backend.flow.utils.redis.redis_db_meta import RedisDBMeta +from backend.flow.utils.redis.redis_proxy_util import get_cache_backup_mode + +logger = logging.getLogger("flow") + + +class RedisClusterMigrateLoadFlow(object): + """ + redis清档 + """ + + def __init__(self, root_id: str, data: Optional[Dict]): + """ + @param root_id : 任务流程定义的root_id + @param data : 单据传递过来的参数列表,是dict格式 + """ + self.root_id = root_id + self.data = data + + def __dispose_cluster_params(self, cluster_info: dict) -> dict: + """ + 处理集群参数 + 返回需要的处理后的数据格式 + """ + master_ips = [] + slave_ips = [] + spec_id_dict = {} + seg_dict = {} + role_dict = {} + ip_port_dict = defaultdict(list) + server_shards = defaultdict(dict) + repl_list = [] + proxy_ips = [proxy["ip"] for proxy in cluster_info["proxies"]] + proxy_spec_id = cluster_info["proxies"][0]["spec_id"] + + proxy_port = cluster_info["proxies"][0]["port"] + for backend in cluster_info["backends"]: + mip = backend["nodes"]["master"]["ip"] + sip = backend["nodes"]["slave"]["ip"] + mport = backend["nodes"]["master"]["port"] + sport = backend["nodes"]["slave"]["port"] + master_ips.append(mip) + slave_ips.append(sip) + ip_port_dict[mip].append(mport) + ip_port_dict[sip].append(sport) + spec_id_dict[mip] = backend["nodes"]["master"]["spec_id"] + spec_id_dict[sip] = backend["nodes"]["slave"]["spec_id"] + role_dict[mip] = "new_master_ips" + role_dict[sip] = "new_slave_ips" + seg_dict["{}:{}".format(mip, mport)] = backend["shard"] + server_shards[mip]["{}:{}".format(mip, mport)] = backend["shard"] + server_shards[sip]["{}:{}".format(sip, mport)] = backend["shard"] + repl_list.append({"master_ip": mip, "master_port": int(mport), "slave_ip": sip, "slave_port": int(sport)}) + + return { + "spec_id_dict": spec_id_dict, + "seg_dict": seg_dict, + "repl_list": repl_list, + "role_dict": role_dict, + "ip_port_dict": dict(ip_port_dict), + "proxy_ips": proxy_ips, + "proxy_port": proxy_port, + "proxy_spec_id": proxy_spec_id, + "proxy_spec_config": Spec.objects.get(spec_id=proxy_spec_id).get_spec_info(), + "master_ips": list(set(master_ips)), + "slave_ips": list(set(slave_ips)), + "server_shards": dict(server_shards), + } + + def redis_cluster_migrate_load_flow(self): + """ + if dbha: + 只更新元数据 + 1、写元数据(这里地方挪了cc) + 1.1 proxy元数据 + 1.2 redis元数据 + 1.3 集群元数据 + 2、写配置文件 + 3、if 安装dbmon + """ + app = AppCache.get_app_attr(self.data["bk_biz_id"], "db_app_abbr") + app_name = AppCache.get_app_attr(self.data["bk_biz_id"], "bk_biz_name") + ins_status = InstanceStatus.UNAVAILABLE + if self.data["migrate_ctl"]["dbha"]: + ins_status = InstanceStatus.RUNNING + # 暂时只支持twemproxy的迁移 + proxy_type = MachineType.TWEMPROXY.value + redis_pipeline = Builder(root_id=self.root_id, data=self.data) + act_kwargs = ActKwargs() + act_kwargs.set_trans_data_dataclass = CommonContext.__name__ + act_kwargs.is_update_trans_data = True + cluster_tpl = { + "created_by": self.data["created_by"], + "bk_biz_id": self.data["bk_biz_id"], + "bk_cloud_id": self.data["bk_cloud_id"], + } + + sub_pipelines = [] + for params in self.data["clusters"]: + cluster = self.__dispose_cluster_params(params) + sub_pipeline = SubBuilder(root_id=self.root_id, data=self.data) + act_kwargs.cluster = { + "cluster_type": params["clusterinfo"]["cluster_type"], + "db_version": params["clusterinfo"]["db_version"], + } + + if ins_status == InstanceStatus.RUNNING: + acts_list = [] + for redis_ip in cluster["master_ips"] + cluster["slave_ips"]: + act_kwargs.cluster["meta_update_ip"] = redis_ip + act_kwargs.cluster["meta_update_ports"] = cluster["ip_port_dict"][redis_ip] + act_kwargs.cluster["meta_update_status"] = InstanceStatus.RUNNING + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.instances_status_update.__name__ + acts_list.append( + { + "act_name": _("{}-更新redis状态".format(redis_ip)), + "act_component_code": RedisDBMetaComponent.code, + "kwargs": asdict(act_kwargs), + }, + ) + for proxy_ip in cluster["proxy_ips"]: + act_kwargs.cluster["meta_update_ip"] = proxy_ip + act_kwargs.cluster["meta_update_ports"] = [cluster["proxy_port"]] + act_kwargs.cluster["meta_update_status"] = InstanceStatus.RUNNING + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.instances_status_update.__name__ + acts_list.append( + { + "act_name": _("{}-更新proxy状态".format(proxy_ip)), + "act_component_code": RedisDBMetaComponent.code, + "kwargs": asdict(act_kwargs), + }, + ) + sub_pipeline.add_parallel_acts(acts_list) + sub_pipelines.append( + sub_pipeline.build_sub_process( + sub_name=_("{}更新状态子任务").format(params["clusterinfo"]["immute_domain"]) + ) + ) + continue + + sub_pipeline.add_act( + act_name=_("初始化配置"), act_component_code=GetRedisActPayloadComponent.code, kwargs=asdict(act_kwargs) + ) + + # 下发介质包 + trans_files = GetFileList(db_type=DBType.Redis) + act_kwargs.file_list = trans_files.redis_dbmon() + act_kwargs.exec_ip = cluster["proxy_ips"] + cluster["master_ips"] + cluster["slave_ips"] + sub_pipeline.add_act( + act_name=_("下发介质包"), + act_component_code=TransFileComponent.code, + kwargs=asdict(act_kwargs), + ) + + # 写配置文件 begin + acts_list = [] + act_kwargs.cluster = { + "conf": { + "maxmemory": str(params["config"]["maxmemory"]), + "databases": str(params["config"]["databases"]), + "requirepass": params["config"]["requirepass"], + }, + "db_version": params["clusterinfo"]["db_version"], + "domain_name": params["clusterinfo"]["immute_domain"], + } + if params["clusterinfo"]["cluster_type"] == ClusterType.TendisTwemproxyRedisInstance.value: + act_kwargs.cluster["conf"]["cluster-enabled"] = ClusterStatus.REDIS_CLUSTER_NO + + act_kwargs.get_redis_payload_func = RedisActPayload.set_redis_config.__name__ + acts_list.append( + { + "act_name": _("回写集群配置[Redis]"), + "act_component_code": RedisConfigComponent.code, + "kwargs": asdict(act_kwargs), + }, + ) + + act_kwargs.cluster = { + "conf": { + "password": params["config"]["proxypass"], + "redis_password": params["config"]["requirepass"], + "port": str(cluster["proxy_port"]), + }, + "domain_name": params["clusterinfo"]["immute_domain"], + } + act_kwargs.get_redis_payload_func = RedisActPayload.set_proxy_config.__name__ + acts_list.append( + { + "act_name": _("回写集群配置[Twemproxy]"), + "act_component_code": RedisConfigComponent.code, + "kwargs": asdict(act_kwargs), + } + ) + sub_pipeline.add_parallel_acts(acts_list) + # 写配置文件 end + + # proxy 相关操作 + act_kwargs.cluster = copy.deepcopy(cluster_tpl) + act_kwargs.cluster["machine_type"] = proxy_type + act_kwargs.cluster["cluster_type"] = params["clusterinfo"]["cluster_type"] + # proxy元数据 - 批量写入 + act_kwargs.cluster["ins_status"] = ins_status + act_kwargs.cluster["new_proxy_ips"] = cluster["proxy_ips"] + act_kwargs.cluster["port"] = cluster["proxy_port"] + act_kwargs.cluster["spec_id"] = cluster["proxy_spec_id"] + act_kwargs.cluster["spec_config"] = cluster["proxy_spec_config"] + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.proxy_install.__name__ + sub_pipeline.add_act( + act_name=_("Proxy写入元数据"), + act_component_code=RedisDBMetaComponent.code, + kwargs=asdict(act_kwargs), + ) + + # proxy 安装dbmon + acts_list = [] + for proxy_ip in cluster["proxy_ips"]: + act_kwargs.cluster["servers"] = [ + { + "app": app, + "app_name": app_name, + "bk_biz_id": str(self.data["bk_biz_id"]), + "bk_cloud_id": int(self.data["bk_cloud_id"]), + "server_ip": proxy_ip, + "server_ports": [cluster["proxy_port"]], + "meta_role": proxy_type, + "cluster_domain": params["clusterinfo"]["immute_domain"], + "cluster_name": params["clusterinfo"]["name"], + "cluster_type": params["clusterinfo"]["cluster_type"], + } + ] + act_kwargs.exec_ip = proxy_ip + act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ + + acts_list.append( + { + "act_name": _("Proxy{}-安装监控").format(proxy_ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + }, + ) + sub_pipeline.add_parallel_acts(acts_list) + + # redis 相关操作 + # 写入元数据,后面实例的规格可能不一样,所以不能批量写入 + acts_list = [] + for redis_ip in cluster["master_ips"] + cluster["slave_ips"]: + act_kwargs.cluster = copy.deepcopy(cluster_tpl) + act_kwargs.cluster["cluster_type"] = params["clusterinfo"]["cluster_type"] + act_kwargs.cluster["ins_status"] = ins_status + + act_kwargs.cluster[cluster["role_dict"][redis_ip]] = [redis_ip] + act_kwargs.cluster["ports"] = cluster["ip_port_dict"][redis_ip] + act_kwargs.cluster["spec_id"] = cluster["spec_id_dict"][redis_ip] + act_kwargs.cluster["spec_config"] = Spec.objects.get( + spec_id=cluster["spec_id_dict"][redis_ip] + ).get_spec_info() + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.redis_install.__name__ + acts_list.append( + { + "act_name": _("Redis-{}-写入元数据").format(redis_ip), + "act_component_code": RedisDBMetaComponent.code, + "kwargs": asdict(act_kwargs), + }, + ) + sub_pipeline.add_parallel_acts(acts_list) + # 安装dbmon + acts_list = [] + for redis_ip in cluster["master_ips"]: + act_kwargs.cluster["servers"] = [ + { + "app": app, + "app_name": app_name, + "bk_biz_id": str(self.data["bk_biz_id"]), + "bk_cloud_id": int(self.data["bk_cloud_id"]), + "server_ip": redis_ip, + "server_ports": cluster["ip_port_dict"][redis_ip], + "meta_role": InstanceRole.REDIS_MASTER.value, + "cluster_domain": params["clusterinfo"]["immute_domain"], + "cluster_name": params["clusterinfo"]["name"], + "cluster_type": params["clusterinfo"]["cluster_type"], + "server_shards": cluster["server_shards"][redis_ip], + "cache_backup_mode": get_cache_backup_mode(self.data["bk_biz_id"], 0), + } + ] + act_kwargs.exec_ip = redis_ip + act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ + acts_list.append( + { + "act_name": _("Redis{}-安装监控").format(redis_ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + }, + ) + for redis_ip in cluster["slave_ips"]: + act_kwargs.cluster["servers"] = [ + { + "app": app, + "app_name": app_name, + "bk_biz_id": str(self.data["bk_biz_id"]), + "bk_cloud_id": int(self.data["bk_cloud_id"]), + "server_ip": redis_ip, + "server_ports": cluster["ip_port_dict"][redis_ip], + "meta_role": InstanceRole.REDIS_SLAVE.value, + "cluster_domain": params["clusterinfo"]["immute_domain"], + "cluster_name": params["clusterinfo"]["name"], + "cluster_type": params["clusterinfo"]["cluster_type"], + "server_shards": cluster["server_shards"][redis_ip], + "cache_backup_mode": get_cache_backup_mode(self.data["bk_biz_id"], 0), + } + ] + act_kwargs.exec_ip = redis_ip + act_kwargs.get_redis_payload_func = RedisActPayload.bkdbmon_install.__name__ + acts_list.append( + { + "act_name": _("Redis{}-安装监控").format(redis_ip), + "act_component_code": ExecuteDBActuatorScriptComponent.code, + "kwargs": asdict(act_kwargs), + }, + ) + sub_pipeline.add_parallel_acts(acts_list) + # 建立主从关系 + act_kwargs.cluster = { + "repl": cluster["repl_list"], + "created_by": self.data["created_by"], + "meta_func_name": RedisDBMeta.replicaof_link.__name__, + } + sub_pipeline.add_act( + act_name=_("redis建立主从 元数据"), act_component_code=RedisDBMetaComponent.code, kwargs=asdict(act_kwargs) + ) + + # 建立集群关系 + servers = [] + for ins, seg in cluster["seg_dict"].items(): + servers.append("{} {} {} {}".format(ins, params["clusterinfo"]["name"], seg, 1)) + act_kwargs.cluster = { + "new_proxy_ips": cluster["proxy_ips"], + "servers": servers, + "proxy_port": cluster["proxy_port"], + "cluster_type": params["clusterinfo"]["cluster_type"], + "bk_biz_id": self.data["bk_biz_id"], + "bk_cloud_id": self.data["bk_cloud_id"], + "cluster_name": params["clusterinfo"]["name"], + "cluster_alias": params["clusterinfo"]["alias"], + "db_version": params["clusterinfo"]["db_version"], + "immute_domain": params["clusterinfo"]["immute_domain"], + "created_by": self.data["created_by"], + "region": params["clusterinfo"]["region"], + "meta_func_name": RedisDBMeta.redis_make_cluster.__name__, + } + sub_pipeline.add_act( + act_name=_("建立集群 元数据"), act_component_code=RedisDBMetaComponent.code, kwargs=asdict(act_kwargs) + ) + + # clb、北极星需要写元数据 + if len(params["entry"]["clb"]) != 0: + act_kwargs.cluster = params["entry"]["clb"] + act_kwargs.cluster["bk_cloud_id"] = self.data["bk_cloud_id"] + act_kwargs.cluster["immute_domain"] = params["clusterinfo"]["immute_domain"] + act_kwargs.cluster["created_by"] = self.data["created_by"] + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.add_clb_domain.__name__ + sub_pipeline.add_act( + act_name=_("clb元数据写入"), act_component_code=RedisDBMetaComponent.code, kwargs=asdict(act_kwargs) + ) + + if len(params["entry"]["polairs"]) != 0: + act_kwargs.cluster = params["entry"]["polairs"] + act_kwargs.cluster["bk_cloud_id"] = self.data["bk_cloud_id"] + act_kwargs.cluster["immute_domain"] = params["clusterinfo"]["immute_domain"] + act_kwargs.cluster["created_by"] = self.data["created_by"] + act_kwargs.cluster["meta_func_name"] = RedisDBMeta.add_polairs_domain.__name__ + sub_pipeline.add_act( + act_name=_("polairs元数据写入"), act_component_code=RedisDBMetaComponent.code, kwargs=asdict(act_kwargs) + ) + sub_pipelines.append( + sub_pipeline.build_sub_process(sub_name=_("{}迁移子任务").format(params["clusterinfo"]["immute_domain"])) + ) + redis_pipeline.add_parallel_sub_pipeline(sub_flow_list=sub_pipelines) + redis_pipeline.run_pipeline() diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_precheck.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_precheck.py new file mode 100644 index 0000000000..043c39fc22 --- /dev/null +++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_migrate_precheck.py @@ -0,0 +1,158 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-DB管理系统(BlueKing-BK-DBM) available. +Copyright (C) 2017-2023 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import logging.config +from collections import defaultdict +from dataclasses import asdict +from typing import Dict, Optional + +from django.utils.translation import ugettext as _ + +from backend.components import DBConfigApi +from backend.components.db_name_service.client import NameServiceApi +from backend.components.dbconfig.constants import FormatType, LevelName +from backend.components.gcs_dns.client import GcsDnsApi +from backend.configuration.constants import DBType +from backend.db_meta.enums import InstanceRole +from backend.db_meta.models import Cluster, Machine +from backend.flow.consts import DEFAULT_TWEMPROXY_SEG_TOTOL_NUM, ConfigTypeEnum + +logger = logging.getLogger("flow") + + +class RedisClusterMigratePrecheckFlow(object): + """ + redis集群迁移前置检查 + """ + + def __init__(self, root_id: str, data: Optional[Dict]): + """ + @param root_id : 任务流程定义的root_id + @param data : 单据传递过来的参数列表,是dict格式 + """ + self.root_id = root_id + self.data = data + + def __check(self, data: dict): + for cluster_node in data["clusters"]: + domain = cluster_node["clusterinfo"]["immute_domain"] + db_version = cluster_node["clusterinfo"]["db_version"] + + proxy_ips = [proxy["ip"] for proxy in cluster_node["proxies"]] + proxy_ports = [proxy["port"] for proxy in cluster_node["proxies"]] + if len(set(proxy_ports)) != 1: + raise Exception("have more diff port {}".format(set(proxy_ports))) + all_ins = [] + master_ips = [] + slave_ips = [] + all_seg = [] + for backend in cluster_node["backends"]: + all_seg.append(backend["shard"]) + mip = backend["nodes"]["master"]["ip"] + sip = backend["nodes"]["slave"]["ip"] + all_ins.append("{}:{}".format(mip, backend["nodes"]["master"]["port"])) + all_ins.append("{}:{}".format(sip, backend["nodes"]["slave"]["port"])) + master_ips.append(mip) + slave_ips.append(sip) + # 去重 + master_ips = list(set(master_ips)) + slave_ips = list(set(slave_ips)) + all_ips = master_ips + slave_ips + proxy_ips + self.__check_meta(all_ips, domain) + self.__check_params(all_ins, all_seg) + + dbconfig = { + "bk_biz_id": str(data["bk_biz_id"]), + "level_name": LevelName.APP, + "level_value": str(data["bk_biz_id"]), + "conf_file": db_version, + "conf_type": ConfigTypeEnum.DBConf, + "namespace": cluster_node["clusterinfo"]["cluster_type"], + "format": FormatType.MAP, + } + self.__check_other_module(domain, cluster_node["entry"]["clb"], cluster_node["entry"]["polairs"], dbconfig) + + def __check_meta(self, ips: list, domain: str): + """ + 机器有没有被复用 + db_meta_machine: 查机器 + db_meta_cluster: 查域名 + db_meta_proxyinstance: 查proxy实例 有外键,不需要检查 + db_meta_storageinstance: 查redis实例 有外键,不需要检查 + """ + if len(ips) != len(set(ips)): + raise Exception("ip have repetition") + d = Cluster.objects.filter(immute_domain=domain).values("immute_domain") + if len(d) != 0: + raise Exception("domain cluster [{}] is exist.".format(domain)) + m = Machine.objects.filter(ip__in=ips).values("ip") + if len(m) != 0: + raise Exception("[{}] is used.".format(m)) + + def __check_other_module(self, domain: str, clb: dict, polairs: dict, dbconfig: dict): + """ + 1、域名系统查域名是否已存在 + 2、clb、北极星是否已存在 + 3、dbconfig中是否已存在对应的db_version模板 + """ + data = DBConfigApi.query_conf_item(params=dbconfig) + if len(data["content"]) == 0: + raise Exception("db_version config is not definition") + + res = GcsDnsApi.get_domain({"domain_name": f"{domain}."}) + if len(res["detail"]) != 0: + raise Exception("domain_name {} is exist".format(domain)) + if len(clb) != 0: + res = NameServiceApi.clb_get_target_private_ips( + {"region": clb["region"], "loadbalancerid": clb["id"], "listenerid": clb["listener_id"]} + ) + if len(res) == 0: + raise Exception("clb {} is not exist".format(clb["listener_id"])) + + if len(polairs) != 0: + res = NameServiceApi.polaris_describe_targets({"servicename": polairs["name"]}) + if len(res) == 0: + raise Exception("polairs {} is not exist".format(polairs["name"])) + + def __check_params(self, all_ins: list, all_seg: list): + """ + 1、实例是否有重复 + 2、seg是否有重复和全覆盖 + """ + if len(all_ins) != len(set(all_ins)): + raise Exception("ips have repetition") + + begin_seg_list = [] + end_seg_list = [] + try: + for seg in all_seg: + begin_seg_list.append(int(str.split(seg, "-")[0])) + end_seg_list.append(int(str.split(seg, "-")[1])) + + begin_seg_list.sort() + end_seg_list.sort() + + seg_num = len(begin_seg_list) + if begin_seg_list[0] != 0 or end_seg_list[seg_num - 1] != DEFAULT_TWEMPROXY_SEG_TOTOL_NUM - 1: + raise Exception("redis set is not cover all [{} or {}]".format(0, DEFAULT_TWEMPROXY_SEG_TOTOL_NUM - 1)) + for index in (1, seg_num - 1): + if begin_seg_list[index] != end_seg_list[index - 1] + 1: + raise Exception( + "redis set is not cover all [{} ~ {}]".format(begin_seg_list[index], end_seg_list[index - 1]) + ) + + except ImportError as exc: + raise exc + + def redis_cluster_migrate_precheck_flow(self): + """ + 集群迁移前置检查 + """ + self.__check(self.data) diff --git a/dbm-ui/backend/flow/engine/controller/redis.py b/dbm-ui/backend/flow/engine/controller/redis.py index 44bd2abc49..ede7297ebe 100644 --- a/dbm-ui/backend/flow/engine/controller/redis.py +++ b/dbm-ui/backend/flow/engine/controller/redis.py @@ -18,6 +18,9 @@ from backend.flow.engine.bamboo.scene.redis.redis_cluster_instance_shutdown import ( RedisClusterInstanceShutdownSceneFlow, ) +from backend.flow.engine.bamboo.scene.redis.redis_cluster_migrate_compair import RedisClusterMigrateCompairFlow +from backend.flow.engine.bamboo.scene.redis.redis_cluster_migrate_load import RedisClusterMigrateLoadFlow +from backend.flow.engine.bamboo.scene.redis.redis_cluster_migrate_precheck import RedisClusterMigratePrecheckFlow from backend.flow.engine.bamboo.scene.redis.redis_cluster_open_close import RedisClusterOpenCloseFlow from backend.flow.engine.bamboo.scene.redis.redis_cluster_scene_auotfix import RedisClusterAutoFixSceneFlow from backend.flow.engine.bamboo.scene.redis.redis_cluster_scene_cmr import RedisClusterCMRSceneFlow @@ -232,6 +235,27 @@ def redis_cluster_add_slave(self): flow = RedisClusterAddSlaveFlow(root_id=self.root_id, data=self.ticket_data) flow.add_slave_flow() + def redis_cluster_migrate_precheck(self): + """ + redis迁移前置检查 + """ + flow = RedisClusterMigratePrecheckFlow(root_id=self.root_id, data=self.ticket_data) + flow.redis_cluster_migrate_precheck_flow() + + def redis_cluster_migrate_load(self): + """ + redis迁移 + """ + flow = RedisClusterMigrateLoadFlow(root_id=self.root_id, data=self.ticket_data) + flow.redis_cluster_migrate_load_flow() + + def redis_cluster_migrate_compair(self): + """ + redis迁移后置验证 + """ + flow = RedisClusterMigrateCompairFlow(root_id=self.root_id, data=self.ticket_data) + flow.redis_cluster_migrate_compair() + def redis_cluster_version_update_online(self): """ redis 集群版本在线升级 diff --git a/dbm-ui/backend/flow/urls.py b/dbm-ui/backend/flow/urls.py index 7bda501844..9739bb2e1a 100644 --- a/dbm-ui/backend/flow/urls.py +++ b/dbm-ui/backend/flow/urls.py @@ -114,6 +114,9 @@ RedisClusterBackupSceneApiView, RedisClusterDataCheckRepairApiView, RedisClusterDataCopySceneApiView, + RedisClusterMigrateCompair, + RedisClusterMigrateLoad, + RedisClusterMigratePrecheck, RedisClusterOpenCloseSceneApiView, RedisClusterShardNumUpdateSceneApiView, RedisClusterShutdownSceneApiView, @@ -199,6 +202,9 @@ url(r"^scene/redis_data_structure$", RedisDataStructureSceneApiView.as_view()), url(r"^scene/redis_data_structure_task_delete$", RedisDataStructureTaskDeleteSceneApiView.as_view()), url(r"^scene/redis_cluster_add_slave$", RedisClusterAddSlaveApiView.as_view()), + url(r"^scene/redis_migrate_precheck$", RedisClusterMigratePrecheck.as_view()), + url(r"^scene/redis_migrate_load$", RedisClusterMigrateLoad.as_view()), + url(r"^scene/redis_migrate_compair$", RedisClusterMigrateCompair.as_view()), url(r"^scene/redis_cluster_version_update_online$", RedisClusterVersionUpdateOnlineApiView.as_view()), # redis api url end # dns api diff --git a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py index 3367d4c63e..7081b05d6a 100644 --- a/dbm-ui/backend/flow/utils/redis/redis_act_playload.py +++ b/dbm-ui/backend/flow/utils/redis/redis_act_playload.py @@ -69,6 +69,7 @@ TicketType.REDIS_CLUSTER_CUTOFF.value, TicketType.REDIS_CLUSTER_ADD_SLAVE.value, ] +migrate_list = [TicketType.TENDIS_META_MITRATE.value] tool_list = [TicketType.REDIS_DATA_STRUCTURE.value, TicketType.REDIS_DATA_STRUCTURE_TASK_DELETE.value] twemproxy_cluster_type_list = [ ClusterType.TendisTwemproxyRedisInstance.value, @@ -104,12 +105,15 @@ def __init__(self, ticket_data: dict, cluster: dict): ) self.__init_dbconfig_params() - if self.ticket_data["ticket_type"] in apply_list + cutoff_list + tool_list: + if self.ticket_data["ticket_type"] in apply_list + cutoff_list + tool_list + migrate_list: self.account = self.__get_define_config(NameSpaceEnum.Common, ConfigFileEnum.OS, ConfigTypeEnum.OSConf) - if "db_version" in self.ticket_data: - self.init_redis_config = self.__get_define_config( - self.namespace, self.ticket_data["db_version"], ConfigTypeEnum.DBConf - ) + db_version = "" + if "db_version" in self.cluster: + db_version = self.cluster["db_version"] + elif "db_version" in self.ticket_data: + db_version = self.ticket_data["db_version"] + if db_version != "": + self.init_redis_config = self.__get_define_config(self.namespace, db_version, ConfigTypeEnum.DBConf) self.init_proxy_config = self.__get_define_config( self.namespace, self.proxy_version, ConfigTypeEnum.ProxyConf ) diff --git a/dbm-ui/backend/flow/utils/redis/redis_db_meta.py b/dbm-ui/backend/flow/utils/redis/redis_db_meta.py index 4e7a625c56..7635a499b9 100644 --- a/dbm-ui/backend/flow/utils/redis/redis_db_meta.py +++ b/dbm-ui/backend/flow/utils/redis/redis_db_meta.py @@ -33,7 +33,16 @@ InstanceRole, MachineType, ) -from backend.db_meta.models import Cluster, ClusterEntry, Machine, ProxyInstance, StorageInstance, StorageInstanceTuple +from backend.db_meta.models import ( + CLBEntryDetail, + Cluster, + ClusterEntry, + Machine, + PolarisEntryDetail, + ProxyInstance, + StorageInstance, + StorageInstanceTuple, +) from backend.db_services.dbbase.constants import IP_PORT_DIVIDER, SPACE_DIVIDER from backend.db_services.redis.rollback.models import TbTendisRollbackTasks from backend.flow.consts import DEFAULT_DB_MODULE_ID, ConfigFileEnum, ConfigTypeEnum, InstanceStatus @@ -92,12 +101,13 @@ def proxy_install(self) -> bool: proxies.append({"ip": ip, "port": self.cluster["port"]}) with atomic(): + ins_status = InstanceStatus.RUNNING + if self.cluster.get("ins_status"): + ins_status = self.cluster["ins_status"] api.machine.create( machines=machines, creator=self.cluster["created_by"], bk_cloud_id=self.cluster["bk_cloud_id"] ) - api.proxy_instance.create( - proxies=proxies, creator=self.cluster["created_by"], status=InstanceStatus.RUNNING - ) + api.proxy_instance.create(proxies=proxies, creator=self.cluster["created_by"], status=ins_status) return True def proxy_add_cluster(self) -> bool: @@ -190,9 +200,13 @@ def redis_install(self) -> bool: "spec_config": self.cluster["spec_config"], } ) - for n in range(0, self.cluster["inst_num"]): - port = n + self.cluster["start_port"] - ins.append({"ip": ip, "port": port, "instance_role": InstanceRole.REDIS_MASTER.value}) + if self.cluster.get("inst_num"): + for n in range(0, self.cluster["inst_num"]): + port = n + self.cluster["start_port"] + ins.append({"ip": ip, "port": port, "instance_role": InstanceRole.REDIS_MASTER.value}) + elif self.cluster.get("ports"): + for port in self.cluster["ports"]: + ins.append({"ip": ip, "port": port, "instance_role": InstanceRole.REDIS_MASTER.value}) if self.cluster.get("new_slave_ips"): for ip in self.cluster.get("new_slave_ips"): @@ -205,23 +219,38 @@ def redis_install(self) -> bool: "spec_config": self.cluster["spec_config"], } ) - for n in range(0, self.cluster["inst_num"]): - port = n + self.cluster["start_port"] - ins.append({"ip": ip, "port": port, "instance_role": InstanceRole.REDIS_SLAVE.value}) + if self.cluster.get("inst_num"): + for n in range(0, self.cluster["inst_num"]): + port = n + self.cluster["start_port"] + ins.append({"ip": ip, "port": port, "instance_role": InstanceRole.REDIS_SLAVE.value}) + elif self.cluster.get("ports"): + for port in self.cluster["ports"]: + ins.append({"ip": ip, "port": port, "instance_role": InstanceRole.REDIS_SLAVE.value}) with atomic(): bk_cloud_id = 0 + ins_status = InstanceStatus.RUNNING + if self.cluster.get("ins_status"): + ins_status = self.cluster["ins_status"] if "bk_cloud_id" in self.ticket_data: bk_cloud_id = self.ticket_data["bk_cloud_id"] else: bk_cloud_id = self.cluster["bk_cloud_id"] api.machine.create(machines=machines, creator=self.ticket_data["created_by"], bk_cloud_id=bk_cloud_id) - api.storage_instance.create(instances=ins, creator=self.ticket_data["created_by"]) + api.storage_instance.create(instances=ins, creator=self.ticket_data["created_by"], status=ins_status) return True def replicaof(self) -> bool: """ - 批量配置主从关系 + 批量配置主从关系,传参为cluster。 主要是在安装时使用 + "inst_num":xx, + "start_port":xx, + "bacth_pairs": [ + { + "master_ip":xxx, + "slave_ip":xxx, + } + ] """ replic_tuple = [] for pair in self.cluster["bacth_pairs"]: @@ -247,6 +276,33 @@ def replicaof(self) -> bool: api.storage_instance_tuple.create(replic_tuple, creator=self.cluster["created_by"]) return True + def replicaof_link(self) -> bool: + """ + 批量配置主从关系。传参为实例对应关系列表 + [ + { + "master_ip":xxx, + "master_port:xx, + "slave_ip":xxx, + "slave_port":xx + } + ] + """ + replic_tuple = [] + for repl_info in self.cluster["repl"]: + master_ip = repl_info["master_ip"] + master_port = repl_info["master_port"] + slave_ip = repl_info["slave_ip"] + slave_port = repl_info["slave_port"] + replic_tuple.append( + { + "ejector": {"ip": master_ip, "port": master_port}, + "receiver": {"ip": slave_ip, "port": slave_port}, + } + ) + api.storage_instance_tuple.create(replic_tuple, creator=self.cluster["created_by"]) + return True + def redis_make_cluster(self) -> bool: """ 建立集群关系 @@ -577,6 +633,60 @@ def redis_role_swap_4_scene(self) -> bool: return True + def add_clb_domain(self): + """ + 增加clb记录 + """ + entry_type = ClusterEntryType.CLB + if self.cluster["clb_domain"] != "": + entry_type = ClusterEntryType.CLBDNS + cluster = Cluster.objects.get( + bk_cloud_id=self.cluster["bk_cloud_id"], immute_domain=self.cluster["immute_domain"] + ) + cluster_entry = ClusterEntry.objects.create( + cluster=cluster, + cluster_entry_type=entry_type, + entry=self.cluster["ip"], + creator=self.cluster["created_by"], + ) + cluster_entry.save() + clb_entry = CLBEntryDetail.objects.create( + clb_ip=self.cluster["ip"], + clb_id=self.cluster["id"], + listener_id=self.cluster["listener_id"], + clb_region=self.cluster["region"], + entry_id=cluster_entry.id, + creator=self.cluster["created_by"], + ) + clb_entry.save() + + def add_polairs_domain(self): + """ + 增加polairs记录 + """ + cluster = Cluster.objects.get( + bk_cloud_id=self.cluster["bk_cloud_id"], immute_domain=self.cluster["immute_domain"] + ) + cluster_entry = ClusterEntry.objects.create( + cluster=cluster, + cluster_entry_type=ClusterEntryType.POLARIS, + entry=self.cluster["name"], + creator=self.cluster["created_by"], + ) + cluster_entry.save() + alias_token = "" + if self.cluster.get("alias_token"): + alias_token = self.cluster["alias_token"] + polaris_entry = PolarisEntryDetail.objects.create( + polaris_name=self.cluster["name"], + polaris_l5=self.cluster["l5"], + polaris_token=self.cluster["token"], + alias_token=alias_token, + entry_id=cluster_entry.id, + creator=self.cluster["created_by"], + ) + polaris_entry.save() + def tendis_add_clb_domain_4_scene(self): """ 增加CLB 域名 """ cluster = Cluster.objects.get( diff --git a/dbm-ui/backend/flow/views/redis_cluster.py b/dbm-ui/backend/flow/views/redis_cluster.py index 958b1c4c1f..d2eaa112f3 100644 --- a/dbm-ui/backend/flow/views/redis_cluster.py +++ b/dbm-ui/backend/flow/views/redis_cluster.py @@ -564,3 +564,39 @@ def post(request): root_id = uuid.uuid1().hex RedisController(root_id=root_id, ticket_data=request.data).redis_cluster_version_update_online() return Response({"root_id": root_id}) + + +class RedisClusterMigratePrecheck(FlowTestView): + """ + 集群迁移前置检查 + """ + + @staticmethod + def post(request): + root_id = uuid.uuid1().hex + RedisController(root_id=root_id, ticket_data=request.data).redis_cluster_migrate_precheck() + return Response({"root_id": root_id}) + + +class RedisClusterMigrateLoad(FlowTestView): + """ + 集群迁移 + """ + + @staticmethod + def post(request): + root_id = uuid.uuid1().hex + RedisController(root_id=root_id, ticket_data=request.data).redis_cluster_migrate_load() + return Response({"root_id": root_id}) + + +class RedisClusterMigrateCompair(FlowTestView): + """ + 集群迁移数据对比 + """ + + @staticmethod + def post(request): + root_id = uuid.uuid1().hex + RedisController(root_id=root_id, ticket_data=request.data).redis_cluster_migrate_compair() + return Response({"root_id": root_id}) diff --git a/dbm-ui/backend/ticket/constants.py b/dbm-ui/backend/ticket/constants.py index 854ffa098a..68823303f0 100644 --- a/dbm-ui/backend/ticket/constants.py +++ b/dbm-ui/backend/ticket/constants.py @@ -235,6 +235,7 @@ def get_choice_value(cls, label: str) -> str: REDIS_DATACOPY_CHECK_REPAIR = EnumField("REDIS_DATACOPY_CHECK_REPAIR", _("Redis 数据校验与修复")) REDIS_CLUSTER_ADD_SLAVE = EnumField("REDIS_CLUSTER_ADD_SLAVE", _("Redis 新增slave节点")) REDIS_DTS_ONLINE_SWITCH = EnumField("REDIS_DTS_ONLINE_SWITCH", _("Redis DTS在线切换")) + TENDIS_META_MITRATE = EnumField("TENDIS_META_MITRATE", _("Redis 数据迁移")) # 大数据 KAFKA_APPLY = EnumField("KAFKA_APPLY", _("Kafka 集群部署"))