-
Notifications
You must be signed in to change notification settings - Fork 5
/
test_with_valid_zone_files.py
520 lines (470 loc) · 22.6 KB
/
test_with_valid_zone_files.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
"""
Runs tests with valid zone files on different implementations.
Either compares responses from mulitple implementations with each other or uses a
expected response to flag differences (only when one implementation is passed for testing).
usage: test_with_valid_zone_files.py [-h] [-path DIRECTORY_PATH]
[-id {1,2,3,4,5}] [-r START END] [-b]
[-n] [-k] [-p] [-c] [-y] [-m] [-t] [-e] [-l]
optional arguments:
-h, --help show this help message and exit
-path DIRECTORY_PATH The path to the directory containing ZoneFiles
and either Queries or ExpectedResponses directories.
(default: Results/ValidZoneFileTests/)
-id {1,2,3,4,5} Unique id for all the containers (useful when running
comparison in parallel). (default: 1)
-r START END The range of tests to compare. (default: All tests)
-b Disable Bind. (default: False)
-n Disable Nsd. (default: False)
-k Disable Knot. (default: False)
-p Disable PowerDns. (default: False)
-c Disable CoreDns. (default: False)
-y Disable Yadifa. (default: False)
-m Disable MaraDns. (default: False)
-t Disable TrustDns. (default: False)
-e Disable Technitium. (default: False)
-l, --latest Test using latest image tag. (default: False)
"""
#!/usr/bin/env python3
import copy
import json
import pathlib
import subprocess
import sys
import time
from argparse import (SUPPRESS, ArgumentDefaultsHelpFormatter, ArgumentParser,
ArgumentTypeError, Namespace)
from datetime import datetime
from multiprocessing import Process
from typing import Any, Dict, List, Optional, TextIO, Tuple, Union
import dns.message
import dns.query
import dns.rdataclass
import dns.rdatatype
import dns.resolver
from Implementations.Bind.prepare import run as bind
from Implementations.Coredns.prepare import run as coredns
from Implementations.Knot.prepare import run as knot
from Implementations.Maradns.prepare import run as maradns
from Implementations.Nsd.prepare import run as nsd
from Implementations.Powerdns.prepare import run as powerdns
from Implementations.Technitium.prepare import run as technitium
from Implementations.Trustdns.prepare import run as trustdns
from Implementations.Yadifa.prepare import run as yadifa
ZONE_FILES = "ZoneFiles/"
QUERIES = "Queries/"
QUERY_RESPONSES = "ExpectedResponses/"
DIFFERENCES = "Differences/"
# A response is a tuple where the first element is the implementation in string format
# and second element is a DNS response (or "No response") of that implementation
ResponseType = Tuple[str, Union[str, dns.message.Message]]
def get_ports(input_args: Namespace) -> Dict[str, Tuple[bool, int]]:
"""
Returns a map from an implementation to the host port its container port 53
should be mapped and whether that implementation should be tested.
:param input_args: The input arguments
"""
implementations = {}
implementations['bind'] = (not input_args.b, 8000)
implementations['nsd'] = (not input_args.n, 8100)
implementations['knot'] = (not input_args.k, 8200)
implementations['powerdns'] = (not input_args.p, 8300)
implementations['yadifa'] = (not input_args.y, 8400)
implementations['coredns'] = (not input_args.c, 8500)
implementations['maradns'] = (not input_args.m, 8600)
implementations['trustdns'] = (not input_args.t, 8700)
implementations['technitium'] = (not input_args.e, 8800)
return implementations
def remove_container(cid: int) -> None:
"""
Stops the running containers of all the implementations.
:param cid: The unique id for all the containers
"""
# Get the list of containers
cmd_status = subprocess.run(
['docker', 'ps', '-a', '--format', '"{{.Names}}"'], stdout=subprocess.PIPE, check=False)
output = cmd_status.stdout.decode("utf-8")
if cmd_status.returncode != 0:
sys.exit(f'Error in executing Docker ps command: {output}')
all_container_names = [name[1:-1] for name in output.strip().split("\n")]
servers = ["_bind_server", "_nsd_server", "_knot_server", "_powerdns_server",
"_maradns_server", "_yadifa_server", "_trustdns_server", "_coredns_server", "_technitium_server"]
for server in servers:
# Force remove the container if it is running
if str(cid) + server in all_container_names:
subprocess.run(['docker', 'container', 'rm', str(cid) + server, '-f'],
stdout=subprocess.PIPE, check=False)
def start_containers(cid: int, implementations: Dict[str, Tuple[bool, int]], tag: str) -> None:
"""
Starts a container for each requested implementation
:param cid: The unique id for all the containers
:param implementations: Map from an implementation to a tuple of two items
- 1. whether to check that implementation 2. which host port
should be mapped to the container port 53
:param tag: Tag of the images to use
"""
remove_container(cid)
for impl, (check, port) in implementations.items():
if check:
if impl == 'technitium':
subprocess.run(['docker', 'run', '-dp', str(port * cid) + ':53/udp', '-p', f'{str(port * cid + 1)}:5380/tcp',
'--name=' + str(cid) + '_' + impl + '_server', impl + tag], check=True)
else:
subprocess.run(['docker', 'run', '-dp', str(port * cid) + ':53/udp',
'--name=' + str(cid) + '_' + impl + '_server', impl + tag], check=True)
def querier(query_name: str, query_type: str, port: int) -> Union[str, dns.message.Message]:
"""
Sends the input query to the input host port and either DNS response or an error message
:param query_name: Domain name of the query
:param query_type: Record type requested
:param port: The host port to send the query
"""
domain = dns.name.from_text(query_name)
addr = '127.0.0.1'
try:
query = dns.message.make_query(domain, query_type)
# Removes the default Recursion Desired Flag
query.flags = 0
result = dns.query.udp(query, addr, 3, port=port)
return result
except dns.exception.Timeout:
return "No response"
except: # pylint: disable=bare-except
return f'Unexpected error {sys.exc_info()[1]}'
def response_equality_check(response_a: Union[str, dns.message.Message],
response_b: Union[str, dns.message.Message]) -> bool:
"""
Checks whether the two input responses are same or not.
:param response_a: The first response
:param response_b: The second response
Either of the responses can be a string if there was an error during querying.
"""
if type(response_a) != type(response_b):
return False
if isinstance(response_a, str):
return response_a == response_b
if response_a.rcode() != response_b.rcode():
return False
a_flags = dns.flags.to_text(response_a.flags).split()
if 'RA' in a_flags:
a_flags.remove('RA')
b_flags = dns.flags.to_text(response_b.flags).split()
if 'RA' in b_flags:
b_flags.remove('RA')
if a_flags != b_flags:
return False
def check_section(section_a, section_b):
for record in section_a:
if record not in section_b:
return False
for record in section_b:
if record not in section_a:
return False
return True
if not check_section(response_a.question, response_b.question):
return False
if not check_section(response_a.answer, response_b.answer):
return False
if not check_section(response_a.additional, response_b.additional):
return False
# Check authority section only when both the answer sections are non-empty
# as implementations can add SOA/NS records to the authority section
if not (len(response_a.answer) and len(response_b.answer)):
return check_section(response_a.authority, response_b.authority)
return True
def group_responses(responses: List[ResponseType]) -> List[List[ResponseType]]:
"""
Groups (creates a list of lists) responses where in each group (inner list)
all the implementations have the same response.
:param responses: List of responses
"""
groups = [] # type: List[List[ResponseType]]
for response in responses:
found = False
for group in groups:
if response_equality_check(group[0][1], response[1]):
group.append(response)
found = True
break
if not found:
groups.append([response])
return groups
def groups_to_json(groups: List[List[ResponseType]]) -> List[Dict[str, Any]]:
"""
Returns the input grouped responses in a JSON format to output to a file.
:param groups: The list of implementations with the same response
"""
tmp = []
for same_response_group in groups:
servers = ""
for server in same_response_group:
servers += server[0] + " "
group = {}
group["Server/s"] = servers
group["Response"] = same_response_group[0][1] if isinstance(
same_response_group[0][1], str) else same_response_group[0][1].to_text().split('\n')
tmp.append(group)
return tmp
def prepare_containers(zone_file: pathlib.Path,
zone_domain: str,
cid: int,
restart: bool,
implementations: Dict[str, Tuple[bool, int]],
tag: str) -> None:
"""
Either starts new containers or reuses existing containers to prepare the
container to serve the input zone file.
Uses one process for each implementation tested to speedup preparation.
:param zone_file: The path to the zone file
:param zone_domain: The zone origin
:param cid: The unique id for all the containers
:param restart: Whether to load the input zone file in a new container
or reuse the existing container
:param implementations: Map from an implementation to a tuple of two items
- 1. whether to load that implementation container
2. which host port should be mapped to the container port 53
:param tag: Tag of the images to use
"""
process_pool = []
for impl, (check, port) in implementations.items():
if check:
process_pool.append(
Process(target=globals()[impl],
args=(zone_file, zone_domain,
str(cid) + '_' + impl + '_server',
port * cid, restart, tag)))
for process in process_pool:
process.start()
for process in process_pool:
process.join()
def get_queries(zoneid: str,
num_implemetations: int,
directory_path: pathlib.Path,
log_fp: TextIO,
errors: Dict[str, str]) -> List[Dict[str, Any]]:
"""
Returns a list of queries to test againt the zone file with zoneid.
If num_implementations is 1, then it looks for ExpectedResponses directory; otherwise
use Queries directory to get the queries.
:param zoneid: The unique zone identifier
:param num_implementations: The number of implementations being tested
:param directory_path: The path to the directory containing zone files and queries
:param log_fp: The log file pointer
:param errors: A map from zoneid to any error encountered during testing
"""
if num_implemetations == 1:
if not (directory_path / QUERY_RESPONSES).exists():
log_fp.write(
f'{datetime.now()}\tNo {QUERY_RESPONSES} directory with '
'expected responses exists\n')
errors[zoneid] = f'No {QUERY_RESPONSES} directory with expected responses exists'
return []
if not (directory_path / QUERY_RESPONSES / (zoneid + '.json')).exists():
log_fp.write(f'{datetime.now()}\tThere is no {zoneid}.json expected responses file in'
f' {QUERY_RESPONSES} directory\n')
errors[zoneid] = f'There is no {zoneid}.json expected responses file' \
f' in {QUERY_RESPONSES} directory\n'
return []
with open(directory_path / QUERY_RESPONSES / (zoneid + '.json'), 'r') as query_resp_fp:
return json.load(query_resp_fp)
else:
if not (directory_path / QUERIES).exists():
log_fp.write(
f'{datetime.now()}\tThere is no {QUERIES} directory\n')
errors[zoneid] = f'There is no {QUERIES} directory\n'
return []
if not (directory_path / QUERIES / (zoneid + '.json')).exists():
log_fp.write(
f'{datetime.now()}\tThere is no {zoneid}.json queries'
f' file in {QUERIES} directory\n')
errors[zoneid] = f'There is no {zoneid}.json queries file in {QUERIES} directory\n'
return []
with open(directory_path / QUERIES / (zoneid + '.json'), 'r') as query_fp:
return json.load(query_fp)
def run_test(zoneid: str,
parent_directory_path: pathlib.Path,
errors: Dict[str, str],
cid: int,
port_mappings: Dict[str, Tuple[bool, int]],
log_fp: TextIO,
tag: str) -> None:
"""
Runs the tests on the input single zone file.
:param zoneid: The unique zone identifier
:param parent_directory_path: The path to the directory containing zone files and queries
:param errors: A map from zoneid to any error encountered during testing
:param cid: The unique id for all the containers
:param implementations: Map from an implementation to a tuple of two items
- 1. whether to check that implementation 2. which host port
should be mapped to the container port 53
:param log_fp: The log file pointer
:param tag: Tag of the images to use
"""
has_dname = False
zone_domain = ''
with open(parent_directory_path / ZONE_FILES / (zoneid + '.txt'), 'r') as zone_fp:
for line in zone_fp:
if 'SOA' in line:
zone_domain = line.split('\t')[0]
if ' ' in zone_domain:
zone_domain = line.split()[0]
if 'DNAME' in line:
has_dname = True
if not zone_domain:
log_fp.write(f'{datetime.now()}\tSOA not found in {zoneid}\n')
errors[zoneid] = 'SOA not found'
return
implementations = copy.deepcopy(port_mappings)
# Exclude implementations that do not support DNAME type if the zone file has a DNAME record
if has_dname:
implementations['yadifa'] = (
False, implementations['yadifa'][1]) # Yadifa
implementations['trustdns'] = (
False, implementations['trustdns'][1]) # TrustDns
implementations['maradns'] = (
False, implementations['maradns'][1]) # MaraDns
total_impl_tested = sum(x[0] for x in list(implementations.values()))
queries = get_queries(zoneid, total_impl_tested,
parent_directory_path, log_fp, errors)
if not queries:
return
prepare_containers(parent_directory_path / ZONE_FILES /
(zoneid + '.txt'), zone_domain, cid, False, implementations, tag)
differences = []
for query in queries:
qname = query["Query"]["Name"]
qtype = query["Query"]["Type"]
responses = []
for impl, (check, port) in implementations.items():
if check:
respo = querier(qname, qtype, port * int(cid))
# If it is not a proper DNS response, try again with a new container
if not isinstance(respo, dns.message.Message):
single_impl = {}
single_impl[impl] = (True, port)
prepare_containers(parent_directory_path / ZONE_FILES /
(zoneid + '.txt'), zone_domain, cid, True, single_impl, tag)
log_fp.write(f'{datetime.now()}\tRestarted {impl}\'s container while '
f'testing zone {zoneid}\n')
time.sleep(1)
respo = querier(qname, qtype, port * int(cid))
responses.append((impl, respo))
# If there is only one implementation tested, use expected response/s
if len(responses) == 1:
exp_resps = query["Expected Response"]
for exp_res in exp_resps:
responses.append((exp_res["Server/s"],
dns.message.from_text('\n'.join(exp_res["Response"]))))
groups = group_responses(responses)
if len(groups) > 1:
difference = {}
difference["Query Name"] = qname
difference["Query Type"] = qtype
difference["Groups"] = groups_to_json(groups)
differences.append(difference)
if differences:
with open(parent_directory_path / DIFFERENCES / (zoneid + '.json'), 'w') as difference_fp:
json.dump(differences, difference_fp, indent=2)
def run_tests(parent_directory_path: pathlib.Path,
start: int,
end: Optional[int],
input_args: Namespace) -> None:
"""
Runs the tests in the parent directory path against all the implementations in
the input arguments and compares their responses. If a difference in responses
is found, then the responses are outputted as a JSON to the Differences directory.
:param parent_directory_path: The path to the directory containing zone files and queries
:param start: The start index of the tests
:param end: The end index of the tests
:param input_args: The input arguments
"""
errors = {} # type: Dict[str, str]
i = 0
timer = time.time()
sub_timer = time.time()
implementations = get_ports(input_args)
tag = ':oct'
if input_args.latest:
tag = ':latest'
start_containers(input_args.id, implementations, tag)
# Create and dump logs to a file
with open(parent_directory_path / (str(input_args.id) + '_log.txt'), 'w', 1) as log_fp:
for zone in sorted((parent_directory_path / ZONE_FILES).iterdir(),
key=lambda x: int(x.stem))[start:end]:
log_fp.write(f'{datetime.now()}\tChecking zone: {zone.stem}\n')
run_test(zone.stem, parent_directory_path, errors,
input_args.id, implementations, log_fp, tag)
i += 1
if i % 25 == 0:
log_fp.write(
f'{datetime.now()}\tTime taken for {start + i - 25} - {start + i}: '
f'{time.time()-sub_timer}s\n')
sub_timer = time.time()
log_fp.write(
f'{datetime.now()}\tTotal time for checking from {start}-{end if end else i}: '
f'{time.time()-timer}s\n')
log_fp.write("Errors:\n")
log_fp.write(str(errors))
remove_container(input_args.id)
def check_non_negative(value: str) -> int:
"""Check if the input value is non-negative"""
ivalue = int(value)
if ivalue < 0:
raise ArgumentTypeError(f"{value} is an invalid range value")
return ivalue
if __name__ == '__main__':
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter,
description='Runs tests with valid zone files on different '
'implementations. Either compares responses from mulitple '
'implementations with each other or uses a '
'expected response to flag differences '
'(only when one implementation is passed for testing)')
parser.add_argument('-path', metavar='DIRECTORY_PATH', default=SUPPRESS,
help='The path to the directory containing ZoneFiles and '
'either Queries or ExpectedResponses directories.'
'(default: Results/ValidZoneFileTests/)')
parser.add_argument('-id', type=int, default=1, choices=range(1, 6),
help='Unique id for all the containers '
'(useful when running comparison in parallel).')
parser.add_argument('-r', nargs=2, type=check_non_negative, metavar=('START', 'END'),
default=SUPPRESS,
help='The range of tests to compare. (default: All tests)')
parser.add_argument('-b', help='Disable Bind.', action="store_true")
parser.add_argument('-n', help='Disable Nsd.', action="store_true")
parser.add_argument('-k', help='Disable Knot.', action="store_true")
parser.add_argument('-p', help='Disable PowerDns.', action="store_true")
parser.add_argument('-c', help='Disable CoreDns.', action="store_true")
parser.add_argument('-y', help='Disable Yadifa.', action="store_true")
parser.add_argument('-m', help='Disable MaraDns.', action="store_true")
parser.add_argument('-t', help='Disable TrustDns.', action="store_true")
parser.add_argument('-e', help='Disable Technitium.', action="store_true")
parser.add_argument(
'-l', '--latest', help='Test using latest image tag.', action="store_true")
args = parser.parse_args()
if "path" in args:
dir_path = pathlib.Path(args.path)
else:
dir_path = pathlib.Path("Results/ValidZoneFileTests")
if not (dir_path / ZONE_FILES).exists():
sys.exit(
f'The directory {dir_path} does not have ZoneFiles directory')
checked_implementations = (not args.b) + (not args.n) + (not args.k) + \
(not args.p) + (not args.c) + (not args.y) + \
(not args.m) + (not args.t) + (not args.e)
if checked_implementations == 0:
sys.exit('Enable at least one implementation')
if checked_implementations < 2:
if not (dir_path / QUERY_RESPONSES).exists():
sys.exit('Either choose at least two implementations to perform differential testing or'
f' the directory "{dir_path}" should have ExpectedResponses directory')
if not (dir_path / QUERIES).exists() and \
not (dir_path / QUERY_RESPONSES).exists():
sys.exit(
f'There is no Queries or ExpectedResponses directory in "{dir_path}".')
if "r" in args:
START = args.r[0]
END = args.r[1]
else:
START = 0
END = None
(dir_path / DIFFERENCES).mkdir(parents=True, exist_ok=True)
run_tests(dir_path, START, END, args)