Skip to content

Commit

Permalink
Progress update.
Browse files Browse the repository at this point in the history
Added an out topic to link entries so one can specify an out topic name if
the8y want. Also added code so that all topics don't go to all robots.
  • Loading branch information
kbrowne15 committed Dec 27, 2023
1 parent 2b2210b commit ff272da
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 62 deletions.
9 changes: 6 additions & 3 deletions astrobee/config/communications/comms_bridge.config
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ links = {
-- and relay_both (to be relayed in both directions). Providing all three fields gives the user
-- full directional control while minimizing repetition and copy/paste errors.
relay_forward = {
{name = "mgt/executive/agent_state"},
{in_topic = "beh/inspection/feedback"},
{in_topic = "beh/inspection/result"},
{in_topic = "beh/inspection/state"},
},
relay_backward = {
{name = "dummy1"},
{in_topic = "bumble/beh/inspection/cancel", out_topic = "beh/inspection/cancel"},
{in_topic = "bumble/beh/inspection/goal", out_topic = "beh/inspection/goal"},
},
relay_both = {
{name = "dummy2"},
{in_topic = "mgt/executive/agent_state"},
},
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,21 @@ class GenericROSSubRapidPub : public BridgeSubscriber {
void InitializeDDS(std::vector<std::string> const& connections);

// Called with the mutex held
virtual void subscribeTopic(std::string const& in_topic, const RelayTopicInfo& info);
virtual void subscribeTopic(std::string const& in_topic,
const RelayTopicInfo& info);

// Called with the mutex held
virtual void advertiseTopic(const RelayTopicInfo& info);

// Called with the mutex held
virtual void relayMessage(const RelayTopicInfo& topic_info, ContentInfo const& content_info);
virtual void relayMessage(const RelayTopicInfo& topic_info,
ContentInfo const& content_info);

private:
bool dds_initialized_;

std::map<std::string, std::vector<std::pair<std::string, std::string>>> topic_mapping_;
std::map<std::string, GenericRapidPubPtr> robot_connections_;
std::string robot_name_;
};

} // end namespace ff
Expand Down
65 changes: 33 additions & 32 deletions communications/comms_bridge/src/comms_bridge_nodelet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
#include <stdio.h>
#include <getopt.h>

#include <map>
#include <string>
#include <utility>
#include <vector>

#include "comms_bridge/generic_rapid_msg_ros_pub.h"
Expand Down Expand Up @@ -222,14 +224,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
ros_pub_.get());
content_rapid_subs_.push_back(content_sub);
}

std::string ns = std::string("/") + agent_name_ + "/";
ns[1] = std::tolower(ns[1]); // namespaces don't start with upper case

for (size_t i = 0; i < topics_sub_.size(); i++) {
ROS_INFO_STREAM("Initialize DDS topic sub: " << topics_sub_[i]);
ros_sub_.addTopic(topics_sub_[i], (ns + topics_sub_[i]));
}
ros_sub_.AddTopics(link_entries_);
}

bool ReadParams() {
Expand All @@ -255,34 +250,38 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
return false;
}

std::string ns = std::string("/") + agent_name_ + "/";
ns[1] = std::tolower(ns[1]); // namespaces don't start with upper case
ROS_INFO_STREAM("Read Params numebr of links: " << links.GetSize());
for (int i = 1; i <= links.GetSize(); i++) {
if (!links.GetTable(i, &link)) {
NODELET_ERROR("Comms Bridge Nodelet: Could read link table row %i", i);
continue;
}
std::string config_agent;
std::string config_agent, connection_agent;
ROS_INFO_STREAM("Link " << i << " from " << link.GetStr("from", &config_agent));
ROS_INFO_STREAM("Link " << i << " to " << link.GetStr("to", &config_agent));
if (link.GetStr("from", &config_agent) && config_agent == agent_name_) {
AddRapidConnections(link, "to");
AddTableToSubs(link, "relay_forward");
AddTableToSubs(link, "relay_both");
if (AddRapidConnections(link, "to", connection_agent)) {
AddTableToSubs(link, "relay_forward", ns, connection_agent);
AddTableToSubs(link, "relay_both", ns, connection_agent);
}
} else if (link.GetStr("to", &config_agent) && config_agent == agent_name_) {
AddRapidConnections(link, "from");
AddTableToSubs(link, "relay_backward");
AddTableToSubs(link, "relay_both");
if (AddRapidConnections(link, "from", connection_agent)) {
AddTableToSubs(link, "relay_backward", ns, connection_agent);
AddTableToSubs(link, "relay_both", ns, connection_agent);
}
}
}
return true;
}

