diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 000000000..c3b5c0f42 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,26 @@ +language: generic +os: osx +matrix: + include: + - os: linux + script: make + env: COMPILER_NAME=gcc CXX=g++-5 CC=gcc-5 + addons: + apt: + packages: + - g++-5 + - nasm + sources: &sources + - llvm-toolchain-precise-3.8 + - ubuntu-toolchain-r-test + - os: linux + script: make MALLOC=libc + env: COMPILER_NAME=clang CXX=clang++-3.8 CC=clang-3.8 CXXFLAGS="-stdlib=libc++" LDFLAGS="-stdlib=libc++" + addons: + apt: + packages: + - clang-3.8 + - libc++-dev + - libc++abi-dev + - nasm + sources: *sources diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 000000000..36c5621af --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM ubuntu:18.04 + +# uuid-dev is required: src/Makefile links with -luuid and replication.cpp calls uuid_parse()/uuid_unparse() + +RUN apt-get update \ + && DEBIAN_FRONTEND=noninteractive apt-get install -qqy \ + build-essential nasm autotools-dev autoconf libjemalloc-dev tcl tcl-dev uuid-dev \ + && apt-get clean + +CMD make \ No newline at end of file diff --git a/README.md b/README.md index b1a02eef2..c60b6683a 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,6 @@ +![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg) +[![Build Status](https://travis-ci.org/JohnSully/KeyDB.svg?branch=unstable)](https://travis-ci.org/JohnSully/KeyDB) [![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) + What is KeyDB? -------------- @@ -9,6 +12,13 @@ KeyDB has full compatibility with the Redis protocol, modules, and scripts. Thi Try our docker container: https://hub.docker.com/r/eqalpha/keydb +Talk on Gitter: https://gitter.im/KeyDB + +New: Active Replica Support +--------------------------- + +New! KeyDB now has support for Active Replicas. 
This feature greatly simplifies hot-spare failover and allows you to distribute writes over replicas instead of just a single master. For more information [see the wiki page](https://github.com/JohnSully/KeyDB/wiki/KeyDB-(Redis-Fork):-Active-Replica-Support). + Why fork Redis? --------------- @@ -201,6 +211,26 @@ Future work: - Allow rebalancing of connections to different threads after the connection - Allow multiple readers access to the hashtable concurrently +Docker Build +------------ + +Run the following commands for a full source download and build: + +``` +git clone git@github.com:JohnSully/KeyDB.git +docker run -it --rm -v `pwd`/KeyDB:/build -w /build devopsdood/keydb-builder make +``` + +You now have freshly built binaries. You can also pass any other options to the make command above by adding them after the word make, e.g. + +```docker run -it --rm -v `pwd`/KeyDB:/build -w /build devopsdood/keydb-builder make MALLOC=memkind``` + +The above commands will build your binaries in the src directory. A standard `make install` (run without Docker) will work afterwards if you wish to install. + +If you'd prefer, you can build the Dockerfile in the repo instead of pulling the above container: + +`docker build -t keydb .` + Code contributions ----------------- diff --git a/deps/hiredis/sds.h b/deps/hiredis/sds.h index 404f246c4..6b46297c2 100644 --- a/deps/hiredis/sds.h +++ b/deps/hiredis/sds.h @@ -83,7 +83,7 @@ struct __attribute__ ((__packed__)) sdshdr64 { #define SDS_HDR(T,s) ((struct sdshdr##T *)((s)-(sizeof(struct sdshdr##T)))) #define SDS_TYPE_5_LEN(f) ((f)>>SDS_TYPE_BITS) -inline size_t sdslen(const sds s) { +static inline size_t sdslen(const sds s) { unsigned char flags = s[-1]; switch(__builtin_expect((flags&SDS_TYPE_MASK), SDS_TYPE_5)) { diff --git a/redis.conf b/redis.conf index c37989ece..99f66a667 100644 --- a/redis.conf +++ b/redis.conf @@ -1562,3 +1562,8 @@ server-threads 2 # Should KeyDB pin threads to CPUs? By default this is disabled, and KeyDB will not bind threads. 
# When enabled threads are bount to cores sequentially starting at core 0. # server-thread-affinity true + +# Uncomment the option below to enable Active Active support. Note that +# replicas will still sync in the normal way and incorrect ordering when +# bringing up replicas can result in data loss (the first master will win). +# active-replica yes diff --git a/src/Makefile b/src/Makefile index d9b986e71..57cd3a0c7 100644 --- a/src/Makefile +++ b/src/Makefile @@ -45,8 +45,9 @@ endif endif USEASM?=true -# Do we use our assembly spinlock? +# Do we use our assembly spinlock? X64 only ifeq ($(uname_S),Linux) +ifeq ($(uname_M),x86_64) ifneq ($(@@),32bit) ifeq ($(USEASM),true) ASM_OBJ+= fastlock_x64.o @@ -55,6 +56,11 @@ ifeq ($(USEASM),true) endif endif endif +endif + +ifeq ($(COMPILER_NAME),clang) + CXXFLAGS+= -stdlib=libc++ +endif # To get ARM stack traces if Redis crashes we need a special C flag. ifneq (,$(filter aarch64 armv,$(uname_M))) @@ -86,7 +92,7 @@ endif -include .make-settings FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) -FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) +FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(CXXFLAGS) $(REDIS_CFLAGS) FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG) FINAL_LIBS=-lm DEBUG=-g -ggdb @@ -133,7 +139,7 @@ ifeq ($(uname_S),DragonFly) else # All the other OSes (notably Linux) FINAL_LDFLAGS+= -rdynamic - FINAL_LIBS+=-ldl -pthread -lrt + FINAL_LIBS+=-ldl -pthread -lrt -luuid endif endif endif @@ -229,17 +235,15 @@ persist-settings: distclean .PHONY: persist-settings # Prerequisites target -.make-prerequisites: - @touch $@ - # Clean everything, persist settings and build dependencies if anything changed ifneq ($(strip $(PREV_FINAL_CFLAGS)), $(strip $(FINAL_CFLAGS))) .make-prerequisites: persist-settings -endif - -ifneq ($(strip $(PREV_FINAL_LDFLAGS)), $(strip $(FINAL_LDFLAGS))) +else ifneq ($(strip $(PREV_FINAL_LDFLAGS)), $(strip $(FINAL_LDFLAGS))) 
.make-prerequisites: persist-settings +else +.make-prerequisites: endif + @touch $@ # keydb-server $(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) diff --git a/src/ae_kqueue.c b/src/ae_kqueue.c index 19ac9ffc1..5c83f6464 100644 --- a/src/ae_kqueue.c +++ b/src/ae_kqueue.c @@ -39,10 +39,10 @@ typedef struct aeApiState { } aeApiState; static int aeApiCreate(aeEventLoop *eventLoop) { - aeApiState *state = zmalloc(sizeof(aeApiState)); + aeApiState *state = (aeApiState*)zmalloc(sizeof(aeApiState), MALLOC_LOCAL); if (!state) return -1; - state->events = zmalloc(sizeof(struct kevent)*eventLoop->setsize, MALLOC_LOCAL); + state->events = (struct kevent*)zmalloc(sizeof(struct kevent)*eventLoop->setsize, MALLOC_LOCAL); if (!state->events) { zfree(state); return -1; @@ -58,14 +58,14 @@ static int aeApiCreate(aeEventLoop *eventLoop) { } static int aeApiResize(aeEventLoop *eventLoop, int setsize) { - aeApiState *state = eventLoop->apidata; + aeApiState *state = (aeApiState*)eventLoop->apidata; - state->events = zrealloc(state->events, sizeof(struct kevent)*setsize, MALLOC_LOCAL); + state->events = (struct kevent*)zrealloc(state->events, sizeof(struct kevent)*setsize, MALLOC_LOCAL); return 0; } static void aeApiFree(aeEventLoop *eventLoop) { - aeApiState *state = eventLoop->apidata; + aeApiState *state = (aeApiState*)eventLoop->apidata; close(state->kqfd); zfree(state->events); @@ -73,7 +73,7 @@ static void aeApiFree(aeEventLoop *eventLoop) { } static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; + aeApiState *state = (aeApiState*)eventLoop->apidata; struct kevent ke; if (mask & AE_READABLE) { @@ -88,7 +88,7 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { } static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeApiState *state = eventLoop->apidata; + aeApiState *state = (aeApiState*)eventLoop->apidata; struct kevent ke; if (mask & AE_READABLE) { @@ -102,7 +102,7 @@ static void 
aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) { } static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { - aeApiState *state = eventLoop->apidata; + aeApiState *state = (aeApiState*)eventLoop->apidata; int retval, numevents = 0; if (tvp != NULL) { @@ -133,6 +133,6 @@ static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { return numevents; } -static char *aeApiName(void) { +static const char *aeApiName(void) { return "kqueue"; } diff --git a/src/aof.c b/src/aof.c index 69e588e10..431ea8756 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1248,7 +1248,7 @@ int rewriteModuleObject(rio *r, robj *key, robj *o) { RedisModuleIO io; moduleValue *mv = ptrFromObj(o); moduleType *mt = mv->type; - moduleInitIOContext(io,mt,r); + moduleInitIOContext(io,mt,r,key); mt->aof_rewrite(&io,key,mv->value); if (io.ctx) { moduleFreeContext(io.ctx); diff --git a/src/cluster.c b/src/cluster.c index 5052c623a..ec92bc4e3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -4776,7 +4776,7 @@ NULL /* Generates a DUMP-format representation of the object 'o', adding it to the * io stream pointed by 'rio'. This function can't fail. */ -void createDumpPayload(rio *payload, robj *o) { +void createDumpPayload(rio *payload, robj *o, robj *key) { unsigned char buf[2]; uint64_t crc; @@ -4784,7 +4784,7 @@ void createDumpPayload(rio *payload, robj *o) { * byte followed by the serialized object. This is understood by RESTORE. */ rioInitWithBuffer(payload,sdsempty()); serverAssert(rdbSaveObjectType(payload,o)); - serverAssert(rdbSaveObject(payload,o)); + serverAssert(rdbSaveObject(payload,o,key)); /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ @@ -4842,7 +4842,7 @@ void dumpCommand(client *c) { } /* Create the DUMP encoded representation. 
*/ - createDumpPayload(&payload,o); + createDumpPayload(&payload,o,c->argv[1]); /* Transfer to the client */ dumpobj = createObject(OBJ_STRING,payload.io.buffer.ptr); @@ -4915,7 +4915,7 @@ void restoreCommand(client *c) { rioInitWithBuffer(&payload,ptrFromObj(c->argv[3])); if (((type = rdbLoadObjectType(&payload)) == -1) || - ((obj = rdbLoadObject(type,&payload)) == NULL)) + ((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL)) { addReplyError(c,"Bad data format"); return; @@ -5203,7 +5203,7 @@ void migrateCommand(client *c) { /* Emit the payload argument, that is the serialized object using * the DUMP format. */ - createDumpPayload(&payload,ov[j]); + createDumpPayload(&payload,ov[j],kv[j]); serverAssertWithInfo(c,NULL, rioWriteBulkString(&cmd,payload.io.buffer.ptr, sdslen(payload.io.buffer.ptr))); diff --git a/src/config.c b/src/config.c index fe563e7ed..dd8bdfdf8 100644 --- a/src/config.c +++ b/src/config.c @@ -849,6 +849,16 @@ void loadServerConfigFromString(char *config) { err = "Unknown argument: server-thread-affinity expects either true or false"; goto loaderr; } + } else if (!strcasecmp(argv[0], "active-replica") && argc == 2) { + server.fActiveReplica = yesnotoi(argv[1]); + if (server.fActiveReplica > 0 && server.repl_slave_ro) { /* only an explicit "yes" makes the replica writable; "no" or an invalid value must leave replica-read-only untouched */ + server.repl_slave_ro = FALSE; + serverLog(LL_NOTICE, "Notice: \"active-replica yes\" implies \"replica-read-only no\""); + } + if (server.fActiveReplica == -1) { + server.fActiveReplica = CONFIG_DEFAULT_ACTIVE_REPLICA; + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } @@ -2350,6 +2360,7 @@ int rewriteConfig(char *path) { rewriteConfigYesNoOption(state,"lazyfree-lazy-server-del",server.lazyfree_lazy_server_del,CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL); rewriteConfigYesNoOption(state,"replica-lazy-flush",server.repl_slave_lazy_flush,CONFIG_DEFAULT_SLAVE_LAZY_FLUSH); rewriteConfigYesNoOption(state,"dynamic-hz",server.dynamic_hz,CONFIG_DEFAULT_DYNAMIC_HZ); + 
rewriteConfigYesNoOption(state,"active-replica",server.fActiveReplica,CONFIG_DEFAULT_ACTIVE_REPLICA); /* Rewrite Sentinel config if in Sentinel mode. */ if (server.sentinel_mode) rewriteConfigSentinelOption(state); diff --git a/src/fastlock.cpp b/src/fastlock.cpp index f265f3908..f1e13a279 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -34,6 +34,20 @@ #include #include #include +#include +#include + +#ifdef __APPLE__ +#include +#ifdef TARGET_OS_MAC +/* The CLANG that ships with Mac OS doesn't have these builtins. + but on x86 they are just normal reads/writes anyways */ +#define __atomic_load_4(ptr, csq) (*(reinterpret_cast(ptr))) +#define __atomic_load_2(ptr, csq) (*(reinterpret_cast(ptr))) + +#define __atomic_store_4(ptr, val, csq) (*(reinterpret_cast(ptr)) = val) +#endif +#endif /**************************************************** * @@ -43,12 +57,28 @@ ****************************************************/ static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough"); +uint64_t g_longwaits = 0; + +uint64_t fastlock_getlongwaitcount() +{ + return g_longwaits; +} + extern "C" pid_t gettid() { static thread_local int pidCache = -1; +#ifdef __linux__ if (pidCache == -1) pidCache = syscall(SYS_gettid); +#else + if (pidCache == -1) { + uint64_t tidT; + pthread_threadid_np(nullptr, &tidT); + assert(tidT < UINT_MAX); + pidCache = (int)tidT; + } +#endif return pidCache; } @@ -75,7 +105,10 @@ extern "C" void fastlock_lock(struct fastlock *lock) while (__atomic_load_2(&lock->m_ticket.m_active, __ATOMIC_ACQUIRE) != myticket) { if ((++cloops % 1024*1024) == 0) + { sched_yield(); + ++g_longwaits; + } #if defined(__i386__) || defined(__amd64__) __asm__ ("pause"); #endif @@ -103,7 +136,7 @@ extern "C" int fastlock_trylock(struct fastlock *lock) struct ticket ticket_expect { active, active }; struct ticket ticket_setiflocked { active, next }; - if (__atomic_compare_exchange(&lock->m_ticket, &ticket_expect, &ticket_setiflocked, 
true /*strong*/, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) + if (__atomic_compare_exchange(&lock->m_ticket, &ticket_expect, &ticket_setiflocked, false /*weak=false, i.e. the strong (no spurious failure) variant; NOTE(review): confirm strong is what was intended here*/, __ATOMIC_ACQUIRE, __ATOMIC_ACQUIRE)) { lock->m_depth = 1; __atomic_store_4(&lock->m_pidOwner, gettid(), __ATOMIC_RELEASE); @@ -137,4 +170,4 @@ extern "C" void fastlock_free(struct fastlock *lock) bool fastlock::fOwnLock() { return gettid() == m_pidOwner; -} \ No newline at end of file +} diff --git a/src/fastlock.h b/src/fastlock.h index b5a70c530..b8027de54 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -13,6 +13,8 @@ int fastlock_trylock(struct fastlock *lock); void fastlock_unlock(struct fastlock *lock); void fastlock_free(struct fastlock *lock); +uint64_t fastlock_getlongwaitcount(); // this is a global value + /* End C API */ #ifdef __cplusplus } diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index 1b876350f..7645d3baa 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -2,6 +2,7 @@ section .text extern gettid extern sched_yield +extern g_longwaits ; This is the first use of assembly in this codebase, a valid question is WHY? 
; The spinlock we implement here is performance critical, and simply put GCC @@ -49,6 +50,8 @@ ALIGN 16 syscall ; give up our timeslice we'll be here a while pop rax pop rsi + mov rcx, g_longwaits + inc qword [rcx] ; increment our long wait counter mov rdi, [rsp] ; our struct pointer is on the stack already xor ecx, ecx ; Reset our loop counter jmp .LLoop ; Get back in the game diff --git a/src/fmacros.h b/src/fmacros.h index a56bb9331..ea663e1a3 100644 --- a/src/fmacros.h +++ b/src/fmacros.h @@ -35,7 +35,12 @@ #if defined(__linux__) #define _GNU_SOURCE 1 #define _DEFAULT_SOURCE 1 + +#include +#if LINUX_VERSION_CODE >= KERNEL_VERSION(3,19,0) +#define HAVE_SO_INCOMING_CPU 1 #endif +#endif // __linux__ #if defined(_AIX) #define _ALL_SOURCE diff --git a/src/module.c b/src/module.c index fda6bcf66..4e4a584af 100644 --- a/src/module.c +++ b/src/module.c @@ -3439,6 +3439,14 @@ RedisModuleCtx *RM_GetContextFromIO(RedisModuleIO *io) { return io->ctx; } +/* Returns a RedisModuleString with the name of the key currently saving or + * loading, when an IO data type callback is called. There is no guarantee + * that the key name is always available, so this may return NULL. 
+ */ +const RedisModuleString *RM_GetKeyNameFromIO(RedisModuleIO *io) { + return io->key; +} + /* -------------------------------------------------------------------------- * Logging * -------------------------------------------------------------------------- */ @@ -5195,6 +5203,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(RetainString); REGISTER_API(StringCompare); REGISTER_API(GetContextFromIO); + REGISTER_API(GetKeyNameFromIO); REGISTER_API(BlockClient); REGISTER_API(UnblockClient); REGISTER_API(IsBlockedReplyRequest); diff --git a/src/networking.cpp b/src/networking.cpp index b608370fb..550f84b9f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -138,6 +138,7 @@ void linkClient(client *c) { * this way removing the client in unlinkClient() will not require * a linear scan, but just a constant time operation. */ c->client_list_node = listLast(server.clients); + if (c->fd != -1) atomicIncr(server.rgthreadvar[c->iel].cclients, 1); uint64_t id = htonu64(c->id); raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL); } @@ -225,6 +226,7 @@ client *createClient(int fd, int iel) { c->bufAsync = NULL; c->buflenAsync = 0; c->bufposAsync = 0; + memset(c->uuid, 0, UUID_BINARY_LEN); listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); @@ -1035,6 +1037,7 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { return; } +#ifdef HAVE_SO_INCOMING_CPU // Set thread affinity if (server.fThreadAffinity) { @@ -1044,6 +1047,7 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { serverLog(LL_WARNING, "Failed to set socket affinity"); } } +#endif /* If maxclient directive is set and this is one client more... close the * connection. 
Note that we create the client instead to check before @@ -1206,6 +1210,8 @@ void unlinkClient(client *c) { aeDeleteFileEvent(server.rgthreadvar[c->iel].el,c->fd,AE_WRITABLE); close(c->fd); c->fd = -1; + + atomicDecr(server.rgthreadvar[c->iel].cclients, 1); } /* Remove from the list of pending writes if needed. */ @@ -2130,7 +2136,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */ processInputBufferAndReplicate(c); - aelock.arm(nullptr); + aelock.arm(c); ProcessPendingAsyncWrites(); } diff --git a/src/rdb.c b/src/rdb.c index 31db9d67f..6a06e779e 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -752,7 +752,7 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { /* Save a Redis object. * Returns -1 on error, number of bytes written on success. */ -ssize_t rdbSaveObject(rio *rdb, robj *o) { +ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { ssize_t n = 0, nwritten = 0; if (o->type == OBJ_STRING) { @@ -967,7 +967,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { RedisModuleIO io; moduleValue *mv = ptrFromObj(o); moduleType *mt = mv->type; - moduleInitIOContext(io,mt,rdb); + moduleInitIOContext(io,mt,rdb,key); /* Write the "module" identifier as prefix, so that we'll be able * to call the right module during loading. */ @@ -997,7 +997,7 @@ ssize_t rdbSaveObject(rio *rdb, robj *o) { * this length with very little changes to the code. In the future * we could switch to a faster solution. 
*/ size_t rdbSavedObjectLen(robj *o) { - ssize_t len = rdbSaveObject(NULL,o); + ssize_t len = rdbSaveObject(NULL,o,NULL); serverAssertWithInfo(NULL,o,len != -1); return len; } @@ -1039,7 +1039,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; - if (rdbSaveObject(rdb,val) == -1) return -1; + if (rdbSaveObject(rdb,val,key) == -1) return -1; return 1; } @@ -1401,7 +1401,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. */ -robj *rdbLoadObject(int rdbtype, rio *rdb) { +robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { robj *o = NULL, *ele, *dec; uint64_t len; unsigned int i; @@ -1788,7 +1788,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb) { exit(1); } RedisModuleIO io; - moduleInitIOContext(io,mt,rdb); + moduleInitIOContext(io,mt,rdb,key); io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2; /* Call the rdb_load method of the module providing the 10 bit * encoding version in the lower 10 bits of the module ID. */ @@ -2044,7 +2044,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { /* Read key */ if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ - if ((val = rdbLoadObject(type,rdb)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,rdb,key)) == NULL) goto eoferr; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. 
In the latter case, the master is diff --git a/src/rdb.h b/src/rdb.h index fcd44e742..7d95ba562 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -144,9 +144,9 @@ int rdbSaveFile(char *filename, rdbSaveInfo *rsi); int rdbSaveFd(int fd, rdbSaveInfo *rsi); int rdbSaveS3(char *path, rdbSaveInfo *rsi); int rdbLoadS3(char *path, rdbSaveInfo *rsi); -ssize_t rdbSaveObject(rio *rdb, robj *o); +ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key); size_t rdbSavedObjectLen(robj *o); -robj *rdbLoadObject(int type, rio *rdb); +robj *rdbLoadObject(int type, rio *rdb, robj *key); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); robj *rdbLoadStringObject(rio *rdb); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index 1af3953e7..2fd8366a5 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -285,7 +285,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { rdbstate.keys++; /* Read value */ rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE; - if ((val = rdbLoadObject(type,&rdb)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,&rdb,key)) == NULL) goto eoferr; /* Check if the key already expired. 
*/ if (expiretime != -1 && expiretime < now) rdbstate.already_expired++; diff --git a/src/redismodule.h b/src/redismodule.h index 272da08df..02941aa96 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -278,6 +278,7 @@ int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, Re void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str); int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b); RedisModuleCtx *REDISMODULE_API_FUNC(RedisModule_GetContextFromIO)(RedisModuleIO *io); +const RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetKeyNameFromIO)(RedisModuleIO *io); long long REDISMODULE_API_FUNC(RedisModule_Milliseconds)(void); void REDISMODULE_API_FUNC(RedisModule_DigestAddStringBuffer)(RedisModuleDigest *md, unsigned char *ele, size_t len); void REDISMODULE_API_FUNC(RedisModule_DigestAddLongLong)(RedisModuleDigest *md, long long ele); @@ -442,6 +443,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(RetainString); REDISMODULE_GET_API(StringCompare); REDISMODULE_GET_API(GetContextFromIO); + REDISMODULE_GET_API(GetKeyNameFromIO); REDISMODULE_GET_API(Milliseconds); REDISMODULE_GET_API(DigestAddStringBuffer); REDISMODULE_GET_API(DigestAddLongLong); diff --git a/src/replication.cpp b/src/replication.cpp index b124a6ae3..651680127 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -38,6 +38,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(int newfd); @@ -74,6 +75,21 @@ char *replicationGetSlaveName(client *c) { return buf; } +static bool FSameHost(client *clientA, client *clientB) +{ + const unsigned char *a = clientA->uuid; + const unsigned char *b = clientB->uuid; + + unsigned char zeroCheck = 0; + for (int i = 0; i < UUID_BINARY_LEN; ++i) + { + if (a[i] != b[i]) + return false; + zeroCheck |= a[i]; + } + return 
(zeroCheck != 0); // if the UUID is nil then it is never equal +} + /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { @@ -117,7 +133,13 @@ void resizeReplicationBacklog(long long newsize) { void freeReplicationBacklog(void) { serverAssert(GlobalLocksAcquired()); - serverAssert(listLength(server.slaves) == 0); + listIter li; + listNode *ln; + listRewind(server.slaves, &li); + while ((ln = listNext(&li))) { + // server.slaves should be empty, or filled with clients pending close + serverAssert(((client*)listNodeValue(ln))->flags & CLIENT_CLOSE_ASAP); + } zfree(server.repl_backlog); server.repl_backlog = NULL; } @@ -186,7 +208,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * propagate *identical* replication stream. In this way this slave can * advertise the same replication ID as the master (since it shares the * master replication history and has the same backlog and offsets). */ - if (server.masterhost != NULL) return; + if (!server.fActiveReplica && server.masterhost != NULL) return; /* If there aren't slaves, and there is no backlog buffer to populate, * we can return ASAP. 
*/ @@ -226,6 +248,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { while((ln = listNext(&li))) { client *slave = (client*)ln->value; if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (server.current_client && FSameHost(server.current_client, slave)) continue; addReplyAsync(slave,selectcmd); } @@ -268,6 +291,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (server.current_client && FSameHost(server.current_client, slave)) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), @@ -282,10 +306,10 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { addReplyBulkAsync(slave,argv[j]); } - /* Release the lock on all slaves */ listRewind(slaves,&li); while((ln = listNext(&li))) { - ((client*)ln->value)->lock.unlock(); + client *slave = (client*)ln->value; + slave->lock.unlock(); } } @@ -311,6 +335,8 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *slave = (client*)ln->value; std::lock_guardlock)> ulock(slave->lock); + if (FSameHost(slave, server.master)) + continue; // Active Active case, don't feed back /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; @@ -658,9 +684,11 @@ void syncCommand(client *c) { /* Refuse SYNC requests if we are a slave but the link with our master * is not ok... 
*/ - if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { - addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); - return; + if (!server.fActiveReplica) { + if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { + addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); + return; + } } /* SYNC can't be issued when the server has pending data to send to @@ -790,6 +818,33 @@ void syncCommand(client *c) { return; } +void processReplconfUuid(client *c, robj *arg) +{ + try + { + if (arg->type != OBJ_STRING) + throw "Invalid UUID"; + + const char *remoteUUID = (const char*)ptrFromObj(arg); + if (strlen(remoteUUID) != 36) + throw "Invalid UUID"; + + if (uuid_parse(remoteUUID, c->uuid) != 0) + throw "Invalid UUID"; + + char szServerUUID[36 + 2]; // 1 for the '+', another for '\0' + szServerUUID[0] = '+'; + uuid_unparse(server.uuid, szServerUUID+1); + addReplyProto(c, szServerUUID, 37); + addReplyProto(c, "\r\n", 2); + } + catch (const char *szErr) + { + addReplyError(c, szErr); + return; + } +} + /* REPLCONF