Skip to content

Commit

Permalink
Code enhancement (#586)
Browse files Browse the repository at this point in the history
* Code enhancement

* DCfix
  • Loading branch information
babenek authored Jul 22, 2024
1 parent 9d4b2df commit dcf0a75
Show file tree
Hide file tree
Showing 16 changed files with 545 additions and 528 deletions.
3 changes: 3 additions & 0 deletions credsweeper/credentials/candidate.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ def __str__(self) -> str:
f" | api_validation: {self.api_validation.name}" \
f" | ml_validation: {self.ml_validation.name}"

def __repr__(self):
    """Return the same text that ``str(self)`` produces."""
    text = str(self)
    return text

def to_json(self) -> Dict:
"""Convert credential candidate object to dictionary.
Expand Down
41 changes: 22 additions & 19 deletions credsweeper/credentials/line_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,8 @@ def sanitize_value(self):
self.value_start += start
self.value_end = self.value_start + len(self.value)

def clean_url_parameters(self) -> None:
"""Clean url address from 'query parameters'.
If line seem to be a URL - split by & character.
Variable should be right most value after & or ? ([-1]). And value should be left most before & ([0])
"""
# line length cannot exceed MAX_LINE_LENGTH
assert MAX_LINE_LENGTH >= len(self.line)
def check_url_part(self) -> bool:
"""Determines whether value is part of url like line"""
line_before_value = self.line[:self.value_start]
url_pos = -1
find_pos = 0
Expand All @@ -161,17 +155,23 @@ def clean_url_parameters(self) -> None:
self.url_part &= not self.url_chars_not_allowed_pattern.search(line_before_value, pos=url_pos + 3)
self.url_part |= self.line[self.variable_start - 1] in "?&" if 0 < self.variable_start else False
self.url_part |= bool(self.url_value_pattern.match(self.value))
if not self.url_part:
return
return self.url_part

# all checks have passed - line before the value may be a URL
self.variable = self.variable.rsplit('&')[-1].rsplit('?')[-1].rsplit(';')[-1]
self.value = self.value.split('&', maxsplit=1)[0].split(';', maxsplit=1)[0].split('#', maxsplit=1)[0]
if not self.variable.endswith("://"):
# skip sanitize in case of URL credential rule
value_spl = self.url_param_split.split(self.value)
if len(value_spl) > 1:
self.value = value_spl[0]
def clean_url_parameters(self) -> None:
    """Strip URL query-parameter noise from the variable and the value.

    Nothing happens unless ``check_url_part()`` reports that the value sits
    inside a URL-like line. Otherwise the variable keeps only the text after
    the last '&', '?' and ';', and the value keeps only the text before the
    first '&', ';' and '#'.
    """
    if not self.check_url_part():
        return

    # all checks have passed - line before the value may be a URL
    name = self.variable
    for separator in ('&', '?', ';'):
        # right-most piece is the parameter name
        name = name.rsplit(separator)[-1]
    self.variable = name

    text = self.value
    for separator in ('&', ';', '#'):
        # left-most piece is the parameter value
        text = text.split(separator, maxsplit=1)[0]
    self.value = text

    if self.variable.endswith("://"):
        # skip sanitize in case of URL credential rule
        return

    pieces = self.url_param_split.split(self.value)
    if len(pieces) > 1:
        self.value = pieces[0]

def clean_bash_parameters(self) -> None:
"""Split variable and value by bash special characters, if line assumed to be CLI command."""
Expand Down Expand Up @@ -287,10 +287,13 @@ def is_source_file_with_quotes(self) -> bool:
return True
return False

def __repr__(self) -> str:
def __str__(self):
    """Render line, line number, path, value and entropy validation as one ' | '-separated string."""
    fields = (
        f"line: '{self.line}'",
        f"line_num: {self.line_num}",
        f"path: {self.path}",
        f"value: '{self.value}'",
        f"entropy_validation: {EntropyValidator(self.value)}",
    )
    return " | ".join(fields)

def __repr__(self):
    """Return the same text that ``str(self)`` produces."""
    text = str(self)
    return text

def to_json(self) -> Dict:
"""Convert line data object to dictionary.
Expand Down
9 changes: 6 additions & 3 deletions credsweeper/deep_scanner/bzip2_scanner.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import bz2
import logging
from abc import ABC
from pathlib import Path
from typing import List

from credsweeper.credentials import Candidate
Expand All @@ -22,10 +23,12 @@ def data_scan(
"""Extracts data from bzip2 archive and launches data_scan"""
candidates = []
try:
new_path = data_provider.file_path if ".bz2" != Util.get_extension(
data_provider.file_path) else data_provider.file_path[:-4]
file_path = Path(data_provider.file_path)
new_path = file_path.as_posix()
if ".bz2" == file_path.suffix:
new_path = new_path[:-4]
bzip2_content_provider = DataContentProvider(data=bz2.decompress(data_provider.data),
file_path=data_provider.file_path,
file_path=new_path,
file_type=Util.get_extension(new_path),
info=f"{data_provider.info}|BZIP2|{new_path}")
new_limit = recursive_limit_size - len(bzip2_content_provider.data)
Expand Down
3 changes: 2 additions & 1 deletion credsweeper/deep_scanner/deep_scanner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import datetime
import logging
from pathlib import Path
from typing import List, Optional, Any, Tuple, Union

from credsweeper.common.constants import RECURSIVE_SCAN_LIMITATION
Expand Down Expand Up @@ -136,7 +137,7 @@ def scan(self,
data_provider = DataContentProvider(data=data,
file_path=content_provider.file_path,
file_type=content_provider.file_type,
info=content_provider.file_path)
info=Path(content_provider.file_path).as_posix())
# iterate for all possibly scanner methods WITHOUT ByteContentProvider for TextContentProvider
scanner_classes = self.get_deep_scanners(data, content_provider.file_type)
for scan_class in scanner_classes:
Expand Down
11 changes: 7 additions & 4 deletions credsweeper/deep_scanner/gzip_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import io
import logging
from abc import ABC
from pathlib import Path
from typing import List

from credsweeper.utils import Util
from credsweeper.credentials import Candidate
from credsweeper.deep_scanner.abstract_scanner import AbstractScanner
from credsweeper.file_handler.data_content_provider import DataContentProvider
from credsweeper.utils import Util

logger = logging.getLogger(__name__)

Expand All @@ -24,10 +25,12 @@ def data_scan(
candidates = []
try:
with gzip.open(io.BytesIO(data_provider.data)) as f:
new_path = data_provider.file_path if ".gz" != Util.get_extension(
data_provider.file_path) else data_provider.file_path[:-3]
file_path = Path(data_provider.file_path)
new_path = file_path.as_posix()
if ".gz" == file_path.suffix:
new_path = new_path[:-3]
gzip_content_provider = DataContentProvider(data=f.read(),
file_path=data_provider.file_path,
file_path=new_path,
file_type=Util.get_extension(new_path),
info=f"{data_provider.info}|GZIP|{new_path}")
new_limit = recursive_limit_size - len(gzip_content_provider.data)
Expand Down
24 changes: 12 additions & 12 deletions credsweeper/filters/value_entropy_base32_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ class ValueEntropyBase32Check(Filter):
def __init__(self, config: Config = None) -> None:
pass

@staticmethod
def get_min_data_entropy(x: int) -> float:
    """Return the entropy threshold used for a value of length ``x``.

    A precalculated constant is used for length 16, a logarithmic
    approximation for any other length of 10 or more, and 0 otherwise.
    """
    if 16 == x:
        # precalculated value, applied for speedup
        return 3.46
    if 10 <= x:
        # approximation does not exceed stdev
        return 0.64 * math.log2(x) + 0.9
    return 0

def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
"""Run filter checks on received credential candidate data 'line_data'.
Expand All @@ -28,15 +40,3 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
entropy = Util.get_shannon_entropy(line_data.value, Chars.BASE32_CHARS.value)
min_entropy = ValueEntropyBase32Check.get_min_data_entropy(len(line_data.value))
return min_entropy > entropy or 0 == min_entropy

@staticmethod
def get_min_data_entropy(x: int) -> float:
    """Return the entropy threshold used for a value of length ``x``.

    A precalculated constant is used for length 16, a logarithmic
    approximation for any other length of 10 or more, and 0 otherwise.
    """
    if 16 == x:
        # precalculated value, applied for speedup
        return 3.46
    if 10 <= x:
        # approximation does not exceed stdev
        return 0.64 * math.log2(x) + 0.9
    return 0
30 changes: 15 additions & 15 deletions credsweeper/filters/value_entropy_base36_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,6 @@ class ValueEntropyBase36Check(Filter):
def __init__(self, config: Config = None) -> None:
pass

def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
    """Compare the base36 Shannon entropy of the candidate value with the minimum for its length.

    Args:
        line_data: credential candidate data
        target: multiline target from which line data was obtained

    Return:
        True when the candidate must be filtered out, False when it is kept

    """
    value = line_data.value
    entropy = Util.get_shannon_entropy(value, Chars.BASE36_CHARS.value)
    min_entropy = ValueEntropyBase36Check.get_min_data_entropy(len(value))
    if 0 == min_entropy:
        # a zero minimum always filters the candidate
        return True
    return min_entropy > entropy

@staticmethod
def get_min_data_entropy(x: int) -> float:
"""Returns minimal entropy for size of random data. Precalculated data is applied for speedup"""
Expand All @@ -44,3 +29,18 @@ def get_min_data_entropy(x: int) -> float:
else:
y = 0
return y

def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
    """Compare the base36 Shannon entropy of the candidate value with the minimum for its length.

    Args:
        line_data: credential candidate data
        target: multiline target from which line data was obtained

    Return:
        True when the candidate must be filtered out, False when it is kept

    """
    value = line_data.value
    entropy = Util.get_shannon_entropy(value, Chars.BASE36_CHARS.value)
    min_entropy = ValueEntropyBase36Check.get_min_data_entropy(len(value))
    if 0 == min_entropy:
        # a zero minimum always filters the candidate
        return True
    return min_entropy > entropy
36 changes: 18 additions & 18 deletions credsweeper/filters/value_entropy_base64_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,6 @@ class ValueEntropyBase64Check(Filter):
def __init__(self, config: Config = None) -> None:
pass

def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
    """Compare the base64 Shannon entropy of the candidate value with the minimum for its length.

    The URL-safe alphabet is used when the value contains '-' or '_',
    the standard alphabet otherwise.

    Args:
        line_data: credential candidate data
        target: multiline target from which line data was obtained

    Return:
        True when the candidate must be filtered out, False when it is kept

    """
    value = line_data.value
    if '-' in value or '_' in value:
        alphabet = Chars.BASE64URL_CHARS
    else:
        alphabet = Chars.BASE64STD_CHARS
    entropy = Util.get_shannon_entropy(value, alphabet.value)
    min_entropy = ValueEntropyBase64Check.get_min_data_entropy(len(value))
    if 0 == min_entropy:
        # a zero minimum always filters the candidate
        return True
    return min_entropy > entropy

@staticmethod
def get_min_data_entropy(x: int) -> float:
"""Returns minimal average entropy for size of random data. Precalculated round data is applied for speedup"""
Expand All @@ -54,3 +36,21 @@ def get_min_data_entropy(x: int) -> float:
else:
y = 0
return y

def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
    """Compare the base64 Shannon entropy of the candidate value with the minimum for its length.

    The URL-safe alphabet is used when the value contains '-' or '_',
    the standard alphabet otherwise.

    Args:
        line_data: credential candidate data
        target: multiline target from which line data was obtained

    Return:
        True when the candidate must be filtered out, False when it is kept

    """
    value = line_data.value
    if '-' in value or '_' in value:
        alphabet = Chars.BASE64URL_CHARS
    else:
        alphabet = Chars.BASE64STD_CHARS
    entropy = Util.get_shannon_entropy(value, alphabet.value)
    min_entropy = ValueEntropyBase64Check.get_min_data_entropy(len(value))
    if 0 == min_entropy:
        # a zero minimum always filters the candidate
        return True
    return min_entropy > entropy
4 changes: 1 addition & 3 deletions credsweeper/filters/value_not_allowed_pattern_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
True, if need to filter candidate and False if left
"""
if line_data.is_well_quoted_value:
return False
if self.NOT_ALLOWED_PATTERN.search(line_data.value):
if not line_data.is_well_quoted_value and self.NOT_ALLOWED_PATTERN.search(line_data.value):
return True
return False
54 changes: 30 additions & 24 deletions credsweeper/filters/value_not_part_encoded_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,29 @@ class ValueNotPartEncodedCheck(Filter):
def __init__(self, config: Config = None) -> None:
pass

@staticmethod
def check_line_target_fit(line_data: LineData, target: AnalysisTarget) -> bool:
    """Tell whether ``line_data`` refers to the same line as ``target`` within ``target.lines``.

    Line number, line length and line text must match the target, the line
    number must be within 1..``target.lines_len``, and the entry at that
    position in ``target.lines`` must equal the line text.
    """
    if line_data.line_num != target.line_num:
        return False
    # cheap length comparison before comparing the text itself
    if len(line_data.line) != target.line_len:
        return False
    if line_data.line != target.line:
        return False
    if not 0 < target.line_num <= target.lines_len:
        return False
    # line numbers are 1-based, list positions are 0-based
    return line_data.line == target.lines[target.line_num - 1]

@staticmethod
def check_val(line: str, pattern: re.Pattern) -> Optional[bool]:
    """Check the ``val`` group that ``pattern`` captures at the start of ``line``.

    Returns True when the captured value does not start with '/' or ends
    with '='. Returns None when the pattern does not match or neither
    condition holds.
    """
    found = pattern.match(line)
    if not found:
        return None
    captured = found.group("val")
    # not a path-like value, or a path-like value ending with the padding sign
    if not captured.startswith('/') or '=' == captured[-1]:
        return True
    return None

def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
"""Run filter checks on received credential candidate data 'line_data'.
Expand All @@ -30,49 +53,32 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
"""

if line_data.line_num == target.line_num \
and len(line_data.line) == target.line_len \
and line_data.line == target.line \
and 0 < target.line_num <= target.lines_len \
and line_data.line == target.lines[target.line_num - 1]:
if ValueNotPartEncodedCheck.check_line_target_fit(line_data, target):
# suppose, there is plain lines order
if 1 < target.line_num:
result = ValueNotPartEncodedCheck._check_val(
target.lines[line_data.line_num - 2], ValueNotPartEncodedCheck.BASE64_ENCODED_DATA_PATTERN_BEFORE)
result = ValueNotPartEncodedCheck.check_val(target.lines[line_data.line_num - 2],
ValueNotPartEncodedCheck.BASE64_ENCODED_DATA_PATTERN_BEFORE)
if result is not None:
return result
if target.lines_len > target.line_num:
result = ValueNotPartEncodedCheck._check_val(target.lines[line_data.line_num],
ValueNotPartEncodedCheck.BASE64_ENCODED_DATA_PATTERN_AFTER)
result = ValueNotPartEncodedCheck.check_val(target.lines[line_data.line_num],
ValueNotPartEncodedCheck.BASE64_ENCODED_DATA_PATTERN_AFTER)
if result is not None:
return result
else:
# otherwise - need to iterate for all lines
for i in range(target.lines_len):
if line_data.line == target.lines[i]:
if 0 < i:
result = ValueNotPartEncodedCheck._check_val(
result = ValueNotPartEncodedCheck.check_val(
target.lines[i - 1], ValueNotPartEncodedCheck.BASE64_ENCODED_DATA_PATTERN_BEFORE)
if result is not None:
return result
i += 1
if target.lines_len > i:
result = ValueNotPartEncodedCheck._check_val(
result = ValueNotPartEncodedCheck.check_val(
target.lines[i], ValueNotPartEncodedCheck.BASE64_ENCODED_DATA_PATTERN_AFTER)
if result is not None:
return result
break
return False

@staticmethod
def _check_val(line: str, pattern: re.Pattern) -> Optional[bool]:
    """Check the ``val`` group that ``pattern`` captures at the start of ``line``.

    Returns True when the captured value does not start with '/' or ends
    with '='. Returns None when the pattern does not match or neither
    condition holds.
    """
    found = pattern.match(line)
    if not found:
        return None
    captured = found.group("val")
    # not a path-like value, or a path-like value ending with the padding sign
    if not captured.startswith('/') or '=' == captured[-1]:
        return True
    return None
22 changes: 11 additions & 11 deletions credsweeper/filters/value_token_base32_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,17 @@ class ValueTokenBase32Check(Filter):
def __init__(self, config: Config = None) -> None:
pass

@staticmethod
def get_min_strength(x: int) -> float:
    """Return the strength threshold used for a value of length ``x``.

    A precalculated constant is used for length 16, a cubic polynomial for
    other lengths from 8 to 32 inclusive, and 1 for every other length.
    """
    if 16 == x:
        # precalculated value, applied for speedup
        return 0.7047
    if 8 <= x <= 32:
        return ((0.000046 * x - 0.0044) * x + 0.146) * x - 0.7
    return 1

def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
"""Run filter checks on received credential candidate data 'line_data'.
Expand All @@ -27,14 +38,3 @@ def run(self, line_data: LineData, target: AnalysisTarget) -> bool:
strength = float(PasswordStats(line_data.value).strength())
min_strength = ValueTokenBase32Check.get_min_strength(len(line_data.value))
return min_strength > strength

@staticmethod
def get_min_strength(x: int) -> float:
    """Return the strength threshold used for a value of length ``x``.

    A precalculated constant is used for length 16, a cubic polynomial for
    other lengths from 8 to 32 inclusive, and 1 for every other length.
    """
    if 16 == x:
        # precalculated value, applied for speedup
        return 0.7047
    if 8 <= x <= 32:
        return ((0.000046 * x - 0.0044) * x + 0.146) * x - 0.7
    return 1
Loading

0 comments on commit dcf0a75

Please sign in to comment.