-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
66 lines (55 loc) · 1.95 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import asyncio
import signal
import ssl
import sys
from config import defaults
from config.cluster import cluster
from hypercorn.asyncio import serve
from hypercorn.config import Config
from hypercorn.middleware import ProxyFixMiddleware
from web import app
# Process-wide shutdown flag: set by the SIGTERM handler and by the loop
# exception handler, awaited by main() and handed to the long-running tasks.
hypercorn_shutdown_event = asyncio.Event()

# Hypercorn server settings, filled in from the project defaults.
hypercorn_config = Config()

# Listening address and the host names the server answers for.
# NOTE(review): Hypercorn documents ``server_names`` as a list of host names;
# confirm that defaults.HOSTNAME is a list and not a bare string.
hypercorn_config.server_names = defaults.HOSTNAME
hypercorn_config.bind = [defaults.HYPERCORN_BIND]

# TLS key material and the OpenSSL cipher string to negotiate with.
hypercorn_config.keyfile = defaults.TLS_KEYFILE
hypercorn_config.certfile = defaults.TLS_CERTFILE
hypercorn_config.ciphers = "ECDHE+AESGCM"

# Leave the "server" header out of responses.
hypercorn_config.include_server_header = False
def _signal_handler(*_unused) -> None:
    """Request shutdown by setting the module-level shutdown event.

    Accepts and ignores any positional arguments so it can be registered
    as a callback regardless of what the caller passes along.
    """
    hypercorn_shutdown_event.set()
async def main():
    """Run the web server and the cluster task until shutdown is requested.

    Registers ``_signal_handler`` for SIGTERM, installs a loop exception
    handler, starts the Hypercorn server (task "qrt") and ``cluster.run``
    (task "cluster") as children of a task group, and then waits for
    ``hypercorn_shutdown_event``.

    Raises:
        SystemExit: with status 0 once the shutdown event has been set.
        BaseExceptionGroup: if a child task fails; the task group collects
            the failure(s) and re-raises them on exit.
    """
    import errno  # local import: only needed for the bind-failure check

    # We are inside a coroutine, so the running loop is the one to configure.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGTERM, _signal_handler)

    def _exception_handler(loop, context):
        """Filter errors reported to the loop before default handling."""
        exception = context.get("exception")
        if isinstance(exception, ssl.SSLError):
            # Deliberately ignored (presumably TLS noise caused by clients;
            # confirm with the service owner).
            return
        if (
            isinstance(exception, OSError)
            and exception.errno == errno.EADDRNOTAVAIL
        ):
            # The listening address cannot be assigned: stop everything.
            print("ERROR: Unable to bind to an address")
            hypercorn_shutdown_event.set()
            return
        loop.default_exception_handler(context)

    loop.set_exception_handler(_exception_handler)

    async with asyncio.TaskGroup() as group:
        # Create the tasks through the group so it keeps strong references
        # to them, cancels them when the block exits with an exception, and
        # surfaces their failures instead of leaving them unobserved.
        group.create_task(
            serve(
                ProxyFixMiddleware(app, mode="legacy", trusted_hops=1),
                hypercorn_config,
                shutdown_trigger=hypercorn_shutdown_event.wait,
            ),
            name="qrt",
        )
        group.create_task(
            cluster.run(shutdown_trigger=hypercorn_shutdown_event),
            name="cluster",
        )
        try:
            await hypercorn_shutdown_event.wait()
        except asyncio.CancelledError:
            # Cancellation is treated as a quiet shutdown request. Set the
            # event so the children are told to stop as well; otherwise the
            # task group would keep waiting for them on exit.
            hypercorn_shutdown_event.set()
        else:
            print("Received shutdown event, exit")
            # SystemExit makes the task group cancel and await both children
            # before the exception is re-raised and the process exits.
            sys.exit(0)
# Script entry point: run main() on a new event loop until it returns or raises.
asyncio.run(main())