Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if armed without full parse #4

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pyulog/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ class ULog(object):
"""

## constants ##
HEADER_SIZE = 16
HEADER_BYTES = b'\x55\x4c\x6f\x67\x01\x12\x35'
SYNC_BYTES = b'\x2F\x73\x13\x20\x25\x0C\xBB\x12'


# message types
MSG_TYPE_FORMAT = ord('F')
MSG_TYPE_DATA = ord('D')
Expand Down Expand Up @@ -263,7 +265,7 @@ def _write_file_header(self, file):
header_data.extend(self.HEADER_BYTES)
header_data.extend(struct.pack('B', self._file_version))
header_data.extend(struct.pack('<Q', self._start_timestamp))
if len(header_data) != 16:
if len(header_data) != self.HEADER_SIZE:
raise TypeError("Written header is too short")

file.write(header_data)
Expand Down Expand Up @@ -882,8 +884,8 @@ def _load_file(self, log_file, message_name_filter_list, parse_header_only=False
del self._file_handle

def _read_file_header(self):
header_data = self._file_handle.read(16)
if len(header_data) != 16:
header_data = self._file_handle.read(self.HEADER_SIZE)
if len(header_data) != self.HEADER_SIZE:
raise TypeError("Invalid file format (Header too short)")
if header_data[:7] != self.HEADER_BYTES:
raise TypeError("Invalid file format (Failed to parse header)")
Expand Down Expand Up @@ -925,7 +927,7 @@ def _read_file_definitions(self):
break # end of section
elif header.msg_type == self.MSG_TYPE_FLAG_BITS:
# make sure this is the first message in the log
if self._file_handle.tell() != 16 + 3 + header.msg_size:
if self._file_handle.tell() != self.HEADER_SIZE + 3 + header.msg_size:
print('Error: FLAGS_BITS message must be first message. Offset:',
self._file_handle.tell())
msg_flag_bits = self._MessageFlagBits(data, header)
Expand Down
163 changes: 163 additions & 0 deletions pyulog/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
#! /usr/bin/env python
"""
Check if a log file was armed
"""

import mmap
import struct
import io
import argparse
from typing import Optional, Tuple
from .core import ULog

ULOG_MESSAGE_HEADER_SIZE = 3
MAX_SEARCH_LENGTH = 200000


def find_field_offset(message_format: ULog.MessageFormat, target_field: str) -> int:
"""Find the offset of a field in a message format"""
offset = 0
for data_type, array_length, field_name in message_format.fields:
if field_name == "":
continue
if field_name == target_field:
return offset

array_length = int(array_length) if array_length else 1

try:
offset += ULog.get_field_size(data_type) * array_length
except ValueError as exc:
raise ValueError(f"Unknown data type: {data_type} in field {target_field}") from exc

raise ValueError(f"Field {target_field} not found in format string.")


def get_subscription_id(m_file: mmap.mmap, msg_name: str, max_search_len: int) -> Tuple[bytes, int]:
"""
Find subscription ID for the given message name
:param f: mmap object
:param msg_name: message name
:param max_search_len: Optional maximum length to search in the file
"""

msg_name_b = msg_name.encode()

m_file.seek(ULog.HEADER_SIZE)

while m_file.tell() < max_search_len:
header_and_id = m_file.read(ULOG_MESSAGE_HEADER_SIZE)
if len(header_and_id) < ULOG_MESSAGE_HEADER_SIZE:
# EOF
raise ValueError(f"Could not find subscription for {msg_name}")
msg_size, msg_type = struct.unpack("<HB", header_and_id)

if msg_type != ord('A'):
m_file.seek(msg_size, io.SEEK_CUR) # Skip the rest of the message
continue

msg_bytes = m_file.read(msg_size)
if len(msg_bytes) < msg_size or len(msg_bytes) < 3:
# EOF before reading the full message
raise ValueError(f"Could not find subscription for {msg_name}")
_, msg_id = struct.unpack("<BH", msg_bytes[:3])
if msg_bytes[3:] != msg_name_b:
# Some other message type, continue searching
continue

return msg_id

raise ValueError(f"Could not find subscription for {msg_name}")


def reverse_search_for_arm(
m_file: mmap.mmap,
vehicle_status_id: int,
arm_reason_offset: int) -> bool:
"""Check if the vehicle has been armed based on the latest arming reason"""

offset = len(m_file)
# Keep track of how far towards the beginning of the log we have searched
searched_to = len(m_file)
armed = None
while offset >= ULog.HEADER_SIZE:
offset = m_file.rfind(ULog.SYNC_BYTES, 0, offset)
if offset == -1:
# Perhaps no more sync points, start from the beginning of the log
offset = ULog.HEADER_SIZE
m_file.seek(offset)
else:
m_file.seek(offset + len(ULog.SYNC_BYTES))

while m_file.tell() < searched_to:
header_and_id = m_file.read(ULOG_MESSAGE_HEADER_SIZE + 2)
if len(header_and_id) < ULOG_MESSAGE_HEADER_SIZE + 2:
break # EOF
msg_size, msg_type, msg_id = struct.unpack("<HBH", header_and_id)
if msg_type != ord('D') or msg_id != vehicle_status_id:
m_file.seek(msg_size - 2, io.SEEK_CUR) # Skip the rest
continue

message_bytes = m_file.read(msg_size - 2)
if len(message_bytes) < msg_size - 2:
break # EOF

armed = message_bytes[arm_reason_offset] != 0
if armed:
# We know that the vehicle was armed, return early
return True
# Might have armed after this message, search rest of log
armed = False

searched_to = offset
if armed is not None:
# Searched to end, and found a valid arm status
return armed

offset -= 1

if armed is None:
raise ValueError("No vehicle_status messages found in log file.")

return armed


def check_if_log_was_armed(
ulog_path: str,
max_search_len: Optional[int] = MAX_SEARCH_LENGTH) -> bool:
"""
This method checks if the vehicle was armed at any point during the log
file without parsing the entire file. This is done by searching for the
latest arming reason in the vehicle_status messages.
"""
message_name = "vehicle_status"
field_name = "latest_arming_reason"

ulog = ULog(ulog_path, parse_header_only=True)

message_format = ulog.message_formats.get(message_name)
if message_format is None:
raise ValueError(f"Could not find format definition for {message_name}")

with open(ulog_path, 'rb') as file:
# Memory map the file for faster access
with mmap.mmap(file.fileno(), 0, access=mmap.ACCESS_READ) as f_mem:
if max_search_len is None:
max_search_len = len(f_mem)
else:
max_search_len = min(len(f_mem), max_search_len)

vehicle_status_id = get_subscription_id(f_mem, message_name, max_search_len)
arm_reason_offset = find_field_offset(message_format, field_name)

return reverse_search_for_arm(f_mem, vehicle_status_id, arm_reason_offset)


def main():
"""Command line interface"""
parser = argparse.ArgumentParser(description='Check if vehicle was armed during the log file')
parser.add_argument('filename', metavar='file.ulg', help='ULog input file')
args = parser.parse_args()
ulog_file_name = args.filename
log_was_armed = check_if_log_was_armed(ulog_file_name)
print(f"Log was armed: {log_was_armed}")
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"numpy >= 1.25; python_version >= '3.9'",
],
tests_require=['pytest', 'ddt'],
entry_points = {
entry_points={
'console_scripts': [
'ulog_extract_gps_dump=pyulog.extract_gps_dump:main',
'ulog_info=pyulog.info:main',
Expand All @@ -62,6 +62,7 @@
'ulog2kml=pyulog.ulog2kml:main',
'ulog2rosbag=pyulog.ulog2rosbag:main',
'ulog_migratedb=pyulog.migrate_db:main',
'ulog_check_armed=pyulog.utils:main',
],
},
packages=find_packages(),
Expand Down
Binary file added test/sample_armed_flight.ulg
Binary file not shown.
Binary file added test/sample_not_armed_flight.ulg
Binary file not shown.
36 changes: 36 additions & 0 deletions test/test_log_was_armed.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
'''
Tests the ULog class
'''

import os
import inspect
import unittest

from ddt import ddt, data
from pyulog.utils import check_if_log_was_armed

TEST_PATH = os.path.dirname(os.path.abspath(
inspect.getfile(inspect.currentframe())))


@ddt
class TestLogWasArmed(unittest.TestCase):
'''
Tests the check_if_log_was_armed function
'''

@data('sample_armed_flight')
def test_log_was_armed(self, base_name):
'''
Test that the armed log is marked as armed.
'''
ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg')
assert check_if_log_was_armed(ulog_file_name)

@data('sample_not_armed_flight')
def test_log_was_not_armed(self, base_name):
'''
Test that the non-armed log is marked as not armed.
'''
ulog_file_name = os.path.join(TEST_PATH, base_name + '.ulg')
assert not check_if_log_was_armed(ulog_file_name)
Loading