Skip to content

Commit

Permalink
Merge pull request #3 from FunPythonEC/bug-topic-negotiation
Browse files Browse the repository at this point in the history
Robust bug topic negotiation
  • Loading branch information
sasilva1998 authored Oct 7, 2021
2 parents 5bed97a + 255b0b5 commit a446013
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 66 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -127,3 +127,4 @@ dmypy.json

# Pyre type checker
.pyre/
.vscode
145 changes: 79 additions & 66 deletions src/uros/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,14 @@
uio: packet buffer
struct: serlization
_TopicInfo: for topic negotiation
already provided in rosserial_msgs
already provided in rosserial_msgs
"""

import sys
import logging
import machine as m
import uio
import ustruct as struct
from time import sleep, sleep_ms, sleep_us
from rosserial_msgs import TopicInfo
import sys
import os
import logging

# for now threads are used, will be changed with asyncio in the future
if sys.platform == "esp32":
Expand All @@ -29,17 +26,18 @@

# class to manage publish and subscribe
# COULD BE CHANGED AFTERWARDS
class NodeHandle(object):
def __init__(self, serial_id=2, baudrate=115200, **kwargs):
class NodeHandle:
"""Initiates connection through rosserial using UART ports."""

def __init__(self, serial_id=2, baudrate=115200, **kwargs):
"""
id: used for topics id (negotiation)
advertised_topics: manage already negotiated topics
subscribing_topics: topics to which will be subscribed are here
serial_id: uart id
baudrate: baudrate used for serial comm
"""
self.id = 101
id: used for topics id (negotiation)
advertised_topics: manage already negotiated topics
subscribing_topics: topics to which will be subscribed are here
serial_id: uart id
baudrate: baudrate used for serial comm
"""
self._id = 101
self.advertised_topics = dict()
self.subscribing_topics = dict()
self.serial_id = serial_id
Expand Down Expand Up @@ -69,28 +67,18 @@ def __init__(self, serial_id=2, baudrate=115200, **kwargs):

# method to manage and advertise topic
# before publishing or subscribing
def _advertise_topic(self, topic_name, msg, endpoint, buffer_size):

