From 11e21467eb48a371438c0f7c3f1c82246b58ca95 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Thu, 4 Jul 2013 19:15:51 -0400 Subject: [PATCH 01/16] Implement "deadline" push/get operations on edges These methods are meant to be used in cases where a certain (realtime) frequency is required of a process. --- src/sprokit/pipeline/CMakeLists.txt | 5 +- src/sprokit/pipeline/edge.cxx | 183 +++++++++++++++++----------- src/sprokit/pipeline/edge.h | 25 ++++ 3 files changed, 139 insertions(+), 74 deletions(-) diff --git a/src/sprokit/pipeline/CMakeLists.txt b/src/sprokit/pipeline/CMakeLists.txt index 1a2762f9..7be7981a 100644 --- a/src/sprokit/pipeline/CMakeLists.txt +++ b/src/sprokit/pipeline/CMakeLists.txt @@ -153,11 +153,12 @@ sprokit_add_library(sprokit_pipeline SHARED ${pipeline_headers} ${pipeline_private_headers}) target_link_libraries(sprokit_pipeline - LINK_PRIVATE + LINK_PUBLIC ${Boost_CHRONO_LIBRARY} + ${Boost_SYSTEM_LIBRARY} + LINK_PRIVATE ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} - ${Boost_SYSTEM_LIBRARY} ${Boost_THREAD_LIBRARY} ${CMAKE_DL_LIBS} ${CMAKE_THREAD_LIBS_INIT}) diff --git a/src/sprokit/pipeline/edge.cxx b/src/sprokit/pipeline/edge.cxx index 20507372..6633cac6 100644 --- a/src/sprokit/pipeline/edge.cxx +++ b/src/sprokit/pipeline/edge.cxx @@ -37,6 +37,8 @@ #include #include #include +#include +#include #include #include @@ -88,10 +90,12 @@ class edge::priv typedef boost::weak_ptr process_ref_t; - bool has_data() const; bool full_of_data() const; void complete_check() const; + bool push(edge_datum_t const& datum, boost::optional const& duration = boost::none); + boost::optional pop(boost::optional const& duration = boost::none); + bool const depends; size_t const capacity; bool downstream_complete; @@ -151,7 +155,7 @@ ::has_data() const (void)lock; - return d->has_data(); + return !d->q.empty(); } bool @@ -180,67 +184,14 @@ void edge ::push_datum(edge_datum_t const& datum) { - { - priv::shared_lock_t const lock(d->complete_mutex); - - (void)lock; - - if (d->downstream_complete) - { - return; - } - } - - { - priv::upgrade_lock_t lock(d->mutex); - - while (d->full_of_data()) - { - d->cond_have_space.wait(lock); - } - - { - priv::upgrade_to_unique_lock_t const write_lock(lock); - - (void)write_lock; - - d->q.push_back(datum); - } - } - - d->cond_have_data.notify_one(); + d->push(datum); } edge_datum_t edge ::get_datum() { - d->complete_check(); - - edge_datum_t dat; - - { - priv::upgrade_lock_t lock(d->mutex); - - while (!d->has_data()) - { - d->cond_have_data.wait(lock); - } - - dat = d->q.front(); - - { - priv::upgrade_to_unique_lock_t const write_lock(lock); - - (void)write_lock; - - d->q.pop_front(); - } - } - - d->cond_have_space.notify_one(); - - return dat; + return *d->pop(); } edge_datum_t @@ -251,10 +202,8 @@ ::peek_datum(size_t idx) const priv::shared_lock_t lock(d->mutex); - while (d->q.size() <= idx) - { - d->cond_have_data.wait(lock); - } + d->cond_have_data.wait(lock, + boost::bind(&priv::edge_queue_t::size, &d->q) > idx); return d->q.at(idx); } @@ -268,10 +217,8 @@ ::pop_datum() { priv::upgrade_lock_t lock(d->mutex); - while (!d->has_data()) - { - d->cond_have_data.wait(lock); - } + d->cond_have_data.wait(lock, + !boost::bind(&priv::edge_queue_t::empty, &d->q)); { priv::upgrade_to_unique_lock_t const write_lock(lock); @@ -285,6 +232,20 @@ ::pop_datum() d->cond_have_space.notify_one(); } +bool +edge +::try_push_datum(edge_datum_t const& datum, duration_t const& duration) +{ + return d->push(datum, duration); +} + +boost::optional +edge +::try_get_datum(duration_t const& duration) +{ + return d->pop(duration); +} + void edge ::mark_downstream_as_complete() @@ -374,13 +335,6 @@ edge::priv { } -bool -edge::priv -::has_data() const -{ - return !q.empty(); -} - bool edge::priv ::full_of_data() const @@ -407,4 +361,89 @@ ::complete_check() const } } +bool +edge::priv +::push(edge_datum_t const& datum, boost::optional const& duration) +{ + { + shared_lock_t const lock(complete_mutex); + + (void)lock; + + if (downstream_complete) + { + return true; + } + } + + { + upgrade_lock_t lock(mutex); + boost::function const predicate = !boost::bind(&sprokit::edge::priv::full_of_data, this); + + if (duration) + { + if (!cond_have_space.wait_for(lock, *duration, predicate)) + { + return false; + } + } + else + { + cond_have_space.wait(lock, predicate); + } + + { + upgrade_to_unique_lock_t const write_lock(lock); + + (void)write_lock; + + q.push_back(datum); + } + } + + cond_have_data.notify_one(); + + return true; +} + +boost::optional +edge::priv +::pop(boost::optional const& duration) +{ + complete_check(); + + edge_datum_t dat; + + { + upgrade_lock_t lock(mutex); + boost::function const predicate = !boost::bind(&edge_queue_t::empty, &q); + + if (duration) + { + if (!cond_have_data.wait_for(lock, *duration, predicate)) + { + return boost::none; + } + } + else + { + cond_have_data.wait(lock, predicate); + } + + dat = q.front(); + + { + upgrade_to_unique_lock_t const write_lock(lock); + + (void)write_lock; + + q.pop_front(); + } + } + + cond_have_space.notify_one(); + + return dat; +} + } diff --git a/src/sprokit/pipeline/edge.h b/src/sprokit/pipeline/edge.h index 55b2a48c..bb7fc8c7 100644 --- a/src/sprokit/pipeline/edge.h +++ b/src/sprokit/pipeline/edge.h @@ -36,8 +36,10 @@ #include "config.h" #include "types.h" +#include #include #include +#include #include #include @@ -234,6 +236,29 @@ class SPROKIT_PIPELINE_EXPORT edge */ void pop_datum(); + typedef boost::chrono::high_resolution_clock clock_t; + typedef clock_t::duration duration_t; + + /** + * \brief Push a datum into the edge. + * + * \see push_datum + * + * \param datum The datum to put into the edge. + * \param duration The maximum amount of time to wait. + */ + bool try_push_datum(edge_datum_t const& datum, duration_t const& duration); + /** + * \brief Extract a datum from the edge or fail if a timeout is reached. + * + * \see get_datum + * + * \param duration The maximum amount of time to wait. + * + * \returns The next datum available from the edge, or \c boost::none if the timeout was reached. + */ + boost::optional try_get_datum(duration_t const& duration); + /** * \brief Trigger the edge to flush all data and not accept any more data. * From 21439fbefcd114168347d59ae49b7d1e39508050 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Tue, 9 Jul 2013 12:39:45 -0400 Subject: [PATCH 02/16] Factor out checking for a time --- tests/sprokit/pipeline/test_edge.cxx | 46 ++++++++++++++++++---------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/tests/sprokit/pipeline/test_edge.cxx b/tests/sprokit/pipeline/test_edge.cxx index a6af9e99..34df707c 100644 --- a/tests/sprokit/pipeline/test_edge.cxx +++ b/tests/sprokit/pipeline/test_edge.cxx @@ -395,6 +395,17 @@ IMPLEMENT_TEST(get_data_from_complete) "popping data from a complete edge"); } +namespace +{ + +// This clock is used because it is both steady (which rules out system_clock) +// and uses the wall time (which rules out thread_clock). +typedef boost::chrono::process_real_cpu_clock time_clock_t; +typedef time_clock_t::time_point time_point_t; +typedef time_clock_t::duration duration_t; + +} + #define SECONDS_TO_WAIT 1 #define WAIT_DURATION boost::chrono::seconds(SECONDS_TO_WAIT) @@ -452,15 +463,11 @@ IMPLEMENT_TEST(capacity) } } +static void check_time(duration_t const& actual, duration_t const& expected, char const* const message); + void push_datum(sprokit::edge_t edge, sprokit::edge_datum_t edat) { - // This clock is used because it is both steady (which rules out system_clock) - // and uses the wall time (which rules out thread_clock). - typedef boost::chrono::process_real_cpu_clock time_clock_t; - typedef time_clock_t::time_point time_point_t; - typedef time_clock_t::duration duration_t; - time_point_t const start = time_clock_t::now(); // This should be blocking. @@ -470,20 +477,27 @@ push_datum(sprokit::edge_t edge, sprokit::edge_datum_t edat) duration_t const duration = end - start; - static double const tolerance = 0.75; + check_time(duration, WAIT_DURATION, "pushing into a full edge"); - if (duration < (tolerance * WAIT_DURATION)) + if (edge->datum_count() != 1) { - TEST_ERROR("It seems as though blocking did not " - "occur when pushing into a full edge: " - "expected to wait between " - << tolerance * WAIT_DURATION << " and " - << WAIT_DURATION << ", but waited for " - << duration << " instead"); + TEST_ERROR("A datum was pushed into a full edge"); } +} - if (edge->datum_count() != 1) +void +check_time(duration_t const& actual, duration_t const& expected, char const* const message) +{ + static double const tolerance = 0.75; + boost::chrono::duration const allowed = tolerance * WAIT_DURATION; + + if (actual < allowed) { - TEST_ERROR("A datum was pushed into a full edge"); + TEST_ERROR("It seems as though blocking did not " + "occur when " << message << ": " + "expected to wait between " + << allowed << " and " + << expected << ", but waited for " + << actual << " instead"); } } From 75a9fa813ecc5604b996520a32ad77462905fb35 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Thu, 4 Jul 2013 19:30:01 -0400 Subject: [PATCH 03/16] Implement try_* method tests for an edge --- tests/sprokit/pipeline/test_edge.cxx | 70 ++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/tests/sprokit/pipeline/test_edge.cxx b/tests/sprokit/pipeline/test_edge.cxx index 34df707c..cea132e8 100644 --- a/tests/sprokit/pipeline/test_edge.cxx +++ b/tests/sprokit/pipeline/test_edge.cxx @@ -411,6 +411,7 @@ typedef time_clock_t::duration duration_t; static void push_datum(sprokit::edge_t edge, sprokit::edge_datum_t edat); +TEST_PROPERTY(TIMEOUT, 5) IMPLEMENT_TEST(capacity) { sprokit::config_t const config = sprokit::config::empty_config(); @@ -465,6 +466,75 @@ IMPLEMENT_TEST(capacity) static void check_time(duration_t const& actual, duration_t const& expected, char const* const message); +TEST_PROPERTY(TIMEOUT, 5) +IMPLEMENT_TEST(try_push_datum) +{ + sprokit::config_t const config = sprokit::config::empty_config(); + + sprokit::config::value_t const value_capacity = boost::lexical_cast(1); + + config->set_value(sprokit::edge::config_capacity, value_capacity); + + sprokit::edge_t const edge = boost::make_shared(config); + + sprokit::stamp::increment_t const inc = sprokit::stamp::increment_t(1); + + sprokit::datum_t const dat1 = sprokit::datum::empty_datum(); + sprokit::datum_t const dat2 = sprokit::datum::complete_datum(); + sprokit::stamp_t const stamp1 = sprokit::stamp::new_stamp(inc); + sprokit::stamp_t const stamp2 = sprokit::stamp::incremented_stamp(stamp1); + + sprokit::edge_datum_t const edat1 = sprokit::edge_datum_t(dat1, stamp1); + sprokit::edge_datum_t const edat2 = sprokit::edge_datum_t(dat2, stamp2); + + // Fill the edge. + edge->push_datum(edat1); + + time_point_t const start = time_clock_t::now(); + + // This should be blocking. + bool const pushed = edge->try_push_datum(edat2, WAIT_DURATION); + + time_point_t const end = time_clock_t::now(); + + if (pushed) + { + TEST_ERROR("Returned true when a push should have timed out"); + } + + duration_t const duration = end - start; + + check_time(duration, WAIT_DURATION, "trying to get a datum from an edge"); + + // Make sure the edge still is at capacity. + if (edge->datum_count() != 1) + { + TEST_ERROR("A datum was pushed into a full edge"); + } +} + +TEST_PROPERTY(TIMEOUT, 5) +IMPLEMENT_TEST(try_get_datum) +{ + sprokit::edge_t const edge = boost::make_shared(); + + time_point_t const start = time_clock_t::now(); + + // This should be blocking. + boost::optional const opt_datum = edge->try_get_datum(WAIT_DURATION); + + time_point_t const end = time_clock_t::now(); + + if (opt_datum) + { + TEST_ERROR("Returned a datum from an empty edge"); + } + + duration_t const duration = end - start; + + check_time(duration, WAIT_DURATION, "trying to get a datum from an edge"); +} + void push_datum(sprokit::edge_t edge, sprokit::edge_datum_t edat) { From ca7f2c3a305da880f7f0797502a13aad59ab280d Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Tue, 9 Jul 2013 13:17:02 -0400 Subject: [PATCH 04/16] Add bindings for edge::try_* methods --- src/bindings/python/sprokit/pipeline/edge.cxx | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/src/bindings/python/sprokit/pipeline/edge.cxx b/src/bindings/python/sprokit/pipeline/edge.cxx index 9841e3fb..2eefe03c 100644 --- a/src/bindings/python/sprokit/pipeline/edge.cxx +++ b/src/bindings/python/sprokit/pipeline/edge.cxx @@ -1,5 +1,5 @@ /*ckwg +29 - * Copyright 2011-2012 by Kitware, Inc. + * Copyright 2011-2013 by Kitware, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -32,8 +32,10 @@ #include #include +#include #include +#include #include #include #include @@ -46,6 +48,9 @@ using namespace boost::python; +static bool edge_try_push_datum(sprokit::edge_t const& self, sprokit::edge_datum_t const& datum, double duration); +static boost::optional edge_try_get_datum(sprokit::edge_t const& self, double duration); + BOOST_PYTHON_MODULE(edge) { class_("EdgeDatum" @@ -64,6 +69,8 @@ BOOST_PYTHON_MODULE(edge) .def(vector_indexing_suite()) ; + sprokit::python::register_optional_converter("EdgeDatumOpt", "An optional edge datum."); + class_("Edge" , "A communication channel between processes." , no_init) @@ -87,6 +94,12 @@ BOOST_PYTHON_MODULE(edge) , "Returns the next datum packet from the edge.") .def("pop_datum", &sprokit::edge::pop_datum , "Remove the next datum packet from the edge.") + .def("try_push_datum", &edge_try_push_datum + , (arg("datum"), arg("duration")) + , "Pushes a datum packet into the edge and returns True or returns False if unable to meet the duration.") + .def("try_get_datum", &edge_try_get_datum + , (arg("duration")) + , "Returns the next datum packet from the edge, removing it in the process or None if unable to meet the duration.") .def("set_upstream_process", &sprokit::edge::set_upstream_process , (arg("process")) , "Set the process which is feeding data into the edge.") @@ -101,3 +114,28 @@ BOOST_PYTHON_MODULE(edge) .def_readonly("config_capacity", &sprokit::edge::config_capacity) ; } + +namespace +{ + +typedef boost::chrono::duration py_duration_t; + +} + +bool +edge_try_push_datum(sprokit::edge_t const& self, sprokit::edge_datum_t const& datum, double duration) +{ + py_duration_t const duration_sec = py_duration_t(duration); + sprokit::edge::duration_t const duration_edge = boost::chrono::duration_cast(duration_sec); + + return self->try_push_datum(datum, duration_edge); +} + +boost::optional +edge_try_get_datum(sprokit::edge_t const& self, double duration) +{ + py_duration_t const duration_sec = py_duration_t(duration); + sprokit::edge::duration_t const duration_edge = boost::chrono::duration_cast(duration_sec); + + return self->try_get_datum(duration_edge); +} From 4b2254ac25423ca74319ff3e40d7cc8bcf0225c5 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Tue, 9 Jul 2013 13:17:19 -0400 Subject: [PATCH 05/16] Test the try_* method bindings --- tests/bindings/python/sprokit/pipeline/test-edge.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tests/bindings/python/sprokit/pipeline/test-edge.py b/tests/bindings/python/sprokit/pipeline/test-edge.py index 2ce6d46c..6f96cbe4 100755 --- a/tests/bindings/python/sprokit/pipeline/test-edge.py +++ b/tests/bindings/python/sprokit/pipeline/test-edge.py @@ -60,6 +60,7 @@ def test_datum_create(): edge.EdgeData() +# TEST_PROPERTY(TIMEOUT, 5) def test_api_calls(): from sprokit.pipeline import config from sprokit.pipeline import datum @@ -88,6 +89,17 @@ def test_api_calls(): e.peek_datum() e.pop_datum() + wait = 1 + fail = e.try_get_datum(wait) + e.try_push_datum(ed, wait) + succeed = e.try_get_datum(wait) + + if fail is not None: + test_error("Received a datum when that should have timed out") + + if succeed is None: + test_error("Did not receive a datum from a get that should have succeeded") + modules.load_known_modules() reg = process_registry.ProcessRegistry.self() From 9bfd9ae2027b6c43e335944af9fb7f03f5a828a6 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Tue, 9 Jul 2013 13:17:32 -0400 Subject: [PATCH 06/16] Test peek_datum with an index --- tests/bindings/python/sprokit/pipeline/test-edge.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/bindings/python/sprokit/pipeline/test-edge.py b/tests/bindings/python/sprokit/pipeline/test-edge.py index 6f96cbe4..84facb1d 100755 --- a/tests/bindings/python/sprokit/pipeline/test-edge.py +++ b/tests/bindings/python/sprokit/pipeline/test-edge.py @@ -87,6 +87,9 @@ def test_api_calls(): e.push_datum(ed) e.peek_datum() + e.push_datum(ed) + e.peek_datum(1) + e.pop_datum() e.pop_datum() wait = 1 From dc989004c679d4ebad74a020ff915c485983d8e7 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Thu, 27 Feb 2014 18:08:05 -0500 Subject: [PATCH 07/16] Add tests for edge_datum equality --- src/bindings/python/sprokit/pipeline/edge.cxx | 8 +++ .../python/sprokit/pipeline/test-edge.py | 2 + tests/sprokit/pipeline/test_edge.cxx | 66 +++++++++++++++++++ 3 files changed, 76 insertions(+) diff --git a/src/bindings/python/sprokit/pipeline/edge.cxx b/src/bindings/python/sprokit/pipeline/edge.cxx index 2eefe03c..b2e1833e 100644 --- a/src/bindings/python/sprokit/pipeline/edge.cxx +++ b/src/bindings/python/sprokit/pipeline/edge.cxx @@ -48,6 +48,7 @@ using namespace boost::python; +static bool edge_datum_eq(sprokit::edge_datum_t const& self, sprokit::edge_datum_t const& rhs); static bool edge_try_push_datum(sprokit::edge_t const& self, sprokit::edge_datum_t const& datum, double duration); static boost::optional edge_try_get_datum(sprokit::edge_t const& self, double duration); @@ -57,6 +58,7 @@ BOOST_PYTHON_MODULE(edge) , no_init) .def(init<>()) .def(init()) + .def("__eq__", &edge_datum_eq) .def_readwrite("datum", &sprokit::edge_datum_t::datum) .def_readwrite("stamp", &sprokit::edge_datum_t::stamp) ; @@ -115,6 +117,12 @@ BOOST_PYTHON_MODULE(edge) ; } +bool +edge_datum_eq(sprokit::edge_datum_t const& self, sprokit::edge_datum_t const& rhs) +{ + return (self == rhs); +} + namespace { diff --git a/tests/bindings/python/sprokit/pipeline/test-edge.py b/tests/bindings/python/sprokit/pipeline/test-edge.py index 84facb1d..810919da 100755 --- a/tests/bindings/python/sprokit/pipeline/test-edge.py +++ b/tests/bindings/python/sprokit/pipeline/test-edge.py @@ -92,6 +92,8 @@ def test_api_calls(): e.pop_datum() e.pop_datum() + ed == ed + wait = 1 fail = e.try_get_datum(wait) e.try_push_datum(ed, wait) diff --git a/tests/sprokit/pipeline/test_edge.cxx b/tests/sprokit/pipeline/test_edge.cxx index cea132e8..870b5ad1 100644 --- a/tests/sprokit/pipeline/test_edge.cxx +++ b/tests/sprokit/pipeline/test_edge.cxx @@ -63,6 +63,72 @@ main(int argc, char* argv[]) RUN_TEST(testname); } +IMPLEMENT_TEST(edge_datum_equal) +{ + sprokit::edge_datum_t edat1 = sprokit::edge_datum_t(); + sprokit::edge_datum_t edat2 = sprokit::edge_datum_t(); + + if (edat1 != edat2) + { + TEST_ERROR("Empty edge data are not equivalent"); + } + + edat1.stamp = sprokit::stamp::new_stamp(1); + edat2.stamp = sprokit::stamp::new_stamp(1); + + if (edat1 != edat2) + { + TEST_ERROR("Edge data with just a new stamp are not equivalent"); + } + + edat1.stamp = sprokit::stamp_t(); + edat2.stamp = sprokit::stamp_t(); + + sprokit::datum_t const dat = sprokit::datum::complete_datum(); + + edat1.datum = dat; + edat2.datum = dat; + + if (edat1 != edat2) + { + TEST_ERROR("Edge data with just the same datum are not equivalent"); + } + + edat1.stamp = sprokit::stamp_t(); + edat2.stamp = sprokit::stamp_t(); + + if (edat1 != edat2) + { + TEST_ERROR("Edge data with just the same datum and new stamps are not equivalent"); + } + + edat1.stamp = sprokit::stamp::new_stamp(1); + edat1.stamp = sprokit::stamp::incremented_stamp(edat1.stamp); + + if (edat1 == edat2) + { + TEST_ERROR("Edge data with the same datum, but different stamps are equivalent"); + } + + edat1.stamp = sprokit::stamp::new_stamp(1); + + sprokit::datum_t const dat2 = sprokit::datum::complete_datum(); + + edat1.datum = dat2; + + if (edat1 == edat2) + { + TEST_ERROR("Edge data with the same stamp, but different data (of the same type) are equivalent"); + } + + edat1.stamp = sprokit::stamp::incremented_stamp(edat1.stamp); + + if (edat1 == edat2) + { + TEST_ERROR("Edge data with different stamps and data are equivalent"); + } +} + IMPLEMENT_TEST(null_config) { sprokit::config_t const config; From 06e43c4743b38309e8253f69f22556e394f0f81e Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Fri, 19 Jul 2013 20:14:00 -0400 Subject: [PATCH 08/16] Handle NULL in edge_datum comparisons --- src/sprokit/pipeline/edge.cxx | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/src/sprokit/pipeline/edge.cxx b/src/sprokit/pipeline/edge.cxx index 6633cac6..a1772faa 100644 --- a/src/sprokit/pipeline/edge.cxx +++ b/src/sprokit/pipeline/edge.cxx @@ -75,8 +75,22 @@ bool edge_datum_t ::operator == (edge_datum_t const& rhs) const { - return (( datum == rhs.datum) && - (*stamp == *rhs.stamp)); + if (datum != rhs.datum) + { + return false; + } + + if (stamp == stamp) + { + return true; + } + + if (!stamp || !datum) + { + return false; + } + + return (*stamp == *rhs.stamp); } config::key_t const edge::config_dependency = config::key_t("_dependency"); From ffd8a7bb5f4015d81dff6fe2de17314258a3b428 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Tue, 23 Jul 2013 13:40:14 -0400 Subject: [PATCH 09/16] Implement equality operator for datum --- src/sprokit/pipeline/datum.cxx | 57 ++++++++++++++++++++++++++++++++++ src/sprokit/pipeline/datum.h | 15 +++++++++ 2 files changed, 72 insertions(+) diff --git a/src/sprokit/pipeline/datum.cxx b/src/sprokit/pipeline/datum.cxx index 30c65d4c..62d9de77 100644 --- a/src/sprokit/pipeline/datum.cxx +++ b/src/sprokit/pipeline/datum.cxx @@ -89,6 +89,46 @@ ::get_error() const return m_error; } +static bool any_equal(boost::any const& a, boost::any const& b); + +bool +datum +::operator == (datum const& dat) const +{ + if (this == &dat) + { + return true; + } + + if (m_type != dat.m_type) + { + return false; + } + + bool ret = false; + + switch (m_type) + { + case data: + ret = any_equal(m_datum, dat.m_datum); + break; + case empty: + case flush: + case complete: + ret = true; + break; + case error: + ret = (m_error == dat.m_error); + break; + case invalid: + default: + ret = false; + break; + } + + return ret; +} + datum ::datum(type_t ty) : m_type(ty) @@ -167,6 +207,23 @@ bad_datum_cast_exception { } +bool +any_equal(boost::any const& a, boost::any const& b) +{ + if (a.empty() && b.empty()) + { + return true; + } + + if (a.type() != b.type()) + { + return false; + } + + // Be safe. + return false; +} + char const* string_for_type(datum::type_t type) { diff --git a/src/sprokit/pipeline/datum.h b/src/sprokit/pipeline/datum.h index ec57319d..1e4f7ae0 100644 --- a/src/sprokit/pipeline/datum.h +++ b/src/sprokit/pipeline/datum.h @@ -36,6 +36,7 @@ #include "types.h" #include +#include #include @@ -56,6 +57,7 @@ namespace sprokit * \ingroup base_classes */ class SPROKIT_PIPELINE_EXPORT datum + : boost::equality_comparable { public: /// Information about an error that occurred within a process. @@ -148,6 +150,19 @@ class SPROKIT_PIPELINE_EXPORT datum */ template T get_datum() const; + + /** + * \brief Compare two data for equality. + * + * \note This returns false for two data packets which point to the same + * internal data since \c boost::any does not give access to it without + * knowing the type. + * + * \param dat The datum to compare to. + * + * \returns True if \p dat and \c *this definitely have the same value, false otherwise. + */ + bool operator == (datum const& dat) const; private: SPROKIT_PIPELINE_NO_EXPORT datum(type_t ty); SPROKIT_PIPELINE_NO_EXPORT datum(error_t const& err); From 7fded3863474fb0bb171c194152d76f8e8ddea16 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Tue, 23 Jul 2013 13:40:35 -0400 Subject: [PATCH 10/16] Factor out comparing pointers in edge_datum --- src/sprokit/pipeline/edge.cxx | 38 ++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/sprokit/pipeline/edge.cxx b/src/sprokit/pipeline/edge.cxx index a1772faa..1de323ca 100644 --- a/src/sprokit/pipeline/edge.cxx +++ b/src/sprokit/pipeline/edge.cxx @@ -71,26 +71,15 @@ edge_datum_t { } +template +static bool pointers_equal(T const& a, T const& b); + bool edge_datum_t ::operator == (edge_datum_t const& rhs) const { - if (datum != rhs.datum) - { - return false; - } - - if (stamp == stamp) - { - return true; - } - - if (!stamp || !datum) - { - return false; - } - - return (*stamp == *rhs.stamp); + return (pointers_equal(datum, rhs.datum) && + pointers_equal(stamp, rhs.stamp)); } config::key_t const edge::config_dependency = config::key_t("_dependency"); @@ -460,4 +449,21 @@ ::pop(boost::optional const& duration) return dat; } +template +bool +pointers_equal(T const& a, T const& b) +{ + if (a == b) + { + return true; + } + + if (!a || !b) + { + return false; + } + + return (*a == *b); +} + } From 6e586589a8ac2bc57de17c6b5096574a1994c4ec Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Thu, 27 Feb 2014 15:14:41 -0500 Subject: [PATCH 11/16] Add Python bindings for datum equality --- src/bindings/python/sprokit/pipeline/datum.cxx | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/bindings/python/sprokit/pipeline/datum.cxx b/src/bindings/python/sprokit/pipeline/datum.cxx index db989ed4..96fca3bf 100644 --- a/src/bindings/python/sprokit/pipeline/datum.cxx +++ b/src/bindings/python/sprokit/pipeline/datum.cxx @@ -1,5 +1,5 @@ /*ckwg +29 - * Copyright 2011-2012 by Kitware, Inc. + * Copyright 2011-2013 by Kitware, Inc. * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -59,6 +59,7 @@ static sprokit::datum_t new_datum(object const& obj); static sprokit::datum::type_t datum_type(sprokit::datum_t const& self); static sprokit::datum::error_t datum_get_error(sprokit::datum_t const& self); static object datum_get_datum(sprokit::datum_t const& self); +static bool datum_eq(sprokit::datum_t const& self, sprokit::datum_t const& other); BOOST_PYTHON_MODULE(datum) { @@ -97,6 +98,7 @@ BOOST_PYTHON_MODULE(datum) , "The error contained within the datum packet.") .def("get_datum", &datum_get_datum , "Get the data contained within the packet.") + .def("__eq__", &datum_eq) ; sprokit::python::register_type(0); @@ -147,3 +149,9 @@ datum_get_datum(sprokit::datum_t const& self) return object(any); } + +bool +datum_eq(sprokit::datum_t const& self, sprokit::datum_t const& other) +{ + return (*self == *other); +} From 8558c7d497b75bbdf1e16212be97c3977090ca1a Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Tue, 23 Jul 2013 13:41:27 -0400 Subject: [PATCH 12/16] Move any extraction from the Python GIL --- src/bindings/python/sprokit/pipeline/datum.cxx | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/bindings/python/sprokit/pipeline/datum.cxx b/src/bindings/python/sprokit/pipeline/datum.cxx index 96fca3bf..bb149c5e 100644 --- a/src/bindings/python/sprokit/pipeline/datum.cxx +++ b/src/bindings/python/sprokit/pipeline/datum.cxx @@ -141,12 +141,12 @@ datum_get_error(sprokit::datum_t const& self) object datum_get_datum(sprokit::datum_t const& self) { + boost::any const any = self->get_datum(); + sprokit::python::python_gil const gil; (void)gil; - boost::any const any = self->get_datum(); - return object(any); } From 02960ab5054037107487f3b731ef92ec92cce34e Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Tue, 23 Jul 2013 13:41:50 -0400 Subject: [PATCH 13/16] Add a test for datum equality --- tests/sprokit/pipeline/test_datum.cxx | 104 ++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/tests/sprokit/pipeline/test_datum.cxx b/tests/sprokit/pipeline/test_datum.cxx index c9d3d6f9..e56e4439 100644 --- a/tests/sprokit/pipeline/test_datum.cxx +++ b/tests/sprokit/pipeline/test_datum.cxx @@ -149,3 +149,107 @@ IMPLEMENT_TEST(new) dat->get_datum(), "retrieving an int as a string"); } + +IMPLEMENT_TEST(equality) +{ + sprokit::datum_t const empty1 = sprokit::datum::empty_datum(); + sprokit::datum_t const empty2 = sprokit::datum::empty_datum(); + sprokit::datum_t const flush1 = sprokit::datum::flush_datum(); + sprokit::datum_t const flush2 = sprokit::datum::flush_datum(); + sprokit::datum_t const complete1 = sprokit::datum::complete_datum(); + sprokit::datum_t const complete2 = sprokit::datum::complete_datum(); + + sprokit::datum::error_t const errora = sprokit::datum::error_t("An error"); + sprokit::datum::error_t const errorb = sprokit::datum::error_t("Another error"); + + sprokit::datum_t const error1a = sprokit::datum::error_datum(errora); + sprokit::datum_t const error2a = sprokit::datum::error_datum(errora); + sprokit::datum_t const error1b = sprokit::datum::error_datum(errorb); + sprokit::datum_t const error2b = sprokit::datum::error_datum(errorb); + + boost::any const dummy1 = boost::any(); + boost::any const dummy2 = boost::any(); + boost::any const in_value1 = boost::any(1); + boost::any const in_value2 = boost::any(2); + sprokit::datum_t const value_dummy1 = sprokit::datum::new_datum(dummy1); + sprokit::datum_t const value_dummy2 = sprokit::datum::new_datum(dummy2); + sprokit::datum_t const value1 = sprokit::datum::new_datum(in_value1); + sprokit::datum_t const value2a = sprokit::datum::new_datum(in_value2); + sprokit::datum_t const value2b = sprokit::datum::new_datum(in_value2); + +#define test_equality(a, b, type, desc) \ + do \ + { \ + if (*a != *b) \ + { \ + TEST_ERROR("Expected a datum with " \ + "type " type " to be " \ + "equal: " desc); \ + } \ + } while (false) + +#define test_self_equality(a, type) \ + test_equality(a, a, type, "self comparison") + + test_self_equality(empty1, "empty"); + test_equality(empty1, empty2, "empty", "all empty data are equivalent"); + + test_self_equality(flush1, "flush"); + test_equality(flush1, flush2, "flush", "all flush data are equivalent"); + + test_self_equality(complete1, "complete"); + test_equality(complete1, complete2, "complete", "all complete data are equivalent"); + + test_self_equality(error1a, "error"); + test_equality(error1a, error2a, "error", "all error data with the same error string are equivalent"); + + test_self_equality(error1b, "error"); + test_equality(error1b, error2b, "error", "all error data with the same error string are equivalent"); + + test_self_equality(value_dummy1, "data"); + test_equality(value_dummy1, value_dummy2, "data", "empty internal data"); + + test_self_equality(value1, "data"); + /// \todo Is this possible? + //test_equality(value2a, value2b, "data", "same internal data value"); + +#undef test_self_equality +#undef test_equality + +#define test_inequality(a, b, atype, btype, desc) \ + do \ + { \ + if (*a == *b) \ + { \ + TEST_ERROR("Expected a datum with type " \ + atype " to be unequal to a " \ + "with type " btype ": " desc); \ + } \ + } while (false) + + test_inequality(empty1, flush1, "empty", "flush", "different types"); + test_inequality(empty1, complete1, "empty", "complete", "different types"); + test_inequality(empty1, error1a, "empty", "error", "different types"); + test_inequality(empty1, error1b, "empty", "error", "different types"); + test_inequality(empty1, value_dummy1, "empty", "data", "different types"); + test_inequality(empty1, value1, "empty", "data", "different types"); + + test_inequality(flush1, complete1, "flush", "complete", "different types"); + test_inequality(flush1, error1a, "flush", "error", "different types"); + test_inequality(flush1, error1b, "flush", "error", "different types"); + test_inequality(flush1, value_dummy1, "flush", "data", "different types"); + test_inequality(flush1, value1, "flush", "data", "different types"); + + test_inequality(complete1, error1a, "complete", "error", "different types"); + test_inequality(complete1, error1b, "complete", "error", "different types"); + test_inequality(complete1, value_dummy1, "complete", "data", "different types"); + test_inequality(complete1, value1, "complete", "data", "different types"); + + test_inequality(error1a, error1b, "error", "error", "different error strings"); + test_inequality(error1a, value_dummy1, "error", "data", "different types"); + test_inequality(error1a, value1, "error", "data", "different types"); + + test_inequality(value_dummy1, value1, "data", "data", "different internal data"); + +#undef test_inequality +} From b8d0628cc082de8b6d9d1a058bf36324745a814f Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Thu, 27 Feb 2014 18:10:03 -0500 Subject: [PATCH 14/16] Test Python bindings for datum equality --- tests/bindings/python/sprokit/pipeline/test-datum.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/bindings/python/sprokit/pipeline/test-datum.py b/tests/bindings/python/sprokit/pipeline/test-datum.py index 26033e8b..c59528b8 100755 --- a/tests/bindings/python/sprokit/pipeline/test-datum.py +++ b/tests/bindings/python/sprokit/pipeline/test-datum.py @@ -123,6 +123,16 @@ def test_error_(): test_error("An error datum does not have None as its data") +def test_compare(): + from sprokit.pipeline import datum + + d1 = datum.complete() + d2 = datum.complete() + + if not d1 == d2: + test_error("A complete datum is not equal to a complete datum") + + if __name__ == '__main__': import os import sys From ccd2e88ca9333797c826ec990633d71dec455a45 Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Thu, 12 Sep 2013 15:18:45 -0400 Subject: [PATCH 15/16] Use an inequality for capacity checks --- src/sprokit/pipeline/edge.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sprokit/pipeline/edge.cxx b/src/sprokit/pipeline/edge.cxx index 1de323ca..539a9b21 100644 --- a/src/sprokit/pipeline/edge.cxx +++ b/src/sprokit/pipeline/edge.cxx @@ -347,7 +347,7 @@ ::full_of_data() const return false; } - return (q.size() == capacity); + return (capacity <= q.size()); } void From db2ec265292efdc58321d05021a4f2409aa29fef Mon Sep 17 00:00:00 2001 From: Ben Boeckel Date: Thu, 27 Feb 2014 18:13:25 -0500 Subject: [PATCH 16/16] Quote some variable expansions --- tests/sprokit/pipeline/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/sprokit/pipeline/CMakeLists.txt b/tests/sprokit/pipeline/CMakeLists.txt index 5662b5e6..cbf5fbf9 100644 --- a/tests/sprokit/pipeline/CMakeLists.txt +++ b/tests/sprokit/pipeline/CMakeLists.txt @@ -149,9 +149,9 @@ function (sprokit_add_tooled_run_test group instance) endif () endif () - sprokit_add_tooled_test(${group} ${instance}-${scheduler}) + sprokit_add_tooled_test("${group}" "${instance}-${scheduler}") - set_tests_properties(test-${group}-${instance}-${scheduler} + set_tests_properties("test-${group}-${instance}-${scheduler}" PROPERTIES TIMEOUT 5)