void AddRapidConnections(config_reader::ConfigReader::Table &link_table,
std::string direction) {
std::string connection;
bool AddRapidConnections(config_reader::ConfigReader::Table &link_table,
std::string const& direction,
std::string &connection) {
if (!link_table.GetStr(direction.c_str(), &connection)) {
NODELET_ERROR("Comms Bridge Nodelet: %s not specified for one link", direction.c_str());
return;
return false;
}

// This should be very quick since we shouldn't have more than 2 connections
Expand All @@ -296,30 +295,31 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
if (!found) {
rapid_connections_.push_back(connection);
}
return true;
}

void AddTableToSubs(config_reader::ConfigReader::Table &link_table,
std::string table_name) {
std::string table_name,
std::string const& current_robot_ns,
std::string const& connection_robot) {
config_reader::ConfigReader::Table relay_table, relay_item;
std::string topic_name;
std::string in_topic, out_topic;
if (link_table.GetTable(table_name.c_str(), &relay_table)) {
for (size_t i = 1; i <= relay_table.GetSize(); i++) {
relay_table.GetTable(i, &relay_item);
if (!relay_item.GetStr("name", &topic_name)) {
NODELET_ERROR("Comms Bridge Nodelet: Agent topic name not specified!");
if (!relay_item.GetStr("in_topic", &in_topic)) {
NODELET_ERROR("Comms Bridge Nodelet: In topic not specified!");
continue;
}

bool found = false;
for (size_t i = 0; i < topics_sub_.size() && !found; i++) {
if (topic_name == topics_sub_[i]) {
found = true;
}
if (!relay_item.GetStr("out_topic", &out_topic)) {
out_topic = current_robot_ns + in_topic;
}

if (!found) {
topics_sub_.push_back(topic_name);
}
// Save all output topics under the same in topic since we don't want
// to subscribe to the same topic multiple times
link_entries_[in_topic].push_back(std::make_pair(connection_robot,
out_topic));
}
}
}
Expand All @@ -332,7 +332,8 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
std::shared_ptr<kn::DdsEntitiesFactorySvc> dds_entities_factory_;
std::shared_ptr<ff::GenericRapidMsgRosPub> ros_pub_;
std::string agent_name_, participant_name_;
std::vector<std::string> rapid_connections_, topics_sub_;
std::vector<std::string> rapid_connections_;
std::map<std::string, std::vector<std::pair<std::string, std::string>>> link_entries_;
};

PLUGINLIB_EXPORT_CLASS(comms_bridge::CommsBridgeNodelet, nodelet::Nodelet)
Expand Down
80 changes: 56 additions & 24 deletions communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,14 @@ void GenericROSSubRapidPub::AddTopics(
}

void GenericROSSubRapidPub::InitializeDDS(std::vector<std::string> const& connections) {
// std::string robot_name;
std::string robot_name;
for (size_t i = 0; i < connections.size(); ++i) {
robot_name_ = connections[i];
GenericRapidPubPtr rapid_pub = std::make_shared<GenericRapidPub>(robot_name_);
robot_connections_[robot_name_] = rapid_pub;
robot_name = connections[i];
GenericRapidPubPtr rapid_pub = std::make_shared<GenericRapidPub>(robot_name);
robot_connections_[robot_name] = rapid_pub;
}


dds_initialized_ = true;
}

Expand All @@ -66,38 +67,69 @@ void GenericROSSubRapidPub::subscribeTopic(std::string const& in_topic,
// Called with the mutex held
void GenericROSSubRapidPub::advertiseTopic(const RelayTopicInfo& relay_info) {
const AdvertisementInfo &info = relay_info.ad_info;
std::string out_topic = relay_info.out_topic;
std::string out_topic = relay_info.out_topic, robot_name, robot_out_topic;

ROS_ERROR("Received ros advertise topic for topic %s\n", out_topic.c_str());

// Check robot connection exists
if (robot_connections_.find(robot_name_) == robot_connections_.end()) {
ROS_ERROR("Comms Bridge: No connection for %s.\n", robot_name_.c_str());
// Make sure we recognize the topic
if (topic_mapping_.find(out_topic) == topic_mapping_.end()) {
ROS_ERROR("Comms Bridge: Output topic %s unknown in advertise topic.\n",
out_topic.c_str());
return;
}

robot_connections_[robot_name_]->ProcessAdvertisementInfo(out_topic,
info.latching,
info.data_type,
info.md5_sum,
info.definition);
for (size_t i = 0; i < topic_mapping_[out_topic].size(); ++i) {
robot_name = topic_mapping_[out_topic][i].first;
robot_out_topic = topic_mapping_[out_topic][i].second;

ROS_ERROR("Robot name: %s Robot out topic: %s\n", robot_name.c_str(), robot_out_topic.c_str());

// Check robot connection exists
if (robot_connections_.find(robot_name) == robot_connections_.end()) {
ROS_ERROR("Comms Bridge: No connection for %s.\n", robot_name.c_str());
continue;
}

robot_connections_[robot_name]->ProcessAdvertisementInfo(out_topic,
info.latching,
info.data_type,
info.md5_sum,
info.definition);
}
}

// Called with the mutex held
void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info, ContentInfo const& content_info) {
std::string out_topic = topic_info.out_topic;

void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info,
ContentInfo const& content_info) {
std::string out_topic = topic_info.out_topic, robot_name, robot_out_topic;
unsigned int size;
ROS_ERROR("Received ros content message for topic %s\n", out_topic.c_str());

// Check robot connection exists
if (robot_connections_.find(robot_name_) == robot_connections_.end()) {
ROS_ERROR("Comms Bridge: No connection for %s.\n", robot_name_.c_str());
// Make sure we recognize the topic
if (topic_mapping_.find(out_topic) == topic_mapping_.end()) {
ROS_ERROR("Comms Bridge: Output topic %s unknown in relay message.\n",
out_topic.c_str());
return;
}

robot_connections_[robot_name_]->ProcessContent(out_topic,
content_info.type_md5_sum,
content_info.data,
content_info.data_size,
topic_info.relay_seqnum);
for (size_t i = 0; i < topic_mapping_[out_topic].size(); ++i) {
robot_name = topic_mapping_[out_topic][i].first;
robot_out_topic = topic_mapping_[out_topic][i].second;

ROS_ERROR("Robot name: %s Robot out topic: %s\n", robot_name.c_str(), robot_out_topic.c_str());

// Check robot connection exists
if (robot_connections_.find(robot_name) == robot_connections_.end()) {
ROS_ERROR("Comms Bridge: No connection for %s.\n", robot_name.c_str());
continue;
}

robot_connections_[robot_name]->ProcessContent(out_topic,
content_info.type_md5_sum,
content_info.data,
content_info.data_size,
topic_info.relay_seqnum);
}
}

} // end namespace ff

0 comments on commit ff272da

Please sign in to comment.