Skip to content

Commit

Permalink
Merge branch 'unstable' into RELEASE_6
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnSully committed Feb 7, 2021
2 parents d30fcfb + 165333b commit 0b7b533
Show file tree
Hide file tree
Showing 8 changed files with 61 additions and 99 deletions.
4 changes: 2 additions & 2 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ endif
USEASM?=true

ifneq ($(strip $(SANITIZE)),)
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE -fno-omit-frame-pointer
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE -fno-omit-frame-pointer
LDFLAGS+= -fsanitize=$(SANITIZE)
MALLOC=libc
USEASM=false
Expand Down
70 changes: 5 additions & 65 deletions src/ae.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,13 +109,6 @@ enum class AE_ASYNC_OP
CreateFileEvent,
};

struct aeCommandControl
{
std::condition_variable cv;
std::atomic<int> rval;
std::mutex mutexcv;
};

struct aeCommand
{
AE_ASYNC_OP op;
Expand All @@ -128,7 +121,6 @@ struct aeCommand
std::function<void()> *pfn;
};
void *clientData;
aeCommandControl *pctl;
};

void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
Expand All @@ -149,19 +141,7 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
break;

case AE_ASYNC_OP::CreateFileEvent:
{
if (cmd.pctl != nullptr)
{
cmd.pctl->mutexcv.lock();
std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData));
cmd.pctl->cv.notify_all();
cmd.pctl->mutexcv.unlock();
}
else
{
aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
}
}
aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData);
break;

case AE_ASYNC_OP::PostFunction:
Expand All @@ -175,19 +155,11 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )

case AE_ASYNC_OP::PostCppFunction:
{
if (cmd.pctl != nullptr)
cmd.pctl->mutexcv.lock();

std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (cmd.fLock)
ulock.lock();
(*cmd.pfn)();

if (cmd.pctl != nullptr)
{
cmd.pctl->cv.notify_all();
cmd.pctl->mutexcv.unlock();
}

delete cmd.pfn;
}
break;
Expand Down Expand Up @@ -226,7 +198,7 @@ ssize_t safe_write(int fd, const void *pv, size_t cb)
}

int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData, int fSynchronous)
aeFileProc *proc, void *clientData)
{
if (eventLoop == g_eventLoopThisThread)
return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData);
Expand All @@ -239,13 +211,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
cmd.mask = mask;
cmd.fproc = proc;
cmd.clientData = clientData;
cmd.pctl = nullptr;
cmd.fLock = true;
if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
cmd.pctl->mutexcv.lock();
}

auto size = safe_write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (size != sizeof(cmd))
Expand All @@ -254,16 +220,6 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
serverAssert(errno == EAGAIN);
ret = AE_ERR;
}

if (fSynchronous)
{
{
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval;
}
delete cmd.pctl;
}

return ret;
}
Expand All @@ -286,7 +242,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
return AE_OK;
}

int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock, bool fForceQueue)
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock, bool fForceQueue)
{
if (eventLoop == g_eventLoopThisThread && !fForceQueue)
{
Expand All @@ -297,13 +253,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
aeCommand cmd = {};
cmd.op = AE_ASYNC_OP::PostCppFunction;
cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn);
cmd.pctl = nullptr;
cmd.fLock = fLock;
if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl;
cmd.pctl->mutexcv.lock();
}

auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (!(!size || size == sizeof(cmd))) {
Expand All @@ -314,17 +264,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
if (size == 0)
return AE_ERR;

int ret = AE_OK;
if (fSynchronous)
{
{
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval;
}
delete cmd.pctl;
}
return ret;
return AE_OK;
}

aeEventLoop *aeCreateEventLoop(int setsize) {
Expand Down
4 changes: 2 additions & 2 deletions src/ae.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
#ifdef __cplusplus
} // EXTERN C
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true, bool fForceQueue = false);
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fLock = true, bool fForceQueue = false);
extern "C" {
#endif
void aeDeleteEventLoop(aeEventLoop *eventLoop);
Expand All @@ -144,7 +144,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);

int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData, int fSynchronous);
aeFileProc *proc, void *clientData);

void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask);
Expand Down
13 changes: 8 additions & 5 deletions src/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5066,12 +5066,15 @@ void mvccrestoreCommand(client *c) {
setMvccTstamp(obj, mvcc);

/* Create the key and set the TTL if any */
dbMerge(c->db,key,obj,true);
if (expire >= 0) {
setExpire(c,c->db,key,nullptr,expire);
if (dbMerge(c->db,key,obj,true)) {
if (expire >= 0) {
setExpire(c,c->db,key,nullptr,expire);
}
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
} else {
decrRefCount(obj);
}
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
addReply(c,shared.ok);
g_pserver->dirty++;
}
Expand Down
2 changes: 1 addition & 1 deletion src/expire.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
executeCronJobExpireHook(keyCopy, val);
sdsfree(keyCopy);
decrRefCount(val);
}, false, true /*fLock*/, true /*fForceQueue*/);
}, true /*fLock*/, true /*fForceQueue*/);
}
return;

