From 2554a7120894f7a6e06a912cf212be3d05cdd2e1 Mon Sep 17 00:00:00 2001 From: Renato Machado Date: Wed, 22 Nov 2023 16:39:46 -0300 Subject: [PATCH] improve: rework dispatcher --- src/client/map.cpp | 1 + src/framework/core/application.cpp | 3 + src/framework/core/asyncdispatcher.h | 4 + src/framework/core/eventdispatcher.cpp | 146 ++++++++++++++----------- src/framework/core/eventdispatcher.h | 48 +++++++- src/framework/core/logger.cpp | 13 +-- src/framework/core/logger.h | 1 - src/framework/core/scheduledevent.h | 2 +- src/framework/pch.h | 1 + 9 files changed, 140 insertions(+), 79 deletions(-) diff --git a/src/client/map.cpp b/src/client/map.cpp index 68ec14c347..a6202b8c4b 100644 --- a/src/client/map.cpp +++ b/src/client/map.cpp @@ -33,6 +33,7 @@ #include #include #include +#include #ifdef FRAMEWORK_EDITOR #include "houses.h" diff --git a/src/framework/core/application.cpp b/src/framework/core/application.cpp index 01c41e615b..7a442bd468 100644 --- a/src/framework/core/application.cpp +++ b/src/framework/core/application.cpp @@ -70,6 +70,9 @@ void Application::init(std::vector& args, uint8_t asyncDispatchMaxT std::locale::global(std::locale()); g_asyncDispatcher.init(asyncDispatchMaxThreads); + g_dispatcher.init(); + g_textDispatcher.init(); + g_mainDispatcher.init(); std::string startupOptions; for (uint32_t i = 1; i < args.size(); ++i) { diff --git a/src/framework/core/asyncdispatcher.h b/src/framework/core/asyncdispatcher.h index 8e9a980a11..41676fae5a 100644 --- a/src/framework/core/asyncdispatcher.h +++ b/src/framework/core/asyncdispatcher.h @@ -51,6 +51,10 @@ class AsyncDispatcher m_condition.notify_all(); } + inline auto getNumberOfThreads() const { + return m_threads.size(); + } + protected: void exec_loop(); diff --git a/src/framework/core/eventdispatcher.cpp b/src/framework/core/eventdispatcher.cpp index 0093da1ee4..05f7c219e1 100644 --- a/src/framework/core/eventdispatcher.cpp +++ b/src/framework/core/eventdispatcher.cpp @@ -21,6 +21,7 @@ */ #include "eventdispatcher.h" +#include "asyncdispatcher.h" #include #include "timer.h" @@ -29,64 +30,30 @@ EventDispatcher g_dispatcher, g_textDispatcher, g_mainDispatcher; std::thread::id g_mainThreadId = std::this_thread::get_id(); std::thread::id g_eventThreadId; +void EventDispatcher::init() { + m_threads.reserve(g_asyncDispatcher.getNumberOfThreads() + 1); + for (uint_fast16_t i = 1; i < m_threads.capacity(); ++i) { + m_threads.emplace_back(std::make_unique()); + } +}; + void EventDispatcher::shutdown() { - while (!m_eventList.empty()) - poll(); - - while (!m_scheduledEventList.empty()) { - m_scheduledEventList.top()->cancel(); - m_scheduledEventList.pop(); + while (!m_eventList.empty()) { + executeEvents(); + mergeEvents(); } + m_scheduledEventList.clear(); + m_disabled = true; } void EventDispatcher::poll() { - { - std::scoped_lock l(m_mutex); - for (int count = 0, max = m_scheduledEventList.size(); count < max && !m_scheduledEventList.empty(); ++count) { - const auto scheduledEvent = m_scheduledEventList.top(); - if (scheduledEvent->remainingTicks() > 0) - break; - - m_scheduledEventList.pop(); - scheduledEvent->execute(); - - if (scheduledEvent->nextCycle()) - m_scheduledEventList.push(scheduledEvent); - } - } - - const bool isMainDispatcher = &g_mainDispatcher == this; - std::unique_lock ul(m_mutex); - - // execute events list until all events are out, this is needed because some events can schedule new events that would - // change the UIWidgets layout, in this case we must execute these new events before we continue rendering, - m_pollEventsSize = m_eventList.size(); - int_fast32_t loops = 0; - while (m_pollEventsSize > 0) { - if (loops > 50) { - static Timer reportTimer; - if (reportTimer.running() && reportTimer.ticksElapsed() > 100) { - g_logger.error("ATTENTION the event list is not getting empty, this could be caused by some bad code"); - reportTimer.restart(); - } - break; - } - - for (int_fast32_t i = -1; ++i < static_cast(m_pollEventsSize);) { - const auto event = m_eventList.front(); - m_eventList.pop_front(); - if (isMainDispatcher) ul.unlock(); - event->execute(); - if (isMainDispatcher) ul.lock(); - } - m_pollEventsSize = m_eventList.size(); - - ++loops; - } + executeScheduledEvents(); + executeEvents(); + mergeEvents(); } ScheduledEventPtr EventDispatcher::scheduleEvent(const std::function& callback, int delay) @@ -94,11 +61,13 @@ ScheduledEventPtr EventDispatcher::scheduleEvent(const std::function& ca if (m_disabled) return std::make_shared(nullptr, delay, 1); - std::scoped_lock lock(m_mutex); - assert(delay >= 0); + const auto& scheduledEvent = std::make_shared(callback, delay, 1); - m_scheduledEventList.emplace(scheduledEvent); + const auto& thread = getThreadTask(); + std::scoped_lock lock(thread->mutex); + + thread->scheduledEventList.emplace_back(scheduledEvent); return scheduledEvent; } @@ -107,15 +76,17 @@ ScheduledEventPtr EventDispatcher::cycleEvent(const std::function& callb if (m_disabled) return std::make_shared(nullptr, delay, 0); - std::scoped_lock lock(m_mutex); - assert(delay > 0); + const auto& scheduledEvent = std::make_shared(callback, delay, 0); - m_scheduledEventList.emplace(scheduledEvent); + const auto& thread = getThreadTask(); + std::scoped_lock lock(thread->mutex); + + thread->scheduledEventList.emplace_back(scheduledEvent); return scheduledEvent; } -EventPtr EventDispatcher::addEvent(const std::function& callback, bool pushFront) +EventPtr EventDispatcher::addEvent(const std::function& callback) { if (m_disabled) return std::make_shared(nullptr); @@ -125,15 +96,60 @@ EventPtr EventDispatcher::addEvent(const std::function& callback, bool p return std::make_shared(nullptr); } - std::scoped_lock lock(m_mutex); - const auto& event = std::make_shared(callback); - // front pushing is a way to execute an event before others - if (pushFront) { - m_eventList.emplace_front(event); - // the poll event list only grows when pushing into front - ++m_pollEventsSize; - } else - m_eventList.emplace_back(event); + + const auto& thread = getThreadTask(); + std::scoped_lock lock(thread->mutex); + thread->events.emplace_back(event); + return event; +} + +void EventDispatcher::executeEvents() { + if (m_eventList.empty()) { + return; + } + + for (const auto& event : m_eventList) { + event->execute(); + } + + m_eventList.clear(); +} + +void EventDispatcher::executeScheduledEvents() { + auto& threadScheduledTasks = getThreadTask()->scheduledEventList; + + auto it = m_scheduledEventList.begin(); + while (it != m_scheduledEventList.end()) { + const auto& scheduledEvent = *it; + if (scheduledEvent->remainingTicks() > 0) + break; + + scheduledEvent->execute(); + + if (scheduledEvent->nextCycle()) + threadScheduledTasks.emplace_back(scheduledEvent); + + ++it; + } + + if (it != m_scheduledEventList.begin()) { + m_scheduledEventList.erase(m_scheduledEventList.begin(), it); + } +} + +void EventDispatcher::mergeEvents() { + for (const auto& thread : m_threads) { + std::scoped_lock lock(thread->mutex); + if (!thread->events.empty()) { + m_eventList.insert(m_eventList.end(), make_move_iterator(thread->events.begin()), make_move_iterator(thread->events.end())); + thread->events.clear(); + } + + if (!thread->scheduledEventList.empty()) { + m_scheduledEventList.insert(make_move_iterator(thread->scheduledEventList.begin()), make_move_iterator(thread->scheduledEventList.end())); + thread->scheduledEventList.clear(); + } + } } \ No newline at end of file diff --git a/src/framework/core/eventdispatcher.h b/src/framework/core/eventdispatcher.h index cba0fb09ac..21b1222c90 100644 --- a/src/framework/core/eventdispatcher.h +++ b/src/framework/core/eventdispatcher.h @@ -25,27 +25,65 @@ #include "clock.h" #include "scheduledevent.h" -#include #include // @bindsingleton g_dispatcher class EventDispatcher { public: + EventDispatcher() { + m_threads.emplace_back(std::make_unique()); + } + + void init(); void shutdown(); void poll(); - EventPtr addEvent(const std::function& callback, bool pushFront = false); + EventPtr addEvent(const std::function& callback); ScheduledEventPtr scheduleEvent(const std::function& callback, int delay); ScheduledEventPtr cycleEvent(const std::function& callback, int delay); private: + inline void mergeEvents(); + inline void executeEvents(); + inline void executeScheduledEvents(); + + const auto& getThreadTask() const { + return m_threads[getThreadId()]; + } + + static int16_t getThreadId() { + static std::atomic_int16_t lastId = -1; + thread_local static int16_t id = -1; + + if (id == -1) { + lastId.fetch_add(1); + id = lastId.load(); + } + + return id; + }; + size_t m_pollEventsSize{}; bool m_disabled{ false }; - std::recursive_mutex m_mutex; - std::deque m_eventList; - std::priority_queue, ScheduledEvent::Compare> m_scheduledEventList; + // Thread Events + struct ThreadTask + { + ThreadTask() { + events.reserve(2000); + scheduledEventList.reserve(2000); + } + + std::vector events; + std::vector scheduledEventList; + std::mutex mutex; + }; + std::vector> m_threads; + + // Main Events + std::vector m_eventList; + phmap::btree_multiset m_scheduledEventList; }; extern EventDispatcher g_dispatcher, g_textDispatcher, g_mainDispatcher; diff --git a/src/framework/core/logger.cpp b/src/framework/core/logger.cpp index 42ea676061..38fb7516aa 100644 --- a/src/framework/core/logger.cpp +++ b/src/framework/core/logger.cpp @@ -72,8 +72,6 @@ void Logger::log(Fw::LogLevel level, const std::string_view message) __android_log_print(ANDROID_LOG_INFO, "OTClientMobile", "%s", outmsg.c_str()); #endif // ANDROID - std::scoped_lock lock(m_mutex); - std::cout << outmsg << std::endl; if (m_outFile.good()) { @@ -107,7 +105,12 @@ void Logger::log(Fw::LogLevel level, const std::string_view message) void Logger::logFunc(Fw::LogLevel level, const std::string_view message, const std::string_view prettyFunction) { - std::scoped_lock lock(m_mutex); + if (g_eventThreadId != std::this_thread::get_id()) { + g_dispatcher.addEvent([this, level, msg = std::string{ message }, prettyFunction = std::string{ prettyFunction }] { + logFunc(level, msg, prettyFunction); + }); + return; + } auto fncName = prettyFunction.substr(0, prettyFunction.find_first_of('(')); if (fncName.find_last_of(' ') != std::string::npos) @@ -127,8 +130,6 @@ void Logger::logFunc(Fw::LogLevel level, const std::string_view message, const s void Logger::fireOldMessages() { - std::scoped_lock lock(m_mutex); - if (m_onLog) { for (const LogMessage& logMessage : m_logMessages) { m_onLog(logMessage.level, logMessage.message, logMessage.when); @@ -138,8 +139,6 @@ void Logger::fireOldMessages() void Logger::setLogFile(const std::string_view file) { - std::scoped_lock lock(m_mutex); - m_outFile.open(stdext::utf8_to_latin1(file), std::ios::out | std::ios::app); if (!m_outFile.is_open() || !m_outFile.good()) { g_logger.error(stdext::format("Unable to save log to '%s'", file)); diff --git a/src/framework/core/logger.h b/src/framework/core/logger.h index ce4eaad192..9906bda1f4 100644 --- a/src/framework/core/logger.h +++ b/src/framework/core/logger.h @@ -66,7 +66,6 @@ class Logger std::deque m_logMessages; OnLogCallback m_onLog; std::ofstream m_outFile; - std::recursive_mutex m_mutex; Fw::LogLevel m_level{ Fw::LogDebug }; }; diff --git a/src/framework/core/scheduledevent.h b/src/framework/core/scheduledevent.h index c96cdb2437..b4cdee2870 100644 --- a/src/framework/core/scheduledevent.h +++ b/src/framework/core/scheduledevent.h @@ -44,7 +44,7 @@ class ScheduledEvent : public Event { bool operator() (const ScheduledEventPtr& a, const ScheduledEventPtr& b) const { - return b->ticks() < a->ticks(); + return b->ticks() > a->ticks(); } }; diff --git a/src/framework/pch.h b/src/framework/pch.h index 50948942e2..6e61fa8878 100644 --- a/src/framework/pch.h +++ b/src/framework/pch.h @@ -48,6 +48,7 @@ #include #include +#include #include using namespace std::literals;