diff --git a/MANIFESTO b/MANIFESTO deleted file mode 100644 index 2b719057e..000000000 --- a/MANIFESTO +++ /dev/null @@ -1,67 +0,0 @@ -[Note: this is the Redis manifesto, for general information about - installing and running Redis read the README file instead.] - -Redis Manifesto -=============== - -1 - A DSL for Abstract Data Types. Redis is a DSL (Domain Specific Language) - that manipulates abstract data types and implemented as a TCP daemon. - Commands manipulate a key space where keys are binary-safe strings and - values are different kinds of abstract data types. Every data type - represents an abstract version of a fundamental data structure. For instance - Redis Lists are an abstract representation of linked lists. In Redis, the - essence of a data type isn't just the kind of operations that the data types - support, but also the space and time complexity of the data type and the - operations performed upon it. - -2 - Memory storage is #1. The Redis data set, composed of defined key-value - pairs, is primarily stored in the computer's memory. The amount of memory in - all kinds of computers, including entry-level servers, is increasing - significantly each year. Memory is fast, and allows Redis to have very - predictable performance. Datasets composed of 10k or 40 millions keys will - perform similarly. Complex data types like Redis Sorted Sets are easy to - implement and manipulate in memory with good performance, making Redis very - simple. Redis will continue to explore alternative options (where data can - be optionally stored on disk, say) but the main goal of the project remains - the development of an in-memory database. - -3 - Fundamental data structures for a fundamental API. The Redis API is a direct - consequence of fundamental data structures. APIs can often be arbitrary but - not an API that resembles the nature of fundamental data structures. If we - ever meet intelligent life forms from another part of the universe, they'll - likely know, understand and recognize the same basic data structures we have - in our computer science books. Redis will avoid intermediate layers in API, - so that the complexity is obvious and more complex operations can be - performed as the sum of the basic operations. - -4 - Code is like a poem; it's not just something we write to reach some - practical result. Sometimes people that are far from the Redis philosophy - suggest using other code written by other authors (frequently in other - languages) in order to implement something Redis currently lacks. But to us - this is like if Shakespeare decided to end Enrico IV using the Paradiso from - the Divina Commedia. Is using any external code a bad idea? Not at all. Like - in "One Thousand and One Nights" smaller self contained stories are embedded - in a bigger story, we'll be happy to use beautiful self contained libraries - when needed. At the same time, when writing the Redis story we're trying to - write smaller stories that will fit in to other code. - -5 - We're against complexity. We believe designing systems is a fight against - complexity. We'll accept to fight the complexity when it's worthwhile but - we'll try hard to recognize when a small feature is not worth 1000s of lines - of code. Most of the time the best way to fight complexity is by not - creating it at all. - -6 - Two levels of API. The Redis API has two levels: 1) a subset of the API fits - naturally into a distributed version of Redis and 2) a more complex API that - supports multi-key operations. Both are useful if used judiciously but - there's no way to make the more complex multi-keys API distributed in an - opaque way without violating our other principles. We don't want to provide - the illusion of something that will work magically when actually it can't in - all cases. Instead we'll provide commands to quickly migrate keys from one - instance to another to perform multi-key operations and expose the tradeoffs - to the user. - -7 - We optimize for joy. We believe writing code is a lot of hard work, and the - only way it can be worth is by enjoying it. When there is no longer joy in - writing code, the best thing to do is stop. To prevent this, we'll avoid - taking paths that will make Redis less of a joy to develop. diff --git a/README.md b/README.md index d3963dc85..b1a02eef2 100644 --- a/README.md +++ b/README.md @@ -1,12 +1,14 @@ What is KeyDB? -------------- -KeyDB is a high performance fork of Redis focussing on multithreading, memory efficiency, and high throughput. In addition to multithreading KeyDB also has features only available in Redis Enterprise such as FLASH storage support, and some not available at all such as direct backup to AWS S3. +KeyDB is a high performance fork of Redis focusing on multithreading, memory efficiency, and high throughput. In addition to multithreading KeyDB also has features only available in Redis Enterprise such as FLASH storage support, and some not available at all such as direct backup to AWS S3. On the same hardware KeyDB can perform twice as many queries per second as Redis, with 60% lower latency. KeyDB has full compatibility with the Redis protocol, modules, and scripts. This includes full support for transactions, and atomic execution of scripts. For more information see our architecture section below. +Try our docker container: https://hub.docker.com/r/eqalpha/keydb + Why fork Redis? --------------- @@ -39,7 +41,7 @@ If you would like to use the FLASH backed storage this option configures the dir db-s3-object /path/to/bucket -If you would like KeyDB to dump directly to AWS S3 this option specifies the bucket. Using this option with the traditional RDB options will result in KeyDB backing up twice to both locations. This requires the AWS CLI tools to be installed and configured which are used under the hood to transfer the data. +If you would like KeyDB to dump and load directly to AWS S3 this option specifies the bucket. Using this option with the traditional RDB options will result in KeyDB backing up twice to both locations. If both are specified KeyDB will first attempt to load from the local dump file and if that fails load from S3. This requires the AWS CLI tools to be installed and configured which are used under the hood to transfer the data. All other configuration options behave as you'd expect. Your existing configuration files should continue to work unchanged. @@ -179,6 +181,8 @@ for Ubuntu and Debian systems: % cd utils % ./install_server.sh +_Note_: `install_server.sh` will not work on Mac OSX; it is built for Linux only. + The script will ask you a few questions and will setup everything you need to run KeyDB properly as a background daemon that will start again on system reboots. @@ -189,7 +193,7 @@ You'll be able to stop and start KeyDB using the script named Multithreading Architecture --------------------------- -KeyDB works by running the normal Redis event loop on multiple threads. Network IO, and query parsing are done concurrently. Each connection is assigned a thread on accept(). Access to the core hash table is guarded by spinlock. Because the hashtable access is extremely fast this lock has low contention. Transactions hold the lock for the duration of the EXEC command. Modules work in concert with the GIL which is only acquired when all server threads are paused. This maintains the atomicity gurantees modules expect. +KeyDB works by running the normal Redis event loop on multiple threads. Network IO, and query parsing are done concurrently. Each connection is assigned a thread on accept(). Access to the core hash table is guarded by spinlock. Because the hashtable access is extremely fast this lock has low contention. Transactions hold the lock for the duration of the EXEC command. Modules work in concert with the GIL which is only acquired when all server threads are paused. This maintains the atomicity guarantees modules expect. Unlike most databases the core data structure is the fastest part of the system. Most of the query time comes from parsing the REPL protocol and copying data to/from the network. diff --git a/src/Makefile b/src/Makefile index 31fcc5a2a..d9b986e71 100644 --- a/src/Makefile +++ b/src/Makefile @@ -343,3 +343,6 @@ install: all $(REDIS_INSTALL) $(REDIS_CHECK_RDB_NAME) $(INSTALL_BIN) $(REDIS_INSTALL) $(REDIS_CHECK_AOF_NAME) $(INSTALL_BIN) @ln -sf $(REDIS_SERVER_NAME) $(INSTALL_BIN)/$(REDIS_SENTINEL_NAME) + +uninstall: + rm -f $(INSTALL_BIN)/{$(REDIS_SERVER_NAME),$(REDIS_BENCHMARK_NAME),$(REDIS_CLI_NAME),$(REDIS_CHECK_RDB_NAME),$(REDIS_CHECK_AOF_NAME),$(REDIS_SENTINEL_NAME)} diff --git a/src/acl.c b/src/acl.c index ab10f79b1..b7756df79 100644 --- a/src/acl.c +++ b/src/acl.c @@ -1389,6 +1389,8 @@ void ACLLoadUsersAtStartup(void) { * ACL SETUSER ... acl rules ... * ACL DELUSER [...] * ACL GETUSER + * ACL GENPASS + * ACL WHOAMI */ void aclCommand(client *c) { char *sub = ptrFromObj(c->argv[1]); @@ -1571,6 +1573,10 @@ void aclCommand(client *c) { } dictReleaseIterator(di); setDeferredArrayLen(c,dl,arraylen); + } else if (!strcasecmp(sub,"genpass") && c->argc == 2) { + char pass[32]; /* 128 bits of actual pseudo random data. */ + getRandomHexChars(pass,sizeof(pass)); + addReplyBulkCBuffer(c,pass,sizeof(pass)); } else if (!strcasecmp(sub,"help")) { const char *help[] = { "LOAD -- Reload users from the ACL file.", @@ -1581,6 +1587,7 @@ void aclCommand(client *c) { "DELUSER [...] -- Delete a list of users.", "CAT -- List available categories.", "CAT -- List commands inside category.", +"GENPASS -- Generate a secure user password.", "WHOAMI -- Return the current connection username.", NULL }; diff --git a/src/aof.c b/src/aof.c index e5153cd98..69e588e10 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1621,6 +1621,9 @@ void aofRemoveTempFile(pid_t childpid) { snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) childpid); unlink(tmpfile); + + snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) childpid); + unlink(tmpfile); } /* Update the server.aof_current_size field explicitly using stat(2) diff --git a/src/blocked.c b/src/blocked.c index b0fb127dc..3a3ca76a0 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -77,10 +77,18 @@ int serveClientBlockedOnList(client *receiver, robj *key, robj *dstkey, redisDb * is zero. */ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) { long long tval; + long double ftval; - if (getLongLongFromObjectOrReply(c,object,&tval, - "timeout is not an integer or out of range") != C_OK) - return C_ERR; + if (unit == UNIT_SECONDS) { + if (getLongDoubleFromObjectOrReply(c,object,&ftval, + "timeout is not an float or out of range") != C_OK) + return C_ERR; + tval = (long long) (ftval * 1000.0); + } else { + if (getLongLongFromObjectOrReply(c,object,&tval, + "timeout is not an integer or out of range") != C_OK) + return C_ERR; + } if (tval < 0) { addReplyError(c,"timeout is negative"); @@ -88,7 +96,6 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int } if (tval > 0) { - if (unit == UNIT_SECONDS) tval *= 1000; tval += mstime(); } *timeout = tval; diff --git a/src/cluster.c b/src/cluster.c index 13e303b3d..5052c623a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -3031,6 +3031,7 @@ void clusterHandleSlaveFailover(void) { if (server.cluster->mf_end) { server.cluster->failover_auth_time = mstime(); server.cluster->failover_auth_rank = 0; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } serverLog(LL_WARNING, "Start of election delayed for %lld milliseconds " diff --git a/src/config.c b/src/config.c index da548cba9..fe563e7ed 100644 --- a/src/config.c +++ b/src/config.c @@ -1103,8 +1103,8 @@ void configSetCommand(client *c) { int soft_seconds; class = getClientTypeByName(v[j]); - hard = strtoll(v[j+1],NULL,10); - soft = strtoll(v[j+2],NULL,10); + hard = memtoll(v[j+1],NULL); + soft = memtoll(v[j+2],NULL); soft_seconds = strtoll(v[j+3],NULL,10); server.client_obuf_limits[class].hard_limit_bytes = hard; diff --git a/src/debug.c b/src/debug.c index d24c9ef9c..34e0ab22c 100644 --- a/src/debug.c +++ b/src/debug.c @@ -362,7 +362,7 @@ NULL } emptyDb(-1,EMPTYDB_NO_FLAGS,NULL); protectClient(c); - int ret = rdbLoad(server.rdb_filename,NULL); + int ret = rdbLoad(NULL); unprotectClient(c); if (ret != C_OK) { addReplyError(c,"Error trying to load the RDB dump"); diff --git a/src/geo.c b/src/geo.c index f04e83f5e..7fc52d68d 100644 --- a/src/geo.c +++ b/src/geo.c @@ -659,7 +659,7 @@ void georadiusGeneric(client *c, int flags) { zsetConvertToZiplistIfNeeded(zobj,maxelelen); setKey(c->db,storekey,zobj); decrRefCount(zobj); - notifyKeyspaceEvent(NOTIFY_LIST,"georadiusstore",storekey, + notifyKeyspaceEvent(NOTIFY_ZSET,"georadiusstore",storekey, c->db->id); server.dirty += returned_items; } else if (dbDelete(c->db,storekey)) { diff --git a/src/hyperloglog.c b/src/hyperloglog.c index 3aa0e07ff..6d1a05754 100644 --- a/src/hyperloglog.c +++ b/src/hyperloglog.c @@ -614,6 +614,7 @@ int hllSparseToDense(robj *o) { } else { runlen = HLL_SPARSE_VAL_LEN(p); regval = HLL_SPARSE_VAL_VALUE(p); + if ((runlen + idx) > HLL_REGISTERS) break; /* Overflow. */ while(runlen--) { HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval); idx++; @@ -1013,7 +1014,12 @@ uint64_t hllCount(struct hllhdr *hdr, int *invalid) { double m = HLL_REGISTERS; double E; int j; - int reghisto[HLL_Q+2] = {0}; + /* Note that reghisto size could be just HLL_Q+2, becuase HLL_Q+1 is + * the maximum frequency of the "000...1" sequence the hash function is + * able to return. However it is slow to check for sanity of the + * input: instead we history array at a safe size: overflows will + * just write data to wrong, but correctly allocated, places. */ + int reghisto[64] = {0}; /* Compute register histogram */ if (hdr->encoding == HLL_DENSE) { @@ -1088,6 +1094,7 @@ int hllMerge(uint8_t *max, size_t cmax, robj *hll) { } else { runlen = HLL_SPARSE_VAL_LEN(p); regval = HLL_SPARSE_VAL_VALUE(p); + if ((runlen + i) > HLL_REGISTERS) break; /* Overflow. */ while(runlen--) { if (i < 0 || (size_t)i >= cmax) return C_ERR; diff --git a/src/module.c b/src/module.c index 844bbd8fd..fda6bcf66 100644 --- a/src/module.c +++ b/src/module.c @@ -47,9 +47,21 @@ struct RedisModule { int ver; /* Module version. We use just progressive integers. */ int apiver; /* Module API version as requested during initialization.*/ list *types; /* Module data types. */ + list *usedby; /* List of modules using APIs from this one. */ + list *using; /* List of modules we use some APIs of. */ }; typedef struct RedisModule RedisModule; +/* This represents a shared API. Shared APIs will be used to populate + * the server.sharedapi dictionary, mapping names of APIs exported by + * modules for other modules to use, to their structure specifying the + * function pointer that can be called. */ +struct RedisModuleSharedAPI { + void *func; + RedisModule *module; +}; +typedef struct RedisModuleSharedAPI RedisModuleSharedAPI; + static dict *modules; /* Hash table of modules. SDS -> RedisModule ptr.*/ /* Entries in the context->amqueue array, representing objects to free @@ -510,6 +522,22 @@ void RedisModuleCommandDispatcher(client *c) { cp->func(&ctx,(void**)c->argv,c->argc); moduleHandlePropagationAfterCommandCallback(&ctx); moduleFreeContext(&ctx); + + /* In some cases processMultibulkBuffer uses sdsMakeRoomFor to + * expand the query buffer, and in order to avoid a big object copy + * the query buffer SDS may be used directly as the SDS string backing + * the client argument vectors: sometimes this will result in the SDS + * string having unused space at the end. Later if a module takes ownership + * of the RedisString, such space will be wasted forever. Inside the + * Redis core this is not a problem because tryObjectEncoding() is called + * before storing strings in the key space. Here we need to do it + * for the module. */ + for (int i = 0; i < c->argc; i++) { + /* Only do the work if the module took ownership of the object: + * in that case the refcount is no longer 1. */ + if (c->argv[i]->refcount > 1) + trimStringObjectIfNeeded(c->argv[i]); + } } /* This function returns the list of keys, with the same interface as the @@ -702,6 +730,8 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->ver = ver; module->apiver = apiver; module->types = listCreate(); + module->usedby = listCreate(); + module->using = listCreate(); ctx->module = module; } @@ -1360,6 +1390,9 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * * * REDISMODULE_CTX_FLAGS_MULTI: The command is running inside a transaction * + * * REDISMODULE_CTX_FLAGS_REPLICATED: The command was sent over the replication + * link by the MASTER + * * * REDISMODULE_CTX_FLAGS_MASTER: The Redis instance is a master * * * REDISMODULE_CTX_FLAGS_SLAVE: The Redis instance is a slave @@ -1392,6 +1425,9 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { flags |= REDISMODULE_CTX_FLAGS_LUA; if (ctx->client->flags & CLIENT_MULTI) flags |= REDISMODULE_CTX_FLAGS_MULTI; + /* Module command recieved from MASTER, is replicated. */ + if (ctx->client->flags & CLIENT_MASTER) + flags |= REDISMODULE_CTX_FLAGS_REPLICATED; } if (server.cluster_enabled) @@ -3424,6 +3460,8 @@ void RM_LogRaw(RedisModule *module, const char *levelstr, const char *fmt, va_li else if (!strcasecmp(levelstr,"warning")) level = LL_WARNING; else level = LL_VERBOSE; /* Default. */ + if (level < server.verbosity) return; + name_len = snprintf(msg, sizeof(msg),"<%s> ", module->name); vsnprintf(msg + name_len, sizeof(msg) - name_len, fmt, ap); serverLogRaw(level,msg); @@ -4647,6 +4685,121 @@ void RM_GetRandomHexChars(char *dst, size_t len) { getRandomHexChars(dst,len); } +/* -------------------------------------------------------------------------- + * Modules API exporting / importing + * -------------------------------------------------------------------------- */ + +/* This function is called by a module in order to export some API with a + * given name. Other modules will be able to use this API by calling the + * symmetrical function RM_GetSharedAPI() and casting the return value to + * the right function pointer. + * + * The function will return REDISMODULE_OK if the name is not already taken, + * otherwise REDISMODULE_ERR will be returned and no operation will be + * performed. + * + * IMPORTANT: the apiname argument should be a string literal with static + * lifetime. The API relies on the fact that it will always be valid in + * the future. */ +int RM_ExportSharedAPI(RedisModuleCtx *ctx, const char *apiname, void *func) { + RedisModuleSharedAPI *sapi = zmalloc(sizeof(*sapi), MALLOC_LOCAL); + sapi->module = ctx->module; + sapi->func = func; + if (dictAdd(server.sharedapi, (char*)apiname, sapi) != DICT_OK) { + zfree(sapi); + return REDISMODULE_ERR; + } + return REDISMODULE_OK; +} + +/* Request an exported API pointer. The return value is just a void pointer + * that the caller of this function will be required to cast to the right + * function pointer, so this is a private contract between modules. + * + * If the requested API is not available then NULL is returned. Because + * modules can be loaded at different times with different order, this + * function calls should be put inside some module generic API registering + * step, that is called every time a module attempts to execute a + * command that requires external APIs: if some API cannot be resolved, the + * command should return an error. + * + * Here is an exmaple: + * + * int ... myCommandImplementation() { + * if (getExternalAPIs() == 0) { + * reply with an error here if we cannot have the APIs + * } + * // Use the API: + * myFunctionPointer(foo); + * } + * + * And the function registerAPI() is: + * + * int getExternalAPIs(void) { + * static int api_loaded = 0; + * if (api_loaded != 0) return 1; // APIs already resolved. + * + * myFunctionPointer = RedisModule_GetOtherModuleAPI("..."); + * if (myFunctionPointer == NULL) return 0; + * + * return 1; + * } + */ +void *RM_GetSharedAPI(RedisModuleCtx *ctx, const char *apiname) { + dictEntry *de = dictFind(server.sharedapi, apiname); + if (de == NULL) return NULL; + RedisModuleSharedAPI *sapi = dictGetVal(de); + if (listSearchKey(sapi->module->usedby,ctx->module) == NULL) { + listAddNodeTail(sapi->module->usedby,ctx->module); + listAddNodeTail(ctx->module->using,sapi->module); + } + return sapi->func; +} + +/* Remove all the APIs registered by the specified module. Usually you + * want this when the module is going to be unloaded. This function + * assumes that's caller responsibility to make sure the APIs are not + * used by other modules. + * + * The number of unregistered APIs is returned. */ +int moduleUnregisterSharedAPI(RedisModule *module) { + int count = 0; + dictIterator *di = dictGetSafeIterator(server.sharedapi); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + const char *apiname = dictGetKey(de); + RedisModuleSharedAPI *sapi = dictGetVal(de); + if (sapi->module == module) { + dictDelete(server.sharedapi,apiname); + zfree(sapi); + count++; + } + } + dictReleaseIterator(di); + return count; +} + +/* Remove the specified module as an user of APIs of ever other module. + * This is usually called when a module is unloaded. + * + * Returns the number of modules this module was using APIs from. */ +int moduleUnregisterUsedAPI(RedisModule *module) { + listIter li; + listNode *ln; + int count = 0; + + listRewind(module->using,&li); + while((ln = listNext(&li))) { + RedisModule *used = ln->value; + listNode *ln = listSearchKey(used->usedby,module); + if (ln) { + listDelNode(module->using,ln); + count++; + } + } + return count; +} + /* -------------------------------------------------------------------------- * Modules API internals * -------------------------------------------------------------------------- */ @@ -4792,6 +4945,8 @@ int moduleLoad(const char *path, void **module_argv, int module_argc) { if (onload((void*)&ctx,module_argv,module_argc) == REDISMODULE_ERR) { if (ctx.module) { moduleUnregisterCommands(ctx.module); + moduleUnregisterSharedAPI(ctx.module); + moduleUnregisterUsedAPI(ctx.module); moduleFreeModuleStructure(ctx.module); } dlclose(handle); @@ -4821,14 +4976,17 @@ int moduleUnload(sds name) { if (module == NULL) { errno = ENOENT; return REDISMODULE_ERR; - } - - if (listLength(module->types)) { + } else if (listLength(module->types)) { errno = EBUSY; return REDISMODULE_ERR; + } else if (listLength(module->usedby)) { + errno = EPERM; + return REDISMODULE_ERR; } moduleUnregisterCommands(module); + moduleUnregisterSharedAPI(module); + moduleUnregisterUsedAPI(module); /* Remove any notification subscribers this module might have */ moduleUnsubscribeNotifications(module); @@ -4909,7 +5067,12 @@ NULL errmsg = "no such module with that name"; break; case EBUSY: - errmsg = "the module exports one or more module-side data types, can't unload"; + errmsg = "the module exports one or more module-side data " + "types, can't unload"; + break; + case EPERM: + errmsg = "the module exports APIs used by other modules. " + "Please unload them first and try again"; break; default: errmsg = "operation not possible."; @@ -4934,6 +5097,7 @@ size_t moduleCount(void) { * file so that's easy to seek it to add new entries. */ void moduleRegisterCoreAPI(void) { server.moduleapi = dictCreate(&moduleAPIDictType,NULL); + server.sharedapi = dictCreate(&moduleAPIDictType,NULL); REGISTER_API(Alloc); REGISTER_API(Calloc); REGISTER_API(Realloc); @@ -5084,4 +5248,6 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(DictPrev); REGISTER_API(DictCompareC); REGISTER_API(DictCompare); + REGISTER_API(ExportSharedAPI); + REGISTER_API(GetSharedAPI); } diff --git a/src/object.c b/src/object.c index 600dbfbc9..7112e0593 100644 --- a/src/object.c +++ b/src/object.c @@ -417,6 +417,18 @@ int isObjectRepresentableAsLongLong(robj *o, long long *llval) { } } +/* Optimize the SDS string inside the string object to require little space, + * in case there is more than 10% of free space at the end of the SDS + * string. This happens because SDS strings tend to overallocate to avoid + * wasting too much time in allocations when appending to the string. */ +void trimStringObjectIfNeeded(robj *o) { + if (o->encoding == OBJ_ENCODING_RAW && + sdsavail(ptrFromObj(o)) > sdslen(ptrFromObj(o))/10) + { + o->m_ptr = sdsRemoveFreeSpace(ptrFromObj(o)); + } +} + /* Try to encode a string object in order to save space */ robj *tryObjectEncoding(robj *o) { long value; @@ -486,11 +498,7 @@ robj *tryObjectEncoding(robj *o) { * We do that only for relatively large strings as this branch * is only entered if the length of the string is greater than * OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */ - if (o->encoding == OBJ_ENCODING_RAW && - sdsavail(s) > len/10) - { - o->m_ptr = sdsRemoveFreeSpace(ptrFromObj(o)); - } + trimStringObjectIfNeeded(o); /* Return the original object. */ return o; @@ -1197,7 +1205,7 @@ sds getMemoryDoctorReport(void) { /* Set the object LRU/LFU depending on server.maxmemory_policy. * The lfu_freq arg is only relevant if policy is MAXMEMORY_FLAG_LFU. - * The lru_idle and lru_clock args are only relevant if policy + * The lru_idle and lru_clock args are only relevant if policy * is MAXMEMORY_FLAG_LRU. * Either or both of them may be <0, in that case, nothing is set. */ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, @@ -1208,16 +1216,20 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, val->lru = (LFUGetTimeInMinutes()<<8) | lfu_freq; } } else if (lru_idle >= 0) { - /* Serialized LRU idle time is in seconds. Scale + /* Provided LRU idle time is in seconds. Scale * according to the LRU clock resolution this Redis * instance was compiled with (normally 1000 ms, so the * below statement will expand to lru_idle*1000/1000. */ lru_idle = lru_idle*1000/LRU_CLOCK_RESOLUTION; - val->lru = lru_clock - lru_idle; - /* If the lru field overflows (since LRU it is a wrapping - * clock), the best we can do is to provide the maximum - * representable idle time. */ - if (val->lru < 0) val->lru = lru_clock+1; + long lru_abs = lru_clock - lru_idle; /* Absolute access time. */ + /* If the LRU field underflows (since LRU it is a wrapping + * clock), the best we can do is to provide a large enough LRU + * that is half-way in the circlular LRU clock we use: this way + * the computed idle time for this object will stay high for quite + * some time. */ + if (lru_abs < 0) + lru_abs = (lru_clock+(LRU_CLOCK_MAX/2)) % LRU_CLOCK_MAX; + val->lru = lru_abs; } } diff --git a/src/rdb-s3.cpp b/src/rdb-s3.cpp index bd00bb2bd..f28bd07d5 100644 --- a/src/rdb-s3.cpp +++ b/src/rdb-s3.cpp @@ -48,4 +48,64 @@ extern "C" int rdbSaveS3(char *s3bucket, rdbSaveInfo *rsi) serverLog(LL_NOTICE,"DB saved on AWS S3"); return (status == EXIT_SUCCESS) ? C_OK : C_ERR; -} \ No newline at end of file +} + + +int rdbLoadS3Core(int fd, rdbSaveInfo *rsi) +{ + FILE *fp; + rio rdb; + int retval; + + if ((fp = fdopen(fd, "rb")) == NULL) return C_ERR; + startLoading(fp); + rioInitWithFile(&rdb,fileno(fp)); + retval = rdbLoadRio(&rdb,rsi,0); + fclose(fp); + stopLoading(); + return retval; +} + +int rdbLoadS3(char *s3bucket, rdbSaveInfo *rsi) +{ + int status = EXIT_FAILURE; + int fd[2]; + if (pipe(fd) != 0) + return C_ERR; + + pid_t pid = fork(); + if (pid < 0) + { + close(fd[0]); + close(fd[1]); + return C_ERR; + } + + if (pid == 0) + { + // child process + dup2(fd[1], STDOUT_FILENO); + close(fd[1]); + close(fd[0]); + execlp("aws", "aws", "s3", "cp", s3bucket, "-", nullptr); + exit(EXIT_FAILURE); + } + else + { + close(fd[1]); + if (rdbLoadS3Core(fd[0], rsi) != C_OK) + { + close(fd[0]); + return C_ERR; + } + close(fd[0]); + waitpid(pid, &status, 0); + } + + if (status != EXIT_SUCCESS) + serverLog(LL_WARNING, "Failed to load DB from AWS S3"); + else + serverLog(LL_NOTICE,"DB loaded from AWS S3"); + + return (status == EXIT_SUCCESS) ? C_OK : C_ERR; +} diff --git a/src/rdb.c b/src/rdb.c index 4a504e815..31db9d67f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -2097,6 +2097,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { return C_ERR; /* Just to avoid warning */ } +int rdbLoadFile(char *filename, rdbSaveInfo *rsi); +int rdbLoad(rdbSaveInfo *rsi) +{ + int err = C_ERR; + if (server.rdb_filename != NULL) + err = rdbLoadFile(server.rdb_filename, rsi); + + if ((err == C_ERR) && server.rdb_s3bucketpath != NULL) + err = rdbLoadS3(server.rdb_s3bucketpath, rsi); + + return err; +} + /* Like rdbLoadRio() but takes a filename instead of a rio stream. The * filename is open for reading and a rio stream object created in order * to do the actual loading. Moreover the ETA displayed in the INFO @@ -2104,7 +2117,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { * * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the * loading code will fiil the information fields in the structure. */ -int rdbLoad(char *filename, rdbSaveInfo *rsi) { +int rdbLoadFile(char *filename, rdbSaveInfo *rsi) { FILE *fp; rio rdb; int retval; diff --git a/src/rdb.h b/src/rdb.h index 2daa49984..fcd44e742 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -135,7 +135,7 @@ uint64_t rdbLoadLen(rio *rdb, int *isencoded); int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj *o); int rdbLoadObjectType(rio *rdb); -int rdbLoad(char *filename, rdbSaveInfo *rsi); +int rdbLoad(rdbSaveInfo *rsi); int rdbSaveBackground(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); @@ -143,6 +143,7 @@ int rdbSave(rdbSaveInfo *rsi); int rdbSaveFile(char *filename, rdbSaveInfo *rsi); int rdbSaveFd(int fd, rdbSaveInfo *rsi); int rdbSaveS3(char *path, rdbSaveInfo *rsi); +int rdbLoadS3(char *path, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj *o); size_t rdbSavedObjectLen(robj *o); robj *rdbLoadObject(int type, rio *rdb); diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index da9522482..6bb3533bf 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -248,11 +248,11 @@ static redisConfig *getRedisConfig(const char *ip, int port, c = redisConnect(ip, port); else c = redisConnectUnix(hostsocket); - if (c->err) { + if (c == NULL || c->err) { fprintf(stderr,"Could not connect to Redis at "); - if (hostsocket == NULL) - fprintf(stderr,"%s:%d: %s\n",ip,port,c->errstr); - else fprintf(stderr,"%s: %s\n",hostsocket,c->errstr); + char *err = (c != NULL ? c->errstr : ""); + if (hostsocket == NULL) fprintf(stderr,"%s:%d: %s\n",ip,port,err); + else fprintf(stderr,"%s: %s\n",hostsocket,err); goto fail; } redisAppendCommand(c, "CONFIG GET %s", "save"); @@ -277,18 +277,16 @@ static redisConfig *getRedisConfig(const char *ip, int port, case 1: cfg->appendonly = sdsnew(value); break; } } - if (reply) freeReplyObject(reply); - if (c) redisFree(c); + freeReplyObject(reply); + redisFree(c); return cfg; fail: - if (reply) freeReplyObject(reply); - if (c) redisFree(c); - zfree(cfg); fprintf(stderr, "ERROR: failed to fetch CONFIG from "); - if (c->connection_type == REDIS_CONN_TCP) - fprintf(stderr, "%s:%d\n", c->tcp.host, c->tcp.port); - else if (c->connection_type == REDIS_CONN_UNIX) - fprintf(stderr, "%s\n", c->unix_sock.path); + if (hostsocket == NULL) fprintf(stderr, "%s:%d\n", ip, port); + else fprintf(stderr, "%s\n", hostsocket); + freeReplyObject(reply); + redisFree(c); + zfree(cfg); return NULL; } static void freeRedisConfig(redisConfig *cfg) { @@ -346,7 +344,9 @@ static void randomizeClientKey(client c) { for (i = 0; i < c->randlen; i++) { char *p = c->randptr[i]+11; - size_t r = random() % config.randomkeys_keyspacelen; + size_t r = 0; + if (config.randomkeys_keyspacelen != 0) + r = random() % config.randomkeys_keyspacelen; size_t j; for (j = 0; j < 12; j++) { @@ -448,27 +448,27 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } - if (config.cluster_mode && is_err && c->cluster_node && - (!strncmp(r->str,"MOVED",5) || - !strcmp(r->str,"ASK"))) - { - /* Try to update slots configuration if the key of the - * command is using the slot hash tag. */ - if (c->staglen && !fetchClusterSlotsConfiguration(c)) - exit(1); - /* - clusterNode *node = c->cluster_node; - assert(node); - if (++node->current_slot_index >= node->slots_count) { - if (config.showerrors) { - fprintf(stderr, "WARN: No more available slots in " - "node %s:%d\n", node->ip, node->port); - } - freeReplyObject(reply); - freeClient(c); - return; + /* Try to update slots configuration if reply error is + * MOVED/ASK/CLUSTERDOWN and the key(s) used by the command + * contain(s) the slot hash tag. */ + if (is_err && c->cluster_node && c->staglen) { + int fetch_slots = 0, do_wait = 0; + if (!strncmp(r->str,"MOVED",5) || !strncmp(r->str,"ASK",3)) + fetch_slots = 1; + else if (!strncmp(r->str,"CLUSTERDOWN",11)) { + /* Usually the cluster is able to recover itself after + * a CLUSTERDOWN error, so try to sleep one second + * before requesting the new configuration. */ + fetch_slots = 1; + do_wait = 1; + printf("Error from server %s:%d: %s\n", + c->cluster_node->ip, + c->cluster_node->port, + r->str); } - */ + if (do_wait) sleep(1); + if (fetch_slots && !fetchClusterSlotsConfiguration(c)) + exit(1); } freeReplyObject(reply); @@ -818,22 +818,37 @@ static void showLatencyReport(void) { } } -static void benchmark(char *title, char *cmd, int len) { +static void initBenchmarkThreads() { int i; + if (config.threads) freeBenchmarkThreads(); + config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*), MALLOC_LOCAL); + for (i = 0; i < config.num_threads; i++) { + benchmarkThread *thread = createBenchmarkThread(i); + config.threads[i] = thread; + } +} + +static void startBenchmarkThreads() { + int i; + for (i = 0; i < config.num_threads; i++) { + benchmarkThread *t = config.threads[i]; + if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){ + fprintf(stderr, "FATAL: Failed to start thread %d.\n", i); + exit(1); + } + } + for (i = 0; i < config.num_threads; i++) + pthread_join(config.threads[i]->thread, NULL); +} + +static void benchmark(char *title, char *cmd, int len) { client c; config.title = title; config.requests_issued = 0; config.requests_finished = 0; - if (config.num_threads) { - if (config.threads) freeBenchmarkThreads(); - config.threads = zmalloc(config.num_threads * sizeof(benchmarkThread*), MALLOC_LOCAL); - for (i = 0; i < config.num_threads; i++) { - benchmarkThread *thread = createBenchmarkThread(i); - config.threads[i] = thread; - } - } + if (config.num_threads) initBenchmarkThreads(); int thread_id = config.num_threads > 0 ? 0 : -1; c = createClient(cmd,len,NULL,thread_id); @@ -841,17 +856,7 @@ static void benchmark(char *title, char *cmd, int len) { config.start = mstime(); if (!config.num_threads) aeMain(config.el); - else { - for (i = 0; i < config.num_threads; i++) { - benchmarkThread *t = config.threads[i]; - if (pthread_create(&(t->thread), NULL, execBenchmarkThread, t)){ - fprintf(stderr, "FATAL: Failed to start thread %d.\n", i); - exit(1); - } - } - for (i = 0; i < config.num_threads; i++) - pthread_join(config.threads[i]->thread, NULL); - } + else startBenchmarkThreads(); config.totlatency = mstime()-config.start; showLatencyReport(); @@ -1284,6 +1289,11 @@ int parseOptions(int argc, const char **argv) { if (config.pipeline <= 0) config.pipeline=1; } else if (!strcmp(argv[i],"-r")) { if (lastarg) goto invalid; + const char *next = argv[++i], *p = next; + if (*p == '-') { + p++; + if (*p < '0' || *p > '9') goto invalid; + } config.randomkeys = 1; config.randomkeys_keyspacelen = atoi(argv[++i]); if (config.randomkeys_keyspacelen < 0) @@ -1555,9 +1565,15 @@ int main(int argc, const char **argv) { if (config.idlemode) { printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); - c = createClient("",0,NULL,-1); /* will never receive a reply */ + int thread_id = -1, use_threads = (config.num_threads > 0); + if (use_threads) { + thread_id = 0; + initBenchmarkThreads(); + } + c = createClient("",0,NULL,thread_id); /* will never receive a reply */ createMissingClients(c); - aeMain(config.el); + if (use_threads) startBenchmarkThreads(); + else aeMain(config.el); /* and will wait for every */ } diff --git a/src/redis-check-aof.c b/src/redis-check-aof.c index a79adb946..4f8c3833a 100644 --- a/src/redis-check-aof.c +++ b/src/redis-check-aof.c @@ -33,8 +33,8 @@ #define ERROR(...) { \ char __buf[1024]; \ - sprintf(__buf, __VA_ARGS__); \ - sprintf(error, "0x%16llx: %s", (long long)epos, __buf); \ + snprintf(__buf, sizeof(__buf), __VA_ARGS__); \ + snprintf(error, sizeof(error), "0x%16llx: %s", (long long)epos, __buf); \ } static char error[1024]; diff --git a/src/redis-cli.c b/src/redis-cli.c index 972f682b6..25f8b775d 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -2269,7 +2269,7 @@ static clusterManagerNode *clusterManagerNewNode(char *ip, int port) { static sds clusterManagerGetNodeRDBFilename(clusterManagerNode *node) { assert(config.cluster_manager_command.backup_dir); sds filename = sdsnew(config.cluster_manager_command.backup_dir); - if (filename[sdslen(filename)] - 1 != '/') + if (filename[sdslen(filename) - 1] != '/') filename = sdscat(filename, "/"); filename = sdscatprintf(filename, "redis-node-%s-%d-%s.rdb", node->ip, node->port, node->name); @@ -2733,6 +2733,36 @@ static sds clusterManagerNodeGetJSON(clusterManagerNode *node, json = sdscatprintf(json, ",\n \"cluster_errors\": %lu", error_count); } + if (node->migrating_count > 0 && node->migrating != NULL) { + int i = 0; + sds migrating = sdsempty(); + for (; i < node->migrating_count; i += 2) { + sds slot = node->migrating[i]; + sds dest = node->migrating[i + 1]; + if (slot && dest) { + if (sdslen(migrating) > 0) migrating = sdscat(migrating, ","); + migrating = sdscatfmt(migrating, "\"%S\": \"%S\"", slot, dest); + } + } + if (sdslen(migrating) > 0) + json = sdscatfmt(json, ",\n \"migrating\": {%S}", migrating); + sdsfree(migrating); + } + if (node->importing_count > 0 && node->importing != NULL) { + int i = 0; + sds importing = sdsempty(); + for (; i < node->importing_count; i += 2) { + sds slot = node->importing[i]; + sds from = node->importing[i + 1]; + if (slot && from) { + if (sdslen(importing) > 0) importing = sdscat(importing, ","); + importing = sdscatfmt(importing, "\"%S\": \"%S\"", slot, from); + } + } + if (sdslen(importing) > 0) + json = sdscatfmt(json, ",\n \"importing\": {%S}", importing); + sdsfree(importing); + } json = sdscat(json, "\n }"); sdsfree(replicate); sdsfree(slots); @@ -2842,7 +2872,7 @@ static void clusterManagerShowClusterInfo(void) { replicas++; } redisReply *reply = CLUSTER_MANAGER_COMMAND(node, "DBSIZE"); - if (reply != NULL || reply->type == REDIS_REPLY_INTEGER) + if (reply != NULL && reply->type == REDIS_REPLY_INTEGER) dbsize = reply->integer; if (dbsize < 0) { char *err = ""; diff --git a/src/redismodule.h b/src/redismodule.h index d18c38881..272da08df 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -85,6 +85,9 @@ #define REDISMODULE_CTX_FLAGS_OOM (1<<10) /* Less than 25% of memory available according to maxmemory. */ #define REDISMODULE_CTX_FLAGS_OOM_WARNING (1<<11) +/* The command was sent over the replication link. */ +#define REDISMODULE_CTX_FLAGS_REPLICATED (1<<12) + #define REDISMODULE_NOTIFY_GENERIC (1<<2) /* g */ #define REDISMODULE_NOTIFY_STRING (1<<3) /* $ */ @@ -332,6 +335,8 @@ void REDISMODULE_API_FUNC(RedisModule_GetRandomBytes)(unsigned char *dst, size_t void REDISMODULE_API_FUNC(RedisModule_GetRandomHexChars)(char *dst, size_t len); void REDISMODULE_API_FUNC(RedisModule_SetDisconnectCallback)(RedisModuleBlockedClient *bc, RedisModuleDisconnectFunc callback); void REDISMODULE_API_FUNC(RedisModule_SetClusterFlags)(RedisModuleCtx *ctx, uint64_t flags); +int REDISMODULE_API_FUNC(RedisModule_ExportSharedAPI)(RedisModuleCtx *ctx, const char *apiname, void *func); +void *REDISMODULE_API_FUNC(RedisModule_GetSharedAPI)(RedisModuleCtx *ctx, const char *apiname); #endif /* This is included inline inside each Redis module. */ @@ -492,6 +497,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(GetRandomBytes); REDISMODULE_GET_API(GetRandomHexChars); REDISMODULE_GET_API(SetClusterFlags); + REDISMODULE_GET_API(ExportSharedAPI); + REDISMODULE_GET_API(GetSharedAPI); #endif if (RedisModule_IsModuleNameBusy && RedisModule_IsModuleNameBusy(name)) return REDISMODULE_ERR; diff --git a/src/replication.cpp b/src/replication.cpp index 899fa3dcf..b124a6ae3 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1334,7 +1334,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { aeDeleteFileEvent(el,server.repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { + if (rdbLoad(&rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(); /* Re-enable the AOF if we disabled it earlier, in order to restore @@ -2102,14 +2102,26 @@ void replicaofCommand(client *c) { } else { long port; + if (c->flags & CLIENT_SLAVE) + { + /* If a client is already a replica they cannot run this command, + * because it involves flushing all replicas (including this + * client) */ + addReplyError(c, "Command is not valid when client is a replica."); + return; + } + if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != C_OK)) return; /* Check if we are already attached to the specified slave */ if (server.masterhost && !strcasecmp(server.masterhost,(const char*)ptrFromObj(c->argv[1])) && server.masterport == port) { - serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed."); - addReplySdsAsync(c,sdsnew("+OK Already connected to specified master\r\n")); + serverLog(LL_NOTICE,"REPLICAOF would result into synchronization " + "with the master we are already connected " + "with. No operation performed."); + addReplySds(c,sdsnew("+OK Already connected to specified " + "master\r\n")); return; } /* There was no previous master or the user specified a different one, diff --git a/src/sds.c b/src/sds.c index dac1f281f..9c0da50cb 100644 --- a/src/sds.c +++ b/src/sds.c @@ -259,8 +259,12 @@ sds sdsRemoveFreeSpace(sds s) { char type, oldtype = s[-1] & SDS_TYPE_MASK; int hdrlen, oldhdrlen = sdsHdrSize(oldtype); size_t len = sdslen(s); + size_t avail = sdsavail(s); sh = (char*)s-oldhdrlen; + /* Return ASAP if there is no space left. */ + if (avail == 0) return s; + /* Check what would be the minimum SDS header that is just good enough to * fit this string. */ type = sdsReqType(len); diff --git a/src/server.cpp b/src/server.cpp index 9fd917866..9d7127b5d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -483,11 +483,11 @@ struct redisCommand redisCommandTable[] = { "write fast @sortedset", 0,NULL,1,1,1,0,0,0}, - {"bzpopmin",bzpopminCommand,-2, + {"bzpopmin",bzpopminCommand,-3, "write no-script fast @sortedset @blocking", 0,NULL,1,-2,1,0,0,0}, - {"bzpopmax",bzpopmaxCommand,-2, + {"bzpopmax",bzpopmaxCommand,-3, "write no-script fast @sortedset @blocking", 0,NULL,1,-2,1,0,0,0}, @@ -4668,9 +4668,9 @@ void loadDataFromDisk(void) { if (server.aof_state == AOF_ON) { if (loadAppendOnlyFile(server.aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); - } else if (server.rdb_filename != NULL) { + } else if (server.rdb_filename != NULL || server.rdb_s3bucketpath != NULL) { rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (rdbLoad(server.rdb_filename,&rsi) == C_OK) { + if (rdbLoad(&rsi) == C_OK) { serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds", (float)(ustime()-start)/1000000); diff --git a/src/server.h b/src/server.h index 53255584e..3fa5f90a3 100644 --- a/src/server.h +++ b/src/server.h @@ -1087,7 +1087,9 @@ struct redisServer { size_t initial_memory_usage; /* Bytes used after initialization. */ int always_show_logo; /* Show logo even for non-stdout logging. */ /* Modules */ - dict *moduleapi; /* Exported APIs dictionary for modules. */ + dict *moduleapi; /* Exported core APIs dictionary for modules. */ + dict *sharedapi; /* Like moduleapi but containing the APIs that + modules share with each other. */ list *loadmodule_queue; /* List of modules to load at startup. */ int module_blocked_pipe[2]; /* Pipe used to awake the event loop if a client blocked on a module command needs @@ -1740,6 +1742,7 @@ int compareStringObjects(robj *a, robj *b); int collateStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b); unsigned long long estimateObjectIdleTime(robj *o); +void trimStringObjectIfNeeded(robj *o); #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) /* Synchronous I/O with timeout */ @@ -1967,7 +1970,6 @@ void setTypeConvert(robj *subject, int enc); void hashTypeConvert(robj *o, int enc); void hashTypeTryConversion(robj *subject, robj **argv, int start, int end); -void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2); int hashTypeExists(robj *o, sds key); int hashTypeDelete(robj *o, sds key); unsigned long hashTypeLength(const robj *o); @@ -2328,11 +2330,13 @@ void lolwutCommand(client *c); void aclCommand(client *c); #if defined(__GNUC__) +#ifndef __cplusplus void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); void free(void *ptr) __attribute__ ((deprecated)); void *malloc(size_t size) __attribute__ ((deprecated)); void *realloc(void *ptr, size_t size) __attribute__ ((deprecated)); #endif +#endif /* Debugging stuff */ void _serverAssertWithInfo(const client *c, const robj *o, const char *estr, const char *file, int line); diff --git a/src/t_hash.c b/src/t_hash.c index f5d31d966..8c418314b 100644 --- a/src/t_hash.c +++ b/src/t_hash.c @@ -615,6 +615,10 @@ void hincrbyfloatCommand(client *c) { } value += incr; + if (isnan(value) || isinf(value)) { + addReplyError(c,"increment would produce NaN or Infinity"); + return; + } char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),value,1); diff --git a/src/t_list.c b/src/t_list.c index c7350887f..3c14f44e3 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -520,7 +520,7 @@ void lremCommand(client *c) { if (removed) { signalModifiedKey(c->db,c->argv[1]); - notifyKeyspaceEvent(NOTIFY_GENERIC,"lrem",c->argv[1],c->db->id); + notifyKeyspaceEvent(NOTIFY_LIST,"lrem",c->argv[1],c->db->id); } if (listTypeLength(subject) == 0) { diff --git a/src/t_set.c b/src/t_set.c index 99ff6fb47..1652b5416 100644 --- a/src/t_set.c +++ b/src/t_set.c @@ -1064,7 +1064,8 @@ void sunionDiffGenericCommand(client *c, robj **setkeys, int setnum, sdsfree(ele); } setTypeReleaseIterator(si); - decrRefCount(dstset); + server.lazyfree_lazy_server_del ? freeObjAsync(dstset) : + decrRefCount(dstset); } else { /* If we have a target key where to store the resulting set * create this key with the result set inside */ diff --git a/src/t_stream.c b/src/t_stream.c index ebf1dfb9f..3570332d7 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1742,14 +1742,17 @@ NULL /* Everything but the "HELP" option requires a key and group name. */ if (c->argc >= 4) { o = lookupKeyWrite(c->db,c->argv[2]); - if (o) s = ptrFromObj(o); + if (o) { + if (checkType(c,o,OBJ_STREAM)) return; + s = (stream*)ptrFromObj(o); + } grpname = ptrFromObj(c->argv[3]); } /* Check for missing key/group. */ if (c->argc >= 4 && !mkstream) { /* At this point key must exist, or there is an error. */ - if (o == NULL) { + if (s == NULL) { addReplyError(c, "The XGROUP subcommand requires the key to exist. " "Note that for CREATE you may want to use the MKSTREAM " @@ -1757,8 +1760,6 @@ NULL return; } - if (checkType(c,o,OBJ_STREAM)) return; - /* Certain subcommands require the group to exist. */ if ((cg = streamLookupCG(s,grpname)) == NULL && (!strcasecmp(opt,"SETID") || @@ -1786,7 +1787,8 @@ NULL } /* Handle the MKSTREAM option now that the command can no longer fail. */ - if (s == NULL && mkstream) { + if (s == NULL) { + serverAssert(mkstream); o = createStreamObject(); dbAdd(c->db,c->argv[2],o); s = ptrFromObj(o); @@ -2284,8 +2286,13 @@ void xclaimCommand(client *c) { /* Update the consumer and idle time. */ nack->consumer = consumer; nack->delivery_time = deliverytime; - /* Set the delivery attempts counter if given. */ - if (retrycount >= 0) nack->delivery_count = retrycount; + /* Set the delivery attempts counter if given, otherwise + * autoincrement unless JUSTID option provided */ + if (retrycount >= 0) { + nack->delivery_count = retrycount; + } else if (!justid) { + nack->delivery_count++; + } /* Add the entry in the new consumer local PEL. */ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Send the reply for this entry. */ diff --git a/src/t_zset.c b/src/t_zset.c index 7fbcf3bbf..17be22433 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -2906,7 +2906,10 @@ void genericZrangebylexCommand(client *c, int reverse) { while (remaining) { if (remaining >= 3 && !strcasecmp(ptrFromObj(c->argv[pos]),"limit")) { if ((getLongFromObjectOrReply(c, c->argv[pos+1], &offset, NULL) != C_OK) || - (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) return; + (getLongFromObjectOrReply(c, c->argv[pos+2], &limit, NULL) != C_OK)) { + zslFreeLexRange(&range); + return; + } pos += 3; remaining -= 3; } else { zslFreeLexRange(&range); @@ -3140,7 +3143,10 @@ void genericZpopCommand(client *c, robj **keyv, int keyc, int where, int emitkey if (countarg) { if (getLongFromObjectOrReply(c,countarg,&count,NULL) != C_OK) return; - if (count < 0) count = 1; + if (count <= 0) { + addReply(c,shared.emptyarray); + return; + } } /* Check type and break on the first error, otherwise identify candidate. */ diff --git a/src/util.c b/src/util.c index 66d599190..783bcf83b 100644 --- a/src/util.c +++ b/src/util.c @@ -447,7 +447,7 @@ int string2l(const char *s, size_t slen, long *lval) { * a double: no spaces or other characters before or after the string * representing the number are accepted. */ int string2ld(const char *s, size_t slen, long double *dp) { - char buf[256]; + char buf[MAX_LONG_DOUBLE_CHARS]; long double value; char *eptr; diff --git a/tests/unit/hyperloglog.tcl b/tests/unit/hyperloglog.tcl index 7d36b7a35..712fcc641 100644 --- a/tests/unit/hyperloglog.tcl +++ b/tests/unit/hyperloglog.tcl @@ -115,6 +115,34 @@ start_server {tags {"hll"}} { set e } {*WRONGTYPE*} + test {Fuzzing dense/sparse encoding: Redis should always detect errors} { + for {set j 0} {$j < 1000} {incr j} { + r del hll + set items {} + set numitems [randomInt 3000] + for {set i 0} {$i < $numitems} {incr i} { + lappend items [expr {rand()}] + } + r pfadd hll {*}$items + + # Corrupt it in some random way. + for {set i 0} {$i < 5} {incr i} { + set len [r strlen hll] + set pos [randomInt $len] + set byte [randstring 1 1 binary] + r setrange hll $pos $byte + # Don't modify more bytes 50% of times + if {rand() < 0.5} break + } + + # Use the hyperloglog to check if it crashes + # Redis in some way. + catch { + r pfcount hll + } + } + } + test {PFADD, PFCOUNT, PFMERGE type checking works} { r set foo bar catch {r pfadd foo 1} e diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 1557082a2..676896a75 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -436,8 +436,11 @@ start_server { test "$pop: with non-integer timeout" { set rd [redis_deferring_client] - $rd $pop blist1 1.1 - assert_error "ERR*not an integer*" {$rd read} + r del blist1 + $rd $pop blist1 0.1 + r rpush blist1 foo + assert_equal {blist1 foo} [$rd read] + assert_equal 0 [r exists blist1] } test "$pop: with zero timeout should block indefinitely" { diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 13981cc22..34d4061c2 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -195,6 +195,49 @@ start_server { assert_equal "" [lindex $reply 0] } + test {XCLAIM without JUSTID increments delivery count} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Client 1 reads item 1 from the stream without acknowledgements. + # Client 2 then claims pending item 1 from the PEL of client 1 + set reply [ + r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + ] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client2 10 $id1 + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1] eq {a 1}} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} + + # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client3 10 $id1 JUSTID + ] + assert {[llength $reply] == 1} + assert {[lindex $reply 0] eq $id1} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host] diff --git a/utils/install_server.sh b/utils/install_server.sh index 9934bca92..75be74ceb 100755 --- a/utils/install_server.sh +++ b/utils/install_server.sh @@ -43,6 +43,9 @@ # # /!\ This script should be run as root # +# NOTE: This script will not work on Mac OSX. +# It supports Debian and Ubuntu Linux. +# ################################################################################ die () {