Skip to content

Commit

Permalink
feat(redis): 单据优化(自愈、整机替换) TencentBlueKing#7346
Browse files Browse the repository at this point in the history
  • Loading branch information
xiepaup authored and iSecloud committed Oct 16, 2024
1 parent 06a9069 commit a327b26
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 39 deletions.
11 changes: 11 additions & 0 deletions dbm-ui/backend/db_meta/api/cluster/apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,17 @@ def domain_exists(domains: List[str]) -> Dict[str, bool]:
return res


def query_cluster_by_hosts_biz(hosts: List, biz_id: int, cloud_id: int):
    """
    Query cluster info for the given hosts, restricted to one business and cloud area.

    The lookup itself is delegated to ``query_cluster_by_hosts``; this wrapper
    keeps, in their original order, only the entries whose ``bk_biz_id`` equals
    ``biz_id`` and whose ``bk_cloud_id`` equals ``cloud_id``.

    Returns a (possibly empty) list of the matching cluster entries.
    """
    return [
        info
        for info in query_cluster_by_hosts(hosts)
        if info["bk_biz_id"] == biz_id and info["bk_cloud_id"] == cloud_id
    ]


def query_cluster_by_hosts(hosts: List):
"""根据提供的IP 查询集群信息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ def redis_clusternodes_update_record():
oper_type = ""
for oper in opers:
if oper["ticket_type"] and oper["ticket_type"] in [
TicketType.REDIS_CLUSTER_AUTOFIX.value,
TicketType.REDIS_SCALE_UPDOWN.value,
TicketType.REDIS_CLUSTER_CUTOFF.value,
TicketType.REDIS_CLUSTER_INSTANCE_SHUTDOWN.value,
Expand Down Expand Up @@ -369,9 +370,16 @@ def update_cc_info(self, cluster: Cluster):
# 发起自愈流程
def start_autofix_flow(self, cluster: Cluster):
# 如果存在集群中某个slave机器,所有实例均变成了 unrunning 状态,则对其发起自愈流程
slave_group_by_ip = defaultdict(list)
for slave_obj in cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_SLAVE.value):
slave_group_by_ip[slave_obj.machine.ip].append(slave_obj)
slave_group_by_ip, nodes_by_ip = defaultdict(list), defaultdict(list)
for node_obj in cluster.storageinstance_set.all():
nodes_by_ip[node_obj.machine.ip].append(node_obj)
for ip, objs in slave_group_by_ip.items():
for obj in objs: # 只要有一个实例角色是Slave就放进来
if obj.instance_role == InstanceRole.REDIS_SLAVE.value:
slave_group_by_ip[ip].append(objs)
break
# for slave_obj in cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_SLAVE.value):
# slave_group_by_ip[slave_obj.machine.ip].append(slave_obj)
fault_machines = []
for ip, slave_objs in slave_group_by_ip.items():
if all([slave_obj.status == InstanceStatus.UNAVAILABLE.value for slave_obj in slave_objs]):
Expand Down
38 changes: 31 additions & 7 deletions dbm-ui/backend/db_services/dbresource/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ class TendisPlusSpecFilter(RedisSpecFilter):
"""TendisPlus集群规格过滤器"""

# 最佳容量管理大小 300G
OPTIMAL_MANAGE_CAPACITY = 300
SINGLE_SHARD_SIZE = 300
# 单机 1 , 2,4 分片 可选
SINGLE_SHARD_NUMBS = [1, 2, 4]

def calc_machine_pair(self):
"""计算每种规格所需的机器组数,TendisPlus至少需要三组"""
Expand All @@ -170,12 +172,34 @@ def calc_machine_pair(self):
spec["cluster_capacity"] = spec["machine_pair"] * spec["capacity"]

def calc_cluster_shard_num(self):
# 先进行排序
self.specs.sort(key=lambda x: (x["capacity"]))

# 选取合适的规格
spec_idx, spec_cnt, candidate_specs = 0, len(self.specs), []
# 取相近的规格
for spec in self.specs:
spec["cluster_shard_num"] = max(3, math.ceil(self.capacity / self.OPTIMAL_MANAGE_CAPACITY))
# 将分片数上取整为机器组数的倍数
spec["cluster_shard_num"] = (
math.ceil(spec["cluster_shard_num"] / spec["machine_pair"]) * spec["machine_pair"]
)
if self.capacity <= spec["capacity"]:
candidate_specs.append(spec)
if spec_idx >= 1:
candidate_specs.append(self.specs[spec_idx - 1])
break
spec_idx += 1

# 最后取两个规格
if self.capacity > self.specs[spec_cnt - 1]["capacity"]:
candidate_specs.append(self.specs[spec_cnt - 1])
if spec_cnt > 2:
candidate_specs.append(self.specs[spec_cnt - 2])

aviable_specs: List[Dict[str, Any]] = []
for spec in candidate_specs:
shard = max(1, math.ceil(spec["capacity"] / self.SINGLE_SHARD_SIZE) - 1)
if shard > 2:
shard = int(shard / 2) * 2
spec["cluster_shard_num"] = spec["machine_pair"] * shard
aviable_specs.append(spec)
self.specs = aviable_specs

def custom_filter(self):
super().custom_filter()
Expand Down Expand Up @@ -257,7 +281,7 @@ class TendisSSDSpecFilter(RedisSpecFilter):
"""TendisSSD集群规格过滤器"""

# 单实例最大容量 100G
SINGLE_MAX_CAPACITY = 50
SINGLE_MAX_CAPACITY = 100
MACHINE_PAIR_SORT = True

def calc_cluster_shard_num(self):
Expand Down
9 changes: 5 additions & 4 deletions dbm-ui/backend/db_services/redis/autofix/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

from backend.components.hadb.client import HADBApi
from backend.constants import DEFAULT_BK_CLOUD_ID
from backend.db_meta.api.cluster.apis import query_cluster_by_hosts
from backend.db_meta.api.cluster.apis import query_cluster_by_hosts_biz
from backend.db_meta.enums import ClusterType
from backend.exceptions import ApiRequestError, ApiResultError
from backend.utils.time import datetime2timestamp
Expand Down Expand Up @@ -49,8 +49,7 @@ def watcher_get_by_hosts() -> (int, dict):
)
except (ApiResultError, ApiRequestError, Exception) as error: # pylint: disable=broad-except
# 捕获ApiResultError, ApiRequestError和其他未知异常
logger.warn("meet exception {} when request switch logs".format(error))
return 0, {}
raise Exception("meet exception {} when request switch logs".format(error))

# 遍历切换队列,聚合故障机
switch_hosts, batch_small_id = {}, SWITCH_SMALL
Expand All @@ -66,7 +65,9 @@ def watcher_get_by_hosts() -> (int, dict):
)
)
# 忽略没有集群信息、或者多集群共用的情况
cluster = query_cluster_by_hosts([switch_ip]) # return: [{},{}]
cluster = query_cluster_by_hosts_biz(
[switch_ip], int(switch_inst["app"]), int(switch_inst["cloud_id"])
) # return: [{},{}]
if not cluster:
logger.info("will ignore got none cluster info by ip {}".format(switch_ip))
continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ def __init__(self, root_id: str, data: Optional[Dict]):

@staticmethod
def get_cluster_info(bk_biz_id, cluster_id):
cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)
cluster = Cluster.objects.prefetch_related(
"proxyinstance_set",
"storageinstance_set",
"storageinstance_set__machine",
"storageinstance_set__as_ejector",
).get(id=cluster_id, bk_biz_id=bk_biz_id)
if cluster.cluster_type == ClusterType.TendisRedisInstance.value:
"""
如果是主从版,根据cluster_id找到cluster,进而找到相同 master ip,所有master/slave实例
Expand All @@ -74,7 +79,9 @@ def get_cluster_info(bk_biz_id, cluster_id):
)
)
master_ip = master_inst.machine.ip
cluster_masters = StorageInstance.objects.filter(machine__ip=master_ip)
cluster_masters = StorageInstance.objects.prefetch_related("as_ejector", "machine").filter(
machine__ip=master_ip
)
else:
cluster_masters = cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,12 @@ def get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
2. master 上的端口列表
3. 实例对应关系:{master:port:slave:port}
"""
cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)
cluster = Cluster.objects.prefetch_related(
"proxyinstance_set",
"storageinstance_set",
"storageinstance_set__machine",
"storageinstance_set__as_ejector",
).get(id=cluster_id, bk_biz_id=bk_biz_id)

master_ports, slave_ports = defaultdict(list), defaultdict(list)
slave_master_map, slave_ins_map = defaultdict(), defaultdict()
Expand Down Expand Up @@ -203,8 +208,7 @@ def start_redis_auotfix(self):
cluster_kwargs = deepcopy(act_kwargs)
cluster_info = self.get_cluster_info(self.data["bk_biz_id"], cluster_id)
flow_data = self.data
for k, v in cluster_info.items():
cluster_kwargs.cluster[k] = v
cluster_kwargs.cluster.update(cluster_info)
cluster_kwargs.cluster["created_by"] = self.data["created_by"]
flow_data["fix_info"] = cluster_fix
redis_pipeline.add_act(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,23 @@ def __init__(self, root_id: str, data: Optional[Dict]):
self.root_id = root_id
self.data = data
self.precheck_for_compelete_replace()
self.cluster_cache = {}

@staticmethod
def get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
def get_cluster_info(self, bk_biz_id: int, cluster_id: int) -> dict:
"""获取集群现有信息
1. master 对应 slave 机器
2. master 上的端口列表
3. 实例对应关系:{master:port:slave:port}
"""
cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)

if self.cluster_cache.get(cluster_id):
return self.cluster_cache[cluster_id]

cluster = Cluster.objects.prefetch_related(
"proxyinstance_set",
"storageinstance_set",
"storageinstance_set__machine",
"storageinstance_set__as_ejector",
).get(id=cluster_id, bk_biz_id=bk_biz_id)
master_ports, slave_ports = defaultdict(list), defaultdict(list)
ins_pair_map, slave_ins_map = defaultdict(), defaultdict()
master_slave_map, slave_master_map = defaultdict(), defaultdict()
Expand Down Expand Up @@ -148,7 +155,7 @@ def get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
proxy_port = cluster.proxyinstance_set.first().port
proxy_ips = [proxy_obj.machine.ip for proxy_obj in cluster.proxyinstance_set.all()]

return {
cluster_info = {
"immute_domain": cluster.immute_domain,
"bk_biz_id": str(cluster.bk_biz_id),
"bk_cloud_id": cluster.bk_cloud_id,
Expand All @@ -167,6 +174,10 @@ def get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
"backend_servers": servers,
}

# 加到这一次的缓存里边
self.cluster_cache[cluster_id] = cluster_info
return self.cluster_cache[cluster_id]

@staticmethod
def __get_cluster_config(bk_biz_id: int, namespace: str, domain_name: str, db_version: str) -> Any:
"""
Expand Down Expand Up @@ -220,8 +231,7 @@ def complete_machine_replace(self):
sync_type = SyncType.SYNC_SMS.value

