From 6e1e669f41804d31e408905d39bc0b8caa06f475 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Mon, 19 Feb 2024 14:37:31 +0800 Subject: [PATCH 1/5] Add snapshot common --- src/Service/NuRaftLogSnapshot.h | 16 ++++++++++------ src/Service/SnapshotCommon.cpp | 2 ++ src/Service/SnapshotCommon.h | 14 ++++++++++++++ 3 files changed, 26 insertions(+), 6 deletions(-) create mode 100644 src/Service/SnapshotCommon.cpp create mode 100644 src/Service/SnapshotCommon.h diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index e48aa5d3d3..2401b6a3a5 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -29,11 +29,13 @@ using nuraft::ulong; enum SnapshotVersion : uint8_t { V0 = 0, - V1 = 1, /// with ACL map, and last_log_term for file name + V1 = 1, /// Add ACL map + V2 = 2, /// Replace protobuf + V3 = 3, /// Add last_log_term to file name None = 255, }; -static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V1; +static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V2; struct SnapshotBatchHeader { @@ -73,8 +75,10 @@ class KeeperSnapshotStore const String & snap_dir_, snapshot & meta, UInt32 max_object_node_size_ = MAX_OBJECT_NODE_SIZE, - UInt32 save_batch_size_ = SAVE_BATCH_SIZE) - : snap_dir(snap_dir_) + UInt32 save_batch_size_ = SAVE_BATCH_SIZE, + SnapshotVersion version_ = CURRENT_SNAPSHOT_VERSION) + : version(version_) + , snap_dir(snap_dir_) , max_object_node_size(max_object_node_size_) , save_batch_size(save_batch_size_) , log(&(Poco::Logger::get("KeeperSnapshotStore"))) @@ -143,7 +147,7 @@ class KeeperSnapshotStore static const int SNAPSHOT_THREAD_NUM = 8; static const int IO_BUFFER_SIZE = 16384; /// 16K - SnapshotVersion version = CURRENT_SNAPSHOT_VERSION; + SnapshotVersion version; private: /// get path of an object @@ -153,7 +157,7 @@ class KeeperSnapshotStore bool parseObject(String obj_path, KeeperStore & store); /// load batch header in an object - /// TODO use inter nal buffer + /// TODO use internal buffer bool loadBatchHeader(ptr fs, SnapshotBatchHeader & head); /// serialize whole data tree diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp new file mode 100644 index 0000000000..84fc8a0ab9 --- /dev/null +++ b/src/Service/SnapshotCommon.cpp @@ -0,0 +1,2 @@ +#include "SnapshotCommon.h" + diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h new file mode 100644 index 0000000000..e19d1971a2 --- /dev/null +++ b/src/Service/SnapshotCommon.h @@ -0,0 +1,14 @@ +// +// Created by wujianchao on 2/19/24. +// + +#ifndef RAFTKEEPER_SNAPSHOTUTILS_H +#define RAFTKEEPER_SNAPSHOTUTILS_H + + +class snapshotUtils +{ +}; + + +#endif //RAFTKEEPER_SNAPSHOTUTILS_H From 0d4331c2dca31820ecb58797d304473097572f5a Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Mon, 19 Feb 2024 17:57:42 +0800 Subject: [PATCH 2/5] Replace protobuf snapshot batch --- programs/converter/RaftKeeperConverter.cpp | 2 +- src/Service/KeeperServer.cpp | 2 +- src/Service/NuRaftLogSnapshot.cpp | 986 +++++++++++---------- src/Service/NuRaftLogSnapshot.h | 100 +-- src/Service/NuRaftStateMachine.h | 2 +- src/Service/SnapshotCommon.cpp | 677 +++++++++++++- src/Service/SnapshotCommon.h | 144 ++- src/Service/tests/gtest_raft_snapshot.cpp | 53 +- 8 files changed, 1413 insertions(+), 553 deletions(-) diff --git a/programs/converter/RaftKeeperConverter.cpp b/programs/converter/RaftKeeperConverter.cpp index 278d33079e..a091c65c6c 100644 --- a/programs/converter/RaftKeeperConverter.cpp +++ b/programs/converter/RaftKeeperConverter.cpp @@ -65,7 +65,7 @@ int mainEntryRaftKeeperConverter(int argc, char ** argv) store.getZxid()); nuraft::ptr new_snapshot(nuraft::cs_new(store.getZxid(), 1, std::make_shared())); nuraft::ptr snap_mgr = nuraft::cs_new( - options["output-dir"].as(), 3600 * 1, KeeperSnapshotStore::MAX_OBJECT_NODE_SIZE); + options["output-dir"].as(), 3600 * 1, MAX_OBJECT_NODE_SIZE); snap_mgr->createSnapshot(*new_snapshot, store, store.getZxid(), store.getSessionIDCounter()); std::cout << "Snapshot serialized to path:" << options["output-dir"].as() << std::endl; } diff --git a/src/Service/KeeperServer.cpp b/src/Service/KeeperServer.cpp index 34447322f2..f4034300a2 100644 --- a/src/Service/KeeperServer.cpp +++ b/src/Service/KeeperServer.cpp @@ -44,7 +44,7 @@ KeeperServer::KeeperServer( new_session_id_callback, state_manager->load_log_store(), checkAndGetSuperdigest(settings->super_digest), - KeeperSnapshotStore::MAX_OBJECT_NODE_SIZE, + MAX_OBJECT_NODE_SIZE, request_processor_); } namespace diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 77bae3b2e0..2eed80f98b 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -2,8 +2,6 @@ #include #include #include -#include -#include #include #include @@ -12,7 +10,6 @@ #include #include -#include #include #include @@ -40,376 +37,132 @@ namespace ErrorCodes using nuraft::cs_new; using Poco::NumberFormatter; -const UInt32 KeeperSnapshotStore::MAX_OBJECT_NODE_SIZE; - -const String MAGIC_SNAPSHOT_TAIL = "SnapTail"; -const String MAGIC_SNAPSHOT_HEAD = "SnapHead"; - -int openFileForWrite(const String & obj_path) -{ - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - errno = 0; - int snap_fd = ::open(obj_path.c_str(), O_RDWR | O_CREAT, 0644); - if (snap_fd < 0) - { - LOG_ERROR(log, "Created new snapshot object {} failed, fd {}, error:{}", obj_path, snap_fd, strerror(errno)); - return -1; - } - return snap_fd; -} - -/// snapshot object file header -bool isFileHeader(UInt64 magic) -{ - union - { - uint64_t magic_num; - uint8_t magic_array[8] = {'S', 'n', 'a', 'p', 'H', 'e', 'a', 'd'}; - }; - return magic == magic_num; -} - -/// snapshot object file tail -bool isFileTail(UInt64 magic) -{ - union - { - uint64_t magic_num; - uint8_t magic_array[8] = {'S', 'n', 'a', 'p', 'T', 'a', 'i', 'l'}; - }; - return magic == magic_num; -} - -std::shared_ptr openFileAndWriteHeader(const String & path, const SnapshotVersion version) -{ - auto out = std::make_shared(path); - out->write(MAGIC_SNAPSHOT_HEAD.data(), MAGIC_SNAPSHOT_HEAD.size()); - writeIntBinary(static_cast(version), *out); - return out; -} - -void writeTailAndClose(std::shared_ptr & out, UInt32 checksum) +void KeeperSnapshotStore::getObjectPath(ulong object_id, String & obj_path) { - out->write(MAGIC_SNAPSHOT_TAIL.data(), MAGIC_SNAPSHOT_TAIL.size()); - writeIntBinary(checksum, *out); - out->close(); + char path_buf[1024]; + snprintf(path_buf, 1024, SNAPSHOT_FILE_NAME, curr_time.c_str(), last_log_index, object_id); + obj_path = path_buf; + obj_path = snap_dir + "/" + obj_path; } -int openFileForRead(String & obj_path) +size_t KeeperSnapshotStore::getObjectIdx(const String & file_name) { - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - errno = 0; - int snap_fd = ::open(obj_path.c_str(), O_RDWR); - if (snap_fd < 0) - { - LOG_ERROR(log, "Open snapshot object {} failed, fd {}, error:{}", obj_path, snap_fd, strerror(errno)); - return -1; - } - return snap_fd; + auto it1 = file_name.find_last_of('_'); + return std::stoi(file_name.substr(it1 + 1, file_name.size() - it1)); } -/// save batch data in snapshot object -std::pair saveBatch(std::shared_ptr & out, ptr & batch) +size_t KeeperSnapshotStore::serializeDataTree(KeeperStore & storage) { - if (!batch) - batch = cs_new(); - - String str_buf; - batch->SerializeToString(&str_buf); - - SnapshotBatchHeader header; - header.data_length = str_buf.size(); - header.data_crc = RK::getCRC32(str_buf.c_str(), str_buf.size()); - - writeIntBinary(header.data_length, *out); - writeIntBinary(header.data_crc, *out); + std::shared_ptr out; + ptr batch; - out->write(str_buf.c_str(), header.data_length); - out->next(); + uint64_t processed = 0; + uint32_t checksum = 0; - return {SnapshotBatchHeader::HEADER_SIZE + header.data_length, header.data_crc}; -} + serializeNode(out, batch, storage, "/", processed, checksum); + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; -UInt32 updateCheckSum(UInt32 checksum, UInt32 data_crc) -{ - union - { - UInt64 data; - UInt32 crc[2]; - }; - crc[0] = checksum; - crc[1] = data_crc; - return RK::getCRC32(reinterpret_cast(&data), 8); -} + writeTailAndClose(out, checksum); + LOG_INFO(log, "Creating snapshot processed data size {}, current zxid {}", processed, storage.zxid); -std::pair -saveBatchAndUpdateCheckSum(std::shared_ptr & out, ptr & batch, UInt32 checksum) -{ - auto [save_size, data_crc] = saveBatch(out, batch); - /// rebuild batch - batch = cs_new(); - return {save_size, updateCheckSum(checksum, data_crc)}; + return getObjectIdx(out->getFileName()); } -void serializeAcls(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version) +size_t KeeperSnapshotStore::serializeDataTreeV2(KeeperStore & storage) { - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - - const auto & acl_map = acls.getMapping(); - LOG_INFO(log, "Begin create snapshot acl object, acl size {}, path {}", acl_map.size(), path); - - auto out = openFileAndWriteHeader(path, version); - ptr batch; - - uint64_t index = 0; - UInt32 checksum = 0; - - for (const auto & acl_it : acl_map) - { - /// flush and rebuild batch - if (index % save_batch_size == 0) - { - /// skip flush the first batch - if (index != 0) - { - /// write data in batch to file - auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); - checksum = new_checksum; - } - batch = cs_new(); - batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP); - } - - /// append to batch - SnapshotItemPB * entry = batch->add_data(); - WriteBufferFromNuraftBuffer buf; - Coordination::write(acl_it.first, buf); - Coordination::write(acl_it.second, buf); - - ptr data = buf.getBuffer(); - data->pos(0); - entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); + std::shared_ptr out; + ptr batch; - index++; - } + uint64_t processed = 0; + uint32_t checksum = 0; - /// flush the last acl batch - auto [_, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + serializeNodeV2(out, batch, storage, "/", processed, checksum); + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); checksum = new_checksum; writeTailAndClose(out, checksum); + LOG_INFO(log, "Creating snapshot processed data size {}, current zxid {}", processed, storage.zxid); + + return getObjectIdx(out->getFileName()); } -[[maybe_unused]] size_t serializeEphemerals(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size) +void KeeperSnapshotStore::serializeNode( + ptr & out, + ptr & batch, + KeeperStore & store, + const String & path, + uint64_t & processed, + uint32_t & checksum) { - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - LOG_INFO(log, "Begin create snapshot ephemeral object, node size {}, path {}", ephemerals.size(), path); - - ptr batch; + auto node = store.container.get(path); - std::lock_guard lock(mutex); + /// In case of node is deleted + if (!node) + return; - if (ephemerals.empty()) + std::shared_ptr node_copy; { - LOG_INFO(log, "Create snapshot ephemeral nodes size is 0"); - return 0; + std::shared_lock lock(node->mutex); + node_copy = node->clone(); } - auto out = cs_new(path); - uint64_t index = 0; - for (auto & ephemeral_it : ephemerals) + if (processed % max_object_node_size == 0) { - /// flush and rebuild batch - if (index % save_batch_size == 0) - { - /// skip flush the first batch - if (index != 0) - { - /// write data in batch to file - saveBatch(out, batch); - } - batch = cs_new(); - batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_DATA_EPHEMERAL); - } - - /// append to batch - SnapshotItemPB * entry = batch->add_data(); - WriteBufferFromNuraftBuffer buf; - Coordination::write(ephemeral_it.first, buf); - Coordination::write(ephemeral_it.second.size(), buf); + /// time to create new snapshot object + uint64_t obj_id = processed / max_object_node_size; - for (const auto & node_path : ephemeral_it.second) + if (obj_id != 0) { - Coordination::write(node_path, buf); - } + /// flush last batch data + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; - ptr data = buf.getBuffer(); - data->pos(0); - entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); + /// close current object file + writeTailAndClose(out, checksum); + /// reset checksum + checksum = 0; + } + String new_obj_path; + /// for there are 4 objects before data objects + getObjectPath(obj_id + 4, new_obj_path); - index++; + LOG_INFO(log, "Create new snapshot object {}, path {}", obj_id + 4, new_obj_path); + out = openFileAndWriteHeader(new_obj_path, version); } - /// flush the last batch - saveBatch(out, batch); - out->close(); - return 1; -} - -/** - * Serialize sessions and return the next_session_id before serialize - */ -int64_t serializeSessions(KeeperStore & store, UInt32 save_batch_size, const SnapshotVersion version, String & path) -{ - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - - auto out = openFileAndWriteHeader(path, version); - - - LOG_INFO(log, "Begin create snapshot session object, session size {}, path {}", store.session_and_timeout.size(), path); - - std::lock_guard lock(store.session_mutex); - std::lock_guard acl_lock(store.auth_mutex); - - int64_t next_session_id = store.session_id_counter; - ptr batch; - - uint64_t index = 0; - UInt32 checksum = 0; - - for (auto & session_it : store.session_and_timeout) + /// flush and rebuild batch + if (processed % save_batch_size == 0) { - /// flush and rebuild batch - if (index % save_batch_size == 0) + /// skip flush the first batch + if (processed != 0) { - /// skip flush the first batch - if (index != 0) - { - /// write data in batch to file - auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); - checksum = new_checksum; - } - batch = cs_new(); - batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_SESSION); + /// flush data in batch to file + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; } - - /// append to batch - SnapshotItemPB * entry = batch->add_data(); - WriteBufferFromNuraftBuffer buf; - Coordination::write(session_it.first, buf); //NewSession - Coordination::write(session_it.second, buf); //Timeout_ms - - Coordination::AuthIDs ids; - if (store.session_and_auth.count(session_it.first)) - ids = store.session_and_auth.at(session_it.first); - Coordination::write(ids, buf); - - ptr data = buf.getBuffer(); - data->pos(0); - entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); - - index++; - } - - /// flush the last batch - auto [_, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); - checksum = new_checksum; - writeTailAndClose(out, checksum); - - return next_session_id; -} - -/** - * Save map or map - */ -template -void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path) -{ - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - LOG_INFO(log, "Begin create snapshot map object, map size {}, path {}", snap_map.size(), path); - - auto out = openFileAndWriteHeader(path, version); - ptr batch; - - uint64_t index = 0; - UInt32 checksum = 0; - - for (auto & it : snap_map) - { - /// flush and rebuild batch - if (index % save_batch_size == 0) + else { - /// skip flush the first batch - if (index != 0) - { - /// write data in batch to file - auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); - checksum = new_checksum; - } - - batch = cs_new(); - if constexpr (std::is_same_v) - batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_STRINGMAP); - else if constexpr (std::is_same_v) - batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP); - else - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Only support string and int map."); + if (!batch) + batch = cs_new(); } - - /// append to batch - SnapshotItemPB * entry = batch->add_data(); - WriteBufferFromNuraftBuffer buf; - Coordination::write(it.first, buf); - Coordination::write(it.second, buf); - - ptr data = buf.getBuffer(); - data->pos(0); - entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); - - index++; } - /// flush the last batch - auto [_, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); - checksum = new_checksum; - writeTailAndClose(out, checksum); -} - -void KeeperSnapshotStore::getObjectPath(ulong object_id, String & obj_path) -{ - char path_buf[1024]; - snprintf(path_buf, 1024, SNAPSHOT_FILE_NAME, curr_time.c_str(), last_log_index, object_id); - obj_path = path_buf; - obj_path = snap_dir + "/" + obj_path; -} - -size_t KeeperSnapshotStore::getObjectIdx(const String & file_name) -{ - auto it1 = file_name.find_last_of('_'); - return std::stoi(file_name.substr(it1 + 1, file_name.size() - it1)); -} - -size_t KeeperSnapshotStore::serializeDataTree(KeeperStore & storage) -{ - std::shared_ptr out; - ptr batch; - - uint64_t processed = 0; - uint32_t checksum = 0; - - serializeNode(out, batch, storage, "/", processed, checksum); - auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); - checksum = new_checksum; + LOG_TRACE(log, "Append node path {}", path); + appendNodeToBatch(batch, path, node_copy); + processed++; - writeTailAndClose(out, checksum); - LOG_INFO(log, "Creating snapshot processed data size {}, current zxid {}", processed, storage.zxid); + String path_with_slash = path; + if (path != "/") + path_with_slash += '/'; - return getObjectIdx(out->getFileName()); + for (const auto & child : node->children) + serializeNode(out, batch, store, path_with_slash + child, processed, checksum); } -void KeeperSnapshotStore::serializeNode( +void KeeperSnapshotStore::serializeNodeV2( ptr & out, - ptr & batch, + ptr & batch, KeeperStore & store, const String & path, uint64_t & processed, @@ -435,7 +188,7 @@ void KeeperSnapshotStore::serializeNode( if (obj_id != 0) { /// flush last batch data - auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); checksum = new_checksum; /// close current object file @@ -458,18 +211,18 @@ void KeeperSnapshotStore::serializeNode( if (processed != 0) { /// flush data in batch to file - auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); checksum = new_checksum; } else { if (!batch) - batch = cs_new(); + batch = cs_new(); } } LOG_TRACE(log, "Append node path {}", path); - appendNodeToBatch(batch, path, node_copy); + appendNodeToBatchV2(batch, path, node_copy); processed++; String path_with_slash = path; @@ -477,7 +230,7 @@ void KeeperSnapshotStore::serializeNode( path_with_slash += '/'; for (const auto & child : node->children) - serializeNode(out, batch, store, path_with_slash + child, processed, checksum); + serializeNodeV2(out, batch, store, path_with_slash + child, processed, checksum); } void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node) @@ -497,7 +250,29 @@ void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const St entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); } +void KeeperSnapshotStore::appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node) +{ + WriteBufferFromNuraftBuffer buf; + + Coordination::write(path, buf); + Coordination::write(node->data, buf); + Coordination::write(node->acl_id, buf); + Coordination::write(node->is_ephemeral, buf); + Coordination::write(node->is_sequential, buf); + Coordination::write(node->stat, buf); + + ptr data = buf.getBuffer(); + data->pos(0); + batch->add(String(reinterpret_cast(data->data_begin()), data->size())); +} + size_t KeeperSnapshotStore::createObjects(KeeperStore & store, int64_t next_zxid, int64_t next_session_id) +{ + return version < SnapshotVersion::V2 ? createObjectsV1(store, next_zxid, next_session_id) + : createObjectsV2(store, next_zxid, next_session_id); +} + +size_t KeeperSnapshotStore::createObjectsV1(KeeperStore & store, int64_t next_zxid, int64_t next_session_id) { if (snap_meta->size() == 0) { @@ -568,6 +343,77 @@ size_t KeeperSnapshotStore::createObjects(KeeperStore & store, int64_t next_zxid return total_obj_count; } +size_t KeeperSnapshotStore::createObjectsV2(KeeperStore & store, int64_t next_zxid, int64_t next_session_id) +{ + if (snap_meta->size() == 0) + { + return 0; + } + + Poco::File(snap_dir).createDirectories(); + + size_t data_object_count = store.container.size() / max_object_node_size; + if (store.container.size() % max_object_node_size) + { + data_object_count += 1; + } + + //uint map、Sessions、acls、Normal node objects + size_t total_obj_count = data_object_count + 3; + + LOG_INFO( + log, + "Creating snapshot v3 with approximately data_object_count {}, total_obj_count {}, next zxid {}, next session id {}", + data_object_count, + total_obj_count, + next_zxid, + next_session_id); + + /// 1. Save uint map before nodes + IntMap int_map; + /// Next transaction id + int_map["ZXID"] = next_zxid; + /// Next session id + int_map["SESSIONID"] = next_session_id; + + String map_path; + getObjectPath(1, map_path); + serializeMapV2(int_map, save_batch_size, version, map_path); + + /// 2. Save sessions + String session_path; + /// object index should start from 1 + getObjectPath(2, session_path); + int64_t serialized_next_session_id = serializeSessionsV2(store, save_batch_size, version, session_path); + LOG_INFO( + log, + "Creating snapshot nex_session_id {}, serialized_next_session_id {}", + toHexString(next_session_id), + toHexString(serialized_next_session_id)); + + /// 3. Save acls + String acl_path; + /// object index should start from 1 + getObjectPath(3, acl_path); + serializeAclsV2(store.acl_map, acl_path, save_batch_size, version); + + /// 4. Save data tree + size_t last_id = serializeDataTreeV2(store); + + total_obj_count = last_id; + LOG_INFO(log, "Creating snapshot real data_object_count {}, total_obj_count {}", total_obj_count - 3, total_obj_count); + + /// add all path to objects_path + for (size_t i = 1; i < total_obj_count + 1; i++) + { + String path; + getObjectPath(i, path); + addObjectPath(i, path); + } + + return total_obj_count; +} + void KeeperSnapshotStore::init(String create_time = "") { if (create_time.empty()) @@ -581,7 +427,7 @@ void KeeperSnapshotStore::init(String create_time = "") } } -bool KeeperSnapshotStore::loadBatchHeader(ptr fs, SnapshotBatchHeader & head) +bool KeeperSnapshotStore::parseBatchHeader(ptr fs, SnapshotBatchHeader & head) { head.reset(); errno = 0; @@ -606,7 +452,7 @@ bool KeeperSnapshotStore::loadBatchHeader(ptr fs, SnapshotBatchHea return true; } -bool KeeperSnapshotStore::parseObject(String obj_path, KeeperStore & store) +bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, SnapshotVersion used_version) { ptr snap_fs = cs_new(); snap_fs->open(obj_path, std::ios::in | std::ios::binary); @@ -624,7 +470,8 @@ bool KeeperSnapshotStore::parseObject(String obj_path, KeeperStore & store) size_t read_size = 0; SnapshotBatchHeader header; UInt32 checksum = 0; - SnapshotVersion version_ = SnapshotVersion::None; + SnapshotVersion version_from_obj = SnapshotVersion::None; + while (!snap_fs->eof()) { size_t cur_read_size = read_size; @@ -634,21 +481,21 @@ bool KeeperSnapshotStore::parseObject(String obj_path, KeeperStore & store) // Just log it, and break; if (readUInt64(snap_fs, magic) != 0) { - if (snap_fs->eof() && version_ == SnapshotVersion::V0) + if (snap_fs->eof() && version_from_obj == SnapshotVersion::V0) { - LOG_INFO(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_)); + LOG_INFO(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj)); break; } - throw Exception(ErrorCodes::CORRUPTED_DATA, "snapshot {} load magic error, version {}", obj_path, uint8_t(version_)); + throw Exception(ErrorCodes::CORRUPTED_DATA, "snapshot {} load magic error, version {}", obj_path, toString(version_from_obj)); } read_size += 8; if (isFileHeader(magic)) { - char * buf = reinterpret_cast(&version_); + char * buf = reinterpret_cast(&version_from_obj); snap_fs->read(buf, sizeof(uint8_t)); read_size += 1; - LOG_INFO(log, "obj_path {}, read file header, version {}", obj_path, uint8_t(version_)); + LOG_INFO(log, "Got snapshot file header with version {}", toString(version_from_obj)); } else if (isFileTail(magic)) { @@ -663,17 +510,18 @@ bool KeeperSnapshotStore::parseObject(String obj_path, KeeperStore & store) } else { - if (version_ == SnapshotVersion::None) + if (version_from_obj == SnapshotVersion::None) { - version_ = SnapshotVersion::V0; - LOG_INFO(log, "obj_path {}, didn't read version, set to V0", obj_path); + version_from_obj = SnapshotVersion::V0; + LOG_INFO(log, "snapshot has no version, set to V0", obj_path); } LOG_INFO(log, "obj_path {}, didn't read the header and tail of the file", obj_path); snap_fs->seekg(cur_read_size); read_size = cur_read_size; } - if (!loadBatchHeader(snap_fs, header)) + + if (!parseBatchHeader(snap_fs, header)) { throw Exception(ErrorCodes::CORRUPTED_DATA, "snapshot {} load header error", obj_path); } @@ -696,39 +544,55 @@ bool KeeperSnapshotStore::parseObject(String obj_path, KeeperStore & store) delete[] body_buf; return false; } - SnapshotBatchPB batch_pb; - batch_pb.ParseFromString(String(body_buf, header.data_length)); - switch (batch_pb.batch_type()) - { - case SnapshotTypePB::SNAPSHOT_TYPE_DATA: { - LOG_INFO(log, "Load batch size {}, end point {}", batch_pb.data_size(), read_size); - for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) + + /// parse batch body by used_version + if (used_version == SnapshotVersion::None) + used_version = version_from_obj; + + if (used_version < SnapshotVersion::V2) + parseBatchBody(store, body_buf, header.data_length, used_version); + else + parseBatchBodyV2(store, body_buf, header.data_length, used_version); + delete[] body_buf; + } + return true; +} + +bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) +{ + SnapshotBatchPB batch_pb; + batch_pb.ParseFromString(String(batch_buf, length)); + switch (batch_pb.batch_type()) + { + case SnapshotTypePB::SNAPSHOT_TYPE_DATA: { + LOG_INFO(log, "Load batch size {}", batch_pb.data_size()); + for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) + { + const SnapshotItemPB & item_pb = batch_pb.data(data_idx); + const String & data = item_pb.data(); + ptr buf = buffer::alloc(data.size() + 1); + //buf->put(data.data(), data.size()); + buf->put(data); + buf->pos(0); + ReadBufferFromNuraftBuffer in(buf); + ptr node = cs_new(); + String key; + try { - const SnapshotItemPB & item_pb = batch_pb.data(data_idx); - const String & data = item_pb.data(); - ptr buf = buffer::alloc(data.size() + 1); - //buf->put(data.data(), data.size()); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - ptr node = cs_new(); - String key; - try + Coordination::read(key, in); + Coordination::read(node->data, in); + if (version_ >= SnapshotVersion::V1) { - Coordination::read(key, in); - Coordination::read(node->data, in); - if (version_ >= SnapshotVersion::V1) - { - Coordination::read(node->acl_id, in); - } - else if (version_ == SnapshotVersion::V0) - { - /// Deserialize ACL - Coordination::ACLs acls; - Coordination::read(acls, in); - node->acl_id = store.acl_map.convertACLs(acls); - LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); - } + Coordination::read(node->acl_id, in); + } + else if (version_ == SnapshotVersion::V0) + { + /// Deserialize ACL + Coordination::ACLs acls; + Coordination::read(acls, in); + node->acl_id = store.acl_map.convertACLs(acls); + LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); + } /// Some strange ACLID during deserialization from ZooKeeper if (node->acl_id == std::numeric_limits::max()) @@ -744,86 +608,84 @@ bool KeeperSnapshotStore::parseObject(String obj_path, KeeperStore & store) LOG_TRACE(log, "Load snapshot read key {}, node stat {}", key, node->stat.toString()); store.container.emplace(key, std::move(node)); - if (ephemeral_owner != 0) - { - LOG_TRACE(log, "Load snapshot find ephemeral node {} - {}", ephemeral_owner, key); - std::lock_guard l(store.ephemerals_mutex); - auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; - ephemeral_nodes.emplace(key); - } - } - catch (Coordination::Exception & e) + if (ephemeral_owner != 0) { - LOG_WARNING( - log, - "Can't read snapshot data {}, data index {}, key {}, excepiton {}", - obj_path, - data_idx, - key, - e.displayText()); - break; + LOG_TRACE(log, "Load snapshot find ephemeral node {} - {}", ephemeral_owner, key); + std::lock_guard l(store.ephemerals_mutex); + auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; + ephemeral_nodes.emplace(key); } } + catch (Coordination::Exception & e) + { + LOG_WARNING( + log, + "Can't read snapshot data, data index {}, key {}, excepiton {}", + data_idx, + key, + e.displayText()); + break; + } } + } + break; + case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG: break; - case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG: - break; - case SnapshotTypePB::SNAPSHOT_TYPE_SERVER: - break; - case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: { - for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) + case SnapshotTypePB::SNAPSHOT_TYPE_SERVER: + break; + case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: { + for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) + { + const SnapshotItemPB & item_pb = batch_pb.data(data_idx); + const String & data = item_pb.data(); + ptr buf = buffer::alloc(data.size() + 1); + buf->put(data); + buf->pos(0); + ReadBufferFromNuraftBuffer in(buf); + int64_t session_id; + int64_t timeout; + try { - const SnapshotItemPB & item_pb = batch_pb.data(data_idx); - const String & data = item_pb.data(); - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - int64_t session_id; - int64_t timeout; - try + Coordination::read(session_id, in); + Coordination::read(timeout, in); + if (version_ >= SnapshotVersion::V1) { - Coordination::read(session_id, in); - Coordination::read(timeout, in); - if (version_ >= SnapshotVersion::V1) + Coordination::AuthIDs ids; + Coordination::read(ids, in); { - Coordination::AuthIDs ids; - Coordination::read(ids, in); + if (!ids.empty()) { - if (!ids.empty()) - { - std::lock_guard lock(store.auth_mutex); - store.session_and_auth[session_id] = ids; - } + std::lock_guard lock(store.auth_mutex); + store.session_and_auth[session_id] = ids; } } } - catch (Coordination::Exception & e) - { - LOG_WARNING( - log, - "Can't read type_ephemeral snapshot {}, data index {}, key {}, excepiton {}", - obj_path, - data_idx, - e.displayText()); - break; - } - LOG_TRACE(log, "Read session id {}, timeout {}", session_id, timeout); - store.addSessionID(session_id, timeout); } + catch (Coordination::Exception & e) + { + LOG_WARNING( + log, + "Can't read snapshot, data index {}, key {}, excepiton {}", + data_idx, + e.displayText()); + break; + } + LOG_TRACE(log, "Read session id {}, timeout {}", session_id, timeout); + store.addSessionID(session_id, timeout); } - break; - case SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP: - if (version_ >= SnapshotVersion::V1) + } + break; + case SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP: + if (version_ >= SnapshotVersion::V1) + { + for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) { - for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) - { - const SnapshotItemPB & item_pb = batch_pb.data(data_idx); - const String & data = item_pb.data(); - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); + const SnapshotItemPB & item_pb = batch_pb.data(data_idx); + const String & data = item_pb.data(); + ptr buf = buffer::alloc(data.size() + 1); + buf->put(data); + buf->pos(0); + ReadBufferFromNuraftBuffer in(buf); uint64_t acl_id; Coordination::ACLs acls; @@ -831,54 +693,230 @@ bool KeeperSnapshotStore::parseObject(String obj_path, KeeperStore & store) Coordination::read(acl_id, in); Coordination::read(acls, in); - LOG_TRACE(log, "parseObject acl_id {}", acl_id); - store.acl_map.addMapping(acl_id, acls); - } + LOG_TRACE(log, "parseObject acl_id {}", acl_id); + store.acl_map.addMapping(acl_id, acls); } - break; - case SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP: { - IntMap int_map; - for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) + } + break; + case SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP: { + IntMap int_map; + for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) + { + const SnapshotItemPB & item_pb = batch_pb.data(data_idx); + const String & data = item_pb.data(); + ptr buf = buffer::alloc(data.size() + 1); + buf->put(data); + buf->pos(0); + ReadBufferFromNuraftBuffer in(buf); + String key; + int64_t value; + try { - const SnapshotItemPB & item_pb = batch_pb.data(data_idx); - const String & data = item_pb.data(); - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - String key; - int64_t value; - try + Coordination::read(key, in); + Coordination::read(value, in); + } + catch (Coordination::Exception & e) + { + LOG_WARNING( + log, + "Can't read uint map snapshot, data index {}, key {}, exception {}", + data_idx, + e.displayText()); + break; + } + int_map[key] = value; + } + if (int_map.find("ZXID") != int_map.end()) + { + store.zxid = int_map["ZXID"]; + } + if (int_map.find("SESSIONID") != int_map.end()) + { + store.session_id_counter = int_map["SESSIONID"]; + } + } + break; + default: + break; + } + return true; +} + +bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) +{ + SnapshotBatchBody batch; + batch.parse(String(batch_buf, length)); + switch (batch.type) + { + case SnapshotBatchType::SNAPSHOT_TYPE_DATA: { + LOG_INFO(log, "Load batch size {}", batch.size()); + for (size_t data_idx = 0; data_idx < batch.size(); data_idx++) + { + const String & data = batch[data_idx]; + ptr buf = buffer::alloc(data.size() + 1); + buf->put(data); + buf->pos(0); + ReadBufferFromNuraftBuffer in(buf); + ptr node = cs_new(); + String key; + try + { + Coordination::read(key, in); + Coordination::read(node->data, in); + if (version_ >= SnapshotVersion::V1) + { + Coordination::read(node->acl_id, in); + } + else if (version_ == SnapshotVersion::V0) { - Coordination::read(key, in); - Coordination::read(value, in); + /// Deserialize ACL + Coordination::ACLs acls; + Coordination::read(acls, in); + node->acl_id = store.acl_map.convertACLs(acls); + LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); } - catch (Coordination::Exception & e) + + /// Some strange ACLID during deserialization from ZooKeeper + if (node->acl_id == std::numeric_limits::max()) + node->acl_id = 0; + + store.acl_map.addUsage(node->acl_id); + LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); + + Coordination::read(node->is_ephemeral, in); + Coordination::read(node->is_sequential, in); + Coordination::read(node->stat, in); + auto ephemeral_owner = node->stat.ephemeralOwner; + LOG_TRACE(log, "Load snapshot read key {}, node stat {}", key, node->stat.toString()); + store.container.emplace(key, std::move(node)); + + if (ephemeral_owner != 0) { - LOG_WARNING( - log, - "Can't read uint map snapshot {}, data index {}, key {}, exception {}", - obj_path, - data_idx, - e.displayText()); - break; + LOG_TRACE(log, "Load snapshot find ephemeral node {} - {}", ephemeral_owner, key); + std::lock_guard l(store.ephemerals_mutex); + auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; + ephemeral_nodes.emplace(key); + } + } + catch (Coordination::Exception & e) + { + LOG_WARNING( + log, + "Can't read snapshot data, data index {}, key {}, excepiton {}", + data_idx, + key, + e.displayText()); + break; + } + } + } + break; + case SnapshotBatchType::SNAPSHOT_TYPE_CONFIG: + break; + case SnapshotBatchType::SNAPSHOT_TYPE_SERVER: + break; + case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: { + for (size_t data_idx = 0; data_idx < batch.size(); data_idx++) + { + const String & data = batch[data_idx]; + ptr buf = buffer::alloc(data.size() + 1); + buf->put(data); + buf->pos(0); + ReadBufferFromNuraftBuffer in(buf); + int64_t session_id; + int64_t timeout; + try + { + Coordination::read(session_id, in); + Coordination::read(timeout, in); + if (version_ >= SnapshotVersion::V1) + { + Coordination::AuthIDs ids; + Coordination::read(ids, in); + { + if (!ids.empty()) + { + std::lock_guard lock(store.auth_mutex); + store.session_and_auth[session_id] = ids; + } + } } - int_map[key] = value; } - if (int_map.find("ZXID") != int_map.end()) + catch (Coordination::Exception & e) { - store.zxid = int_map["ZXID"]; + LOG_WARNING( + log, + "Can't read snapshot, data index {}, key {}, excepiton {}", + data_idx, + e.displayText()); + break; } - if (int_map.find("SESSIONID") != int_map.end()) + LOG_TRACE(log, "Read session id {}, timeout {}", session_id, timeout); + store.addSessionID(session_id, timeout); + } + } + break; + case SnapshotBatchType::SNAPSHOT_TYPE_ACLMAP: + if (version_ >= SnapshotVersion::V1) + { + for (size_t data_idx = 0; data_idx < batch.size(); data_idx++) { - store.session_id_counter = int_map["SESSIONID"]; + const String & data = batch[data_idx]; + ptr buf = buffer::alloc(data.size() + 1); + buf->put(data); + buf->pos(0); + ReadBufferFromNuraftBuffer in(buf); + + uint64_t acl_id; + Coordination::ACLs acls; + + Coordination::read(acl_id, in); + Coordination::read(acls, in); + + LOG_TRACE(log, "parseObject acl_id {}", acl_id); + store.acl_map.addMapping(acl_id, acls); } } break; - default: - break; + case SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP: { + IntMap int_map; + for (size_t data_idx = 0; data_idx < batch.size(); data_idx++) + { + const String & data = batch[data_idx]; + ptr buf = buffer::alloc(data.size() + 1); + buf->put(data); + buf->pos(0); + ReadBufferFromNuraftBuffer in(buf); + String key; + int64_t value; + try + { + Coordination::read(key, in); + Coordination::read(value, in); + } + catch (Coordination::Exception & e) + { + LOG_WARNING( + log, + "Can't read uint map snapshot, data index {}, key {}, exception {}", + data_idx, + e.displayText()); + break; + } + int_map[key] = value; + } + if (int_map.find("ZXID") != int_map.end()) + { + store.zxid = int_map["ZXID"]; + } + if (int_map.find("SESSIONID") != int_map.end()) + { + store.session_id_counter = int_map["SESSIONID"]; + } } - delete[] body_buf; + break; + default: + break; } return true; } diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index 2401b6a3a5..b2ba2971fd 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -1,24 +1,6 @@ #pragma once -#include -#include - -#include -#include - -#include -#include -#include -#ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wsuggest-destructor-override" -# pragma clang diagnostic ignored "-Wheader-hygiene" -#endif -#include -#ifdef __clang__ -# pragma clang diagnostic pop -#endif -#include +#include namespace RK @@ -26,32 +8,6 @@ namespace RK using nuraft::snapshot; using nuraft::ulong; -enum SnapshotVersion : uint8_t -{ - V0 = 0, - V1 = 1, /// Add ACL map - V2 = 2, /// Replace protobuf - V3 = 3, /// Add last_log_term to file name - None = 255, -}; - -static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V2; - -struct SnapshotBatchHeader -{ - /// The length of the batch data (uncompressed) - UInt32 data_length; - /// The CRC32C of the batch data. - /// If compression is enabled, this is the checksum of the compressed data. - UInt32 data_crc; - void reset() - { - data_length = 0; - data_crc = 0; - } - static const size_t HEADER_SIZE = 8; -}; - /** * Operate a snapshot, when the current snapshot is down, we should renew a store. * @@ -68,9 +24,6 @@ struct SnapshotBatchHeader class KeeperSnapshotStore { public: - using StringMap = std::unordered_map; - using IntMap = std::unordered_map; - KeeperSnapshotStore( const String & snap_dir_, snapshot & meta, @@ -105,7 +58,7 @@ class KeeperSnapshotStore void init(String create_time); /// parse the latest snapshot - void loadLatestSnapshot(KeeperStore & store); + void loadLatestSnapshot(KeeperStore & store, SnapshotVersion used_version = SnapshotVersion::None); /// load on object of the latest snapshot void loadObject(ulong obj_id, ptr & buffer); @@ -136,36 +89,39 @@ class KeeperSnapshotStore static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_%s_%lu_%lu_%lu"; #endif - static const String MAGIC_SNAPSHOT_TAIL; - static const String MAGIC_SNAPSHOT_HEAD; - - /// 0.3KB / Node * 100M Count = 300MB - static const UInt32 MAX_OBJECT_NODE_SIZE = 1000000; - /// 100M Count / 10K = 10K - static const UInt32 SAVE_BATCH_SIZE = 10000; - - static const int SNAPSHOT_THREAD_NUM = 8; - static const int IO_BUFFER_SIZE = 16384; /// 16K + static constexpr int SNAPSHOT_THREAD_NUM = 8; + static constexpr int IO_BUFFER_SIZE = 16384; /// 16K SnapshotVersion version; private: + /// For snapshot version v1 + size_t createObjectsV1(KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0); + /// For snapshot version v3 + size_t createObjectsV2(KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0); + /// get path of an object void getObjectPath(ulong object_id, String & path); /// parse object - bool parseObject(String obj_path, KeeperStore & store); + bool parseObject(KeeperStore & store, String obj_path, SnapshotVersion used_version); /// load batch header in an object /// TODO use internal buffer - bool loadBatchHeader(ptr fs, SnapshotBatchHeader & head); + bool parseBatchHeader(ptr fs, SnapshotBatchHeader & head); + + /// parse a batch + bool parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_); + /// parse a batch + bool parseBatchBodyV2(KeeperStore & store, char * buf, size_t length, SnapshotVersion version_); /// serialize whole data tree size_t serializeDataTree(KeeperStore & storage); - /** - * Serialize data tree by deep traversal. - */ + /// for snapshot version v3 + size_t serializeDataTreeV2(KeeperStore & storage); + + /// Serialize data tree by deep traversal. void serializeNode( ptr & out, ptr & batch, @@ -174,8 +130,19 @@ class KeeperSnapshotStore uint64_t & processed, uint32_t & checksum); + /// for snapshot version v3 + void serializeNodeV2( + ptr & out, + ptr & batch, + KeeperStore & store, + const String & path, + uint64_t & processed, + uint32_t & checksum); + /// append node to batch inline static void appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node); + /// for snapshot version v3 + inline static void appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node); /// snapshot directory, note than the directory may contain more than one snapshot. String snap_dir; @@ -225,7 +192,7 @@ class KeeperSnapshotManager ~KeeperSnapshotManager() = default; - size_t createSnapshot(snapshot & meta, KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0); + size_t createSnapshot(snapshot & meta, KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0, SnapshotVersion version = CURRENT_SNAPSHOT_VERSION); /// save snapshot meta, invoked when we receive an snapshot from leader. bool receiveSnapshotMeta(snapshot & meta); @@ -243,7 +210,7 @@ class KeeperSnapshotManager bool loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr & buffer); /// parse snapshot object, invoked when follower apply received snapshot to state machine. - bool parseSnapshot(const snapshot & meta, KeeperStore & storage); + bool parseSnapshot(const snapshot & meta, KeeperStore & storage, SnapshotVersion used_version = CURRENT_SNAPSHOT_VERSION); /// latest snapshot meta ptr lastSnapshot(); @@ -255,7 +222,6 @@ class KeeperSnapshotManager size_t removeSnapshots(); private: - /// snapshot directory String snap_dir; diff --git a/src/Service/NuRaftStateMachine.h b/src/Service/NuRaftStateMachine.h index 93826e3282..7643cc59a6 100644 --- a/src/Service/NuRaftStateMachine.h +++ b/src/Service/NuRaftStateMachine.h @@ -38,7 +38,7 @@ class NuRaftStateMachine : public nuraft::state_machine std::unordered_map> & new_session_id_callback_, ptr log_store_ = nullptr, String super_digest = "", - UInt32 object_node_size = KeeperSnapshotStore::MAX_OBJECT_NODE_SIZE, + UInt32 object_node_size = MAX_OBJECT_NODE_SIZE, std::shared_ptr request_processor_ = nullptr); ~NuRaftStateMachine() override = default; diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 84fc8a0ab9..9be5e23076 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -1,2 +1,677 @@ -#include "SnapshotCommon.h" +#include +#include +#include + +#include +#include +#include +#include +#include + +#ifdef __clang__ +# pragma clang diagnostic push +# pragma clang diagnostic ignored "-Wformat-nonliteral" +#endif + +namespace RK +{ + +namespace ErrorCodes +{ + extern const int CHECKSUM_DOESNT_MATCH; + extern const int CORRUPTED_DATA; + extern const int UNKNOWN_FORMAT_VERSION; + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + +using nuraft::cs_new; + +String toString(SnapshotVersion version) +{ + switch (version) + { + case SnapshotVersion::V0: + return "v0"; + case SnapshotVersion::V1: + return "v1"; + case SnapshotVersion::V2: + return "v2"; + case SnapshotVersion::V3: + return "v3"; + case SnapshotVersion::None: + return "none"; + } +} + +int openFileForWrite(const String & obj_path) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + errno = 0; + int snap_fd = ::open(obj_path.c_str(), O_RDWR | O_CREAT, 0644); + if (snap_fd < 0) + { + LOG_ERROR(log, "Created new snapshot object {} failed, fd {}, error:{}", obj_path, snap_fd, strerror(errno)); + return -1; + } + return snap_fd; +} + +bool isFileHeader(UInt64 magic) +{ + union + { + uint64_t magic_num; + uint8_t magic_array[8] = {'S', 'n', 'a', 'p', 'H', 'e', 'a', 'd'}; + }; + return magic == magic_num; +} + +bool isFileTail(UInt64 magic) +{ + union + { + uint64_t magic_num; + uint8_t magic_array[8] = {'S', 'n', 'a', 'p', 'T', 'a', 'i', 'l'}; + }; + return magic == magic_num; +} + +std::shared_ptr openFileAndWriteHeader(const String & path, const SnapshotVersion version) +{ + auto out = std::make_shared(path); + out->write(MAGIC_SNAPSHOT_HEAD.data(), MAGIC_SNAPSHOT_HEAD.size()); + writeIntBinary(static_cast(version), *out); + return out; +} + +void writeTailAndClose(std::shared_ptr & out, UInt32 checksum) +{ + out->write(MAGIC_SNAPSHOT_TAIL.data(), MAGIC_SNAPSHOT_TAIL.size()); + writeIntBinary(checksum, *out); + out->close(); +} + +int openFileForRead(String & obj_path) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + errno = 0; + int snap_fd = ::open(obj_path.c_str(), O_RDWR); + if (snap_fd < 0) + { + LOG_ERROR(log, "Open snapshot object {} failed, fd {}, error:{}", obj_path, snap_fd, strerror(errno)); + return -1; + } + return snap_fd; +} + + +/// save batch data in snapshot object +std::pair saveBatch(std::shared_ptr & out, ptr & batch) +{ + if (!batch) + batch = cs_new(); + + String str_buf; + batch->SerializeToString(&str_buf); + + SnapshotBatchHeader header; + header.data_length = str_buf.size(); + header.data_crc = RK::getCRC32(str_buf.c_str(), str_buf.size()); + + writeIntBinary(header.data_length, *out); + writeIntBinary(header.data_crc, *out); + + out->write(str_buf.c_str(), header.data_length); + out->next(); + + return {SnapshotBatchHeader::HEADER_SIZE + header.data_length, header.data_crc}; +} + +std::pair +saveBatchAndUpdateCheckSum(std::shared_ptr & out, ptr & batch, UInt32 checksum) +{ + auto [save_size, data_crc] = saveBatch(out, batch); + /// rebuild batch + batch = cs_new(); + return {save_size, updateCheckSum(checksum, data_crc)}; +} + +void serializeAcls(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + + const auto & acl_map = acls.getMapping(); + LOG_INFO(log, "Begin create snapshot acl object, acl size {}, path {}", acl_map.size(), path); + + auto out = openFileAndWriteHeader(path, version); + ptr batch; + + uint64_t index = 0; + UInt32 checksum = 0; + + for (const auto & acl_it : acl_map) + { + /// flush and rebuild batch + if (index % save_batch_size == 0) + { + /// skip flush the first batch + if (index != 0) + { + /// write data in batch to file + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; + } + batch = cs_new(); + batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP); + } + + /// append to batch + SnapshotItemPB * entry = batch->add_data(); + WriteBufferFromNuraftBuffer buf; + Coordination::write(acl_it.first, buf); + Coordination::write(acl_it.second, buf); + + ptr data = buf.getBuffer(); + data->pos(0); + entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); + + index++; + } + + /// flush the last acl batch + auto [_, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; + + writeTailAndClose(out, checksum); +} + +[[maybe_unused]] size_t serializeEphemerals(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + LOG_INFO(log, "Begin create snapshot ephemeral object, node size {}, path {}", ephemerals.size(), path); + + ptr batch; + + std::lock_guard lock(mutex); + + if (ephemerals.empty()) + { + LOG_INFO(log, "Create snapshot ephemeral nodes size is 0"); + return 0; + } + + auto out = cs_new(path); + uint64_t index = 0; + for (auto & ephemeral_it : ephemerals) + { + /// flush and rebuild batch + if (index % save_batch_size == 0) + { + /// skip flush the first batch + if (index != 0) + { + /// write data in batch to file + saveBatch(out, batch); + } + batch = cs_new(); + batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_DATA_EPHEMERAL); + } + + /// append to batch + SnapshotItemPB * entry = batch->add_data(); + WriteBufferFromNuraftBuffer buf; + Coordination::write(ephemeral_it.first, buf); + Coordination::write(ephemeral_it.second.size(), buf); + + for (const auto & node_path : ephemeral_it.second) + { + Coordination::write(node_path, buf); + } + + ptr data = buf.getBuffer(); + data->pos(0); + entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); + + index++; + } + + /// flush the last batch + saveBatch(out, batch); + out->close(); + return 1; +} + +/** + * Serialize sessions and return the next_session_id before serialize + */ +int64_t serializeSessions(KeeperStore & store, UInt32 save_batch_size, const SnapshotVersion version, String & path) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + + auto out = openFileAndWriteHeader(path, version); + + + LOG_INFO(log, "Begin create snapshot session object, session size {}, path {}", store.session_and_timeout.size(), path); + + std::lock_guard lock(store.session_mutex); + std::lock_guard acl_lock(store.auth_mutex); + + int64_t next_session_id = store.session_id_counter; + ptr batch; + + uint64_t index = 0; + UInt32 checksum = 0; + + for (auto & session_it : store.session_and_timeout) + { + /// flush and rebuild batch + if (index % save_batch_size == 0) + { + /// skip flush the first batch + if (index != 0) + { + /// write data in batch to file + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; + } + batch = cs_new(); + batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_SESSION); + } + + /// append to batch + SnapshotItemPB * entry = batch->add_data(); + WriteBufferFromNuraftBuffer buf; + Coordination::write(session_it.first, buf); //NewSession + Coordination::write(session_it.second, buf); //Timeout_ms + + Coordination::AuthIDs ids; + if (store.session_and_auth.count(session_it.first)) + ids = store.session_and_auth.at(session_it.first); + Coordination::write(ids, buf); + + ptr data = buf.getBuffer(); + data->pos(0); + entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); + + index++; + } + + /// flush the last batch + auto [_, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; + writeTailAndClose(out, checksum); + + return next_session_id; +} + +/** + * Save map or map + */ +template +void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + LOG_INFO(log, "Begin create snapshot map object, map size {}, path {}", snap_map.size(), path); + + auto out = openFileAndWriteHeader(path, version); + ptr batch; + + uint64_t index = 0; + UInt32 checksum = 0; + + for (auto & it : snap_map) + { + /// flush and rebuild batch + if (index % save_batch_size == 0) + { + /// skip flush the first batch + if (index != 0) + { + /// write data in batch to file + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; + } + + batch = cs_new(); + if constexpr (std::is_same_v) + batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_STRINGMAP); + else if constexpr (std::is_same_v) + batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP); + else + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Only support string and int map."); + } + + /// append to batch + SnapshotItemPB * entry = batch->add_data(); + WriteBufferFromNuraftBuffer buf; + Coordination::write(it.first, buf); + Coordination::write(it.second, buf); + + ptr data = buf.getBuffer(); + data->pos(0); + entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); + + index++; + } + + /// flush the last batch + auto [_, new_checksum] = saveBatchAndUpdateCheckSum(out, batch, checksum); + checksum = new_checksum; + writeTailAndClose(out, checksum); +} + +template void serializeMap(StringMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); +template void serializeMap(IntMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); + +std::pair saveBatchV2(std::shared_ptr & out, ptr & batch) +{ + if (!batch) + batch = cs_new(); + + String str_buf = SnapshotBatchBody::serialize(*batch); + + SnapshotBatchHeader header; + header.data_length = str_buf.size(); + header.data_crc = RK::getCRC32(str_buf.c_str(), str_buf.size()); + + writeIntBinary(header.data_length, *out); + writeIntBinary(header.data_crc, *out); + + out->write(str_buf.c_str(), header.data_length); + out->next(); + + return {SnapshotBatchHeader::HEADER_SIZE + header.data_length, header.data_crc}; +} + +UInt32 updateCheckSum(UInt32 checksum, UInt32 data_crc) +{ + union + { + UInt64 data; + UInt32 crc[2]; + }; + crc[0] = checksum; + crc[1] = data_crc; + return RK::getCRC32(reinterpret_cast(&data), 8); +} + +std::pair +saveBatchAndUpdateCheckSumV2(std::shared_ptr & out, ptr & batch, UInt32 checksum) +{ + auto [save_size, data_crc] = saveBatchV2(out, batch); + /// rebuild batch + batch = cs_new(); + return {save_size, updateCheckSum(checksum, data_crc)}; +} + +void serializeAclsV2(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + + const auto & acl_map = acls.getMapping(); + LOG_INFO(log, "Begin create snapshot acl object, acl size {}, path {}", acl_map.size(), path); + + auto out = openFileAndWriteHeader(path, version); + ptr batch; + + uint64_t index = 0; + UInt32 checksum = 0; + + for (const auto & acl_it : acl_map) + { + /// flush and rebuild batch + if (index % save_batch_size == 0) + { + /// skip flush the first batch + if (index != 0) + { + /// write data in batch to file + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); + checksum = new_checksum; + } + batch = cs_new(); + batch->type = SnapshotBatchType::SNAPSHOT_TYPE_ACLMAP; + } + + /// append to batch + WriteBufferFromNuraftBuffer buf; + Coordination::write(acl_it.first, buf); + Coordination::write(acl_it.second, buf); + + ptr data = buf.getBuffer(); + data->pos(0); + batch->add(String(reinterpret_cast(data->data_begin()), data->size())); + + index++; + } + + /// flush the last acl batch + auto [_, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); + checksum = new_checksum; + + writeTailAndClose(out, checksum); +} + +[[maybe_unused]] size_t serializeEphemeralsV2(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + LOG_INFO(log, "Begin create snapshot ephemeral object, node size {}, path {}", ephemerals.size(), path); + + ptr batch; + + std::lock_guard lock(mutex); + + if (ephemerals.empty()) + { + LOG_INFO(log, "Create snapshot ephemeral nodes size is 0"); + return 0; + } + + auto out = cs_new(path); + uint64_t index = 0; + for (auto & ephemeral_it : ephemerals) + { + /// flush and rebuild batch + if (index % save_batch_size == 0) + { + /// skip flush the first batch + if (index != 0) + { + /// write data in batch to file + saveBatchV2(out, batch); + } + batch = cs_new(); + batch->type = SnapshotBatchType::SNAPSHOT_TYPE_DATA_EPHEMERAL; + } + + /// append to batch + WriteBufferFromNuraftBuffer buf; + Coordination::write(ephemeral_it.first, buf); + Coordination::write(ephemeral_it.second.size(), buf); + + for (const auto & node_path : ephemeral_it.second) + { + Coordination::write(node_path, buf); + } + + ptr data = buf.getBuffer(); + data->pos(0); + batch->add(String(reinterpret_cast(data->data_begin()), data->size())); + + index++; + } + + /// flush the last batch + saveBatchV2(out, batch); + out->close(); + return 1; +} + +int64_t serializeSessionsV2(KeeperStore & store, UInt32 save_batch_size, const SnapshotVersion version, String & path) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + + auto out = openFileAndWriteHeader(path, version); + + + LOG_INFO(log, "Begin create snapshot session object, session size {}, path {}", store.session_and_timeout.size(), path); + + std::lock_guard lock(store.session_mutex); + std::lock_guard acl_lock(store.auth_mutex); + + int64_t next_session_id = store.session_id_counter; + ptr batch; + + uint64_t index = 0; + UInt32 checksum = 0; + + for (auto & session_it : store.session_and_timeout) + { + /// flush and rebuild batch + if (index % save_batch_size == 0) + { + /// skip flush the first batch + if (index != 0) + { + /// write data in batch to file + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); + checksum = new_checksum; + } + batch = cs_new(); + batch->type = SnapshotBatchType::SNAPSHOT_TYPE_SESSION; + } + + /// append to batch + WriteBufferFromNuraftBuffer buf; + Coordination::write(session_it.first, buf); //NewSession + Coordination::write(session_it.second, buf); //Timeout_ms + + Coordination::AuthIDs ids; + if (store.session_and_auth.count(session_it.first)) + ids = store.session_and_auth.at(session_it.first); + Coordination::write(ids, buf); + + ptr data = buf.getBuffer(); + data->pos(0); + batch->add(String(reinterpret_cast(data->data_begin()), data->size())); + + index++; + } + + /// flush the last batch + auto [_, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); + checksum = new_checksum; + writeTailAndClose(out, checksum); + + return next_session_id; +} + +template +void serializeMapV2(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path) +{ + Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); + LOG_INFO(log, "Begin create snapshot map object, map size {}, path {}", snap_map.size(), path); + + auto out = openFileAndWriteHeader(path, version); + ptr batch; + + uint64_t index = 0; + UInt32 checksum = 0; + + for (auto & it : snap_map) + { + /// flush and rebuild batch + if (index % save_batch_size == 0) + { + /// skip flush the first batch + if (index != 0) + { + /// write data in batch to file + auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); + checksum = new_checksum; + } + + batch = cs_new(); + if constexpr (std::is_same_v) + batch->type = SnapshotBatchType::SNAPSHOT_TYPE_STRINGMAP; + else if constexpr (std::is_same_v) + batch->type = SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP; + else + throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Only support string and int map."); + } + + /// append to batch + WriteBufferFromNuraftBuffer buf; + Coordination::write(it.first, buf); + Coordination::write(it.second, buf); + + ptr data = buf.getBuffer(); + data->pos(0); + batch->add(String(reinterpret_cast(data->data_begin()), data->size())); + + index++; + } + + /// flush the last batch + auto [_, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum); + checksum = new_checksum; + writeTailAndClose(out, checksum); +} + +template void serializeMapV2(StringMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); +template void serializeMapV2(IntMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); + +void SnapshotBatchBody::add(const String & element) +{ + elements.push_back(element); +} + +size_t SnapshotBatchBody::size() const +{ + return elements.size(); +} + +String & SnapshotBatchBody::operator[](size_t n) +{ + return elements.at(n); +} + +String SnapshotBatchBody::serialize(const SnapshotBatchBody & batch_body) +{ + WriteBufferFromOwnString buf; + writeIntBinary(static_cast(batch_body.type), buf); + writeIntBinary(static_cast(batch_body.elements.size()), buf); + for (const auto & element : batch_body.elements) + { + writeIntBinary(static_cast(element.size()), buf); + writeString(element, buf); + } + return std::move(buf.str()); +} + +ptr SnapshotBatchBody::parse(const String & data) +{ + ptr batch_body = std::make_shared(); + ReadBufferFromMemory in(data.c_str(), data.size()); + readIntBinary(batch_body->type, in); + int32_t element_count; + readIntBinary(element_count, in); + batch_body->elements.reserve(element_count); + for (int i = 0; i < element_count; i++) + { + int32_t element_size; + readIntBinary(element_size, in); + String element; + element.resize(element_size); + in.readStrict(element.data(), element_size); + batch_body->elements.emplace_back(std::move(element)); + } + return batch_body; +} + +} + +#ifdef __clang__ +# pragma clang diagnostic pop +#endif diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index e19d1971a2..90dedcadfc 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -1,14 +1,144 @@ -// -// Created by wujianchao on 2/19/24. -// +#pragma once -#ifndef RAFTKEEPER_SNAPSHOTUTILS_H -#define RAFTKEEPER_SNAPSHOTUTILS_H +#include +#include +#include +#include -class snapshotUtils +#include +#include +#include +#ifdef __clang__ +# pragma clang diagnostic push +# pragma clang diagnostic ignored "-Wsuggest-destructor-override" +# pragma clang diagnostic ignored "-Wheader-hygiene" +#endif +#include +#ifdef __clang__ +# pragma clang diagnostic pop +#endif +#include + + +namespace RK +{ + +const String MAGIC_SNAPSHOT_TAIL = "SnapTail"; +const String MAGIC_SNAPSHOT_HEAD = "SnapHead"; + +/// 0.3KB / Node * 100M Count = 300MB +static constexpr UInt32 MAX_OBJECT_NODE_SIZE = 1000000; +/// 100M Count / 10K = 10K +static constexpr UInt32 SAVE_BATCH_SIZE = 10000; + +using StringMap = std::unordered_map; +using IntMap = std::unordered_map; + +enum SnapshotVersion : uint8_t +{ + V0 = 0, + V1 = 1, /// Add ACL map + V2 = 2, /// Replace protobuf + V3 = 3, /// Add last_log_term to file name + None = 255, +}; + +String toString(SnapshotVersion version); + + +static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V2; + +struct SnapshotBatchHeader +{ + /// The length of the batch data (uncompressed) + UInt32 data_length; + /// The CRC32C of the batch data. + /// If compression is enabled, this is the checksum of the compressed data. + UInt32 data_crc; + void reset() + { + data_length = 0; + data_crc = 0; + } + static const size_t HEADER_SIZE = 8; +}; + +enum class SnapshotBatchType : int +{ + SNAPSHOT_TYPE_DATA = 0, + SNAPSHOT_TYPE_DATA_EPHEMERAL = 1, + SNAPSHOT_TYPE_CONFIG = 2, + SNAPSHOT_TYPE_SERVER = 3, + SNAPSHOT_TYPE_SESSION = 4, + SNAPSHOT_TYPE_STRINGMAP = 5, + SNAPSHOT_TYPE_UINTMAP = 6, + SNAPSHOT_TYPE_ACLMAP = 7 +}; + +struct SnapshotBatchBody { + SnapshotBatchType type; + /// element count + int32_t count; + std::vector elements; + + void add(const String & element); + size_t size() const; + String & operator[](size_t n); + + static String serialize(const SnapshotBatchBody & batch_body); + static ptr parse(const String & data); }; +int openFileForWrite(const String & obj_path); +int openFileForRead(String & obj_path); + +/// snapshot object file header +bool isFileHeader(UInt64 magic); + +/// snapshot object file tail +bool isFileTail(UInt64 magic); + +std::shared_ptr openFileAndWriteHeader(const String & path, const SnapshotVersion version); +void writeTailAndClose(std::shared_ptr & out, UInt32 checksum); + +UInt32 updateCheckSum(UInt32 checksum, UInt32 data_crc); + +/// ----- For snapshot version 1 ----- // TODO delete + +/// save batch data in snapshot object +std::pair saveBatch(std::shared_ptr & out, ptr & batch); +std::pair +saveBatchAndUpdateCheckSum(std::shared_ptr & out, ptr & batch, UInt32 checksum); + +void serializeAcls(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version); +[[maybe_unused]] size_t serializeEphemerals(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size); + +/// Serialize sessions and return the next_session_id before serialize +int64_t serializeSessions(KeeperStore & store, UInt32 save_batch_size, SnapshotVersion version, String & path); + +/// Save map or map +template +void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); + + +/// ----- For snapshot version 2 ----- + +/// save batch data in snapshot object +std::pair saveBatchV2(std::shared_ptr & out, ptr & batch); +std::pair +saveBatchAndUpdateCheckSumV2(std::shared_ptr & out, ptr & batch, UInt32 checksum); + +void serializeAclsV2(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version); +[[maybe_unused]] size_t +serializeEphemeralsV2(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size); + +/// Serialize sessions and return the next_session_id before serialize +int64_t serializeSessionsV2(KeeperStore & store, UInt32 save_batch_size, SnapshotVersion version, String & path); + +/// Save map or map +template +void serializeMapV2(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); -#endif //RAFTKEEPER_SNAPSHOTUTILS_H +} diff --git a/src/Service/tests/gtest_raft_snapshot.cpp b/src/Service/tests/gtest_raft_snapshot.cpp index 04b5526841..0ac43faf56 100644 --- a/src/Service/tests/gtest_raft_snapshot.cpp +++ b/src/Service/tests/gtest_raft_snapshot.cpp @@ -395,6 +395,57 @@ TEST(RaftSnapshot, readAndSaveSnapshot) cleanDirectory(snap_save_dir); } +TEST(RaftSnapshot, compitablilityWithV3) +{ + String snap_read_dir(SNAP_DIR + "/13"); + String snap_save_dir(SNAP_DIR + "/14"); + cleanDirectory(snap_read_dir); + cleanDirectory(snap_save_dir); + + UInt32 last_index = 1024; + UInt32 term = 1; + KeeperSnapshotManager snap_mgr_read(snap_read_dir, 3, 100); + KeeperSnapshotManager snap_mgr_save(snap_save_dir, 3, 100); + + ptr config = cs_new(1, 0); + + RaftSettingsPtr raft_settings(RaftSettings::getDefault()); + KeeperStore store(raft_settings->dead_session_check_period_ms); + + for (int i = 0; i < last_index; i++) + { + String key = std::to_string(i + 1); + String value = "table_" + key; + setNode(store, key, value); + } + snapshot meta(last_index, term, config); + size_t object_size = snap_mgr_read.createSnapshot(meta, store, SnapshotVersion::V1); + ASSERT_EQ(object_size, 11 + 1 + 1 + 1); + + ulong obj_id = 0; + snap_mgr_save.receiveSnapshotMeta(meta); + while (true) + { + obj_id++; + if (!snap_mgr_read.existSnapshotObject(meta, obj_id)) + { + break; + } + ptr buffer; + snap_mgr_read.loadSnapshotObject(meta, obj_id, buffer); + if (buffer != nullptr) + { + snap_mgr_save.saveSnapshotObject(meta, obj_id, *(buffer.get())); + } + } + for (auto i = 1; i < obj_id; i++) + { + ASSERT_TRUE(snap_mgr_save.existSnapshotObject(meta, i)); + } + cleanDirectory(snap_read_dir); + cleanDirectory(snap_save_dir); +} + void parseSnapshot(const SnapshotVersion create_version, const SnapshotVersion parse_version) { String snap_dir(SNAP_DIR + "/5"); @@ -478,7 +529,7 @@ void parseSnapshot(const SnapshotVersion create_version, const SnapshotVersion p ASSERT_EQ(store.container.size(), 2050); /// Include "/" node snapshot meta(last_index, term, config); - size_t object_size = snap_mgr.createSnapshot(meta, store, store.zxid, store.session_id_counter); + size_t object_size = snap_mgr.createSnapshot(meta, store, store.zxid, store.session_id_counter, create_version); /// Normal node objects、Sessions、Others(int_map)、ACL_MAP ASSERT_EQ(object_size, 21 + 3); From b16c2db2bc06a0b4b227681842b545d1152f209b Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Tue, 20 Feb 2024 11:08:35 +0800 Subject: [PATCH 3/5] Add tests for replacing protobuf snapshot batch --- src/Service/NuRaftLogSnapshot.cpp | 144 +++++------ src/Service/NuRaftLogSnapshot.h | 40 ++-- src/Service/SnapshotCommon.cpp | 6 +- src/Service/SnapshotCommon.h | 2 - src/Service/tests/gtest_raft_snapshot.cpp | 280 ++++++++++------------ 5 files changed, 219 insertions(+), 253 deletions(-) diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 2eed80f98b..f1aa0dcfc1 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -452,7 +452,7 @@ bool KeeperSnapshotStore::parseBatchHeader(ptr fs, SnapshotBatchHe return true; } -bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, SnapshotVersion used_version) +bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) { ptr snap_fs = cs_new(); snap_fs->open(obj_path, std::ios::in | std::ios::binary); @@ -496,6 +496,8 @@ bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, Snap snap_fs->read(buf, sizeof(uint8_t)); read_size += 1; LOG_INFO(log, "Got snapshot file header with version {}", toString(version_from_obj)); + if (version_from_obj > CURRENT_SNAPSHOT_VERSION) + throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version_from_obj); } else if (isFileTail(magic)) { @@ -545,14 +547,10 @@ bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, Snap return false; } - /// parse batch body by used_version - if (used_version == SnapshotVersion::None) - used_version = version_from_obj; - - if (used_version < SnapshotVersion::V2) - parseBatchBody(store, body_buf, header.data_length, used_version); + if (version_from_obj < SnapshotVersion::V2) + parseBatchBody(store, body_buf, header.data_length, version_from_obj); else - parseBatchBodyV2(store, body_buf, header.data_length, used_version); + parseBatchBodyV2(store, body_buf, header.data_length, version_from_obj); delete[] body_buf; } return true; @@ -574,7 +572,7 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, //buf->put(data.data(), data.size()); buf->put(data); buf->pos(0); - ReadBufferFromNuraftBuffer in(buf); + ReadBufferFromNuRaftBuffer in(buf); ptr node = cs_new(); String key; try @@ -594,19 +592,19 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); } - /// Some strange ACLID during deserialization from ZooKeeper - if (node->acl_id == std::numeric_limits::max()) - node->acl_id = 0; + /// Some strange ACLID during deserialization from ZooKeeper + if (node->acl_id == std::numeric_limits::max()) + node->acl_id = 0; - store.acl_map.addUsage(node->acl_id); - LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); + store.acl_map.addUsage(node->acl_id); + LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); - Coordination::read(node->is_ephemeral, in); - Coordination::read(node->is_sequential, in); - Coordination::read(node->stat, in); - auto ephemeral_owner = node->stat.ephemeralOwner; - LOG_TRACE(log, "Load snapshot read key {}, node stat {}", key, node->stat.toString()); - store.container.emplace(key, std::move(node)); + Coordination::read(node->is_ephemeral, in); + Coordination::read(node->is_sequential, in); + Coordination::read(node->stat, in); + auto ephemeral_owner = node->stat.ephemeralOwner; + LOG_TRACE(log, "Load snapshot read key {}, node stat {}", key, node->stat.toString()); + store.container.emplace(key, std::move(node)); if (ephemeral_owner != 0) { @@ -641,7 +639,7 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, ptr buf = buffer::alloc(data.size() + 1); buf->put(data); buf->pos(0); - ReadBufferFromNuraftBuffer in(buf); + ReadBufferFromNuRaftBuffer in(buf); int64_t session_id; int64_t timeout; try @@ -685,13 +683,13 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, ptr buf = buffer::alloc(data.size() + 1); buf->put(data); buf->pos(0); - ReadBufferFromNuraftBuffer in(buf); + ReadBufferFromNuRaftBuffer in(buf); - uint64_t acl_id; - Coordination::ACLs acls; + uint64_t acl_id; + Coordination::ACLs acls; - Coordination::read(acl_id, in); - Coordination::read(acls, in); + Coordination::read(acl_id, in); + Coordination::read(acls, in); LOG_TRACE(log, "parseObject acl_id {}", acl_id); store.acl_map.addMapping(acl_id, acls); @@ -707,7 +705,7 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, ptr buf = buffer::alloc(data.size() + 1); buf->put(data); buf->pos(0); - ReadBufferFromNuraftBuffer in(buf); + ReadBufferFromNuRaftBuffer in(buf); String key; int64_t value; try @@ -744,19 +742,19 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) { - SnapshotBatchBody batch; - batch.parse(String(batch_buf, length)); - switch (batch.type) + ptr batch; + batch = SnapshotBatchBody::parse(String(batch_buf, length)); + switch (batch->type) { case SnapshotBatchType::SNAPSHOT_TYPE_DATA: { - LOG_INFO(log, "Load batch size {}", batch.size()); - for (size_t data_idx = 0; data_idx < batch.size(); data_idx++) + LOG_INFO(log, "Loading snapshot data batch with element count {}", batch->size()); + for (size_t data_idx = 0; data_idx < batch->size(); data_idx++) { - const String & data = batch[data_idx]; + const String & data = (*batch)[data_idx]; ptr buf = buffer::alloc(data.size() + 1); buf->put(data); buf->pos(0); - ReadBufferFromNuraftBuffer in(buf); + ReadBufferFromNuRaftBuffer in(buf); ptr node = cs_new(); String key; try @@ -816,13 +814,14 @@ bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf case SnapshotBatchType::SNAPSHOT_TYPE_SERVER: break; case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: { - for (size_t data_idx = 0; data_idx < batch.size(); data_idx++) + LOG_INFO(log, "Loading snapshot session batch with element count {}", batch->size()); + for (size_t data_idx = 0; data_idx < batch->size(); data_idx++) { - const String & data = batch[data_idx]; + const String & data = (*batch)[data_idx]; ptr buf = buffer::alloc(data.size() + 1); buf->put(data); buf->pos(0); - ReadBufferFromNuraftBuffer in(buf); + ReadBufferFromNuRaftBuffer in(buf); int64_t session_id; int64_t timeout; try @@ -857,15 +856,16 @@ bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf } break; case SnapshotBatchType::SNAPSHOT_TYPE_ACLMAP: + LOG_INFO(log, "Loading snapshot acl batch with element count {}", batch->size()); if (version_ >= SnapshotVersion::V1) { - for (size_t data_idx = 0; data_idx < batch.size(); data_idx++) + for (size_t data_idx = 0; data_idx < batch->size(); data_idx++) { - const String & data = batch[data_idx]; + const String & data = (*batch)[data_idx]; ptr buf = buffer::alloc(data.size() + 1); buf->put(data); buf->pos(0); - ReadBufferFromNuraftBuffer in(buf); + ReadBufferFromNuRaftBuffer in(buf); uint64_t acl_id; Coordination::ACLs acls; @@ -879,14 +879,15 @@ bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf } break; case SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP: { + LOG_INFO(log, "Loading snapshot int_map batch with element count {}", batch->size()); IntMap int_map; - for (size_t data_idx = 0; data_idx < batch.size(); data_idx++) + for (size_t data_idx = 0; data_idx < batch->size(); data_idx++) { - const String & data = batch[data_idx]; + const String & data = (*batch)[data_idx]; ptr buf = buffer::alloc(data.size() + 1); buf->put(data); buf->pos(0); - ReadBufferFromNuraftBuffer in(buf); + ReadBufferFromNuRaftBuffer in(buf); String key; int64_t value; try @@ -923,39 +924,37 @@ bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) { - SnapshotVersion current_version = static_cast(version); - if (current_version > version) - throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version); - ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM); for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++) { - object_thread_pool.trySchedule([this, thread_idx, &store] { - Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.parseObjectThread")); - UInt32 obj_idx = 0; - for (auto it = this->objects_path.begin(); it != this->objects_path.end(); it++) + object_thread_pool.trySchedule( + [this, thread_idx, &store] { - if (obj_idx % SNAPSHOT_THREAD_NUM == thread_idx) + Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.parseObjectThread")); + UInt32 obj_idx = 0; + for (auto it = this->objects_path.begin(); it != this->objects_path.end(); it++) { - LOG_INFO( - thread_log, - "Parse snapshot object, thread_idx {}, obj_index {}, path {}, obj size {}", - thread_idx, - it->first, - it->second, - this->objects_path.size()); - try - { - this->parseObject(it->second, store); - } - catch (Exception & e) + if (obj_idx % SNAPSHOT_THREAD_NUM == thread_idx) { - LOG_ERROR(log, "parseObject error {}, {}", it->second, getExceptionMessage(e, true)); + LOG_INFO( + thread_log, + "Parse snapshot object, thread_idx {}, obj_index {}, path {}, obj size {}", + thread_idx, + it->first, + it->second, + this->objects_path.size()); + try + { + this->parseObject(store, it->second); + } + catch (Exception & e) + { + LOG_ERROR(log, "parseObject error {}, {}", it->second, getExceptionMessage(e, true)); + } } + obj_idx++; } - obj_idx++; - } - }); + }); } object_thread_pool.wait(); @@ -1092,15 +1091,17 @@ void KeeperSnapshotStore::addObjectPath(ulong obj_id, String & path) objects_path[obj_id] = path; } -size_t KeeperSnapshotManager::createSnapshot(snapshot & meta, KeeperStore & store, int64_t next_zxid, int64_t next_session_id) +size_t KeeperSnapshotManager::createSnapshot( + snapshot & meta, KeeperStore & store, int64_t next_zxid, int64_t next_session_id, SnapshotVersion version) { size_t store_size = store.container.size(); meta.set_size(store_size); - ptr snap_store = cs_new(snap_dir, meta, object_node_size); + ptr snap_store = cs_new(snap_dir, meta, object_node_size, SAVE_BATCH_SIZE, version); snap_store->init(); LOG_INFO( log, - "Create snapshot last_log_term {}, last_log_idx {}, size {}, nodes {}, ephemeral nodes {}, sessions {}, session_id_counter {}, zxid {}", + "Create snapshot last_log_term {}, last_log_idx {}, size {}, nodes {}, ephemeral nodes {}, sessions {}, session_id_counter {}, " + "zxid {}", meta.get_last_log_term(), meta.get_last_log_idx(), meta.size(), @@ -1280,6 +1281,7 @@ size_t KeeperSnapshotManager::removeSnapshots() } return snapshots.size(); } + } #ifdef __clang__ diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index b2ba2971fd..f6584413f2 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -57,8 +57,8 @@ class KeeperSnapshotStore /// initialize a snapshot store void init(String create_time); - /// parse the latest snapshot - void loadLatestSnapshot(KeeperStore & store, SnapshotVersion used_version = SnapshotVersion::None); + /// Load the latest snapshot object. + void loadLatestSnapshot(KeeperStore & store); /// load on object of the latest snapshot void loadObject(ulong obj_id, ptr & buffer); @@ -103,22 +103,22 @@ class KeeperSnapshotStore /// get path of an object void getObjectPath(ulong object_id, String & path); - /// parse object - bool parseObject(KeeperStore & store, String obj_path, SnapshotVersion used_version); + /// Parse an snapshot object. We should take the version from snapshot in general. + bool parseObject(KeeperStore & store, String obj_path); - /// load batch header in an object + /// Parse batch header in an object /// TODO use internal buffer bool parseBatchHeader(ptr fs, SnapshotBatchHeader & head); - /// parse a batch + /// Parse a batch bool parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_); - /// parse a batch + /// Parse a batch bool parseBatchBodyV2(KeeperStore & store, char * buf, size_t length, SnapshotVersion version_); - /// serialize whole data tree + /// Serialize whole data tree size_t serializeDataTree(KeeperStore & storage); - /// for snapshot version v3 + /// For snapshot version v3 size_t serializeDataTreeV2(KeeperStore & storage); /// Serialize data tree by deep traversal. @@ -130,7 +130,7 @@ class KeeperSnapshotStore uint64_t & processed, uint32_t & checksum); - /// for snapshot version v3 + /// For snapshot version v3 void serializeNodeV2( ptr & out, ptr & batch, @@ -139,18 +139,18 @@ class KeeperSnapshotStore uint64_t & processed, uint32_t & checksum); - /// append node to batch + /// Append node to batch inline static void appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node); - /// for snapshot version v3 + /// For snapshot version v3 inline static void appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node); - /// snapshot directory, note than the directory may contain more than one snapshot. + /// Snapshot directory, note than the directory may contain more than one snapshot. String snap_dir; - /// an object can contain how many items + /// How many items an object can contain UInt32 max_object_node_size; - /// a batch can contain how many items + /// How many items a batch can contain UInt32 save_batch_size; Poco::Logger * log; @@ -158,10 +158,10 @@ class KeeperSnapshotStore /// metadata of a snapshot ptr snap_meta; - /// last log index in the snapshot + /// Last log index in the snapshot UInt64 last_log_index; - /// lost log index term in the snapshot + /// Lost log index term in the snapshot UInt64 last_log_term; std::map objects_path; @@ -169,7 +169,7 @@ class KeeperSnapshotStore /// Appended to snapshot file name String curr_time; - /// used to create snapshot asynchronously, + /// Used to create snapshot asynchronously, /// but now creating snapshot is synchronous std::shared_ptr snapshot_thread; }; @@ -177,7 +177,7 @@ class KeeperSnapshotStore using KeeperSnapshotStoreMap = std::map>; /** - * Snapshots manager who may create, remove snapshot. + * Snapshots manager who may create, remove snapshots. */ class KeeperSnapshotManager { @@ -210,7 +210,7 @@ class KeeperSnapshotManager bool loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr & buffer); /// parse snapshot object, invoked when follower apply received snapshot to state machine. - bool parseSnapshot(const snapshot & meta, KeeperStore & storage, SnapshotVersion used_version = CURRENT_SNAPSHOT_VERSION); + bool parseSnapshot(const snapshot & meta, KeeperStore & storage); /// latest snapshot meta ptr lastSnapshot(); diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 9be5e23076..8cc5e60d77 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -4,7 +4,7 @@ #include #include -#include +#include #include #include #include @@ -654,7 +654,9 @@ ptr SnapshotBatchBody::parse(const String & data) { ptr batch_body = std::make_shared(); ReadBufferFromMemory in(data.c_str(), data.size()); - readIntBinary(batch_body->type, in); + int32_t type; + readIntBinary(type, in); + batch_body->type = static_cast(type); int32_t element_count; readIntBinary(element_count, in); batch_body->elements.reserve(element_count); diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index 90dedcadfc..b3abd2e8b8 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -79,8 +79,6 @@ enum class SnapshotBatchType : int struct SnapshotBatchBody { SnapshotBatchType type; - /// element count - int32_t count; std::vector elements; void add(const String & element); diff --git a/src/Service/tests/gtest_raft_snapshot.cpp b/src/Service/tests/gtest_raft_snapshot.cpp index 0ac43faf56..e6eb00f32c 100644 --- a/src/Service/tests/gtest_raft_snapshot.cpp +++ b/src/Service/tests/gtest_raft_snapshot.cpp @@ -395,73 +395,123 @@ TEST(RaftSnapshot, readAndSaveSnapshot) cleanDirectory(snap_save_dir); } -TEST(RaftSnapshot, compitablilityWithV3) + +void compareKeeperStore(KeeperStore & store, KeeperStore & new_store, bool compare_acl) { - String snap_read_dir(SNAP_DIR + "/13"); - String snap_save_dir(SNAP_DIR + "/14"); - cleanDirectory(snap_read_dir); - cleanDirectory(snap_save_dir); + ASSERT_EQ(new_store.container.size(), store.container.size()); + for (UInt32 i = 0; i < store.container.getBlockNum(); i++) + { + auto & inner_map = store.container.getMap(i); + for (auto it = inner_map.getMap().begin(); it != inner_map.getMap().end(); it++) + { + auto new_node = new_store.container.get(it->first); + ASSERT_TRUE(new_node != nullptr); + ASSERT_EQ(new_node->data, it->second->data); + if (compare_acl) + { + ASSERT_EQ(new_node->acl_id, it->second->acl_id); + } - UInt32 last_index = 1024; - UInt32 term = 1; - KeeperSnapshotManager snap_mgr_read(snap_read_dir, 3, 100); - KeeperSnapshotManager snap_mgr_save(snap_save_dir, 3, 100); + ASSERT_EQ(new_node->is_ephemeral, it->second->is_ephemeral); + ASSERT_EQ(new_node->is_sequential, it->second->is_sequential); + ASSERT_EQ(new_node->stat, it->second->stat); + ASSERT_EQ(new_node->children, it->second->children); + } + } + ASSERT_EQ(new_store.container.get("/1020/test112")->data, "test211"); - ptr config = cs_new(1, 0); + ASSERT_TRUE(true) << "compare container."; - RaftSettingsPtr raft_settings(RaftSettings::getDefault()); - KeeperStore store(raft_settings->dead_session_check_period_ms); + /// compare ephemeral + ASSERT_EQ(new_store.ephemerals.size(), store.ephemerals.size()); + ASSERT_EQ(store.ephemerals.size(), 1); + for (const auto & [session_id, paths] : store.ephemerals) + { + ASSERT_FALSE(new_store.ephemerals.find(session_id) == new_store.ephemerals.end()); + ASSERT_EQ(paths, new_store.ephemerals.find(session_id)->second); + } - for (int i = 0; i < last_index; i++) + ASSERT_TRUE(true) << "compare ephemeral."; + + /// compare sessions + ASSERT_EQ(store.session_and_timeout.size(), 10003); + ASSERT_EQ(store.session_and_timeout.size(), new_store.session_and_timeout.size()); + ASSERT_EQ(store.session_and_timeout, new_store.session_and_timeout); + + ASSERT_TRUE(true) << "compare sessions."; + + /// compare Others(int_map) + ASSERT_EQ(store.session_id_counter, 10004); + ASSERT_EQ(store.session_id_counter, new_store.session_id_counter); + ASSERT_EQ(store.zxid, new_store.zxid); + + ASSERT_TRUE(true) << "compare Others(int_map)."; + + + /// compare session_and_auth + if (compare_acl) { - String key = std::to_string(i + 1); - String value = "table_" + key; - setNode(store, key, value); + ASSERT_EQ(store.session_and_auth, new_store.session_and_auth); } - snapshot meta(last_index, term, config); - size_t object_size = snap_mgr_read.createSnapshot(meta, store, SnapshotVersion::V1); - ASSERT_EQ(object_size, 11 + 1 + 1 + 1); - ulong obj_id = 0; - snap_mgr_save.receiveSnapshotMeta(meta); - while (true) + ASSERT_TRUE(true) << "compare session_and_auth."; + + /// compare ACLs + if (compare_acl) { - obj_id++; - if (!snap_mgr_read.existSnapshotObject(meta, obj_id)) + /// include : vector acl, (ACL::All, "digest", "user1:password1"), (ACL::Read, "digest", "user1:password1"), (ACL::All, "digest", "user1:password") + ASSERT_EQ(new_store.acl_map.getMapping().size(), 4); + ASSERT_EQ(store.acl_map.getMapping(), new_store.acl_map.getMapping()); + + const auto & acls = new_store.acl_map.convertNumber(store.container.get("/1020")->acl_id); + ASSERT_EQ(acls.size(), 2); + ASSERT_EQ(acls[0].id, "user1:XDkd2dsEuhc9ImU3q8pa8UOdtpI="); + ASSERT_EQ(acls[1].id, "user1:CGujN0OWj2wmttV5NJgM2ja68PQ="); + + for (const auto & acl : new_store.acl_map.convertNumber(store.container.get("/1022")->acl_id)) { - break; + ASSERT_EQ(acl.permissions, ACL::Read); } - ptr buffer; - snap_mgr_read.loadSnapshotObject(meta, obj_id, buffer); - if (buffer != nullptr) + + for (const auto & acl : new_store.acl_map.convertNumber(store.container.get("/1024")->acl_id)) { - snap_mgr_save.saveSnapshotObject(meta, obj_id, *(buffer.get())); + ASSERT_EQ(acl.permissions, ACL::All); + ASSERT_EQ(acl.id, "user1:CGujN0OWj2wmttV5NJgM2ja68PQ="); } + + const auto & const_acl_usage_counter = store.acl_map.getUsageCounter(); + auto & acl_usage_counter = const_cast(const_acl_usage_counter); + const auto & const_new_acl_usage_counter = new_store.acl_map.getUsageCounter(); + auto & new_acl_usage_counter = const_cast(const_new_acl_usage_counter); + + ASSERT_EQ(acl_usage_counter, new_acl_usage_counter); + + const auto & acls_1020 = getACL(new_store, "/1020"); + ASSERT_EQ(acls_1020[0].id, "user1:XDkd2dsEuhc9ImU3q8pa8UOdtpI="); + ASSERT_EQ(acls_1020[1].id, "user1:CGujN0OWj2wmttV5NJgM2ja68PQ="); + // end of compare } - for (auto i = 1; i < obj_id; i++) - { - ASSERT_TRUE(snap_mgr_save.existSnapshotObject(meta, i)); - } - cleanDirectory(snap_read_dir); - cleanDirectory(snap_save_dir); + + ASSERT_TRUE(true) << "compare ACLs."; } -void parseSnapshot(const SnapshotVersion create_version, const SnapshotVersion parse_version) +void parseSnapshot(const SnapshotVersion version1, const SnapshotVersion version2) { String snap_dir(SNAP_DIR + "/5"); cleanDirectory(snap_dir); + KeeperSnapshotManager snap_mgr(snap_dir, 3, 100); ptr config = cs_new(1, 0); RaftSettingsPtr raft_settings(RaftSettings::getDefault()); KeeperStore store(raft_settings->dead_session_check_period_ms); + /// 1. build keeper store + /// session 1 store.getSessionID(3000); addAuth(store, 1, "digest", "user1:password1"); /// set acl to session - UInt32 last_index = 2048; - UInt32 term = 1; for (int i = 1; i <= 1024; i++) { String key = std::to_string(i); @@ -528,140 +578,54 @@ void parseSnapshot(const SnapshotVersion create_version, const SnapshotVersion p ASSERT_EQ(store.container.size(), 2050); /// Include "/" node - snapshot meta(last_index, term, config); - size_t object_size = snap_mgr.createSnapshot(meta, store, store.zxid, store.session_id_counter, create_version); + /// 2. create snapshot with version1 + + snapshot meta(1, 1, config); + size_t object_size = snap_mgr.createSnapshot(meta, store, store.zxid, store.session_id_counter, version1); /// Normal node objects、Sessions、Others(int_map)、ACL_MAP ASSERT_EQ(object_size, 21 + 3); - KeeperStore new_storage(raft_settings->dead_session_check_period_ms); - - ASSERT_TRUE(snap_mgr.parseSnapshot(meta, new_storage)); - - /// compare container - ASSERT_EQ(new_storage.container.size(), 2050); /// Include "/" node, "/1020/test112" - ASSERT_EQ(new_storage.container.size(), store.container.size()); - for (UInt32 i = 0; i < store.container.getBlockNum(); i++) - { - auto & inner_map = store.container.getMap(i); - for (auto it = inner_map.getMap().begin(); it != inner_map.getMap().end(); it++) - { - auto new_node = new_storage.container.get(it->first); - ASSERT_TRUE(new_node != nullptr); - ASSERT_EQ(new_node->data, it->second->data); - if (create_version >= V1 && parse_version >= V1) - { - ASSERT_EQ(new_node->acl_id, it->second->acl_id); - } - - ASSERT_EQ(new_node->is_ephemeral, it->second->is_ephemeral); - ASSERT_EQ(new_node->is_sequential, it->second->is_sequential); - ASSERT_EQ(new_node->stat, it->second->stat); - ASSERT_EQ(new_node->children, it->second->children); - } - } - ASSERT_EQ(new_storage.container.get("/1020/test112")->data, "test211"); - - ASSERT_TRUE(true) << "compare container."; - - /// compare ephemeral - ASSERT_EQ(new_storage.ephemerals.size(), store.ephemerals.size()); - ASSERT_EQ(store.ephemerals.size(), 1); - for (const auto & [session_id, paths] : store.ephemerals) - { - ASSERT_FALSE(new_storage.ephemerals.find(session_id) == new_storage.ephemerals.end()); - ASSERT_EQ(paths, new_storage.ephemerals.find(session_id)->second); - } - - ASSERT_TRUE(true) << "compare ephemeral."; - - /// compare sessions - ASSERT_EQ(store.session_and_timeout.size(), 10003); - ASSERT_EQ(store.session_and_timeout.size(), new_storage.session_and_timeout.size()); - ASSERT_EQ(store.session_and_timeout, new_storage.session_and_timeout); - - ASSERT_TRUE(true) << "compare sessions."; - - /// compare Others(int_map) - ASSERT_EQ(store.session_id_counter, 10004); - ASSERT_EQ(store.session_id_counter, new_storage.session_id_counter); - ASSERT_EQ(store.zxid, new_storage.zxid); - - ASSERT_TRUE(true) << "compare Others(int_map)."; - - - /// compare session_and_auth - if (create_version >= V1 && parse_version >= V1) - { - ASSERT_EQ(store.session_and_auth, new_storage.session_and_auth); - } - - ASSERT_TRUE(true) << "compare session_and_auth."; - - /// compare ACLs - if (create_version >= V1 && parse_version >= V1) - { - /// include : vector acl, (ACL::All, "digest", "user1:password1"), (ACL::Read, "digest", "user1:password1"), (ACL::All, "digest", "user1:password") - ASSERT_EQ(new_storage.acl_map.getMapping().size(), 4); - ASSERT_EQ(store.acl_map.getMapping(), new_storage.acl_map.getMapping()); - - const auto & acls = new_storage.acl_map.convertNumber(store.container.get("/1020")->acl_id); - ASSERT_EQ(acls.size(), 2); - ASSERT_EQ(acls[0].id, "user1:XDkd2dsEuhc9ImU3q8pa8UOdtpI="); - ASSERT_EQ(acls[1].id, "user1:CGujN0OWj2wmttV5NJgM2ja68PQ="); - - for (const auto & acl : new_storage.acl_map.convertNumber(store.container.get("/1022")->acl_id)) - { - ASSERT_EQ(acl.permissions, ACL::Read); - } - - for (const auto & acl : new_storage.acl_map.convertNumber(store.container.get("/1024")->acl_id)) - { - ASSERT_EQ(acl.permissions, ACL::All); - ASSERT_EQ(acl.id, "user1:CGujN0OWj2wmttV5NJgM2ja68PQ="); - } - - const auto & const_acl_usage_counter = store.acl_map.getUsageCounter(); - auto & acl_usage_counter = const_cast(const_acl_usage_counter); - const auto & const_new_acl_usage_counter = new_storage.acl_map.getUsageCounter(); - auto & new_acl_usage_counter = const_cast(const_new_acl_usage_counter); + /// 3. load the snapshot into new_store + KeeperStore new_store(raft_settings->dead_session_check_period_ms); + ASSERT_TRUE(snap_mgr.parseSnapshot(meta, new_store)); - ASSERT_EQ(acl_usage_counter, new_acl_usage_counter); + /// 4. compare store and new_store + bool compare_acl = version1 >= V1 && version2 >= V1; + compareKeeperStore(store, new_store, compare_acl); - const auto & acls_1020 = getACL(new_storage, "/1020"); - ASSERT_EQ(acls_1020[0].id, "user1:XDkd2dsEuhc9ImU3q8pa8UOdtpI="); - ASSERT_EQ(acls_1020[1].id, "user1:CGujN0OWj2wmttV5NJgM2ja68PQ="); - // end of compare - } + /// 5. create snapshot with version2 + snapshot new_meta(2, 1, config); /// We should use different last_log_index + size_t new_object_size = snap_mgr.createSnapshot(new_meta, new_store, new_store.zxid, new_store.session_id_counter, version2); + ASSERT_EQ(new_object_size, 21 + 3); - ASSERT_TRUE(true) << "compare ACLs."; + /// 6. load the snapshot into new_store1 + KeeperStore new_store1(raft_settings->dead_session_check_period_ms); + ASSERT_TRUE(snap_mgr.parseSnapshot(new_meta, new_store1)); - for (int i = last_index; i < 2 * last_index; i++) - { - String key = std::to_string(i + 1); - String value = "table_" + key; - setNode(store, key, value); - } - - ASSERT_EQ(store.container.size(), 4098); - sleep(1); /// snapshot_create_interval minest is 1 - snapshot meta2(2 * last_index, term, config); - object_size = snap_mgr.createSnapshot(meta2, store); - - KeeperSnapshotManager new_snap_mgr(snap_dir, 1, 100); - ASSERT_EQ(new_snap_mgr.loadSnapshotMetas(), 2); - ASSERT_EQ(new_snap_mgr.lastSnapshot()->get_last_log_idx(), 4096); - - ASSERT_EQ(new_snap_mgr.removeSnapshots(), 1); - - cleanDirectory(snap_dir); + /// 7. compare new_store and new_store1 + compareKeeperStore(new_store, new_store1, compare_acl); } TEST(RaftSnapshot, parseSnapshot) { parseSnapshot(V0, V0); - sleep(1); /// snapshot_create_interval minest is 1 + sleep(1); /// snapshot_create_interval is 1 + parseSnapshot(V1, V1); + sleep(1); + + parseSnapshot(V0, V1); + sleep(1); + + parseSnapshot(V0, V2); + sleep(1); + + parseSnapshot(V1, V2); + sleep(1); + + parseSnapshot(V2, V1); + sleep(1); } TEST(RaftSnapshot, createSnapshotWithFuzzyLog) From 7d7aaba3f4b3c0c88f356c4a3284c7196c1a6005 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Tue, 20 Feb 2024 14:51:32 +0800 Subject: [PATCH 4/5] Fix error when parse snapshot acl of v0 --- src/Service/NuRaftLogSnapshot.cpp | 28 +++++++++++++++++------ src/Service/NuRaftLogSnapshot.h | 13 ++++++++--- src/Service/SnapshotCommon.cpp | 8 ------- src/Service/tests/gtest_raft_snapshot.cpp | 3 +-- 4 files changed, 32 insertions(+), 20 deletions(-) diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index f1aa0dcfc1..f576c060bc 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -149,7 +149,7 @@ void KeeperSnapshotStore::serializeNode( } LOG_TRACE(log, "Append node path {}", path); - appendNodeToBatch(batch, path, node_copy); + appendNodeToBatch(batch, path, node_copy, version); processed++; String path_with_slash = path; @@ -222,7 +222,7 @@ void KeeperSnapshotStore::serializeNodeV2( } LOG_TRACE(log, "Append node path {}", path); - appendNodeToBatchV2(batch, path, node_copy); + appendNodeToBatchV2(batch, path, node_copy, version); processed++; String path_with_slash = path; @@ -233,14 +233,21 @@ void KeeperSnapshotStore::serializeNodeV2( serializeNodeV2(out, batch, store, path_with_slash + child, processed, checksum); } -void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node) +void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version) { SnapshotItemPB * entry = batch->add_data(); WriteBufferFromNuraftBuffer buf; Coordination::write(path, buf); Coordination::write(node->data, buf); - Coordination::write(node->acl_id, buf); + if (version == SnapshotVersion::V0) + { + /// Just ignore acls for snapshot V0 /// TODO delete + Coordination::ACLs acls; + Coordination::write(acls, buf); + } + else + Coordination::write(node->acl_id, buf); Coordination::write(node->is_ephemeral, buf); Coordination::write(node->is_sequential, buf); Coordination::write(node->stat, buf); @@ -250,13 +257,20 @@ void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const St entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); } -void KeeperSnapshotStore::appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node) +void KeeperSnapshotStore::appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version) { WriteBufferFromNuraftBuffer buf; Coordination::write(path, buf); Coordination::write(node->data, buf); - Coordination::write(node->acl_id, buf); + if (version == SnapshotVersion::V0) + { + /// Just ignore acls for snapshot V0 /// TODO delete + Coordination::ACLs acls; + Coordination::write(acls, buf); + } + else + Coordination::write(node->acl_id, buf); Coordination::write(node->is_ephemeral, buf); Coordination::write(node->is_sequential, buf); Coordination::write(node->stat, buf); @@ -608,7 +622,7 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, if (ephemeral_owner != 0) { - LOG_TRACE(log, "Load snapshot find ephemeral node {} - {}", ephemeral_owner, key); + LOG_TRACE(log, "Load snapshot find ephemeral node {} owner {}", key, ephemeral_owner); std::lock_guard l(store.ephemerals_mutex); auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; ephemeral_nodes.emplace(key); diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index f6584413f2..3488c3de2d 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -140,9 +140,11 @@ class KeeperSnapshotStore uint32_t & checksum); /// Append node to batch - inline static void appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node); + inline static void + appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version); /// For snapshot version v3 - inline static void appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node); + inline static void + appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version); /// Snapshot directory, note than the directory may contain more than one snapshot. String snap_dir; @@ -192,7 +194,12 @@ class KeeperSnapshotManager ~KeeperSnapshotManager() = default; - size_t createSnapshot(snapshot & meta, KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0, SnapshotVersion version = CURRENT_SNAPSHOT_VERSION); + size_t createSnapshot( + snapshot & meta, + KeeperStore & store, + int64_t next_zxid = 0, + int64_t next_session_id = 0, + SnapshotVersion version = CURRENT_SNAPSHOT_VERSION); /// save snapshot meta, invoked when we receive an snapshot from leader. bool receiveSnapshotMeta(snapshot & meta); diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 8cc5e60d77..0f76440500 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -17,14 +17,6 @@ namespace RK { -namespace ErrorCodes -{ - extern const int CHECKSUM_DOESNT_MATCH; - extern const int CORRUPTED_DATA; - extern const int UNKNOWN_FORMAT_VERSION; - extern const int ILLEGAL_TYPE_OF_ARGUMENT; -} - using nuraft::cs_new; String toString(SnapshotVersion version) diff --git a/src/Service/tests/gtest_raft_snapshot.cpp b/src/Service/tests/gtest_raft_snapshot.cpp index e6eb00f32c..4fb480e9d4 100644 --- a/src/Service/tests/gtest_raft_snapshot.cpp +++ b/src/Service/tests/gtest_raft_snapshot.cpp @@ -412,7 +412,7 @@ void compareKeeperStore(KeeperStore & store, KeeperStore & new_store, bool compa ASSERT_EQ(new_node->acl_id, it->second->acl_id); } - ASSERT_EQ(new_node->is_ephemeral, it->second->is_ephemeral); + ASSERT_EQ(new_node->is_ephemeral, it->second->is_ephemeral) << "Ephemeral not equals for path " << it->first; ASSERT_EQ(new_node->is_sequential, it->second->is_sequential); ASSERT_EQ(new_node->stat, it->second->stat); ASSERT_EQ(new_node->children, it->second->children); @@ -552,7 +552,6 @@ void parseSnapshot(const SnapshotVersion version1, const SnapshotVersion version } for (int i = 0; i < 1024; i++) - { String key = std::to_string(i); String value = "table_" + key; From c3673e9057336dfc3d050a5b93c816fbf43bee93 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Tue, 20 Feb 2024 15:09:04 +0800 Subject: [PATCH 5/5] Fix code style --- src/Service/NuRaftLogSnapshot.cpp | 1 - src/Service/SnapshotCommon.cpp | 5 +++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index f576c060bc..e1012901fe 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -31,7 +31,6 @@ namespace ErrorCodes extern const int CHECKSUM_DOESNT_MATCH; extern const int CORRUPTED_DATA; extern const int UNKNOWN_FORMAT_VERSION; - extern const int ILLEGAL_TYPE_OF_ARGUMENT; } using nuraft::cs_new; diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 0f76440500..6e50aa9d23 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -17,6 +17,11 @@ namespace RK { +namespace ErrorCodes +{ + extern const int ILLEGAL_TYPE_OF_ARGUMENT; +} + using nuraft::cs_new; String toString(SnapshotVersion version)