From 36667911db5704173740dc231e46a97f6b5c0bd3 Mon Sep 17 00:00:00 2001 From: dcd <1151627903@qq.com> Date: Mon, 20 May 2024 18:15:26 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E5=90=8C=E6=AD=A5=E4=B8=BB=E6=9C=BA?= =?UTF-8?q?=E5=B7=AE=E9=87=8F=E9=80=BB=E8=BE=91=E4=BF=AE=E5=A4=8D=20(close?= =?UTF-8?q?d=20#2167)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/node_man/constants.py | 6 + .../node_man/periodic_tasks/sync_cmdb_host.py | 104 +++++++++++++++++- .../test_sync_cmdb_host.py | 45 +++++++- 3 files changed, 149 insertions(+), 6 deletions(-) diff --git a/apps/node_man/constants.py b/apps/node_man/constants.py index 4e4013922..dd7a9c724 100644 --- a/apps/node_man/constants.py +++ b/apps/node_man/constants.py @@ -66,6 +66,7 @@ class TimeUnit: SYNC_CMDB_BIZ_TOPO_TASK_INTERVAL = 1 * TimeUnit.DAY SYNC_CMDB_HOST_INTERVAL = 1 * TimeUnit.DAY +CLEAR_NEED_DELETE_HOST_IDS_INTERVAL = 1 * TimeUnit.MINUTE ######################################################################################################## # 第三方系统相关配置 @@ -588,6 +589,11 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]: LIST_SERVICE_INSTANCE_DETAIL_LIMIT = 1000 LIST_SERVICE_INSTANCE_DETAIL_INTERVAL = 0.2 +# redis键名模板 +REDIS_NEED_DELETE_HOST_IDS_KEY_TPL = f"{settings.APP_CODE}:node_man:need_delete_host_ids:list" +# 从redis中读取bk_host_ids最大长度 +MAX_HOST_IDS_LENGTH = 5000 + class ProxyFileFromType(Enum): """Proxy上传文件名称来源""" diff --git a/apps/node_man/periodic_tasks/sync_cmdb_host.py b/apps/node_man/periodic_tasks/sync_cmdb_host.py index ef0292a45..96ace2c75 100644 --- a/apps/node_man/periodic_tasks/sync_cmdb_host.py +++ b/apps/node_man/periodic_tasks/sync_cmdb_host.py @@ -17,7 +17,9 @@ from django.db import transaction from apps.backend.celery import app +from apps.backend.utils.redis import REDIS_INST from apps.component.esbclient import client_v2 +from apps.core.concurrent import controller from apps.core.gray.tools import GrayTools from apps.exceptions import ComponentCallError from apps.node_man import constants, models, tools @@ -28,7 +30,7 @@ query_bk_biz_ids, ) from apps.utils.batch_request import batch_request -from apps.utils.concurrent import batch_call +from apps.utils.concurrent import batch_call, batch_call_serial from common.log import logger @@ -434,6 +436,8 @@ def sync_cmdb_host(bk_biz_id=None, task_id=None): """ 同步cmdb主机 """ + from apps.backend.views import LPUSH_AND_EXPIRE_FUNC + logger.info(f"[sync_cmdb_host] start: task_id -> {task_id}, bk_biz_id -> {bk_biz_id}") ap_map_config: SyncHostApMapConfig = get_sync_host_ap_map_config() @@ -466,10 +470,89 @@ def sync_cmdb_host(bk_biz_id=None, task_id=None): if need_delete_host_ids: models.Host.objects.filter(bk_host_id__in=need_delete_host_ids).delete() models.IdentityData.objects.filter(bk_host_id__in=need_delete_host_ids).delete() - models.ProcessStatus.objects.filter(bk_host_id__in=need_delete_host_ids).delete() - logger.info(f"[sync_cmdb_host] task_id -> {task_id}, need_delete_host_ids -> {need_delete_host_ids}") + if not LPUSH_AND_EXPIRE_FUNC: + models.ProcessStatus.objects.filter(bk_host_id__in=need_delete_host_ids).delete() + logger.info( + "Enterprise binary: [sync_cmdb_host] task_id -> %s, need_delete_host_ids -> %s, num -> %s" + % ( + task_id, + need_delete_host_ids, + len(need_delete_host_ids), + ) + ) + else: + name = constants.REDIS_NEED_DELETE_HOST_IDS_KEY_TPL + LPUSH_AND_EXPIRE_FUNC(keys=[name], args=[constants.TimeUnit.DAY] + list(need_delete_host_ids)) + logger.info( + "[sync_cmdb_host] task_id -> %s, store need_delete_host_ids -> %s into redis, num -> %s" + % ( + task_id, + need_delete_host_ids, + len(need_delete_host_ids), + ) + ) + + logger.info("[sync_cmdb_host] complete: task_id -> %s, bk_biz_ids -> %s" % (task_id, bk_biz_ids)) - logger.info(f"[sync_cmdb_host] complete: task_id -> {task_id}, bk_biz_ids -> {bk_biz_ids}") + +def clear_need_delete_host_ids(task_id=None): + name: str = constants.REDIS_NEED_DELETE_HOST_IDS_KEY_TPL + # 计算出要从redis取数据的长度,限制最大长度 + report_data_len: int = min(REDIS_INST.llen(name), constants.MAX_HOST_IDS_LENGTH) + # 从redis中取出对应长度的数据 + report_data: typing.List[str] = REDIS_INST.lrange(name, -report_data_len, -1) + # 使用ltrim保留剩下的,可以保证redis中新push的值不会丢失 + REDIS_INST.ltrim(name, 0, -report_data_len - 1) + need_delete_host_ids: typing.Set[int] = set([int(data) for data in report_data]) + if need_delete_host_ids: + host_ids = list(need_delete_host_ids) + query_cmdb_and_handle_need_delete_host_ids(host_ids=host_ids, task_id=task_id) + + logger.info("[clear_need_delete_host_ids] complete: task_id -> %s" % (task_id,)) + + +@controller.ConcurrentController( + data_list_name="host_ids", + batch_call_func=batch_call_serial, + get_config_dict_func=lambda: {"limit": constants.QUERY_CMDB_LIMIT}, +) +def query_cmdb_and_handle_need_delete_host_ids(host_ids: typing.List[int], task_id: str): + """ + 查询CMDB并处理需要删掉对应主机ID的ProcessStatus记录 + :param host_ids: 主机ID列表 + :param task_id: 任务ID + """ + query_hosts_params: typing.Dict[str, typing.Any] = { + "page": {"start": 0, "limit": constants.QUERY_CMDB_LIMIT}, + "fields": [constants.CC_HOST_FIELDS[0]], + "host_property_filter": { + "condition": "AND", + "rules": [{"field": "bk_host_id", "operator": "in", "value": host_ids}], + }, + } + cmdb_host_infos: typing.List[typing.Dict[str, int]] = client_v2.cc.list_hosts_without_biz(query_hosts_params)[ + "info" + ] + bk_host_ids_in_cmdb: typing.List[int] = [cmdb_host_info.get("bk_host_id") for cmdb_host_info in cmdb_host_infos] + logger.info( + "[find_hosts_in_cmdb] task_id -> %s, bk_host_ids -> %s , num -> %s" + % ( + task_id, + bk_host_ids_in_cmdb, + len(bk_host_ids_in_cmdb), + ) + ) + final_delete_host_ids = set(host_ids) - set(bk_host_ids_in_cmdb) + models.ProcessStatus.objects.filter(bk_host_id__in=final_delete_host_ids).delete() + logger.info( + "[clear_final_delete_host_ids] task_id -> %s, final_delete_host_ids -> %s, num -> %s" + % ( + task_id, + final_delete_host_ids, + len(final_delete_host_ids), + ) + ) + return [] @periodic_task( @@ -492,3 +575,16 @@ def sync_cmdb_host_task(bk_biz_id=None): """ task_id = sync_cmdb_host_task.request.id sync_cmdb_host(bk_biz_id, task_id) + + +@periodic_task( + queue="default", + options={"queue": "default"}, + run_every=constants.CLEAR_NEED_DELETE_HOST_IDS_INTERVAL, +) +def clear_need_delete_host_ids_task(): + """ + 被动周期删除cmdb不存在的主机 + """ + task_id = clear_need_delete_host_ids_task.request.id + clear_need_delete_host_ids(task_id) diff --git a/apps/node_man/tests/test_pericdic_tasks/test_sync_cmdb_host.py b/apps/node_man/tests/test_pericdic_tasks/test_sync_cmdb_host.py index 3385eb8ba..03b279dad 100644 --- a/apps/node_man/tests/test_pericdic_tasks/test_sync_cmdb_host.py +++ b/apps/node_man/tests/test_pericdic_tasks/test_sync_cmdb_host.py @@ -11,9 +11,14 @@ import copy from unittest.mock import patch +from apps.backend.views import LPUSH_AND_EXPIRE_FUNC +from apps.mock_data.common_unit.host import PROCESS_STATUS_MODEL_DATA from apps.node_man import constants -from apps.node_man.models import Host -from apps.node_man.periodic_tasks.sync_cmdb_host import sync_cmdb_host_periodic_task +from apps.node_man.models import Host, ProcessStatus +from apps.node_man.periodic_tasks.sync_cmdb_host import ( + clear_need_delete_host_ids_task, + sync_cmdb_host_periodic_task, +) from apps.utils.unittest.testcase import CustomBaseTestCase from .mock_data import MOCK_BK_BIZ_ID, MOCK_HOST, MOCK_HOST_NUM @@ -56,3 +61,39 @@ def test_sync_cmdb_host(self): # 验证主机信息是否删除成功 self.assertEqual(Host.objects.filter(bk_host_id=-1).count(), 0) + + +class TestClearNeedDeleteHostIds(CustomBaseTestCase): + @staticmethod + def init_db(): + proc_status_data = copy.deepcopy(PROCESS_STATUS_MODEL_DATA) + proc_status_list = [] + for bk_host_id in range(1, 6): + proc_status_data["bk_host_id"] = bk_host_id + proc_status_list.append(ProcessStatus(**proc_status_data)) + + ProcessStatus.objects.bulk_create(proc_status_list) + + need_delete_host_ids = range(1, 6) + name = constants.REDIS_NEED_DELETE_HOST_IDS_KEY_TPL + LPUSH_AND_EXPIRE_FUNC(keys=[name], args=[constants.TimeUnit.DAY] + list(need_delete_host_ids)) + + @staticmethod + def list_hosts_without_biz(*args, **kwargs): + return { + "count": 1, + "info": [ + {"bk_host_id": 1}, + ], + } + + def start_patch(self): + MockClient.cc.list_hosts_without_biz = self.list_hosts_without_biz + + @patch("apps.node_man.periodic_tasks.sync_cmdb_host.client_v2", MockClient) + def test_clear_need_delete_host_ids(self): + self.init_db() + self.start_patch() + clear_need_delete_host_ids_task() + # 验证ProcessStatus中信息是否删除成功 + self.assertEqual(ProcessStatus.objects.count(), 1)