Skip to content

Commit

Permalink
Merge branch 'develop' into beta/v2.3.0
Browse files Browse the repository at this point in the history
  • Loading branch information
tkhmy authored Jun 7, 2024
2 parents 977e7b2 + 2aea669 commit 5f153f0
Show file tree
Hide file tree
Showing 4 changed files with 410 additions and 0 deletions.
136 changes: 136 additions & 0 deletions src/signage/src/signage/external_signage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
from dataclasses import dataclass
import datetime
import time
import serial
from ament_index_python.packages import get_package_share_directory
import signage.packet_tools as packet_tools


@dataclass
class Display:
address1: int
address2: int
height: int
width: int
ack_query_ack: list
ack_data_chunk: list


class Protocol:
SOT = 0xAA
EOT = 0x55
SEND_COLOR = "\x1B[34;1m"
RECV_COLOR = "\x1B[32;1m"
ERR_COLOR = "\x1B[31;1m"

def __init__(self):
self.front = Display(
address1=0x70,
address2=0x8F,
height=16,
width=128,
ack_query_ack=[0xAA, 0x70, 0x8F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55],
ack_data_chunk=[0xAA, 0x70, 0x8F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55],
)
self.back = Display(
address1=0x80,
address2=0x7F,
height=16,
width=128,
ack_query_ack=[0xAA, 0x80, 0x7F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55],
ack_data_chunk=[0xAA, 0x80, 0x7F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55],
)
self.side = Display(
address1=0x90,
address2=0x6F,
height=24,
width=80,
ack_query_ack=[0xAA, 0x90, 0x6F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55],
ack_data_chunk=[0xAA, 0x90, 0x6F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55],
)


class DataSender:
def __init__(self, bus, parser, protocol, node_logger):
self._bus = bus
self._parser = parser
self._protocol = protocol
self._logger = node_logger
self._delay_time = 0.02

def _send_heartbeat(self, data, ACK_QueryACK):
timestamp = datetime.datetime.now()
name_time_packet = packet_tools.gen_name_time_packet(data.linename, timestamp, False)
self._bus.write(name_time_packet)
packet_tools.dump_packet(name_time_packet, None, self._protocol.SEND_COLOR)
time.sleep(self._delay_time)
self._bus.write(data.heartbeat_packet)
packet_tools.dump_packet(data.heartbeat_packet, None, self._protocol.SEND_COLOR)
buf = self._parser.wait_ack()
if not packet_tools.lists_match(buf, ACK_QueryACK):
if len(buf) == 0:
self._logger.error("No ACK received for heartbeat.")
else:
packet_tools.dump_packet(buf, None, self._protocol.ERR_COLOR)

def _send_data_packets(self, data, ACK_DataChunk):
for packet in data.data_packets:
packet_tools.dump_packet(packet, None, self._protocol.SEND_COLOR)
self._bus.write(packet)
buf = self._parser.wait_ack()
if not packet_tools.lists_match(buf, ACK_DataChunk):
if len(buf) == 0:
self._logger.error("No ACK received for data packet.")
else:
packet_tools.dump_packet(buf, None, self._protocol.ERR_COLOR)

def send(self, data, ACK_QueryACK, ACK_DataChunk):
self._send_heartbeat(data, ACK_QueryACK)
self._send_data_packets(data, ACK_DataChunk)
return # Exit after sending all data packets


class ExternalSignage:
def __init__(self, node):
self.node = node
self.protocol = Protocol()
package_path = get_package_share_directory("signage") + "/resource/td5_file/"
self.bus = serial.Serial(
"/dev/ttyUSB0", baudrate=38400, parity=serial.PARITY_EVEN, timeout=0.2, exclusive=False
)
self.parser = packet_tools.Parser(self.bus)

self.displays = {
"front": self._load_display_data(self.protocol.front, package_path),
"back": self._load_display_data(self.protocol.back, package_path),
"side": self._load_display_data(self.protocol.side, package_path),
}

def _load_display_data(self, display, package_path):
auto_path = package_path + f"/automatic_{display.width}x{display.height}.td5"
null_path = package_path + f"/null_{display.width}x{display.height}.td5"
return {
"auto": packet_tools.TD5Data(
auto_path, display.address1, display.address2, display.height, display.width
),
"null": packet_tools.TD5Data(
null_path, display.address1, display.address2, display.height, display.width
),
}

def send_data(self, display_key, data_key):
display = self.displays[display_key]
data = display[data_key]
ack_query_ack = self.protocol.__dict__[display_key].ack_query_ack
ack_data_chunk = self.protocol.__dict__[display_key].ack_data_chunk
sender = DataSender(self.bus, self.parser, self.protocol, self.node.get_logger())
sender.send(data, ack_query_ack, ack_data_chunk)

def trigger(self):
for display_key in self.displays:
self.send_data(display_key, "auto")
time.sleep(1)

def close(self):
for display_key in self.displays:
self.send_data(display_key, "null")
Loading

0 comments on commit 5f153f0

Please sign in to comment.