diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index 8d04b742a..77f5e9ea4 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -56,7 +56,7 @@ void AsyncWorkQueue::WorkerThreadMain() listRelease(vars.clients_pending_asyncwrite); std::unique_lock lockf(serverTL->lockPendingWrite); - serverTL->vecclientsProcess.clear(); + serverTL->setclientsProcess.clear(); serverTL->clients_pending_write.clear(); std::atomic_thread_fence(std::memory_order_seq_cst); } diff --git a/src/IStorage.h b/src/IStorage.h index 5301884ee..79726daf5 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -1,12 +1,13 @@ #pragma once #include +#include #include "sds.h" #include "ae.h" #define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f" struct StorageToken { - struct client *c; + std::unordered_set setc; struct redisDbPersistentData *db; virtual ~StorageToken() {} }; @@ -42,7 +43,7 @@ class IStorage virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const = 0; virtual size_t count() const = 0; - virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, const char * /*key*/, size_t /*cchKey*/) {return nullptr;}; + virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, sds *, size_t) {return nullptr;}; virtual void complete_retrieve(StorageToken * /*tok*/, callbackSingle /*fn*/) {}; virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) { diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 2445e7259..c0ff305b1 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -188,7 +188,8 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const m_spstorage->retrieve(key, sdslen(key), fn); } -StorageToken *StorageCache::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds key) { +StorageToken *StorageCache::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey) { +#if 0 std::unique_lock ul(m_lock); if (m_pdict != nullptr) { @@ -199,7 +200,8 @@ StorageToken *StorageCache::begin_retrieve(struct aeEventLoop *el, aePostFunctio return nullptr; // Not found } ul.unlock(); - return m_spstorage->begin_retrieve(el, proc, key, sdslen(key)); +#endif + return m_spstorage->begin_retrieve(el, proc, rgkey, ckey); } void StorageCache::complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn) { diff --git a/src/StorageCache.h b/src/StorageCache.h index b9996acda..614f8c27b 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -43,7 +43,7 @@ class StorageCache void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem); void retrieve(sds key, IStorage::callbackSingle fn) const; - StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds key); + StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey); void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn); bool erase(sds key); void emergencyFreeCache(); diff --git a/src/ae.cpp b/src/ae.cpp index a43969fe0..2b3b854cd 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -278,7 +278,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionTokenProc *proc, Storag cmd.op = AE_ASYNC_OP::PostAsynDBFunction; cmd.tproc = proc; cmd.clientData = (void*)token; - cmd.fLock = true; + cmd.fLock = false; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) return AE_ERR; diff --git a/src/blocked.cpp b/src/blocked.cpp index 5f098706f..8062b8d54 100644 --- a/src/blocked.cpp +++ b/src/blocked.cpp @@ -208,7 +208,7 @@ void unblockClient(client *c) { listDelNode(g_pserver->paused_clients,c->paused_list_node); c->paused_list_node = NULL; } else if (c->btype == BLOCKED_STORAGE) { - serverTL->vecclientsProcess.push_back(c); + serverTL->setclientsProcess.insert(c); } else { serverPanic("Unknown btype in unblockClient()."); } diff --git a/src/db.cpp b/src/db.cpp index 7185e72cb..6e0f127e8 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3363,45 +3363,60 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command #endif return; } +} - AeLocker lock; +void redisDbPersistentData::prefetchKeysFlash(std::unordered_set &setc) +{ + serverAssert(GlobalLocksAcquired()); + std::vector veckeys; + std::unordered_set setcBlocked; + + for (client *c : setc) { + for (auto &command : c->vecqueuedcmd) { + getKeysResult result = GETKEYS_RESULT_INIT; + auto cmd = lookupCommand(szFromObj(command.argv[0])); + if (cmd == nullptr) + return; // Bad command? It's not for us to judge, just bail + + if (command.argc < std::abs(cmd->arity)) + return; // Invalid number of args + + int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); + bool fQueued = false; + for (int ikey = 0; ikey < numkeys; ++ikey) + { + robj *objKey = command.argv[result.keys[ikey]]; + if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) { + veckeys.push_back(szFromObj(objKey)); + fQueued = true; + } + } - std::vector veckeys; - lock.arm(c); - getKeysResult result = GETKEYS_RESULT_INIT; - auto cmd = lookupCommand(szFromObj(command.argv[0])); - if (cmd == nullptr) - return; // Bad command? It's not for us to judge, just bail - - if (command.argc < std::abs(cmd->arity)) - return; // Invalid number of args - - int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); - for (int ikey = 0; ikey < numkeys; ++ikey) - { - robj *objKey = command.argv[result.keys[ikey]]; - if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) - veckeys.push_back(objKey); + if (fQueued) { + setcBlocked.insert(c); + } + getKeysFreeResult(&result); + } } - getKeysFreeResult(&result); - - std::vector>> vecInserts; - for (robj *objKey : veckeys) - { - std::unique_ptr spexpire; - auto *tok = m_spstorage->begin_retrieve(serverTL->el, storageLoadCallback, (sds)szFromObj(objKey)); - if (tok != nullptr) { - tok->c = c; - tok->db = this; - blockClient(c, BLOCKED_STORAGE); + auto *tok = m_spstorage->begin_retrieve(serverTL->el, storageLoadCallback, veckeys.data(), veckeys.size()); + if (tok != nullptr) { + for (client *c : setcBlocked) { + if (!(c->flags & CLIENT_BLOCKED)) + blockClient(c, BLOCKED_STORAGE); } + tok->setc = std::move(setcBlocked); + tok->db = this; } - return; } -/*static*/ void redisDbPersistentData::storageLoadCallback(aeEventLoop *el, StorageToken *tok) { +/*static*/ void redisDbPersistentData::storageLoadCallback(aeEventLoop *, StorageToken *tok) { + serverTL->setStorageTokensProcess.insert(tok); +} + +void redisDbPersistentData::processStorageToken(StorageToken *tok) { + auto setc = std::move(tok->setc); tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb) { auto *db = tok->db; size_t offset = 0; @@ -3435,6 +3450,13 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command serverAssert(o->FExpires() == (db->m_setexpire->find(key) != db->m_setexpire->end())); } }); - std::unique_lock ul(tok->c->lock); - unblockClient(tok->c); -} + tok = nullptr; // Invalid past this point + + for (client *c : setc) { + std::unique_lock ul(c->lock); + if (c->flags & CLIENT_BLOCKED) + unblockClient(c); + else + serverTL->setclientsProcess.insert(c); + } +} \ No newline at end of file diff --git a/src/networking.cpp b/src/networking.cpp index 76d70684a..2c047ae7c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1593,7 +1593,8 @@ void unlinkClient(client *c) { c->fPendingAsyncWrite = FALSE; } - serverTL->vecclientsProcess.erase(std::remove(serverTL->vecclientsProcess.begin(), serverTL->vecclientsProcess.end(), c), serverTL->vecclientsProcess.end()); + serverTL->setclientsProcess.erase(c); + serverTL->setclientsPrefetch.erase(c); /* Clear the tracking status. */ if (c->flags & CLIENT_TRACKING) disableTracking(c); @@ -2774,8 +2775,13 @@ void readQueryFromClient(connection *conn) { processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC); } } - if (!c->vecqueuedcmd.empty() && !(c->flags & CLIENT_BLOCKED)) - serverTL->vecclientsProcess.push_back(c); + if (!c->vecqueuedcmd.empty()) { + if (g_pserver->m_pstorageFactory != nullptr && g_pserver->prefetch_enabled) { + serverTL->setclientsPrefetch.insert(c); + } else { + serverTL->setclientsProcess.insert(c); + } + } } else { // If we're single threaded its actually better to just process the command here while the query is hot in the cache // multithreaded lock contention dominates and batching is better @@ -2790,9 +2796,9 @@ void processClients() serverAssert(GlobalLocksAcquired()); // Note that this function is reentrant and vecclients may be modified by code called from processInputBuffer - while (!serverTL->vecclientsProcess.empty()) { - client *c = serverTL->vecclientsProcess.front(); - serverTL->vecclientsProcess.erase(serverTL->vecclientsProcess.begin()); + while (!serverTL->setclientsProcess.empty()) { + client *c = *serverTL->setclientsProcess.begin(); + serverTL->setclientsProcess.erase(serverTL->setclientsProcess.begin()); /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ diff --git a/src/server.cpp b/src/server.cpp index b8dbbf5a8..0bbf2dec4 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2840,6 +2840,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) { locker.arm(); + for (auto *tok : serverTL->setStorageTokensProcess) { + tok->db->processStorageToken(tok); + } + serverTL->setStorageTokensProcess.clear(); + + if (g_pserver->m_pstorageFactory != nullptr && !serverTL->setclientsPrefetch.empty()) { + g_pserver->db[0]->prefetchKeysFlash(serverTL->setclientsPrefetch); + for (client *c : serverTL->setclientsPrefetch) { + if (!(c->flags & CLIENT_BLOCKED)) + serverTL->setclientsProcess.insert(c); + } + serverTL->setclientsPrefetch.clear(); + } + /* end any snapshots created by fast async commands */ for (int idb = 0; idb < cserver.dbnum; ++idb) { if (serverTL->rgdbSnapshot[idb] != nullptr && serverTL->rgdbSnapshot[idb]->FStale()) { @@ -4146,7 +4160,7 @@ void InitServerLast() { set_jemalloc_bg_thread(cserver.jemalloc_bg_thread); g_pserver->initial_memory_usage = zmalloc_used_memory(); - g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads); + g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10); // Allocate the repl backlog diff --git a/src/server.h b/src/server.h index 39d1cfeb8..ef21fd930 100644 --- a/src/server.h +++ b/src/server.h @@ -1210,6 +1210,8 @@ class redisDbPersistentData bool keycacheIsEnabled(); void prefetchKeysAsync(client *c, struct parsed_command &command); + void prefetchKeysFlash(std::unordered_set &setc); + void processStorageToken(StorageToken *tok); bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } @@ -1374,6 +1376,8 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::resortExpire; using redisDbPersistentData::prefetchKeysAsync; + using redisDbPersistentData::prefetchKeysFlash; + using redisDbPersistentData::processStorageToken; using redisDbPersistentData::prepOverwriteForSnapshot; using redisDbPersistentData::FRehashing; using redisDbPersistentData::FTrackingChanges; @@ -2212,7 +2216,9 @@ struct redisServerThreadVars { int propagate_in_transaction = 0; /* Make sure we don't propagate nested MULTI/EXEC */ int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */ - std::vector vecclientsProcess; + std::unordered_set setclientsProcess; + std::unordered_set setclientsPrefetch; + std::unordered_set setStorageTokensProcess; dictAsyncRehashCtl *rehashCtl = nullptr; int getRdbKeySaveDelay(); diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 1456834ef..ef2f14abb 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -279,21 +279,41 @@ bool RocksDBStorageProvider::FKeyExists(std::string& key) const } struct RetrievalStorageToken : public StorageToken { - std::string key; - std::vector data; - bool fFound = false; + std::unordered_map> mapkeydata; }; -StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, const char *key, size_t cchKey) { +StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, sds *rgkey, size_t ckey) { RetrievalStorageToken *tok = new RetrievalStorageToken(); - tok->key = std::string(key, cchKey); + + for (size_t ikey = 0; ikey < ckey; ++ikey) { + tok->mapkeydata.insert(std::make_pair(std::string(rgkey[ikey], sdslen(rgkey[ikey])), nullptr)); + } + (*m_pfactory->m_wqueue)->AddWorkFunction([this, el, callback, tok]{ - rocksdb::PinnableSlice slice; - auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(prefixKey(tok->key.data(), tok->key.size())), &slice); - if (status.ok()) { - tok->data.resize(slice.size()); - memcpy(tok->data.data(), slice.data(), slice.size()); - tok->fFound = true; + std::vector veckeysStr; + std::vector veckeys; + std::vector vecvals; + std::vector vecstatus; + veckeys.reserve(tok->mapkeydata.size()); + veckeysStr.reserve(tok->mapkeydata.size()); + vecvals.resize(tok->mapkeydata.size()); + vecstatus.resize(tok->mapkeydata.size()); + for (auto &pair : tok->mapkeydata) { + veckeysStr.emplace_back(prefixKey(pair.first.data(), pair.first.size())); + veckeys.emplace_back(veckeysStr.back().data(), veckeysStr.back().size()); + } + + m_spdb->MultiGet(ReadOptions(), + m_spcolfamily.get(), + veckeys.size(), const_cast(veckeys.data()), + vecvals.data(), vecstatus.data()); + + auto itrDst = tok->mapkeydata.begin(); + for (size_t ires = 0; ires < veckeys.size(); ++ires) { + if (vecstatus[ires].ok()) { + itrDst->second = std::make_unique(std::move(vecvals[ires])); + } + ++itrDst; } aePostFunction(el, callback, tok); }); @@ -302,7 +322,10 @@ StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aeP void RocksDBStorageProvider::complete_retrieve(StorageToken *tok, callbackSingle fn) { RetrievalStorageToken *rtok = reinterpret_cast(tok); - if (rtok->fFound) - fn(rtok->key.data(), rtok->key.size(), rtok->data.data(), rtok->data.size()); + for (auto &pair : rtok->mapkeydata) { + if (pair.second != nullptr) { + fn(pair.first.data(), pair.first.size(), pair.second->data(), pair.second->size()); + } + } delete rtok; } \ No newline at end of file diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index c8557a577..01edb4975 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -31,7 +31,7 @@ class RocksDBStorageProvider : public IStorage virtual bool erase(const char *key, size_t cchKey) override; virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override; - virtual StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, const char *key, size_t cchKey); + virtual StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, sds *rgkey, size_t ckey); virtual void complete_retrieve(StorageToken *tok, callbackSingle fn); virtual size_t clear() override;