From 26ee6be14731bf8f68430b01e6ddd3de2f7ebb7c Mon Sep 17 00:00:00 2001 From: Gaung Ramadhan Date: Sun, 14 Jul 2024 22:36:06 +0700 Subject: [PATCH] chore: change health and metrics behavior (#514) --- anjani/internal_plugins/canonical.py | 80 +++++++++++++--------------- anjani/internal_plugins/health.py | 71 ++++++++++++++++++++++++ anjani/util/config.py | 6 +++ 3 files changed, 113 insertions(+), 44 deletions(-) create mode 100644 anjani/internal_plugins/health.py diff --git a/anjani/internal_plugins/canonical.py b/anjani/internal_plugins/canonical.py index 9f69cb779..4c13c6e5d 100644 --- a/anjani/internal_plugins/canonical.py +++ b/anjani/internal_plugins/canonical.py @@ -20,7 +20,9 @@ from base64 import b64encode from typing import Any, ClassVar, MutableMapping +from aiohttp import web from aiopath import AsyncPath +from prometheus_client import REGISTRY, generate_latest from pymongo.errors import PyMongoError from pyrogram.enums.chat_member_status import ChatMemberStatus from pyrogram.enums.chat_members_filter import ChatMembersFilter @@ -33,38 +35,15 @@ Message, ) -try: - from prometheus_client import make_asgi_app - from userbotindo import WebServer - - _run_canonical = True -except ImportError: - from anjani.util.types import WebServer - - _run_canonical = False - - from anjani import command, filters, listener, plugin from anjani.core.metrics import MessageStat - -class EndpointFilter(logging.Filter): - def __init__( - self, - path: str, - *args: Any, - **kwargs: Any, - ): - super().__init__(*args, **kwargs) - self._path = path - - def filter(self, record: logging.LogRecord) -> bool: - return record.getMessage().find(self._path) == -1 - - # metrics endpoint filter -logging.getLogger("uvicorn.access").addFilter(EndpointFilter("/metrics")) -logging.getLogger("uvicorn.access").addFilter(EndpointFilter("GET / ")) +logging.getLogger("aiohttp.access").setLevel(logging.WARNING) + +async def metrics_handler(_: web.Request): + metrics = generate_latest(REGISTRY) + return web.Response(body=metrics, content_type="text/plain") class Canonical(plugin.Plugin): @@ -74,12 +53,13 @@ class Canonical(plugin.Plugin): """ name: ClassVar[str] = "Canonical" - disabled: ClassVar[bool] = not _run_canonical # Private - _api: WebServer + _web_runner: web.AppRunner + _web_site: web.TCPSite + + __web_task: asyncio.Task[None] __task: asyncio.Task[None] - __web_server: asyncio.Task[None] _mt: MutableMapping[MessageMediaType, str] = { MessageMediaType.STICKER: "sticker", MessageMediaType.PHOTO: "photo", @@ -90,29 +70,41 @@ class Canonical(plugin.Plugin): async def on_load(self) -> None: self.db = self.bot.db.get_collection("TEST") self.chats_db = self.bot.db.get_collection("CHATS") - self._api = WebServer( - title="Anjani API Docs", description="API Documentation for Anjani Services" - ) - prom_client = make_asgi_app() - self._api.app.mount("/metrics", prom_client, "metrics") + await self._setup_web_app() + + async def _setup_web_app(self): + app = web.Application() + app.add_routes([web.get("/metrics", metrics_handler)]) + self._web_runner = web.AppRunner(app) + await self._web_runner.setup() + self._web_site = web.TCPSite(self._web_runner, "localhost", 8080) + await self._web_site.start() + + async def stop_aiohttp_server(self): + if self._web_site: + await self._web_site.stop() + if self._web_runner: + await self._web_runner.cleanup() async def on_start(self, _: int) -> None: self.log.debug("Starting watch streams") self.__task = self.bot.loop.create_task(self.watch_streams()) + self.__web_task = self.bot.loop.create_task(self._setup_web_app()) + + async def _web_shutdown(task: asyncio.Task[None]) -> None: + if task.cancelled(): + await self.stop_aiohttp_server() - def server_done_cb(task: asyncio.Task[None]): - if task.cancelled: - self.log.debug("Stopping web server") - asyncio.ensure_future(self._api.stop()) + def shutdown_wrapper(task): + asyncio.create_task(_web_shutdown(task)) - self.log.debug("Starting web server") - self.__web_server = self.bot.loop.create_task(self._api.run()) - self.__web_server.add_done_callback(server_done_cb) + self.__web_task.add_done_callback(shutdown_wrapper) async def on_stop(self) -> None: self.log.debug("Stopping watch streams") self.__task.cancel() - self.__web_server.cancel() + self.log.debug("Shutting down web app") + self.__web_task.cancel() def get_type(self, message: Message) -> str: if message.command: diff --git a/anjani/internal_plugins/health.py b/anjani/internal_plugins/health.py new file mode 100644 index 000000000..7c1000da9 --- /dev/null +++ b/anjani/internal_plugins/health.py @@ -0,0 +1,71 @@ +""" Health Check plugin for @dAnjani_bot """ + +# Copyright (C) 2020 - 2023 UserbotIndo Team, +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import asyncio +from datetime import datetime +from typing import ClassVar + +from pyrogram.raw.functions.ping import Ping + + +from anjani import plugin + + +class Health(plugin.Plugin): + name: ClassVar[str] = "HealthCheck" + webhook_url: str + interval: int + + # Private + _run_check: bool = False + __task: asyncio.Task[None] + + async def on_load(self) -> None: + self.webhook_url = self.bot.config.HEALTH_CHECK_WEBHOOK_URL + if not self.webhook_url: + self.log.debug("Health Check Webhook URL is not set, disabling health check") + self.bot.unload_plugin(self) + return + self._run_check = True + self.interval = self.bot.config.HEALTH_CHECK_INTERVAL + + async def on_start(self, _: int) -> None: + self.log.debug("Starting Health Check Push") + self.__task = self.bot.loop.create_task(self.push_health()) + + async def on_stop(self) -> None: + self.log.debug("Stopping health check push") + self._run_check = False + self.__task.cancel() + + async def push_health(self) -> None: + while self._run_check: + try: + await self.bot.http.get( + self.webhook_url, + params={"status": "up", "msg": "OK", "ping": await self.get_ping()}, + ) + except Exception as e: + self.log.error(f"Error pushing health: {e}") + + await asyncio.sleep(self.interval) + + async def get_ping(self) -> float: + time = datetime.now() + await self.bot.client.invoke(Ping(ping_id=1)) + end = datetime.now() + return (end - time).microseconds / 1000 diff --git a/anjani/util/config.py b/anjani/util/config.py index 8f25133f4..0f8ba8f21 100644 --- a/anjani/util/config.py +++ b/anjani/util/config.py @@ -25,6 +25,9 @@ class Config: PLUGIN_FLAG: list[str] FEATURE_FLAG: list[str] + HEALTH_CHECK_INTERVAL: Optional[int] + HEALTH_CHECK_WEBHOOK_URL: Optional[str] + IS_CI: bool def __init__(self) -> None: @@ -53,6 +56,9 @@ def __init__(self) -> None: filter(None, [i.strip() for i in getenv("FEATURE_FLAG", "").split(";")]) ) + self.HEALTH_CHECK_INTERVAL = int(getenv("HEALTH_CHECK_INTERVAL", 60)) + self.HEALTH_CHECK_WEBHOOK_URL = getenv("HEALTH_CHECK_WEBHOOK_URL") + self.IS_CI = getenv("IS_CI", "false").lower() == "true" # check if all the required variables are set