diff --git a/apps/backend/components/collections/agent_new/base.py b/apps/backend/components/collections/agent_new/base.py index e758be54d..31476bc03 100644 --- a/apps/backend/components/collections/agent_new/base.py +++ b/apps/backend/components/collections/agent_new/base.py @@ -28,7 +28,6 @@ from apps.backend.subscription.steps.agent_adapter.adapter import AgentStepAdapter from apps.node_man import constants, models from apps.node_man.exceptions import AliveProxyNotExistsError -from apps.node_man.periodic_tasks.sync_cmdb_host import bulk_differential_sync_biz_hosts from apps.prometheus import metrics from apps.prometheus.helper import SetupObserve @@ -78,22 +77,6 @@ def get_common_data(cls, data): # 引入背景:在聚合流程中,类似 host.agent_config, host.ap 的逻辑会引发 n + 1 DB查询问题 host_id__ap_map: Dict[int, models.AccessPoint] = {} - expected_bk_host_ids_gby_bk_biz_id: Dict[int, List[int]] = defaultdict(list) - if len(common_data.host_id_obj_map) < len(common_data.bk_host_ids): - deleted_host_ids = set(common_data.bk_host_ids) - set(common_data.host_id_obj_map.keys()) - for deleted_host_id in deleted_host_ids: - bk_biz_id = common_data.sub_inst_id__sub_inst_obj_map[ - common_data.host_id__sub_inst_id_map[deleted_host_id] - ].instance_info["host"]["bk_biz_id"] - expected_bk_host_ids_gby_bk_biz_id[bk_biz_id].append(deleted_host_id) - - bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id=expected_bk_host_ids_gby_bk_biz_id) - - host_id__ap_map_with_pullback: Dict[int, models.AccessPoint] = models.Host.host_id_obj_map( - bk_host_id__in=deleted_host_ids - ) - common_data.host_id_obj_map.update(host_id__ap_map_with_pullback) - for bk_host_id in common_data.bk_host_ids: host = common_data.host_id_obj_map.get(bk_host_id) if not host: @@ -368,19 +351,6 @@ def get_host_ap(self, common_data: AgentCommonData, host: models.Host) -> Option return host_ap - def get_host(self, common_data: AgentCommonData, host_id: int) -> Optional[models.Host]: - host_id_obj_map = common_data.host_id_obj_map - host = host_id_obj_map.get(host_id) - if not host: - code = self.__class__.__name__ - logger.info(f"[task_engine][service_run_exc_handler:{code}] exc -> GetHostError, host_id -> {host_id}") - self.move_insts_to_failed( - [common_data.host_id__sub_inst_id_map[host_id]], - _("主机不存在或未同步"), - ) - - return host - # 根据 JOB 的插件额外封装一层,保证后续基于 Agent 增加定制化功能的可扩展性 diff --git a/apps/backend/components/collections/base.py b/apps/backend/components/collections/base.py index 9c72bf8c7..8cb1abf86 100644 --- a/apps/backend/components/collections/base.py +++ b/apps/backend/components/collections/base.py @@ -12,6 +12,7 @@ import os import traceback import typing +from collections import defaultdict from dataclasses import dataclass from typing import ( Any, @@ -34,10 +35,12 @@ from apps.adapters.api.gse import GseApiBaseHelper, get_gse_api_helper from apps.backend.api.constants import POLLING_TIMEOUT +from apps.backend.constants import ActionNameType from apps.backend.subscription import errors from apps.core.files.storage import get_storage from apps.exceptions import parse_exception from apps.node_man import constants, models +from apps.node_man.periodic_tasks.sync_cmdb_host import bulk_differential_sync_biz_hosts from apps.prometheus import metrics from apps.prometheus.helper import SetupObserve from apps.utils import cache, time_handler, translation @@ -366,6 +369,31 @@ def get_job_meta(data) -> Dict[str, Any]: scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value) return {"bk_biz_id": scope_id, "bk_scope_type": scope_type, "bk_scope_id": scope_id} + @classmethod + def handle_deleted_host_ids( + cls, + deleted_host_ids: Set[int], + host_id_obj_map: Dict[int, models.Host], + sub_inst_id__sub_inst_obj_map: Dict[int, Any], + host_id__sub_inst_id_map: Dict[int, int], + ) -> None: + expected_bk_host_ids_gby_bk_biz_id: Dict[int, List[int]] = defaultdict(list) + for deleted_host_id in deleted_host_ids: + bk_biz_id = sub_inst_id__sub_inst_obj_map[host_id__sub_inst_id_map[deleted_host_id]].instance_info["host"][ + "bk_biz_id" + ] + expected_bk_host_ids_gby_bk_biz_id[bk_biz_id].append(deleted_host_id) + + bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id=expected_bk_host_ids_gby_bk_biz_id) + + host_id__obj_map_with_pullback: Dict[int, models.Host] = models.Host.host_id_obj_map( + bk_host_id__in=deleted_host_ids + ) + + pullback_host_ids: List[int] = [host_obj.bk_host_id for host_obj in host_id__obj_map_with_pullback.values()] + logger.info("[handle_deleted_host_ids][engine] pullback host_ids -> %s", pullback_host_ids) + host_id_obj_map.update(host_id__obj_map_with_pullback) + @classmethod def get_common_data(cls, data): """ @@ -403,6 +431,21 @@ def get_common_data(cls, data): host_id_obj_map: Dict[int, models.Host] = models.Host.host_id_obj_map(bk_host_id__in=bk_host_ids) + steps: List[Dict[str, Any]] = cls.get_meta(data)["STEPS"] + for step in steps: + action = step.get("action") + + # 【插件安装】或【非插件操作】做主机差量同步 + if action and (action == ActionNameType.MAIN_INSTALL_PLUGIN or "AGENT" in action): + if len(host_id_obj_map) < len(bk_host_ids): + cls.handle_deleted_host_ids( + deleted_host_ids=set(bk_host_ids) - set(host_id_obj_map.keys()), + host_id_obj_map=host_id_obj_map, + sub_inst_id__sub_inst_obj_map=sub_inst_id__sub_inst_obj_map, + host_id__sub_inst_id_map=host_id__sub_inst_id_map, + ) + break + ap_id_obj_map = models.AccessPoint.ap_id_obj_map() return CommonData( bk_host_ids=bk_host_ids, @@ -578,3 +621,16 @@ def outputs_format(self): required=True, ) ] + + def get_host(self, common_data: CommonData, host_id: int) -> Optional[models.Host]: + host_id_obj_map = common_data.host_id_obj_map + host = host_id_obj_map.get(host_id) + if not host: + code = self.__class__.__name__ + logger.info(f"[task_engine][service_run_exc_handler:{code}] exc -> GetHostError, host_id -> {host_id}") + self.move_insts_to_failed( + [common_data.host_id__sub_inst_id_map[host_id]], + _("主机不存在或未同步"), + ) + + return host diff --git a/apps/backend/components/collections/job.py b/apps/backend/components/collections/job.py index 48c7fddaa..d576f1fa4 100644 --- a/apps/backend/components/collections/job.py +++ b/apps/backend/components/collections/job.py @@ -432,7 +432,9 @@ def _execute(self, data, parent_data, common_data: CommonData): multi_job_params_map: Dict[str, Dict[str, Any]] = defaultdict(lambda: defaultdict(list)) for sub_inst in common_data.subscription_instances: bk_host_id = sub_inst.instance_info["host"]["bk_host_id"] - host_obj = common_data.host_id_obj_map[bk_host_id] + host_obj: Optional[models.Host] = self.get_host(common_data, bk_host_id) + if not host_obj: + continue script_param = self.get_script_param(data=data, common_data=common_data, host=host_obj) script_content = self.get_script_content(data=data, common_data=common_data, host=host_obj) target_servers = self.get_target_servers(data=data, common_data=common_data, host=host_obj) @@ -502,7 +504,10 @@ def _execute(self, data, parent_data, common_data: CommonData): multi_job_params_map: Dict[str, Dict[str, Any]] = {} for sub_inst in common_data.subscription_instances: bk_host_id = sub_inst.instance_info["host"]["bk_host_id"] - host_obj = common_data.host_id_obj_map[bk_host_id] + host_obj: Optional[models.Host] = self.get_host(common_data, bk_host_id) + if not host_obj: + continue + target_servers = self.get_target_servers(data=data, common_data=common_data, host=host_obj) # 所有的逻辑处理都基于 host_obj, 仅执行目标使用 target_host @@ -601,7 +606,9 @@ def _execute(self, data, parent_data, common_data: CommonData): multi_job_params_map: Dict[str, Dict[str, Any]] = {} for sub_inst in common_data.subscription_instances: bk_host_id = sub_inst.instance_info["host"]["bk_host_id"] - host_obj = common_data.host_id_obj_map[bk_host_id] + host_obj: Optional[models.Host] = self.get_host(common_data, bk_host_id) + if not host_obj: + continue config_info_list = self.get_config_info_list(data=data, common_data=common_data, host=host_obj) file_target_path = self.get_file_target_path(data=data, common_data=common_data, host=host_obj) diff --git a/apps/backend/tests/components/collections/agent_new/test_install.py b/apps/backend/tests/components/collections/agent_new/test_install.py index 77f1e1abf..036cdb995 100644 --- a/apps/backend/tests/components/collections/agent_new/test_install.py +++ b/apps/backend/tests/components/collections/agent_new/test_install.py @@ -1067,6 +1067,23 @@ def tearDown(self) -> None: class KeyErrorWithPullAllBackTestCase(InstallBaseTestCase): + def setUp(self) -> None: + super().setUp() + self.common_inputs["meta"] = { + "GSE_VERSION": "V2", + "STEPS": [ + { + "action": "REINSTALL_AGENT_2", + "extra_info": {}, + "id": "agent", + "index": 0, + "node_name": "[agent] 重装", + "pipeline_id": "xxxxxxxxxxxxxxxxxxxxxxx", + "type": "AGENT", + } + ], + } + @classmethod def get_default_case_name(cls) -> str: return "测试主机误删除全部拉回来的场景" @@ -1081,16 +1098,11 @@ def _do_case_assert(self, service, method, assertion, no, name, args=None, kwarg super()._do_case_assert(service, method, assertion, no, name, args, kwargs) -class KeyErrorWithPullPartialBackTestCase(InstallBaseTestCase): +class KeyErrorWithPullPartialBackTestCase(KeyErrorWithPullAllBackTestCase): @classmethod def get_default_case_name(cls) -> str: return "测试主机拉回来部分,host_id_obj_map获取host_obj过程中出现KeyError场景" - @classmethod - def setup_obj_factory(cls): - """设置 obj_factory""" - cls.obj_factory.init_host_num = 20 - def _do_case_assert(self, service, method, assertion, no, name, args=None, kwargs=None): models.Host.objects.all().delete() try: @@ -1121,7 +1133,11 @@ def init_mock_clients(self): self.cmdb_mock_client = api_mkd.cmdb.utils.CCApiMockClient( batch_update_host=mock_data_utils.MockReturn( return_type=mock_data_utils.MockReturnType.SIDE_EFFECT.value, - return_obj=[ApiResultError("更新主机cpu架构信息失败"), ApiResultError("更新主机cpu架构信息失败")], + return_obj=[ + ApiResultError("更新主机cpu架构信息失败"), + ApiResultError("更新主机cpu架构信息失败"), + ApiResultError("更新主机cpu架构信息失败"), + ], ) ) @@ -1139,7 +1155,7 @@ def _do_case_assert(self, service, method, assertion, no, name, args=None, kwarg self.assertEqual(list(failed_subscription_instance_id_reason_map.values()), ["[3800002] 更新主机cpu架构信息失败"]) # 验证重试次数 - self.assertEqual(self.cmdb_mock_client.batch_update_host.call_count, 2) + self.assertEqual(self.cmdb_mock_client.batch_update_host.call_count, 3) class RetrySuccessTestCase(LinuxInstallTestCase): diff --git a/apps/backend/tests/components/collections/plugin/test_init_proc_script.py b/apps/backend/tests/components/collections/plugin/test_init_proc_script.py index ae5a4ff4a..bf02b417f 100644 --- a/apps/backend/tests/components/collections/plugin/test_init_proc_script.py +++ b/apps/backend/tests/components/collections/plugin/test_init_proc_script.py @@ -10,6 +10,7 @@ """ import os +import mock from mock.mock import patch from apps.backend.api.job import process_parms @@ -18,7 +19,13 @@ from apps.backend.tests.components.collections.plugin.test_install_package import ( InstallPackageTest, ) -from apps.backend.tests.components.collections.plugin.utils import JOB_INSTANCE_ID +from apps.backend.tests.components.collections.plugin.utils import ( + JOB_INSTANCE_ID, + PluginTestObjFactory, +) +from apps.mock_data import api_mkd +from apps.mock_data import utils as mock_data_utils +from apps.node_man import models from pipeline.component_framework.test import ( ComponentTestCase, ExecuteAssertion, @@ -27,13 +34,17 @@ class InitProcOperateScriptTest(InstallPackageTest): + @classmethod + def get_default_case_name(cls) -> str: + return "测试初始化插件操作脚本" + def component_cls(self): return InitProcOperateScriptComponent def cases(self): return [ ComponentTestCase( - name="测试初始化插件操作脚本", + name=self.get_default_case_name(), inputs=self.COMMON_INPUTS, parent_data={}, execute_assertion=ExecuteAssertion( @@ -55,6 +66,10 @@ def cases(self): class InitProcOperateInTmpDir(InitProcOperateScriptTest): + @classmethod + def get_default_case_name(cls) -> str: + return "【下发脚本】到临时路径" + def test_component(self): with patch( "apps.backend.tests.components.collections.plugin.utils.JobMockClient.fast_execute_script" @@ -77,3 +92,91 @@ def test_component(self): process_parms(INITIALIZE_SCRIPT), fast_execute_script.call_args[0][0]["script_content"], ) + + +class InstallPluginWhenHostDoesNotExist(InitProcOperateScriptTest): + @classmethod + def get_default_case_name(cls) -> str: + return "主机不存在时【安装插件】" + + @property + def list_biz_hosts(self): + host_params = PluginTestObjFactory.host_obj() + host_params["bk_host_innerip"] = host_params["inner_ip"] + return { + "count": 1, + "info": [host_params], + } + + def init_mock_clients(self): + self.cmdb_mock_client = api_mkd.cmdb.utils.CCApiMockClient( + list_biz_hosts_return=mock_data_utils.MockReturn( + return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.list_biz_hosts + ), + ) + + def start_patch(self): + mock.patch("apps.component.esbclient.client_v2.cc", self.cmdb_mock_client).start() + + def adjust_data(self): + # 填充meta信息 + self.COMMON_INPUTS["meta"] = { + "GSE_VERSION": "V2", + "STEPS": [ + { + "action": "MAIN_INSTALL_PLUGIN", + "extra_info": {}, + "id": "bkmonitorbeat", + "index": 0, + "node_name": "[bkmonitorbeat] 部署插件程序", + "pipeline_id": "xxxxxxxxxxxxxxxxx", + "type": "PLUGIN", + } + ], + } + + # 主机删除 + models.Host.objects.all().delete() + + def setUp(self): + super().setUp() + self.init_mock_clients() + self.start_patch() + self.adjust_data() + + +class ReloadPluginWhenHostDoesNotExist(InstallPluginWhenHostDoesNotExist): + @classmethod + def get_default_case_name(cls) -> str: + return "主机不存在时【重载插件】" + + def adjust_data(self): + super().adjust_data() + self.COMMON_INPUTS["meta"] = { + "GSE_VERSION": "V2", + "STEPS": [ + { + "action": "MAIN_RELOAD_PLUGIN", + "extra_info": {}, + "id": "bkmonitorbeat", + "index": 0, + "node_name": "[bkmonitorbeat] 重载插件", + "pipeline_id": "xxxxxxxxxxxxxxxxx", + "type": "PLUGIN", + } + ], + } + + def _do_case_assert(self, service, method, assertion, no, name, args=None, kwargs=None): + try: + super()._do_case_assert(service, method, assertion, no, name, args, kwargs) + except AssertionError: + self.assertEqual( + list(service.failed_subscription_instance_id_reason_map.keys()), + self.COMMON_INPUTS["subscription_instance_ids"], + ) + + self.assertEqual( + list(service.failed_subscription_instance_id_reason_map.values()), + ["主机不存在或未同步"], + )