Skip to content

Commit

Permalink
feat: 插件安装支持不存在主机差量同步 (closed TencentBlueKing#2183)
Browse files Browse the repository at this point in the history
  • Loading branch information
ping15 authored and ZhuoZhuoCrayon committed May 27, 2024
1 parent b9ca10e commit dd3f842
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 43 deletions.
30 changes: 0 additions & 30 deletions apps/backend/components/collections/agent_new/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from apps.backend.subscription.steps.agent_adapter.adapter import AgentStepAdapter
from apps.node_man import constants, models
from apps.node_man.exceptions import AliveProxyNotExistsError
from apps.node_man.periodic_tasks.sync_cmdb_host import bulk_differential_sync_biz_hosts
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve

Expand Down Expand Up @@ -78,22 +77,6 @@ def get_common_data(cls, data):
# 引入背景:在聚合流程中,类似 host.agent_config, host.ap 的逻辑会引发 n + 1 DB查询问题
host_id__ap_map: Dict[int, models.AccessPoint] = {}

expected_bk_host_ids_gby_bk_biz_id: Dict[int, List[int]] = defaultdict(list)
if len(common_data.host_id_obj_map) < len(common_data.bk_host_ids):
deleted_host_ids = set(common_data.bk_host_ids) - set(common_data.host_id_obj_map.keys())
for deleted_host_id in deleted_host_ids:
bk_biz_id = common_data.sub_inst_id__sub_inst_obj_map[
common_data.host_id__sub_inst_id_map[deleted_host_id]
].instance_info["host"]["bk_biz_id"]
expected_bk_host_ids_gby_bk_biz_id[bk_biz_id].append(deleted_host_id)

bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id=expected_bk_host_ids_gby_bk_biz_id)

host_id__ap_map_with_pullback: Dict[int, models.AccessPoint] = models.Host.host_id_obj_map(
bk_host_id__in=deleted_host_ids
)
common_data.host_id_obj_map.update(host_id__ap_map_with_pullback)

for bk_host_id in common_data.bk_host_ids:
host = common_data.host_id_obj_map.get(bk_host_id)
if not host:
Expand Down Expand Up @@ -368,19 +351,6 @@ def get_host_ap(self, common_data: AgentCommonData, host: models.Host) -> Option

return host_ap

def get_host(self, common_data: AgentCommonData, host_id: int) -> Optional[models.Host]:
host_id_obj_map = common_data.host_id_obj_map
host = host_id_obj_map.get(host_id)
if not host:
code = self.__class__.__name__
logger.info(f"[task_engine][service_run_exc_handler:{code}] exc -> GetHostError, host_id -> {host_id}")
self.move_insts_to_failed(
[common_data.host_id__sub_inst_id_map[host_id]],
_("主机不存在或未同步"),
)

return host


# 根据 JOB 的插件额外封装一层,保证后续基于 Agent 增加定制化功能的可扩展性

