Skip to content

Commit

Permalink
Add Lock-free Queue (backport #253) (#270)
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Jan 29, 2025
1 parent 5a3c63e commit f891f6c
Show file tree
Hide file tree
Showing 6 changed files with 807 additions and 1 deletion.
1 change: 1 addition & 0 deletions .github/workflows/jazzy-binary-build-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ jobs:
ros_distro: jazzy
ref_for_scheduled_build: jazzy
os_name: windows-2019
install_boost: true
3 changes: 2 additions & 1 deletion .github/workflows/rolling-binary-build-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ jobs:
# (github.event_name == 'workflow_dispatch')
uses: ros-controls/ros2_control_ci/.github/workflows/reusable-ros-tooling-win-build.yml@master
with:
ros_distro: jazzy
ros_distro: jazzy # no pre-compiled rolling version is available
ref_for_scheduled_build: master
os_name: windows-2019
install_boost: true
17 changes: 17 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ foreach(Dependency IN ITEMS ${THIS_PACKAGE_INCLUDE_DEPENDS})
find_package(${Dependency} REQUIRED)
endforeach()

if(POLICY CMP0167)
cmake_policy(SET CMP0167 NEW)
endif()
find_package(Boost COMPONENTS headers)
if(NOT Boost_headers_FOUND)
find_package(Boost REQUIRED)
endif()

add_library(realtime_tools SHARED
src/realtime_clock.cpp
src/realtime_helpers.cpp
Expand Down Expand Up @@ -74,6 +82,15 @@ if(BUILD_TESTING)
ament_add_gmock(realtime_buffer_tests test/realtime_buffer_tests.cpp)
target_link_libraries(realtime_buffer_tests realtime_tools)

ament_add_gmock(lock_free_queue_tests test/lock_free_queue_tests.cpp)
if(WIN32)
# atomic is not found on Windows, but also not needed
target_link_libraries(lock_free_queue_tests realtime_tools Boost::boost)
else()
# without adding atomic, clang throws a linker error
target_link_libraries(lock_free_queue_tests realtime_tools atomic Boost::boost)
endif()

ament_add_gmock(realtime_clock_tests test/realtime_clock_tests.cpp)
target_link_libraries(realtime_clock_tests realtime_tools)

Expand Down
333 changes: 333 additions & 0 deletions include/realtime_tools/lock_free_queue.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,333 @@
// Copyright 2025 PAL Robotics S.L.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/// \author Sai Kishor Kothakota

#ifndef REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_
#define REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_

#include <chrono>
#include <mutex>
#include <type_traits>
#include <utility>

#include <boost/lockfree/queue.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/lockfree/stack.hpp>

namespace
{
// Trait to check if the capacity is set
template <typename T>
struct has_capacity : std::false_type
{
};

template <typename T, std::size_t Capacity>
struct has_capacity<boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
: std::true_type
{
};

template <typename T, std::size_t Capacity>
struct has_capacity<boost::lockfree::queue<T, boost::lockfree::capacity<Capacity>>> : std::true_type
{
};

template <typename T, std::size_t Capacity, bool FixedSize>
struct has_capacity<boost::lockfree::queue<
T, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>> : std::true_type
{
};

// Traits to check if the queue is spsc_queue
template <typename T>
struct is_spsc_queue : std::false_type
{
};

template <typename T>
struct is_spsc_queue<boost::lockfree::spsc_queue<T>> : std::true_type
{
};

template <typename T, std::size_t Capacity>
struct is_spsc_queue<boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
: std::true_type
{
};

// Traits to get the capacity of the queue
// Default case: no capacity
template <typename T>
struct get_boost_lockfree_queue_capacity
{
static constexpr std::size_t value = 0; // Default to 0 if capacity is not defined
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity>
struct get_boost_lockfree_queue_capacity<
boost::lockfree::spsc_queue<T, boost::lockfree::capacity<Capacity>>>
{
static constexpr std::size_t value = Capacity;
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity>
struct get_boost_lockfree_queue_capacity<
boost::lockfree::queue<T, boost::lockfree::capacity<Capacity>>>
{
static constexpr std::size_t value = Capacity;
};

// Specialization for queues with capacity
template <typename T, std::size_t Capacity, bool FixedSize>
struct get_boost_lockfree_queue_capacity<boost::lockfree::queue<
T, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>>
{
static constexpr std::size_t value = Capacity;
};

} // namespace

namespace realtime_tools
{
template <typename DataType, typename LockFreeContainer>
class LockFreeQueueBase
/**
* @brief A base class for lock-free queues.
*
* This class provides a base implementation for lock-free queues with various functionalities
* such as pushing, popping, and checking the state of the queue. It supports both single-producer
* single-consumer (SPSC) and multiple-producer multiple-consumer (MPMC) queues.
*
* @tparam DataType The type of data to be stored in the queue.
* @tparam LockFreeContainer The underlying lock-free container type - Typically boost::lockfree::spsc_queue or boost::lockfree::queue with their own template parameters
*/
{
public:
using T = DataType;

/**
* @brief Construct a new LockFreeQueueBase object
* @note This constructor is enabled only if the LockFreeContainer has a capacity set
*/
template <
bool HasCapacity = has_capacity<LockFreeContainer>::value,
typename std::enable_if_t<HasCapacity, int> = 0>
LockFreeQueueBase() : capacity_(get_boost_lockfree_queue_capacity<LockFreeContainer>::value)
{
}

/**
* @brief Construct a new LockFreeQueueBase object
* @param capacity Capacity of the queue
* @note This constructor is enabled only if the LockFreeContainer has no capacity set
*/
template <
bool HasCapacity = has_capacity<LockFreeContainer>::value,
typename std::enable_if_t<!HasCapacity, int> = 1>
explicit LockFreeQueueBase(std::size_t capacity) : data_queue_(capacity), capacity_(capacity)
{
}

virtual ~LockFreeQueueBase() = default;

/**
* @brief Pop the data from the queue
* @param data Data to be popped
* @return true If the data is popped successfully
* @return false If the queue is empty or the data could not be popped
*/
[[nodiscard]] bool pop(T & data) { return data_queue_.pop(data); }

/**
* @brief Pop the data from the queue
* @param data Data to be popped
* @return true If the data is popped successfully
* @return false If the queue is empty or the data could not be popped
* @note This function is enabled only if the data type is convertible to the template type of the queue
*/
template <typename U>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> pop(U & data)
{
return data_queue_.pop(data);
}

/**
* @brief Push the data into the queue
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the queue is full or the data could not be pushed
*/

[[nodiscard]] bool push(const T & data)
{
if constexpr (is_spsc_queue<LockFreeContainer>::value) {
return data_queue_.push(data);
} else {
return data_queue_.bounded_push(data);
}
}

/**
* @brief Push the data into the queue
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the queue is full or the data could not be pushed
* @note This function is enabled only if the data type is convertible to the template type of the queue
*/
template <typename U>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> push(const U & data)
{
if constexpr (is_spsc_queue<LockFreeContainer>::value) {
return data_queue_.push(data);
} else {
return data_queue_.bounded_push(data);
}
}

/**
* @brief The bounded_push function pushes the data into the queue and pops the oldest data if the queue is full
* @param data Data to be pushed
* @return true If the data is pushed successfully
* @return false If the data could not be pushed
* @note This function is enabled only if the queue is a spsc_queue and only if the data type
* is convertible to the template type of the queue
* @note For a SPSC Queue, to be used in single-threaded applications
* @warning For a SPSC Queue, this method might not work as expected when used in multi-threaded applications
* if it is used with two different threads, one doing bounded_push and the other doing pop. In this case, the
* queue is no longer a single producer single consumer queue. So, the behavior might not be as expected.
*/
template <typename U>
[[nodiscard]] std::enable_if_t<std::is_convertible_v<T, U>, bool> bounded_push(const U & data)
{
if (!push(data)) {
T dummy;
data_queue_.pop(dummy);
return push(data);
}
return true;
}

/**
* @brief Get the latest data from the queue
* @param data Data to be filled with the latest data
* @return true If the data is filled with the latest data, false otherwise
* @return false If the queue is empty
* @note This function consumes all the data in the queue until the last data and returns the last element of the queue
*/
[[nodiscard]] bool get_latest(T & data)
{
return data_queue_.consume_all([&data](const T & d) { data = d; }) > 0;
}

/**
* @brief Check if the queue is empty
* @return true If the queue is empty
* @return false If the queue is not empty
* @note The result is only accurate, if no other thread modifies the queue. Therefore, it is rarely practical to use this value in program logic.
* @note For a SPSC Queue, it should only be called from the consumer thread where pop is called. If need to be called from producer thread, use write_available() instead.
*/
bool empty() const
{
if constexpr (is_spsc_queue<LockFreeContainer>::value) {
return data_queue_.read_available() == 0;
} else {
return data_queue_.empty();
}
}

/**
* @brief Get the capacity of the queue
* @return std::size_t Capacity of the queue
*/
size_t capacity() const { return capacity_; }

/**
* @brief Get the size of the queue
* @return std::size_t Size of the queue
* @note This function is enabled only if the queue is a spsc_queue
*/
template <
bool IsSPSCQueue = is_spsc_queue<LockFreeContainer>::value,
typename std::enable_if_t<IsSPSCQueue, int> = 0>
std::size_t size() const
{
return data_queue_.read_available();
}

/**
* @brief The method to check if the queue is lock free
* @return true If the queue is lock free, false otherwise
* @warning It only checks, if the queue head and tail nodes and the freelist can
* be modified in a lock-free manner. On most platforms, the whole implementation
* is lock-free, if this is true. Using c++0x-style atomics, there is no possibility
* to provide a completely accurate implementation, because one would need to test
* every internal node, which is impossible if further nodes will be allocated from
* the operating system.
* @link https://www.boost.org/doc/libs/1_74_0/doc/html/boost/lockfree/queue.html
*/
bool is_lock_free() const
{
return (is_spsc_queue<LockFreeContainer>::value) || data_queue_.is_lock_free();
}

/**
* @brief Get the lockfree container
* @return const LockFreeContainer& Reference to the lockfree container
*/
const LockFreeContainer & get_lockfree_container() const { return data_queue_; }

/**
* @brief Get the lockfree container
* @return LockFreeContainer& Reference to the lockfree container
*/
LockFreeContainer & get_lockfree_container() { return data_queue_; }

private:
LockFreeContainer data_queue_;
std::size_t capacity_;
}; // class

/**
* @brief Lock-free Single Producer Single Consumer Queue
* @tparam DataType Type of the data to be stored in the queue
* @tparam Capacity Capacity of the queue
*/
template <class DataType, std::size_t Capacity = 0>
using LockFreeSPSCQueue = std::conditional_t<
Capacity == 0, LockFreeQueueBase<DataType, boost::lockfree::spsc_queue<DataType>>,
LockFreeQueueBase<
DataType, boost::lockfree::spsc_queue<DataType, boost::lockfree::capacity<Capacity>>>>;

/**
* @brief Lock-free Multiple Producer Multiple Consumer Queue
* @tparam DataType Type of the data to be stored in the queue
* @tparam Capacity Capacity of the queue
* @tparam FixedSize Fixed size of the queue
*/
template <class DataType, std::size_t Capacity = 0, bool FixedSize = true>
using LockFreeMPMCQueue = std::conditional_t<
Capacity == 0,
LockFreeQueueBase<
DataType, boost::lockfree::queue<DataType, boost::lockfree::fixed_sized<FixedSize>>>,
LockFreeQueueBase<
DataType,
boost::lockfree::queue<
DataType, boost::lockfree::capacity<Capacity>, boost::lockfree::fixed_sized<FixedSize>>>>;

} // namespace realtime_tools
#endif // REALTIME_TOOLS__LOCK_FREE_QUEUE_HPP_
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

<buildtool_export_depend>ament_cmake</buildtool_export_depend>

<depend>libboost-dev</depend>
<depend>rclcpp</depend>
<depend>rclcpp_action</depend>
<depend>libcap-dev</depend>
Expand Down
Loading

0 comments on commit f891f6c

Please sign in to comment.