Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dev/deadline-push #3

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/bindings/python/sprokit/pipeline/datum.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*ckwg +29
* Copyright 2011-2012 by Kitware, Inc.
* Copyright 2011-2013 by Kitware, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -59,6 +59,7 @@ static sprokit::datum_t new_datum(object const& obj);
static sprokit::datum::type_t datum_type(sprokit::datum_t const& self);
static sprokit::datum::error_t datum_get_error(sprokit::datum_t const& self);
static object datum_get_datum(sprokit::datum_t const& self);
static bool datum_eq(sprokit::datum_t const& self, sprokit::datum_t const& other);

BOOST_PYTHON_MODULE(datum)
{
Expand Down Expand Up @@ -97,6 +98,7 @@ BOOST_PYTHON_MODULE(datum)
, "The error contained within the datum packet.")
.def("get_datum", &datum_get_datum
, "Get the data contained within the packet.")
.def("__eq__", &datum_eq)
;

sprokit::python::register_type<std::string>(0);
Expand Down Expand Up @@ -139,11 +141,17 @@ datum_get_error(sprokit::datum_t const& self)
object
datum_get_datum(sprokit::datum_t const& self)
{
boost::any const any = self->get_datum<boost::any>();

sprokit::python::python_gil const gil;

(void)gil;

boost::any const any = self->get_datum<boost::any>();

return object(any);
}

bool
datum_eq(sprokit::datum_t const& self, sprokit::datum_t const& other)
{
return (*self == *other);
}
48 changes: 47 additions & 1 deletion src/bindings/python/sprokit/pipeline/edge.cxx
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*ckwg +29
* Copyright 2011-2012 by Kitware, Inc.
* Copyright 2011-2013 by Kitware, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -32,8 +32,10 @@
#include <sprokit/pipeline/edge.h>
#include <sprokit/pipeline/stamp.h>

#include <sprokit/python/util/python_convert_optional.h>
#include <sprokit/python/util/python_gil.h>

#include <boost/chrono/duration.hpp>
#include <boost/python/suite/indexing/vector_indexing_suite.hpp>
#include <boost/python/class.hpp>
#include <boost/python/module.hpp>
Expand All @@ -46,12 +48,17 @@

using namespace boost::python;

static bool edge_datum_eq(sprokit::edge_datum_t const& self, sprokit::edge_datum_t const& rhs);
static bool edge_try_push_datum(sprokit::edge_t const& self, sprokit::edge_datum_t const& datum, double duration);
static boost::optional<sprokit::edge_datum_t> edge_try_get_datum(sprokit::edge_t const& self, double duration);

BOOST_PYTHON_MODULE(edge)
{
class_<sprokit::edge_datum_t>("EdgeDatum"
, no_init)
.def(init<>())
.def(init<sprokit::datum_t, sprokit::stamp_t>())
.def("__eq__", &edge_datum_eq)
.def_readwrite("datum", &sprokit::edge_datum_t::datum)
.def_readwrite("stamp", &sprokit::edge_datum_t::stamp)
;
Expand All @@ -64,6 +71,8 @@ BOOST_PYTHON_MODULE(edge)
.def(vector_indexing_suite<sprokit::edges_t>())
;

sprokit::python::register_optional_converter<sprokit::edge_datum_t>("EdgeDatumOpt", "An optional edge datum.");

class_<sprokit::edge, sprokit::edge_t, boost::noncopyable>("Edge"
, "A communication channel between processes."
, no_init)
Expand All @@ -87,6 +96,12 @@ BOOST_PYTHON_MODULE(edge)
, "Returns the next datum packet from the edge.")
.def("pop_datum", &sprokit::edge::pop_datum
, "Remove the next datum packet from the edge.")
.def("try_push_datum", &edge_try_push_datum
, (arg("datum"), arg("duration"))
, "Pushes a datum packet into the edge and returns True or returns False if unable to meet the duration.")
.def("try_get_datum", &edge_try_get_datum
, (arg("duration"))
, "Returns the next datum packet from the edge, removing it in the process or None if unable to meet the duration.")
.def("set_upstream_process", &sprokit::edge::set_upstream_process
, (arg("process"))
, "Set the process which is feeding data into the edge.")
Expand All @@ -101,3 +116,34 @@ BOOST_PYTHON_MODULE(edge)
.def_readonly("config_capacity", &sprokit::edge::config_capacity)
;
}