flow_data = self.data
for k, v in cluster_info.items():
cluster_kwargs.cluster[k] = v
cluster_kwargs.cluster.update(cluster_info)
cluster_kwargs.cluster["created_by"] = self.data["created_by"]
flow_data["sync_type"] = sync_type
flow_data["replace_info"] = cluster_replacement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,12 @@ def __get_cluster_info(bk_biz_id: int, cluster_id: int) -> dict:
defaultdict(),
defaultdict(list),
)
cluster = Cluster.objects.get(id=cluster_id, bk_biz_id=bk_biz_id)

cluster = Cluster.objects.prefetch_related(
"proxyinstance_set",
"storageinstance_set",
"storageinstance_set__machine",
"storageinstance_set__as_ejector",
).get(id=cluster_id, bk_biz_id=bk_biz_id)
for master_obj in cluster.storageinstance_set.filter(instance_role=InstanceRole.REDIS_MASTER.value):
slave_obj = master_obj.as_ejector.get().receiver
master_ports[master_obj.machine.ip].append(master_obj.port)
Expand Down Expand Up @@ -151,8 +155,7 @@ def redis_ms_switch(self):
cluster_info = self.__get_cluster_info(self.data["bk_biz_id"], cluster_id)

flow_data = self.data
for k, v in cluster_info.items():
cluster_kwargs.cluster[k] = v
cluster_kwargs.cluster.update(cluster_info)
cluster_kwargs.cluster["created_by"] = self.data["created_by"]
cluster_kwargs.cluster["switch_option"] = ms_switch["online_switch_type"]
flow_data["switch_input"] = ms_switch
Expand Down
22 changes: 13 additions & 9 deletions dbm-ui/backend/flow/utils/redis/redis_db_meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -683,7 +683,7 @@ def redis_replace_pair(self) -> bool:
api.cluster.nosqlcomm.make_sync(cluster=cluster, tendisss=tendiss)
return True

