Skip to content

Commit

Permalink
Use MultiGet
Browse files Browse the repository at this point in the history
  • Loading branch information
JohnSully committed Sep 25, 2023
1 parent e95c8b4 commit f0e8c6f
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 63 deletions.
2 changes: 1 addition & 1 deletion src/AsyncWorkQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void AsyncWorkQueue::WorkerThreadMain()
listRelease(vars.clients_pending_asyncwrite);

std::unique_lock<fastlock> lockf(serverTL->lockPendingWrite);
serverTL->vecclientsProcess.clear();
serverTL->setclientsProcess.clear();
serverTL->clients_pending_write.clear();
std::atomic_thread_fence(std::memory_order_seq_cst);
}
Expand Down
5 changes: 3 additions & 2 deletions src/IStorage.h
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
#pragma once
#include <functional>
#include <unordered_set>
#include "sds.h"
#include "ae.h"

#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f"

struct StorageToken {
struct client *c;
std::unordered_set<struct client *> setc;
struct redisDbPersistentData *db;
virtual ~StorageToken() {}
};
Expand Down Expand Up @@ -42,7 +43,7 @@ class IStorage
virtual bool enumerate_hashslot(callback fn, unsigned int hashslot) const = 0;
virtual size_t count() const = 0;

virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, const char * /*key*/, size_t /*cchKey*/) {return nullptr;};
virtual StorageToken *begin_retrieve(struct aeEventLoop *, aePostFunctionTokenProc, sds *, size_t) {return nullptr;};
virtual void complete_retrieve(StorageToken * /*tok*/, callbackSingle /*fn*/) {};

virtual void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem) {
Expand Down
6 changes: 4 additions & 2 deletions src/StorageCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn) const
m_spstorage->retrieve(key, sdslen(key), fn);
}

StorageToken *StorageCache::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds key) {
StorageToken *StorageCache::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey) {
#if 0
std::unique_lock<fastlock> ul(m_lock);
if (m_pdict != nullptr)
{
Expand All @@ -199,7 +200,8 @@ StorageToken *StorageCache::begin_retrieve(struct aeEventLoop *el, aePostFunctio
return nullptr; // Not found
}
ul.unlock();
return m_spstorage->begin_retrieve(el, proc, key, sdslen(key));
#endif
return m_spstorage->begin_retrieve(el, proc, rgkey, ckey);
}

void StorageCache::complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn) {
Expand Down
2 changes: 1 addition & 1 deletion src/StorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class StorageCache
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void bulkInsert(char **rgkeys, size_t *rgcbkeys, char **rgvals, size_t *rgcbvals, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const;
StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds key);
StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey);
void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn);
bool erase(sds key);
void emergencyFreeCache();
Expand Down
2 changes: 1 addition & 1 deletion src/ae.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionTokenProc *proc, Storag
cmd.op = AE_ASYNC_OP::PostAsynDBFunction;
cmd.tproc = proc;
cmd.clientData = (void*)token;
cmd.fLock = true;
cmd.fLock = false;
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (size != sizeof(cmd))
return AE_ERR;
Expand Down
2 changes: 1 addition & 1 deletion src/blocked.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ void unblockClient(client *c) {
listDelNode(g_pserver->paused_clients,c->paused_list_node);
c->paused_list_node = NULL;
} else if (c->btype == BLOCKED_STORAGE) {
serverTL->vecclientsProcess.push_back(c);
serverTL->setclientsProcess.insert(c);
} else {
serverPanic("Unknown btype in unblockClient().");
}
Expand Down
88 changes: 55 additions & 33 deletions src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3363,45 +3363,60 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
#endif
return;
}
}

AeLocker lock;
void redisDbPersistentData::prefetchKeysFlash(std::unordered_set<client*> &setc)
{
serverAssert(GlobalLocksAcquired());
std::vector<sds> veckeys;
std::unordered_set<client*> setcBlocked;

for (client *c : setc) {
for (auto &command : c->vecqueuedcmd) {
getKeysResult result = GETKEYS_RESULT_INIT;
auto cmd = lookupCommand(szFromObj(command.argv[0]));
if (cmd == nullptr)
return; // Bad command? It's not for us to judge, just bail

if (command.argc < std::abs(cmd->arity))
return; // Invalid number of args

int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
bool fQueued = false;
for (int ikey = 0; ikey < numkeys; ++ikey)
{
robj *objKey = command.argv[result.keys[ikey]];
if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) {
veckeys.push_back(szFromObj(objKey));
fQueued = true;
}
}

std::vector<robj*> veckeys;
lock.arm(c);
getKeysResult result = GETKEYS_RESULT_INIT;
auto cmd = lookupCommand(szFromObj(command.argv[0]));
if (cmd == nullptr)
return; // Bad command? It's not for us to judge, just bail

if (command.argc < std::abs(cmd->arity))
return; // Invalid number of args

int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result);
for (int ikey = 0; ikey < numkeys; ++ikey)
{
robj *objKey = command.argv[result.keys[ikey]];
if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr)
veckeys.push_back(objKey);
if (fQueued) {
setcBlocked.insert(c);
}
getKeysFreeResult(&result);
}
}

