Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: change health and metrics behavior #514

Merged
merged 2 commits into from
Jul 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 36 additions & 44 deletions anjani/internal_plugins/canonical.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
from base64 import b64encode
from typing import Any, ClassVar, MutableMapping

from aiohttp import web
from aiopath import AsyncPath
from prometheus_client import REGISTRY, generate_latest
from pymongo.errors import PyMongoError
from pyrogram.enums.chat_member_status import ChatMemberStatus
from pyrogram.enums.chat_members_filter import ChatMembersFilter
Expand All @@ -33,38 +35,15 @@
Message,
)

try:
from prometheus_client import make_asgi_app
from userbotindo import WebServer

_run_canonical = True
except ImportError:
from anjani.util.types import WebServer

_run_canonical = False


from anjani import command, filters, listener, plugin
from anjani.core.metrics import MessageStat


class EndpointFilter(logging.Filter):
def __init__(
self,
path: str,
*args: Any,
**kwargs: Any,
):
super().__init__(*args, **kwargs)
self._path = path

def filter(self, record: logging.LogRecord) -> bool:
return record.getMessage().find(self._path) == -1


# metrics endpoint filter
logging.getLogger("uvicorn.access").addFilter(EndpointFilter("/metrics"))
logging.getLogger("uvicorn.access").addFilter(EndpointFilter("GET / "))
logging.getLogger("aiohttp.access").setLevel(logging.WARNING)

async def metrics_handler(_: web.Request):
metrics = generate_latest(REGISTRY)
return web.Response(body=metrics, content_type="text/plain")


class Canonical(plugin.Plugin):
Expand All @@ -74,12 +53,13 @@ class Canonical(plugin.Plugin):
"""

name: ClassVar[str] = "Canonical"
disabled: ClassVar[bool] = not _run_canonical

# Private
_api: WebServer
_web_runner: web.AppRunner
_web_site: web.TCPSite

__web_task: asyncio.Task[None]
__task: asyncio.Task[None]
__web_server: asyncio.Task[None]
_mt: MutableMapping[MessageMediaType, str] = {
MessageMediaType.STICKER: "sticker",
MessageMediaType.PHOTO: "photo",
Expand All @@ -90,29 +70,41 @@ class Canonical(plugin.Plugin):
async def on_load(self) -> None:
self.db = self.bot.db.get_collection("TEST")
self.chats_db = self.bot.db.get_collection("CHATS")
self._api = WebServer(
title="Anjani API Docs", description="API Documentation for Anjani Services"
)
prom_client = make_asgi_app()
self._api.app.mount("/metrics", prom_client, "metrics")
await self._setup_web_app()

async def _setup_web_app(self):
app = web.Application()
app.add_routes([web.get("/metrics", metrics_handler)])
self._web_runner = web.AppRunner(app)
await self._web_runner.setup()
self._web_site = web.TCPSite(self._web_runner, "localhost", 8080)
await self._web_site.start()

async def stop_aiohttp_server(self):
if self._web_site:
await self._web_site.stop()
if self._web_runner:
await self._web_runner.cleanup()

async def on_start(self, _: int) -> None:
self.log.debug("Starting watch streams")
self.__task = self.bot.loop.create_task(self.watch_streams())
self.__web_task = self.bot.loop.create_task(self._setup_web_app())

async def _web_shutdown(task: asyncio.Task[None]) -> None:
if task.cancelled():
await self.stop_aiohttp_server()

def server_done_cb(task: asyncio.Task[None]):
if task.cancelled:
self.log.debug("Stopping web server")
asyncio.ensure_future(self._api.stop())
def shutdown_wrapper(task):
asyncio.create_task(_web_shutdown(task))

self.log.debug("Starting web server")
self.__web_server = self.bot.loop.create_task(self._api.run())
self.__web_server.add_done_callback(server_done_cb)
self.__web_task.add_done_callback(shutdown_wrapper)

async def on_stop(self) -> None:
self.log.debug("Stopping watch streams")
self.__task.cancel()
self.__web_server.cancel()
self.log.debug("Shutting down web app")
self.__web_task.cancel()

def get_type(self, message: Message) -> str:
if message.command:
Expand Down
71 changes: 71 additions & 0 deletions anjani/internal_plugins/health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
""" Health Check plugin for @dAnjani_bot """

# Copyright (C) 2020 - 2023 UserbotIndo Team, <https://github.com/userbotindo.git>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import asyncio
from datetime import datetime
from typing import ClassVar

from pyrogram.raw.functions.ping import Ping


from anjani import plugin


class Health(plugin.Plugin):
name: ClassVar[str] = "HealthCheck"
webhook_url: str
interval: int

# Private
_run_check: bool = False
__task: asyncio.Task[None]

async def on_load(self) -> None:
self.webhook_url = self.bot.config.HEALTH_CHECK_WEBHOOK_URL
if not self.webhook_url:
self.log.debug("Health Check Webhook URL is not set, disabling health check")
self.bot.unload_plugin(self)
return
self._run_check = True
self.interval = self.bot.config.HEALTH_CHECK_INTERVAL

async def on_start(self, _: int) -> None:
self.log.debug("Starting Health Check Push")
self.__task = self.bot.loop.create_task(self.push_health())

async def on_stop(self) -> None:
self.log.debug("Stopping health check push")
self._run_check = False
self.__task.cancel()

async def push_health(self) -> None:
while self._run_check:
try:
await self.bot.http.get(
self.webhook_url,
params={"status": "up", "msg": "OK", "ping": await self.get_ping()},
)
except Exception as e:
self.log.error(f"Error pushing health: {e}")

await asyncio.sleep(self.interval)

async def get_ping(self) -> float:
time = datetime.now()
await self.bot.client.invoke(Ping(ping_id=1))
end = datetime.now()
return (end - time).microseconds / 1000
6 changes: 6 additions & 0 deletions anjani/util/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ class Config:
PLUGIN_FLAG: list[str]
FEATURE_FLAG: list[str]

HEALTH_CHECK_INTERVAL: Optional[int]
HEALTH_CHECK_WEBHOOK_URL: Optional[str]

IS_CI: bool

def __init__(self) -> None:
Expand Down Expand Up @@ -53,6 +56,9 @@ def __init__(self) -> None:
filter(None, [i.strip() for i in getenv("FEATURE_FLAG", "").split(";")])
)

self.HEALTH_CHECK_INTERVAL = int(getenv("HEALTH_CHECK_INTERVAL", 60))
self.HEALTH_CHECK_WEBHOOK_URL = getenv("HEALTH_CHECK_WEBHOOK_URL")

self.IS_CI = getenv("IS_CI", "false").lower() == "true"

# check if all the required variables are set
Expand Down
Loading