Skip to content

Commit

Permalink
fix: 适配编排更新订阅任务兜底 (closed #2506)
Browse files Browse the repository at this point in the history
  • Loading branch information
Huayeaaa committed Dec 13, 2024
1 parent c9e28ad commit 63f841b
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 26 deletions.
6 changes: 4 additions & 2 deletions apps/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ def needs_batch_request(self) -> bool:
# 处理卸载残留订阅任务间隔
HANDLE_UNINSTALL_REST_SUBSCRIPTION_TASK_INTERVAL = 6 * 60 * 60

# 最大订阅任务数量
MAX_SUBSCRIPTION_TASK_COUNT = 50
# 最大更新订阅任务储存数量
MAX_STORE_SUBSCRIPTION_TASK_COUNT = 1000
# 最大执行订阅任务数量
MAX_RUN_SUBSCRIPTION_TASK_COUNT = 50

# 订阅删除时间小时数
SUBSCRIPTION_DELETE_HOURS = 6
1 change: 1 addition & 0 deletions apps/backend/periodic_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
from .clean_sub_data import clean_sub_data_task # noqa
from .clean_subscription_data import clean_subscription_data # noqa
from .collect_auto_trigger_job import collect_auto_trigger_job # noqa
from .schedule_running_subscription_task import * # noqa
from .update_subscription_instances import update_subscription_instances # noqa
23 changes: 18 additions & 5 deletions apps/backend/periodic_tasks/check_zombie_sub_inst_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

from apps.backend.subscription.constants import CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL
from apps.backend.subscription.constants import (
CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL,
ZOMBIE_SUB_INST_RECORD_COUNT,
)
from apps.node_man import constants, models
from apps.utils.time_handler import strftime_local

Expand Down Expand Up @@ -48,10 +51,20 @@ def check_zombie_sub_inst_record():
"status__in": [constants.JobStatusType.PENDING, constants.JobStatusType.RUNNING],
}
base_update_kwargs = {"status": constants.JobStatusType.FAILED, "update_time": timezone.now()}

forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs).update(
**base_update_kwargs
)
# 先count确认是否需要update,如果count数量小于100传主键 update,否则继续沿用现在的方式
subscription_instance_record_qs = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs)
if not subscription_instance_record_qs.exists():
logger.info("no zombie_sub_inst_record skipped")
return
if subscription_instance_record_qs.count() < ZOMBIE_SUB_INST_RECORD_COUNT:
forced_failed_inst_record_ids = set(subscription_instance_record_qs.values_list("id", flat=True))
forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(
id__in=forced_failed_inst_record_ids
).update(**base_update_kwargs)
else:
forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs).update(
**base_update_kwargs
)

