forked from Asuri-Team/pwn-sandbox
-
Notifications
You must be signed in to change notification settings - Fork 7
/
analyser.py
164 lines (125 loc) · 4.35 KB
/
analyser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/env python2
import hexdump as hd
import struct
import argparse
import os
import glob
class Package(object):
    """A labelled chunk of captured traffic.

    ``type`` is the label printed in front of the payload (``from_tuple``
    produces ``"send"`` or ``"receive"``); ``content`` is the raw payload.
    """

    # Class-level defaults; __init__ overwrites both on every instance.
    type = None
    content = None

    def __init__(self, type, content):
        self.type = type
        self.content = content

    def _byte_values(self):
        """Return the payload as a list of integer values.

        ``bytes``/``bytearray`` payloads are converted through ``bytearray``
        because iterating ``bytes`` yields ints on Python 3 (where ``ord``
        would raise ``TypeError``) but 1-character strings on Python 2.
        Any other payload is treated as text and converted with ``ord``.
        """
        content = self.content
        if isinstance(content, (bytes, bytearray)):
            return list(bytearray(content))
        return [ord(ch) for ch in content]

    def print_hex(self):
        """Print the label, then pass the payload to ``hd.hexdump``."""
        print("%s:" % (self.type))
        hd.hexdump(self.content)

    def print_repr(self):
        """Print ``<type> = <repr of payload>`` on one line."""
        print("%s = %s" % (self.type, repr(self.content)))

    def print_c_array(self):
        """Print the payload as a brace-enclosed list of ``0x..`` values.

        A new row is started before every 8th value. The trailing ", "
        of the last value is removed before the closing brace.
        """
        pieces = []
        for index, value in enumerate(self._byte_values()):
            if index % 8 == 0:
                pieces.append("\n ")
            pieces.append("0x%02x, " % value)
        # Collect fragments and join once instead of growing a string.
        body = "".join(pieces).strip(" ").strip(",")
        body += "\n"
        print("%s = {%s};" % (self.type, body))

    @staticmethod
    def from_tuple(tuple):
        """Build a Package from an ``(fd, payload)`` pair.

        The label is ``"send"`` when the first element equals 1 and
        ``"receive"`` for every other value.
        """
        return Package("send" if tuple[0] == 1 else "receive", tuple[1])
class FdReader(object):
    """Reader for a traffic capture file.

    The file is parsed as a sequence of records. Each record starts with
    a 4-byte big-endian unsigned header: the top 8 bits are the file
    descriptor number and the low 24 bits are the payload length. That
    many payload bytes follow the header.

    Iterating over a reader parses whatever remains unread in the file
    and yields ``Package`` objects in which consecutive records sharing
    a descriptor have been concatenated. Usable as a context manager;
    leaving the ``with`` block closes the file.
    """

    def __init__(self, filename):
        # Binary mode: the capture holds raw bytes, so text mode would
        # apply newline translation / decoding and corrupt the payloads.
        self._file = open(filename, "rb")

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self._file.close()

    def _readall(self):
        """Parse the file into ``self.content`` as ``(fd, payload)`` tuples.

        Raises:
            ValueError: if a header or a payload is truncated.
        """
        self.content = []
        data = self._file.read()
        total = len(data)
        offset = 0
        # Walk the buffer by offset rather than re-slicing the remainder
        # after every record, which copied the tail each time.
        while offset < total:
            if total - offset < 4:
                raise ValueError("Length not enough, data corrupted!")
            header = struct.unpack_from("!I", data, offset)[0]
            fd = header >> 24
            length = header & 0x00ffffff
            offset += 4
            if total - offset < length:
                raise ValueError("Length not enough, data corrupted!")
            self.content.append((fd, data[offset:offset + length]))
            offset += length

    def _merge(self):
        """Collapse runs of same-fd records into one ``Package`` each.

        Replaces ``self.content`` (a list of ``(fd, payload)`` tuples)
        with a list of ``Package`` objects. An empty record list yields
        an empty list rather than a single empty package.
        """
        merged = []
        current_fd = None
        chunks = []
        for fd, data in self.content:
            if chunks and fd != current_fd:
                merged.append(
                    Package.from_tuple((current_fd, b"".join(chunks))))
                chunks = []
            current_fd = fd
            chunks.append(data)
        if chunks:
            # Flush the final run; skipped entirely when nothing was read.
            merged.append(Package.from_tuple((current_fd, b"".join(chunks))))
        self.content = merged

    def __iter__(self):
        self._readall()
        self._merge()
        return iter(self.content)