getKeysFreeResult(&result);

std::vector<std::tuple<sds, robj*, std::unique_ptr<expireEntry>>> vecInserts;
for (robj *objKey : veckeys)
{
std::unique_ptr<expireEntry> spexpire;
auto *tok = m_spstorage->begin_retrieve(serverTL->el, storageLoadCallback, (sds)szFromObj(objKey));
if (tok != nullptr) {
tok->c = c;
tok->db = this;
blockClient(c, BLOCKED_STORAGE);
auto *tok = m_spstorage->begin_retrieve(serverTL->el, storageLoadCallback, veckeys.data(), veckeys.size());
if (tok != nullptr) {
for (client *c : setcBlocked) {
if (!(c->flags & CLIENT_BLOCKED))
blockClient(c, BLOCKED_STORAGE);
}
tok->setc = std::move(setcBlocked);
tok->db = this;
}

return;
}

/*static*/ void redisDbPersistentData::storageLoadCallback(aeEventLoop *el, StorageToken *tok) {
/*static*/ void redisDbPersistentData::storageLoadCallback(aeEventLoop *, StorageToken *tok) {
serverTL->setStorageTokensProcess.insert(tok);
}

void redisDbPersistentData::processStorageToken(StorageToken *tok) {
auto setc = std::move(tok->setc);
tok->db->m_spstorage->complete_retrieve(tok, [&](const char *szKey, size_t cbKey, const void *data, size_t cb) {
auto *db = tok->db;
size_t offset = 0;
Expand Down Expand Up @@ -3435,6 +3450,13 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
serverAssert(o->FExpires() == (db->m_setexpire->find(key) != db->m_setexpire->end()));
}
});
std::unique_lock<fastlock> ul(tok->c->lock);
unblockClient(tok->c);
}
tok = nullptr; // Invalid past this point

for (client *c : setc) {
std::unique_lock<fastlock> ul(c->lock);
if (c->flags & CLIENT_BLOCKED)
unblockClient(c);
else
serverTL->setclientsProcess.insert(c);
}
}
18 changes: 12 additions & 6 deletions src/networking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1593,7 +1593,8 @@ void unlinkClient(client *c) {
c->fPendingAsyncWrite = FALSE;
}

serverTL->vecclientsProcess.erase(std::remove(serverTL->vecclientsProcess.begin(), serverTL->vecclientsProcess.end(), c), serverTL->vecclientsProcess.end());
serverTL->setclientsProcess.erase(c);
serverTL->setclientsPrefetch.erase(c);

