Skip to content

Commit

Permalink
feature: 可观测建设 (closed #1852)
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Oct 12, 2023
1 parent 0bde9bb commit 4ee759f
Show file tree
Hide file tree
Showing 55 changed files with 2,154 additions and 319 deletions.
164 changes: 155 additions & 9 deletions apps/backend/components/collections/agent_new/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
import random
import socket
import time
import typing
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from django.conf import settings
from django.utils import timezone, translation
Expand All @@ -36,10 +37,14 @@
from apps.backend.utils.wmi import execute_cmd, put_file
from apps.core.concurrent import controller
from apps.core.remote import conns
from apps.exceptions import AuthOverdueException
from apps.exceptions import AuthOverdueException, parse_exception
from apps.node_man import constants, models
from apps.utils import concurrent, exc, sync
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve
from apps.utils import concurrent, sync
from apps.utils.exc import ExceptionHandler
from common.api import JobApi
from common.log import logger
from pipeline.core.flow import Service, StaticIntervalGenerator

from .. import core
Expand All @@ -56,6 +61,85 @@ def __init__(self, sub_inst_id: int, host: models.Host, installation_tool: Insta
super().__init__(sub_inst_id=sub_inst_id, host=host, identity_data=installation_tool.identity_data)


def parse_common_labels_by_install_obj(
method: str, params: typing.Dict[str, typing.Any]
) -> typing.Dict[str, typing.Any]:
install_sub_inst_obj: InstallSubInstObj = params["install_sub_inst_obj"]
common_labels: typing.Dict[str, typing.Any] = {
"method": method,
"username": install_sub_inst_obj.conns_init_params["username"],
"port": install_sub_inst_obj.conns_init_params["port"],
"auth_type": install_sub_inst_obj.identity_data.auth_type,
"os_type": install_sub_inst_obj.host.os_type,
}
return common_labels


def parse_common_labels_by_host_identity(
method: str, params: typing.Dict[str, typing.Any]
) -> typing.Dict[str, typing.Any]:
host: models.Host = params["host"]
identity_data: models.IdentityData = params["identity_data"]
common_labels: typing.Dict[str, typing.Any] = {
"method": method,
"username": identity_data.account,
"port": identity_data.port,
"auth_type": identity_data.auth_type,
"os_type": host.os_type,
}
return common_labels


def execute_shell_solution_async_exc_handler(
wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any], exc: Exception
) -> Optional[List]:
"""
默认的单订阅实例任务异常处理,用于批量调用时规避单任务异常导致整体执行失败的情况
:param wrapped: 被装饰的函数或类方法
:param instance: 基础Pipeline服务
:param exc: 捕获到异常
:param args: 位置参数
:param kwargs: 关键字参数
:return:
"""
common_labels: typing.Dict[str, typing.Any] = parse_common_labels_by_install_obj("ssh", kwargs)
metrics.app_core_remote_connects_total.labels(**common_labels, status="failed").inc()
metrics.app_core_remote_connect_exceptions_total.labels(**common_labels, **parse_exception(exc)).inc()
return core.default_sub_inst_task_exc_handler(wrapped, instance, args, kwargs, exc)


def execute_windows_commands_exc_handler(
wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any], exc: Exception
) -> Optional[List]:
"""
默认的单订阅实例任务异常处理,用于批量调用时规避单任务异常导致整体执行失败的情况
:param wrapped: 被装饰的函数或类方法
:param instance: 基础Pipeline服务
:param exc: 捕获到异常
:param args: 位置参数
:param kwargs: 关键字参数
:return:
"""
common_labels: typing.Dict[str, typing.Any] = parse_common_labels_by_host_identity("wmiexe", kwargs)
metrics.app_core_remote_connects_total.labels(**common_labels, status="failed").inc()
metrics.app_core_remote_connect_exceptions_total.labels(**common_labels, **parse_exception(exc)).inc()
return core.default_sub_inst_task_exc_handler(wrapped, instance, args, kwargs, exc)


def execute_shell_solution_async_success_handler(
wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any]
) -> None:
common_labels: typing.Dict[str, typing.Any] = parse_common_labels_by_install_obj("ssh", kwargs)
metrics.app_core_remote_connects_total.labels(**common_labels, status="success").inc()


def execute_windows_commands_success_handler(
wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any]
) -> None:
common_labels: typing.Dict[str, typing.Any] = parse_common_labels_by_host_identity("wmiexe", kwargs)
metrics.app_core_remote_connects_total.labels(**common_labels, status="success").inc()


class InstallService(base.AgentBaseService, remote.RemoteServiceMixin):
__need_schedule__ = True
interval = StaticIntervalGenerator(5)
Expand All @@ -71,6 +155,12 @@ def outputs_format(self):
Service.InputItem(name="polling_time", key="polling_time", type="int", required=True),
]

@SetupObserve(
histogram=metrics.app_core_remote_batch_execute_duration_seconds,
labels={"method": "job"},
# 不统计异常耗时
include_exception_histogram=False,
)
@controller.ConcurrentController(
data_list_name="install_sub_inst_objs",
batch_call_func=concurrent.batch_call,
Expand All @@ -88,6 +178,12 @@ def handle_non_lan_inst(self, install_sub_inst_objs: List[InstallSubInstObj]) ->
]
return concurrent.batch_call(func=self.execute_job_commands, params_list=params_list)

