diff --git a/.travis.yml b/.travis.yml index 90d8fc61c..8d20ca462 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,11 +4,11 @@ matrix: include: - os: linux script: make - env: COMPILER_NAME=gcc CXX=g++-5 CC=gcc-5 + env: COMPILER_NAME=g++-6 CXX=g++-6 CC=gcc-6 addons: apt: packages: - - g++-5 + - g++-6 - nasm - uuid-dev sources: &sources @@ -16,10 +16,11 @@ matrix: - ubuntu-toolchain-r-test - os: linux script: make MALLOC=libc - env: COMPILER_NAME=clang CXX=clang++-3.8 CC=clang-3.8 CXXFLAGS="-stdlib=libc++" LDFLAGS="-stdlib=libc++" + env: COMPILER_NAME=clang CXX=clang++-3.8 CC=clang-3.8 CXXFLAGS="-I/usr/include/libcxxabi/" LDFLAGS="-lc++" addons: apt: packages: + - libc++abi-dev - clang-3.8 - libc++-dev - libc++abi-dev diff --git a/README.md b/README.md index 13c2de99e..1f23f2c04 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,11 @@ ![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg) [![Build Status](https://travis-ci.org/JohnSully/KeyDB.svg?branch=unstable)](https://travis-ci.org/JohnSully/KeyDB) [![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) +[![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc) What is KeyDB? -------------- -KeyDB is a high performance fork of Redis focusing on multithreading, memory efficiency, and high throughput. In addition to multithreading KeyDB also has features only available in Redis Enterprise such as [Active Replication](https://github.com/JohnSully/KeyDB/wiki/KeyDB-(Redis-Fork):-Active-Replica-Support), [FLASH storage](https://github.com/JohnSully/KeyDB/wiki/FLASH-Storage) support, and some not available at all such as direct backup to AWS S3. +KeyDB is a high performance fork of Redis focusing on multithreading, memory efficiency, and high throughput. 
In addition to multithreading KeyDB also has features only available in Redis Enterprise such as [Active Replication](https://github.com/JohnSully/KeyDB/wiki/Active-Replication), [FLASH storage](https://github.com/JohnSully/KeyDB/wiki/FLASH-Storage) support, and some not available at all such as direct backup to AWS S3. On the same hardware KeyDB can perform twice as many queries per second as Redis, with 60% lower latency. @@ -14,12 +15,14 @@ Try our docker container: https://hub.docker.com/r/eqalpha/keydb Talk on Gitter: https://gitter.im/KeyDB +[Subscribe to the KeyDB mailing list](https://eqalpha.us20.list-manage.com/subscribe/post?u=978f486c2f95589b24591a9cc&id=4ab9220500) + Management GUI: We recommend [FastoNoSQL](https://fastonosql.com/) which has official KeyDB support. New: Active Replica Support --------------------------- -New! KeyDB now has support for Active Replicas. This feature greatly simplifies hot-spare failover and allows you to distribute writes over replicas instead of just a single master. For more information [see the wiki page](https://github.com/JohnSully/KeyDB/wiki/KeyDB-(Redis-Fork):-Active-Replica-Support). +New! KeyDB now has support for Active Replicas. This feature greatly simplifies hot-spare failover and allows you to distribute writes over replicas instead of just a single master. For more information [see the wiki page](https://github.com/JohnSully/KeyDB/wiki/Active-Replication). Why fork Redis? --------------- @@ -249,4 +252,3 @@ source distribution. Please see the CONTRIBUTING file in this source distribution for more information. 
- diff --git a/deps/hiredis/read.c b/deps/hiredis/read.c index c75c3435f..cc0f3cc72 100644 --- a/deps/hiredis/read.c +++ b/deps/hiredis/read.c @@ -31,6 +31,7 @@ #include "fmacros.h" #include +#include #include #ifndef _MSC_VER #include diff --git a/redis.conf b/redis.conf index 99f66a667..2a10eabe5 100644 --- a/redis.conf +++ b/redis.conf @@ -942,13 +942,7 @@ aof-use-rdb-preamble yes lua-time-limit 5000 ################################ REDIS CLUSTER ############################### -# -# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -# WARNING EXPERIMENTAL: Redis Cluster is considered to be stable code, however -# in order to mark it as "mature" we need to wait for a non trivial percentage -# of users to deploy it in production. -# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -# + # Normal Redis instances can't be part of a Redis Cluster; only nodes that are # started as cluster nodes can. In order to start a Redis instance as a # cluster node enable the cluster support uncommenting the following: diff --git a/runtest-moduleapi b/runtest-moduleapi new file mode 100755 index 000000000..84cdb9bb8 --- /dev/null +++ b/runtest-moduleapi @@ -0,0 +1,16 @@ +#!/bin/sh +TCL_VERSIONS="8.5 8.6" +TCLSH="" + +for VERSION in $TCL_VERSIONS; do + TCL=`which tclsh$VERSION 2>/dev/null` && TCLSH=$TCL +done + +if [ -z $TCLSH ] +then + echo "You need tcl 8.5 or newer in order to run the Redis test" + exit 1 +fi + +make -C tests/modules && \ +$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}" diff --git a/src/Makefile b/src/Makefile index 80b9cef9d..2966ec471 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -STD=-std=c99 -pedantic -DREDIS_STATIC='' +STD=-std=c11 -pedantic -DREDIS_STATIC='' CXX_STD=-std=c++14 -pedantic -fno-rtti -D__STDC_FORMAT_MACROS ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring 
FreeBSD,$(uname_S))) @@ -140,6 +140,7 @@ else # All the other OSes (notably Linux) FINAL_LDFLAGS+= -rdynamic FINAL_LIBS+=-ldl -pthread -lrt -luuid + FINAL_CFLAGS += -DMOTD endif endif endif @@ -197,11 +198,11 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli -REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o $(ASM_OBJ) +REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o 
anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark -REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o siphash.o redis-benchmark.o storage-lite.o fastlock.o $(ASM_OBJ) +REDIS_BENCHMARK_OBJ=ae.o anet.o redis-benchmark.o adlist.o dict.o zmalloc.o siphash.o redis-benchmark.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_CHECK_RDB_NAME=keydb-check-rdb REDIS_CHECK_AOF_NAME=keydb-check-aof @@ -264,7 +265,7 @@ $(REDIS_CHECK_AOF_NAME): $(REDIS_SERVER_NAME) # keydb-cli $(REDIS_CLI_NAME): $(REDIS_CLI_OBJ) - $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o $(FINAL_LIBS) + $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/linenoise/linenoise.o $(FINAL_LIBS) -lcurl # keydb-benchmark $(REDIS_BENCHMARK_NAME): $(REDIS_BENCHMARK_OBJ) diff --git a/src/acl.cpp b/src/acl.cpp index d7d352d42..259d9fb61 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -542,6 +542,8 @@ struct redisCommand *ACLLookupCommand(const char *name) { * and command ID. 
*/ void ACLResetSubcommandsForCommand(user *u, unsigned long id) { if (u->allowed_subcommands && u->allowed_subcommands[id]) { + for (int i = 0; u->allowed_subcommands[id][i]; i++) + sdsfree(u->allowed_subcommands[id][i]); zfree(u->allowed_subcommands[id]); u->allowed_subcommands[id] = NULL; } diff --git a/src/ae.cpp b/src/ae.cpp index 99f09d49f..3f1ec9260 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -803,9 +803,9 @@ void aeAcquireLock() g_lock.lock(); } -int aeTryAcquireLock() +int aeTryAcquireLock(int fWeak) { - return g_lock.try_lock(); + return g_lock.try_lock(!!fWeak); } void aeReleaseLock() diff --git a/src/ae.h b/src/ae.h index f08c49dd8..14ccc3dc8 100644 --- a/src/ae.h +++ b/src/ae.h @@ -159,7 +159,7 @@ int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); void aeAcquireLock(); -int aeTryAcquireLock(); +int aeTryAcquireLock(int fWeak); void aeReleaseLock(); int aeThreadOwnsLock(); diff --git a/src/aelocker.h b/src/aelocker.h new file mode 100644 index 000000000..d5c8186bf --- /dev/null +++ b/src/aelocker.h @@ -0,0 +1,69 @@ +#pragma once + +class AeLocker +{ + bool m_fArmed = false; + +public: + AeLocker() + { + } + + void arm(client *c) // if a client is passed, then the client is already locked + { + if (c != nullptr) + { + serverAssert(!m_fArmed); + serverAssert(c->lock.fOwnLock()); + + if (!aeTryAcquireLock(true /*fWeak*/)) // avoid unlocking the client if we can + { + bool fOwnClientLock = true; + int clientNesting = 1; + for (;;) + { + if (fOwnClientLock) + { + clientNesting = c->lock.unlock_recursive(); + fOwnClientLock = false; + } + aeAcquireLock(); + if (!c->lock.try_lock(false)) // ensure a strong try because aeAcquireLock is expensive + { + aeReleaseLock(); + } + else + { + break; + } + } + c->lock.lock_recursive(clientNesting); + } + + m_fArmed = true; + } + else if (!m_fArmed) + { + m_fArmed = true; + aeAcquireLock(); + } + } + + void disarm() + { + serverAssert(m_fArmed); + m_fArmed = false; + 
aeReleaseLock(); + } + + bool isArmed() const + { + return m_fArmed; + } + + ~AeLocker() + { + if (m_fArmed) + aeReleaseLock(); + } +}; \ No newline at end of file diff --git a/src/aof.cpp b/src/aof.cpp index 19c6c4a12..c7160489b 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -199,6 +199,12 @@ ssize_t aofRewriteBufferWrite(int fd) { * AOF file implementation * ------------------------------------------------------------------------- */ +/* Return true if an AOF fsync is currently already in progress in a + * BIO thread. */ +int aofFsyncInProgress(void) { + return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; +} + /* Starts a background task that performs fsync() against the specified * file descriptor (the one of the AOF file) in another thread. */ void aof_background_fsync(int fd) { @@ -337,10 +343,24 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; - if (sdslen(g_pserver->aof_buf) == 0) return; + if (sdslen(g_pserver->aof_buf) == 0) { + /* Check if we need to do fsync even if the aof buffer is empty, + * because previously in AOF_FSYNC_EVERYSEC mode, fsync was + * called only when the aof buffer was not empty, so if users + * stopped sending write commands before fsync was called in one second, + * the data in the page cache could not be flushed in time. */ + if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && + g_pserver->aof_fsync_offset != g_pserver->aof_current_size && + g_pserver->unixtime > g_pserver->aof_last_fsync && + !(sync_in_progress = aofFsyncInProgress())) { + goto try_fsync; + } else { + return; + } + } if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC) - sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; + sync_in_progress = aofFsyncInProgress(); if (g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. 
@@ -472,6 +492,7 @@ void flushAppendOnlyFile(int force) { g_pserver->aof_buf = sdsempty(); } +try_fsync: /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ if (g_pserver->aof_no_fsync_on_rewrite && @@ -486,10 +507,14 @@ void flushAppendOnlyFile(int force) { redis_fsync(g_pserver->aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); + g_pserver->aof_fsync_offset = g_pserver->aof_current_size; g_pserver->aof_last_fsync = g_pserver->unixtime; } else if ((g_pserver->aof_fsync == AOF_FSYNC_EVERYSEC && g_pserver->unixtime > g_pserver->aof_last_fsync)) { - if (!sync_in_progress) aof_background_fsync(g_pserver->aof_fd); + if (!sync_in_progress) { + aof_background_fsync(g_pserver->aof_fd); + g_pserver->aof_fsync_offset = g_pserver->aof_current_size; + } g_pserver->aof_last_fsync = g_pserver->unixtime; } } @@ -703,6 +728,7 @@ int loadAppendOnlyFile(char *filename) { * operation is received. */ if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { g_pserver->aof_current_size = 0; + g_pserver->aof_fsync_offset = g_pserver->aof_current_size; fclose(fp); return C_ERR; } @@ -726,7 +752,7 @@ int loadAppendOnlyFile(char *filename) { serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; - rioInitWithFile(&rdb,fileno(fp)); + rioInitWithFile(&rdb,fp); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (rdbLoadRio(&rdb,&rsi,1) != C_OK) { serverLog(LL_WARNING,"Error reading the RDB preamble of the AOF file, AOF loading aborted"); @@ -842,6 +868,7 @@ int loadAppendOnlyFile(char *filename) { stopLoading(); aofUpdateCurrentSize(); g_pserver->aof_rewrite_base_size = g_pserver->aof_current_size; + g_pserver->aof_fsync_offset = g_pserver->aof_current_size; return C_OK; readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. 
*/ @@ -1373,7 +1400,7 @@ int rewriteAppendOnlyFile(char *filename) { } g_pserver->aof_child_diff = sdsempty(); - rioInitWithFile(&aof,fileno(fp)); + rioInitWithFile(&aof,fp); if (g_pserver->aof_rewrite_incremental_fsync) rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); diff --git a/src/bitops.cpp b/src/bitops.cpp index 98e8b9bc7..02034f377 100644 --- a/src/bitops.cpp +++ b/src/bitops.cpp @@ -995,12 +995,18 @@ void bitfieldCommand(client *c) { /* Lookup for read is ok if key doesn't exit, but errors * if it's not a string. */ o = lookupKeyRead(c->db,c->argv[1]); - if (o != nullptr && checkType(c,o,OBJ_STRING)) return; + if (o != nullptr && checkType(c,o,OBJ_STRING)) { + zfree(ops); + return; + } } else { /* Lookup by making room up to the farest bit reached by * this operation. */ if ((o = lookupStringForBitCommand(c, - highest_write_offset)) == nullptr) return; + highest_write_offset)) == nullptr) { + zfree(ops); + return; + } } addReplyArrayLen(c,numops); diff --git a/src/blocked.cpp b/src/blocked.cpp index 1f807dac3..c09488b1f 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -64,6 +64,7 @@ */ #include "server.h" +#include int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where); @@ -180,6 +181,7 @@ void queueClientForReprocessing(client *c) { * of operation the client is blocking for. 
*/ void unblockClient(client *c) { serverAssert(GlobalLocksAcquired()); + serverAssert(c->lock.fOwnLock()); if (c->btype == BLOCKED_LIST || c->btype == BLOCKED_ZSET || c->btype == BLOCKED_STREAM) { @@ -301,6 +303,7 @@ void handleClientsBlockedOnKeys(void) { while(numclients--) { listNode *clientnode = listFirst(clients); client *receiver = (client*)clientnode->value; + std::unique_lock lock(receiver->lock); if (receiver->btype != BLOCKED_LIST) { /* Put at the tail, so that at the next call diff --git a/src/config.cpp b/src/config.cpp index badedcee4..fd284e5d8 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -864,6 +864,8 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0], "version-override") && argc == 2) { KEYDB_SET_VERSION = zstrdup(argv[1]); serverLog(LL_WARNING, "Warning version is overriden to: %s\n", KEYDB_SET_VERSION); + } else if (!strcasecmp(argv[0],"testmode") && argc == 2){ + g_fTestMode = yesnotoi(argv[1]); } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } diff --git a/src/debug.cpp b/src/debug.cpp index 4e588a254..f4791f593 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -706,7 +706,7 @@ void _serverAssertPrintClientInfo(const client *c) { bugReportStart(); serverLog(LL_WARNING,"=== ASSERTION FAILED CLIENT CONTEXT ==="); - serverLog(LL_WARNING,"client->flags = %d", c->flags); + serverLog(LL_WARNING,"client->flags = %d", static_cast(c->flags)); serverLog(LL_WARNING,"client->fd = %d", c->fd); serverLog(LL_WARNING,"client->argc = %d", c->argc); for (j=0; j < c->argc; j++) { diff --git a/src/defrag.cpp b/src/defrag.cpp index 64f7a3bb2..2e9abd290 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -47,7 +47,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); -dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long 
*defragged); +dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); /* Defrag helper for generic allocations. * @@ -353,7 +353,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { sdsele = (sds)ln->value; if ((newsds = activeDefragSds(sdsele))) { /* When defragging an sds value, we need to update the dict key */ - unsigned int hash = dictGetHash(d, sdsele); + uint64_t hash = dictGetHash(d, sdsele); replaceSateliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged); ln->value = newsds; defragged++; @@ -390,7 +390,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { * moved. Return value is the the dictEntry if found, or NULL if not found. * NOTE: this is very ugly code, but it let's us avoid the complication of * doing a scan on another dict. */ -dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged) { +dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) { dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); if (deref) { dictEntry *de = *deref; diff --git a/src/evict.cpp b/src/evict.cpp index f7b99f389..4be6bf761 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -78,7 +78,7 @@ unsigned int getLRUClock(void) { unsigned int LRU_CLOCK(void) { unsigned int lruclock; if (1000/g_pserver->hz <= LRU_CLOCK_RESOLUTION) { - atomicGet(g_pserver->lruclock,lruclock); + lruclock = g_pserver->lruclock; } else { lruclock = getLRUClock(); } diff --git a/src/fastlock.cpp b/src/fastlock.cpp index f1e13a279..33de19866 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -36,6 +36,10 @@ #include #include #include +#ifdef __linux__ +#include +#endif +#include #ifdef __APPLE__ #include @@ -49,6 +53,10 @@ #endif #endif +#ifndef UNUSED +#define UNUSED(x) ((void)x) +#endif + 
/**************************************************** * * Implementation of a fair spinlock. To promote fairness we @@ -64,6 +72,16 @@ uint64_t fastlock_getlongwaitcount() return g_longwaits; } +#ifndef ASM_SPINLOCK +#ifdef __linux__ +static int futex(volatile unsigned *uaddr, int futex_op, int val, + const struct timespec *timeout, int val3) +{ + return syscall(SYS_futex, uaddr, futex_op, val, + timeout, uaddr, val3); +} +#endif +#endif extern "C" pid_t gettid() { @@ -88,6 +106,7 @@ extern "C" void fastlock_init(struct fastlock *lock) lock->m_ticket.m_avail = 0; lock->m_depth = 0; lock->m_pidOwner = -1; + lock->futex = 0; } #ifndef ASM_SPINLOCK @@ -100,18 +119,25 @@ extern "C" void fastlock_lock(struct fastlock *lock) } unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE); - +#ifdef __linux__ + unsigned mask = (1U << (myticket % 32)); +#endif int cloops = 0; - while (__atomic_load_2(&lock->m_ticket.m_active, __ATOMIC_ACQUIRE) != myticket) + ticket ticketT; + while (((ticketT.u = __atomic_load_4(&lock->m_ticket.m_active, __ATOMIC_ACQUIRE)) & 0xffff) != myticket) { +#if defined(__i386__) || defined(__amd64__) + __asm__ ("pause"); +#endif if ((++cloops % 1024*1024) == 0) { - sched_yield(); +#ifdef __linux__ + __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE); + futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, ticketT.u, nullptr, mask); + __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE); +#endif ++g_longwaits; } -#if defined(__i386__) || defined(__amd64__) - __asm__ ("pause"); -#endif } lock->m_depth = 1; @@ -119,7 +145,7 @@ extern "C" void fastlock_lock(struct fastlock *lock) std::atomic_thread_fence(std::memory_order_acquire); } -extern "C" int fastlock_trylock(struct fastlock *lock) +extern "C" int fastlock_trylock(struct fastlock *lock, int fWeak) { if ((int)__atomic_load_4(&lock->m_pidOwner, __ATOMIC_ACQUIRE) == gettid()) { @@ -134,9 +160,9 @@ extern "C" int fastlock_trylock(struct fastlock *lock) uint16_t active 
= __atomic_load_2(&lock->m_ticket.m_active, __ATOMIC_RELAXED); uint16_t next = active + 1; - struct ticket ticket_expect { active, active }; - struct ticket ticket_setiflocked { active, next }; - if (__atomic_compare_exchange(&lock->m_ticket, &ticket_expect, &ticket_setiflocked, false /*weak*/, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) + struct ticket ticket_expect { { { active, active } } }; + struct ticket ticket_setiflocked { { { active, next } } }; + if (__atomic_compare_exchange(&lock->m_ticket, &ticket_expect, &ticket_setiflocked, fWeak /*weak*/, __ATOMIC_ACQUIRE, __ATOMIC_RELAXED)) { lock->m_depth = 1; __atomic_store_4(&lock->m_pidOwner, gettid(), __ATOMIC_RELEASE); @@ -145,6 +171,24 @@ extern "C" int fastlock_trylock(struct fastlock *lock) return false; } +#ifdef __linux__ +#define ROL32(v, shift) ((v << shift) | (v >> (32-shift))) +void unlock_futex(struct fastlock *lock, uint16_t ifutex) +{ + unsigned mask = (1U << (ifutex % 32)); + unsigned futexT = __atomic_load_4(&lock->futex, __ATOMIC_RELAXED) & mask; + + if (futexT == 0) + return; + + while (__atomic_load_4(&lock->futex, __ATOMIC_ACQUIRE) & mask) + { + if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1) + break; + } +} +#endif + extern "C" void fastlock_unlock(struct fastlock *lock) { --lock->m_depth; @@ -152,8 +196,13 @@ extern "C" void fastlock_unlock(struct fastlock *lock) { assert((int)__atomic_load_4(&lock->m_pidOwner, __ATOMIC_RELAXED) >= 0); // unlock after free lock->m_pidOwner = -1; - std::atomic_thread_fence(std::memory_order_acquire); - __atomic_fetch_add(&lock->m_ticket.m_active, 1, __ATOMIC_ACQ_REL); // on x86 the atomic is not required here, but ASM handles that case + std::atomic_thread_fence(std::memory_order_release); + uint16_t activeNew = __atomic_add_fetch(&lock->m_ticket.m_active, 1, __ATOMIC_RELEASE); // on x86 the atomic is not required here, but ASM handles that case +#ifdef __linux__ + unlock_futex(lock, activeNew); +#else + UNUSED(activeNew); 
+#endif } } #endif @@ -171,3 +220,17 @@ bool fastlock::fOwnLock() { return gettid() == m_pidOwner; } + +int fastlock_unlock_recursive(struct fastlock *lock) +{ + int rval = lock->m_depth; + lock->m_depth = 1; + fastlock_unlock(lock); + return rval; +} + +void fastlock_lock_recursive(struct fastlock *lock, int nesting) +{ + fastlock_lock(lock); + lock->m_depth = nesting; +} \ No newline at end of file diff --git a/src/fastlock.h b/src/fastlock.h index b8027de54..c7a40bdf3 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -9,9 +9,11 @@ extern "C" { struct fastlock; void fastlock_init(struct fastlock *lock); void fastlock_lock(struct fastlock *lock); -int fastlock_trylock(struct fastlock *lock); +int fastlock_trylock(struct fastlock *lock, int fWeak); void fastlock_unlock(struct fastlock *lock); void fastlock_free(struct fastlock *lock); +int fastlock_unlock_recursive(struct fastlock *lock); +void fastlock_lock_recursive(struct fastlock *lock, int nesting); uint64_t fastlock_getlongwaitcount(); // this is a global value @@ -20,17 +22,29 @@ uint64_t fastlock_getlongwaitcount(); // this is a global value } #endif +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpedantic" struct ticket { - uint16_t m_active; - uint16_t m_avail; + union + { + struct + { + uint16_t m_active; + uint16_t m_avail; + }; + unsigned u; + }; }; +#pragma GCC diagnostic pop + struct fastlock { volatile struct ticket m_ticket; volatile int m_pidOwner; volatile int m_depth; + unsigned futex; #ifdef __cplusplus fastlock() @@ -43,9 +57,9 @@ struct fastlock fastlock_lock(this); } - bool try_lock() + bool try_lock(bool fWeak = false) { - return !!fastlock_trylock(this); + return !!fastlock_trylock(this, fWeak); } void unlock() @@ -53,6 +67,16 @@ struct fastlock fastlock_unlock(this); } + int unlock_recursive() + { + return fastlock_unlock_recursive(this); + } + + void lock_recursive(int nesting) + { + fastlock_lock_recursive(this, nesting); + } + bool fOwnLock(); // true if this thread 
owns the lock, NOTE: not 100% reliable, use for debugging only #endif }; diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index 7645d3baa..baf33654f 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -22,7 +22,7 @@ fastlock_lock: push rdi ; we need our struct pointer (also balance the stack for the call) call gettid ; get our thread ID (TLS is nasty in ASM so don't bother inlining) mov esi, eax ; back it up in esi - mov rdi, [rsp] ; get our pointer back + pop rdi ; get our pointer back cmp [rdi+4], esi ; Is the TID we got back the owner of the lock? je .LLocked ; Don't spin in that case @@ -30,11 +30,11 @@ fastlock_lock: xor eax, eax ; eliminate partial register dependency inc eax ; we want to add one lock xadd [rdi+2], ax ; do the xadd, ax contains the value before the addition - ; eax now contains the ticket - xor ecx, ecx + ; ax now contains the ticket ALIGN 16 .LLoop: - cmp [rdi], ax ; is our ticket up? + mov edx, [rdi] + cmp dx, ax ; is our ticket up? je .LLocked ; leave the loop pause add ecx, 1000h ; Have we been waiting a long time? (oflow if we have) @@ -44,22 +44,34 @@ ALIGN 16 ; But the compiler doesn't know that we rarely hit this, and when we do we know the lock is ; taking a long time to be released anyways. We optimize for the common case of short ; lock intervals. 
That's why we're using a spinlock in the first place + ; If we get here we're going to sleep in the kernel with a futex push rsi push rax - mov rax, 24 ; sys_sched_yield - syscall ; give up our timeslice we'll be here a while - pop rax - pop rsi + ; Set up the syscall args + ; rdi ARG1 futex (already in rdi) + mov esi, (9 | 128) ; rsi ARG2 FUTEX_WAIT_BITSET_PRIVATE + ; rdx ARG3 ticketT.u (already in edx) + xor r10d, r10d ; r10 ARG4 NULL + mov r8, rdi ; r8 ARG5 dup rdi + xor r9d, r9d + bts r9d, eax ; r9 ARG6 mask + mov eax, 202 ; sys_futex + ; Do the syscall + lock or [rdi+12], r9d ; inform the unlocking thread we're waiting + syscall ; wait for the futex + not r9d ; convert our flag into a mask of bits not to touch + lock and [rdi+12], r9d ; clear the flag in the futex control mask + ; cleanup and continue mov rcx, g_longwaits inc qword [rcx] ; increment our long wait counter - mov rdi, [rsp] ; our struct pointer is on the stack already + pop rax + pop rsi xor ecx, ecx ; Reset our loop counter jmp .LLoop ; Get back in the game ALIGN 16 .LLocked: mov [rdi+4], esi ; lock->m_pidOwner = gettid() inc dword [rdi+8] ; lock->m_depth++ - add rsp, 8 ; fix stack ret ALIGN 16 @@ -114,9 +126,32 @@ fastlock_unlock: ; uint16_t avail ; int32_t m_pidOwner ; int32_t m_depth + push r11 sub dword [rdi+8], 1 ; decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state jnz .LDone ; if depth is non-zero this is a recursive unlock, and we still hold it mov dword [rdi+4], -1 ; pidOwner = -1 (we don't own it anymore) - inc word [rdi] ; give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + mov ecx, [rdi] ; get current active (this one) + inc ecx ; bump it to the next thread + mov [rdi], cx ; give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + ; At this point the lock is removed, however we must wake up any pending futexes + mov r9d, 1 ; r9d is
the bitmask for 2 threads + rol r9d, cl ; place the mask in the right spot for the next 2 threads +ALIGN 16 +.LRetryWake: + mov r11d, [rdi+12] ; load the futex mask + and r11d, r9d ; are any threads waiting on a futex? + jz .LDone ; if not we're done. + ; we have to wake the futexs + ; rdi ARG1 futex (already in rdi) + mov esi, (10 | 128) ; rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE + mov edx, 0x7fffffff ; rdx ARG3 INT_MAX (number of threads to wake) + xor r10d, r10d ; r10 ARG4 NULL + mov r8, rdi ; r8 ARG5 dup rdi + ; r9 ARG6 mask (already set above) + mov eax, 202 ; sys_futex + syscall + cmp eax, 1 ; did we wake as many as we expected? + jnz .LRetryWake .LDone: + pop r11 ret diff --git a/src/module.cpp b/src/module.cpp index 04ca21a97..568205f90 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -763,6 +763,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->usedby = listCreate(); module->usingMods = listCreate(); module->filters = listCreate(); + module->in_call = 0; ctx->module = module; } @@ -3786,14 +3787,7 @@ void moduleHandleBlockedClients(int iel) { * replies to send to the client in a thread safe context. * We need to glue such replies to the client output buffer and * free the temporary client we just used for the replies. 
*/ - if (c) { - if (bc->reply_client->bufpos) - addReplyProto(c,bc->reply_client->buf, - bc->reply_client->bufpos); - if (listLength(bc->reply_client->reply)) - listJoin(c->reply,bc->reply_client->reply); - c->reply_bytes += bc->reply_client->reply_bytes; - } + if (c) AddReplyFromClient(c, bc->reply_client); freeClient(bc->reply_client); if (c != NULL) { @@ -3812,7 +3806,7 @@ void moduleHandleBlockedClients(int iel) { AssertCorrectThread(c); fastlock_lock(&g_pserver->rgthreadvar[c->iel].lockPendingWrite); - listAddNodeHead(g_pserver->rgthreadvar[c->iel].clients_pending_write,c); + g_pserver->rgthreadvar[c->iel].clients_pending_write.push_back(c); fastlock_unlock(&g_pserver->rgthreadvar[c->iel].lockPendingWrite); } } @@ -3917,7 +3911,10 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { * in order to keep things like the currently selected database and similar * things. */ ctx->client = createClient(-1, IDX_EVENT_LOOP_MAIN); - if (bc) selectDb(ctx->client,bc->dbid); + if (bc) { + selectDb(ctx->client,bc->dbid); + ctx->client->id = bc->client->id; + } return ctx; } @@ -4991,6 +4988,12 @@ int RM_UnregisterCommandFilter(RedisModuleCtx *ctx, RedisModuleCommandFilter *fi return REDISMODULE_OK; } +int moduleHasCommandFilters() +{ + // Note: called outside the global lock + return listLength(moduleCommandFilters); +} + void moduleCallCommandFilters(client *c) { if (listLength(moduleCommandFilters) == 0) return; diff --git a/src/modules/Makefile b/src/modules/Makefile index 537aa0daf..4f6b50f2e 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -13,7 +13,7 @@ endif .SUFFIXES: .c .so .xo .o -all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so hellofilter.so +all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so .c.xo: $(CC) -I. 
$(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ @@ -47,11 +47,6 @@ hellodict.xo: ../redismodule.h hellodict.so: hellodict.xo -hellofilter.xo: ../redismodule.h - -hellofilter.so: hellofilter.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc - testmodule.xo: ../redismodule.h testmodule.so: testmodule.xo diff --git a/src/modules/hellotimer.c b/src/modules/hellotimer.c index 57b111b7c..27da7ec1b 100644 --- a/src/modules/hellotimer.c +++ b/src/modules/hellotimer.c @@ -40,7 +40,7 @@ /* Timer callback. */ void timerHandler(RedisModuleCtx *ctx, void *data) { REDISMODULE_NOT_USED(ctx); - printf("Fired %s!\n", data); + printf("Fired %s!\n", (const char *) data); RedisModule_Free(data); } diff --git a/src/multi.cpp b/src/multi.cpp index 262a8f1d0..3383f5a49 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -82,7 +82,6 @@ void discardTransaction(client *c) { /* Flag the transacation as DIRTY_EXEC so that EXEC will fail. * Should be called every time there is an error while queueing a command. */ void flagTransaction(client *c) { - serverAssert(GlobalLocksAcquired()); if (c->flags & CLIENT_MULTI) c->flags |= CLIENT_DIRTY_EXEC; } diff --git a/src/networking.cpp b/src/networking.cpp index 4961eb6c0..1ffc1fea4 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -30,71 +30,18 @@ #include "server.h" #include "atomicvar.h" +#include #include #include #include #include #include +#include "aelocker.h" static void setProtocolError(const char *errstr, client *c); void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); void addReplyBulkCStringCore(client *c, const char *s, bool fAsync); -class AeLocker -{ - bool m_fArmed = false; - -public: - AeLocker() - { - } - - void arm(client *c) // if a client is passed, then the client is already locked - { - if (c != nullptr) - { - serverAssert(!m_fArmed); - serverAssert(c->lock.fOwnLock()); - - bool fClientLocked = true; - while (!aeTryAcquireLock()) - { - if (fClientLocked) c->lock.unlock(); - fClientLocked = 
false; - aeAcquireLock(); - if (!c->lock.try_lock()) - { - aeReleaseLock(); - } - else - { - break; - } - } - - m_fArmed = true; - } - else if (!m_fArmed) - { - m_fArmed = true; - aeAcquireLock(); - } - } - - void disarm() - { - serverAssert(m_fArmed); - m_fArmed = false; - aeReleaseLock(); - } - - ~AeLocker() - { - if (m_fArmed) - aeReleaseLock(); - } -}; - /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute * the client output buffer size. */ @@ -167,7 +114,7 @@ client *createClient(int fd, int iel) { selectDb(c,0); uint64_t client_id; - atomicGetIncr(g_pserver->next_client_id,client_id,1); + client_id = g_pserver->next_client_id.fetch_add(1); c->iel = iel; fastlock_init(&c->lock); c->id = client_id; @@ -197,6 +144,7 @@ client *createClient(int fd, int iel) { c->replstate = REPL_STATE_NONE; c->repl_put_online_on_ack = 0; c->reploff = 0; + c->reploff_skipped = 0; c->read_reploff = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; @@ -261,7 +209,7 @@ void clientInstallWriteHandler(client *c) { * we'll not be able to write the whole reply at once. */ c->flags |= CLIENT_PENDING_WRITE; std::unique_lock lockf(g_pserver->rgthreadvar[c->iel].lockPendingWrite); - listAddNodeHead(g_pserver->rgthreadvar[c->iel].clients_pending_write,c); + g_pserver->rgthreadvar[c->iel].clients_pending_write.push_back(c); } } @@ -675,7 +623,7 @@ void setDeferredPushLen(client *c, void *node, long length) { /* Add a double as a bulk reply */ void addReplyDoubleCore(client *c, double d, bool fAsync) { - if (isinf(d)) { + if (std::isinf(d)) { /* Libc in odd systems (Hi Solaris!) will format infinite in a * different way, so better to handle it in an explicit way. 
*/ if (c->resp == 2) { @@ -838,8 +786,11 @@ void addReplyNullCore(client *c, bool fAsync) { } } -void addReplyNull(client *c) { - addReplyNullCore(c, false); +void addReplyNull(client *c, robj_roptr objOldProtocol) { + if (c->resp < 3 && objOldProtocol != nullptr) + addReply(c, objOldProtocol); + else + addReplyNullCore(c, false); } void addReplyNullAsync(client *c) { @@ -931,7 +882,10 @@ void addReplyBulkSdsAsync(client *c, sds s) { /* Add a C null term string as bulk reply */ void addReplyBulkCStringCore(client *c, const char *s, bool fAsync) { if (s == NULL) { - addReplyNullCore(c,fAsync); + if (c->resp < 3) + addReplyCore(c,shared.nullbulk, fAsync); + else + addReplyNullCore(c,fAsync); } else { addReplyBulkCBufferCore(c,s,strlen(s),fAsync); } @@ -1011,6 +965,19 @@ void addReplySubcommandSyntaxError(client *c) { sdsfree(cmd); } +/* Append 'src' client output buffers into 'dst' client output buffers. + * This function clears the output buffers of 'src' */ +void AddReplyFromClient(client *dst, client *src) { + if (prepareClientToWrite(dst, false) != C_OK) + return; + addReplyProto(dst,src->buf, src->bufpos); + if (listLength(src->reply)) + listJoin(dst->reply,src->reply); + dst->reply_bytes += src->reply_bytes; + src->reply_bytes = 0; + src->bufpos = 0; +} + /* Copy 'src' client output buffers into 'dst' client output buffers. * The function takes care of freeing the old output buffers of the * destination client. 
*/ @@ -1130,10 +1097,30 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); int ielCur = ielFromEventLoop(el); - // We always accept on the same thread - aeAcquireLock(); - acceptCommonHandler(cfd,0,cip, ielCur); - aeReleaseLock(); + if (!g_fTestMode) + { + // We always accept on the same thread + LLocalThread: + aeAcquireLock(); + acceptCommonHandler(cfd,0,cip, ielCur); + aeReleaseLock(); + } + else + { + // In test mode we want a good distribution among threads and avoid the main thread + // since the main thread is most likely to work + int iel = IDX_EVENT_LOOP_MAIN; + while (cserver.cthreads > 1 && iel == IDX_EVENT_LOOP_MAIN) + iel = rand() % cserver.cthreads; + if (iel == ielFromEventLoop(el)) + goto LLocalThread; + char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ + acceptCommonHandler(cfd,0,szT, iel); + zfree(szT); + }); + } } } @@ -1155,7 +1142,17 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket); aeAcquireLock(); - acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur); + int ielTarget = rand() % cserver.cthreads; + if (ielTarget == ielCur) + { + acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur); + } + else + { + aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ + acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielTarget); + }); + } aeReleaseLock(); } @@ -1236,9 +1233,10 @@ void unlinkClient(client *c) { /* Remove from the list of pending writes if needed. 
*/ if (c->flags & CLIENT_PENDING_WRITE) { std::unique_lock lockf(g_pserver->rgthreadvar[c->iel].lockPendingWrite); - ln = listSearchKey(g_pserver->rgthreadvar[c->iel].clients_pending_write,c); - serverAssert(ln != NULL); - listDelNode(g_pserver->rgthreadvar[c->iel].clients_pending_write,ln); + auto itr = std::find(g_pserver->rgthreadvar[c->iel].clients_pending_write.begin(), + g_pserver->rgthreadvar[c->iel].clients_pending_write.end(), c); + serverAssert(itr != g_pserver->rgthreadvar[c->iel].clients_pending_write.end()); + g_pserver->rgthreadvar[c->iel].clients_pending_write.erase(itr); c->flags &= ~CLIENT_PENDING_WRITE; } @@ -1379,10 +1377,15 @@ void freeClient(client *c) { * a context where calling freeClient() is not possible, because the client * should be valid for the continuation of the flow of the program. */ void freeClientAsync(client *c) { + /* We need to handle concurrent access to the server.clients_to_close list + * only in the freeClientAsync() function, since it's the only function that + * may access the list while Redis uses I/O threads. All the other accesses + * are in the context of the main thread while the other threads are + * idle. */ if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; - AeLocker lock; - lock.arm(nullptr); std::lock_guardlock)> clientlock(c->lock); + AeLocker lock; + lock.arm(c); c->flags |= CLIENT_CLOSE_ASAP; listAddNodeTail(g_pserver->clients_to_close,c); } @@ -1414,7 +1417,12 @@ client *lookupClientByID(uint64_t id) { } /* Write data in output buffers to client. Return C_OK if the client - * is still valid after the call, C_ERR if it was freed. */ + * is still valid after the call, C_ERR if it was freed because of some + * error. + * + * This function is called by threads, but always with handler_installed + * set to 0. So when handler_installed is set to 0 the function must be + * thread safe. 
*/ int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; clientReplyBlock *o; @@ -1480,7 +1488,7 @@ int writeToClient(int fd, client *c, int handler_installed) { !(c->flags & CLIENT_SLAVE)) break; } - __atomic_fetch_add(&g_pserver->stat_net_output_bytes, totwritten, __ATOMIC_RELAXED); + g_pserver->stat_net_output_bytes += totwritten; if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; @@ -1488,15 +1496,7 @@ int writeToClient(int fd, client *c, int handler_installed) { serverLog(LL_VERBOSE, "Error writing to client: %s", strerror(errno)); lock.unlock(); - if (aeTryAcquireLock()) - { - freeClient(c); - aeReleaseLock(); - } - else - { - freeClientAsync(c); - } + freeClientAsync(c); return C_ERR; } @@ -1515,15 +1515,7 @@ int writeToClient(int fd, client *c, int handler_installed) { /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { lock.unlock(); - if (aeTryAcquireLock()) - { - freeClient(c); - aeReleaseLock(); - } - else - { - freeClientAsync(c); - } + freeClientAsync(c); return C_ERR; } } @@ -1536,7 +1528,14 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { client *c = (client*)privdata; serverAssert(ielFromEventLoop(el) == c->iel); - writeToClient(fd,c,1); + if (writeToClient(fd,c,1) == C_ERR) + { + AeLocker ae; + c->lock.lock(); + ae.arm(c); + if (c->flags & CLIENT_CLOSE_ASAP) + freeClient(c); + } } void ProcessPendingAsyncWrites() @@ -1598,56 +1597,62 @@ void ProcessPendingAsyncWrites() * need to use a syscall in order to install the writable event handler, * get it called, and so forth. 
*/ int handleClientsWithPendingWrites(int iel) { - listIter li; - listNode *ln; - std::unique_lock lockf(g_pserver->rgthreadvar[iel].lockPendingWrite); - list *list = g_pserver->rgthreadvar[iel].clients_pending_write; - int processed = listLength(list); + auto &vec = g_pserver->rgthreadvar[iel].clients_pending_write; + int processed = (int)vec.size(); serverAssert(iel == (serverTL - g_pserver->rgthreadvar)); - listRewind(list,&li); - while((ln = listNext(&li))) { - client *c = (client*)listNodeValue(ln); - std::unique_locklock)> lock(c->lock); + int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; + /* For the fsync=always policy, we want that a given FD is never + * served for reading and writing in the same event loop iteration, + * so that in the middle of receiving the query, and serving it + * to the client, we'll call beforeSleep() that will do the + * actual fsync of AOF to disk. AE_BARRIER ensures that. */ + if (g_pserver->aof_state == AOF_ON && + g_pserver->aof_fsync == AOF_FSYNC_ALWAYS) + { + ae_flags |= AE_BARRIER; + } - c->flags &= ~CLIENT_PENDING_WRITE; - listDelNode(list,ln); + while(!vec.empty()) { + client *c = vec.back(); AssertCorrectThread(c); + c->flags &= ~CLIENT_PENDING_WRITE; + vec.pop_back(); + /* If a client is protected, don't do anything, * that may trigger write error or recreate handler. */ if (c->flags & CLIENT_PROTECTED) continue; + std::unique_locklock)> lock(c->lock); + /* Try to write buffers to the client socket. */ if (writeToClient(c->fd,c,0) == C_ERR) { - lock.release(); // client is free'd + if (c->flags & CLIENT_CLOSE_ASAP) + { + c->lock.lock(); + AeLocker ae; + ae.arm(c); + freeClient(c); // writeToClient will only async close, but there's no need to wait + } continue; } /* If after the synchronous writes above we still have data to * output to the client, we need to install the writable handler. 
*/ if (clientHasPendingReplies(c)) { - int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; - /* For the fsync=always policy, we want that a given FD is never - * served for reading and writing in the same event loop iteration, - * so that in the middle of receiving the query, and serving it - * to the client, we'll call beforeSleep() that will do the - * actual fsync of AOF to disk. AE_BARRIER ensures that. */ - if (g_pserver->aof_state == AOF_ON && - g_pserver->aof_fsync == AOF_FSYNC_ALWAYS) - { - ae_flags |= AE_BARRIER; - } - if (aeCreateFileEvent(g_pserver->rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) freeClientAsync(c); } } - AeLocker locker; - locker.arm(nullptr); - ProcessPendingAsyncWrites(); + if (listLength(serverTL->clients_pending_asyncwrite)) + { + AeLocker locker; + locker.arm(nullptr); + ProcessPendingAsyncWrites(); + } return processed; } @@ -1950,13 +1955,48 @@ int processMultibulkBuffer(client *c) { return C_ERR; } +/* This function calls processCommand(), but also performs a few sub tasks + * that are useful in that context: + * + * 1. It sets the current client to the client 'c'. + * 2. In the case of master clients, the replication offset is updated. + * 3. The client is reset unless there are reasons to avoid doing it. + * + * The function returns C_ERR in case the client was freed as a side effect + * of processing the command, otherwise C_OK is returned. */ +int processCommandAndResetClient(client *c, int flags) { + int deadclient = 0; + serverTL->current_client = c; + if (processCommand(c, flags) == C_OK) { + if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { + /* Update the applied replication offset of our master. */ + c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + } + + /* Don't reset the client structure for clients blocked in a + * module blocking command, so that the reply callback will + * still be able to access the client argv and argc field. 
+ * The client will be reset in unblockClientFromModule(). */ + if (!(c->flags & CLIENT_BLOCKED) || + c->btype != BLOCKED_MODULE) + { + resetClient(c); + } + } + if (serverTL->current_client == NULL) deadclient = 1; + serverTL->current_client = NULL; + /* freeMemoryIfNeeded may flush slave output buffers. This may + * result into a slave, that may be the active client, to be + * freed. */ + return deadclient ? C_ERR : C_OK; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c, int callFlags) { AssertCorrectThread(c); - bool fFreed = false; /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { @@ -2000,60 +2040,47 @@ void processInputBuffer(client *c, int callFlags) { if (c->argc == 0) { resetClient(c); } else { - AeLocker locker; - locker.arm(c); - serverTL->current_client = c; - - /* Only reset the client when the command was executed. */ - if (processCommand(c, callFlags) == C_OK) { - if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { - /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; - } - - /* Don't reset the client structure for clients blocked in a - * module blocking command, so that the reply callback will - * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) - resetClient(c); - } - /* freeMemoryIfNeeded may flush slave output buffers. This may - * result into a slave, that may be the active client, to be - * freed. 
*/ - if (serverTL->current_client == NULL) { - fFreed = true; - break; + /* We are finally ready to execute the command. */ + if (processCommandAndResetClient(c, callFlags) == C_ERR) { + /* If the client is no longer valid, we avoid exiting this + * loop and trimming the client buffer later. So we return + * ASAP in that case. */ + return; } - serverTL->current_client = NULL; } } /* Trim to pos */ - if (!fFreed && c->qb_pos) { + if (c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } } /* This is a wrapper for processInputBuffer that also cares about handling - * the replication forwarding to the sub-slaves, in case the client 'c' + * the replication forwarding to the sub-replicas, in case the client 'c' * is flagged as master. Usually you want to call this instead of the * raw processInputBuffer(). */ void processInputBufferAndReplicate(client *c) { if (!(c->flags & CLIENT_MASTER)) { processInputBuffer(c, CMD_CALL_FULL); } else { + /* If the client is a master we need to compute the difference + * between the applied offset before and after processing the buffer, + * to understand how much of the replication stream was actually + * applied to the master state: this quantity, and its corresponding + * part of the replication stream, will be propagated to the + * sub-replicas and to the replication backlog. 
*/ size_t prev_offset = c->reploff; processInputBuffer(c, CMD_CALL_FULL); size_t applied = c->reploff - prev_offset; if (applied) { if (!g_pserver->fActiveReplica) { - aeAcquireLock(); + AeLocker ae; + ae.arm(c); replicationFeedSlavesFromMasterStream(g_pserver->slaves, c->pending_querybuf, applied); - aeReleaseLock(); } sdsrange(c->pending_querybuf,applied,-1); } @@ -2103,16 +2130,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); - lock.unlock(); - aelock.arm(nullptr); - freeClient(c); + freeClientAsync(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); - lock.unlock(); - aelock.arm(nullptr); - freeClient(c); + freeClientAsync(c); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer @@ -2132,10 +2155,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { bytes = sdscatrepr(bytes,c->querybuf,64); serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); - sdsfree(bytes); - lock.unlock(); - aelock.arm(nullptr); - freeClient(c); + sdsfree(bytes); + freeClientAsync(c); return; } @@ -2146,8 +2167,11 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. 
*/ processInputBufferAndReplicate(c); - aelock.arm(c); - ProcessPendingAsyncWrites(); + if (listLength(serverTL->clients_pending_asyncwrite)) + { + aelock.arm(c); + ProcessPendingAsyncWrites(); + } } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -2497,7 +2521,7 @@ NULL if (c->name) addReplyBulk(c,c->name); else - addReplyNull(c); + addReplyNull(c, shared.nullbulk); } else if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"pause") && c->argc == 3) { long long duration; @@ -2571,7 +2595,9 @@ void helloCommand(client *c) { if (!g_pserver->sentinel_mode) { addReplyBulkCString(c,"role"); - addReplyBulkCString(c,listLength(g_pserver->masters) ? "replica" : "master"); + addReplyBulkCString(c,listLength(g_pserver->masters) ? + g_pserver->fActiveReplica ? "active-replica" : "replica" + : "master"); } addReplyBulkCString(c,"modules"); @@ -2899,3 +2925,4 @@ int processEventsWhileBlocked(int iel) { aeAcquireLock(); return count; } + diff --git a/src/new.cpp b/src/new.cpp new file mode 100644 index 000000000..044257928 --- /dev/null +++ b/src/new.cpp @@ -0,0 +1,24 @@ +#include // std::size_t +#include "server.h" +#include "new.h" + +[[deprecated]] +void *operator new(size_t size) +{ + return zmalloc(size, MALLOC_LOCAL); +} + +void *operator new(size_t size, enum MALLOC_CLASS mclass) +{ + return zmalloc(size, mclass); +} + +void operator delete(void * p) noexcept +{ + zfree(p); +} + +void operator delete(void *p, std::size_t) noexcept +{ + zfree(p); +} \ No newline at end of file diff --git a/src/new.h b/src/new.h index 7ea65e979..69464f127 100644 --- a/src/new.h +++ b/src/new.h @@ -2,22 +2,9 @@ #include // std::size_t [[deprecated]] -inline void *operator new(size_t size) -{ - return zmalloc(size, MALLOC_LOCAL); -} +void *operator new(size_t size); -inline void *operator new(size_t size, enum MALLOC_CLASS mclass) -{ - return zmalloc(size, mclass); -} +void *operator new(size_t size, enum MALLOC_CLASS mclass); -inline void operator delete(void * p) noexcept 
-{ - zfree(p); -} - -inline void operator delete(void *p, std::size_t) noexcept -{ - zfree(p); -} \ No newline at end of file +void operator delete(void * p) noexcept; +void operator delete(void *p, std::size_t) noexcept; diff --git a/src/object.cpp b/src/object.cpp index 0ca578de1..b57a09b7e 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -624,7 +624,7 @@ int getDoubleFromObject(const robj *o, double *target) { (size_t)(eptr-(char*)szFromObj(o)) != sdslen(szFromObj(o)) || (errno == ERANGE && (value == HUGE_VAL || value == -HUGE_VAL || value == 0)) || - isnan(value)) + std::isnan(value)) return C_ERR; } else if (o->encoding == OBJ_ENCODING_INT) { value = (long)ptrFromObj(o); @@ -666,7 +666,7 @@ int getLongDoubleFromObject(robj *o, long double *target) { (size_t)(eptr-(char*)szFromObj(o)) != sdslen(szFromObj(o)) || (errno == ERANGE && (value == HUGE_VAL || value == -HUGE_VAL || value == 0)) || - isnan(value)) + std::isnan(value)) return C_ERR; } else if (o->encoding == OBJ_ENCODING_INT) { value = (long)szFromObj(o); @@ -1341,7 +1341,7 @@ NULL } } if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) { - addReplyNull(c); + addReplyNull(c, shared.nullbulk); return; } size_t usage = objectComputeSize((robj*)dictGetVal(de),samples); diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 61a6dc373..6a9c2bdfc 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -84,7 +84,7 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel) { if (channel) addReplyBulk(c,channel); else - addReplyNull(c); + addReplyNull(c, shared.nullbulk); addReplyLongLong(c,clientSubscriptionsCount(c)); } @@ -112,7 +112,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { if (pattern) addReplyBulk(c,pattern); else - addReplyNull(c); + addReplyNull(c, shared.nullbulk); addReplyLongLong(c,clientSubscriptionsCount(c)); } diff --git a/src/rdb-s3.cpp b/src/rdb-s3.cpp index e32275ef2..39be87f83 100644 --- a/src/rdb-s3.cpp +++ b/src/rdb-s3.cpp @@ -33,12 +33,19 @@ int 
rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi) else { close(fd[0]); - if (rdbSaveFd(fd[1], rsi) != C_OK) + FILE *fp = fdopen(fd[1], "w"); + if (fp == NULL) { - close(fd[1]); + close (fd[1]); return C_ERR; } - close(fd[1]); + + if (rdbSaveFp(fp, rsi) != C_OK) + { + fclose(fp); + return C_ERR; + } + fclose(fp); waitpid(pid, &status, 0); } @@ -59,7 +66,7 @@ int rdbLoadS3Core(int fd, rdbSaveInfo *rsi) if ((fp = fdopen(fd, "rb")) == NULL) return C_ERR; startLoading(fp); - rioInitWithFile(&rdb,fileno(fp)); + rioInitWithFile(&rdb,fp); retval = rdbLoadRio(&rdb,rsi,0); fclose(fp); stopLoading(); diff --git a/src/rdb.cpp b/src/rdb.cpp index a5f36b95a..2cac89178 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -543,10 +543,10 @@ int rdbSaveDoubleValue(rio *rdb, double val) { unsigned char buf[128]; int len; - if (isnan(val)) { + if (std::isnan(val)) { buf[0] = 253; len = 1; - } else if (!isfinite(val)) { + } else if (!std::isfinite(val)) { len = 1; buf[0] = (val < 0) ? 255 : 254; } else { @@ -1220,12 +1220,12 @@ int rdbSaveRioWithEOFMark(rio *rdb, int *error, rdbSaveInfo *rsi) { return C_ERR; } -int rdbSaveFd(int fd, rdbSaveInfo *rsi) +int rdbSaveFp(FILE *fp, rdbSaveInfo *rsi) { int error = 0; rio rdb; - rioInitWithFile(&rdb,fd); + rioInitWithFile(&rdb,fp); if (g_pserver->rdb_save_incremental_fsync) rioSetAutoSync(&rdb,REDIS_AUTOSYNC_BYTES); @@ -1267,7 +1267,7 @@ int rdbSaveFile(char *filename, rdbSaveInfo *rsi) { return C_ERR; } - if (rdbSaveFd(fileno(fp), rsi) == C_ERR){ + if (rdbSaveFp(fp, rsi) == C_ERR){ goto werr; } @@ -2151,7 +2151,7 @@ int rdbLoadFile(char *filename, rdbSaveInfo *rsi) { if ((fp = fopen(filename,"r")) == NULL) return C_ERR; startLoading(fp); - rioInitWithFile(&rdb,fileno(fp)); + rioInitWithFile(&rdb,fp); retval = rdbLoadRio(&rdb,rsi,0); fclose(fp); stopLoading(); diff --git a/src/rdb.h b/src/rdb.h index 45cfa475a..0ee2cad92 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -141,7 +141,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t 
childpid); int rdbSave(rdbSaveInfo *rsi); int rdbSaveFile(char *filename, rdbSaveInfo *rsi); -int rdbSaveFd(int fd, rdbSaveInfo *rsi); +int rdbSaveFp(FILE *pf, rdbSaveInfo *rsi); int rdbSaveS3(char *path, rdbSaveInfo *rsi); int rdbLoadS3(char *path, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key); diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 8d50303ee..45170f037 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -258,6 +258,19 @@ static redisConfig *getRedisConfig(const char *ip, int port, else fprintf(stderr,"%s: %s\n",hostsocket,err); goto fail; } + + if(config.auth){ + void *authReply = NULL; + redisAppendCommand(c, "AUTH %s", config.auth); + if (REDIS_OK != redisGetReply(c, &authReply)) goto fail; + if (reply) freeReplyObject(reply); + reply = ((redisReply *) authReply); + if (reply->type == REDIS_REPLY_ERROR) { + fprintf(stderr, "ERROR: %s\n", reply->str); + goto fail; + } + } + redisAppendCommand(c, "CONFIG GET %s", "save"); redisAppendCommand(c, "CONFIG GET %s", "appendonly"); @@ -1196,7 +1209,7 @@ static int fetchClusterSlotsConfiguration(client c) { assert(reply->type == REDIS_REPLY_ARRAY); for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; - assert(r->type = REDIS_REPLY_ARRAY); + assert(r->type == REDIS_REPLY_ARRAY); assert(r->elements >= 3); int from, to, slot; from = r->element[0]->integer; @@ -1298,7 +1311,7 @@ int parseOptions(int argc, const char **argv) { if (*p < '0' || *p > '9') goto invalid; } config.randomkeys = 1; - config.randomkeys_keyspacelen = atoi(argv[++i]); + config.randomkeys_keyspacelen = atoi(next); if (config.randomkeys_keyspacelen < 0) config.randomkeys_keyspacelen = 0; } else if (!strcmp(argv[i],"-q")) { diff --git a/src/redis-check-aof.cpp b/src/redis-check-aof.cpp index cb707b9ac..de9ab1f77 100644 --- a/src/redis-check-aof.cpp +++ b/src/redis-check-aof.cpp @@ -37,7 +37,7 @@ snprintf(error, sizeof(error), "0x%16llx: %s", (long long)epos, 
__buf); \ } -static char error[1024]; +static char error[1044]; static off_t epos; int consumeNewline(char *buf) { diff --git a/src/redis-check-rdb.cpp b/src/redis-check-rdb.cpp index a1194799e..1e8c20428 100644 --- a/src/redis-check-rdb.cpp +++ b/src/redis-check-rdb.cpp @@ -186,7 +186,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) { int closefile = (fp == NULL); if (fp == NULL && (fp = fopen(rdbfilename,"r")) == NULL) return 1; - rioInitWithFile(&rdb,fileno(fp)); + rioInitWithFile(&rdb,fp); rdbstate.rio = &rdb; rdb.update_cksum = rdbLoadProgressCallback; if (rioRead(&rdb,buf,9) == 0) goto eoferr; diff --git a/src/redis-cli.c b/src/redis-cli.c index ab3de2e73..eae4a1d0e 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -45,6 +45,8 @@ #include #include #include +#include +#include #include #include /* use sds.h from hiredis, so that only one set of sds functions will be present in the binary */ @@ -6532,6 +6534,116 @@ static void intrinsicLatencyMode(void) { } } +/*------------------------------------------------------------------------------ + * Message of the day + *--------------------------------------------------------------------------- */ +#ifdef MOTD +#include + +static const char *szMotdCachePath() +{ + static sds sdsMotdCachePath = NULL; + if (sdsMotdCachePath != NULL) + return sdsMotdCachePath; + + struct passwd *pw = getpwuid(getuid()); + if (pw == NULL) + return ""; + const char *homedir = pw->pw_dir; + sdsMotdCachePath = sdsnew(homedir); + sdsMotdCachePath = sdscat(sdsMotdCachePath, "/.keydb-cli-motd"); + return sdsMotdCachePath; +} +static size_t motd_write_callback(void *ptr, size_t size, size_t nmemb, sds *str) +{ + *str = sdscatlen(*str, ptr, size*nmemb); + return (size*nmemb); +} + +static char *fetchMOTDFromCache() +{ + struct stat attrib; + if (stat(szMotdCachePath(), &attrib) != 0) + return NULL; + time_t t = attrib.st_mtim.tv_sec; + time_t now = time(NULL); + if ((now - t) < 14400) + { + // If our cache was updated no more 
than 4 hours ago use it instead of fetching the MOTD + FILE *pf = fopen(szMotdCachePath(), "rb"); + if (pf == NULL) + return NULL; + fseek(pf, 0L, SEEK_END); + long cb = ftell(pf); + fseek(pf, 0L, SEEK_SET); // rewind + sds str = sdsnewlen(NULL, cb); + size_t cbRead = fread(str, 1, cb, pf); + fclose(pf); + if ((long)cbRead != cb) + { + sdsfree(str); + return NULL; + } + return str; + } + return NULL; +} + +static void setMOTDCache(const char *sz) +{ + FILE *pf = fopen(szMotdCachePath(), "wb"); + size_t celem = fwrite(sz, strlen(sz), 1, pf); + (void)celem; // best effort + fclose(pf); +} + +static char *fetchMOTD() +{ + sds str; + CURL *curl; + CURLcode res; + + /* First try and get the string from the cache */ + str = fetchMOTDFromCache(); + if (str != NULL) + return str; + + str = sdsnew(""); + curl = curl_easy_init(); + if(curl) { + curl_easy_setopt(curl, CURLOPT_URL, "http://api.keydb.dev/motd/motd.txt"); + curl_easy_setopt(curl, CURLOPT_FOLLOWLOCATION, 1L); // follow redirects + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 2); // take no more than two seconds + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, motd_write_callback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &str); + + /* Perform the request, res will get the return code */ + res = curl_easy_perform(curl); + /* Check for errors */ + if(res != CURLE_OK) + { + sdsfree(str); + str = NULL; + } + + /* always cleanup */ + curl_easy_cleanup(curl); + + if (str != NULL) + setMOTDCache(str); + } + return str; +} + +#else + +static char *fetchMOTD() +{ + return NULL; +} + +#endif + /*------------------------------------------------------------------------------ * Program main() *--------------------------------------------------------------------------- */ @@ -6700,6 +6812,15 @@ int main(int argc, char **argv) { /* Start interactive mode when no command is provided */ if (argc == 0 && !config.eval) { + /* Show the message of the day if we are interactive */ + if (config.output == OUTPUT_STANDARD) { + char *szMotd 
= fetchMOTD(); + if (szMotd != NULL) { + printf("Message of the day:\n %s\n", szMotd); + sdsfree(szMotd); + } + } + /* Ignore SIGPIPE in interactive mode to force a reconnect */ signal(SIGPIPE, SIG_IGN); @@ -6711,6 +6832,7 @@ int main(int argc, char **argv) { /* Otherwise, we have some arguments to execute */ if (cliConnect(0) != REDIS_OK) exit(1); + if (config.eval) { return evalMode(argc,argv); } else { diff --git a/src/redis-cli.h b/src/redis-cli.h index bda80c42b..33910c2ce 100644 --- a/src/redis-cli.h +++ b/src/redis-cli.h @@ -276,4 +276,5 @@ redisReply *sendScan(unsigned long long *it); #ifdef __cplusplus } -#endif \ No newline at end of file +#endif + diff --git a/src/replication.cpp b/src/replication.cpp index f37473fc5..14ee002aa 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -323,6 +323,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { char proto[1024]; int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); cchProto = std::min((int)sizeof(proto), cchProto); + long long master_repl_offset_start = g_pserver->master_repl_offset; /* Write the command to the replication backlog if any. 
*/ if (g_pserver->repl_backlog) @@ -375,8 +376,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - if (serverTL->current_client && FSameHost(serverTL->current_client, slave)) continue; std::unique_locklock)> lock(slave->lock); + if (serverTL->current_client && FSameHost(serverTL->current_client, slave)) + { + slave->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; + continue; + } if (!fSendRaw) addReplyProtoAsync(slave, proto, cchProto); @@ -1290,6 +1295,7 @@ void replicationCreateMasterClient(redisMaster *mi, int fd, int dbid) { mi->master->flags |= CLIENT_MASTER; mi->master->authenticated = 1; mi->master->reploff = mi->master_initial_offset; + mi->master->reploff_skipped = 0; mi->master->read_reploff = mi->master->reploff; mi->master->puser = NULL; /* This client can do everything. */ @@ -2192,8 +2198,10 @@ int connectWithMaster(redisMaster *mi) { void undoConnectWithMaster(redisMaster *mi) { int fd = mi->repl_transfer_s; - aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE); - close(fd); + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{ + aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE); + close(fd); + }); mi->repl_transfer_s = -1; } @@ -2240,7 +2248,8 @@ struct redisMaster *replicationAddMaster(char *ip, int port) { while ((ln = listNext(&li))) { redisMaster *miCheck = (redisMaster*)listNodeValue(ln); - serverAssert(strcasecmp(miCheck->masterhost, ip) || miCheck->masterport != port); + if (strcasecmp(miCheck->masterhost, ip)==0 && miCheck->masterport == port) + return nullptr; } // Pre-req satisfied, lets continue @@ -2335,12 +2344,15 @@ void replicationUnsetMaster(redisMaster *mi) { /* This function is called when the slave lose the connection with the * master into 
an unexpected way. */ void replicationHandleMasterDisconnection(redisMaster *mi) { - mi->master = NULL; - mi->repl_state = REPL_STATE_CONNECT; - mi->repl_down_since = g_pserver->unixtime; - /* We lost connection with our master, don't disconnect slaves yet, - * maybe we'll be able to PSYNC with our master later. We'll disconnect - * the slaves only if we'll have to do a full resync with our master. */ + if (mi != nullptr) + { + mi->master = NULL; + mi->repl_state = REPL_STATE_CONNECT; + mi->repl_down_since = g_pserver->unixtime; + /* We lost connection with our master, don't disconnect slaves yet, + * maybe we'll be able to PSYNC with our master later. We'll disconnect + * the slaves only if we'll have to do a full resync with our master. */ + } } void replicaofCommand(client *c) { @@ -2380,26 +2392,18 @@ void replicaofCommand(client *c) { if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) return; - /* Check if we are already attached to the specified slave */ - listIter li; - listNode *ln; - listRewind(g_pserver->masters, &li); - while ((ln = listNext(&li))) + redisMaster *miNew = replicationAddMaster((char*)ptrFromObj(c->argv[1]), port); + if (miNew == nullptr) { - redisMaster *mi = (redisMaster*)listNodeValue(ln); - if (!strcasecmp(mi->masterhost,(const char*)ptrFromObj(c->argv[1])) - && mi->masterport == port) { - serverLog(LL_NOTICE,"REPLICAOF would result into synchronization " - "with the master we are already connected " - "with. No operation performed."); - addReplySds(c,sdsnew("+OK Already connected to specified " - "master\r\n")); - return; - } + // We have a duplicate + serverLog(LL_NOTICE,"REPLICAOF would result into synchronization " + "with the master we are already connected " + "with. No operation performed."); + addReplySds(c,sdsnew("+OK Already connected to specified " + "master\r\n")); + return; } - /* There was no previous master or the user specified a different one, - * we can continue. 
*/ - redisMaster *miNew = replicationAddMaster((char*)ptrFromObj(c->argv[1]), port); + sds client = catClientInfoString(sdsempty(),c); serverLog(LL_NOTICE,"REPLICAOF %s:%d enabled (user request from '%s')", miNew->masterhost, miNew->masterport, client); @@ -2436,7 +2440,7 @@ void roleCommand(client *c) { addReplyArrayLen(c,3); addReplyBulkCString(c,slaveip); addReplyBulkLongLong(c,slave->slave_listening_port); - addReplyBulkLongLong(c,slave->repl_ack_off); + addReplyBulkLongLong(c,slave->repl_ack_off+slave->reploff_skipped); slaves++; } setDeferredArrayLen(c,mbcount,slaves); @@ -2450,7 +2454,10 @@ void roleCommand(client *c) { redisMaster *mi = (redisMaster*)listNodeValue(ln); const char *slavestate = NULL; addReplyArrayLen(c,5); - addReplyBulkCBuffer(c,"slave",5); + if (g_pserver->fActiveReplica) + addReplyBulkCBuffer(c,"active-replica",14); + else + addReplyBulkCBuffer(c,"slave",5); addReplyBulkCString(c,mi->masterhost); addReplyLongLong(c,mi->masterport); if (slaveIsInHandshakeState(mi)) { @@ -2781,7 +2788,7 @@ int replicationCountAcksByOffset(long long offset) { client *slave = (client*)ln->value; if (slave->replstate != SLAVE_STATE_ONLINE) continue; - if (slave->repl_ack_off >= offset) count++; + if ((slave->repl_ack_off + slave->reploff_skipped) >= offset) count++; } return count; } @@ -2793,7 +2800,7 @@ void waitCommand(client *c) { long numreplicas, ackreplicas; long long offset = c->woff; - if (listLength(g_pserver->masters)) { + if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) { addReplyError(c,"WAIT cannot be used with replica instances. 
Please also note that since Redis 4.0 if a replica is configured to be writable (which is not the default) writes to replicas are just local and are not propagated."); return; } diff --git a/src/rio.cpp b/src/rio.cpp index 3c0c7672a..d6d1937eb 100644 --- a/src/rio.cpp +++ b/src/rio.cpp @@ -109,13 +109,14 @@ void rioInitWithBuffer(rio *r, sds s) { static size_t rioFileWrite(rio *r, const void *buf, size_t len) { size_t retval; - retval = write(r->io.file.fd,buf,len); + retval = fwrite(buf,len,1,r->io.file.fp); r->io.file.buffered += len; if (r->io.file.autosync && r->io.file.buffered >= r->io.file.autosync) { - redis_fsync(r->io.file.fd); + fflush(r->io.file.fp); + redis_fsync(fileno(r->io.file.fp)); r->io.file.buffered = 0; } return retval; @@ -123,18 +124,18 @@ static size_t rioFileWrite(rio *r, const void *buf, size_t len) { /* Returns 1 or 0 for success/failure. */ static size_t rioFileRead(rio *r, void *buf, size_t len) { - return read(r->io.file.fd,buf,len); + return fread(buf,len,1,r->io.file.fp); } /* Returns read/write position in file. */ static off_t rioFileTell(rio *r) { - return lseek(r->io.file.fd, 0, SEEK_CUR); + return ftello(r->io.file.fp); } /* Flushes any buffer to target device if applicable. Returns 1 on success * and 0 on failures. */ static int rioFileFlush(rio *r) { - return (fsync(r->io.file.fd) == 0) ? 1 : 0; + return (fflush(r->io.file.fp) == 0) ? 1 : 0; } static const rio rioFileIO = { @@ -149,9 +150,9 @@ static const rio rioFileIO = { { { NULL, 0 } } /* union for io-specific vars */ }; -void rioInitWithFile(rio *r, int fd) { +void rioInitWithFile(rio *r, FILE *fp) { *r = rioFileIO; - r->io.file.fd = fd; + r->io.file.fp = fp; r->io.file.buffered = 0; r->io.file.autosync = 0; } diff --git a/src/rio.h b/src/rio.h index 172b7f9b2..3ec32263b 100644 --- a/src/rio.h +++ b/src/rio.h @@ -73,7 +73,7 @@ struct _rio { } buffer; /* Stdio file pointer target. */ struct { - int fd; + FILE *fp; off_t buffered; /* Bytes written since last fsync. 
*/ off_t autosync; /* fsync after 'autosync' bytes written. */ } file; @@ -128,7 +128,7 @@ static inline int rioFlush(rio *r) { return r->flush(r); } -void rioInitWithFile(rio *r, int fd); +void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); void rioInitWithFdset(rio *r, int *fds, int numfds); diff --git a/src/scripting.cpp b/src/scripting.cpp index 15f41745e..1548044e2 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -371,7 +371,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { int acl_retval = 0; int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS; struct redisCommand *cmd; - client *c = g_pserver->lua_client; + client *c = serverTL->lua_client; sds reply; // Ensure our client is on the right thread @@ -966,8 +966,11 @@ void scriptingInit(int setup) { lua_State *lua = lua_open(); if (setup) { - g_pserver->lua_client = NULL; - g_pserver->lua_caller = NULL; + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + g_pserver->rgthreadvar[iel].lua_client = createClient(-1, iel); + g_pserver->rgthreadvar[iel].lua_client->flags |= CLIENT_LUA; + } g_pserver->lua_timedout = 0; ldbInit(); } @@ -1117,15 +1120,6 @@ void scriptingInit(int setup) { lua_pcall(lua,0,0,0); } - /* Create the (non connected) client that we use to execute Redis commands - * inside the Lua interpreter. - * Note: there is no need to create it again when this function is called - * by scriptingReset(). */ - if (g_pserver->lua_client == NULL) { - g_pserver->lua_client = createClient(-1, IDX_EVENT_LOOP_MAIN); - g_pserver->lua_client->flags |= CLIENT_LUA; - } - /* Lua beginners often don't use "local", this is likely to introduce * subtle bugs in their code. To prevent problems we protect accesses * to global variables. */ @@ -1272,7 +1266,7 @@ sds luaCreateFunction(client *c, lua_State *lua, robj *body) { * so that we can replicate / write in the AOF all the * EVALSHA commands as EVAL using the original script. 
*/ int retval = dictAdd(g_pserver->lua_scripts,sha,body); - serverAssertWithInfo(c ? c : g_pserver->lua_client,NULL,retval == DICT_OK); + serverAssertWithInfo(c ? c : serverTL->lua_client,NULL,retval == DICT_OK); g_pserver->lua_scripts_mem += sdsZmallocSize(sha) + getStringObjectSdsUsedMemory(body); incrRefCount(body); return sha; @@ -1393,7 +1387,7 @@ void evalGenericCommand(client *c, int evalsha) { luaSetGlobalArray(lua,"ARGV",c->argv+3+numkeys,c->argc-3-numkeys); /* Select the right DB in the context of the Lua client */ - selectDb(g_pserver->lua_client,c->db->id); + selectDb(serverTL->lua_client,c->db->id); /* Set a hook in order to be able to stop the script execution if it * is running for too much time. diff --git a/src/server.cpp b/src/server.cpp index 4d3a1ce69..8ccbab059 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -59,6 +59,9 @@ #include #include #include +#include "aelocker.h" + +int g_fTestMode = false; /* Our shared "common" objects */ @@ -1745,16 +1748,17 @@ void databasesCron(void) { * every object access, and accuracy is not needed. To access a global var is * a lot faster than calling time(NULL) */ void updateCachedTime(void) { - time_t unixtime = time(NULL); - atomicSet(g_pserver->unixtime,unixtime); + g_pserver->unixtime = time(NULL); g_pserver->mstime = mstime(); - /* To get information about daylight saving time, we need to call localtime_r - * and cache the result. However calling localtime_r in this context is safe - * since we will never fork() while here, in the main thread. The logging - * function will call a thread safe version of localtime that has no locks. */ + /* To get information about daylight saving time, we need to call + * localtime_r and cache the result. However calling localtime_r in this + * context is safe since we will never fork() while here, in the main + * thread. The logging function will call a thread safe version of + * localtime that has no locks. 
*/ struct tm tm; - localtime_r(&g_pserver->unixtime,&tm); + time_t ut = g_pserver->unixtime; + localtime_r(&ut,&tm); g_pserver->daylight_active = tm.tm_isdst; } @@ -1783,6 +1787,15 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { UNUSED(id); UNUSED(clientData); + /* If another threads unblocked one of our clients, and this thread has been idle + then beforeSleep won't have a chance to process the unblocking. So we also + process them here in the cron job to ensure they don't starve. + */ + if (listLength(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) + { + processUnblockedClients(IDX_EVENT_LOOP_MAIN); + } + ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up /* Software watchdog: deliver the SIGALRM that will reach the signal @@ -1826,8 +1839,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * * Note that you can change the resolution altering the * LRU_CLOCK_RESOLUTION define. */ - unsigned long lruclock = getLRUClock(); - atomicSet(g_pserver->lruclock,lruclock); + g_pserver->lruclock = getLRUClock(); /* Record the max memory used since the server was started. */ if (zmalloc_used_memory() > g_pserver->stat_peak_memory) @@ -2000,9 +2012,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { flushAppendOnlyFile(0); } - /* Close clients that need to be closed asynchronous */ - freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN); - /* Clear the paused clients flag if needed. */ clientsArePaused(); /* Don't check return value, just use the side effect.*/ @@ -2053,14 +2062,19 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData int iel = ielFromEventLoop(eventLoop); serverAssert(iel != IDX_EVENT_LOOP_MAIN); + + /* If another threads unblocked one of our clients, and this thread has been idle + then beforeSleep won't have a chance to process the unblocking. 
So we also + process them here in the cron job to ensure they don't starve. + */ + if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) + { + processUnblockedClients(iel); + } - aeAcquireLock(); ProcessPendingAsyncWrites(); // A bug but leave for now, events should clean up after themselves clientsCron(iel); - freeClientsInAsyncFreeQueue(iel); - aeReleaseLock(); - return 1000/g_pserver->hz; } @@ -2119,6 +2133,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); aeAcquireLock(); + /* Close clients that need to be closed asynchronous */ + freeClientsInAsyncFreeQueue(IDX_EVENT_LOOP_MAIN); + /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this * time. */ @@ -2143,6 +2160,11 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(iel); + aeAcquireLock(); + /* Close clients that need to be closed asynchronous */ + freeClientsInAsyncFreeQueue(iel); + aeReleaseLock(); + /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this * time. 
*/ @@ -2166,6 +2188,8 @@ void createSharedObjects(void) { shared.ok = createObject(OBJ_STRING,sdsnew("+OK\r\n")); shared.err = createObject(OBJ_STRING,sdsnew("-ERR\r\n")); shared.emptybulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n")); + shared.emptymultibulk = createObject(OBJ_STRING,sdsnew("*0\r\n")); + shared.nullbulk = createObject(OBJ_STRING,sdsnew("$0\r\n\r\n")); shared.czero = createObject(OBJ_STRING,sdsnew(":0\r\n")); shared.cone = createObject(OBJ_STRING,sdsnew(":1\r\n")); shared.emptyarray = createObject(OBJ_STRING,sdsnew("*0\r\n")); @@ -2287,10 +2311,6 @@ void initMasterInfo(redisMaster *master) void initServerConfig(void) { int j; - serverAssert(pthread_mutex_init(&g_pserver->next_client_id_mutex,NULL) == 0); - serverAssert(pthread_mutex_init(&g_pserver->lruclock_mutex,NULL) == 0); - serverAssert(pthread_mutex_init(&g_pserver->unixtime_mutex,NULL) == 0); - updateCachedTime(); getRandomHexChars(g_pserver->runid,CONFIG_RUN_ID_SIZE); g_pserver->runid[CONFIG_RUN_ID_SIZE] = '\0'; @@ -2405,8 +2425,7 @@ void initServerConfig(void) { g_pserver->lua_time_limit = LUA_SCRIPT_TIME_LIMIT; g_pserver->fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA; - unsigned int lruclock = getLRUClock(); - atomicSet(g_pserver->lruclock,lruclock); + g_pserver->lruclock = getLRUClock(); resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -2496,20 +2515,6 @@ void initServerConfig(void) { /* Multithreading */ cserver.cthreads = CONFIG_DEFAULT_THREADS; cserver.fThreadAffinity = CONFIG_DEFAULT_THREAD_AFFINITY; - - g_pserver->db = (redisDb*)zmalloc(sizeof(redisDb)*cserver.dbnum, MALLOC_LOCAL); - - /* Create the Redis databases, and initialize other internal state. 
*/ - for (int j = 0; j < cserver.dbnum; j++) { - g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); - g_pserver->db[j].expires = dictCreate(&keyptrDictType,NULL); - g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); - g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); - g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL); - g_pserver->db[j].id = j; - g_pserver->db[j].avg_ttl = 0; - g_pserver->db[j].defrag_later = listCreate(); - } } extern char **environ; @@ -2853,7 +2858,6 @@ static void initNetworking(int fReusePort) static void initServerThread(struct redisServerThreadVars *pvar, int fMain) { - pvar->clients_pending_write = listCreate(); pvar->unblocked_clients = listCreate(); pvar->clients_pending_asyncwrite = listCreate(); pvar->ipfd_count = 0; @@ -2916,6 +2920,20 @@ void initServer(void) { fastlock_init(&g_pserver->flock); + g_pserver->db = (redisDb*)zmalloc(sizeof(redisDb)*cserver.dbnum, MALLOC_LOCAL); + + /* Create the Redis databases, and initialize other internal state. */ + for (int j = 0; j < cserver.dbnum; j++) { + g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); + g_pserver->db[j].expires = dictCreate(&keyptrDictType,NULL); + g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); + g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); + g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL); + g_pserver->db[j].id = j; + g_pserver->db[j].avg_ttl = 0; + g_pserver->db[j].defrag_later = listCreate(); + } + if (g_pserver->syslog_enabled) { openlog(g_pserver->syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, g_pserver->syslog_facility); @@ -3429,8 +3447,14 @@ void call(client *c, int flags) { * other operations can be performed by the caller. Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). 
*/ int processCommand(client *c, int callFlags) { - serverAssert(GlobalLocksAcquired()); - moduleCallCommandFilters(c); + AeLocker locker; + AssertCorrectThread(c); + + if (moduleHasCommandFilters()) + { + locker.arm(c); + moduleCallCommandFilters(c); + } /* The QUIT command is handled separately. Normal command procs will * go through checking for replication and QUIT will cause trouble @@ -3442,10 +3466,6 @@ int processCommand(client *c, int callFlags) { return C_ERR; } - AssertCorrectThread(c); - serverAssert(GlobalLocksAcquired()); - incrementMvccTstamp(); - /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ c->cmd = c->lastcmd = lookupCommand((sds)ptrFromObj(c->argv[0])); @@ -3482,6 +3502,8 @@ int processCommand(client *c, int callFlags) { /* Check if the user can run this command according to the current * ACLs. */ + if (c->puser && !(c->puser->flags & USER_FLAG_ALLCOMMANDS)) + locker.arm(c); // ACLs require the lock int acl_retval = ACLCheckCommandPerm(c); if (acl_retval != ACL_OK) { flagTransaction(c); @@ -3507,6 +3529,7 @@ int processCommand(client *c, int callFlags) { !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { + locker.arm(c); int hashslot; int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, @@ -3522,6 +3545,11 @@ int processCommand(client *c, int callFlags) { } } + incrementMvccTstamp(); + + if (!locker.isArmed()) + locker.arm(c); + /* Handle the maxmemory directive. 
* * Note that we do not want to reclaim memory if we are here re-entering @@ -3986,8 +4014,7 @@ sds genRedisInfoString(const char *section) { call_uname = 0; } - unsigned int lruclock; - atomicGet(g_pserver->lruclock,lruclock); + unsigned int lruclock = g_pserver->lruclock.load(); info = sdscatprintf(info, "# Server\r\n" "redis_version:%s\r\n" @@ -4299,8 +4326,8 @@ sds genRedisInfoString(const char *section) { g_pserver->stat_numconnections, g_pserver->stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), - g_pserver->stat_net_input_bytes, - g_pserver->stat_net_output_bytes, + g_pserver->stat_net_input_bytes.load(), + g_pserver->stat_net_output_bytes.load(), (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024, (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024, g_pserver->stat_rejected_conn, @@ -4330,7 +4357,8 @@ sds genRedisInfoString(const char *section) { info = sdscatprintf(info, "# Replication\r\n" "role:%s\r\n", - listLength(g_pserver->masters) == 0 ? "master" : "slave"); + listLength(g_pserver->masters) == 0 ? "master" + : g_pserver->fActiveReplica ? 
"active-replica" : "slave"); if (listLength(g_pserver->masters)) { listIter li; listNode *ln; @@ -4440,7 +4468,7 @@ sds genRedisInfoString(const char *section) { "slave%d:ip=%s,port=%d,state=%s," "offset=%lld,lag=%ld\r\n", slaveid,slaveip,slave->slave_listening_port,state, - slave->repl_ack_off, lag); + (slave->repl_ack_off + slave->reploff_skipped), lag); slaveid++; } } @@ -4913,7 +4941,7 @@ void incrementMvccTstamp() } else { - g_pserver->mvcc_tstamp = ((uint64_t)g_pserver->mstime) << 20; + atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << 20); } } @@ -4967,8 +4995,6 @@ int main(int argc, char **argv) { return sha1Test(argc, argv); } else if (!strcasecmp(argv[2], "util")) { return utilTest(argc, argv); - } else if (!strcasecmp(argv[2], "sds")) { - return sdsTest(argc, argv); } else if (!strcasecmp(argv[2], "endianconv")) { return endianconvTest(argc, argv); } else if (!strcasecmp(argv[2], "crc64")) { diff --git a/src/server.h b/src/server.h index 487600d34..912aba251 100644 --- a/src/server.h +++ b/src/server.h @@ -40,6 +40,7 @@ #include #include +#include #include #include #include @@ -49,6 +50,9 @@ #include #include #include +#include +#include +#include #ifdef __cplusplus extern "C" { #include @@ -84,6 +88,8 @@ typedef long long mstime_t; /* millisecond time type. */ #include "endianconv.h" #include "crc64.h" +extern int g_fTestMode; + struct redisObject; class robj_roptr { @@ -917,7 +923,7 @@ typedef struct client { time_t ctime; /* Client creation time. */ time_t lastinteraction; /* Time of the last interaction, used for timeout */ time_t obuf_soft_limit_reached_time; - int flags; /* Client flags: CLIENT_* macros. */ + std::atomic flags; /* Client flags: CLIENT_* macros. */ int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a slave. 
*/ @@ -928,6 +934,7 @@ typedef struct client { sds replpreamble; /* Replication DB preamble. */ long long read_reploff; /* Read replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */ + long long reploff_skipped; /* Repl backlog we did not send to this client */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. */ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves @@ -977,7 +984,7 @@ struct moduleLoadQueueEntry { }; struct sharedObjectsStruct { - robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space, + robj *crlf, *ok, *err, *emptybulk, *emptymultibulk, *nullbulk, *czero, *cone, *pong, *space, *colon, *queued, *null[4], *nullarray[4], *emptyarray, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr, *outofrangeerr, *noscripterr, *loadingerr, *slowscripterr, *bgsaveerr, @@ -1140,7 +1147,7 @@ struct redisServerThreadVars { aeEventLoop *el; int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ int ipfd_count; /* Used slots in ipfd[] */ - list *clients_pending_write; /* There is to write or install handler. */ + std::vector clients_pending_write; /* There is to write or install handler. */ list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; int cclients; @@ -1148,6 +1155,7 @@ struct redisServerThreadVars { int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a client blocked on a module command needs to be processed. */ + client *lua_client = nullptr; /* The "fake client" to query Redis from Lua */ struct fastlock lockPendingWrite; }; @@ -1156,7 +1164,7 @@ struct redisMaster { char *masterauth; /* AUTH with this password with master */ char *masterhost; /* Hostname of master */ int masterport; /* Port of master */ - client *cached_master; /* Cached master to be reused for PSYNC. 
*/ + client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into @@ -1236,7 +1244,7 @@ struct redisServer { struct redisServerThreadVars rgthreadvar[MAX_EVENT_LOOPS]; - unsigned int lruclock; /* Clock for LRU eviction */ + std::atomic lruclock; /* Clock for LRU eviction */ int shutdown_asap; /* SHUTDOWN needed ASAP */ int activerehashing; /* Incremental rehash in serverCron() */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ @@ -1268,7 +1276,7 @@ struct redisServer { mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ - uint64_t next_client_id; /* Next client unique ID. Incremental. */ + std::atomic next_client_id; /* Next client unique ID. Incremental. */ int protected_mode; /* Don't accept external connections. */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ @@ -1305,8 +1313,8 @@ struct redisServer { long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ - long long stat_net_input_bytes; /* Bytes read from network. */ - long long stat_net_output_bytes; /* Bytes written to network. */ + std::atomic stat_net_input_bytes; /* Bytes read from network. */ + std::atomic stat_net_output_bytes; /* Bytes written to network. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. 
*/ /* The following two are used to track instantaneous metrics, like @@ -1317,7 +1325,6 @@ struct redisServer { long long samples[STATS_METRIC_SAMPLES]; int idx; } inst_metric[STATS_METRIC_COUNT]; - /* AOF persistence */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ @@ -1327,6 +1334,7 @@ struct redisServer { off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_current_size; /* AOF current size. */ + off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ pid_t aof_child_pid; /* PID if rewriting process */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ @@ -1464,10 +1472,10 @@ struct redisServer { int list_max_ziplist_size; int list_compress_depth; /* time cache */ - time_t unixtime; /* Unix time sampled every cron cycle. */ - time_t timezone; /* Cached timezone. As set by tzset(). */ - int daylight_active; /* Currently in daylight saving time. */ - long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ + std::atomic unixtime; /* Unix time sampled every cron cycle. */ + time_t timezone; /* Cached timezone. As set by tzset(). */ + int daylight_active; /* Currently in daylight saving time. */ + long long mstime; /* 'unixtime' with milliseconds resolution. */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -1493,8 +1501,7 @@ struct redisServer { REDISMODULE_CLUSTER_FLAG_*. */ /* Scripting */ lua_State *lua; /* The Lua interpreter. 
We use just one for all clients */ - client *lua_client; /* The "fake client" to query Redis from Lua */ - client *lua_caller; /* The client running EVAL right now, or NULL */ + client *lua_caller = nullptr; /* The client running EVAL right now, or NULL */ dict *lua_scripts; /* A dictionary of SHA1 -> Lua scripts */ unsigned long long lua_scripts_mem; /* Cached scripts' memory + oh */ mstime_t lua_time_limit; /* Script timeout in milliseconds */ @@ -1526,12 +1533,6 @@ struct redisServer { int bug_report_start; /* True if bug report header was already logged. */ int watchdog_period; /* Software watchdog period in ms. 0 = off */ - /* Mutexes used to protect atomic variables when atomic builtins are - * not available. */ - pthread_mutex_t lruclock_mutex; - pthread_mutex_t next_client_id_mutex; - pthread_mutex_t unixtime_mutex; - int fActiveReplica; /* Can this replica also be a master? */ struct fastlock flock; @@ -1540,6 +1541,9 @@ struct redisServer { // Lower 20 bits: a counter incrementing for each command executed in the same millisecond // Upper 44 bits: mstime (least significant 44-bits) enough for ~500 years before rollover from date of addition uint64_t mvcc_tstamp; + + /* System hardware info */ + size_t system_memory_size; /* Total memory in system as reported by OS */ }; typedef struct pubsubPattern { @@ -1673,6 +1677,7 @@ void moduleAcquireGIL(int fServerThread); void moduleReleaseGIL(int fServerThread); void moduleNotifyKeyspaceEvent(int type, const char *event, robj *key, int dbid); void moduleCallCommandFilters(client *c); +int moduleHasCommandFilters(); /* Utils */ long long ustime(void); @@ -1703,12 +1708,13 @@ void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask); void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask); void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask); -void addReplyNull(client *c); +void 
addReplyNull(client *c, robj_roptr objOldProtocol = nullptr); void addReplyNullArray(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj_roptr obj); +void AddReplyFromClient(client *c, client *src); void addReplyBulkCString(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkLongLong(client *c, long long ll); diff --git a/src/sort.cpp b/src/sort.cpp index 6b517b25a..34b86d700 100644 --- a/src/sort.cpp +++ b/src/sort.cpp @@ -475,7 +475,7 @@ void sortCommand(client *c) { vector[j].u.score = strtod(szFromObj(byval),&eptr); if (eptr[0] != '\0' || errno == ERANGE || - isnan(vector[j].u.score)) + std::isnan(vector[j].u.score)) { int_conversion_error = 1; } diff --git a/src/t_hash.cpp b/src/t_hash.cpp index a7d35a926..f2758bdd2 100644 --- a/src/t_hash.cpp +++ b/src/t_hash.cpp @@ -615,7 +615,7 @@ void hincrbyfloatCommand(client *c) { } value += incr; - if (isnan(value) || isinf(value)) { + if (std::isnan(value) || std::isinf(value)) { addReplyError(c,"increment would produce NaN or Infinity"); return; } diff --git a/src/t_list.cpp b/src/t_list.cpp index a65aea8ad..ecef14452 100644 --- a/src/t_list.cpp +++ b/src/t_list.cpp @@ -331,7 +331,7 @@ void lindexCommand(client *c) { addReplyBulk(c,value); decrRefCount(value); } else { - addReplyNull(c); + addReplyNull(c,shared.nullbulk); } } else { serverPanic("Unknown list encoding"); @@ -414,7 +414,7 @@ void lrangeCommand(client *c) { /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { - addReplyNull(c); + addReplyNull(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; @@ -617,7 +617,7 @@ void rpoplpushCommand(client *c) { * the AOF and replication channel. 
* * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the - * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that + * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that * we can propagate the command properly. * * The function returns C_OK if we are able to serve the client, otherwise diff --git a/src/t_stream.cpp b/src/t_stream.cpp index d67dd3f39..d2b2cbbf0 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -492,14 +492,14 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI streamEncodeID(si->start_key,start); } else { si->start_key[0] = 0; - si->start_key[0] = 0; + si->start_key[1] = 0; } if (end) { streamEncodeID(si->end_key,end); } else { si->end_key[0] = UINT64_MAX; - si->end_key[0] = UINT64_MAX; + si->end_key[1] = UINT64_MAX; } /* Seek the correct node in the radix tree. */ diff --git a/src/t_string.cpp b/src/t_string.cpp index aea74b48d..4cb30eac6 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -28,7 +28,7 @@ */ #include "server.h" -#include <math.h> /* isnan(), isinf() */ +#include <cmath> /* isnan(), isinf() */ /*----------------------------------------------------------------------------- * String Commands @@ -408,7 +408,7 @@ void incrbyfloatCommand(client *c) { return; value += incr; - if (isnan(value) || isinf(value)) { + if (std::isnan(value) || std::isinf(value)) { addReplyError(c,"increment would produce NaN or Infinity"); return; } diff --git a/src/t_zset.cpp b/src/t_zset.cpp index ec0c764dd..456348e10 100644 --- a/src/t_zset.cpp +++ b/src/t_zset.cpp @@ -134,7 +134,7 @@ zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) { unsigned int rank[ZSKIPLIST_MAXLEVEL]; int i, level; - serverAssert(!isnan(score)); + serverAssert(!std::isnan(score)); x = zsl->header; for (i = zsl->level-1; i >= 0; i--) { /* store rank that is crossed to reach the insert position */ @@ -530,11 +530,11 @@ static int zslParseRange(robj *min, robj *max, zrangespec *spec) { } else {
if (((char*)ptrFromObj(min))[0] == '(') { spec->min = strtod((char*)ptrFromObj(min)+1,&eptr); - if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR; + if (eptr[0] != '\0' || std::isnan(spec->min)) return C_ERR; spec->minex = 1; } else { spec->min = strtod((char*)ptrFromObj(min),&eptr); - if (eptr[0] != '\0' || isnan(spec->min)) return C_ERR; + if (eptr[0] != '\0' || std::isnan(spec->min)) return C_ERR; } } if (max->encoding == OBJ_ENCODING_INT) { @@ -542,11 +542,11 @@ static int zslParseRange(robj *min, robj *max, zrangespec *spec) { } else { if (((char*)ptrFromObj(max))[0] == '(') { spec->max = strtod((char*)ptrFromObj(max)+1,&eptr); - if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR; + if (eptr[0] != '\0' || std::isnan(spec->max)) return C_ERR; spec->maxex = 1; } else { spec->max = strtod((char*)ptrFromObj(max),&eptr); - if (eptr[0] != '\0' || isnan(spec->max)) return C_ERR; + if (eptr[0] != '\0' || std::isnan(spec->max)) return C_ERR; } } @@ -1320,7 +1320,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { double curscore; /* NaN as input is an error regardless of all the other parameters. */ - if (isnan(score)) { + if (std::isnan(score)) { *flags = ZADD_NAN; return 0; } @@ -1339,7 +1339,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { /* Prepare the score for the increment if needed. */ if (incr) { score += curscore; - if (isnan(score)) { + if (std::isnan(score)) { *flags |= ZADD_NAN; return 0; } @@ -1385,7 +1385,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int *flags, double *newscore) { /* Prepare the score for the increment if needed. */ if (incr) { score += curscore; - if (isnan(score)) { + if (std::isnan(score)) { *flags |= ZADD_NAN; return 0; } @@ -2150,7 +2150,7 @@ inline static void zunionInterAggregate(double *target, double val, int aggregat /* The result of adding two doubles is NaN when one variable * is +inf and the other is -inf. 
When these numbers are added, * we maintain the convention of the result being 0.0. */ - if (isnan(*target)) *target = 0.0; + if (std::isnan(*target)) *target = 0.0; } else if (aggregate == REDIS_AGGR_MIN) { *target = val < *target ? val : *target; } else if (aggregate == REDIS_AGGR_MAX) { @@ -2283,7 +2283,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { double score, value; score = src[0].weight * zval.score; - if (isnan(score)) score = 0; + if (std::isnan(score)) score = 0; for (j = 1; j < setnum; j++) { /* It is not safe to access the zset we are @@ -2330,7 +2330,7 @@ void zunionInterGenericCommand(client *c, robj *dstkey, int op) { while (zuiNext(&src[i],&zval)) { /* Initialize value */ score = src[i].weight * zval.score; - if (isnan(score)) score = 0; + if (std::isnan(score)) score = 0; /* Search for this element in the accumulating dictionary. */ de = dictAddRaw(accumulator,zuiSdsFromValue(&zval),&existing); @@ -2439,7 +2439,7 @@ void zrangeGenericCommand(client *c, int reverse) { /* Invariant: start >= 0, so this test will be true when end < 0. * The range is empty when start > end or start >= length. */ if (start > end || start >= llen) { - addReplyNull(c); + addReplyNull(c,shared.emptymultibulk); return; } if (end >= llen) end = llen-1; @@ -2595,7 +2595,10 @@ void genericZrangebyscoreCommand(client *c, int reverse) { /* No "first" element in the specified interval. 
*/ if (eptr == NULL) { - addReplyNull(c); + if (c->resp < 3) + addReply(c, shared.emptyarray); + else + addReplyNull(c); return; } diff --git a/src/version.h b/src/version.h index ad9d014f7..a736892ce 100644 --- a/src/version.h +++ b/src/version.h @@ -1,2 +1,2 @@ -#define KEYDB_REAL_VERSION "0.9.5" +#define KEYDB_REAL_VERSION "0.9.6" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config diff --git a/src/ziplist.c b/src/ziplist.c index 3b618e63a..4d2e89a45 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -576,7 +576,7 @@ void zipEntry(unsigned char *p, zlentry *e) { /* Create a new empty ziplist. */ unsigned char *ziplistNew(void) { - unsigned int bytes = ZIPLIST_HEADER_SIZE+1; + unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE; unsigned char *zl = zmalloc(bytes, MALLOC_SHARED); ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl index 2530511c9..9a833457e 100644 --- a/tests/integration/psync2.tcl +++ b/tests/integration/psync2.tcl @@ -166,12 +166,15 @@ start_server {} { # Pick a random slave set slave_id [expr {($master_id+1)%5}] set sync_count [status $R($master_id) sync_full] + set sync_partial [status $R($master_id) sync_partial_ok] catch { $R($slave_id) config rewrite $R($slave_id) debug restart } + # note: just waiting for connected_slaves==4 has a race condition since + # we might do the check before the master realized that the slave disconnected wait_for_condition 50 2000 { - [status $R($master_id) connected_slaves] == 4 + [status $R($master_id) sync_partial_ok] == $sync_partial + 1 } else { fail "Replica not reconnecting" } diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl new file mode 100644 index 000000000..4583da321 --- /dev/null +++ b/tests/integration/replication-active.tcl @@ -0,0 +1,86 @@ +start_server {tags {"active-repl"} 
overrides {active-replica yes}} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + set slave_log [srv 0 stdout] + set slave_pid [s process_id] + start_server {overrides {active-replica yes}} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + # Use a short replication timeout on the slave, so that if there + # are no bugs the timeout is triggered in a reasonable amount + # of time. + $slave config set repl-timeout 5 + + # Start the replication process... + $slave slaveof $master_host $master_port + $master slaveof $slave_host $slave_port + + test {Active replicas report the correct role} { + wait_for_condition 50 100 { + [string match *active-replica* [$slave role]] + } else { + fail "Replica0 does not report the correct role" + } + wait_for_condition 50 100 { + [string match *active-replica* [$master role]] + } else { + fail "Replica1 does not report the correct role" + } + } + + test {Active replicas propogate} { + $master set testkey foo + wait_for_condition 50 500 { + [string match *foo* [$slave get testkey]] + } else { + fail "replication failed to propogate" + } + + $slave set testkey bar + wait_for_condition 50 500 { + [string match bar [$master get testkey]] + } else { + fail "replication failed to propogate in the other direction" + } + } + + test {Active replicas WAIT} { + # Test that wait succeeds since replicas should be synchronized + $master set testkey foo + $slave set testkey2 test + assert_equal {1} [$master wait 1 1000] { "value should propogate + within 1 second" } + assert_equal {1} [$slave wait 1 1000] { "value should propogate + within 1 second" } + + # Now set up a situation where wait should fail + exec kill -SIGSTOP $slave_pid + $master set testkey fee + assert_equal {0} [$master wait 1 1000] { "slave shouldn't be + synchronized since its stopped" } + } + # Resume the replica we paused in the prior test + exec kill -SIGCONT $slave_pid + + test {Active replica
expire propogates} { + $master set testkey1 foo + wait_for_condition 50 1000 { + [string match *foo* [$slave get testkey1]] + } else { + fail "Replication failed to propogate" + } + $master pexpire testkey1 200 + after 1000 + assert_equal {0} [$master del testkey1] {"master expired"} + assert_equal {0} [$slave del testkey1] {"slave expired"} + + $slave set testkey1 foo px 200 + after 1000 + assert_equal {0} [$master del testkey1] + assert_equal {0} [$slave del testkey1] + } + } +} diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index a3bce2a4c..bf8682446 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -79,6 +79,32 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec stop_bg_complex_data $load_handle0 stop_bg_complex_data $load_handle1 stop_bg_complex_data $load_handle2 + + # Wait for the slave to reach the "online" + # state from the POV of the master. + set retry 5000 + while {$retry} { + set info [$master info] + if {[string match {*slave0:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slave not correctly synchronized" + } + + # Wait that slave acknowledge it is online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. 
(-LOADING error) + wait_for_condition 5000 100 { + [lindex [$slave role] 3] eq {connected} + } else { + fail "Slave still not connected after some time" + } + set retry 10 while {$retry && ([$master debug digest] ne [$slave debug digest])}\ { diff --git a/tests/modules/Makefile b/tests/modules/Makefile new file mode 100644 index 000000000..014d20afa --- /dev/null +++ b/tests/modules/Makefile @@ -0,0 +1,24 @@ + +# find the OS +uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') + +# Compile flags for linux / osx +ifeq ($(uname_S),Linux) + SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2 + SHOBJ_LDFLAGS ?= -shared +else + SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c99 -O2 + SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup +endif + +.SUFFIXES: .c .so .xo .o + +all: commandfilter.so + +.c.xo: + $(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ + +commandfilter.xo: ../../src/redismodule.h + +commandfilter.so: commandfilter.xo + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc diff --git a/src/modules/hellofilter.c b/tests/modules/commandfilter.c similarity index 80% rename from src/modules/hellofilter.c rename to tests/modules/commandfilter.c index 448e12983..d25d49c44 100644 --- a/src/modules/hellofilter.c +++ b/tests/modules/commandfilter.c @@ -1,18 +1,18 @@ #define REDISMODULE_EXPERIMENTAL_API -#include "../redismodule.h" +#include "redismodule.h" #include <string.h> static RedisModuleString *log_key_name; -static const char log_command_name[] = "hellofilter.log"; -static const char ping_command_name[] = "hellofilter.ping"; -static const char unregister_command_name[] = "hellofilter.unregister"; +static const char log_command_name[] = "commandfilter.log"; +static const char ping_command_name[] = "commandfilter.ping"; +static const char unregister_command_name[] = "commandfilter.unregister"; static int in_log_command = 0; static RedisModuleCommandFilter *filter = NULL; -int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx,
RedisModuleString **argv, int argc) +int CommandFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { (void) argc; (void) argv; @@ -23,7 +23,7 @@ int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, return REDISMODULE_OK; } -int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int CommandFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { (void) argc; (void) argv; @@ -39,7 +39,7 @@ int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a return REDISMODULE_OK; } -int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int CommandFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModuleString *s = RedisModule_CreateString(ctx, "", 0); @@ -74,9 +74,9 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar return REDISMODULE_OK; } -void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) +void CommandFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) { - if (in_log_command) return; /* don't process our own RM_Call() from HelloFilter_LogCommand() */ + if (in_log_command) return; /* don't process our own RM_Call() from CommandFilter_LogCommand() */ /* Fun manipulations: * - Remove @delme @@ -117,7 +117,7 @@ void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) } int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (RedisModule_Init(ctx,"hellofilter",1,REDISMODULE_APIVER_1) + if (RedisModule_Init(ctx,"commandfilter",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (argc != 2) { @@ -130,18 +130,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_StringToLongLong(argv[1], &noself); if (RedisModule_CreateCommand(ctx,log_command_name, - HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + 
CommandFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,ping_command_name, - HelloFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) + CommandFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,unregister_command_name, - HelloFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + CommandFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if ((filter = RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter, + if ((filter = RedisModule_RegisterCommandFilter(ctx, CommandFilter_CommandFilter, noself ? REDISMODULE_CMDFILTER_NOSELF : 0)) == NULL) return REDISMODULE_ERR; diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index d2f281526..6abbddbbe 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -41,6 +41,7 @@ set ::all_tests { integration/replication-3 integration/replication-4 integration/replication-psync + integration/replication-active integration/aof integration/rdb integration/convert-zipmap-hash-on-load @@ -63,7 +64,6 @@ set ::all_tests { unit/lazyfree unit/wait unit/pendingquerybuf - modules/commandfilter } # Index to the next test to run in the ::all_tests list. 
set ::next_test 0 @@ -504,7 +504,7 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--only}} { lappend ::only_tests $arg incr j - } elseif {$opt eq {--skiptill}} { + } elseif {$opt eq {--skip-till}} { set ::skip_till $arg incr j } elseif {$opt eq {--list-tests}} { diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index 82c75f82d..058441433 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -108,4 +108,11 @@ start_server {tags {"acl"}} { assert_match {*+debug|segfault*} $cmdstr assert_match {*+acl*} $cmdstr } + + test {ACL #5998 regression: memory leaks adding / removing subcommands} { + r AUTH default "" + r ACL setuser newuser reset -debug +debug|a +debug|b +debug|c + r ACL setuser newuser -debug + # The test framework will detect a leak if any. + } } diff --git a/tests/unit/lazyfree.tcl b/tests/unit/lazyfree.tcl index 1e568ed78..8efb3aecd 100644 --- a/tests/unit/lazyfree.tcl +++ b/tests/unit/lazyfree.tcl @@ -2,11 +2,11 @@ start_server {tags {"lazyfree"}} { test "UNLINK can reclaim memory in background" { set orig_mem [s used_memory] set args {} - for {set i 0} {$i < 100000} {incr i} { + for {set i 0} {$i < 200000} {incr i} { lappend args $i } r sadd myset {*}$args - assert {[r scard myset] == 100000} + assert {[r scard myset] == 200000} set peak_mem [s used_memory] assert {[r unlink myset] == 1} assert {$peak_mem > $orig_mem+1000000} @@ -19,14 +19,13 @@ start_server {tags {"lazyfree"}} { } test "FLUSHDB ASYNC can reclaim memory in background" { - after 500 # Sometimes Redis is busy with a prior operation set orig_mem [s used_memory] set args {} - for {set i 0} {$i < 100000} {incr i} { + for {set i 0} {$i < 200000} {incr i} { lappend args $i } r sadd myset {*}$args - assert {[r scard myset] == 100000} + assert {[r scard myset] == 200000} set peak_mem [s used_memory] r flushdb async assert {$peak_mem > $orig_mem+1000000} diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 1def57af5..0f64ddc18 100644 --- 
a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -161,7 +161,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} } # make sure master doesn't disconnect slave because of timeout - $master config set repl-timeout 300 ;# 5 minutes + $master config set repl-timeout 1200 ;# 20 minutes (for valgrind and slow machines) $master config set maxmemory-policy allkeys-random $master config set client-output-buffer-limit "replica 100000000 100000000 300" $master config set repl-backlog-size [expr {10*1024}] @@ -212,7 +212,8 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} assert {[$master dbsize] == 100} assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers - assert {$delta < 50*1024 && $delta > -50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + set delta_max [expr {$cmd_count / 2}] ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + assert {$delta < $delta_max && $delta > -$delta_max} $master client kill type slave set killed_used [s -1 used_memory] @@ -221,7 +222,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}] set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}] assert {$killed_slave_buf == 0} - assert {$delta_no_repl > -50*1024 && $delta_no_repl < 50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max} } # unfreeze slave process (after the 'test' succeeded or failed, but before we attempt to terminate the server diff --git a/tests/modules/commandfilter.tcl b/tests/unit/moduleapi/commandfilter.tcl similarity index 86% rename from tests/modules/commandfilter.tcl rename to tests/unit/moduleapi/commandfilter.tcl index 1e5c41d2b..6078f64f2 100644 --- a/tests/modules/commandfilter.tcl +++ 
b/tests/unit/moduleapi/commandfilter.tcl @@ -1,4 +1,4 @@ -set testmodule [file normalize src/modules/hellofilter.so] +set testmodule [file normalize tests/modules/commandfilter.so] start_server {tags {"modules"}} { r module load $testmodule log-key 0 @@ -27,7 +27,7 @@ start_server {tags {"modules"}} { test {Command Filter applies on RM_Call() commands} { r del log-key - r hellofilter.ping + r commandfilter.ping r lrange log-key 0 -1 } "{ping @log}" @@ -39,13 +39,13 @@ start_server {tags {"modules"}} { test {Command Filter applies on Lua redis.call() that calls a module} { r del log-key - r eval "redis.call('hellofilter.ping')" 0 + r eval "redis.call('commandfilter.ping')" 0 r lrange log-key 0 -1 } "{ping @log}" test {Command Filter is unregistered implicitly on module unload} { r del log-key - r module unload hellofilter + r module unload commandfilter r set mykey @log r lrange log-key 0 -1 } {} @@ -59,14 +59,14 @@ start_server {tags {"modules"}} { assert_equal "{set mykey @log}" [r lrange log-key 0 -1] # Unregister - r hellofilter.unregister + r commandfilter.unregister r del log-key r set mykey @log r lrange log-key 0 -1 } {} - r module unload hellofilter + r module unload commandfilter r module load $testmodule log-key 1 test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { @@ -74,10 +74,10 @@ start_server {tags {"modules"}} { assert_equal "{set mykey @log}" [r lrange log-key 0 -1] r del log-key - r hellofilter.ping + r commandfilter.ping assert_equal {} [r lrange log-key 0 -1] - r eval "redis.call('hellofilter.ping')" 0 + r eval "redis.call('commandfilter.ping')" 0 assert_equal {} [r lrange log-key 0 -1] } diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl index 5d625cf45..b205eb31b 100644 --- a/tests/unit/obuf-limits.tcl +++ b/tests/unit/obuf-limits.tcl @@ -15,7 +15,7 @@ start_server {tags {"obuf-limits"}} { if {![regexp {omem=([0-9]+)} $c - omem]} break if {$omem > 200000} break } - assert {$omem >= 90000 && $omem < 200000} 
+ assert {$omem >= 80000 && $omem < 200000} $rd1 close } diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 965902456..a0bbba0c5 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -1,3 +1,10 @@ +start_server {tags {"other"} overrides {databases 64}} { + test {CONF-DATABASES - ensure the databases config option is respected} { + r select 63 + r set testkey {foo} + } {OK} +} + start_server {tags {"other"}} { if {$::force_failure} { # This is used just for test suite development purposes.