Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ipc update #2339

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions src/helics/network/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@ set(TESTCORE_SOURCE_FILES test/TestBroker.cpp test/TestCore.cpp test/TestComms.c

set(INPROCCORE_SOURCE_FILES inproc/InprocBroker.cpp inproc/InprocCore.cpp inproc/InprocComms.cpp)

set(IPC_SOURCE_FILES ipc/IpcCore.cpp ipc/IpcBroker.cpp ipc/IpcComms.cpp ipc/IpcQueueHelper.cpp
# ipc/IpcBlockingPriorityQueue.cpp ipc/IpcBlockingPriorityQueueImpl.cpp
set(IPC_SOURCE_FILES
ipc/IpcCore.cpp
ipc/IpcBroker.cpp
ipc/IpcComms.cpp
ipc/IpcQueueHelper.cpp
ipc/IpcBlockingPriorityQueue.cpp
ipc/IpcBlockingPriorityQueueImpl.cpp
)

set(MPI_SOURCE_FILES mpi/MpiCore.cpp mpi/MpiBroker.cpp mpi/MpiComms.cpp mpi/MpiService.cpp)
Expand Down Expand Up @@ -62,8 +67,13 @@ set(TESTCORE_HEADER_FILES test/TestCore.h test/TestBroker.h test/TestComms.h)

set(INPROCCORE_HEADER_FILES inproc/InprocCore.h inproc/InprocBroker.h inproc/InprocComms.h)

set(IPC_HEADER_FILES ipc/IpcCore.h ipc/IpcBroker.h ipc/IpcComms.h ipc/IpcQueueHelper.h
# ipc/IpcBlockingPriorityQueue.hpp ipc/IpcBlockingPriorityQueueImpl.hpp
set(IPC_HEADER_FILES
ipc/IpcCore.h
ipc/IpcBroker.h
ipc/IpcComms.h
ipc/IpcQueueHelper.h
ipc/IpcBlockingPriorityQueue.hpp
ipc/IpcBlockingPriorityQueueImpl.hpp
)

set(ZMQ_HEADER_FILES
Expand Down
16 changes: 8 additions & 8 deletions src/helics/network/ipc/IpcBlockingPriorityQueue.cpp
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
/*
Copyright (c) 2017-2018,
Copyright (c) 2017-2022,
Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
Energy, LLC All rights reserved. See LICENSE file and DISCLAIMER for more details.
Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
*/
#include "IpcBlockingPriorityQueue.hpp"

#include "helics/external/optional.hpp"

#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>
#include <string>
#include <type_traits>
Expand Down Expand Up @@ -214,7 +214,7 @@ class BlockingPriorityQueue {
@return an optional object with an object of type T if available
*/
template<typename = std::enable_if<std::is_copy_assignable<T>::value>>
stx::optional<T> try_peek() const
std::optional<T> try_peek() const
{
std::lock_guard<std::mutex> lock(m_pullLock);
if (!priorityQueue.empty()) {
Expand All @@ -232,7 +232,7 @@ class BlockingPriorityQueue {
@return an optional containing the value if successful the optional will be empty if there is no
element in the queue
*/
stx::optional<T> try_pop();
std::optional<T> try_pop();

/** blocking call to wait on an object from the stack*/
T pop()
Expand Down Expand Up @@ -273,7 +273,7 @@ class BlockingPriorityQueue {
}

/** blocking call to wait on an object from the stack with timeout*/
stx::optional<T> pop(std::chrono::milliseconds timeout)
std::optional<T> pop(std::chrono::milliseconds timeout)
{
auto val = try_pop();
while (!val) {
Expand Down Expand Up @@ -364,7 +364,7 @@ depending on the number of consumers
};

template<typename T>
stx::optional<T> BlockingPriorityQueue<T>::try_pop()
std::optional<T> BlockingPriorityQueue<T>::try_pop()
{
std::lock_guard<std::mutex> pullLock(m_pullLock); // first pullLock
if (!priorityQueue.empty()) {
Expand Down
10 changes: 5 additions & 5 deletions src/helics/network/ipc/IpcBlockingPriorityQueue.hpp
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
/*
Copyright (c) 2017-2018,
Copyright (c) 2017-2022,
Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
Energy, LLC All rights reserved. See LICENSE file and DISCLAIMER for more details.
Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
*/
#pragma once

#include "helics/external/optional.hpp"

#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <chrono>
#include <optional>
#include <utility>

namespace helics {
Expand Down Expand Up @@ -58,7 +58,7 @@ most cases.
@details only available for copy assignable objects
@return an optional object with an object of type T if available
*/
stx::optional<std::pair<unsigned char*, int>> try_peek() const;
std::optional<std::pair<unsigned char*, int>> try_peek() const;

/** try to pop an object from the queue
@return an optional containing the value if successful the optional will be empty if there is no
Expand Down
11 changes: 4 additions & 7 deletions src/helics/network/ipc/IpcBlockingPriorityQueueImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ namespace ipc {
using namespace boost::interprocess; // NOLINT

/** default constructor*/
IpcBlockingPriorityQueueImpl::IpcBlockingPriorityQueueImpl(void* dataBlock,
size_t blockSize)
{
}
IpcBlockingPriorityQueueImpl::IpcBlockingPriorityQueueImpl(void* data, size_t blockSize) {}

/** clear the queue*/
void IpcBlockingPriorityQueueImpl::clear()
Expand Down Expand Up @@ -169,7 +166,7 @@ val the value to push on the queue
@return an optional object with an object of type T if available
*/
template<typename = std::enable_if<std::is_copy_assignable<T>::value>>
stx::optional<T> try_peek() const
std::optional<T> try_peek() const
{
std::lock_guard<std::mutex> lock(m_pullLock);
if (!priorityQueue.empty()) {
Expand All @@ -187,7 +184,7 @@ val the value to push on the queue
@return an optional containing the value if successful the optional will be empty if there is no
element in the queue
*/
stx::optional<T> try_pop();
std::optional<T> try_pop();

/** blocking call to wait on an object from the stack*/
T pop()
Expand Down Expand Up @@ -228,7 +225,7 @@ element in the queue
}

/** blocking call to wait on an object from the stack with timeout*/
stx::optional<T> pop(std::chrono::milliseconds timeout)
std::optional<T> pop(std::chrono::milliseconds timeout)
{
auto val = try_pop();
while (!val) {
Expand Down
20 changes: 10 additions & 10 deletions src/helics/network/ipc/IpcBlockingPriorityQueueImpl.hpp
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/*
Copyright (c) 2017-2018,
Copyright (c) 2017-2022,
Battelle Memorial Institute; Lawrence Livermore National Security, LLC; Alliance for Sustainable
Energy, LLC All rights reserved. See LICENSE file and DISCLAIMER for more details.
Energy, LLC. See the top-level NOTICE for additional details. All rights reserved.
SPDX-License-Identifier: BSD-3-Clause
*/
#pragma once

#include "helics/external/optional.hpp"

#include <boost/interprocess/mapped_region.hpp>
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/sync/interprocess_condition.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <chrono>
#include <optional>
#include <utility>

namespace helics {
Expand All @@ -24,11 +24,11 @@ namespace ipc {
/** class containing the raw data block implementation*/
class dataBlock {
private:
unsigned char* origin = nullptr;
unsigned char* next = nullptr;
size_t totalSize = 0;
dataIndex* next = nullptr;
int dataCount = 0;
unsigned char* origin{nullptr};
unsigned char* next{nullptr};
size_t totalSize{0};
dataIndex* next{nullptr};
int dataCount{0};

public:
dataBlock(unsigned char* newBlock, size_t blockSize);
Expand Down Expand Up @@ -112,7 +112,7 @@ most cases.
@details only available for copy assignable objects
@return an optional object with an object of type T if available
*/
stx::optional<std::pair<unsigned char*, int>> try_peek() const;
std::optional<std::pair<unsigned char*, int>> try_peek() const;

/** try to pop an object from the queue
@return an optional containing the value if successful the optional will be empty if there is no
Expand Down
Loading