Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Ready] William's queue - SPSC and MPMC #103

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
493 changes: 493 additions & 0 deletions cds/container/williams_queue.h

Large diffs are not rendered by default.

203 changes: 203 additions & 0 deletions cds/container/williams_queue_spsc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
#ifndef CDSLIB_CONTAINER_WilliamsQueueSPSC_H
#define CDSLIB_CONTAINER_WilliamsQueueSPSC_H

#include <cds/algo/flat_combining.h>
#include <cds/algo/elimination_opt.h>

namespace cds
{
namespace container
{
namespace williams_queue_spsc
{
struct traits
{
/// Node allocator
typedef CDS_DEFAULT_ALLOCATOR allocator;

/// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
typedef atomicity::empty_item_counter item_counter;
};

template <typename... Options>
struct make_traits {
#ifdef CDS_DOXYGEN_INVOKED
typedef implementation_defined type; ///< Metafunction result
#else
typedef typename cds::opt::make_options<
typename cds::opt::find_type_traits< traits, Options... >::type
, Options...
>::type type;
#endif
};
}

template <typename T, typename traits = williams_queue_spsc::traits>
class WilliamsQueueSPSC
{
public:
typedef T value_type;
typedef typename traits::item_counter item_counter;
private:
struct node
{
std::shared_ptr<T> data;
node *next;

node() : next(nullptr) {}
};

item_counter itemCounter;

std::atomic<node *> head;
std::atomic<node *> tail;



bool enqueue(std::shared_ptr<T> value)
{
node *p = new node;
node *const old_tail = tail.load();
old_tail->data.swap(value);
old_tail->next = p;
tail.store(p);
itemCounter++;
return true;
}

node *dequeue_head()
{
node *const old_head = head.load();
if (old_head == tail.load())
{
return nullptr;
}
head.store(old_head->next);
itemCounter--;
return old_head;
}
public:

WilliamsQueueSPSC() : head(new node), tail(head.load()) {}

WilliamsQueueSPSC(const WilliamsQueueSPSC &other) = delete;
WilliamsQueueSPSC &operator=(const WilliamsQueueSPSC &other) = delete;

~WilliamsQueueSPSC()
{
while (node *const old_head = head.load())
{
head.store(old_head->next);
delete old_head;
}
}




bool enqueue(value_type const& value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(value));
return enqueue(new_data);
}


bool enqueue(value_type&& value)
{
std::shared_ptr<T> new_data(std::make_shared<T>(value));
return enqueue(new_data);
}


bool push(value_type const& value)
{
return enqueue(value);
}

bool push(value_type&& value)
{
return enqueue(value);
}

template <typename Func>
bool enqueue_with(Func f)
{
value_type val;
f(val);
return enqueue(val);
}

template <typename Func>
bool push_with(Func f)
{
return enqueue_with(f);
}



template <typename Func>
bool dequeue_with(Func f)
{
node *old_head = dequeue_head();

if (!old_head)
{
return false;
}

f(*old_head->data);
delete old_head;

return true;
}

template <typename Func>
bool pop_with(Func f)
{
return dequeue_with(f);
}



bool dequeue(value_type& destination)
{
return dequeue_with([&destination](value_type& src) { destination = src; });
}

bool pop(value_type& destination)
{
return dequeue(destination);
}

void clear()
{
value_type v;
while (dequeue(v));
}

bool empty() const
{
return head.load() == tail.load();
}

size_t size() const
{
return itemCounter.value();
}

template <typename... Args>
bool emplace(Args&&... args)
{
value_type val(std::forward<Args>(args)...);
return enqueue(val);
}

std::nullptr_t statistics() const
{
return nullptr;
}
};
};
}

