Skip to content

Commit

Permalink
Instrument client/service for end-to-end request/response tracking
Browse files Browse the repository at this point in the history
Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard-apexai committed Nov 23, 2024
1 parent 92f0925 commit 5440d55
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 12 deletions.
14 changes: 14 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
#include "rcutils/types/uint8_array.h"
#include "rcpputils/thread_safety_annotations.hpp"

#include "tracetools/tracetools.h"

/******************************************************************************
* General helpers and utilities.
******************************************************************************/
Expand Down Expand Up @@ -581,6 +583,9 @@ class RMW_Connext_Client
RMW_Connext_Subscriber * reply_sub;
std::atomic_uint next_request_id;
rmw_context_impl_t * ctx;
#ifndef TRACETOOLS_DISABLED
const rmw_client_t * rmw_client;
#endif // TRACETOOLS_DISABLED

RMW_Connext_Client()
: request_pub(nullptr),
Expand All @@ -597,6 +602,9 @@ class RMW_Connext_Client
DDS_Publisher * const pub,
DDS_Subscriber * const sub,
const rosidl_service_type_support_t * const type_supports,
#ifndef TRACETOOLS_DISABLED
const rmw_client_t * const rmw_client,
#endif // TRACETOOLS_DISABLED
const char * const svc_name,
const rmw_qos_profile_t * const qos_policies);

Expand Down Expand Up @@ -649,6 +657,9 @@ class RMW_Connext_Service
RMW_Connext_Publisher * reply_pub;
RMW_Connext_Subscriber * request_sub;
rmw_context_impl_t * ctx;
#ifndef TRACETOOLS_DISABLED
const rmw_service_t * rmw_service;
#endif // TRACETOOLS_DISABLED

public:
static RMW_Connext_Service *
Expand All @@ -658,6 +669,9 @@ class RMW_Connext_Service
DDS_Publisher * const pub,
DDS_Subscriber * const sub,
const rosidl_service_type_support_t * const type_supports,
#ifndef TRACETOOLS_DISABLED
const rmw_service_t * const rmw_service,
#endif // TRACETOOLS_DISABLED
const char * const svc_name,
const rmw_qos_profile_t * const qos_policies);

Expand Down
51 changes: 51 additions & 0 deletions rmw_connextdds_common/src/common/rmw_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2411,6 +2411,9 @@ RMW_Connext_Client::create(
DDS_Publisher * const pub,
DDS_Subscriber * const sub,
const rosidl_service_type_support_t * const type_supports,
#ifndef TRACETOOLS_DISABLED
const rmw_client_t * const rmw_client,
#endif // TRACETOOLS_DISABLED
const char * const svc_name,
const rmw_qos_profile_t * const qos_policies)
{
Expand All @@ -2423,6 +2426,9 @@ RMW_Connext_Client::create(
}

client_impl->ctx = ctx;
#ifndef TRACETOOLS_DISABLED
client_impl->rmw_client = rmw_client;
#endif // TRACETOOLS_DISABLED

auto scope_exit_client_impl_delete = rcpputils::make_scope_exit(
[client_impl]()
Expand Down Expand Up @@ -2692,6 +2698,13 @@ RMW_Connext_Client::take_response(
rr_msg.sn)
}

TRACETOOLS_TRACEPOINT(
rmw_take_response,
static_cast<const void *>(this->rmw_client),
static_cast<const void *>(ros_response),
request_header->request_id.sequence_number,
request_header->source_timestamp,
*taken);
return RMW_RET_OK;
}

Expand Down Expand Up @@ -2734,10 +2747,28 @@ RMW_Connext_Client::send_request(
return RMW_RET_ERROR;
}

#ifndef TRACETOOLS_DISABLED
// In this case, we can get the sequence number before the write() call
if (this->ctx->request_reply_mapping == RMW_Connext_RequestReplyMapping::Basic) {
TRACETOOLS_TRACEPOINT(
rmw_send_request,
static_cast<const void *>(this->rmw_client),
static_cast<const void *>(ros_request),
*sequence_id);
}
#endif // TRACETOOLS_DISABLED

rmw_ret_t rc = this->request_pub->write(&rr_msg, false /* serialized */, &write_params);

if (this->ctx->request_reply_mapping != RMW_Connext_RequestReplyMapping::Basic) {
*sequence_id = write_params.sequence_number;

// In this other case, we can only get the sequence number after the write() call
TRACETOOLS_TRACEPOINT(
rmw_send_request,
static_cast<const void *>(this->rmw_client),
static_cast<const void *>(ros_request),
*sequence_id);
}

