diff --git a/src/db.cpp b/src/db.cpp index 3accdd361..a7d228434 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2528,6 +2528,12 @@ void slotToKeyFlush(int async) { * decrement the reference count to release the keys names. */ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int count) { if (g_pserver->m_pstorageFactory != nullptr) { + // We must commit so the storage engine agrees on the number of items in the hash slot + if (g_pserver->db[0]->FTrackingChanges()) { + if (g_pserver->db[0]->processChanges(false)) + g_pserver->db[0]->commitChanges(); + g_pserver->db[0]->trackChanges(false); + } int j = 0; g_pserver->db[0]->getStorageCache()->enumerate_hashslot([&](const char *key, size_t cchKey, const void *, size_t )->bool { keys[j++] = createStringObject(key, cchKey); @@ -2557,6 +2563,12 @@ unsigned int getKeysInSlot(unsigned int hashslot, robj **keys, unsigned int coun unsigned int delKeysInSlot(unsigned int hashslot) { serverAssert(GlobalLocksAcquired()); if (g_pserver->m_pstorageFactory != nullptr) { + // We must commit so the storage engine agrees on the number of items in the hash slot + if (g_pserver->db[0]->FTrackingChanges()) { + if (g_pserver->db[0]->processChanges(false)) + g_pserver->db[0]->commitChanges(); + g_pserver->db[0]->trackChanges(false); + } int j = 0; g_pserver->db[0]->getStorageCache()->enumerate_hashslot([&](const char *key, size_t cchKey, const void *, size_t )->bool { robj *keyobj = createStringObject(key, cchKey); diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index c8e364117..ace598d5b 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -25,7 +25,7 @@ bool FInternalKey(const char *key, size_t cch) std::string getPrefix(unsigned int hashslot) { char *hash_char = (char *)&hashslot; - return std::string(hash_char + (sizeof(unsigned int) - 2), 2); + return std::string(hash_char, 2); } std::string prefixKey(const char *key, size_t cchKey) @@ -202,23 +202,26 @@ bool RocksDBStorageProvider::enumerate_hashslot(callback fn, unsigned int hashsl std::string prefix = getPrefix(hashslot); std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get())); size_t count = 0; - for (it->Seek(prefix.c_str()); it->Valid(); it->Next()) { + serverAssert(prefix.size() >= 2); + bool full_iter = true; + for (it->Seek(rocksdb::Slice(prefix.data(), prefix.size())); it->Valid(); it->Next()) { if (FInternalKey(it->key().data(), it->key().size())) continue; - if (strncmp(it->key().data(),prefix.c_str(),2) != 0) + if (it->key().size() < 2 || memcmp(it->key().data(),prefix.data(),2) != 0) break; ++count; bool fContinue = fn(it->key().data()+2, it->key().size()-2, it->value().data(), it->value().size()); - if (!fContinue) + if (!fContinue) { + full_iter = false; break; + } } - bool full_iter = !it->Valid() || (strncmp(it->key().data(),prefix.c_str(),2) != 0); if (full_iter && count != g_pserver->cluster->slots_keys_count[hashslot]) { - printf("WARNING: rocksdb hashslot count mismatch"); + serverLog(LL_WARNING, "WARNING: rocksdb hashslot %d count mismatch %zu vs expected %lu", hashslot, count, g_pserver->cluster->slots_keys_count[hashslot]); } - assert(!full_iter || count == g_pserver->cluster->slots_keys_count[hashslot]); - assert(it->status().ok()); // Check for any errors found during the scan + serverAssert(!full_iter || count == g_pserver->cluster->slots_keys_count[hashslot]); + serverAssert(it->status().ok()); // Check for any errors found during the scan return full_iter; }