#endif // #ifndef CDSLIB_CONTAINER_WilliamsQueueSPSC_H
2 changes: 2 additions & 0 deletions projects/Win/vc141/cds.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -756,6 +756,8 @@
<ClInclude Include="..\..\..\cds\container\striped_set\std_set.h" />
<ClInclude Include="..\..\..\cds\container\striped_set\std_vector.h" />
<ClInclude Include="..\..\..\cds\container\weak_ringbuffer.h" />
<ClInclude Include="..\..\..\cds\container\williams_queue.h" />
<ClInclude Include="..\..\..\cds\container\williams_queue_spsc.h" />
<ClInclude Include="..\..\..\cds\details\binary_functor_wrapper.h" />
<ClInclude Include="..\..\..\cds\details\bit_reverse_counter.h" />
<ClInclude Include="..\..\..\cds\details\bounded_container.h" />
Expand Down
6 changes: 6 additions & 0 deletions projects/Win/vc141/cds.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,12 @@
<ClInclude Include="..\..\..\cds\details\throw_exception.h">
<Filter>Header Files\cds\details</Filter>
</ClInclude>
<ClInclude Include="..\..\..\cds\container\williams_queue.h">
<Filter>Header Files\cds\container</Filter>
</ClInclude>
<ClInclude Include="..\..\..\cds\container\williams_queue_spsc.h">
<Filter>Header Files\cds\container</Filter>
</ClInclude>
<ClInclude Include="..\..\..\cds\compiler\gcc\arm8\backoff.h">
<Filter>Header Files\cds\compiler\gcc\arm8</Filter>
</ClInclude>
Expand Down
2 changes: 2 additions & 0 deletions projects/Win/vc141/gtest-queue.vcxproj
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
<ClCompile Include="..\..\..\test\unit\queue\segmented_queue_hp.cpp" />
<ClCompile Include="..\..\..\test\unit\queue\vyukov_mpmc_queue.cpp" />
<ClCompile Include="..\..\..\test\unit\queue\weak_ringbuffer.cpp" />
<ClCompile Include="..\..\..\test\unit\queue\williams_queue.cpp" />
<ClCompile Include="..\..\..\test\unit\queue\williams_queue_spsc.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\test\unit\queue\test_bounded_queue.h" />
Expand Down
6 changes: 6 additions & 0 deletions projects/Win/vc141/gtest-queue.vcxproj.filters
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@
<ClCompile Include="..\..\..\test\unit\queue\weak_ringbuffer.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\unit\queue\williams_queue.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\..\..\test\unit\queue\williams_queue_spsc.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\..\..\test\unit\queue\test_generic_queue.h">
Expand Down
3 changes: 2 additions & 1 deletion test/stress/queue/pop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ namespace {
CDSSTRESS_FCDeque( queue_pop )
CDSSTRESS_RWQueue( queue_pop )
CDSSTRESS_StdQueue( queue_pop )

CDSSTRESS_WilliamsQueue( queue_pop )

#undef CDSSTRESS_Queue_F
#define CDSSTRESS_Queue_F( test_fixture, type_name ) \
TEST_F( test_fixture, type_name ) \
Expand Down
1 change: 1 addition & 0 deletions test/stress/queue/push.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ namespace {
CDSSTRESS_FCDeque( queue_push )
CDSSTRESS_RWQueue( queue_push )
CDSSTRESS_StdQueue( queue_push )
CDSSTRESS_WilliamsQueue( queue_push )

#undef CDSSTRESS_Queue_F
#define CDSSTRESS_Queue_F( test_fixture, type_name ) \
Expand Down
1 change: 1 addition & 0 deletions test/stress/queue/push_pop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ namespace {
CDSSTRESS_FCDeque_HeavyValue( fc_with_heavy_value )
CDSSTRESS_RWQueue( simple_queue_push_pop )
CDSSTRESS_StdQueue( simple_queue_push_pop )
CDSSTRESS_WilliamsQueue( simple_queue_push_pop )

#undef CDSSTRESS_Queue_F
#define CDSSTRESS_Queue_F( test_fixture, type_name ) \
Expand Down
32 changes: 32 additions & 0 deletions test/stress/queue/queue_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
#include <cds/container/fcdeque.h>
#include <cds/container/segmented_queue.h>
#include <cds/container/weak_ringbuffer.h>
#include <cds/container/williams_queue.h>
#include <cds/container/williams_queue_spsc.h>

#include <cds/gc/hp.h>
#include <cds/gc/dhp.h>
Expand Down Expand Up @@ -423,6 +425,28 @@ namespace fc_details{
{};
typedef cds::container::RWQueue< Value, traits_RWQueue_mutex > RWQueue_mutex;

// WilliamsQueue
typedef cds::container::WilliamsQueue< Value > WilliamsQueue_default;

struct traits_WilliamsQueue_ic :
public cds::container::williams_queue::make_traits<
cds::opt::item_counter<cds::atomicity::empty_item_counter>
>::type
{};

typedef cds::container::WilliamsQueue< Value, traits_WilliamsQueue_ic > WilliamsQueue_ic;

// WilliamsQueueSPSC
typedef cds::container::WilliamsQueueSPSC< Value > WilliamsQueueSPSC_default;

struct traits_WilliamsQueueSPSC_ic :
public cds::container::williams_queue_spsc::make_traits<
cds::opt::item_counter<cds::atomicity::empty_item_counter>
>::type
{};

typedef cds::container::WilliamsQueueSPSC< Value, traits_WilliamsQueueSPSC_ic > WilliamsQueueSPSC_ic;

// FCQueue
struct traits_FCQueue_stat:
public cds::container::fcqueue::make_traits<
Expand Down Expand Up @@ -861,6 +885,14 @@ namespace cds_test {
CDSSTRESS_Queue_F( test_fixture, RWQueue_mutex ) \
CDSSTRESS_RWQueue_1( test_fixture )

#define CDSSTRESS_WilliamsQueue( test_fixture ) \
CDSSTRESS_Queue_F( test_fixture, WilliamsQueue_default ) \
CDSSTRESS_Queue_F( test_fixture, WilliamsQueue_ic )

#define CDSSTRESS_WilliamsQueueSPSC( test_fixture ) \
CDSSTRESS_Queue_F( test_fixture, WilliamsQueueSPSC_default ) \
CDSSTRESS_Queue_F( test_fixture, WilliamsQueueSPSC_ic )

#define CDSSTRESS_SegmentedQueue( test_fixture ) \
CDSSTRESS_Queue_F( test_fixture, SegmentedQueue_HP_spin ) \
CDSSTRESS_Queue_F( test_fixture, SegmentedQueue_HP_spin_padding ) \
Expand Down
1 change: 1 addition & 0 deletions test/stress/queue/random.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ namespace {
CDSSTRESS_FCDeque( queue_random )
CDSSTRESS_RWQueue( queue_random )
CDSSTRESS_StdQueue( queue_random )
CDSSTRESS_WilliamsQueue( queue_random )

#undef CDSSTRESS_Queue_F
#define CDSSTRESS_Queue_F( test_fixture, type_name ) \
Expand Down
3 changes: 2 additions & 1 deletion test/stress/queue/spsc_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ namespace {
//CDSSTRESS_FCDeque( spsc_queue )
//CDSSTRESS_RWQueue( spsc_queue )
//CDSSTRESS_StdQueue( spsc_queue )

CDSSTRESS_WilliamsQueueSPSC(spsc_queue)

#undef CDSSTRESS_Queue_F
#define CDSSTRESS_Queue_F( test_fixture, type_name ) \
TEST_F( test_fixture, type_name ) \
Expand Down
4 changes: 3 additions & 1 deletion test/unit/queue/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ set(PACKAGE_NAME unit-queue)

set(CDSGTEST_QUEUE_SOURCES
../main.cpp
basket_queue_hp.cpp
basket_queue_hp.cpp
basket_queue_dhp.cpp
fcqueue.cpp
moirqueue_hp.cpp
Expand All @@ -16,6 +16,8 @@ set(CDSGTEST_QUEUE_SOURCES
segmented_queue_dhp.cpp
vyukov_mpmc_queue.cpp
weak_ringbuffer.cpp
williams_queue.cpp
williams_queue_spsc.cpp
intrusive_basket_queue_hp.cpp
intrusive_basket_queue_dhp.cpp
intrusive_fcqueue.cpp
Expand Down
Loading