Skip to content

Commit

Permalink
Implented suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: AssemblyJohn <[email protected]>
  • Loading branch information
AssemblyJohn committed Dec 13, 2024
1 parent 0d7e93e commit 6ce05ae
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 36 deletions.
46 changes: 12 additions & 34 deletions include/ocpp/common/safe_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// SPDX-License-Identifier: Apache-2.0
// Copyright 2020 - 2023 Pionix GmbH and Contributors to EVerest
// Copyright 2020 - 2024 Pionix GmbH and Contributors to EVerest

#pragma once

Expand All @@ -9,19 +9,10 @@

namespace ocpp {

enum class EThreadNotifyPolicy {
// Never notify the waiting thread
ThreadNotify_Never,
// Notify the waiting thread when we push an element in the queue
ThreadNotify_Push,
// Notify the waiting thread when we pop an element from the queue
ThreadNotify_Pop,
// Always notify a waiting thread on all operations
ThreadNotify_Always,
};

/// \brief Thread safe message queue
template <typename T, EThreadNotifyPolicy Policy = EThreadNotifyPolicy::ThreadNotify_Push> class SafeQueue {
/// \brief Thread safe message queue. Holds a conditional variable
/// that can be waited upon. Will take up the waiting thread on each
/// operation of push/pop/clear
template <typename T> class SafeQueue {
using safe_queue_reference = typename std::queue<T>::reference;
using safe_queue_const_reference = typename std::queue<T>::const_reference;

Expand Down Expand Up @@ -52,10 +43,7 @@ template <typename T, EThreadNotifyPolicy Policy = EThreadNotifyPolicy::ThreadNo
// Unlock here and notify
lock.unlock();

if constexpr (Policy == EThreadNotifyPolicy::ThreadNotify_Always ||
Policy == EThreadNotifyPolicy::ThreadNotify_Pop) {
notify_waiting_thread();
}
notify_waiting_thread();

return front;
}
Expand All @@ -67,10 +55,7 @@ template <typename T, EThreadNotifyPolicy Policy = EThreadNotifyPolicy::ThreadNo
queue.push(value);
}

if constexpr (Policy == EThreadNotifyPolicy::ThreadNotify_Always ||
Policy == EThreadNotifyPolicy::ThreadNotify_Push) {
notify_waiting_thread();
}
notify_waiting_thread();
}

/// \brief Queues an element and notifies any threads waiting on the internal conditional variable
Expand All @@ -80,10 +65,7 @@ template <typename T, EThreadNotifyPolicy Policy = EThreadNotifyPolicy::ThreadNo
queue.push(value);
}

if constexpr (Policy == EThreadNotifyPolicy::ThreadNotify_Always ||
Policy == EThreadNotifyPolicy::ThreadNotify_Push) {
notify_waiting_thread();
}
notify_waiting_thread();
}

/// \brief Clears the queue
Expand All @@ -95,23 +77,19 @@ template <typename T, EThreadNotifyPolicy Policy = EThreadNotifyPolicy::ThreadNo
empty.swap(queue);
}

if constexpr (Policy != EThreadNotifyPolicy::ThreadNotify_Never) {
// Clear should make all waiting threads
// wake to check for other states
notify_waiting_thread();
}
notify_waiting_thread();
}

/// \brief Waits for the queue to receive an element
/// \param timeout to wait for an element, pass in a value <= 0 to wait indefinitely
inline void wait_on_queue_element(std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
wait_on_queue_element_predicate([]() { return false; }, timeout);
wait_on_queue_element_or_predicate([]() { return false; }, timeout);
}

/// \brief Same as 'wait_on_queue' but receives an additional predicate to wait upon
template <class Predicate>
inline void wait_on_queue_element_predicate(Predicate pred,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
inline void wait_on_queue_element_or_predicate(Predicate pred,
std::chrono::milliseconds timeout = std::chrono::milliseconds(0)) {
std::unique_lock<std::mutex> lock(mutex);

if (timeout.count() > 0) {
Expand Down
2 changes: 1 addition & 1 deletion include/ocpp/common/websocket/websocket_libwebsockets.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ class WebsocketLibwebsockets final : public WebsocketBase {
std::shared_ptr<ConnectionData> conn_data;

// Queue of outgoing messages, notify thread only when we remove messages
SafeQueue<std::shared_ptr<WebsocketMessage>, EThreadNotifyPolicy::ThreadNotify_Pop> message_queue;
SafeQueue<std::shared_ptr<WebsocketMessage>> message_queue;

std::unique_ptr<std::thread> recv_message_thread;
SafeQueue<std::string> recv_message_queue;
Expand Down
2 changes: 1 addition & 1 deletion lib/ocpp/common/websocket/websocket_libwebsockets.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1581,7 +1581,7 @@ void WebsocketLibwebsockets::thread_deferred_callback_queue() {
while (true) {
std::function<void()> callback;
{
this->deferred_callback_queue.wait_on_queue_element_predicate(
this->deferred_callback_queue.wait_on_queue_element_or_predicate(
[this]() { return this->stop_deferred_handler.load(); });

if (stop_deferred_handler and this->deferred_callback_queue.empty()) {
Expand Down

0 comments on commit 6ce05ae

Please sign in to comment.