Skip to content

Commit

Permalink
fix: enable run celery task
Browse files Browse the repository at this point in the history
  • Loading branch information
narasux committed Sep 20, 2023
1 parent f7914d9 commit 3a6eecb
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 12 deletions.
10 changes: 10 additions & 0 deletions src/bk-user/bin/start_celery.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#!/bin/bash

## 设置环境变量
CELERY_CONCURRENCY=${CELERY_CONCURRENCY:-6}
CELERY_LOG_LEVEL=${CELERY_LOG_LEVEL:-info}

command="celery -A bkuser worker -l ${CELERY_LOG_LEVEL} --concurrency ${CELERY_CONCURRENCY}"

## Run!
exec bash -c "$command"
14 changes: 11 additions & 3 deletions src/bk-user/bkuser/apis/web/data_source/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from django.db import transaction
from django.utils.translation import gettext_lazy as _
from drf_yasg.utils import swagger_auto_schema
from openpyxl.utils.exceptions import InvalidFileException
from rest_framework import generics, status
from rest_framework.response import Response

Expand Down Expand Up @@ -251,6 +250,8 @@ def patch(self, request, *args, **kwargs):
class DataSourceTemplateApi(CurrentUserTenantDataSourceMixin, generics.ListAPIView):
"""获取本地数据源数据导入模板"""

pagination_class = None

@swagger_auto_schema(
tags=["data_source"],
operation_description="下载数据源导入模板",
Expand All @@ -270,6 +271,8 @@ def get(self, request, *args, **kwargs):
class DataSourceExportApi(CurrentUserTenantDataSourceMixin, generics.ListAPIView):
"""本地数据源用户导出"""

pagination_class = None

@swagger_auto_schema(
tags=["data_source"],
operation_description="下载本地数据源用户数据",
Expand Down Expand Up @@ -307,7 +310,7 @@ def post(self, request, *args, **kwargs):
# Request file 转换成 openpyxl.workbook
try:
workbook = openpyxl.load_workbook(data["file"])
except InvalidFileException:
except Exception: # pylint: disable=broad-except
logger.exception("本地数据源导入失败")
raise error_codes.DATA_SOURCE_IMPORT_FAILED.f(_("文件格式异常"))

Expand All @@ -318,7 +321,12 @@ def post(self, request, *args, **kwargs):
trigger=SyncTaskTrigger.MANUAL,
)

task = DataSourceSyncManager(data_source, options).execute(context={"workbook": workbook})
try:
task = DataSourceSyncManager(data_source, options).execute(context={"workbook": workbook})
except Exception as e: # pylint: disable=broad-except
logger.exception("本地数据源导入失败")
raise error_codes.DATA_SOURCE_IMPORT_FAILED.f(str(e))

return Response(
LocalDataSourceImportOutputSLZ(
instance={"task_id": task.id, "status": task.status, "summary": task.summary}
Expand Down
7 changes: 3 additions & 4 deletions src/bk-user/bkuser/apps/sync/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,19 @@
"""
from typing import Any, Dict

from celery import shared_task

from bkuser.apps.sync.models import DataSourceSyncTask, TenantSyncTask
from bkuser.apps.sync.runners import DataSourceSyncTaskRunner, TenantSyncTaskRunner
from bkuser.celery import app


@shared_task(ignore_result=True)
@app.task(ignore_result=True)
def sync_data_source(task_id: int, context: Dict[str, Any]):
"""同步数据源数据"""
task = DataSourceSyncTask.objects.get(id=task_id)
DataSourceSyncTaskRunner(task, context).run()


@shared_task(ignore_result=True)
@app.task(ignore_result=True)
def sync_tenant(task_id: int):
"""同步数据源数据"""
task = TenantSyncTask.objects.get(id=task_id)
Expand Down
2 changes: 2 additions & 0 deletions src/bk-user/bkuser/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,5 @@
app.conf.task_queues = [
Queue("bkuser", Exchange("bkuser"), routing_key="bkuser", queue_arguments={"x-ha-policy": "all"}),
]

app.conf.task_default_queue = "bkuser"
13 changes: 8 additions & 5 deletions src/bk-user/bkuser/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"rest_framework",
"corsheaders",
"django_celery_beat",
"django_celery_results",
"django_prometheus",
"drf_yasg",
"bkuser.auth",
Expand Down Expand Up @@ -241,8 +242,8 @@
# CELERY_IMPORTS = []
# 内置的周期任务
# CELERYBEAT_SCHEDULE = {}
# Celery消息队列
BROKER_URL = env.str("BK_BROKER_URL", default="")
# Celery 消息队列配置
CELERY_BROKER_URL = env.str("BK_BROKER_URL", default="")

# ------------------------------------------ 缓存配置 ------------------------------------------

Expand Down Expand Up @@ -325,12 +326,14 @@
CACHES["redis"]["OPTIONS"]["CONNECTION_POOL_CLASS"] = "redis.sentinel.SentinelConnectionPool"

# default celery broker
if not BROKER_URL:
if not CELERY_BROKER_URL:
# use Redis as the default broker
BROKER_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
CELERY_BROKER_URL = f"redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
# https://docs.celeryq.dev/en/v5.3.1/getting-started/backends-and-brokers/redis.html#broker-redis
if REDIS_USE_SENTINEL:
BROKER_URL = ";".join([f"sentinel://:{REDIS_PASSWORD}@{addr}/{REDIS_DB}" for addr in REDIS_SENTINEL_ADDR])
CELERY_BROKER_URL = ";".join(
[f"sentinel://:{REDIS_PASSWORD}@{addr}/{REDIS_DB}" for addr in REDIS_SENTINEL_ADDR]
)
BROKER_TRANSPORT_OPTIONS = {
"master_name": REDIS_SENTINEL_MASTER_NAME,
"sentinel_kwargs": {"password": REDIS_SENTINEL_PASSWORD},
Expand Down

0 comments on commit 3a6eecb

Please sign in to comment.