forced_failed_status_detail_num = models.SubscriptionInstanceStatusDetail.objects.filter(**query_kwargs).update(
**base_update_kwargs,
Expand Down
16 changes: 6 additions & 10 deletions apps/backend/periodic_tasks/schedule_running_subscription_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,14 @@ def get_need_clean_subscription_app_code():
@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_update_subscription():
name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
# 先计算出要从redis取数据的长度
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
# 从redis中取出对应长度的数据
update_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
# 使用ltrim保留剩下的,可以保证redis中新push的值不会丢失
REDIS_INST.ltrim(name, 0, -length - 1)
# 翻转数据,先进的数据先处理
update_params.reverse()
# 取出该hashset中所有的参数
update_params: Dict[str, bytes] = REDIS_INST.hgetall(name=name)
# 删除该hashset内的所有参数
REDIS_INST.delete(name)
results = []
if not update_params:
return
for update_param in update_params:
for update_param in update_params.values():
# redis取出为bytes类型,需进行解码后转字典
params = json.loads(update_param.decode())
subscription_id = params["subscription_id"]
Expand All @@ -64,7 +60,7 @@ def schedule_update_subscription():
@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_run_subscription():
name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
length: int = min(REDIS_INST.llen(name), constants.MAX_RUN_SUBSCRIPTION_TASK_COUNT)
run_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
REDIS_INST.ltrim(name, 0, -length - 1)
run_params.reverse()
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/subscription/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

# 检查僵尸订阅实例记录周期
CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL = 15 * constants.TimeUnit.MINUTE
# 僵尸订阅实例记录数量
ZOMBIE_SUB_INST_RECORD_COUNT = 100

# 任务超时时间。距离 create_time 多久后会被判定为超时,防止 pipeline 后台僵死的情况
TASK_TIMEOUT = 15 * constants.TimeUnit.MINUTE
Expand Down
21 changes: 18 additions & 3 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,16 @@ def run(self, scope: Dict = None, actions: Dict[str, str] = None) -> Dict[str, i
raise errors.SubscriptionIncludeGrayBizError()

if subscription.is_running():
# 这里仍使用lpush的原因在于订阅任务可能执行的动作不一样,不能使用更新
name = backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
if REDIS_INST.llen(name) > backend_constants.MAX_STORE_SUBSCRIPTION_TASK_COUNT:
logger.info("redis list store params is full")
return {
"subscription_id": subscription.id,
"message": _("该订阅ID下有正在RUNNING的订阅任务,且任务编排数量已达阈值,请稍后再试,如造成不便,请联系管理员处理"),
}
params = json.dumps({"subscription_id": subscription.id, "scope": scope, "actions": actions})
REDIS_INST.lpush(backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL, params)
REDIS_INST.lpush(name, params)
logger.info(f"run subscription[{subscription.id}] store params into redis: {params}")
return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")}

Expand Down Expand Up @@ -704,8 +712,15 @@ def update_subscription(params: Dict[str, Any]):
):
raise errors.SubscriptionIncludeGrayBizError()
if subscription.is_running():
REDIS_INST.lpush(backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL, json.dumps(params))
logger.info(f"update subscription[{subscription.id}] store params into redis: {params}")
name = backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
if REDIS_INST.hlen(name=name) > backend_constants.MAX_STORE_SUBSCRIPTION_TASK_COUNT:
logger.info("redis hashset store params is full")
return {
"subscription_id": subscription.id,
"message": _("该订阅ID下有正在RUNNING的订阅任务,且任务编排数量已达阈值,请稍后再试,如造成不便,请联系管理员处理"),
}
REDIS_INST.hset(name, key=f"subscription_id_{subscription.id}", value=json.dumps(params))
logger.info(f"update subscription[{subscription.id}] store or update params into redis: {params}")
return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")}

with transaction.atomic():
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/subscription/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def delete_subscription(self, request):
raise errors.SubscriptionNotExist({"subscription_id": subscription_id})
# 调用delete()方法才会记录删除时间
subscription_qs.delete()
logger.info(f"deleted subscription: {subscription_id}")
logger.info(f"deleted_subscription_id: {subscription_id}")
return Response({"deleted_subscription_id": subscription_id})

@swagger_auto_schema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def setUp(self) -> None:

def test_schedule_running_subscription_task(self):
name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
length: int = min(REDIS_INST.llen(name), constants.MAX_RUN_SUBSCRIPTION_TASK_COUNT)
run_params = REDIS_INST.lrange(name, -length, -1)
self.assertEqual(
json.loads(run_params[0].decode()),
Expand Down Expand Up @@ -102,9 +102,10 @@ def setUp(self) -> None:

def test_schedule_update_subscription_task(self):
name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
run_params = REDIS_INST.lrange(name, -length, -1)
self.assertEqual(json.loads(run_params[0].decode()), self.params)

update_params = REDIS_INST.hgetall(name=name)
for update_param in update_params.values():
self.assertEqual(json.loads(update_param.decode()), self.params)
models.SubscriptionInstanceRecord.objects.filter(
id=self.ids["subscription_instance_record_id"], subscription_id=self.ids["subscription_id"]
).update(status="SUCCESS")
Expand Down
8 changes: 7 additions & 1 deletion apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import uuid
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
from distutils.dir_util import copy_tree
from enum import Enum
from functools import cmp_to_key, reduce
Expand Down Expand Up @@ -1928,7 +1929,12 @@ def get_subscription(cls, subscription_id: int, show_deleted=False):

def is_running(self, instance_id_list: List[str] = None):
"""订阅下是否有运行中的任务"""
base_kwargs = {"subscription_id": self.id, "is_latest": True}
# 只需检查近两小时内的订阅实例
base_kwargs = {
"subscription_id": self.id,
"is_latest": True,
"update_time__gte": timezone.now() - timedelta(hours=2),
}
if instance_id_list is not None:
base_kwargs["instance_id__in"] = instance_id_list
status_set = set(SubscriptionInstanceRecord.objects.filter(**base_kwargs).values_list("status", flat=True))
Expand Down

0 comments on commit 63f841b

Please sign in to comment.