Skip to content

Commit

Permalink
feature: 可观测建设
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhuoZhuoCrayon committed Oct 4, 2023
1 parent d102f5e commit 6d040b6
Show file tree
Hide file tree
Showing 48 changed files with 1,897 additions and 281 deletions.
3 changes: 3 additions & 0 deletions apps/backend/components/collections/agent_new/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from apps.backend.subscription.steps.agent_adapter.adapter import AgentStepAdapter
from apps.node_man import constants, models
from apps.node_man.exceptions import AliveProxyNotExistsError
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve

from .. import job
from ..base import BaseService, CommonData
Expand Down Expand Up @@ -59,6 +61,7 @@ def sub_inst_failed_handler(self, sub_inst_ids: Union[List[int], Set[int]]):
pass

@classmethod
@SetupObserve(histogram=metrics.app_task_engine_get_common_data_duration_seconds, labels={"step_type": "AGENT"})
def get_common_data(cls, data):
"""
初始化常用数据,注意这些数据不能放在 self 属性里,否则会产生较大的 process snap shot,
Expand Down
164 changes: 155 additions & 9 deletions apps/backend/components/collections/agent_new/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
import random
import socket
import time
import typing
from collections import defaultdict
from typing import Any, Dict, List, Optional, Set, Tuple, Union
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

from django.conf import settings
from django.utils import timezone, translation
Expand All @@ -36,10 +37,14 @@
from apps.backend.utils.wmi import execute_cmd, put_file
from apps.core.concurrent import controller
from apps.core.remote import conns
from apps.exceptions import AuthOverdueException
from apps.exceptions import AuthOverdueException, parse_exception
from apps.node_man import constants, models
from apps.utils import concurrent, exc, sync
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve
from apps.utils import concurrent, sync
from apps.utils.exc import ExceptionHandler
from common.api import JobApi
from common.log import logger
from pipeline.core.flow import Service, StaticIntervalGenerator

from .. import core
Expand All @@ -56,6 +61,85 @@ def __init__(self, sub_inst_id: int, host: models.Host, installation_tool: Insta
super().__init__(sub_inst_id=sub_inst_id, host=host, identity_data=installation_tool.identity_data)


def parse_common_labels_by_install_obj(
    method: str, params: typing.Dict[str, typing.Any]
) -> typing.Dict[str, typing.Any]:
    """Build the common metric label set from an ``install_sub_inst_obj`` kwarg.

    :param method: connection method name, emitted as the ``method`` label
    :param params: keyword arguments of the wrapped call; must contain
        ``install_sub_inst_obj``
    :return: labels dict with method / username / port / auth_type / os_type
    """
    sub_inst_obj = params["install_sub_inst_obj"]
    conns_init_params = sub_inst_obj.conns_init_params
    return {
        "method": method,
        "username": conns_init_params["username"],
        "port": conns_init_params["port"],
        "auth_type": sub_inst_obj.identity_data.auth_type,
        "os_type": sub_inst_obj.host.os_type,
    }


def parse_common_labels_by_host_identity(
    method: str, params: typing.Dict[str, typing.Any]
) -> typing.Dict[str, typing.Any]:
    """Build the common metric label set from ``host`` / ``identity_data`` kwargs.

    :param method: connection method name, emitted as the ``method`` label
    :param params: keyword arguments of the wrapped call; must contain
        ``host`` and ``identity_data``
    :return: labels dict with method / username / port / auth_type / os_type
    """
    identity = params["identity_data"]
    return {
        "method": method,
        "username": identity.account,
        "port": identity.port,
        "auth_type": identity.auth_type,
        "os_type": params["host"].os_type,
    }


def execute_shell_solution_async_exc_handler(
    wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any], exc: Exception
) -> Optional[List]:
    """
    Per-subscription-instance task exception handler: in batch calls it prevents
    one task's exception from failing the whole execution. Additionally records
    the failed connection and the exception type in Prometheus counters before
    delegating to the default handler.
    :param wrapped: the wrapped function or class method
    :param instance: the base pipeline service
    :param exc: the captured exception
    :param args: positional arguments
    :param kwargs: keyword arguments
    :return: result of the default sub-instance exception handler
    """
    # NOTE(review): this failure path labels the method "ssh" while the matching
    # success handler reports "shell" — confirm both should use the same label so
    # app_core_remote_connects_total is not split across two method values.
    common_labels: typing.Dict[str, typing.Any] = parse_common_labels_by_install_obj("ssh", kwargs)
    metrics.app_core_remote_connects_total.labels(**common_labels, status="failed").inc()
    metrics.app_core_remote_connect_exceptions_total.labels(**common_labels, **parse_exception(exc)).inc()
    return core.default_sub_inst_task_exc_handler(wrapped, instance, args, kwargs, exc)


def execute_windows_commands_exc_handler(
    wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any], exc: Exception
) -> Optional[List]:
    """
    Per-subscription-instance task exception handler: in batch calls it prevents
    one task's exception from failing the whole execution. Additionally records
    the failed WMI ("wmiexe") connection and the exception type in Prometheus
    counters before delegating to the default handler.
    :param wrapped: the wrapped function or class method
    :param instance: the base pipeline service
    :param exc: the captured exception
    :param args: positional arguments
    :param kwargs: keyword arguments
    :return: result of the default sub-instance exception handler
    """
    common_labels: typing.Dict[str, typing.Any] = parse_common_labels_by_host_identity("wmiexe", kwargs)
    metrics.app_core_remote_connects_total.labels(**common_labels, status="failed").inc()
    metrics.app_core_remote_connect_exceptions_total.labels(**common_labels, **parse_exception(exc)).inc()
    return core.default_sub_inst_task_exc_handler(wrapped, instance, args, kwargs, exc)


def execute_shell_solution_async_success_handler(
    wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any]
) -> None:
    """
    Success handler for execute_shell_solution_async: records one successful
    remote connection in app_core_remote_connects_total.
    :param wrapped: the wrapped function or class method
    :param instance: the base pipeline service
    :param args: positional arguments
    :param kwargs: keyword arguments
    :return: None
    """
    # Use "ssh" so success and failure increments of app_core_remote_connects_total
    # carry the same method label: the exception handler and the batch-level
    # SetupObserve both report "ssh". Previously this reported "shell", which split
    # the same connection metric across two method label values.
    common_labels: typing.Dict[str, typing.Any] = parse_common_labels_by_install_obj("ssh", kwargs)
    metrics.app_core_remote_connects_total.labels(**common_labels, status="success").inc()


def execute_windows_commands_success_handler(
    wrapped: Callable, instance: base.BaseService, args: Tuple[Any], kwargs: Dict[str, Any]
) -> None:
    """
    Success handler for execute_windows_commands: records one successful
    WMI ("wmiexe") connection in app_core_remote_connects_total.
    :param wrapped: the wrapped function or class method
    :param instance: the base pipeline service
    :param args: positional arguments
    :param kwargs: keyword arguments
    :return: None
    """
    labels = parse_common_labels_by_host_identity("wmiexe", kwargs)
    labels["status"] = "success"
    metrics.app_core_remote_connects_total.labels(**labels).inc()


class InstallService(base.AgentBaseService, remote.RemoteServiceMixin):
__need_schedule__ = True
interval = StaticIntervalGenerator(5)
Expand All @@ -71,6 +155,12 @@ def outputs_format(self):
Service.InputItem(name="polling_time", key="polling_time", type="int", required=True),
]

@SetupObserve(
histogram=metrics.app_core_remote_batch_execute_duration_seconds,
labels={"method": "job"},
# 不统计异常耗时
include_exception_histogram=False,
)
@controller.ConcurrentController(
data_list_name="install_sub_inst_objs",
batch_call_func=concurrent.batch_call,
Expand All @@ -88,6 +178,12 @@ def handle_non_lan_inst(self, install_sub_inst_objs: List[InstallSubInstObj]) ->
]
return concurrent.batch_call(func=self.execute_job_commands, params_list=params_list)

@SetupObserve(
histogram=metrics.app_core_remote_batch_execute_duration_seconds,
labels={"method": "wmiexe"},
# 不统计异常耗时
include_exception_histogram=False,
)
@controller.ConcurrentController(
data_list_name="install_sub_inst_objs",
batch_call_func=concurrent.batch_call,
Expand Down Expand Up @@ -170,6 +266,12 @@ def _filter_params_list_in_next_step(
params_list=_filter_params_list_in_next_step(run_install_params_list, succeed_sub_inst_ids),
)

@SetupObserve(
histogram=metrics.app_core_remote_batch_execute_duration_seconds,
labels={"method": "shell"},
# 不统计异常耗时
include_exception_histogram=False,
)
@controller.ConcurrentController(
data_list_name="install_sub_inst_objs",
batch_call_func=concurrent.batch_call,
Expand Down Expand Up @@ -295,7 +397,7 @@ def _execute(self, data, parent_data, common_data: base.AgentCommonData):
)
data.outputs.polling_time = 0

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
def get_gse_config_tuple(
self,
sub_inst_id: int,
Expand All @@ -308,7 +410,15 @@ def get_gse_config_tuple(
content = agent_step_adapter.get_config(host=host, filename=file_name, node_type=general_node_type, ap=ap)
return REDIS_AGENT_CONF_KEY_TPL.format(file_name=file_name, sub_inst_id=sub_inst_id), content

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(
exc_handler=execute_windows_commands_exc_handler, success_handler=execute_windows_commands_success_handler
)
@SetupObserve(
histogram=metrics.app_core_remote_execute_duration_seconds,
labels={"method": "wmiexe"},
# 不统计异常耗时
include_exception_histogram=False,
)
@base.RetryHandler(interval=0, retry_times=2, exception_types=[ConnectionResetError])
def execute_windows_commands(
self, sub_inst_id: int, host: models.Host, commands: List[str], identity_data: models.IdentityData
Expand Down Expand Up @@ -364,7 +474,7 @@ def execute_windows_commands(

return sub_inst_id

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@base.RetryHandler(interval=0, retry_times=2, exception_types=[ConnectionResetError])
def push_curl_exe(
self,
Expand Down Expand Up @@ -399,7 +509,13 @@ def push_curl_exe(
raise e
return sub_inst_id

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@SetupObserve(
histogram=metrics.app_core_remote_execute_duration_seconds,
labels={"method": "job"},
# 不统计异常耗时
include_exception_histogram=False,
)
def execute_job_commands(self, sub_inst_id, installation_tool: InstallationTools):
# p-agent 走 作业平台,再 ssh 到 p-agent,这样可以无需保存 proxy 密码
host = installation_tool.host
Expand Down Expand Up @@ -481,7 +597,16 @@ def execute_job_commands(self, sub_inst_id, installation_tool: InstallationTools
host.save(update_fields=["upstream_nodes"])
return sub_inst_id

@exc.ExceptionHandler(exc_handler=core.default_sub_inst_task_exc_handler)
@ExceptionHandler(
exc_handler=execute_shell_solution_async_exc_handler,
success_handler=execute_shell_solution_async_success_handler,
)
@SetupObserve(
histogram=metrics.app_core_remote_execute_duration_seconds,
labels={"method": "ssh"},
# 不统计异常耗时
include_exception_histogram=False,
)
async def execute_shell_solution_async(
self, meta: Dict[str, Any], sub_inst_id: int, install_sub_inst_obj: InstallSubInstObj
) -> int:
Expand Down Expand Up @@ -564,7 +689,9 @@ def handle_report_data(self, sub_inst_id: int, success_callback_step: str) -> Di
error_log = log
is_finished = True
else:
logs.append(log)
if step != "metrics":
logs.append(log)

if step == "report_cpu_arch":
cpu_arch = data["log"]
elif step == "report_agent_id":
Expand Down Expand Up @@ -598,6 +725,25 @@ def handle_report_data(self, sub_inst_id: int, success_callback_step: str) -> Di

logs.append(f"{tag} [{step}] parse healthz result: \n {json.dumps(healthz_result_dict, indent=4)}")

elif step == "metrics":
logger.info(f"[app_core_remote:handle_report_data] sub_inst_id -> {sub_inst_id}, data -> {data}")
try:
name: str = data["metrics"]["name"]
if name == "app_core_remote_proxy_info":
metrics.app_core_remote_proxy_info.labels(**data["metrics"]["labels"]).set(1)
elif name == "app_core_remote_connect_exceptions_total":
metrics.app_core_remote_connect_exceptions_total.labels(**data["metrics"]["labels"]).inc()
elif name == "app_core_remote_execute_duration_seconds_labels":
metrics.app_core_remote_execute_duration_seconds.labels(**data["metrics"]["labels"]).observe(
data["metrics"]["data"]["cost_time"]
)
elif name == "app_core_remote_connects_total":
metrics.app_core_remote_connects_total.labels(**data["metrics"]["labels"]).inc()
except Exception:
logger.exception(
f"[app_core_remote:handle_report_data] sub_inst_id -> {sub_inst_id}, data -> {data}"
)

# 只要匹配到成功返回步骤完成,则认为是执行完成了
if step == success_callback_step and status == "DONE":
is_finished = True
Expand Down
48 changes: 44 additions & 4 deletions apps/backend/components/collections/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
"""
import logging
import os
import traceback
import typing
Expand All @@ -34,12 +35,16 @@
from apps.adapters.api.gse import GseApiBaseHelper, get_gse_api_helper
from apps.backend.subscription import errors
from apps.core.files.storage import get_storage
from apps.exceptions import parse_exception
from apps.node_man import constants, models
from apps.utils import cache, time_handler, translation
from apps.prometheus import metrics
from apps.prometheus.helper import SetupObserve
from apps.utils import cache, local, time_handler, translation
from apps.utils.exc import ExceptionHandler
from common.log import logger
from pipeline.core.flow import Service

