Skip to content

Commit

Permalink
Fix module locking issues
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnSully committed May 9, 2019
1 parent 37b2d04 commit bf26959
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 53 deletions.
81 changes: 53 additions & 28 deletions src/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
#include "server.h"
#include "cluster.h"
#include <dlfcn.h>
#include <mutex>
#include <condition_variable>

#define REDISMODULE_CORE 1
#include "redismodule.h"
Expand Down Expand Up @@ -235,7 +237,6 @@ static list *moduleUnblockedClients;

/* We need a mutex that is unlocked / relocked in beforeSleep() in order to
* allow thread safe contexts to execute commands at a safe moment. */
static pthread_rwlock_t moduleGIL = PTHREAD_RWLOCK_INITIALIZER;
int fModuleGILWlocked = FALSE;

/* Function pointer type for keyspace event notification subscriptions from modules. */
Expand Down Expand Up @@ -293,6 +294,12 @@ typedef struct RedisModuleCommandFilter {
/* Registered filters */
static list *moduleCommandFilters;

/* Module GIL Variables */
static int s_cAcquisitionsServer = 0;
static int s_cAcquisitionsModule = 0;
static std::mutex s_mutex;
static std::condition_variable s_cv;

/* --------------------------------------------------------------------------
* Prototypes
* -------------------------------------------------------------------------- */
Expand Down Expand Up @@ -2750,7 +2757,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
int replicate = 0; /* Replicate this command? */
int call_flags;
sds proto = nullptr;

/* Create the client and dispatch the command. */
va_start(ap, fmt);
c = createClient(-1, IDX_EVENT_LOOP_MAIN);
Expand Down Expand Up @@ -3663,9 +3670,22 @@ int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
pthread_mutex_lock(&moduleUnblockedClientsMutex);
bc->privdata = privdata;
listAddNodeTail(moduleUnblockedClients,bc);
if (write(g_pserver->module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
if (bc->client != nullptr)
{
if (write(g_pserver->rgthreadvar[bc->client->iel].module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
}
else
{
for (int iel = 0; iel < cserver.cthreads; ++iel)
{
if (write(g_pserver->rgthreadvar[iel].module_blocked_pipe[1],"A",1) != 1) {
/* Ignore the error, this is best-effort. */
}
}
}

pthread_mutex_unlock(&moduleUnblockedClientsMutex);
return REDISMODULE_OK;
}
Expand Down Expand Up @@ -3706,21 +3726,24 @@ void RM_SetDisconnectCallback(RedisModuleBlockedClient *bc, RedisModuleDisconnec
* blocked client, it was terminated by Redis (for timeout or other reasons).
* When this happens the RedisModuleBlockedClient structure in the queue
* will have the 'client' field set to NULL. */
void moduleHandleBlockedClients(void) {
listNode *ln;
void moduleHandleBlockedClients(int iel) {
RedisModuleBlockedClient *bc;
serverAssert(GlobalLocksAcquired());

pthread_mutex_lock(&moduleUnblockedClientsMutex);
/* Here we unblock all the pending clients blocked in modules operations
* so we can read every pending "awake byte" in the pipe. */
char buf[1];
while (read(g_pserver->module_blocked_pipe[0],buf,1) == 1);
while (listLength(moduleUnblockedClients)) {
ln = listFirst(moduleUnblockedClients);
while (read(serverTL->module_blocked_pipe[0],buf,1) == 1);
listIter li;
listNode *ln;
listRewind(moduleUnblockedClients, &li);
while ((ln = listNext(&li))) {
bc = (RedisModuleBlockedClient*)ln->value;
client *c = bc->client;
serverAssert(c->iel == IDX_EVENT_LOOP_MAIN);
if ((c != nullptr) && (iel != c->iel))
continue;

listDelNode(moduleUnblockedClients,ln);
pthread_mutex_unlock(&moduleUnblockedClientsMutex);

Expand Down Expand Up @@ -3919,23 +3942,36 @@ void RM_ThreadSafeContextUnlock(RedisModuleCtx *ctx) {
}

/* Acquire the module GIL for a server thread (fServerThread != 0) or for a
 * module thread (fServerThread == 0).
 *
 * The caller waits on s_cv while the opposite side holds at least one
 * acquisition: a server thread waits for s_cAcquisitionsModule to drop to
 * zero, a module thread waits for s_cAcquisitionsServer to drop to zero.
 * It then records its own acquisition in the counter for its side.
 *
 * NOTE(review): this listing comes from a diff rendered without +/- markers.
 * The pthread_rwlock_* calls and the `fModuleGILWlocked = TRUE` assignment
 * are presumably the removed lines, and the counter updates their
 * replacements -- confirm against the committed file. */
void moduleAcquireGIL(int fServerThread) {
/* s_mutex is taken before the counters are read or modified; the
 * unique_lock releases it when this function returns. */
std::unique_lock<std::mutex> lock(s_mutex);
/* Point at the counter of the opposite side: that is the one to wait on. */
int *pcheck = fServerThread ? &s_cAcquisitionsModule : &s_cAcquisitionsServer;

/* Loop so the condition is re-checked after every wakeup. */
while (*pcheck > 0)
s_cv.wait(lock);

if (fServerThread)
{
pthread_rwlock_rdlock(&moduleGIL);
++s_cAcquisitionsServer;
}
else
{
pthread_rwlock_wrlock(&moduleGIL);
fModuleGILWlocked = TRUE;
++s_cAcquisitionsModule;
fModuleGILWlocked++;
}
}

/* Counterpart of moduleAcquireGIL(): decrement the acquisition counter for
 * the calling side (server thread when fServerThread != 0, module thread
 * otherwise) and wake every thread waiting on s_cv so it can re-check its
 * wait condition.
 *
 * NOTE(review): this listing comes from a diff rendered without +/- markers.
 * `pthread_rwlock_unlock(&moduleGIL)`, `if (!fServerThread)` and
 * `fModuleGILWlocked = FALSE` are presumably the removed lines. Read
 * literally, the `if (!fServerThread)` would make the unique_lock a
 * declaration scoped to that if statement, so s_mutex would not be held
 * while the counters change -- confirm against the committed file. */
void moduleReleaseGIL(int fServerThread) {
pthread_rwlock_unlock(&moduleGIL);
if (!fServerThread)
std::unique_lock<std::mutex> lock(s_mutex);

if (fServerThread)
{
--s_cAcquisitionsServer;
}
else
{
fModuleGILWlocked = FALSE;
--s_cAcquisitionsModule;
fModuleGILWlocked--;
}
/* notify_all rather than notify_one: several threads may be waiting. */
s_cv.notify_all();
}

int moduleGILAcquiredByModule(void) {
Expand Down Expand Up @@ -5102,24 +5138,13 @@ void moduleInitModulesSystem(void) {
moduleCommandFilters = listCreate();

moduleRegisterCoreAPI();
if (pipe(g_pserver->module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}
/* Make the pipe non-blocking. This is just a best-effort wake-up mechanism
* and we do not want to block on either the read or the write half. */
anetNonBlock(NULL,g_pserver->module_blocked_pipe[0]);
anetNonBlock(NULL,g_pserver->module_blocked_pipe[1]);

/* Create the timers radix tree. */
Timers = raxNew();

/* The GIL for our thread-safe contexts must start out already locked:
* it is only unlocked when it is safe to do so. */
pthread_rwlock_init(&moduleGIL, NULL);
pthread_rwlock_rdlock(&moduleGIL);
moduleAcquireGIL(true);
}

/* Load all the modules in the g_pserver->loadmodule_queue list, which is
Expand Down
2 changes: 1 addition & 1 deletion src/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1200,7 +1200,7 @@ void unlinkClient(client *c) {
serverAssert(c->fd == -1 || c->lock.fOwnLock());

/* If this is marked as current client unset it. */
if (serverTL->current_client == c) serverTL->current_client = NULL;
if (serverTL && serverTL->current_client == c) serverTL->current_client = NULL;

/* Certain operations must be done only if the client has an active socket.
* If the client was already unlinked or if it's a "fake client" the
Expand Down
58 changes: 38 additions & 20 deletions src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2103,7 +2103,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {

/* Check if there are clients unblocked by modules that implement
* blocking commands. */
moduleHandleBlockedClients();
moduleHandleBlockedClients(ielFromEventLoop(eventLoop));

/* Try to process pending commands for clients that were just unblocked. */
if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients))
Expand Down Expand Up @@ -2134,6 +2134,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) {
processUnblockedClients(iel);
}

/* Check if there are clients unblocked by modules that implement
* blocking commands. */
moduleHandleBlockedClients(ielFromEventLoop(eventLoop));
aeReleaseLock();

/* Handle writes with pending output buffers. */
Expand Down Expand Up @@ -2872,6 +2876,37 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
exit(1);
}
}

if (pipe(pvar->module_blocked_pipe) == -1) {
serverLog(LL_WARNING,
"Can't create the pipe for module blocking commands: %s",
strerror(errno));
exit(1);
}

/* Make the pipe non-blocking. This is just a best-effort wake-up mechanism
* and we do not want to block on either the read or the write half. */
anetNonBlock(NULL,pvar->module_blocked_pipe[0]);
anetNonBlock(NULL,pvar->module_blocked_pipe[1]);

/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}


/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(pvar->el, pvar->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
}

void initServer(void) {
Expand Down Expand Up @@ -2946,25 +2981,6 @@ void initServer(void) {
exit(1);
}

/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}


/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
if (aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}

/* Open the AOF file if needed. */
if (g_pserver->aof_state == AOF_ON) {
g_pserver->aof_fd = open(g_pserver->aof_filename,
Expand Down Expand Up @@ -4907,6 +4923,7 @@ void *workerThreadMain(void *parg)
serverLog(LOG_INFO, "Thread %d alive.", iel);
serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global

moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
Expand Down Expand Up @@ -5143,6 +5160,7 @@ int main(int argc, char **argv) {
}

aeReleaseLock(); //Finally we can dump the lock
moduleReleaseGIL(true);

serverAssert(cserver.cthreads > 0 && cserver.cthreads <= MAX_EVENT_LOOPS);
pthread_t rgthread[MAX_EVENT_LOOPS];
Expand Down
8 changes: 4 additions & 4 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1145,6 +1145,9 @@ struct redisServerThreadVars {
list *clients_pending_asyncwrite;
int cclients;
client *current_client; /* Current client */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
struct fastlock lockPendingWrite;
};

Expand Down Expand Up @@ -1247,9 +1250,6 @@ struct redisServer {
dict *sharedapi; /* Like moduleapi but containing the APIs that
modules share with each other. */
list *loadmodule_queue; /* List of modules to load at startup. */
int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a
client blocked on a module command needs
to be processed. */
/* Networking */
int port; /* TCP listening port */
int tcp_backlog; /* TCP listen() backlog */
Expand Down Expand Up @@ -1665,7 +1665,7 @@ moduleType *moduleTypeLookupModuleByID(uint64_t id);
void moduleTypeNameByID(char *name, uint64_t moduleid);
void moduleFreeContext(struct RedisModuleCtx *ctx);
void unblockClientFromModule(client *c);
void moduleHandleBlockedClients(void);
void moduleHandleBlockedClients(int iel);
void moduleBlockedClientTimedOut(client *c);
void moduleBlockedClientPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask);
size_t moduleCount(void);
Expand Down

0 comments on commit bf26959

Please sign in to comment.