Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: 适配编排更新订阅任务兜底 (closed #2506) #2513

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions apps/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,10 @@ def needs_batch_request(self) -> bool:
# 处理卸载残留订阅任务间隔
HANDLE_UNINSTALL_REST_SUBSCRIPTION_TASK_INTERVAL = 6 * 60 * 60

# 最大订阅任务数量
MAX_SUBSCRIPTION_TASK_COUNT = 50
# 最大更新订阅任务储存数量
MAX_STORE_SUBSCRIPTION_TASK_COUNT = 1000
# 最大执行订阅任务数量
MAX_RUN_SUBSCRIPTION_TASK_COUNT = 50

# 订阅删除时间小时数
SUBSCRIPTION_DELETE_HOURS = 6
1 change: 1 addition & 0 deletions apps/backend/periodic_tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
from .clean_sub_data import clean_sub_data_task # noqa
from .clean_subscription_data import clean_subscription_data # noqa
from .collect_auto_trigger_job import collect_auto_trigger_job # noqa
from .schedule_running_subscription_task import * # noqa
from .update_subscription_instances import update_subscription_instances # noqa
23 changes: 18 additions & 5 deletions apps/backend/periodic_tasks/check_zombie_sub_inst_record.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _

from apps.backend.subscription.constants import CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL
from apps.backend.subscription.constants import (
CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL,
ZOMBIE_SUB_INST_RECORD_COUNT,
)
from apps.node_man import constants, models
from apps.utils.time_handler import strftime_local

Expand Down Expand Up @@ -48,10 +51,20 @@ def check_zombie_sub_inst_record():
"status__in": [constants.JobStatusType.PENDING, constants.JobStatusType.RUNNING],
}
base_update_kwargs = {"status": constants.JobStatusType.FAILED, "update_time": timezone.now()}

forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs).update(
**base_update_kwargs
)
# 先count确认是否需要update,如果count数量小于100传主键 update,否则继续沿用现在的方式
subscription_instance_record_qs = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs)
if not subscription_instance_record_qs.exists():
logger.info("no zombie_sub_inst_record skipped")
return
if subscription_instance_record_qs.count() < ZOMBIE_SUB_INST_RECORD_COUNT:
forced_failed_inst_record_ids = set(subscription_instance_record_qs.values_list("id", flat=True))
forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(
id__in=forced_failed_inst_record_ids
).update(**base_update_kwargs)
else:
forced_failed_inst_num = models.SubscriptionInstanceRecord.objects.filter(**query_kwargs).update(
**base_update_kwargs
)

