-
-
Notifications
You must be signed in to change notification settings - Fork 60
/
test_network_post_processing.py
128 lines (97 loc) · 3.88 KB
/
test_network_post_processing.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Copyright 2017 Fedele Mantuano (https://www.linkedin.com/in/fmantuano/)
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
import os
import six
import unittest
import simplejson as json
try:
from collections import ChainMap
except ImportError:
from chainmap import ChainMap
from context import DEFAULTS
OPTIONS = ChainMap(os.environ, DEFAULTS)
logging.getLogger().addHandler(logging.NullHandler())
class TestPostProcessing(unittest.TestCase):
def setUp(self):
self.ipaddress = "8.8.8.8"
@unittest.skipIf(OPTIONS["VIRUSTOTAL_ENABLED"].capitalize() == "False",
"VirusTotal test skipped: "
"set env variable 'VIRUSTOTAL_ENABLED' to True")
def test_virustotal(self):
"""Test add VirusTotal processing."""
from context import networks
virustotal = networks.virustotal
conf = {"enabled": True,
"api_key": OPTIONS["VIRUSTOTAL_APIKEY"]}
results = {}
self.assertFalse(results)
virustotal(conf, self.ipaddress, results)
self.assertTrue(results)
self.assertIn("virustotal", results)
self.assertIsInstance(results["virustotal"], six.text_type)
r = json.loads(results["virustotal"])
self.assertTrue(r)
self.assertIsInstance(r, dict)
@unittest.skipIf(OPTIONS["SHODAN_ENABLED"].capitalize() == "False",
"Shodan.io test skipped: "
"set env variable 'SHODAN_ENABLED' to True")
def test_shodan(self):
"""Test add Shodan processing."""
from context import networks
shodan = networks.shodan
# Complete parameters
conf = {"enabled": True,
"api_key": OPTIONS["SHODAN_APIKEY"]}
results = {}
self.assertFalse(results)
shodan(conf, self.ipaddress, results)
self.assertIn("shodan", results)
self.assertIsInstance(results["shodan"], six.text_type)
r = json.loads(results["shodan"])
self.assertTrue(r)
self.assertIsInstance(r, dict)
self.assertIn("data", r)
results = {}
shodan(conf, "8.8.8", results)
self.assertFalse(results)
@unittest.skipIf(
OPTIONS["SHODAN_ENABLED"].capitalize() == "False" or OPTIONS[
"VIRUSTOTAL_ENABLED"].capitalize() == "False",
"Complete post processing test skipped: "
"set env variables 'SHODAN_ENABLED' and "
"'VIRUSTOTAL_ENABLED' to True")
def test_processors(self):
"""Test all post processing."""
from context import networks
from operator import itemgetter
p_ordered = [i[0] for i in sorted(
networks.processors, key=itemgetter(1))]
conf = {
"virustotal": {"enabled": True,
"api_key": OPTIONS["VIRUSTOTAL_APIKEY"]},
"shodan": {"enabled": True,
"api_key": OPTIONS["SHODAN_APIKEY"]}}
results = {}
self.assertFalse(results)
for p in p_ordered:
p(conf[p.__name__], self.ipaddress, results)
self.assertTrue(results)
self.assertIn("shodan", results)
self.assertIsInstance(results["shodan"], six.text_type)
self.assertIn("virustotal", results)
self.assertIsInstance(results["virustotal"], six.text_type)
if __name__ == '__main__':
unittest.main(verbosity=2)