From 5a4eb1f372f8ce2c6a6bf92fe401053dfc6cb2ce Mon Sep 17 00:00:00 2001 From: lzydmxy <13126752315@163.com> Date: Mon, 22 Apr 2024 19:37:02 +0800 Subject: [PATCH 1/5] Remove node's lock --- src/Service/KeeperStore.cpp | 14 -------------- src/Service/KeeperStore.h | 2 -- src/Service/NuRaftLogSnapshot.cpp | 6 +----- 3 files changed, 1 insertion(+), 21 deletions(-) diff --git a/src/Service/KeeperStore.cpp b/src/Service/KeeperStore.cpp index 58dfdf134a..840ae847af 100644 --- a/src/Service/KeeperStore.cpp +++ b/src/Service/KeeperStore.cpp @@ -373,8 +373,6 @@ struct StoreRequestCreate final : public StoreRequest int64_t pzxid; { - std::lock_guard parent_lock(parent->mutex); - response.path_created = path_created; parent->children.insert(child_path); @@ -413,7 +411,6 @@ struct StoreRequestCreate final : public StoreRequest } auto undo_parent = store.container.at(parent_path); { - std::lock_guard parent_lock(undo_parent->mutex); --undo_parent->stat.cversion; --undo_parent->stat.numChildren; undo_parent->stat.pzxid = pzxid; @@ -473,7 +470,6 @@ struct StoreRequestGet final : public StoreRequest else { { - std::shared_lock r_lock(node->mutex); response.stat = node->statForResponse(); response.data = node->data; } @@ -548,7 +544,6 @@ struct StoreRequestRemove final : public StoreRequest auto parent = store.container.at(getParentPath(request.path)); { - std::lock_guard parent_lock(parent->mutex); --parent->stat.numChildren; pzxid = parent->stat.pzxid; parent->stat.pzxid = zxid; @@ -578,7 +573,6 @@ struct StoreRequestRemove final : public StoreRequest store.container.emplace(path, prev_node); auto undo_parent = store.container.at(getParentPath(path)); { - std::lock_guard parent_lock(undo_parent->mutex); ++(undo_parent->stat.numChildren); undo_parent->stat.pzxid = pzxid; undo_parent->children.insert(child_basename); @@ -610,7 +604,6 @@ struct StoreRequestExists final : public StoreRequest if (node != nullptr) { { - std::shared_lock r_lock(node->mutex); response.stat = node->statForResponse(); } response.error = Coordination::Error::ZOK; @@ -671,7 +664,6 @@ struct StoreRequestSet final : public StoreRequest { auto prev_node = node->clone(); { - std::lock_guard node_lock(node->mutex); ++node->stat.version; node->stat.mzxid = zxid; node->stat.mtime = time; @@ -756,7 +748,6 @@ struct StoreRequestList final : public StoreRequest } Coordination::ZooKeeperListResponse & response = dynamic_cast(*response_ptr); - std::shared_lock r_lock(node->mutex); response.stat = node->statForResponse(); @@ -792,7 +783,6 @@ struct StoreRequestList final : public StoreRequest else { Coordination::ZooKeeperSimpleListResponse & response = dynamic_cast(*response_ptr); - std::shared_lock r_lock(node->mutex); response.names.insert(response.names.end(), node->children.begin(), node->children.end()); } @@ -919,7 +909,6 @@ struct StoreRequestSetACL final : public StoreRequest uint64_t acl_id = store.acl_map.convertACLs(node_acls); store.acl_map.addUsage(acl_id); - std::lock_guard node_lock(node->mutex); node->acl_id = acl_id; ++node->stat.aversion; @@ -976,7 +965,6 @@ struct StoreRequestGetACL final : public StoreRequest } else { - std::shared_lock r_lock(node->mutex); response.stat = node->stat; response.acl = store.acl_map.convertNumber(node->acl_id); } @@ -1195,7 +1183,6 @@ void KeeperStore::finalize() { auto parent = container.at(getParentPath(ephemeral_path)); { - std::lock_guard parent_lock(parent->mutex); --parent->stat.numChildren; parent->children.erase(getBaseName(ephemeral_path)); } @@ -1700,7 +1687,6 @@ void KeeperStore::cleanEphemeralNodes(int64_t session_id, ThreadSafeQueuemutex); --parent->stat.numChildren; parent->children.erase(getBaseName(ephemeral_path)); } diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index 97c3830d8a..59df9d99ae 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -42,8 +42,6 @@ struct KeeperNode Coordination::Stat stat{}; ChildrenSet children{}; - std::shared_mutex mutex; - std::shared_ptr clone() const { auto node = std::make_shared(); diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 67c58c4cd6..759b726dad 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -84,11 +84,7 @@ void KeeperSnapshotStore::serializeNodeV2( if (!node) return; - std::shared_ptr node_copy; - { - std::shared_lock lock(node->mutex); - node_copy = node->clone(); - } + std::shared_ptr node_copy = node->clone(); if (processed % max_object_node_size == 0) { From 92a0b11a68b244f7c6ccc094ec9decb0f1761471 Mon Sep 17 00:00:00 2001 From: lzydmxy <13126752315@163.com> Date: Thu, 25 Apr 2024 11:44:20 +0800 Subject: [PATCH 2/5] Remove nodeMap's lock --- src/Service/KeeperStore.h | 55 +++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 25 deletions(-) diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index 59df9d99ae..0eac837367 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -79,7 +79,6 @@ struct KeeperNodeWithPath std::shared_ptr node; }; -/// Map for data tree, it is thread safe with read write lock. /// ConcurrentMap is a two-level unordered_map which is designed /// to reduce latency for unordered_map scales. template @@ -90,49 +89,38 @@ class ConcurrentMap using ElementMap = std::unordered_map; using Action = std::function; - /// Simple encapsulate unordered_map with lock. class InnerMap { public: SharedElement get(String const & key) { - std::shared_lock lock(mut_); auto i = map_.find(key); return (i != map_.end()) ? i->second : nullptr; } - bool emplace(String const & key, SharedElement && value) - { - std::unique_lock lock(mut_); - auto [_, created] = map_.insert_or_assign(key, std::move(value)); - return created; - } - bool emplace(String const & key, const SharedElement & value) + + template + bool emplace(String const & key, T && value) { - std::unique_lock lock(mut_); - auto [_, created] = map_.insert_or_assign(key, value); - return created; + return map_.insert_or_assign(key, value).second; } + bool erase(String const & key) { - std::unique_lock write_lock(mut_); return map_.erase(key); } size_t size() const { - std::shared_lock lock(mut_); return map_.size(); } void clear() { - std::unique_lock lock(mut_); map_.clear(); } void forEach(const Action & fn) { - std::shared_lock read_lock(mut_); for (const auto & [key, value] : map_) fn(key, value); } @@ -142,22 +130,41 @@ class ConcurrentMap ElementMap & getMap() { return map_; } private: - mutable std::shared_mutex mut_; ElementMap map_; }; private: std::array maps_; std::hash hash_; + std::atomic node_count; public: SharedElement get(const String & key) { return mapFor(key).get(key); } SharedElement at(const String & key) { return mapFor(key).get(key); } - bool emplace(const String & key, SharedElement && value) { return mapFor(key).emplace(key, std::move(value)); } - bool emplace(const String & key, const SharedElement & value) { return mapFor(key).emplace(key, value); } + template + bool emplace(const String & key, T && value) + { + if (mapFor(key).emplace(key, std::forward(value))) + { + node_count ++; + return true; + } + return false; + } + + bool erase(String const & key) + { + if (mapFor(key).erase(key)) + { + node_count --; + return true; + } + return false; + } + size_t count(const String & key) { return get(key) != nullptr ? 1 : 0; } - bool erase(String const & key) { return mapFor(key).erase(key); } + InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBuckets]; } UInt32 getBucketIndex(String const & key) { return hash_(key) % NumBuckets; } @@ -168,14 +175,12 @@ class ConcurrentMap { for (auto & map : maps_) map.clear(); + node_count.store(0); } size_t size() const { - size_t s(0); - for (const auto & map : maps_) - s += map.size(); - return s; + return node_count.load(); } }; From a3d7fc0b0c1450b12c5252256f2762da876c1ed7 Mon Sep 17 00:00:00 2001 From: lzydmxy <13126752315@163.com> Date: Thu, 25 Apr 2024 14:19:30 +0800 Subject: [PATCH 3/5] Fix uninitialized node_count bug --- src/Service/KeeperStore.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index 0eac837367..807464674d 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -136,7 +136,7 @@ class ConcurrentMap private: std::array maps_; std::hash hash_; - std::atomic node_count; + std::atomic node_count{0}; public: SharedElement get(const String & key) { return mapFor(key).get(key); } From 61840dd6df8b66c3e5054d2c3542962a5de5d22a Mon Sep 17 00:00:00 2001 From: lzydmxy <13126752315@163.com> Date: Thu, 25 Apr 2024 22:44:12 +0800 Subject: [PATCH 4/5] Rename ConcurrentMap to KeeperNodeMap --- src/Service/KeeperStore.h | 8 +-- src/Service/tests/gtest_raft_performance.cpp | 75 ++++++++------------ 2 files changed, 35 insertions(+), 48 deletions(-) diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index 807464674d..f5ff4fdba4 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -79,10 +79,10 @@ struct KeeperNodeWithPath std::shared_ptr node; }; -/// ConcurrentMap is a two-level unordered_map which is designed +/// KeeperNodeMap is a two-level unordered_map which is designed /// to reduce latency for unordered_map scales. template -class ConcurrentMap +class KeeperNodeMap { public: using SharedElement = std::shared_ptr; @@ -188,9 +188,9 @@ class ConcurrentMap class KeeperStore { public: - /// bucket num for ConcurrentMap + /// bucket num for KeeperNodeMap static constexpr int MAP_BUCKET_NUM = 16; - using Container = ConcurrentMap; + using Container = KeeperNodeMap; using ResponsesForSessions = std::vector; using KeeperResponsesQueue = ThreadSafeQueue; diff --git a/src/Service/tests/gtest_raft_performance.cpp b/src/Service/tests/gtest_raft_performance.cpp index e3206c8a02..8f263f2d5b 100644 --- a/src/Service/tests/gtest_raft_performance.cpp +++ b/src/Service/tests/gtest_raft_performance.cpp @@ -146,7 +146,7 @@ TEST(RaftPerformance, appendLogThread) # endif #endif -TEST(RaftPerformance, machineCreateThread) +TEST(RaftPerformance, machineCreate) { Poco::Logger * log = &(Poco::Logger::get("RaftStateMachine")); String snap_dir(SNAP_DIR + "/51"); @@ -177,53 +177,40 @@ TEST(RaftPerformance, machineCreateThread) data.append("v"); } - std::vector thread_vec = {1, 2, 4, 8}; - for (auto thread_size : thread_vec) + int send_count = LOG_COUNT; + Stopwatch watch; + { - int send_count = LOG_COUNT; - //int thread_size = 10; - FreeThreadPool thread_pool(thread_size); - Stopwatch watch; - watch.start(); - for (int thread_idx = 0; thread_idx < thread_size; thread_idx++) + char key_buf[257]; + int begin = 0; + int end = send_count; + + while (begin < end) { - thread_pool.scheduleOrThrowOnError([&machine, thread_idx, thread_size, send_count, &key, &data] { - char key_buf[257]; - int log_count = send_count / thread_size; - int begin = thread_idx * log_count; - int end = (thread_idx + 1) * log_count; - //auto * thread_log = &Poco::Logger::get("client_thread"); - //LOG_INFO(thread_log, "Begin run thread {}/{}, send_count {}, range[{} - {}) ", thread_idx, thread_size, send_count, begin, end); - while (begin < end) - { - snprintf(key_buf, 257, "%s%02d%02d%010d", key.data(), thread_size, thread_idx, begin); - String key_str(key_buf); - //LOG_INFO(thread_log, "KEY:[{}] ", key_str); - createZNode(machine, key_str, data); - begin++; - } - }); + snprintf(key_buf, 257, "%s%010d", key.data(), begin); + String key_str(key_buf); + createZNode(machine, key_str, data); + begin++; } - //LOG_INFO(log, "Max thread count {}, running {}", thread_pool.getMaxThreads(), thread_pool.active()); - thread_pool.wait(); - watch.stop(); - int mill_second = watch.elapsedMilliseconds(); - int log_size = ((key_bytes + value_bytes) + sizeof(UInt32) * 4 + sizeof(UInt32) * 6); - double total_size = 1.0 * log_size * send_count / 1000 / 1000; - double byte_rate = 1.0 * total_size / mill_second * 1000; - double count_rate = 1.0 * send_count / mill_second * 1000; - LOG_INFO( - log, - "Append performance : thread_count {}, size {} Byte/OneLog, count {}, total_size {} M, milli second {}, byte rate {} M/S, TPS " - "{}", - thread_size, - log_size, - send_count, - total_size, - mill_second, - byte_rate, - count_rate); } + + watch.stop(); + int mill_second = watch.elapsedMilliseconds(); + int log_size = ((key_bytes + value_bytes) + sizeof(UInt32) * 4 + sizeof(UInt32) * 6); + double total_size = 1.0 * log_size * send_count / 1000 / 1000; + double byte_rate = 1.0 * total_size / mill_second * 1000; + double count_rate = 1.0 * send_count / mill_second * 1000; + LOG_INFO( + log, + "Append performance : size {} Byte/OneLog, count {}, total_size {} M, milli second {}, byte rate {} M/S, TPS " + "{}", + log_size, + send_count, + total_size, + mill_second, + byte_rate, + count_rate); + machine.shutdown(); cleanDirectory(snap_dir); From ac258dc2428d1bd806f17d09d94bc63af8871a9e Mon Sep 17 00:00:00 2001 From: lzydmxy <13126752315@163.com> Date: Sun, 28 Apr 2024 12:30:43 +0800 Subject: [PATCH 5/5] Modify snapshot loading since nodeMap is no longer thread-safe. --- src/Service/KeeperStore.cpp | 17 ++++++++++++++++- src/Service/KeeperStore.h | 14 ++++++++++++++ src/Service/NuRaftLogSnapshot.cpp | 20 ++++++++++++-------- src/Service/NuRaftLogSnapshot.h | 5 +++-- src/Service/SnapshotCommon.cpp | 27 +++++++++++++-------------- src/Service/SnapshotCommon.h | 3 ++- 6 files changed, 60 insertions(+), 26 deletions(-) diff --git a/src/Service/KeeperStore.cpp b/src/Service/KeeperStore.cpp index 840ae847af..ad4388e590 100644 --- a/src/Service/KeeperStore.cpp +++ b/src/Service/KeeperStore.cpp @@ -1646,6 +1646,21 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot) } } +void KeeperStore::buildBucketNodes(const std::vector & all_objects_nodes, UInt32 bucket_idx) +{ + for (auto && object_nodes : all_objects_nodes) + { + for (auto && [path, node] : object_nodes[bucket_idx]) + { +// container.emplace(path, std::move(node), bucket_idx); + if (!container.emplace(path, std::move(node), bucket_idx) && path != "/") + { + throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Logical error: When loading data from a snapshot, duplicate node {} were found ", path); + } + } + } +} + void KeeperStore::buildBucketChildren(const std::vector & all_objects_edges, UInt32 bucket_idx) { for (auto & object_edges : all_objects_edges) @@ -1656,7 +1671,7 @@ void KeeperStore::buildBucketChildren(const std::vector & all_objec if (unlikely(parent == nullptr)) { - throw RK::Exception("Logical error: Build : can not find parent for node " + path, ErrorCodes::LOGICAL_ERROR); + throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Logical error: Build : can not find parent {} for node ", path); } parent->children.emplace(std::move(path)); diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index f5ff4fdba4..50f70b2a5c 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -153,6 +153,17 @@ class KeeperNodeMap return false; } + template + bool emplace(const String & key, T && value, UInt32 bucket_idx) + { + if (maps_[bucket_idx].emplace(key, std::forward(value))) + { + node_count ++; + return true; + } + return false; + } + bool erase(String const & key) { if (mapFor(key).erase(key)) @@ -210,6 +221,7 @@ class KeeperStore /// Hold Edges in different Buckets based on the parent node's bucket number. /// It should be used when load snapshot to built node's childrenSet in parallel without lock. using BucketEdges = std::array; + using BucketNodes = std::array>>, MAP_BUCKET_NUM>; /// global session id counter, used to allocate new session id. /// It should be same across all nodes. @@ -309,6 +321,8 @@ class KeeperStore // Build childrenSet for the node in specified bucket after load data from snapshot. void buildBucketChildren(const std::vector & all_objects_edges, UInt32 bucket_idx); + void buildBucketNodes(const std::vector & all_objects_nodes, UInt32 bucket_idx); + void finalize(); diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 759b726dad..6fefc431da 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -269,7 +269,7 @@ void KeeperSnapshotStore::parseBatchHeader(ptr fs, SnapshotBatchHe } } -void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, BucketEdges & buckets_edges) +void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, BucketEdges & buckets_edges, BucketNodes & bucket_nodes) { ptr snap_fs = cs_new(); snap_fs->open(obj_path, std::ios::in | std::ios::binary); @@ -360,11 +360,11 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, Buck throwFromErrno("Can't read snapshot object file " + obj_path + ", batch crc not match.", ErrorCodes::CORRUPTED_SNAPSHOT); } - parseBatchBodyV2(store, body_string, buckets_edges, version_from_obj); + parseBatchBodyV2(store, body_string, buckets_edges, bucket_nodes, version_from_obj); } } -void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & body_string, BucketEdges & buckets_edges, SnapshotVersion version_) +void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & body_string, BucketEdges & buckets_edges, BucketNodes & bucket_nodes, SnapshotVersion version_) { ptr batch; batch = SnapshotBatchBody::parse(body_string); @@ -372,7 +372,7 @@ void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & b { case SnapshotBatchType::SNAPSHOT_TYPE_DATA: LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch->size()); - parseBatchDataV2(store, *batch, buckets_edges, version_); + parseBatchDataV2(store, *batch, buckets_edges, bucket_nodes, version_); break; case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: { LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch->size()); @@ -398,8 +398,10 @@ void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & b void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) { + auto objects_cnt = objects_path.size(); ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM); - all_objects_edges = std::vector(objects_path.size()); + all_objects_edges = std::vector(objects_cnt); + all_objects_nodes = std::vector(objects_cnt); LOG_INFO(log, "Parse object from disk"); @@ -423,7 +425,7 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) this->objects_path.size()); try { - this->parseObject(store, it->second, all_objects_edges[obj_idx]); + this->parseObject(store, it->second, all_objects_edges[obj_idx], all_objects_nodes[obj_idx]); } catch (Exception & e) { @@ -437,7 +439,7 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) } object_thread_pool.wait(); - LOG_INFO(log, "Build ChildrenSet for nodes"); + LOG_INFO(log, "Build DataTree"); /// Build node tree relationship for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++) @@ -445,11 +447,13 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) object_thread_pool.trySchedule( [this, thread_idx, &store] { - Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildChildren")); + Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildDataTreeThread")); for (UInt32 bucket_idx = 0; bucket_idx < store.container.getBucketNum(); bucket_idx++) { if (bucket_idx % SNAPSHOT_THREAD_NUM == thread_idx) { + LOG_INFO(thread_log, "Build nodes in bucket {}, thread_idx {}", bucket_idx, thread_idx); + store.buildBucketNodes(this->all_objects_nodes, bucket_idx); LOG_INFO(thread_log, "Build ChildrenSet for nodes in bucket {}, thread_idx {}", bucket_idx, thread_idx); store.buildBucketChildren(this->all_objects_edges, bucket_idx); } diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index 7b6231590d..6c5cb36043 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -102,14 +102,14 @@ class KeeperSnapshotStore void getObjectPath(ulong object_id, String & path); /// Parse an snapshot object. We should take the version from snapshot in general. - void parseObject(KeeperStore & store, String obj_path, BucketEdges &); + void parseObject(KeeperStore & store, String obj_path, BucketEdges &, BucketNodes &); /// Parse batch header in an object /// TODO use internal buffer void parseBatchHeader(ptr fs, SnapshotBatchHeader & head); /// Parse a batch - void parseBatchBodyV2(KeeperStore & store, const String &, BucketEdges &, SnapshotVersion version_); + void parseBatchBodyV2(KeeperStore & store, const String &, BucketEdges &, BucketNodes &, SnapshotVersion version_); /// For snapshot version v2 size_t serializeDataTreeV2(KeeperStore & storage); @@ -150,6 +150,7 @@ class KeeperSnapshotStore std::map objects_path; std::vector all_objects_edges; + std::vector all_objects_nodes; /// Appended to snapshot file name String curr_time; diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index d2267912a3..a493fa9ae7 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -452,7 +452,7 @@ ptr SnapshotBatchBody::parse(const String & data) return batch_body; } -void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BucketEdges & buckets_edges, SnapshotVersion version) +void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BucketEdges & buckets_edges, BucketNodes & bucket_nodes, SnapshotVersion version) { for (size_t i = 0; i < batch.size(); i++) { @@ -490,23 +490,22 @@ void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BucketEdge ephemeral_nodes.emplace(path); } - store.container.emplace(path, std::move(node)); - - if (unlikely(path == "/")) - continue; - - auto rslash_pos = path.rfind('/'); + // store.container.emplace(path, std::move(node)); + if (likely(path != "/")) + { + auto rslash_pos = path.rfind('/'); - if (unlikely(rslash_pos < 0)) - throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path); + if (unlikely(rslash_pos < 0)) + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path); - auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos); - auto idx = store.container.getBucketIndex(parent_path); + auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos); - // Storage edges in different bucket, according to the bucket index of parent node. - // Which allow us to insert child paths for all nodes in parallel. - buckets_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1)); + // Storage edges in different bucket, according to the bucket index of parent node. + // Which allow us to insert child paths for all nodes in parallel. + buckets_edges[store.container.getBucketIndex(parent_path)].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1)); + } + bucket_nodes[store.container.getBucketIndex(path)].emplace_back(std::move(path), std::move(node)); } } diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index 680d89d025..1ab2565941 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -34,6 +34,7 @@ static constexpr UInt32 SAVE_BATCH_SIZE = 10000; using StringMap = std::unordered_map; using IntMap = std::unordered_map; using BucketEdges = KeeperStore::BucketEdges; +using BucketNodes = KeeperStore::BucketNodes; enum SnapshotVersion : uint8_t { @@ -129,7 +130,7 @@ template void serializeMapV2(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); /// parse snapshot batch -void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BucketEdges & buckets_edges, SnapshotVersion version); +void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BucketEdges & buckets_edges, BucketNodes & bucket_nodes, SnapshotVersion version); void parseBatchSessionV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); void parseBatchAclMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version);