diff --git a/src/signage/src/signage/external_signage.py b/src/signage/src/signage/external_signage.py new file mode 100644 index 0000000..e72bdb3 --- /dev/null +++ b/src/signage/src/signage/external_signage.py @@ -0,0 +1,136 @@ +from dataclasses import dataclass +import datetime +import time +import serial +from ament_index_python.packages import get_package_share_directory +import signage.packet_tools as packet_tools + + +@dataclass +class Display: + address1: int + address2: int + height: int + width: int + ack_query_ack: list + ack_data_chunk: list + + +class Protocol: + SOT = 0xAA + EOT = 0x55 + SEND_COLOR = "\x1B[34;1m" + RECV_COLOR = "\x1B[32;1m" + ERR_COLOR = "\x1B[31;1m" + + def __init__(self): + self.front = Display( + address1=0x70, + address2=0x8F, + height=16, + width=128, + ack_query_ack=[0xAA, 0x70, 0x8F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55], + ack_data_chunk=[0xAA, 0x70, 0x8F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55], + ) + self.back = Display( + address1=0x80, + address2=0x7F, + height=16, + width=128, + ack_query_ack=[0xAA, 0x80, 0x7F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55], + ack_data_chunk=[0xAA, 0x80, 0x7F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55], + ) + self.side = Display( + address1=0x90, + address2=0x6F, + height=24, + width=80, + ack_query_ack=[0xAA, 0x90, 0x6F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55], + ack_data_chunk=[0xAA, 0x90, 0x6F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55], + ) + + +class DataSender: + def __init__(self, bus, parser, protocol, node_logger): + self._bus = bus + self._parser = parser + self._protocol = protocol + self._logger = node_logger + self._delay_time = 0.02 + + def _send_heartbeat(self, data, ACK_QueryACK): + timestamp = datetime.datetime.now() + name_time_packet = packet_tools.gen_name_time_packet(data.linename, timestamp, False) + self._bus.write(name_time_packet) + packet_tools.dump_packet(name_time_packet, None, self._protocol.SEND_COLOR) + time.sleep(self._delay_time) + self._bus.write(data.heartbeat_packet) + packet_tools.dump_packet(data.heartbeat_packet, None, self._protocol.SEND_COLOR) + buf = self._parser.wait_ack() + if not packet_tools.lists_match(buf, ACK_QueryACK): + if len(buf) == 0: + self._logger.error("No ACK received for heartbeat.") + else: + packet_tools.dump_packet(buf, None, self._protocol.ERR_COLOR) + + def _send_data_packets(self, data, ACK_DataChunk): + for packet in data.data_packets: + packet_tools.dump_packet(packet, None, self._protocol.SEND_COLOR) + self._bus.write(packet) + buf = self._parser.wait_ack() + if not packet_tools.lists_match(buf, ACK_DataChunk): + if len(buf) == 0: + self._logger.error("No ACK received for data packet.") + else: + packet_tools.dump_packet(buf, None, self._protocol.ERR_COLOR) + + def send(self, data, ACK_QueryACK, ACK_DataChunk): + self._send_heartbeat(data, ACK_QueryACK) + self._send_data_packets(data, ACK_DataChunk) + return # Exit after sending all data packets + + +class ExternalSignage: + def __init__(self, node): + self.node = node + self.protocol = Protocol() + package_path = get_package_share_directory("signage") + "/resource/td5_file/" + self.bus = serial.Serial( + "/dev/ttyUSB0", baudrate=38400, parity=serial.PARITY_EVEN, timeout=0.2, exclusive=False + ) + self.parser = packet_tools.Parser(self.bus) + + self.displays = { + "front": self._load_display_data(self.protocol.front, package_path), + "back": self._load_display_data(self.protocol.back, package_path), + "side": self._load_display_data(self.protocol.side, package_path), + } + + def _load_display_data(self, display, package_path): + auto_path = package_path + f"/automatic_{display.width}x{display.height}.td5" + null_path = package_path + f"/null_{display.width}x{display.height}.td5" + return { + "auto": packet_tools.TD5Data( + auto_path, display.address1, display.address2, display.height, display.width + ), + "null": packet_tools.TD5Data( + null_path, display.address1, display.address2, display.height, display.width + ), + } + + def send_data(self, display_key, data_key): + display = self.displays[display_key] + data = display[data_key] + ack_query_ack = self.protocol.__dict__[display_key].ack_query_ack + ack_data_chunk = self.protocol.__dict__[display_key].ack_data_chunk + sender = DataSender(self.bus, self.parser, self.protocol, self.node.get_logger()) + sender.send(data, ack_query_ack, ack_data_chunk) + + def trigger(self): + for display_key in self.displays: + self.send_data(display_key, "auto") + time.sleep(1) + + def close(self): + for display_key in self.displays: + self.send_data(display_key, "null") diff --git a/src/signage/src/signage/packet_tools.py b/src/signage/src/signage/packet_tools.py new file mode 100644 index 0000000..37d485a --- /dev/null +++ b/src/signage/src/signage/packet_tools.py @@ -0,0 +1,267 @@ +import datetime +import operator +from functools import reduce + +# Packet start and end markers +SOT = 0xAA +EOT = 0x55 + + +def dump_packet(packet, timestamp=None, color=None): + """ + Prints a packet's contents in a formatted manner, optionally with color and timestamp. + + Parameters: + - packet: List[int], the packet data including start and end markers. + - timestamp: datetime.datetime, optional timestamp to prepend to the output. + - color: str, an optional terminal color code to apply to the output. + """ + verified = verify_sum(packet) + if color: + print(color, end="") + if timestamp: + print(timestamp, end=" ") + for p in packet: + print(f"{p:02x}", end=" ") + if color: + print("\033[0m", end="") + print() + + +def calc_sum(payload): + """ + Calculates the checksum of a given payload. + + Parameters: + - payload: List[int], the payload data from which to calculate the checksum. + + Returns: + - List[int], a two-element list containing the checksum bytes. + """ + s = reduce(operator.add, payload) + return [s & 0xFF, (s & 0xFF00) >> 8] + + +def verify_sum(packet): + """ + Verifies the checksum of a packet. + + Parameters: + - packet: List[int], the complete packet including checksum and markers. + + Returns: + - bool, True if the checksum is correct, False otherwise. + """ + sum = calc_sum(packet[1:-3]) + return (packet[-3] == sum[0]) and (packet[-2] == sum[1]) + + +def gen_data_packet(data, seq, addr1, addr2): + """ + Generates a single data packet from given data and sequence number. + + Parameters: + - data: List[int], the data to include in the packet. + - seq: int, the sequence number of the packet. + - addr1: int, the first part of the address. + - addr2: int, the second part of the address. + + Returns: + - List[int], the assembled packet. + """ + length = len(data) + 8 + cmd = 0x20 + payload = [addr1, addr2, length, cmd, seq, 0x00] + list(data) + checksum = calc_sum(payload) + return [SOT] + payload + checksum + [EOT] + + +def gen_data_packets(data, addr1, addr2): + """ + Generates a sequence of data packets from a larger dataset. + + Parameters: + - data: List[int], the complete set of data to be sent in packets. + - addr1: int, the first part of the address for the packets. + - addr2: int, the second part of the address for the packets. + + Returns: + - List[List[int]], a list of assembled packets. + """ + packets = [] + seq = 0 + for i in range(0, len(data), 128): + packet_data = data[i : i + 128] + packets.append(gen_data_packet(packet_data, seq, addr1, addr2)) + seq += 1 + return packets + + +def gen_name_time_packet(linename, timestamp, nightmode): + """ + Generates a packet containing a line name and the current time. + + Parameters: + - linename: str, the name of the line, must be 16 characters. + - timestamp: datetime.datetime, the current time. + - nightmode: bool, whether night mode is enabled. + + Returns: + - List[int], the assembled packet. + """ + addr1 = 0x60 + addr2 = 0x9F + length = 0x24 + cmd = 0x01 + if len(linename) != 16: + raise ValueError("Line Name length invalid") + payload = [addr1, addr2, length, cmd] + list(linename) + payload += map(lambda x: int(x, 16), timestamp.strftime("%M %H %d %w %m %y").split()) + payload.append(0x10 if nightmode else 0x00) + payload.extend([0x00] * 7) + checksum = calc_sum(payload) + return [SOT] + payload + checksum + [EOT] + + +def lists_match(l1, l2): + """ + Compares two lists for equality. + + Parameters: + - l1: List, the first list to compare. + - l2: List, the second list to compare. + + Returns: + - bool, True if the lists are equal, False otherwise. + """ + return len(l1) == len(l2) and all(x == y for x, y in zip(l1, l2)) + + +class TD5Data: + """ + Represents data extracted from a .td5 file, including line name, data packets, and heartbeat packet. + + Attributes: + - width: int, the width of the display. + - height: int, the height of the display. + - linename: bytes, the name of the line extracted from the file. + - data_packets: List[List[int]], the data packets ready for transmission. + - heartbeat_packet: List[int], the heartbeat packet for maintaining connection. + """ + + def __init__(self, filename, addr1, addr2, height, width): + self.width = width + self.height = height + with open(filename, "rb") as f: + f.seek(0x400) + self.linename = f.read(16) + f.seek(0x61C) + f.seek(0x607) + raw_linedatalen = f.read(2) + self.linedatalen = int.from_bytes(raw_linedatalen, "little") + f.seek(0x600) + linedata = f.read(self.linedatalen) + self.data_packets = gen_data_packets(linedata, addr1, addr2) + f.seek(0x603) + data = f.read(2) + self.heartbeat_packet = self.gen_heartbeat_packet( + data, raw_linedatalen, height, width, addr1, addr2 + ) + + def gen_heartbeat_packet(self, data, linedatalen, height, width, addr1, addr2): + """ + Generates a heartbeat packet based on the given parameters. + + Parameters: + - data: bytes, part of the data extracted from the .td5 file. + - linedatalen: bytes, the length of the line data. + - height: int, the height of the display. + - width: int, the width of the display. + - addr1: int, the first part of the display address. + - addr2: int, the second part of the display address. + + Returns: + - List[int], the assembled heartbeat packet. + """ + cmd1 = 0x16 + cmd2 = 0x12 + payload = [addr1, addr2, cmd1, cmd2] + payload.extend([0x00, 0x00, height, 0x00]) + payload.extend(linedatalen) + payload.extend([0x00, 0x00]) + payload.extend(data) + payload.extend([0x00, 0x00, 0x53, 0x30, 0x30, 0x35]) + checksum = calc_sum(payload) + return [SOT] + payload + checksum + [EOT] + + +class Parser: + """ + Parses incoming data from a serial bus into complete packets. + + Attributes: + - bus: serial.Serial, the serial bus from which to read data. + """ + + def __init__(self, bus): + self.state = 0 + self.buf = [] + self.i = 0 + self.datalen = 0 + self.bus = bus + + def read(self): + """ + Reads a single byte from the serial bus. + + Returns: + - int, the byte read, or None if no data is available. + """ + return self.bus.read(1) + + def parse(self, c): + """ + Parse a single byte and update the state machine. + + Parameters: + - c: int, the byte to parse. + + Returns: + - int: 1 if a complete packet is parsed successfully, 0 otherwise. + """ + if self.state == 0 and c == 0xAA: # Start of packet + self.buf = [c] + self.state = 1 + elif self.state == 1: # Reading packet header + self.buf.append(c) + self.i += 1 + if self.i == 2: # Length byte next + self.state = 2 + elif self.state == 2: # Reading packet length + self.buf.append(c) + self.datalen = int(c) + self.state = 3 + self.i = 0 + elif self.state == 3: # Reading packet payload + self.buf.append(c) + self.i += 1 + if self.i == (self.datalen - 2): # Packet complete + self.state = 0 + return 1 # Indicate a complete packet has been parsed + return 0 # Indicate parsing is ongoing + + def wait_ack(self): + """ + Wait for an acknowledgement packet. + + Returns: + - list: The received acknowledgement packet, or an empty list if none is received. + """ + while True: + received = self.read() + if not received or len(received) == 0: + break # No more data available + c = received[0] + if self.parse(c): + return self.buf # Return the complete packet + return [] # No acknowledgement received diff --git a/src/signage/src/signage/route_handler.py b/src/signage/src/signage/route_handler.py index 8bc1aaa..6a12179 100644 --- a/src/signage/src/signage/route_handler.py +++ b/src/signage/src/signage/route_handler.py @@ -26,6 +26,7 @@ def __init__( autoware_interface, parameter_interface, ros_service_interface, + external_signage, ): self._node = node self._viewController = viewController @@ -33,6 +34,7 @@ def __init__( self._autoware = autoware_interface self._parameter = parameter_interface.parameter self._service_interface = ros_service_interface + self._external_signage = external_signage self.AUTOWARE_IP = os.getenv("AUTOWARE_IP", "localhost") self._fms_payload = { "method": "get", @@ -283,6 +285,8 @@ def route_checker_callback(self): self._trigger_external_signage = True if not self._announce_engage and self._parameter.signage_stand_alone: self._announce_interface.send_announce("engage") + self._external_signage.trigger() + self._trigger_external_signage = True self._announce_engage = True elif self._autoware.information.route_state == RouteState.ARRIVED: # Check whether the vehicle arrive to goal diff --git a/src/signage/src/signage/signage.py b/src/signage/src/signage/signage.py index 71f427a..4020678 100644 --- a/src/signage/src/signage/signage.py +++ b/src/signage/src/signage/signage.py @@ -14,6 +14,7 @@ from signage.parameter_interface import ParameterInterface from signage.route_handler import RouteHandler from signage.ros_service_interface import RosServiceInterface +from signage.external_signage import ExternalSignage from ament_index_python.packages import get_package_share_directory @@ -33,6 +34,7 @@ def main(args=None): ros_service_interface = RosServiceInterface(node, parameter_interface) viewController = ViewControllerProperty(node, parameter_interface) announceController = AnnounceControllerProperty(node, autoware_interface, parameter_interface) + external_signage = ExternalSignage(node) route_handler = RouteHandler( node, viewController, @@ -40,6 +42,7 @@ def main(args=None): autoware_interface, parameter_interface, ros_service_interface, + external_signage, ) ctx = engine.rootContext()