Skip to content

Commit

Permalink
chore: ruff fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Oct 28, 2024
1 parent 972826a commit be010d5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 47 deletions.
18 changes: 11 additions & 7 deletions fact_extractor/helperFunctions/statistics.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
from configparser import ConfigParser
from __future__ import annotations

from contextlib import suppress
from pathlib import Path
from typing import Dict, List
from typing import TYPE_CHECKING

import magic
from common_helper_files import safe_rglob
from common_helper_unpacking_classifier import avg_entropy, get_binary_size_without_padding, is_compressed

from helperFunctions.config import read_list_from_config

if TYPE_CHECKING:
from configparser import ConfigParser
from pathlib import Path


def add_unpack_statistics(extraction_dir: Path, meta_data: Dict):
def add_unpack_statistics(extraction_dir: Path, meta_data: dict):
unpacked_files, unpacked_directories = 0, 0
for extracted_item in safe_rglob(extraction_dir):
if extracted_item.is_file():
Expand All @@ -23,7 +27,7 @@ def add_unpack_statistics(extraction_dir: Path, meta_data: Dict):


def get_unpack_status(
file_path: str, binary: bytes, extracted_files: List[Path], meta_data: Dict, config: ConfigParser
file_path: str, binary: bytes, extracted_files: list[Path], meta_data: dict, config: ConfigParser
):
meta_data['summary'] = []
meta_data['entropy'] = avg_entropy(binary)
Expand All @@ -43,7 +47,7 @@ def get_unpack_status(
_detect_unpack_loss(binary, extracted_files, meta_data, config.getint('ExpertSettings', 'header_overhead'))


def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: Dict, header_overhead: int):
def _detect_unpack_loss(binary: bytes, extracted_files: list[Path], meta_data: dict, header_overhead: int):
decoding_overhead = 1 - meta_data.get('encoding_overhead', 0)
cleaned_size = get_binary_size_without_padding(binary) * decoding_overhead - header_overhead
size_of_extracted_files = _total_size_of_extracted_files(extracted_files)
Expand All @@ -52,7 +56,7 @@ def _detect_unpack_loss(binary: bytes, extracted_files: List[Path], meta_data: D
meta_data['summary'] = ['data lost'] if cleaned_size > size_of_extracted_files else ['no data lost']


def _total_size_of_extracted_files(extracted_files: List[Path]) -> int:
def _total_size_of_extracted_files(extracted_files: list[Path]) -> int:
total_size = 0
for item in extracted_files:
with suppress(OSError):
Expand Down
37 changes: 19 additions & 18 deletions fact_extractor/install/common.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import logging
import subprocess as sp
import os
from contextlib import suppress
from pathlib import Path

from helperFunctions.config import load_config
from helperFunctions.install import (
apt_install_packages, apt_update_sources, pip_install_packages, load_requirements_file, OperateInDirectory
OperateInDirectory,
apt_install_packages,
apt_update_sources,
load_requirements_file,
pip_install_packages,
)

APT_DEPENDENCIES = {
Expand All @@ -31,6 +33,7 @@
],
}
PIP_DEPENDENCY_FILE = Path(__file__).parent.parent.parent / 'requirements-common.txt'
BIN_DIR = Path(__file__).parent.parent / 'bin'


def install_apt_dependencies(distribution: str):
Expand All @@ -39,23 +42,23 @@ def install_apt_dependencies(distribution: str):


def _install_magic():
bin_dir = Path(__file__).parent.parent / 'bin'
with OperateInDirectory(bin_dir):
with OperateInDirectory(BIN_DIR):
sp.run(
[
"wget",
"--output-document",
"../bin/firmware.xz",
"https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.1/firmware.xz",
'wget',
'--output-document',
'../bin/firmware.xz',
'https://github.com/fkie-cad/firmware-magic-database/releases/download/v0.2.1/firmware.xz',
],
check=True,
)
sp.run(
[
"unxz",
"--force",
"../bin/firmware.xz",
]
'unxz',
'--force',
'../bin/firmware.xz',
],
check=False,
)


Expand All @@ -67,15 +70,13 @@ def main(distribution):
install_apt_dependencies(distribution)
pip_install_packages(*load_requirements_file(PIP_DEPENDENCY_FILE))

# make bin dir
with suppress(FileExistsError):
os.mkdir('../bin')
BIN_DIR.mkdir(exist_ok=True)

_install_magic()

config = load_config('main.cfg')
data_folder = config.get('unpack', 'data_folder')
os.makedirs(str(Path(data_folder, 'files')), exist_ok=True)
os.makedirs(str(Path(data_folder, 'reports')), exist_ok=True)
Path(data_folder, 'files').mkdir(exist_ok=True)
Path(data_folder, 'reports').mkdir(exist_ok=True)

return 0
51 changes: 29 additions & 22 deletions fact_extractor/unpacker/unpackBase.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# noqa: N999
from __future__ import annotations

