From ce5c4ed9a785bc32d64daad33c31b63057dcf859 Mon Sep 17 00:00:00 2001 From: dcd <1151627903@qq.com> Date: Wed, 13 Nov 2024 20:00:30 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BC=98=E5=8C=96=E8=AE=A2=E9=98=85?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=8C=89=E9=A1=BA=E5=BA=8F=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E8=80=8C=E9=9D=9E=E6=8A=9B=E9=94=99=20(closed=20#2447)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/backend/constants.py | 18 ++ .../schedule_running_subscription_task.py | 115 ++++++++++++ apps/backend/subscription/errors.py | 6 + apps/backend/subscription/handler.py | 132 ++++++++++++- apps/backend/subscription/serializers.py | 6 + apps/backend/subscription/views.py | 174 +++++++----------- ...test_schedule_running_subscription_task.py | 127 +++++++++++++ apps/backend/tests/subscription/test_views.py | 15 ++ apps/node_man/models.py | 2 + 9 files changed, 480 insertions(+), 115 deletions(-) create mode 100644 apps/backend/periodic_tasks/schedule_running_subscription_task.py create mode 100644 apps/backend/tests/periodic_tasks/test_schedule_running_subscription_task.py diff --git a/apps/backend/constants.py b/apps/backend/constants.py index e024154c6..1f2e37a79 100644 --- a/apps/backend/constants.py +++ b/apps/backend/constants.py @@ -129,6 +129,12 @@ def _get_member__alias_map(cls) -> Dict[Enum, str]: # redis Gse Agent 配置缓存 REDIS_AGENT_CONF_KEY_TPL = f"{settings.APP_CODE}:backend:agent:config:" + "{file_name}:str:{sub_inst_id}" +# 更新订阅参数储存redis键名模板 +UPDATE_SUBSCRIPTION_REDIS_KEY_TPL = f"{settings.APP_CODE}:backend:subscription:update_subscription:params" + +# 执行订阅参数储存redis键名模板 +RUN_SUBSCRIPTION_REDIS_KEY_TPL = f"{settings.APP_CODE}:backend:subscription:run_subscription:params" + class SubscriptionSwithBizAction(enum.EnhanceEnum): ENABLE = "enable" @@ -166,3 +172,15 @@ def needs_batch_request(self) -> bool: DEFAULT_CLEAN_RECORD_LIMIT = 5000 POWERSHELL_SERVICE_CHECK_SSHD = "powershell -c Get-Service -Name sshd" + +# 处理更新订阅任务间隔 +UPDATE_SUBSCRIPTION_TASK_INTERVAL = 2 * 60 + +# 处理执行订阅任务间隔 +RUN_SUBSCRIPTION_TASK_INTERVAL = 3 * 60 + +# 最大订阅任务数量 +MAX_SUBSCRIPTION_TASK_COUNT = 50 + +# 订阅删除时间 +SUBSCRIPTION_DELETE_DAYS = 1 diff --git a/apps/backend/periodic_tasks/schedule_running_subscription_task.py b/apps/backend/periodic_tasks/schedule_running_subscription_task.py new file mode 100644 index 000000000..47e090dad --- /dev/null +++ b/apps/backend/periodic_tasks/schedule_running_subscription_task.py @@ -0,0 +1,115 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import json +from typing import Any, Dict, List + +from celery.schedules import crontab +from celery.task import periodic_task +from django.db.models import QuerySet +from django.utils import timezone + +from apps.backend import constants +from apps.backend.subscription.handler import SubscriptionHandler +from apps.backend.utils.redis import REDIS_INST +from apps.node_man import models +from common.log import logger + + +def get_need_clean_subscription_app_code(): + """ + 获取配置需要清理的appcode + """ + app_codes: List[str] = models.GlobalSettings.get_config( + key=models.GlobalSettings.KeyEnum.NEED_CLEAN_SUBSCRIPTION_APP_CODE.value, default=[] + ) + return app_codes + + +@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"}) +def schedule_update_subscription(): + name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL + # 先计算出要从redis取数据的长度 + length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT) + # 从redis中取出对应长度的数据 + update_params: List[bytes] = REDIS_INST.lrange(name, -length, -1) + # 使用ltrim保留剩下的,可以保证redis中新push的值不会丢失 + REDIS_INST.ltrim("names", 0, -length - 1) + # 翻转数据,先进的数据先处理 + update_params.reverse() + results = [] + if not update_params: + return + for update_param in update_params: + # redis取出为bytes类型,需进行解码后转字典 + params = json.loads(update_param.decode()) + subscription_id = params["subscription_id"] + try: + result: Dict[str, int] = SubscriptionHandler.update_subscription(params=params) + except Exception as e: + logger.exception(f"{subscription_id} update scription failed with error: {e}") + result = {"subscription_id": subscription_id, "update_result": False} + results.append(result) + logger.info(f" update scription with results: {results}, length -> {len(results)} ") + + +@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"}) +def schedule_run_scription(): + name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL + length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT) + run_params: List[bytes] = REDIS_INST.lrange(name, -length, -1) + REDIS_INST.ltrim("names", 0, -length - 1) + run_params.reverse() + results = [] + if not run_params: + return + for run_param in run_params: + # redis取出为bytes类型,需进行解码后转字典 + params = json.loads(run_param.decode()) + subscription_id = params["subscription_id"] + scope = params["scope"] + actions = params["actions"] + try: + result: Dict[str, int] = SubscriptionHandler(subscription_id).run(scope=scope, actions=actions) + except Exception as e: + logger.exception(f"{subscription_id} run scription failed with error: {e}") + result = {"subscription_id": subscription_id, "run_result": False} + results.append(result) + logger.info(f"run subscription with results: {results}, length -> {len(results)}") + + +@periodic_task( + run_every=crontab(hour="3", minute="0", day_of_week="*", day_of_month="*", month_of_year="*"), + queue="default", + options={"queue": "default"}, +) +def clean_deleted_subscription(): + query_kwargs: Dict[str, Any] = { + "is_deleted": True, + "from_system": "bkmonitorv3", + "deleted_time__range": ( + timezone.now() - timezone.timedelta(days=constants.SUBSCRIPTION_DELETE_DAYS), + timezone.now(), + ), + } + + app_codes = get_need_clean_subscription_app_code() + if app_codes: + query_kwargs.pop("from_system") + query_kwargs["from_system__in"] = app_codes + + subscription_qs: QuerySet = models.Subscription.objects.filter(**query_kwargs) + if not subscription_qs.exists(): + # 没有需要更新的订阅 + return + + subscription_ids = list(subscription_qs.values_list("id", flat=True)) + subscription_qs.update(nodes=[], is_deleted=False, enable=True) + logger.info(f"set {subscription_ids} nodes be null and enable auto trigger, length -> {len(subscription_ids)}") diff --git a/apps/backend/subscription/errors.py b/apps/backend/subscription/errors.py index 93f432e0a..3df242ca3 100644 --- a/apps/backend/subscription/errors.py +++ b/apps/backend/subscription/errors.py @@ -169,3 +169,9 @@ class SubscriptionIncludeGrayBizError(AppBaseException): ERROR_CODE = 19 MESSAGE = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击") MESSAGE_TPL = _("订阅任务包含Gse2.0灰度业务,任务将暂缓执行无需重复点击") + + +class SubscriptionNotDeletedCantOperateError(AppBaseException): + ERROR_CODE = 20 + MESSAGE = _("订阅未被删除,无法操作") + MESSAGE_TPL = _("订阅ID:{subscription_id}未被删除,无法进行清理操作,可增加参数is_force=true强制操作") diff --git a/apps/backend/subscription/handler.py b/apps/backend/subscription/handler.py index 4d9278054..80a49bf57 100644 --- a/apps/backend/subscription/handler.py +++ b/apps/backend/subscription/handler.py @@ -10,6 +10,7 @@ """ from __future__ import absolute_import, unicode_literals +import json import logging import random from collections import Counter, defaultdict @@ -18,13 +19,15 @@ from django.conf import settings from django.core.cache import cache +from django.db import transaction from django.db.models import Max, Q, QuerySet, Value from django.utils.translation import get_language from django.utils.translation import ugettext as _ +from apps.backend import constants as backend_constants from apps.backend.subscription import errors, task_tools, tasks, tools -from apps.backend.subscription.errors import InstanceTaskIsRunning from apps.backend.utils.pipeline_parser import PipelineParser +from apps.backend.utils.redis import REDIS_INST from apps.core.concurrent import controller from apps.node_man import constants, models from apps.utils import concurrent @@ -432,10 +435,6 @@ def run(self, scope: Dict = None, actions: Dict[str, str] = None) -> Dict[str, i subscription = models.Subscription.objects.get(id=self.subscription_id) except models.Subscription.DoesNotExist: raise errors.SubscriptionNotExist({"subscription_id": self.subscription_id}) - - if subscription.is_running(): - raise InstanceTaskIsRunning() - if tools.check_subscription_is_disabled( subscription_identity=f"subscription -> [{subscription.id}]", scope=subscription.scope, @@ -443,6 +442,12 @@ def run(self, scope: Dict = None, actions: Dict[str, str] = None) -> Dict[str, i ): raise errors.SubscriptionIncludeGrayBizError() + if subscription.is_running(): + params = json.dumps({"subscription_id": subscription.id, "scope": scope, "actions": actions}) + REDIS_INST.lpush(backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL, params) + logger.info(f"run subscription[{subscription.id}] store params into redis: {params}") + return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")} + subscription_task = models.SubscriptionTask.objects.create( subscription_id=subscription.id, scope=subscription.scope, actions={} ) @@ -670,3 +675,120 @@ def instance_status(subscription_id_list: List[int], show_task_detail: bool) -> result.append({"subscription_id": subscription.id, "instances": subscription_result}) return result + + def clean_subscription(self, execute_actions: Dict[str, str]): + """ + :param execute_actions: {"bk-beat": "STOP", "exporter": "STOP"} + """ + try: + # 3.调用执行订阅的方法 + result = self.run(actions=execute_actions) + except Exception as e: + result = {"result": False, "message": str(e)} + # 4.删除订阅,使用delete()方法才会记录删除时间 + models.Subscription.objects.filter(id=self.subscription_id).delete() + return result + + @staticmethod + def update_subscription(params: Dict[str, Any]): + scope = params["scope"] + try: + subscription = models.Subscription.objects.get(id=params["subscription_id"], is_deleted=False) + except models.Subscription.DoesNotExist: + raise errors.SubscriptionNotExist({"subscription_id": params["subscription_id"]}) + # 更新订阅不在序列化器中做校验,因为获取更新订阅的类型 step 需要查一次表 + if tools.check_subscription_is_disabled( + subscription_identity=f"subscription -> [{subscription.id}]", + steps=subscription.steps, + scope=scope, + ): + raise errors.SubscriptionIncludeGrayBizError() + if subscription.is_running(): + REDIS_INST.lpush(backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL, json.dumps(params)) + logger.info(f"update subscription[{subscription.id}] store params into redis: {params}") + return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")} + + with transaction.atomic(): + subscription.name = params.get("name", "") + subscription.node_type = scope["node_type"] + subscription.nodes = scope["nodes"] + subscription.bk_biz_id = scope.get("bk_biz_id") + # 避免空列表误判 + if scope.get("instance_selector") is not None: + subscription.instance_selector = scope["instance_selector"] + # 策略部署新增 + subscription.plugin_name = params.get("plugin_name") + subscription.bk_biz_scope = params.get("bk_biz_scope") + # 指定操作进程用户新增 + if params.get("system_account"): + params["operate_info"].insert(0, params["system_account"]) + subscription.operate_info = params["operate_info"] + subscription.save() + + step_ids: Set[str] = set() + step_id__obj_map: Dict[str, models.SubscriptionStep] = { + step_obj.step_id: step_obj for step_obj in subscription.steps + } + step_objs_to_be_created: List[models.SubscriptionStep] = [] + step_objs_to_be_updated: List[models.SubscriptionStep] = [] + + for index, step_info in enumerate(params["steps"]): + + if step_info["id"] in step_id__obj_map: + # 存在则更新 + step_obj: models.SubscriptionStep = step_id__obj_map[step_info["id"]] + step_obj.params = step_info["params"] + if "config" in step_info: + step_obj.config = step_info["config"] + step_obj.index = index + step_objs_to_be_updated.append(step_obj) + else: + # 新增场景 + try: + step_obj_to_be_created: models.SubscriptionStep = models.SubscriptionStep( + subscription_id=subscription.id, + index=index, + step_id=step_info["id"], + type=step_info["type"], + config=step_info["config"], + params=step_info["params"], + ) + except KeyError as e: + logger.warning( + f"update subscription[{subscription.id}] to add step[{step_info['id']}] error: " + f"err_msg -> {e}" + ) + raise errors.SubscriptionUpdateError( + { + "subscription_id": subscription.id, + "msg": _("新增订阅步骤[{step_id}] 需要提供 type & config,错误信息 -> {err_msg}").format( + step_id=step_info["id"], err_msg=e + ), + } + ) + step_objs_to_be_created.append(step_obj_to_be_created) + step_ids.add(step_info["id"]) + + # 删除更新后不存在的 step + models.SubscriptionStep.objects.filter( + subscription_id=subscription.id, step_id__in=set(step_id__obj_map.keys()) - step_ids + ).delete() + models.SubscriptionStep.objects.bulk_update(step_objs_to_be_updated, fields=["config", "params", "index"]) + models.SubscriptionStep.objects.bulk_create(step_objs_to_be_created) + # 更新 steps 需要移除缓存 + if hasattr(subscription, "_steps"): + delattr(subscription, "_steps") + + result = {"subscription_id": subscription.id} + + run_immediately = params["run_immediately"] + if run_immediately: + subscription_task = models.SubscriptionTask.objects.create( + subscription_id=subscription.id, scope=subscription.scope, actions={} + ) + tasks.run_subscription_task_and_create_instance.delay( + subscription, subscription_task, language=get_language() + ) + result["task_id"] = subscription_task.id + + return result diff --git a/apps/backend/subscription/serializers.py b/apps/backend/subscription/serializers.py index a79526645..053db9f87 100644 --- a/apps/backend/subscription/serializers.py +++ b/apps/backend/subscription/serializers.py @@ -284,3 +284,9 @@ class QueryHostSubscriptionsSerializer(TargetHostSerializer): class SubscriptionSwitchBizSerializer(serializers.Serializer): bk_biz_ids = serializers.ListField(child=serializers.IntegerField()) action = serializers.ChoiceField(choices=SubscriptionSwithBizAction.list_choices()) + + +class ClearnSubscriptionSerializer(serializers.Serializer): + subscription_id_list = serializers.ListField(required=True, label=_("订阅ID列表"), child=serializers.IntegerField()) + action_type = serializers.ChoiceField(choices=constants.OpType, default="STOP", label=_("执行动作类型")) + is_force = serializers.BooleanField(default=False, label=_("是否强制清理")) diff --git a/apps/backend/subscription/views.py b/apps/backend/subscription/views.py index 2b8ac6d74..8ca57e5a5 100644 --- a/apps/backend/subscription/views.py +++ b/apps/backend/subscription/views.py @@ -12,15 +12,15 @@ import logging import operator +from collections import defaultdict from dataclasses import asdict from functools import cmp_to_key, reduce -from typing import Any, Dict, List, Set +from typing import Any, Dict, List from django.core.cache import caches from django.db import transaction from django.db.models import Q, Value from django.utils.translation import get_language -from django.utils.translation import gettext_lazy as _ from drf_yasg.utils import swagger_auto_schema from rest_framework import status from rest_framework.decorators import action @@ -189,107 +189,8 @@ def update_subscription(self, request): @apiName update_subscription @apiGroup subscription """ - params = self.validated_data - scope = params["scope"] - run_immediately = params["run_immediately"] - with transaction.atomic(): - try: - subscription = models.Subscription.objects.get(id=params["subscription_id"], is_deleted=False) - except models.Subscription.DoesNotExist: - raise errors.SubscriptionNotExist({"subscription_id": params["subscription_id"]}) - # 更新订阅不在序列化器中做校验,因为获取更新订阅的类型 step 需要查一次表 - if tools.check_subscription_is_disabled( - subscription_identity=f"subscription -> [{subscription.id}]", - steps=subscription.steps, - scope=scope, - ): - raise errors.SubscriptionIncludeGrayBizError() - - subscription.name = params.get("name", "") - subscription.node_type = scope["node_type"] - subscription.nodes = scope["nodes"] - subscription.bk_biz_id = scope.get("bk_biz_id") - # 避免空列表误判 - if scope.get("instance_selector") is not None: - subscription.instance_selector = scope["instance_selector"] - # 策略部署新增 - subscription.plugin_name = params.get("plugin_name") - subscription.bk_biz_scope = params.get("bk_biz_scope") - # 指定操作进程用户新增 - if params.get("system_account"): - params["operate_info"].insert(0, params["system_account"]) - subscription.operate_info = params["operate_info"] - subscription.save() - - step_ids: Set[str] = set() - step_id__obj_map: Dict[str, models.SubscriptionStep] = { - step_obj.step_id: step_obj for step_obj in subscription.steps - } - step_objs_to_be_created: List[models.SubscriptionStep] = [] - step_objs_to_be_updated: List[models.SubscriptionStep] = [] - - for index, step_info in enumerate(params["steps"]): - - if step_info["id"] in step_id__obj_map: - # 存在则更新 - step_obj: models.SubscriptionStep = step_id__obj_map[step_info["id"]] - step_obj.params = step_info["params"] - if "config" in step_info: - step_obj.config = step_info["config"] - step_obj.index = index - step_objs_to_be_updated.append(step_obj) - else: - # 新增场景 - try: - step_obj_to_be_created: models.SubscriptionStep = models.SubscriptionStep( - subscription_id=subscription.id, - index=index, - step_id=step_info["id"], - type=step_info["type"], - config=step_info["config"], - params=step_info["params"], - ) - except KeyError as e: - logger.warning( - f"update subscription[{subscription.id}] to add step[{step_info['id']}] error: " - f"err_msg -> {e}" - ) - raise errors.SubscriptionUpdateError( - { - "subscription_id": subscription.id, - "msg": _("新增订阅步骤[{step_id}] 需要提供 type & config,错误信息 -> {err_msg}").format( - step_id=step_info["id"], err_msg=e - ), - } - ) - step_objs_to_be_created.append(step_obj_to_be_created) - step_ids.add(step_info["id"]) - - # 删除更新后不存在的 step - models.SubscriptionStep.objects.filter( - subscription_id=subscription.id, step_id__in=set(step_id__obj_map.keys()) - step_ids - ).delete() - models.SubscriptionStep.objects.bulk_update(step_objs_to_be_updated, fields=["config", "params", "index"]) - models.SubscriptionStep.objects.bulk_create(step_objs_to_be_created) - # 更新 steps 需要移除缓存 - if hasattr(subscription, "_steps"): - delattr(subscription, "_steps") - - result = {"subscription_id": subscription.id} - - if run_immediately: - if subscription.is_running(): - raise InstanceTaskIsRunning() - - subscription_task = models.SubscriptionTask.objects.create( - subscription_id=subscription.id, scope=subscription.scope, actions={} - ) - tasks.run_subscription_task_and_create_instance.delay( - subscription, subscription_task, language=get_language() - ) - result["task_id"] = subscription_task.id - - return Response(result) + params: Dict[str, Any] = self.validated_data + return Response(SubscriptionHandler.update_subscription(params)) @swagger_auto_schema( operation_summary="删除订阅", @@ -306,13 +207,14 @@ def delete_subscription(self, request): @apiGroup subscription """ params = self.validated_data - try: - subscription = models.Subscription.objects.get(id=params["subscription_id"], is_deleted=False) - except models.Subscription.DoesNotExist: - raise errors.SubscriptionNotExist({"subscription_id": params["subscription_id"]}) - subscription.is_deleted = True - subscription.save() - return Response() + subscription_id = params["subscription_id"] + subscription_qs = models.Subscription.objects.filter(id=subscription_id, is_deleted=False) + if not subscription_qs.exists(): + raise errors.SubscriptionNotExist({"subscription_id": subscription_id}) + # 调用delete()方法才会记录删除时间 + subscription_qs.delete() + logger.info(f"deleted subscription: {subscription_id}") + return Response({"deleted_subscription_id": subscription_id}) @swagger_auto_schema( operation_summary="执行订阅", @@ -969,3 +871,55 @@ def switch_biz(self, request): value=list(set(disable_subscription_bk_biz_ids + data["bk_biz_ids"])), ) return Response(data) + + @swagger_auto_schema(operation_summary="清除野/遗留订阅", tags=SUBSCRIPTION_VIEW_TAGS) + @action(detail=False, methods=["POST"], serializer_class=serializers.ClearnSubscriptionSerializer) + def clean_subscription(self, request): + """ + @api {POST} /subscription/clean_subscription/ 清除野/遗留订阅 + @apiName clean_subscription + @apiGroup subscription + """ + validated_data = self.validated_data + is_force: bool = validated_data["is_force"] + action_type = validated_data["action_type"] + subscription_ids = set(validated_data["subscription_id_list"]) + + # 如果不是强制清理,需要判断订阅是不是已被删除了,已删除的才允许操作 + if not is_force: + # 先查一次确认是否为遗留的订阅配置。如果订阅ID还存在,则不允许此操作 + exist_subscription_ids = set( + models.Subscription.objects.filter(id__in=subscription_ids).values_list("id", flat=True) + ) + if exist_subscription_ids: + raise errors.SubscriptionNotDeletedCantOperateError({"subscription_id": exist_subscription_ids}) + # 1.修改订阅配置,把删除状态更新成未删除,同时enable改成不启动 + models.Subscription.objects.filter(id__in=subscription_ids, show_deleted=True).update( + enable=False, is_deleted=False + ) + final_handle_subscription_ids = set( + models.Subscription.objects.filter(id__in=subscription_ids).values_list("id", flat=True) + ) + not_exists_subscription_ids = list(subscription_ids - final_handle_subscription_ids) + if not_exists_subscription_ids: + raise errors.SubscriptionNotExist({"subscription_id": not_exists_subscription_ids}) + # 2.获取订阅步骤 + step_qs = models.SubscriptionStep.objects.filter(subscription_id__in=final_handle_subscription_ids).values( + "subscription_id", "step_id" + ) + subscription_id__step_id_map = defaultdict(list) + for step in step_qs: + subscription_id__step_id_map[step["subscription_id"]].append(step["step_id"]) + + results = [] + for subscription_id in final_handle_subscription_ids: + step_ids = subscription_id__step_id_map[subscription_id] + # 拼接动作参数,默认仅停用,不删除文件 + execute_actions = {step_id: action_type for step_id in step_ids} + result = SubscriptionHandler(subscription_id).clean_subscription(execute_actions) + results.append(result) + logger.info( + f"clean subscription result: {results}, deleted subscription: {final_handle_subscription_ids}," + f"length: {len(final_handle_subscription_ids)}" + ) + return Response(results) diff --git a/apps/backend/tests/periodic_tasks/test_schedule_running_subscription_task.py b/apps/backend/tests/periodic_tasks/test_schedule_running_subscription_task.py new file mode 100644 index 000000000..32e5b4cf0 --- /dev/null +++ b/apps/backend/tests/periodic_tasks/test_schedule_running_subscription_task.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +""" +TencentBlueKing is pleased to support the open source community by making 蓝鲸智云-节点管理(BlueKing-BK-NODEMAN) available. +Copyright (C) 2017-2022 THL A29 Limited, a Tencent company. All rights reserved. +Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at https://opensource.org/licenses/MIT +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on +an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the +specific language governing permissions and limitations under the License. +""" +import json + +from apps.backend import constants +from apps.backend.periodic_tasks.schedule_running_subscription_task import ( + clean_deleted_subscription, + schedule_run_scription, + schedule_update_subscription, +) +from apps.backend.subscription.handler import SubscriptionHandler +from apps.backend.tests.components.collections.plugin import utils +from apps.backend.utils.redis import REDIS_INST +from apps.node_man import models +from apps.utils.unittest.testcase import CustomBaseTestCase + + +class CreatePreData(CustomBaseTestCase): + def setUp(self) -> None: + super().setUp() + self.init_db() + + def init_db(self): + self.ids = utils.PluginTestObjFactory.init_db() + self.COMMON_INPUTS = utils.PluginTestObjFactory.inputs( + attr_values={ + "description": "description", + "bk_host_id": utils.BK_HOST_ID, + "subscription_instance_ids": [self.ids["subscription_instance_record_id"]], + "subscription_step_id": self.ids["subscription_step_id"], + }, + # 主机信息保持和默认一致 + instance_info_attr_values={}, + ) + + +class TestScheduleRunningSubscriptionTask(CreatePreData): + def setUp(self) -> None: + super().setUp() + models.SubscriptionInstanceRecord.objects.filter(id=self.ids["subscription_instance_record_id"]).update( + status="RUNNING" + ) + SubscriptionHandler(self.ids["subscription_id"]).run() + + def test_schedule_running_subscription_task(self): + name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL + length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT) + run_params = REDIS_INST.lrange(name, -length, -1) + self.assertEqual( + json.loads(run_params[0].decode()), + {"subscription_id": self.ids["subscription_id"], "scope": None, "actions": None}, + ) + # 模拟之前的订阅任务跑完,调度订阅任务执行 + models.SubscriptionInstanceRecord.objects.filter(id=self.ids["subscription_instance_record_id"]).update( + status="SUCCESS" + ) + # 执行订阅后会创建一个订阅任务 + schedule_run_scription() + num = models.SubscriptionTask.objects.filter(subscription_id=self.ids["subscription_id"]).count() + self.assertEqual(num, 2) + + +class TestScheduleUpdateSubscriptionTask(CreatePreData): + def setUp(self) -> None: + super().setUp() + scope = { + "bk_biz_id": 1, + "node_type": "HOST", + "nodes": [{"ip": None, "bk_host_id": 79}], + "need_register": False, + "instance_selector": None, + } + steps = [ + { + "id": 1, + "type": "PLUGIN", + "config": {"plugin_name": "test_plugin", "plugin_version": "1.0.0"}, + "params": {}, + } + ] + self.params = { + "subscription_id": self.ids["subscription_id"], + "scope": scope, + "steps": steps, + "operate_info": [], + "bk_biz_scope": [], + "run_immediately": True, + } + models.SubscriptionInstanceRecord.objects.filter(id=self.ids["subscription_instance_record_id"]).update( + status="RUNNING" + ) + SubscriptionHandler.update_subscription(params=self.params) + + def test_schedule_update_subscription_task(self): + name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL + length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT) + run_params = REDIS_INST.lrange(name, -length, -1) + self.assertEqual(json.loads(run_params[0].decode()), self.params) + models.SubscriptionInstanceRecord.objects.filter( + id=self.ids["subscription_instance_record_id"], subscription_id=self.ids["subscription_id"] + ).update(status="SUCCESS") + schedule_update_subscription() + num = models.SubscriptionTask.objects.filter(subscription_id=self.ids["subscription_id"]).count() + self.assertEqual(num, 2) + + +class TestCleanDeletedSubscriptionTask(CreatePreData): + def setUp(self) -> None: + super().setUp() + models.Subscription.objects.filter(id=self.ids["subscription_id"]).update(from_system="bkmonitorv3") + models.Subscription.objects.filter(id=self.ids["subscription_id"]).delete() + + def test_clean_subscription_task(self): + # d调度清理任务,将nodes设置为空列表,并且启用订阅巡检 + clean_deleted_subscription() + subscription = models.Subscription.objects.get(id=self.ids["subscription_id"]) + self.assertEqual(subscription.nodes, []) + self.assertEqual(subscription.enable, True) + self.assertEqual(subscription.is_deleted, False) diff --git a/apps/backend/tests/subscription/test_views.py b/apps/backend/tests/subscription/test_views.py index b330f4353..2df30b86c 100644 --- a/apps/backend/tests/subscription/test_views.py +++ b/apps/backend/tests/subscription/test_views.py @@ -518,3 +518,18 @@ def test_query_host_subscriptions(self): v6_ip_r = self.client.get(url, dict(request_params, **{"ip": host.inner_ipv6})) for resp in [host_innerip_r, v4_ip_r, v6_ip_r, host_id_r]: self.assertEqual(json.loads(str(resp.content, "utf-8"))["data"][0]["id"], proc.id) + + def test_clean_subscription(self): + subscription_id = self._test_create_subscription() + r = self.client.post( + path="/backend/api/subscription/clean_subscription/", + content_type="application/json", + data=json.dumps({"subscription_id_list": [subscription_id], "is_force": True}), + ) + result = r.data["data"][0] + self.assertEqual(r.status_code, 200) + self.assertIn("task_id", result) + self.assertIn("subscription_id", result) + # 校验软删 + num = Subscription.objects.filter(id=subscription_id, is_deleted=True).count() + self.assertEqual(num, 1) diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 7c9a84ddb..8dc074de9 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -168,6 +168,8 @@ class KeyEnum(Enum): AUTO_SELECT_INSTALL_CHANNEL_ONLY_DIRECT_AREA = "AUTO_SELECT_INSTALL_CHANNEL_ONLY_DIRECT_AREA" # 安装通道ID与网段列表映射 INSTALL_CHANNEL_ID_NETWORK_SEGMENT = "INSTALL_CHANNEL_ID_NETWORK_SEGMENT" + # 需要执行清理订阅的APP_CODE + NEED_CLEAN_SUBSCRIPTION_APP_CODE = "NEED_CLEAN_SUBSCRIPTION_APP_CODE" key = models.CharField(_("键"), max_length=255, db_index=True, primary_key=True) v_json = JSONField(_("值"))