/* Clear the tracking status. */
if (c->flags & CLIENT_TRACKING) disableTracking(c);
Expand Down Expand Up @@ -2774,8 +2775,13 @@ void readQueryFromClient(connection *conn) {
processInputBuffer(c, false, CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_ASYNC);
}
}
if (!c->vecqueuedcmd.empty() && !(c->flags & CLIENT_BLOCKED))
serverTL->vecclientsProcess.push_back(c);
if (!c->vecqueuedcmd.empty()) {
if (g_pserver->m_pstorageFactory != nullptr && g_pserver->prefetch_enabled) {
serverTL->setclientsPrefetch.insert(c);
} else {
serverTL->setclientsProcess.insert(c);
}
}
} else {
// If we're single threaded its actually better to just process the command here while the query is hot in the cache
// multithreaded lock contention dominates and batching is better
Expand All @@ -2790,9 +2796,9 @@ void processClients()
serverAssert(GlobalLocksAcquired());

// Note that this function is reentrant and vecclients may be modified by code called from processInputBuffer
while (!serverTL->vecclientsProcess.empty()) {
client *c = serverTL->vecclientsProcess.front();
serverTL->vecclientsProcess.erase(serverTL->vecclientsProcess.begin());
while (!serverTL->setclientsProcess.empty()) {
client *c = *serverTL->setclientsProcess.begin();
serverTL->setclientsProcess.erase(serverTL->setclientsProcess.begin());

/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
Expand Down
16 changes: 15 additions & 1 deletion src/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2840,6 +2840,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) {

locker.arm();

for (auto *tok : serverTL->setStorageTokensProcess) {
tok->db->processStorageToken(tok);
}
serverTL->setStorageTokensProcess.clear();

if (g_pserver->m_pstorageFactory != nullptr && !serverTL->setclientsPrefetch.empty()) {
g_pserver->db[0]->prefetchKeysFlash(serverTL->setclientsPrefetch);
for (client *c : serverTL->setclientsPrefetch) {
if (!(c->flags & CLIENT_BLOCKED))
serverTL->setclientsProcess.insert(c);
}
serverTL->setclientsPrefetch.clear();
}

/* end any snapshots created by fast async commands */
for (int idb = 0; idb < cserver.dbnum; ++idb) {
if (serverTL->rgdbSnapshot[idb] != nullptr && serverTL->rgdbSnapshot[idb]->FStale()) {
Expand Down Expand Up @@ -4146,7 +4160,7 @@ void InitServerLast() {
set_jemalloc_bg_thread(cserver.jemalloc_bg_thread);
g_pserver->initial_memory_usage = zmalloc_used_memory();

g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads);
g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads*10);

// Allocate the repl backlog

Expand Down
8 changes: 7 additions & 1 deletion src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1210,6 +1210,8 @@ class redisDbPersistentData
bool keycacheIsEnabled();

void prefetchKeysAsync(client *c, struct parsed_command &command);
void prefetchKeysFlash(std::unordered_set<client*> &setc);
void processStorageToken(StorageToken *tok);

bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }

Expand Down Expand Up @@ -1374,6 +1376,8 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::dictUnsafeKeyOnly;
using redisDbPersistentData::resortExpire;
using redisDbPersistentData::prefetchKeysAsync;
using redisDbPersistentData::prefetchKeysFlash;
using redisDbPersistentData::processStorageToken;
using redisDbPersistentData::prepOverwriteForSnapshot;
using redisDbPersistentData::FRehashing;
using redisDbPersistentData::FTrackingChanges;
Expand Down Expand Up @@ -2212,7 +2216,9 @@ struct redisServerThreadVars {

int propagate_in_transaction = 0; /* Make sure we don't propagate nested MULTI/EXEC */
int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */
std::vector<client*> vecclientsProcess;
std::unordered_set<client*> setclientsProcess;
std::unordered_set<client*> setclientsPrefetch;
std::unordered_set<StorageToken*> setStorageTokensProcess;
dictAsyncRehashCtl *rehashCtl = nullptr;

int getRdbKeySaveDelay();
Expand Down
49 changes: 36 additions & 13 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,21 +279,41 @@ bool RocksDBStorageProvider::FKeyExists(std::string& key) const
}

struct RetrievalStorageToken : public StorageToken {
std::string key;
std::vector<char> data;
bool fFound = false;
std::unordered_map<std::string, std::unique_ptr<rocksdb::PinnableSlice>> mapkeydata;
};

StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, const char *key, size_t cchKey) {
StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, sds *rgkey, size_t ckey) {
RetrievalStorageToken *tok = new RetrievalStorageToken();
tok->key = std::string(key, cchKey);

for (size_t ikey = 0; ikey < ckey; ++ikey) {
tok->mapkeydata.insert(std::make_pair(std::string(rgkey[ikey], sdslen(rgkey[ikey])), nullptr));
}

(*m_pfactory->m_wqueue)->AddWorkFunction([this, el, callback, tok]{
rocksdb::PinnableSlice slice;
auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(prefixKey(tok->key.data(), tok->key.size())), &slice);
if (status.ok()) {
tok->data.resize(slice.size());
memcpy(tok->data.data(), slice.data(), slice.size());
tok->fFound = true;
std::vector<std::string> veckeysStr;
std::vector<rocksdb::Slice> veckeys;
std::vector<rocksdb::PinnableSlice> vecvals;
std::vector<rocksdb::Status> vecstatus;
veckeys.reserve(tok->mapkeydata.size());
veckeysStr.reserve(tok->mapkeydata.size());
vecvals.resize(tok->mapkeydata.size());
vecstatus.resize(tok->mapkeydata.size());
for (auto &pair : tok->mapkeydata) {
veckeysStr.emplace_back(prefixKey(pair.first.data(), pair.first.size()));
veckeys.emplace_back(veckeysStr.back().data(), veckeysStr.back().size());
}

m_spdb->MultiGet(ReadOptions(),
m_spcolfamily.get(),
veckeys.size(), const_cast<const rocksdb::Slice*>(veckeys.data()),
vecvals.data(), vecstatus.data());

auto itrDst = tok->mapkeydata.begin();
for (size_t ires = 0; ires < veckeys.size(); ++ires) {
if (vecstatus[ires].ok()) {
itrDst->second = std::make_unique<rocksdb::PinnableSlice>(std::move(vecvals[ires]));
}
++itrDst;
}
aePostFunction(el, callback, tok);
});
Expand All @@ -302,7 +322,10 @@ StorageToken *RocksDBStorageProvider::begin_retrieve(struct aeEventLoop *el, aeP

void RocksDBStorageProvider::complete_retrieve(StorageToken *tok, callbackSingle fn) {
RetrievalStorageToken *rtok = reinterpret_cast<RetrievalStorageToken*>(tok);
if (rtok->fFound)
fn(rtok->key.data(), rtok->key.size(), rtok->data.data(), rtok->data.size());
for (auto &pair : rtok->mapkeydata) {
if (pair.second != nullptr) {
fn(pair.first.data(), pair.first.size(), pair.second->data(), pair.second->size());
}
}
delete rtok;
}
2 changes: 1 addition & 1 deletion src/storage/rocksdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class RocksDBStorageProvider : public IStorage
virtual bool erase(const char *key, size_t cchKey) override;
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;

virtual StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, const char *key, size_t cchKey);
virtual StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc callback, sds *rgkey, size_t ckey);
virtual void complete_retrieve(StorageToken *tok, callbackSingle fn);

virtual size_t clear() override;
Expand Down

0 comments on commit f0e8c6f

Please sign in to comment.