From a327b26c0a28499227ba52833dc91d0c35d5cb70 Mon Sep 17 00:00:00 2001
From: xiepaup
Date: Tue, 15 Oct 2024 11:53:19 +0800
Subject: [PATCH] feat(redis): ticket optimization (autofix, whole-machine replacement) #7346
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 dbm-ui/backend/db_meta/api/cluster/apis.py    | 11 ++++++
 .../redis_clusternodes_update/task.py         | 14 +++++--
 .../db_services/dbresource/handlers.py        | 38 +++++++++++++++----
 .../db_services/redis/autofix/watcher.py      |  9 +++--
 .../scene/redis/redis_cluster_add_slave.py    | 11 +++++-
 .../redis/redis_cluster_scene_auotfix.py      | 10 +++--
 .../scene/redis/redis_cluster_scene_cmr.py    | 24 ++++++++----
 .../scene/redis/redis_cluster_scene_mss.py    | 11 ++++--
 .../backend/flow/utils/redis/redis_db_meta.py | 22 ++++++-----
 9 files changed, 111 insertions(+), 39 deletions(-)

diff --git a/dbm-ui/backend/db_meta/api/cluster/apis.py b/dbm-ui/backend/db_meta/api/cluster/apis.py
index 4fa8e9f839..e79dbd4687 100644
--- a/dbm-ui/backend/db_meta/api/cluster/apis.py
+++ b/dbm-ui/backend/db_meta/api/cluster/apis.py
@@ -52,6 +52,17 @@ def domain_exists(domains: List[str]) -> Dict[str, bool]:
     return res
 
 
+def query_cluster_by_hosts_biz(hosts: List, biz_id: int, cloud_id: int):
+    """
+    Filter the result of query_cluster_by_hosts further by bk_biz_id and bk_cloud_id
+    """
+    clusters = []
+    for cluster in query_cluster_by_hosts(hosts):
+        if cluster["bk_biz_id"] == biz_id and cluster["bk_cloud_id"] == cloud_id:
+            clusters.append(cluster)
+    return clusters
+
+
 def query_cluster_by_hosts(hosts: List):
     """根据提供的IP 查询集群信息
 
diff --git a/dbm-ui/backend/db_periodic_task/local_tasks/redis_clusternodes_update/task.py b/dbm-ui/backend/db_periodic_task/local_tasks/redis_clusternodes_update/task.py
index cc1ca4c27c..5c47faafbd 100644
--- a/dbm-ui/backend/db_periodic_task/local_tasks/redis_clusternodes_update/task.py
+++ b/dbm-ui/backend/db_periodic_task/local_tasks/redis_clusternodes_update/task.py
@@ -126,6 +126,7 @@ def redis_clusternodes_update_record():
     oper_type = ""
     for oper in opers:
         if oper["ticket_type"] and oper["ticket_type"] in [
+            TicketType.REDIS_CLUSTER_AUTOFIX.value,
             TicketType.REDIS_SCALE_UPDOWN.value,
             TicketType.REDIS_CLUSTER_CUTOFF.value,
             TicketType.REDIS_CLUSTER_INSTANCE_SHUTDOWN.value,
@@ -369,9 +370,16 @@ def update_cc_info(self, cluster: Cluster):
     # 发起自愈流程
     def start_autofix_flow(self, cluster: Cluster):
         # 如果存在集群中某个slave机器,所有实例均变成了 unrunning 状态,则对其发起自愈流程
-        slave_group_by_ip = defaultdict(list)
-        for slave_obj in cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_SLAVE.value):
-            slave_group_by_ip[slave_obj.machine.ip].append(slave_obj)
+        slave_group_by_ip, nodes_by_ip = defaultdict(list), defaultdict(list)
+        for node_obj in cluster.storageinstance_set.all():
+            nodes_by_ip[node_obj.machine.ip].append(node_obj)
+        for ip, objs in nodes_by_ip.items():
+            for obj in objs:  # track the whole machine as soon as one of its instances has the slave role
+                if obj.instance_role == InstanceRole.REDIS_SLAVE.value:
+                    slave_group_by_ip[ip] = objs
+                    break
+        # for slave_obj in cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_SLAVE.value):
+        #     slave_group_by_ip[slave_obj.machine.ip].append(slave_obj)
         fault_machines = []
         for ip, slave_objs in slave_group_by_ip.items():
             if all([slave_obj.status == InstanceStatus.UNAVAILABLE.value for slave_obj in slave_objs]):
diff --git a/dbm-ui/backend/db_services/dbresource/handlers.py b/dbm-ui/backend/db_services/dbresource/handlers.py
index d606f90b2c..81d7945002 100644
--- a/dbm-ui/backend/db_services/dbresource/handlers.py
+++ b/dbm-ui/backend/db_services/dbresource/handlers.py
@@ -161,7 +161,9 @@ class TendisPlusSpecFilter(RedisSpecFilter):
     """TendisPlus集群规格过滤器"""
 
     # 最佳容量管理大小 300G
-    OPTIMAL_MANAGE_CAPACITY = 300
+    SINGLE_SHARD_SIZE = 300
+    # 1, 2 or 4 shards per machine are allowed
+    SINGLE_SHARD_NUMBS = [1, 2, 4]
 
     def calc_machine_pair(self):
         """计算每种规格所需的机器组数,TendisPlus至少需要三组"""
@@ -170,12 +172,34 @@ def calc_machine_pair(self):
             spec["cluster_capacity"] = spec["machine_pair"] * spec["capacity"]
 
     def calc_cluster_shard_num(self):
+        # sort the specs by capacity first
+        self.specs.sort(key=lambda x: (x["capacity"]))
+
+        # pick candidate specs
+        spec_idx, spec_cnt, candidate_specs = 0, len(self.specs), []
+        # take the smallest spec that fits, plus the one just below it
         for spec in self.specs:
-            spec["cluster_shard_num"] = max(3, math.ceil(self.capacity / self.OPTIMAL_MANAGE_CAPACITY))
-            # 将分片数上取整为机器组数的倍数
-            spec["cluster_shard_num"] = (
-                math.ceil(spec["cluster_shard_num"] / spec["machine_pair"]) * spec["machine_pair"]
-            )
+            if self.capacity <= spec["capacity"]:
+                candidate_specs.append(spec)
+                if spec_idx >= 1:
+                    candidate_specs.append(self.specs[spec_idx - 1])
+                break
+            spec_idx += 1
+
+        # if the requested capacity exceeds every spec, fall back to the largest two
+        if self.capacity > self.specs[spec_cnt - 1]["capacity"]:
+            candidate_specs.append(self.specs[spec_cnt - 1])
+            if spec_cnt > 2:
+                candidate_specs.append(self.specs[spec_cnt - 2])
+
+        available_specs: List[Dict[str, Any]] = []
+        for spec in candidate_specs:
+            shard = max(1, math.ceil(spec["capacity"] / self.SINGLE_SHARD_SIZE) - 1)
+            if shard > 2:
+                shard = int(shard / 2) * 2
+            spec["cluster_shard_num"] = spec["machine_pair"] * shard
+            available_specs.append(spec)
+        self.specs = available_specs
 
     def custom_filter(self):
         super().custom_filter()
@@ -257,7 +281,7 @@ class TendisSSDSpecFilter(RedisSpecFilter):
     """TendisSSD集群规格过滤器"""
 
     # 单实例最大容量 50G
-    SINGLE_MAX_CAPACITY = 50
+    SINGLE_MAX_CAPACITY = 100
     MACHINE_PAIR_SORT = True
 
     def calc_cluster_shard_num(self):
diff --git a/dbm-ui/backend/db_services/redis/autofix/watcher.py b/dbm-ui/backend/db_services/redis/autofix/watcher.py
index 9d95230b28..d3967878a6 100644
--- a/dbm-ui/backend/db_services/redis/autofix/watcher.py
+++ b/dbm-ui/backend/db_services/redis/autofix/watcher.py
@@ -18,7 +18,7 @@
 
 from backend.components.hadb.client import HADBApi
 from backend.constants import DEFAULT_BK_CLOUD_ID
-from backend.db_meta.api.cluster.apis import query_cluster_by_hosts
+from backend.db_meta.api.cluster.apis import query_cluster_by_hosts_biz
 from backend.db_meta.enums import ClusterType
 from backend.exceptions import ApiRequestError, ApiResultError
 from backend.utils.time import datetime2timestamp
@@ -49,8 +49,7 @@ def watcher_get_by_hosts() -> (int, dict):
         )
     except (ApiResultError, ApiRequestError, Exception) as error:  # pylint: disable=broad-except
         # 捕获ApiResultError, ApiRequestError和其他未知异常
-        logger.warn("meet exception {} when request switch logs".format(error))
-        return 0, {}
+        raise Exception("got exception {} when requesting switch logs".format(error))
 
     # 遍历切换队列,聚合故障机
     switch_hosts, batch_small_id = {}, SWITCH_SMALL
@@ -66,7 +65,9 @@
             )
         )
         # 忽略没有集群信息、或者多集群共用的情况
-        cluster = query_cluster_by_hosts([switch_ip])  # return: [{},{}]
+        cluster = query_cluster_by_hosts_biz(
+            [switch_ip], int(switch_inst["app"]), int(switch_inst["cloud_id"])
+        )  # return: [{},{}]
         if not cluster:
             logger.info("will ignore got none cluster info by ip {}".format(switch_ip))
             continue
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_add_slave.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_add_slave.py
index f5d31309d0..091db259fb 100644
--- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_add_slave.py
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_add_slave.py
@@ -61,7 +61,12 @@ def __init__(self, root_id: str, data: Optional[Dict]):
 
     @staticmethod
     def get_cluster_info(bk_biz_id, cluster_id):
-        cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)
+        cluster = Cluster.objects.prefetch_related(
+            "proxyinstance_set",
+            "storageinstance_set",
+            "storageinstance_set__machine",
+            "storageinstance_set__as_ejector",
+        ).get(id=cluster_id, bk_biz_id=bk_biz_id)
         if cluster.cluster_type == ClusterType.TendisRedisInstance.value:
             """
             如果是主从版,根据cluster_id找到cluster,进而找到相同 master ip,所有master/slave实例
@@ -74,7 +79,9 @@ def get_cluster_info(bk_biz_id, cluster_id):
                 )
             )
             master_ip = master_inst.machine.ip
-            cluster_masters = StorageInstance.objects.filter(machine__ip=master_ip)
+            cluster_masters = StorageInstance.objects.prefetch_related("as_ejector", "machine").filter(
+                machine__ip=master_ip
+            )
         else:
             cluster_masters = cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value)
 
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_auotfix.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_auotfix.py
index 52bb35f59f..e0f2002b05 100644
--- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_auotfix.py
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_auotfix.py
@@ -98,7 +98,12 @@ def get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
         2. master 上的端口列表
         3. 实例对应关系:{master:port:slave:port}
         """
-        cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)
+        cluster = Cluster.objects.prefetch_related(
+            "proxyinstance_set",
+            "storageinstance_set",
+            "storageinstance_set__machine",
+            "storageinstance_set__as_ejector",
+        ).get(id=cluster_id, bk_biz_id=bk_biz_id)
 
         master_ports, slave_ports = defaultdict(list), defaultdict(list)
         slave_master_map, slave_ins_map = defaultdict(), defaultdict()
@@ -203,8 +208,7 @@ def start_redis_auotfix(self):
             cluster_kwargs = deepcopy(act_kwargs)
             cluster_info = self.get_cluster_info(self.data["bk_biz_id"], cluster_id)
             flow_data = self.data
-            for k, v in cluster_info.items():
-                cluster_kwargs.cluster[k] = v
+            cluster_kwargs.cluster.update(cluster_info)
             cluster_kwargs.cluster["created_by"] = self.data["created_by"]
             flow_data["fix_info"] = cluster_fix
             redis_pipeline.add_act(
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_cmr.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_cmr.py
index c0a50d1b06..6e1af3b3f4 100644
--- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_cmr.py
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_cmr.py
@@ -88,16 +88,23 @@ def __init__(self, root_id: str, data: Optional[Dict]):
         self.root_id = root_id
         self.data = data
         self.precheck_for_compelete_replace()
+        self.cluster_cache = {}
 
-    @staticmethod
-    def get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
+    def get_cluster_info(self, bk_biz_id: int, cluster_id: int) -> dict:
         """获取集群现有信息
         1. master 对应 slave 机器
         2. master 上的端口列表
         3. 实例对应关系:{master:port:slave:port}
         """
-        cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)
-
+        if self.cluster_cache.get(cluster_id):
+            return self.cluster_cache[cluster_id]
+
+        cluster = Cluster.objects.prefetch_related(
+            "proxyinstance_set",
+            "storageinstance_set",
+            "storageinstance_set__machine",
+            "storageinstance_set__as_ejector",
+        ).get(id=cluster_id, bk_biz_id=bk_biz_id)
         master_ports, slave_ports = defaultdict(list), defaultdict(list)
         ins_pair_map, slave_ins_map = defaultdict(), defaultdict()
         master_slave_map, slave_master_map = defaultdict(), defaultdict()
@@ -148,7 +155,7 @@ def get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
         proxy_port = cluster.proxyinstance_set.first().port
         proxy_ips = [proxy_obj.machine.ip for proxy_obj in cluster.proxyinstance_set.all()]
 
-        return {
+        cluster_info = {
             "immute_domain": cluster.immute_domain,
             "bk_biz_id": str(cluster.bk_biz_id),
             "bk_cloud_id": cluster.bk_cloud_id,
@@ -167,6 +174,10 @@ def get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
             "backend_servers": servers,
         }
 
+        # cache the result for this flow run
+        self.cluster_cache[cluster_id] = cluster_info
+        return self.cluster_cache[cluster_id]
+
     @staticmethod
     def __get_cluster_config(bk_biz_id: int, namespace: str, domain_name: str, db_version: str) -> Any:
         """
@@ -220,8 +231,7 @@ def complete_machine_replace(self):
                 sync_type = SyncType.SYNC_SMS.value
 
             flow_data = self.data
-            for k, v in cluster_info.items():
-                cluster_kwargs.cluster[k] = v
+            cluster_kwargs.cluster.update(cluster_info)
             cluster_kwargs.cluster["created_by"] = self.data["created_by"]
             flow_data["sync_type"] = sync_type
             flow_data["replace_info"] = cluster_replacement
diff --git a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_mss.py b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_mss.py
index 1d5e9b5e28..222d508108 100644
--- a/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_mss.py
+++ b/dbm-ui/backend/flow/engine/bamboo/scene/redis/redis_cluster_scene_mss.py
@@ -87,8 +87,12 @@ def __get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
             defaultdict(),
             defaultdict(list),
         )
-        cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)
-
+        cluster = Cluster.objects.prefetch_related(
+            "proxyinstance_set",
+            "storageinstance_set",
+            "storageinstance_set__machine",
+            "storageinstance_set__as_ejector",
+        ).get(id=cluster_id, bk_biz_id=bk_biz_id)
         for master_obj in cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value):
             slave_obj = master_obj.as_ejector.get().receiver
             master_ports[master_obj.machine.ip].append(master_obj.port)
@@ -151,8 +155,7 @@ def redis_ms_switch(self):
             cluster_info = self.__get_cluster_info(self.data["bk_biz_id"], cluster_id)
 
             flow_data = self.data
-            for k, v in cluster_info.items():
-                cluster_kwargs.cluster[k] = v
+            cluster_kwargs.cluster.update(cluster_info)
             cluster_kwargs.cluster["created_by"] = self.data["created_by"]
             cluster_kwargs.cluster["switch_option"] = ms_switch["online_switch_type"]
             flow_data["switch_input"] = ms_switch
diff --git a/dbm-ui/backend/flow/utils/redis/redis_db_meta.py b/dbm-ui/backend/flow/utils/redis/redis_db_meta.py
index 60eb9febb0..f47bc2a4d4 100644
--- a/dbm-ui/backend/flow/utils/redis/redis_db_meta.py
+++ b/dbm-ui/backend/flow/utils/redis/redis_db_meta.py
@@ -683,7 +683,7 @@ def redis_replace_pair(self) -> bool:
             api.cluster.nosqlcomm.make_sync(cluster=cluster, tendisss=tendiss)
         return True
 
-    def instances_status_update(self) -> bool:
+    def instances_status_update(self, cluster_obj) -> bool:
         """
         Redis/Proxy 实例修改实例状态 {"ip":"","ports":[],"status":11}
         """
@@ -694,19 +694,23 @@ def instances_status_update(self) -> bool:
                 machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_update_ports"]
             ).update(status=self.cluster["meta_update_status"])
         else:
-            StorageInstance.objects.filter(
-                machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_update_ports"]
-            ).update(status=self.cluster["meta_update_status"])
+            # cluster types that support in-place switchover keep their status as running, skip the update
+            if cluster_obj.cluster_type not in [
+                ClusterType.TendisPredixyRedisCluster.value,
+                ClusterType.TendisPredixyTendisplusCluster.value,
+            ]:
+                StorageInstance.objects.filter(
+                    machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_update_ports"]
+                ).update(status=self.cluster["meta_update_status"])
         return True
 
     def instances_failover_4_scene(self) -> bool:
         """1.修改状态、2.切换角色"""
-        self.instances_status_update()
         # 获取cluster
-        cluster_id = self.cluster["cluster_id"]
-        cluster = Cluster.objects.get(id=cluster_id)
+        cluster_obj = Cluster.objects.get(id=int(self.cluster["cluster_id"]))
+        self.instances_status_update(cluster_obj)
         with atomic():
-            cc_manage, bk_host_ids = CcManage(self.cluster["bk_biz_id"], cluster.cluster_type), []
+            cc_manage, bk_host_ids = CcManage(int(self.cluster["bk_biz_id"]), cluster_obj.cluster_type), []
             for port in self.cluster["meta_update_ports"]:
                 old_master = StorageInstance.objects.get(
                     machine__ip=self.cluster["meta_update_ip"],
@@ -839,7 +843,7 @@ def redis_role_swap_4_scene(self) -> bool:
         cluster_id = self.cluster["cluster_id"]
         cluster = Cluster.objects.get(id=cluster_id)
 
-        cc_manage = CcManage(self.cluster["bk_biz_id"], cluster.cluster_type)
+        cc_manage = CcManage(int(self.cluster["bk_biz_id"]), cluster.cluster_type)
         with atomic():
             for ins in self.cluster["role_swap_ins"]:
                 ins1 = StorageInstance.objects.get(
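
The reworked TendisPlusSpecFilter.calc_cluster_shard_num() above no longer forces at least three shards of roughly 300G each; it sorts the specs by capacity, keeps the smallest spec that covers the requested capacity plus the one just below it (falling back to the largest two when nothing is big enough), and then derives cluster_shard_num per spec from SINGLE_SHARD_SIZE. Below is a minimal standalone sketch of that selection rule, assuming each spec dict carries only "capacity" (per machine group, in GB) and "machine_pair"; pick_tendisplus_specs and the demo spec names are illustration-only and not part of this patch.

import math
from typing import Any, Dict, List

SINGLE_SHARD_SIZE = 300  # target data size per shard in GB, same constant as in the patch


def pick_tendisplus_specs(specs: List[Dict[str, Any]], capacity: int) -> List[Dict[str, Any]]:
    # keep the smallest spec that fits the requested capacity, plus the one just below it
    specs = sorted(specs, key=lambda x: x["capacity"])
    candidates: List[Dict[str, Any]] = []
    for idx, spec in enumerate(specs):
        if capacity <= spec["capacity"]:
            candidates.append(spec)
            if idx >= 1:
                candidates.append(specs[idx - 1])
            break
    else:
        # the requested capacity exceeds every spec: fall back to the largest one or two
        candidates.extend(specs[-2:] if len(specs) > 2 else specs[-1:])

    for spec in candidates:
        # shards per machine: ceil(capacity / 300G) - 1, rounded down to an even number when above 2
        shard = max(1, math.ceil(spec["capacity"] / SINGLE_SHARD_SIZE) - 1)
        if shard > 2:
            shard = shard // 2 * 2
        spec["cluster_shard_num"] = spec["machine_pair"] * shard
    return candidates


if __name__ == "__main__":
    demo_specs = [
        {"name": "2C4G", "capacity": 300, "machine_pair": 3},
        {"name": "4C8G", "capacity": 600, "machine_pair": 3},
        {"name": "8C16G", "capacity": 1200, "machine_pair": 3},
    ]
    # a 500G request keeps the 600G spec and the 300G spec just below it
    for spec in pick_tendisplus_specs(demo_specs, capacity=500):
        print(spec["name"], spec["cluster_shard_num"])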
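
The change to start_autofix_flow() above groups every storage instance of the cluster by machine IP and reports a machine for autofix only when it hosts at least one slave instance and all of its instances are unavailable, rather than looking at slave instances alone. A minimal sketch of that grouping rule follows, using plain dataclasses in place of the Django models; FakeInstance and group_fault_machines are hypothetical names used only for illustration.

from collections import defaultdict
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class FakeInstance:
    ip: str
    instance_role: str  # "redis_master" or "redis_slave"
    status: str         # "running" or "unavailable"


def group_fault_machines(instances: List[FakeInstance]) -> List[str]:
    # group all instances of the cluster by machine IP
    nodes_by_ip: Dict[str, List[FakeInstance]] = defaultdict(list)
    for inst in instances:
        nodes_by_ip[inst.ip].append(inst)

    # a machine is tracked as a slave machine if any of its instances has the slave role
    slave_group_by_ip = {
        ip: objs
        for ip, objs in nodes_by_ip.items()
        if any(obj.instance_role == "redis_slave" for obj in objs)
    }

    # ...and reported as faulty only when every instance on it is unavailable
    return [
        ip
        for ip, objs in slave_group_by_ip.items()
        if all(obj.status == "unavailable" for obj in objs)
    ]


if __name__ == "__main__":
    instances = [
        FakeInstance("1.1.1.1", "redis_slave", "unavailable"),
        FakeInstance("1.1.1.1", "redis_slave", "unavailable"),
        FakeInstance("2.2.2.2", "redis_slave", "unavailable"),
        FakeInstance("2.2.2.2", "redis_master", "running"),
    ]
    print(group_fault_machines(instances))  # ['1.1.1.1'] - only the fully-down machine is reported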