def parse_arg(argv=None):
    """Parse the analyser's command-line options.

    Args:
        argv: optional list of argument strings. When omitted (``None``),
            argparse reads the process command line, which is what the
            original zero-argument call did.

    Returns:
        The argparse namespace with attributes ``format`` (one of
        ``"hexdump"``, ``"python"``, ``"c"``; default ``"hexdump"``),
        ``s`` (bool), ``search`` (string or ``None``) and ``path``.
    """
    parser = argparse.ArgumentParser(description="This is a simple traffic analyser for pwn-sandbox.")
    parser.add_argument("-f", "--format", type=str, action="store", help="specify output format.", choices=("hexdump", "python", "c"), default="hexdump")
    parser.add_argument("-s", action="store_true", help="perform auto search (execve and clone).")
    parser.add_argument("-S", "--search", help="search syscall with given string.")
    parser.add_argument("path", help="log path or traffic file generated by pwn-sandbox.")
    return parser.parse_args(argv)
def print_file(path, fmt):
    """Print every package of the traffic file at ``path``.

    ``fmt`` picks the printer: ``"hexdump"`` -> ``print_hex``,
    ``"python"`` -> ``print_repr``, ``"c"`` -> ``print_c_array``.
    Any other value still reads the file but prints nothing.
    """
    printers = (
        ("hexdump", "print_hex"),
        ("python", "print_repr"),
        ("c", "print_c_array"),
    )
    # Resolve the printer name once; it does not depend on the package.
    method_name = None
    for label, candidate in printers:
        if fmt == label:
            method_name = candidate
            break

    with FdReader(path) as reader:
        for package in reader:
            if method_name is not None:
                getattr(package, method_name)()
def parse_file(args):
    """Print the single traffic file ``args.path`` in ``args.format``."""
    target, fmt = args.path, args.format
    print_file(target, fmt)
def _keywords_from_args(args):
    """Return the search keywords requested through ``-s`` and ``-S``."""
    keywords = []
    if args.s:
        keywords.extend(["execve", "clone"])
    if args.search:
        for term in args.search.split("|"):
            term = term.strip()
            # An empty term is a substring of every line and would make
            # every log match, so it is dropped.
            if term:
                keywords.append(term)
    return keywords


def _syscall_log_matches(log_path, keywords):
    """Return True if any line of ``log_path`` contains any keyword."""
    with open(log_path, "r") as log:
        for line in log:
            if any(kw in line for kw in keywords):
                return True
    return False


def parse_dir(args):
    """Print the traffic files found in the directory ``args.path``.

    With keywords (from ``args.s`` and/or ``args.search``, the latter
    split on ``|``), each ``*-syscall`` file in the directory is scanned
    and, when a line contains a keyword, the file of the same name ending
    in ``-std`` is selected. Without keywords every ``*-std`` file is
    selected. Selected files are printed in sorted order via
    ``print_file`` using ``args.format``.
    """
    path = args.path
    keywords = _keywords_from_args(args)
    if keywords:
        suffix = "-syscall"
        files = []
        # glob already returns paths that include ``path``; joining them
        # with ``path`` again would double a relative directory.
        for log_path in sorted(glob.glob(os.path.join(path, "*" + suffix))):
            if _syscall_log_matches(log_path, keywords):
                # Swap only the trailing suffix so a "-syscall" elsewhere
                # in the path (e.g. a directory name) is left alone.
                files.append(log_path[:-len(suffix)] + "-std")
    else:
        files = sorted(glob.glob(os.path.join(path, "*-std")))
    for f in files:
        print("%s:" % (f))
        print_file(f, args.format)
        print("")
def main():
    """Command-line entry point.

    Parses the arguments, then dumps a single traffic file when the given
    path is a regular file, or walks it with ``parse_dir`` when it is a
    directory. For any other path a short message is printed instead.
    """
    args = parse_arg()
    target = args.path
    if os.path.isfile(target):
        handler = parse_file
    elif os.path.isdir(target):
        handler = parse_dir
    else:
        print("Path not exists.")
        return
    handler(args)
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()