Skip to content

Commit

Permalink
Two fixes: 1) Remove race conditions by not locking clients when asyn…
Browse files Browse the repository at this point in the history
…c writing. 2) Don't dereference dangling pointers in lambda
  • Loading branch information
JohnSully committed Oct 25, 2019
1 parent 549cad3 commit cb93752
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
6 changes: 5 additions & 1 deletion src/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,11 @@ void clientInstallAsyncWriteHandler(client *c) {
int prepareClientToWrite(client *c, bool fAsync) {
fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread
serverAssert(FCorrectThread(c) || fAsync);
serverAssert(c->fd <= 0 || c->lock.fOwnLock());
if (FCorrectThread(c)) {
serverAssert(c->fd <= 0 || c->lock.fOwnLock());
} else {
serverAssert(GlobalLocksAcquired());
}

if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer.
// do not install a write handler
Expand Down
32 changes: 27 additions & 5 deletions src/replication.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
std::unique_lock<decltype(replica->lock)> lock(replica->lock);
std::unique_lock<decltype(replica->lock)> lock(replica->lock, std::defer_lock);
// When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
if (FCorrectThread(replica))
lock.lock();
if (serverTL->current_client && FSameHost(serverTL->current_client, replica))
{
replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start;
Expand Down Expand Up @@ -434,7 +437,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle

while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
std::lock_guard<decltype(replica->lock)> ulock(replica->lock);
std::unique_lock<decltype(replica->lock)> ulock(replica->lock, std::defer_lock);
if (FCorrectThread(replica))
ulock.lock();
if (FMasterHost(replica))
continue; // Active Active case, don't feed back

Expand Down Expand Up @@ -483,7 +488,10 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv,
listRewind(monitors,&li);
while((ln = listNext(&li))) {
client *monitor = (client*)ln->value;
std::lock_guard<decltype(monitor->lock)> lock(monitor->lock);
std::unique_lock<decltype(monitor->lock)> lock(monitor->lock, std::defer_lock);
// When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
if (FCorrectThread(c))
lock.lock();
addReplyAsync(monitor,cmdobj);
}
decrRefCount(cmdobj);
Expand Down Expand Up @@ -1206,7 +1214,21 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
}
else
{
aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica]{
aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] {
// Because the client could have been closed while the lambda waited to run we need to
// verify the replica is still connected
listIter li;
listNode *ln;
listRewind(g_pserver->slaves,&li);
bool fFound = false;
while ((ln = listNext(&li))) {
if (listNodeValue(ln) == replica) {
fFound = true;
break;
}
}
if (!fFound)
return;
aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE);
if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) {
freeClient(replica);
Expand Down Expand Up @@ -3379,4 +3401,4 @@ void updateMasterAuth()
if (cserver.default_masteruser)
mi->masteruser = zstrdup(cserver.default_masteruser);
}
}
}

0 comments on commit cb93752

Please sign in to comment.