Skip to content

Commit

Permalink
mavlink_ulog: parallel mavlink logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jnippula authored and jlaitine committed Oct 26, 2023
1 parent 79b9cbb commit bdd8196
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 27 deletions.
85 changes: 59 additions & 26 deletions src/modules/mavlink/mavlink_ulog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ MavlinkULog::MavlinkULog(int datarate, float max_rate_factor, uint8_t target_sys

}

PX4_INFO("[Mavlink_ulog] Parallel logging supported");

while (_ulog_stream_acked_sub.update()) {

}

_waiting_for_initial_ack = true;
_last_sent_time = hrt_absolute_time(); //(ab)use this timestamp during initialization
_next_rate_check = _last_sent_time + _rate_calculation_delta_t;
Expand All @@ -84,7 +90,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel)
static_assert(sizeof(ulog_stream_s::data) == MAVLINK_MSG_LOGGING_DATA_FIELD_DATA_LEN,
"Invalid uorb ulog_stream.data length");
static_assert(sizeof(ulog_stream_s::data) == MAVLINK_MSG_LOGGING_DATA_ACKED_FIELD_DATA_LEN,
"Invalid uorb ulog_stream.data length");
"Invalid uorb ulog_stream_acked.data length");

if (_waiting_for_initial_ack) {
if (hrt_elapsed_time(&_last_sent_time) > 3e5) {
Expand All @@ -96,8 +102,10 @@ int MavlinkULog::handle_update(mavlink_channel_t channel)
}

// check if we're waiting for an ACK
bool check_for_updates = true;

if (_last_sent_time) {
bool check_for_updates = false;
check_for_updates = false;

if (_ack_received) {
_last_sent_time = 0;
Expand All @@ -113,7 +121,7 @@ int MavlinkULog::handle_update(mavlink_channel_t channel)
PX4_DEBUG("re-sending ulog mavlink message (try=%i)", _sent_tries);
_last_sent_time = hrt_absolute_time();

const ulog_stream_s &ulog_data = _ulog_stream_sub.get();
const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get();

mavlink_logging_data_acked_t msg;
msg.sequence = ulog_data.msg_sequence;
Expand All @@ -126,25 +134,58 @@ int MavlinkULog::handle_update(mavlink_channel_t channel)
}
}
}

if (!check_for_updates) {
return 0;
}
}

while ((_current_num_msgs < _max_num_messages) && (_ulog_stream_sub.updated() || _ulog_stream_acked_sub.updated())) {
if (_ulog_stream_sub.updated()) {
const unsigned last_generation = _ulog_stream_sub.get_last_generation();
_ulog_stream_sub.update();

if (_ulog_stream_sub.get_last_generation() != last_generation + 1) {
perf_count(_msg_missed_ulog_stream_perf);
}

const ulog_stream_s &ulog_data = _ulog_stream_sub.get();

if (ulog_data.timestamp > 0) {
if (ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) {
_sent_tries = 1;
_last_sent_time = hrt_absolute_time();
lock();
_wait_for_ack_sequence = ulog_data.msg_sequence;
_ack_received = false;
unlock();

mavlink_logging_data_acked_t msg;
msg.sequence = ulog_data.msg_sequence;
msg.length = ulog_data.length;
msg.first_message_offset = ulog_data.first_message_offset;
msg.target_system = _target_system;
msg.target_component = _target_component;
memcpy(msg.data, ulog_data.data, sizeof(msg.data));
mavlink_msg_logging_data_acked_send_struct(channel, &msg);

while ((_current_num_msgs < _max_num_messages) && _ulog_stream_sub.updated()) {
const unsigned last_generation = _ulog_stream_sub.get_last_generation();
_ulog_stream_sub.update();
} else {
mavlink_logging_data_t msg;
msg.sequence = ulog_data.msg_sequence;
msg.length = ulog_data.length;
msg.first_message_offset = ulog_data.first_message_offset;
msg.target_system = _target_system;
msg.target_component = _target_component;
memcpy(msg.data, ulog_data.data, sizeof(msg.data));
mavlink_msg_logging_data_send_struct(channel, &msg);
}
}

if (_ulog_stream_sub.get_last_generation() != last_generation + 1) {
perf_count(_msg_missed_ulog_stream_perf);
++_current_num_msgs;
}

const ulog_stream_s &ulog_data = _ulog_stream_sub.get();
if (check_for_updates && _ulog_stream_acked_sub.updated()) {
_ulog_stream_acked_sub.update();

const ulog_stream_s &ulog_data = _ulog_stream_acked_sub.get();

if (ulog_data.timestamp > 0) {
if (ulog_data.flags & ulog_stream_s::FLAGS_NEED_ACK) {
if (ulog_data.timestamp > 0) {
_sent_tries = 1;
_last_sent_time = hrt_absolute_time();
lock();
Expand All @@ -161,19 +202,10 @@ int MavlinkULog::handle_update(mavlink_channel_t channel)
memcpy(msg.data, ulog_data.data, sizeof(msg.data));
mavlink_msg_logging_data_acked_send_struct(channel, &msg);

} else {
mavlink_logging_data_t msg;
msg.sequence = ulog_data.msg_sequence;
msg.length = ulog_data.length;
msg.first_message_offset = ulog_data.first_message_offset;
msg.target_system = _target_system;
msg.target_component = _target_component;
memcpy(msg.data, ulog_data.data, sizeof(msg.data));
mavlink_msg_logging_data_send_struct(channel, &msg);
}
}

++_current_num_msgs;
++_current_num_msgs;
}
}

//need to update the rate?
Expand Down Expand Up @@ -247,6 +279,7 @@ void MavlinkULog::handle_ack(mavlink_logging_ack_t ack)
lock();

if (_instance) { // make sure stop() was not called right before
//PX4_INFO("<< %u (wait: %u)", ack.sequence, _wait_for_ack_sequence);
if (_wait_for_ack_sequence == ack.sequence) {
_ack_received = true;
publish_ack(ack.sequence);
Expand Down
4 changes: 3 additions & 1 deletion src/modules/mavlink/mavlink_ulog.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ class MavlinkULog
static constexpr hrt_abstime _rate_calculation_delta_t = 100_ms; ///< rate update interval

uORB::SubscriptionData<ulog_stream_s> _ulog_stream_sub{ORB_ID(ulog_stream)};
uORB::Publication<ulog_stream_ack_s> _ulog_stream_ack_pub{ORB_ID(ulog_stream_ack)};
uORB::SubscriptionData<ulog_stream_s> _ulog_stream_acked_sub {ORB_ID(ulog_stream_acked)};

uORB::Publication<ulog_stream_ack_s> _ulog_stream_ack_pub {ORB_ID(ulog_stream_ack)};
uint16_t _wait_for_ack_sequence;
uint8_t _sent_tries = 0;
volatile bool _ack_received = false; ///< set to true if a matching ack received
Expand Down

0 comments on commit bdd8196

Please sign in to comment.