diff --git a/CHANGELOG.md b/CHANGELOG.md index d46785d..83125b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/). # [unreleased] +## Changed + +- USLP implementation: FECF is fixed to 2 bytes and always uses the standard CRC16 CCITT checksum + calculation now. Consequently, the USLP API was adapted and simplified. + - `FecfProperties` removed, not required anymore + - `TransferFrame` constructor now expects `has_fecf` boolean parameter which defaults to true. + The checksum will be calculated and appended by the `pack` method. The `unpack` method + expects the `has_fecf` flag now as well and will perform a CRC16 calculation when the flag + is set to True. + # [v0.26.1] 2024-11-30 ## Fixed diff --git a/docs/examples.rst b/docs/examples.rst index 9eecd66..a8fc973 100644 --- a/docs/examples.rst +++ b/docs/examples.rst @@ -183,14 +183,14 @@ packet. ) var_frame = TransferFrame(header=frame_header, tfdf=tfdf) var_frame_packed = var_frame.pack() - print("USLP variable length frame without FECF containing a simple space packet") + print("USLP variable length frame with checksum containing a simple space packet") print(f"Contained space packet (hex): [{var_frame_packed.hex(sep=',')}]") Output: .. testoutput:: uslp - USLP variable length frame without FECF containing a simple space packet - Contained space packet (hex): [c0,07,30,20,00,00,00,e0,10,73,c0,00,00,03,01,02,03,04] + USLP variable length frame with checksum containing a simple space packet + Contained space packet (hex): [c0,07,30,20,00,00,00,e0,10,73,c0,00,00,03,01,02,03,04,4e,03] diff --git a/src/__init__.py b/src/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/spacepackets/ecss/pus_1_verification.py b/src/spacepackets/ecss/pus_1_verification.py index 7fccb7a..e2fcaaa 100644 --- a/src/spacepackets/ecss/pus_1_verification.py +++ b/src/spacepackets/ecss/pus_1_verification.py @@ -51,7 +51,10 @@ def len(self) -> int: @classmethod def unpack( - cls, data: bytes | bytearray, num_bytes_err_code: int, num_bytes_data: int | None = None + cls, + data: bytes | bytearray, + num_bytes_err_code: int, + num_bytes_data: int | None = None, ) -> FailureNotice: pfc = num_bytes_err_code * 8 if num_bytes_data is None: @@ -281,7 +284,10 @@ def error_code(self) -> ErrorCode | None: @property def is_step_reply(self) -> bool: - return self.subservice in (Subservice.TM_STEP_FAILURE, Subservice.TM_STEP_SUCCESS) + return self.subservice in ( + Subservice.TM_STEP_FAILURE, + Subservice.TM_STEP_SUCCESS, + ) @property def step_id(self) -> StepId | None: diff --git a/src/spacepackets/uslp/defs.py b/src/spacepackets/uslp/defs.py index d6e943c..1a81e6d 100644 --- a/src/spacepackets/uslp/defs.py +++ b/src/spacepackets/uslp/defs.py @@ -24,3 +24,7 @@ class UslpVersionMissmatchError(Exception): class UslpTypeMissmatchError(Exception): pass + + +class UslpChecksumError(Exception): + pass diff --git a/src/spacepackets/uslp/frame.py b/src/spacepackets/uslp/frame.py index f11340f..d6d85a4 100644 --- a/src/spacepackets/uslp/frame.py +++ b/src/spacepackets/uslp/frame.py @@ -4,7 +4,10 @@ import struct from typing import Union +from spacepackets.crc import CRC16_CCITT_FUNC + from .defs import ( + UslpChecksumError, UslpFhpVhopFieldMissingError, UslpInvalidConstructionRulesError, UslpInvalidFrameHeaderError, @@ -30,28 +33,19 @@ def __init__(self, present: bool, size: int): self.size = size -class FecfProperties: - def __init__(self, present: bool, size: int): - self.present = present - self.size = size - - class FramePropertiesBase: def __init__( self, has_insert_zone: bool, has_fecf: bool, insert_zone_len: int | None = None, - fecf_len: int | None = None, ): if has_insert_zone and insert_zone_len is None: raise ValueError - if has_fecf and fecf_len is None: - raise ValueError self.insert_zone_properties = InsertZoneProperties( present=has_insert_zone, size=insert_zone_len ) - self.fecf_properties = FecfProperties(present=has_fecf, size=fecf_len) + self.has_fecf = has_fecf class FixedFrameProperties(FramePropertiesBase): @@ -61,7 +55,6 @@ def __init__( has_insert_zone: bool, has_fecf: bool, insert_zone_len: int | None = None, - fecf_len: int | None = None, ): """Contains properties required when unpacking fixed USLP frames. These properties can not be determined by parsing the frame. The standard refers to these properties @@ -76,7 +69,6 @@ def __init__( has_insert_zone=has_insert_zone, has_fecf=has_fecf, insert_zone_len=insert_zone_len, - fecf_len=fecf_len, ) self.fixed_len = fixed_len @@ -88,23 +80,20 @@ def __init__( has_fecf: bool, truncated_frame_len: int, insert_zone_len: int | None = None, - fecf_len: int | None = None, ): """Contains properties required when unpacking variable USLP frames. These properties can not be determined by parsing the frame. The standard refers to these properties as managed parameters. :param has_insert_zone: - :param insert_zone_len: :param has_fecf: - :param fecf_len: :param truncated_frame_len: + :param insert_zone_len: """ super().__init__( has_insert_zone=has_insert_zone, has_fecf=has_fecf, insert_zone_len=insert_zone_len, - fecf_len=fecf_len, ) self.truncated_frame_len = truncated_frame_len @@ -324,13 +313,13 @@ def __init__( tfdf: TransferFrameDataField, insert_zone: bytes | None = None, op_ctrl_field: bytes | None = None, - fecf: bytes | None = None, + has_fecf: bool = True, ): self.header = header self.tfdf = tfdf self.insert_zone = insert_zone self.op_ctrl_field = op_ctrl_field - self.fecf = fecf + self.has_fecf = has_fecf def pack(self, truncated: bool = False, frame_type: FrameType | None = None) -> bytearray: frame = bytearray() @@ -346,8 +335,8 @@ def pack(self, truncated: bool = False, frame_type: FrameType | None = None) -> frame.extend(self.op_ctrl_field) elif not truncated and self.header.op_ctrl_flag: raise UslpInvalidFrameHeaderError - if self.fecf is not None: - frame.extend(self.fecf) + if self.has_fecf: + frame.extend(struct.pack("!H", CRC16_CCITT_FUNC(frame))) return frame def set_frame_len_in_header(self) -> None: @@ -363,8 +352,8 @@ def len(self) -> int: size += len(self.insert_zone) if self.op_ctrl_field is not None: size += len(self.op_ctrl_field) - if self.fecf is not None: - size += len(self.fecf) + if self.has_fecf: + size += 2 return size @classmethod @@ -383,13 +372,16 @@ def __empty(cls) -> TransferFrame: tfdf=empty_data_field, insert_zone=None, op_ctrl_field=None, - fecf=None, + has_fecf=False, ) # TODO: Fix lint by creating helper methods. @classmethod def unpack( # noqa: PLR0912 too many branches - cls, raw_frame: bytes, frame_type: FrameType, frame_properties: FramePropertiesT + cls, + raw_frame: bytes | bytearray, + frame_type: FrameType, + frame_properties: FramePropertiesT, ) -> TransferFrame: """Unpack a USLP transfer frame from a raw bytearray. All managed parameters have to be passed explicitly. @@ -457,11 +449,11 @@ def unpack( # noqa: PLR0912 too many branches frame.op_ctrl_field = raw_frame[current_idx : current_idx + 4] current_idx += 4 # Parse Frame Error Control field if present - if frame_properties.fecf_properties.present: - frame.fecf = raw_frame[ - current_idx : current_idx + frame_properties.fecf_properties.size - ] - current_idx += frame_properties.fecf_properties.size + if frame_properties.has_fecf: + crc_check = CRC16_CCITT_FUNC(raw_frame[0 : current_idx + 2]) + if crc_check != 0: + raise UslpChecksumError + return frame @staticmethod @@ -497,8 +489,8 @@ def __get_tfdf_len( exact_tfdf_len = properties.truncated_frame_len - header_len else: exact_tfdf_len = header.frame_len + 1 - header_len - if properties.fecf_properties.present: - exact_tfdf_len -= properties.fecf_properties.size + if properties.has_fecf: + exact_tfdf_len -= 2 if header_type != HeaderType.TRUNCATED and header.op_ctrl_flag: exact_tfdf_len -= 4 if properties.insert_zone_properties.present: diff --git a/src/spacepackets/uslp/header.py b/src/spacepackets/uslp/header.py index 59572b9..77b146b 100644 --- a/src/spacepackets/uslp/header.py +++ b/src/spacepackets/uslp/header.py @@ -183,9 +183,9 @@ def __init__( vcid: int, map_id: int, frame_len: int, - bypass_seq_ctrl_flag: BypassSequenceControlFlag, - prot_ctrl_cmd_flag: ProtocolCommandFlag, - op_ctrl_flag: bool, + bypass_seq_ctrl_flag: BypassSequenceControlFlag = BypassSequenceControlFlag.SEQ_CTRLD_QOS, + prot_ctrl_cmd_flag: ProtocolCommandFlag = ProtocolCommandFlag.USER_DATA, + op_ctrl_flag: bool = False, vcf_count_len: int = 0, vcf_count: int | None = None, ): @@ -286,7 +286,7 @@ def len(self) -> int: return 7 + self.vcf_count_len -def determine_header_type(header_start: bytes) -> HeaderType: +def determine_header_type(header_start: bytes | bytearray) -> HeaderType: """Determine header type from raw header. :param header_start: :raises ValueError: Passed bytearray shorter than minimum length 4 diff --git a/tests/cfdp/tlvslvs/test_msg_to_user.py b/tests/cfdp/tlvslvs/test_msg_to_user.py index 415c218..c4aee35 100644 --- a/tests/cfdp/tlvslvs/test_msg_to_user.py +++ b/tests/cfdp/tlvslvs/test_msg_to_user.py @@ -1,6 +1,11 @@ from unittest import TestCase -from spacepackets.cfdp import MessageToUserTlv, TlvHolder, TlvType, TlvTypeMissmatchError +from spacepackets.cfdp import ( + MessageToUserTlv, + TlvHolder, + TlvType, + TlvTypeMissmatchError, +) from spacepackets.cfdp.tlv import ( CfdpTlv, ProxyMessageType, diff --git a/tests/test_uslp.py b/tests/test_uslp.py index 34f2ff7..463b14c 100644 --- a/tests/test_uslp.py +++ b/tests/test_uslp.py @@ -1,5 +1,7 @@ +import struct from unittest import TestCase +from spacepackets.crc import CRC16_CCITT_FUNC from spacepackets.uslp.defs import ( UslpFhpVhopFieldMissingError, UslpInvalidConstructionRulesError, @@ -30,6 +32,37 @@ class TestUslp(TestCase): + def setUp(self) -> None: + # Initialize with frame length 0 and set frame length field later with a helper function + self.primary_header = PrimaryHeader( + scid=0x10, + map_id=0b0011, + src_dest=SourceOrDestField.SOURCE, + vcid=0b110111, + frame_len=0, + op_ctrl_flag=False, + vcf_count_len=0, + prot_ctrl_cmd_flag=ProtocolCommandFlag.USER_DATA, + bypass_seq_ctrl_flag=BypassSequenceControlFlag.SEQ_CTRLD_QOS, + ) + self.fp_tfdf = TransferFrameDataField( + tfdz_cnstr_rules=TfdzConstructionRules.FpPacketSpanningMultipleFrames, + uslp_ident=UslpProtocolIdentifier.SPACE_PACKETS_ENCAPSULATION_PACKETS, + tfdz=bytearray(), + fhp_or_lvop=0, + ) + # Packet without op control field, without insert zone and without frame error control + self.transfer_frame = TransferFrame( + header=self.primary_header, + op_ctrl_field=None, + insert_zone=None, + tfdf=self.fp_tfdf, + has_fecf=False, + ) + # This sets the correct frame length in the primary header + self.transfer_frame.set_frame_len_in_header() + return super().setUp() + def test_header(self): primary_header = PrimaryHeader( scid=pow(2, 16) - 2, @@ -139,7 +172,9 @@ def test_header(self): self.assertEqual(packed_header, unpacked_primary_header.pack()) self.assertRaises(UslpTypeMissmatchError, TruncatedPrimaryHeader.unpack, packed_header) self.assertRaises( - UslpInvalidRawPacketOrFrameLenError, TruncatedPrimaryHeader.unpack, bytearray() + UslpInvalidRawPacketOrFrameLenError, + TruncatedPrimaryHeader.unpack, + bytearray(), ) self.assertRaises( UslpInvalidRawPacketOrFrameLenError, @@ -202,36 +237,10 @@ def test_header(self): unpacked_truncated = TruncatedPrimaryHeader.unpack(raw_packet=packed_header) self.assertEqual(unpacked_truncated.pack(), packed_header) - def test_frame(self): - # Initialize with frame length 0 and set frame length field later with a helper function - primary_header = PrimaryHeader( - scid=0x10, - map_id=0b0011, - src_dest=SourceOrDestField.SOURCE, - vcid=0b110111, - frame_len=0, - op_ctrl_flag=False, - vcf_count_len=0, - prot_ctrl_cmd_flag=ProtocolCommandFlag.USER_DATA, - bypass_seq_ctrl_flag=BypassSequenceControlFlag.SEQ_CTRLD_QOS, - ) - fp_tfdf = TransferFrameDataField( - tfdz_cnstr_rules=TfdzConstructionRules.FpPacketSpanningMultipleFrames, - uslp_ident=UslpProtocolIdentifier.SPACE_PACKETS_ENCAPSULATION_PACKETS, - tfdz=bytearray(), - fhp_or_lvop=0, - ) - # Packet without op control field, without insert zone and without frame error control - transfer_frame = TransferFrame( - header=primary_header, - op_ctrl_field=None, - insert_zone=None, - tfdf=fp_tfdf, - fecf=None, - ) + def test_frame_pack(self): # This sets the correct frame length in the primary header - transfer_frame.set_frame_len_in_header() - frame_packed = transfer_frame.pack(truncated=False, frame_type=FrameType.FIXED) + self.transfer_frame.set_frame_len_in_header() + frame_packed = self.transfer_frame.pack(truncated=False, frame_type=FrameType.FIXED) # 7 byte primary header + TFDF with 3 bytes, TFDZ is empty self.assertEqual(len(frame_packed), 10) self.assertEqual((frame_packed[4] << 8) | frame_packed[5], len(frame_packed) - 1) @@ -239,10 +248,10 @@ def test_frame(self): self.assertEqual(frame_packed[7], 0x00) self.assertEqual(frame_packed[8], 0x00) self.assertEqual(frame_packed[9], 0x00) - transfer_frame.tfdf.tfdz_contr_rules = TfdzConstructionRules.FpFixedStartOfMapaSDU - transfer_frame.tfdf.uslp_ident = UslpProtocolIdentifier.IDLE_DATA - transfer_frame.tfdf.fhp_or_lvop = 0xAFFE - frame_packed = transfer_frame.pack() + self.transfer_frame.tfdf.tfdz_contr_rules = TfdzConstructionRules.FpFixedStartOfMapaSDU + self.transfer_frame.tfdf.uslp_ident = UslpProtocolIdentifier.IDLE_DATA + self.transfer_frame.tfdf.fhp_or_lvop = 0xAFFE + frame_packed = self.transfer_frame.pack() self.assertEqual(len(frame_packed), 10) self.assertEqual((frame_packed[4] << 8) | frame_packed[5], len(frame_packed) - 1) # TFDZ construction rule is 0b001, USLP identifier is 0b11111, concatenation is 0x3f @@ -251,14 +260,20 @@ def test_frame(self): # still set correctly self.assertEqual(frame_packed[8], 0xAF) self.assertEqual(frame_packed[9], 0xFE) + + def test_frame_unpack(self): frame_properties = FixedFrameProperties(has_insert_zone=False, has_fecf=False, fixed_len=10) + self.fp_tfdf.uslp_ident = UslpProtocolIdentifier.IDLE_DATA + self.fp_tfdf.tfdz_contr_rules = TfdzConstructionRules.FpFixedStartOfMapaSDU + self.fp_tfdf.fhp_or_lvop = 0xAFFE + frame_packed = self.transfer_frame.pack(truncated=False, frame_type=FrameType.FIXED) frame_unpacked = TransferFrame.unpack( raw_frame=frame_packed, frame_type=FrameType.FIXED, frame_properties=frame_properties, ) self.assertEqual(frame_unpacked.insert_zone, None) - self.assertEqual(frame_unpacked.fecf, None) + self.assertEqual(frame_unpacked.has_fecf, False) self.assertEqual(frame_unpacked.op_ctrl_field, None) self.assertEqual(frame_unpacked.tfdf.uslp_ident, UslpProtocolIdentifier.IDLE_DATA) self.assertEqual( @@ -267,11 +282,11 @@ def test_frame(self): ) self.assertEqual(frame_unpacked.tfdf.fhp_or_lvop, 0xAFFE) self.assertEqual(frame_unpacked.tfdf.tfdz, bytearray()) - self.assertEqual(frame_unpacked.header.pack(), transfer_frame.header.pack()) + self.assertEqual(frame_unpacked.header.pack(), self.transfer_frame.header.pack()) + + def test_some_errors(self): with self.assertRaises(ValueError): FixedFrameProperties(fixed_len=5, has_fecf=False, has_insert_zone=True) - with self.assertRaises(ValueError): - FixedFrameProperties(fixed_len=5, has_fecf=True, has_insert_zone=False) with self.assertRaises(ValueError): invalid_tfdz = bytearray(70000) tfdf = TransferFrameDataField( @@ -280,8 +295,8 @@ def test_frame(self): tfdz=invalid_tfdz, fhp_or_lvop=0, ) - self.assertEqual(fp_tfdf.verify_frame_type(frame_type=FrameType.FIXED), True) - self.assertEqual(fp_tfdf.verify_frame_type(frame_type=FrameType.VARIABLE), False) + self.assertEqual(self.fp_tfdf.verify_frame_type(frame_type=FrameType.FIXED), True) + self.assertEqual(self.fp_tfdf.verify_frame_type(frame_type=FrameType.VARIABLE), False) vp_tfdf = TransferFrameDataField( tfdz_cnstr_rules=TfdzConstructionRules.VpNoSegmentation, uslp_ident=UslpProtocolIdentifier.SPACE_PACKETS_ENCAPSULATION_PACKETS, @@ -293,7 +308,7 @@ def test_frame(self): False, ) self.assertEqual( - fp_tfdf.should_have_fhp_or_lvp_field(truncated=False, frame_type=FrameType.FIXED), + self.fp_tfdf.should_have_fhp_or_lvp_field(truncated=False, frame_type=FrameType.FIXED), True, ) empty_short_tfdf = TransferFrameDataField( @@ -370,14 +385,13 @@ def test_frame(self): fhp_or_lvop=0, ) insert_zone = bytearray(4) - op_ctrl_field = bytearray([1, 2, 3, 4]) - fecf = bytearray([3, 4]) + op_ctrl_field = bytearray([4, 3, 2, 1]) larger_frame = TransferFrame( header=primary_header, insert_zone=insert_zone, op_ctrl_field=op_ctrl_field, tfdf=tfdf, - fecf=fecf, + has_fecf=True, ) larger_frame.set_frame_len_in_header() self.assertEqual(larger_frame.header.frame_len, 24 - 1) @@ -386,8 +400,9 @@ def test_frame(self): self.assertEqual(len(larger_frame_packed), 24) self.assertEqual(larger_frame_packed[7 : 7 + 4], bytearray(4)) self.assertEqual(larger_frame_packed[14 : 14 + 4], bytearray([1, 2, 3, 4])) - self.assertEqual(larger_frame_packed[18 : 18 + 4], bytearray([1, 2, 3, 4])) - self.assertEqual(larger_frame_packed[22:], bytearray([3, 4])) + self.assertEqual(larger_frame_packed[18 : 18 + 4], bytearray([4, 3, 2, 1])) + crc16 = struct.pack("!H", CRC16_CCITT_FUNC(larger_frame_packed[0:22])) + self.assertEqual(larger_frame_packed[22:], crc16) with self.assertRaises(UslpInvalidRawPacketOrFrameLenError): TransferFrame.unpack( raw_frame=larger_frame_packed, @@ -397,7 +412,6 @@ def test_frame(self): has_insert_zone=True, insert_zone_len=4, has_fecf=True, - fecf_len=2, ), ) with self.assertRaises(UslpInvalidRawPacketOrFrameLenError): @@ -409,7 +423,6 @@ def test_frame(self): has_insert_zone=True, insert_zone_len=4, has_fecf=True, - fecf_len=2, ), ) with self.assertRaises(ValueError): @@ -421,7 +434,6 @@ def test_frame(self): has_insert_zone=True, insert_zone_len=4, has_fecf=True, - fecf_len=2, ), ) larger_frame.header.op_ctrl_flag = False @@ -444,12 +456,10 @@ def test_frame(self): has_insert_zone=True, insert_zone_len=4, has_fecf=True, - fecf_len=2, ), ) - self.assertEqual(larger_frame_unpacked.fecf, bytearray([3, 4])) self.assertEqual(larger_frame_unpacked.insert_zone, bytearray(4)) - self.assertEqual(larger_frame_unpacked.op_ctrl_field, bytearray([1, 2, 3, 4])) + self.assertEqual(larger_frame_unpacked.op_ctrl_field, bytearray([4, 3, 2, 1])) with self.assertRaises(UslpInvalidRawPacketOrFrameLenError): TransferFrame.unpack( raw_frame=bytearray(3), @@ -459,7 +469,6 @@ def test_frame(self): has_insert_zone=True, insert_zone_len=4, has_fecf=True, - fecf_len=2, ), ) truncated_header = TruncatedPrimaryHeader( @@ -475,7 +484,7 @@ def test_frame(self): insert_zone=None, op_ctrl_field=None, tfdf=tfdf_truncated, - fecf=None, + has_fecf=False, ) truncated_frame_packed = truncated_frame.pack(truncated=True) self.assertEqual(len(truncated_frame_packed), 9) @@ -488,7 +497,6 @@ def test_frame(self): has_insert_zone=True, insert_zone_len=4, has_fecf=True, - fecf_len=2, ), ) truncated_frame_unpacked = TransferFrame.unpack(