
Commit

Merge pull request #386 from EQ-Alpha/fix_rdb_hang
add readwrite lock for forking
MalavanEQAlpha authored and msotheeswaran-sc committed Jan 14, 2022
1 parent 35dc4df commit dabb819
Showing 5 changed files with 232 additions and 124 deletions.
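
At a high level, this change replaces the module GIL's hand-rolled acquisition counters, mutexes, and condition variable with a single reader-writer lock: server threads take the read side (many may hold the GIL at once), while module threads take the write side. A minimal sketch of that mapping, using illustrative function names that are not part of the diff:

static readWriteLock s_moduleGIL;

// Server threads are "readers": any number may hold the GIL concurrently.
void exampleServerThread() {
    s_moduleGIL.acquireRead();
    // ... process commands ...
    s_moduleGIL.releaseRead();
}

// Module threads are "writers": they wait until no server thread holds the
// GIL; with exclusive=true they also serialize against other module threads.
void exampleModuleThread() {
    s_moduleGIL.acquireWrite(true /*exclusive*/);
    // ... call into the server on behalf of the module ...
    s_moduleGIL.releaseWrite(true /*exclusive*/);
}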
65 changes: 12 additions & 53 deletions src/module.cpp
@@ -365,11 +365,7 @@ typedef struct RedisModuleCommandFilter {
 static list *moduleCommandFilters;

 /* Module GIL Variables */
-static int s_cAcquisitionsServer = 0;
-static int s_cAcquisitionsModule = 0;
-static std::mutex s_mutex;
-static std::condition_variable s_cv;
-static std::recursive_mutex s_mutexModule;
+static readWriteLock s_moduleGIL;
 thread_local bool g_fModuleThread = false;

 typedef void (*RedisModuleForkDoneHandler) (int exitcode, int bysignal, void *user_data);
@@ -5969,95 +5965,58 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
 // as the server thread acquisition is sufficient. If we did try to lock we would deadlock
 static bool FModuleCallBackLock(bool fServerThread)
 {
-    return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_cAcquisitionsServer > 0;
+    return !fServerThread && aeThreadOwnsLock() && !g_fModuleThread && s_moduleGIL.hasReader();
 }
 void moduleAcquireGIL(int fServerThread, int fExclusive) {
-    std::unique_lock<std::mutex> lock(s_mutex);
-    int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
-
     if (FModuleCallBackLock(fServerThread)) {
         return;
     }

-    while (*pcheck > 0)
-        s_cv.wait(lock);
-
     if (fServerThread)
     {
-        ++s_cAcquisitionsServer;
+        s_moduleGIL.acquireRead();
     }
     else
     {
-        // only try to acquire the mutexModule in exclusive mode
-        if (fExclusive){
-            // It is possible that another module thread holds the GIL (and s_mutexModule as a result).
-            // When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
-            // This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
-            // As a result, a deadlock has occurred.
-            // We release the lock on s_mutex and wait until we are able to safely acquire the GIL
-            // in order to prevent this deadlock from occurring.
-            while (!s_mutexModule.try_lock())
-                s_cv.wait(lock);
-        }
-        ++s_cAcquisitionsModule;
-        fModuleGILWlocked++;
+        s_moduleGIL.acquireWrite(fExclusive);
     }
 }

 int moduleTryAcquireGIL(bool fServerThread, int fExclusive) {
-    std::unique_lock<std::mutex> lock(s_mutex, std::defer_lock);
-    if (!lock.try_lock())
-        return 1;
-    int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;
-
     if (FModuleCallBackLock(fServerThread)) {
         return 0;
     }

-    if (*pcheck > 0)
-        return 1;
-
     if (fServerThread)
     {
-        ++s_cAcquisitionsServer;
+        if (!s_moduleGIL.tryAcquireRead())
+            return 1;
     }
     else
     {
-        // only try to acquire the mutexModule in exclusive mode
-        if (fExclusive){
-            if (!s_mutexModule.try_lock())
-                return 1;
-        }
-        ++s_cAcquisitionsModule;
-        fModuleGILWlocked++;
+        if (!s_moduleGIL.tryAcquireWrite(fExclusive))
+            return 1;
     }
     return 0;
 }

 void moduleReleaseGIL(int fServerThread, int fExclusive) {
-    std::unique_lock<std::mutex> lock(s_mutex);
-
     if (FModuleCallBackLock(fServerThread)) {
         return;
     }

     if (fServerThread)
     {
-        --s_cAcquisitionsServer;
+        s_moduleGIL.releaseRead();
     }
     else
     {
-        // only try to release the mutexModule in exclusive mode
-        if (fExclusive)
-            s_mutexModule.unlock();
-        --s_cAcquisitionsModule;
-        fModuleGILWlocked--;
+        s_moduleGIL.releaseWrite(fExclusive);
     }
-    s_cv.notify_all();
 }

 int moduleGILAcquiredByModule(void) {
-    return fModuleGILWlocked > 0;
+    return s_moduleGIL.hasWriter();
 }


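A hypothetical call site (illustrative only, not from this diff) showing how the try/release pair is meant to be used after this change, where 0 means the GIL was acquired and 1 means it was busy:

void tryModuleWork(bool fServerThread) {
    const int fExclusive = 1;
    if (moduleTryAcquireGIL(fServerThread, fExclusive) == 0) {
        // GIL held: safe to touch shared module/server state
        // ...
        moduleReleaseGIL(fServerThread, fExclusive);
    } else {
        // GIL busy (returned 1): back off and retry rather than block
    }
}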
93 changes: 24 additions & 69 deletions src/networking.cpp
@@ -3863,82 +3863,37 @@ int checkClientPauseTimeoutAndReturnIfPaused(void) {
 *
 * The function returns the total number of events processed. */
 void processEventsWhileBlocked(int iel) {
     serverAssert(GlobalLocksAcquired());
-    int iterations = 4; /* See the function top-comment. */
-
-    std::vector<client*> vecclients;
-    listIter li;
-    listNode *ln;
-    listRewind(g_pserver->clients, &li);
-
-    // All client locks must be acquired *after* the global lock is reacquired to prevent deadlocks
-    // so unlock here, and save them for reacquisition later
-    while ((ln = listNext(&li)) != nullptr)
-    {
-        client *c = (client*)listNodeValue(ln);
-        if (c->lock.fOwnLock()) {
-            serverAssert(c->flags & CLIENT_PROTECTED); // If the client is not protected we have no guarantee they won't be free'd in the event loop
-            c->lock.unlock();
-            vecclients.push_back(c);
-        }
-    }
-
-    /* Since we're about to release our lock we need to flush the repl backlog queue */
-    bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
-    if (fReplBacklog) {
-        flushReplBacklogToClients();
-        g_pserver->repl_batch_idxStart = -1;
-        g_pserver->repl_batch_offStart = -1;
-    }
-
-    long long eventsCount = 0;
-    aeReleaseLock();
-    serverAssert(!GlobalLocksAcquired());
-    try
-    {
-        ProcessingEventsWhileBlocked = 1;
-        while (iterations--) {
-            long long startval = g_pserver->events_processed_while_blocked;
-            long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
-                AE_FILE_EVENTS|AE_DONT_WAIT|
-                AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
-            /* Note that g_pserver->events_processed_while_blocked will also get
-             * incremented by callbacks called by the event loop handlers. */
-            eventsCount += ae_events;
-            long long events = eventsCount - startval;
-            if (!events) break;
-        }
-        ProcessingEventsWhileBlocked = 0;
-    }
-    catch (...)
-    {
-        // Caller expects us to be locked so fix and rethrow
-        ProcessingEventsWhileBlocked = 0;
-        AeLocker locker;
-        locker.arm(nullptr);
-        locker.release();
-        for (client *c : vecclients)
-            c->lock.lock();
-        throw;
-    }
-
-    AeLocker locker;
-    locker.arm(nullptr);
-    locker.release();
+    int eventsCount = 0;
+    executeWithoutGlobalLock([&](){
+        int iterations = 4; /* See the function top-comment. */
+        try
+        {
+            ProcessingEventsWhileBlocked = 1;
+            while (iterations--) {
+                long long startval = g_pserver->events_processed_while_blocked;
+                long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
+                    AE_FILE_EVENTS|AE_DONT_WAIT|
+                    AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
+                /* Note that g_pserver->events_processed_while_blocked will also get
+                 * incremented by callbacks called by the event loop handlers. */
+                eventsCount += ae_events;
+                long long events = eventsCount - startval;
+                if (!events) break;
+            }
+            ProcessingEventsWhileBlocked = 0;
+        }
+        catch (...)
+        {
+            ProcessingEventsWhileBlocked = 0;
+            throw;
+        }
+    });

     g_pserver->events_processed_while_blocked += eventsCount;

     whileBlockedCron();

-    // Restore it so the calling code is not confused
-    if (fReplBacklog) {
-        g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
-        g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
-    }
-
-    for (client *c : vecclients)
-        c->lock.lock();
-
     // If a different thread processed the shutdown we need to abort the lua command or we will hang
     if (serverTL->el->stop)
         throw ShutdownException();
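executeWithoutGlobalLock is defined in one of the changed files not shown in this excerpt. Judging from the inline code it replaces, it plausibly drops the global lock (and presumably handles the client-lock and repl-backlog bookkeeping that used to live here), runs the functor, and reacquires the lock even on exception. A sketch under that assumption, not the actual definition:

#include <functional>

void executeWithoutGlobalLock(std::function<void()> fn) {
    // Presumably also unlocks protected clients and flushes the repl backlog,
    // as the removed inline code did.
    aeReleaseLock();
    serverAssert(!GlobalLocksAcquired());
    try {
        fn();               // run the caller's work without the global lock
    }
    catch (...) {
        aeAcquireLock();    // caller expects to hold the lock, so restore it
        throw;              // then rethrow
    }
    aeAcquireLock();
}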
113 changes: 113 additions & 0 deletions src/readwritelock.h
@@ -0,0 +1,113 @@
#pragma once
#include <condition_variable>
#include <mutex>

class readWriteLock {
std::mutex m_readLock;
std::recursive_mutex m_writeLock;
std::condition_variable m_cv;
int m_readCount = 0;
int m_writeCount = 0;
bool m_writeWaiting = false;
public:
void acquireRead() {
std::unique_lock<std::mutex> rm(m_readLock);
while (m_writeCount > 0 || m_writeWaiting)
m_cv.wait(rm);
m_readCount++;
}

bool tryAcquireRead() {
std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
if (!rm.try_lock())
return false;
if (m_writeCount > 0 || m_writeWaiting)
return false;
m_readCount++;
return true;
}

void acquireWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock);
m_writeWaiting = true;
while (m_readCount > 0)
m_cv.wait(rm);
if (exclusive) {
/* Another thread might have the write lock while we have the read lock
but won't be able to release it until they can acquire the read lock
so release the read lock and try again instead of waiting to avoid deadlock */
while(!m_writeLock.try_lock())
m_cv.wait(rm);
}
m_writeCount++;
m_writeWaiting = false;
}

void upgradeWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock);
m_writeWaiting = true;
while (m_readCount > 1)
m_cv.wait(rm);
if (exclusive) {
/* Another thread might have the write lock while we have the read lock
but won't be able to release it until they can acquire the read lock
so release the read lock and try again instead of waiting to avoid deadlock */
while(!m_writeLock.try_lock())
m_cv.wait(rm);
}
m_writeCount++;
m_readCount--;
m_writeWaiting = false;
}

bool tryAcquireWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock, std::defer_lock);
if (!rm.try_lock())
return false;
if (m_readCount > 0)
return false;
if (exclusive)
if (!m_writeLock.try_lock())
return false;
m_writeCount++;
return true;
}

void releaseRead() {
std::unique_lock<std::mutex> rm(m_readLock);
serverAssert(m_readCount > 0);
m_readCount--;
m_cv.notify_all();
}

void releaseWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock);
serverAssert(m_writeCount > 0);
if (exclusive)
m_writeLock.unlock();
m_writeCount--;
m_cv.notify_all();
}

void downgradeWrite(bool exclusive = true) {
std::unique_lock<std::mutex> rm(m_readLock);
serverAssert(m_writeCount > 0);
if (exclusive)
m_writeLock.unlock();
m_writeCount--;
while (m_writeCount > 0 || m_writeWaiting)
m_cv.wait(rm);
m_readCount++;
}

bool hasReader() {
return m_readCount > 0;
}

bool hasWriter() {
return m_writeCount > 0;
}

bool writeWaiting() {
return m_writeWaiting;
}
};
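
A standalone exercise of the new class, useful for seeing the semantics: readers may overlap with one another, while an exclusive writer is serialized against both readers and other writers. serverAssert comes from server.h in-tree; it is stubbed with <cassert> here (an assumption for a standalone build) so the snippet compiles on its own:

#include <cassert>
#define serverAssert assert   // stand-in for the in-tree macro
#include "readwritelock.h"
#include <thread>
#include <vector>

int main() {
    readWriteLock rwl;
    int counter = 0;

    std::vector<std::thread> threads;
    // Exclusive writers: each runs alone, so the increment needs no atomics.
    for (int i = 0; i < 4; i++) {
        threads.emplace_back([&]{
            rwl.acquireWrite();       // exclusive = true by default
            counter++;
            rwl.releaseWrite();
        });
    }
    // Readers: these may overlap each other, but never a writer.
    for (int i = 0; i < 4; i++) {
        threads.emplace_back([&]{
            rwl.acquireRead();
            (void)counter;            // value is stable while the read is held
            rwl.releaseRead();
        });
    }
    for (auto &t : threads) t.join();
    assert(counter == 4);
    return 0;
}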