Expand Down
56 changes: 56 additions & 0 deletions apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import traceback
import typing
from collections import defaultdict
from dataclasses import dataclass
from typing import (
Any,
Expand All @@ -34,10 +35,12 @@

from apps.adapters.api.gse import GseApiBaseHelper, get_gse_api_helper
from apps.backend.api.constants import POLLING_TIMEOUT
from apps.backend.constants import ActionNameType
from apps.backend.subscription import errors
from apps.core.files.storage import get_storage
from apps.exceptions import parse_exception
from apps.node_man import constants, models
from apps.node_man.periodic_tasks.sync_cmdb_host import bulk_differential_sync_biz_hosts
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve
from apps.utils import cache, time_handler, translation
Expand Down Expand Up @@ -366,6 +369,31 @@ def get_job_meta(data) -> Dict[str, Any]:
scope_type = meta.get("SCOPE_TYPE", constants.BkJobScopeType.BIZ_SET.value)
return {"bk_biz_id": scope_id, "bk_scope_type": scope_type, "bk_scope_id": scope_id}

@classmethod
def handle_deleted_host_ids(
cls,
deleted_host_ids: Set[int],
host_id_obj_map: Dict[int, models.Host],
sub_inst_id__sub_inst_obj_map: Dict[int, Any],
host_id__sub_inst_id_map: Dict[int, int],
) -> None:
expected_bk_host_ids_gby_bk_biz_id: Dict[int, List[int]] = defaultdict(list)
for deleted_host_id in deleted_host_ids:
bk_biz_id = sub_inst_id__sub_inst_obj_map[host_id__sub_inst_id_map[deleted_host_id]].instance_info["host"][
"bk_biz_id"
]
expected_bk_host_ids_gby_bk_biz_id[bk_biz_id].append(deleted_host_id)

bulk_differential_sync_biz_hosts(expected_bk_host_ids_gby_bk_biz_id=expected_bk_host_ids_gby_bk_biz_id)

host_id__obj_map_with_pullback: Dict[int, models.Host] = models.Host.host_id_obj_map(
bk_host_id__in=deleted_host_ids
)

pullback_host_ids: List[int] = [host_obj.bk_host_id for host_obj in host_id__obj_map_with_pullback.values()]
logger.info("[handle_deleted_host_ids][engine] pullback host_ids -> %s", pullback_host_ids)
host_id_obj_map.update(host_id__obj_map_with_pullback)

@classmethod
def get_common_data(cls, data):
"""
Expand Down Expand Up @@ -403,6 +431,21 @@ def get_common_data(cls, data):

host_id_obj_map: Dict[int, models.Host] = models.Host.host_id_obj_map(bk_host_id__in=bk_host_ids)

steps: List[Dict[str, Any]] = cls.get_meta(data)["STEPS"]
for step in steps:
action = step.get("action")

# 【插件安装】或【非插件操作】做主机差量同步
if action and (action == ActionNameType.MAIN_INSTALL_PLUGIN or "AGENT" in action):
if len(host_id_obj_map) < len(bk_host_ids):
cls.handle_deleted_host_ids(
deleted_host_ids=set(bk_host_ids) - set(host_id_obj_map.keys()),
host_id_obj_map=host_id_obj_map,
sub_inst_id__sub_inst_obj_map=sub_inst_id__sub_inst_obj_map,
host_id__sub_inst_id_map=host_id__sub_inst_id_map,
)
break

ap_id_obj_map = models.AccessPoint.ap_id_obj_map()
return CommonData(
bk_host_ids=bk_host_ids,
Expand Down Expand Up @@ -578,3 +621,16 @@ def outputs_format(self):
required=True,
)
]

def get_host(self, common_data: CommonData, host_id: int) -> Optional[models.Host]:
host_id_obj_map = common_data.host_id_obj_map
host = host_id_obj_map.get(host_id)
if not host:
code = self.__class__.__name__
logger.info(f"[task_engine][service_run_exc_handler:{code}] exc -> GetHostError, host_id -> {host_id}")
self.move_insts_to_failed(
[common_data.host_id__sub_inst_id_map[host_id]],
_("主机不存在或未同步"),
)

return host
13 changes: 10 additions & 3 deletions apps/backend/components/collections/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,9 @@ def _execute(self, data, parent_data, common_data: CommonData):
multi_job_params_map: Dict[str, Dict[str, Any]] = defaultdict(lambda: defaultdict(list))
for sub_inst in common_data.subscription_instances:
bk_host_id = sub_inst.instance_info["host"]["bk_host_id"]
host_obj = common_data.host_id_obj_map[bk_host_id]
host_obj: Optional[models.Host] = self.get_host(common_data, bk_host_id)
if not host_obj:
continue
script_param = self.get_script_param(data=data, common_data=common_data, host=host_obj)
script_content = self.get_script_content(data=data, common_data=common_data, host=host_obj)
target_servers = self.get_target_servers(data=data, common_data=common_data, host=host_obj)
Expand Down Expand Up @@ -502,7 +504,10 @@ def _execute(self, data, parent_data, common_data: CommonData):
multi_job_params_map: Dict[str, Dict[str, Any]] = {}
for sub_inst in common_data.subscription_instances:
bk_host_id = sub_inst.instance_info["host"]["bk_host_id"]
host_obj = common_data.host_id_obj_map[bk_host_id]
host_obj: Optional[models.Host] = self.get_host(common_data, bk_host_id)
if not host_obj:
continue

target_servers = self.get_target_servers(data=data, common_data=common_data, host=host_obj)

# 所有的逻辑处理都基于 host_obj, 仅执行目标使用 target_host
Expand Down Expand Up @@ -601,7 +606,9 @@ def _execute(self, data, parent_data, common_data: CommonData):
multi_job_params_map: Dict[str, Dict[str, Any]] = {}
for sub_inst in common_data.subscription_instances:
bk_host_id = sub_inst.instance_info["host"]["bk_host_id"]
host_obj = common_data.host_id_obj_map[bk_host_id]
host_obj: Optional[models.Host] = self.get_host(common_data, bk_host_id)
if not host_obj:
continue

config_info_list = self.get_config_info_list(data=data, common_data=common_data, host=host_obj)
file_target_path = self.get_file_target_path(data=data, common_data=common_data, host=host_obj)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1067,6 +1067,23 @@ def tearDown(self) -> None:


class KeyErrorWithPullAllBackTestCase(InstallBaseTestCase):
def setUp(self) -> None:
super().setUp()
self.common_inputs["meta"] = {
"GSE_VERSION": "V2",
"STEPS": [
{
"action": "REINSTALL_AGENT_2",
"extra_info": {},
"id": "agent",
"index": 0,
"node_name": "[agent] 重装",
"pipeline_id": "xxxxxxxxxxxxxxxxxxxxxxx",
"type": "AGENT",
}
],
}

@classmethod
def get_default_case_name(cls) -> str:
return "测试主机误删除全部拉回来的场景"
Expand All @@ -1081,16 +1098,11 @@ def _do_case_assert(self, service, method, assertion, no, name, args=None, kwarg
super()._do_case_assert(service, method, assertion, no, name, args, kwargs)


class KeyErrorWithPullPartialBackTestCase(InstallBaseTestCase):
class KeyErrorWithPullPartialBackTestCase(KeyErrorWithPullAllBackTestCase):
@classmethod
def get_default_case_name(cls) -> str:
return "测试主机拉回来部分,host_id_obj_map获取host_obj过程中出现KeyError场景"

@classmethod
def setup_obj_factory(cls):
"""设置 obj_factory"""
cls.obj_factory.init_host_num = 20

def _do_case_assert(self, service, method, assertion, no, name, args=None, kwargs=None):
models.Host.objects.all().delete()
try:
Expand Down Expand Up @@ -1121,7 +1133,11 @@ def init_mock_clients(self):
self.cmdb_mock_client = api_mkd.cmdb.utils.CCApiMockClient(
batch_update_host=mock_data_utils.MockReturn(
return_type=mock_data_utils.MockReturnType.SIDE_EFFECT.value,
return_obj=[ApiResultError("更新主机cpu架构信息失败"), ApiResultError("更新主机cpu架构信息失败")],
return_obj=[
ApiResultError("更新主机cpu架构信息失败"),
ApiResultError("更新主机cpu架构信息失败"),
ApiResultError("更新主机cpu架构信息失败"),
],
)
)

Expand All @@ -1139,7 +1155,7 @@ def _do_case_assert(self, service, method, assertion, no, name, args=None, kwarg
self.assertEqual(list(failed_subscription_instance_id_reason_map.values()), ["[3800002] 更新主机cpu架构信息失败"])

# 验证重试次数
self.assertEqual(self.cmdb_mock_client.batch_update_host.call_count, 2)
self.assertEqual(self.cmdb_mock_client.batch_update_host.call_count, 3)


class RetrySuccessTestCase(LinuxInstallTestCase):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"""
import os

import mock
from mock.mock import patch

from apps.backend.api.job import process_parms
Expand All @@ -18,7 +19,13 @@
from apps.backend.tests.components.collections.plugin.test_install_package import (
InstallPackageTest,
)
from apps.backend.tests.components.collections.plugin.utils import JOB_INSTANCE_ID
from apps.backend.tests.components.collections.plugin.utils import (
JOB_INSTANCE_ID,
PluginTestObjFactory,
)
from apps.mock_data import api_mkd
from apps.mock_data import utils as mock_data_utils
from apps.node_man import models
from pipeline.component_framework.test import (
ComponentTestCase,
ExecuteAssertion,
Expand All @@ -27,13 +34,17 @@


class InitProcOperateScriptTest(InstallPackageTest):
@classmethod
def get_default_case_name(cls) -> str:
return "测试初始化插件操作脚本"

def component_cls(self):
return InitProcOperateScriptComponent

def cases(self):
return [
ComponentTestCase(
name="测试初始化插件操作脚本",
name=self.get_default_case_name(),
inputs=self.COMMON_INPUTS,
parent_data={},
execute_assertion=ExecuteAssertion(
Expand All @@ -55,6 +66,10 @@ def cases(self):


class InitProcOperateInTmpDir(InitProcOperateScriptTest):
@classmethod
def get_default_case_name(cls) -> str:
return "【下发脚本】到临时路径"

def test_component(self):
with patch(
"apps.backend.tests.components.collections.plugin.utils.JobMockClient.fast_execute_script"
Expand All @@ -77,3 +92,91 @@ def test_component(self):
process_parms(INITIALIZE_SCRIPT),
fast_execute_script.call_args[0][0]["script_content"],
)


class InstallPluginWhenHostDoesNotExist(InitProcOperateScriptTest):
@classmethod
def get_default_case_name(cls) -> str:
return "主机不存在时【安装插件】"

@property
def list_biz_hosts(self):
host_params = PluginTestObjFactory.host_obj()
host_params["bk_host_innerip"] = host_params["inner_ip"]
return {
"count": 1,
"info": [host_params],
}

def init_mock_clients(self):
self.cmdb_mock_client = api_mkd.cmdb.utils.CCApiMockClient(
list_biz_hosts_return=mock_data_utils.MockReturn(
return_type=mock_data_utils.MockReturnType.RETURN_VALUE.value, return_obj=self.list_biz_hosts
),
)

def start_patch(self):
mock.patch("apps.component.esbclient.client_v2.cc", self.cmdb_mock_client).start()

def adjust_data(self):
# 填充meta信息
self.COMMON_INPUTS["meta"] = {
"GSE_VERSION": "V2",
"STEPS": [
{
"action": "MAIN_INSTALL_PLUGIN",
"extra_info": {},
"id": "bkmonitorbeat",
"index": 0,
"node_name": "[bkmonitorbeat] 部署插件程序",
"pipeline_id": "xxxxxxxxxxxxxxxxx",
"type": "PLUGIN",
}
],
}

# 主机删除
models.Host.objects.all().delete()

def setUp(self):
super().setUp()
self.init_mock_clients()
self.start_patch()
self.adjust_data()


class ReloadPluginWhenHostDoesNotExist(InstallPluginWhenHostDoesNotExist):
@classmethod
def get_default_case_name(cls) -> str:
return "主机不存在时【重载插件】"

def adjust_data(self):
super().adjust_data()
self.COMMON_INPUTS["meta"] = {
"GSE_VERSION": "V2",
"STEPS": [
{
"action": "MAIN_RELOAD_PLUGIN",
"extra_info": {},
"id": "bkmonitorbeat",
"index": 0,
"node_name": "[bkmonitorbeat] 重载插件",
"pipeline_id": "xxxxxxxxxxxxxxxxx",
"type": "PLUGIN",
}
],
}

def _do_case_assert(self, service, method, assertion, no, name, args=None, kwargs=None):
try:
super()._do_case_assert(service, method, assertion, no, name, args, kwargs)
except AssertionError:
self.assertEqual(
list(service.failed_subscription_instance_id_reason_map.keys()),
self.COMMON_INPUTS["subscription_instance_ids"],
)

self.assertEqual(
list(service.failed_subscription_instance_id_reason_map.values()),
["主机不存在或未同步"],
)

0 comments on commit dd3f842

Please sign in to comment.