From 4c5354a9c51aa0e318c27cfc284732bd85661dc4 Mon Sep 17 00:00:00 2001 From: Eddie Song Date: Mon, 29 Jan 2024 19:32:06 +0000 Subject: [PATCH 1/6] finished separate topic replayer, working on bagging --- fogros2-rt-x/fogros2_rt_x/dataset_spec.py | 1 - fogros2-rt-x/fogros2_rt_x/replayer.py | 171 ++++++++++++---------- 2 files changed, 94 insertions(+), 78 deletions(-) diff --git a/fogros2-rt-x/fogros2_rt_x/dataset_spec.py b/fogros2-rt-x/fogros2_rt_x/dataset_spec.py index 44560df..533df33 100644 --- a/fogros2-rt-x/fogros2_rt_x/dataset_spec.py +++ b/fogros2-rt-x/fogros2_rt_x/dataset_spec.py @@ -34,7 +34,6 @@ import numpy as np import tensorflow as tf import tensorflow_datasets as tfds -from fogros2_rt_x_msgs.msg import Step, Observation, Action from cv_bridge import CvBridge from std_msgs.msg import MultiArrayLayout, MultiArrayDimension from sensor_msgs.msg import Image diff --git a/fogros2-rt-x/fogros2_rt_x/replayer.py b/fogros2-rt-x/fogros2_rt_x/replayer.py index 2b78182..435602a 100755 --- a/fogros2-rt-x/fogros2_rt_x/replayer.py +++ b/fogros2-rt-x/fogros2_rt_x/replayer.py @@ -34,13 +34,13 @@ import socket import rclpy +import rosbag2_py as rosbag from rclpy.node import Node from .dataset_utils import * -from fogros2_rt_x_msgs.msg import Step, Observation, Action -from .dataset_spec import DatasetFeatureSpec +from .dataset_spec import DatasetFeatureSpec, FeatureSpec from .plugins.conf_base import * import time - +import tensorflow_datasets as tfds class DatasetReplayer(Node): """ @@ -62,8 +62,6 @@ def __init__(self): self.declare_parameter("dataset_name", "berkeley_fanuc_manipulation") dataset_name = self.get_parameter("dataset_name").value - self.config = get_dataset_plugin_config_from_str(dataset_name) - self.feature_spec = self.config.get_dataset_feature_spec() self.declare_parameter("per_episode_interval", 5) # second self.per_episode_interval = self.get_parameter("per_episode_interval").value @@ -75,21 +73,30 @@ def __init__(self): ) # as_single_topic | as_separate_topics | both replay_type = self.get_parameter("replay_type").value + self.declare_parameter("enable_bagging", True) + self.dataset = load_rlds_dataset(dataset_name) self.logger = self.get_logger() - self.logger.info("Loading Dataset " + str(get_dataset_info([dataset_name]))) + self.dataset_info = get_dataset_info([dataset_name]) + self.logger.info("Loading Dataset " + str(self.dataset_info)) self.episode = next(iter(self.dataset)) + self.dataset_features = self.dataset_info[0][1].features + self.step_features = self.dataset_features["steps"] + self.topics = list() + self.init_topics_from_features(self.step_features) + if replay_type == "as_separate_topics": self.topic_name_to_publisher_dict = dict() + self.topic_name_to_recorder_dict = dict() self.init_publisher_separate_topics() - elif replay_type == "as_single_topic": - self.init_publisher_single_topic() - elif replay_type == "both": - self.topic_name_to_publisher_dict = dict() - self.init_publisher_separate_topics() - self.init_publisher_single_topic() + # elif replay_type == "as_single_topic": + # self.init_publisher_single_topic() + # elif replay_type == "both": + # self.topic_name_to_publisher_dict = dict() + # self.init_publisher_separate_topics() + # self.init_publisher_single_topic() else: raise ValueError( "Invalid replay_type: " @@ -97,22 +104,25 @@ def __init__(self): + ". Must be one of: as_separate_topics, as_single_topic." ) - def init_publisher_separate_topics(self): - for observation in self.feature_spec.observation_spec: - publisher = self.create_publisher( - observation.ros_type, observation.ros_topic_name, 10 - ) - self.topic_name_to_publisher_dict[observation.ros_topic_name] = publisher + def init_topics_from_features(self, features): + for name, tf_feature in features.items(): + if isinstance(tf_feature, tfds.features.FeaturesDict): + self.init_topics_from_features(tf_feature) + else: + if tf_feature.shape == () and tf_feature.dtype.is_bool: + print("Adding to topics:", name, Scalar(dtype=tf.bool)) + self.topics.append(FeatureSpec(name, Scalar(dtype=tf.bool))) + else: + print("Adding to topics:", name, tf_feature) + self.topics.append(FeatureSpec(name, tf_feature)) + - for action in self.feature_spec.action_spec: + def init_publisher_separate_topics(self): + for topic in self.topics: publisher = self.create_publisher( - action.ros_type, action.ros_topic_name, 10 + topic.ros_type, topic.ros_topic_name, 10 ) - self.topic_name_to_publisher_dict[action.ros_topic_name] = publisher - - for step in self.feature_spec.step_spec: - publisher = self.create_publisher(step.ros_type, step.ros_topic_name, 10) - self.topic_name_to_publisher_dict[step.ros_topic_name] = publisher + self.topic_name_to_publisher_dict[topic.ros_topic_name] = publisher self.create_timer( self.per_episode_interval, self.timer_callback_separate_topics @@ -125,67 +135,74 @@ def init_publisher_single_topic(self): def timer_callback_separate_topics(self): for step in self.episode["steps"]: - for observation in self.feature_spec.observation_spec: - if observation.tf_name not in step["observation"]: - self.logger.warn( - f"Observation {observation.tf_name} not found in step data" + for topic in self.topics: + if topic.tf_name in step: + # Fetch from step data + msg = topic.convert_tf_tensor_data_to_ros2_msg( + step[topic.tf_name] ) - continue - msg = observation.convert_tf_tensor_data_to_ros2_msg( - step["observation"][observation.tf_name] - ) - self.logger.info( - f"Publishing observation {observation.tf_name} on topic {observation.ros_topic_name}" - ) - self.topic_name_to_publisher_dict[observation.ros_topic_name].publish( - msg - ) - - for action in self.feature_spec.action_spec: - if type(step["action"]) is not dict: - # action is only one tensor/datatype, not a dictionary - msg = action.convert_tf_tensor_data_to_ros2_msg(step["action"]) - else: - if action.tf_name not in step["action"]: - self.logger.warn( - f"Action {action.tf_name} not found in step data" - ) - continue - msg = action.convert_tf_tensor_data_to_ros2_msg( - step["action"][action.tf_name] + self.logger.info( + f"Publishing step {topic.tf_name} on topic {topic.ros_topic_name}" + ) + if type(step["observation"]) is dict and topic.tf_name in step["observation"]: + # Fetch from observation data + msg = topic.convert_tf_tensor_data_to_ros2_msg( + step["observation"][topic.tf_name] ) - self.logger.info( - f"Publishing action {action.tf_name} on topic {action.ros_topic_name}" - ) - self.topic_name_to_publisher_dict[action.ros_topic_name].publish(msg) - - for step_feature in self.feature_spec.step_spec: - if step_feature.tf_name not in step: - self.logger.warn( - f"Step {step_feature.tf_name} not found in step data" + self.logger.info( + f"Publishing observation {topic.tf_name} on topic {topic.ros_topic_name}" ) - continue - msg = step_feature.convert_tf_tensor_data_to_ros2_msg( - step[step_feature.tf_name] - ) - self.logger.info( - f"Publishing step {step_feature.tf_name} on topic {step_feature.ros_topic_name}" - ) - self.topic_name_to_publisher_dict[step_feature.ros_topic_name].publish( - msg - ) + if type(step["action"]) is dict and topic.tf_name in step["action"]: + # Fetch from action data + msg = topic.convert_tf_tensor_data_to_ros2_msg( + step["action"][topic.tf_name] + ) + self.logger.info( + f"Publishing action {topic.tf_name} on topic {topic.ros_topic_name}" + ) + + self.topic_name_to_publisher_dict[topic.ros_topic_name].publish(msg) time.sleep(self.per_step_interval) self.episode = next(iter(self.dataset)) + self.create_timer( + self.per_episode_interval, self.timer_callback_separate_topics + ) + + def init_recorder_separate_topics(self): + for topic in self.topics: + writer = rosbag.SequentialWriter() + storage_options = rosbag._storage.StorageOptions( + uri=f'{topic.ros_topic_name}_bag_{1}', + storage_id='sqlite3' + ) + converter_options = rosbag._storage.ConverterOptions('','') + writer.open(storage_options, converter_options) - def timer_callback_single_topic(self): - for step in self.episode["steps"]: - msg = self.feature_spec.convert_tf_step_to_ros2_msg( - step, step["action"], step["observation"] + topic_info = rosbag._storage.TopicMetadata( + name=topic.ros_topic_name, + type=topic.ros_type, + serialization_format='cdr' ) - self.publisher.publish(msg) - self.episode = next(iter(self.dataset)) + writer.create_topic(topic_info) + + subscription = self.create_subscription( + String, + topic.ros_topic_name, + self.topic_callback, + 10 + ) + + self.topic_name_to_recorder_dict[topic.ros_topic_name] = (writer, subscription) + + # def timer_callback_single_topic(self): + # for step in self.episode["steps"]: + # msg = self.feature_spec.convert_tf_step_to_ros2_msg( + # step, step["action"], step["observation"] + # ) + # self.publisher.publish(msg) + # self.episode = next(iter(self.dataset)) def main(args=None): From 49c166347d5c7eb920ba4fdbb07c59fc00011d11 Mon Sep 17 00:00:00 2001 From: Eddie Song Date: Mon, 29 Jan 2024 21:31:59 +0000 Subject: [PATCH 2/6] Added base bag recorder, not fully functioning yet --- fogros2-rt-x/fogros2_rt_x/replayer.py | 43 ++++++++++++++++++++------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/fogros2-rt-x/fogros2_rt_x/replayer.py b/fogros2-rt-x/fogros2_rt_x/replayer.py index 435602a..fd3fdeb 100755 --- a/fogros2-rt-x/fogros2_rt_x/replayer.py +++ b/fogros2-rt-x/fogros2_rt_x/replayer.py @@ -38,6 +38,7 @@ from rclpy.node import Node from .dataset_utils import * from .dataset_spec import DatasetFeatureSpec, FeatureSpec +from .dataset_spec import tf_feature_definition_to_ros_msg_class_str from .plugins.conf_base import * import time import tensorflow_datasets as tfds @@ -84,6 +85,7 @@ def __init__(self): self.dataset_features = self.dataset_info[0][1].features self.step_features = self.dataset_features["steps"] self.topics = list() + self.episode_counter = 1 self.init_topics_from_features(self.step_features) @@ -91,6 +93,7 @@ def __init__(self): self.topic_name_to_publisher_dict = dict() self.topic_name_to_recorder_dict = dict() self.init_publisher_separate_topics() + # self.init_recorder_separate_topics() # elif replay_type == "as_single_topic": # self.init_publisher_single_topic() # elif replay_type == "both": @@ -110,10 +113,10 @@ def init_topics_from_features(self, features): self.init_topics_from_features(tf_feature) else: if tf_feature.shape == () and tf_feature.dtype.is_bool: - print("Adding to topics:", name, Scalar(dtype=tf.bool)) + # print("Adding to topics:", name, Scalar(dtype=tf.bool)) self.topics.append(FeatureSpec(name, Scalar(dtype=tf.bool))) else: - print("Adding to topics:", name, tf_feature) + # print("Adding to topics:", name, tf_feature) self.topics.append(FeatureSpec(name, tf_feature)) @@ -162,7 +165,8 @@ def timer_callback_separate_topics(self): ) self.topic_name_to_publisher_dict[topic.ros_topic_name].publish(msg) - + + # self.check_last_step_update_recorder(step) time.sleep(self.per_step_interval) self.episode = next(iter(self.dataset)) @@ -172,30 +176,49 @@ def timer_callback_separate_topics(self): def init_recorder_separate_topics(self): for topic in self.topics: + ros_msg_type_string = tf_feature_definition_to_ros_msg_class_str(topic.tf_type) + writer = rosbag.SequentialWriter() storage_options = rosbag._storage.StorageOptions( - uri=f'{topic.ros_topic_name}_bag_{1}', - storage_id='sqlite3' + uri=f"bags/episode{self.episode_counter}/{topic.ros_topic_name}_bag", + storage_id="sqlite3" ) - converter_options = rosbag._storage.ConverterOptions('','') + converter_options = rosbag._storage.ConverterOptions("","") writer.open(storage_options, converter_options) topic_info = rosbag._storage.TopicMetadata( name=topic.ros_topic_name, - type=topic.ros_type, - serialization_format='cdr' + type=ros_msg_type_string, + serialization_format="cdr" ) writer.create_topic(topic_info) subscription = self.create_subscription( - String, + topic.ros_type, topic.ros_topic_name, - self.topic_callback, + self.create_topic_callback_function(topic.ros_topic_name), 10 ) self.topic_name_to_recorder_dict[topic.ros_topic_name] = (writer, subscription) + def create_topic_callback_function(self, topic_name): + self.logger.info(f"Creating topic callback function for {topic_name}") + def topic_callback(self, msg=''): + recorder, _ = self.topic_name_to_recorder_dict[topic_name] + recorder.write( + topic_name, + rclpy.serialization.serialize_message(msg), + self.get_clock().now().nanoseconds + ) + return topic_callback + + def check_last_step_update_recorder(self, step): + if step["is_last"]: + self.logger.info(f"End of episode {self.episode_counter}") + self.episode_counter += 1 + self.init_recorder_separate_topics() + # def timer_callback_single_topic(self): # for step in self.episode["steps"]: # msg = self.feature_spec.convert_tf_step_to_ros2_msg( From 427b3ad24d540780ee7c99cdb5e5c7c414dd19f4 Mon Sep 17 00:00:00 2001 From: Eddie Song Date: Mon, 29 Jan 2024 22:54:56 +0000 Subject: [PATCH 3/6] added default msg for callback, need to change later --- fogros2-rt-x/fogros2_rt_x/replayer.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/fogros2-rt-x/fogros2_rt_x/replayer.py b/fogros2-rt-x/fogros2_rt_x/replayer.py index fd3fdeb..fb39387 100755 --- a/fogros2-rt-x/fogros2_rt_x/replayer.py +++ b/fogros2-rt-x/fogros2_rt_x/replayer.py @@ -93,7 +93,7 @@ def __init__(self): self.topic_name_to_publisher_dict = dict() self.topic_name_to_recorder_dict = dict() self.init_publisher_separate_topics() - # self.init_recorder_separate_topics() + self.init_recorder_separate_topics() # elif replay_type == "as_single_topic": # self.init_publisher_single_topic() # elif replay_type == "both": @@ -166,7 +166,7 @@ def timer_callback_separate_topics(self): self.topic_name_to_publisher_dict[topic.ros_topic_name].publish(msg) - # self.check_last_step_update_recorder(step) + self.check_last_step_update_recorder(step) time.sleep(self.per_step_interval) self.episode = next(iter(self.dataset)) From 885bd4639ffcddee242859ecb4c6ac6612bd177d Mon Sep 17 00:00:00 2001 From: Eddie Song Date: Wed, 31 Jan 2024 17:54:35 +0000 Subject: [PATCH 4/6] separated recorder from replayer --- fogros2-rt-x/fogros2_rt_x/recorder.py | 53 ++++++++++++++++ fogros2-rt-x/fogros2_rt_x/replayer.py | 86 +++++++------------------- fogros2-rt-x/launch/replayer.launch.py | 10 +++ fogros2-rt-x/setup.py | 1 + 4 files changed, 85 insertions(+), 65 deletions(-) create mode 100644 fogros2-rt-x/fogros2_rt_x/recorder.py diff --git a/fogros2-rt-x/fogros2_rt_x/recorder.py b/fogros2-rt-x/fogros2_rt_x/recorder.py new file mode 100644 index 0000000..89ab542 --- /dev/null +++ b/fogros2-rt-x/fogros2_rt_x/recorder.py @@ -0,0 +1,53 @@ +import socket + +import rclpy +from rosbag2_py import Recorder, RecordOptions, StorageOptions +from rclpy.node import Node +import time + +class DatasetRecorder(Node): + """ + A class for replaying datasets in ROS2. + + Args: + dataset_name (str): The name of the dataset to be replayed. + + Attributes: + publisher (Publisher): The publisher for sending step information. + dataset (Dataset): The loaded RLDS dataset. + logger (Logger): The logger for logging information. + feature_spec (DatasetFeatureSpec): The feature specification for the dataset. + episode (Episode): The current episode being replayed. + """ + + def __init__(self): + super().__init__("fogros2_rt_x_recorder") + self.episode_counter = 1 + self.storage_options = StorageOptions( + uri=f"rosbags/episode_{self.episode_counter}", + storage_id="sqlite3" + ) + + self.record_options = RecordOptions() + self.record_options.all = True + self.recorder = Recorder() + + try: + self.recorder.record(self.storage_options, self.record_options) + except KeyboardInterrupt: + pass + + +def main(args=None): + + rclpy.init(args=args) + node = DatasetRecorder() + + rclpy.spin(node) + + node.destroy_node() + rclpy.shutdown() + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/fogros2-rt-x/fogros2_rt_x/replayer.py b/fogros2-rt-x/fogros2_rt_x/replayer.py index fb39387..223f990 100755 --- a/fogros2-rt-x/fogros2_rt_x/replayer.py +++ b/fogros2-rt-x/fogros2_rt_x/replayer.py @@ -35,6 +35,7 @@ import rclpy import rosbag2_py as rosbag +from rosbag2_py import Recorder, RecordOptions, StorageOptions from rclpy.node import Node from .dataset_utils import * from .dataset_spec import DatasetFeatureSpec, FeatureSpec @@ -74,8 +75,6 @@ def __init__(self): ) # as_single_topic | as_separate_topics | both replay_type = self.get_parameter("replay_type").value - self.declare_parameter("enable_bagging", True) - self.dataset = load_rlds_dataset(dataset_name) self.logger = self.get_logger() self.dataset_info = get_dataset_info([dataset_name]) @@ -93,13 +92,6 @@ def __init__(self): self.topic_name_to_publisher_dict = dict() self.topic_name_to_recorder_dict = dict() self.init_publisher_separate_topics() - self.init_recorder_separate_topics() - # elif replay_type == "as_single_topic": - # self.init_publisher_single_topic() - # elif replay_type == "both": - # self.topic_name_to_publisher_dict = dict() - # self.init_publisher_separate_topics() - # self.init_publisher_single_topic() else: raise ValueError( "Invalid replay_type: " @@ -166,7 +158,7 @@ def timer_callback_separate_topics(self): self.topic_name_to_publisher_dict[topic.ros_topic_name].publish(msg) - self.check_last_step_update_recorder(step) + # self.check_last_step_update_recorder(step) time.sleep(self.per_step_interval) self.episode = next(iter(self.dataset)) @@ -174,59 +166,26 @@ def timer_callback_separate_topics(self): self.per_episode_interval, self.timer_callback_separate_topics ) - def init_recorder_separate_topics(self): - for topic in self.topics: - ros_msg_type_string = tf_feature_definition_to_ros_msg_class_str(topic.tf_type) - - writer = rosbag.SequentialWriter() - storage_options = rosbag._storage.StorageOptions( - uri=f"bags/episode{self.episode_counter}/{topic.ros_topic_name}_bag", - storage_id="sqlite3" - ) - converter_options = rosbag._storage.ConverterOptions("","") - writer.open(storage_options, converter_options) - - topic_info = rosbag._storage.TopicMetadata( - name=topic.ros_topic_name, - type=ros_msg_type_string, - serialization_format="cdr" - ) - writer.create_topic(topic_info) - - subscription = self.create_subscription( - topic.ros_type, - topic.ros_topic_name, - self.create_topic_callback_function(topic.ros_topic_name), - 10 - ) - - self.topic_name_to_recorder_dict[topic.ros_topic_name] = (writer, subscription) - - def create_topic_callback_function(self, topic_name): - self.logger.info(f"Creating topic callback function for {topic_name}") - def topic_callback(self, msg=''): - recorder, _ = self.topic_name_to_recorder_dict[topic_name] - recorder.write( - topic_name, - rclpy.serialization.serialize_message(msg), - self.get_clock().now().nanoseconds - ) - return topic_callback + # def init_recorder(self): + # storage_options = StorageOptions( + # uri=f"rosbags/episode {self.episode_counter}", + # storage_id="sqlite3" + # ) + + # record_options = RecordOptions() + # record_options.all = True + # recorder = Recorder() + + # try: + # recorder.record(storage_options, record_options) + # except KeyboardInterrupt: + # pass - def check_last_step_update_recorder(self, step): - if step["is_last"]: - self.logger.info(f"End of episode {self.episode_counter}") - self.episode_counter += 1 - self.init_recorder_separate_topics() - - # def timer_callback_single_topic(self): - # for step in self.episode["steps"]: - # msg = self.feature_spec.convert_tf_step_to_ros2_msg( - # step, step["action"], step["observation"] - # ) - # self.publisher.publish(msg) - # self.episode = next(iter(self.dataset)) - + # def check_last_step_update_recorder(self, step): + # if step["is_last"]: + # self.logger.info(f"End of episode {self.episode_counter}") + # self.episode_counter += 1 + # self.init_recorder_separate_topics() def main(args=None): # tf.experimental.numpy.experimental_enable_numpy_behavior() @@ -236,9 +195,6 @@ def main(args=None): rclpy.spin(node) - # Destroy the timer attached to the node explicitly - # (optional - otherwise it will be done automatically - # when the garbage collector destroys the node object) node.destroy_node() rclpy.shutdown() diff --git a/fogros2-rt-x/launch/replayer.launch.py b/fogros2-rt-x/launch/replayer.launch.py index a00dfe8..c26c5a5 100644 --- a/fogros2-rt-x/launch/replayer.launch.py +++ b/fogros2-rt-x/launch/replayer.launch.py @@ -49,4 +49,14 @@ def generate_launch_description(): ) ld.add_action(replayer_node) + + recorder_node = Node( + package="fogros2_rt_x", + executable="recorder", + output="screen", + parameters = [ + {"dataset_name": "berkeley_fanuc_manipulation"}, + ] + ) + ld.add_action(recorder_node) return ld diff --git a/fogros2-rt-x/setup.py b/fogros2-rt-x/setup.py index eaf217f..6cbfe04 100755 --- a/fogros2-rt-x/setup.py +++ b/fogros2-rt-x/setup.py @@ -48,6 +48,7 @@ entry_points={ "console_scripts": [ "replayer = fogros2_rt_x.replayer:main", + "recorder = fogros2_rt_x.recorder:main", ], "ros2cli.command": [ "fgr = fogros2_rt_x.cli:FogCommand", From 13d27b7e9173ad9c7f3a350f155c7144abff3223 Mon Sep 17 00:00:00 2001 From: Eddie Song Date: Thu, 1 Feb 2024 02:20:32 +0000 Subject: [PATCH 5/6] not working bagging with Recorder.record --- fogros2-rt-x/fogros2_rt_x/recorder.py | 51 ++++++++++++++------ fogros2-rt-x/fogros2_rt_x/replayer.py | 41 +++++----------- fogros2-rt-x/fogros2_rt_x/replayer_common.py | 6 +++ 3 files changed, 54 insertions(+), 44 deletions(-) create mode 100644 fogros2-rt-x/fogros2_rt_x/replayer_common.py diff --git a/fogros2-rt-x/fogros2_rt_x/recorder.py b/fogros2-rt-x/fogros2_rt_x/recorder.py index 89ab542..22f2980 100644 --- a/fogros2-rt-x/fogros2_rt_x/recorder.py +++ b/fogros2-rt-x/fogros2_rt_x/recorder.py @@ -4,6 +4,37 @@ from rosbag2_py import Recorder, RecordOptions, StorageOptions from rclpy.node import Node import time +from .replayer_common import episode_recorder + +def init_recorder(): + global episode_recorder + storage_options = StorageOptions( + uri=f"rosbags/episode_{1}", + storage_id="sqlite3" + ) + record_options = RecordOptions() + record_options.all = True + episode_recorder.record(storage_options, record_options) + + +def start_new_recorder(episode_counter): + global episode_recorder + print("CURRENT EPISODE IS:", episode_counter) + storage_options = StorageOptions( + uri=f"rosbags/episode_{episode_counter}", + storage_id="sqlite3" + ) + record_options = RecordOptions() + record_options.all = True + # episode_recorder = Recorder() + episode_recorder.record(storage_options, record_options) + +def restart_recorder(): + global episode_recorder + print("ATTR:", dir(episode_recorder)) + episode_recorder.cancel() + episode_recorder.stop() + class DatasetRecorder(Node): """ @@ -19,24 +50,14 @@ class DatasetRecorder(Node): feature_spec (DatasetFeatureSpec): The feature specification for the dataset. episode (Episode): The current episode being replayed. """ - def __init__(self): super().__init__("fogros2_rt_x_recorder") + self.logger = self.get_logger() self.episode_counter = 1 - self.storage_options = StorageOptions( - uri=f"rosbags/episode_{self.episode_counter}", - storage_id="sqlite3" - ) - - self.record_options = RecordOptions() - self.record_options.all = True - self.recorder = Recorder() - - try: - self.recorder.record(self.storage_options, self.record_options) - except KeyboardInterrupt: - pass - + init_recorder() + while True: + self.episode_counter += 1 + start_new_recorder(self.episode_counter) def main(args=None): diff --git a/fogros2-rt-x/fogros2_rt_x/replayer.py b/fogros2-rt-x/fogros2_rt_x/replayer.py index 223f990..4816be8 100755 --- a/fogros2-rt-x/fogros2_rt_x/replayer.py +++ b/fogros2-rt-x/fogros2_rt_x/replayer.py @@ -35,8 +35,9 @@ import rclpy import rosbag2_py as rosbag -from rosbag2_py import Recorder, RecordOptions, StorageOptions +from rosbag2_py import Recorder from rclpy.node import Node +from .recorder import restart_recorder from .dataset_utils import * from .dataset_spec import DatasetFeatureSpec, FeatureSpec from .dataset_spec import tf_feature_definition_to_ros_msg_class_str @@ -105,12 +106,13 @@ def init_topics_from_features(self, features): self.init_topics_from_features(tf_feature) else: if tf_feature.shape == () and tf_feature.dtype.is_bool: - # print("Adding to topics:", name, Scalar(dtype=tf.bool)) self.topics.append(FeatureSpec(name, Scalar(dtype=tf.bool))) else: - # print("Adding to topics:", name, tf_feature) self.topics.append(FeatureSpec(name, tf_feature)) + + + def init_publisher_separate_topics(self): for topic in self.topics: @@ -123,11 +125,6 @@ def init_publisher_separate_topics(self): self.per_episode_interval, self.timer_callback_separate_topics ) - def init_publisher_single_topic(self): - self.publisher = self.create_publisher(Step, "step_info", 10) - callback = self.timer_callback_single_topic - self.create_timer(self.per_episode_interval, callback) - def timer_callback_separate_topics(self): for step in self.episode["steps"]: for topic in self.topics: @@ -158,7 +155,7 @@ def timer_callback_separate_topics(self): self.topic_name_to_publisher_dict[topic.ros_topic_name].publish(msg) - # self.check_last_step_update_recorder(step) + self.check_last_step_update_recorder(step) time.sleep(self.per_step_interval) self.episode = next(iter(self.dataset)) @@ -166,26 +163,12 @@ def timer_callback_separate_topics(self): self.per_episode_interval, self.timer_callback_separate_topics ) - # def init_recorder(self): - # storage_options = StorageOptions( - # uri=f"rosbags/episode {self.episode_counter}", - # storage_id="sqlite3" - # ) - - # record_options = RecordOptions() - # record_options.all = True - # recorder = Recorder() - - # try: - # recorder.record(storage_options, record_options) - # except KeyboardInterrupt: - # pass - - # def check_last_step_update_recorder(self, step): - # if step["is_last"]: - # self.logger.info(f"End of episode {self.episode_counter}") - # self.episode_counter += 1 - # self.init_recorder_separate_topics() + def check_last_step_update_recorder(self, step): + if step["is_last"]: + self.logger.info(f"End of the current episode") + self.episode_counter += 1 + # restart_recorder(self.episode_counter) + restart_recorder() def main(args=None): # tf.experimental.numpy.experimental_enable_numpy_behavior() diff --git a/fogros2-rt-x/fogros2_rt_x/replayer_common.py b/fogros2-rt-x/fogros2_rt_x/replayer_common.py new file mode 100644 index 0000000..ef4d532 --- /dev/null +++ b/fogros2-rt-x/fogros2_rt_x/replayer_common.py @@ -0,0 +1,6 @@ +import rclpy +from rosbag2_py import Recorder + +global episode_recorder + +episode_recorder = Recorder() \ No newline at end of file From 2e33efb5489d0b8d561483764d0c10c66c0af1cf Mon Sep 17 00:00:00 2001 From: Eric Chen Date: Thu, 1 Feb 2024 17:58:02 +0000 Subject: [PATCH 6/6] Refactor recorder and replayer modules --- fogros2-rt-x/fogros2_rt_x/recorder.py | 65 ++++++++++---------- fogros2-rt-x/fogros2_rt_x/replayer.py | 13 +++- fogros2-rt-x/fogros2_rt_x/replayer_common.py | 6 -- 3 files changed, 41 insertions(+), 43 deletions(-) delete mode 100644 fogros2-rt-x/fogros2_rt_x/replayer_common.py diff --git a/fogros2-rt-x/fogros2_rt_x/recorder.py b/fogros2-rt-x/fogros2_rt_x/recorder.py index 22f2980..d0be4f3 100644 --- a/fogros2-rt-x/fogros2_rt_x/recorder.py +++ b/fogros2-rt-x/fogros2_rt_x/recorder.py @@ -4,37 +4,9 @@ from rosbag2_py import Recorder, RecordOptions, StorageOptions from rclpy.node import Node import time -from .replayer_common import episode_recorder +from std_srvs.srv import Empty +from threading import Thread -def init_recorder(): - global episode_recorder - storage_options = StorageOptions( - uri=f"rosbags/episode_{1}", - storage_id="sqlite3" - ) - record_options = RecordOptions() - record_options.all = True - episode_recorder.record(storage_options, record_options) - - -def start_new_recorder(episode_counter): - global episode_recorder - print("CURRENT EPISODE IS:", episode_counter) - storage_options = StorageOptions( - uri=f"rosbags/episode_{episode_counter}", - storage_id="sqlite3" - ) - record_options = RecordOptions() - record_options.all = True - # episode_recorder = Recorder() - episode_recorder.record(storage_options, record_options) - -def restart_recorder(): - global episode_recorder - print("ATTR:", dir(episode_recorder)) - episode_recorder.cancel() - episode_recorder.stop() - class DatasetRecorder(Node): """ @@ -52,13 +24,38 @@ class DatasetRecorder(Node): """ def __init__(self): super().__init__("fogros2_rt_x_recorder") + + self.new_episode_notification_service = self.create_service(Empty, 'new_episode_notification_service', self.new_episode_notification_service_callback) + self.episode_recorder = Recorder() + self.logger = self.get_logger() self.episode_counter = 1 - init_recorder() - while True: - self.episode_counter += 1 - start_new_recorder(self.episode_counter) + self.init_recorder() + self.logger.info("Recording started") + + def new_episode_notification_service_callback(self, request, response): + self.logger.info("Received request to start new episode") + self.stop_recorder() + self.start_new_recorder() + return response + + def init_recorder(self): + self.start_new_recorder() + + def start_new_recorder(self): + self.logger.info(f"starting episode #: {self.episode_counter}") + storage_options = StorageOptions( + uri=f"rosbags/episode_{self.episode_counter}", + storage_id="sqlite3" + ) + record_options = RecordOptions() + record_options.all = True + self.thread = Thread(target=self.episode_recorder.record, args=(storage_options, record_options)).start() + self.episode_counter += 1 + def stop_recorder(self): + self.episode_recorder.cancel() + def main(args=None): rclpy.init(args=args) diff --git a/fogros2-rt-x/fogros2_rt_x/replayer.py b/fogros2-rt-x/fogros2_rt_x/replayer.py index 4816be8..6c3904f 100755 --- a/fogros2-rt-x/fogros2_rt_x/replayer.py +++ b/fogros2-rt-x/fogros2_rt_x/replayer.py @@ -37,13 +37,13 @@ import rosbag2_py as rosbag from rosbag2_py import Recorder from rclpy.node import Node -from .recorder import restart_recorder from .dataset_utils import * from .dataset_spec import DatasetFeatureSpec, FeatureSpec from .dataset_spec import tf_feature_definition_to_ros_msg_class_str -from .plugins.conf_base import * +from .conf_base import * import time import tensorflow_datasets as tfds +from std_srvs.srv import Empty class DatasetReplayer(Node): """ @@ -88,6 +88,9 @@ def __init__(self): self.episode_counter = 1 self.init_topics_from_features(self.step_features) + # create an empty ros2 servcice to start and stop recording + self.new_episode_notification_client = self.create_client(Empty, 'new_episode_notification_service') + self.new_episode_notification_req = Empty.Request() if replay_type == "as_separate_topics": self.topic_name_to_publisher_dict = dict() @@ -168,7 +171,11 @@ def check_last_step_update_recorder(self, step): self.logger.info(f"End of the current episode") self.episode_counter += 1 # restart_recorder(self.episode_counter) - restart_recorder() + # restart_recorder() + while not self.new_episode_notification_client.wait_for_service(timeout_sec=1.0): + self.get_logger().info('Service not available, waiting again...') + self.future = self.new_episode_notification_client.call_async(self.new_episode_notification_req) + def main(args=None): # tf.experimental.numpy.experimental_enable_numpy_behavior() diff --git a/fogros2-rt-x/fogros2_rt_x/replayer_common.py b/fogros2-rt-x/fogros2_rt_x/replayer_common.py deleted file mode 100644 index ef4d532..0000000 --- a/fogros2-rt-x/fogros2_rt_x/replayer_common.py +++ /dev/null @@ -1,6 +0,0 @@ -import rclpy -from rosbag2_py import Recorder - -global episode_recorder - -episode_recorder = Recorder() \ No newline at end of file