From 38fccd2c50108edfb8ccad987a7a8b42fff4f694 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?v=5Fdcdding=28=E4=B8=81=E8=B6=85=E8=BE=BE=29?= <1151627903@qq.com> Date: Mon, 1 Apr 2024 20:28:32 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E8=AE=A2=E9=98=85=E5=B7=A1=E6=A3=80?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E6=A0=B9=E6=8D=AE=E4=B8=9A=E5=8A=A1=E5=88=86?= =?UTF-8?q?=E5=8F=91=E4=BB=BB=E5=8A=A1=E5=88=B0=E4=B8=8D=E5=90=8C=E9=98=9F?= =?UTF-8?q?=E5=88=97=20(closed=20#2061)=20#=20Reviewed,=20transaction=20id?= =?UTF-8?q?:=206119?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../update_subscription_instances.py | 20 +++++++++++++++---- apps/backend/subscription/tools.py | 18 +++++++++++++++++ apps/node_man/models.py | 2 ++ .../periodic_tasks/resource_watch_task.py | 10 +++++++++- 4 files changed, 45 insertions(+), 5 deletions(-) diff --git a/apps/backend/periodic_tasks/update_subscription_instances.py b/apps/backend/periodic_tasks/update_subscription_instances.py index ce2cfec3c..f1302ec96 100644 --- a/apps/backend/periodic_tasks/update_subscription_instances.py +++ b/apps/backend/periodic_tasks/update_subscription_instances.py @@ -2,9 +2,14 @@ import logging from celery.task import periodic_task +from django.db.models import Value from apps.backend.subscription.constants import SUBSCRIPTION_UPDATE_INTERVAL from apps.backend.subscription.tasks import update_subscription_instances_chunk +from apps.backend.subscription.tools import ( + by_biz_dispatch_task_queue, + get_biz_ids_gby_queue, +) from apps.node_man import models from apps.utils.periodic_task import calculate_countdown @@ -24,12 +29,19 @@ def update_subscription_instances(): # 关闭订阅自动巡检 return - subscription_ids = list( - models.Subscription.objects.filter(enable=True, is_deleted=False).values_list("id", flat=True) + subscriptions = models.Subscription.objects.filter(enable=Value(1), is_deleted=Value(0)).values( + "id", "bk_biz_id", "bk_biz_scope" ) + subscription_ids = [subscription["id"] for subscription in subscriptions] + subscription_id__biz_ids_map = { + subscription["id"]: subscription["bk_biz_scope"] + [subscription["bk_biz_id"]] for subscription in subscriptions + } + biz_ids_gby_queue = get_biz_ids_gby_queue() + count = len(subscription_ids) for index, subscription_id in enumerate(subscription_ids): # 把订阅平均分布到10分钟内执行,用于削峰 countdown = calculate_countdown(count=count, index=index, duration=SUBSCRIPTION_UPDATE_INTERVAL) - logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds.") - update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown) + task_queue = by_biz_dispatch_task_queue(biz_ids_gby_queue, subscription_id__biz_ids_map[subscription_id]) + logger.info(f"subscription({subscription_id}) will be run after {countdown} seconds in queue ({task_queue}).") + update_subscription_instances_chunk.apply_async(([subscription_id],), countdown=countdown, queue=task_queue) diff --git a/apps/backend/subscription/tools.py b/apps/backend/subscription/tools.py index 07f6e2f63..07af8c6c7 100644 --- a/apps/backend/subscription/tools.py +++ b/apps/backend/subscription/tools.py @@ -1512,3 +1512,21 @@ def check_subscription_is_disabled( logger.info(f"[check_subscription_is_disabled] {subscription_identity}: not in the disable list, skipping") return False + + +def get_biz_ids_gby_queue(): + """返回任务队列与业务ID列表的映射""" + biz_ids_gby_queue: Dict[str, List[int]] = models.GlobalSettings.get_config( + key=models.GlobalSettings.KeyEnum.SUBSCRIPTION_UPDATE_TASK_QUEUE.value, default={} + ) + return biz_ids_gby_queue + + +def by_biz_dispatch_task_queue(biz_ids_gby_queue: Dict[str, List[int]], bk_biz_ids: List[Union[int, None]]): + """通过业务ID列表分配任务队列""" + default_task_queue: str = "backend_additional_task" + for task_queue, partial_biz_ids in biz_ids_gby_queue.items(): + if set(partial_biz_ids) & set(bk_biz_ids): + return task_queue + + return default_task_queue diff --git a/apps/node_man/models.py b/apps/node_man/models.py index 2f4c701a2..f579639a2 100644 --- a/apps/node_man/models.py +++ b/apps/node_man/models.py @@ -154,6 +154,8 @@ class KeyEnum(Enum): ENABLE_NOTICE_CENTER = "ENABLE_NOTICE_CENTER" # 禁用已停用插件 DISABLE_STOPPED_PLUGIN = "DISABLE_STOPPED_PLUGIN" + # 根据订阅分配任务队列 + SUBSCRIPTION_UPDATE_TASK_QUEUE = "SUBSCRIPTION_UPDATE_TASK_QUEUE" key = models.CharField(_("键"), max_length=255, db_index=True, primary_key=True) v_json = JSONField(_("值")) diff --git a/apps/node_man/periodic_tasks/resource_watch_task.py b/apps/node_man/periodic_tasks/resource_watch_task.py index 4d61b09b3..ef92e6c6f 100644 --- a/apps/node_man/periodic_tasks/resource_watch_task.py +++ b/apps/node_man/periodic_tasks/resource_watch_task.py @@ -19,6 +19,10 @@ from django.db.models import Q from django.db.utils import IntegrityError +from apps.backend.subscription.tools import ( + by_biz_dispatch_task_queue, + get_biz_ids_gby_queue, +) from apps.component.esbclient import client_v2 from apps.node_man import constants from apps.node_man.models import GlobalSettings, Host, ResourceWatchEvent, Subscription @@ -427,11 +431,15 @@ def trigger_nodeman_subscription(bk_biz_id, debounce_time=0): method="subscription", bk_biz_id=bk_biz_id, debounce_time=debounce_time ).inc() + biz_ids_gby_queue = get_biz_ids_gby_queue() + task_queue: str = by_biz_dispatch_task_queue(biz_ids_gby_queue, [bk_biz_id]) + update_subscription_instances_chunk.apply_async( - kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time + kwargs={"subscription_ids": subscription_ids}, countdown=debounce_time, queue=task_queue ) logger.info( f"[trigger_nodeman_subscription] following subscriptions " f"will be run -> ({subscription_ids}) after {debounce_time} s" + f" in queue -> ({task_queue})" )