bool
edge_datum_eq(sprokit::edge_datum_t const& self, sprokit::edge_datum_t const& rhs)
{
return (self == rhs);
}

namespace
{

typedef boost::chrono::duration<double> py_duration_t;

}

bool
edge_try_push_datum(sprokit::edge_t const& self, sprokit::edge_datum_t const& datum, double duration)
{
py_duration_t const duration_sec = py_duration_t(duration);
sprokit::edge::duration_t const duration_edge = boost::chrono::duration_cast<sprokit::edge::duration_t>(duration_sec);

return self->try_push_datum(datum, duration_edge);
}

boost::optional<sprokit::edge_datum_t>
edge_try_get_datum(sprokit::edge_t const& self, double duration)
{
py_duration_t const duration_sec = py_duration_t(duration);
sprokit::edge::duration_t const duration_edge = boost::chrono::duration_cast<sprokit::edge::duration_t>(duration_sec);

return self->try_get_datum(duration_edge);
}
5 changes: 3 additions & 2 deletions src/sprokit/pipeline/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,12 @@ sprokit_add_library(sprokit_pipeline SHARED
${pipeline_headers}
${pipeline_private_headers})
target_link_libraries(sprokit_pipeline
LINK_PRIVATE
LINK_PUBLIC
${Boost_CHRONO_LIBRARY}
${Boost_SYSTEM_LIBRARY}
LINK_PRIVATE
${Boost_DATE_TIME_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
${Boost_SYSTEM_LIBRARY}
${Boost_THREAD_LIBRARY}
${CMAKE_DL_LIBS}
${CMAKE_THREAD_LIBS_INIT})
Expand Down
57 changes: 57 additions & 0 deletions src/sprokit/pipeline/datum.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,46 @@ ::get_error() const
return m_error;
}

static bool any_equal(boost::any const& a, boost::any const& b);

bool
datum
::operator == (datum const& dat) const
{
if (this == &dat)
{
return true;
}

if (m_type != dat.m_type)
{
return false;
}

bool ret = false;

switch (m_type)
{
case data:
ret = any_equal(m_datum, dat.m_datum);
break;
case empty:
case flush:
case complete:
ret = true;
break;
case error:
ret = (m_error == dat.m_error);
break;
case invalid:
default:
ret = false;
break;
}

return ret;
}

datum
::datum(type_t ty)
: m_type(ty)
Expand Down Expand Up @@ -167,6 +207,23 @@ bad_datum_cast_exception
{
}

bool
any_equal(boost::any const& a, boost::any const& b)
{
if (a.empty() && b.empty())
{
return true;
}

if (a.type() != b.type())
{
return false;
}

// Be safe.
return false;
}

char const*
string_for_type(datum::type_t type)
{
Expand Down
15 changes: 15 additions & 0 deletions src/sprokit/pipeline/datum.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "types.h"

#include <boost/any.hpp>
#include <boost/operators.hpp>

#include <string>

Expand All @@ -56,6 +57,7 @@ namespace sprokit
* \ingroup base_classes
*/
class SPROKIT_PIPELINE_EXPORT datum
: boost::equality_comparable<sprokit::datum>
{
public:
/// Information about an error that occurred within a process.
Expand Down Expand Up @@ -148,6 +150,19 @@ class SPROKIT_PIPELINE_EXPORT datum
*/
template <typename T>
T get_datum() const;

/**
* \brief Compare two data for equality.
*
* \note This returns false for two data packets which point to the same
* internal data since \c boost::any does not give access to it without
* knowing the type.
*
* \param dat The datum to compare to.
*
* \returns True if \p dat and \c *this definitely have the same value, false otherwise.
*/
bool operator == (datum const& dat) const;
private:
SPROKIT_PIPELINE_NO_EXPORT datum(type_t ty);
SPROKIT_PIPELINE_NO_EXPORT datum(error_t const& err);
Expand Down
Loading