Skip to content

Commit

Permalink
added unpacking plugin for tenvis pk2 container
Browse files Browse the repository at this point in the history
  • Loading branch information
jstucke committed Nov 15, 2024
1 parent f01521c commit a17810b
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 1 deletion.
Empty file.
Empty file.
174 changes: 174 additions & 0 deletions fact_extractor/plugins/unpacking/tenvis_pk2/code/tenvis_pk2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import struct
from datetime import datetime
from pathlib import Path
from typing import BinaryIO

NAME = 'tenvis_pk2'
MIME_PATTERNS = ['firmware/pk2']
VERSION = '0.1.0'

PK2_MAGIC = b'PK2\x00'
XOR_KEY = [0xA1, 0x83, 0x24, 0x78, 0xB3, 0x41, 0x43, 0x56]
KEY_LEN = len(XOR_KEY)


class Pk2FileHeader:
"""
Header struct:
0 | 4 | uint32 magic
4 | 4 | uint32 camera type
8 | 4 | uint32 creation time
12 | 4 | char[4] version
16 | 8 | char[8] reserved
24 | 4 | uint32 section count
total size: 28 bytes
"""

size = 28

def __init__(self, fp: BinaryIO):
file_hdr_data = fp.read(self.size)
self.magic = file_hdr_data[:4] # we parse the magic as bytes
self.camera_type, self.creation_time = struct.unpack('<II', file_hdr_data[4:12])
self.creation_time_readable = datetime.fromtimestamp(self.creation_time).isoformat()
self.version = file_hdr_data[12:16]
self.reserved = file_hdr_data[16:24]
self.section_count = struct.unpack('<I', file_hdr_data[24:28])[0]

def to_dict(self):
return {
'magic': self.magic.decode(errors='replace'),
'camera_type': self.camera_type,
'creation_time': self.creation_time_readable,
'version': self.version.rstrip(b'\x00').decode('ascii', errors='replace'),
'reserved': self.reserved.rstrip(b'\x00').decode('ascii', errors='replace'),
'section_count': self.section_count,
}


class Pk2SectionHeader:
"""
0 | 4 | uint32 section type
4 | 16 | char[16] hash
20 | 4 | uint32 payload size
total size: 24 bytes
"""

size = 24

def __init__(self, fp: BinaryIO, offset: int):
fp.seek(offset)
section_header_data = fp.read(self.size)
self.offset = offset
self.type = section_header_data[:4].rstrip(b'\x00').decode('ascii', errors='replace')
self.hash = section_header_data[4:20].hex()
self.payload_size = struct.unpack('<I', section_header_data[20:24])[0]

def to_dict(self):
return self.__dict__


class Pk2File:
"""
0 | 4 | uint32 filename size
4 | x | char[x] filename
4+x | 4 | uint32 data size
x+8 | y | ?? data (XOR encrypted)
total size: 4 + x + 4 + y bytes (== section payload size)
"""

_BLOCK_SIZE = 1024 # we read the file block-wise to save memory

def __init__(self, fp: BinaryIO, offset: int, size: int):
self._fp = fp
self.size = size
self._file_offset = offset
self._fp.seek(self._file_offset)
filename_size = struct.unpack('<I', self._fp.read(4)[:4])[0]
self.filename = self._fp.read(filename_size).rstrip(b'\x00').decode('ascii', errors='replace')
self.data_offset = self._file_offset + 4 + filename_size + 4
self.data_size = struct.unpack('<I', self._fp.read(4))[0]

def save(self, save_dir: Path):
output_path = save_dir / self.filename.lstrip('/')
output_path.parent.mkdir(exist_ok=True, parents=True)
self._fp.seek(self.data_offset)
remaining = self.data_size
with output_path.open('wb') as out_fp:
while remaining > 0:
chunk_size = min(self._BLOCK_SIZE, remaining)
data = self._fp.read(chunk_size)
if not data:
break
output = _decrypt(data)
out_fp.write(output)
remaining -= chunk_size

def to_dict(self):
return {k: v for k, v in self.__dict__.items() if not k.startswith('_')}


