Skip to content

Commit

Permalink
some fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
a00817524 committed Oct 25, 2023
1 parent c08568f commit 1239d56
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
2 changes: 1 addition & 1 deletion src/StorageCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class StorageCache
void retrieve(sds key, IStorage::callbackSingle fn) const;
StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey);
void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn);
StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {m_spstorage->begin_endWriteBatch(el,proc);} // NOP
StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {return m_spstorage->begin_endWriteBatch(el,proc);} // NOP
void complete_endWriteBatch(StorageToken *tok) {m_spstorage->complete_endWriteBatch(tok);};
bool erase(sds key);
void emergencyFreeCache();
Expand Down
2 changes: 1 addition & 1 deletion src/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3091,7 +3091,7 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **

if (m_spstorage != nullptr)
{
auto tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback);
auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback);
if (tok != nullptr)
{
for (client *c : setcBlocked) //need to check how to push client to blocked list
Expand Down
32 changes: 25 additions & 7 deletions src/storage/rocksdb.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,21 +255,39 @@ void RocksDBStorageProvider::endWriteBatch()
m_lock.unlock();
}

StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback)
struct BatchStorageToken : public StorageToken {
std::shared_ptr<rocksdb::DB> tspdb; // Note: This must be first so it is deleted last
rocksdb::WriteBatch* tspbatch;
~BatchStorageToken(){
tspdb.reset();
tspdb = nullptr;
tspbatch = nullptr;
}
};

StorageToken* RocksEncoderStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback)
{
StorageToken *tok = new StorageToken();
auto pbatch = m_spbatch.get();
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok,&pbatch]{
m_spdb->Write(WriteOptions(),pbatch);
// serverLog(LL_WARNING, "RocksEncoderStorageProvider::begin_endWriteBatch");
BatchStorageToken *tok = new BatchStorageToken();
tok->tspbatch = m_spbatch.get();
tok->tspdb = m_spdb;
(*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{
// serverAssert(db);
serverAssert(tok->tspdb);
tok->tspdb->Write(WriteOptions(),tok->tspbatch);
aePostFunction(el,callback,tok);
});
return tok;
}

void RocksDBStorageProvider::complete_endWriteBatch(StorageToken* tok){
// m_spbatch = nullptr;
void RocksEncoderStorageProvider::complete_endWriteBatch(StorageToken* tok){
// serverLog(LL_WARNING, "RocksEncoderStorageProvider::complete_endWriteBatch");
m_lock.unlock();
delete tok;
tok = nullptr;
}


void RocksDBStorageProvider::batch_lock()
{
m_lock.lock();
Expand Down

0 comments on commit 1239d56

Please sign in to comment.