Skip to content

Commit

Permalink
Merge pull request hyrise#237 from kaihowl/fix/kh-taskWeakObservers
Browse files Browse the repository at this point in the history
Using weak pointers instead of bare pointers.
  • Loading branch information
bastih committed Nov 7, 2013
2 parents 706e0d4 + 7d66503 commit ae1582a
Show file tree
Hide file tree
Showing 17 changed files with 92 additions and 57 deletions.
2 changes: 1 addition & 1 deletion src/bin/hyrise/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ int main(int argc, char *argv[]) {

SharedScheduler::getInstance().init(scheduler_name, worker_threads);

// MainS erver Loop
// Main Server Loop
struct ev_loop *loop = ev_default_loop(0);
ebb_server server;
// Initialize server based on libev event loop
Expand Down
16 changes: 11 additions & 5 deletions src/bin/units_access/SpawnConsecutiveSubtasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,20 @@ namespace {

void SpawnConsecutiveSubtasks::executePlanOperation() {
std::vector<std::shared_ptr<PlanOperation>> children;
std::vector<Task*> successors;
std::vector<std::shared_ptr<Task>> successors;
auto scheduler = SharedScheduler::getInstance().getScheduler();

for (auto doneObserver : _doneObservers) {
Task* const task = dynamic_cast<Task*>(doneObserver);
successors.push_back(task);
{
std::lock_guard<decltype(_observerMutex)> lk(_observerMutex);
for (const auto& weakDoneObserver : _doneObservers) {
if (auto doneObserver = weakDoneObserver.lock()) {
if (const auto task = std::dynamic_pointer_cast<Task>(doneObserver)) {
successors.push_back(std::move(task));
}
}
}
}

for (size_t i = 0; i < m_numberOfSpawns; ++i) {
children.push_back(QueryParser::instance().parse("SpawnedTask", Json::Value()));

Expand Down
14 changes: 10 additions & 4 deletions src/bin/units_access/SpawnParallelSubtasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,18 @@ namespace {
// Execution with horizontal tables results in undefined behavior
void SpawnParallelSubtasks::executePlanOperation() {
std::vector<std::shared_ptr<PlanOperation>> children;
std::vector<Task*> successors;
std::vector<std::shared_ptr<Task>> successors;
auto scheduler = SharedScheduler::getInstance().getScheduler();

for (auto doneObserver : _doneObservers) {
Task* const task = dynamic_cast<Task*>(doneObserver);
successors.push_back(task);
{
std::lock_guard<decltype(_observerMutex)> lk(_observerMutex);
for (const auto& weakDoneObserver : _doneObservers) {
if (auto doneObserver = weakDoneObserver.lock()) {
if (const auto task = std::dynamic_pointer_cast<Task>(doneObserver)) {
successors.push_back(std::move(task));
}
}
}
}

for (size_t i = 0; i < m_numberOfSpawns; ++i) {
Expand Down
2 changes: 1 addition & 1 deletion src/bin/units_access/helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ hyrise::storage::c_atable_ptr_t executeAndWait(
std::unique_ptr<MockedConnection> conn(new MockedConnection("query="+httpQuery));

SharedScheduler::getInstance().resetScheduler("WSCoreBoundQueuesScheduler", poolSize);
AbstractTaskScheduler * scheduler = SharedScheduler::getInstance().getScheduler();
const auto& scheduler = SharedScheduler::getInstance().getScheduler();

auto request = std::make_shared<access::RequestParseTask>(conn.get());
auto response = request->getResponseTask();
Expand Down
23 changes: 11 additions & 12 deletions src/bin/units_taskscheduler/taskscheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
#include "taskscheduler/WSCoreBoundQueuesScheduler.h"
#include "taskscheduler/ThreadPerTaskScheduler.h"

#include "helper/make_unique.h"
#include "helper/HwlocHelper.h"


Expand Down Expand Up @@ -54,17 +53,17 @@ INSTANTIATE_TEST_CASE_P(

TEST_P(SchedulerTest, setScheduler) {
SharedScheduler::getInstance().resetScheduler("CoreBoundQueuesScheduler");
AbstractTaskScheduler * scheduler = SharedScheduler::getInstance().getScheduler();
CoreBoundQueuesScheduler * simple_task_scheduler = dynamic_cast<CoreBoundQueuesScheduler *>(scheduler);
bool test = (simple_task_scheduler == nullptr);
const auto& scheduler = SharedScheduler::getInstance().getScheduler();
std::shared_ptr<CoreBoundQueuesScheduler> simple_task_scheduler = std::dynamic_pointer_cast<CoreBoundQueuesScheduler>(scheduler);
bool test = (simple_task_scheduler == NULL);
ASSERT_EQ(test, false);

SharedScheduler::getInstance().resetScheduler(scheduler_name, getNumberOfCoresOnSystem());
}

TEST_P(SchedulerTest, wait_task_test) {
SharedScheduler::getInstance().resetScheduler(scheduler_name);
AbstractTaskScheduler *scheduler = SharedScheduler::getInstance().getScheduler();
const auto& scheduler = SharedScheduler::getInstance().getScheduler();

std::shared_ptr<WaitTask> waiter = std::make_shared<WaitTask>();
scheduler->schedule(waiter);
Expand All @@ -87,7 +86,7 @@ long int getTimeInMillis() {

TEST_P(SchedulerTest, sync_task_test) {
SharedScheduler::getInstance().resetScheduler(scheduler_name);
AbstractTaskScheduler *scheduler = SharedScheduler::getInstance().getScheduler();
const auto& scheduler = SharedScheduler::getInstance().getScheduler();

//scheduler->resize(2);

Expand All @@ -113,7 +112,7 @@ TEST_P(SchedulerTest, million_dependencies_test) {
std::vector<std::shared_ptr<NoOp> > vtasks2;

SharedScheduler::getInstance().resetScheduler(scheduler_name);
AbstractTaskScheduler *scheduler = SharedScheduler::getInstance().getScheduler();
const auto& scheduler = SharedScheduler::getInstance().getScheduler();

//scheduler->resize(threads1);

Expand Down Expand Up @@ -148,7 +147,7 @@ TEST_P(SchedulerTest, million_noops_test) {
std::vector<std::shared_ptr<NoOp> > vtasks1;

SharedScheduler::getInstance().resetScheduler(scheduler_name);
AbstractTaskScheduler *scheduler = SharedScheduler::getInstance().getScheduler();
const auto& scheduler = SharedScheduler::getInstance().getScheduler();

//scheduler->resize(threads1);

Expand All @@ -167,7 +166,7 @@ TEST_P(SchedulerTest, million_noops_test) {

TEST_P(SchedulerTest, wait_dependency_task_test) {
SharedScheduler::getInstance().resetScheduler(scheduler_name);
AbstractTaskScheduler *scheduler = SharedScheduler::getInstance().getScheduler();
const auto& scheduler = SharedScheduler::getInstance().getScheduler();

//scheduler->resize(2);
std::shared_ptr<NoOp> nop = std::make_shared<NoOp>();
Expand All @@ -185,7 +184,7 @@ TEST_P(SchedulerTest, wait_set_test) {
int sleeptime = 50;

SharedScheduler::getInstance().resetScheduler(scheduler_name);
AbstractTaskScheduler *scheduler = SharedScheduler::getInstance().getScheduler();
const auto& scheduler = SharedScheduler::getInstance().getScheduler();

auto waiter = std::make_shared<WaitTask>();
auto sleeper = std::make_shared<SleepTask>(sleeptime);
Expand Down Expand Up @@ -253,14 +252,14 @@ bool long_block_test(AbstractTaskScheduler * scheduler){
TEST(SchedulerBlockTest, dont_block_test) {
/* we assign a long running task and a number of smaller tasks with a think time to the queues -
the scheduler should realize that one queue is blocked and assign tasks to other queues */
auto scheduler = make_unique<CoreBoundQueuesScheduler>(2);
auto scheduler = std::make_shared<CoreBoundQueuesScheduler>(2);
// These test currently just check for execute
long_block_test(scheduler.get());
}

TEST(SchedulerBlockTest, dont_block_test_with_work_stealing) {
/* steal work from that queue */
auto scheduler = make_unique<WSCoreBoundQueuesScheduler>(2);
auto scheduler = std::make_shared<WSCoreBoundQueuesScheduler>(2);
long_block_test(scheduler.get());
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib/access/system/RequestParseTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ std::string hash(const std::string &v) {

void RequestParseTask::operator()() {
assert((_responseTask != nullptr) && "Response needs to be set");
AbstractTaskScheduler *scheduler = SharedScheduler::getInstance().getScheduler();
const auto& scheduler = SharedScheduler::getInstance().getScheduler();

performance_vector_t& performance_data = _responseTask->getPerformanceData();

Expand Down
2 changes: 1 addition & 1 deletion src/lib/taskscheduler/AbstractCoreBoundQueuesScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void AbstractCoreBoundQueuesScheduler::schedule(std::shared_ptr<Task> task) {
if (task->isReady())
pushToQueue(task);
else {
task->addReadyObserver(this);
task->addReadyObserver(shared_from_this());
std::lock_guard<lock_t> lk(_setMutex);
_waitSet.insert(task);
LOG4CXX_DEBUG(_logger, "Task " << std::hex << (void *)task.get() << std::dec << " inserted in wait queue");
Expand Down
5 changes: 4 additions & 1 deletion src/lib/taskscheduler/AbstractCoreBoundQueuesScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@
#include "AbstractCoreBoundQueue.h"
#include <atomic>

class AbstractCoreBoundQueuesScheduler : public AbstractTaskScheduler, public TaskReadyObserver{
class AbstractCoreBoundQueuesScheduler :
public AbstractTaskScheduler,
public TaskReadyObserver,
public std::enable_shared_from_this<TaskReadyObserver> {

public:

Expand Down
2 changes: 1 addition & 1 deletion src/lib/taskscheduler/CentralPriorityScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void CentralPriorityScheduler::schedule(std::shared_ptr<Task> task){
_condition.notify_one();
}
else {
task->addReadyObserver(this);
task->addReadyObserver(shared_from_this());
std::lock_guard<lock_t> lk(_setMutex);
_waitSet.insert(task);
LOG4CXX_DEBUG(_logger, "Task " << std::hex << (void *)task.get() << std::dec << " inserted in wait queue");
Expand Down
5 changes: 4 additions & 1 deletion src/lib/taskscheduler/CentralPriorityScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ class PriorityWorkerThread {
/**
* a central scheduler holds a task queue and n worker threads
*/
class CentralPriorityScheduler : public AbstractTaskScheduler, public TaskReadyObserver {
class CentralPriorityScheduler :
public AbstractTaskScheduler,
public TaskReadyObserver,
public std::enable_shared_from_this<TaskReadyObserver> {
friend class PriorityWorkerThread;
typedef std::unordered_set<std::shared_ptr<Task> > waiting_tasks_t;
// set for tasks with open dependencies
Expand Down
2 changes: 1 addition & 1 deletion src/lib/taskscheduler/CentralScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void CentralScheduler::schedule(std::shared_ptr<Task> task){
_condition.notify_one();
}
else {
task->addReadyObserver(this);
task->addReadyObserver(shared_from_this());
std::lock_guard<lock_t> lk(_setMutex);
_waitSet.insert(task);
LOG4CXX_DEBUG(_logger, "Task " << std::hex << (void *)task.get() << std::dec << " inserted in wait queue");
Expand Down
5 changes: 4 additions & 1 deletion src/lib/taskscheduler/CentralScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ class WorkerThread {
/**
* a central scheduler holds a task queue and n worker threads
*/
class CentralScheduler : public AbstractTaskScheduler, public TaskReadyObserver {
class CentralScheduler :
public AbstractTaskScheduler,
public TaskReadyObserver,
public std::enable_shared_from_this<TaskReadyObserver> {
friend class WorkerThread;
typedef std::unordered_set<std::shared_ptr<Task> > waiting_tasks_t;
// set for tasks with open dependencies
Expand Down
18 changes: 8 additions & 10 deletions src/lib/taskscheduler/SharedScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,15 @@
#include <stdexcept>

struct AbstractTaskSchedulerFactory {
virtual AbstractTaskScheduler * create(int cores) const = 0;
virtual std::shared_ptr<AbstractTaskScheduler> create(int cores) const = 0;
virtual ~AbstractTaskSchedulerFactory() {}
};

/// Factory for schedulers, implements abstract factory pattern
template<typename T>
struct TaskSchedulerFactory : public AbstractTaskSchedulerFactory {
AbstractTaskScheduler * create(int cores) const {
return new T(cores);
std::shared_ptr<AbstractTaskScheduler> create(int cores) const {
return std::shared_ptr<AbstractTaskScheduler>(new T(cores));
}
};

Expand All @@ -37,11 +37,10 @@ class SchedulerException : public std::runtime_error {
class SharedScheduler{
typedef std::map< std::string, std::unique_ptr<AbstractTaskSchedulerFactory>> factory_map_t;
factory_map_t _schedulers;
AbstractTaskScheduler * _sharedScheduler = nullptr;
std::shared_ptr<AbstractTaskScheduler> _sharedScheduler;
public:

~SharedScheduler(){
delete _sharedScheduler;
}

template<typename TaskSchedulerClass>
Expand All @@ -57,12 +56,12 @@ class SharedScheduler{
}

bool isInitialized(){
return (_sharedScheduler != nullptr);
return bool(_sharedScheduler);
}

void init(const std::string &scheduler, int cores = getNumberOfCoresOnSystem()){

if(_sharedScheduler != nullptr)
if(_sharedScheduler)
throw SchedulerException("Scheduler has already been initialized");
if(_schedulers.find(scheduler) != _schedulers.end()){
_sharedScheduler = _schedulers[scheduler]->create(cores);
Expand All @@ -74,9 +73,8 @@ class SharedScheduler{
* stops current scheduler gracefully; starts new scheduler
*/
void resetScheduler(const std::string &scheduler, int cores = getNumberOfCoresOnSystem()){
if(_sharedScheduler != nullptr) {
if(_sharedScheduler) {
_sharedScheduler->shutdown();
delete _sharedScheduler;
}

if(_schedulers.find(scheduler) != _schedulers.end()){
Expand All @@ -85,7 +83,7 @@ class SharedScheduler{
throw SchedulerException("Requested scheduler was not registered");
}

AbstractTaskScheduler * getScheduler() {
std::shared_ptr<AbstractTaskScheduler> getScheduler() {
return _sharedScheduler;
}

Expand Down
36 changes: 25 additions & 11 deletions src/lib/taskscheduler/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,32 @@ void Task::unlockForNotifications() {
}

void Task::notifyReadyObservers() {
std::lock_guard<decltype(_observerMutex)> lk(_observerMutex);
std::vector<TaskReadyObserver *>::iterator itr;
for (itr = _readyObservers.begin(); itr != _readyObservers.end(); ++itr) {
(*itr)->notifyReady(shared_from_this());
// Lock and copy observers.
// This way we do not run any callbacks while holding a lock.
std::vector<std::weak_ptr<TaskReadyObserver>> targets;
{
std::lock_guard<decltype(_observerMutex)> lk(_observerMutex);
targets = _readyObservers;
}
for (const auto& target : targets) {
if (auto observer = target.lock()) {
observer->notifyReady(shared_from_this());
}
}
}

void Task::notifyDoneObservers() {
std::lock_guard<decltype(_observerMutex)> lk(_observerMutex);
std::vector<TaskDoneObserver *>::iterator itr;
for (itr = _doneObservers.begin(); itr != _doneObservers.end(); ++itr) {
(*itr)->notifyDone(shared_from_this());
// Lock and copy observers.
// This way we do not run any callbacks while holding a lock.
std::vector<std::weak_ptr<TaskDoneObserver>> targets;
{
std::lock_guard<decltype(_observerMutex)> lk(_observerMutex);
targets = _doneObservers;
}
for (const auto& target : targets) {
if (auto observer = target.lock()) {
observer->notifyDone(shared_from_this());
}
}
}

Expand All @@ -44,17 +58,17 @@ void Task::addDependency(std::shared_ptr<Task> dependency) {
_dependencies.push_back(dependency);
++_dependencyWaitCount;
}
dependency->addDoneObserver(this);
dependency->addDoneObserver(shared_from_this());

}

void Task::addReadyObserver(TaskReadyObserver *observer) {
void Task::addReadyObserver(const std::shared_ptr<TaskReadyObserver>& observer) {

std::lock_guard<decltype(_observerMutex)> lk(_observerMutex);
_readyObservers.push_back(observer);
}

void Task::addDoneObserver(TaskDoneObserver *observer) {
void Task::addDoneObserver(const std::shared_ptr<TaskDoneObserver>& observer) {

std::lock_guard<decltype(_observerMutex)> lk(_observerMutex);
_doneObservers.push_back(observer);
Expand Down
8 changes: 4 additions & 4 deletions src/lib/taskscheduler/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ class Task : public TaskDoneObserver, public std::enable_shared_from_this<Task>

protected:
std::vector<std::shared_ptr<Task> > _dependencies;
std::vector<TaskReadyObserver *> _readyObservers;
std::vector<TaskDoneObserver *> _doneObservers;
std::vector<std::weak_ptr<TaskReadyObserver>> _readyObservers;
std::vector<std::weak_ptr<TaskDoneObserver>> _doneObservers;

int _dependencyWaitCount;
// mutex for dependencyCount and dependency vector
Expand Down Expand Up @@ -106,11 +106,11 @@ class Task : public TaskDoneObserver, public std::enable_shared_from_this<Task>
/*
* adds an observer that gets notified if this task is ready to run
*/
void addReadyObserver(TaskReadyObserver *observer);
void addReadyObserver(const std::shared_ptr<TaskReadyObserver>& observer);
/*
* adds an obserer that gets notified if this task is done
*/
void addDoneObserver(TaskDoneObserver *observer);
void addDoneObserver(const std::shared_ptr<TaskDoneObserver>& observer);
/*
* whether this task is ready to run / has open dependencies
*/
Expand Down
2 changes: 1 addition & 1 deletion src/lib/taskscheduler/ThreadPerTaskScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void ThreadPerTaskScheduler::schedule(std::shared_ptr<Task> task){
t.detach();
}
else {
task->addReadyObserver(this);
task->addReadyObserver(shared_from_this());
std::lock_guard<lock_t> lk(_setMutex);
_waitSet.insert(task);
LOG4CXX_DEBUG(_logger, "Task " << std::hex << (void *)task.get() << std::dec << " inserted in wait queue");
Expand Down
Loading

0 comments on commit ae1582a

Please sign in to comment.