From 9dea4423ef36155c1c1368073c66c6205f7a5643 Mon Sep 17 00:00:00 2001 From: Robin Mueller Date: Sat, 30 Nov 2024 13:51:57 +0100 Subject: [PATCH] raw byte APIs accept both bytes and bytearray --- spacepackets/ccsds/spacepacket.py | 6 +++--- spacepackets/cfdp/lv.py | 4 ++-- spacepackets/cfdp/pdu/ack.py | 2 +- spacepackets/cfdp/pdu/eof.py | 6 ++++-- spacepackets/cfdp/pdu/file_data.py | 6 +++--- spacepackets/cfdp/pdu/file_directive.py | 6 +++--- spacepackets/cfdp/pdu/finished.py | 6 +++--- spacepackets/cfdp/pdu/header.py | 10 ++++++---- spacepackets/cfdp/pdu/helper.py | 12 +++++++----- spacepackets/cfdp/pdu/keep_alive.py | 2 +- spacepackets/cfdp/pdu/metadata.py | 4 ++-- spacepackets/cfdp/pdu/nak.py | 2 +- spacepackets/cfdp/pdu/prompt.py | 4 ++-- spacepackets/cfdp/tlv/msg_to_user.py | 12 ++++++------ spacepackets/cfdp/tlv/tlv.py | 14 +++++++------- spacepackets/ecss/__init__.py | 2 +- spacepackets/ecss/fields.py | 6 ++++-- spacepackets/ecss/pus_17_test.py | 2 +- spacepackets/ecss/pus_1_verification.py | 8 ++++---- spacepackets/ecss/req_id.py | 6 ++++-- spacepackets/ecss/tc.py | 8 ++++---- spacepackets/ecss/tm.py | 18 ++++++++---------- spacepackets/uslp/frame.py | 8 ++++---- spacepackets/uslp/header.py | 10 ++++++---- spacepackets/util.py | 14 +++++++------- tests/cfdp/pdus/test_directive.py | 2 +- tests/ecss/test_pus_tm.py | 5 +++-- 27 files changed, 98 insertions(+), 87 deletions(-) diff --git a/spacepackets/ccsds/spacepacket.py b/spacepackets/ccsds/spacepacket.py index 4b5b05b..fee8745 100644 --- a/spacepackets/ccsds/spacepacket.py +++ b/spacepackets/ccsds/spacepacket.py @@ -394,7 +394,7 @@ def packet_len(self) -> int: return CCSDS_HEADER_LEN + self.data_len + 1 @classmethod - def unpack(cls, data: bytes) -> SpacePacketHeader: + def unpack(cls, data: bytes | bytearray) -> SpacePacketHeader: """Unpack a raw space packet into the space packet header instance. :raise ValueError: Raw packet length invalid @@ -445,8 +445,8 @@ class SpacePacket: def __init__( self, sp_header: SpacePacketHeader, - sec_header: bytes | None, - user_data: bytes | None, + sec_header: bytes | bytearray | None, + user_data: bytes | bytearray | None, ): self.sp_header = sp_header self.sec_header = sec_header diff --git a/spacepackets/cfdp/lv.py b/spacepackets/cfdp/lv.py index f5105a4..07d380a 100644 --- a/spacepackets/cfdp/lv.py +++ b/spacepackets/cfdp/lv.py @@ -7,7 +7,7 @@ class CfdpLv: - def __init__(self, value: bytes): + def __init__(self, value: bytes | bytearray): """This class encapsulates CFDP Length-Value (LV) fields. Raises @@ -42,7 +42,7 @@ def pack(self) -> bytearray: return packet @classmethod - def unpack(cls, raw_bytes: bytes) -> CfdpLv: + def unpack(cls, raw_bytes: bytes | bytearray) -> CfdpLv: """Parses LV field at the start of the given bytearray :raise ValueError: Invalid length found diff --git a/spacepackets/cfdp/pdu/ack.py b/spacepackets/cfdp/pdu/ack.py index 232f631..c5dd9e1 100644 --- a/spacepackets/cfdp/pdu/ack.py +++ b/spacepackets/cfdp/pdu/ack.py @@ -121,7 +121,7 @@ def __repr__(self): ) @classmethod - def unpack(cls, data: bytes) -> AckPdu: + def unpack(cls, data: bytes | bytearray) -> AckPdu: """Generate an object instance from raw data. Care should be taken to check whether the raw bytestream really contains an ACK PDU. diff --git a/spacepackets/cfdp/pdu/eof.py b/spacepackets/cfdp/pdu/eof.py index f2b158b..4cfa146 100644 --- a/spacepackets/cfdp/pdu/eof.py +++ b/spacepackets/cfdp/pdu/eof.py @@ -122,7 +122,7 @@ def pack(self) -> bytearray: return eof_pdu @classmethod - def unpack(cls, data: bytes) -> EofPdu: + def unpack(cls, data: bytes | bytearray) -> EofPdu: """Generate an object instance from raw data. Care should be taken to check whether the raw bytestream really contains an EOF PDU. @@ -156,7 +156,9 @@ def unpack(cls, data: bytes) -> EofPdu: eof_pdu.fault_location = EntityIdTlv.unpack(data=data[current_idx:]) return eof_pdu - def __eq__(self, other: EofPdu): + def __eq__(self, other: object) -> bool: + if not isinstance(other, EofPdu): + return False return ( self.pdu_file_directive == other.pdu_file_directive and self.condition_code == other.condition_code diff --git a/spacepackets/cfdp/pdu/file_data.py b/spacepackets/cfdp/pdu/file_data.py index 8b9713a..c249919 100644 --- a/spacepackets/cfdp/pdu/file_data.py +++ b/spacepackets/cfdp/pdu/file_data.py @@ -215,7 +215,7 @@ def pack(self) -> bytearray: return file_data_pdu @classmethod - def unpack(cls, data: bytes) -> FileDataPdu: + def unpack(cls, data: bytes | bytearray) -> FileDataPdu: """Generate an object instance from raw data. Care should be taken to check whether the raw bytestream really contains a File Data PDU. @@ -242,7 +242,7 @@ def unpack(cls, data: bytes) -> FileDataPdu: metadata = data[current_idx : current_idx + segment_metadata_len] current_idx += segment_metadata_len file_data_packet.segment_metadata = SegmentMetadata( - record_cont_state=rec_cont_state, metadata=metadata + record_cont_state=rec_cont_state, metadata=bytes(metadata) ) if not file_data_packet.pdu_header.large_file_flag_set: struct_arg_tuple = ("!I", 4) @@ -259,7 +259,7 @@ def unpack(cls, data: bytes) -> FileDataPdu: if file_data_packet.pdu_header.crc_flag == CrcFlag.WITH_CRC: data = data[:-2] if current_idx < len(data): - file_data_packet._params.file_data = data[current_idx:] + file_data_packet._params.file_data = bytes(data[current_idx:]) return file_data_packet @property diff --git a/spacepackets/cfdp/pdu/file_directive.py b/spacepackets/cfdp/pdu/file_directive.py index 528418e..ad3b52c 100644 --- a/spacepackets/cfdp/pdu/file_directive.py +++ b/spacepackets/cfdp/pdu/file_directive.py @@ -133,7 +133,7 @@ def __init__( ) self._directive_type = directive_code - def verify_length_and_checksum(self, data: bytes) -> None: + def verify_length_and_checksum(self, data: bytes | bytearray) -> None: self.pdu_header.verify_length_and_checksum(data) @property @@ -172,7 +172,7 @@ def pack(self) -> bytearray: return data @classmethod - def unpack(cls, raw_packet: bytes) -> FileDirectivePduBase: + def unpack(cls, raw_packet: bytes | bytearray) -> FileDirectivePduBase: """Unpack a raw bytearray into the File Directive PDU object representation. :param raw_packet: Unpack PDU file directive base @@ -195,7 +195,7 @@ def _verify_file_len(self, file_size: int) -> None: if self.pdu_header.file_flag == LargeFileFlag.NORMAL and file_size > pow(2, 32): raise ValueError(f"File size {file_size} larger than 32 bit field") - def parse_fss_field(self, raw_packet: bytes, current_idx: int) -> tuple[int, int]: + def parse_fss_field(self, raw_packet: bytes | bytearray, current_idx: int) -> tuple[int, int]: """Parse the FSS field, which has different size depending on the large file flag being set or not. Returns the current index incremented and the parsed file size. diff --git a/spacepackets/cfdp/pdu/finished.py b/spacepackets/cfdp/pdu/finished.py index f318c67..f32084e 100644 --- a/spacepackets/cfdp/pdu/finished.py +++ b/spacepackets/cfdp/pdu/finished.py @@ -205,7 +205,7 @@ def pack(self) -> bytearray: return packet @classmethod - def unpack(cls, data: bytes) -> FinishedPdu: + def unpack(cls, data: bytes | bytearray) -> FinishedPdu: """Generate an object instance from raw data. Care should be taken to check whether the raw bytestream really contains a Finished PDU. @@ -241,7 +241,7 @@ def unpack(cls, data: bytes) -> FinishedPdu: finished_pdu._unpack_tlvs(rest_of_packet=data[current_idx:end_of_optional_tlvs_idx]) return finished_pdu - def _unpack_tlvs(self, rest_of_packet: bytes) -> int: + def _unpack_tlvs(self, rest_of_packet: bytes | bytearray) -> int: current_idx = 0 fs_responses_list = [] fault_loc = None @@ -266,7 +266,7 @@ def _unpack_tlvs(self, rest_of_packet: bytes) -> int: self.fault_location = fault_loc return current_idx - def __eq__(self, other: object): + def __eq__(self, other: object) -> bool: if not isinstance(other, FinishedPdu): return False return self._params == other._params and self.pdu_file_directive == other.pdu_file_directive diff --git a/spacepackets/cfdp/pdu/header.py b/spacepackets/cfdp/pdu/header.py index 8a5dd3e..5701b90 100644 --- a/spacepackets/cfdp/pdu/header.py +++ b/spacepackets/cfdp/pdu/header.py @@ -120,7 +120,9 @@ def packet_len(self) -> int: def large_file_flag_set(self) -> bool: return self.file_flag == LargeFileFlag.LARGE - def __eq__(self, other: AbstractPduBase): + def __eq__(self, other: object) -> bool: + if not isinstance(other, AbstractPduBase): + return False return ( self.pdu_type == other.pdu_type and self.file_flag == other.file_flag @@ -131,7 +133,7 @@ def __eq__(self, other: AbstractPduBase): ) @staticmethod - def header_len_from_raw(data: bytes) -> int: + def header_len_from_raw(data: bytes | bytearray) -> int: entity_id_len = ((data[3] >> 4) & 0b111) + 1 seq_num_len = (data[3] & 0b111) + 1 return AbstractPduBase.FIXED_LENGTH + 2 * entity_id_len + seq_num_len @@ -306,7 +308,7 @@ def __empty(cls) -> PduHeader: ) @classmethod - def unpack(cls, data: bytes) -> PduHeader: + def unpack(cls, data: bytes | bytearray) -> PduHeader: """Unpack a raw bytearray into the PDU header object representation. :param data: @@ -352,7 +354,7 @@ def unpack(cls, data: bytes) -> PduHeader: pdu_header.set_entity_ids(source_entity_id=source_entity_id, dest_entity_id=dest_entity_id) return pdu_header - def verify_length_and_checksum(self, data: bytes) -> int: + def verify_length_and_checksum(self, data: bytes | bytearray) -> int: if len(data) < self.packet_len: raise BytesTooShortError(self.packet_len, len(data)) if ( diff --git a/spacepackets/cfdp/pdu/helper.py b/spacepackets/cfdp/pdu/helper.py index 3029c0b..c103dd3 100644 --- a/spacepackets/cfdp/pdu/helper.py +++ b/spacepackets/cfdp/pdu/helper.py @@ -126,7 +126,9 @@ class PduFactory: """Helper class to generate PDUs and retrieve PDU information from a raw bytestream""" @staticmethod - def from_raw(data: bytes) -> GenericPduPacket | None: # noqa: PLR0911 + def from_raw(data: bytes | bytearray) -> GenericPduPacket | None: # noqa: PLR0911 + if len(data) == 0: + return None if not PduFactory.is_file_directive(data): return FileDataPdu.unpack(data) directive = PduFactory.pdu_directive_type(data) @@ -147,19 +149,19 @@ def from_raw(data: bytes) -> GenericPduPacket | None: # noqa: PLR0911 return None @staticmethod - def from_raw_to_holder(data: bytes) -> PduHolder: + def from_raw_to_holder(data: bytes | bytearray) -> PduHolder: return PduHolder(PduFactory.from_raw(data)) @staticmethod - def pdu_type(data: bytes) -> PduType: + def pdu_type(data: bytes | bytearray) -> PduType: return PduType((data[0] >> 4) & 0x01) @staticmethod - def is_file_directive(data: bytes) -> bool: + def is_file_directive(data: bytes | bytearray) -> bool: return PduFactory.pdu_type(data) == PduType.FILE_DIRECTIVE @staticmethod - def pdu_directive_type(data: bytes) -> DirectiveType | None: + def pdu_directive_type(data: bytes | bytearray) -> DirectiveType | None: """Retrieve the PDU directive type from a raw bytestream. :raises ValueError: Invalid directive type. diff --git a/spacepackets/cfdp/pdu/keep_alive.py b/spacepackets/cfdp/pdu/keep_alive.py index ba95ac1..ef904c2 100644 --- a/spacepackets/cfdp/pdu/keep_alive.py +++ b/spacepackets/cfdp/pdu/keep_alive.py @@ -75,7 +75,7 @@ def pack(self) -> bytearray: return keep_alive_packet @classmethod - def unpack(cls, data: bytes) -> KeepAlivePdu: + def unpack(cls, data: bytes | bytearray) -> KeepAlivePdu: """Generate an object instance from raw data. Care should be taken to check whether the raw bytestream really contains a Keep Alive PDU. diff --git a/spacepackets/cfdp/pdu/metadata.py b/spacepackets/cfdp/pdu/metadata.py index f10a26c..2c655bd 100644 --- a/spacepackets/cfdp/pdu/metadata.py +++ b/spacepackets/cfdp/pdu/metadata.py @@ -197,7 +197,7 @@ def pack(self) -> bytearray: return packet @classmethod - def unpack(cls, data: bytes) -> MetadataPdu: + def unpack(cls, data: bytes | bytearray) -> MetadataPdu: """Generate an object instance from raw data. Care should be taken to check whether the raw bytestream really contains a Metadata PDU. @@ -244,7 +244,7 @@ def unpack(cls, data: bytes) -> MetadataPdu: metadata_pdu._parse_options(raw_packet=data, start_idx=current_idx) return metadata_pdu - def _parse_options(self, raw_packet: bytes, start_idx: int) -> None: + def _parse_options(self, raw_packet: bytes | bytearray, start_idx: int) -> None: self._options = [] current_idx = start_idx while True: diff --git a/spacepackets/cfdp/pdu/nak.py b/spacepackets/cfdp/pdu/nak.py index 9be30a3..314b8a2 100644 --- a/spacepackets/cfdp/pdu/nak.py +++ b/spacepackets/cfdp/pdu/nak.py @@ -227,7 +227,7 @@ def pack(self) -> bytearray: return nak_pdu @classmethod - def unpack(cls, data: bytes) -> NakPdu: + def unpack(cls, data: bytes | bytearray) -> NakPdu: """Generate an object instance from raw data. The user should take care to check whether the raw bytestream really contains a NAK PDU. diff --git a/spacepackets/cfdp/pdu/prompt.py b/spacepackets/cfdp/pdu/prompt.py index cfa9101..6630f92 100644 --- a/spacepackets/cfdp/pdu/prompt.py +++ b/spacepackets/cfdp/pdu/prompt.py @@ -60,7 +60,7 @@ def __repr__(self): ) @classmethod - def unpack(cls, data: bytes) -> PromptPdu: + def unpack(cls, data: bytes | bytearray) -> PromptPdu: """Generate an object instance from raw data. Care should be taken to check whether the raw bytestream really contains a Prompt PDU. @@ -83,7 +83,7 @@ def unpack(cls, data: bytes) -> PromptPdu: prompt_pdu.response_required = ResponseRequired((data[current_idx] & 0x80) >> 7) return prompt_pdu - def __eq__(self, other: object): + def __eq__(self, other: object) -> bool: if not isinstance(other, PromptPdu): return False return ( diff --git a/spacepackets/cfdp/tlv/msg_to_user.py b/spacepackets/cfdp/tlv/msg_to_user.py index 0ee9f02..e676c10 100644 --- a/spacepackets/cfdp/tlv/msg_to_user.py +++ b/spacepackets/cfdp/tlv/msg_to_user.py @@ -71,7 +71,7 @@ def __empty(cls) -> MessageToUserTlv: return cls(b"") @classmethod - def unpack(cls, data: bytes) -> MessageToUserTlv: + def unpack(cls, data: bytes | bytearray) -> MessageToUserTlv: msg_to_user_tlv = cls.__empty() msg_to_user_tlv.tlv = CfdpTlv.unpack(data) msg_to_user_tlv.check_type(MessageToUserTlv.TLV_TYPE) @@ -98,7 +98,7 @@ class ReservedCfdpMessage(AbstractTlvBase): conversion. """ - def __init__(self, msg_type: int, value: bytes): + def __init__(self, msg_type: int, value: bytes | bytearray): assert msg_type < pow(2, 8) - 1 full_value = bytearray(b"cfdp") full_value.append(msg_type) @@ -217,7 +217,7 @@ def get_proxy_closure_requested(self) -> bool | None: or self.get_cfdp_proxy_message_type() != ProxyMessageType.CLOSURE_REQUEST ): return None - return self.value[5] & 0b1 + return bool(self.value[5] & 0b1) def get_proxy_transmission_mode(self) -> TransmissionMode | None: if ( @@ -270,7 +270,7 @@ def get_dir_listing_options(self) -> DirListingOptions | None: raise ValueError( f"value with length {len(self.value)} too small for dir listing options." ) - return DirListingOptions((self.value[5] >> 1) & 0b1, self.value[5] & 0b1) + return DirListingOptions(bool((self.value[5] >> 1) & 0b1), bool(self.value[5] & 0b1)) @dataclasses.dataclass @@ -284,7 +284,7 @@ def source_file_as_str(self) -> str: return self.source_file_name.value.decode() @property - def source_file_as_path(self) -> str: + def source_file_as_path(self) -> Path: return Path(self.source_file_as_str) @property @@ -292,7 +292,7 @@ def dest_file_as_str(self) -> str: return self.dest_file_name.value.decode() @property - def dest_file_as_path(self) -> str: + def dest_file_as_path(self) -> Path: return Path(self.dest_file_as_str) diff --git a/spacepackets/cfdp/tlv/tlv.py b/spacepackets/cfdp/tlv/tlv.py index 951aa45..2afe382 100644 --- a/spacepackets/cfdp/tlv/tlv.py +++ b/spacepackets/cfdp/tlv/tlv.py @@ -55,7 +55,7 @@ class CfdpTlv(AbstractTlvBase): MINIMAL_LEN = 2 - def __init__(self, tlv_type: TlvType, value: bytes): + def __init__(self, tlv_type: TlvType, value: bytes | bytearray): """Constructor for TLV field. Raises @@ -80,7 +80,7 @@ def tlv_type(self, tlv_type: TlvType) -> None: @property def value(self) -> bytes: - return self._value + return bytes(self._value) def pack(self) -> bytearray: tlv_data = bytearray() @@ -90,7 +90,7 @@ def pack(self) -> bytearray: return tlv_data @classmethod - def unpack(cls, data: bytes) -> CfdpTlv: + def unpack(cls, data: bytes | bytearray) -> CfdpTlv: """Parses LV field at the start of the given bytearray :param data: @@ -284,7 +284,7 @@ def _check_raw_tlv_field(first_byte: int, expected: TlvType) -> None: @staticmethod def _common_unpacker( - raw_bytes: bytes, + raw_bytes: bytes | bytearray, ) -> tuple[FilestoreActionCode, str, int, int, str | None]: """Does only unpack common fields, does not unpack the filestore message of a Filestore Response package @@ -456,7 +456,7 @@ def _build_tlv(self) -> CfdpTlv: return CfdpTlv(tlv_type=TlvType.FILESTORE_RESPONSE, value=tlv_value) @classmethod - def unpack(cls, data: bytes) -> FileStoreResponseTlv: + def unpack(cls, data: bytes | bytearray) -> FileStoreResponseTlv: cls._check_raw_tlv_field(data[0], FileStoreResponseTlv.TLV_TYPE) filestore_reply = cls.__empty() cls._set_fields(filestore_reply, data[2:]) @@ -471,7 +471,7 @@ def from_tlv(cls, cfdp_tlv: CfdpTlv) -> FileStoreResponseTlv: return fs_response @classmethod - def _set_fields(cls, instance: FileStoreResponseTlv, data: bytes) -> None: + def _set_fields(cls, instance: FileStoreResponseTlv, data: bytes | bytearray) -> None: action_code, first_name, status_code, idx, second_name = cls._common_unpacker( raw_bytes=data ) @@ -519,7 +519,7 @@ def __empty(cls) -> EntityIdTlv: return cls(entity_id=b"") @classmethod - def unpack(cls, data: bytes) -> EntityIdTlv: + def unpack(cls, data: bytes | bytearray) -> EntityIdTlv: entity_id_tlv = cls.__empty() entity_id_tlv.tlv = CfdpTlv.unpack(data=data) entity_id_tlv.check_type(tlv_type=TlvType.ENTITY_ID) diff --git a/spacepackets/ecss/__init__.py b/spacepackets/ecss/__init__.py index 1775616..8244812 100644 --- a/spacepackets/ecss/__init__.py +++ b/spacepackets/ecss/__init__.py @@ -40,7 +40,7 @@ ] -def check_pus_crc(tc_packet: bytes) -> bool: +def check_pus_crc(tc_packet: bytes | bytearray) -> bool: """Checks the CRC of a given raw PUS packet. It is expected that the passed packet is the exact raw PUS packet. Both TC and TM packets can be passed to this function because both packet formats have a CCITT-CRC16 at the last two bytes as specified in the PUS standard. diff --git a/spacepackets/ecss/fields.py b/spacepackets/ecss/fields.py index 8125f8b..7ecec0e 100644 --- a/spacepackets/ecss/fields.py +++ b/spacepackets/ecss/fields.py @@ -100,7 +100,7 @@ def len(self) -> int: return self.check_pfc(self.pfc) @classmethod - def unpack(cls, data: bytes, pfc: int) -> PacketFieldEnum: + def unpack(cls, data: bytes | bytearray, pfc: int) -> PacketFieldEnum: """Construct from a raw bytestream. :raises BytesTooShortError: Raw bytestream too short. @@ -129,7 +129,9 @@ def check_pfc(pfc: int) -> int: def __repr__(self): return f"{self.__class__.__name__}(pfc={self.pfc!r}, val={self.val!r})" - def __eq__(self, other: PacketFieldEnum): + def __eq__(self, other: object) -> bool: + if not isinstance(other, PacketFieldEnum): + return False return self.pfc == other.pfc and self.val == other.val diff --git a/spacepackets/ecss/pus_17_test.py b/spacepackets/ecss/pus_17_test.py index 756d538..c209280 100644 --- a/spacepackets/ecss/pus_17_test.py +++ b/spacepackets/ecss/pus_17_test.py @@ -80,7 +80,7 @@ def __empty(cls) -> Service17Tm: return cls(apid=0, subservice=0, timestamp=b"") @classmethod - def unpack(cls, data: bytes, timestamp_len: int) -> Service17Tm: + def unpack(cls, data: bytes | bytearray, timestamp_len: int) -> Service17Tm: """ :raises BytesTooShortError: Passed bytestream too short. diff --git a/spacepackets/ecss/pus_1_verification.py b/spacepackets/ecss/pus_1_verification.py index b0d5b78..7fccb7a 100644 --- a/spacepackets/ecss/pus_1_verification.py +++ b/spacepackets/ecss/pus_1_verification.py @@ -51,14 +51,14 @@ def len(self) -> int: @classmethod def unpack( - cls, data: bytes, num_bytes_err_code: int, num_bytes_data: int | None = None + cls, data: bytes | bytearray, num_bytes_err_code: int, num_bytes_data: int | None = None ) -> FailureNotice: pfc = num_bytes_err_code * 8 if num_bytes_data is None: num_bytes_data = len(data) - num_bytes_err_code return cls( code=PacketFieldEnum.unpack(data, pfc), - data=data[num_bytes_err_code : num_bytes_err_code + num_bytes_data], + data=bytes(data[num_bytes_err_code : num_bytes_err_code + num_bytes_data]), ) def __repr__(self): @@ -125,7 +125,7 @@ def __init__( self, apid: int, subservice: Subservice, - timestamp: bytes, + timestamp: bytes | bytearray, verif_params: VerificationParams | None = None, seq_count: int = 0, packet_version: int = 0b000, @@ -148,7 +148,7 @@ def __init__( ) if verif_params is not None: verif_params.verify_against_subservice(subservice) - self.pus_tm.tm_data = verif_params.pack() + self.pus_tm.tm_data = bytes(verif_params.pack()) def pack(self) -> bytearray: return self.pus_tm.pack() diff --git a/spacepackets/ecss/req_id.py b/spacepackets/ecss/req_id.py index 94d891f..fcd3e32 100644 --- a/spacepackets/ecss/req_id.py +++ b/spacepackets/ecss/req_id.py @@ -36,7 +36,7 @@ def empty(cls) -> RequestId: return cls(PacketId.empty(), PacketSeqCtrl.empty()) @classmethod - def unpack(cls, tm_data: bytes) -> RequestId: + def unpack(cls, tm_data: bytes | bytearray) -> RequestId: if len(tm_data) < 4: raise BytesTooShortError(4, len(tm_data)) packet_id_version_raw = struct.unpack("!H", tm_data[0:2])[0] @@ -79,7 +79,9 @@ def __repr__(self): def __str__(self): return f"Request ID: [{self.tc_packet_id}, {self.tc_psc}]" - def __eq__(self, other: RequestId): + def __eq__(self, other: object) -> bool: + if not isinstance(other, RequestId): + return False return self.as_u32() == other.as_u32() def __hash__(self): diff --git a/spacepackets/ecss/tc.py b/spacepackets/ecss/tc.py index bea17e1..50d2d3f 100644 --- a/spacepackets/ecss/tc.py +++ b/spacepackets/ecss/tc.py @@ -57,7 +57,7 @@ def pack(self) -> bytearray: return header_raw @classmethod - def unpack(cls, data: bytes) -> PusTcDataFieldHeader: + def unpack(cls, data: bytes | bytearray) -> PusTcDataFieldHeader: """Unpack a TC data field header. :param data: Start of raw data belonging to the TC data field header @@ -122,7 +122,7 @@ def __init__( service: int, subservice: int, apid: int = 0, - app_data: bytes = b"", + app_data: bytes | bytearray = b"", seq_count: int = 0, source_id: int = 0, ack_flags: int = 0b1111, @@ -268,7 +268,7 @@ def pack(self, recalc_crc: bool = True) -> bytearray: return packed_data @classmethod - def unpack(cls, data: bytes) -> PusTc: + def unpack(cls, data: bytes | bytearray) -> PusTc: """Create an instance from a raw bytestream. :raises BytesTooShortError: Passed bytestream too short. @@ -283,7 +283,7 @@ def unpack(cls, data: bytes) -> PusTc: if len(data) < expected_packet_len: raise BytesTooShortError(expected_packet_len, len(data)) tc_unpacked._app_data = data[header_len : expected_packet_len - 2] - tc_unpacked._crc16 = data[expected_packet_len - 2 : expected_packet_len] + tc_unpacked._crc16 = bytes(data[expected_packet_len - 2 : expected_packet_len]) if CRC16_CCITT_FUNC(data[:expected_packet_len]) != 0: raise InvalidTcCrc16Error(tc_unpacked) return tc_unpacked diff --git a/spacepackets/ecss/tm.py b/spacepackets/ecss/tm.py index b0c0fc6..9134c82 100644 --- a/spacepackets/ecss/tm.py +++ b/spacepackets/ecss/tm.py @@ -77,7 +77,7 @@ def __init__( self, service: int, subservice: int, - timestamp: bytes, + timestamp: bytes | bytearray, message_counter: int, dest_id: int = 0, spacecraft_time_ref: int = 0, @@ -127,7 +127,7 @@ def pack(self) -> bytearray: return secondary_header @classmethod - def unpack(cls, data: bytes, timestamp_len: int) -> PusTmSecondaryHeader: + def unpack(cls, data: bytes | bytearray, timestamp_len: int) -> PusTmSecondaryHeader: """Unpack the PUS TM secondary header from the raw packet starting at the header index. :param data: Raw data. Please note that the passed buffer should start where the actual @@ -224,8 +224,8 @@ def __init__( self, service: int, subservice: int, - timestamp: bytes, - source_data: bytes = b"", + timestamp: bytes | bytearray, + source_data: bytes | bytearray = b"", apid: int = 0, seq_count: int = 0, message_counter: int = 0, @@ -287,7 +287,7 @@ def calc_crc(self) -> None: self._crc16 = struct.pack("!H", crc.crcValue) @classmethod - def unpack(cls, data: bytes, timestamp_len: int) -> PusTm: + def unpack(cls, data: bytes | bytearray, timestamp_len: int) -> PusTm: """Attempts to construct a generic PusTelemetry class given a raw bytearray. :param data: Raw bytes containing the PUS telemetry packet. @@ -297,8 +297,6 @@ def unpack(cls, data: bytes, timestamp_len: int) -> PusTm: :raises ValueError: Unsupported PUS version. :raises InvalidTmCrc16Error: Invalid CRC16. """ - if data is None: - raise ValueError("byte stream invalid") pus_tm = cls.empty() pus_tm.space_packet_header = SpacePacketHeader.unpack(data=data) expected_packet_len = get_total_space_packet_len_from_len_field( @@ -316,7 +314,7 @@ def unpack(cls, data: bytes, timestamp_len: int) -> PusTm: pus_tm.pus_tm_sec_header.header_size + SPACE_PACKET_HEADER_SIZE : expected_packet_len - 2 ] - pus_tm._crc16 = data[expected_packet_len - 2 : expected_packet_len] + pus_tm._crc16 = bytes(data[expected_packet_len - 2 : expected_packet_len]) # CRC16-CCITT checksum if CRC16_CCITT_FUNC(data[:expected_packet_len]) != 0: raise InvalidTmCrc16Error(pus_tm) @@ -394,7 +392,7 @@ def ccsds_version(self) -> int: @property def timestamp(self) -> bytes: - return self.pus_tm_sec_header.timestamp + return bytes(self.pus_tm_sec_header.timestamp) @property def service(self) -> int: @@ -419,7 +417,7 @@ def tm_data(self) -> bytes: """ :return: TM source data (raw) """ - return self._source_data + return bytes(self._source_data) @tm_data.setter def tm_data(self, data: bytes) -> None: diff --git a/spacepackets/uslp/frame.py b/spacepackets/uslp/frame.py index 6850862..f11340f 100644 --- a/spacepackets/uslp/frame.py +++ b/spacepackets/uslp/frame.py @@ -164,7 +164,7 @@ def __init__( self, tfdz_cnstr_rules: TfdzConstructionRules, uslp_ident: UslpProtocolIdentifier, - tfdz: bytes, + tfdz: bytes | bytearray, fhp_or_lvop: int | None = None, ): """ @@ -199,7 +199,7 @@ def __init__( self.fhp_or_lvop = fhp_or_lvop self._size = 0 - self.tfdz = tfdz + self.tfdz = bytes(tfdz) allowed_max_len = USLP_TFDF_MAX_SIZE - self.header_len() if self.len() > allowed_max_len: raise ValueError @@ -276,13 +276,13 @@ def __empty(cls) -> TransferFrameDataField: tfdz_cnstr_rules=TfdzConstructionRules.FpPacketSpanningMultipleFrames, uslp_ident=UslpProtocolIdentifier.SPACE_PACKETS_ENCAPSULATION_PACKETS, fhp_or_lvop=None, - tfdz=bytearray(), + tfdz=b"", ) @classmethod def unpack( cls, - raw_tfdf: bytes, + raw_tfdf: bytes | bytearray, truncated: bool, exact_len: int, frame_type: FrameType | None, diff --git a/spacepackets/uslp/header.py b/spacepackets/uslp/header.py index 2b38330..59572b9 100644 --- a/spacepackets/uslp/header.py +++ b/spacepackets/uslp/header.py @@ -70,7 +70,7 @@ def truncated(self) -> bool: @staticmethod def _unpack_raw_header_base_fields( - raw_packet: bytes, + raw_packet: bytes | bytearray, truncated: bool = False, uslp_version: int = USLP_VERSION_NUMBER, ) -> tuple[int, SourceOrDestField, int, int]: @@ -130,7 +130,7 @@ def truncated(self) -> bool: @classmethod def unpack( cls, - raw_packet: bytes, + raw_packet: bytes | bytearray, uslp_version: int = USLP_VERSION_NUMBER, ) -> TruncatedPrimaryHeader: """Unpack USLP primary header from raw bytearray. @@ -208,7 +208,7 @@ def pack(self) -> bytearray: | self.vcf_count_len ) if self.vcf_count_len > 0 and self.vcf_count is None: - raise ValueError + raise ValueError(f"VCF count is None for VCS count length {self.vcf_count_len}") if self.vcf_count_len == 1: packet.append(self.vcf_count) elif self.vcf_count_len == 2: @@ -239,7 +239,9 @@ def __empty(cls) -> PrimaryHeader: ) @classmethod - def unpack(cls, raw_packet: bytes, uslp_version: int = USLP_VERSION_NUMBER) -> PrimaryHeader: + def unpack( + cls, raw_packet: bytes | bytearray, uslp_version: int = USLP_VERSION_NUMBER + ) -> PrimaryHeader: """Unpack a regular transfer frame header from a raw bytearray :param raw_packet: diff --git a/spacepackets/util.py b/spacepackets/util.py index d04ad1a..b42217d 100644 --- a/spacepackets/util.py +++ b/spacepackets/util.py @@ -34,7 +34,7 @@ def get_bin_data_string(data: bytes) -> str: return string_to_print -def get_printable_data_string(print_format: PrintFormats, data: bytes) -> str: +def get_printable_data_string(print_format: PrintFormats, data: bytes | bytearray) -> str: """Returns the TM data in a clean printable hex string format :return: The string """ @@ -126,7 +126,7 @@ def __init__(self, val: int, byte_len: int): self._val_as_bytes = IntByteConversion.to_unsigned(self.byte_len, self.value) @classmethod - def from_bytes(cls, raw: bytes) -> UnsignedByteField: + def from_bytes(cls, raw: bytes | bytearray) -> UnsignedByteField: return cls( struct.unpack(IntByteConversion.unsigned_struct_specifier(len(raw)), raw)[0], len(raw), @@ -232,7 +232,7 @@ def __init__(self, val: int): super().__init__(val, 1) @classmethod - def from_u8_bytes(cls, stream: bytes) -> ByteFieldU8: + def from_u8_bytes(cls, stream: bytes | bytearray) -> ByteFieldU8: if len(stream) < 1: raise ValueError("Passed stream not large enough, should be at least 1 byte") return cls(stream[0]) @@ -248,7 +248,7 @@ def __init__(self, val: int): super().__init__(val, 2) @classmethod - def from_u16_bytes(cls, stream: bytes) -> ByteFieldU16: + def from_u16_bytes(cls, stream: bytes | bytearray) -> ByteFieldU16: if len(stream) < 2: raise ValueError("Passed stream not large enough, should be at least 2 byte") return cls(struct.unpack(IntByteConversion.unsigned_struct_specifier(2), stream[0:2])[0]) @@ -264,7 +264,7 @@ def __init__(self, val: int): super().__init__(val, 4) @classmethod - def from_u32_bytes(cls, stream: bytes) -> ByteFieldU32: + def from_u32_bytes(cls, stream: bytes | bytearray) -> ByteFieldU32: if len(stream) < 4: raise ValueError("passed stream not large enough, should be at least 4 bytes") return cls(struct.unpack(IntByteConversion.unsigned_struct_specifier(4), stream[0:4])[0]) @@ -280,7 +280,7 @@ def __init__(self, val: int): super().__init__(val, 8) @classmethod - def from_u64_bytes(cls, stream: bytes) -> ByteFieldU64: + def from_u64_bytes(cls, stream: bytes | bytearray) -> ByteFieldU64: if len(stream) < 8: raise ValueError("passed stream not large enough, should be at least 8 byte") return cls(struct.unpack(IntByteConversion.unsigned_struct_specifier(8), stream[0:8])[0]) @@ -308,7 +308,7 @@ def from_int(byte_len: int, val: int) -> UnsignedByteField: raise ValueError(f"invalid byte length {byte_len}") @staticmethod - def from_bytes(byte_len: int, stream: bytes) -> UnsignedByteField: + def from_bytes(byte_len: int, stream: bytes | bytearray) -> UnsignedByteField: """Generate an :py:class:`UnsignedByteField` from a raw bytestream and a length. :raise ValueError: Byte length is not one of [1, 2, 4, 8].""" diff --git a/tests/cfdp/pdus/test_directive.py b/tests/cfdp/pdus/test_directive.py index 39583e1..bd836f6 100644 --- a/tests/cfdp/pdus/test_directive.py +++ b/tests/cfdp/pdus/test_directive.py @@ -26,6 +26,6 @@ def test_file_directive(self): invalid_fss = bytes([0x00, 0x01]) with self.assertRaises(ValueError): file_directive_header.parse_fss_field(raw_packet=invalid_fss, current_idx=0) - file_directive_header.pdu_header.file_size = LargeFileFlag.LARGE + file_directive_header.pdu_header.file_flag = LargeFileFlag.LARGE with self.assertRaises(ValueError): file_directive_header.parse_fss_field(raw_packet=invalid_fss, current_idx=0) diff --git a/tests/ecss/test_pus_tm.py b/tests/ecss/test_pus_tm.py index 43d9d65..3154e93 100755 --- a/tests/ecss/test_pus_tm.py +++ b/tests/ecss/test_pus_tm.py @@ -21,6 +21,7 @@ PusTm, PusTmSecondaryHeader, ) +from spacepackets.exceptions import BytesTooShortError from spacepackets.util import PrintFormats, get_printable_data_string from .common import TEST_STAMP @@ -223,8 +224,8 @@ def test_crc_always_calced_if_none(self): self.assertEqual(tc_unpacked, new_ping_tm) def test_faulty_unpack(self): - self.assertRaises(ValueError, PusTm.unpack, None, None) - self.assertRaises(ValueError, PusTm.unpack, bytearray(), None) + self.assertRaises(TypeError, PusTm.unpack, None, None) + self.assertRaises(BytesTooShortError, PusTm.unpack, bytearray(), None) def test_invalid_sec_header_unpack(self): invalid_secondary_header = bytearray([0x20, 0x00, 0x01, 0x06])