-
Notifications
You must be signed in to change notification settings - Fork 0
/
packet-analyzer.py
137 lines (106 loc) · 5.22 KB
/
packet-analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#!/usr/bin/env python3
import socket
import struct
import textwrap
import time
try:
file1 = open('packet-analyzer-header.txt', 'r')
print(' ')
print (file1.read())
file1.close()
except IOError:
print('\nBanner File not found!')
def main():
# create socket
connect = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(3))
print("\n[+] Packet Analyzer is active. Displaying detailed Packet analysis on current interface.")
# infinite loop to sniff packets
while True:
raw_data, addr = connect.recvfrom(65535)
dest_mac, src_mac, protocol, data = ethernet_frame(raw_data)
print('\n\n-------------------------------------------------------------------------------------------------------------------------')
print('\n\n[+] Ethernet Frame =>')
print('\t - Destination MAC : {}, Source MAC : {}, Protocol : {}'.format(dest_mac, src_mac, protocol))
# for ethernet protocol 8 (regular ipv4 traffic)
if protocol == 8:
(version, headerlength, ttl, protocol, src, dest, data) = ipv4_packet(data)
print('\n[+] IPv4 Header =>' )
print('\t - Version : {}, Header Length : {}, TTL : {}'.format(version, headerlength, ttl))
print('\t - Protocol : {}, Source : {}, Destination : {}'.format(protocol, src, dest))
# icmp packet
if protocol == 1:
(type, code, checksum, data) = icmp_packet(data)
print('\n[+] ICMP Packet =>')
print('\t - Type : {}, Code : {}, Checksum : {}'.format(type, code, checksum))
print('\t - Payload : ')
print(multi_line_formatter('\t\t ', data))
# tcp packet
elif protocol == 6:
(src_port, dest_port, sequence, ack, flag_urg, flag_ack, flag_psh, flag_rst, flag_syn, flag_fin, data) = tcp_packet(data)
print('\n[+] TCP Packet =>')
print('\t - Source Port : {}, Destination Port : {}'.format(src_port, dest_port))
print('\t - Sequence : {}, Acknowledgment : {}'.format(sequence, ack))
print('\t - Flags : ')
print('\t\t URG : {}, ACK : {}, PSH : {}, RST : {}, SYN : {}, FIN : {}'.format(flag_urg, flag_ack, flag_psh, flag_rst, flag_syn, flag_fin))
print('\t - Payload : ')
print(multi_line_formatter('\t\t ', data))
# udp packet
elif protocol == 17:
(src_port, dest_port, size, data) = udp_packet(data)
print('\n[+] TCP Packet =>')
print('\t - Source Port : {}, Destination Port : {}, Length : {}'.format(src_port, dest_port, size))
print('\t - Payload : ')
print(multi_line_formatter('\t\t ', data))
# other packets
else:
print('\n[+] Unidentified Packet =>')
print('\t - Payload : ')
print(multi_line_formatter('\t\t ', data))
time.sleep(5)
# unpack ethernet frame (total 14 bytes of sender, receiver and frame length info)
def ethernet_frame(data):
dest_mac, src_mac, protocol = struct.unpack('! 6s 6s H', data[:14])
return mac_formatter(dest_mac), mac_formatter(src_mac), socket.htons(protocol), data[14:]
# format mac address to human readable format (Ex: AA:BB:CC:DD:EE:FF)
def mac_formatter(bytes_addr):
bytes_str = map('{:02x}'.format, bytes_addr)
mac_addr = ':'.join(bytes_str).upper()
return mac_addr
# unpack ipv4 packet (header and payload)
def ipv4_packet(data):
version_headerlength = data[0]
version = version_headerlength >> 4
headerlength = (version_headerlength & 15) * 4
ttl, protocol, src, dest = struct.unpack('! 8x B B 2x 4s 4s', data[:20])
return version, headerlength, ttl, protocol, ipv4_formatter(src), ipv4_formatter(dest), data[headerlength:]
# format ipv4 address to human readable format (Ex: 255.255.255.255)
def ipv4_formatter(addr):
return '.'.join(map(str, addr))
# unpack icmp packet
def icmp_packet(data):
type, code, checksum = struct.unpack('! B B H', data[:4])
return type, code, checksum, data[4:]
# unpack tcp packet
def tcp_packet(data):
(src_port, dest_port, sequence, ack, offset_reserved_tcpflags) = struct.unpack('! H H L L H', data[:14])
offset = (offset_reserved_tcpflags >> 12) * 4
flag_urg = (offset_reserved_tcpflags & 32) >> 5
flag_ack = (offset_reserved_tcpflags & 16) >> 4
flag_psh = (offset_reserved_tcpflags & 8) >> 3
flag_rst = (offset_reserved_tcpflags & 4) >> 2
flag_syn = (offset_reserved_tcpflags & 2) >> 1
flag_fin = offset_reserved_tcpflags & 1
return src_port, dest_port, sequence, ack, flag_urg, flag_ack, flag_psh, flag_rst, flag_syn, flag_fin, data[offset:]
# unpack udp packet
def udp_packet(data):
src_port, dest_port, size = struct.unpack('! H H 2x H', data[:8])
return src_port, dest_port, size, data[8:]
# format multi-line data
def multi_line_formatter(prefix, string, size=80):
size -= len(prefix)
if isinstance(string, bytes):
string = ''.join(r'\x{:02x}'.format(byte) for byte in string)
if size % 2:
size -= 1
return '\n'.join([prefix + line for line in textwrap.wrap(string, size)])
main()