V2.4.8 dev #2467

Open · wants to merge 30 commits into base: v2.4.x

Changes from all commits · 30 commits
f965663 minor: auto push 2.4.7 release log (ZhuoZhuoCrayon, Oct 22, 2024)
7f47072 minor: start new version 2.4.8 (ZhuoZhuoCrayon, Oct 22, 2024)
8de7edd feat: optimize the execution order of atomic actions during plugin installation (closed #2438) (Huayeaaa, Oct 18, 2024)
156a90b fix: fix loss of business topology in the host_search API (closed #2449) (Huayeaaa, Sep 29, 2024)
5730fa1 feat: optimize proxy installation on the arm architecture (closed #2472) (chalice-1831, Oct 17, 2024)
f14f01b feat: optimize proxy installation on the arm architecture (closed #2472) (wyyalt, Nov 1, 2024)
1305056 feat: run subscription tasks sequentially instead of raising errors (closed #2447) (Huayeaaa, Nov 13, 2024)
ce731fc fix: fix stale Agent-ID being reused when host info fails to update (closed #2451) (Huayeaaa, Oct 18, 2024)
d06a615 fix: fix proxy hosts turning into pagents after module transfer (closed #2474) (Huayeaaa, Oct 28, 2024)
847a5d7 feat: agent reinstallation optimization (hyunfa, Nov 18, 2024)
3111219 feat: adapt the proxy installation script and gsectl for tlinux4 (closed #2483) (Huayeaaa, Nov 1, 2024)
f2f2ada fix: fix process-in-use errors when unpacking plugins on Windows (closed #2486) (Huayeaaa, Nov 8, 2024)
0b95ce2 feat: plugin deployment: optimize plugin package parameters (closed #2461) (Huayeaaa, Oct 18, 2024)
82432f4 feat: replace forms (hyunfa, Sep 5, 2024)
c9e28ad feat: support incremental host sync in the plugin operate API (closed #2510) (Huayeaaa, Dec 5, 2024)
f6bd1c6 feat: installation policy changes (hyunfa, Dec 13, 2024)
d83db27 feat: switch to cloud APIs (closed #2515) (wyyalt, Dec 17, 2024)
d53080e fix: fix incremental host sync ignoring inner IP changes (closed #2504) (Huayeaaa, Dec 5, 2024)
2e22e19 fix: add a fallback when orchestration updates subscription tasks (closed #2506) (Huayeaaa, Dec 11, 2024)
d4edc61 feat: move the zero-cloud-area new-host restriction check for cloud environments up to the API layer (closed #2501) (jpyoung3, Dec 9, 2024)
089d033 feat: sync cloud-area service providers to cmdb (closed #2386) (Huayeaaa, Aug 26, 2024)
f3bc640 fix: fix large integers read from k8s becoming floats, plus unit test fixes (closed #2521) (Huayeaaa, Dec 20, 2024)
4f86151 fix: fix external style pollution caused by the article height setting (hyunfa, Dec 24, 2024)
2d566ca feat: shard gse API calls (closed #2520) (ping15, Dec 20, 2024)
b5814a6 feat: support enabling subscription inspection when creating a subscription (closed #2525) (Huayeaaa, Dec 24, 2024)
1525c9a fix: fix syncing cloud service providers when a cloud area is missing in CC (closed #2529) (Huayeaaa, Dec 26, 2024)
f7c3d8d fix: the plugin init script may fail on operating systems other than centos or debian (closed #2470) (chalice-1831, Oct 23, 2024)
81b1d3c fix: fix disabled plugins still being deployed via monitoring-platform subscription steps (closed #2313) (chalice-1831, Jul 3, 2024)
afa880c feat: lock the versions of preset plugins on installation (closed #2482) (jpyoung3, Dec 13, 2024)
ad70bb8 feat: lock the versions of preset plugins on installation (closed #2482) (jpyoung3, Dec 25, 2024)
2 changes: 1 addition & 1 deletion app.yml
@@ -8,7 +8,7 @@ introduction: 通过节点管理,可以对蓝鲸体系中的gse agent进行管
introduction_en: NodeMan can be used to manage the gse agent in the BlueKing system.
Its functions include agent installation, status query, version update, plugin management,
health check, process control, and so on.
- version: 2.4.7
+ version: 2.4.8
category: 运维工具
language_support: 英语,中文
desktop:
7 changes: 6 additions & 1 deletion apps/adapters/api/gse/base.py
@@ -155,7 +155,12 @@ def list_proc_state(
data_list_name="_host_info_list",
batch_call_func=concurrent.batch_call,
extend_result=False,
- get_config_dict_func=lambda: {"limit": constants.QUERY_PROC_STATUS_HOST_LENS},
+ get_config_dict_func=lambda: {
+     "limit": models.GlobalSettings.get_config(
+         key=models.GlobalSettings.KeyEnum.QUERY_PROC_STATUS_HOST_LENS.value,
+         default=constants.QUERY_PROC_STATUS_HOST_LENS,
+     )
+ },
)
def get_proc_status_inner(
_namespace: str,
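The hunk above swaps a hard-coded batch limit for a GlobalSettings lookup that falls back to the constant. A minimal stand-in for that pattern (the names and default value here are illustrative, not the project's real models API):

```python
# Illustrative stand-in for models.GlobalSettings.get_config: a stored
# override wins; otherwise the hard-coded constant is used as the default.
QUERY_PROC_STATUS_HOST_LENS = 1000  # assumed default value for this sketch

def get_config(store: dict, key: str, default=None):
    """Return the stored override for `key`, or `default` if absent."""
    return store.get(key, default)

# No override stored: the constant applies
limit = get_config({}, "QUERY_PROC_STATUS_HOST_LENS", QUERY_PROC_STATUS_HOST_LENS)
# Operators can later shrink the batch size without a code change
tuned = get_config(
    {"QUERY_PROC_STATUS_HOST_LENS": 200},
    "QUERY_PROC_STATUS_HOST_LENS",
    QUERY_PROC_STATUS_HOST_LENS,
)
```

The point of the change is operational: the gse batch size becomes tunable at runtime instead of requiring a redeploy.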
@@ -22,6 +22,15 @@
class PushFilesToProxyService(AgentTransferFileService):
def get_file_list(self, data, common_data: AgentCommonData, host: models.Host) -> List[str]:
file_list = data.get_one_of_inputs("file_list", default=[])
exclude_map = {
# New architectures added later must be registered in this map
constants.CpuType.x86_64: ("aarch64.tgz",),
constants.CpuType.aarch64: ("x86_64.tgz",),
}
# Get the exclusion suffixes for the current architecture
exclude_suffix = exclude_map.get(host.cpu_arch, tuple())
if exclude_suffix:
file_list = [item for item in file_list if not item.endswith(exclude_suffix)]
from_type = data.get_one_of_inputs("from_type")
host_ap: Optional[models.AccessPoint] = self.get_host_ap(common_data, host)
if not host_ap:
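The new exclude_map keeps a proxy host from receiving agent packages built for the other CPU architecture. The same filter, self-contained, with plain strings standing in for constants.CpuType:

```python
# Map each architecture to the package suffixes it must NOT receive;
# as the diff's comment notes, new architectures need an entry here.
EXCLUDE_MAP = {
    "x86_64": ("aarch64.tgz",),
    "aarch64": ("x86_64.tgz",),
}

def filter_files_by_arch(file_list, cpu_arch):
    """Drop files whose name ends with a suffix excluded for this arch."""
    exclude_suffix = EXCLUDE_MAP.get(cpu_arch, tuple())
    if exclude_suffix:
        # str.endswith accepts a tuple of candidate suffixes
        return [item for item in file_list if not item.endswith(exclude_suffix)]
    return list(file_list)
```

Unknown architectures fall through untouched, so the filter is strictly opt-in per map entry.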
2 changes: 2 additions & 0 deletions apps/backend/components/collections/common/script_content.py
@@ -56,6 +56,8 @@
fi
done

cd "$setup_path"

rm -rf $subscription_tmp_dir
"""

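The added `cd "$setup_path"` steps the shell out of the temporary subscription directory before `rm -rf` removes it; deleting the directory a process is still sitting in can fail or leave the process with a dangling working directory. A Python analogue of the same discipline:

```python
# Step out of a temp directory before deleting it (mirrors the script's
# added `cd "$setup_path"` before `rm -rf $subscription_tmp_dir`).
import os
import shutil
import tempfile

setup_path = tempfile.mkdtemp()  # stands in for the agent setup path
tmp_dir = tempfile.mkdtemp()     # stands in for the subscription tmp dir

os.chdir(tmp_dir)       # work happens inside the temp dir
os.chdir(setup_path)    # step back out before deleting it
shutil.rmtree(tmp_dir)  # removal no longer clashes with our own cwd
```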
21 changes: 16 additions & 5 deletions apps/backend/components/collections/plugin.py
@@ -203,7 +203,7 @@ def get_package_by_process_status(
"""Get the plugin package object from its process status"""
host = self.get_host_by_process_status(process_status, common_data)
policy_step_adapter = common_data.policy_step_adapter
- package = policy_step_adapter.get_matching_package_obj(host.os_type, host.cpu_arch)
+ package = policy_step_adapter.get_matching_package_obj(host.os_type, host.cpu_arch, host.bk_biz_id)
return package

def get_plugin_root_by_process_status(
@@ -280,11 +280,12 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
# target_host_objs usually has a length of 1 or 2, so time complexity is not a concern here
# Specifying target_host is mainly used in remote collection scenarios, common with third-party plugins such as dial tests
for host in target_host_objs:
+ bk_biz_id = host.bk_biz_id
bk_host_id = host.bk_host_id
os_type = host.os_type.lower()
cpu_arch = host.cpu_arch
group_id = create_group_id(subscription, subscription_instance.instance_info)
- package = self.get_package(subscription_instance, policy_step_adapter, os_type, cpu_arch)
+ package = self.get_package(subscription_instance, policy_step_adapter, os_type, cpu_arch, bk_biz_id)
ap_config = self.get_ap_config(ap_id_obj_map, host)
setup_path, pid_path, log_path, data_path = self.get_plugins_paths(
package, plugin_name, ap_config, group_id, subscription
@@ -340,10 +341,11 @@ def get_package(
policy_step_adapter: PolicyStepAdapter,
os_type: str,
cpu_arch: str,
+ bk_biz_id: int,
) -> models.Packages:
"""Get the plugin package object"""
try:
- return policy_step_adapter.get_matching_package_obj(os_type, cpu_arch)
+ return policy_step_adapter.get_matching_package_obj(os_type, cpu_arch, bk_biz_id)
except errors.PackageNotExists as error:
# If the plugin package is unsupported or missing, log the error; this instance is excluded from subsequent steps
self.move_insts_to_failed([subscription_instance.id], str(error))
@@ -723,6 +725,11 @@ def generate_script_params_by_process_status(
if category == constants.CategoryType.external and group_id:
# Set the plugin instance directory
script_param += " -i %s" % group_id
host = self.get_host_by_process_status(process_status, common_data)
if host.os_type == constants.OsType.WINDOWS:
# Set the temporary unpack directory on Windows
temp_sub_unpack_dir: str = "{}\\{}".format(agent_config["temp_path"], group_id)
script_param += " -u %s" % temp_sub_unpack_dir
return script_param


@@ -974,7 +981,9 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):

# Render config files from the config templates and context variables
rendered_configs = render_config_files_by_config_templates(
- policy_step_adapter.get_matching_config_tmpl_objs(target_host.os_type, target_host.cpu_arch),
+ policy_step_adapter.get_matching_config_tmpl_objs(
+     target_host.os_type, target_host.cpu_arch, package, subscription_step.config
+ ),
{"group_id": process_status.group_id},
context,
package_obj=package,
@@ -1194,6 +1203,8 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):

meta_name = self.get_plugin_meta_name(plugin, process_status)
gse_control = self.get_gse_control(host.os_type, package_control, process_status)
# Prefer the latest Agent-ID from instance_info; the Agent-ID in host may be stale
bk_agent_id: str = subscription_instance.instance_info["host"].get("bk_agent_id") or host.bk_agent_id

gse_op_params = {
"meta": {"namespace": constants.GSE_NAMESPACE, "name": meta_name},
@@ -1203,7 +1214,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
"process_status_id": process_status.id,
"subscription_instance_id": subscription_instance.id,
},
- "hosts": [{"ip": host.inner_ip, "bk_agent_id": host.bk_agent_id, "bk_cloud_id": host.bk_cloud_id}],
+ "hosts": [{"ip": host.inner_ip, "bk_agent_id": bk_agent_id, "bk_cloud_id": host.bk_cloud_id}],
"spec": {
"identity": {
"index_key": "",
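The Agent-ID fix in this file boils down to an or-fallback: take the fresh value carried in the subscription instance's instance_info, and only fall back to the possibly stale host record. Sketched with plain dicts standing in for the ORM objects:

```python
def resolve_agent_id(instance_info: dict, cached_agent_id: str) -> str:
    """Prefer the Agent-ID from instance_info; fall back to the cached host value."""
    # An empty or missing bk_agent_id is falsy, so `or` triggers the fallback
    return instance_info.get("host", {}).get("bk_agent_id") or cached_agent_id
```

This matters because gse operations are addressed by Agent-ID; sending the stale one would target the wrong (or no) agent.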
22 changes: 22 additions & 0 deletions apps/backend/constants.py
@@ -129,6 +129,12 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]:
# Redis cache for Gse Agent configuration
REDIS_AGENT_CONF_KEY_TPL = f"{settings.APP_CODE}:backend:agent:config:" + "{file_name}:str:{sub_inst_id}"

# Redis key template for storing update-subscription params
UPDATE_SUBSCRIPTION_REDIS_KEY_TPL = f"{settings.APP_CODE}:backend:subscription:update_subscription:params"

# Redis key template for storing run-subscription params
RUN_SUBSCRIPTION_REDIS_KEY_TPL = f"{settings.APP_CODE}:backend:subscription:run_subscription:params"


class SubscriptionSwithBizAction(enum.EnhanceEnum):
ENABLE = "enable"
@@ -166,3 +172,19 @@ def needs_batch_request(self) -> bool:
DEFAULT_CLEAN_RECORD_LIMIT = 5000

POWERSHELL_SERVICE_CHECK_SSHD = "powershell -c Get-Service -Name sshd"

# Interval for processing update-subscription tasks
UPDATE_SUBSCRIPTION_TASK_INTERVAL = 2 * 60

# Interval for processing run-subscription tasks
RUN_SUBSCRIPTION_TASK_INTERVAL = 3 * 60
# Interval for handling subscriptions with uninstall leftovers
HANDLE_UNINSTALL_REST_SUBSCRIPTION_TASK_INTERVAL = 6 * 60 * 60

# Maximum number of update-subscription tasks to store
MAX_STORE_SUBSCRIPTION_TASK_COUNT = 1000
# Maximum number of run-subscription tasks per batch
MAX_RUN_SUBSCRIPTION_TASK_COUNT = 50

# Number of hours used when cleaning up deleted subscriptions
SUBSCRIPTION_DELETE_HOURS = 6
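The two new key templates above are fully resolved f-strings, while the existing REDIS_AGENT_CONF_KEY_TPL mixes an f-string prefix with str.format placeholders filled in later. A small demonstration of that two-stage pattern (the app code value is an assumption for the sketch):

```python
APP_CODE = "bk_nodeman"  # assumed value, stands in for settings.APP_CODE

# The prefix is resolved immediately; {file_name} and {sub_inst_id} are
# left literal (note the concatenation) and filled in per call site.
REDIS_AGENT_CONF_KEY_TPL = f"{APP_CODE}:backend:agent:config:" + "{file_name}:str:{sub_inst_id}"

key = REDIS_AGENT_CONF_KEY_TPL.format(file_name="gse_agent.conf", sub_inst_id=42)
```

Concatenating the literal part keeps the f-string from trying to evaluate `file_name` and `sub_inst_id` at import time.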
1 change: 1 addition & 0 deletions apps/backend/periodic_tasks/__init__.py
@@ -16,4 +16,5 @@
from .clean_sub_data import clean_sub_data_task # noqa
from .clean_subscription_data import clean_subscription_data # noqa
from .collect_auto_trigger_job import collect_auto_trigger_job # noqa
from .schedule_running_subscription_task import * # noqa
from .update_subscription_instances import update_subscription_instances # noqa
23 changes: 18 additions & 5 deletions apps/backend/periodic_tasks/check_zombie_sub_inst_record.py
@@ -18,7 +18,10 @@
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

- from apps.backend.subscription.constants import CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL
+ from apps.backend.subscription.constants import (
+     CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL,
+     ZOMBIE_SUB_INST_RECORD_COUNT,
+ )
from apps.node_man import constants, models
from apps.utils.time_handler import strftime_local

@@ -48,10 +51,20 @@ def check_zombie_sub_inst_record():
"status__in": [constants.JobStatusType.PENDING, constants.JobStatusType.RUNNING],
}
base_update_kwargs = {"status": constants.JobStatusType.FAILED, "update_time": timezone.now()}

- forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs).update(
-     **base_update_kwargs
- )
+ # Count first to decide whether an update is needed: if fewer than 100 records match, update by primary keys; otherwise keep the existing approach
+ subscription_instance_record_qs = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs)
+ if not subscription_instance_record_qs.exists():
+     logger.info("no zombie_sub_inst_record skipped")
+     return
+ if subscription_instance_record_qs.count() < ZOMBIE_SUB_INST_RECORD_COUNT:
+     forced_failed_inst_record_ids = set(subscription_instance_record_qs.values_list("id", flat=True))
+     forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(
+         id__in=forced_failed_inst_record_ids
+     ).update(**base_update_kwargs)
+ else:
+     forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs).update(
+         **base_update_kwargs
+     )

forced_failed_status_detail_num = models.SubscriptionInstanceStatusDetail.objects.filter(**query_kwargs).update(
**base_update_kwargs,
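The zombie-record change first checks exists() and count(), and only falls back to the broad filter-based UPDATE when 100 or more records match. The decision logic, reduced to a pure function (illustrative names, no ORM):

```python
ZOMBIE_SUB_INST_RECORD_COUNT = 100  # threshold from subscription/constants.py

def select_update_strategy(matching_count: int) -> str:
    """Choose how to mark zombie records as FAILED based on how many matched."""
    if matching_count == 0:
        return "skip"            # nothing matched: avoid a useless UPDATE
    if matching_count < ZOMBIE_SUB_INST_RECORD_COUNT:
        return "update_by_pk"    # small set: update via explicit primary keys
    return "update_by_filter"    # large set: keep the original filter-update
```

Updating by primary key on a small set lets the database use the PK index directly instead of re-evaluating the multi-column filter.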
154 changes: 154 additions & 0 deletions apps/backend/periodic_tasks/schedule_running_subscription_task.py
@@ -0,0 +1,154 @@
# -*- coding: utf-8 -*-
"""
TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available.
Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at https://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import json
from datetime import timedelta
from typing import Any, Dict, List, Set

from celery.task import periodic_task
from django.db.models import QuerySet
from django.utils import timezone

from apps.backend import constants
from apps.backend.subscription.handler import SubscriptionHandler
from apps.backend.utils.redis import REDIS_INST
from apps.node_man import constants as node_man_constants
from apps.node_man import models
from common.log import logger


def get_need_clean_subscription_app_code():
"""
Fetch the app codes configured for cleanup
"""
app_codes: List[str] = models.GlobalSettings.get_config(
key=models.GlobalSettings.KeyEnum.NEED_CLEAN_SUBSCRIPTION_APP_CODE.value, default=[]
)
return app_codes


@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_update_subscription():
logger.info("start schedule update subscription")
name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
# Fetch all params stored in this hashset
update_params: Dict[str, bytes] = REDIS_INST.hgetall(name=name)
# Delete all params in this hashset
REDIS_INST.delete(name)
results = []
if not update_params:
return
for update_param in update_params.values():
# Values read from redis are bytes; decode and then parse into a dict
params = json.loads(update_param.decode())
subscription_id = params["subscription_id"]
try:
result: Dict[str, int] = SubscriptionHandler.update_subscription(params=params)
except Exception as e:
logger.exception(f"{subscription_id} update subscription failed with error: {e}")
result = {"subscription_id": subscription_id, "update_result": False}
results.append(result)
logger.info(f"update subscription with results: {results}, length -> {len(results)} ")


@periodic_task(run_every=constants.RUN_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_run_subscription():
logger.info("start schedule run subscription")
name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_RUN_SUBSCRIPTION_TASK_COUNT)
run_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
REDIS_INST.ltrim(name, 0, -length - 1)
run_params.reverse()
results = []
if not run_params:
return
for run_param in run_params:
# Values read from redis are bytes; decode and then parse into a dict
params = json.loads(run_param.decode())
subscription_id = params["subscription_id"]
scope = params["scope"]
actions = params["actions"]
try:
result: Dict[str, int] = SubscriptionHandler(subscription_id).run(scope=scope, actions=actions)
except Exception as e:
logger.exception(f"{subscription_id} run subscription failed with error: {e}")
result = {"subscription_id": subscription_id, "run_result": False}
results.append(result)
logger.info(f"run subscription with results: {results}, length -> {len(results)}")


@periodic_task(
run_every=constants.HANDLE_UNINSTALL_REST_SUBSCRIPTION_TASK_INTERVAL,
queue="default",
options={"queue": "default"},
)
def clean_deleted_subscription():
"""
Clean up subscriptions that were deleted and have uninstall leftovers
"""
query_kwargs: Dict[str, Any] = {
"is_deleted": True,
"from_system": "bkmonitorv3",
"deleted_time__range": (
timezone.now() - timedelta(hours=constants.SUBSCRIPTION_DELETE_HOURS),
timezone.now(),
),
}

# Subscriptions with uninstall leftovers that have inspection enabled get a 12h lifecycle; set them soft-deleted again to reduce resource consumption
again_delete_query_kwargs: Dict[str, Any] = {
"enable": True,
"from_system": "bkmonitorv3",
"deleted_time__range": (
timezone.now() - timedelta(hours=3 * constants.SUBSCRIPTION_DELETE_HOURS),
timezone.now() - timedelta(hours=2 * constants.SUBSCRIPTION_DELETE_HOURS),
),
}

app_codes = get_need_clean_subscription_app_code()
if app_codes:
query_kwargs.pop("from_system")
query_kwargs["from_system__in"] = app_codes
again_delete_query_kwargs.pop("from_system")
again_delete_query_kwargs["from_system__in"] = app_codes
need_reset_deleted_subscription_qs: QuerySet = models.Subscription.objects.filter(**again_delete_query_kwargs)
if need_reset_deleted_subscription_qs.exists():
# Using update() here does not refresh the deletion time
need_reset_deleted_subscription_qs.update(enable=False, is_deleted=True)
changed_subscription_ids = list(need_reset_deleted_subscription_qs.values_list("id", flat=True))
# Log the subscription IDs that were soft-deleted again
logger.info(
f"reset subscription{changed_subscription_ids} is_deleted, length -> {len(changed_subscription_ids)}"
)
# Query subscriptions deleted within the last 6 hours
subscription_qs: QuerySet = models.Subscription.objects.filter(**query_kwargs)

if not subscription_qs.exists():
# No deleted subscriptions
return
# IDs of the deleted subscriptions
deleted_subscription_ids: Set[int] = set(subscription_qs.values_list("id", flat=True))
# Subscriptions that were deleted and have uninstall leftovers (failures)
failed_subscription_qs: QuerySet = models.SubscriptionInstanceRecord.objects.filter(
subscription_id__in=deleted_subscription_ids, is_latest=True, status=node_man_constants.StatusType.FAILED
)
if not failed_subscription_qs.exists():
# No failed subscription instances
return
# IDs of deleted subscriptions with uninstall leftovers
failed_subscription_ids: Set[int] = set(failed_subscription_qs.values_list("subscription_id", flat=True))
# Clear the nodes under these subscriptions and enable subscription inspection
models.Subscription.objects.filter(id__in=failed_subscription_ids, is_deleted=True).update(
nodes=[], is_deleted=False, enable=True
)

logger.info(
f"set {failed_subscription_ids} nodes be null and enable auto trigger, length -> {len(failed_subscription_ids)}"
)
3 changes: 3 additions & 0 deletions apps/backend/plugin/tools.py
@@ -79,6 +79,9 @@ class VariableNodeSerializer(serializers.Serializer):
items = serializers.DictField(required=False, label=_("子变量"))
default = LiteralField(required=False, label=_("默认值"))
depth = serializers.IntegerField(label=_("嵌套深度"), max_value=5)
ui_component = serializers.DictField(required=False, label=_("UI组件"))
description = serializers.CharField(required=False, label=_("tips描述说明"), max_length=128)
value = serializers.CharField(required=False, label=_("下拉列表值"), max_length=128)

@classmethod
def parse_default(cls, default_value: Union[str, bool, int, float], variable_type: str):
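The serializer gains three optional fields, with description and value capped at max_length=128. A plain-dict sketch of that constraint (illustrative, not the actual DRF serializer):

```python
MAX_LEN = 128  # mirrors max_length=128 on the description/value fields

def validate_variable_node(node: dict) -> dict:
    """Reject optional string fields that exceed the serializer's max_length."""
    for field in ("description", "value"):
        val = node.get(field)
        if val is not None and len(val) > MAX_LEN:
            raise ValueError(f"{field} exceeds {MAX_LEN} characters")
    return node
```

Both fields are optional, so an absent key passes; only an over-long value is rejected.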
2 changes: 2 additions & 0 deletions apps/backend/subscription/constants.py
@@ -20,6 +20,8 @@

# Interval for checking zombie subscription instance records
CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL = 15 * constants.TimeUnit.MINUTE
# Threshold count of zombie subscription instance records
ZOMBIE_SUB_INST_RECORD_COUNT = 100

# Task timeout: how long after create_time a task is judged timed out, guarding against the pipeline hanging in the background
TASK_TIMEOUT = 15 * constants.TimeUnit.MINUTE
6 changes: 6 additions & 0 deletions apps/backend/subscription/errors.py
@@ -169,3 +169,9 @@ class SubscriptionIncludeGrayBizError(AppBaseException):
ERROR_CODE = 19
MESSAGE = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")
MESSAGE_TPL = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击")


class SubscriptionNotDeletedCantOperateError(AppBaseException):
ERROR_CODE = 20
MESSAGE = _("订阅未被删除,无法操作")
MESSAGE_TPL = _("订阅ID:{subscription_id}未被删除,无法进行清理操作,可增加参数is_force=true强制操作")