Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 系统任务执行状态自监控能力支持 #7457 #7516

Open
wants to merge 3 commits into
base: release_3.32.2_by_master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
"blueapps.opentelemetry.instrument_app",
"apigw_manager.apigw",
"bk_notice_sdk",
"gcloud.contrib.monitor",
)

# 这里是默认的中间件,大部分情况下,不需要改动
Expand Down
10 changes: 5 additions & 5 deletions config/urls_custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,11 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
from rest_framework import permissions
from drf_yasg.views import get_schema_view
from drf_yasg import openapi
from django.conf.urls import include, url
from django.conf import settings

from django.conf.urls import include, url
from drf_yasg import openapi
from drf_yasg.views import get_schema_view
from rest_framework import permissions

# 用户自定义 urlconf
urlpatterns_custom = [
Expand All @@ -41,6 +40,7 @@
url(r"^plugin_service/", include("plugin_service.urls")),
url(r"^mako_operations/", include("gcloud.mako_template_helper.urls")),
url(r"^engine_admin/", include("pipeline.contrib.engine_admin.urls")),
url(r"^monitor/", include("gcloud.contrib.monitor.urls")),
]

schema_view = get_schema_view(
Expand Down
14 changes: 14 additions & 0 deletions gcloud/contrib/monitor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

# Dotted path to the AppConfig subclass that configures this app
# (the MonitorConfig class defined in gcloud/contrib/monitor/apps.py).
default_app_config = "gcloud.contrib.monitor.apps.MonitorConfig"
19 changes: 19 additions & 0 deletions gcloud/contrib/monitor/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from django.apps import AppConfig


class MonitorConfig(AppConfig):
    """Django application configuration for the task self-monitoring app."""

    # Human-readable label for the app.
    verbose_name = "GcloudContribMonitor"
    # Full Python path of the application package.
    name = "gcloud.contrib.monitor"
24 changes: 24 additions & 0 deletions gcloud/contrib/monitor/urls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

from django.conf.urls import url

from gcloud.contrib.monitor import views

# Routes of the monitor app; every entry maps to a GET-only view in views.py.
urlpatterns = [
    # Tasks that contain failed nodes.
    url(r"^get_failed_task/$", views.get_failed_task),
    # Tasks that are still executing.
    url(r"^get_executing_task/$", views.get_executing_task),
    # Schedule counts of tasks.
    url(r"^get_schedule_times/$", views.get_schedule_times),
    # Message-queue overview (totals and nodes).
    url(r"^get_mq_overview/$", views.get_mq_overview),
    # Per-vhost queue details including queued messages.
    url(r"^get_mq_data/$", views.get_mq_data),
]
198 changes: 198 additions & 0 deletions gcloud/contrib/monitor/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# -*- coding: utf-8 -*-
"""
Tencent is pleased to support the open source community by making 蓝鲸智云PaaS平台社区版 (BlueKing PaaS Community
Edition) available.
Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
Licensed under the MIT License (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://opensource.org/licenses/MIT
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""

import pyrabbit2
from django.db.models import Q
from django.http import JsonResponse
from django.utils import timezone
from django.views.decorators.http import require_GET
from pipeline.eri.models import Schedule, State

from gcloud.iam_auth.intercept import iam_intercept
from gcloud.iam_auth.view_interceptors.statistics import StatisticsViewInpterceptor
from gcloud.taskflow3.models import TaskFlowInstance


@require_GET
@iam_intercept(StatisticsViewInpterceptor())
def get_failed_task(request):
    """
    List started, unfinished tasks that contain at least one FAILED node state.

    Query parameters:
        limit: maximum number of tasks to return (default 100).
        offset: number of tasks to skip (default 0).
        start_time: only node states started at or after this time are
            considered (default: 30 days before now).

    Returns a JsonResponse ``{"result": True, "data": [...]}`` whose items carry
    ``task_id``, ``project_name`` and ``task_name``. Invalid paging parameters
    yield a 400 response with ``"result": False`` instead of a server error.
    """
    try:
        limit = int(request.GET.get("limit", 100))
        offset = int(request.GET.get("offset", 0))
        if limit < 0 or offset < 0:
            # Negative bounds cannot be used to slice a queryset.
            raise ValueError("negative paging value")
    except ValueError:
        return JsonResponse(
            {"result": False, "message": "limit and offset must be non-negative integers", "data": []},
            status=400,
        )

    # Treat a missing or empty start_time as "the last 30 days".
    start_time = request.GET.get("start_time") or (timezone.now() - timezone.timedelta(days=30))

    # One State row exists per node, so collapse duplicates of the same root pipeline.
    failed_root_ids = set(
        State.objects.filter(name="FAILED", started_time__gte=start_time).values_list("root_id", flat=True)
    )

    # select_related is unnecessary here: values() resolves the joined columns itself.
    tasks = TaskFlowInstance.objects.filter(
        pipeline_instance__is_deleted=False,
        pipeline_instance__is_expired=False,
        pipeline_instance__is_finished=False,
        pipeline_instance__is_revoked=False,
        pipeline_instance__is_started=True,
        pipeline_instance__instance_id__in=failed_root_ids,
    ).values("id", "project__name", "pipeline_instance__name")[offset : offset + limit]

    failed_tasks = [
        {
            "task_id": task["id"],
            "project_name": task["project__name"],
            "task_name": task["pipeline_instance__name"],
        }
        for task in tasks
    ]
    return JsonResponse({"result": True, "data": failed_tasks})


@require_GET
@iam_intercept(StatisticsViewInpterceptor())
def get_executing_task(request):
    """
    List started, unfinished tasks that have no FAILED node state.

    A task counts as executing when it has at least one non-FAILED node state
    and no FAILED node state started within the time window.

    Query parameters:
        limit: maximum number of tasks to return (default 100).
        offset: number of tasks to skip (default 0).
        start_time: only node states started at or after this time are
            considered (default: 30 days before now).

    Returns a JsonResponse ``{"result": True, "data": [...]}`` whose items carry
    ``task_id``, ``project_name`` and ``task_name``. Invalid paging parameters
    yield a 400 response with ``"result": False`` instead of a server error.
    """
    try:
        limit = int(request.GET.get("limit", 100))
        offset = int(request.GET.get("offset", 0))
        if limit < 0 or offset < 0:
            # Negative bounds cannot be used to slice a queryset.
            raise ValueError("negative paging value")
    except ValueError:
        return JsonResponse(
            {"result": False, "message": "limit and offset must be non-negative integers", "data": []},
            status=400,
        )

    # Treat a missing or empty start_time as "the last 30 days".
    start_time = request.GET.get("start_time") or (timezone.now() - timezone.timedelta(days=30))

    # Root pipelines that own at least one failed node.
    failed_root_ids = set(
        State.objects.filter(name="FAILED", started_time__gte=start_time).values_list("root_id", flat=True)
    )
    # Root pipelines that own at least one non-failed node.
    active_root_ids = set(
        State.objects.exclude(name="FAILED").filter(started_time__gte=start_time).values_list("root_id", flat=True)
    )
    # Take the difference BEFORE paginating. Slicing the failed and non-failed
    # task lists separately (as was done previously) let failed tasks outside
    # the current page window slip into the result and produced short pages.
    executing_root_ids = active_root_ids - failed_root_ids

    tasks = TaskFlowInstance.objects.filter(
        pipeline_instance__is_deleted=False,
        pipeline_instance__is_expired=False,
        pipeline_instance__is_finished=False,
        pipeline_instance__is_revoked=False,
        pipeline_instance__is_started=True,
        pipeline_instance__instance_id__in=executing_root_ids,
    ).values("id", "project__name", "pipeline_instance__name")[offset : offset + limit]

    executing_tasks = [
        {
            "task_id": task["id"],
            "project_name": task["project__name"],
            "task_name": task["pipeline_instance__name"],
        }
        for task in tasks
    ]
    return JsonResponse({"result": True, "data": executing_tasks})


@require_GET
@iam_intercept(StatisticsViewInpterceptor())
def get_schedule_times(request):
    """
    List tasks together with the schedule count of one of their nodes.

    Query parameters:
        limit: maximum number of tasks to return (default 100).
        offset: number of tasks to skip (default 0).
        start_time: only node states started at or after this time are
            considered (default: 30 days before now).

    Returns a JsonResponse ``{"result": True, "data": [...]}`` whose items carry
    ``id``, ``project_name``, ``task_name``, ``creator`` and ``schedule_times``.
    Invalid paging parameters yield a 400 response with ``"result": False``
    instead of a server error.
    """
    try:
        limit = int(request.GET.get("limit", 100))
        offset = int(request.GET.get("offset", 0))
        if limit < 0 or offset < 0:
            # Negative bounds cannot be used to slice a queryset.
            raise ValueError("negative paging value")
    except ValueError:
        return JsonResponse(
            {"result": False, "message": "limit and offset must be non-negative integers", "data": []},
            status=400,
        )

    # Treat a missing or empty start_time as "the last 30 days".
    start_time = request.GET.get("start_time") or (timezone.now() - timezone.timedelta(days=30))

    # node_id -> schedule_times for every schedule that is not currently scheduling.
    node_schedule_times = dict(
        Schedule.objects.filter(scheduling=False).values_list("node_id", "schedule_times")
    )

    states = State.objects.filter(started_time__gte=start_time, node_id__in=list(node_schedule_times)).values(
        "node_id", "root_id"
    )
    # root_id -> schedule_times.
    # NOTE(review): when several nodes share a root pipeline the last row wins,
    # so the reported count belongs to an arbitrary node of that task. Confirm
    # whether a max or a sum per task is the intended figure.
    root_schedule_times = {state["root_id"]: node_schedule_times[state["node_id"]] for state in states}

    tasks = TaskFlowInstance.objects.filter(
        pipeline_instance__instance_id__in=list(root_schedule_times)
    ).values(
        "id",
        "project__name",
        "pipeline_instance__name",
        "pipeline_instance__creator",
        "pipeline_instance__instance_id",
    )[offset : offset + limit]

    schedule_times = [
        {
            "id": task["id"],
            "project_name": task["project__name"],
            "task_name": task["pipeline_instance__name"],
            # Previously this field was filled with the task name by mistake.
            "creator": task["pipeline_instance__creator"],
            "schedule_times": root_schedule_times[task["pipeline_instance__instance_id"]],
        }
        for task in tasks
    ]
    return JsonResponse({"result": True, "data": schedule_times})


@require_GET
@iam_intercept(StatisticsViewInpterceptor())
def get_mq_overview(request):
    """
    Return an overview of the RabbitMQ broker via its management HTTP API.

    The management endpoint and credentials may be overridden through the
    optional Django settings ``MONITOR_MQ_API_HOST``, ``MONITOR_MQ_API_USER``
    and ``MONITOR_MQ_API_PASSWORD``; when absent, the previous hard-coded
    values (``localhost:15672`` / ``guest`` / ``guest``) are used.

    Returns a JsonResponse ``{"result": True, "data": {...}}`` where ``data``
    holds ``totals`` (ready / unacked / total message counts),
    ``global_totals`` (the broker's object totals) and ``nodes``.
    """
    from django.conf import settings

    cl = pyrabbit2.Client(
        getattr(settings, "MONITOR_MQ_API_HOST", "localhost:15672"),
        getattr(settings, "MONITOR_MQ_API_USER", "guest"),
        getattr(settings, "MONITOR_MQ_API_PASSWORD", "guest"),
    )
    overview = cl.get_overview()
    # Fall back to zero when the broker reports no message counters,
    # rather than failing with a KeyError.
    queue_totals = overview.get("queue_totals") or {}
    data = {
        "totals": {
            "ready": queue_totals.get("messages_ready", 0),
            "unacked": queue_totals.get("messages_unacknowledged", 0),
            "total": queue_totals.get("messages", 0),
        },
        "global_totals": overview.get("object_totals", {}),
        "nodes": cl.get_nodes(),
    }
    return JsonResponse({"result": True, "data": data})


@require_GET
@iam_intercept(StatisticsViewInpterceptor())
def get_mq_data(request):
    """
    Return per-vhost queue details, including the messages currently queued.

    The management endpoint and credentials may be overridden through the
    optional Django settings ``MONITOR_MQ_API_HOST``, ``MONITOR_MQ_API_USER``
    and ``MONITOR_MQ_API_PASSWORD``; when absent, the previous hard-coded
    values (``localhost:15672`` / ``guest`` / ``guest``) are used.

    Returns a JsonResponse ``{"result": True, "data": {vhost: [queue, ...]}}``
    where each queue entry carries ``vhost``, ``queue_name``,
    ``message_count``, ``queue_state`` and ``messages``.
    """
    from django.conf import settings

    cl = pyrabbit2.Client(
        getattr(settings, "MONITOR_MQ_API_HOST", "localhost:15672"),
        getattr(settings, "MONITOR_MQ_API_USER", "guest"),
        getattr(settings, "MONITOR_MQ_API_PASSWORD", "guest"),
    )

    data = {}
    for vhost in cl.get_vhost_names():
        queue_infos = []
        for queue in cl.get_queues(vhost=vhost):
            # These fields may be missing from a queue entry; default them
            # instead of raising a KeyError.
            message_count = queue.get("messages", 0)
            # Only ask the broker for messages when there is something to fetch.
            # requeue=True asks the broker to put the fetched messages back on the queue.
            # NOTE(review): fetching every message of every queue can be very
            # expensive on a busy broker; consider capping the count.
            messages = (
                cl.get_messages(vhost, queue["name"], count=message_count, requeue=True) if message_count else []
            )
            queue_infos.append(
                {
                    "vhost": vhost,
                    "queue_name": queue["name"],
                    "message_count": message_count,
                    "queue_state": queue.get("state"),
                    "messages": messages,
                }
            )
        data[vhost] = queue_infos
    return JsonResponse({"result": True, "data": data})
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ bkstorages==1.0.1
ujson==4.1.0
django-dbconn-retry==0.1.5
pydantic==1.9.1
pyrabbit2==1.0.7


# monitor
django-prometheus==2.1.0
Expand Down
Loading