Skip to content

Commit

Permalink
feature: 可观测建设 (closed #1852)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Oct 23, 2023
1 parent c052348 commit b3977e2
Show file tree
Hide file tree
Showing 57 changed files with 2,243 additions and 339 deletions.
213 changes: 194 additions & 19 deletions apps/backend/components/collections/agent_new/install.py

Large diffs are not rendered by default.

83 changes: 74 additions & 9 deletions apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
import os
import traceback
import typing
Expand All @@ -34,12 +35,16 @@
from apps.adapters.api.gse import GseApiBaseHelper, get_gse_api_helper
from apps.backend.subscription import errors
from apps.core.files.storage import get_storage
from apps.exceptions import parse_exception
from apps.node_man import constants, models
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve
from apps.utils import cache, time_handler, translation
from apps.utils.exc import ExceptionHandler
from common.log import logger
from pipeline.core.flow import Service

logger = logging.getLogger("celery")


class ActivityType:
HEAD = 0
Expand Down Expand Up @@ -87,11 +92,15 @@ def service_run_exc_handler(

act_name = data.get_one_of_inputs("act_name")
sub_inst_ids = instance.get_subscription_instance_ids(data)
code = instance.__class__.__name__

error_msg = _("{act_name} 失败: {exc},请先尝试查看错误日志进行处理,若无法解决,请联系管理员处理").format(act_name=act_name, exc=str(exc))
logger.exception(error_msg)
metrics.app_task_engine_service_run_exceptions_total.labels(code=code, **parse_exception(exc)).inc()

logger.exception(f"[task_engine][service_run_exc_handler:{code}] act_name -> {act_name}, exc -> {str(exc)}")

error_msg = _("{act_name} 失败: {exc},请先尝试查看错误日志进行处理,若无法解决,请联系管理员处理").format(act_name=act_name, exc=str(exc))
instance.bulk_set_sub_inst_act_status(
data=data,
sub_inst_ids=sub_inst_ids,
status=constants.JobStatusType.FAILED,
common_log=instance.log_maker.error_log(error_msg),
Expand All @@ -115,6 +124,12 @@ def get_language_func(
return data.get_one_of_inputs("blueking_language")


def get_labels_func(
wrapped: Callable, instance: "BaseService", args: Tuple[Any], kwargs: Dict[str, Any]
) -> typing.Dict[str, str]:
return {"code": instance.__class__.__name__}


class LogMixin:

# 日志类
Expand Down Expand Up @@ -252,19 +267,40 @@ def sub_inst_failed_handler(self, sub_inst_ids: Union[List[int], Set[int]]):
"""
raise NotImplementedError()

def bulk_set_sub_inst_status(self, status: str, sub_inst_ids: Union[List[int], Set[int]]):
@SetupObserve(histogram=metrics.app_task_engine_set_sub_inst_statuses_duration_seconds)
def bulk_set_sub_inst_status(self, data, status: str, sub_inst_ids: Union[List[int], Set[int]]):
"""批量设置实例状态,对于实例及原子的状态更新只应该在base内部使用"""
models.SubscriptionInstanceRecord.objects.filter(id__in=sub_inst_ids).update(
status=status, update_time=timezone.now()
)
# status -> PENDING -> RUNNING -> FAILED | SUCCESS
metrics.app_task_engine_sub_inst_statuses_total.labels(status=status).inc(len(sub_inst_ids))

meta: Dict[str, Any] = self.get_meta(data)
steps: List[Dict] = meta.get("STEPS") or []
gse_version: str = meta.get("GSE_VERSION") or "unknown"
for step in steps:
metrics.app_task_engine_sub_inst_step_statuses_total.labels(
step_id=step.get("id") or "unknown",
step_type=step.get("type") or "unknown",
step_num=len(steps),
step_index=step.get("index") or 0,
gse_version=gse_version,
action=step.get("action") or "unknown",
code=self.__class__.__name__,
status=status,
).inc(amount=len(sub_inst_ids))

if status in [constants.JobStatusType.FAILED]:
self.sub_inst_failed_handler(sub_inst_ids)

@SetupObserve(histogram=metrics.app_task_engine_set_sub_inst_act_statuses_duration_seconds)
def bulk_set_sub_inst_act_status(
self, sub_inst_ids: Union[List[int], Set[int]], status: str, common_log: str = None
self, data, sub_inst_ids: Union[List[int], Set[int]], status: str, common_log: str = None
):
"""
批量设置实例状态
:param data:
:param sub_inst_ids:
:param status:
:param common_log: 全局日志,用于需要全局暴露的异常
Expand All @@ -281,7 +317,7 @@ def bulk_set_sub_inst_act_status(

# 失败的实例需要更新汇总状态
if status in [constants.JobStatusType.FAILED]:
self.bulk_set_sub_inst_status(constants.JobStatusType.FAILED, sub_inst_ids)
self.bulk_set_sub_inst_status(data, constants.JobStatusType.FAILED, sub_inst_ids)

@staticmethod
def get_subscription_instance_ids(data):
Expand All @@ -293,6 +329,13 @@ def get_subscription_instance_ids(data):
subscription_instance_ids = succeeded_subscription_instance_ids
return subscription_instance_ids

@staticmethod
def get_meta(data) -> Dict[str, Any]:
meta: Dict[str, Any] = data.get_one_of_inputs("meta", {})
if "STEPS" not in meta:
meta["STEPS"] = []
return meta

@classmethod
def get_common_data(cls, data):
"""
Expand Down Expand Up @@ -334,7 +377,7 @@ def get_common_data(cls, data):
sub_inst_id__host_id_map=sub_inst_id__host_id_map,
host_id__sub_inst_id_map=host_id__sub_inst_id_map,
ap_id_obj_map=ap_id_obj_map,
gse_api_helper=get_gse_api_helper(gse_version=data.get_one_of_inputs("meta", {}).get("GSE_VERSION")),
gse_api_helper=get_gse_api_helper(gse_version=cls.get_meta(data).get("GSE_VERSION")),
subscription=subscription,
subscription_step=subscription_step,
subscription_instances=subscription_instances,
Expand All @@ -343,6 +386,7 @@ def get_common_data(cls, data):

def set_current_id(self, subscription_instance_ids: List[int]):
# 更新当前实例的pipeline id
# TODO 偶发死锁
models.SubscriptionInstanceRecord.objects.filter(id__in=subscription_instance_ids).update(pipeline_id=self.id)

def set_outputs_data(self, data, common_data: CommonData) -> bool:
Expand All @@ -367,7 +411,7 @@ def run(self, service_func, data, parent_data, **kwargs) -> bool:
act_type = data.get_one_of_inputs("act_type")
# 流程起始设置RUNNING
if service_func == self._execute and act_type in [ActivityType.HEAD, ActivityType.HEAD_TAIL]:
self.bulk_set_sub_inst_status(constants.JobStatusType.RUNNING, subscription_instance_ids)
self.bulk_set_sub_inst_status(data, constants.JobStatusType.RUNNING, subscription_instance_ids)

service_func(data, parent_data, **kwargs)

Expand All @@ -387,6 +431,7 @@ def run(self, service_func, data, parent_data, **kwargs) -> bool:
)

self.bulk_set_sub_inst_act_status(
data=data,
sub_inst_ids=revoked_subscription_instance_ids,
status=constants.JobStatusType.FAILED,
common_log=self.log_maker.warning_log(
Expand All @@ -410,6 +455,7 @@ def run(self, service_func, data, parent_data, **kwargs) -> bool:

# failed_subscription_instance_id_set - sub_inst_ids_previous_failed_set 取差集,仅更新本轮失败的订阅实例详情
self.bulk_set_sub_inst_act_status(
data=data,
sub_inst_ids=failed_subscription_instance_id_set - previous_failed_subscription_instance_id_set,
status=constants.JobStatusType.FAILED,
common_log=self.log_maker.error_log(
Expand All @@ -422,6 +468,7 @@ def run(self, service_func, data, parent_data, **kwargs) -> bool:
return bool(succeeded_subscription_instance_ids)

self.bulk_set_sub_inst_act_status(
data=data,
sub_inst_ids=succeeded_subscription_instance_ids,
status=constants.JobStatusType.SUCCESS,
common_log=self.log_maker.info_log(_("{act_name} 成功").format(act_name=act_name)),
Expand All @@ -430,16 +477,29 @@ def run(self, service_func, data, parent_data, **kwargs) -> bool:
# 流程结束设置成功的实例
if act_type in [ActivityType.TAIL, ActivityType.HEAD_TAIL]:
self.bulk_set_sub_inst_status(
constants.JobStatusType.SUCCESS, sub_inst_ids=succeeded_subscription_instance_ids
data, constants.JobStatusType.SUCCESS, sub_inst_ids=succeeded_subscription_instance_ids
)

return bool(succeeded_subscription_instance_ids)

@translation.RespectsLanguage(get_language_func=get_language_func)
@SetupObserve(
gauge=metrics.app_task_engine_running_executes_info,
histogram=metrics.app_task_engine_execute_duration_seconds,
get_labels_func=get_labels_func,
)
@ExceptionHandler(exc_handler=service_run_exc_handler)
def execute(self, data, parent_data):
common_data = self.get_common_data(data)
act_name = data.get_one_of_inputs("act_name")
act_type = data.get_one_of_inputs("act_type")
if act_type in [ActivityType.HEAD, ActivityType.HEAD_TAIL]:
logger.info(
"[sub_lifecycle<sub(%s), task(%s)>][engine] enter",
common_data.subscription.id,
common_data.subscription_instances[0].task_id,
)

subscription_instance_ids = self.get_subscription_instance_ids(data)
to_be_created_sub_statuses = [
models.SubscriptionInstanceStatusDetail(
Expand All @@ -456,6 +516,11 @@ def execute(self, data, parent_data):
return self.run(self._execute, data, parent_data, common_data=common_data)

@translation.RespectsLanguage(get_language_func=get_language_func)
@SetupObserve(
gauge=metrics.app_task_engine_running_schedules_info,
histogram=metrics.app_task_engine_schedule_duration_seconds,
get_labels_func=get_labels_func,
)
@ExceptionHandler(exc_handler=service_run_exc_handler)
def schedule(self, data, parent_data, callback_data=None):
return self.run(self._schedule, data, parent_data, callback_data=callback_data)
Expand Down
3 changes: 3 additions & 0 deletions apps/backend/components/collections/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from apps.core.concurrent import core_concurrent_constants
from apps.node_man import models
from apps.utils import enum
from common.log import logger

from . import base

Expand Down Expand Up @@ -110,7 +111,9 @@ def default_task_exc_handler(
:param kwargs: 关键字参数
:return:
"""
code = instance.__class__.__name__
sub_inst_id = sub_inst_id_extractor(args, kwargs)
logger.exception(f"[task_engine][service_task_exc_handler:{code}] sub_inst_id -> {sub_inst_id}, exc -> {str(exc)}")
instance.move_insts_to_failed(sub_inst_id if isinstance(sub_inst_id, Iterable) else [sub_inst_id], str(exc))
# 打印 DEBUG 日志
instance.log_debug(sub_inst_id, log_content=traceback.format_exc(), fold=True)
Expand Down
4 changes: 4 additions & 0 deletions apps/backend/components/collections/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
from apps.exceptions import AppBaseException, ComponentCallError
from apps.node_man import constants, exceptions, models
from apps.node_man.handlers.cmdb import CmdbHandler
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve
from apps.utils import cache, md5
from apps.utils.batch_request import request_multi_thread
from apps.utils.files import PathHandler
Expand Down Expand Up @@ -90,6 +92,7 @@ class PluginBaseService(BaseService, metaclass=abc.ABCMeta):
"""

@classmethod
@SetupObserve(histogram=metrics.app_task_engine_get_common_data_duration_seconds, labels={"step_type": "PLUGIN"})
def get_common_data(cls, data):
"""
初始化常用数据,注意这些数据不能放在 self 属性里,否则会产生较大的 process snap shot,
Expand Down Expand Up @@ -968,6 +971,7 @@ def _execute(self, data, parent_data, common_data: PluginCommonData):
{"group_id": process_status.group_id},
context,
package_obj=package,
source="engine",
)
process_status.configs = rendered_configs
process_status.save()
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ class PluginMigrateType:
REMOVE_FROM_SCOPE = "REMOVE_FROM_SCOPE"
NOT_SYNC_HOST = "NOT_SYNC_HOST"
MANUAL_OP_EXEMPT = "MANUAL_OP_EXEMPT"
ABNORMAL_AGENT_STATUS = "ABNORMAL_AGENT_STATUS"

MIGRATE_TYPE_ALIAS_MAP = {
NEW_INSTALL: _("新安装"),
VERSION_CHANGE: _("版本变更"),
CONFIG_CHANGE: _("配置变更"),
PROC_NUM_NOT_MATCH: _("进程数量不匹配"),
ABNORMAL_PROC_STATUS: _("进程状态异常"),
ABNORMAL_AGENT_STATUS: _("Agent 状态异常"),
NOT_CHANGE: _("无需变更"),
REMOVE_FROM_SCOPE: _("从范围中移除"),
NOT_SYNC_HOST: _("未同步的主机"),
Expand Down
3 changes: 2 additions & 1 deletion apps/backend/periodic_tasks/cache_scope_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def get_instances_by_scope_task(subscription_id):
f" scope_md5: {scope_md5}, scope: {subscription.scope}"
)
# 查询后会进行缓存,详见 get_instances_by_scope 的装饰器 func_cache_decorator
tools.get_instances_by_scope(subscription.scope)
tools.get_instances_by_scope(subscription.scope, source="get_instances_by_scope_task")
logger.info(f"[cache_subscription_scope_instances] (subscription: {subscription_id}) end.")


Expand All @@ -43,6 +43,7 @@ def get_instances_by_scope_task(subscription_id):
def cache_scope_instances():
"""定时缓存订阅范围实例,用于提高 instance_status、statistics 等接口的速度"""
subscriptions = models.Subscription.objects.filter(enable=True, is_deleted=False)
# TODO 可以再按 scope md5 聚合一次,避免重复缓存
count = subscriptions.count()
for index, subscription in enumerate(subscriptions):
countdown = calculate_countdown(count=count, index=index, duration=constants.SUBSCRIPTION_UPDATE_INTERVAL)
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/periodic_tasks/collect_auto_trigger_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from collections import defaultdict

from celery.task import periodic_task
from django.conf import settings
from django.db import transaction
from django.db.models import Q

Expand Down Expand Up @@ -99,6 +100,7 @@ def collect_auto_trigger_job():
statistics={f"{k}_count": 0 for k in ["success", "failed", "pending", "running", "total"]},
error_hosts=[],
created_by="admin",
from_system=settings.APP_CODE,
# TODO 将历史多个自动触发task先行整合到一个job,后续根据实际情况考虑是否拆分
task_id_list=task_ids_gby_sub_id[subscription["id"]],
is_auto_trigger=True,
Expand Down
1 change: 1 addition & 0 deletions apps/backend/plugin/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def create_plugin_register_task(self, request):
# 2. 创建一个新的task,返回任务ID
job = models.Job.objects.create(
created_by=params["bk_username"],
from_system=settings.APP_CODE,
job_type=constants.JobType.PACKING_PLUGIN,
# TODO 打包任务是否也用一次性订阅的方式下发
subscription_id=-1,
Expand Down
19 changes: 16 additions & 3 deletions apps/backend/subscription/commons.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from apps.component.esbclient import client_v2
from apps.exceptions import BizNotExistError
from apps.node_man import constants
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve, get_call_resource_labels_func
from apps.utils.batch_request import batch_request

logger = logging.getLogger("app")
Expand All @@ -35,6 +37,7 @@ def get_host_object_attribute(bk_biz_id):
return custom_fields


@SetupObserve(counter=metrics.app_common_method_requests_total, get_labels_func=get_call_resource_labels_func)
def list_biz_hosts(bk_biz_id, condition, func, split_params=False):
biz_custom_property = []
kwargs = {
Expand Down Expand Up @@ -84,12 +87,14 @@ def get_host_by_inst(bk_biz_id, inst_list):
elif inst["bk_obj_id"] in bk_obj_id_list:
# 自定义层级
topo_cond = {"bk_obj_id": inst["bk_obj_id"], "bk_inst_id": inst["bk_inst_id"]}
hosts.extend(list_biz_hosts(bk_biz_id, topo_cond, "find_host_by_topo"))
hosts.extend(
list_biz_hosts(bk_biz_id, topo_cond, "find_host_by_topo", source="get_host_by_inst:find_host_by_topo")
)

if bk_biz_ids:
# 业务查询
for bk_biz_id in bk_biz_ids:
hosts.extend(list_biz_hosts(bk_biz_id, {}, "list_biz_hosts"))
hosts.extend(list_biz_hosts(bk_biz_id, {}, "list_biz_hosts", source="get_host_by_inst:list_biz_hosts:biz"))
if bk_set_ids:
# 集群查询
hosts.extend(
Expand All @@ -101,6 +106,14 @@ def get_host_by_inst(bk_biz_id, inst_list):
)
if bk_module_ids:
# 模块查询 这里CMDB限制了bk_module_ids不能超过500, 需要拆分参数 split_params=True
hosts.extend(list_biz_hosts(bk_biz_id, {"bk_module_ids": bk_module_ids}, "list_biz_hosts", split_params=True))
hosts.extend(
list_biz_hosts(
bk_biz_id,
{"bk_module_ids": bk_module_ids},
"list_biz_hosts",
split_params=True,
source="get_host_by_inst:list_biz_hosts:module",
)
)

return hosts
4 changes: 2 additions & 2 deletions apps/backend/subscription/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def task_result(
# 如果不需要已不在订阅范围内的执行快照,查询订阅范围过滤掉移除的实例 ID
subscription = models.Subscription.objects.get(id=self.subscription_id)
scope_instance_id_list: Set[str] = set(
tools.get_instances_by_scope(subscription.scope, get_cache=True).keys()
tools.get_instances_by_scope(subscription.scope, get_cache=True, source="task_result").keys()
)
base_kwargs["instance_id__in"] = scope_instance_id_list

Expand Down Expand Up @@ -491,7 +491,7 @@ def statistic(subscription_id_list: List[int]) -> List[Dict]:
sub_statistic_list: List[Dict] = []
for subscription in subscriptions:
sub_statistic = {"subscription_id": subscription.id, "status": []}
current_instances = tools.get_instances_by_scope(subscription.scope, get_cache=True)
current_instances = tools.get_instances_by_scope(subscription.scope, get_cache=True, source="statistic")

status_statistic = {"SUCCESS": 0, "PENDING": 0, "FAILED": 0, "RUNNING": 0}
plugin_versions = defaultdict(lambda: defaultdict(int))
Expand Down
2 changes: 1 addition & 1 deletion apps/backend/subscription/render_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def get_hosts_by_node(config_hosts):
if config_hosts[0].get("bk_host_id"):
from apps.backend.subscription.tools import get_host_detail

host_infos = get_host_detail(config_hosts)
host_infos = get_host_detail(config_hosts, source="get_hosts_by_node")
for host_info in host_infos:
host_info["ip"] = host_info["bk_host_innerip"] or host_info["bk_host_innerip_v6"]
instances.append(host_info)
Expand Down
Loading

0 comments on commit b3977e2

Please sign in to comment.