Skip to content

Commit

Permalink
Implement TS_call, fix TS_latch, update TS_pool
Browse files Browse the repository at this point in the history
  • Loading branch information
BenSokol committed Nov 2, 2019
1 parent 4dde3fc commit b54dd2e
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 60 deletions.
18 changes: 5 additions & 13 deletions TS_call.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* @Author: Ben Sokol <Ben>
* @Email: [email protected]
* @Created: February 15th, 2019 [2:36pm]
* @Modified: February 22nd, 2019 [2:21pm]
* @Modified: November 1st, 2019 [7:18pm]
* @Version: 1.0.0
*
* Copyright (C) 2019 by Ben Sokol. All Rights Reserved.
Expand All @@ -12,22 +12,14 @@
#ifndef TS_CALL_HPP
#define TS_CALL_HPP

#error TS_call.hpp is not complete. DO NOT USE.

#include <functional>
#include <mutex>
#include <type_traits>

namespace TS {
// template <typename return_type, typename mtx_type>
// return_type call(mtx_type &mtx, std::function<return_type()> func) {
// std::lock_guard<mtx_type> lg(mtx);
// return func();
// }

template <typename return_type, typename context_type, typename mtx_type, typename... Args>
return_type call(mtx_type &mtx, context_type context, return_type (*func)(Args...), Args... args) {
template <typename mtx_type, typename function_type, typename... Args>
typename std::result_of<function_type(Args...)>::type call(mtx_type &mtx, function_type &&f, Args &&... args) {
std::lock_guard<mtx_type> lg(mtx);
return func(args...);
return f(args...);
}
} // namespace TS

Expand Down
52 changes: 40 additions & 12 deletions TS_latch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* @Author: Ben Sokol <Ben>
* @Email: [email protected]
* @Created: February 19th, 2019 [12:28pm]
* @Modified: April 11th, 2019 [6:05pm]
* @Modified: November 1st, 2019 [7:13pm]
* @Version: 1.0.0
*
* Copyright (C) 2019 by Ben Sokol. All Rights Reserved.
Expand All @@ -12,8 +12,6 @@
#include <condition_variable>
#include <shared_mutex>

#include "UTL_assert.h"

#include "TS_latch.hpp"


Expand All @@ -29,25 +27,52 @@ namespace TS {
mCond.notify_all();
}

void Latch::count_down_and_wait() {
void Latch::clear_and_reset(size_t newThreshold) {
std::shared_lock<TS_LATCH_mutex_type> lck(mMtx);
if (newThreshold == 0) {
throw std::invalid_argument("newThreshold must be greater than zero.");
}
mCond.notify_all();
mThreshold = newThreshold;
mCount = newThreshold;
}

void Latch::reset(size_t newThreshold) {
std::shared_lock<TS_LATCH_mutex_type> lck(mMtx);
mCount = mCount - 1;
if (newThreshold == 0) {
throw std::invalid_argument("newThreshold must be greater than zero.");
}
mThreshold = newThreshold;
mCount = newThreshold;
}

void Latch::count_down_and_wait(size_t n) {
std::shared_lock<TS_LATCH_mutex_type> lck(mMtx);
if (n > mCount) {
mCount = 0;
}
else {
mCount = mCount - n;
}

if (mCount == 0) {
mCount = mThreshold;
mCond.notify_all();
return;
}

mCond.wait(lck);
else {
mCond.wait(lck);
}
}

void Latch::count_down(size_t n) {
std::shared_lock<TS_LATCH_mutex_type> lck(mMtx);
if (n < mCount) {
throw std::invalid_argument("n must be less than count.");
if (n > mCount) {
mCount = 0;
}
else {
mCount = mCount - n;
}

if (mCount == 0) {
mCount = mThreshold;
mCond.notify_all();
}
}
Expand All @@ -58,6 +83,9 @@ namespace TS {

void Latch::wait() {
std::shared_lock<TS_LATCH_mutex_type> lck(mMtx);
if (mCount == 0) {
return;
}
mCond.wait(lck);
}

Expand Down
25 changes: 13 additions & 12 deletions TS_latch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* @Author: Ben Sokol <Ben>
* @Email: [email protected]
* @Created: February 19th, 2019 [12:28pm]
* @Modified: April 11th, 2019 [6:10pm]
* @Modified: November 1st, 2019 [6:26pm]
* @Version: 1.0.0
*
* Copyright (C) 2019 by Ben Sokol. All Rights Reserved.
Expand All @@ -16,25 +16,26 @@


#if __has_include(<shared_mutex>)
#include <shared_mutex>
#ifndef TS_LATCH_mutex_type
#if defined(_LIBCPP_AVAILABILITY_SHARED_MUTEX)
#define TS_LATCH_mutex_type std::shared_mutex
#include <shared_mutex>
#ifndef TS_LATCH_mutex_type
#if defined(_LIBCPP_AVAILABILITY_SHARED_MUTEX)
#define TS_LATCH_mutex_type std::shared_mutex
#else
#define TS_LATCH_mutex_type std::shared_timed_mutex
#endif
#endif
#else
#define TS_LATCH_mutex_type std::shared_timed_mutex
#endif
#endif
#else
#error Requires #include <shared_mutex> for std::shared_mutex or std::shared_timed_mutex
#error Requires #include <shared_mutex> for std::shared_mutex or std::shared_timed_mutex
#endif

namespace TS {
class Latch {
public:
explicit Latch(size_t aCount);
~Latch();

void count_down_and_wait();
void clear_and_reset(size_t newThreshold);
void reset(size_t newThreshold);
void count_down_and_wait(size_t n = 1);
void count_down(size_t n = 1);
size_t get_count() const noexcept;
void wait();
Expand Down
47 changes: 27 additions & 20 deletions TS_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* @Author: Ben Sokol
* @Email: [email protected]
* @Created: March 26th, 2019 [8:17am]
* @Modified: March 29th, 2019 [11:16pm]
* @Modified: November 1st, 2019 [5:43pm]
* @Version: 1.0.0
*
* Copyright (C) 2019 by Ben Sokol. All Rights Reserved.
Expand All @@ -12,11 +12,12 @@
#ifndef TS_POOL_HPP
#define TS_POOL_HPP

#include <cstdint> // intmax_t
#include <cstdlib> // size_t

#include <algorithm> // std::min
#include <cassert> // assert
#include <condition_variable> // std::condition_variable
#include <cstdint> // intmax_t
#include <cstdlib> // size_t
#include <functional> // std::bind, std::function
#include <future> // std::future, std::packaged_task
#include <limits> // std::numeric_limits
Expand Down Expand Up @@ -47,7 +48,8 @@ namespace TS {
// enqueue with priority
template <typename function_type, typename... Args>
std::future<typename std::result_of<function_type(Args...)>::type> enqueue(priority_type aPriority,
function_type &&f, Args &&... args);
function_type &&f,
Args &&... args);

// enqueue with normal (0) priority
template <typename function_type, typename... Args>
Expand Down Expand Up @@ -87,23 +89,25 @@ namespace TS {
void pool<priority_type>::taskThread() {
// run tasks in queue until mStop == true || mTasks.empty()
for (;;) {
std::function<void()> task;
std::function<void()> function_task;
{
std::unique_lock<std::mutex> lock(mQueueMutex);
mQueueUpdatedCondition.wait(lock, [this]() { return (mStop || !mTasks.empty()); });
mQueueUpdatedCondition.wait(lock, [this]() {
return (mStop || !mTasks.empty());
});

//End the worker thread immediately if it is asked to stop
if (mStop) {
return;
}
else {
task = std::get<1>(std::move(mTasks.top()));
function_task = std::get<1>(std::move(mTasks.top()));
mTasks.pop();
}
}

// run task and notify when complete.
task();
function_task();
mTaskFinishedCondition.notify_all();
}
}
Expand All @@ -118,7 +122,7 @@ namespace TS {
mStop = false;
mWorkers = std::vector<std::thread>();
mTasks = std::priority_queue<prioritized_task, std::vector<prioritized_task>, prioritizedTaskCompare>(
prioritizedTaskCompareFunc);
prioritizedTaskCompareFunc);

// Start task threads
for (size_t i = 0; i < aThreads; i++) {
Expand All @@ -144,12 +148,11 @@ namespace TS {
template <typename priority_type>
template <typename function_type, typename... Args>
std::future<typename std::result_of<function_type(Args...)>::type>
pool<priority_type>::enqueue(priority_type aPriority, function_type &&f, Args &&... args) {

auto task = std::make_shared<std::packaged_task<typename std::result_of<function_type(Args...)>::type()>>(
std::bind(std::forward<function_type>(f), std::forward<Args>(args)...));
pool<priority_type>::enqueue(priority_type aPriority, function_type &&f, Args &&... args) {
auto packaged_task = std::make_shared<std::packaged_task<typename std::result_of<function_type(Args...)>::type()>>(
std::bind(std::forward<function_type>(f), std::forward<Args>(args)...));

std::future<typename std::result_of<function_type(Args...)>::type> res = task->get_future();
std::future<typename std::result_of<function_type(Args...)>::type> res = packaged_task->get_future();

{
std::unique_lock<std::mutex> lock(mQueueMutex);
Expand All @@ -159,7 +162,9 @@ namespace TS {
throw std::runtime_error("enqueue on stopped ThreadPool");
}

mTasks.emplace(prioritized_task(aPriority, [task]() { (*task)(); }));
mTasks.emplace(prioritized_task(aPriority, [packaged_task]() {
(*packaged_task)();
}));
}

mQueueUpdatedCondition.notify_one();
Expand All @@ -177,15 +182,17 @@ namespace TS {

template <typename priority_type>
void pool<priority_type>::wait() {
for (const auto &worker : mWorkers) {
if (std::this_thread::get_id() == worker.get_id()) {
throw std::runtime_error("Cannot wait on threadpool from within a task");
}
if (std::any_of(mWorkers.begin(), mWorkers.end(), [&](const auto &worker) {
return std::this_thread::get_id() == worker.get_id();
})) {
throw std::runtime_error("Cannot wait on threadpool from within a task");
}

while (!mTasks.empty()) {
std::unique_lock<std::mutex> lock(mTaskFinishedMutex);
mTaskFinishedCondition.wait(lock, [this]() { return mTasks.empty(); });
mTaskFinishedCondition.wait(lock, [this]() {
return mTasks.empty();
});
}
}

Expand Down
6 changes: 3 additions & 3 deletions private/TS_ostream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* @Author: Ben Sokol <Ben>
* @Email: [email protected]
* @Created: February 21st, 2019 [2:08am]
* @Modified: February 22nd, 2019 [1:22pm]
* @Modified: November 1st, 2019 [7:06pm]
* @Version: 1.0.0
*
* Copyright (C) 2019 by Ben Sokol. All Rights Reserved.
Expand All @@ -12,8 +12,8 @@
#ifndef TS_OSTREAM_HPP
#define TS_OSTREAM_HPP

#if !defined(TS_LOGANDPRINT_HPP) && !defined(TS_PRINT_HPP) && !defined(TS_LOG_HPP)
#error Internal use only. TS_ostream.hpp is private. Do not include outside of threadsafe-tools
#if !defined(TS_LOGANDPRINT_HPP) && !defined(TS_PRINT_HPP) && !defined(TS_LOG_HPP) && !defined(TEST_BUILD)
#error Internal use only. TS_ostream.hpp is private. Do not include outside of threadsafe-tools
#endif

#include <ostream>
Expand Down

0 comments on commit b54dd2e

Please sign in to comment.