Skip to content

Commit

Permalink
update(userspace/libsinsp): elems in mpsc queue with same priority fo…
Browse files Browse the repository at this point in the history
…llow push order

Signed-off-by: Jason Dellaluce <[email protected]>
  • Loading branch information
jasondellaluce authored and poiana committed Nov 23, 2023
1 parent 7cb284c commit b1f439e
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 8 deletions.
33 changes: 28 additions & 5 deletions userspace/libsinsp/mpsc_priority_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ limitations under the License.
* @brief Concurrent priority queue optimized for multiple producer/single consumer
* (mpsc) use cases. This queue allows checking the top element against
* a provided predicate before popping. It is optimized for checking for
* emptyness before popping.
* emptyness before popping. The queue accepts only elements of pointer-type
* in the form of std::shared_ptr<T> or std::unique_ptr<T>. The priority queue
* bases its element ordering constraints on Cmp. Elements with equal priority
* follow the temporal order with which they have been pushed.
*/
template<typename Elm, typename Cmp, typename Mtx = std::mutex>
class mpsc_priority_queue
Expand Down Expand Up @@ -56,7 +59,7 @@ class mpsc_priority_queue
std::scoped_lock<Mtx> lk(m_mtx);
if (m_capacity == 0 || m_queue.size() < m_capacity)
{
m_queue.push(queue_elm{std::move(e)});
m_queue.push(queue_elm{std::move(e), m_elem_counter++});
m_queue_top = m_queue.top().elm.get();
return true;
}
Expand Down Expand Up @@ -155,16 +158,36 @@ class mpsc_priority_queue
private:
using elm_ptr = typename Elm::element_type*;

// workaround to make unique_ptr usable when copying the queue top
// which is const unique<ptr>& and denies moving
struct queue_elm
{
inline bool operator < (const queue_elm& r) const {return Cmp{}(*elm.get(), *r.elm.get());}
inline bool operator < (const queue_elm& r) const
{
// we check if this elem is less than the other. If the comparison
// gives the same result when inverting the operands, then we can
// assume them being equal.
Cmp c{};
auto res = c(*elm.get(), *r.elm.get());
if (res == c(*r.elm.get(), *elm.get()))
{
// if elements have the same priority, order them by
// temporal order of arrival in the queue by using an atomic
// logical clock (counter).
// note(jasondellaluce): this approach is vulnerable to integer overflow
// that would cause the second-level ordering guarantee to be broken,
// but given that we use a uint64_t counter we find this unlikely
return std::greater_equal<uint64_t>{}(num, r.num);
}
return res;
}
// using mutable is a workaround to make unique_ptr usable when copying
// the queue top(), which is returned a const unique<ptr>& and denies moving
mutable Elm elm;
uint64_t num;
};

const size_t m_capacity;
std::priority_queue<queue_elm> m_queue{};
std::atomic<elm_ptr> m_queue_top{nullptr};
std::atomic<uint64_t> m_elem_counter{0};
Mtx m_mtx;
};
55 changes: 52 additions & 3 deletions userspace/libsinsp/test/mpsc_priority_queue.ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,61 @@ limitations under the License.
#include <thread>
#include <chrono>

using val_t = std::unique_ptr<int>;
TEST(mpsc_priority_queue, order_consistency)
{
struct val
{
int v;
int order;
};

struct val_less
{
bool operator()(const val& l, const val& r)
{
return std::greater_equal<int>{}(l.v, r.v);
}
};

using val_t = std::unique_ptr<val>;

mpsc_priority_queue<val_t, val_less> q;
for (int i = 0; i < 100; i++)
{
for (int j = 0; j < 100; j++)
{
// j is used only for tracking the order in which elements
// are pushed for checking it later
q.push(val_t{new val{i,j}});
}
}

val_t cur{nullptr};
val_t prev{nullptr};
while (!q.empty())
{
ASSERT_TRUE(q.try_pop(cur));
if (prev != nullptr)
{
ASSERT_GE(cur->v, prev->v);
if (cur->v == prev->v)
{
ASSERT_GT(cur->order, prev->order);
}
}
prev = std::move(cur);
}

}

// note: emscripten does not support launching threads
#ifndef __EMSCRIPTEN__

TEST(mpsc_priority_queue, single_producer)
TEST(mpsc_priority_queue, single_concurrent_producer)
{
using val_t = std::unique_ptr<int>;
const int max_value = 1000;

mpsc_priority_queue<val_t, std::greater_equal<int>> q;

// single producer
Expand Down Expand Up @@ -69,10 +116,12 @@ TEST(mpsc_priority_queue, single_producer)
ASSERT_EQ(failed, 0);
}

TEST(mpsc_priority_queue, multi_producer)
TEST(mpsc_priority_queue, multi_concurrent_producers)
{
using val_t = std::unique_ptr<int>;
const int num_values = 1000;
const int num_producers = 10;

mpsc_priority_queue<val_t, std::greater_equal<int>> q;
std::atomic<int> counter{1};

Expand Down

0 comments on commit b1f439e

Please sign in to comment.