From 7f41dc10b6d011a48e4423cfeba89284dbc7e4fa Mon Sep 17 00:00:00 2001 From: Jari Nippula Date: Thu, 12 Oct 2023 09:50:19 +0300 Subject: [PATCH] logger: parallel mavlink logging --- src/modules/logger/CMakeLists.txt | 4 + src/modules/logger/Kconfig | 7 + src/modules/logger/log_writer.cpp | 13 +- src/modules/logger/log_writer.h | 23 +- src/modules/logger/log_writer_mavlink.cpp | 234 ++++++++++++++++-- src/modules/logger/log_writer_mavlink.h | 52 +++- src/modules/logger/logger.cpp | 156 +++++++++++- src/modules/logger/logger.h | 26 +- .../logger/parallel_mavlink_logging.md | 118 +++++++++ 9 files changed, 603 insertions(+), 30 deletions(-) create mode 100644 src/modules/logger/parallel_mavlink_logging.md diff --git a/src/modules/logger/CMakeLists.txt b/src/modules/logger/CMakeLists.txt index ec07d9adae95..477c87df976e 100644 --- a/src/modules/logger/CMakeLists.txt +++ b/src/modules/logger/CMakeLists.txt @@ -30,6 +30,9 @@ # POSSIBILITY OF SUCH DAMAGE. # ############################################################################ +if(CONFIG_LOGGER_PARALLEL_LOGGING) + set(LOGGER_PARALLEL_COMPILE_FLAG "-DLOGGER_PARALLEL_LOGGING") +endif() px4_add_module( MODULE modules__logger @@ -38,6 +41,7 @@ px4_add_module( COMPILE_FLAGS ${MAX_CUSTOM_OPT_LEVEL} -Wno-cast-align # TODO: fix and enable + ${LOGGER_PARALLEL_COMPILE_FLAG} SRCS logged_topics.cpp logger.cpp diff --git a/src/modules/logger/Kconfig b/src/modules/logger/Kconfig index db2c5a75460e..519afccd190c 100644 --- a/src/modules/logger/Kconfig +++ b/src/modules/logger/Kconfig @@ -10,3 +10,10 @@ menuconfig USER_LOGGER depends on BOARD_PROTECTED && MODULES_LOGGER ---help--- Put logger in userspace memory + +menuconfig LOGGER_PARALLEL_LOGGING + bool "Custom mavlink logging protocol in logger" + default n + depends on MODULES_LOGGER + ---help--- + Utilize custom mavlink logging protocol for speed up logging start phase diff --git a/src/modules/logger/log_writer.cpp b/src/modules/logger/log_writer.cpp index 8511f9e92a3d..402bc888486f 100644 --- a/src/modules/logger/log_writer.cpp +++ b/src/modules/logger/log_writer.cpp @@ -158,7 +158,7 @@ void LogWriter::thread_stop() } } -int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start) +int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start, bool reliable, bool wait) { int ret_file = 0, ret_mavlink = 0; @@ -167,7 +167,16 @@ int LogWriter::write_message(LogType type, void *ptr, size_t size, uint64_t drop } if (_log_writer_mavlink_for_write && type == LogType::Full) { - ret_mavlink = _log_writer_mavlink_for_write->write_message(ptr, size); +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { + ret_mavlink = _log_writer_mavlink_for_write->write_reliable_message(ptr, size, wait); + + } else +#endif + { + ret_mavlink = _log_writer_mavlink_for_write->write_message(ptr, size); + } } // file backend errors takes precedence diff --git a/src/modules/logger/log_writer.h b/src/modules/logger/log_writer.h index 9635d22dadf0..59ec75bc9945 100644 --- a/src/modules/logger/log_writer.h +++ b/src/modules/logger/log_writer.h @@ -73,6 +73,19 @@ class LogWriter void stop_log_mavlink(); +#ifdef LOGGER_PARALLEL_LOGGING + void wait_fifo_empty() + { + if (_log_writer_mavlink) { + _log_writer_mavlink->wait_fifo_count(0); + + while (_log_writer_mavlink->reliable_fifo_is_sending()) { + usleep(10000); + } + } + } +#endif + /** * whether logging is currently active or not (any of the selected backends). */ @@ -90,7 +103,8 @@ class LogWriter * -1 if not enough space in the buffer left (file backend), -2 mavlink backend failed * add type -> pass through, but not to mavlink if mission log */ - int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0); + int write_message(LogType type, void *ptr, size_t size, uint64_t dropout_start = 0, bool reliable = false, + bool wait = false); /** * Select a backend, so that future calls to write_message() only write to the selected @@ -98,7 +112,7 @@ class LogWriter * @param backend */ void select_write_backend(Backend sel_backend); - void unselect_write_backend() { select_write_backend(BackendAll); } + void unselect_write_backend() { select_write_backend(_backend); } /* file logging methods */ @@ -154,7 +168,11 @@ class LogWriter { if (_log_writer_file) { _log_writer_file->set_need_reliable_transfer(need_reliable); } +#ifndef LOGGER_PARALLEL_LOGGING + if (_log_writer_mavlink) { _log_writer_mavlink->set_need_reliable_transfer(need_reliable && mavlink_backed_too); } + +#endif } bool need_reliable_transfer() const @@ -165,7 +183,6 @@ class LogWriter return false; } - #if defined(PX4_CRYPTO) void set_encryption_parameters(px4_crypto_algorithm_t algorithm, uint8_t key_idx, uint8_t exchange_key_idx) { diff --git a/src/modules/logger/log_writer_mavlink.cpp b/src/modules/logger/log_writer_mavlink.cpp index 5322aef074e0..971c6913a5c4 100644 --- a/src/modules/logger/log_writer_mavlink.cpp +++ b/src/modules/logger/log_writer_mavlink.cpp @@ -48,18 +48,142 @@ namespace logger LogWriterMavlink::LogWriterMavlink() { _ulog_stream_data.length = 0; +#ifdef LOGGER_PARALLEL_LOGGING + _ulog_stream_acked_data.length = 0; + pthread_mutex_init(&_fifo.mtx, nullptr); + pthread_cond_init(&_fifo.cv, nullptr); +#endif } +#ifdef LOGGER_PARALLEL_LOGGING +ReliableMsg *LogWriterMavlink::reliable_fifo_pop() +{ + pthread_mutex_lock(&_fifo.mtx); + PX4_DEBUG("reliable POP - wait.."); + + while (_fifo.empty() && !_fifo.sender_should_exit.load()) { + pthread_cond_wait(&_fifo.cv, &_fifo.mtx); + } + + PX4_DEBUG("reliable POP: signalled"); + ReliableMsg *node = _fifo.getHead(); + _fifo.remove(node); + + if (node) { + _fifo.sending = true; + } + + pthread_mutex_unlock(&_fifo.mtx); + return node; +} + +bool LogWriterMavlink::reliable_fifo_push(ReliableMsg *node) +{ + if (!node) { + PX4_ERR("[reliable_fifo_push] nullptr"); + return false; + } + + pthread_mutex_lock(&_fifo.mtx); + _fifo.add(node); + PX4_DEBUG("reliable PUSH - signal sender"); + pthread_cond_signal(&_fifo.cv); + pthread_mutex_unlock(&_fifo.mtx); + return true; +} + +void LogWriterMavlink::reliable_fifo_set_sender_idle() +{ + pthread_mutex_lock(&_fifo.mtx); + _fifo.sending = false; + pthread_mutex_unlock(&_fifo.mtx); +} + +bool LogWriterMavlink::reliable_fifo_is_sending() +{ + bool sending; + pthread_mutex_lock(&_fifo.mtx); + sending = _fifo.sending; + pthread_mutex_unlock(&_fifo.mtx); + return sending; +} + +void LogWriterMavlink::wait_fifo_count(size_t count) +{ + while (reliable_fifo_count() > count) { + usleep(30000); + } +} + +size_t LogWriterMavlink::reliable_fifo_count() +{ + size_t count = 0; + pthread_mutex_lock(&_fifo.mtx); + count = _fifo.size(); + pthread_mutex_unlock(&_fifo.mtx); + return count; +} + +void *LogWriterMavlink::mav_reliable_sender_helper(void *context) +{ + px4_prctl(PR_SET_NAME, "log_writer_mav_reliable_sender", px4_getpid()); + static_cast(context)->mav_reliable_sender(); + return nullptr; +} + +void LogWriterMavlink::mav_reliable_sender() +{ + while (!_fifo.sender_should_exit.load()) { + ReliableMsg *msg = reliable_fifo_pop(); + + PX4_DEBUG("[sender] - msg:%p", msg); + + if (msg) { + write_message(msg->data, msg->len, true); + PX4_DEBUG("[sender] - delete msg"); + delete msg; + reliable_fifo_set_sender_idle(); + } + } +} +#endif + bool LogWriterMavlink::init() { +#ifdef LOGGER_PARALLEL_LOGGING + pthread_attr_t thr_attr; + pthread_attr_init(&thr_attr); + pthread_attr_setstacksize(&thr_attr, PX4_STACK_ADJUSTED(8500)); + PX4_INFO("create mav_reliable_sender_thread"); + int ret = pthread_create(&_mav_reliable_sender_thread, &thr_attr, &LogWriterMavlink::mav_reliable_sender_helper, this); + + if (ret) { + PX4_ERR("mav_reliable_sender_thread create failed: %d", ret); + } + + pthread_attr_destroy(&thr_attr); +#endif return true; } LogWriterMavlink::~LogWriterMavlink() { +#ifdef LOGGER_PARALLEL_LOGGING + pthread_mutex_lock(&_fifo.mtx); + _fifo.sender_should_exit.store(true); + pthread_cond_signal(&_fifo.cv); + pthread_mutex_unlock(&_fifo.mtx); + pthread_join(_mav_reliable_sender_thread, nullptr); +#endif + if (orb_sub_valid(_ulog_stream_ack_sub)) { orb_unsubscribe(_ulog_stream_ack_sub); } + +#ifdef LOGGER_PARALLEL_LOGGING + pthread_mutex_destroy(&_fifo.mtx); + pthread_cond_destroy(&_fifo.cv); +#endif } void LogWriterMavlink::start_log() @@ -76,37 +200,60 @@ void LogWriterMavlink::start_log() _ulog_stream_data.length = 0; _ulog_stream_data.first_message_offset = 0; +#ifdef LOGGER_PARALLEL_LOGGING + _ulog_stream_acked_data.msg_sequence = 0; + _ulog_stream_acked_data.length = 0; + _ulog_stream_acked_data.first_message_offset = 0; +#endif _is_started = true; } void LogWriterMavlink::stop_log() { _ulog_stream_data.length = 0; +#ifdef LOGGER_PARALLEL_LOGGING + _ulog_stream_acked_data.length = 0; +#endif _is_started = false; } -int LogWriterMavlink::write_message(void *ptr, size_t size) +int LogWriterMavlink::write_message(void *ptr, size_t size, bool reliable) { if (!is_started()) { return 0; } - const uint8_t data_len = (uint8_t)sizeof(_ulog_stream_data.data); + ulog_stream_s *ulog_s_p; + +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { + ulog_s_p = &_ulog_stream_acked_data; + + } else { + ulog_s_p = &_ulog_stream_data; + } + +#else + ulog_s_p = &_ulog_stream_data; +#endif + + const uint8_t data_len = (uint8_t)sizeof(ulog_s_p->data); uint8_t *ptr_data = (uint8_t *)ptr; - if (_ulog_stream_data.first_message_offset == 255) { - _ulog_stream_data.first_message_offset = _ulog_stream_data.length; + if (ulog_s_p->first_message_offset == 255) { + ulog_s_p->first_message_offset = ulog_s_p->length; } while (size > 0) { - size_t send_len = math::min((size_t)data_len - _ulog_stream_data.length, size); - memcpy(_ulog_stream_data.data + _ulog_stream_data.length, ptr_data, send_len); - _ulog_stream_data.length += send_len; + size_t send_len = math::min((size_t)data_len - ulog_s_p->length, size); + memcpy(ulog_s_p->data + ulog_s_p->length, ptr_data, send_len); + ulog_s_p->length += send_len; ptr_data += send_len; size -= send_len; - if (_ulog_stream_data.length >= data_len) { - if (publish_message()) { + if (ulog_s_p->length >= data_len) { + if (publish_message(reliable)) { return -2; } } @@ -115,6 +262,30 @@ int LogWriterMavlink::write_message(void *ptr, size_t size) return 0; } +#ifdef LOGGER_PARALLEL_LOGGING +int LogWriterMavlink::write_reliable_message(void *ptr, size_t size, bool wait) +{ + if (wait) { + wait_fifo_count(LOGGER_RELIABLE_FIFO_WAIT_THRESHOLD); + } + + uint8_t *p = (uint8_t *) ptr; + + while (size > 0) { + size_t len = math::min(size, LOGGER_ULOG_STREAM_DATA_LEN); + size -= len; + ReliableMsg *msg = new ReliableMsg(); + memcpy(msg->data, p, len); + p += len; + msg->len = len; + reliable_fifo_push(msg); + } + + return 0; +} +#endif + +#ifndef LOGGER_PARALLEL_LOGGING void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) { if (!need_reliable && _need_reliable_transfer) { @@ -126,19 +297,46 @@ void LogWriterMavlink::set_need_reliable_transfer(bool need_reliable) _need_reliable_transfer = need_reliable; } +#endif -int LogWriterMavlink::publish_message() +int LogWriterMavlink::publish_message(bool reliable) { - _ulog_stream_data.timestamp = hrt_absolute_time(); - _ulog_stream_data.flags = 0; + ulog_stream_s *ulog_s_p; + +#ifdef LOGGER_PARALLEL_LOGGING + + if (reliable) { + ulog_s_p = &_ulog_stream_acked_data; + + } else { + ulog_s_p = &_ulog_stream_data; + } + +#else + ulog_s_p = &_ulog_stream_data; +#endif + + ulog_s_p->timestamp = hrt_absolute_time(); + ulog_s_p->flags = 0; + +#ifdef LOGGER_PARALLEL_LOGGING + + if (!reliable) { + _ulog_stream_pub.publish(*ulog_s_p); + + } else { + ulog_s_p->flags = ulog_s_p->FLAGS_NEED_ACK; + _ulog_stream_acked_pub.publish(*ulog_s_p); +#else if (_need_reliable_transfer) { - _ulog_stream_data.flags = _ulog_stream_data.FLAGS_NEED_ACK; + ulog_s_p->flags = ulog_s_p->FLAGS_NEED_ACK; } - _ulog_stream_pub.publish(_ulog_stream_data); + _ulog_stream_pub.publish(*ulog_s_p); if (_need_reliable_transfer) { +#endif // we need to wait for an ack. Note that this blocks the main logger thread, so if a file logging // is already running, it will miss samples. px4_pollfd_struct_t fds[1]; @@ -160,7 +358,7 @@ int LogWriterMavlink::publish_message() ulog_stream_ack_s ack; orb_copy(ORB_ID(ulog_stream_ack), _ulog_stream_ack_sub, &ack); - if (ack.msg_sequence == _ulog_stream_data.msg_sequence) { + if (ack.msg_sequence == ulog_s_p->msg_sequence) { got_ack = true; } @@ -178,9 +376,9 @@ int LogWriterMavlink::publish_message() PX4_DEBUG("got ack in %i ms", (int)(hrt_elapsed_time(&started) / 1000)); } - _ulog_stream_data.msg_sequence++; - _ulog_stream_data.length = 0; - _ulog_stream_data.first_message_offset = 255; + ulog_s_p->msg_sequence++; + ulog_s_p->length = 0; + ulog_s_p->first_message_offset = 255; return 0; } diff --git a/src/modules/logger/log_writer_mavlink.h b/src/modules/logger/log_writer_mavlink.h index 8aabe2d79235..a3722dbdb64e 100644 --- a/src/modules/logger/log_writer_mavlink.h +++ b/src/modules/logger/log_writer_mavlink.h @@ -37,12 +37,37 @@ #include #include #include +#include + +#ifdef LOGGER_PARALLEL_LOGGING +static constexpr size_t LOGGER_ULOG_STREAM_DATA_LEN {249}; // Size of ulog_stream data buffer +static constexpr size_t LOGGER_RELIABLE_FIFO_WAIT_THRESHOLD{10}; // Msg count threshold for wait fifo trigger +#endif namespace px4 { namespace logger { +#ifdef LOGGER_PARALLEL_LOGGING +class ReliableMsg : public ListNode +{ +public: + uint16_t len; + uint8_t data[LOGGER_ULOG_STREAM_DATA_LEN]; +}; + +class ReliableFifo : public List +{ +public: + pthread_mutex_t mtx; + pthread_cond_t cv; + px4::atomic_bool sender_should_exit{false}; + bool sending{false}; +}; + +#endif + /** * @class LogWriterMavlink * Writes logging data to uORB, and then sent via mavlink @@ -62,9 +87,14 @@ class LogWriterMavlink bool is_started() const { return _is_started; } /** @see LogWriter::write_message() */ - int write_message(void *ptr, size_t size); - + int write_message(void *ptr, size_t size, bool reliable = false); +#ifdef LOGGER_PARALLEL_LOGGING + int write_reliable_message(void *ptr, size_t size, bool wait = false); + bool reliable_fifo_is_sending(); + void wait_fifo_count(size_t count); +#else void set_need_reliable_transfer(bool need_reliable); +#endif bool need_reliable_transfer() const { @@ -72,15 +102,31 @@ class LogWriterMavlink } private: +#ifdef LOGGER_PARALLEL_LOGGING + static void *mav_reliable_sender_helper(void *context); + void mav_reliable_sender(); + + ReliableMsg *reliable_fifo_pop(); + bool reliable_fifo_push(ReliableMsg *node); + void reliable_fifo_set_sender_idle(); + + size_t reliable_fifo_count(); +#endif /** publish message, wait for ack if needed & reset message */ - int publish_message(); + int publish_message(bool reliable = false); ulog_stream_s _ulog_stream_data{}; uORB::Publication _ulog_stream_pub{ORB_ID(ulog_stream)}; orb_sub_t _ulog_stream_ack_sub{ORB_SUB_INVALID}; bool _need_reliable_transfer{false}; bool _is_started{false}; +#ifdef LOGGER_PARALLEL_LOGGING + ulog_stream_s _ulog_stream_acked_data {}; + uORB::Publication _ulog_stream_acked_pub{ORB_ID(ulog_stream_acked)}; + ReliableFifo _fifo; + pthread_t _mav_reliable_sender_thread = 0; +#endif }; } diff --git a/src/modules/logger/logger.cpp b/src/modules/logger/logger.cpp index 50753c1e697f..18aaee5893a3 100644 --- a/src/modules/logger/logger.cpp +++ b/src/modules/logger/logger.cpp @@ -64,6 +64,7 @@ #include #include #include +#include //#define DBGPRINT //write status output every few seconds @@ -400,6 +401,18 @@ Logger::Logger(LogWriter::Backend backend, size_t buffer_size, uint32_t log_inte PX4_ERR("Failed to find topic %s", poll_topic_name); } } + +#ifdef LOGGER_PARALLEL_LOGGING + + if (pthread_key_create(&pthread_data_key, NULL) < 0) { + PX4_ERR("Creating pthread data key failed"); + pthread_setspecific(pthread_data_key, (void *)nullptr); + + } else { + pthread_setspecific(pthread_data_key, (void *)&_thread_main_data); + } + +#endif } Logger::~Logger() @@ -593,7 +606,7 @@ void Logger::run() return; } - //all topics added. Get required message buffer size + //Get required message buffer size int max_msg_size = 0; for (int sub = 0; sub < _num_subscriptions; ++sub) { @@ -1187,11 +1200,11 @@ void Logger::handle_vehicle_command_update() } } -bool Logger::write_message(LogType type, void *ptr, size_t size) +bool Logger::write_message(LogType type, void *ptr, size_t size, bool reliable, bool wait) { Statistics &stats = _statistics[(int)type]; - if (_writer.write_message(type, ptr, size, stats.dropout_start) != -1) { + if (_writer.write_message(type, ptr, size, stats.dropout_start, reliable, wait) != -1) { if (stats.dropout_start) { float dropout_duration = (float)(hrt_elapsed_time(&stats.dropout_start) / 1000) / 1.e3f; @@ -1455,6 +1468,35 @@ void Logger::stop_log_file(LogType type) _writer.stop_log_file(type); } +#ifdef LOGGER_PARALLEL_LOGGING +void *Logger::mav_start_steps_helper(void *context) +{ + px4_prctl(PR_SET_NAME, "log_writer_mavlink_headers", px4_getpid()); + static_cast(context)->mav_start_steps(); + return nullptr; +} + +void Logger::mav_start_steps() +{ + /* This is running in separate thread to keep logging data while sending header&descriptions */ + _thread_mav_start_sender_data.wait_for_ack = true; + pthread_setspecific(pthread_data_key, (void *)&_thread_mav_start_sender_data); + + PX4_INFO("Write static data - Begin"); + write_header(LogType::Full); + write_version(LogType::Full); + write_formats(LogType::Full); + write_parameters(LogType::Full); + write_parameter_defaults(LogType::Full); + write_perf_data(PrintLoadReason::Preflight); + write_console_output(); + write_events_file(LogType::Full); + write_excluded_optional_topics(LogType::Full); + write_all_add_logged_msg(LogType::Full); + PX4_INFO("Write static data - End"); +} +#endif + void Logger::start_log_mavlink() { if (!can_start_mavlink_log()) { @@ -1472,6 +1514,33 @@ void Logger::start_log_mavlink() _writer.start_log_mavlink(); _writer.select_write_backend(LogWriter::BackendMavlink); + +#ifdef LOGGER_PARALLEL_LOGGING + + for (int sub = 0; sub < _num_subscriptions; ++sub) { + if (_subscriptions[sub].valid() && _subscriptions[sub].msg_id == MSG_ID_INVALID) { + if (_next_topic_id == MSG_ID_INVALID) { + // if we land here an uint8 is too small -> switch to uint16 + PX4_ERR("limit for _next_topic_id reached"); + return; + } + + _subscriptions[sub].msg_id = _next_topic_id++; + } + } + + pthread_attr_t thr_attr; + pthread_attr_init(&thr_attr); + pthread_attr_setstacksize(&thr_attr, PX4_STACK_ADJUSTED(8500)); + PX4_INFO("create mav_start_thread"); + int ret = pthread_create(&_mav_start_thread, &thr_attr, &Logger::mav_start_steps_helper, this); + + if (ret) { + PX4_WARN("mav_start_thread create failed: %d", ret); + } + + pthread_attr_destroy(&thr_attr); +#else _writer.set_need_reliable_transfer(true); write_header(LogType::Full); write_version(LogType::Full); @@ -1484,9 +1553,11 @@ void Logger::start_log_mavlink() write_excluded_optional_topics(LogType::Full); write_all_add_logged_msg(LogType::Full); _writer.set_need_reliable_transfer(false); +#endif _writer.unselect_write_backend(); _writer.notify(); + PX4_INFO("Mavlink logging started"); adjust_subscription_updates(); // redistribute updates as sending the header can take some time } @@ -1499,11 +1570,16 @@ void Logger::stop_log_mavlink() _writer.select_write_backend(LogWriter::BackendMavlink); _writer.set_need_reliable_transfer(true); write_perf_data(PrintLoadReason::Postflight); +#ifdef LOGGER_PARALLEL_LOGGING + _writer.wait_fifo_empty(); +#endif _writer.set_need_reliable_transfer(false); _writer.unselect_write_backend(); _writer.notify(); _writer.stop_log_mavlink(); } + + PX4_INFO("Mavlink logging stopped"); } struct perf_callback_data_t { @@ -1704,7 +1780,12 @@ void Logger::write_format(LogType type, const orb_metadata &meta, WrittenFormats size_t msg_size = sizeof(msg) - sizeof(msg.format) + format_len; msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &msg, msg_size, true, th_data->wait_for_ack); +#else write_message(type, &msg, msg_size); +#endif if (level > 1 && !written_formats.push_back(&meta)) { PX4_ERR("Array too small"); @@ -1852,7 +1933,12 @@ void Logger::write_add_logged_msg(LogType type, LoggerSubscription &subscription bool prev_reliable = _writer.need_reliable_transfer(); _writer.set_need_reliable_transfer(true); +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &msg, msg_size, true, th_data->wait_for_ack); +#else write_message(type, &msg, msg_size); +#endif _writer.set_need_reliable_transfer(prev_reliable); } @@ -1875,7 +1961,12 @@ void Logger::write_info(LogType type, const char *name, const char *value) msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } _writer.unlock(); @@ -1901,7 +1992,12 @@ void Logger::write_info_multiple(LogType type, const char *name, const char *val msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } else { PX4_ERR("info_multiple str too long (%" PRIu8 "), key=%s", msg.key_len, msg.key_value_str); @@ -1930,6 +2026,10 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd) int file_offset = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + while (file_offset < file_size) { _writer.lock(); @@ -1946,7 +2046,11 @@ void Logger::write_info_multiple(LogType type, const char *name, int fd) msg_size += read_length; msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif file_offset += ret; } else { @@ -1989,7 +2093,12 @@ void Logger::write_info_template(LogType type, const char *name, T value, const msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif _writer.unlock(); } @@ -2018,7 +2127,13 @@ void Logger::write_header(LogType type) header.magic[7] = 0x01; //file version 1 header.timestamp = hrt_absolute_time(); _writer.lock(); + +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); + write_message(type, &header, sizeof(header), true, th_data->wait_for_ack); +#else write_message(type, &header, sizeof(header)); +#endif // write the Flags message: this MUST be written right after the ulog header ulog_message_flag_bits_s flag_bits{}; @@ -2028,7 +2143,11 @@ void Logger::write_header(LogType type) flag_bits.msg_size = sizeof(flag_bits) - ULOG_MSG_HEADER_LEN; flag_bits.msg_type = static_cast(ULogMessageType::FLAG_BITS); +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, &flag_bits, sizeof(flag_bits), true, th_data->wait_for_ack); +#else write_message(type, &flag_bits, sizeof(flag_bits)); +#endif _writer.unlock(); } @@ -2125,6 +2244,9 @@ void Logger::write_parameter_defaults(LogType type) msg.msg_type = static_cast(ULogMessageType::PARAMETER_DEFAULT); int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif do { // skip over all parameters which are not used @@ -2187,20 +2309,32 @@ void Logger::write_parameter_defaults(LogType type) if (memcmp(&value, &default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], default_value, value_size); msg.default_types = ulog_parameter_default_type_t::current_setup | ulog_parameter_default_type_t::system; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } else { if (memcmp(&value, &default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], default_value, value_size); msg.default_types = ulog_parameter_default_type_t::current_setup; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } if (memcmp(&value, &system_default_value, value_size) != 0) { memcpy(&buffer[msg_size - value_size], system_default_value, value_size); msg.default_types = ulog_parameter_default_type_t::system; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } } @@ -2220,6 +2354,10 @@ void Logger::write_parameters(LogType type) int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + do { // skip over all parameters which are not used do { @@ -2271,7 +2409,11 @@ void Logger::write_parameters(LogType type) msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } while ((param != PARAM_INVALID) && (param_idx < (int) param_count())); @@ -2289,6 +2431,10 @@ void Logger::write_changed_parameters(LogType type) int param_idx = 0; param_t param = 0; +#ifdef LOGGER_PARALLEL_LOGGING + thread_data_t *th_data = (thread_data_t *) pthread_getspecific(pthread_data_key); +#endif + do { // skip over all parameters which are not used do { @@ -2342,7 +2488,11 @@ void Logger::write_changed_parameters(LogType type) // msg_size is now 1 (msg_type) + 2 (msg_size) + 1 (key_len) + key_len + value_size msg.msg_size = msg_size - ULOG_MSG_HEADER_LEN; +#ifdef LOGGER_PARALLEL_LOGGING + write_message(type, buffer, msg_size, true, th_data->wait_for_ack); +#else write_message(type, buffer, msg_size); +#endif } } while ((param != PARAM_INVALID) && (param_idx < (int) param_count())); diff --git a/src/modules/logger/logger.h b/src/modules/logger/logger.h index 65a9f1d179b3..e215401d0861 100644 --- a/src/modules/logger/logger.h +++ b/src/modules/logger/logger.h @@ -69,6 +69,14 @@ namespace px4 namespace logger { +#ifdef LOGGER_PARALLEL_LOGGING +typedef struct thread_data { + bool wait_for_ack{false}; +} thread_data_t; + +pthread_key_t pthread_data_key; +#endif + static constexpr uint8_t MSG_ID_INVALID = UINT8_MAX; struct LoggerSubscription : public uORB::SubscriptionInterval { @@ -221,6 +229,16 @@ class Logger : public ModuleBase, public ModuleParams void stop_log_mavlink(); +#ifdef LOGGER_PARALLEL_LOGGING + /** + * Run in separate thread to continue data logging while sending header&descriptions + */ + void mav_start_steps(); + + static void *mav_start_steps_helper(void *); + +#endif + /** check if mavlink logging can be started */ bool can_start_mavlink_log() const { @@ -290,7 +308,7 @@ class Logger : public ModuleBase, public ModuleParams * Must be called with _writer.lock() held. * @return true if data written, false otherwise (on overflow) */ - bool write_message(LogType type, void *ptr, size_t size); + bool write_message(LogType type, void *ptr, size_t size, bool reliable = false, bool wait = false); /** * Add topic subscriptions from SD file if it exists, otherwise add topics based on the configured profile. @@ -341,6 +359,12 @@ class Logger : public ModuleBase, public ModuleParams void adjust_subscription_updates(); +#ifdef LOGGER_PARALLEL_LOGGING + pthread_t _mav_start_thread {0}; + thread_data_t _thread_main_data; + thread_data_t _thread_mav_start_sender_data; +#endif + uint8_t *_msg_buffer{nullptr}; int _msg_buffer_len{0}; diff --git a/src/modules/logger/parallel_mavlink_logging.md b/src/modules/logger/parallel_mavlink_logging.md new file mode 100644 index 000000000000..d506a53aba65 --- /dev/null +++ b/src/modules/logger/parallel_mavlink_logging.md @@ -0,0 +1,118 @@ +# Parallel data logging over MAVLink + +### Problem +Starting flight logging over mavlink is a slow operation. In case logging is started from ARMING event the log file most likely have several seconds long missing trace period from the takeoff part. This is caused by the protocol that the log is first filled with header and all the static definitions, configuration, boot-log, etc. which are sent using *reliable_transfer*, meaning that every log message/package the px4 logger is sending needs to be acknowleged by the receiver. The round-trip time for this may be long depending on the MAVLink transfer media and the module receiving the log data in the other system behind the Mavlink. + +### Solution +To speed up logging startup and reduce that blackout period in the beginning of the log, the parallel data logging option is implemented. The trick here is that the actual uorb data logging is started as soon as possible when logging is started and the static definitions/configs are sent at the same time through another channel. The receiver end reads both streams and store them to two separate files and in the end of logging it combines them into one ulog file by appending the topic data file in the end of static definition data file. + + +This new protocol is not backward compatible, so BOTH px4 logger and the receiver MUST or MUST NOT implement the parallel logging to make it work! + + +### Original protocol +For Logger, there is one **ulog_stream** uorb channel for transfer data to receiver side and another **ulog_stream_ack** for receiveing ack events. First it collects all the static definitions and send them using *reliable_transfer* method. After static defs are sent it starts sending actual dynamic topic data. + +Mavlink_uorb module listen to **ulog_stream** topic and depending on FLAGS_NEED_ACK flag in the topic msg it sends either **MAVLINK_MSG_LOGGING_DATA** or **MAVLINK_MSG_LOGGING_DATA_ACKED** msg over mavlink. +If **MAVLINK_MSG_LOGGING_DATA_ACKED** is sent it starts waiting for **MAVLINK_MSG_LOGGING_DATA_ACK** and continue sending only after it receives the ack message. publish it to **ulog_stream_ack** + +The receiver reads **MAVLINK_MSG_LOGGING_DATA/_ACKED** messages and store them to ulg file. If **MAVLINK_MSG_LOGGING_DATA_ACKED** is received then **MAVLINK_MSG_LOGGING_DATA_ACK** is sent back to PX4. + +``` ++----------------------------------------------+ +| Logger | +| | +|----------------------------------------------| +| Static data | +| Dyn data | ++----------------------------------------------+ + | ^ + | Publish | Subs + | | + V | ++----------------------------------------------+ +| Mavlink_ulog | +| | +| | +| | ++----------------------------------------------+ + | ^ + Send | +MAVLINK_MSG_LOGGING_DATA Recv +MAVLINK_MSG_LOGGING_DATA_ACKED MAVLINK_MSG_ + | LOGGING_DATA_ACK + V | ++----------------------------------------------+ +| Receiver | +|----------------------------------------------| +| Static data | +| Dyn data | ++----------------------------------------------+ + | + | + | + V ++------------+ +| .ulg file | ++------------+ + +``` + + +### Parallel logging protocol +Logger spawns new thread for sending Static definitions data (reliable transfer enabled) and continues to send dynamic topic data at the same time. Static data is published into **ulog_stream_acked** topic and the dynamic data into **ulog_stream** topic. The thread sending dynamic data does not need to wait anything, but continuously sending the data without waiting any ack. The static data sender thread publishes one message at a time and waits for ack until publishing next one. + +mavlink_uorb reads both **ulog_stream** and **ulog_stream_acked** streams and sends either **MAVLINK_MSG_LOGGING_DATA** or **MAVLINK_MSG_LOGGING_DATA_ACKED** mavlink msgs accordingly. Also it listens to **MAVLINK_MSG_LOGGING_DATA_ACK** messages and publish to **ulog_stream_ack** if one received. +Sending **MAVLINK_MSG_LOGGING_DATA_ACKED** raises a flag to wait for ack. A **MAVLINK_MSG_LOGGING_DATA_ACK** message with expected sequence number shall be received before next _acked message can be sent, but the **MAVLINK_MSG_LOGGING_DATA** messages are always sent in parallel of that without any blocking. + +Receiver listens to both **MAVLINK_MSG_LOGGING_DATA** and **MAVLINK_MSG_LOGGING_DATA_ACKED** messages and write them to separate files accordingly: _DATA into .data file and _DATA_ACKED into .ulg file. For each **MAVLINK_MSG_LOGGING_DATA_ACKED** message it sends back a **MAVLINK_MSG_LOGGING_DATA_ACK** message. When logging is stopped, the receiver append the .data file content into the end of .ulg file and removes the .data file. + +``` ++----------------------------------------------+ +| Logger | +| | +|-----------+ +--------------------------| +| Dyn data | | Static data | ++----------------------------------------------+ + | | ^ + | Publish | Publish | Subs + | | | + v v | ++----------------------------------------------+ +| MAvlink_ulog | +| | +| | +| | ++----------------------------------------------+ + | | ^ +MAVLINK_MSG_ MAVLINK_MSG_ | +LOGGING_DATA LOGGING_DATA_ACKED | + | | MAVLINK_MSG_ + | | LOGGING_DATA_ACK + v v | ++----------------------------------------------+ +| Receiver | +|-----------+ +--------------------------| +| Dyn data | | Static data | ++----------------------------------------------+ + | | + | | + | | + v v ++------------+ +------------+ +| .data file | | .ulg file | +| DATA | | DEFS | ++------------+ +------------+ + | | + | ---- Stop logging---- | + | | + +-------------------------->+ + | + v + +------------+ + | .ulg file | + | DEFS | + | DATA | + +------------+ + +```