diff --git a/.mypy.ini b/.mypy.ini index 1946e5b90..1b3c8477b 100644 --- a/.mypy.ini +++ b/.mypy.ini @@ -49,5 +49,8 @@ ignore_missing_imports = True [mypy-docx.*] ignore_missing_imports = True +[mypy-pydriller.*] +ignore_missing_imports = True + [mypy-base62.*] ignore_missing_imports = True diff --git a/credsweeper/__main__.py b/credsweeper/__main__.py index f025e5d51..7de893b4c 100644 --- a/credsweeper/__main__.py +++ b/credsweeper/__main__.py @@ -1,10 +1,15 @@ +import base64 import binascii +import hashlib +import io import logging import os import sys import time from argparse import ArgumentParser, ArgumentTypeError, Namespace -from typing import Any, Union, Optional, Dict +from typing import Any, Union, Optional, Dict, List, Tuple + +from pydriller import Repository from credsweeper import __version__ from credsweeper.app import APP_PATH, CredSweeper @@ -116,6 +121,17 @@ def get_arguments() -> Namespace: const="log.yaml", dest="export_log_config", metavar="PATH") + group.add_argument("--git", nargs="+", help="git repo to scan", dest="git", metavar="PATH") + parser.add_argument("--commits", + help="scan git repo for N commits only", + type=positive_int, + dest="commits", + default=0, + metavar="POSITIVE_INT") + parser.add_argument("--branch", + help="scan git repo for single branch, otherwise - all branches were scanned (slow)", + dest="branch", + type=str) parser.add_argument("--rules", help="path of rule config file (default: credsweeper/rules/config.yaml). " f"severity:{[i.value for i in Severity]} " @@ -320,9 +336,84 @@ def scan(args: Namespace, content_provider: AbstractProvider, json_filename: Opt return credsweeper.run(content_provider=content_provider) except Exception as exc: logger.critical(exc, exc_info=True) + logger.exception(exc) return -1 +def scan_git(args: Namespace) -> Tuple[int, int, int]: + """Scan repository for branches and commits + Returns: + total credentials found + total scanned branches + total scanned commits + """ + total_credentials = 0 + total_branches = 0 + total_commits = 0 + try: + sha1git = hashlib.sha1(str(args.git).encode()).digest() + repo_hash = base64.b32encode(sha1git).decode("ascii") + journal_filename = f"{repo_hash}.json" + logger.info(f"{args.git} sha1 in base32 {repo_hash}") + repo_journal = Util.json_load(journal_filename) + if not isinstance(repo_journal, dict): + with open(journal_filename, "w") as f: + f.write("{}") + repo_journal = {"repo": args.git} + credsweeper = CredSweeper(rule_path=args.rule_path, + config_path=args.config_path, + api_validation=args.api_validation, + sort_output=args.sort_output, + use_filters=args.no_filters, + pool_count=args.jobs, + ml_batch_size=args.ml_batch_size, + ml_threshold=args.ml_threshold, + ml_providers=args.ml_providers, + find_by_ext=args.find_by_ext, + depth=args.depth, + doc=args.doc, + severity=args.severity, + size_limit=args.size_limit, + log_level=args.log) + repository = Repository(args.git, only_in_branch=args.branch) + for commit in repository.traverse_commits(): + if commit.hash in repo_journal: + logger.debug(f"Skip already scanned commit: {commit.hash}") + continue + logger.info(f"Scan commit: {commit.hash}") + paths: List[Tuple[str, io.BytesIO]] = [] + for file in commit.modified_files: + logger.info(f"FILE: {file.old_path} -> {file.new_path}") + try: + if file.new_path is not None: + _io = io.BytesIO(file.content) + paths.append((file.filename, _io)) + except ValueError as exc: + logger.error("Possible missed submodule:%s", str(exc)) + provider = FilesProvider(paths) + if args.json_filename: + ext = Util.get_extension(args.json_filename, False) + credsweeper.json_filename = f"{args.json_filename[:-len(ext)]}.{commit.hash}{ext}" + if args.xlsx_filename: + ext = Util.get_extension(args.xlsx_filename, False) + credsweeper.xlsx_filename = f"{args.xlsx_filename[:-len(ext)]}.{commit.hash}{ext}" + + commit_cred_number = credsweeper.run(provider) + if credsweeper.is_ml_validator_inited: + # reset not-pickled object for multiprocess + credsweeper.ml_validator = None + credsweeper.credential_manager.candidates.clear() + total_credentials += commit_cred_number + total_commits += 1 + repo_journal[commit.hash] = commit_cred_number + Util.json_dump(repo_journal, journal_filename) + total_branches += 1 + except Exception as exc: + logger.critical(exc, exc_info=True) + return -1, total_branches, total_commits + return total_credentials, total_branches, total_commits + + def main() -> int: """Main function""" result = EXIT_FAILURE @@ -331,7 +422,7 @@ def main() -> int: if args.banner: print(f"CredSweeper {__version__} crc32:{check_integrity():08x}") Logger.init_logging(args.log, args.log_config_path) - logger.info(f"Init CredSweeper object with arguments: {args}") + logger.info(f"Init CredSweeper object with arguments: {args} CWD: {os.getcwd()}") summary: Dict[str, int] = {} if args.path: logger.info(f"Run analyzer on path: {args.path}") @@ -354,6 +445,13 @@ def main() -> int: summary["Deleted File Credentials"] = del_credentials_number if 0 <= add_credentials_number and 0 <= del_credentials_number: result = EXIT_SUCCESS + elif args.git: + logger.info(f"Run analyzer on GIT: {args.git}") + credentials_number, branches_number, commits_number = scan_git(args) + summary[ + f"Detected Credentials in {branches_number} branches and {commits_number} commits "] = credentials_number + if 0 <= credentials_number: + result = EXIT_SUCCESS elif args.export_config: logging.info(f"Exporting default config to file: {args.export_config}") config_dict = Util.json_load(APP_PATH / "secret" / "config.json") diff --git a/credsweeper/app.py b/credsweeper/app.py index f60b28394..fbe813917 100644 --- a/credsweeper/app.py +++ b/credsweeper/app.py @@ -47,7 +47,7 @@ def __init__(self, sort_output: bool = False, use_filters: bool = True, pool_count: int = 1, - ml_batch_size: Optional[int] = None, + ml_batch_size: Optional[int] = 16, ml_threshold: Union[float, ThresholdPreset] = ThresholdPreset.medium, ml_config: Union[None, str, Path] = None, ml_model: Union[None, str, Path] = None, @@ -187,11 +187,18 @@ def _use_ml_validation(self) -> bool: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + @property + def is_ml_validator_inited(self) -> bool: + """method to check whether ml_validator was inited without creation""" + return bool(self.__ml_validator) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + @property def ml_validator(self) -> MlValidator: """ml_validator getter""" from credsweeper.ml_model import MlValidator - if not self.__ml_validator: + if not self.is_ml_validator_inited: self.__ml_validator: MlValidator = MlValidator( threshold=self.ml_threshold, # ml_config=self.ml_config, # @@ -381,6 +388,7 @@ def post_processing(self) -> None: if ml_cred_groups: logger.info(f"Run ML Validation for {len(ml_cred_groups)} groups") is_cred, probability = self.ml_validator.validate_groups(ml_cred_groups, self.ml_batch_size) + logger.info(f"DONE ML Validation for {len(is_cred)} results") for i, (_, group_candidates) in enumerate(ml_cred_groups): for candidate in group_candidates: if candidate.use_ml: @@ -404,6 +412,12 @@ def export_results(self) -> None: credentials = self.credential_manager.get_credentials() + if credentials: + logger.info(f"Exporting {len(credentials)} credentials") + else: + logger.info("No credentials were found") + return + if self.sort_output: credentials.sort(key=lambda x: ( # x.line_data_list[0].path, # diff --git a/credsweeper/ml_model/ml_validator.py b/credsweeper/ml_model/ml_validator.py index e5f38f725..c881bdd2f 100644 --- a/credsweeper/ml_model/ml_validator.py +++ b/credsweeper/ml_model/ml_validator.py @@ -11,6 +11,7 @@ from credsweeper.credentials import Candidate, CandidateKey import credsweeper.ml_model.features as features from credsweeper.utils import Util +import psutil logger = logging.getLogger(__name__) @@ -238,6 +239,9 @@ def validate_groups(self, group_list: List[Tuple[CandidateKey, List[Candidate]]] variable_input_list.clear() value_input_list.clear() features_list.clear() + elif 0 == tail % 4391: + process = psutil.Process() + logger.info(f"ML Validation tail {tail} meminfo:{process.memory_info()}") if head != tail: probability[head:tail] = self._batch_call_model(line_input_list, variable_input_list, value_input_list, features_list) diff --git a/docs/source/guide.rst b/docs/source/guide.rst index ebebbc67f..cf564ae80 100644 --- a/docs/source/guide.rst +++ b/docs/source/guide.rst @@ -13,7 +13,9 @@ Get all argument list: .. code-block:: text - usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH]) + usage: python -m credsweeper [-h] + (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH] | --git PATH [PATH ...]) + [--commits POSITIVE_INT] [--branch BRANCH] [--rules PATH] [--severity SEVERITY] [--config PATH] [--log_config PATH] [--denylist PATH] [--find-by-ext] [--depth POSITIVE_INT] [--no-filters] [--doc] [--ml_threshold FLOAT_OR_STR] [--ml_batch_size POSITIVE_INT] [--ml_config PATH] [--ml_model PATH] [--ml_providers STR] @@ -31,6 +33,11 @@ Get all argument list: exporting default config to file (default: config.json) --export_log_config [PATH] exporting default logger config to file (default: log.yaml) + --git PATH [PATH ...] + git repo to scan + --commits POSITIVE_INT + scan git repo for N commits only + --branch BRANCH scan git repo for single branch, otherwise - all branches were scanned (slow) --rules PATH path of rule config file (default: credsweeper/rules/config.yaml). severity:['critical', 'high', 'medium', 'low', 'info'] type:['keyword', 'pattern', 'pem_key', 'multi'] --severity SEVERITY set minimum level for rules to apply ['critical', 'high', 'medium', 'low', 'info'](default: 'Severity.INFO', case insensitive) --config PATH use custom config (default: built-in) diff --git a/requirements.txt b/requirements.txt index 654affe8b..a408f302c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,6 +18,7 @@ pdfminer.six==20231228 password-strength==0.0.3.post2 python-dateutil==2.8.2 pyjks==20.0.0 +PyDriller==2.6 pybase62==1.0.0 base58==2.1.1 diff --git a/tests/test_app.py b/tests/test_app.py index 030b235cf..39be99746 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -5,12 +5,14 @@ import subprocess import sys import tempfile +from pathlib import Path import time from typing import AnyStr, Tuple from unittest import TestCase import deepdiff import pytest +from git import Repo from credsweeper.app import APP_PATH from credsweeper.utils import Util @@ -22,12 +24,12 @@ class TestApp(TestCase): @staticmethod def _m_credsweeper(args) -> Tuple[str, str]: - proc = subprocess.Popen( + with subprocess.Popen( [sys.executable, "-m", "credsweeper", *args], # - cwd=APP_PATH.parent, # - stdout=subprocess.PIPE, # - stderr=subprocess.PIPE) # - _stdout, _stderr = proc.communicate() + cwd=APP_PATH.parent, # + stdout=subprocess.PIPE, # + stderr=subprocess.PIPE) as proc: + _stdout, _stderr = proc.communicate() def transform(x: AnyStr) -> str: if isinstance(x, bytes): @@ -220,7 +222,10 @@ def test_it_works_n(self) -> None: " | --diff_path PATH [PATH ...]" \ " | --export_config [PATH]" \ " | --export_log_config [PATH]" \ + " | --git PATH [PATH ...]" \ ")" \ + " [--commits POSITIVE_INT]" \ + " [--branch BRANCH]" \ " [--rules PATH]" \ " [--severity SEVERITY]" \ " [--config PATH]" \ @@ -252,6 +257,7 @@ def test_it_works_n(self) -> None: " --diff_path" \ " --export_config" \ " --export_log_config" \ + " --git" \ " is required " expected = " ".join(expected.split()) self.assertEqual(expected, output) @@ -352,7 +358,8 @@ def test_patch_save_json_p(self) -> None: _stdout, _stderr = self._m_credsweeper( ["--diff_path", target_path, "--save-json", json_filename, "--log", "silence"]) self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json"))) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) + # deleted patch contains no issues + self.assertFalse(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -490,10 +497,7 @@ def test_find_by_ext_n(self) -> None: json_filename = os.path.join(tmp_dir, f"{__name__}.json") _stdout, _stderr = self._m_credsweeper( ["--path", tmp_dir, "--save-json", json_filename, "--log", "silence"]) - self.assertTrue(os.path.exists(json_filename)) - with open(json_filename, "r") as json_file: - report = json.load(json_file) - self.assertEqual(0, len(report)) + self.assertFalse(os.path.exists(json_filename)) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -555,17 +559,29 @@ def test_denylist_p(self) -> None: _stdout, _stderr = self._m_credsweeper([ "--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence" ]) - with open(json_filename, "r") as json_file: - report = json.load(json_file) - self.assertEqual(0, len(report)) + self.assertFalse(os.path.exists(json_filename)) with open(denylist_filename, "w") as f: - f.write('ghp_00000000000000000000000000000004WZ4EQ') # value only + f.write("abc") _stdout, _stderr = self._m_credsweeper([ "--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence" ]) with open(json_filename, "r") as json_file: report = json.load(json_file) - self.assertEqual(0, len(report)) + self.assertEqual(1, len(report)) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + + def test_denylist_line_p(self) -> None: + target_path = str(SAMPLES_PATH / "password.gradle") + with tempfile.TemporaryDirectory() as tmp_dir: + json_filename = os.path.join(tmp_dir, f"{__name__}.json") + denylist_filename = os.path.join(tmp_dir, "list.txt") + with open(denylist_filename, "w") as f: + f.write(' password = "cackle!" ') + _stdout, _stderr = self._m_credsweeper([ + "--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence" + ]) + self.assertFalse(os.path.exists(json_filename)) # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # @@ -579,6 +595,7 @@ def test_denylist_n(self) -> None: _stdout, _stderr = self._m_credsweeper([ "--path", target_path, "--denylist", denylist_filename, "--save-json", json_filename, "--log", "silence" ]) + self.assertTrue(os.path.exists(json_filename)) with open(json_filename, "r") as json_file: report = json.load(json_file) self.assertEqual(1, len(report)) @@ -600,7 +617,7 @@ def test_rules_ml_p(self) -> None: report_set = set([i["rule"] for i in report]) rules = Util.yaml_load(APP_PATH / "rules" / "config.yaml") rules_set = set([i["name"] for i in rules]) - missed = { # + missed = { # type: ignore "ID_PASSWD_PAIR", "SECRET_PAIR", "IP_ID_PASSWORD_TRIPLE", @@ -689,6 +706,33 @@ def test_doc_n(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_pydriller_p(self) -> None: + with tempfile.TemporaryDirectory() as tmp_dir: + with Repo.init(tmp_dir) as repo: + cred_file = Path(tmp_dir) / "with_cred" + value = "GbdD@23#d0" + with open(cred_file, "w") as f: + f.write(f"git_password: {value}") + repo.index.add([cred_file]) + repo.index.commit("added file") + with open(cred_file, "w") as f: + f.write("DELETED") + repo.index.add([cred_file]) + repo.index.commit("cleared file") + # check that value is not in the file + with open(cred_file, "r") as f: + self.assertNotIn(value, f.read()) + # run git scan + _stdout, _stderr = self._m_credsweeper(["--log", "DEBUG", "--git", str(tmp_dir)]) + self.assertIn("Detected Credentials in 1 branches and 2 commits : 1", _stdout, _stdout) + self.assertNotIn("CRITICAL", _stdout, _stdout) + self.assertNotIn("CRITICAL", _stderr, _stderr) + # check detected value in stdout + self.assertIn(value, _stdout, _stdout) + # del repo + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_external_ml_n(self) -> None: # not existed ml_config _stdout, _stderr = self._m_credsweeper( diff --git a/tests/test_main.py b/tests/test_main.py index c1e30fbe6..b5ac1d1e0 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -107,42 +107,6 @@ def test_use_filters_n(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - @mock.patch("json.dump") - def test_save_json_p(self, mock_json_dump) -> None: - cred_sweeper = CredSweeper(json_filename="unittest_output.json") - cred_sweeper.run([]) - mock_json_dump.assert_called() - self.assertTrue(os.path.exists("unittest_output.json")) - os.remove("unittest_output.json") - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - @mock.patch("json.dump") - def test_save_json_n(self, mock_json_dump) -> None: - cred_sweeper = CredSweeper() - cred_sweeper.run([]) - mock_json_dump.assert_not_called() - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - def test_save_xlsx_p(self) -> None: - with tempfile.TemporaryDirectory() as tmp_dir: - test_filename = os.path.join(tmp_dir, "unittest_output.xlsx") - self.assertFalse(os.path.exists(test_filename)) - cred_sweeper = CredSweeper(xlsx_filename=test_filename) - cred_sweeper.run([]) - self.assertTrue(os.path.exists(test_filename)) - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - - @mock.patch("pandas.DataFrame", return_value=pd.DataFrame(data=[])) - def test_save_xlsx_n(self, mock_xlsx_to_excel) -> None: - cred_sweeper = CredSweeper() - cred_sweeper.run([]) - mock_xlsx_to_excel.assert_not_called() - - # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # - @mock.patch("credsweeper.__main__.scan", return_value=None) @mock.patch("credsweeper.__main__.get_arguments") def test_main_n(self, mock_get_arguments, mock_scan) -> None: @@ -177,8 +141,9 @@ def test_main_path_p(self, mock_get_arguments) -> None: denylist_path=None) mock_get_arguments.return_value = args_mock self.assertEqual(EXIT_SUCCESS, app_main.main()) - self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) self.assertTrue(os.path.exists(os.path.join(tmp_dir, f"{__name__}_added.json"))) + # deleted patch contains no issue + self.assertFalse(os.path.exists(os.path.join(tmp_dir, f"{__name__}_deleted.json"))) report = Util.json_load(os.path.join(tmp_dir, f"{__name__}_added.json")) self.assertTrue(report) self.assertEqual(3, report[0]["line_data_list"][0]["line_num"]) @@ -289,6 +254,38 @@ def test_report_p(self, mock_get_arguments) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + @mock.patch("credsweeper.__main__.get_arguments") + def test_report_n(self, mock_get_arguments) -> None: + # no reports will be generated when no credentials are found + with tempfile.TemporaryDirectory() as tmp_dir: + json_filename = os.path.join(tmp_dir, "report.json") + xlsx_filename = os.path.join(tmp_dir, "report.xlsx") + args_mock = Mock( + log='warning', + config_path=None, + path=[tmp_dir], # empty dir + diff_path=None, + json_filename=json_filename, + xlsx_filename=xlsx_filename, + sort_output=True, + rule_path=None, + jobs=1, + ml_threshold=0.0, + ml_batch_size=16, + depth=0, + doc=False, + size_limit="1G", + find_by_ext=False, + api_validation=False, + denylist_path=None, + severity=Severity.INFO) + mock_get_arguments.return_value = args_mock + self.assertEqual(EXIT_SUCCESS, app_main.main()) + self.assertFalse(os.path.exists(xlsx_filename)) + self.assertFalse(os.path.exists(json_filename)) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + @mock.patch("argparse.ArgumentParser.parse_args") def test_parse_args_n(self, mock_parse) -> None: self.assertTrue(app_main.get_arguments()) @@ -383,6 +380,48 @@ def test_find_by_ext_and_not_ignore_p(self) -> None: # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_multiple_invocation_p(self) -> None: + # test whether ml_validator is created once + self.maxDiff = None + cred_sweeper = CredSweeper() + self.assertFalse(cred_sweeper.is_ml_validator_inited) + # found candidate is not ML validated + provider = StringContentProvider(["qpF8Q~PCM5MhMoyTFc5TYEomnzRUKim9UJhe8a6E"]) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Azure Secret Value", candidates[0].rule_name) + self.assertFalse(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertFalse(cred_sweeper.is_ml_validator_inited) + + # found candidate is ML validated + provider = StringContentProvider(['"nonce": "qPRjfoZWaBPH0KbXMCicm5v1VdG5Hj0DUFMHdSxPOiS"']) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Nonce", candidates[0].rule_name) + self.assertFalse(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertTrue(cred_sweeper.is_ml_validator_inited) + # remember id of the validator + validator_id = id(cred_sweeper.ml_validator) + + # found candidate is ML validated also + provider = StringContentProvider(["password = Xdj@jcN834b"]) + candidates = cred_sweeper.file_scan(provider) + self.assertEqual(1, len(candidates)) + self.assertEqual("Password", candidates[0].rule_name) + # the ml_validator still initialized + self.assertTrue(cred_sweeper.is_ml_validator_inited) + cred_sweeper.credential_manager.set_credentials(candidates) + cred_sweeper.post_processing() + self.assertTrue(cred_sweeper.is_ml_validator_inited) + # the same id of the validator + self.assertEqual(validator_id, id(cred_sweeper.ml_validator)) + + # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # + def test_multi_jobs_p(self) -> None: # real result might be shown in code coverage content_provider: AbstractProvider = FilesProvider([SAMPLES_PATH])