@SetupObserve(
histogram=metrics.app_core_remote_batch_execute_duration_seconds,
labels={"method": "wmiexe"},
# 不统计异常耗时
include_exception_histogram=False,
)
@controller.ConcurrentController(
data_list_name="install_sub_inst_objs",
batch_call_func=concurrent.batch_call,
Expand Down Expand Up @@ -170,6 +266,12 @@ def _filter_params_list_in_next_step(
params_list=_filter_params_list_in_next_step(run_install_params_list, succeed_sub_inst_ids),
)

@SetupObserve(
histogram=metrics.app_core_remote_batch_execute_duration_seconds,
labels={"method": "ssh"},
# 不统计异常耗时
include_exception_histogram=False,
)
@controller.ConcurrentController(
data_list_name="install_sub_inst_objs",
batch_call_func=concurrent.batch_call,
Expand Down Expand Up @@ -295,7 +397,7 @@ def _execute(self, data, parent_data, common_data: base.AgentCommonData):
)
data.outputs.polling_time = 0

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
def get_gse_config_tuple(
self,
sub_inst_id: int,
Expand All @@ -308,7 +410,15 @@ def get_gse_config_tuple(
content = agent_step_adapter.get_config(host=host, filename=file_name, node_type=general_node_type, ap=ap)
return REDIS_AGENT_CONF_KEY_TPL.format(file_name=file_name, sub_inst_id=sub_inst_id), content

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(
exc_handler=execute_windows_commands_exc_handler, success_handler=execute_windows_commands_success_handler
)
@SetupObserve(
histogram=metrics.app_core_remote_execute_duration_seconds,
labels={"method": "wmiexe"},
# 不统计异常耗时
include_exception_histogram=False,
)
@base.RetryHandler(interval=0, retry_times=2, exception_types=[ConnectionResetError])
def execute_windows_commands(
self, sub_inst_id: int, host: models.Host, commands: List[str], identity_data: models.IdentityData
Expand Down Expand Up @@ -364,7 +474,7 @@ def execute_windows_commands(

return sub_inst_id

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@base.RetryHandler(interval=0, retry_times=2, exception_types=[ConnectionResetError])
def push_curl_exe(
self,
Expand Down Expand Up @@ -399,7 +509,13 @@ def push_curl_exe(
raise e
return sub_inst_id

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@SetupObserve(
histogram=metrics.app_core_remote_execute_duration_seconds,
labels={"method": "job"},
# 不统计异常耗时
include_exception_histogram=False,
)
def execute_job_commands(self, sub_inst_id, installation_tool: InstallationTools):
# p-agent 走 作业平台,再 ssh 到 p-agent,这样可以无需保存 proxy 密码
host = installation_tool.host
Expand Down Expand Up @@ -481,7 +597,16 @@ def execute_job_commands(self, sub_inst_id, installation_tool: InstallationTools
host.save(update_fields=["upstream_nodes"])
return sub_inst_id

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(
exc_handler=execute_shell_solution_async_exc_handler,
success_handler=execute_shell_solution_async_success_handler,
)
@SetupObserve(
histogram=metrics.app_core_remote_execute_duration_seconds,
labels={"method": "ssh"},
# 不统计异常耗时
include_exception_histogram=False,
)
async def execute_shell_solution_async(
self, meta: Dict[str, Any], sub_inst_id: int, install_sub_inst_obj: InstallSubInstObj
) -> int:
Expand Down Expand Up @@ -564,7 +689,9 @@ def handle_report_data(self, sub_inst_id: int, success_callback_step: str) -> Di
error_log = log
is_finished = True
else:
logs.append(log)
if step != "metrics":
logs.append(log)

if step == "report_cpu_arch":
cpu_arch = data["log"]
elif step == "report_agent_id":
Expand Down Expand Up @@ -598,6 +725,25 @@ def handle_report_data(self, sub_inst_id: int, success_callback_step: str) -> Di

logs.append(f"{tag} [{step}] parse healthz result: \n {json.dumps(healthz_result_dict, indent=4)}")

elif step == "metrics":
logger.info(f"[app_core_remote:handle_report_data] sub_inst_id -> {sub_inst_id}, data -> {data}")
try:
name: str = data["metrics"]["name"]
if name == "app_core_remote_proxy_info":
metrics.app_core_remote_proxy_info.labels(**data["metrics"]["labels"]).set(1)
elif name == "app_core_remote_connect_exceptions_total":
metrics.app_core_remote_connect_exceptions_total.labels(**data["metrics"]["labels"]).inc()
elif name == "app_core_remote_execute_duration_seconds":
metrics.app_core_remote_execute_duration_seconds.labels(**data["metrics"]["labels"]).observe(
data["metrics"]["data"]["cost_time"]
)
elif name == "app_core_remote_connects_total":
metrics.app_core_remote_connects_total.labels(**data["metrics"]["labels"]).inc()
except Exception:
logger.exception(
f"[app_core_remote:handle_report_data] sub_inst_id -> {sub_inst_id}, data -> {data}"
)

# 只要匹配到成功返回步骤完成,则认为是执行完成了
if step == success_callback_step and status == "DONE":
is_finished = True
Expand Down
Loading

0 comments on commit 4ee759f

Please sign in to comment.