Skip to content

Commit

Permalink
Merge branch 'develop'
Browse files Browse the repository at this point in the history
  • Loading branch information
LoRexxar committed Dec 27, 2017
2 parents b3adbb9 + 1c5b544 commit 5d462f3
Show file tree
Hide file tree
Showing 13 changed files with 167 additions and 37 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,8 @@ result/*

# real_test
tests/real_test/*

# new rule for test
rules/secret/*
!rules/secret/demo.py
!rules/secret/__init.py
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ Cobra-W是从Cobra2.0发展而来的分支,着眼于白帽子使用的白盒
- Cobra-W 0.7.3
- 修复了部分处理ast的bug
- 完善了cvi-1009
- 2017-12-27
- Cobra-W 0.8
- 全新的secret机制,自定义解决不同cms的过滤不一,导致的扫描误差大问题
- 更新修复了判断是否被修复的问题

# README(开发文档)

Expand Down
5 changes: 3 additions & 2 deletions cobra/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ def main():
parser_group_scan.add_argument('-f', '--format', dest='format', action='store', default='csv', metavar='<format>', choices=['html', 'json', 'csv', 'xml'], help='vulnerability output format (formats: %(choices)s)')
parser_group_scan.add_argument('-o', '--output', dest='output', action='store', default='', metavar='<output>', help='vulnerability output STREAM, FILE')
parser_group_scan.add_argument('-r', '--rule', dest='special_rules', action='store', default=None, metavar='<rule_id>', help='specifies rules e.g: 1000, 1001')
parser_group_scan.add_argument('-s', '--secret', dest='secret_name', action='store', default=None, metavar='<secret_name>', help='secret repair function e.g: wordpress')
parser_group_scan.add_argument('-d', '--debug', dest='debug', action='store_true', default=False, help='open debug mode')

args = parser.parse_args()
Expand All @@ -53,7 +54,7 @@ def main():
logger.setLevel(logging.DEBUG)
logger.debug('[INIT] set logging level: debug')

if args.target is '' and args.output is '':
if args.target is '' and args.output is '':
parser.print_help()
exit()

Expand All @@ -66,7 +67,7 @@ def main():
}
Running(a_sid).status(data)

cli.start(args.target, args.format, args.output, args.special_rules, a_sid)
cli.start(args.target, args.format, args.output, args.special_rules, a_sid, args.secret_name)

t2 = time.time()
logger.info('[INIT] Done! Consume Time:{ct}s'.format(ct=t2 - t1))
Expand Down
3 changes: 2 additions & 1 deletion cobra/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
__issue_page__ = 'https://github.com/LoRexxar/Cobra-W/issues/new'
__python_version__ = sys.version.split()[0]
__platform__ = platform.platform()
__version__ = '0.7.3'
__version__ = '0.8'
__author__ = 'LoRexxar'
__author_email__ = '[email protected]'
__license__ = 'MIT License'
Expand All @@ -25,6 +25,7 @@
__epilog__ = """Usage:
python {m} -t {td}
python {m} -t {td} -r 1000, 1001
python {m} -t {td} -s wordpress
python {m} -t {td} -f json -o /tmp/report.json
python {m} -t {td} --debug
""".format(m='cobra.py', td='tests/vulnerabilities')
6 changes: 3 additions & 3 deletions cobra/cast.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
class CAST(object):
languages = ['php', 'java']

def __init__(self, rule, target_directory, file_path, line, code, files=None, rule_class=None, ast=False):
def __init__(self, rule, target_directory, file_path, line, code, files=None, rule_class=None, repair_functions=[]):
self.target_directory = target_directory
self.data = []
self.rule = rule
Expand All @@ -37,7 +37,7 @@ def __init__(self, rule, target_directory, file_path, line, code, files=None, ru
self.param_value = None
self.language = None
self.sr = rule_class
self.ast = ast
self.repair_functions = repair_functions
for language in self.languages:
if self.file_path[-len(language):].lower() == language:
self.language = language
Expand Down Expand Up @@ -237,7 +237,7 @@ def is_controllable_param(self):

logger.debug("[Deep AST] Start AST for param {param_name}".format(param_name=param_name))

_is_co, _cp, expr_lineno = anlysis_params(param_name, param_content, self.file_path, self.line, self.sr.vul_function)
_is_co, _cp, expr_lineno = anlysis_params(param_name, param_content, self.file_path, self.line, self.sr.vul_function, self.repair_functions)

if _is_co == 1:
logger.debug("[AST] Is assign string: `Yes`")
Expand Down
5 changes: 3 additions & 2 deletions cobra/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ def get_sid(target, is_a_sid=False):
return sid.lower()


def start(target, formatter, output, special_rules, a_sid=None):
def start(target, formatter, output, special_rules, a_sid=None, secret_name=None):
"""
Start CLI
:param secret_id: secret id or name?
:param target: File, FOLDER, GIT
:param formatter:
:param output:
Expand Down Expand Up @@ -86,7 +87,7 @@ def start(target, formatter, output, special_rules, a_sid=None):
# scan
scan(target_directory=target_directory, a_sid=a_sid, s_sid=s_sid, special_rules=pa.special_rules,
language=main_language, framework=main_framework, file_count=file_count, extension_count=len(files),
files=files)
files=files, secret_name=secret_name)
except KeyboardInterrupt as e:
logger.critical("[!] KeyboardInterrupt, exit...")
exit()
Expand Down
59 changes: 41 additions & 18 deletions cobra/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,15 @@ def score2level(score):
return '{l}-{s}: {ast}'.format(l=level[:1], s=score_full, ast=a)


def scan_single(target_directory, single_rule, files=None, ast=False):
def scan_single(target_directory, single_rule, files=None, secret_name=None):
try:
return SingleRule(target_directory, single_rule, files, ast).process()
return SingleRule(target_directory, single_rule, files, secret_name).process()
except Exception:
raise


def scan(target_directory, a_sid=None, s_sid=None, special_rules=None, language=None, framework=None, file_count=0,
extension_count=0, files=None):
extension_count=0, files=None, secret_name=None):
r = Rule()
vulnerabilities = r.vulnerabilities
rules = r.rules(special_rules)
Expand Down Expand Up @@ -182,7 +182,7 @@ def store(result):
vulnerability=rule.vulnerability,
language=rule.language
))
result = scan_single(target_directory, rule, files)
result = scan_single(target_directory, rule, files, secret_name)
store(result)

# print
Expand Down Expand Up @@ -236,13 +236,13 @@ def store(result):


class SingleRule(object):
def __init__(self, target_directory, single_rule, files, ast=False):
def __init__(self, target_directory, single_rule, files, secret_name=None):
self.target_directory = target_directory
self.find = Tool().find
self.grep = Tool().grep
self.sr = single_rule
self.files = files
self.ast = ast
self.secret_name = secret_name
# Single Rule Vulnerabilities
"""
[
Expand Down Expand Up @@ -312,7 +312,7 @@ def process(self):
try:
datas = Core(self.target_directory, vulnerability, self.sr, 'project name',
['whitelist1', 'whitelist2'], test=is_test, index=index,
files=self.files).scan()
files=self.files, secret_name=self.secret_name).scan()

if len(datas) == 3:
is_vulnerability, reason, data = datas
Expand Down Expand Up @@ -369,7 +369,7 @@ def parse_match(self, single_match):

class Core(object):
def __init__(self, target_directory, vulnerability_result, single_rule, project_name, white_list, test=False,
index=None, files=None, count=0):
index=None, files=None, secret_name=None):
"""
Initialize
:param: target_directory:
Expand All @@ -380,9 +380,11 @@ def __init__(self, target_directory, vulnerability_result, single_rule, project_
:param test: is test
:param index: vulnerability index
:param files: core file list
:param count:
:param secret_name: secret name
"""
self.data = []
self.repair_dict = {}
self.repair_functions = []

self.target_directory = target_directory

Expand All @@ -391,11 +393,13 @@ def __init__(self, target_directory, vulnerability_result, single_rule, project_
# self.code_content = vulnerability_result.code_content.strip()
self.code_content = vulnerability_result.code_content
self.files = files
self.secret_name = secret_name

self.rule_match = single_rule.match
self.rule_match_mode = single_rule.match_mode
self.vul_function = single_rule.vul_function
self.cvi = single_rule.svid
self.lan = single_rule.language
self.single_rule = single_rule

self.project_name = project_name
Expand Down Expand Up @@ -509,6 +513,27 @@ def is_can_parse(self):
return True
return False

def init_repair(self):
"""
初始化修复函数规则
:return:
"""
# self.single_rule.svid
a = __import__('rules.secret.demo', fromlist=['IS_REPAIR_DEFAULT'])
self.repair_dict = getattr(a, 'IS_REPAIR_DEFAULT')

if self.secret_name is not None:
try:
a = __import__('rules.secret.' + self.secret_name, fromlist=[self.secret_name])
self.repair_dict = dict(self.repair_dict.items() + a.items())
except ImportError:
logger.warning('[AST][INIT] Secret_name init error... No nodule named {}'.format(self.secret_name))

# init
for key in self.repair_dict:
if self.single_rule.svid in self.repair_dict[key]:
self.repair_functions.append(key)

def scan(self):
"""
Scan vulnerabilities
Expand Down Expand Up @@ -553,8 +578,9 @@ def scan(self):
logger.debug('[CVI-{cvi}] match-mode {mm}'.format(cvi=self.cvi, mm=self.rule_match_mode))
if self.file_path[-3:].lower() == 'php':
try:
self.init_repair()
ast = CAST(self.rule_match, self.target_directory, self.file_path, self.line_number,
self.code_content, files=self.files, rule_class=self.single_rule)
self.code_content, files=self.files, rule_class=self.single_rule, repair_functions=self.repair_functions)

# only match
if self.rule_match_mode == const.mm_regex_only_match:
Expand All @@ -572,23 +598,20 @@ def scan(self):
try:
with open(self.file_path, 'r') as fi:
code_contents = fi.read()
result = scan_parser(code_contents, rule_match, self.line_number, self.file_path)
result = scan_parser(code_contents, rule_match, self.line_number, self.file_path, repair_functions=self.repair_functions)
logger.debug('[AST] [RET] {c}'.format(c=result))
if len(result) > 0:
if result[0]['code'] == 1: # 函数参数可控
return True, 'Function-param-controllable'

if result[0]['code'] == 2: # 函数为敏感函数
return False, 'Function-sensitive'

if result[0]['code'] == 0: # 漏洞修复
if result[0]['code'] == 2: # 漏洞修复
return False, 'Function-param-controllable but fixed'

if result[0]['code'] == -1: # 函数参数不可控
return False, 'Function-param-uncon'

if result[0]['code'] == 4: # 新规则生成
return False, 'New Core', result[0]['source']
return False, 'New Core', result[0]['source']

logger.debug('[AST] [CODE] {code}'.format(code=result[0]['code']))
else:
Expand Down Expand Up @@ -726,7 +749,7 @@ def auto_parse_match(single_match):
mr.line_number = 0

# vulnerability information
mr.rule_name = 'auto rule'
mr.rule_name = 'Auto rule'
mr.id = '00000'
mr.language = 'None'
mr.commit_author = 'Cobra-W'
Expand Down Expand Up @@ -800,7 +823,7 @@ def NewCore(target_directory, new_rules, files, count=0):

try:
datas = Core(target_directory, vulnerability, sr, 'project name',
['whitelist1', 'whitelist2'], files=files).scan()
['whitelist1', 'whitelist2'], files=files).scan()
if len(datas) == 3:
is_vulnerability, reason, data = datas
elif len(datas) == 2:
Expand Down
Loading

0 comments on commit 5d462f3

Please sign in to comment.