diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cd8196756..9ca185e89 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,6 +10,7 @@ jobs: - uses: actions/checkout@v1 - name: make run: | + sudo apt-get update sudo apt-get -y install uuid-dev libcurl4-openssl-dev make BUILD_TLS=yes -j2 - name: gen-cert @@ -34,6 +35,7 @@ jobs: - uses: actions/checkout@v1 - name: make -j2 run: | + sudo apt-get update sudo apt-get -y install uuid-dev libcurl4-openssl-dev make -j2 @@ -50,6 +52,7 @@ jobs: - uses: actions/checkout@v1 - name: make run: | + sudo apt-get update sudo apt-get -y install uuid-dev libcurl4-openssl-dev make MALLOC=libc -j2 diff --git a/.github/workflows/endurance.yml b/.github/workflows/endurance.yml index ba2d3a2d4..de55bf8d0 100644 --- a/.github/workflows/endurance.yml +++ b/.github/workflows/endurance.yml @@ -17,6 +17,6 @@ jobs: make -j8 - name: test-multithread (5X) run: | - sudo apt-get install -y tcl8.5 + sudo apt-get install -y tcl tcl-tls ./runtest --loopn 5 --config server-threads 3 --clients 5 --endurance diff --git a/README.md b/README.md index 88f5f0f7f..fe0030dff 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ ![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg) ![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable) -[![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc) ##### New! Want to extend KeyDB with Javascript? Try [ModJS](https://github.com/JohnSully/ModJS) diff --git a/TLS.md b/TLS.md index ae1a066db..1afa2e8fa 100644 --- a/TLS.md +++ b/TLS.md @@ -56,8 +56,6 @@ Note that unlike Redis, KeyDB fully supports multithreading of TLS connections. 
To-Do List ---------- -- [ ] Add session caching support. Check if/how it's handled by clients to - assess how useful/important it is. - [ ] redis-benchmark support. The current implementation is a mix of using hiredis for parsing and basic networking (establishing connections), but directly manipulating sockets for most actions. This will need to be cleaned diff --git a/keydb.conf b/keydb.conf index 8979ee47c..e8ca634b7 100644 --- a/keydb.conf +++ b/keydb.conf @@ -199,6 +199,22 @@ tcp-keepalive 300 # # tls-prefer-server-ciphers yes +# By default, TLS session caching is enabled to allow faster and less expensive +# reconnections by clients that support it. Use the following directive to disable +# caching. +# +# tls-session-caching no + +# Change the default number of TLS sessions cached. A zero value sets the cache +# to unlimited size. The default size is 20480. +# +# tls-session-cache-size 5000 + +# Change the default timeout of cached TLS sessions. The default timeout is 300 +# seconds. +# +# tls-session-cache-timeout 60 + ################################# GENERAL ##################################### # By default KeyDB does not run as a daemon. Use 'yes' if you need it. @@ -401,6 +417,20 @@ dir ./ # replica-serve-stale-data yes +# Active Replicas will allow read only data access while loading remote RDBs +# provided they are permitted to serve stale data. As an option you may also +# permit them to accept write commands. This is an EXPERIMENTAL feature and +# may result in commands not being fully synchronized +# +# allow-write-during-load no + +# You can modify the number of masters necessary to form a replica quorum when +# multi-master is enabled and replica-serve-stale-data is "no". By default +# this is set to -1 which implies the number of known masters (e.g. those +# you added with replicaof) +# +# replica-quorum -1 + # You can configure a replica instance to accept writes or not. 
Writing against # a replica instance may be useful to store some ephemeral data (because data # written on a replica will be easily deleted after resync with the master) but diff --git a/src/ae.cpp b/src/ae.cpp index 44f302e69..789a6888b 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -258,9 +258,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, if (fSynchronous) { + { std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; + } delete cmd.pctl; } @@ -300,7 +302,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch cmd.fLock = fLock; if (fSynchronous) { - cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); + cmd.pctl = new (MALLOC_LOCAL) aeCommandControl; cmd.pctl->mutexcv.lock(); } @@ -311,9 +313,11 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch int ret = AE_OK; if (fSynchronous) { + { std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; + } delete cmd.pctl; } return ret; diff --git a/src/aelocker.h b/src/aelocker.h index ef757d2d2..e854f907b 100644 --- a/src/aelocker.h +++ b/src/aelocker.h @@ -9,11 +9,14 @@ class AeLocker { } - void arm(client *c) // if a client is passed, then the client is already locked + void arm(client *c, bool fIfNeeded = false) // if a client is passed, then the client is already locked { if (m_fArmed) return; + if (fIfNeeded && aeThreadOwnsLock()) + return; + serverAssertDebug(!GlobalLocksAcquired()); if (c != nullptr) diff --git a/src/aof.cpp b/src/aof.cpp index e8d4930b8..e37bc67d6 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1426,7 +1426,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { sds keystr; - robj key, *o; + redisObjectStack key; + robj *o = nullptr; keystr = (sds)dictGetKey(de); o = (robj*)dictGetVal(de); diff --git a/src/config.cpp b/src/config.cpp index 75360c4b7..fe1d5cd41 
100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2180,7 +2180,7 @@ static int updateTlsCfg(char *val, char *prev, const char **err) { UNUSED(prev); UNUSED(err); if (tlsConfigure(&g_pserver->tls_ctx_config) == C_ERR) { - *err = "Unable to configure tls-cert-file. Check server logs."; + *err = "Unable to update TLS configuration. Check server logs."; return 0; } return 1; @@ -2190,6 +2190,12 @@ static int updateTlsCfgBool(int val, int prev, const char **err) { UNUSED(prev); return updateTlsCfg(NULL, NULL, err); } + +static int updateTlsCfgInt(long long val, long long prev, const char **err) { + UNUSED(val); + UNUSED(prev); + return updateTlsCfg(NULL, NULL, err); +} #endif /* USE_OPENSSL */ int fDummy = false; @@ -2230,6 +2236,8 @@ standardConfig configs[] = { createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG, g_pserver->aof_enabled, 0, NULL, updateAppendonly), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("multi-master-no-forward", NULL, MODIFIABLE_CONFIG, cserver.multimaster_no_forward, 0, validateMultiMasterNoForward, NULL), + createBoolConfig("allow-write-during-load", NULL, MODIFIABLE_CONFIG, g_pserver->fWriteDuringActiveLoad, 0, NULL, NULL), + createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, fDummy, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), @@ -2287,6 +2295,7 @@ standardConfig configs[] = { createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, 
cserver.thread_min_client_threshold, 50, INTEGER_CONFIG, NULL, NULL), + createIntConfig("replica-quorum", NULL, MODIFIABLE_CONFIG, -1, INT_MAX, g_pserver->repl_quorum, -1, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), @@ -2324,10 +2333,13 @@ standardConfig configs[] = { #ifdef USE_OPENSSL createIntConfig("tls-port", NULL, IMMUTABLE_CONFIG, 0, 65535, g_pserver->tls_port, 0, INTEGER_CONFIG, NULL, NULL), /* TCP port. */ + createIntConfig("tls-session-cache-size", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->tls_ctx_config.session_cache_size, 20*1024, INTEGER_CONFIG, NULL, updateTlsCfgInt), + createIntConfig("tls-session-cache-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->tls_ctx_config.session_cache_timeout, 300, INTEGER_CONFIG, NULL, updateTlsCfgInt), createBoolConfig("tls-cluster", NULL, MODIFIABLE_CONFIG, g_pserver->tls_cluster, 0, NULL, NULL), createBoolConfig("tls-replication", NULL, MODIFIABLE_CONFIG, g_pserver->tls_replication, 0, NULL, NULL), createBoolConfig("tls-auth-clients", NULL, MODIFIABLE_CONFIG, g_pserver->tls_auth_clients, 1, NULL, NULL), createBoolConfig("tls-prefer-server-ciphers", NULL, MODIFIABLE_CONFIG, g_pserver->tls_ctx_config.prefer_server_ciphers, 0, NULL, updateTlsCfgBool), + createBoolConfig("tls-session-caching", NULL, MODIFIABLE_CONFIG, g_pserver->tls_ctx_config.session_caching, 1, NULL, updateTlsCfgBool), createStringConfig("tls-cert-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.cert_file, NULL, NULL, updateTlsCfg), createStringConfig("tls-key-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.key_file, NULL, NULL, updateTlsCfg), createStringConfig("tls-dh-params-file", NULL, MODIFIABLE_CONFIG, EMPTY_STRING_IS_NULL, g_pserver->tls_ctx_config.dh_params_file, NULL, NULL, updateTlsCfg), diff --git a/src/db.cpp 
b/src/db.cpp index 11816262b..0beee32b3 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -93,7 +93,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) { updateDbValAccess(de, flags); if (flags & LOOKUP_UPDATEMVCC) { - val->mvcc_tstamp = getMvccTstamp(); + setMvccTstamp(val, getMvccTstamp()); } return val; } else { @@ -206,7 +206,9 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); int retval = dictAdd(db->pdict, copy, val); - val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); + uint64_t mvcc = getMvccTstamp(); + setMvccTstamp(key, mvcc); + setMvccTstamp(val, mvcc); if (retval == DICT_OK) { @@ -256,7 +258,7 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd if (fUpdateMvcc) { if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) val = dupStringObject(val); - val->mvcc_tstamp = getMvccTstamp(); + setMvccTstamp(val, getMvccTstamp()); } dictSetVal(db->pdict, de, val); @@ -291,12 +293,12 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) return (dbAddCore(db, key, val) == DICT_OK); robj *old = (robj*)dictGetVal(de); - if (old->mvcc_tstamp <= val->mvcc_tstamp) + if (mvccFromObj(old) <= mvccFromObj(val)) { dbOverwriteCore(db, de, key, val, false, true); return true; } - + return false; } else @@ -1393,7 +1395,7 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) db->setexpire->insert(e); } - int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; + int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) rememberSlaveKeyWithExpire(db,key); } @@ -1430,7 +1432,7 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) ((robj*)dictGetVal(kde))->SetFExpires(true); - int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 
0; + int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0 && !g_pserver->fActiveReplica; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) rememberSlaveKeyWithExpire(db,key); } @@ -1486,7 +1488,6 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey) { robj *argv[3]; - robj objT; redisCommand *cmd = nullptr; switch (type) { diff --git a/src/defrag.cpp b/src/defrag.cpp index 3547cc4da..7463a818e 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -55,7 +55,8 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); * returns NULL in case the allocatoin wasn't moved. * when it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ -void* activeDefragAlloc(void *ptr) { +template +TPTR* activeDefragAlloc(TPTR *ptr) { size_t size; void *newptr; if(!je_get_defrag_hint(ptr)) { @@ -70,7 +71,14 @@ void* activeDefragAlloc(void *ptr) { newptr = zmalloc_no_tcache(size); memcpy(newptr, ptr, size); zfree_no_tcache(ptr); - return newptr; + return (TPTR*)newptr; +} + +template<> +robj* activeDefragAlloc(robj *o) { + void *pvSrc = allocPtrFromObj(o); + void *pvDst = activeDefragAlloc(pvSrc); + return objFromAllocPtr(pvDst); } /*Defrag helper for sds strings diff --git a/src/expire.cpp b/src/expire.cpp index b7f648117..9da8ddc40 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -80,7 +80,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { robj *val = (robj*)dictGetVal(de); int deleted = 0; - robj objKey; + redisObjectStack objKey; initStaticStringObject(objKey, (char*)e.key()); bool fTtlChanged = false; @@ -145,7 +145,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { serverAssert(false); } - robj objSubkey; + redisObjectStack objSubkey; initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get()); propagateSubkeyExpire(db, 
val->type, &objKey, &objSubkey); @@ -745,3 +745,90 @@ void touchCommand(client *c) { addReplyLongLong(c,touched); } +expireEntryFat::~expireEntryFat() +{ + if (m_dictIndex != nullptr) + dictRelease(m_dictIndex); +} + +void expireEntryFat::createIndex() +{ + serverAssert(m_dictIndex == nullptr); + m_dictIndex = dictCreate(&keyptrDictType, nullptr); + + for (auto &entry : m_vecexpireEntries) + { + if (entry.spsubkey != nullptr) + { + dictEntry *de = dictAddRaw(m_dictIndex, (void*)entry.spsubkey.get(), nullptr); + de->v.s64 = entry.when; + } + } +} + +void expireEntryFat::expireSubKey(const char *szSubkey, long long when) +{ + if (m_vecexpireEntries.size() >= INDEX_THRESHOLD && m_dictIndex == nullptr) + createIndex(); + + // First check if the subkey already has an expiration + if (m_dictIndex != nullptr && szSubkey != nullptr) + { + dictEntry *de = dictFind(m_dictIndex, szSubkey); + if (de != nullptr) + { + auto itr = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), de->v.s64); /* probe with the same signed field that was stored and is compared below */ + while (itr != m_vecexpireEntries.end() && itr->when == de->v.s64) + { + bool fFound = false; + if (szSubkey == nullptr && itr->spsubkey == nullptr) { + fFound = true; + } else if (szSubkey != nullptr && itr->spsubkey != nullptr && sdscmp((sds)itr->spsubkey.get(), (sds)szSubkey) == 0) { + fFound = true; + } + if (fFound) { + dictDelete(m_dictIndex, szSubkey); /* unindex first: the index key is the string owned by *itr, which erase() frees (same order as popfrontExpireEntry) */ + m_vecexpireEntries.erase(itr); + break; + } + ++itr; + } + } + } + else + { + for (auto &entry : m_vecexpireEntries) + { + if (szSubkey != nullptr) + { + // if this is a subkey expiry then its not a match if the expireEntry is either for the + // primary key or a different subkey + if (entry.spsubkey == nullptr || sdscmp((sds)entry.spsubkey.get(), (sds)szSubkey) != 0) + continue; + } + else + { + if (entry.spsubkey != nullptr) + continue; + } + m_vecexpireEntries.erase(m_vecexpireEntries.begin() + (&entry - m_vecexpireEntries.data())); + break; + } + } + auto itrInsert =
std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when); + const char *subkey = (szSubkey) ? sdsdup(szSubkey) : nullptr; + auto itr = m_vecexpireEntries.emplace(itrInsert, when, subkey); + if (m_dictIndex && subkey) { + dictEntry *de = dictAddRaw(m_dictIndex, (void*)itr->spsubkey.get(), nullptr); + de->v.s64 = when; + } +} + +void expireEntryFat::popfrontExpireEntry() +{ + if (m_dictIndex != nullptr && m_vecexpireEntries.begin()->spsubkey) { + int res = dictDelete(m_dictIndex, (void*)m_vecexpireEntries.begin()->spsubkey.get()); + serverAssert(res == DICT_OK); + } + m_vecexpireEntries.erase(m_vecexpireEntries.begin()); +} \ No newline at end of file diff --git a/src/expire.h b/src/expire.h new file mode 100644 index 000000000..d002d6383 --- /dev/null +++ b/src/expire.h @@ -0,0 +1,220 @@ +#pragma once + +class expireEntryFat +{ + friend class expireEntry; + static const int INDEX_THRESHOLD = 16; +public: + struct subexpireEntry + { + long long when; + std::unique_ptr spsubkey; + + subexpireEntry(long long when, const char *subkey) + : when(when), spsubkey(subkey, sdsfree) + {} + + bool operator<(long long when) const noexcept { return this->when < when; } + bool operator<(const subexpireEntry &se) { return this->when < se.when; } + }; + +private: + sds m_keyPrimary; + std::vector m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key + dict *m_dictIndex = nullptr; + + void createIndex(); +public: + expireEntryFat(sds keyPrimary) + : m_keyPrimary(keyPrimary) + {} + ~expireEntryFat(); + + long long when() const noexcept { return m_vecexpireEntries.front().when; } + const char *key() const noexcept { return m_keyPrimary; } + + bool operator<(long long when) const noexcept { return this->when() < when; } + + void expireSubKey(const char *szSubkey, long long when); + + bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } + const subexpireEntry &nextExpireEntry() const noexcept { return 
m_vecexpireEntries.front(); } + void popfrontExpireEntry(); + const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; } + size_t size() const noexcept { return m_vecexpireEntries.size(); } +}; + +class expireEntry { + union + { + sds m_key; + expireEntryFat *m_pfatentry; + } u; + long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer + +public: + class iter + { + friend class expireEntry; + expireEntry *m_pentry = nullptr; + size_t m_idx = 0; + + public: + iter(expireEntry *pentry, size_t idx) + : m_pentry(pentry), m_idx(idx) + {} + + iter &operator++() { ++m_idx; return *this; } + + const char *subkey() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].spsubkey.get(); + return nullptr; + } + long long when() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].when; + return m_pentry->when(); + } + + bool operator!=(const iter &other) + { + return m_idx != other.m_idx; + } + + const iter &operator*() const { return *this; } + }; + + expireEntry(sds key, const char *subkey, long long when) + { + if (subkey != nullptr) + { + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key); + u.m_pfatentry->expireSubKey(subkey, when); + } + else + { + u.m_key = key; + m_when = when; + } + } + + expireEntry(expireEntryFat *pfatentry) + { + u.m_pfatentry = pfatentry; + m_when = LLONG_MIN; + } + + expireEntry(expireEntry &&e) + { + u.m_key = e.u.m_key; + m_when = e.m_when; + e.u.m_key = (char*)key(); // we do this so it can still be found in the set + e.m_when = 0; + } + + ~expireEntry() + { + if (FFat()) + delete u.m_pfatentry; + } + + void setKeyUnsafe(sds key) + { + if (FFat()) + u.m_pfatentry->m_keyPrimary = key; + else + u.m_key = key; + } + + inline bool FFat() const noexcept { return m_when == LLONG_MIN; } + expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } + + + bool operator==(const char *key) const noexcept + { + return 
this->key() == key; + } + + bool operator<(const expireEntry &e) const noexcept + { + return when() < e.when(); + } + bool operator<(long long when) const noexcept + { + return this->when() < when; + } + + const char *key() const noexcept + { + if (FFat()) + return u.m_pfatentry->key(); + return u.m_key; + } + long long when() const noexcept + { + if (FFat()) + return u.m_pfatentry->when(); + return m_when; + } + + void update(const char *subkey, long long when) + { + if (!FFat()) + { + if (subkey == nullptr) + { + m_when = when; + return; + } + else + { + // we have to upgrade to a fat entry + long long whenT = m_when; + sds keyPrimary = u.m_key; + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); + u.m_pfatentry->expireSubKey(nullptr, whenT); + // at this point we're fat so fall through + } + } + u.m_pfatentry->expireSubKey(subkey, when); + } + + iter begin() { return iter(this, 0); } + iter end() + { + if (FFat()) + return iter(this, u.m_pfatentry->size()); + return iter(this, 1); + } + + void erase(iter &itr) + { + if (!FFat()) + throw -1; // assert + pfatentry()->m_vecexpireEntries.erase( + pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); + } + + bool FGetPrimaryExpire(long long *pwhen) + { + *pwhen = -1; + for (auto itr : *this) + { + if (itr.subkey() == nullptr) + { + *pwhen = itr.when(); + return true; + } + } + return false; + } + + explicit operator const char*() const noexcept { return key(); } + explicit operator long long() const noexcept { return when(); } +}; +typedef semiorderedset expireset; \ No newline at end of file diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 60eb653bf..4cada72cc 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -35,7 +35,11 @@ #include #include #include +#ifdef __FreeBSD__ +#include +#else #include +#endif #include #include #ifdef __linux__ @@ -167,7 +171,12 @@ extern "C" pid_t gettid() #else if (pidCache == -1) { uint64_t tidT; +#ifdef __FreeBSD__ +// Check 
https://github.com/ClickHouse/ClickHouse/commit/8d51824ddcb604b6f179a0216f0d32ba5612bd2e + tidT = pthread_getthreadid_np(); +#else pthread_threadid_np(nullptr, &tidT); +#endif serverAssert(tidT < UINT_MAX); pidCache = (int)tidT; } @@ -343,7 +352,9 @@ extern "C" void fastlock_lock(struct fastlock *lock) unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE); unsigned cloops = 0; ticket ticketT; - unsigned loopLimit = g_fHighCpuPressure ? 0x10000 : 0x100000; + int fHighPressure; + __atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED); + unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000; for (;;) { @@ -478,19 +489,22 @@ void fastlock_auto_adjust_waits() { #ifdef __linux__ struct sysinfo sysinf; - auto fHighPressurePrev = g_fHighCpuPressure; + int fHighPressurePrev, fHighPressureNew; + __atomic_load(&g_fHighCpuPressure, &fHighPressurePrev, __ATOMIC_RELAXED); + fHighPressureNew = fHighPressurePrev; memset(&sysinf, 0, sizeof sysinf); if (!sysinfo(&sysinf)) { auto avgCoreLoad = sysinf.loads[0] / get_nprocs(); - g_fHighCpuPressure = (avgCoreLoad > ((1 << SI_LOAD_SHIFT) * 0.9)); - if (g_fHighCpuPressure) + fHighPressureNew = (avgCoreLoad > ((1 << SI_LOAD_SHIFT) * 0.9)); /* assign the outer variable (do not shadow it) so the "pressure reduced" check after this block sees the new value */ + __atomic_store(&g_fHighCpuPressure, &fHighPressureNew, __ATOMIC_RELEASE); + if (fHighPressureNew) serverLog(!fHighPressurePrev ?
3 /*LL_WARNING*/ : 1 /* LL_VERBOSE */, "NOTICE: Detuning locks due to high load per core: %.2f%%", avgCoreLoad / (double)(1 << SI_LOAD_SHIFT)*100.0); } - if (!g_fHighCpuPressure && fHighPressurePrev) { + if (!fHighPressureNew && fHighPressurePrev) { serverLog(3 /*LL_WARNING*/, "NOTICE: CPU pressure reduced"); } #else g_fHighCpuPressure = g_fTestMode; #endif -} \ No newline at end of file +} diff --git a/src/networking.cpp b/src/networking.cpp index 3aab1e62c..0616534ee 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -56,7 +56,7 @@ size_t getStringObjectSdsUsedMemory(robj *o) { serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); switch(o->encoding) { case OBJ_ENCODING_RAW: return sdsZmallocSize((sds)ptrFromObj(o)); - case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj); + case OBJ_ENCODING_EMBSTR: return zmalloc_size(allocPtrFromObj(o))-sizeof(robj); default: return 0; /* Just integer encoding for now. */ } } @@ -1264,9 +1264,10 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) void acceptOnThread(connection *conn, int flags, char *cip) { int ielCur = ielFromEventLoop(serverTL->el); + bool fBootLoad = (g_pserver->loading == LOADING_BOOT); int ielTarget = 0; - if (g_pserver->loading) + if (fBootLoad) { ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active } @@ -1290,10 +1291,10 @@ void acceptOnThread(connection *conn, int flags, char *cip) szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); } - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT] { + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT, fBootLoad] { connMarshalThread(conn); acceptCommonHandler(conn,flags,szT,ielTarget); - if (!g_fTestMode && !g_pserver->loading) + if (!g_fTestMode && !fBootLoad) rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); zfree(szT); }); @@ -1302,7 +1303,7 @@ void 
acceptOnThread(connection *conn, int flags, char *cip) return; // If res != AE_OK we can still try to accept on the local thread } - if (!g_fTestMode && !g_pserver->loading) + if (!g_fTestMode && !fBootLoad) rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); aeAcquireLock(); @@ -2259,7 +2260,7 @@ int processMultibulkBuffer(client *c) { * 1. The client is reset unless there are reasons to avoid doing it. * 2. In the case of master clients, the replication offset is updated. * 3. Propagate commands we got from our master to replicas down the line. */ -void commandProcessed(client *c) { +void commandProcessed(client *c, int flags) { long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ @@ -2287,7 +2288,7 @@ void commandProcessed(client *c) { ae.arm(c); long long applied = c->reploff - prev_offset; if (applied) { - if (!g_pserver->fActiveReplica) + if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE)) { replicationFeedSlavesFromMasterStream(g_pserver->slaves, c->pending_querybuf, applied); @@ -2308,9 +2309,10 @@ void commandProcessed(client *c) { int processCommandAndResetClient(client *c, int flags) { int deadclient = 0; serverTL->current_client = c; - AeLocker locker; - if (processCommand(c, flags, locker) == C_OK) { - commandProcessed(c); + serverAssert(GlobalLocksAcquired()); + + if (processCommand(c, flags) == C_OK) { + commandProcessed(c, flags); } if (serverTL->current_client == NULL) deadclient = 1; serverTL->current_client = NULL; @@ -2460,14 +2462,26 @@ void readQueryFromClient(connection *conn) { return; } - /* There is more data in the client input buffer, continue parsing it - * in case to check if there is a full command to execute. 
*/ - processInputBuffer(c, CMD_CALL_FULL); + serverTL->vecclientsProcess.push_back(c); +} + +void processClients() +{ + serverAssert(GlobalLocksAcquired()); + + for (client *c : serverTL->vecclientsProcess) { + /* There is more data in the client input buffer, continue parsing it + * in case to check if there is a full command to execute. */ + std::unique_lock ul(c->lock); + processInputBuffer(c, CMD_CALL_FULL); + } + if (listLength(serverTL->clients_pending_asyncwrite)) { - aelock.arm(c); ProcessPendingAsyncWrites(); } + + serverTL->vecclientsProcess.clear(); } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -2843,7 +2857,7 @@ NULL if (c->name) addReplyBulk(c,c->name); else - addReplyNull(c, shared.nullbulk); + addReplyNull(c); } else if (!strcasecmp(szFromObj(c->argv[1]),"pause") && c->argc == 3) { /* CLIENT PAUSE */ long long duration; @@ -3425,7 +3439,7 @@ void processEventsWhileBlocked(int iel) { aeReleaseLock(); - serverAssertDebug(!GlobalLocksAcquired()); + serverAssert(!GlobalLocksAcquired()); try { while (iterations--) { diff --git a/src/object.cpp b/src/object.cpp index 091e3d4da..4b32c5a4d 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -41,12 +41,15 @@ /* ===================== Creation and parsing of objects ==================== */ robj *createObject(int type, void *ptr) { - robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED); + size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0; + char *oB = (char*)zcalloc(sizeof(robj)+mvccExtraBytes, MALLOC_SHARED); + robj *o = reinterpret_cast(oB + mvccExtraBytes); + o->type = type; o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; o->setrefcount(1); - o->mvcc_tstamp = OBJ_MVCC_INVALID; + setMvccTstamp(o, OBJ_MVCC_INVALID); /* Set the LRU to the current lruclock (minutes resolution), or * alternatively the LFU counter. 
*/ @@ -95,13 +98,16 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { size_t allocsize = sizeof(struct sdshdr8)+len+1; if (allocsize < sizeof(void*)) allocsize = sizeof(void*); - robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED); + + size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0; + char *oB = (char*)zcalloc(sizeof(robj)+allocsize-sizeof(redisObject::m_ptr)+mvccExtraBytes, MALLOC_SHARED); + robj *o = reinterpret_cast(oB + mvccExtraBytes); struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr); o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->setrefcount(1); - o->mvcc_tstamp = OBJ_MVCC_INVALID; + setMvccTstamp(o, OBJ_MVCC_INVALID); if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; @@ -129,8 +135,9 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { * * The current limit of 52 is chosen so that the biggest string object * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. 
*/ -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 -static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); +#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 52 + +//static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); robj *createStringObject(const char *ptr, size_t len) { if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) return createEmbeddedStringObject(ptr,len); @@ -390,7 +397,11 @@ void decrRefCount(robj_roptr o) { case OBJ_CRON: freeCronObject(o); break; default: serverPanic("Unknown object type"); break; } - zfree(o.unsafe_robjcast()); + if (g_pserver->fActiveReplica) { + zfree(reinterpret_cast(o.unsafe_robjcast())-1); + } else { + zfree(o.unsafe_robjcast()); + } } else { if (prev <= 0) serverPanic("decrRefCount against refcount <= 0"); } @@ -1320,7 +1331,8 @@ NULL } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == NULL) return; - addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); + uint64_t mvcc = mvccFromObj(o); + addReplyLongLong(c, (g_pserver->mstime - (mvcc >> MVCC_MS_SHIFT)) / 1000); } else { addReplySubcommandSyntaxError(c); } @@ -1362,7 +1374,7 @@ NULL } } if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) { - addReplyNull(c, shared.nullbulk); + addReplyNull(c); return; } size_t usage = objectComputeSize((robj*)dictGetVal(de),samples); @@ -1500,3 +1512,39 @@ void redisObject::setrefcount(unsigned ref) serverAssert(!FExpires()); refcount.store(ref, std::memory_order_relaxed); } + +redisObjectStack::redisObjectStack() +{ + // We need to ensure the Extended Object is first in the class layout + serverAssert(reinterpret_cast(static_cast(this)) != reinterpret_cast(this)); +} + +void *allocPtrFromObj(robj_roptr o) { + if (g_pserver->fActiveReplica) + return reinterpret_cast(o.unsafe_robjcast()) - 
1; + return o.unsafe_robjcast(); +} + +robj *objFromAllocPtr(void *pv) { + if (g_pserver->fActiveReplica) { + return reinterpret_cast(reinterpret_cast(pv)+1); + } + return reinterpret_cast(pv); +} + +uint64_t mvccFromObj(robj_roptr o) +{ + if (g_pserver->fActiveReplica) { + redisObjectExtended *oe = reinterpret_cast(o.unsafe_robjcast()) - 1; + return oe->mvcc_tstamp; + } + return OBJ_MVCC_INVALID; +} + +void setMvccTstamp(robj *o, uint64_t mvcc) +{ + if (!g_pserver->fActiveReplica) + return; + redisObjectExtended *oe = reinterpret_cast(o) - 1; + oe->mvcc_tstamp = mvcc; +} \ No newline at end of file diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 07e0e85f6..3ccbb6a66 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -89,7 +89,7 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel) { if (channel) addReplyBulk(c,channel); else - addReplyNull(c, shared.nullbulk); + addReplyNull(c); addReplyLongLong(c,clientSubscriptionsCount(c)); } @@ -117,7 +117,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { if (pattern) addReplyBulk(c,pattern); else - addReplyNull(c, shared.nullbulk); + addReplyNull(c); addReplyLongLong(c,clientSubscriptionsCount(c)); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 5ee6baa25..71ff6030c 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1089,8 +1089,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { } char szT[32]; - snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); - if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + if (g_pserver->fActiveReplica) { + snprintf(szT, 32, "%" PRIu64, mvccFromObj(val)); + if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + } /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -1144,7 +1146,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o) { - robj key; + redisObjectStack key; 
initStaticStringObject(key,(char*)keystr); expireEntry *pexpire = getExpire(db, &key); @@ -1997,7 +1999,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { exit(1); } RedisModuleIO io; - robj keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); moduleInitIOContext(io,mt,rdb,&keyobj); io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2; @@ -2046,7 +2048,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { return NULL; } - o->mvcc_tstamp = mvcc_tstamp; + setMvccTstamp(o, mvcc_tstamp); serverAssert(!o->FExpires()); return o; } @@ -2055,7 +2057,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { * needed to provide loading stats. */ void startLoading(size_t size, int rdbflags) { /* Load the DB */ - g_pserver->loading = 1; + g_pserver->loading = (rdbflags & RDBFLAGS_REPLICATION) ? LOADING_REPLICATION : LOADING_BOOT; g_pserver->loading_start_time = time(NULL); g_pserver->loading_loaded_bytes = 0; g_pserver->loading_total_bytes = size; @@ -2314,7 +2316,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else { - redisObject keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); decrRefCount(subexpireKey); @@ -2398,14 +2400,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; goto eoferr; } - bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; + bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the replica. 
*/ - robj keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; if (fStaleMvccKey || fExpiredKey) { diff --git a/src/replication.cpp b/src/replication.cpp index 2299a7530..beab02809 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1166,6 +1166,7 @@ void processReplconfUuid(client *c, robj *arg) * full resync. */ void replconfCommand(client *c) { int j; + bool fCapaCommand = false; if ((c->argc % 2) == 0) { /* Number of arguments must be odd to make sure that every @@ -1176,6 +1177,7 @@ void replconfCommand(client *c) { /* Process every option-value pair. */ for (j = 1; j < c->argc; j+=2) { + fCapaCommand = false; if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) { long port; @@ -1200,6 +1202,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_PSYNC2; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; + + fCapaCommand = true; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { /* REPLCONF ACK is used by replica to inform the master the amount * of replication stream that it processed so far. 
It is an @@ -1242,7 +1246,16 @@ void replconfCommand(client *c) { return; } } - addReply(c,shared.ok); + + if (fCapaCommand) { + sds reply = sdsnew("+OK"); + if (g_pserver->fActiveReplica) + reply = sdscat(reply, " active-replica"); + reply = sdscat(reply, "\r\n"); + addReplySds(c, reply); + } else { + addReply(c,shared.ok); + } } /* This function puts a replica in the online state, and should be called just @@ -2557,6 +2570,30 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read return PSYNC_NOT_SUPPORTED; } +void parseMasterCapa(redisMaster *mi, sds strcapa) +{ + if (sdslen(strcapa) < 1 || strcapa[0] != '+') + return; + + char *szStart = strcapa + 1; // skip the + + char *pchEnd = szStart; + + mi->isActive = false; + for (;;) + { + if (*pchEnd == ' ' || *pchEnd == '\0') { + // Parse the word + if (strncmp(szStart, "active-replica", pchEnd - szStart) == 0) { + mi->isActive = true; + } + szStart = pchEnd + 1; + } + if (*pchEnd == '\0') + break; + ++pchEnd; + } +} + /* This handler fires when the non blocking connect was able to * establish a connection with the master. */ void syncWithMaster(connection *conn) { @@ -2750,16 +2787,8 @@ void syncWithMaster(connection *conn) { * * The master will ignore capabilities it does not understand. 
*/ if (mi->repl_state == REPL_STATE_SEND_CAPA) { - if (g_pserver->fActiveReplica) - { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", - "capa","eof","capa","psync2","capa","activeExpire",NULL); - } - else - { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", - "capa","eof","capa","psync2",NULL); - } + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF", + "capa","eof","capa","psync2","capa","activeExpire",NULL); if (err) goto write_error; sdsfree(err); mi->repl_state = REPL_STATE_RECEIVE_CAPA; @@ -2774,6 +2803,8 @@ void syncWithMaster(connection *conn) { if (err[0] == '-') { serverLog(LL_NOTICE,"(Non critical) Master does not understand " "REPLCONF capa: %s", err); + } else { + parseMasterCapa(mi, err); } sdsfree(err); mi->repl_state = REPL_STATE_SEND_PSYNC; @@ -4007,12 +4038,20 @@ int FBrokenLinkToMaster() listNode *ln; listRewind(g_pserver->masters, &li); + int connected = 0; while ((ln = listNext(&li))) { redisMaster *mi = (redisMaster*)listNodeValue(ln); - if (mi->repl_state != REPL_STATE_CONNECTED) - return true; + if (mi->repl_state == REPL_STATE_CONNECTED) + ++connected; } + + if (g_pserver->repl_quorum < 0) { + return connected < (int)listLength(g_pserver->masters); + } else { + return connected < g_pserver->repl_quorum; + } + return false; } diff --git a/src/server.cpp b/src/server.cpp index 95b805363..17aec83d7 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1735,6 +1735,18 @@ void clientsCron(int iel) { freeClientsInAsyncFreeQueue(iel); } +bool expireOwnKeys() +{ + if (iAmMaster()) { + return true; + } else if (!g_pserver->fActiveReplica && (listLength(g_pserver->masters) == 1)) { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + if (mi->isActive) + return true; + } + return false; +} + /* This function handles 'background' operations we are required to do * incrementally in Redis databases, such as active key expiring, resizing, * rehashing. 
*/ @@ -1742,7 +1754,7 @@ void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ if (g_pserver->active_expire_enabled) { - if (iAmMaster()) { + if (expireOwnKeys()) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); } else { expireSlaveKeys(); @@ -2185,6 +2197,7 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData } extern int ProcessingEventsWhileBlocked; +void processClients(); /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep @@ -2203,6 +2216,7 @@ extern int ProcessingEventsWhileBlocked; void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); int iel = ielFromEventLoop(eventLoop); + processClients(); /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -2287,6 +2301,7 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) /* Try to process pending commands for clients that were just unblocked. */ aeAcquireLock(); + processClients(); if (listLength(g_pserver->rgthreadvar[iel].unblocked_clients)) { processUnblockedClients(iel); } @@ -2458,6 +2473,7 @@ void initMasterInfo(redisMaster *master) master->cached_master = NULL; master->master_initial_offset = -1; + master->isActive = false; master->repl_state = REPL_STATE_NONE; master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ @@ -3548,7 +3564,7 @@ void call(client *c, int flags) { !(flags & CMD_CALL_PROPAGATE_AOF)) propagate_flags &= ~PROPAGATE_AOF; - if (c->cmd->flags & CMD_SKIP_PROPOGATE) + if ((c->cmd->flags & CMD_SKIP_PROPOGATE) && g_pserver->fActiveReplica) propagate_flags &= ~PROPAGATE_REPL; /* Call propagate() only if at least one of AOF / replication @@ -3633,12 +3649,12 @@ void call(client *c, int flags) { * If C_OK is returned the client is still alive and valid and * other operations can be performed by the caller. 
Otherwise * if C_ERR is returned the client was destroyed (i.e. after QUIT). */ -int processCommand(client *c, int callFlags, AeLocker &locker) { +int processCommand(client *c, int callFlags) { AssertCorrectThread(c); + serverAssert(GlobalLocksAcquired()); if (moduleHasCommandFilters()) { - locker.arm(c); moduleCallCommandFilters(c); } @@ -3690,9 +3706,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { /* Check if the user can run this command according to the current * ACLs. */ - if (c->puser && !(c->puser->flags & USER_FLAG_ALLCOMMANDS)) - locker.arm(c); // ACLs require the lock - int acl_keypos; int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); if (acl_retval != ACL_OK) { @@ -3720,7 +3733,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 && c->cmd->proc != execCommand)) { - locker.arm(c); int hashslot; int error_code; clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc, @@ -3735,9 +3747,6 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { return C_OK; } } - - if (!locker.isArmed()) - locker.arm(c); incrementMvccTstamp(); @@ -3853,8 +3862,12 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ if (g_pserver->loading && !(c->cmd->flags & CMD_LOADING)) { - addReply(c, shared.loadingerr); - return C_OK; + /* Active Replicas can execute read only commands, and optionally write commands */ + if (!(g_pserver->loading == LOADING_REPLICATION && g_pserver->fActiveReplica && ((c->cmd->flags & CMD_READONLY) || g_pserver->fWriteDuringActiveLoad))) + { + addReply(c, shared.loadingerr); + return C_OK; + } } /* Lua script too slow? Only allow a limited number of commands. 
@@ -4494,7 +4507,7 @@ sds genRedisInfoString(const char *section) { "aof_last_cow_size:%zu\r\n" "module_fork_in_progress:%d\r\n" "module_fork_last_cow_size:%zu\r\n", - g_pserver->loading.load(std::memory_order_relaxed), + !!g_pserver->loading.load(std::memory_order_relaxed), /* Note: libraries expect 1 or 0 here so coerce our enum */ g_pserver->dirty, g_pserver->rdb_child_pid != -1, (intmax_t)g_pserver->lastsave, @@ -4643,64 +4656,61 @@ sds genRedisInfoString(const char *section) { listLength(g_pserver->masters) == 0 ? "master" : g_pserver->fActiveReplica ? "active-replica" : "slave"); if (listLength(g_pserver->masters)) { - listIter li; - listNode *ln; - listRewind(g_pserver->masters, &li); - bool fAllUp = true; - while ((ln = listNext(&li))) { - redisMaster *mi = (redisMaster*)listNodeValue(ln); - fAllUp = fAllUp && mi->repl_state == REPL_STATE_CONNECTED; - } - info = sdscatprintf(info, "master_global_link_status:%s\r\n", - fAllUp ? "up" : "down"); + FBrokenLinkToMaster() ? "down" : "up"); int cmasters = 0; + listIter li; + listNode *ln; listRewind(g_pserver->masters, &li); while ((ln = listNext(&li))) { long long slave_repl_offset = 1; redisMaster *mi = (redisMaster*)listNodeValue(ln); - info = sdscatprintf(info, "Master %d: \r\n", cmasters); - ++cmasters; if (mi->master) slave_repl_offset = mi->master->reploff; else if (mi->cached_master) slave_repl_offset = mi->cached_master->reploff; + char master_prefix[128] = ""; + if (cmasters != 0) { + snprintf(master_prefix, sizeof(master_prefix), "_%d", cmasters); + } + info = sdscatprintf(info, - "master_host:%s\r\n" - "master_port:%d\r\n" - "master_link_status:%s\r\n" - "master_last_io_seconds_ago:%d\r\n" - "master_sync_in_progress:%d\r\n" + "master%s_host:%s\r\n" + "master%s_port:%d\r\n" + "master%s_link_status:%s\r\n" + "master%s_last_io_seconds_ago:%d\r\n" + "master%s_sync_in_progress:%d\r\n" "slave_repl_offset:%lld\r\n" - ,mi->masterhost, - mi->masterport, - (mi->repl_state == REPL_STATE_CONNECTED) ? 
+ ,master_prefix, mi->masterhost, + master_prefix, mi->masterport, + master_prefix, (mi->repl_state == REPL_STATE_CONNECTED) ? "up" : "down", - mi->master ? + master_prefix, mi->master ? ((int)(g_pserver->unixtime-mi->master->lastinteraction)) : -1, - mi->repl_state == REPL_STATE_TRANSFER, + master_prefix, mi->repl_state == REPL_STATE_TRANSFER, slave_repl_offset ); if (mi->repl_state == REPL_STATE_TRANSFER) { info = sdscatprintf(info, - "master_sync_left_bytes:%lld\r\n" - "master_sync_last_io_seconds_ago:%d\r\n" - , (long long) + "master%s_sync_left_bytes:%lld\r\n" + "master%s_sync_last_io_seconds_ago:%d\r\n" + , master_prefix, (long long) (mi->repl_transfer_size - mi->repl_transfer_read), - (int)(g_pserver->unixtime-mi->repl_transfer_lastio) + master_prefix, (int)(g_pserver->unixtime-mi->repl_transfer_lastio) ); } if (mi->repl_state != REPL_STATE_CONNECTED) { info = sdscatprintf(info, - "master_link_down_since_seconds:%jd\r\n", - (intmax_t)g_pserver->unixtime-mi->repl_down_since); + "master%s_link_down_since_seconds:%jd\r\n", + master_prefix, (intmax_t)g_pserver->unixtime-mi->repl_down_since); } + ++cmasters; } info = sdscatprintf(info, "slave_priority:%d\r\n" diff --git a/src/server.h b/src/server.h index 7479b4557..02aa630ab 100644 --- a/src/server.h +++ b/src/server.h @@ -94,6 +94,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "semiorderedset.h" #include "connection.h" /* Connection abstraction */ #include "serverassert.h" +#include "expire.h" #define REDISMODULE_CORE 1 #include "redismodule.h" /* Redis modules API defines. */ @@ -104,6 +105,9 @@ typedef long long ustime_t; /* microsecond time type. 
*/ #include "endianconv.h" #include "crc64.h" +#define LOADING_BOOT 1 +#define LOADING_REPLICATION 2 + extern int g_fTestMode; struct redisObject; @@ -796,7 +800,16 @@ typedef struct RedisModuleDigest { #define MVCC_MS_SHIFT 20 -typedef struct redisObject { +// This struct will be allocated ahead of the ROBJ when needed +struct redisObjectExtended { + uint64_t mvcc_tstamp; +}; + +typedef class redisObject { +protected: + redisObject() {} + +public: unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or @@ -805,7 +818,6 @@ typedef struct redisObject { private: mutable std::atomic refcount {0}; public: - uint64_t mvcc_tstamp; void *m_ptr; inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } @@ -816,7 +828,18 @@ typedef struct redisObject { void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } } robj; -static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); +static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase"); + +class redisObjectStack : public redisObjectExtended, public redisObject +{ +public: + redisObjectStack(); +}; + +uint64_t mvccFromObj(robj_roptr o); +void setMvccTstamp(redisObject *o, uint64_t mvcc); +void *allocPtrFromObj(robj_roptr o); +robj *objFromAllocPtr(void *pv); __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) { @@ -842,243 +865,6 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) return (char*)ptrFromObj(o); } -class expireEntryFat -{ - friend class expireEntry; -public: - struct subexpireEntry - { - long long when; - std::unique_ptr spsubkey; - - subexpireEntry(long long when, const char *subkey) - : when(when), spsubkey(subkey, sdsfree) - {} - - bool operator<(long long when) const noexcept { return this->when < when; } - bool 
operator<(const subexpireEntry &se) { return this->when < se.when; } - }; - -private: - sds m_keyPrimary; - std::vector m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key - -public: - expireEntryFat(sds keyPrimary) - : m_keyPrimary(keyPrimary) - {} - long long when() const noexcept { return m_vecexpireEntries.front().when; } - const char *key() const noexcept { return m_keyPrimary; } - - bool operator<(long long when) const noexcept { return this->when() < when; } - - void expireSubKey(const char *szSubkey, long long when) - { - // First check if the subkey already has an expiration - for (auto &entry : m_vecexpireEntries) - { - if (szSubkey != nullptr) - { - // if this is a subkey expiry then its not a match if the expireEntry is either for the - // primary key or a different subkey - if (entry.spsubkey == nullptr || sdscmp((sds)entry.spsubkey.get(), (sds)szSubkey) != 0) - continue; - } - else - { - if (entry.spsubkey != nullptr) - continue; - } - m_vecexpireEntries.erase(m_vecexpireEntries.begin() + (&entry - m_vecexpireEntries.data())); - break; - } - auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when); - const char *subkey = (szSubkey) ? 
sdsdup(szSubkey) : nullptr; - m_vecexpireEntries.emplace(itrInsert, when, subkey); - } - - bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } - const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } - void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); } - const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; } - size_t size() const noexcept { return m_vecexpireEntries.size(); } -}; - -class expireEntry { - union - { - sds m_key; - expireEntryFat *m_pfatentry; - } u; - long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer - -public: - class iter - { - friend class expireEntry; - expireEntry *m_pentry = nullptr; - size_t m_idx = 0; - - public: - iter(expireEntry *pentry, size_t idx) - : m_pentry(pentry), m_idx(idx) - {} - - iter &operator++() { ++m_idx; return *this; } - - const char *subkey() const - { - if (m_pentry->FFat()) - return (*m_pentry->pfatentry())[m_idx].spsubkey.get(); - return nullptr; - } - long long when() const - { - if (m_pentry->FFat()) - return (*m_pentry->pfatentry())[m_idx].when; - return m_pentry->when(); - } - - bool operator!=(const iter &other) - { - return m_idx != other.m_idx; - } - - const iter &operator*() const { return *this; } - }; - - expireEntry(sds key, const char *subkey, long long when) - { - if (subkey != nullptr) - { - m_when = LLONG_MIN; - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key); - u.m_pfatentry->expireSubKey(subkey, when); - } - else - { - u.m_key = key; - m_when = when; - } - } - - expireEntry(expireEntryFat *pfatentry) - { - u.m_pfatentry = pfatentry; - m_when = LLONG_MIN; - } - - expireEntry(expireEntry &&e) - { - u.m_key = e.u.m_key; - m_when = e.m_when; - e.u.m_key = (char*)key(); // we do this so it can still be found in the set - e.m_when = 0; - } - - ~expireEntry() - { - if (FFat()) - delete u.m_pfatentry; - } - - void setKeyUnsafe(sds key) - { - if (FFat()) 
- u.m_pfatentry->m_keyPrimary = key; - else - u.m_key = key; - } - - inline bool FFat() const noexcept { return m_when == LLONG_MIN; } - expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } - - - bool operator==(const char *key) const noexcept - { - return this->key() == key; - } - - bool operator<(const expireEntry &e) const noexcept - { - return when() < e.when(); - } - bool operator<(long long when) const noexcept - { - return this->when() < when; - } - - const char *key() const noexcept - { - if (FFat()) - return u.m_pfatentry->key(); - return u.m_key; - } - long long when() const noexcept - { - if (FFat()) - return u.m_pfatentry->when(); - return m_when; - } - - void update(const char *subkey, long long when) - { - if (!FFat()) - { - if (subkey == nullptr) - { - m_when = when; - return; - } - else - { - // we have to upgrade to a fat entry - long long whenT = m_when; - sds keyPrimary = u.m_key; - m_when = LLONG_MIN; - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); - u.m_pfatentry->expireSubKey(nullptr, whenT); - // at this point we're fat so fall through - } - } - u.m_pfatentry->expireSubKey(subkey, when); - } - - iter begin() { return iter(this, 0); } - iter end() - { - if (FFat()) - return iter(this, u.m_pfatentry->size()); - return iter(this, 1); - } - - void erase(iter &itr) - { - if (!FFat()) - throw -1; // assert - pfatentry()->m_vecexpireEntries.erase( - pfatentry()->m_vecexpireEntries.begin() + itr.m_idx); - } - - bool FGetPrimaryExpire(long long *pwhen) - { - *pwhen = -1; - for (auto itr : *this) - { - if (itr.subkey() == nullptr) - { - *pwhen = itr.when(); - return true; - } - } - return false; - } - - explicit operator const char*() const noexcept { return key(); } - explicit operator long long() const noexcept { return when(); } -}; -typedef semiorderedset expireset; - /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, * and Module 
types have their registered name returned. */ @@ -1522,6 +1308,9 @@ typedef struct redisTLSContextConfig { char *ciphers; char *ciphersuites; int prefer_server_ciphers; + int session_caching; + int session_cache_size; + int session_cache_timeout; } redisTLSContextConfig; /*----------------------------------------------------------------------------- @@ -1566,6 +1355,7 @@ struct redisServerThreadVars { char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; bool fRetrySetAofEvent = false; + std::vector vecclientsProcess; }; struct redisMaster { @@ -1581,6 +1371,7 @@ struct redisMaster { char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ long long master_initial_offset; /* Master PSYNC offset. */ + bool isActive = false; int repl_state; /* Replication status if the instance is a replica */ off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ @@ -1871,6 +1662,7 @@ struct redisServer { int repl_syncio_timeout; /* Timeout for synchronous I/O calls */ int repl_disable_tcp_nodelay; /* Disable TCP_NODELAY after SYNC? */ int repl_serve_stale_data; /* Serve stale data when link is down? */ + int repl_quorum; /* For multimaster what do we consider a quorum? -1 means all master must be online */ int repl_slave_ro; /* Slave is read only? */ int repl_slave_ignore_maxmemory; /* If true slaves do not evict. */ int slave_priority; /* Reported in INFO and used by Sentinel. */ @@ -1991,6 +1783,7 @@ struct redisServer { int watchdog_period; /* Software watchdog period in ms. 0 = off */ int fActiveReplica; /* Can this replica also be a master? */ + int fWriteDuringActiveLoad; /* Can this active-replica write during an RDB load? 
*/ // Format: // Lower 20 bits: a counter incrementing for each command executed in the same millisecond @@ -2557,7 +2350,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev size_t freeMemoryGetNotCountedMemory(); int freeMemoryIfNeeded(void); int freeMemoryIfNeededAndSafe(void); -int processCommand(client *c, int callFlags, class AeLocker &locker); +int processCommand(client *c, int callFlags); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); struct redisCommand *lookupCommandByCString(const char *s); diff --git a/src/storage.h b/src/storage.h index e9106aca2..2d46c0223 100644 --- a/src/storage.h +++ b/src/storage.h @@ -1,8 +1,6 @@ #ifndef __STORAGE_H__ #define __STORAGE_H__ -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 // Note: also defined in object.c - should always match - #ifdef __cplusplus extern "C" { #endif diff --git a/src/t_list.cpp b/src/t_list.cpp index f95852aac..e9bac8211 100644 --- a/src/t_list.cpp +++ b/src/t_list.cpp @@ -331,7 +331,7 @@ void lindexCommand(client *c) { addReplyBulk(c,value); decrRefCount(value); } else { - addReplyNull(c,shared.nullbulk); + addReplyNull(c); } } else { serverPanic("Unknown list encoding"); diff --git a/src/tls.cpp b/src/tls.cpp index 25ca0bd31..5a128b596 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -150,8 +150,6 @@ void tlsInit(void) { serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator."); } - /* Server configuration */ - g_pserver->tls_auth_clients = 1; /* Secure by default */ tlsInitThread(); } @@ -193,6 +191,15 @@ int tlsConfigure(redisTLSContextConfig *ctx_config) { SSL_CTX_set_options(ctx, SSL_OP_DONT_INSERT_EMPTY_FRAGMENTS); #endif + if (ctx_config->session_caching) { + SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_SERVER); + SSL_CTX_sess_set_cache_size(ctx, ctx_config->session_cache_size); + SSL_CTX_set_timeout(ctx, ctx_config->session_cache_timeout); + SSL_CTX_set_session_id_context(ctx, (const unsigned char*) "KeyDB", 5); 
+ } else { + SSL_CTX_set_session_cache_mode(ctx, SSL_SESS_CACHE_OFF); + } + protocols = parseProtocolsConfig(ctx_config->protocols); if (protocols == -1) goto error; diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl index 0e753c147..0f0d34b7f 100644 --- a/tests/integration/replication-multimaster.tcl +++ b/tests/integration/replication-multimaster.tcl @@ -96,6 +96,26 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { } } } + + # Keep this test last since it muchs with the config + if [string equal $topology "mesh"] { + test "$topology_name quorum respected" { + $R(0) config set replica-serve-stale-data no + + # No issues when all nodes are connected with default settings + $R(0) get testkey + + # No issues when quorum is equal to the number of nodes + $R(0) config set replica-quorum 3 + $R(0) get testkey + + $R(0) config set replica-quorum 4 + catch { + $R(0) get testkey + } e + assert_match {*MASTER is down*} $e + } + } } } } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index c7121e848..8ca84bf2f 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -78,17 +78,8 @@ start_server {tags {"introspection"}} { syslog-facility databases port - io-threads tls-port - tls-prefer-server-ciphers - tls-cert-file - tls-key-file - tls-dh-params-file - tls-ca-cert-file - tls-ca-cert-dir - tls-protocols - tls-ciphers - tls-ciphersuites + io-threads logfile unixsocketperm slaveof @@ -101,6 +92,23 @@ start_server {tags {"introspection"}} { bgsave_cpulist } + if {!$::tls} { + append skip_configs { + tls-prefer-server-ciphers + tls-session-cache-timeout + tls-session-cache-size + tls-session-caching + tls-cert-file + tls-key-file + tls-dh-params-file + tls-ca-cert-file + tls-ca-cert-dir + tls-protocols + tls-ciphers + tls-ciphersuites + } + } + set configs {} foreach {k v} [r config get *] { if {[lsearch $skip_configs $k] != -1} { diff --git 
a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index 6c991ac97..7bcd7753b 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -109,6 +109,7 @@ start_server {tags {"pubsub"}} { unsubscribe $rd1 # Wait for a response to the unsub __consume_subscribe_messages $rd1 unsubscribe {chan1 chan2 chan3} + after 1 assert_equal 0 [r publish chan1 hello] assert_equal 0 [r publish chan2 hello] assert_equal 0 [r publish chan3 hello]