diff --git a/src/module.cpp b/src/module.cpp index 252082036..3233bed3e 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -365,11 +365,7 @@ typedef struct RedisModuleCommandFilter { static list *moduleCommandFilters; /* Module GIL Variables */ -static int s_cAcquisitionsServer = 0; -static int s_cAcquisitionsModule = 0; -static std::mutex s_mutex; -static std::condition_variable s_cv; -static std::recursive_mutex s_mutexModule; +static readWriteLock s_moduleGIL; thread_local bool g_fModuleThread = false; typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data); @@ -5969,95 +5965,58 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) { // as the server thread acquisition is sufficient. If we did try to lock we would deadlock static bool FModuleCallBackLock(bool fServerThread) { - return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0; + return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_moduleGIL.hasReader(); } void moduleAcquireGIL(int fServerThread, int fExclusive) { - std::unique_lock lock(s_mutex); - int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer; - if (FModuleCallBackLock(fServerThread)) { return; } - while (*pcheck > 0) - s_cv.wait(lock); - if (fServerThread) { - ++s_cAcquisitionsServer; + s_moduleGIL.acquireRead(); } else { - // only try to acquire the mutexModule in exclusive mode - if (fExclusive){ - // It is possible that another module thread holds the GIL (and s_mutexModule as a result). - // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns. - // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns. - // As a result, a deadlock has occured. - // We release the lock on s_mutex and wait until we are able to safely acquire the GIL - // in order to prevent this deadlock from occuring. - while (!s_mutexModule.try_lock()) - s_cv.wait(lock); - } - ++s_cAcquisitionsModule; - fModuleGILWlocked++; + s_moduleGIL.acquireWrite(fExclusive); } } int moduleTryAcquireGIL(bool fServerThread, int fExclusive) { - std::unique_lock lock(s_mutex, std::defer_lock); - if (!lock.try_lock()) - return 1; - int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer; - if (FModuleCallBackLock(fServerThread)) { return 0; } - if (*pcheck > 0) - return 1; - if (fServerThread) { - ++s_cAcquisitionsServer; + if (!s_moduleGIL.tryAcquireRead()) + return 1; } else { - // only try to acquire the mutexModule in exclusive mode - if (fExclusive){ - if (!s_mutexModule.try_lock()) - return 1; - } - ++s_cAcquisitionsModule; - fModuleGILWlocked++; + if (!s_moduleGIL.tryAcquireWrite(fExclusive)) + return 1; } return 0; } void moduleReleaseGIL(int fServerThread, int fExclusive) { - std::unique_lock lock(s_mutex); - if (FModuleCallBackLock(fServerThread)) { return; } - + if (fServerThread) { - --s_cAcquisitionsServer; + s_moduleGIL.releaseRead(); } else { - // only try to release the mutexModule in exclusive mode - if (fExclusive) - s_mutexModule.unlock(); - --s_cAcquisitionsModule; - fModuleGILWlocked--; + s_moduleGIL.releaseWrite(fExclusive); } - s_cv.notify_all(); } int moduleGILAcquiredByModule(void) { - return fModuleGILWlocked > 0; + return s_moduleGIL.hasWriter(); } diff --git a/src/networking.cpp b/src/networking.cpp index 76223b76d..b920ec4d7 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3863,82 +3863,37 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) { * * The function returns the total number of events processed. */ void processEventsWhileBlocked(int iel) { - serverAssert(GlobalLocksAcquired()); - int iterations = 4; /* See the function top-comment. */ - - std::vector vecclients; - listIter li; - listNode *ln; - listRewind(g_pserver->clients, &li); - // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks - // so unlock here, and save them for reacquisition later - while ((ln = listNext(&li)) != nullptr) - { - client *c = (client*)listNodeValue(ln); - if (c->lock.fOwnLock()) { - serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no gurantee they won't be free'd in the event loop - c->lock.unlock(); - vecclients.push_back(c); + int eventsCount = 0; + executeWithoutGlobalLock([&](){ + int iterations = 4; /* See the function top-comment. */ + try + { + ProcessingEventsWhileBlocked = 1; + while (iterations--) { + long long startval = g_pserver->events_processed_while_blocked; + long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el, + AE_FILE_EVENTS|AE_DONT_WAIT| + AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); + /* Note that g_pserver->events_processed_while_blocked will also get + * incremeted by callbacks called by the event loop handlers. */ + eventsCount += ae_events; + long long events = eventsCount - startval; + if (!events) break; + } + ProcessingEventsWhileBlocked = 0; } - } - - /* Since we're about to release our lock we need to flush the repl backlog queue */ - bool fReplBacklog = g_pserver->repl_batch_offStart >= 0; - if (fReplBacklog) { - flushReplBacklogToClients(); - g_pserver->repl_batch_idxStart = -1; - g_pserver->repl_batch_offStart = -1; - } - - long long eventsCount = 0; - aeReleaseLock(); - serverAssert(!GlobalLocksAcquired()); - try - { - ProcessingEventsWhileBlocked = 1; - while (iterations--) { - long long startval = g_pserver->events_processed_while_blocked; - long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el, - AE_FILE_EVENTS|AE_DONT_WAIT| - AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); - /* Note that g_pserver->events_processed_while_blocked will also get - * incremeted by callbacks called by the event loop handlers. */ - eventsCount += ae_events; - long long events = eventsCount - startval; - if (!events) break; - } - ProcessingEventsWhileBlocked = 0; - } - catch (...) - { - // Caller expects us to be locked so fix and rethrow - ProcessingEventsWhileBlocked = 0; - AeLocker locker; - locker.arm(nullptr); - locker.release(); - for (client *c : vecclients) - c->lock.lock(); - throw; - } - - AeLocker locker; - locker.arm(nullptr); - locker.release(); + catch (...) + { + ProcessingEventsWhileBlocked = 0; + throw; + } + }); g_pserver->events_processed_while_blocked += eventsCount; whileBlockedCron(); - // Restore it so the calling code is not confused - if (fReplBacklog) { - g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; - g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - } - - for (client *c : vecclients) - c->lock.lock(); - // If a different thread processed the shutdown we need to abort the lua command or we will hang if (serverTL->el->stop) throw ShutdownException(); diff --git a/src/readwritelock.h b/src/readwritelock.h new file mode 100644 index 000000000..d03e1c82b --- /dev/null +++ b/src/readwritelock.h @@ -0,0 +1,113 @@ +#pragma once +#include + +class readWriteLock { + std::mutex m_readLock; + std::recursive_mutex m_writeLock; + std::condition_variable m_cv; + int m_readCount = 0; + int m_writeCount = 0; + bool m_writeWaiting = false; +public: + void acquireRead() { + std::unique_lock rm(m_readLock); + while (m_writeCount > 0 || m_writeWaiting) + m_cv.wait(rm); + m_readCount++; + } + + bool tryAcquireRead() { + std::unique_lock rm(m_readLock, std::defer_lock); + if (!rm.try_lock()) + return false; + if (m_writeCount > 0 || m_writeWaiting) + return false; + m_readCount++; + return true; + } + + void acquireWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock); + m_writeWaiting = true; + while (m_readCount > 0) + m_cv.wait(rm); + if (exclusive) { + /* Another thread might have the write lock while we have the read lock + but won't be able to release it until they can acquire the read lock + so release the read lock and try again instead of waiting to avoid deadlock */ + while(!m_writeLock.try_lock()) + m_cv.wait(rm); + } + m_writeCount++; + m_writeWaiting = false; + } + + void upgradeWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock); + m_writeWaiting = true; + while (m_readCount > 1) + m_cv.wait(rm); + if (exclusive) { + /* Another thread might have the write lock while we have the read lock + but won't be able to release it until they can acquire the read lock + so release the read lock and try again instead of waiting to avoid deadlock */ + while(!m_writeLock.try_lock()) + m_cv.wait(rm); + } + m_writeCount++; + m_readCount--; + m_writeWaiting = false; + } + + bool tryAcquireWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock, std::defer_lock); + if (!rm.try_lock()) + return false; + if (m_readCount > 0) + return false; + if (exclusive) + if (!m_writeLock.try_lock()) + return false; + m_writeCount++; + return true; + } + + void releaseRead() { + std::unique_lock rm(m_readLock); + serverAssert(m_readCount > 0); + m_readCount--; + m_cv.notify_all(); + } + + void releaseWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock); + serverAssert(m_writeCount > 0); + if (exclusive) + m_writeLock.unlock(); + m_writeCount--; + m_cv.notify_all(); + } + + void downgradeWrite(bool exclusive = true) { + std::unique_lock rm(m_readLock); + serverAssert(m_writeCount > 0); + if (exclusive) + m_writeLock.unlock(); + m_writeCount--; + while (m_writeCount > 0 || m_writeWaiting) + m_cv.wait(rm); + m_readCount++; + } + + bool hasReader() { + return m_readCount > 0; + } + + bool hasWriter() { + return m_writeCount > 0; + } + + bool writeWaiting() { + return m_writeWaiting; + } +}; \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index ab75ac589..80e755d73 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -67,6 +67,7 @@ #include "aelocker.h" #include "motd.h" #include "t_nhash.h" +#include "readwritelock.h" #ifdef __linux__ #include #include @@ -91,8 +92,10 @@ double R_Zero, R_PosInf, R_NegInf, R_Nan; /* Global vars */ namespace GlobalHidden { struct redisServer server; /* Server global state */ +readWriteLock forkLock; } redisServer *g_pserver = &GlobalHidden::server; +readWriteLock *g_forkLock = &GlobalHidden::forkLock; struct redisServerConst cserver; __thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars std::mutex time_thread_mutex; @@ -2649,8 +2652,8 @@ void afterSleep(struct aeEventLoop *eventLoop) { Don't check here that modules are enabled, rather use the result from beforeSleep Otherwise you may double acquire the GIL and cause deadlocks in the module */ if (!ProcessingEventsWhileBlocked) { - wakeTimeThread(); if (serverTL->modulesEnabledThisAeLoop) moduleAcquireGIL(TRUE /*fServerThread*/); + wakeTimeThread(); } } @@ -6218,6 +6221,63 @@ void closeChildUnusedResourceAfterFork() { cserver.pidfile = NULL; } +void executeWithoutGlobalLock(std::function func) { + serverAssert(GlobalLocksAcquired()); + + std::vector vecclients; + listIter li; + listNode *ln; + listRewind(g_pserver->clients, &li); + + // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks + // so unlock here, and save them for reacquisition later + while ((ln = listNext(&li)) != nullptr) + { + client *c = (client*)listNodeValue(ln); + if (c->lock.fOwnLock()) { + serverAssert(c->flags & CLIENT_PROTECTED || c->flags & CLIENT_EXECUTING_COMMAND); // If the client is not protected we have no gurantee they won't be free'd in the event loop + c->lock.unlock(); + vecclients.push_back(c); + } + } + + /* Since we're about to release our lock we need to flush the repl backlog queue */ + bool fReplBacklog = g_pserver->repl_batch_offStart >= 0; + if (fReplBacklog) { + flushReplBacklogToClients(); + g_pserver->repl_batch_idxStart = -1; + g_pserver->repl_batch_offStart = -1; + } + + aeReleaseLock(); + serverAssert(!GlobalLocksAcquired()); + try { + func(); + } + catch (...) { + // Caller expects us to be locked so fix and rethrow + AeLocker locker; + locker.arm(nullptr); + locker.release(); + for (client *c : vecclients) + c->lock.lock(); + throw; + } + + AeLocker locker; + locker.arm(nullptr); + locker.release(); + + // Restore it so the calling code is not confused + if (fReplBacklog) { + g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + } + + for (client *c : vecclients) + c->lock.lock(); +} + /* purpose is one of CHILD_TYPE_ types */ int redisFork(int purpose) { int childpid; @@ -6229,7 +6289,9 @@ int redisFork(int purpose) { openChildInfoPipe(); } - + long long startWriteLock = ustime(); + g_forkLock->acquireWrite(); + latencyAddSampleIfNeeded("fork-lock",(ustime()-startWriteLock)/1000); if ((childpid = fork()) == 0) { /* Child */ g_pserver->in_fork_child = purpose; @@ -6238,6 +6300,7 @@ int redisFork(int purpose) { closeChildUnusedResourceAfterFork(); } else { /* Parent */ + g_forkLock->releaseWrite(); g_pserver->stat_total_forks++; g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ @@ -6554,20 +6617,32 @@ void *timeThreadMain(void*) { timespec delay; delay.tv_sec = 0; delay.tv_nsec = 100; + int cycle_count = 0; + g_forkLock->acquireRead(); while (true) { { std::unique_lock lock(time_thread_mutex); if (sleeping_threads >= cserver.cthreads) { + g_forkLock->releaseRead(); time_thread_cv.wait(lock); + g_forkLock->acquireRead(); + cycle_count = 0; } } updateCachedTime(); + if (cycle_count == MAX_CYCLES_TO_HOLD_FORK_LOCK) { + g_forkLock->releaseRead(); + g_forkLock->acquireRead(); + cycle_count = 0; + } #if defined(__APPLE__) nanosleep(&delay, nullptr); #else clock_nanosleep(CLOCK_MONOTONIC, 0, &delay, NULL); #endif + cycle_count++; } + g_forkLock->releaseRead(); } void *workerThreadMain(void *parg) diff --git a/src/server.h b/src/server.h index 7d813c4a0..d0c443934 100644 --- a/src/server.h +++ b/src/server.h @@ -96,6 +96,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "connection.h" /* Connection abstraction */ #include "serverassert.h" #include "expire.h" +#include "readwritelock.h" #define REDISMODULE_CORE 1 #include "redismodule.h" /* Redis modules API defines. */ @@ -733,6 +734,9 @@ typedef enum { #define REDISMODULE_AUX_BEFORE_RDB (1<<0) #define REDISMODULE_AUX_AFTER_RDB (1<<1) +/* Number of cycles before time thread gives up fork lock */ +#define MAX_CYCLES_TO_HOLD_FORK_LOCK 10 + struct RedisModule; struct RedisModuleIO; struct RedisModuleDigest; @@ -2113,6 +2117,7 @@ typedef struct { //extern struct redisServer server; extern redisServer *g_pserver; +extern readWriteLock *g_forkLock; extern struct redisServerConst cserver; extern __thread struct redisServerThreadVars *serverTL; // thread local server vars extern struct sharedObjectsStruct shared; @@ -2496,6 +2501,7 @@ void sendChildInfo(childInfoType info_type, size_t keys, const char *pname); void receiveChildInfo(void); /* Fork helpers */ +void executeWithoutGlobalLock(std::function func); int redisFork(int type); int hasActiveChildProcess(); void resetChildState();