Skip to content

Commit

Permalink
improve: rework dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
mehah authored Nov 22, 2023
1 parent ae8d255 commit 2554a71
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 79 deletions.
1 change: 1 addition & 0 deletions src/client/map.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <framework/core/asyncdispatcher.h>
#include <framework/core/graphicalapplication.h>
#include <framework/core/eventdispatcher.h>
#include <queue>

#ifdef FRAMEWORK_EDITOR
#include "houses.h"
Expand Down
3 changes: 3 additions & 0 deletions src/framework/core/application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ void Application::init(std::vector<std::string>& args, uint8_t asyncDispatchMaxT
std::locale::global(std::locale());

g_asyncDispatcher.init(asyncDispatchMaxThreads);
g_dispatcher.init();
g_textDispatcher.init();
g_mainDispatcher.init();

std::string startupOptions;
for (uint32_t i = 1; i < args.size(); ++i) {
Expand Down
4 changes: 4 additions & 0 deletions src/framework/core/asyncdispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ class AsyncDispatcher
m_condition.notify_all();
}

inline auto getNumberOfThreads() const {
return m_threads.size();
}

protected:
void exec_loop();

Expand Down
146 changes: 81 additions & 65 deletions src/framework/core/eventdispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
*/

#include "eventdispatcher.h"
#include "asyncdispatcher.h"

#include <framework/core/clock.h>
#include "timer.h"
Expand All @@ -29,76 +30,44 @@ EventDispatcher g_dispatcher, g_textDispatcher, g_mainDispatcher;
std::thread::id g_mainThreadId = std::this_thread::get_id();
std::thread::id g_eventThreadId;

void EventDispatcher::init() {
m_threads.reserve(g_asyncDispatcher.getNumberOfThreads() + 1);
for (uint_fast16_t i = 1; i < m_threads.capacity(); ++i) {
m_threads.emplace_back(std::make_unique<ThreadTask>());
}
};

void EventDispatcher::shutdown()
{
while (!m_eventList.empty())
poll();

while (!m_scheduledEventList.empty()) {
m_scheduledEventList.top()->cancel();
m_scheduledEventList.pop();
while (!m_eventList.empty()) {
executeEvents();
mergeEvents();
}

m_scheduledEventList.clear();

m_disabled = true;
}

void EventDispatcher::poll()
{
{
std::scoped_lock l(m_mutex);
for (int count = 0, max = m_scheduledEventList.size(); count < max && !m_scheduledEventList.empty(); ++count) {
const auto scheduledEvent = m_scheduledEventList.top();
if (scheduledEvent->remainingTicks() > 0)
break;

m_scheduledEventList.pop();
scheduledEvent->execute();

if (scheduledEvent->nextCycle())
m_scheduledEventList.push(scheduledEvent);
}
}

const bool isMainDispatcher = &g_mainDispatcher == this;
std::unique_lock ul(m_mutex);

// execute events list until all events are out, this is needed because some events can schedule new events that would
// change the UIWidgets layout, in this case we must execute these new events before we continue rendering,
m_pollEventsSize = m_eventList.size();
int_fast32_t loops = 0;
while (m_pollEventsSize > 0) {
if (loops > 50) {
static Timer reportTimer;
if (reportTimer.running() && reportTimer.ticksElapsed() > 100) {
g_logger.error("ATTENTION the event list is not getting empty, this could be caused by some bad code");
reportTimer.restart();
}
break;
}

for (int_fast32_t i = -1; ++i < static_cast<int_fast32_t>(m_pollEventsSize);) {
const auto event = m_eventList.front();
m_eventList.pop_front();
if (isMainDispatcher) ul.unlock();
event->execute();
if (isMainDispatcher) ul.lock();
}
m_pollEventsSize = m_eventList.size();

++loops;
}
executeScheduledEvents();
executeEvents();
mergeEvents();
}

ScheduledEventPtr EventDispatcher::scheduleEvent(const std::function<void()>& callback, int delay)
{
if (m_disabled)
return std::make_shared<ScheduledEvent>(nullptr, delay, 1);

std::scoped_lock<std::recursive_mutex> lock(m_mutex);

assert(delay >= 0);

const auto& scheduledEvent = std::make_shared<ScheduledEvent>(callback, delay, 1);
m_scheduledEventList.emplace(scheduledEvent);
const auto& thread = getThreadTask();
std::scoped_lock lock(thread->mutex);

thread->scheduledEventList.emplace_back(scheduledEvent);
return scheduledEvent;
}

Expand All @@ -107,15 +76,17 @@ ScheduledEventPtr EventDispatcher::cycleEvent(const std::function<void()>& callb
if (m_disabled)
return std::make_shared<ScheduledEvent>(nullptr, delay, 0);

std::scoped_lock<std::recursive_mutex> lock(m_mutex);

assert(delay > 0);

const auto& scheduledEvent = std::make_shared<ScheduledEvent>(callback, delay, 0);
m_scheduledEventList.emplace(scheduledEvent);
const auto& thread = getThreadTask();
std::scoped_lock lock(thread->mutex);

thread->scheduledEventList.emplace_back(scheduledEvent);
return scheduledEvent;
}

EventPtr EventDispatcher::addEvent(const std::function<void()>& callback, bool pushFront)
EventPtr EventDispatcher::addEvent(const std::function<void()>& callback)
{
if (m_disabled)
return std::make_shared<Event>(nullptr);
Expand All @@ -125,15 +96,60 @@ EventPtr EventDispatcher::addEvent(const std::function<void()>& callback, bool p
return std::make_shared<Event>(nullptr);
}

std::scoped_lock<std::recursive_mutex> lock(m_mutex);

const auto& event = std::make_shared<Event>(callback);
// front pushing is a way to execute an event before others
if (pushFront) {
m_eventList.emplace_front(event);
// the poll event list only grows when pushing into front
++m_pollEventsSize;
} else
m_eventList.emplace_back(event);

const auto& thread = getThreadTask();
std::scoped_lock lock(thread->mutex);
thread->events.emplace_back(event);

return event;
}

void EventDispatcher::executeEvents() {
if (m_eventList.empty()) {
return;
}

for (const auto& event : m_eventList) {
event->execute();
}

m_eventList.clear();
}

void EventDispatcher::executeScheduledEvents() {
auto& threadScheduledTasks = getThreadTask()->scheduledEventList;

auto it = m_scheduledEventList.begin();
while (it != m_scheduledEventList.end()) {
const auto& scheduledEvent = *it;
if (scheduledEvent->remainingTicks() > 0)
break;

scheduledEvent->execute();

if (scheduledEvent->nextCycle())
threadScheduledTasks.emplace_back(scheduledEvent);

++it;
}

if (it != m_scheduledEventList.begin()) {
m_scheduledEventList.erase(m_scheduledEventList.begin(), it);
}
}

void EventDispatcher::mergeEvents() {
for (const auto& thread : m_threads) {
std::scoped_lock lock(thread->mutex);
if (!thread->events.empty()) {
m_eventList.insert(m_eventList.end(), make_move_iterator(thread->events.begin()), make_move_iterator(thread->events.end()));
thread->events.clear();
}

if (!thread->scheduledEventList.empty()) {
m_scheduledEventList.insert(make_move_iterator(thread->scheduledEventList.begin()), make_move_iterator(thread->scheduledEventList.end()));
thread->scheduledEventList.clear();
}
}
}
48 changes: 43 additions & 5 deletions src/framework/core/eventdispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,27 +25,65 @@
#include "clock.h"
#include "scheduledevent.h"

#include <queue>
#include <thread>

// @bindsingleton g_dispatcher
class EventDispatcher
{
public:
EventDispatcher() {
m_threads.emplace_back(std::make_unique<ThreadTask>());
}

void init();
void shutdown();
void poll();

EventPtr addEvent(const std::function<void()>& callback, bool pushFront = false);
EventPtr addEvent(const std::function<void()>& callback);
ScheduledEventPtr scheduleEvent(const std::function<void()>& callback, int delay);
ScheduledEventPtr cycleEvent(const std::function<void()>& callback, int delay);

private:
inline void mergeEvents();
inline void executeEvents();
inline void executeScheduledEvents();

const auto& getThreadTask() const {
return m_threads[getThreadId()];
}

static int16_t getThreadId() {
static std::atomic_int16_t lastId = -1;
thread_local static int16_t id = -1;

if (id == -1) {
lastId.fetch_add(1);
id = lastId.load();
}

return id;
};

size_t m_pollEventsSize{};
bool m_disabled{ false };

std::recursive_mutex m_mutex;
std::deque<EventPtr> m_eventList;
std::priority_queue<ScheduledEventPtr, std::deque<ScheduledEventPtr>, ScheduledEvent::Compare> m_scheduledEventList;
// Thread Events
struct ThreadTask
{
ThreadTask() {
events.reserve(2000);
scheduledEventList.reserve(2000);
}

std::vector<EventPtr> events;
std::vector<ScheduledEventPtr> scheduledEventList;
std::mutex mutex;
};
std::vector<std::unique_ptr<ThreadTask>> m_threads;

// Main Events
std::vector<EventPtr> m_eventList;
phmap::btree_multiset<ScheduledEventPtr, ScheduledEvent::Compare> m_scheduledEventList;
};

extern EventDispatcher g_dispatcher, g_textDispatcher, g_mainDispatcher;
Expand Down
13 changes: 6 additions & 7 deletions src/framework/core/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ void Logger::log(Fw::LogLevel level, const std::string_view message)
__android_log_print(ANDROID_LOG_INFO, "OTClientMobile", "%s", outmsg.c_str());
#endif // ANDROID

std::scoped_lock lock(m_mutex);

std::cout << outmsg << std::endl;

if (m_outFile.good()) {
Expand Down Expand Up @@ -107,7 +105,12 @@ void Logger::log(Fw::LogLevel level, const std::string_view message)

void Logger::logFunc(Fw::LogLevel level, const std::string_view message, const std::string_view prettyFunction)
{
std::scoped_lock lock(m_mutex);
if (g_eventThreadId != std::this_thread::get_id()) {
g_dispatcher.addEvent([this, level, msg = std::string{ message }, prettyFunction = std::string{ prettyFunction }] {
logFunc(level, msg, prettyFunction);
});
return;
}

auto fncName = prettyFunction.substr(0, prettyFunction.find_first_of('('));
if (fncName.find_last_of(' ') != std::string::npos)
Expand All @@ -127,8 +130,6 @@ void Logger::logFunc(Fw::LogLevel level, const std::string_view message, const s

void Logger::fireOldMessages()
{
std::scoped_lock lock(m_mutex);

if (m_onLog) {
for (const LogMessage& logMessage : m_logMessages) {
m_onLog(logMessage.level, logMessage.message, logMessage.when);
Expand All @@ -138,8 +139,6 @@ void Logger::fireOldMessages()

void Logger::setLogFile(const std::string_view file)
{
std::scoped_lock lock(m_mutex);

m_outFile.open(stdext::utf8_to_latin1(file), std::ios::out | std::ios::app);
if (!m_outFile.is_open() || !m_outFile.good()) {
g_logger.error(stdext::format("Unable to save log to '%s'", file));
Expand Down
1 change: 0 additions & 1 deletion src/framework/core/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class Logger
std::deque<LogMessage> m_logMessages;
OnLogCallback m_onLog;
std::ofstream m_outFile;
std::recursive_mutex m_mutex;
Fw::LogLevel m_level{ Fw::LogDebug };
};

Expand Down
2 changes: 1 addition & 1 deletion src/framework/core/scheduledevent.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ScheduledEvent : public Event
{
bool operator() (const ScheduledEventPtr& a, const ScheduledEventPtr& b) const
{
return b->ticks() < a->ticks();
return b->ticks() > a->ticks();
}
};

Expand Down
1 change: 1 addition & 0 deletions src/framework/pch.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include <vector>

#include <parallel_hashmap/phmap.h>
#include <parallel_hashmap/btree.h>
#include <pugixml.hpp>

using namespace std::literals;

0 comments on commit 2554a71

Please sign in to comment.