improve: dispatcher #1732

Merged — 12 commits, Oct 24, 2023

Changes from all commits
40 changes: 23 additions & 17 deletions src/game/scheduling/dispatcher.cpp
```diff
@@ -92,15 +92,19 @@ void Dispatcher::executeEvents(std::unique_lock<std::mutex> &asyncLock) {
 	if (groupId == static_cast<uint8_t>(TaskGroup::Serial)) {
 		executeSerialEvents(tasks);
+		mergeEvents(); // merge request, as there may be async event requests
 	} else {
 		executeParallelEvents(tasks, groupId, asyncLock);
 	}
 }
 
 void Dispatcher::executeScheduledEvents() {
-	for (uint_fast64_t i = 0, max = scheduledTasks.size(); i < max && !scheduledTasks.empty(); ++i) {
-		const auto &task = scheduledTasks.top();
+	auto &threadScheduledTasks = getThreadTask()->scheduledTasks;
+
+	auto it = scheduledTasks.begin();
+	while (it != scheduledTasks.end()) {
+		const auto &task = *it;
 		if (task->getTime() > Task::TIME_NOW) {
 			break;
 		}
```
```diff
@@ -111,12 +115,16 @@ void Dispatcher::executeScheduledEvents() {
 		if (task->execute() && task->isCycle()) {
 			task->updateTime();
-			scheduledTasks.emplace(task);
+			threadScheduledTasks.emplace_back(task);
 		} else {
-			scheduledTasksRef.erase(task->getEventId());
+			scheduledTasksRef.erase(task->getId());
 		}
 
-		scheduledTasks.pop();
+		++it;
 	}
 
+	if (it != scheduledTasks.begin()) {
+		scheduledTasks.erase(scheduledTasks.begin(), it);
+	}
 
 	dispacherContext.reset();
```
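The scheduled queue is now an ordered container that is walked with an iterator and trimmed with a single range erase, instead of a priority_queue popped one element per iteration. Below is a minimal sketch of that drain pattern, using std::multiset as a stand-in for phmap::btree_multiset (the interfaces used here are the same); FakeTask and ByTime are illustrative types, not the PR's.

```cpp
#include <chrono>
#include <iostream>
#include <memory>
#include <set>

struct FakeTask {
    std::chrono::steady_clock::time_point when;
};

struct ByTime {
    bool operator()(const std::shared_ptr<FakeTask> &a, const std::shared_ptr<FakeTask> &b) const {
        return a->when < b->when; // earliest task first, like the new Task::Compare
    }
};

int main() {
    using Clock = std::chrono::steady_clock;
    std::multiset<std::shared_ptr<FakeTask>, ByTime> scheduled;
    const auto now = Clock::now();
    scheduled.insert(std::make_shared<FakeTask>(FakeTask { now - std::chrono::seconds(1) })); // already due
    scheduled.insert(std::make_shared<FakeTask>(FakeTask { now + std::chrono::seconds(5) })); // not due yet

    // Walk the due tasks in order; stop at the first task scheduled in the future.
    auto it = scheduled.begin();
    while (it != scheduled.end() && (*it)->when <= now) {
        // execute(*it) would go here
        ++it;
    }

    // One batched erase instead of popping one element per executed task.
    scheduled.erase(scheduled.begin(), it);
    std::cout << scheduled.size() << " task(s) still pending\n"; // prints 1
}
```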
```diff
@@ -126,18 +134,15 @@ void Dispatcher::executeScheduledEvents() {
 void Dispatcher::mergeEvents() {
 	for (const auto &thread : threads) {
 		std::scoped_lock lock(thread->mutex);
-		if (!thread->tasks.empty()) {
-			for (uint_fast8_t i = 0; i < static_cast<uint8_t>(TaskGroup::Last); ++i) {
+		for (uint_fast8_t i = 0; i < static_cast<uint8_t>(TaskGroup::Last); ++i) {
+			if (!thread->tasks[i].empty()) {
 				m_tasks[i].insert(m_tasks[i].end(), make_move_iterator(thread->tasks[i].begin()), make_move_iterator(thread->tasks[i].end()));
 				thread->tasks[i].clear();
 			}
 		}
 
 		if (!thread->scheduledTasks.empty()) {
-			for (auto &task : thread->scheduledTasks) {
-				scheduledTasks.emplace(task);
-				scheduledTasksRef.emplace(task->getEventId(), task);
-			}
+			scheduledTasks.insert(make_move_iterator(thread->scheduledTasks.begin()), make_move_iterator(thread->scheduledTasks.end()));
 			thread->scheduledTasks.clear();
 		}
 	}
```
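mergeEvents() now splices each thread's buffers into the central containers with move iterators in one insert call, instead of emplacing tasks one at a time (the scheduledTasksRef entry is now created at schedule time instead). A self-contained sketch of the idiom with plain strings; all names here are illustrative:

```cpp
#include <iostream>
#include <iterator>
#include <string>
#include <vector>

int main() {
    std::vector<std::string> central { "a" };
    std::vector<std::string> threadLocalBuffer { "b", "c" };

    // Move the whole buffer in one insert; the strings are moved, not copied.
    central.insert(central.end(),
                   std::make_move_iterator(threadLocalBuffer.begin()),
                   std::make_move_iterator(threadLocalBuffer.end()));
    threadLocalBuffer.clear(); // discard the moved-from elements in one step

    for (const auto &s : central) {
        std::cout << s << '\n'; // a, b, c
    }
}
```

The same one-shot insert works on the ordered multiset of scheduled tasks, which re-sorts the moved elements as they are inserted.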
```diff
@@ -153,38 +158,39 @@ std::chrono::nanoseconds Dispatcher::timeUntilNextScheduledTask() const {
 		return CHRONO_MILI_MAX;
 	}
 
-	const auto &task = scheduledTasks.top();
+	const auto &task = *scheduledTasks.begin();
 	const auto timeRemaining = task->getTime() - Task::TIME_NOW;
 	return std::max<std::chrono::nanoseconds>(timeRemaining, CHRONO_NANO_0);
 }
 
 void Dispatcher::addEvent(std::function<void(void)> &&f, std::string_view context, uint32_t expiresAfterMs) {
-	const auto &thread = threads[getThreadId()];
+	const auto &thread = getThreadTask();
 	std::scoped_lock lock(thread->mutex);
 	thread->tasks[static_cast<uint8_t>(TaskGroup::Serial)].emplace_back(expiresAfterMs, std::move(f), context);
 	notify();
 }
 
 uint64_t Dispatcher::scheduleEvent(const std::shared_ptr<Task> &task) {
-	const auto &thread = threads[getThreadId()];
+	const auto &thread = getThreadTask();
 	std::scoped_lock lock(thread->mutex);
 
 	auto eventId = scheduledTasksRef
-	                   .emplace(task->generateId(), thread->scheduledTasks.emplace_back(task))
+	                   .emplace(task->getId(), thread->scheduledTasks.emplace_back(task))
 	                   .first->first;
 
 	notify();
 	return eventId;
 }
 
 void Dispatcher::asyncEvent(std::function<void(void)> &&f, TaskGroup group) {
-	const auto &thread = threads[getThreadId()];
+	const auto &thread = getThreadTask();
 	std::scoped_lock lock(thread->mutex);
 	thread->tasks[static_cast<uint8_t>(group)].emplace_back(0, std::move(f), dispacherContext.taskName);
 	notify();
 }
 
 void Dispatcher::stopEvent(uint64_t eventId) {
-	auto it = scheduledTasksRef.find(eventId);
+	const auto &it = scheduledTasksRef.find(eventId);
 	if (it != scheduledTasksRef.end()) {
 		it->second->cancel();
 		scheduledTasksRef.erase(it);
```
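All three producers (addEvent, scheduleEvent, asyncEvent) now fetch the calling thread's slot through getThreadTask() and lock only that slot's mutex, so producers never contend with each other. A reduced sketch of the per-thread enqueue pattern; ThreadSlot and MiniDispatcher are hypothetical stand-ins for the PR's ThreadTask and Dispatcher:

```cpp
#include <cstddef>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

// Stand-in for the per-thread buffer each producer writes into.
struct ThreadSlot {
    std::mutex mutex;
    std::vector<std::function<void()>> tasks;
};

class MiniDispatcher {
public:
    explicit MiniDispatcher(std::size_t slots) {
        threads.reserve(slots);
        for (std::size_t i = 0; i < slots; ++i) {
            threads.emplace_back(std::make_unique<ThreadSlot>());
        }
    }

    // Each producer locks only its own slot; the consumer merges later.
    void addEvent(std::size_t threadId, std::function<void()> &&f) {
        const auto &slot = threads[threadId];
        std::scoped_lock lock(slot->mutex);
        slot->tasks.emplace_back(std::move(f));
    }

private:
    std::vector<std::unique_ptr<ThreadSlot>> threads;
};

int main() {
    MiniDispatcher dispatcher(4);
    dispatcher.addEvent(0, [] { /* work */ });
}
```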
20 changes: 6 additions & 14 deletions src/game/scheduling/dispatcher.hpp
```diff
@@ -78,8 +78,8 @@ class Dispatcher {
 public:
 	explicit Dispatcher(ThreadPool &threadPool) :
 		threadPool(threadPool) {
-		threads.reserve(std::thread::hardware_concurrency() + 1);
-		for (uint_fast16_t i = 0; i < std::thread::hardware_concurrency() + 1; ++i) {
+		threads.reserve(threadPool.getNumberOfThreads() + 1);
+		for (uint_fast16_t i = 0; i < threads.capacity(); ++i) {
 			threads.emplace_back(std::make_unique<ThreadTask>());
 		}
 	};
```
```diff
@@ -133,17 +133,9 @@ class Dispatcher {
 		Task::TIME_NOW = std::chrono::system_clock::now();
 	}
 
-	static int16_t getThreadId() {
-		static std::atomic_int16_t lastId = -1;
-		thread_local static int16_t id = -1;
-
-		if (id == -1) {
-			lastId.fetch_add(1);
-			id = lastId.load();
-		}
-
-		return id;
-	};
+	const auto &getThreadTask() const {
+		return threads[ThreadPool::getThreadId()];
+	}
 
 	uint64_t scheduleEvent(uint32_t delay, std::function<void(void)> &&f, std::string_view context, bool cycle, bool log = true) {
 		return scheduleEvent(std::make_shared<Task>(std::move(f), context, delay, cycle, log));
```
```diff
@@ -204,7 +196,7 @@ class Dispatcher {
 
 	// Main Events
 	std::array<std::vector<Task>, static_cast<uint8_t>(TaskGroup::Last)> m_tasks;
-	std::priority_queue<std::shared_ptr<Task>, std::deque<std::shared_ptr<Task>>, Task::Compare> scheduledTasks;
+	phmap::btree_multiset<std::shared_ptr<Task>, Task::Compare> scheduledTasks;
 	phmap::parallel_flat_hash_map_m<uint64_t, std::shared_ptr<Task>> scheduledTasksRef;
 
 	friend class CanaryServer;
```
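With a priority_queue, the earliest task had to surface at top(), which required an inverted comparator (b->utime < a->utime); an ordered multiset sorts ascending, so Task::Compare flips to a->utime < b->utime and the next task due sits at *begin(). A small demonstration of the difference with plain ints:

```cpp
#include <cassert>
#include <queue>
#include <set>
#include <vector>

int main() {
    // priority_queue is a max-heap by default, so a min-queue needs an inverted comparator.
    auto inverted = [](int a, int b) { return b < a; };
    std::priority_queue<int, std::vector<int>, decltype(inverted)> pq(inverted);
    pq.push(30); pq.push(10); pq.push(20);
    assert(pq.top() == 10); // smallest (earliest) element at top()

    // An ordered multiset sorts ascending with the natural comparator.
    std::multiset<int> ms { 30, 10, 20 };
    assert(*ms.begin() == 10); // smallest (earliest) element at begin()

    // Unlike priority_queue, the multiset also supports iteration and range erases.
    ms.erase(ms.begin(), ms.find(30));
    assert(ms.size() == 1);
}
```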
8 changes: 7 additions & 1 deletion src/game/scheduling/task.cpp
```diff
@@ -15,9 +15,15 @@ std::chrono::system_clock::time_point Task::TIME_NOW = SYSTEM_TIME_ZERO;
 std::atomic_uint_fast64_t Task::LAST_EVENT_ID = 0;
 
 bool Task::execute() const {
-	if (!func || hasExpired()) {
+	if (isCanceled()) {
+		return false;
+	}
+
+	if (hasExpired()) {
+		g_logger().info("The task '{}' has expired, it has not been executed in {} ms.", getContext(), expiration - utime);
 		return false;
 	}
 
 	if (log) {
 		if (hasTraceableContext()) {
 			g_logger().trace("Executing task {}.", getContext());
```
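execute() now distinguishes a cancelled task (silently skipped) from an expired one (logged with its expiration window), rather than folding both into one `!func || hasExpired()` test. A simplified model of the new control flow; MiniTask and its hasExpired() are stand-ins for the real Task (whose expiry check lives elsewhere in the class), and the logger call is replaced with stdout:

```cpp
#include <chrono>
#include <functional>
#include <iostream>

using TimePoint = std::chrono::system_clock::time_point;
static const TimePoint TIME_ZERO {}; // mirrors SYSTEM_TIME_ZERO

struct MiniTask {
    std::function<void()> func;
    TimePoint utime;      // when the task was created/scheduled
    TimePoint expiration; // TIME_ZERO means "never expires"

    bool isCanceled() const { return func == nullptr; }
    bool hasExpired(TimePoint now) const {
        return expiration != TIME_ZERO && expiration < now;
    }

    bool execute(TimePoint now) const {
        if (isCanceled()) {
            return false; // cancelled tasks are skipped without logging
        }
        if (hasExpired(now)) {
            // The PR logs the expiration window (expiration - utime) here.
            std::cout << "task expired\n";
            return false;
        }
        func();
        return true;
    }
};

int main() {
    const auto now = std::chrono::system_clock::now();
    MiniTask task { [] { std::cout << "ran\n"; }, now, now - std::chrono::seconds(1) };
    task.execute(now); // prints "task expired"
}
```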
70 changes: 31 additions & 39 deletions src/game/scheduling/task.hpp
```diff
@@ -9,6 +9,7 @@
 
 #pragma once
 #include "utils/tools.hpp"
+#include <unordered_set>
 
 static constexpr auto SYSTEM_TIME_ZERO = std::chrono::system_clock::time_point(std::chrono::milliseconds(0));
 
```
```diff
@@ -17,24 +18,27 @@ class Task {
 	static std::chrono::system_clock::time_point TIME_NOW;
 
 	Task(uint32_t expiresAfterMs, std::function<void(void)> &&f, std::string_view context) :
-		expiration(expiresAfterMs > 0 ? TIME_NOW + std::chrono::milliseconds(expiresAfterMs) : SYSTEM_TIME_ZERO),
-		context(context), func(std::move(f)) {
+		func(std::move(f)), context(context), utime(TIME_NOW), expiration(expiresAfterMs > 0 ? TIME_NOW + std::chrono::milliseconds(expiresAfterMs) : SYSTEM_TIME_ZERO) {
 		assert(!this->context.empty() && "Context cannot be empty!");
 	}
 
 	Task(std::function<void(void)> &&f, std::string_view context, uint32_t delay, bool cycle = false, bool log = true) :
-		cycle(cycle), log(log), delay(delay), utime(TIME_NOW + std::chrono::milliseconds(delay)), context(context), func(std::move(f)) {
+		func(std::move(f)), context(context), utime(TIME_NOW + std::chrono::milliseconds(delay)), delay(delay), cycle(cycle), log(log) {
 		assert(!this->context.empty() && "Context cannot be empty!");
 	}
 
 	~Task() = default;
 
-	void setEventId(uint64_t id) {
-		eventId = id;
-	}
+	uint64_t getId() {
+		if (id == 0) {
+			if (++LAST_EVENT_ID == 0) {
+				LAST_EVENT_ID = 1;
+			}
 
-	uint64_t getEventId() const {
-		return eventId;
+			id = LAST_EVENT_ID;
+		}
+
+		return id;
 	}
 
 	uint32_t getDelay() const {
```
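getId() absorbs the old private generateId(): the id is assigned lazily from a shared atomic counter on first use, and a wrap back to 0 is bumped to 1 so that 0 can keep meaning "no id assigned". A self-contained sketch of that scheme (MiniTask is illustrative):

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

std::atomic_uint_fast64_t LAST_EVENT_ID = 0;

struct MiniTask {
    uint64_t id = 0; // 0 is the sentinel for "not assigned yet"

    uint64_t getId() {
        if (id == 0) {
            if (++LAST_EVENT_ID == 0) { // the counter wrapped around
                LAST_EVENT_ID = 1;      // keep 0 reserved as the sentinel
            }
            id = LAST_EVENT_ID;
        }
        return id; // later calls return the same cached id
    }
};

int main() {
    MiniTask a, b;
    assert(a.getId() == 1);
    assert(a.getId() == 1); // stable across repeated calls
    assert(b.getId() == 2);
}
```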
```diff
@@ -58,43 +62,24 @@ class Task {
 	}
 
 	bool isCanceled() const {
-		return canceled;
+		return func == nullptr;
 	}
 
 	void cancel() {
-		canceled = true;
+		func = nullptr;
 	}
 
 	bool execute() const;
 
 private:
-	static std::atomic_uint_fast64_t LAST_EVENT_ID;
-
 	void updateTime() {
 		utime = TIME_NOW + std::chrono::milliseconds(delay);
 	}
 
-	uint64_t generateId() {
-		if (eventId == 0) {
-			if (++LAST_EVENT_ID == 0) {
-				LAST_EVENT_ID = 1;
-			}
-
-			eventId = LAST_EVENT_ID;
-		}
-
-		return eventId;
-	}
-
-	struct Compare {
-		bool operator()(const std::shared_ptr<Task> &a, const std::shared_ptr<Task> &b) const {
-			return b->utime < a->utime;
-		}
-	};
-
-private:
+	static std::atomic_uint_fast64_t LAST_EVENT_ID;
 
 	bool hasTraceableContext() const {
-		const static auto tasksContext = phmap::flat_hash_set<std::string>({
+		const static auto tasksContext = std::unordered_set<std::string_view>({
 			"Creature::checkCreatureWalk",
 			"Decay::checkDecay",
 			"Dispatcher::asyncEvent",
```
```diff
@@ -123,16 +108,23 @@ class Task {
 		return tasksContext.contains(context);
 	}
 
-	bool canceled = false;
-	bool cycle = false;
-	bool log = true;
+	struct Compare {
+		bool operator()(const std::shared_ptr<Task> &a, const std::shared_ptr<Task> &b) const {
+			return a->utime < b->utime;
+		}
+	};
 
-	uint32_t delay = 0;
-	uint64_t eventId = 0;
-	std::function<void(void)> func = nullptr;
-	std::string_view context;
-
 	std::chrono::system_clock::time_point utime = SYSTEM_TIME_ZERO;
 	std::chrono::system_clock::time_point expiration = SYSTEM_TIME_ZERO;
 
+	std::string_view context;
+	std::function<void(void)> func = nullptr;
+	uint64_t id = 0;
+	uint32_t delay = 0;
+
+	bool cycle = false;
+	bool log = true;
+
 	friend class Dispatcher;
 };
```
2 changes: 1 addition & 1 deletion src/lib/thread/thread_pool.cpp
```diff
@@ -29,7 +29,7 @@ void ThreadPool::start() {
 	 * will make processing non-blocking in some way and that would allow
 	 * single core computers to process things concurrently, but not in parallel.
 	 */
-	int nThreads = std::max<int>(static_cast<int>(getNumberOfCores()), DEFAULT_NUMBER_OF_THREADS);
+	nThreads = std::max<uint16_t>(static_cast<int>(getNumberOfCores()), DEFAULT_NUMBER_OF_THREADS);
 
 	for (std::size_t i = 0; i < nThreads; ++i) {
 		threads.emplace_back([this] { ioService.run(); });
```
18 changes: 18 additions & 0 deletions src/lib/thread/thread_pool.hpp
```diff
@@ -23,9 +23,27 @@ class ThreadPool {
 	asio::io_context &getIoContext();
 	void addLoad(const std::function<void(void)> &load);
 
+	uint16_t getNumberOfThreads() const {
+		return nThreads;
+	}
+
+	static int16_t getThreadId() {
+		static std::atomic_int16_t lastId = -1;
+		thread_local static int16_t id = -1;
+
+		if (id == -1) {
+			lastId.fetch_add(1);
+			id = lastId.load();
+		}
+
+		return id;
+	};
+
 private:
 	Logger &logger;
 	asio::io_context ioService;
 	std::vector<std::jthread> threads;
 	asio::io_context::work work { ioService };
+
+	uint16_t nThreads = 0;
 };
```
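getThreadId() moves from Dispatcher into ThreadPool: the first call on each thread draws an id from a process-wide atomic and caches it in a thread_local, so every later call is a plain read. That cached id is what lets Dispatcher::getThreadTask() index threads[] without a lock. A runnable demonstration of the idiom (the worker loop is illustrative):

```cpp
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

static int16_t getThreadId() {
    static std::atomic_int16_t lastId = -1;
    thread_local static int16_t id = -1;

    if (id == -1) {         // first call on this thread
        lastId.fetch_add(1);
        id = lastId.load(); // cache the assigned id thread-locally
    }
    return id;              // every subsequent call is a plain read
}

int main() {
    std::vector<std::jthread> workers;
    for (int i = 0; i < 4; ++i) {
        workers.emplace_back([] {
            std::cout << "worker id: " << getThreadId() << '\n';
        });
    }
}
```

One caveat: fetch_add followed by a separate load can, under contention, hand two threads the same id; using the return value of fetch_add(1) + 1 directly would avoid that. The sketch mirrors the PR's code as written.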