def _advertise_topic(self, topic_name, _id, msg, endpoint, buffer_size):
"""
topic_name: eg. (Greet)
msg: message object
endpoint: corresponds to TopicInfo.msg typical topic id values
"""
topic_name: eg. (Greet)
msg: message object
endpoint: corresponds to TopicInfo.msg typical topic id values
"""
register = TopicInfo()
register.topic_id = self.id
register.topic_id = _id
register.topic_name = topic_name
register.message_type = msg._type
register.md5sum = msg._md5sum

self.advertised_topics[topic_name] = self.id

# id are summed by one
self.id += 1

try:
register.buffer_size = buffer_size
except Exception as e:
logging.info("No buffer size could be defined for topic negotiation.")
register.buffer_size = buffer_size

# serialization
packet = uio.StringIO()
Expand All @@ -101,17 +89,31 @@ def _advertise_topic(self, topic_name, msg, endpoint, buffer_size):
length = len(packet)

# both checksums
crclen = [checksum(le(length))]
crcpack = [checksum(le(endpoint) + packet)]
crclen = [checksum(_le(length))]
crcpack = [checksum(_le(endpoint) + packet)]

# final packet to be sent
fpacket = header + le(length) + crclen + le(endpoint) + packet + crcpack
fpacket = header + _le(length) + crclen + _le(endpoint) + packet + crcpack
self.uart.write(bytearray(fpacket))

def _advertise_all_topics(self):
for key, value in self.advertised_topics.items():
self._advertise_topic(key, value[0], value[1], value[2], value[3])

def publish(self, topic_name, msg, buffer_size=1024):
"""publishes messages to topics
Args:
topic_name (string): name of destination topic in ROS network.
msg (ROS message): custom message object generated by ugenpy.
buffer_size (int, optional): maximum size of buffer for message. Defaults to 1024.
"""

if topic_name not in self.advertised_topics:
self._advertise_topic(topic_name, msg, 0, buffer_size)
self.advertised_topics[topic_name] = [self._id, msg, 0, buffer_size]
# id are summed by one
self._advertise_topic(topic_name, self._id, msg, 0, buffer_size)
self._id += 1

# same as advertise
packet = uio.StringIO()
Expand All @@ -120,23 +122,33 @@ def publish(self, topic_name, msg, buffer_size=1024):
packet = list(packet.getvalue().encode("utf-8"))
length = len(packet)

topic_id = le(self.advertised_topics.get(topic_name))
crclen = [checksum(le(length))]
topic_id = _le(self.advertised_topics.get(topic_name)[0])
crclen = [checksum(_le(length))]
crcpack = [checksum(topic_id + packet)]

fpacket = header + le(length) + crclen + topic_id + packet + crcpack
fpacket = header + _le(length) + crclen + topic_id + packet + crcpack
self.uart.write(bytearray(fpacket))

def subscribe(self, topic_name, msgobj, cb, buffer_size=1024):
assert cb is not None, "Subscribe callback is not set"
def subscribe(self, topic_name, msg, _cb, buffer_size=1024):
"""subscribes to a topic receiving messages and processing them by a callback function
Args:
topic_name (string): name of destiny topic to send messages.
msg (ROS message): custom message object generated by ugenpy.
cb (function): callback function to process incoming messages.
buffer_size (int, optional): maximum size of buffer for message. Defaults to 1024.
"""
assert _cb is not None, "Subscribe callback is not set"

# subscribing topic attributes are added
self.subscribing_topics[self.id] = [msgobj, cb]
self.subscribing_topics[self._id] = [msg, _cb]

# advertised if not already subscribed
if topic_name not in self.advertised_topics:
msg = msgobj()
self._advertise_topic(topic_name, msg, 1, buffer_size)
self.advertised_topics[topic_name] = [self._id, msg, 1, buffer_size]
# id are summed by one
self._advertise_topic(topic_name, self._id, msg, 1, buffer_size)
self._id += 1

def _listen(self):
while True:
Expand Down Expand Up @@ -166,46 +178,47 @@ def _listen(self):
try:
# incoming object msg initialized
msgobj = self.subscribing_topics.get(inid)[0]
except Exception:
logging.info("TX request was made or got message from not available subscribed topic.")
except (OSError, TypeError, IndexError):
logging.info(
"TX request was made or got message from"
+ "not available subscribed topic."
)
# object sent to callback
callback = self.subscribing_topics.get(inid)[1]
fdata = msgobj()
fdata = fdata.deserialize(msgdata)
callback(fdata)
else:
raise ValueError("Message plus Topic ID Checksum is wrong!")
else:
self._advertise_all_topics()

except Exception as e:
except (OSError, TypeError, ValueError):
logging.info("No incoming data could be read for subscribes.")


# functions to be used in class
def word(l, h):
def word(_l, _h):
"""
Given a low and high bit, converts the number back into a word.
"""
Given a low and high bit, converts the number back into a word.
"""
return (h << 8) + l
return (_h << 8) + _l


# checksum method, receives array
def checksum(arr):
return 255 - ((sum(arr)) % 256)

"""Generates checksum value of message.
# little-endian method
def le(h):
h &= 0xFFFF
return [h & 0xFF, h >> 8]
Args:
arr (list): list of hexadecimal values.
Returns:
[int]: returns an value between 0 and 256
"""
return 255 - ((sum(arr)) % 256)

# example code
if __name__ == "__main__":
from std_msgs import String
from uros import NodeHandle

msg = String()
msg.data = "HiItsMeMario"
node = NodeHandle(2, 115200)
while True:
node.publish("greet", msg)
# little-endian method
def _le(_h):
_h &= 0xFFFF
return [_h & 0xFF, _h >> 8]

0 comments on commit a446013

Please sign in to comment.