From be010d52ac56ff7c26e3fc91bb4b1b7ee38d70ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Stucke?= Date: Mon, 28 Oct 2024 14:33:15 +0100 Subject: [PATCH] chore: ruff fixes --- fact_extractor/helperFunctions/statistics.py | 18 ++++--- fact_extractor/install/common.py | 37 +++++++------- fact_extractor/unpacker/unpackBase.py | 51 +++++++++++--------- 3 files changed, 59 insertions(+), 47 deletions(-) diff --git a/fact_extractor/helperFunctions/statistics.py b/fact_extractor/helperFunctions/statistics.py index 200ce319..8ba5489a 100644 --- a/fact_extractor/helperFunctions/statistics.py +++ b/fact_extractor/helperFunctions/statistics.py @@ -1,7 +1,7 @@ -from configparser import ConfigParser +from __future__ import annotations + from contextlib import suppress -from pathlib import Path -from typing import Dict, List +from typing import TYPE_CHECKING import magic from common_helper_files import safe_rglob @@ -9,8 +9,12 @@ from helperFunctions.config import read_list_from_config +if TYPE_CHECKING: + from configparser import ConfigParser + from pathlib import Path + -def add_unpack_statistics(extraction_dir: Path, meta_data: Dict): +def add_unpack_statistics(extraction_dir: Path, meta_data: dict): unpacked_files, unpacked_directories = 0, 0 for extracted_item in safe_rglob(extraction_dir): if extracted_item.is_file(): @@ -23,7 +27,7 @@ def add_unpack_statistics(extraction_dir: Path, meta_data: Dict): def get_unpack_status( - file_path: str, binary: bytes, extracted_files: List[Path], meta_data: Dict, config: ConfigParser + file_path: str, binary: bytes, extracted_files: list[Path], meta_data: dict, config: ConfigParser ): meta_data['summary'] = [] meta_data['entropy'] = avg_entropy(binary) @@ -43,7 +47,7 @@ def get_unpack_status( _detect_unpack_loss(binary, extracted_files, meta_data, config.getint('ExpertSettings', 'header_overhead')) -def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: Dict, header_overhead: int): +def _detect_unpack_loss(binary: bytes, extracted_files: list[Path], meta_data: dict, header_overhead: int): decoding_overhead = 1 - meta_data.get('encoding_overhead', 0) cleaned_size = get_binary_size_without_padding(binary) * decoding_overhead - header_overhead size_of_extracted_files = _total_size_of_extracted_files(extracted_files) @@ -52,7 +56,7 @@ def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: D meta_data['summary'] = ['data lost'] if cleaned_size > size_of_extracted_files else ['no data lost'] -def _total_size_of_extracted_files(extracted_files: List[Path]) -> int: +def _total_size_of_extracted_files(extracted_files: list[Path]) -> int: total_size = 0 for item in extracted_files: with suppress(OSError): diff --git a/fact_extractor/install/common.py b/fact_extractor/install/common.py index b435996c..0eb25ba3 100644 --- a/fact_extractor/install/common.py +++ b/fact_extractor/install/common.py @@ -1,12 +1,14 @@ import logging import subprocess as sp -import os -from contextlib import suppress from pathlib import Path from helperFunctions.config import load_config from helperFunctions.install import ( - apt_install_packages, apt_update_sources, pip_install_packages, load_requirements_file, OperateInDirectory + OperateInDirectory, + apt_install_packages, + apt_update_sources, + load_requirements_file, + pip_install_packages, ) APT_DEPENDENCIES = { @@ -31,6 +33,7 @@ ], } PIP_DEPENDENCY_FILE = Path(__file__).parent.parent.parent / 'requirements-common.txt' +BIN_DIR = Path(__file__).parent.parent / 'bin' def install_apt_dependencies(distribution: str): @@ -39,23 +42,23 @@ def install_apt_dependencies(distribution: str): def _install_magic(): - bin_dir = Path(__file__).parent.parent / 'bin' - with OperateInDirectory(bin_dir): + with OperateInDirectory(BIN_DIR): sp.run( [ - "wget", - "--output-document", - "../bin/firmware.xz", - "https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.1/firmware.xz", + 'wget', + '--output-document', + '../bin/firmware.xz', + 'https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.1/firmware.xz', ], check=True, ) sp.run( [ - "unxz", - "--force", - "../bin/firmware.xz", - ] + 'unxz', + '--force', + '../bin/firmware.xz', + ], + check=False, ) @@ -67,15 +70,13 @@ def main(distribution): install_apt_dependencies(distribution) pip_install_packages(*load_requirements_file(PIP_DEPENDENCY_FILE)) - # make bin dir - with suppress(FileExistsError): - os.mkdir('../bin') + BIN_DIR.mkdir(exist_ok=True) _install_magic() config = load_config('main.cfg') data_folder = config.get('unpack', 'data_folder') - os.makedirs(str(Path(data_folder, 'files')), exist_ok=True) - os.makedirs(str(Path(data_folder, 'reports')), exist_ok=True) + Path(data_folder, 'files').mkdir(exist_ok=True) + Path(data_folder, 'reports').mkdir(exist_ok=True) return 0 diff --git a/fact_extractor/unpacker/unpackBase.py b/fact_extractor/unpacker/unpackBase.py index c871ab27..6ee09fee 100644 --- a/fact_extractor/unpacker/unpackBase.py +++ b/fact_extractor/unpacker/unpackBase.py @@ -1,9 +1,12 @@ +# noqa: N999 +from __future__ import annotations + import fnmatch import logging from os import getgid, getuid from subprocess import PIPE, Popen from time import time -from typing import Callable, Dict, List, Tuple +from typing import Callable import magic from common_helper_files import get_files_in_dir @@ -12,10 +15,10 @@ from fact_extractor.helperFunctions.plugin import import_plugins -class UnpackBase(object): - ''' +class UnpackBase: + """ The unpacker module unpacks all files included in a file - ''' + """ def __init__(self, config=None, extract_everything: bool = False): self.config = config @@ -37,39 +40,43 @@ def load_plugins(self): def _set_whitelist(self): self.blacklist = read_list_from_config(self.config, 'unpack', 'blacklist') - logging.debug(f'''Ignore (Blacklist): {', '.join(self.blacklist)}''') + logging.debug(f"""Ignore (Blacklist): {', '.join(self.blacklist)}""") for item in self.blacklist: self.register_plugin(item, self.unpacker_plugins['generic/nop']) - def register_plugin(self, mime_type: str, unpacker_name_and_function: Tuple[Callable[[str, str], Dict], str, str]): + def register_plugin(self, mime_type: str, unpacker_name_and_function: tuple[Callable[[str, str], dict], str, str]): self.unpacker_plugins[mime_type] = unpacker_name_and_function def get_unpacker(self, mime_type: str): if mime_type in list(self.unpacker_plugins.keys()): return self.unpacker_plugins[mime_type] - else: - return self.unpacker_plugins['generic/carver'] + return self.unpacker_plugins['generic/carver'] - def extract_files_from_file(self, file_path: str, tmp_dir) -> Tuple[List, Dict]: + def extract_files_from_file(self, file_path: str, tmp_dir) -> tuple[list, dict]: current_unpacker = self.get_unpacker(magic.from_file(file_path, mime=True)) return self._extract_files_from_file_using_specific_unpacker(file_path, tmp_dir, current_unpacker) - def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) -> Tuple[List, Dict]: + def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) -> tuple[list, dict]: fallback_plugin = self.unpacker_plugins[fallback_plugin_mime] - old_meta[f'''0_FALLBACK_{old_meta['plugin_used']}'''] = f'''{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)''' - if 'output' in old_meta.keys(): - old_meta[f'''0_ERROR_{old_meta['plugin_used']}'''] = old_meta['output'] - return self._extract_files_from_file_using_specific_unpacker(file_path, tmp_dir, fallback_plugin, meta_data=old_meta) + old_meta[f"""0_FALLBACK_{old_meta['plugin_used']}"""] = ( + f"""{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)""" + ) + if 'output' in old_meta: + old_meta[f"""0_ERROR_{old_meta['plugin_used']}"""] = old_meta['output'] + return self._extract_files_from_file_using_specific_unpacker( + file_path, tmp_dir, fallback_plugin, meta_data=old_meta + ) def _should_ignore(self, file): path = str(file) - for pattern in self.exclude: - if fnmatch.fnmatchcase(path, pattern): - return True - return False + return any(fnmatch.fnmatchcase(path, pattern) for pattern in self.exclude) - def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict = None) -> Tuple[List, Dict]: - unpack_function, name, version = selected_unpacker # TODO Refactor register method to directly use four parameters instead of three + def _extract_files_from_file_using_specific_unpacker( + self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict | None = None + ) -> tuple[list, dict]: + unpack_function, name, version = ( + selected_unpacker # TODO Refactor register method to directly use four parameters instead of three + ) if meta_data is None: meta_data = {} @@ -82,7 +89,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d additional_meta = unpack_function(file_path, tmp_dir) except Exception as error: logging.debug(f'Unpacking of {file_path} failed: {error}', exc_info=True) - additional_meta = {'error': f'{type(error)}: {str(error)}'} + additional_meta = {'error': f'{type(error)}: {error!s}'} if isinstance(additional_meta, dict): meta_data.update(additional_meta) @@ -102,7 +109,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d meta_data['number_of_excluded_files'] = excluded_count return out, meta_data - def change_owner_back_to_me(self, directory: str = None, permissions: str = 'u+r'): + def change_owner_back_to_me(self, directory: str, permissions: str = 'u+r'): with Popen(f'sudo chown -R {getuid()}:{getgid()} {directory}', shell=True, stdout=PIPE, stderr=PIPE) as pl: pl.communicate() self.grant_read_permission(directory, permissions)