RMW_CONNEXT_LOG_DEBUG_A(
Expand Down Expand Up @@ -2809,6 +2840,9 @@ RMW_Connext_Service::create(
DDS_Publisher * const pub,
DDS_Subscriber * const sub,
const rosidl_service_type_support_t * const type_supports,
#ifndef TRACETOOLS_DISABLED
const rmw_service_t * const rmw_service,
#endif // TRACETOOLS_DISABLED
const char * const svc_name,
const rmw_qos_profile_t * const qos_policies)
{
Expand All @@ -2821,6 +2855,9 @@ RMW_Connext_Service::create(
}

svc_impl->ctx = ctx;
#ifndef TRACETOOLS_DISABLED
svc_impl->rmw_service = rmw_service;
#endif // TRACETOOLS_DISABLED

auto scope_exit_svc_impl_delete = rcpputils::make_scope_exit(
[svc_impl]()
Expand Down Expand Up @@ -2998,6 +3035,13 @@ RMW_Connext_Service::take_request(
rr_msg.sn)
}

TRACETOOLS_TRACEPOINT(
rmw_take_request,
static_cast<const void *>(this->rmw_service),
static_cast<const void *>(ros_request),
request_header->request_id.writer_guid,
request_header->request_id.sequence_number,
*taken);
return RMW_RET_OK;
}

Expand Down Expand Up @@ -3034,6 +3078,13 @@ RMW_Connext_Service::send_response(
reinterpret_cast<const uint32_t *>(rr_msg.gid.data)[2],
reinterpret_cast<const uint32_t *>(rr_msg.gid.data)[3],
rr_msg.sn)
TRACETOOLS_TRACEPOINT(
rmw_send_response,
static_cast<const void *>(this->rmw_service),
static_cast<const void *>(ros_response),
request_id->writer_guid,
request_id->sequence_number,
dds_time_to_u64(&write_params.timestamp));

return this->reply_pub->write(&rr_msg, false /* serialized */, &write_params);
}
Expand Down
36 changes: 24 additions & 12 deletions rmw_connextdds_common/src/common/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#include "rmw/validate_full_topic_name.h"

#include "tracetools/tracetools.h"

/******************************************************************************
* Clients/Servers functions
******************************************************************************/
Expand Down Expand Up @@ -166,13 +168,22 @@ rmw_api_connextdds_create_client(
rmw_context_impl_t * ctx = node->context->impl;
std::lock_guard<std::mutex> guard(ctx->endpoint_mutex);

rmw_client_t * rmw_client = rmw_client_allocate();
if (nullptr == rmw_client) {
RMW_CONNEXT_LOG_ERROR_SET("failed to create RMW client")
return nullptr;
}

RMW_Connext_Client * const client_impl =
RMW_Connext_Client::create(
ctx,
ctx->participant,
ctx->dds_pub,
ctx->dds_sub,
type_supports,
#ifndef TRACETOOLS_DISABLED
rmw_client,
#endif // TRACETOOLS_DISABLED
service_name,
&adapted_qos_policies);

Expand All @@ -191,12 +202,6 @@ rmw_api_connextdds_create_client(
delete client_impl;
});

rmw_client_t * rmw_client = rmw_client_allocate();
if (nullptr == rmw_client) {
RMW_CONNEXT_LOG_ERROR_SET("failed to create RMW client")
return nullptr;
}

rmw_client->implementation_identifier = RMW_CONNEXTDDS_ID;
rmw_client->data = client_impl;
const size_t svc_name_len = strlen(service_name) + 1;
Expand Down Expand Up @@ -224,6 +229,10 @@ rmw_api_connextdds_create_client(
}

scope_exit_client_impl_delete.cancel();
TRACETOOLS_TRACEPOINT(
rmw_client_init,
static_cast<const void *>(rmw_client),
client_impl->gid().data);
return rmw_client;
}

Expand Down Expand Up @@ -382,13 +391,22 @@ rmw_api_connextdds_create_service(
rmw_context_impl_t * ctx = node->context->impl;
std::lock_guard<std::mutex> guard(ctx->endpoint_mutex);

rmw_service_t * rmw_service = rmw_service_allocate();
if (nullptr == rmw_service) {
RMW_CONNEXT_LOG_ERROR_SET("failed to create RMW service")
return nullptr;
}

RMW_Connext_Service * const svc_impl =
RMW_Connext_Service::create(
ctx,
ctx->participant,
ctx->dds_pub,
ctx->dds_sub,
type_supports,
#ifndef TRACETOOLS_DISABLED
rmw_service,
#endif // TRACETOOLS_DISABLED
service_name,
&adapted_qos_policies);

Expand All @@ -407,12 +425,6 @@ rmw_api_connextdds_create_service(
delete svc_impl;
});

rmw_service_t * rmw_service = rmw_service_allocate();
if (nullptr == rmw_service) {
RMW_CONNEXT_LOG_ERROR_SET("failed to create RMW service")
return nullptr;
}

rmw_service->implementation_identifier = RMW_CONNEXTDDS_ID;
rmw_service->data = svc_impl;
const size_t svc_name_len = strlen(service_name) + 1;
Expand Down

0 comments on commit 5440d55

Please sign in to comment.