def instances_status_update(self) -> bool:
def instances_status_update(self, cluster_obj) -> bool:
"""
Redis/Proxy 实例修改实例状态 {"ip":"","ports":[],"status":11}
"""
Expand All @@ -694,19 +694,23 @@ def instances_status_update(self) -> bool:
machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_update_ports"]
).update(status=self.cluster["meta_update_status"])
else:
StorageInstance.objects.filter(
machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_update_ports"]
).update(status=self.cluster["meta_update_status"])
# 支持互切的 不修改状态,保持 running
if cluster_obj.cluster_type not in [
ClusterType.TendisPredixyRedisCluster.value,
ClusterType.TendisPredixyTendisplusCluster.value,
]:
StorageInstance.objects.filter(
machine__ip=self.cluster["meta_update_ip"], port__in=self.cluster["meta_update_ports"]
).update(status=self.cluster["meta_update_status"])
return True

def instances_failover_4_scene(self) -> bool:
"""1.修改状态、2.切换角色"""
self.instances_status_update()
# 获取cluster
cluster_id = self.cluster["cluster_id"]
cluster = Cluster.objects.get(id=cluster_id)
cluster_obj = Cluster.objects.get(id=int(self.cluster["cluster_id"]))
self.instances_status_update(cluster_obj)
with atomic():
cc_manage, bk_host_ids = CcManage(self.cluster["bk_biz_id"], cluster.cluster_type), []
cc_manage, bk_host_ids = CcManage(int(self.cluster["bk_biz_id"]), cluster_obj.cluster_type), []
for port in self.cluster["meta_update_ports"]:
old_master = StorageInstance.objects.get(
machine__ip=self.cluster["meta_update_ip"],
Expand Down Expand Up @@ -839,7 +843,7 @@ def redis_role_swap_4_scene(self) -> bool:
cluster_id = self.cluster["cluster_id"]
cluster = Cluster.objects.get(id=cluster_id)

cc_manage = CcManage(self.cluster["bk_biz_id"], cluster.cluster_type)
cc_manage = CcManage(int(self.cluster["bk_biz_id"]), cluster.cluster_type)
with atomic():
for ins in self.cluster["role_swap_ins"]:
ins1 = StorageInstance.objects.get(
Expand Down

0 comments on commit a327b26

Please sign in to comment.