Expand Down
64 changes: 41 additions & 23 deletions src/module.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5045,7 +5045,7 @@ void RM_FreeThreadSafeContext(RedisModuleCtx *ctx) {
zfree(ctx);
}

static bool g_fModuleThread = false;
thread_local bool g_fModuleThread = false;
/* Acquire the server lock before executing a thread safe API call.
* This is not needed for `RedisModule_Reply*` calls when there is
* a blocked client connected to the thread safe context. */
Expand Down Expand Up @@ -5104,7 +5104,14 @@ void moduleAcquireGIL(int fServerThread) {
}
else
{
s_mutexModule.lock();
// It is possible that another module thread holds the GIL (and s_mutexModule as a result).
// When said thread goes to release the GIL, it will wait for s_mutex, which this thread owns.
// This thread is however waiting for the GIL (and s_mutexModule) that the other thread owns.
// As a result, a deadlock has occured.
// We release the lock on s_mutex and wait until we are able to safely acquire the GIL
// in order to prevent this deadlock from occuring.
while (!s_mutexModule.try_lock())
s_cv.wait(lock);
++s_cAcquisitionsModule;
fModuleGILWlocked++;
}
Expand Down Expand Up @@ -5643,6 +5650,9 @@ int moduleTimerHandler(struct aeEventLoop *eventLoop, long long id, void *client
* (If the time it takes to execute 'callback' is negligible the two
* statements above mean the same) */
RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisModuleTimerProc callback, void *data) {
static uint64_t pending_key;
static mstime_t pending_period = -1;

RedisModuleTimer *timer = (RedisModuleTimer*)zmalloc(sizeof(*timer), MALLOC_LOCAL);
timer->module = ctx->module;
timer->callback = callback;
Expand All @@ -5661,32 +5671,40 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
}
}

bool fNeedPost = (pending_period < 0); // If pending_period is already set, then a PostFunction is in flight and we don't need to set a new one
if (pending_period < 0 || period < pending_period) {
pending_period = period;
pending_key = key;
}

/* We need to install the main event loop timer if it's not already
* installed, or we may need to refresh its period if we just installed
* a timer that will expire sooner than any other else (i.e. the timer
* we just installed is the first timer in the Timers rax). */
if (aeTimer != -1) {
raxIterator ri;
raxStart(&ri,Timers);
raxSeek(&ri,"^",NULL,0);
raxNext(&ri);
if (memcmp(ri.key,&key,sizeof(key)) == 0) {
/* This is the first key, we need to re-install the timer according
* to the just added event. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
}, true /* synchronous */, false /* fLock */);
aeTimer = -1;
}
raxStop(&ri);
}
if (fNeedPost) {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, []{
if (aeTimer != -1) {
raxIterator ri;
raxStart(&ri,Timers);
raxSeek(&ri,"^",NULL,0);
raxNext(&ri);
if (memcmp(ri.key,&pending_key,sizeof(key)) == 0) {
/* This is the first key, we need to re-install the timer according
* to the just added event. */
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
aeTimer = -1;
}
raxStop(&ri);
}

/* If we have no main timer (the old one was invalidated, or this is the
* first module timer we have), install one. */
if (aeTimer == -1) {
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,pending_period,moduleTimerHandler,NULL,NULL);
}

/* If we have no main timer (the old one was invalidated, or this is the
* first module timer we have), install one. */
if (aeTimer == -1) {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
}, true /* synchronous */, false /* fLock */);
pending_period = -1;
});
}

return key;
Expand Down
2 changes: 1 addition & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4087,7 +4087,7 @@ bool client::postFunction(std::function<void(client *)> fn, bool fLock) {
std::lock_guard<decltype(this->lock)> lock(this->lock);
fn(this);
--casyncOpsPending;
}, false, fLock) == AE_OK;
}, fLock) == AE_OK;
}

/*================================== Shutdown =============================== */
Expand Down
1 change: 1 addition & 0 deletions tests/unit/moduleapi/hooks.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ tags "modules" {
}

$replica replicaof no one
after 300

test {Test role-master hook} {
assert_equal [r hooks.event_count role-replica] 1
Expand Down

0 comments on commit 0b7b533

Please sign in to comment.