logger = logging.getLogger("celery")


class ActivityType:
HEAD = 0
Expand Down Expand Up @@ -87,10 +92,15 @@ def service_run_exc_handler(

act_name = data.get_one_of_inputs("act_name")
sub_inst_ids = instance.get_subscription_instance_ids(data)
code = instance.__class__.__name__

error_msg = _("{act_name} 失败: {exc},请先尝试查看错误日志进行处理,若无法解决,请联系管理员处理").format(act_name=act_name, exc=str(exc))
logger.exception(error_msg)
metrics.app_task_engine_service_run_exceptions_total.labels(
hostname=local.get_hostname(), code=code, **parse_exception(exc)
).inc()

logger.exception(f"[task_engine][service_run_exc_handler:{code}] act_name -> {act_name}, exc -> {str(exc)}")

error_msg = _("{act_name} 失败: {exc},请先尝试查看错误日志进行处理,若无法解决,请联系管理员处理").format(act_name=act_name, exc=str(exc))
instance.bulk_set_sub_inst_act_status(
sub_inst_ids=sub_inst_ids,
status=constants.JobStatusType.FAILED,
Expand All @@ -115,6 +125,12 @@ def get_language_func(
return data.get_one_of_inputs("blueking_language")


def get_labels_func(
    wrapped: Callable, instance: "BaseService", args: Tuple[Any], kwargs: Dict[str, Any]
) -> typing.Dict[str, str]:
    """
    Build SetupObserve labels for a service call: the concrete service class
    name (``code``) and the current hostname.
    :param wrapped: the wrapped function or class method
    :param instance: the base pipeline service being observed
    :param args: positional arguments
    :param kwargs: keyword arguments
    :return: label dict with ``code`` and ``hostname``
    """
    return {
        "code": type(instance).__name__,
        "hostname": local.get_hostname(),
    }


class LogMixin:

# 日志类
Expand Down Expand Up @@ -252,14 +268,19 @@ def sub_inst_failed_handler(self, sub_inst_ids: Union[List[int], Set[int]]):
"""
raise NotImplementedError()

@SetupObserve(histogram=metrics.app_task_engine_set_sub_inst_statuses_duration_seconds)
def bulk_set_sub_inst_status(self, status: str, sub_inst_ids: Union[List[int], Set[int]]):
    """
    Batch-set subscription instance statuses; instance and atom status updates
    should only be performed inside this base module.
    :param status: target status (a constants.JobStatusType value)
    :param sub_inst_ids: ids of the SubscriptionInstanceRecord rows to update
    """
    models.SubscriptionInstanceRecord.objects.filter(id__in=sub_inst_ids).update(
        status=status, update_time=timezone.now()
    )
    # Count status transitions per (hostname, status) for observability dashboards.
    metrics.app_task_engine_sub_inst_statuses_total.labels(hostname=local.get_hostname(), status=status).inc(
        len(sub_inst_ids)
    )
    # Failed instances additionally trigger the subclass failure hook.
    if status in [constants.JobStatusType.FAILED]:
        self.sub_inst_failed_handler(sub_inst_ids)

@SetupObserve(histogram=metrics.app_task_engine_set_sub_inst_act_statuses_duration_seconds)
def bulk_set_sub_inst_act_status(
self, sub_inst_ids: Union[List[int], Set[int]], status: str, common_log: str = None
):
Expand Down Expand Up @@ -343,6 +364,7 @@ def get_common_data(cls, data):

def set_current_id(self, subscription_instance_ids: List[int]):
    """Bind this service's pipeline id to the given subscription instance records."""
    # Update the pipeline id of the current instances.
    # TODO: occasional deadlock observed on this bulk UPDATE.
    models.SubscriptionInstanceRecord.objects.filter(id__in=subscription_instance_ids).update(pipeline_id=self.id)

def set_outputs_data(self, data, common_data: CommonData) -> bool:
Expand Down Expand Up @@ -436,10 +458,23 @@ def run(self, service_func, data, parent_data, **kwargs) -> bool:
return bool(succeeded_subscription_instance_ids)

@translation.RespectsLanguage(get_language_func=get_language_func)
@SetupObserve(
gauge=metrics.app_task_engine_running_executes_info,
histogram=metrics.app_task_engine_execute_duration_seconds,
get_labels_func=get_labels_func,
)
@ExceptionHandler(exc_handler=service_run_exc_handler)
def execute(self, data, parent_data):
common_data = self.get_common_data(data)
act_name = data.get_one_of_inputs("act_name")
act_type = data.get_one_of_inputs("act_type")
if act_type in [ActivityType.HEAD, ActivityType.HEAD_TAIL]:
logger.info(
"[sub_lifecycle<sub(%s), task(%s)>][engine] enter",
common_data.subscription.id,
common_data.subscription_instances[0].task_id,
)

subscription_instance_ids = self.get_subscription_instance_ids(data)
to_be_created_sub_statuses = [
models.SubscriptionInstanceStatusDetail(
Expand All @@ -456,6 +491,11 @@ def execute(self, data, parent_data):
return self.run(self._execute, data, parent_data, common_data=common_data)

@translation.RespectsLanguage(get_language_func=get_language_func)
@SetupObserve(
    gauge=metrics.app_task_engine_running_schedules_info,
    histogram=metrics.app_task_engine_schedule_duration_seconds,
    get_labels_func=get_labels_func,
)
@ExceptionHandler(exc_handler=service_run_exc_handler)
def schedule(self, data, parent_data, callback_data=None):
    """
    Scheduling entry point: runs in the caller's language context, is observed
    via a running-schedules gauge and a duration histogram, and routes any
    exception to service_run_exc_handler.
    :param data: pipeline data object of this activity
    :param parent_data: pipeline data object of the parent process
    :param callback_data: optional data supplied by an external callback
    :return: result of running the concrete service's _schedule
    """
    return self.run(self._schedule, data, parent_data, callback_data=callback_data)
Expand Down
Loading

0 comments on commit 6d040b6

Please sign in to comment.