import fnmatch
import logging
from os import getgid, getuid
from subprocess import PIPE, Popen
from time import time
from typing import Callable, Dict, List, Tuple
from typing import Callable

import magic
from common_helper_files import get_files_in_dir
Expand All @@ -12,10 +15,10 @@
from fact_extractor.helperFunctions.plugin import import_plugins


class UnpackBase(object):
'''
class UnpackBase:
"""
The unpacker module unpacks all files included in a file
'''
"""

def __init__(self, config=None, extract_everything: bool = False):
self.config = config
Expand All @@ -37,39 +40,43 @@ def load_plugins(self):

def _set_whitelist(self):
self.blacklist = read_list_from_config(self.config, 'unpack', 'blacklist')
logging.debug(f'''Ignore (Blacklist): {', '.join(self.blacklist)}''')
logging.debug(f"""Ignore (Blacklist): {', '.join(self.blacklist)}""")
for item in self.blacklist:
self.register_plugin(item, self.unpacker_plugins['generic/nop'])

def register_plugin(self, mime_type: str, unpacker_name_and_function: Tuple[Callable[[str, str], Dict], str, str]):
def register_plugin(self, mime_type: str, unpacker_name_and_function: tuple[Callable[[str, str], dict], str, str]):
self.unpacker_plugins[mime_type] = unpacker_name_and_function

def get_unpacker(self, mime_type: str):
if mime_type in list(self.unpacker_plugins.keys()):
return self.unpacker_plugins[mime_type]
else:
return self.unpacker_plugins['generic/carver']
return self.unpacker_plugins['generic/carver']

def extract_files_from_file(self, file_path: str, tmp_dir) -> Tuple[List, Dict]:
def extract_files_from_file(self, file_path: str, tmp_dir) -> tuple[list, dict]:
current_unpacker = self.get_unpacker(magic.from_file(file_path, mime=True))
return self._extract_files_from_file_using_specific_unpacker(file_path, tmp_dir, current_unpacker)

def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) -> Tuple[List, Dict]:
def unpacking_fallback(self, file_path, tmp_dir, old_meta, fallback_plugin_mime) -> tuple[list, dict]:
fallback_plugin = self.unpacker_plugins[fallback_plugin_mime]
old_meta[f'''0_FALLBACK_{old_meta['plugin_used']}'''] = f'''{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)'''
if 'output' in old_meta.keys():
old_meta[f'''0_ERROR_{old_meta['plugin_used']}'''] = old_meta['output']
return self._extract_files_from_file_using_specific_unpacker(file_path, tmp_dir, fallback_plugin, meta_data=old_meta)
old_meta[f"""0_FALLBACK_{old_meta['plugin_used']}"""] = (
f"""{old_meta['plugin_used']} (failed) -> {fallback_plugin_mime} (fallback)"""
)
if 'output' in old_meta:
old_meta[f"""0_ERROR_{old_meta['plugin_used']}"""] = old_meta['output']
return self._extract_files_from_file_using_specific_unpacker(
file_path, tmp_dir, fallback_plugin, meta_data=old_meta
)

def _should_ignore(self, file):
path = str(file)
for pattern in self.exclude:
if fnmatch.fnmatchcase(path, pattern):
return True
return False
return any(fnmatch.fnmatchcase(path, pattern) for pattern in self.exclude)

def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict = None) -> Tuple[List, Dict]:
unpack_function, name, version = selected_unpacker # TODO Refactor register method to directly use four parameters instead of three
def _extract_files_from_file_using_specific_unpacker(
self, file_path: str, tmp_dir: str, selected_unpacker, meta_data: dict | None = None
) -> tuple[list, dict]:
unpack_function, name, version = (
selected_unpacker # TODO Refactor register method to directly use four parameters instead of three
)

if meta_data is None:
meta_data = {}
Expand All @@ -82,7 +89,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d
additional_meta = unpack_function(file_path, tmp_dir)
except Exception as error:
logging.debug(f'Unpacking of {file_path} failed: {error}', exc_info=True)
additional_meta = {'error': f'{type(error)}: {str(error)}'}
additional_meta = {'error': f'{type(error)}: {error!s}'}
if isinstance(additional_meta, dict):
meta_data.update(additional_meta)

Expand All @@ -102,7 +109,7 @@ def _extract_files_from_file_using_specific_unpacker(self, file_path: str, tmp_d
meta_data['number_of_excluded_files'] = excluded_count
return out, meta_data

def change_owner_back_to_me(self, directory: str = None, permissions: str = 'u+r'):
def change_owner_back_to_me(self, directory: str, permissions: str = 'u+r'):
with Popen(f'sudo chown -R {getuid()}:{getgid()} {directory}', shell=True, stdout=PIPE, stderr=PIPE) as pl:
pl.communicate()
self.grant_read_permission(directory, permissions)
Expand Down

0 comments on commit be010d5

Please sign in to comment.