Skip to content

Commit

Permalink
DataSink: Fix signal metadata propagation
Browse files Browse the repository at this point in the history
Use the automatic propagation of signal_name, signal_unit properties
etc. via default tags instead of trying to handle it manually, which
didn't work correctly.
  • Loading branch information
frankosterfeld committed Feb 22, 2024
1 parent 73bfcb8 commit cd6afff
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 72 deletions.
54 changes: 10 additions & 44 deletions blocks/basic/include/gnuradio-4.0/basic/DataSink.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ class DataSink : public Block<DataSink<T>> {
bool _listeners_finished = false;
std::mutex _listener_mutex;
std::optional<gr::HistoryBuffer<T>> _history;
bool _has_signal_info_from_settings = false;

public:
Annotated<float, "sample rate", Doc<"signal sample rate">, Unit<"Hz">> sample_rate = 1.f;
Expand Down Expand Up @@ -404,14 +403,16 @@ class DataSink : public Block<DataSink<T>> {
}

void
settingsChanged(const property_map &oldSettings, const property_map &newSettings) {
if (applySignalInfo(newSettings)) {
_has_signal_info_from_settings = true;
settingsChanged(const property_map &oldSettings, const property_map & /*newSettings*/) {
const auto oldSignalName = detail::getProperty<std::string>(oldSettings, "signal_name");
if (oldSignalName != signal_name) {
DataSinkRegistry::instance().updateSignalName(this, oldSignalName.value_or(""), signal_name);
}
const auto oldSignalName = detail::getProperty<std::string>(oldSettings, tag::SIGNAL_NAME.key());
const auto newSignalName = detail::getProperty<std::string>(newSettings, tag::SIGNAL_NAME.key());
if (oldSignalName != newSignalName) {
DataSinkRegistry::instance().updateSignalName(this, oldSignalName.value_or(""), newSignalName.value_or(""));
std::lock_guard lg{ _listener_mutex };
for (auto &listener : _listeners) {
listener->applySampleRate(sample_rate);
listener->setDataSetTemplate(
DataSet<T>{ .signal_names = { signal_name }, .signal_units = { signal_unit }, .signal_ranges = { { static_cast<T>(signal_min), static_cast<T>(signal_max) } } });
}
}

Expand Down Expand Up @@ -500,10 +501,6 @@ class DataSink : public Block<DataSink<T>> {
if (this->input_tags_present()) {
assert(this->mergedInputTag().index == 0);
tagData = this->mergedInputTag().map;
// signal info from settings overrides info from tags
if (!_has_signal_info_from_settings) {
applySignalInfo(this->mergedInputTag().map);
}
}

{
Expand All @@ -523,37 +520,6 @@ class DataSink : public Block<DataSink<T>> {
}

private:
bool
applySignalInfo(const property_map &properties) {
const auto rate_ = detail::getProperty<float>(properties, tag::SAMPLE_RATE.key());
const auto name_ = detail::getProperty<std::string>(properties, tag::SIGNAL_NAME.key());
const auto unit_ = detail::getProperty<std::string>(properties, tag::SIGNAL_UNIT.key());
const auto min_ = detail::getProperty<float>(properties, tag::SIGNAL_MIN.key());
const auto max_ = detail::getProperty<float>(properties, tag::SIGNAL_MAX.key());
if (!rate_ && !name_ && !unit_ && !min_ && !max_) {
return false;
}

sample_rate = rate_.value_or(sample_rate);
signal_name = name_.value_or(signal_name);
signal_unit = unit_.value_or(signal_unit);
signal_min = min_.value_or(signal_min);
signal_max = max_.value_or(signal_max);

// forward to listeners
for (auto &listener : _listeners) {
std::lock_guard lg{ _listener_mutex };
if (rate_) {
listener->applySampleRate(sample_rate);
}
if (name_ || unit_ || min_ || max_) {
listener->setDataSetTemplate(DataSet<T>{ .signal_names = { signal_name }, .signal_units = { signal_unit }, .signal_ranges = { { T(signal_min), T(signal_max) } } });
}
}

return name_ && unit_ && min_ && max_;
}

void
ensureHistorySize(std::size_t new_size) {
const auto old_size = _history ? _history->capacity() : std::size_t{ 0 };
Expand All @@ -575,7 +541,7 @@ class DataSink : public Block<DataSink<T>> {

void
addListener(std::unique_ptr<AbstractListener> &&l, bool block) {
l->setDataSetTemplate(DataSet<T>{ .signal_names = { signal_name }, .signal_units = { signal_unit }, .signal_ranges = { { T(signal_min), T(signal_max) } } });
l->setDataSetTemplate(DataSet<T>{ .signal_names = { signal_name }, .signal_units = { signal_unit }, .signal_ranges = { { static_cast<T>(signal_min), static_cast<T>(signal_max) } } });
l->applySampleRate(sample_rate);
if (block) {
_listeners.push_back(std::move(l));
Expand Down
98 changes: 71 additions & 27 deletions blocks/basic/test/qa_DataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,6 @@ const boost::ut::suite DataSinkTests = [] {
Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

sink.stop(); // TODO the scheduler should call this

auto lg = std::lock_guard{ m2 };
expect(eq(chunksSeen1.load(), 201UZ));
expect(eq(chunksSeen2, 201UZ));
Expand Down Expand Up @@ -314,8 +312,6 @@ const boost::ut::suite DataSinkTests = [] {
Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

sink.stop(); // TODO the scheduler should call this

const auto pollerAfterStop = DataSinkRegistry::instance().getStreamingPoller<float>(DataSinkQuery::sinkName("test_sink"));
expect(pollerAfterStop->finished.load());
}
Expand Down Expand Up @@ -388,8 +384,6 @@ const boost::ut::suite DataSinkTests = [] {
Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

sink.stop(); // TODO the scheduler should call this

const auto &[receivedData, receivedTags] = polling.get();
const auto expected_tags = { tags[0], tags[2] }; // triggers-only

Expand All @@ -400,19 +394,79 @@ const boost::ut::suite DataSinkTests = [] {
expect(eq(poller->drop_count.load(), 0UZ));
};

"propagation of signal metadata per data set"_test = [] {
constexpr gr::Size_t kSamples = 20000000;

gr::Graph testGraph;
auto &src = testGraph.emplaceBlock<gr::testing::TagSource<int32_t>>(
{ { "n_samples_max", kSamples }, { "mark_tag", false }, { "signal_name", "test signal" }, { "signal_unit", "no unit" }, { "signal_min", -2.f }, { "signal_max", 2.f } });
const auto tags = std::vector<Tag>{ { 19000000, { { "TYPE", "TRIGGER" } } } };
src.tags = tags;
auto &sink = testGraph.emplaceBlock<DataSink<int32_t>>();

expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink)));

auto polling = std::async([] {
std::vector<int32_t> receivedData;
std::vector<Tag> receivedTags;
bool seenFinished = false;
auto isTrigger = [](const Tag &) { return TriggerMatchResult::Matching; };
std::shared_ptr<DataSink<int32_t>::DataSetPoller> poller;
int tries = 0;
// wait until the signal name is propagated to the sink
while (!poller && tries < 100) {
poller = DataSinkRegistry::instance().getTriggerPoller<int32_t>(DataSinkQuery::signalName("test signal"), isTrigger, 0UZ, 2UZ, BlockingMode::Blocking);
if (!poller) {
tries++;
std::this_thread::sleep_for(std::chrono::milliseconds{ 1 });
}
}

expect(neq(poller, nullptr));
if (!poller) {
return std::make_tuple(receivedData, receivedTags);
}

while (!seenFinished) {
seenFinished = poller->finished;
[[maybe_unused]] auto r = poller->process([&receivedData, &receivedTags](const auto &datasets) {
for (const auto &dataset : datasets) {
receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end());
// signal info from sink settings
expect(eq(dataset.signal_names.size(), 1u));
expect(eq(dataset.signal_units.size(), 1u));
expect(eq(dataset.signal_ranges.size(), 1u));
expect(eq(dataset.timing_events.size(), 1u));
expect(eq(dataset.signal_names[0], "test signal"s));
expect(eq(dataset.signal_units[0], "no unit"s));
expect(eq(dataset.signal_ranges[0], std::vector{ -2, +2 }));
expect(eq(dataset.timing_events[0].size(), 1u));
expect(eq(dataset.timing_events[0][0].index, 0));
receivedTags.insert(receivedTags.end(), dataset.timing_events[0].begin(), dataset.timing_events[0].end());
}
});
}
return std::make_tuple(receivedData, receivedTags);
});

Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

const auto &[receivedData, receivedTags] = polling.get();
expect(eq(receivedData, std::vector<int32_t>{ 19000000, 19000001 }));
};

"blocking snapshot mode"_test = [] {
constexpr gr::Size_t kSamples = 200000;

gr::Graph testGraph;
auto &src = testGraph.emplaceBlock<gr::testing::TagSource<int32_t>>({ { "n_samples_max", kSamples }, { "mark_tag", false } });
src.tags = { { 0,
{ { std::string(tag::SIGNAL_NAME.key()), "test signal" },
{ std::string(tag::SIGNAL_UNIT.key()), "none" },
{ std::string(tag::SIGNAL_MIN.key()), int32_t{ 0 } },
{ std::string(tag::SIGNAL_MAX.key()), kSamples - 1 } } },
{ 3000, { { "TYPE", "TRIGGER" } } },
{ 8000, { { "TYPE", "NO_TRIGGER" } } },
{ 180000, { { "TYPE", "TRIGGER" } } } };
auto &src = testGraph.emplaceBlock<gr::testing::TagSource<int32_t>>({ { "n_samples_max", kSamples },
{ "mark_tag", false },
{ "signal_name", "test signal" },
{ "signal_unit", "none" },
{ "signal_min", 0.f },
{ "signal_max", static_cast<float>(kSamples - 1) } });
src.tags = { { 3000, { { "TYPE", "TRIGGER" } } }, { 8000, { { "TYPE", "NO_TRIGGER" } } }, { 180000, { { "TYPE", "TRIGGER" } } } };
auto &sink = testGraph.emplaceBlock<DataSink<int32_t>>({ { "name", "test_sink" }, { "sample_rate", 10000.f } });

expect(eq(ConnectionResult::SUCCESS, testGraph.connect<"out">(src).to<"in">(sink)));
Expand Down Expand Up @@ -440,14 +494,14 @@ const boost::ut::suite DataSinkTests = [] {
seenFinished = poller->finished;
[[maybe_unused]] auto r = poller->process([&receivedData](const auto &datasets) {
for (const auto &dataset : datasets) {
// signal info from tags
// signal info propagated from source to sink
expect(eq(dataset.signal_names.size(), 1u));
expect(eq(dataset.signal_units.size(), 1u));
expect(eq(dataset.signal_ranges.size(), 1u));
expect(eq(dataset.timing_events.size(), 1u));
expect(eq(dataset.signal_names[0], "test signal"s));
expect(eq(dataset.signal_units[0], "none"s));
expect(eq(dataset.signal_ranges[0], std::vector<int32_t>{ -1, +1 }));
expect(eq(dataset.signal_ranges[0], std::vector<int32_t>{ 0, kSamples - 1 }));
expect(eq(dataset.timing_events[0].size(), 1u));
expect(eq(dataset.timing_events[0][0].index, -5000));
receivedData.insert(receivedData.end(), dataset.signal_values.begin(), dataset.signal_values.end());
Expand All @@ -461,8 +515,6 @@ const boost::ut::suite DataSinkTests = [] {
Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

sink.stop(); // TODO the scheduler should call this

const auto receivedData = poller_result.get();
expect(eq(receivedDataCb, receivedData));
expect(eq(receivedData, std::vector<int32_t>{ 8000, 185000 }));
Expand Down Expand Up @@ -543,8 +595,6 @@ const boost::ut::suite DataSinkTests = [] {
Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

sink.stop(); // TODO the scheduler should call this

for (std::size_t i = 0; i < results.size(); ++i) {
expect(eq(results[i].get(), expected[i]));
expect(eq(resultsCb[i], expected[i]));
Expand Down Expand Up @@ -596,8 +646,6 @@ const boost::ut::suite DataSinkTests = [] {
Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

sink.stop(); // TODO the scheduler should call this

const auto &[receivedData, receivedTags] = polling.get();
auto expectedStart = std::vector<float>{ 57000, 61999, 57001, 62000, 57002 };
expect(eq(poller->drop_count.load(), 0u));
Expand Down Expand Up @@ -638,8 +686,6 @@ const boost::ut::suite DataSinkTests = [] {
Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

sink.stop(); // TODO the scheduler should call this

std::lock_guard lg{ m };
auto expectedStart = std::vector<float>{ 57000, 61999, 57001, 62000, 57002 };
expect(eq(receivedData.size(), 2 * kTriggers));
Expand Down Expand Up @@ -680,8 +726,6 @@ const boost::ut::suite DataSinkTests = [] {
Scheduler sched{ std::move(testGraph) };
sched.runAndWait();

sink.stop(); // TODO the scheduler should call this

const auto samplesSeen = polling.get();
expect(eq(samplesSeen + poller->drop_count, static_cast<std::size_t>(kSamples)));
};
Expand Down
8 changes: 7 additions & 1 deletion blocks/testing/include/gnuradio-4.0/testing/TagMonitors.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#ifndef GNURADIO_TAGMONITORS_HPP
#define GNURADIO_TAGMONITORS_HPP

#include <limits>

#include <fmt/format.h>
#include <fmt/ranges.h>

Expand Down Expand Up @@ -93,6 +95,9 @@ struct TagSource : public Block<TagSource<T, UseProcessVariant>> {
gr::Size_t n_samples_produced{ 0 };
float sample_rate = 1000.0f;
std::string signal_name = "unknown signal";
std::string signal_unit = "unknown unit";
float signal_min = std::numeric_limits<T>::lowest();
float signal_max = std::numeric_limits<T>::max();
bool verbose_console = false;
bool mark_tag = true; // true: mark tagged samples with '1' or '0' otherwise. false: [0, 1, 2, ..., ]

Expand Down Expand Up @@ -345,7 +350,8 @@ struct TagSink : public Block<TagSink<T, UseProcessVariant>> {

} // namespace gr::testing

ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSource<T, b>), out, n_samples_max, sample_rate, signal_name, verbose_console, mark_tag);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSource<T, b>), out, n_samples_max, sample_rate, signal_name, signal_unit, signal_min, signal_max,
verbose_console, mark_tag);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagMonitor<T, b>), in, out, n_samples_expected, sample_rate, signal_name, n_samples_produced, log_tags,
log_samples, verbose_console, samples);
ENABLE_REFLECTION_FOR_TEMPLATE_FULL((typename T, gr::testing::ProcessFunction b), (gr::testing::TagSink<T, b>), in, n_samples_expected, sample_rate, signal_name, n_samples_produced, log_tags,
Expand Down

0 comments on commit cd6afff

Please sign in to comment.