diff --git a/src/Makefile b/src/Makefile index be4c07369..9d4c22cf3 100644 --- a/src/Makefile +++ b/src/Makefile @@ -47,8 +47,6 @@ endif USEASM?=true ifneq ($(SANITIZE),) - CC=clang - CXX=clang++ CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE LDFLAGS+= -fsanitize=$(SANITIZE) @@ -190,7 +188,7 @@ endif REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS) REDIS_CXX=$(QUIET_CC)$(CC) $(FINAL_CXXFLAGS) -REDIS_NASM=$(QUIET_CC)nasm -felf64 +KEYDB_AS=$(QUIET_CC) as --64 -g REDIS_LD=$(QUIET_LINK)$(CXX) $(FINAL_LDFLAGS) REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL) @@ -295,7 +293,7 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c $(REDIS_CXX) -c $< %.o: %.asm .make-prerequisites - $(REDIS_NASM) $< + $(KEYDB_AS) $< -o $@ clean: rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark diff --git a/src/ae.cpp b/src/ae.cpp index f636078b1..0deec264f 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -80,7 +80,7 @@ class mutex_wrapper mutex_wrapper g_lock; #else -fastlock g_lock; +fastlock g_lock("AE (global)"); #endif thread_local aeEventLoop *g_eventLoopThisThread = NULL; @@ -327,7 +327,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) { for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; - fastlock_init(&eventLoop->flock); + fastlock_init(&eventLoop->flock, "event loop"); int rgfd[2]; if (pipe(rgfd) < 0) goto err; diff --git a/src/aof.cpp b/src/aof.cpp index e18bce652..719b72ed9 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -678,7 +678,7 @@ client *createFakeClient(void) { c->puser = NULL; listSetFreeMethod(c->reply,freeClientReplyValue); listSetDupMethod(c->reply,dupClientReplyValue); - fastlock_init(&c->lock); + fastlock_init(&c->lock, "fake client"); fastlock_lock(&c->lock); initClientMultiState(c); return c; diff --git a/src/db.cpp b/src/db.cpp index f34245964..d8aa0dc47 100644 
--- a/src/db.cpp +++ b/src/db.cpp @@ -643,10 +643,6 @@ void keysCommand(client *c) { unsigned long numkeys = 0; void *replylen = addReplyDeferredLen(c); -#ifdef MULTITHREADED_KEYS - aeReleaseLock(); -#endif - di = dictGetSafeIterator(c->db->pdict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); while((de = dictNext(di)) != NULL) { @@ -664,14 +660,6 @@ void keysCommand(client *c) { } dictReleaseIterator(di); setDeferredArrayLen(c,replylen,numkeys); - -#ifdef MULTITHREADED_KEYS - fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks - AeLocker lock; - lock.arm(c); - fastlock_lock(&c->db->lock); // we still need the DB lock - lock.release(); -#endif } /* This callback is used by scanGenericCommand in order to collect elements diff --git a/src/debug.cpp b/src/debug.cpp index 3a4520776..234f197be 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -55,7 +55,7 @@ typedef ucontext_t sigcontext_t; #endif #endif -bool g_fInCrash = false; +int g_fInCrash = false; /* ================================= Debugging ============================== */ diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 71a49a1e8..c74ca8358 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -36,10 +36,13 @@ #include #include #include +#include #ifdef __linux__ #include #endif #include +#include +#include #ifdef __APPLE__ #include @@ -57,6 +60,7 @@ #define UNUSED(x) ((void)x) #endif +extern int g_fInCrash; /**************************************************** * @@ -125,6 +129,80 @@ #endif +#pragma weak _serverPanic +extern "C" __attribute__((weak)) void _serverPanic(const char * /*file*/, int /*line*/, const char * /*msg*/, ...) +{ + *((char*)-1) = 'x'; +} + +#pragma weak serverLog +__attribute__((weak)) void serverLog(int , const char *fmt, ...) 
+{ + va_list args; + va_start(args, fmt); + vprintf(fmt, args); + va_end(args); + printf("\n"); +} + +class DeadlockDetector +{ + std::map<pid_t, fastlock *> m_mapwait; /* thread id -> lock that thread registered a wait on; guarded by m_lock */ + fastlock m_lock { "deadlock detector" }; +public: + void registerwait(fastlock *lock, pid_t thispid) + { + if (lock == &m_lock || g_fInCrash) + return; + fastlock_lock(&m_lock); + m_mapwait.insert(std::make_pair(thispid, lock)); + + // Detect cycles + pid_t pidCheck = thispid; + size_t cchecks = 0; + for (;;) + { + auto itr = m_mapwait.find(pidCheck); + if (itr == m_mapwait.end()) + break; + pidCheck = itr->second->m_pidOwner; + if (pidCheck == thispid) + { + // Deadlock detected, printout some debugging info and crash + serverLog(3 /*LL_WARNING*/, "\n\n"); + serverLog(3 /*LL_WARNING*/, "!!! ERROR: Deadlock detected !!!"); + pidCheck = thispid; + for (;;) + { + auto itr = m_mapwait.find(pidCheck); + serverLog(3 /* LL_WARNING */, "\t%d: (%p) %s", pidCheck, itr->second, itr->second->szName); + pidCheck = itr->second->m_pidOwner; + if (pidCheck == thispid) + break; + } + serverLog(3 /*LL_WARNING*/, "!!!
KeyDB Will Now Crash !!!"); + _serverPanic(__FILE__, __LINE__, "Deadlock detected"); + } + + if (cchecks > m_mapwait.size()) + break; // There is a cycle but we're not in it + ++cchecks; + } + fastlock_unlock(&m_lock); + } + + void clearwait(fastlock *lock, pid_t thispid) + { + if (lock == &m_lock || g_fInCrash) + return; + fastlock_lock(&m_lock); + m_mapwait.erase(thispid); + fastlock_unlock(&m_lock); + } +}; + +DeadlockDetector g_dlock; + static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough"); uint64_t g_longwaits = 0; @@ -135,7 +213,6 @@ uint64_t fastlock_getlongwaitcount() return rval; } -#ifndef ASM_SPINLOCK #ifdef __linux__ static int futex(volatile unsigned *uaddr, int futex_op, int val, const struct timespec *timeout, int val3) @@ -144,7 +221,6 @@ static int futex(volatile unsigned *uaddr, int futex_op, int val, timeout, uaddr, val3); } #endif -#endif extern "C" pid_t gettid() { @@ -163,13 +239,26 @@ extern "C" pid_t gettid() return pidCache; } -extern "C" void fastlock_init(struct fastlock *lock) +extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask) +{ +#ifdef __linux__ + g_dlock.registerwait(lock, pid); + __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE); + futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask); + __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE); + g_dlock.clearwait(lock, pid); +#endif + __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED); +} + +extern "C" void fastlock_init(struct fastlock *lock, const char *name) { lock->m_ticket.m_active = 0; lock->m_ticket.m_avail = 0; lock->m_depth = 0; lock->m_pidOwner = -1; lock->futex = 0; + lock->szName = name; ANNOTATE_RWLOCK_CREATE(lock); } @@ -184,12 +273,12 @@ extern "C" void fastlock_lock(struct fastlock *lock) return; } + int tid = gettid(); unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE); -#ifdef __linux__ unsigned mask = (1U << (myticket 
% 32)); -#endif int cloops = 0; ticket ticketT; + for (;;) { __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE); @@ -201,17 +290,11 @@ extern "C" void fastlock_lock(struct fastlock *lock) #endif - if ((++cloops % 1024*1024) == 0) { + if ((++cloops % (1024*1024)) == 0) { /* parenthesized: '%' and '*' share precedence, so the old form slept every 1024 spins instead of every 1024*1024 */ -#ifdef __linux__ - __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE); - futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask); - __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE); -#endif - __atomic_fetch_add(&g_longwaits, 1, __ATOMIC_RELAXED); + fastlock_sleep(lock, tid, ticketT.u, mask); } } lock->m_depth = 1; - int tid = gettid(); __atomic_store(&lock->m_pidOwner, &tid, __ATOMIC_RELEASE); ANNOTATE_RWLOCK_ACQUIRED(lock, true); std::atomic_thread_fence(std::memory_order_acquire); diff --git a/src/fastlock.h b/src/fastlock.h index c7a40bdf3..0117049a6 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -7,7 +7,7 @@ extern "C" { /* Begin C API */ struct fastlock; -void fastlock_init(struct fastlock *lock); +void fastlock_init(struct fastlock *lock, const char *name); void fastlock_lock(struct fastlock *lock); int fastlock_trylock(struct fastlock *lock, int fWeak); void fastlock_unlock(struct fastlock *lock); @@ -45,24 +45,25 @@ struct fastlock volatile int m_pidOwner; volatile int m_depth; unsigned futex; + const char *szName; #ifdef __cplusplus - fastlock() + fastlock(const char *name) { - fastlock_init(this); + fastlock_init(this, name); } - void lock() + inline void lock() { fastlock_lock(this); } - bool try_lock(bool fWeak = false) + inline bool try_lock(bool fWeak = false) { return !!fastlock_trylock(this, fWeak); } - void unlock() + inline void unlock() { fastlock_unlock(this); } diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index baf33654f..6c9df490e 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -1,156 +1,160 @@ -section .text +.intel_syntax noprefix +.text -extern gettid -extern sched_yield -extern g_longwaits +.extern gettid +.extern fastlock_sleep -;
This is the first use of assembly in this codebase, a valid question is WHY? -; The spinlock we implement here is performance critical, and simply put GCC -; emits awful code. The original C code is left in fastlock.cpp for reference -; and x-plat. +# This is the first use of assembly in this codebase, a valid question is WHY? +# The spinlock we implement here is performance critical, and simply put GCC +# emits awful code. The original C code is left in fastlock.cpp for reference +# and x-plat. -ALIGN 16 -global fastlock_lock +.ALIGN 16 +.global fastlock_lock +.type fastlock_lock,@function fastlock_lock: - ; RDI points to the struct: - ; uint16_t active - ; uint16_t avail - ; int32_t m_pidOwner - ; int32_t m_depth + .cfi_startproc + .cfi_def_cfa rsp, 8 + # RDI points to the struct: + # uint16_t active + # uint16_t avail + # int32_t m_pidOwner + # int32_t m_depth - ; First get our TID and put it in ecx - push rdi ; we need our struct pointer (also balance the stack for the call) - call gettid ; get our thread ID (TLS is nasty in ASM so don't bother inlining) - mov esi, eax ; back it up in esi - pop rdi ; get our pointer back + # First get our TID and put it in ecx + push rdi # we need our struct pointer (also balance the stack for the call) + .cfi_adjust_cfa_offset 8 + call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining) + mov esi, eax # back it up in esi + pop rdi # get our pointer back + .cfi_adjust_cfa_offset -8 - cmp [rdi+4], esi ; Is the TID we got back the owner of the lock? - je .LLocked ; Don't spin in that case + cmp [rdi+4], esi # Is the TID we got back the owner of the lock? 
+ je .LLocked # Don't spin in that case - xor eax, eax ; eliminate partial register dependency - inc eax ; we want to add one - lock xadd [rdi+2], ax ; do the xadd, ax contains the value before the addition - ; ax now contains the ticket -ALIGN 16 + xor eax, eax # eliminate partial register dependency + inc eax # we want to add one + lock xadd [rdi+2], ax # do the xadd, ax contains the value before the addition + # ax now contains the ticket + # OK Start the wait loop + xor ecx, ecx +.ALIGN 16 .LLoop: mov edx, [rdi] - cmp dx, ax ; is our ticket up? - je .LLocked ; leave the loop + cmp dx, ax # is our ticket up? + je .LLocked # leave the loop pause - add ecx, 1000h ; Have we been waiting a long time? (oflow if we have) - ; 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) - jnc .LLoop ; If so, give up our timeslice to someone who's doing real work - ; Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" - ; But the compiler doesn't know that we rarely hit this, and when we do we know the lock is - ; taking a long time to be released anyways. We optimize for the common case of short - ; lock intervals. That's why we're using a spinlock in the first place - ; If we get here we're going to sleep in the kernel with a futex + add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have) + # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) + jnc .LLoop # If so, give up our timeslice to someone who's doing real work + # Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" + # But the compiler doesn't know that we rarely hit this, and when we do we know the lock is + # taking a long time to be released anyways. We optimize for the common case of short + # lock intervals. 
That's why we're using a spinlock in the first place + # If we get here we're going to sleep in the kernel with a futex + push rdi push rsi push rax - ; Setup the syscall args - ; rdi ARG1 futex (already in rdi) - mov esi, (9 | 128) ; rsi ARG2 FUTEX_WAIT_BITSET_PRIVATE - ; rdx ARG3 ticketT.u (already in edx) - xor r10d, r10d ; r10 ARG4 NULL - mov r8, rdi ; r8 ARG5 dup rdi - xor r9d, r9d - bts r9d, eax ; r9 ARG6 mask - mov eax, 202 ; sys_futex - ; Do the syscall - lock or [rdi+12], r9d ; inform the unlocking thread we're waiting - syscall ; wait for the futex - not r9d ; convert our flag into a mask of bits not to touch - lock and [rdi+12], r9d ; clear the flag in the futex control mask - ; cleanup and continue - mov rcx, g_longwaits - inc qword [rcx] ; increment our long wait counter + .cfi_adjust_cfa_offset 24 + # Setup the syscall args + + # rdi ARG1 futex (already in rdi) + # rsi ARG2 tid (already in esi) + # rdx ARG3 ticketT.u (already in edx) + bts ecx, eax # rcx ARG4 mask + call fastlock_sleep + # cleanup and continue pop rax pop rsi - xor ecx, ecx ; Reset our loop counter - jmp .LLoop ; Get back in the game -ALIGN 16 + pop rdi + .cfi_adjust_cfa_offset -24 + xor ecx, ecx # Reset our loop counter + jmp .LLoop # Get back in the game +.ALIGN 16 .LLocked: - mov [rdi+4], esi ; lock->m_pidOwner = gettid() - inc dword [rdi+8] ; lock->m_depth++ + mov [rdi+4], esi # lock->m_pidOwner = gettid() + inc dword ptr [rdi+8] # lock->m_depth++ ret +.cfi_endproc -ALIGN 16 -global fastlock_trylock +.ALIGN 16 +.global fastlock_trylock +.type fastlock_trylock,@function fastlock_trylock: - ; RDI points to the struct: - ; uint16_t active - ; uint16_t avail - ; int32_t m_pidOwner - ; int32_t m_depth + # RDI points to the struct: + # uint16_t active + # uint16_t avail + # int32_t m_pidOwner + # int32_t m_depth - ; First get our TID and put it in ecx - push rdi ; we need our struct pointer (also balance the stack for the call) - call gettid ; get our thread ID (TLS is nasty in ASM so 
don't bother inlining) - mov esi, eax ; back it up in esi - pop rdi ; get our pointer back + # First get our TID and put it in ecx + push rdi # we need our struct pointer (also balance the stack for the call) + call gettid # get our thread ID (TLS is nasty in ASM so don't bother inlining) + mov esi, eax # back it up in esi + pop rdi # get our pointer back - cmp [rdi+4], esi ; Is the TID we got back the owner of the lock? - je .LRecursive ; Don't spin in that case + cmp [rdi+4], esi # Is the TID we got back the owner of the lock? + je .LRecursive # Don't spin in that case - mov eax, [rdi] ; get both active and avail counters - mov ecx, eax ; duplicate in ecx - ror ecx, 16 ; swap upper and lower 16-bits - cmp eax, ecx ; are the upper and lower 16-bits the same? - jnz .LAlreadyLocked ; If not return failure + mov eax, [rdi] # get both active and avail counters + mov ecx, eax # duplicate in ecx + ror ecx, 16 # swap upper and lower 16-bits + cmp eax, ecx # are the upper and lower 16-bits the same? + jnz .LAlreadyLocked # If not return failure - ; at this point we know eax+ecx have [avail][active] and they are both the same - add ecx, 10000h ; increment avail, ecx is now our wanted value - lock cmpxchg [rdi], ecx ; If rdi still contains the value in eax, put in ecx (inc avail) - jnz .LAlreadyLocked ; If Z is not set then someone locked it while we were preparing + # at this point we know eax+ecx have [avail][active] and they are both the same + add ecx, 0x10000 # increment avail, ecx is now our wanted value + lock cmpxchg [rdi], ecx # If rdi still contains the value in eax, put in ecx (inc avail) + jnz .LAlreadyLocked # If Z is not set then someone locked it while we were preparing xor eax, eax - inc eax ; return SUCCESS! (eax=1) - mov [rdi+4], esi ; lock->m_pidOwner = gettid() - mov dword [rdi+8], eax ; lock->m_depth = 1 + inc eax # return SUCCESS! 
(eax=1) + mov [rdi+4], esi # lock->m_pidOwner = gettid() + mov dword ptr [rdi+8], eax # lock->m_depth = 1 ret -ALIGN 16 +.ALIGN 16 .LRecursive: xor eax, eax - inc eax ; return SUCCESS! (eax=1) - inc dword [rdi+8] ; lock->m_depth++ + inc eax # return SUCCESS! (eax=1) + inc dword ptr [rdi+8] # lock->m_depth++ ret -ALIGN 16 +.ALIGN 16 .LAlreadyLocked: - xor eax, eax ; return 0; + xor eax, eax # return 0 ret -ALIGN 16 -global fastlock_unlock +.ALIGN 16 +.global fastlock_unlock fastlock_unlock: - ; RDI points to the struct: - ; uint16_t active - ; uint16_t avail - ; int32_t m_pidOwner - ; int32_t m_depth + # RDI points to the struct: + # uint16_t active + # uint16_t avail + # int32_t m_pidOwner + # int32_t m_depth push r11 - sub dword [rdi+8], 1 ; decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state - jnz .LDone ; if depth is non-zero this is a recursive unlock, and we still hold it - mov dword [rdi+4], -1 ; pidOwner = -1 (we don't own it anymore) - mov ecx, [rdi] ; get current active (this one) - inc ecx ; bump it to the next thread - mov [rdi], cx ; give up our ticket (note: lock is not required here because the spinlock itself guards this variable) - ; At this point the lock is removed, however we must wake up any pending futexs - mov r9d, 1 ; eax is the bitmask for 2 threads - rol r9d, cl ; place the mask in the right spot for the next 2 threads -ALIGN 16 + sub dword ptr [rdi+8], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state + jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it + mov dword ptr [rdi+4], -1 # pidOwner = -1 (we don't own it anymore) + mov ecx, [rdi] # get current active (this one) + inc ecx # bump it to the next thread + mov [rdi], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + # At this point the lock is removed, however we must wake up any 
pending futexs + mov r9d, 1 # eax is the bitmask for 2 threads + rol r9d, cl # place the mask in the right spot for the next 2 threads +.ALIGN 16 .LRetryWake: - mov r11d, [rdi+12] ; load the futex mask - and r11d, r9d ; are any threads waiting on a futex? - jz .LDone ; if not we're done. - ; we have to wake the futexs - ; rdi ARG1 futex (already in rdi) - mov esi, (10 | 128) ; rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE - mov edx, 0x7fffffff ; rdx ARG3 INT_MAX (number of threads to wake) - xor r10d, r10d ; r10 ARG4 NULL - mov r8, rdi ; r8 ARG5 dup rdi - ; r9 ARG6 mask (already set above) - mov eax, 202 ; sys_futex + mov r11d, [rdi+12] # load the futex mask + and r11d, r9d # are any threads waiting on a futex? + jz .LDone # if not we're done. + # we have to wake the futexs + # rdi ARG1 futex (already in rdi) + mov esi, (10 | 128) # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE + mov edx, 0x7fffffff # rdx ARG3 INT_MAX (number of threads to wake) + xor r10d, r10d # r10 ARG4 NULL + mov r8, rdi # r8 ARG5 dup rdi + # r9 ARG6 mask (already set above) + mov eax, 202 # sys_futex syscall - cmp eax, 1 ; did we wake as many as we expected? + cmp eax, 1 # did we wake as many as we expected? 
jnz .LRetryWake .LDone: pop r11 diff --git a/src/networking.cpp b/src/networking.cpp index 636e95c62..d32912bd8 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -116,7 +116,7 @@ client *createClient(int fd, int iel) { uint64_t client_id; client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; - fastlock_init(&c->lock); + fastlock_init(&c->lock, "client"); c->id = client_id; c->resp = 2; c->fd = fd; @@ -248,7 +248,11 @@ void clientInstallAsyncWriteHandler(client *c) { int prepareClientToWrite(client *c, bool fAsync) { fAsync = fAsync && !FCorrectThread(c); // Not async if we're on the right thread serverAssert(FCorrectThread(c) || fAsync); - serverAssert(c->fd <= 0 || c->lock.fOwnLock()); + if (FCorrectThread(c)) { + serverAssert(c->fd <= 0 || c->lock.fOwnLock()); + } else { + serverAssert(GlobalLocksAcquired()); + } if (c->flags & CLIENT_FORCE_REPLY) return C_OK; // FORCE REPLY means we're doing something else with the buffer. // do not install a write handler @@ -1509,7 +1513,6 @@ int writeToClient(int fd, client *c, int handler_installed) { } else { serverLog(LL_VERBOSE, "Error writing to client: %s", strerror(errno)); - lock.unlock(); freeClientAsync(c); return C_ERR; @@ -1528,7 +1531,6 @@ int writeToClient(int fd, client *c, int handler_installed) { /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { - lock.unlock(); freeClientAsync(c); return C_ERR; } @@ -3000,6 +3002,12 @@ int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. 
*/ int count = 0; + client *c = serverTL->current_client; + if (c != nullptr) + { + serverAssert(c->flags & CLIENT_PROTECTED); + c->lock.unlock(); + } aeReleaseLock(); while (iterations--) { int events = 0; @@ -3008,7 +3016,11 @@ int processEventsWhileBlocked(int iel) { if (!events) break; count += events; } - aeAcquireLock(); + AeLocker locker; + if (c != nullptr) + c->lock.lock(); + locker.arm(c); + locker.release(); return count; } diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 17866bec1..4f59c0a15 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -173,6 +173,8 @@ typedef struct redisConfig { sds appendonly; } redisConfig; +int g_fInCrash = false; + /* Prototypes */ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask); static void createMissingClients(client c); diff --git a/src/redis-cli.c b/src/redis-cli.c index eae4a1d0e..ea920569d 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -90,6 +90,8 @@ int spectrum_palette_mono[] = {0,233,234,235,237,239,241,243,245,247,249,251,253 int *spectrum_palette; int spectrum_palette_size; +int g_fInCrash = 0; + /*------------------------------------------------------------------------------ * Utility functions *--------------------------------------------------------------------------- */ diff --git a/src/replication.cpp b/src/replication.cpp index eb8dec503..d2f948567 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -384,7 +384,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - std::unique_locklock)> lock(replica->lock); + if (replica->flags & CLIENT_CLOSE_ASAP) continue; + std::unique_locklock)> lock(replica->lock, std::defer_lock); + // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() + if (FCorrectThread(replica)) 
+ lock.lock(); if (serverTL->current_client && FSameHost(serverTL->current_client, replica)) { replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; @@ -433,7 +437,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *replica = (client*)ln->value; - std::lock_guard<decltype(replica->lock)> ulock(replica->lock); + std::unique_lock<decltype(replica->lock)> ulock(replica->lock, std::defer_lock); + if (FCorrectThread(replica)) + ulock.lock(); if (FMasterHost(replica)) continue; // Active Active case, don't feed back @@ -482,7 +488,10 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors,&li); while((ln = listNext(&li))) { client *monitor = (client*)ln->value; - std::lock_guard<decltype(monitor->lock)> lock(monitor->lock); + std::unique_lock<decltype(monitor->lock)> lock(monitor->lock, std::defer_lock); + // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() + if (FCorrectThread(monitor)) /* test the monitor whose lock is taken, matching the replica feed loops */ + lock.lock(); addReplyAsync(monitor,cmdobj); } decrRefCount(cmdobj); @@ -1205,7 +1214,21 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } else { - aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica]{ + aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { + // Because the client could have been closed while the lambda waited to run we need to + // verify the replica is still connected + listIter li; + listNode *ln; + listRewind(g_pserver->slaves,&li); + bool fFound = false; + while ((ln = listNext(&li))) { + if (listNodeValue(ln) == replica) { + fFound = true; + break; + } + } + if (!fFound) + return; aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE); if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) { freeClient(replica); @@ -3378,4 +3401,4 @@ void updateMasterAuth() if (cserver.default_masteruser)
mi->masteruser = zstrdup(cserver.default_masteruser); } -} \ No newline at end of file +} diff --git a/src/server.cpp b/src/server.cpp index c33ff389b..0f0dfe122 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2878,7 +2878,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) exit(1); } - fastlock_init(&pvar->lockPendingWrite); + fastlock_init(&pvar->lockPendingWrite, "lockPendingWrite"); if (!fMain) { @@ -2925,8 +2925,6 @@ void initServer(void) { signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); - fastlock_init(&g_pserver->flock); - g_pserver->db = (redisDb*)zmalloc(sizeof(redisDb)*cserver.dbnum, MALLOC_LOCAL); /* Create the Redis databases, and initialize other internal state. */ @@ -3706,7 +3704,6 @@ int processCommand(client *c, int callFlags) { queueMultiCommand(c); addReply(c,shared.queued); } else { - std::unique_lockdb->lock)> ulock(c->db->lock); call(c,callFlags); c->woff = g_pserver->master_repl_offset; if (listLength(g_pserver->ready_keys)) diff --git a/src/server.h b/src/server.h index cd2734a1d..276c939c3 100644 --- a/src/server.h +++ b/src/server.h @@ -1057,8 +1057,6 @@ typedef struct redisDb { long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ - - fastlock lock; } redisDb; /* Client MULTI/EXEC state */ @@ -1437,7 +1435,7 @@ struct redisServerThreadVars { client blocked on a module command needs to be processed. */ client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ - struct fastlock lockPendingWrite; + struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; }; @@ -1819,8 +1817,6 @@ struct redisServer { int fActiveReplica; /* Can this replica also be a master? 
*/ - struct fastlock flock; - // Format: // Lower 20 bits: a counter incrementing for each command executed in the same millisecond // Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition @@ -2799,7 +2795,7 @@ void xorDigest(unsigned char *digest, const void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, const char *strflags); int moduleGILAcquiredByModule(void); -extern bool g_fInCrash; +extern int g_fInCrash; static inline int GlobalLocksAcquired(void) // Used in asserts to verify all global locks are correctly acquired for a server-thread to operate { return aeThreadOwnsLock() || moduleGILAcquiredByModule() || g_fInCrash; diff --git a/tests/integration/logging.tcl b/tests/integration/logging.tcl index c1f4854d4..9c8cbe8ba 100644 --- a/tests/integration/logging.tcl +++ b/tests/integration/logging.tcl @@ -6,7 +6,7 @@ if {$system_name eq {linux} || $system_name eq {darwin}} { test "Server is able to generate a stack trace on selected systems" { r config set watchdog-period 200 r debug sleep 1 - set pattern "*debugCommand*" + set pattern "*watchdogSignalHandler*" set retry 10 while {$retry} { set result [exec tail -100 < [srv 0 stdout]]