From ee9aa894ae21378bea5e21d851fc2c5f23c7848b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Mon, 28 Oct 2024 14:33:15 +0100 Subject: [PATCH] chore: ruff fixes --- fact_extractor/helperFunctions/statistics.py | 18 ++++--- fact_extractor/install/common.py | 37 +++++++------- .../generic_carver/code/generic_carver.py | 20 ++++---- .../unpacking/generic_fs/code/generic_fs.py | 22 +++++--- fact_extractor/unpacker/unpackBase.py | 51 +++++++++++-------- 5 files changed, 84 insertions(+), 64 deletions(-) diff --git a/fact_extractor/helperFunctions/statistics.py b/fact_extractor/helperFunctions/statistics.py index 200ce319..8ba5489a 100644 --- a/fact_extractor/helperFunctions/statistics.py +++ b/fact_extractor/helperFunctions/statistics.py @@ -1,7 +1,7 @@ -from configparser import ConfigParser +from __future__ import annotations + from contextlib import suppress -from pathlib import Path -from typing import Dict, List +from typing import TYPE_CHECKING import magic from common_helper_files import safe_rglob @@ -9,8 +9,12 @@ from helperFunctions.config import read_list_from_config +if TYPE_CHECKING: + from configparser import ConfigParser + from pathlib import Path + -def add_unpack_statistics(extraction_dir: Path, meta_data: Dict): +def add_unpack_statistics(extraction_dir: Path, meta_data: dict): unpacked_files, unpacked_directories = 0, 0 for extracted_item in safe_rglob(extraction_dir): if extracted_item.is_file(): @@ -23,7 +27,7 @@ def add_unpack_statistics(extraction_dir: Path, meta_data: Dict): def get_unpack_status( - file_path: str, binary: bytes, extracted_files: List[Path], meta_data: Dict, config: ConfigParser + file_path: str, binary: bytes, extracted_files: list[Path], meta_data: dict, config: ConfigParser ): meta_data['summary'] = [] meta_data['entropy'] = avg_entropy(binary) @@ -43,7 +47,7 @@ def get_unpack_status( _detect_unpack_loss(binary, extracted_files, meta_data, config.getint('ExpertSettings', 'header_overhead')) -def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: Dict, header_overhead: int): +def _detect_unpack_loss(binary: bytes, extracted_files: list[Path], meta_data: dict, header_overhead: int): decoding_overhead = 1 - meta_data.get('encoding_overhead', 0) cleaned_size = get_binary_size_without_padding(binary) * decoding_overhead - header_overhead size_of_extracted_files = _total_size_of_extracted_files(extracted_files) @@ -52,7 +56,7 @@ def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: D meta_data['summary'] = ['data lost'] if cleaned_size > size_of_extracted_files else ['no data lost'] -def _total_size_of_extracted_files(extracted_files: List[Path]) -> int: +def _total_size_of_extracted_files(extracted_files: list[Path]) -> int: total_size = 0 for item in extracted_files: with suppress(OSError): diff --git a/fact_extractor/install/common.py b/fact_extractor/install/common.py index b435996c..0eb25ba3 100644 --- a/fact_extractor/install/common.py +++ b/fact_extractor/install/common.py @@ -1,12 +1,14 @@ import logging import subprocess as sp -import os -from contextlib import suppress from pathlib import Path from helperFunctions.config import load_config from helperFunctions.install import ( - apt_install_packages, apt_update_sources, pip_install_packages, load_requirements_file, OperateInDirectory + OperateInDirectory, + apt_install_packages, + apt_update_sources, + load_requirements_file, + pip_install_packages, ) APT_DEPENDENCIES = { @@ -31,6 +33,7 @@ ], } PIP_DEPENDENCY_FILE = Path(__file__).parent.parent.parent / 'requirements-common.txt' +BIN_DIR = Path(__file__).parent.parent / 'bin' def install_apt_dependencies(distribution: str): @@ -39,23 +42,23 @@ def install_apt_dependencies(distribution: str): def _install_magic(): - bin_dir = Path(__file__).parent.parent / 'bin' - with OperateInDirectory(bin_dir): + with OperateInDirectory(BIN_DIR): sp.run( [ - "wget", - "--output-document", - "../bin/firmware.xz", - "https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.1/firmware.xz", + 'wget', + '--output-document', + '../bin/firmware.xz', + 'https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.1/firmware.xz', ], check=True, ) sp.run( [ - "unxz", - "--force", - "../bin/firmware.xz", - ] + 'unxz', + '--force', + '../bin/firmware.xz', + ], + check=False, ) @@ -67,15 +70,13 @@ def main(distribution): install_apt_dependencies(distribution) pip_install_packages(*load_requirements_file(PIP_DEPENDENCY_FILE)) - # make bin dir - with suppress(FileExistsError): - os.mkdir('../bin') + BIN_DIR.mkdir(exist_ok=True) _install_magic() config = load_config('main.cfg') data_folder = config.get('unpack', 'data_folder') - os.makedirs(str(Path(data_folder, 'files')), exist_ok=True) - os.makedirs(str(Path(data_folder, 'reports')), exist_ok=True) + Path(data_folder, 'files').mkdir(exist_ok=True) + Path(data_folder, 'reports').mkdir(exist_ok=True) return 0 diff --git a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py index 9f21867d..cc3172c6 100644 --- a/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py +++ b/fact_extractor/plugins/unpacking/generic_carver/code/generic_carver.py @@ -1,14 +1,15 @@ -''' +""" This plugin unpacks all files via carving -''' +""" + from __future__ import annotations -import magic import logging import re import shutil from pathlib import Path +import magic from common_helper_process import execute_shell_command NAME = 'generic_carver' @@ -24,10 +25,10 @@ def unpack_function(file_path, tmp_dir): - ''' + """ file_path specifies the input file. tmp_dir should be used to store the extracted files. - ''' + """ logging.debug(f'File Type unknown: execute binwalk on {file_path}') output = execute_shell_command(f'binwalk --extract --carve --signature --directory {tmp_dir} {file_path}') @@ -81,10 +82,7 @@ def _is_possible_tar(file_type: str, file_path: Path) -> bool: def _remove_invalid_archives(self, file_path: Path, command, search_key=None): output = execute_shell_command(command.format(file_path)) - if search_key and search_key in output.replace('\n ', ''): - self._remove_file(file_path) - - elif not search_key and _output_is_empty(output): + if search_key and search_key in output.replace('\n ', '') or not search_key and _output_is_empty(output): self._remove_file(file_path) def _remove_file(self, file_path): @@ -115,7 +113,7 @@ def _output_is_empty(output): def _find_trailing_data_index_zip(file_path: Path) -> int | None: - '''Archives carved by binwalk often have trailing data at the end. 7z can determine the actual file size.''' + """Archives carved by binwalk often have trailing data at the end. 7z can determine the actual file size.""" output = execute_shell_command(f'7z l {file_path}') if 'There are data after the end of archive' in output: match = REAL_SIZE_REGEX.search(output) @@ -140,7 +138,7 @@ def drop_underscore_directory(tmp_dir): extracted_contents = list(Path(tmp_dir).iterdir()) if not extracted_contents: return - if not len(extracted_contents) == 1 or not extracted_contents[0].name.endswith('.extracted'): + if len(extracted_contents) != 1 or not extracted_contents[0].name.endswith('.extracted'): return for result in extracted_contents[0].iterdir(): shutil.move(str(result), str(result.parent.parent)) diff --git a/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py b/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py index fdc213b0..5d94cbce 100644 --- a/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py +++ b/fact_extractor/plugins/unpacking/generic_fs/code/generic_fs.py @@ -1,17 +1,27 @@ -''' +""" This plugin mounts filesystem images and extracts their content -''' +""" + import re -import magic from shlex import split -from subprocess import run, PIPE, STDOUT +from subprocess import PIPE, STDOUT, run from tempfile import TemporaryDirectory from time import sleep +import magic + NAME = 'genericFS' MIME_PATTERNS = [ - 'filesystem/btrfs', 'filesystem/dosmbr', 'filesystem/f2fs', 'filesystem/jfs', 'filesystem/minix', - 'filesystem/reiserfs', 'filesystem/romfs', 'filesystem/udf', 'filesystem/xfs', 'generic/fs', + 'filesystem/btrfs', + 'filesystem/dosmbr', + 'filesystem/f2fs', + 'filesystem/jfs', + 'filesystem/minix', + 'filesystem/reiserfs', + 'filesystem/romfs', + 'filesystem/udf', + 'filesystem/xfs', + 'generic/fs', ] VERSION = '0.6.1' TYPES = { diff --git a/fact_extractor/unpacker/unpackBase.py b/fact_extractor/unpacker/unpackBase.py index c871ab27..6ee09fee 100644 --- a/fact_extractor/unpacker/unpackBase.py +++ b/fact_extractor/unpacker/unpackBase.py @@ -1,9 +1,12 @@ +# noqa: N999 +from __future__ import annotations + import fnmatch import logging from os import getgid, getuid from subprocess import PIPE, Popen from time import time -from typing import Callable, Dict, List, Tuple +from typing import Callable import magic from common_helper_files import get_files_in_dir @@ -12,10 +15,10 @@ from fact_extractor.helperFunctions.plugin import import_plugins -class UnpackBase(object): - ''' +class UnpackBase: + """ The unpacker module unpacks all files included in a file - ''' + """ def __init__(self, config=None, extract_everything: bool = False): self.config = config @@ -37,39 +40,43 @@ def load_plugins(self): def _set_whitelist(self): self.blacklist = read_list_from_config(self.config, 'unpack', 'blacklist') - logging.debug(f'''Ignore (Blacklist): {', '.join(self.blacklist)}''') + logging.debug(f"""Ignore (Blacklist): {', '.join(self.blacklist)}""") for item in self.blacklist: self.register_plugin(item, self.unpacker_plugins['generic/nop']) - def register_plugin(self, mime_type: str, unpacker_name_and_function: Tuple[Callable[[str, str], Dict], str, str]): + def register_plugin(self, mime_type: str, unpacker_name_and_function: tuple[Callable[[str, str], dict], str, str]): self.unpacker_plugins[mime_type] = unpacker_name_and_function def get_unpacker(self, mime_type: str): if mime_type in list(self.unpacker_plugins.keys()): return self.unpacker_plugins[mime_type] - else: - return self.unpacker_plugins['generic/carver'] + return self.unpacker_plugins['generic/carver'] - def extract_files_from_file(self, file_path: str, tmp_dir) -> Tuple[List, Dict]: + def extract_files_from_file(self, file_path: str, tmp_dir) -> tuple[list, dict]: current_unpacker = self.get_unpacker(magic.from_file(file_path, mime=True)) return self._extract_files_from_file_using_specific_unpacker(file_path, tmp_dir, current_unpacker) - def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) -> Tuple[List, Dict]: + def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) -> tuple[list, dict]: fallback_plugin = self.unpacker_plugins[fallback_plugin_mime] - old_meta[f'''0_FALLBACK_{old_meta['plugin_used']}'''] = f'''{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)''' - if 'output' in old_meta.keys(): - old_meta[f'''0_ERROR_{old_meta['plugin_used']}'''] = old_meta['output'] - return self._extract_files_from_file_using_specific_unpacker(file_path, tmp_dir, fallback_plugin, meta_data=old_meta) + old_meta[f"""0_FALLBACK_{old_meta['plugin_used']}"""] = ( + f"""{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)""" + ) + if 'output' in old_meta: + old_meta[f"""0_ERROR_{old_meta['plugin_used']}"""] = old_meta['output'] + return self._extract_files_from_file_using_specific_unpacker( + file_path, tmp_dir, fallback_plugin, meta_data=old_meta + ) def _should_ignore(self, file): path = str(file) - for pattern in self.exclude: - if fnmatch.fnmatchcase(path, pattern): - return True - return False + return any(fnmatch.fnmatchcase(path, pattern) for pattern in self.exclude) - def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict = None) -> Tuple[List, Dict]: - unpack_function, name, version = selected_unpacker # TODO Refactor register method to directly use four parameters instead of three + def _extract_files_from_file_using_specific_unpacker( + self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict | None = None + ) -> tuple[list, dict]: + unpack_function, name, version = ( + selected_unpacker # TODO Refactor register method to directly use four parameters instead of three + ) if meta_data is None: meta_data = {} @@ -82,7 +89,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d additional_meta = unpack_function(file_path, tmp_dir) except Exception as error: logging.debug(f'Unpacking of {file_path} failed: {error}', exc_info=True) - additional_meta = {'error': f'{type(error)}: {str(error)}'} + additional_meta = {'error': f'{type(error)}: {error!s}'} if isinstance(additional_meta, dict): meta_data.update(additional_meta) @@ -102,7 +109,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d meta_data['number_of_excluded_files'] = excluded_count return out, meta_data - def change_owner_back_to_me(self, directory: str = None, permissions: str = 'u+r'): + def change_owner_back_to_me(self, directory: str, permissions: str = 'u+r'): with Popen(f'sudo chown -R {getuid()}:{getgid()} {directory}', shell=True, stdout=PIPE, stderr=PIPE) as pl: pl.communicate() self.grant_read_permission(directory, permissions)