-
Notifications
You must be signed in to change notification settings - Fork 1
/
portscan.py
136 lines (110 loc) · 4.2 KB
/
portscan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import shelve
import os
import sys
import socket
import datetime
from datetime import datetime
from servicer import check_output, get_service_name
PORTS_TO_SCAN = [22, 25, 69, 80, 4000, 7777, 7890, 25565, ] # range(1, 65535)
def scan(target):
try:
status = {"target": target}
# scan ports
for port in PORTS_TO_SCAN:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.setdefaulttimeout(1)
result = s.connect_ex((target, port))
if result == 0:
print(f"found {port} open ")
s.sendall("\n".encode())
try:
status[port] = s.recv(4096).decode("utf-8")
# print(status[port])
except socket.error:
status[port] = None
print("no response from server")
service_id, version = check_output(status[port])
status[port] = {"version": version, "serviceID": service_id,
"service": get_service_name(service_id), "response": status[port], "new": False}
s.close()
db = shelve.open(get_path())
print("Scan complete")
if target not in db:
db[target] = {"last": {}}
port_history = db[target]["last"]
ports_old = [k for k in port_history.keys() if isinstance(k, int)]
ports_new = [k for k in status.keys() if isinstance(k, int)]
print(f"old ports: {ports_old}")
print(f"new ports: {ports_new}")
# TODO check only scanned ports
unchanged_ports = [port for port in ports_new if port in ports_old]
for port in unchanged_ports:
if 'removed' in port_history[port]:
continue
if port_history[port]['version'] != status[port]['version']:
print(
f"Service {port_history[port]['version']} changed to {status[port]['version']}")
status[port]["updated"] = port_history[port]
# TODO check only scanned ports
removed_ports = [port for port in ports_old if port not in ports_new]
for port in removed_ports:
print(f"Removed {port}, {port_history[port]['version']}")
status[port] = {"removed": True}
# TODO check only scanned ports
new_ports = [port for port in ports_new if port not in ports_old]
for port in new_ports:
print(f"Added {port}, {status[port]['version']}")
status[port]["new"] = True
status["summary"] = {"open": len(
ports_new), "new": len(new_ports)}
# save current scan and overwrite the latest scan
now_ts = datetime.now().timestamp()
x = db[target]
x[str(now_ts)] = status
x["last"] = status
db[target] = x
return target + "/" + str(now_ts)
except socket.gaierror:
print("\n Hostname Could Not Be Resolved")
sys.exit()
finally:
db.close()
def get_last(target):
db = shelve.open(get_path(), flag='c')
try:
if target in db:
return db[target]["last"]
return {}
finally:
db.close()
def get_scans():
db = shelve.open(get_path(), flag='c')
try:
x = ([{"id": scan, "target": target, "summary": db[target][scan]["summary"]}
for target in db for scan in db[target] if scan != "last"])
x.sort(key=lambda x: x["id"], reverse=True)
return x
finally:
db.close()
def get_scans_from_target(target):
db = shelve.open(get_path(), flag='c')
try:
x = ([{"id": scan, "target": target, "summary": db[target][scan]["summary"]}
for scan in db[target] if scan != "last"])
x.sort(key=lambda x: x["id"], reverse=True)
return x
finally:
db.close()
def get_scan(timestamp, target):
return shelve.open(get_path(), flag='c')[target][timestamp]
def get_targets():
return list(shelve.open(get_path()).keys())
def get_path() -> str:
return "port_data"
if __name__ == "__main__":
# print(scan())
# print(get_last())
print(get_scans())
# print(get_targets())
# print(get_scan('1616876222.176661'))
# print(get_scans("localhost"))