class Pk2Cmd:
"""
0 | x | char[x] command (XOR encrypted)
total size: x bytes (== section payload size)
"""

def __init__(self, fp: BinaryIO, offset: int, size: int):
fp.seek(offset)
self.command = _decrypt(fp.read(size)).rstrip(b'\x00').decode('ascii', errors='replace')
self.size = size

def __str__(self):
cmd = repr(self.command)
return f'CMD: {cmd}'


def _decrypt(data: bytes) -> bytearray:
output = bytearray()
for index, char in enumerate(data):
output.append(char ^ XOR_KEY[index % KEY_LEN])
return output


def unpack_function(file_path: str, tmp_dir: str):
input_path, output_dir = Path(file_path), Path(tmp_dir)
meta = {'sections': []}

if input_path.stat().st_size < Pk2FileHeader.size + Pk2SectionHeader.size:
meta['errors'] = ['file too small']
return meta

with input_path.open('rb') as fp:
file_header = Pk2FileHeader(fp)
offset = file_header.size
meta['header'] = file_header.to_dict()
for _ in range(file_header.section_count):
try:
section_header = Pk2SectionHeader(fp, offset)
section_meta = section_header.to_dict()
offset += section_header.size
if section_header.type == 'FILE':
pk2_file = Pk2File(fp, offset, section_header.payload_size)
pk2_file.save(output_dir)
offset += pk2_file.size
section_meta['file'] = pk2_file.to_dict()
elif section_header.type == 'CMD':
pk2_command = Pk2Cmd(fp, offset, section_header.payload_size)
offset += pk2_command.size
section_meta['command'] = pk2_command.command
else:
meta.setdefault('errors', []).append(f'unknown section type: {section_header.type}')
break
meta['sections'].append(section_meta)
except struct.error:
meta.setdefault('errors', []).append(f'error while parsing section at offset {offset}')
break

return {'output': meta}


# ----> Do not edit below this line <----
def setup(unpack_tool):
for item in MIME_PATTERNS:
unpack_tool.register_plugin(item, (unpack_function, NAME, VERSION))
Empty file.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from pathlib import Path

from test.unit.unpacker.test_unpacker import TestUnpackerBase

TEST_DATA_DIR = Path(__file__).parent / 'data'


class TestTenvisPk2Unpacker(TestUnpackerBase):
def test_unpacker_selection_generic(self):
self.check_unpacker_selection('firmware/pk2', 'tenvis_pk2')

def test_extraction_pk2(self):
in_file = TEST_DATA_DIR / 'test.pk2'
assert in_file.is_file()
files, meta = self.unpacker.extract_files_from_file(str(in_file), self.tmp_dir.name)
assert meta['plugin_used'] == 'tenvis_pk2'
assert len(files) == 2
assert {Path(f).name for f in files} == {'bar', 'foo'}, 'not all files unpacked'
output_file = Path(sorted(files)[1])
assert output_file.read_bytes() == b'foobar\n', 'files not decrypted correctly'

assert 'sections' in meta['output']
assert len(meta['output']['sections']) == 3
sections_by_offset = {s['offset']: s for s in meta['output']['sections']}
assert set(sections_by_offset) == {28, 67, 111}
assert sections_by_offset[28]['type'] == 'CMD'
assert sections_by_offset[28]['command'] == 'echo "foobar"\n'

def test_extraction_pk2_error(self):
in_file = TEST_DATA_DIR / 'broken.pk2'
assert in_file.is_file()
files, meta = self.unpacker.extract_files_from_file(str(in_file), self.tmp_dir.name)
assert meta['plugin_used'] == 'tenvis_pk2'
assert len(files) == 0
assert 'errors' in meta['output']
assert meta['output']['errors'] == ['error while parsing section at offset 91']
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ ignore = [
fixable = ["ALL"]

[tool.ruff.lint.per-file-ignores]
"test*.py" = ["ARG002"]
"test*.py" = ["ARG002", "PLR2004"]
"conftest.py" = ["ARG002"]

[tool.ruff.lint.isort]
Expand Down

0 comments on commit a17810b

Please sign in to comment.