diff --git a/.gitignore b/.gitignore index 03bc4cae..3194777c 100644 --- a/.gitignore +++ b/.gitignore @@ -84,3 +84,8 @@ result/* # real_test tests/real_test/* + +# new rule for test +rules/secret/* +!rules/secret/demo.py +!rules/secret/__init.py diff --git a/README.md b/README.md index 491e86b9..0ed928d0 100644 --- a/README.md +++ b/README.md @@ -89,6 +89,10 @@ Cobra-W是从Cobra2.0发展而来的分支,着眼于白帽子使用的白盒 - Cobra-W 0.7.3 - 修复了部分处理ast的bug - 完善了cvi-1009 +- 2017-12-27 + - Cobra-W 0.8 + - 全新的secret机制,自定义解决不同cms的过滤不一,导致的扫描误差大问题 + - 更新修复了判断是否被修复的问题 # README(开发文档) diff --git a/cobra/__init__.py b/cobra/__init__.py index daae6e29..dd6221ee 100644 --- a/cobra/__init__.py +++ b/cobra/__init__.py @@ -45,6 +45,7 @@ def main(): parser_group_scan.add_argument('-f', '--format', dest='format', action='store', default='csv', metavar='', choices=['html', 'json', 'csv', 'xml'], help='vulnerability output format (formats: %(choices)s)') parser_group_scan.add_argument('-o', '--output', dest='output', action='store', default='', metavar='', help='vulnerability output STREAM, FILE') parser_group_scan.add_argument('-r', '--rule', dest='special_rules', action='store', default=None, metavar='', help='specifies rules e.g: 1000, 1001') + parser_group_scan.add_argument('-s', '--secret', dest='secret_name', action='store', default=None, metavar='', help='secret repair function e.g: wordpress') parser_group_scan.add_argument('-d', '--debug', dest='debug', action='store_true', default=False, help='open debug mode') args = parser.parse_args() @@ -53,7 +54,7 @@ def main(): logger.setLevel(logging.DEBUG) logger.debug('[INIT] set logging level: debug') - if args.target is '' and args.output is '': + if args.target is '' and args.output is '': parser.print_help() exit() @@ -66,7 +67,7 @@ def main(): } Running(a_sid).status(data) - cli.start(args.target, args.format, args.output, args.special_rules, a_sid) + cli.start(args.target, args.format, args.output, args.special_rules, a_sid, args.secret_name) t2 = time.time() logger.info('[INIT] Done! Consume Time:{ct}s'.format(ct=t2 - t1)) diff --git a/cobra/__version__.py b/cobra/__version__.py index 4df44e8f..e9c07045 100644 --- a/cobra/__version__.py +++ b/cobra/__version__.py @@ -7,7 +7,7 @@ __issue_page__ = 'https://github.com/LoRexxar/Cobra-W/issues/new' __python_version__ = sys.version.split()[0] __platform__ = platform.platform() -__version__ = '0.7.3' +__version__ = '0.8' __author__ = 'LoRexxar' __author_email__ = 'LoRexxar@gmail.com' __license__ = 'MIT License' @@ -25,6 +25,7 @@ __epilog__ = """Usage: python {m} -t {td} python {m} -t {td} -r 1000, 1001 + python {m} -t {td} -s wordpress python {m} -t {td} -f json -o /tmp/report.json python {m} -t {td} --debug """.format(m='cobra.py', td='tests/vulnerabilities') diff --git a/cobra/cast.py b/cobra/cast.py index 10c1835b..34b65cb4 100644 --- a/cobra/cast.py +++ b/cobra/cast.py @@ -25,7 +25,7 @@ class CAST(object): languages = ['php', 'java'] - def __init__(self, rule, target_directory, file_path, line, code, files=None, rule_class=None, ast=False): + def __init__(self, rule, target_directory, file_path, line, code, files=None, rule_class=None, repair_functions=[]): self.target_directory = target_directory self.data = [] self.rule = rule @@ -37,7 +37,7 @@ def __init__(self, rule, target_directory, file_path, line, code, files=None, ru self.param_value = None self.language = None self.sr = rule_class - self.ast = ast + self.repair_functions = repair_functions for language in self.languages: if self.file_path[-len(language):].lower() == language: self.language = language @@ -237,7 +237,7 @@ def is_controllable_param(self): logger.debug("[Deep AST] Start AST for param {param_name}".format(param_name=param_name)) - _is_co, _cp, expr_lineno = anlysis_params(param_name, param_content, self.file_path, self.line, self.sr.vul_function) + _is_co, _cp, expr_lineno = anlysis_params(param_name, param_content, self.file_path, self.line, self.sr.vul_function, self.repair_functions) if _is_co == 1: logger.debug("[AST] Is assign string: `Yes`") diff --git a/cobra/cli.py b/cobra/cli.py index 368fb577..7703f9c2 100644 --- a/cobra/cli.py +++ b/cobra/cli.py @@ -35,9 +35,10 @@ def get_sid(target, is_a_sid=False): return sid.lower() -def start(target, formatter, output, special_rules, a_sid=None): +def start(target, formatter, output, special_rules, a_sid=None, secret_name=None): """ Start CLI + :param secret_id: secret id or name? :param target: File, FOLDER, GIT :param formatter: :param output: @@ -86,7 +87,7 @@ def start(target, formatter, output, special_rules, a_sid=None): # scan scan(target_directory=target_directory, a_sid=a_sid, s_sid=s_sid, special_rules=pa.special_rules, language=main_language, framework=main_framework, file_count=file_count, extension_count=len(files), - files=files) + files=files, secret_name=secret_name) except KeyboardInterrupt as e: logger.critical("[!] KeyboardInterrupt, exit...") exit() diff --git a/cobra/engine.py b/cobra/engine.py index a6e4dfe2..b452562b 100644 --- a/cobra/engine.py +++ b/cobra/engine.py @@ -138,15 +138,15 @@ def score2level(score): return '{l}-{s}: {ast}'.format(l=level[:1], s=score_full, ast=a) -def scan_single(target_directory, single_rule, files=None, ast=False): +def scan_single(target_directory, single_rule, files=None, secret_name=None): try: - return SingleRule(target_directory, single_rule, files, ast).process() + return SingleRule(target_directory, single_rule, files, secret_name).process() except Exception: raise def scan(target_directory, a_sid=None, s_sid=None, special_rules=None, language=None, framework=None, file_count=0, - extension_count=0, files=None): + extension_count=0, files=None, secret_name=None): r = Rule() vulnerabilities = r.vulnerabilities rules = r.rules(special_rules) @@ -182,7 +182,7 @@ def store(result): vulnerability=rule.vulnerability, language=rule.language )) - result = scan_single(target_directory, rule, files) + result = scan_single(target_directory, rule, files, secret_name) store(result) # print @@ -236,13 +236,13 @@ def store(result): class SingleRule(object): - def __init__(self, target_directory, single_rule, files, ast=False): + def __init__(self, target_directory, single_rule, files, secret_name=None): self.target_directory = target_directory self.find = Tool().find self.grep = Tool().grep self.sr = single_rule self.files = files - self.ast = ast + self.secret_name = secret_name # Single Rule Vulnerabilities """ [ @@ -312,7 +312,7 @@ def process(self): try: datas = Core(self.target_directory, vulnerability, self.sr, 'project name', ['whitelist1', 'whitelist2'], test=is_test, index=index, - files=self.files).scan() + files=self.files, secret_name=self.secret_name).scan() if len(datas) == 3: is_vulnerability, reason, data = datas @@ -369,7 +369,7 @@ def parse_match(self, single_match): class Core(object): def __init__(self, target_directory, vulnerability_result, single_rule, project_name, white_list, test=False, - index=None, files=None, count=0): + index=None, files=None, secret_name=None): """ Initialize :param: target_directory: @@ -380,9 +380,11 @@ def __init__(self, target_directory, vulnerability_result, single_rule, project_ :param test: is test :param index: vulnerability index :param files: core file list - :param count: + :param secret_name: secret name """ self.data = [] + self.repair_dict = {} + self.repair_functions = [] self.target_directory = target_directory @@ -391,11 +393,13 @@ def __init__(self, target_directory, vulnerability_result, single_rule, project_ # self.code_content = vulnerability_result.code_content.strip() self.code_content = vulnerability_result.code_content self.files = files + self.secret_name = secret_name self.rule_match = single_rule.match self.rule_match_mode = single_rule.match_mode self.vul_function = single_rule.vul_function self.cvi = single_rule.svid + self.lan = single_rule.language self.single_rule = single_rule self.project_name = project_name @@ -509,6 +513,27 @@ def is_can_parse(self): return True return False + def init_repair(self): + """ + 初始化修复函数规则 + :return: + """ + # self.single_rule.svid + a = __import__('rules.secret.demo', fromlist=['IS_REPAIR_DEFAULT']) + self.repair_dict = getattr(a, 'IS_REPAIR_DEFAULT') + + if self.secret_name is not None: + try: + a = __import__('rules.secret.' + self.secret_name, fromlist=[self.secret_name]) + self.repair_dict = dict(self.repair_dict.items() + a.items()) + except ImportError: + logger.warning('[AST][INIT] Secret_name init error... No nodule named {}'.format(self.secret_name)) + + # init + for key in self.repair_dict: + if self.single_rule.svid in self.repair_dict[key]: + self.repair_functions.append(key) + def scan(self): """ Scan vulnerabilities @@ -553,8 +578,9 @@ def scan(self): logger.debug('[CVI-{cvi}] match-mode {mm}'.format(cvi=self.cvi, mm=self.rule_match_mode)) if self.file_path[-3:].lower() == 'php': try: + self.init_repair() ast = CAST(self.rule_match, self.target_directory, self.file_path, self.line_number, - self.code_content, files=self.files, rule_class=self.single_rule) + self.code_content, files=self.files, rule_class=self.single_rule, repair_functions=self.repair_functions) # only match if self.rule_match_mode == const.mm_regex_only_match: @@ -572,23 +598,20 @@ def scan(self): try: with open(self.file_path, 'r') as fi: code_contents = fi.read() - result = scan_parser(code_contents, rule_match, self.line_number, self.file_path) + result = scan_parser(code_contents, rule_match, self.line_number, self.file_path, repair_functions=self.repair_functions) logger.debug('[AST] [RET] {c}'.format(c=result)) if len(result) > 0: if result[0]['code'] == 1: # 函数参数可控 return True, 'Function-param-controllable' - if result[0]['code'] == 2: # 函数为敏感函数 - return False, 'Function-sensitive' - - if result[0]['code'] == 0: # 漏洞修复 + if result[0]['code'] == 2: # 漏洞修复 return False, 'Function-param-controllable but fixed' if result[0]['code'] == -1: # 函数参数不可控 return False, 'Function-param-uncon' if result[0]['code'] == 4: # 新规则生成 - return False, 'New Core', result[0]['source'] + return False, 'New Core', result[0]['source'] logger.debug('[AST] [CODE] {code}'.format(code=result[0]['code'])) else: @@ -726,7 +749,7 @@ def auto_parse_match(single_match): mr.line_number = 0 # vulnerability information - mr.rule_name = 'auto rule' + mr.rule_name = 'Auto rule' mr.id = '00000' mr.language = 'None' mr.commit_author = 'Cobra-W' @@ -800,7 +823,7 @@ def NewCore(target_directory, new_rules, files, count=0): try: datas = Core(target_directory, vulnerability, sr, 'project name', - ['whitelist1', 'whitelist2'], files=files).scan() + ['whitelist1', 'whitelist2'], files=files).scan() if len(datas) == 3: is_vulnerability, reason, data = datas elif len(datas) == 2: diff --git a/cobra/parser.py b/cobra/parser.py index cc42ad99..069d759e 100644 --- a/cobra/parser.py +++ b/cobra/parser.py @@ -19,6 +19,7 @@ with_line = True scan_results = [] # 结果存放列表初始化 +is_repair_functions = [] # 修复函数初始化 def export(items): @@ -259,7 +260,8 @@ def is_repair(expr): :return: """ is_re = False # 是否修复,默认值是未修复 - if expr == 'escapeshellcmd': + global is_repair_functions + if expr in is_repair_functions: is_re = True return is_re @@ -558,6 +560,11 @@ def parameters_back(param, nodes, function_params=None, lineno=0, param_node = get_node_name(node.node) # param_node为被赋值的变量 param_expr, expr_lineno, is_re = get_expr_name(node.expr) # param_expr为赋值表达式,param_expr为变量或者列表 + if param_name == param_node and is_re is True: + is_co = 2 + cp = None + return is_co, cp, expr_lineno + if param_name == param_node and not isinstance(param_expr, list): # 找到变量的来源,开始继续分析变量的赋值表达式是否可控 is_co, cp = is_controllable(param_expr) # 开始判断变量是否可控 @@ -609,7 +616,7 @@ def parameters_back(param, nodes, function_params=None, lineno=0, vul_nodes = [] for function_node in function_nodes: - if int(function_lineno) <= function_node.lineno < int(lineno): + if function_node is not None and int(function_lineno) <= function_node.lineno < int(lineno): vul_nodes.append(function_node) if len(vul_nodes) > 0: @@ -799,9 +806,10 @@ def get_function_params(nodes): return params -def anlysis_params(param, code_content, file_path, lineno, vul_function=None): +def anlysis_params(param, code_content, file_path, lineno, vul_function=None, repair_functions=None): """ 在cast调用时做中转数据预处理 + :param repair_functions: :param vul_function: :param lineno: :param param: @@ -809,8 +817,12 @@ def anlysis_params(param, code_content, file_path, lineno, vul_function=None): :param file_path: :return: """ + global is_repair_functions count = 0 function_params = None + if repair_functions is not None: + is_repair_functions = repair_functions + if type(param) is str and "->" in param: param_left = php.Variable(param.split("->")[0]) param_right = param.split("->")[1] @@ -822,9 +834,7 @@ def anlysis_params(param, code_content, file_path, lineno, vul_function=None): vul_nodes = [] for node in all_nodes: - if not node: - continue - if node.lineno < int(lineno): + if node is not None and node.lineno < int(lineno): vul_nodes.append(node) is_co, cp, expr_lineno = deep_parameters_back(param, vul_nodes, function_params, count, file_path, lineno, vul_function=vul_function) @@ -898,6 +908,7 @@ def analysis_functioncall(node, back_node, vul_function, vul_lineno): def analysis_binaryop_node(node, back_node, vul_function, vul_lineno, function_params=None, file_path=None): """ 处理BinaryOp类型节点-->取出参数-->回溯判断参数是否可控-->输出结果 + :param file_path: :param node: :param back_node: :param vul_function: @@ -928,6 +939,7 @@ def analysis_binaryop_node(node, back_node, vul_function, vul_lineno, function_p def analysis_objectproperry_node(node, back_node, vul_function, vul_lineno, function_params=None, file_path=None): """ 处理_objectproperry类型节点-->取出参数-->回溯判断参数是否可控-->输出结果 + :param file_path: :param node: :param back_node: :param vul_function: @@ -971,6 +983,7 @@ def analysis_arrayoffset_node(node, vul_function, vul_lineno): def analysis_functioncall_node(node, back_node, vul_function, vul_lineno, function_params=None, file_path=None): """ 处理FunctionCall类型节点-->取出参数-->回溯判断参数是否可控-->输出结果 + :param file_path: :param node: :param back_node: :param vul_function: @@ -1022,7 +1035,7 @@ def analysis_variable_node(node, back_node, vul_function, vul_lineno, function_p set_scan_results(is_co, cp, expr_lineno, vul_function, param, vul_lineno) -def analysis_ternaryop_node(node, back_node, vul_function, vul_lineno, function_params=None, file_path=None): +def analysis_ternaryop_node(node, back_node, vul_function, vul_lineno, function_params=None, file_path=None, repair_functions=[]): """ 处理三元提交判断语句,回溯双变量 :param node: @@ -1288,7 +1301,7 @@ def analysis(nodes, vul_function, back_node, vul_lineo, file_path=None, function back_node.append(node) -def scan_parser(code_content, sensitive_func, vul_lineno, file_path): +def scan_parser(code_content, sensitive_func, vul_lineno, file_path, repair_functions=[]): """ 开始检测函数 :param code_content: 要检测的文件内容 @@ -1298,8 +1311,9 @@ def scan_parser(code_content, sensitive_func, vul_lineno, file_path): :return: """ try: - global scan_results + global scan_results, is_repair_functions scan_results = [] + is_repair_functions = repair_functions parser = make_parser() all_nodes = parser.parse(code_content, debug=False, lexer=lexer.clone(), tracking=with_line) diff --git a/rules/php/CVI_1010.py b/rules/php/CVI_1010.py new file mode 100644 index 00000000..dbabeb49 --- /dev/null +++ b/rules/php/CVI_1010.py @@ -0,0 +1,45 @@ +# -*- coding: utf-8 -*- + +""" + CVI-1010 + ~~~~ + + ldap injection + + :author: LoRexxar + :homepage: https://github.com/LoRexxar/cobra + :license: MIT, see LICENSE for more details. + :copyright: Copyright (c) 2017 LoRexxar. All rights reserved +""" + +from cobra.file import file_grep + + +class CVI_1010(): + """ + rule class + """ + + def __init__(self): + + self.svid = 1010 + self.language = "PHP" + self.author = "LoRexxar/wufeifei" + self.vulnerability = "LDAPI" + self.description = "LDAP injection" + + # status + self.status = True + + # 部分配置 + self.match_mode = "function-param-regex" + self.match = "(ldap_add|ldap_delete|ldap_list|ldap_read|ldap_search|ldap_bind)" + self.vul_function = None + + def main(self, regex_string): + """ + regex string input + :regex_string: regex match string + :return: + """ + pass diff --git a/rules/php/templates.py b/rules/php/demo.py similarity index 96% rename from rules/php/templates.py rename to rules/php/demo.py index bba7d6b6..5c8a2b68 100644 --- a/rules/php/templates.py +++ b/rules/php/demo.py @@ -1,7 +1,7 @@ # -*- coding: utf-8 -*- """ - templates + demo ~~~~ just tamplates for rule @@ -13,7 +13,7 @@ """ -class templates(): +class demo(): """ rule class diff --git a/rules/secret/__init__.py b/rules/secret/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/rules/secret/demo.py b/rules/secret/demo.py new file mode 100644 index 00000000..d37274df --- /dev/null +++ b/rules/secret/demo.py @@ -0,0 +1,23 @@ +# -*- coding: utf-8 -*- + +""" + demo + ~~~~ + + just demo for secret + + :author: LoRexxar + :homepage: https://github.com/LoRexxar/cobra + :license: MIT, see LICENSE for more details. + :copyright: Copyright (c) 2017 LoRexxar. All rights reserved +""" + +IS_REPAIR_DEFAULT = { + "htmlspecialchars": [1000, 10001], + "htmlentities": [1000, 10001], + "ldap_escape": [1010], + "mysql_real_escape_string": [1004, 1005, 1006], + "addslashes": [1004, 1005, 1006], + "escapeshellcmd": [1009, 1011], + "escapeshellarg": [1009,1011], +} diff --git a/tests/ast/test_function/test_function.php b/tests/ast/test_function/test_function.php new file mode 100644 index 00000000..97695e8c --- /dev/null +++ b/tests/ast/test_function/test_function.php @@ -0,0 +1,13 @@ +