Skip to content

Commit

Permalink
Load TOML configuration with Pydantic
Browse files Browse the repository at this point in the history
Convert config format to TOML (in practice it's not that different)
Use pydantic to load and validate configuration data.
Normalise notifier configuration structure in preparation for adding
more notifier types in future.

Signed-off-by: Joe Groocock <[email protected]>
  • Loading branch information
frebib committed Mar 20, 2024
1 parent c6d67d7 commit 2bfe328
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 88 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM spritsail/alpine:3.19

ARG NOTIFY_VER=1.4
ARG NOTIFY_VER=1.5

LABEL maintainer="Adam Dodman <[email protected]>" \
org.label-schema.vendor="Spritsail" \
Expand All @@ -19,4 +19,4 @@ RUN --mount=type=bind,target=/src,rw \
WORKDIR /config
VOLUME ["/config"]

CMD ["/usr/bin/python3", "-m", "drone_notify", "/config/notify.conf"]
CMD ["/usr/bin/python3", "-m", "drone_notify"]
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,21 +10,21 @@
[![Docker Stars](https://img.shields.io/docker/stars/spritsail/drone-notify.svg)][hub]
[![Build Status](https://drone.spritsail.io/api/badges/spritsail/drone-notify/status.svg)][drone]

This script sets up a webhook listener for Drone's global webhooks. It then sends a notification to a Telegram channel every time a build passes or fails.
This script sets up a webhook listener for Drone's global webhooks. It then sends notification(s) to Telegram every time a build passes or fails.

## Getting Started

Run the docker container with a config file (notify.conf) mounted to /config/notify.conf. Configure the required parameters. At the bare minimum a Telegram bot token (`main.token`) and default channel (`channels.default`) will need to be added.
Run the docker container with a config file (notify.toml) mounted to /config/notify.toml. Configure the required parameters. At the bare minimum a Telegram bot (`[telegram.bot."my bot name"]`) and at least one notifier (`[notifier."my notifier"]`) referencing that bot (`bot = "my bot name"`) will need to be added.

An example config file can be found in `notify.conf.example`
An example config file can be found in `notify.toml.example`

Then run the container:

```shell
docker run -d \
--name=drone-notify \
--restart=always \
-v path/to/notify.conf:/config/notify.conf \
-v path/to/notify.toml:/config/notify.toml \
spritsail/drone-notify
```

Expand All @@ -40,7 +40,7 @@ services:
- DRONE_WEBHOOK_SECRET=YOUR_SECRET

notify:
image: spritsail/drone-notify:1.3
image: spritsail/drone-notify:1.5
volumes:
- path/to/notify.conf:/config/notify.conf
- path/to/notify.toml:/config/notify.toml
```
139 changes: 75 additions & 64 deletions drone_notify/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
"""

import asyncio
import configparser
import functools
import importlib.metadata
import ipaddress
import logging
import os.path
import signal
import socket
import sys
Expand All @@ -19,9 +20,11 @@
import aiohttp
from aiohttp import web

from drone_notify.config import Config, load_toml_file
from drone_notify.digest import DigestVerifier
from drone_notify.drone import WebhookEvent, WebhookRequest
from drone_notify.http_signature import verify_drone_signature
from drone_notify.notify import repo_match
from drone_notify.types import Middleware

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -55,7 +58,9 @@ def format_duration(start: int | float, end: int | float) -> str:
return datestr


async def send_telegram_msg(chatid: str, message: str, parse_mode: str = "html") -> None:
async def send_telegram_msg(
bot_token: str, chatid: str, message: str, parse_mode: str = "html"
) -> None:
"""
Send a formatted message to a Telegram chat
"""
Expand All @@ -70,7 +75,7 @@ async def send_telegram_msg(chatid: str, message: str, parse_mode: str = "html")
respbody: str | None = None
try:
async with session.post(
f"https://api.telegram.org/bot{ttoken}/sendmessage",
f"https://api.telegram.org/bot{bot_token}/sendmessage",
json=postdata,
timeout=60,
) as resp:
Expand All @@ -81,18 +86,10 @@ async def send_telegram_msg(chatid: str, message: str, parse_mode: str = "html")
log.exception("Failed to send notification for %s: %s", postdata, respbody)


async def do_notify(event: WebhookRequest) -> None:
def generate_msg(event: WebhookRequest) -> str:
"""
Generate a formatted notification message and send it
Generate a HTML formatted notification message from Webhook event data
"""
if event.build is None or event.repo is None or event.system is None:
# Satisfy type checkers. We already checked these
return

if "[NOTIFY SKIP]" in event.build.message or "[SKIP NOTIFY]" in event.build.message:
log.debug("Skipping build as flags set")
return

is_pr = ""
if event.build.event == "pull_request":
# This isn't pretty, but it works.
Expand All @@ -119,15 +116,13 @@ async def do_notify(event: WebhookRequest) -> None:
commit_firstline = event.build.message
commit_rest = ""

notifytmpl = (
"<b>{repo} [{PR}{branch}]</b> #{number}: <b>{status}</b> in {time}\n"
return (
"<b>{repo} [{is_pr}{branch}]</b> #{number}: <b>{status}</b> in {time}\n"
+ "<a href='{drone_link}'>{drone_link}</a>\n"
+ "{multi_stage}<a href='{git_link}'>#{commit:7.7}</a> ({committer}): "
+ "<i>{commit_firstline}</i>\n{commit_rest}"
)

notifymsg = notifytmpl.format(
PR=is_pr,
).format(
is_pr=is_pr,
branch=escape(event.build.target),
commit=escape(event.build.after),
commit_firstline=escape(commit_firstline),
Expand All @@ -142,26 +137,53 @@ async def do_notify(event: WebhookRequest) -> None:
time=format_duration(event.build.started, event.build.finished),
)

log.info(
"Sending Telegram notification(s) for %s #%d",
event.repo.slug,
event.build.number,
)

tchat = config["channels"].get(event.repo.slug, default_channel)
async def do_notify(config: Config, event: WebhookRequest) -> None:
"""
Dispatch notifications to all notifiers
"""
if event.build is None or event.repo is None or event.system is None:
# Satisfy type checkers. We already checked these
return

if "[NOTIFY SKIP]" in event.build.message or "[SKIP NOTIFY]" in event.build.message:
log.debug("Skipping notification as commit message requested it")
return

message = generate_msg(event)

senders = []
# Send normal telegram notification
senders.append(send_telegram_msg(tchat, notifymsg))
for name, notif in config.notifier.items():
if notif.status is not None and event.build.status not in notif.status:
log.debug(
"Notifier '%s' isn't used for builds with status %s", name, event.build.status
)
continue

# Skip if notifier disallows repo
if notif.repos is not None and not repo_match(name, event.repo.slug, notif.repos):
continue

# Elaborate hack to extract the bot token from the config
bot_token = getattr(getattr(config, notif.kind), "bot")[notif.bot].bot_token

log.info(
"Sending Telegram notification for %s #%d with notifier '%s' to channel %s",
event.repo.slug,
event.build.number,
name,
notif.channel,
)
senders.append(send_telegram_msg(bot_token, notif.channel, message))

# If theres a failure channel defined & the build has failed, notify that too
if event.build.status != "success" and failure_channel is not None:
senders.append(send_telegram_msg(failure_channel, notifymsg))
if not senders:
log.info("No matching notifiers for %s #%d", event.repo.slug, event.build.number)
return

await asyncio.gather(*senders)


async def hook(request: web.Request) -> web.StreamResponse:
async def hook(config: Config, request: web.Request) -> web.StreamResponse:
"""
Handle incoming webhooks from (hopefully) Drone
"""
Expand All @@ -178,7 +200,7 @@ async def hook(request: web.Request) -> web.StreamResponse:
)

if event.build.status in VALID_BUILD_STATES:
await do_notify(event)
await do_notify(config, event)
log.debug("Returning %s to %s", event.build.status, request.remote)
return web.Response(body=event.build.status)

Expand All @@ -187,28 +209,28 @@ async def hook(request: web.Request) -> web.StreamResponse:
return web.Response(body=b"accepted")


async def startup() -> None:
async def startup(config: Config) -> None:
"""
drone-notify entrypoint
"""
log.info("Started Drone Notify v%s. Default Notification Channel: %s", VERSION, default_channel)
log.info("Started Drone Notify v%s. Loaded %d notifiers", VERSION, len(config.notifier))
log.debug("Debug logging is enabled - prepare for logspam")

host = ipaddress.ip_address(config["main"].get("host", "::"))
port = int(config["main"].get("port", "5000"))
host = ipaddress.ip_address(config.main.host)
port = config.main.port
hostport = ("[%s]:%d" if host.version == 6 else "%s:%d") % (host, port)

middlewares: list[Middleware] = []

if "secret" in config["main"]:
if config.main.secret is not None:
log.debug("Enabled webhook signature verification")
middlewares.append(verify_drone_signature(config["main"]["secret"].encode()))
middlewares.append(verify_drone_signature(config.main.secret.encode()))

# Drone adds the `Digest:` header to all of it's requests
middlewares.append(DigestVerifier(require=True).verify_digest_headers)

handler = web.Application(middlewares=middlewares)
handler.add_routes([web.post("/hook", hook)])
handler.add_routes([web.post("/hook", functools.partial(hook, config))])

runner = web.AppRunner(handler)
await runner.setup()
Expand All @@ -229,38 +251,27 @@ async def startup() -> None:
stream=sys.stdout,
)

# TODO: Add some sanity checks to make sure the file exists, is readable
# and contains everything we need.
cfg_path: str = sys.argv[1] if len(sys.argv) > 1 else "notify.conf"

config = configparser.ConfigParser()
config.read(cfg_path)

ttoken = config["main"]["token"]
default_channel = config["channels"]["default"]

if config.has_option("main", "debug"):
if config["main"].getboolean("debug"):
log.setLevel(logging.DEBUG)

# If a failure channel exists, assign it to a var
failure_channel: str | None = None
if len(sys.argv) > 1:
cfg_path = sys.argv[1]
elif os.path.isfile("notify.toml"):
cfg_path = "notify.toml"
else:
log.warning("Falling back to old config filename 'notify.conf'")
log.warning(
"Configuration has migrated to TOML format. Please update your configuration file"
)
cfg_path = "notify.conf"

if config.has_option("channels", "failure"):
failure_channel = config["channels"]["failure"]
cfg = load_toml_file(cfg_path)

if not ttoken:
log.error("Required variable `main.token' empty or unset")
sys.exit(1)
elif not default_channel:
log.error("Required value `channels.default' empty or unset")
sys.exit(1)
if cfg.main.debug:
log.setLevel(logging.DEBUG)

try:
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGTERM, loop.stop)
loop.add_signal_handler(signal.SIGINT, loop.stop)
loop.run_until_complete(startup())
loop.run_until_complete(startup(cfg))
loop.run_forever()
except KeyboardInterrupt:
log.info("Caught ^C, stopping")
Expand Down
98 changes: 98 additions & 0 deletions drone_notify/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Configuration parsing, validation and representation for drone-notify
configuration data.
"""

import os
import tomllib
from typing import Annotated, Any, Literal

from pydantic import BaseModel, ConfigDict, Field, model_validator


class StrictModel(BaseModel):
"""
Base pydantic model that enables strict model configuration
This should be used as the base class for all configuration classes
"""

model_config = ConfigDict(strict=True, extra="forbid")


class BaseNotifier(StrictModel):
"""
An abstract notifier type with no special service-specific behaviours
"""

kind: str
status: list[str] | None = None
repos: list[str] | None = None


class TelegramNotifier(BaseNotifier):
"""
A Telegram notifier type that uses a bot to notify a Telegram channel
"""

kind: Literal["telegram"]
bot: str
channel: str


class TelegramBot(StrictModel):
"""
A Bot object for sending messages to Telegram as a bot user
"""

bot_token: str


class Telegram(StrictModel):
"""
Container for one or more Telegram bot definitions
"""

bot: dict[str, TelegramBot]


class Main(StrictModel):
"""
Main application-level configuration options
"""

host: str = Field(default="::")
port: int = Field(default=5000)
secret: str | None = None
debug: bool = False


class Config(StrictModel):
"""
Top-level application configuration
"""

main: Main
notifier: dict[str, Annotated[TelegramNotifier, Field(discriminator="kind")]]
telegram: Telegram | None

@model_validator(mode="after")
def match_notifiers(self) -> "Config":
"""
Validates that each notifier references a defined bot object
"""
for name, notif in self.notifier.items():
bots: dict[str, Any] = getattr(getattr(self, notif.kind), "bot")
if notif.bot not in bots:
raise ValueError(
f"Notifier '{name}' references undefined {notif.kind} bot '{notif.bot}'"
)

return self


def load_toml_file(path: str | bytes | os.PathLike[str]) -> Config:
"""
Loads a Config object from an ini file given a path
"""
with open(path, "rb") as fp:
return Config(**tomllib.load(fp))
Loading

0 comments on commit 2bfe328

Please sign in to comment.