
Commit

1.0-beta
- Bug fixes
- Scan URLs from a file (-i)
- JSON output to a file (-o)
- Custom HTTP header support (--headers)
- Switch to suppress helpful messages (-q); example invocations below
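
Illustrative invocations of the new options (file names and header values are placeholders, not from this commit):

    python3 corsy.py -u https://example.com
    python3 corsy.py -i targets.txt -t 10 -o report.json
    python3 corsy.py -u https://example.com --headers "Cookie: session=xyz\nUser-Agent: Mozilla/5.0" -q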
s0md3v authored Jan 16, 2020
1 parent 2e145e6 commit f8e5747
Showing 5 changed files with 261 additions and 127 deletions.
16 changes: 4 additions & 12 deletions core/requester.py
@@ -3,18 +3,10 @@
 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

-headers = {
-    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:70.0) Gecko/20100101 Firefox/70.0',
-    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
-    'Accept-Language': 'en-US,en;q=0.5',
-    'Accept-Encoding': 'gzip',
-    'DNT': '1',
-    'Connection': 'close',
-}
-
-
-def requester(url, scheme, origin):
+def requester(url, scheme, headers, origin):
     headers['Origin'] = scheme + origin
     response = requests.get(url, headers=headers, verify=False).headers
-    return response.get('Access-Control-Allow-Origin', None)
+    for key, value in response.items():
+        if key.lower() == 'access-control-allow-origin':
+            return response
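
For context, a minimal sketch of the new calling contract, assuming the repository root is on the import path (the target URL, header value, and spoofed origin are illustrative):

    from core.requester import requester

    # The caller now owns the header dict; requester() sets the Origin header
    # (scheme and origin are concatenated verbatim) and returns the full
    # response-header mapping only when an ACAO header is present, else None.
    response_headers = requester('https://example.com', 'https://', {'User-Agent': 'Corsy'}, 'evil.example')
    if response_headers:
        print(response_headers['access-control-allow-origin'])
        print(response_headers.get('access-control-allow-credentials'))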
129 changes: 81 additions & 48 deletions core/tests.py
@@ -1,59 +1,92 @@
-import time
+import sys
+import time

-from core.utils import host
 from core.requester import requester
+from core.utils import host, load_json
+
+details = load_json(sys.path[0] + '/db/details.json')

-def passive_tests(url, acao_header):
+
+def passive_tests(url, headers):
     root = host(url)
+    acao_header, acac_header = headers['access-control-allow-origin'], headers.get('access-control-allow-credentials', None)
     if acao_header == '*':
-        return 'Wildcard value'
+        info = details['wildcard value']
+        info['acao header'] = acao_header
+        info['acac header'] = acac_header
+        return {url : info}
     if root:
-        if root != host(acao_header):
-            print(acao_header)
-            return 'Third party allowed'
-        elif url.startswith('http://'):
-            return 'HTTP origin allowed'
-        else:
-            return None
-    else:
-        return 'Invalid value'
+        if host(acao_header) and root != host(acao_header):
+            info = details['third party allowed']
+            info['acao header'] = acao_header
+            info['acac header'] = acac_header
+            return {url : info}
+

-def active_tests(url, root, scheme, delay):
-    acao_header = requester(url, scheme, 'example.com')
-    if acao_header and acao_header == (scheme + 'example.com'):
-        return 'Origin reflected'
+def active_tests(url, root, scheme, header_dict, delay):
+    headers = requester(url, scheme, header_dict, 'example.com')
+    if headers:
+        acao_header, acac_header = headers['access-control-allow-origin'], headers.get('access-control-allow-credentials', None)
+        if acao_header and acao_header == (scheme + 'example.com'):
+            info = details['origin reflected']
+            info['acao header'] = acao_header
+            info['acac header'] = acac_header
+            return {url : info}
+        elif not acao_header:
+            return
     time.sleep(delay)

-    acao_header = requester(url, scheme, root + '.example.com')
+    headers = requester(url, scheme, header_dict, root + '.example.com')
+    acao_header, acac_header = headers['access-control-allow-origin'], headers.get('access-control-allow-credentials', None)
     if acao_header and acao_header == (scheme + root + '.example.com'):
-        return 'Post-domain wildcard'
+        info = details['post-domain wildcard']
+        info['acao header'] = acao_header
+        info['acac header'] = acac_header
+        return {url : info}
     time.sleep(delay)

-    acao_header = requester(url, scheme, 'd3v' + root)
+    headers = requester(url, scheme, header_dict, 'd3v' + root)
+    acao_header, acac_header = headers['access-control-allow-origin'], headers.get('access-control-allow-credentials', None)
     if acao_header and acao_header == (scheme + 'd3v' + root):
-        return 'Pre-domain wildcard'
+        info = details['pre-domain wildcard']
+        info['acao header'] = acao_header
+        info['acac header'] = acac_header
+        return {url : info}
     time.sleep(delay)

-    acao_header = requester(url, '', 'null')
+    headers = requester(url, '', header_dict, 'null')
+    acao_header, acac_header = headers['access-control-allow-origin'], headers.get('access-control-allow-credentials', None)
     if acao_header and acao_header == 'null':
-        return 'Null origin allowed'
+        info = details['null origin allowed']
+        info['acao header'] = acao_header
+        info['acac header'] = acac_header
+        return {url : info}
     time.sleep(delay)

-    acao_header = requester(url, scheme, root + '%60.example.com')
+    headers = requester(url, scheme, header_dict, root + '%60.example.com')
+    acao_header, acac_header = headers['access-control-allow-origin'], headers.get('access-control-allow-credentials', None)
     if acao_header and '`.example.com' in acao_header:
-        return 'Broken parser'
+        info = details['broken parser']
+        info['acao header'] = acao_header
+        info['acac header'] = acac_header
+        return {url : info}
+    time.sleep(delay)

     if root.count('.') > 1:
         spoofed_root = root.replace('.', 'x', 1)
-        acao_header = requester(url, scheme, spoofed_root)
+        headers = requester(url, scheme, header_dict, spoofed_root)
+        acao_header, acac_header = headers['access-control-allow-origin'], headers.get('access-control-allow-credentials', None)
         if acao_header and host(acao_header) == spoofed_root:
-            return 'Unescaped regex'
+            info = details['unescaped regex']
+            info['acao header'] = acao_header
+            info['acac header'] = acac_header
+            return {url : info}
         time.sleep(delay)

-    acao_header = requester(url, 'http', root)
+    headers = requester(url, 'http', header_dict, root)
+    acao_header, acac_header = headers['access-control-allow-origin'], headers.get('access-control-allow-credentials', None)
     if acao_header and acao_header.startswith('http://'):
-        return 'HTTP origin allowed'
+        info = details['http origin allowed']
+        info['acao header'] = acao_header
+        info['acac header'] = acac_header
+        return {url : info}
     else:
-        return passive_tests(url, acao_header)
+        return passive_tests(url, headers)
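
tests.py now looks findings up in db/details.json (the fifth changed file, not rendered above) by lowercase class name and attaches the observed headers. A hypothetical entry, limited to the fields the code actually reads; all values are placeholders:

    "origin reflected": {
        "class": "...",
        "description": "...",
        "severity": "...",
        "exploitation": "..."
    }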
58 changes: 58 additions & 0 deletions core/utils.py
@@ -1,11 +1,69 @@
+import os
 import tld
 import json
+import re
+import tempfile


 def host(string):
     if string and '*' not in string:
         return tld.get_fld(string, fix_protocol=True, fail_silently=True)


 def load_json(file):
     with open(file) as f:
         return json.load(f)
+
+
+def format_result(result):
+    new_result = {}
+    for each in result:
+        if each:
+            for i in each:
+                new_result[i] = each[i]
+    return new_result
+
+
+def create_url_list(target_url, inp_file):
+    urls = []
+    if inp_file:
+        with open(inp_file, 'r') as file:
+            for line in file:
+                if line.startswith(('http://', 'https://')):
+                    urls.append(line.rstrip('\n'))
+    if target_url and target_url.startswith(('http://', 'https://')):
+        urls.append(target_url)
+    return urls
+
+
+def prompt(default=None):
+    editor = 'nano'
+    with tempfile.NamedTemporaryFile(mode='r+') as tmpfile:
+        if default:
+            tmpfile.write(default)
+            tmpfile.flush()
+
+        child_pid = os.fork()
+        is_child = child_pid == 0
+
+        if is_child:
+            os.execvp(editor, [editor, tmpfile.name])
+        else:
+            os.waitpid(child_pid, 0)
+            tmpfile.seek(0)
+            return tmpfile.read().strip()
+
+
+def extractHeaders(headers):
+    headers = headers.replace('\\n', '\n')
+    sorted_headers = {}
+    matches = re.findall(r'(.*?):\s(.*)', headers)
+    for match in matches:
+        header = match[0]
+        value = match[1]
+        try:
+            if value[-1] == ',':
+                value = value[:-1]
+            sorted_headers[header] = value
+        except IndexError:
+            pass
+    return sorted_headers
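
A quick sketch of the new header parsing, assuming it is run from the repository root (the header values are made up):

    from core.utils import extractHeaders

    # A literal '\n' in the raw string (as typed on the command line) becomes
    # a real newline; each 'Name: value' line then becomes a dict entry.
    raw = 'Cookie: session=xyz\\nUser-Agent: Mozilla/5.0'
    print(extractHeaders(raw))
    # {'Cookie': 'session=xyz', 'User-Agent': 'Mozilla/5.0'}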
113 changes: 77 additions & 36 deletions corsy.py
@@ -1,54 +1,95 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-

 import sys
+import json
 import argparse

-from core.utils import load_json, host
 from core.tests import active_tests
-from core.colors import white, green, info, bad, good, grey, end
-
-try:
-    from urllib.parse import urlparse
-except ImportError:
-    from urlparse import urlparse
+from core.utils import host, prompt, format_result, create_url_list, extractHeaders
+from core.colors import bad, end, red, good, grey, info, green, white

 print('''
-%sCORSY %s{%sv0.2-beta%s}%s
+%sCORSY %s{%sv1.0-beta%s}%s
 ''' % (green, white, grey, white, end))

+try:
+    import concurrent.futures
+    from urllib.parse import urlparse
+except ImportError:
+    print('%s corsy needs Python > 3.4 to run.' % bad)
+    quit()
+
 parser = argparse.ArgumentParser()
-parser.add_argument('-u', help='target url', dest='url')
+parser.add_argument('-u', help='target url', dest='target')
+parser.add_argument('-o', help='json output file', dest='json_file')
+parser.add_argument('-i', help='input file urls/subdomains', dest='inp_file')
+parser.add_argument('-t', help='thread count', dest='threads', type=int, default=2)
 parser.add_argument('-d', help='request delay', dest='delay', type=float, default=0)
+parser.add_argument('-q', help='don\'t print help tips', dest='quiet', action='store_true')
+parser.add_argument('--headers', help='add headers', dest='header_dict', nargs='?', const=True)
 args = parser.parse_args()

-target_url = args.url
 delay = args.delay
+quiet = args.quiet
+target = args.target
+threads = args.threads
+inp_file = args.inp_file
+json_file = args.json_file
+header_dict = args.header_dict
+
+if type(header_dict) == bool:
+    header_dict = extractHeaders(prompt())
+elif type(header_dict) == str:
+    header_dict = extractHeaders(header_dict)
+else:
+    header_dict = {
+        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:70.0) Gecko/20100101 Firefox/70.0',
+        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
+        'Accept-Language': 'en-US,en;q=0.5',
+        'Accept-Encoding': 'gzip',
+        'DNT': '1',
+        'Connection': 'close',
+    }
+
+urls = create_url_list(target, inp_file)

-def cors(target, delay, scheme=False):
+
+def cors(target, header_dict, delay):
     url = target
-    if not target.startswith(('http://', 'https://')):
-        url = scheme + '://' + url
     root = host(url)
     parsed = urlparse(url)
-    netloc, scheme = parsed.netloc, parsed.scheme
+    netloc = parsed.netloc
+    scheme = parsed.scheme
     url = scheme + '://' + netloc
-    active = active_tests(url, root, scheme, delay)
-    return active
-
-details = load_json('./db/details.json')
+    return active_tests(url, root, scheme, header_dict, delay)

-if target_url:
-    if target_url.startswith(('http://', 'https://')):
-        result = cors(target_url, delay)
-        if result:
-            print('%s Misconfiguration found!' % good)
-            print('%s Title: %s' % (info, result))
-            print('%s Description: %s' % (info, details[result.lower()]['Description']))
-            print('%s Severity: %s' % (info, details[result.lower()]['Severity']))
-            print('%s Exploitation: %s' % (info, details[result.lower()]['Exploitation']))
-        else:
-            print('%s No misconfiguration found.' % bad)
-    else:
-        print('%s Please use https://example.com not example.com' % bad)
+
+if urls:
+    print('%s Estimated scan time: %i secs' % (info, round(len(urls) * 1.75)))
+    results = []
+    threadpool = concurrent.futures.ThreadPoolExecutor(max_workers=threads)
+    futures = (threadpool.submit(cors, url, header_dict, delay) for url in urls)
+    for each in concurrent.futures.as_completed(futures):
+        result = each.result()
+        results.append(result)
+        if result:
+            for i in result:
+                print('%s URL: %s' % (good, i))
+                print(' %s-%s Class: %s' % (green, end, result[i]['class']))
+                if not quiet:
+                    print(' %s-%s Description: %s' % (green, end, result[i]['description']))
+                    print(' %s-%s Severity: %s' % (green, end, result[i]['severity']))
+                    print(' %s-%s Exploitation: %s' % (green, end, result[i]['exploitation']))
+                print(' %s-%s ACAO Header: %s' % (green, end, result[i]['acao header']))
+                print(' %s-%s ACAC Header: %s\n' % (green, end, result[i]['acac header']))
+    results = format_result(results)
+    if results:
+        if json_file:
+            with open(json_file, 'w+') as file:
+                json.dump(results, file, indent=4)
+    else:
+        print('%s No misconfigurations found.' % bad)
 else:
-    print('\n' + parser.format_help().lower())
+    print('%s No valid URLs to test.' % bad)
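
When -o is given, format_result() merges the per-URL findings and json.dump writes a file of this shape (the URL and field values are placeholders; 'acao header' and 'acac header' are taken from the live response):

    {
        "https://example.com": {
            "class": "...",
            "description": "...",
            "severity": "...",
            "exploitation": "...",
            "acao header": "https://example.com",
            "acac header": "true"
        }
    }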