From 3974952d320833e17aa96c9e5bde23ed7a24a4a8 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Mon, 6 May 2024 10:13:25 +0800 Subject: [PATCH 1/3] Fix typos and logs --- src/Service/KeeperStore.cpp | 56 +++++++++------------- src/Service/KeeperStore.h | 12 ++--- src/Service/NuRaftLogSnapshot.cpp | 72 +++++++++++++---------------- src/Service/ZooKeeperDataReader.cpp | 2 +- 4 files changed, 62 insertions(+), 80 deletions(-) diff --git a/src/Service/KeeperStore.cpp b/src/Service/KeeperStore.cpp index ad4388e590..1e4c25c916 100644 --- a/src/Service/KeeperStore.cpp +++ b/src/Service/KeeperStore.cpp @@ -293,7 +293,7 @@ struct StoreRequestCreate final : public StoreRequest std::pair process(KeeperStore & store, int64_t zxid, int64_t session_id, int64_t time) const override { - Poco::Logger * log = &(Poco::Logger::get("SvsKeeperStorageCreateRequest")); + Poco::Logger * log = &(Poco::Logger::get("StoreRequestCreate")); Coordination::ZooKeeperResponsePtr response_ptr = zk_request->makeResponse(); Undo undo; @@ -429,7 +429,7 @@ struct StoreRequestGet final : public StoreRequest bool checkAuth(KeeperStore & store, int64_t session_id) const override { - Poco::Logger * log = &(Poco::Logger::get("SvsKeeperStorageGetRequest")); + Poco::Logger * log = &(Poco::Logger::get("StoreRequestGet")); auto & container = store.container; auto node = container.get(zk_request->getPath()); if (node == nullptr) @@ -519,7 +519,7 @@ struct StoreRequestRemove final : public StoreRequest Coordination::ZooKeeperRemoveRequest & request = dynamic_cast(*zk_request); Undo undo; - Poco::Logger * log = &(Poco::Logger::get("SvsKeeperStorageRemoveRequest")); + Poco::Logger * log = &(Poco::Logger::get("StoreRequestRemove")); KeeperStore::Container::SharedElement node = store.container.get(request.path); if (node == nullptr) { @@ -736,7 +736,7 @@ struct StoreRequestList final : public StoreRequest auto path_prefix = request.path; if (path_prefix.empty()) - throw RK::Exception("Logical error: path cannot be empty", ErrorCodes::LOGICAL_ERROR); + throw RK::Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: path cannot be empty"); if (response_ptr->getOpNum() == Coordination::OpNum::List || response_ptr->getOpNum() == Coordination::OpNum::FilteredList) { @@ -764,7 +764,7 @@ struct StoreRequestList final : public StoreRequest if (node == nullptr) { LOG_ERROR( - &Poco::Logger::get("SvsKeeperStorageListRequest"), + &Poco::Logger::get("StoreRequestList"), "Inconsistency found between uncommitted and committed data, can't get child {} for {} ." "Keeper will terminate to avoid undefined behaviour.", child, request.path); std::terminate(); @@ -1225,7 +1225,7 @@ class StoreRequestFactory final : private boost::noncopyable { auto it = op_num_to_request.find(zk_request->getOpNum()); if (it == op_num_to_request.end()) - throw RK::Exception("Unknown operation type " + toString(zk_request->getOpNum()), ErrorCodes::LOGICAL_ERROR); + throw RK::Exception(ErrorCodes::LOGICAL_ERROR, "Unknown operation type {}", toString(zk_request->getOpNum())); return it->second(zk_request); } @@ -1618,13 +1618,11 @@ bool KeeperStore::updateSessionTimeout(int64_t session_id, int64_t /*session_tim return true; } -void KeeperStore::buildPathChildren(bool from_zk_snapshot) +void KeeperStore::buildChildrenSet(bool from_zk_snapshot) { - LOG_INFO(log, "build path children in keeper storage {}", container.size()); - /// build children - for (UInt32 bucket_idx = 0; bucket_idx < container.getBucketNum(); bucket_idx++) + for (UInt32 bucket_id = 0; bucket_id < container.getBucketNum(); bucket_id++) { - for (const auto & it : container.getMap(bucket_idx).getMap()) + for (const auto & it : container.getMap(bucket_id).getMap()) { if (it.first == "/") continue; @@ -1632,47 +1630,39 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot) auto parent_path = getParentPath(it.first); auto child_path = getBaseName(it.first); auto parent = container.get(parent_path); + if (parent == nullptr) - { - throw RK::Exception("Logical error: Build : can not find parent node " + it.first, ErrorCodes::LOGICAL_ERROR); - } - else - { - parent->children.insert(child_path); - if (from_zk_snapshot) - parent->stat.numChildren++; - } + throw RK::Exception(ErrorCodes::LOGICAL_ERROR, "Error when building children set, can not find parent for node {}", it.first); + + parent->children.insert(child_path); + if (from_zk_snapshot) + parent->stat.numChildren++; } } } -void KeeperStore::buildBucketNodes(const std::vector & all_objects_nodes, UInt32 bucket_idx) +void KeeperStore::fillDataTreeBucket(const std::vector & all_objects_nodes, UInt32 bucket_id) { for (auto && object_nodes : all_objects_nodes) { - for (auto && [path, node] : object_nodes[bucket_idx]) + for (auto && [path, node] : object_nodes[bucket_id]) { -// container.emplace(path, std::move(node), bucket_idx); - if (!container.emplace(path, std::move(node), bucket_idx) && path != "/") - { - throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Logical error: When loading data from a snapshot, duplicate node {} were found ", path); - } + if (!container.emplace(path, std::move(node), bucket_id) && path != "/") + throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Error when filling data tree bucket {}, duplicated node {}", bucket_id, path); } } } -void KeeperStore::buildBucketChildren(const std::vector & all_objects_edges, UInt32 bucket_idx) +void KeeperStore::buildBucketChildren(const std::vector & all_objects_edges, UInt32 bucket_id) { - for (auto & object_edges : all_objects_edges) + for (const auto & object_edges : all_objects_edges) { - for (auto & [parent_path, path] : object_edges[bucket_idx]) + for (const auto & [parent_path, path] : object_edges[bucket_id]) { auto parent = container.get(parent_path); if (unlikely(parent == nullptr)) - { - throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Logical error: Build : can not find parent {} for node ", path); - } + throw RK::Exception(RK::ErrorCodes::LOGICAL_ERROR, "Can not find parent for node {}", path); parent->children.emplace(std::move(path)); } diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index 50f70b2a5c..0c8484b4f2 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -316,17 +316,17 @@ class KeeperStore bool check_acl = true, bool ignore_response = false); - /// Build path children after load data from snapshot - void buildPathChildren(bool from_zk_snapshot = false); + /// Build children set after loading data from snapshot + void buildChildrenSet(bool from_zk_snapshot = false); - // Build childrenSet for the node in specified bucket after load data from snapshot. - void buildBucketChildren(const std::vector & all_objects_edges, UInt32 bucket_idx); - void buildBucketNodes(const std::vector & all_objects_nodes, UInt32 bucket_idx); + // Build children set for the nodes in specified bucket after load data from snapshot. + void buildBucketChildren(const std::vector & all_objects_edges, UInt32 bucket_id); + void fillDataTreeBucket(const std::vector & all_objects_nodes, UInt32 bucket_id); void finalize(); - /// Add session id. Used when restoring KeeperStorage from snapshot. + /// Add session id. Used when restoring KeeperStore from snapshot. void addSessionID(int64_t session_id, int64_t session_timeout_ms) { std::lock_guard lock(session_mutex); diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 6fefc431da..c16113e39d 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -399,73 +399,65 @@ void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & b void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) { auto objects_cnt = objects_path.size(); - ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM); + ThreadPool thread_pool(SNAPSHOT_THREAD_NUM); + all_objects_edges = std::vector(objects_cnt); all_objects_nodes = std::vector(objects_cnt); - LOG_INFO(log, "Parse object from disk"); + LOG_INFO(log, "Parsing snapshot objects from disk"); + Stopwatch watch; - for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++) + for (UInt32 thread_id = 0; thread_id < SNAPSHOT_THREAD_NUM; thread_id++) { - object_thread_pool.trySchedule( - [this, thread_idx, &store] + thread_pool.trySchedule( + [this, thread_id, &store] { - Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.parseObjectThread")); + Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.parseObjectThread#" + std::to_string(thread_id))); UInt32 obj_idx = 0; for (auto it = this->objects_path.begin(); it != this->objects_path.end(); it++) { - if (obj_idx % SNAPSHOT_THREAD_NUM == thread_idx) + if (obj_idx % SNAPSHOT_THREAD_NUM == thread_id) { - LOG_INFO( - thread_log, - "Parse snapshot object, thread_idx {}, obj_index {}, path {}, obj size {}", - thread_idx, - it->first, - it->second, - this->objects_path.size()); - try - { - this->parseObject(store, it->second, all_objects_edges[obj_idx], all_objects_nodes[obj_idx]); - } - catch (Exception & e) - { - LOG_ERROR(log, "parseObject error {}, {}", it->second, getExceptionMessage(e, true)); - throw; - } + LOG_INFO(thread_log, "Parsing snapshot object {}", it->second); + parseObject(store, it->second, all_objects_edges[obj_idx], all_objects_nodes[obj_idx]); } obj_idx++; } }); } - object_thread_pool.wait(); - LOG_INFO(log, "Build DataTree"); + thread_pool.wait(); + LOG_INFO(log, "Parsing snapshot objects costs {}ms", watch.elapsedMilliseconds()); + + LOG_INFO(log, "Building data tree from snapshot objects"); + watch.restart(); - /// Build node tree relationship - for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++) + /// Build data tree relationship in parallel + for (UInt32 thread_id = 0; thread_id < SNAPSHOT_THREAD_NUM; thread_id++) { - object_thread_pool.trySchedule( - [this, thread_idx, &store] + thread_pool.trySchedule( + [this, thread_id, &store] { - Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildDataTreeThread")); - for (UInt32 bucket_idx = 0; bucket_idx < store.container.getBucketNum(); bucket_idx++) + Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildDataTreeThread#" + std::to_string(thread_id))); + for (UInt32 bucket_id = 0; bucket_id < store.container.getBucketNum(); bucket_id++) { - if (bucket_idx % SNAPSHOT_THREAD_NUM == thread_idx) + if (bucket_id % SNAPSHOT_THREAD_NUM == thread_id) { - LOG_INFO(thread_log, "Build nodes in bucket {}, thread_idx {}", bucket_idx, thread_idx); - store.buildBucketNodes(this->all_objects_nodes, bucket_idx); - LOG_INFO(thread_log, "Build ChildrenSet for nodes in bucket {}, thread_idx {}", bucket_idx, thread_idx); - store.buildBucketChildren(this->all_objects_edges, bucket_idx); + LOG_INFO(thread_log, "Filling bucket {} in data tree", bucket_id); + store.fillDataTreeBucket(all_objects_nodes, bucket_id); + LOG_INFO(thread_log, "Building children set for data tree bucket {}", bucket_id); + store.buildBucketChildren(all_objects_edges, bucket_id); } } }); } - object_thread_pool.wait(); + thread_pool.wait(); + LOG_INFO(log, "Building data tree costs {}ms", watch.elapsedMilliseconds()); LOG_INFO( log, - "Load snapshot done: nodes {}, ephemeral nodes {}, sessions {}, session_id_counter {}, zxid {}", + "Loading snapshot done: nodes {}, ephemeral nodes {}, sessions {}, session_id_counter {}, zxid {}", store.getNodesCount(), store.getTotalEphemeralNodesCount(), store.getSessionCount(), @@ -696,8 +688,8 @@ size_t KeeperSnapshotManager::loadSnapshotMetas() file_dir.list(file_vec); char time_str[128]; - unsigned long log_last_index; - unsigned long object_id; + uint64_t log_last_index; + uint64_t object_id; for (const auto & file : file_vec) { diff --git a/src/Service/ZooKeeperDataReader.cpp b/src/Service/ZooKeeperDataReader.cpp index 9193af6460..0ae87d51ae 100644 --- a/src/Service/ZooKeeperDataReader.cpp +++ b/src/Service/ZooKeeperDataReader.cpp @@ -148,7 +148,7 @@ int64_t deserializeStorageData(KeeperStore & store, ReadBuffer & in, Poco::Logge LOG_INFO(log, "Deserialized nodes from snapshot: {}", count); } - store.buildPathChildren(true); + store.buildChildrenSet(true); LOG_INFO(log, "Totally deserialized {} nodes from snapshot", count); return max_zxid; From 28acbbd730b6deb8a4eb9eae21680f0c87f1ad4b Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Mon, 6 May 2024 11:08:31 +0800 Subject: [PATCH 2/3] Clear object edges and nodes when loading snapshot completes --- src/Service/NuRaftLogSnapshot.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index c16113e39d..365ff3d542 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -455,6 +455,9 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) thread_pool.wait(); LOG_INFO(log, "Building data tree costs {}ms", watch.elapsedMilliseconds()); + all_objects_edges.clear(); + all_objects_nodes.clear(); + LOG_INFO( log, "Loading snapshot done: nodes {}, ephemeral nodes {}, sessions {}, session_id_counter {}, zxid {}", From 1e04caf72db57afe8a07cbcca12eff5524836a3f Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Mon, 6 May 2024 12:13:29 +0800 Subject: [PATCH 3/3] Little refactoring of KeeperNodeMap --- src/Service/KeeperStore.cpp | 2 +- src/Service/KeeperStore.h | 70 ++++++++++++++++--------------- src/Service/NuRaftLogSnapshot.cpp | 4 +- 3 files changed, 39 insertions(+), 37 deletions(-) diff --git a/src/Service/KeeperStore.cpp b/src/Service/KeeperStore.cpp index 1e4c25c916..5b0e3e45a7 100644 --- a/src/Service/KeeperStore.cpp +++ b/src/Service/KeeperStore.cpp @@ -520,7 +520,7 @@ struct StoreRequestRemove final : public StoreRequest Undo undo; Poco::Logger * log = &(Poco::Logger::get("StoreRequestRemove")); - KeeperStore::Container::SharedElement node = store.container.get(request.path); + auto node = store.container.get(request.path); if (node == nullptr) { response.error = Coordination::Error::ZNONODE; diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index 0c8484b4f2..edf4cf94c4 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -79,86 +79,89 @@ struct KeeperNodeWithPath std::shared_ptr node; }; -/// KeeperNodeMap is a two-level unordered_map which is designed -/// to reduce latency for unordered_map scales. -template +/// KeeperNodeMap is a two-level unordered_map which is designed to reduce latency for unordered_map scaling. +/// It is not a thread-safe map. But it is accessed only in the request processor thread. +template class KeeperNodeMap { public: - using SharedElement = std::shared_ptr; - using ElementMap = std::unordered_map; - using Action = std::function; + using Key = String; + using ValuePtr = std::shared_ptr; + using NestedMap = std::unordered_map; + using Action = std::function; class InnerMap { public: - SharedElement get(String const & key) + ValuePtr get(const String & key) { - auto i = map_.find(key); - return (i != map_.end()) ? i->second : nullptr; + auto i = map.find(key); + return (i != map.end()) ? i->second : nullptr; } template - bool emplace(String const & key, T && value) + bool emplace(const String & key, T && value) { - return map_.insert_or_assign(key, value).second; + return map.insert_or_assign(key, value).second; } - bool erase(String const & key) + bool erase(const String & key) { - return map_.erase(key); + return map.erase(key); } size_t size() const { - return map_.size(); + return map.size(); } void clear() { - map_.clear(); + map.clear(); } void forEach(const Action & fn) { - for (const auto & [key, value] : map_) + for (const auto & [key, value] : map) fn(key, value); } /// This method will destroy InnerMap thread safety property. - /// deprecated use forEach instead. - ElementMap & getMap() { return map_; } + /// Deprecated, please use forEach instead. + NestedMap & getMap() { return map; } private: - ElementMap map_; + NestedMap map; }; private: - std::array maps_; - std::hash hash_; + inline InnerMap & mapFor(const String & key) { return buckets[hash(key) % NumBuckets]; } + + std::array buckets; + std::hash hash; std::atomic node_count{0}; public: - SharedElement get(const String & key) { return mapFor(key).get(key); } - SharedElement at(const String & key) { return mapFor(key).get(key); } + ValuePtr get(const String & key) { return mapFor(key).get(key); } + ValuePtr at(const String & key) { return mapFor(key).get(key); } template bool emplace(const String & key, T && value) { if (mapFor(key).emplace(key, std::forward(value))) { - node_count ++; + node_count++; return true; } return false; } template - bool emplace(const String & key, T && value, UInt32 bucket_idx) + bool emplace(const String & key, T && value, UInt32 bucket_id) { - if (maps_[bucket_idx].emplace(key, std::forward(value))) + if (buckets[bucket_id].emplace(key, std::forward(value))) { - node_count ++; + node_count++; return true; } return false; @@ -168,7 +171,7 @@ class KeeperNodeMap { if (mapFor(key).erase(key)) { - node_count --; + node_count--; return true; } return false; @@ -176,16 +179,15 @@ class KeeperNodeMap size_t count(const String & key) { return get(key) != nullptr ? 1 : 0; } - - InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBuckets]; } - UInt32 getBucketIndex(String const & key) { return hash_(key) % NumBuckets; } + UInt32 getBucketIndex(const String & key) { return hash(key) % NumBuckets; } UInt32 getBucketNum() const { return NumBuckets; } - InnerMap & getMap(const UInt32 & index) { return maps_[index]; } + + InnerMap & getMap(const UInt32 & bucket_id) { return buckets[bucket_id]; } void clear() { - for (auto & map : maps_) - map.clear(); + for (auto & bucket : buckets) + bucket.clear(); node_count.store(0); } diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 365ff3d542..909e30701c 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -691,8 +691,8 @@ size_t KeeperSnapshotManager::loadSnapshotMetas() file_dir.list(file_vec); char time_str[128]; - uint64_t log_last_index; - uint64_t object_id; + unsigned long log_last_index; + unsigned long object_id; for (const auto & file : file_vec) {