-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
113 lines (94 loc) · 3.39 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import base64
import os
import secrets
import string
from abc import ABC
from pathlib import Path
import yaml
from hetzner_dns import HetznerDNS
import asyncio
import tornado.web
from hetzner_dns_record import HetznerDNSRecord
class GenerateHandler(tornado.web.RequestHandler, ABC):
    """Render a ready-to-paste YAML config entry for one DNS record.

    The record is looked up through ``HetznerDNS`` using the API token, zone
    name, record type and record name captured from the URL path. The entry
    is keyed by a freshly generated 20-character random alphanumeric string.
    """

    async def get(self, api_token, zone_name, record_type, record_name):
        """Look up the record and write its config entry as ``<pre>`` HTML.

        Args:
            api_token: API token passed to ``HetznerDNS``.
            zone_name: Name of the zone passed to ``get_zone``.
            record_type: Record type passed to ``get_record``.
            record_name: Record name passed to ``get_record``.
        """
        # Local import keeps this block self-contained; ``html`` is stdlib.
        import html

        dns = HetznerDNS(api_token)
        zone = await dns.get_zone(zone_name)
        record = await zone.get_record(record_type, record_name)

        # ``secrets`` (not ``random``) because this string is later used as
        # the lookup key that authorises updates.
        alphabet = string.ascii_letters + string.digits
        entry_key = ''.join(secrets.choice(alphabet) for _ in range(20))

        entry: dict = {
            entry_key: {
                'api_token': api_token,
                'zone_id': record.zone_id,
                'record': {
                    'id': record.record_id,
                    'type': record.type,
                    'name': record.name,
                    'ttl': record.ttl,
                },
            },
        }
        entry_yaml = yaml.dump(entry, default_flow_style=False, sort_keys=False)

        # The response is served as HTML, so markup-significant characters in
        # the YAML text must be escaped. Quotes are left alone because the
        # text sits in element content, not in an attribute.
        self.write(f'<pre>{html.escape(entry_yaml, quote=False)}</pre>')
class UpdateHandler(tornado.web.RequestHandler, ABC):
    """Update one or more configured records from ``key/value`` path pairs.

    The positional path arguments are read as alternating config keys and
    values; each value is handed to ``HetznerDNSRecord.update`` for the
    record configured under the preceding key.
    """

    async def get(self, *args):
        """Validate all pairs first, then apply every update in order.

        Writes ``parameters odd``, ``zero length arg`` or ``failed`` and stops
        without updating anything when validation fails; writes ``ok`` after
        all updates have been awaited.
        """
        if len(args) % 2 != 0:
            self.write('parameters odd')
            return

        for value in args:
            if len(value) == 0:
                self.write('zero length arg')
                return

        # Even positions are config keys, odd positions the values to apply.
        keys = args[0::2]
        values = args[1::2]

        # Reject the whole request if any key is unknown, before touching
        # any record.
        if any(key not in config for key in keys):
            self.write('failed')
            return

        for key, value in zip(keys, values):
            record = HetznerDNSRecord.from_config(config[key])
            await record.update(value)

        self.write('ok')
class Dyndns2Handler(tornado.web.RequestHandler, ABC):
    """Update a configured record from a ``/nic/update``-style request.

    The request must carry ``system=dyndns`` and ``myip`` query arguments and
    an HTTP Basic ``Authorization`` header. The text after the last ``:`` in
    the decoded credentials is used as the config key.
    """

    async def get(self):
        """Authenticate the request and update the record to ``myip``.

        Writes ``badagent`` when ``system`` is not ``dyndns``, ``badauth``
        when the credentials are missing, malformed or unknown, and
        ``good <ip>`` after a successful update.
        """
        if self.get_query_argument('system') != 'dyndns':
            print('badagent')
            # Report the failure to the client as well as the server log;
            # previously the client received an empty success response.
            self.write('badagent')
            return

        ip: str = self.get_query_argument('myip')

        auth: str = self.request.headers.get('Authorization', '')
        if not auth.startswith('Basic '):
            print('badauth')
            self.write('badauth')
            return

        try:
            credentials = base64.b64decode(auth[len('Basic '):]).decode()
        except ValueError:
            # binascii.Error (bad base64) and UnicodeDecodeError (bad UTF-8)
            # are both ValueError subclasses; treat either as bad credentials
            # instead of letting the handler fail with a server error.
            self.write('badauth')
            return

        # Everything after the last ':' is the key; with no ':' present the
        # whole string is used.
        key: str = credentials.rpartition(':')[2]
        if key not in config:
            self.write('badauth')
            return

        record = HetznerDNSRecord.from_config(config[key])
        await record.update(ip)
        self.write(f'good {ip}')
def make_app():
    """Build the Tornado application and its URL routes.

    Routes, in registration order:
      * ``/nic/update`` -> ``Dyndns2Handler``
      * ``/update/<k>/<v>`` repeated for 1..N pairs -> ``UpdateHandler``,
        where N comes from the ``MAX_UPDATES_PER_GET`` environment variable
        (default 2); the single-pair route is always registered.
      * ``/generate/<a>/<b>/<c>/<d>`` -> ``GenerateHandler``, unless the
        ``DISABLE_GENERATE`` environment variable is set.
    """
    segment: str = r'([\w.:]*)'
    pair: str = f'/{segment}/{segment}'
    max_pairs = int(os.environ.get('MAX_UPDATES_PER_GET', 2))

    routes: list = [
        ('/nic/update', Dyndns2Handler),
        (f'/update{pair}', UpdateHandler),
    ]
    # One extra route per additional key/value pair, up to max_pairs.
    routes.extend(
        (f'/update{pair * count}', UpdateHandler)
        for count in range(2, max_pairs + 1)
    )

    if 'DISABLE_GENERATE' not in os.environ:
        routes.append((f'/generate{pair}{pair}', GenerateHandler))

    return tornado.web.Application(routes)
async def main():
    """Start the application on port 8888 and keep the event loop alive."""
    make_app().listen(8888, xheaders=True)
    # Nothing ever sets this event, so waiting on it blocks until the
    # surrounding task is cancelled.
    never_set = asyncio.Event()
    await never_set.wait()
def get_config_local(filename: Path) -> dict:
    """Load the YAML configuration stored at *filename*.

    Args:
        filename: Path of the YAML file to read.

    Returns:
        The parsed top-level mapping. An empty file yields ``{}``. On failure
        a dict with a single ``'error'`` key describing the problem is
        returned instead of raising: missing file, YAML syntax error, or a
        document whose root is not a mapping.
    """
    if not filename.exists():
        return {'error': 'file does not exist'}

    with open(filename, 'r') as file:
        try:
            loaded = yaml.safe_load(file)
        except yaml.YAMLError as e:
            print(e)
            return {'error': str(e)}

    # safe_load returns None for an empty document; callers expect a dict
    # they can run membership tests against.
    if loaded is None:
        return {}
    if not isinstance(loaded, dict):
        return {'error': 'config root is not a mapping'}
    return loaded
if __name__ == '__main__':
    # The request handlers read `config` as a module-level global, so it has
    # to be bound here before the server starts accepting requests.
    config: dict = get_config_local(filename=Path('config.yaml'))
    asyncio.run(main())