diff --git a/src/StorageCache.h b/src/StorageCache.h index 828c657df..18fd7f7e4 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -45,7 +45,7 @@ class StorageCache void retrieve(sds key, IStorage::callbackSingle fn) const; StorageToken *begin_retrieve(struct aeEventLoop *el, aePostFunctionTokenProc proc, sds *rgkey, size_t ckey); void complete_retrieve(StorageToken *tok, IStorage::callbackSingle fn); - StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {m_spstorage->begin_endWriteBatch(el,proc);} // NOP + StorageToken* begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* proc) {return m_spstorage->begin_endWriteBatch(el,proc);} // NOP void complete_endWriteBatch(StorageToken *tok) {m_spstorage->complete_endWriteBatch(tok);}; bool erase(sds key); void emergencyFreeCache(); diff --git a/src/db.cpp b/src/db.cpp index 8d23af067..387fbdcdb 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3091,7 +3091,7 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** if (m_spstorage != nullptr) { - auto tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback); + auto *tok = m_spstorage->begin_endWriteBatch(serverTL->el, storageLoadCallback); if (tok != nullptr) { for (client *c : setcBlocked) //need to check how to push client to blocked list diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 1bae69d8c..6c84ccad8 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -255,21 +255,39 @@ void RocksDBStorageProvider::endWriteBatch() m_lock.unlock(); } -StorageToken* RocksDBStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback) +struct BatchStorageToken : public StorageToken { + std::shared_ptr tspdb; // Note: This must be first so it is deleted last + rocksdb::WriteBatch* tspbatch; + ~BatchStorageToken(){ + tspdb.reset(); + tspdb = nullptr; + tspbatch = nullptr; + } +}; + +StorageToken* RocksEncoderStorageProvider::begin_endWriteBatch(struct aeEventLoop *el, aePostFunctionTokenProc* callback) { - StorageToken *tok = new StorageToken(); - auto pbatch = m_spbatch.get(); - (*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok,&pbatch]{ - m_spdb->Write(WriteOptions(),pbatch); + // serverLog(LL_WARNING, "RocksEncoderStorageProvider::begin_endWriteBatch"); + BatchStorageToken *tok = new BatchStorageToken(); + tok->tspbatch = m_spbatch.get(); + tok->tspdb = m_spdb; + (*m_pfactory->m_wqueue)->AddWorkFunction([this, el,callback,tok]{ + // serverAssert(db); + serverAssert(tok->tspdb); + tok->tspdb->Write(WriteOptions(),tok->tspbatch); aePostFunction(el,callback,tok); }); + return tok; } -void RocksDBStorageProvider::complete_endWriteBatch(StorageToken* tok){ - // m_spbatch = nullptr; +void RocksEncoderStorageProvider::complete_endWriteBatch(StorageToken* tok){ + // serverLog(LL_WARNING, "RocksEncoderStorageProvider::complete_endWriteBatch"); m_lock.unlock(); + delete tok; + tok = nullptr; } + void RocksDBStorageProvider::batch_lock() { m_lock.lock();