Skip to content

Commit

Permalink
feature: 提供 Agent 包管理后台接口 (closed #1683)
Browse files Browse the repository at this point in the history
  • Loading branch information
wyyalt committed Jan 5, 2024
1 parent 2a7ec66 commit 1b0d11c
Show file tree
Hide file tree
Showing 35 changed files with 1,243 additions and 75 deletions.
68 changes: 52 additions & 16 deletions apps/backend/agent/artifact_builder/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
from apps.backend.agent.config_parser import GseConfigParser
from apps.core.files import core_files_constants
from apps.core.files.storage import get_storage
from apps.core.tag.constants import AGENT_NAME_TARGET_ID_MAP, TargetType
from apps.core.tag.constants import TargetType
from apps.core.tag.handlers import TagHandler
from apps.core.tag.targets import AgentTargetHelper
from apps.node_man import constants, models
from apps.utils import cache, files

Expand Down Expand Up @@ -441,25 +442,70 @@ def _get_changelog(self, extract_dir: str) -> str:
changelog: str = changelog_fs.read()
return changelog

def update_or_create_record(self, artifact_meta_info: typing.Dict[str, typing.Any]):
def generate_location_path(self, upload_path: str, pkg_name: str) -> str:
if settings.STORAGE_TYPE == core_files_constants.StorageType.BLUEKING_ARTIFACTORY.value:
location_path: str = f"{settings.BKREPO_ENDPOINT_URL}/generic/blueking/bknodeman/{upload_path}/{pkg_name}"
else:
location_path: str = f"http://{settings.BKAPP_LAN_IP}/{upload_path}/{pkg_name}"

return location_path

def update_or_create_package_records(self, package_infos: typing.List[typing.Dict[str, typing.Any]]):
"""
创建或更新制品记录,待 Agent 包管理完善
:param artifact_meta_info:
创建或更新制品记录
:param package_infos:
:return:
"""
pass
for package_info in package_infos:
models.GsePackages.objects.update_or_create(
defaults={
"pkg_size": package_info["package_upload_info"]["pkg_size"],
"pkg_path": package_info["package_upload_info"]["pkg_path"],
"md5": package_info["package_upload_info"]["md5"],
"location": self.generate_location_path(
package_info["package_upload_info"]["pkg_path"],
package_info["package_upload_info"]["pkg_name"],
),
"version_log": package_info["artifact_meta_info"]["changelog"],
},
pkg_name=package_info["package_upload_info"]["pkg_name"],
version=package_info["artifact_meta_info"]["version"],
project=package_info["artifact_meta_info"]["name"],
os=package_info["package_dir_info"]["os"],
cpu_arch=package_info["package_dir_info"]["cpu_arch"],
)
logger.info(
f"[update_or_create_package_record] "
f"package name -> {package_info['package_upload_info']['pkg_name']} success"
)

if package_infos:
models.GsePackageDesc.objects.update_or_create(
defaults={
"description": package_infos[0]["artifact_meta_info"]["changelog"],
},
project=package_infos[0]["artifact_meta_info"]["name"],
category=constants.CategoryType.official,
)

logger.info(
f"[update_or_create_package_record] "
f"package desc -> {package_info['package_upload_info']['pkg_name']}, "
f"project -> {package_infos[0]['artifact_meta_info']['name']} success"
)

def update_or_create_tag(self, artifact_meta_info: typing.Dict[str, typing.Any]):
"""
创建或更新标签记录,待 Agent 包管理完善
:param artifact_meta_info:
:return:
"""
agent_name_target_id_map: typing.Dict[str, int] = AgentTargetHelper.get_agent_name_target_id_map()
for tag in self.tags:
TagHandler.publish_tag_version(
name=tag,
target_type=TargetType.AGENT.value,
target_id=AGENT_NAME_TARGET_ID_MAP[self.NAME],
target_id=agent_name_target_id_map[self.NAME],
target_version=artifact_meta_info["version"],
)
logger.info(
Expand Down Expand Up @@ -517,14 +563,6 @@ def update_or_create_support_files(self, package_infos: typing.List[typing.Dict]
agent_name=self.NAME,
)

def update_or_create_package_records(self, v):
"""
创建或更新安装包记录,待 Agent 包管理完善
:param package_infos:
:return:
"""
pass

def get_artifact_meta_info(self, extract_dir: str) -> typing.Dict[str, typing.Any]:
"""
获取制品的基础信息、配置文件信息
Expand Down Expand Up @@ -591,8 +629,6 @@ def make(
artifact_meta_info["operator"] = operator
# Agent 包先导入文件源 -> 写配置文件 -> 创建包记录 -> 创建 Tag
self.update_or_create_support_files(package_infos)
# TODO update_or_create_record & update_or_create_package_records 似乎是一样的功能?
self.update_or_create_record(artifact_meta_info)
self.update_or_create_package_records(package_infos)
self.update_or_create_tag(artifact_meta_info)

Expand Down
5 changes: 2 additions & 3 deletions apps/backend/agent/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ def check_run_commands(run_commands):


def batch_gen_commands(
base_agent_setup_info: AgentSetupInfo,
agent_step_adapter,
hosts: List[models.Host],
pipeline_id: str,
is_uninstall: bool,
Expand All @@ -336,7 +336,6 @@ def batch_gen_commands(
# 批量查出主机的属性并设置为property,避免在循环中进行ORM查询,提高效率
host_id__installation_tool_map = {}
bk_host_ids = [host.bk_host_id for host in hosts]
base_agent_setup_info_dict: Dict[str, Any] = asdict(base_agent_setup_info)
host_id_identity_map = {
identity.bk_host_id: identity for identity in models.IdentityData.objects.filter(bk_host_id__in=bk_host_ids)
}
Expand All @@ -354,7 +353,7 @@ def batch_gen_commands(
host_id__installation_tool_map[host.bk_host_id] = gen_commands(
agent_setup_info=AgentSetupInfo(
**{
**base_agent_setup_info_dict,
**asdict(agent_step_adapter.get_host_setup_info(host)),
"force_update_agent_id": agent_setup_extra_info_dict.get("force_update_agent_id", False),
}
),
Expand Down
4 changes: 2 additions & 2 deletions apps/backend/components/collections/agent_new/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def get_agent_pkg_name(
package_type = ("client", "proxy")[host.node_type == constants.NodeType.PROXY]
agent_step_adapter = common_data.agent_step_adapter
if not agent_step_adapter.is_legacy:
setup_info = agent_step_adapter.setup_info
setup_info = agent_step_adapter.get_host_setup_info(host)
return f"{setup_info.name}-{setup_info.version}.tgz"

# GSE1.0 的升级包是独立的,添加了 _upgrade 后缀
Expand Down Expand Up @@ -262,7 +262,7 @@ def get_host_id__installation_tool_map(
host for host in hosts_need_gen_commands if host.bk_host_id in host_id__install_channel_map
]
host_id__installation_tool_map = batch_gen_commands(
base_agent_setup_info=common_data.agent_step_adapter.setup_info,
agent_step_adapter=common_data.agent_step_adapter,
hosts=hosts_need_gen_commands,
pipeline_id=self.id,
is_uninstall=is_uninstall,
Expand Down
8 changes: 8 additions & 0 deletions apps/backend/subscription/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,11 @@ class SubscriptionIncludeGrayBizError(AppBaseException):
ERROR_CODE = 19
MESSAGE = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")
MESSAGE_TPL = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")


class AgentPackageValidationError(AppBaseException):
"""AgentPackage校验错误"""

ERROR_CODE = 20
MESSAGE = _("AgentPackage校验错误")
MESSAGE_TPL = _("{msg}")
92 changes: 80 additions & 12 deletions apps/backend/subscription/steps/agent_adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

from apps.backend.agent.tools import fetch_proxies
from apps.backend.constants import ProxyConfigFile
from apps.core.tag.constants import AGENT_NAME_TARGET_ID_MAP, TargetType
from apps.backend.subscription import errors
from apps.core.tag.constants import TargetType
from apps.core.tag.targets import get_target_helper
from apps.node_man import constants, models
from apps.utils import cache
Expand All @@ -28,17 +29,29 @@
from .config_context import context_helper
from .handlers import GseConfigHandler

# from apps.node_man.serializers.job import AgentVersionSerializer

logger = logging.getLogger("app")


LEGACY = "legacy"


class AgentVersionSerializer(serializers.Serializer):
os_cpu_arch = serializers.CharField(label="系统CPU架构", required=False)
bk_host_id = serializers.IntegerField(label="主机ID", required=False)
version = serializers.CharField(label="Agent Version")


class AgentStepConfigSerializer(serializers.Serializer):
name = serializers.CharField(required=False, label="构件名称")
# LEGACY 表示旧版本 Agent,仅做兼容
version = serializers.CharField(required=False, label="构件版本", default=LEGACY)
job_type = serializers.ChoiceField(required=True, choices=constants.JOB_TUPLE)
choice_version_type = serializers.ChoiceField(
required=False, choices=constants.AgentVersionType.list_choices(), label="选择Agent Version类型"
)
version_map_list = AgentVersionSerializer(many=True)


@dataclass
Expand All @@ -57,13 +70,19 @@ class AgentStepAdapter:
log_prefix: str = field(init=False)
# 配置处理模块缓存
_config_handler_cache: typing.Dict[str, GseConfigHandler] = field(init=False)
_setup_info_cache: typing.Dict[str, base.AgentSetupInfo] = field(init=False)
_target_version_cache: typing.Dict[str, str] = field(init=False)
agent_name: str = field(init=False)

def __post_init__(self):
self.is_legacy = self.gse_version == GseVersion.V1.value
self.log_prefix: str = (
f"[{self.__class__.__name__}({self.subscription_step.step_id})] | {self.subscription_step} |"
)
self._config_handler_cache: typing.Dict[str, GseConfigHandler] = {}
self._setup_info_cache: typing.Dict[str, base.AgentSetupInfo] = {}
self._target_version_cache: typing.Dict[str, str] = {}
self.agent_name = self.config.get("name")

def get_config_handler(self, agent_name: str, target_version: str) -> GseConfigHandler:

Expand Down Expand Up @@ -104,11 +123,12 @@ def _get_config(
install_channel: typing.Tuple[typing.Optional[models.Host], typing.Dict[str, typing.List]],
target_version: typing.Optional[typing.Dict[int, str]] = None,
) -> str:
agent_setup_info: base.AgentSetupInfo = self.setup_info
agent_setup_info: base.AgentSetupInfo = self.get_host_setup_info(host)
# 目标版本优先使用传入版本,传入版本必不会是标签所以可直接使用
config_handler: GseConfigHandler = self.get_config_handler(
agent_setup_info.name, target_version or agent_setup_info.version
)

config_tmpl_obj: base.AgentConfigTemplate = config_handler.get_matching_config_tmpl(
os_type=host.os_type,
cpu_arch=host.cpu_arch,
Expand Down Expand Up @@ -168,29 +188,65 @@ def get_config(

@property
@cache.class_member_cache()
def setup_info(self) -> base.AgentSetupInfo:
def bk_host_id_version_map(self, host: models.Host) -> typing.Dict[int, str]:
return {versiom_map["bk_host_id"]: versiom_map["version"] for versiom_map in self.config["version_map_list"]}

def get_host_setup_info(self, host: models.Host) -> base.AgentSetupInfo:
"""
获取 Agent 设置信息
TODO 后续如需支持多版本,该方法改造为 `get_host_setup_info`,根据维度进行缓存,参考 _config_handler_cache
:return:
"""
# 如果版本号匹配到标签名称,取对应标签下的真实版本号,否则取原来的版本号
agent_name: typing.Optional[str] = self.config.get("name")
if agent_name not in AGENT_NAME_TARGET_ID_MAP:
# 1.0 Install
if self.agent_name is None:
# 1.0 Install 或者 2.0统一版本
target_version = self.config.get("version")
setup_info_cache_key: str = f"agent_name_is_none:version:{target_version}"
else:
target_version: str = get_target_helper(TargetType.AGENT.value).get_target_version(
target_id=AGENT_NAME_TARGET_ID_MAP[agent_name],
target_version=self.config.get("version"),
)
if self.config["choice_version_type"] == constants.AgentVersionType.UNIFIED.value:
agent_version = self.config.get("version")
setup_info_cache_key: str = (
f"agent_name:{self.agent_name}:"
f"type:{constants.AgentVersionType.UNIFIED.value}:version:{agent_version}"
)
elif self.config["choice_version_type"] == constants.AgentVersionType.BY_SYSTEM_ARCH.value:
# TODO 按系统架构维度, 当前只支持按系统,后续需求完善按系统架构
os_cpu_arch_version_list: typing.List[str] = [
versiom_map["version"]
for versiom_map in self.config["version_map_list"]
if host.os_type.lower() in versiom_map["os_cpu_arch"]
]
agent_version: str = os_cpu_arch_version_list[0] if os_cpu_arch_version_list else "stable"
setup_info_cache_key: str = (
f"agent_name:{self.agent_name}:type:{constants.AgentVersionType.BY_SYSTEM_ARCH.value}:"
f"os:{host.os_type.lower()}:version:{agent_version}"
)
else:
# 按主机维度
agent_version: str = self.bk_host_id_version_map[host.bk_host_id]

return base.AgentSetupInfo(
target_version_cache_key: str = f"agent_desc_id:{self.agent_desc.id}:agent_version:{agent_version}"
target_version: str = self._target_version_cache.get(target_version_cache_key)
if target_version is None:
target_version: str = get_target_helper(TargetType.AGENT.value).get_target_version(
target_id=self.agent_desc.id,
target_version=agent_version,
)
self._target_version_cache[target_version_cache_key] = target_version

if self.config["choice_version_type"] != constants.AgentVersionType.BY_HOST.value:
agent_setup_info: typing.Optional[base.AgentSetupInfo] = self._setup_info_cache.get(setup_info_cache_key)
if agent_setup_info:
return agent_setup_info

agent_setup_info: base.AgentSetupInfo = base.AgentSetupInfo(
is_legacy=self.is_legacy,
agent_tools_relative_dir=("agent_tools/agent2", "")[self.is_legacy],
name=self.config.get("name"),
version=target_version,
)
if self.config["choice_version_type"] != constants.AgentVersionType.BY_HOST.value:
self._setup_info_cache[setup_info_cache_key] = agent_setup_info
return agent_setup_info

@staticmethod
def validated_data(data, serializer) -> OrderedDict:
Expand All @@ -204,3 +260,15 @@ def get_os_key(os_type: str, cpu_arch: str) -> str:
os_type = os_type or constants.OsType.LINUX
cpu_arch = cpu_arch or constants.CpuType.x86_64
return f"{os_type.lower()}-{cpu_arch}"

@property
def agent_desc(self) -> models.GsePackageDesc:
if hasattr(self, "_agent_desc") and self._agent_desc:
return self._agent_desc
try:
agent_desc = models.GsePackageDesc.objects.get(project=self.agent_name)
except models.GsePackageDesc.DoesNotExist:
raise errors.AgentPackageValidationError(msg="GsePackageDesc [{name}] 不存在".format(name=self.agent_name))

setattr(self, "_agent_desc", agent_desc)
return self._agent_desc
8 changes: 4 additions & 4 deletions apps/backend/subscription/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from collections import defaultdict
from dataclasses import asdict
from functools import cmp_to_key, reduce
from typing import Any, Dict, List, Set
from typing import Dict, List, Set

from django.core.cache import caches
from django.db import transaction
Expand Down Expand Up @@ -691,14 +691,14 @@ def fetch_commands(self, request):
ap_id_obj_map: Dict[int, models.AccessPoint] = models.AccessPoint.ap_id_obj_map()
host_ap: models.AccessPoint = ap_id_obj_map[host_ap_id]

base_agent_setup_info_dict: Dict[str, Any] = asdict(
AgentStepAdapter(subscription_step=sub_step_obj, gse_version=host_ap.gse_version).setup_info
agent_setup_adapter: AgentStepAdapter = AgentStepAdapter(
subscription_step=sub_step_obj, gse_version=host_ap.gse_version
)
agent_setup_extra_info_dict = sub_inst.instance_info["host"].get("agent_setup_extra_info") or {}
installation_tool = gen_commands(
agent_setup_info=AgentSetupInfo(
**{
**base_agent_setup_info_dict,
**asdict(agent_setup_adapter.get_host_setup_info(host)),
"force_update_agent_id": agent_setup_extra_info_dict.get("force_update_agent_id", False),
}
),
Expand Down
Loading

0 comments on commit 1b0d11c

Please sign in to comment.