forced_failed_status_detail_num = models.SubscriptionInstanceStatusDetail.objects.filter(**query_kwargs).update(
**base_update_kwargs,
Expand Down
16 changes: 6 additions & 10 deletions apps/backend/periodic_tasks/schedule_running_subscription_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,14 @@ def get_need_clean_subscription_app_code():
@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_update_subscription():
name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
# 先计算出要从redis取数据的长度
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
# 从redis中取出对应长度的数据
update_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
# 使用ltrim保留剩下的,可以保证redis中新push的值不会丢失
REDIS_INST.ltrim(name, 0, -length - 1)
# 翻转数据,先进的数据先处理
update_params.reverse()
# 取出该hashset中所有的参数
update_params: Dict[str, bytes] = REDIS_INST.hgetall(name=name)
# 删除该hashset内的所有参数
REDIS_INST.delete(name)
results = []
if not update_params:
return
for update_param in update_params:
for update_param in update_params.values():
# redis取出为bytes类型,需进行解码后转字典
params = json.loads(update_param.decode())
subscription_id = params["subscription_id"]
Expand All @@ -64,7 +60,7 @@ def schedule_update_subscription():
@periodic_task(run_every=constants.UPDATE_SUBSCRIPTION_TASK_INTERVAL, queue="backend", options={"queue": "backend"})
def schedule_run_subscription():
name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
length: int = min(REDIS_INST.llen(name), constants.MAX_RUN_SUBSCRIPTION_TASK_COUNT)
run_params: List[bytes] = REDIS_INST.lrange(name, -length, -1)
REDIS_INST.ltrim(name, 0, -length - 1)
run_params.reverse()
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/subscription/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

# 检查僵尸订阅实例记录周期
CHECK_ZOMBIE_SUB_INST_RECORD_INTERVAL = 15 * constants.TimeUnit.MINUTE
# 僵尸订阅实例记录数量
ZOMBIE_SUB_INST_RECORD_COUNT = 100

# 任务超时时间。距离 create_time 多久后会被判定为超时,防止 pipeline 后台僵死的情况
TASK_TIMEOUT = 15 * constants.TimeUnit.MINUTE
Expand Down
21 changes: 18 additions & 3 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,16 @@ def run(self, scope: Dict = None, actions: Dict[str, str] = None) -> Dict[str, i
raise errors.SubscriptionIncludeGrayBizError()

if subscription.is_running():
# 这里仍使用lpush的原因在于订阅任务可能执行的动作不一样,不能使用更新
name = backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
if REDIS_INST.llen(name) > backend_constants.MAX_STORE_SUBSCRIPTION_TASK_COUNT:
logger.info("redis list store params is full")
return {
"subscription_id": subscription.id,
"message": _("该订阅ID下有正在RUNNING的订阅任务,且任务编排数量已达阈值,请稍后再试,如造成不便,请联系管理员处理"),
}
params = json.dumps({"subscription_id": subscription.id, "scope": scope, "actions": actions})
REDIS_INST.lpush(backend_constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL, params)
REDIS_INST.lpush(name, params)
logger.info(f"run subscription[{subscription.id}] store params into redis: {params}")
return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")}

Expand Down Expand Up @@ -704,8 +712,15 @@ def update_subscription(params: Dict[str, Any]):
):
raise errors.SubscriptionIncludeGrayBizError()
if subscription.is_running():
REDIS_INST.lpush(backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL, json.dumps(params))
logger.info(f"update subscription[{subscription.id}] store params into redis: {params}")
name = backend_constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
if REDIS_INST.hlen(name=name) > backend_constants.MAX_STORE_SUBSCRIPTION_TASK_COUNT:
logger.info("redis hashset store params is full")
return {
"subscription_id": subscription.id,
"message": _("该订阅ID下有正在RUNNING的订阅任务,且任务编排数量已达阈值,请稍后再试,如造成不便,请联系管理员处理"),
}
REDIS_INST.hset(name, key=f"subscription_id_{subscription.id}", value=json.dumps(params))
logger.info(f"update subscription[{subscription.id}] store or update params into redis: {params}")
return {"subscription_id": subscription.id, "message": _("该订阅ID下有正在RUNNING的订阅任务,已进入任务编排")}

with transaction.atomic():
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/subscription/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def delete_subscription(self, request):
raise errors.SubscriptionNotExist({"subscription_id": subscription_id})
# 调用delete()方法才会记录删除时间
subscription_qs.delete()
logger.info(f"deleted subscription: {subscription_id}")
logger.info(f"deleted_subscription_id: {subscription_id}")
return Response({"deleted_subscription_id": subscription_id})

@swagger_auto_schema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def setUp(self) -> None:

def test_schedule_running_subscription_task(self):
name: str = constants.RUN_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
length: int = min(REDIS_INST.llen(name), constants.MAX_RUN_SUBSCRIPTION_TASK_COUNT)
run_params = REDIS_INST.lrange(name, -length, -1)
self.assertEqual(
json.loads(run_params[0].decode()),
Expand Down Expand Up @@ -102,9 +102,10 @@ def setUp(self) -> None:

def test_schedule_update_subscription_task(self):
name: str = constants.UPDATE_SUBSCRIPTION_REDIS_KEY_TPL
length: int = min(REDIS_INST.llen(name), constants.MAX_SUBSCRIPTION_TASK_COUNT)
run_params = REDIS_INST.lrange(name, -length, -1)
self.assertEqual(json.loads(run_params[0].decode()), self.params)

update_params = REDIS_INST.hgetall(name=name)
for update_param in update_params.values():
self.assertEqual(json.loads(update_param.decode()), self.params)
models.SubscriptionInstanceRecord.objects.filter(
id=self.ids["subscription_instance_record_id"], subscription_id=self.ids["subscription_id"]
).update(status="SUCCESS")
Expand Down
8 changes: 7 additions & 1 deletion apps/node_man/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import uuid
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import timedelta
from distutils.dir_util import copy_tree
from enum import Enum
from functools import cmp_to_key, reduce
Expand Down Expand Up @@ -1928,7 +1929,12 @@ def get_subscription(cls, subscription_id: int, show_deleted=False):

def is_running(self, instance_id_list: List[str] = None):
"""订阅下是否有运行中的任务"""
base_kwargs = {"subscription_id": self.id, "is_latest": True}
# 只需检查近两小时内的订阅实例
base_kwargs = {
"subscription_id": self.id,
"is_latest": True,
"update_time__gte": timezone.now() - timedelta(hours=2),
}
if instance_id_list is not None:
base_kwargs["instance_id__in"] = instance_id_list
status_set = set(SubscriptionInstanceRecord.objects.filter(**base_kwargs).values_list("status", flat=True))
Expand Down