Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beta/v2.3.0 #102

Merged
merged 11 commits into from
Jun 10, 2024
4 changes: 4 additions & 0 deletions src/external_signage/launch/external_signage.launch.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
<?xml version="1.0"?>
<launch>
<node pkg="external_signage" exec="external_signage" output="screen" />
</launch>
26 changes: 26 additions & 0 deletions src/external_signage/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?xml version="1.0"?>
<package format="2">
<name>external_signage</name>
<version>0.1.0</version>
<description>The signage package</description>

<maintainer email="[email protected]">yabuta</maintainer>

<license>Apache License 2.0</license>
<exec_depend>ament_index_python</exec_depend>
<exec_depend>signage_version</exec_depend>

<depend>autoware_auto_system_msgs</depend>
<depend>diagnostic_updater</depend>
<depend>python-pulsectl-pip</depend>
<depend>rclpy</depend>
<depend>std_srvs</depend>
<depend>tier4_api_msgs</depend>
<depend>tier4_debug_msgs</depend>
<depend>tier4_external_api_msgs</depend>
<depend>tier4_hmi_msgs</depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
4 changes: 4 additions & 0 deletions src/external_signage/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[develop]
script-dir=$base/lib/external_signage
[install]
install-scripts=$base/lib/external_signage
47 changes: 47 additions & 0 deletions src/external_signage/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env python3

import os

from setuptools import setup


def package_files(directory):
paths = []
for (path, directories, filenames) in os.walk(directory):
for filename in filenames:
paths.append(os.path.join(path, filename))
return paths


package_name = "external_signage"
setup(
name=package_name,
version="0.1.0",
package_dir={"": "src"},
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", ["resource/" + package_name]),
("share/" + package_name + "/resource/td5_file", package_files("resource/td5_file")),
("share/" + package_name, ["package.xml"]),
("share/" + package_name + "/launch", ["launch/external_signage.launch.xml"]),
],
install_requires=["setuptools"],
zip_safe=True,
author="Makoto Yabuta",
maintainer="Makoto Yabuta",
maintainer_email="[email protected]",
keywords=["ROS"],
classifiers=[
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python",
"Topic :: Software Development",
],
description=("external_signage provides a GUI for passanger."),
license="TODO",
entry_points={
"console_scripts": [
"external_signage = external_signage.external_signage:main",
]
},
)
4 changes: 4 additions & 0 deletions src/external_signage/src/external_signage/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# This Python file uses the following encoding: utf-8

# if__name__ == "__main__":
# pass
23 changes: 23 additions & 0 deletions src/external_signage/src/external_signage/external_signage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# This Python file uses the following encoding: utf-8
import sys

import rclpy
from rclpy.node import Node

from external_signage.external_signage_core import ExternalSignage
from ament_index_python.packages import get_package_share_directory

def main(args=None):
package_path = get_package_share_directory("external_signage")

rclpy.init(args=args)
node = Node("external_signage")

external_signage = ExternalSignage(node)

while True:
rclpy.spin_once(node, timeout_sec=0.01)


if __name__ == "__main__":
main()
150 changes: 150 additions & 0 deletions src/external_signage/src/external_signage/external_signage_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
from dataclasses import dataclass
import datetime
import time
import serial
from std_srvs.srv import SetBool
from ament_index_python.packages import get_package_share_directory
import external_signage.packet_tools as packet_tools


@dataclass
class Display:
address1: int
address2: int
height: int
width: int
ack_query_ack: list
ack_data_chunk: list


class Protocol:
SOT = 0xAA
EOT = 0x55
SEND_COLOR = "\x1B[34;1m"
RECV_COLOR = "\x1B[32;1m"
ERR_COLOR = "\x1B[31;1m"

