Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[PoC] deep scan in git repos #506

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,8 @@ ignore_missing_imports = True
[mypy-docx.*]
ignore_missing_imports = True

[mypy-pydriller.*]
ignore_missing_imports = True

[mypy-base62.*]
ignore_missing_imports = True
114 changes: 104 additions & 10 deletions credsweeper/__main__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import base64
import binascii
import hashlib
import io
import logging
import os
import sys
import time
import warnings
from argparse import ArgumentParser, ArgumentTypeError, Namespace
from typing import Any, Union, Dict
from typing import Any, Union, Optional, Dict, List, Tuple

from pydriller import Repository

from credsweeper import __version__
from credsweeper.app import APP_PATH, CredSweeper
Expand Down Expand Up @@ -116,16 +122,27 @@ def get_arguments() -> Namespace:
const="log.yaml",
dest="export_log_config",
metavar="PATH")
group.add_argument("--git", nargs="+", help="git repo to scan", dest="git", metavar="PATH")
parser.add_argument("--commits",
help="scan git repo for N commits only",
type=positive_int,
dest="commits",
default=0,
metavar="POSITIVE_INT")
parser.add_argument("--branch",
help="scan git repo for single branch, otherwise - all branches were scanned (slow)",
dest="branch",
type=str)
parser.add_argument("--rules",
help="path of rule config file (default: credsweeper/rules/config.yaml). "
f"severity:{[i.value for i in Severity]} "
f"type:{[i.value for i in RuleType]}",
f"severity:{[i.value for i in Severity]} "
f"type:{[i.value for i in RuleType]}",
default=None,
dest="rule_path",
metavar="PATH")
parser.add_argument("--severity",
help=f"set minimum level for rules to apply {[i.value for i in Severity]}"
f"(default: '{Severity.INFO}', case insensitive)",
f"(default: '{Severity.INFO}', case insensitive)",
default=Severity.INFO,
dest="severity",
type=severity_levels)
Expand Down Expand Up @@ -159,9 +176,9 @@ def get_arguments() -> Namespace:
parser.add_argument("--doc", help="document-specific scanning", dest="doc", action="store_true")
parser.add_argument("--ml_threshold",
help="setup threshold for the ml model. "
"The lower the threshold - the more credentials will be reported. "
f"Allowed values: float between 0 and 1, or any of {[e.value for e in ThresholdPreset]} "
"(default: medium)",
"The lower the threshold - the more credentials will be reported. "
f"Allowed values: float between 0 and 1, or any of {[e.value for e in ThresholdPreset]} "
"(default: medium)",
type=threshold_or_float,
default=ThresholdPreset.medium,
dest="ml_threshold",
Expand Down Expand Up @@ -232,7 +249,7 @@ def get_arguments() -> Namespace:
parser.add_argument("--log",
"-l",
help=f"provide logging level of {list(Logger.LEVELS.keys())}"
f"(default: 'warning', case insensitive)",
f"(default: 'warning', case insensitive)",
default="warning",
dest="log",
metavar="LOG_LEVEL",
Expand All @@ -252,7 +269,6 @@ def get_arguments() -> Namespace:
version=f"CredSweeper {__version__}")
return parser.parse_args()


def scan(args: Namespace, content_provider: AbstractProvider) -> int:
"""Scan content_provider data, print results or save them to json_filename is not None

Expand Down Expand Up @@ -299,9 +315,80 @@ def scan(args: Namespace, content_provider: AbstractProvider) -> int:
return credsweeper.run(content_provider=content_provider)
except Exception as exc:
logger.critical(exc, exc_info=True)
logger.exception(exc)
return -1


def _per_commit_report_name(report_filename: str, commit_hash: str) -> str:
    """Build a per-commit report file name: ``<stem>.<commit_hash><ext>``."""
    ext = Util.get_extension(report_filename, False)
    # Guard the empty extension: ``name[:-0]`` is an empty string, which would drop the whole
    # user supplied name and produce just ``.<commit_hash>``.
    stem = report_filename[:-len(ext)] if ext else report_filename
    return f"{stem}.{commit_hash}{ext}"


def drill(args: Namespace) -> Tuple[int, int, int]:
    """Scan the commits of a git repository and keep a journal of already scanned commits

    The journal is a json file in the current working directory. Its name is the base32 encoded
    sha1 of ``str(args.git)``; every scanned commit hash is stored there with the number of
    credentials found, so a following run skips the commits which are already in the journal.

    Args:
        args: parsed command line arguments. ``args.git`` and ``args.branch`` are passed to pydriller
            ``Repository``; ``args.commits`` (0 - no limit) restricts how many commits are scanned
            during this run; ``args.json_filename`` and ``args.xlsx_filename`` are used as templates
            for per-commit reports

    Returns:
        total credentials found (-1 when the scan was interrupted by an exception)
        total scanned branches
        total scanned commits
    """
    total_credentials = 0
    total_branches = 0
    total_commits = 0
    try:
        # sha1 is used only to derive a stable journal file name, not for security purposes
        sha1git = hashlib.sha1(str(args.git).encode()).digest()
        repo_hash = base64.b32encode(sha1git).decode("ascii")
        journal_filename = f"{repo_hash}.json"
        logger.info(f"{args.git} sha1 in base32 {repo_hash}")
        repo_journal = Util.json_load(journal_filename)
        if not isinstance(repo_journal, dict):
            # presumably the journal is missing or broken - start a new one (TODO confirm json_load contract)
            with open(journal_filename, "w") as f:
                f.write("{}")
            repo_journal = {"repo": args.git}
        credsweeper = CredSweeper(rule_path=args.rule_path,
                                  config_path=args.config_path,
                                  sort_output=args.sort_output,
                                  use_filters=args.no_filters,
                                  pool_count=args.jobs,
                                  ml_batch_size=args.ml_batch_size,
                                  ml_threshold=args.ml_threshold,
                                  ml_providers=args.ml_providers,
                                  find_by_ext=args.find_by_ext,
                                  depth=args.depth,
                                  doc=args.doc,
                                  severity=args.severity,
                                  size_limit=args.size_limit,
                                  log_level=args.log)
        # "--commits N" restricts the scan to N commits; 0 (the default) means no restriction
        commits_limit = getattr(args, "commits", 0) or 0
        repository = Repository(args.git, only_in_branch=args.branch)
        for commit in repository.traverse_commits():
            if 0 < commits_limit <= total_commits:
                logger.info(f"Commits limit reached: {commits_limit}")
                break
            if commit.hash in repo_journal:
                logger.debug(f"Skip already scanned commit: {commit.hash}")
                continue
            logger.info(f"Scan commit: {commit.hash}")
            paths: List[Tuple[str, io.BytesIO]] = []
            for file in commit.modified_files:
                logger.info(f"FILE: {file.old_path} -> {file.new_path}")
                try:
                    # new_path is None for a file which does not exist after the commit - nothing to scan
                    if file.new_path is not None:
                        _io = io.BytesIO(file.content)
                        paths.append((file.new_path or file.old_path, _io))
                except ValueError as exc:
                    logger.error("Possible missed submodule:%s", str(exc))
            provider = FilesProvider(paths)
            # each commit gets its own report file to avoid overwriting results of previous commits
            if args.json_filename:
                credsweeper.json_filename = _per_commit_report_name(args.json_filename, commit.hash)
            if args.xlsx_filename:
                credsweeper.xlsx_filename = _per_commit_report_name(args.xlsx_filename, commit.hash)

            commit_cred_number = credsweeper.run(provider)
            # the same CredSweeper object is reused, so found candidates must not leak to the next commit
            credsweeper.credential_manager.candidates.clear()
            total_credentials += commit_cred_number
            total_commits += 1
            repo_journal[commit.hash] = commit_cred_number
            # the journal is saved after every commit to resume an interrupted scan
            Util.json_dump(repo_journal, journal_filename)
        # NOTE(review): the whole traversal is counted as one branch, even when args.branch is not set
        total_branches += 1
    except Exception as exc:
        logger.critical(exc, exc_info=True)
        return -1, total_branches, total_commits
    return total_credentials, total_branches, total_commits


def main() -> int:
"""Main function"""
result = EXIT_FAILURE
Expand All @@ -310,7 +397,7 @@ def main() -> int:
if args.banner:
print(f"CredSweeper {__version__} crc32:{check_integrity():08x}")
Logger.init_logging(args.log, args.log_config_path)
logger.info(f"Init CredSweeper object with arguments: {args}")
logger.info(f"Init CredSweeper object with arguments: {args} CWD: {os.getcwd()}")
summary: Dict[str, int] = {}
if args.path:
logger.info(f"Run analyzer on path: {args.path}")
Expand All @@ -332,6 +419,13 @@ def main() -> int:
summary["Deleted File Credentials"] = del_credentials_number
if 0 <= add_credentials_number and 0 <= del_credentials_number:
result = EXIT_SUCCESS
elif args.git:
logger.info(f"Run analyzer on GIT: {args.git}")
credentials_number, branches_number, commits_number = drill(args)
summary[
f"Detected Credentials in {branches_number} branches and {commits_number} commits "] = credentials_number
if 0 <= credentials_number:
result = EXIT_SUCCESS
elif args.export_config:
logging.info(f"Exporting default config to file: {args.export_config}")
config_dict = Util.json_load(APP_PATH / "secret" / "config.json")
Expand Down
9 changes: 8 additions & 1 deletion docs/source/guide.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ Get all argument list:

.. code-block:: text

usage: python -m credsweeper [-h] (--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH])
usage: python -m credsweeper [-h]
(--path PATH [PATH ...] | --diff_path PATH [PATH ...] | --export_config [PATH] | --export_log_config [PATH] | --git PATH [PATH ...])
[--commits POSITIVE_INT] [--branch BRANCH]
[--rules PATH] [--severity SEVERITY] [--config PATH] [--log_config PATH] [--denylist PATH]
[--find-by-ext] [--depth POSITIVE_INT] [--no-filters] [--doc] [--ml_threshold FLOAT_OR_STR]
[--ml_batch_size POSITIVE_INT] [--ml_config PATH] [--ml_model PATH] [--ml_providers STR]
Expand All @@ -31,6 +33,11 @@ Get all argument list:
exporting default config to file (default: config.json)
--export_log_config [PATH]
exporting default logger config to file (default: log.yaml)
--git PATH [PATH ...]
git repo to scan
--commits POSITIVE_INT
scan git repo for N commits only
--branch BRANCH scan git repo for single branch, otherwise - all branches were scanned (slow)
--rules PATH path of rule config file (default: credsweeper/rules/config.yaml). severity:['critical', 'high', 'medium', 'low', 'info'] type:['keyword', 'pattern', 'pem_key', 'multi']
--severity SEVERITY set minimum level for rules to apply ['critical', 'high', 'medium', 'low', 'info'](default: 'Severity.INFO', case insensitive)
--config PATH use custom config (default: built-in)
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pandas==2.2.3; python_version >= '3.9'
password-strength==0.0.3.post2
pdfminer.six==20240706
pybase62==1.0.0
PyDriller==2.7
pyjks==20.0.0
python-dateutil==2.9.0.post0
python-docx==1.1.2
Expand Down
2 changes: 1 addition & 1 deletion tests/data/depth_3.json
Original file line number Diff line number Diff line change
Expand Up @@ -13859,4 +13859,4 @@
}
]
}
]
]
2 changes: 1 addition & 1 deletion tests/data/doc.json
Original file line number Diff line number Diff line change
Expand Up @@ -19486,4 +19486,4 @@
}
]
}
]
]
2 changes: 1 addition & 1 deletion tests/data/ml_threshold.json
Original file line number Diff line number Diff line change
Expand Up @@ -11424,4 +11424,4 @@
}
]
}
]
]
2 changes: 1 addition & 1 deletion tests/data/output.json
Original file line number Diff line number Diff line change
Expand Up @@ -10592,4 +10592,4 @@
}
]
}
]
]
33 changes: 33 additions & 0 deletions tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
import subprocess
import sys
import tempfile
from pathlib import Path
import time
from typing import AnyStr, Tuple
from unittest import TestCase

import deepdiff
import numpy as np
import pandas as pd
from git import Repo

from credsweeper.app import APP_PATH
from credsweeper.utils import Util
Expand Down Expand Up @@ -203,7 +205,10 @@ def test_it_works_n(self) -> None:
" | --diff_path PATH [PATH ...]" \
" | --export_config [PATH]" \
" | --export_log_config [PATH]" \
" | --git PATH [PATH ...]" \
")" \
" [--commits POSITIVE_INT]" \
" [--branch BRANCH]" \
" [--rules PATH]" \
" [--severity SEVERITY]" \
" [--config PATH]" \
Expand Down Expand Up @@ -235,6 +240,7 @@ def test_it_works_n(self) -> None:
" --diff_path" \
" --export_config" \
" --export_log_config" \
" --git" \
" is required "
expected = " ".join(expected.split())
self.assertEqual(expected, output)
Expand Down Expand Up @@ -707,6 +713,33 @@ def test_doc_n(self) -> None:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_pydriller_p(self) -> None:
    """A credential which was removed by a later commit must be detected with --git scan"""
    secret = "GbdD@23#d0"
    with tempfile.TemporaryDirectory() as repo_dir, Repo.init(repo_dir) as git_repo:
        target = Path(repo_dir) / "with_cred"
        # the first commit introduces the credential
        target.write_text(f"git_password: {secret}")
        git_repo.index.add([target])
        git_repo.index.commit("added file")
        # the second commit wipes the credential from the working tree
        target.write_text("DELETED")
        git_repo.index.add([target])
        git_repo.index.commit("cleared file")
        # the actual file content does not have the value anymore
        self.assertNotIn(secret, target.read_text())
        # scan the history of the repository
        cli_args = ["--log", "DEBUG", "--git", str(repo_dir)]
        _stdout, _stderr = self._m_credsweeper(cli_args)
        expected_summary = "Detected Credentials in 1 branches and 2 commits : 1"
        self.assertIn(expected_summary, _stdout, _stdout)
        for stream in (_stdout, _stderr):
            self.assertNotIn("CRITICAL", stream, stream)
        # the removed value is reported in stdout
        self.assertIn(secret, _stdout, _stdout)

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_external_ml_n(self) -> None:
# not existed ml_config
_stdout, _stderr = self._m_credsweeper(
Expand Down
42 changes: 42 additions & 0 deletions tests/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,48 @@ def test_find_by_ext_and_not_ignore_p(self) -> None:

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_multiple_invocation_p(self) -> None:
    """ml_validator must be created on demand and only once for several scans"""
    self.maxDiff = None
    sweeper = CredSweeper()
    self.assertFalse(sweeper.is_ml_validator_inited)

    def scan_single(line, rule_name):
        # scan one line and check that exactly one candidate of the expected rule is found
        found = sweeper.file_scan(StringContentProvider([line]))
        self.assertEqual(1, len(found))
        self.assertEqual(rule_name, found[0].rule_name)
        return found

    def post_process(found):
        # pass the candidates through the post processing stage
        sweeper.credential_manager.set_credentials(found)
        sweeper.post_processing()

    # the candidate is not ML validated - the validator stays uninitialized
    azure = scan_single("qpF8Q~PCM5MhMoyTFc5TYEomnzRUKim9UJhe8a6E", "Azure Secret Value")
    self.assertFalse(sweeper.is_ml_validator_inited)
    post_process(azure)
    self.assertFalse(sweeper.is_ml_validator_inited)

    # the candidate is ML validated - the validator appears after post processing only
    nonce = scan_single('"nonce": "qPRjfoZWaBPH0KbXMCicm5v1VdG5Hj0DUFMHdSxPOiS"', "Nonce")
    self.assertFalse(sweeper.is_ml_validator_inited)
    post_process(nonce)
    self.assertTrue(sweeper.is_ml_validator_inited)
    # keep identity of the validator for the check below
    first_validator_id = id(sweeper.ml_validator)

    # one more ML validated candidate - the validator is already initialized
    password = scan_single("password = Xdj@jcN834b", "Password")
    self.assertTrue(sweeper.is_ml_validator_inited)
    post_process(password)
    self.assertTrue(sweeper.is_ml_validator_inited)
    # the validator object was not recreated
    self.assertEqual(first_validator_id, id(sweeper.ml_validator))

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

def test_multi_jobs_p(self) -> None:
# real result might be shown in code coverage
content_provider: AbstractProvider = FilesProvider([SAMPLES_PATH])
Expand Down
Loading