diff --git a/src/Service/KeeperStore.cpp b/src/Service/KeeperStore.cpp index 867ba6cc8e..152ddc7cee 100644 --- a/src/Service/KeeperStore.cpp +++ b/src/Service/KeeperStore.cpp @@ -1659,6 +1659,24 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot) } } +void KeeperStore::buildBlockChildren(const std::vector & all_objects_edges, UInt32 block_idx) +{ + for (auto & object_edges : all_objects_edges) + { + for (auto & [parent_path, path] : object_edges[block_idx]) + { + auto parent = container.get(parent_path); + + if (unlikely(parent == nullptr)) + { + throw RK::Exception("Logical error: Build : can not find parent for node " + path, ErrorCodes::LOGICAL_ERROR); + } + + parent->children.emplace(std::move(path)); + } + } +} + void KeeperStore::cleanEphemeralNodes(int64_t session_id, ThreadSafeQueue & responses_queue, bool ignore_response) { LOG_DEBUG(log, "Clean ephemeral nodes for session {}", toHexString(session_id)); diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index a9713612ac..a66c2d7a3a 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -25,6 +25,8 @@ namespace RK struct StoreRequest; using StoreRequestPtr = std::shared_ptr; using ChildrenSet = std::unordered_set; +using Edge = std::pair; +using Edges = std::vector; /** * Represent an entry in data tree. @@ -160,6 +162,7 @@ class ConcurrentMap bool erase(String const & key) { return mapFor(key).erase(key); } InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBlocks]; } + UInt32 getBlockIndex(String const & key) { return hash_(key) % NumBlocks; } UInt32 getBlockNum() const { return NumBlocks; } InnerMap & getMap(const UInt32 & index) { return maps_[index]; } @@ -200,6 +203,7 @@ class KeeperStore using SessionIDs = std::vector; using Watches = std::unordered_map; + using BlocksEdges = std::array; /// global session id counter, used to allocate new session id. /// It should be same across all nodes. @@ -297,6 +301,9 @@ class KeeperStore /// Build path children after load data from snapshot void buildPathChildren(bool from_zk_snapshot = false); + // Build childrenset for the node in specified block after load data from snapshot. + void buildBlockChildren(const std::vector & all_objects_edges, UInt32 block_idx); + void finalize(); /// Add session id. Used when restoring KeeperStorage from snapshot. diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index f7ddfae3d7..ec00c38f55 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -461,7 +461,7 @@ void KeeperSnapshotStore::parseBatchHeader(ptr fs, SnapshotBatchHe } } -void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) +void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, BlocksEdges & blocks_edges) { ptr snap_fs = cs_new(); snap_fs->open(obj_path, std::ios::in | std::ios::binary); @@ -491,7 +491,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) { if (snap_fs->eof() && version_from_obj == SnapshotVersion::V0) { - LOG_INFO(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj)); + LOG_DEBUG(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj)); break; } throw Exception( @@ -504,7 +504,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) char * buf = reinterpret_cast(&version_from_obj); snap_fs->read(buf, sizeof(uint8_t)); read_size += 1; - LOG_INFO(log, "Got snapshot file header with version {}", toString(version_from_obj)); + LOG_DEBUG(log, "Got snapshot file header with version {}", toString(version_from_obj)); if (version_from_obj > CURRENT_SNAPSHOT_VERSION) throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version_from_obj); } @@ -514,7 +514,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) char * buf = reinterpret_cast(&file_checksum); snap_fs->read(buf, sizeof(UInt32)); read_size += 4; - LOG_INFO(log, "obj_path {}, file_checksum {}, checksum {}.", obj_path, file_checksum, checksum); + LOG_DEBUG(log, "obj_path {}, file_checksum {}, checksum {}.", obj_path, file_checksum, checksum); if (file_checksum != checksum) throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "snapshot {} checksum doesn't match", obj_path); break; @@ -527,7 +527,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) LOG_INFO(log, "snapshot has no version, set to V0", obj_path); } - LOG_INFO(log, "obj_path {}, didn't read the header and tail of the file", obj_path); + LOG_DEBUG(log, "obj_path {}, didn't read the header and tail of the file", obj_path); snap_fs->seekg(cur_read_size); read_size = cur_read_size; } @@ -535,12 +535,12 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) parseBatchHeader(snap_fs, header); checksum = updateCheckSum(checksum, header.data_crc); - char * body_buf = new char[header.data_length]; + String body_string(header.data_length, '0'); + char * body_buf = body_string.data(); read_size += (SnapshotBatchHeader::HEADER_SIZE + header.data_length); if (!snap_fs->read(body_buf, header.data_length)) { - delete[] body_buf; throwFromErrno( "Can't read snapshot object file " + obj_path + ", batch size " + std::to_string(header.data_length) + ", only " + std::to_string(snap_fs->gcount()) + " could be read", @@ -549,41 +549,39 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) if (!verifyCRC32(body_buf, header.data_length, header.data_crc)) { - delete[] body_buf; throwFromErrno("Can't read snapshot object file " + obj_path + ", batch crc not match.", ErrorCodes::CORRUPTED_SNAPSHOT); } if (version_from_obj < SnapshotVersion::V2) - parseBatchBody(store, body_buf, header.data_length, version_from_obj); + parseBatchBody(store, body_string, blocks_edges, version_from_obj); else - parseBatchBodyV2(store, body_buf, header.data_length, version_from_obj); - delete[] body_buf; + parseBatchBodyV2(store, body_string, blocks_edges, version_from_obj); } } -void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) +void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, const String & body_string, BlocksEdges & blocks_edges, SnapshotVersion version_) { SnapshotBatchPB batch_pb; - batch_pb.ParseFromString(String(batch_buf, length)); + batch_pb.ParseFromString(body_string); switch (batch_pb.batch_type()) { case SnapshotTypePB::SNAPSHOT_TYPE_DATA: - LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size()); - parseBatchData(store, batch_pb, version_); + LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size()); + parseBatchData(store, batch_pb, blocks_edges, version_); break; case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: { - LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size()); + LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size()); parseBatchSession(store, batch_pb, version_); } break; case SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP: - LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size()); + LOG_DEBUG(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size()); parseBatchAclMap(store, batch_pb, version_); break; case SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP: - LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size()); + LOG_DEBUG(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size()); parseBatchIntMap(store, batch_pb, version_); - LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter); + LOG_DEBUG(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter); break; case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG: case SnapshotTypePB::SNAPSHOT_TYPE_SERVER: @@ -593,29 +591,29 @@ void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, } } -void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) +void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & body_string, BlocksEdges & blocks_edges, SnapshotVersion version_) { ptr batch; - batch = SnapshotBatchBody::parse(String(batch_buf, length)); + batch = SnapshotBatchBody::parse(body_string); switch (batch->type) { case SnapshotBatchType::SNAPSHOT_TYPE_DATA: - LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch->size()); - parseBatchDataV2(store, *batch, version_); + LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch->size()); + parseBatchDataV2(store, *batch, blocks_edges, version_); break; case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: { - LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch->size()); + LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch->size()); parseBatchSessionV2(store, *batch, version_); } break; case SnapshotBatchType::SNAPSHOT_TYPE_ACLMAP: - LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch->size()); + LOG_DEBUG(log, "Parsing batch acl from snapshot, acl count {}", batch->size()); parseBatchAclMapV2(store, *batch, version_); break; case SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP: - LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch->size()); + LOG_DEBUG(log, "Parsing batch int_map from snapshot, element count {}", batch->size()); parseBatchIntMapV2(store, *batch, version_); - LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter); + LOG_DEBUG(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter); break; case SnapshotBatchType::SNAPSHOT_TYPE_CONFIG: case SnapshotBatchType::SNAPSHOT_TYPE_SERVER: @@ -628,6 +626,10 @@ void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) { ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM); + all_objects_edges = std::vector(objects_path.size()); + + LOG_INFO(log, "Parse object from disk"); + for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++) { object_thread_pool.trySchedule( @@ -648,11 +650,12 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) this->objects_path.size()); try { - this->parseObject(store, it->second); + this->parseObject(store, it->second, all_objects_edges[obj_idx]); } catch (Exception & e) { LOG_ERROR(log, "parseObject error {}, {}", it->second, getExceptionMessage(e, true)); + throw; } } obj_idx++; @@ -661,8 +664,27 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) } object_thread_pool.wait(); + LOG_INFO(log, "Build ChildrenSet for nodes"); + /// Build node tree relationship - store.buildPathChildren(); + for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++) + { + object_thread_pool.trySchedule( + [this, thread_idx, &store] + { + Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildChildren")); + for (UInt32 block_idx = 0; block_idx < store.container.getBlockNum(); block_idx++) + { + if (block_idx % SNAPSHOT_THREAD_NUM == thread_idx) + { + LOG_INFO(thread_log, "Build ChildrenSet for nodes in block {}, thread_idx {}", block_idx, thread_idx); + store.buildBlockChildren(this->all_objects_edges, block_idx); + } + } + }); + } + + object_thread_pool.wait(); LOG_INFO( log, diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index 732c54dee5..4cca589686 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -104,16 +104,16 @@ class KeeperSnapshotStore void getObjectPath(ulong object_id, String & path); /// Parse an snapshot object. We should take the version from snapshot in general. - void parseObject(KeeperStore & store, String obj_path); + void parseObject(KeeperStore & store, String obj_path, BlocksEdges &); /// Parse batch header in an object /// TODO use internal buffer void parseBatchHeader(ptr fs, SnapshotBatchHeader & head); /// Parse a batch /// TODO delete - void parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_); + void parseBatchBody(KeeperStore & store, const String &, BlocksEdges &, SnapshotVersion version_); /// Parse a batch - void parseBatchBodyV2(KeeperStore & store, char * buf, size_t length, SnapshotVersion version_); + void parseBatchBodyV2(KeeperStore & store, const String &, BlocksEdges &, SnapshotVersion version_); /// Serialize whole data tree /// TODO delete size_t serializeDataTree(KeeperStore & storage); @@ -168,6 +168,8 @@ class KeeperSnapshotStore std::map objects_path; + std::vector all_objects_edges; + /// Appended to snapshot file name String curr_time; diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 13023b9e9e..624187f340 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -153,6 +153,8 @@ ptrparseKeeperNode(const String & buf, SnapshotVersion versi Coordination::read(node->is_sequential, in); Coordination::read(node->stat, in); + node->children.reserve(node->stat.numChildren); + return node_with_path; } @@ -412,7 +414,7 @@ template void serializeMap(StringMap & snap_map, UInt32 save_batch_si template void serializeMap(IntMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); -void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version) +void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BlocksEdges & blocks_edges, SnapshotVersion version) { for (int i = 0; i < batch.data_size(); i++) { @@ -444,14 +446,29 @@ void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, Snapshot store.acl_map.addUsage(node->acl_id); auto ephemeral_owner = node->stat.ephemeralOwner; - store.container.emplace(path, std::move(node)); - if (ephemeral_owner != 0) { std::lock_guard l(store.ephemerals_mutex); auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; ephemeral_nodes.emplace(path); } + + store.container.emplace(path, std::move(node)); + + if (unlikely(path == "/")) + continue; + + auto rslash_pos = path.rfind('/'); + + if (unlikely(rslash_pos < 0)) + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path); + + auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos); + auto idx = store.container.getBlockIndex(parent_path); + + // Storage edges in different bucket, according to the block index of parent node. + // Which allow us to insert child paths for all nodes in parallel. + blocks_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1)); } } @@ -844,7 +861,7 @@ ptr SnapshotBatchBody::parse(const String & data) return batch_body; } -void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version) +void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BlocksEdges & blocks_edges, SnapshotVersion version) { for (size_t i = 0; i < batch.size(); i++) { @@ -875,14 +892,30 @@ void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVe store.acl_map.addUsage(node->acl_id); auto ephemeral_owner = node->stat.ephemeralOwner; - store.container.emplace(path, std::move(node)); - if (ephemeral_owner != 0) { std::lock_guard l(store.ephemerals_mutex); auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; ephemeral_nodes.emplace(path); } + + store.container.emplace(path, std::move(node)); + + if (unlikely(path == "/")) + continue; + + auto rslash_pos = path.rfind('/'); + + if (unlikely(rslash_pos < 0)) + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path); + + auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos); + auto idx = store.container.getBlockIndex(parent_path); + + // Storage edges in different bucket, according to the block index of parent node. + // Which allow us to insert child paths for all nodes in parallel. + blocks_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1)); + } } diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index a80ed496d9..806673fc5d 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -34,6 +34,7 @@ static constexpr UInt32 SAVE_BATCH_SIZE = 10000; using StringMap = std::unordered_map; using IntMap = std::unordered_map; +using BlocksEdges = KeeperStore::BlocksEdges; enum SnapshotVersion : uint8_t { @@ -127,7 +128,7 @@ template void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); /// Parse snapshot batch without protobuf -void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version); +void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BlocksEdges & blocks_edges, SnapshotVersion version); void parseBatchSession(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version); void parseBatchAclMap(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version); void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version); @@ -152,7 +153,7 @@ template void serializeMapV2(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); /// parse snapshot batch -void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); +void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BlocksEdges & blocks_edges, SnapshotVersion version); void parseBatchSessionV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); void parseBatchAclMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version);