def __init__(self):
self.front = Display(
address1=0x70,
address2=0x8F,
height=16,
width=128,
ack_query_ack=[0xAA, 0x70, 0x8F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55],
ack_data_chunk=[0xAA, 0x70, 0x8F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55],
)
self.back = Display(
address1=0x80,
address2=0x7F,
height=16,
width=128,
ack_query_ack=[0xAA, 0x80, 0x7F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55],
ack_data_chunk=[0xAA, 0x80, 0x7F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55],
)
self.side = Display(
address1=0x90,
address2=0x6F,
height=24,
width=80,
ack_query_ack=[0xAA, 0x90, 0x6F, 0x07, 0x12, 0x02, 0x1A, 0x01, 0x55],
ack_data_chunk=[0xAA, 0x90, 0x6F, 0x07, 0x20, 0x30, 0x56, 0x01, 0x55],
)


class DataSender:
def __init__(self, bus, parser, protocol, node_logger):
self._bus = bus
self._parser = parser
self._protocol = protocol
self._logger = node_logger
self._delay_time = 0.02

def _send_heartbeat(self, data, ACK_QueryACK):
timestamp = datetime.datetime.now()
name_time_packet = packet_tools.gen_name_time_packet(data.linename, timestamp, False)
self._bus.write(name_time_packet)
packet_tools.dump_packet(name_time_packet, None, self._protocol.SEND_COLOR)
time.sleep(self._delay_time)
self._bus.write(data.heartbeat_packet)
packet_tools.dump_packet(data.heartbeat_packet, None, self._protocol.SEND_COLOR)
buf = self._parser.wait_ack()
if not packet_tools.lists_match(buf, ACK_QueryACK):
if len(buf) == 0:
self._logger.error("No ACK received for heartbeat.")
else:
packet_tools.dump_packet(buf, None, self._protocol.ERR_COLOR)

def _send_data_packets(self, data, ACK_DataChunk):
for packet in data.data_packets:
packet_tools.dump_packet(packet, None, self._protocol.SEND_COLOR)
self._bus.write(packet)
buf = self._parser.wait_ack()
if not packet_tools.lists_match(buf, ACK_DataChunk):
if len(buf) == 0:
self._logger.error("No ACK received for data packet.")
else:
packet_tools.dump_packet(buf, None, self._protocol.ERR_COLOR)

def send(self, data, ACK_QueryACK, ACK_DataChunk):
self._send_heartbeat(data, ACK_QueryACK)
self._send_data_packets(data, ACK_DataChunk)
return # Exit after sending all data packets


class ExternalSignage:
def __init__(self, node):
self.node = node
self.protocol = Protocol()
package_path = get_package_share_directory("signage") + "/resource/td5_file/"
try:
self.bus = serial.Serial(
"/dev/ttyUSB0",
baudrate=38400,
parity=serial.PARITY_EVEN,
timeout=0.2,
exclusive=False,
)
self.parser = packet_tools.Parser(self.bus)
self._external_signage_available = True
except:
self._external_signage_available = False

self.displays = {
"front": self._load_display_data(self.protocol.front, package_path),
"back": self._load_display_data(self.protocol.back, package_path),
"side": self._load_display_data(self.protocol.side, package_path),
}
node.create_service(SetBool, "/signage/trigger_external", self.trigger_external_signage)

def _load_display_data(self, display, package_path):
auto_path = package_path + f"/automatic_{display.width}x{display.height}.td5"
null_path = package_path + f"/null_{display.width}x{display.height}.td5"
return {
"auto": packet_tools.TD5Data(
auto_path, display.address1, display.address2, display.height, display.width
),
"null": packet_tools.TD5Data(
null_path, display.address1, display.address2, display.height, display.width
),
}

def send_data(self, display_key, data_key):
display = self.displays[display_key]
data = display[data_key]
ack_query_ack = self.protocol.__dict__[display_key].ack_query_ack
ack_data_chunk = self.protocol.__dict__[display_key].ack_data_chunk
sender = DataSender(self.bus, self.parser, self.protocol, self.node.get_logger())
sender.send(data, ack_query_ack, ack_data_chunk)

def trigger_external_signage(self, request, response):
if not self._external_signage_available:
return response

if request.data:
for display_key in self.displays:
self.send_data(display_key, "auto")
time.sleep(1)
else:
for display_key in self.displays:
self.send_data(display_key, "null")
return response
Loading
Loading