From cd6afffaee41703022678094a6c7f5ba44dc9d19 Mon Sep 17 00:00:00 2001 From: Frank Osterfeld Date: Thu, 22 Feb 2024 19:07:40 +0100 Subject: [PATCH] DataSink: Fix signal metadata propagation Use the automatic propagation of signal_name, signal_unit properties etc. via default tags instead of trying to handle it manually, which didn't work correctly. --- .../include/gnuradio-4.0/basic/DataSink.hpp | 54 ++-------- blocks/basic/test/qa_DataSink.cpp | 98 ++++++++++++++----- .../gnuradio-4.0/testing/TagMonitors.hpp | 8 +- 3 files changed, 88 insertions(+), 72 deletions(-) diff --git a/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp b/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp index 813318a53..425361e5c 100644 --- a/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp +++ b/blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp @@ -319,7 +319,6 @@ class DataSink : public Block> { bool _listeners_finished = false; std::mutex _listener_mutex; std::optional> _history; - bool _has_signal_info_from_settings = false; public: Annotated, Unit<"Hz">> sample_rate = 1.f; @@ -404,14 +403,16 @@ class DataSink : public Block> { } void - settingsChanged(const property_map &oldSettings, const property_map &newSettings) { - if (applySignalInfo(newSettings)) { - _has_signal_info_from_settings = true; + settingsChanged(const property_map &oldSettings, const property_map & /*newSettings*/) { + const auto oldSignalName = detail::getProperty(oldSettings, "signal_name"); + if (oldSignalName != signal_name) { + DataSinkRegistry::instance().updateSignalName(this, oldSignalName.value_or(""), signal_name); } - const auto oldSignalName = detail::getProperty(oldSettings, tag::SIGNAL_NAME.key()); - const auto newSignalName = detail::getProperty(newSettings, tag::SIGNAL_NAME.key()); - if (oldSignalName != newSignalName) { - DataSinkRegistry::instance().updateSignalName(this, oldSignalName.value_or(""), newSignalName.value_or("")); + std::lock_guard lg{ _listener_mutex }; + for (auto &listener : _listeners) { + listener->applySampleRate(sample_rate); + listener->setDataSetTemplate( + DataSet{ .signal_names = { signal_name }, .signal_units = { signal_unit }, .signal_ranges = { { static_cast(signal_min), static_cast(signal_max) } } }); } } @@ -500,10 +501,6 @@ class DataSink : public Block> { if (this->input_tags_present()) { assert(this->mergedInputTag().index == 0); tagData = this->mergedInputTag().map; - // signal info from settings overrides info from tags - if (!_has_signal_info_from_settings) { - applySignalInfo(this->mergedInputTag().map); - } } { @@ -523,37 +520,6 @@ class DataSink : public Block> { } private: - bool - applySignalInfo(const property_map &properties) { - const auto rate_ = detail::getProperty(properties, tag::SAMPLE_RATE.key()); - const auto name_ = detail::getProperty(properties, tag::SIGNAL_NAME.key()); - const auto unit_ = detail::getProperty(properties, tag::SIGNAL_UNIT.key()); - const auto min_ = detail::getProperty(properties, tag::SIGNAL_MIN.key()); - const auto max_ = detail::getProperty(properties, tag::SIGNAL_MAX.key()); - if (!rate_ && !name_ && !unit_ && !min_ && !max_) { - return false; - } - - sample_rate = rate_.value_or(sample_rate); - signal_name = name_.value_or(signal_name); - signal_unit = unit_.value_or(signal_unit); - signal_min = min_.value_or(signal_min); - signal_max = max_.value_or(signal_max); - - // forward to listeners - for (auto &listener : _listeners) { - std::lock_guard lg{ _listener_mutex }; - if (rate_) { - listener->applySampleRate(sample_rate); - } - if (name_ || unit_ || min_ || max_) { - listener->setDataSetTemplate(DataSet{ .signal_names = { signal_name }, .signal_units = { signal_unit }, .signal_ranges = { { T(signal_min), T(signal_max) } } }); - } - } - - return name_ && unit_ && min_ && max_; - } - void ensureHistorySize(std::size_t new_size) { const auto old_size = _history ? _history->capacity() : std::size_t{ 0 }; @@ -575,7 +541,7 @@ class DataSink : public Block> { void addListener(std::unique_ptr &&l, bool block) { - l->setDataSetTemplate(DataSet{ .signal_names = { signal_name }, .signal_units = { signal_unit }, .signal_ranges = { { T(signal_min), T(signal_max) } } }); + l->setDataSetTemplate(DataSet{ .signal_names = { signal_name }, .signal_units = { signal_unit }, .signal_ranges = { { static_cast(signal_min), static_cast(signal_max) } } }); l->applySampleRate(sample_rate); if (block) { _listeners.push_back(std::move(l)); diff --git a/blocks/basic/test/qa_DataSink.cpp b/blocks/basic/test/qa_DataSink.cpp index ff0d075be..82fb84803 100644 --- a/blocks/basic/test/qa_DataSink.cpp +++ b/blocks/basic/test/qa_DataSink.cpp @@ -251,8 +251,6 @@ const boost::ut::suite DataSinkTests = [] { Scheduler sched{ std::move(testGraph) }; sched.runAndWait(); - sink.stop(); // TODO the scheduler should call this - auto lg = std::lock_guard{ m2 }; expect(eq(chunksSeen1.load(), 201UZ)); expect(eq(chunksSeen2, 201UZ)); @@ -314,8 +312,6 @@ const boost::ut::suite DataSinkTests = [] { Scheduler sched{ std::move(testGraph) }; sched.runAndWait(); - sink.stop(); // TODO the scheduler should call this - const auto pollerAfterStop = DataSinkRegistry::instance().getStreamingPoller(DataSinkQuery::sinkName("test_sink")); expect(pollerAfterStop->finished.load()); } @@ -388,8 +384,6 @@ const boost::ut::suite DataSinkTests = [] { Scheduler sched{ std::move(testGraph) }; sched.runAndWait(); - sink.stop(); // TODO the scheduler should call this - const auto &[receivedData, receivedTags] = polling.get(); const auto expected_tags = { tags[0], tags[2] }; // triggers-only @@ -400,19 +394,79 @@ const boost::ut::suite DataSinkTests = [] { expect(eq(poller->drop_count.load(), 0UZ)); }; + "propagation of signal metadata per data set"_test = [] { + constexpr gr::Size_t kSamples = 20000000; + + gr::Graph testGraph; + auto &src = testGraph.emplaceBlock>( + { { "n_samples_max", kSamples }, { "mark_tag", false }, { "signal_name", "test signal" }, { "signal_unit", "no unit" }, { "signal_min", -2.f }, { "signal_max", 2.f } }); + const auto tags = std::vector{ { 19000000, { { "TYPE", "TRIGGER" } } } }; + src.tags = tags; + auto &sink = testGraph.emplaceBlock>(); + + expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); + + auto polling = std::async([] { + std::vector receivedData; + std::vector receivedTags; + bool seenFinished = false; + auto isTrigger = [](const Tag &) { return TriggerMatchResult::Matching; }; + std::shared_ptr::DataSetPoller> poller; + int tries = 0; + // wait until the signal name is propagated to the sink + while (!poller && tries < 100) { + poller = DataSinkRegistry::instance().getTriggerPoller(DataSinkQuery::signalName("test signal"), isTrigger, 0UZ, 2UZ, BlockingMode::Blocking); + if (!poller) { + tries++; + std::this_thread::sleep_for(std::chrono::milliseconds{ 1 }); + } + } + + expect(neq(poller, nullptr)); + if (!poller) { + return std::make_tuple(receivedData, receivedTags); + } + + while (!seenFinished) { + seenFinished = poller->finished; + [[maybe_unused]] auto r = poller->process([&receivedData, &receivedTags](const auto &datasets) { + for (const auto &dataset : datasets) { + receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end()); + // signal info from sink settings + expect(eq(dataset.signal_names.size(), 1u)); + expect(eq(dataset.signal_units.size(), 1u)); + expect(eq(dataset.signal_ranges.size(), 1u)); + expect(eq(dataset.timing_events.size(), 1u)); + expect(eq(dataset.signal_names[0], "test signal"s)); + expect(eq(dataset.signal_units[0], "no unit"s)); + expect(eq(dataset.signal_ranges[0], std::vector{ -2, +2 })); + expect(eq(dataset.timing_events[0].size(), 1u)); + expect(eq(dataset.timing_events[0][0].index, 0)); + receivedTags.insert(receivedTags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end()); + } + }); + } + return std::make_tuple(receivedData, receivedTags); + }); + + Scheduler sched{ std::move(testGraph) }; + sched.runAndWait(); + + const auto &[receivedData, receivedTags] = polling.get(); + expect(eq(receivedData, std::vector{ 19000000, 19000001 })); + }; + "blocking snapshot mode"_test = [] { constexpr gr::Size_t kSamples = 200000; gr::Graph testGraph; - auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples }, { "mark_tag", false } }); - src.tags = { { 0, - { { std::string(tag::SIGNAL_NAME.key()), "test signal" }, - { std::string(tag::SIGNAL_UNIT.key()), "none" }, - { std::string(tag::SIGNAL_MIN.key()), int32_t{ 0 } }, - { std::string(tag::SIGNAL_MAX.key()), kSamples - 1 } } }, - { 3000, { { "TYPE", "TRIGGER" } } }, - { 8000, { { "TYPE", "NO_TRIGGER" } } }, - { 180000, { { "TYPE", "TRIGGER" } } } }; + auto &src = testGraph.emplaceBlock>({ { "n_samples_max", kSamples }, + { "mark_tag", false }, + { "signal_name", "test signal" }, + { "signal_unit", "none" }, + { "signal_min", 0.f }, + { "signal_max", static_cast(kSamples - 1) } }); + src.tags = { { 3000, { { "TYPE", "TRIGGER" } } }, { 8000, { { "TYPE", "NO_TRIGGER" } } }, { 180000, { { "TYPE", "TRIGGER" } } } }; auto &sink = testGraph.emplaceBlock>({ { "name", "test_sink" }, { "sample_rate", 10000.f } }); expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink))); @@ -440,14 +494,14 @@ const boost::ut::suite DataSinkTests = [] { seenFinished = poller->finished; [[maybe_unused]] auto r = poller->process([&receivedData](const auto &datasets) { for (const auto &dataset : datasets) { - // signal info from tags + // signal info propagated from source to sink expect(eq(dataset.signal_names.size(), 1u)); expect(eq(dataset.signal_units.size(), 1u)); expect(eq(dataset.signal_ranges.size(), 1u)); expect(eq(dataset.timing_events.size(), 1u)); expect(eq(dataset.signal_names[0], "test signal"s)); expect(eq(dataset.signal_units[0], "none"s)); - expect(eq(dataset.signal_ranges[0], std::vector{ -1, +1 })); + expect(eq(dataset.signal_ranges[0], std::vector{ 0, kSamples - 1 })); expect(eq(dataset.timing_events[0].size(), 1u)); expect(eq(dataset.timing_events[0][0].index, -5000)); receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end()); @@ -461,8 +515,6 @@ const boost::ut::suite DataSinkTests = [] { Scheduler sched{ std::move(testGraph) }; sched.runAndWait(); - sink.stop(); // TODO the scheduler should call this - const auto receivedData = poller_result.get(); expect(eq(receivedDataCb, receivedData)); expect(eq(receivedData, std::vector{ 8000, 185000 })); @@ -543,8 +595,6 @@ const boost::ut::suite DataSinkTests = [] { Scheduler sched{ std::move(testGraph) }; sched.runAndWait(); - sink.stop(); // TODO the scheduler should call this - for (std::size_t i = 0; i < results.size(); ++i) { expect(eq(results[i].get(), expected[i])); expect(eq(resultsCb[i], expected[i])); @@ -596,8 +646,6 @@ const boost::ut::suite DataSinkTests = [] { Scheduler sched{ std::move(testGraph) }; sched.runAndWait(); - sink.stop(); // TODO the scheduler should call this - const auto &[receivedData, receivedTags] = polling.get(); auto expectedStart = std::vector{ 57000, 61999, 57001, 62000, 57002 }; expect(eq(poller->drop_count.load(), 0u)); @@ -638,8 +686,6 @@ const boost::ut::suite DataSinkTests = [] { Scheduler sched{ std::move(testGraph) }; sched.runAndWait(); - sink.stop(); // TODO the scheduler should call this - std::lock_guard lg{ m }; auto expectedStart = std::vector{ 57000, 61999, 57001, 62000, 57002 }; expect(eq(receivedData.size(), 2 * kTriggers)); @@ -680,8 +726,6 @@ const boost::ut::suite DataSinkTests = [] { Scheduler sched{ std::move(testGraph) }; sched.runAndWait(); - sink.stop(); // TODO the scheduler should call this - const auto samplesSeen = polling.get(); expect(eq(samplesSeen + poller->drop_count, static_cast(kSamples))); }; diff --git a/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp b/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp index cf49d0cd7..4abdb5a06 100644 --- a/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp +++ b/blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp @@ -1,6 +1,8 @@ #ifndef GNURADIO_TAGMONITORS_HPP #define GNURADIO_TAGMONITORS_HPP +#include + #include #include @@ -93,6 +95,9 @@ struct TagSource : public Block> { gr::Size_t n_samples_produced{ 0 }; float sample_rate = 1000.0f; std::string signal_name = "unknown signal"; + std::string signal_unit = "unknown unit"; + float signal_min = std::numeric_limits::lowest(); + float signal_max = std::numeric_limits::max(); bool verbose_console = false; bool mark_tag = true; // true: mark tagged samples with '1' or '0' otherwise. false: [0, 1, 2, ..., ] @@ -345,7 +350,8 @@ struct TagSink : public Block> { } // namespace gr::testing -ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSource), out, n_samples_max, sample_rate, signal_name, verbose_console, mark_tag); +ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSource), out, n_samples_max, sample_rate, signal_name, signal_unit, signal_min, signal_max, + verbose_console, mark_tag); ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagMonitor), in, out, n_samples_expected, sample_rate, signal_name, n_samples_produced, log_tags, log_samples, verbose_console, samples); ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSink), in, n_samples_expected, sample_rate, signal_name, n_samples_produced, log_tags,