Skip to content

Commit

Permalink
chore: change health and metrics behavior (#514)
Browse files Browse the repository at this point in the history
  • Loading branch information
MrMissx authored Jul 14, 2024
1 parent 5fc7e20 commit 26ee6be
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 44 deletions.
80 changes: 36 additions & 44 deletions anjani/internal_plugins/canonical.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from base64 import b64encode
from typing import Any, ClassVar, MutableMapping

from aiohttp import web
from aiopath import AsyncPath
from prometheus_client import REGISTRY, generate_latest
from pymongo.errors import PyMongoError
from pyrogram.enums.chat_member_status import ChatMemberStatus
from pyrogram.enums.chat_members_filter import ChatMembersFilter
Expand All @@ -33,38 +35,15 @@
Message,
)

try:
from prometheus_client import make_asgi_app
from userbotindo import WebServer

_run_canonical = True
except ImportError:
from anjani.util.types import WebServer

_run_canonical = False


from anjani import command, filters, listener, plugin
from anjani.core.metrics import MessageStat


class EndpointFilter(logging.Filter):
def __init__(
self,
path: str,
*args: Any,
**kwargs: Any,
):
super().__init__(*args, **kwargs)
self._path = path

def filter(self, record: logging.LogRecord) -> bool:
return record.getMessage().find(self._path) == -1


# metrics endpoint filter
logging.getLogger("uvicorn.access").addFilter(EndpointFilter("/metrics"))
logging.getLogger("uvicorn.access").addFilter(EndpointFilter("GET / "))
logging.getLogger("aiohttp.access").setLevel(logging.WARNING)

async def metrics_handler(_: web.Request):
metrics = generate_latest(REGISTRY)
return web.Response(body=metrics, content_type="text/plain")


class Canonical(plugin.Plugin):
Expand All @@ -74,12 +53,13 @@ class Canonical(plugin.Plugin):
"""

name: ClassVar[str] = "Canonical"
disabled: ClassVar[bool] = not _run_canonical

# Private
_api: WebServer
_web_runner: web.AppRunner
_web_site: web.TCPSite

__web_task: asyncio.Task[None]
__task: asyncio.Task[None]
__web_server: asyncio.Task[None]
_mt: MutableMapping[MessageMediaType, str] = {
MessageMediaType.STICKER: "sticker",
MessageMediaType.PHOTO: "photo",
Expand All @@ -90,29 +70,41 @@ class Canonical(plugin.Plugin):
async def on_load(self) -> None:
self.db = self.bot.db.get_collection("TEST")
self.chats_db = self.bot.db.get_collection("CHATS")
self._api = WebServer(
title="Anjani API Docs", description="API Documentation for Anjani Services"
)
prom_client = make_asgi_app()
self._api.app.mount("/metrics", prom_client, "metrics")
await self._setup_web_app()

async def _setup_web_app(self):
app = web.Application()
app.add_routes([web.get("/metrics", metrics_handler)])
self._web_runner = web.AppRunner(app)
await self._web_runner.setup()
self._web_site = web.TCPSite(self._web_runner, "localhost", 8080)
await self._web_site.start()

async def stop_aiohttp_server(self):
if self._web_site:
await self._web_site.stop()
if self._web_runner:
await self._web_runner.cleanup()

async def on_start(self, _: int) -> None:
self.log.debug("Starting watch streams")
self.__task = self.bot.loop.create_task(self.watch_streams())
self.__web_task = self.bot.loop.create_task(self._setup_web_app())

async def _web_shutdown(task: asyncio.Task[None]) -> None:
if task.cancelled():
await self.stop_aiohttp_server()

def server_done_cb(task: asyncio.Task[None]):
if task.cancelled:
self.log.debug("Stopping web server")
asyncio.ensure_future(self._api.stop())
def shutdown_wrapper(task):
asyncio.create_task(_web_shutdown(task))

self.log.debug("Starting web server")
self.__web_server = self.bot.loop.create_task(self._api.run())
self.__web_server.add_done_callback(server_done_cb)
self.__web_task.add_done_callback(shutdown_wrapper)

async def on_stop(self) -> None:
self.log.debug("Stopping watch streams")
self.__task.cancel()
self.__web_server.cancel()
self.log.debug("Shutting down web app")
self.__web_task.cancel()

def get_type(self, message: Message) -> str:
if message.command:
Expand Down
71 changes: 71 additions & 0 deletions anjani/internal_plugins/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
""" Health Check plugin for @dAnjani_bot """

# Copyright (C) 2020 - 2023 UserbotIndo Team, <https://github.com/userbotindo.git>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
from datetime import datetime
from typing import ClassVar

from pyrogram.raw.functions.ping import Ping


from anjani import plugin


class Health(plugin.Plugin):
name: ClassVar[str] = "HealthCheck"
webhook_url: str
interval: int

# Private
_run_check: bool = False
__task: asyncio.Task[None]

async def on_load(self) -> None:
self.webhook_url = self.bot.config.HEALTH_CHECK_WEBHOOK_URL
if not self.webhook_url:
self.log.debug("Health Check Webhook URL is not set, disabling health check")
self.bot.unload_plugin(self)
return
self._run_check = True
self.interval = self.bot.config.HEALTH_CHECK_INTERVAL

async def on_start(self, _: int) -> None:
self.log.debug("Starting Health Check Push")
self.__task = self.bot.loop.create_task(self.push_health())

async def on_stop(self) -> None:
self.log.debug("Stopping health check push")
self._run_check = False
self.__task.cancel()

async def push_health(self) -> None:
while self._run_check:
try:
await self.bot.http.get(
self.webhook_url,
params={"status": "up", "msg": "OK", "ping": await self.get_ping()},
)
except Exception as e:
self.log.error(f"Error pushing health: {e}")

await asyncio.sleep(self.interval)

async def get_ping(self) -> float:
time = datetime.now()
await self.bot.client.invoke(Ping(ping_id=1))
end = datetime.now()
return (end - time).microseconds / 1000
6 changes: 6 additions & 0 deletions anjani/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ class Config:
PLUGIN_FLAG: list[str]
FEATURE_FLAG: list[str]

HEALTH_CHECK_INTERVAL: Optional[int]
HEALTH_CHECK_WEBHOOK_URL: Optional[str]

IS_CI: bool

def __init__(self) -> None:
Expand Down Expand Up @@ -53,6 +56,9 @@ def __init__(self) -> None:
filter(None, [i.strip() for i in getenv("FEATURE_FLAG", "").split(";")])
)

self.HEALTH_CHECK_INTERVAL = int(getenv("HEALTH_CHECK_INTERVAL", 60))
self.HEALTH_CHECK_WEBHOOK_URL = getenv("HEALTH_CHECK_WEBHOOK_URL")

self.IS_CI = getenv("IS_CI", "false").lower() == "true"

# check if all the required variables are set
Expand Down

0 comments on commit 26ee6be

Please sign in to comment.