diff --git a/fact_extractor/plugins/unpacking/tenvis_pk2/__init__.py b/fact_extractor/plugins/unpacking/tenvis_pk2/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fact_extractor/plugins/unpacking/tenvis_pk2/code/__init__.py b/fact_extractor/plugins/unpacking/tenvis_pk2/code/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fact_extractor/plugins/unpacking/tenvis_pk2/code/tenvis_pk2.py b/fact_extractor/plugins/unpacking/tenvis_pk2/code/tenvis_pk2.py new file mode 100644 index 00000000..60b01150 --- /dev/null +++ b/fact_extractor/plugins/unpacking/tenvis_pk2/code/tenvis_pk2.py @@ -0,0 +1,174 @@ +import struct +from datetime import datetime +from pathlib import Path +from typing import BinaryIO + +NAME = 'tenvis_pk2' +MIME_PATTERNS = ['firmware/pk2'] +VERSION = '0.1.0' + +PK2_MAGIC = b'PK2\x00' +XOR_KEY = [0xA1, 0x83, 0x24, 0x78, 0xB3, 0x41, 0x43, 0x56] +KEY_LEN = len(XOR_KEY) + + +class Pk2FileHeader: + """ + Header struct: + 0 | 4 | uint32 magic + 4 | 4 | uint32 camera type + 8 | 4 | uint32 creation time + 12 | 4 | char[4] version + 16 | 8 | char[8] reserved + 24 | 4 | uint32 section count + total size: 28 bytes + """ + + size = 28 + + def __init__(self, fp: BinaryIO): + file_hdr_data = fp.read(self.size) + self.magic = file_hdr_data[:4] # we parse the magic as bytes + self.camera_type, self.creation_time = struct.unpack(' 0: + chunk_size = min(self._BLOCK_SIZE, remaining) + data = self._fp.read(chunk_size) + if not data: + break + output = _decrypt(data) + out_fp.write(output) + remaining -= chunk_size + + def to_dict(self): + return {k: v for k, v in self.__dict__.items() if not k.startswith('_')} + + +class Pk2Cmd: + """ + 0 | x | char[x] command (XOR encrypted) + total size: x bytes (== section payload size) + """ + + def __init__(self, fp: BinaryIO, offset: int, size: int): + fp.seek(offset) + self.command = _decrypt(fp.read(size)).rstrip(b'\x00').decode('ascii', errors='replace') + self.size = size + + def __str__(self): + cmd = repr(self.command) + return f'CMD: {cmd}' + + +def _decrypt(data: bytes) -> bytearray: + output = bytearray() + for index, char in enumerate(data): + output.append(char ^ XOR_KEY[index % KEY_LEN]) + return output + + +def unpack_function(file_path: str, tmp_dir: str): + input_path, output_dir = Path(file_path), Path(tmp_dir) + meta = {'sections': []} + + if input_path.stat().st_size < Pk2FileHeader.size + Pk2SectionHeader.size: + meta['errors'] = ['file too small'] + return meta + + with input_path.open('rb') as fp: + file_header = Pk2FileHeader(fp) + offset = file_header.size + meta['header'] = file_header.to_dict() + for _ in range(file_header.section_count): + try: + section_header = Pk2SectionHeader(fp, offset) + section_meta = section_header.to_dict() + offset += section_header.size + if section_header.type == 'FILE': + pk2_file = Pk2File(fp, offset, section_header.payload_size) + pk2_file.save(output_dir) + offset += pk2_file.size + section_meta['file'] = pk2_file.to_dict() + elif section_header.type == 'CMD': + pk2_command = Pk2Cmd(fp, offset, section_header.payload_size) + offset += pk2_command.size + section_meta['command'] = pk2_command.command + else: + meta.setdefault('errors', []).append(f'unknown section type: {section_header.type}') + break + meta['sections'].append(section_meta) + except struct.error: + meta.setdefault('errors', []).append(f'error while parsing section at offset {offset}') + break + + return {'output': meta} + + +# ----> Do not edit below this line <---- +def setup(unpack_tool): + for item in MIME_PATTERNS: + unpack_tool.register_plugin(item, (unpack_function, NAME, VERSION)) diff --git a/fact_extractor/plugins/unpacking/tenvis_pk2/test/__init__.py b/fact_extractor/plugins/unpacking/tenvis_pk2/test/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/fact_extractor/plugins/unpacking/tenvis_pk2/test/data/broken.pk2 b/fact_extractor/plugins/unpacking/tenvis_pk2/test/data/broken.pk2 new file mode 100644 index 00000000..7bcae751 Binary files /dev/null and b/fact_extractor/plugins/unpacking/tenvis_pk2/test/data/broken.pk2 differ diff --git a/fact_extractor/plugins/unpacking/tenvis_pk2/test/data/test.pk2 b/fact_extractor/plugins/unpacking/tenvis_pk2/test/data/test.pk2 new file mode 100644 index 00000000..2bd7801c Binary files /dev/null and b/fact_extractor/plugins/unpacking/tenvis_pk2/test/data/test.pk2 differ diff --git a/fact_extractor/plugins/unpacking/tenvis_pk2/test/test_tenvis_pk2.py b/fact_extractor/plugins/unpacking/tenvis_pk2/test/test_tenvis_pk2.py new file mode 100644 index 00000000..9732f55f --- /dev/null +++ b/fact_extractor/plugins/unpacking/tenvis_pk2/test/test_tenvis_pk2.py @@ -0,0 +1,36 @@ +from pathlib import Path + +from test.unit.unpacker.test_unpacker import TestUnpackerBase + +TEST_DATA_DIR = Path(__file__).parent / 'data' + + +class TestTenvisPk2Unpacker(TestUnpackerBase): + def test_unpacker_selection_generic(self): + self.check_unpacker_selection('firmware/pk2', 'tenvis_pk2') + + def test_extraction_pk2(self): + in_file = TEST_DATA_DIR / 'test.pk2' + assert in_file.is_file() + files, meta = self.unpacker.extract_files_from_file(str(in_file), self.tmp_dir.name) + assert meta['plugin_used'] == 'tenvis_pk2' + assert len(files) == 2 + assert {Path(f).name for f in files} == {'bar', 'foo'}, 'not all files unpacked' + output_file = Path(sorted(files)[1]) + assert output_file.read_bytes() == b'foobar\n', 'files not decrypted correctly' + + assert 'sections' in meta['output'] + assert len(meta['output']['sections']) == 3 + sections_by_offset = {s['offset']: s for s in meta['output']['sections']} + assert set(sections_by_offset) == {28, 67, 111} + assert sections_by_offset[28]['type'] == 'CMD' + assert sections_by_offset[28]['command'] == 'echo "foobar"\n' + + def test_extraction_pk2_error(self): + in_file = TEST_DATA_DIR / 'broken.pk2' + assert in_file.is_file() + files, meta = self.unpacker.extract_files_from_file(str(in_file), self.tmp_dir.name) + assert meta['plugin_used'] == 'tenvis_pk2' + assert len(files) == 0 + assert 'errors' in meta['output'] + assert meta['output']['errors'] == ['error while parsing section at offset 91'] diff --git a/pyproject.toml b/pyproject.toml index 748a318a..9f2963f7 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -53,7 +53,7 @@ ignore = [ fixable = ["ALL"] [tool.ruff.lint.per-file-ignores] -"test*.py" = ["ARG002"] +"test*.py" = ["ARG002", "PLR2004"] "conftest.py" = ["ARG002"] [tool.ruff.lint.isort]