From 7960f90f342cad3de182849bce666ead514a12e8 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Tue, 20 Feb 2024 18:09:24 +0800 Subject: [PATCH 1/4] Refactor snapshot serializing and parsing. Fail fast when there are some parsing errors. Avoid unnecessary data copy. --- src/Common/ErrorCodes.cpp | 2 + src/Service/ACLMap.cpp | 2 +- src/Service/KeeperStore.h | 6 + src/Service/NuRaftLogSnapshot.cpp | 358 +++-------------------------- src/Service/SnapshotCommon.cpp | 367 ++++++++++++++++++++++++++++-- src/Service/SnapshotCommon.h | 14 ++ 6 files changed, 400 insertions(+), 349 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 2e3659e215..f8fe619441 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -105,6 +105,8 @@ M(114, RAFT_FWD_NO_CONN) \ M(115, FORWARD_NOT_CONNECTED) \ M(116, ILLEGAL_SETTING_VALUE) \ + M(117, CORRUPTED_LOG) \ + M(118, CORRUPTED_SNAPSHOT) \ /* See END */ namespace RK diff --git a/src/Service/ACLMap.cpp b/src/Service/ACLMap.cpp index 3756f85f44..f2d7ffd5c4 100644 --- a/src/Service/ACLMap.cpp +++ b/src/Service/ACLMap.cpp @@ -100,9 +100,9 @@ void ACLMap::addMapping(uint64_t acls_id, const Coordination::ACLs & acls) void ACLMap::addUsage(uint64_t acl_id, uint64_t count) { - std::lock_guard lock(acl_mutex); if (acl_id == 0) return; + std::lock_guard lock(acl_mutex); usage_counter[acl_id] += count; } diff --git a/src/Service/KeeperStore.h b/src/Service/KeeperStore.h index c1698f0ebd..a9713612ac 100644 --- a/src/Service/KeeperStore.h +++ b/src/Service/KeeperStore.h @@ -73,6 +73,12 @@ struct KeeperNode bool operator!=(const KeeperNode & rhs) const { return !(rhs == *this); } }; +struct KeeperNodeWithPath +{ + String path; + std::shared_ptr node; +}; + /// Map for data tree, it is thread safe with read write lock. /// ConcurrentMap is a two-level unordered_map which is designed /// to reduce latency for unordered_map scales. diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index e1012901fe..b712bda3de 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -575,178 +575,27 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, batch_pb.ParseFromString(String(batch_buf, length)); switch (batch_pb.batch_type()) { - case SnapshotTypePB::SNAPSHOT_TYPE_DATA: { - LOG_INFO(log, "Load batch size {}", batch_pb.data_size()); - for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) - { - const SnapshotItemPB & item_pb = batch_pb.data(data_idx); - const String & data = item_pb.data(); - ptr buf = buffer::alloc(data.size() + 1); - //buf->put(data.data(), data.size()); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - ptr node = cs_new(); - String key; - try - { - Coordination::read(key, in); - Coordination::read(node->data, in); - if (version_ >= SnapshotVersion::V1) - { - Coordination::read(node->acl_id, in); - } - else if (version_ == SnapshotVersion::V0) - { - /// Deserialize ACL - Coordination::ACLs acls; - Coordination::read(acls, in); - node->acl_id = store.acl_map.convertACLs(acls); - LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); - } - - /// Some strange ACLID during deserialization from ZooKeeper - if (node->acl_id == std::numeric_limits::max()) - node->acl_id = 0; - - store.acl_map.addUsage(node->acl_id); - LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); - - Coordination::read(node->is_ephemeral, in); - Coordination::read(node->is_sequential, in); - Coordination::read(node->stat, in); - auto ephemeral_owner = node->stat.ephemeralOwner; - LOG_TRACE(log, "Load snapshot read key {}, node stat {}", key, node->stat.toString()); - store.container.emplace(key, std::move(node)); - - if (ephemeral_owner != 0) - { - LOG_TRACE(log, "Load snapshot find ephemeral node {} owner {}", key, ephemeral_owner); - std::lock_guard l(store.ephemerals_mutex); - auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; - ephemeral_nodes.emplace(key); - } - } - catch (Coordination::Exception & e) - { - LOG_WARNING( - log, - "Can't read snapshot data, data index {}, key {}, excepiton {}", - data_idx, - key, - e.displayText()); - break; - } - } - } + case SnapshotTypePB::SNAPSHOT_TYPE_DATA: + LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size()); + parseBatchData(store, batch_pb, version_); break; - case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG: - break; - case SnapshotTypePB::SNAPSHOT_TYPE_SERVER: - break; case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: { - for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) - { - const SnapshotItemPB & item_pb = batch_pb.data(data_idx); - const String & data = item_pb.data(); - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - int64_t session_id; - int64_t timeout; - try - { - Coordination::read(session_id, in); - Coordination::read(timeout, in); - if (version_ >= SnapshotVersion::V1) - { - Coordination::AuthIDs ids; - Coordination::read(ids, in); - { - if (!ids.empty()) - { - std::lock_guard lock(store.auth_mutex); - store.session_and_auth[session_id] = ids; - } - } - } - } - catch (Coordination::Exception & e) - { - LOG_WARNING( - log, - "Can't read snapshot, data index {}, key {}, excepiton {}", - data_idx, - e.displayText()); - break; - } - LOG_TRACE(log, "Read session id {}, timeout {}", session_id, timeout); - store.addSessionID(session_id, timeout); - } + LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size()); + parseBatchSession(store, batch_pb, version_); } break; case SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP: - if (version_ >= SnapshotVersion::V1) - { - for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) - { - const SnapshotItemPB & item_pb = batch_pb.data(data_idx); - const String & data = item_pb.data(); - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - - uint64_t acl_id; - Coordination::ACLs acls; - - Coordination::read(acl_id, in); - Coordination::read(acls, in); - - LOG_TRACE(log, "parseObject acl_id {}", acl_id); - store.acl_map.addMapping(acl_id, acls); - } - } + LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size()); + parseBatchAclMap(store, batch_pb, version_); break; - case SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP: { - IntMap int_map; - for (int data_idx = 0; data_idx < batch_pb.data_size(); data_idx++) - { - const SnapshotItemPB & item_pb = batch_pb.data(data_idx); - const String & data = item_pb.data(); - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - String key; - int64_t value; - try - { - Coordination::read(key, in); - Coordination::read(value, in); - } - catch (Coordination::Exception & e) - { - LOG_WARNING( - log, - "Can't read uint map snapshot, data index {}, key {}, exception {}", - data_idx, - e.displayText()); - break; - } - int_map[key] = value; - } - if (int_map.find("ZXID") != int_map.end()) - { - store.zxid = int_map["ZXID"]; - } - if (int_map.find("SESSIONID") != int_map.end()) - { - store.session_id_counter = int_map["SESSIONID"]; - } - } + case SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP: + LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size()); + parseBatchIntMap(store, batch_pb, version_); + LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter); break; + case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG: + case SnapshotTypePB::SNAPSHOT_TYPE_SERVER: + break; default: break; } @@ -759,176 +608,27 @@ bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf batch = SnapshotBatchBody::parse(String(batch_buf, length)); switch (batch->type) { - case SnapshotBatchType::SNAPSHOT_TYPE_DATA: { - LOG_INFO(log, "Loading snapshot data batch with element count {}", batch->size()); - for (size_t data_idx = 0; data_idx < batch->size(); data_idx++) - { - const String & data = (*batch)[data_idx]; - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - ptr node = cs_new(); - String key; - try - { - Coordination::read(key, in); - Coordination::read(node->data, in); - if (version_ >= SnapshotVersion::V1) - { - Coordination::read(node->acl_id, in); - } - else if (version_ == SnapshotVersion::V0) - { - /// Deserialize ACL - Coordination::ACLs acls; - Coordination::read(acls, in); - node->acl_id = store.acl_map.convertACLs(acls); - LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); - } - - /// Some strange ACLID during deserialization from ZooKeeper - if (node->acl_id == std::numeric_limits::max()) - node->acl_id = 0; - - store.acl_map.addUsage(node->acl_id); - LOG_TRACE(log, "parseObject path {}, acl_id {}", key, node->acl_id); - - Coordination::read(node->is_ephemeral, in); - Coordination::read(node->is_sequential, in); - Coordination::read(node->stat, in); - auto ephemeral_owner = node->stat.ephemeralOwner; - LOG_TRACE(log, "Load snapshot read key {}, node stat {}", key, node->stat.toString()); - store.container.emplace(key, std::move(node)); - - if (ephemeral_owner != 0) - { - LOG_TRACE(log, "Load snapshot find ephemeral node {} - {}", ephemeral_owner, key); - std::lock_guard l(store.ephemerals_mutex); - auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; - ephemeral_nodes.emplace(key); - } - } - catch (Coordination::Exception & e) - { - LOG_WARNING( - log, - "Can't read snapshot data, data index {}, key {}, excepiton {}", - data_idx, - key, - e.displayText()); - break; - } - } - } - break; - case SnapshotBatchType::SNAPSHOT_TYPE_CONFIG: - break; - case SnapshotBatchType::SNAPSHOT_TYPE_SERVER: + case SnapshotBatchType::SNAPSHOT_TYPE_DATA: + LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch->size()); + parseBatchDataV2(store, *batch, version_); break; case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: { - LOG_INFO(log, "Loading snapshot session batch with element count {}", batch->size()); - for (size_t data_idx = 0; data_idx < batch->size(); data_idx++) - { - const String & data = (*batch)[data_idx]; - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - int64_t session_id; - int64_t timeout; - try - { - Coordination::read(session_id, in); - Coordination::read(timeout, in); - if (version_ >= SnapshotVersion::V1) - { - Coordination::AuthIDs ids; - Coordination::read(ids, in); - { - if (!ids.empty()) - { - std::lock_guard lock(store.auth_mutex); - store.session_and_auth[session_id] = ids; - } - } - } - } - catch (Coordination::Exception & e) - { - LOG_WARNING( - log, - "Can't read snapshot, data index {}, key {}, excepiton {}", - data_idx, - e.displayText()); - break; - } - LOG_TRACE(log, "Read session id {}, timeout {}", session_id, timeout); - store.addSessionID(session_id, timeout); - } + LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch->size()); + parseBatchSessionV2(store, *batch, version_); } break; case SnapshotBatchType::SNAPSHOT_TYPE_ACLMAP: - LOG_INFO(log, "Loading snapshot acl batch with element count {}", batch->size()); - if (version_ >= SnapshotVersion::V1) - { - for (size_t data_idx = 0; data_idx < batch->size(); data_idx++) - { - const String & data = (*batch)[data_idx]; - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - - uint64_t acl_id; - Coordination::ACLs acls; - - Coordination::read(acl_id, in); - Coordination::read(acls, in); - - LOG_TRACE(log, "parseObject acl_id {}", acl_id); - store.acl_map.addMapping(acl_id, acls); - } - } + LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch->size()); + parseBatchAclMapV2(store, *batch, version_); + break; + case SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP: + LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch->size()); + parseBatchIntMapV2(store, *batch, version_); + LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter); + break; + case SnapshotBatchType::SNAPSHOT_TYPE_CONFIG: + case SnapshotBatchType::SNAPSHOT_TYPE_SERVER: break; - case SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP: { - LOG_INFO(log, "Loading snapshot int_map batch with element count {}", batch->size()); - IntMap int_map; - for (size_t data_idx = 0; data_idx < batch->size(); data_idx++) - { - const String & data = (*batch)[data_idx]; - ptr buf = buffer::alloc(data.size() + 1); - buf->put(data); - buf->pos(0); - ReadBufferFromNuRaftBuffer in(buf); - String key; - int64_t value; - try - { - Coordination::read(key, in); - Coordination::read(value, in); - } - catch (Coordination::Exception & e) - { - LOG_WARNING( - log, - "Can't read uint map snapshot, data index {}, key {}, exception {}", - data_idx, - e.displayText()); - break; - } - int_map[key] = value; - } - if (int_map.find("ZXID") != int_map.end()) - { - store.zxid = int_map["ZXID"]; - } - if (int_map.find("SESSIONID") != int_map.end()) - { - store.session_id_counter = int_map["SESSIONID"]; - } - } - break; default: break; } diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 6e50aa9d23..d130495c7b 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -20,6 +20,7 @@ namespace RK namespace ErrorCodes { extern const int ILLEGAL_TYPE_OF_ARGUMENT; + extern const int CORRUPTED_SNAPSHOT; } using nuraft::cs_new; @@ -82,13 +83,6 @@ std::shared_ptr openFileAndWriteHeader(const String & path, return out; } -void writeTailAndClose(std::shared_ptr & out, UInt32 checksum) -{ - out->write(MAGIC_SNAPSHOT_TAIL.data(), MAGIC_SNAPSHOT_TAIL.size()); - writeIntBinary(checksum, *out); - out->close(); -} - int openFileForRead(String & obj_path) { Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); @@ -102,6 +96,74 @@ int openFileForRead(String & obj_path) return snap_fd; } +void writeTailAndClose(std::shared_ptr & out, UInt32 checksum) +{ + out->write(MAGIC_SNAPSHOT_TAIL.data(), MAGIC_SNAPSHOT_TAIL.size()); + writeIntBinary(checksum, *out); + out->close(); +} + +UInt32 updateCheckSum(UInt32 checksum, UInt32 data_crc) +{ + union + { + UInt64 data; + UInt32 crc[2]; + }; + crc[0] = checksum; + crc[1] = data_crc; + return RK::getCRC32(reinterpret_cast(&data), 8); +} + +String serializeKeeperNode(const String & path, const ptr & node, SnapshotVersion version) +{ + WriteBufferFromOwnString buf; + + Coordination::write(path, buf); + Coordination::write(node->data, buf); + + if (version == SnapshotVersion::V0) + { + /// Just ignore acls for snapshot V0 which is only used in JD /// TODO delete the compatibility code + Coordination::ACLs acls; + Coordination::write(acls, buf); + } + else + Coordination::write(node->acl_id, buf); + + Coordination::write(node->is_ephemeral, buf); + Coordination::write(node->is_sequential, buf); + Coordination::write(node->stat, buf); + + return std::move(buf.str()); +} + +std::pair, Coordination::ACLs> parseKeeperNode(const String & buf, SnapshotVersion version) +{ + ReadBufferFromMemory in(buf.data(), buf.size()); + + ptr node_with_path = cs_new(); + auto & node = node_with_path->node; + node = std::make_shared(); + Coordination::ACLs acls; + + Coordination::read(node_with_path->path, in); + Coordination::read(node->data, in); + if (version >= SnapshotVersion::V1) + { + Coordination::read(node->acl_id, in); + } + else if (version == SnapshotVersion::V0) + { + Coordination::read(acls, in); + } + + Coordination::read(node->is_ephemeral, in); + Coordination::read(node->is_sequential, in); + Coordination::read(node->stat, in); + + return {node_with_path, acls}; +} /// save batch data in snapshot object std::pair saveBatch(std::shared_ptr & out, ptr & batch) @@ -361,6 +423,148 @@ void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, template void serializeMap(StringMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); template void serializeMap(IntMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); + +void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version) +{ + for (int i = 0; i < batch.data_size(); i++) + { + const auto & item = batch.data(i); + const String & data = item.data(); + + String path; + ptr node; + Coordination::ACLs acls; + + try + { + auto parse_res = parseKeeperNode(data, version); + path = std::move(parse_res.first->path); + node = std::move(parse_res.first->node); + acls = std::move(parse_res.second); + } + catch (...) + { + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th node in batch", i + 1); + } + + assert(!node); + + if (version == SnapshotVersion::V0) + node->acl_id = store.acl_map.convertACLs(acls); + + /// Some strange ACLID during deserialization from ZooKeeper + if (node->acl_id == std::numeric_limits::max()) + node->acl_id = 0; + + store.acl_map.addUsage(node->acl_id); + + auto ephemeral_owner = node->stat.ephemeralOwner; + store.container.emplace(path, std::move(node)); + + if (ephemeral_owner != 0) + { + std::lock_guard l(store.ephemerals_mutex); + auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; + ephemeral_nodes.emplace(path); + } + } +} + +void parseBatchSession(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version) +{ + for (int i = 0; i < batch.data_size(); i++) + { + const auto & item = batch.data(i); + const String & data = item.data(); + + ReadBufferFromMemory in(data.data(), data.size()); + int64_t session_id; + int64_t timeout; + + try + { + Coordination::read(session_id, in); + Coordination::read(timeout, in); + + if (version >= SnapshotVersion::V1) + { + Coordination::AuthIDs ids; + Coordination::read(ids, in); + if (!ids.empty()) + { + std::lock_guard lock(store.auth_mutex); + store.session_and_auth[session_id] = ids; + } + } + } + catch (...) + { + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th session in batch", i + 1); + } + store.addSessionID(session_id, timeout); + } +} + +void parseBatchAclMap(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version) +{ + if (version >= SnapshotVersion::V1) + { + for (int i = 0; i < batch.data_size(); i++) + { + const SnapshotItemPB & item_pb = batch.data(i); + const String & data = item_pb.data(); + ReadBufferFromMemory in(data.data(), data.size()); + + uint64_t acl_id; + Coordination::ACLs acls; + + try + { + Coordination::read(acl_id, in); + Coordination::read(acls, in); + } + catch (...) + { + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th acl in batch", i + 1); + } + + store.acl_map.addMapping(acl_id, acls); + } + } +} + +void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion /*version*/) +{ + IntMap int_map; + for (int i = 0; i < batch.data_size(); i++) + { + const auto & item = batch.data(i); + const String & data = item.data(); + ReadBufferFromMemory in(data.data(), data.size()); + + String key; + int64_t value; + try + { + Coordination::read(key, in); + Coordination::read(value, in); + } + catch (...) + { + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th element of int_map in batch", i + 1); + } + int_map[key] = value; + } + if (int_map.find("ZXID") != int_map.end()) + { + store.zxid = int_map["ZXID"]; + } + if (int_map.find("SESSIONID") != int_map.end()) + { + store.session_id_counter = int_map["SESSIONID"]; + } +} + std::pair saveBatchV2(std::shared_ptr & out, ptr & batch) { if (!batch) @@ -381,18 +585,6 @@ std::pair saveBatchV2(std::shared_ptr & out return {SnapshotBatchHeader::HEADER_SIZE + header.data_length, header.data_crc}; } -UInt32 updateCheckSum(UInt32 checksum, UInt32 data_crc) -{ - union - { - UInt64 data; - UInt32 crc[2]; - }; - crc[0] = checksum; - crc[1] = data_crc; - return RK::getCRC32(reinterpret_cast(&data), 8); -} - std::pair saveBatchAndUpdateCheckSumV2(std::shared_ptr & out, ptr & batch, UInt32 checksum) { @@ -669,6 +861,143 @@ ptr SnapshotBatchBody::parse(const String & data) return batch_body; } +void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version) +{ + for (size_t i = 0; i < batch.size(); i++) + { + const auto & data = batch[i]; + + String path; + ptr node; + Coordination::ACLs acls; + + try + { + auto parse_res = parseKeeperNode(data, version); + path = std::move(parse_res.first->path); + node = std::move(parse_res.first->node); + acls = std::move(parse_res.second); + } + catch (...) + { + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th node in batch", i + 1); + } + + assert(!node); + + if (version == SnapshotVersion::V0) + node->acl_id = store.acl_map.convertACLs(acls); + + /// Some strange ACLID during deserialization from ZooKeeper + if (node->acl_id == std::numeric_limits::max()) + node->acl_id = 0; + + store.acl_map.addUsage(node->acl_id); + + auto ephemeral_owner = node->stat.ephemeralOwner; + store.container.emplace(path, std::move(node)); + + if (ephemeral_owner != 0) + { + std::lock_guard l(store.ephemerals_mutex); + auto & ephemeral_nodes = store.ephemerals[ephemeral_owner]; + ephemeral_nodes.emplace(path); + } + } +} + +void parseBatchSessionV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version) +{ + for (size_t i = 0; i < batch.size(); i++) + { + const String & data = batch[i]; + ReadBufferFromMemory in(data.data(), data.size()); + + int64_t session_id; + int64_t timeout; + + try + { + Coordination::read(session_id, in); + Coordination::read(timeout, in); + + if (version >= SnapshotVersion::V1) + { + Coordination::AuthIDs ids; + Coordination::read(ids, in); + if (!ids.empty()) + { + std::lock_guard lock(store.auth_mutex); + store.session_and_auth[session_id] = ids; + } + } + } + catch (...) + { + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th session in batch", i + 1); + } + store.addSessionID(session_id, timeout); + } +} + +void parseBatchAclMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version) +{ + if (version >= SnapshotVersion::V1) + { + for (size_t i = 0; i < batch.size(); i++) + { + const String & data = batch[i]; + ReadBufferFromMemory in(data.data(), data.size()); + + uint64_t acl_id; + Coordination::ACLs acls; + + try + { + Coordination::read(acl_id, in); + Coordination::read(acls, in); + } + catch (...) + { + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th acl in batch", i + 1); + } + + store.acl_map.addMapping(acl_id, acls); + } + } +} + +void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion /*version*/) +{ + IntMap int_map; + for (size_t i = 0; i < batch.size(); i++) + { + const String & data = batch[i]; + ReadBufferFromMemory in(data.data(), data.size()); + + String key; + int64_t value; + try + { + Coordination::read(key, in); + Coordination::read(value, in); + } + catch (...) + { + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th element of int_map in batch", i + 1); + } + int_map[key] = value; + } + if (int_map.find("ZXID") != int_map.end()) + { + store.zxid = int_map["ZXID"]; + } + if (int_map.find("SESSIONID") != int_map.end()) + { + store.session_id_counter = int_map["SESSIONID"]; + } +} + } #ifdef __clang__ diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index b3abd2e8b8..1f81680000 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -103,6 +103,10 @@ void writeTailAndClose(std::shared_ptr & out, UInt32 checks UInt32 updateCheckSum(UInt32 checksum, UInt32 data_crc); +/// serialize and parse keeper node +String serializeKeeperNode(const String & path, const ptr & node, SnapshotVersion version); +std::pair, Coordination::ACLs> parseKeeperNode(const String & buf, SnapshotVersion version); + /// ----- For snapshot version 1 ----- // TODO delete /// save batch data in snapshot object @@ -120,6 +124,11 @@ int64_t serializeSessions(KeeperStore & store, UInt32 save_batch_size, SnapshotV template void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); +/// parse snapshot batch +void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch_pb, SnapshotVersion version); +void parseBatchSession(KeeperStore & store, const SnapshotBatchPB & batch_pb, SnapshotVersion version); +void parseBatchAclMap(KeeperStore & store, const SnapshotBatchPB & batch_pb, SnapshotVersion version); +void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch_pb, SnapshotVersion version); /// ----- For snapshot version 2 ----- @@ -139,4 +148,9 @@ int64_t serializeSessionsV2(KeeperStore & store, UInt32 save_batch_size, Snapsho template void serializeMapV2(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); +/// parse snapshot batch +void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch_pb, SnapshotVersion version); +void parseBatchSessionV2(KeeperStore & store, SnapshotBatchBody & batch_pb, SnapshotVersion version); +void parseBatchAclMapV2(KeeperStore & store, SnapshotBatchBody & batch_pb, SnapshotVersion version); +void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch_pb, SnapshotVersion version); } From 0dcf9796d28fca9278e9bcb29fce796616e94802 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Tue, 20 Feb 2024 18:57:25 +0800 Subject: [PATCH 2/4] Fail fast when serializing and parsing snapshot --- src/Common/ErrorCodes.cpp | 2 + src/Service/NuRaftLogSnapshot.cpp | 100 +++++++++++++----------------- src/Service/NuRaftLogSnapshot.h | 22 +++---- src/Service/SnapshotCommon.cpp | 83 +++++++++++-------------- src/Service/SnapshotCommon.h | 31 ++++----- 5 files changed, 109 insertions(+), 129 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index f8fe619441..89f54d71cb 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -107,6 +107,8 @@ M(116, ILLEGAL_SETTING_VALUE) \ M(117, CORRUPTED_LOG) \ M(118, CORRUPTED_SNAPSHOT) \ + M(119, SNAPSHOT_OBJECT_NOT_EXISTS) \ + M(120, SNAPSHOT_NOT_EXISTS) \ /* See END */ namespace RK diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index b712bda3de..f7aa3f049a 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -29,8 +29,10 @@ namespace RK namespace ErrorCodes { extern const int CHECKSUM_DOESNT_MATCH; - extern const int CORRUPTED_DATA; + extern const int CORRUPTED_SNAPSHOT; extern const int UNKNOWN_FORMAT_VERSION; + extern const int SNAPSHOT_OBJECT_NOT_EXISTS; + extern const int SNAPSHOT_NOT_EXISTS; } using nuraft::cs_new; @@ -46,8 +48,8 @@ void KeeperSnapshotStore::getObjectPath(ulong object_id, String & obj_path) size_t KeeperSnapshotStore::getObjectIdx(const String & file_name) { - auto it1 = file_name.find_last_of('_'); - return std::stoi(file_name.substr(it1 + 1, file_name.size() - it1)); + auto it = file_name.find_last_of('_'); + return std::stoi(file_name.substr(it + 1, file_name.size() - it)); } size_t KeeperSnapshotStore::serializeDataTree(KeeperStore & storage) @@ -232,7 +234,8 @@ void KeeperSnapshotStore::serializeNodeV2( serializeNodeV2(out, batch, store, path_with_slash + child, processed, checksum); } -void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version) +void KeeperSnapshotStore::appendNodeToBatch( + ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version) { SnapshotItemPB * entry = batch->add_data(); WriteBufferFromNuraftBuffer buf; @@ -256,7 +259,8 @@ void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const St entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); } -void KeeperSnapshotStore::appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version) +void KeeperSnapshotStore::appendNodeToBatchV2( + ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version) { WriteBufferFromNuraftBuffer buf; @@ -282,7 +286,7 @@ void KeeperSnapshotStore::appendNodeToBatchV2(ptr batch, cons size_t KeeperSnapshotStore::createObjects(KeeperStore & store, int64_t next_zxid, int64_t next_session_id) { return version < SnapshotVersion::V2 ? createObjectsV1(store, next_zxid, next_session_id) - : createObjectsV2(store, next_zxid, next_session_id); + : createObjectsV2(store, next_zxid, next_session_id); } size_t KeeperSnapshotStore::createObjectsV1(KeeperStore & store, int64_t next_zxid, int64_t next_session_id) @@ -440,45 +444,36 @@ void KeeperSnapshotStore::init(String create_time = "") } } -bool KeeperSnapshotStore::parseBatchHeader(ptr fs, SnapshotBatchHeader & head) +void KeeperSnapshotStore::parseBatchHeader(ptr fs, SnapshotBatchHeader & head) { head.reset(); errno = 0; if (readUInt32(fs, head.data_length) != 0) { if (!fs->eof()) - { - LOG_ERROR(log, "Can't read header data_length from snapshot file, error:{}.", strerror(errno)); - } - return false; + throwFromErrno("Can't read header data_length from snapshot file", ErrorCodes::CORRUPTED_SNAPSHOT); } if (readUInt32(fs, head.data_crc) != 0) { if (!fs->eof()) - { - LOG_ERROR(log, "Can't read header data_crc from snapshot file, error:{}.", strerror(errno)); - } - return false; + throwFromErrno("Can't read header data_crc from snapshot file", ErrorCodes::CORRUPTED_SNAPSHOT); } - - return true; } -bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) +void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) { ptr snap_fs = cs_new(); snap_fs->open(obj_path, std::ios::in | std::ios::binary); + if (snap_fs->fail()) - { - LOG_ERROR(log, "Open snapshot object {} for read failed, error:{}", obj_path, strerror(errno)); - return false; - } + throwFromErrno("Open snapshot object " + obj_path + " for read failed", ErrorCodes::CORRUPTED_SNAPSHOT); + snap_fs->seekg(0, snap_fs->end); size_t file_size = snap_fs->tellg(); snap_fs->seekg(0, snap_fs->beg); - LOG_INFO(log, "Open snapshot object {} for read,file size {}", obj_path, file_size); + LOG_INFO(log, "Open snapshot object {} for read, file size {}", obj_path, file_size); size_t read_size = 0; SnapshotBatchHeader header; @@ -499,11 +494,12 @@ bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) LOG_INFO(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj)); break; } - throw Exception(ErrorCodes::CORRUPTED_DATA, "snapshot {} load magic error, version {}", obj_path, toString(version_from_obj)); + throw Exception( + ErrorCodes::CORRUPTED_SNAPSHOT, "snapshot {} load magic error, version {}", obj_path, toString(version_from_obj)); } read_size += 8; - if (isFileHeader(magic)) + if (isSnapshotFileHeader(magic)) { char * buf = reinterpret_cast(&version_from_obj); snap_fs->read(buf, sizeof(uint8_t)); @@ -512,7 +508,7 @@ bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) if (version_from_obj > CURRENT_SNAPSHOT_VERSION) throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version_from_obj); } - else if (isFileTail(magic)) + else if (isSnapshotFileTail(magic)) { UInt32 file_checksum; char * buf = reinterpret_cast(&file_checksum); @@ -536,10 +532,7 @@ bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) read_size = cur_read_size; } - if (!parseBatchHeader(snap_fs, header)) - { - throw Exception(ErrorCodes::CORRUPTED_DATA, "snapshot {} load header error", obj_path); - } + parseBatchHeader(snap_fs, header); checksum = updateCheckSum(checksum, header.data_crc); char * body_buf = new char[header.data_length]; @@ -547,17 +540,17 @@ bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) if (!snap_fs->read(body_buf, header.data_length)) { - LOG_ERROR( - log, "Can't read snapshot object file {} size {}, only {} could be read", obj_path, header.data_length, snap_fs->gcount()); delete[] body_buf; - return false; + throwFromErrno( + "Can't read snapshot object file " + obj_path + ", batch size " + std::to_string(header.data_length) + ", only " + + std::to_string(snap_fs->gcount()) + " could be read", + ErrorCodes::CORRUPTED_SNAPSHOT); } if (!verifyCRC32(body_buf, header.data_length, header.data_crc)) { - LOG_ERROR(log, "Found corrupted data, file {}", obj_path); delete[] body_buf; - return false; + throwFromErrno("Can't read snapshot object file " + obj_path + ", batch crc not match.", ErrorCodes::CORRUPTED_SNAPSHOT); } if (version_from_obj < SnapshotVersion::V2) @@ -566,10 +559,9 @@ bool KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path) parseBatchBodyV2(store, body_buf, header.data_length, version_from_obj); delete[] body_buf; } - return true; } -bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) +void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) { SnapshotBatchPB batch_pb; batch_pb.ParseFromString(String(batch_buf, length)); @@ -578,7 +570,7 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, case SnapshotTypePB::SNAPSHOT_TYPE_DATA: LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size()); parseBatchData(store, batch_pb, version_); - break; + break; case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: { LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size()); parseBatchSession(store, batch_pb, version_); @@ -592,17 +584,16 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size()); parseBatchIntMap(store, batch_pb, version_); LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter); - break; + break; case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG: case SnapshotTypePB::SNAPSHOT_TYPE_SERVER: break; default: break; } - return true; } -bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) +void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_) { ptr batch; batch = SnapshotBatchBody::parse(String(batch_buf, length)); @@ -632,7 +623,6 @@ bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf default: break; } - return true; } void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) @@ -692,10 +682,7 @@ bool KeeperSnapshotStore::existObject(ulong obj_id) void KeeperSnapshotStore::loadObject(ulong obj_id, ptr & buffer) { if (!existObject(obj_id)) - { - LOG_WARNING(log, "Not exist object {}", obj_id); - return; - } + throw Exception(ErrorCodes::SNAPSHOT_OBJECT_NOT_EXISTS, "Snapshot object {} does not exist", obj_id); String obj_path = objects_path.at(obj_id); @@ -754,10 +741,6 @@ void KeeperSnapshotStore::saveObject(ulong obj_id, buffer & buffer) getObjectPath(obj_id, obj_path); int snap_fd = openFileForWrite(obj_path); - if (snap_fd < 0) - { - return; - } buffer.pos(0); size_t offset = 0; @@ -858,11 +841,15 @@ bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj bool KeeperSnapshotManager::loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr & buffer) { auto it = snapshots.find(meta.get_last_log_idx()); + if (it == snapshots.end()) - { - LOG_WARNING(log, "Can't find snapshot, last log index {}", meta.get_last_log_idx()); - return false; - } + throw Exception( + ErrorCodes::SNAPSHOT_NOT_EXISTS, + "Error when loading snapshot object {}, for snapshot {} does not exist", + obj_id, + meta.get_last_log_idx()); + + ptr store = it->second; store->loadObject(obj_id, buffer); return true; @@ -892,8 +879,7 @@ bool KeeperSnapshotManager::parseSnapshot(const snapshot & meta, KeeperStore & s auto it = snapshots.find(meta.get_last_log_idx()); if (it == snapshots.end()) { - LOG_WARNING(log, "Can't find snapshot, last log index {}", meta.get_last_log_idx()); - return false; + throw Exception(ErrorCodes::SNAPSHOT_NOT_EXISTS, "Error when parsing snapshot {}, for it does not exist", meta.get_last_log_idx()); } ptr store = it->second; store->loadLatestSnapshot(storage); @@ -930,7 +916,7 @@ size_t KeeperSnapshotManager::loadSnapshotMetas() ptr snap_store = cs_new(snap_dir, meta, object_node_size); snap_store->init(time_str); snapshots[meta.get_last_log_idx()] = snap_store; - LOG_INFO(log, "load filename {}, time {}, index {}, object id {}", file, time_str, log_last_index, object_id); + LOG_INFO(log, "Load filename {}, time {}, index {}, object id {}", file, time_str, log_last_index, object_id); } String full_path = snap_dir + "/" + file; snapshots[log_last_index]->addObjectPath(object_id, full_path); diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index 3488c3de2d..732c54dee5 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -95,7 +95,7 @@ class KeeperSnapshotStore SnapshotVersion version; private: - /// For snapshot version v1 + /// For snapshot version v1 /// TODO delete size_t createObjectsV1(KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0); /// For snapshot version v3 size_t createObjectsV2(KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0); @@ -104,24 +104,24 @@ class KeeperSnapshotStore void getObjectPath(ulong object_id, String & path); /// Parse an snapshot object. We should take the version from snapshot in general. - bool parseObject(KeeperStore & store, String obj_path); + void parseObject(KeeperStore & store, String obj_path); /// Parse batch header in an object /// TODO use internal buffer - bool parseBatchHeader(ptr fs, SnapshotBatchHeader & head); + void parseBatchHeader(ptr fs, SnapshotBatchHeader & head); + /// Parse a batch /// TODO delete + void parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_); /// Parse a batch - bool parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_); - /// Parse a batch - bool parseBatchBodyV2(KeeperStore & store, char * buf, size_t length, SnapshotVersion version_); + void parseBatchBodyV2(KeeperStore & store, char * buf, size_t length, SnapshotVersion version_); - /// Serialize whole data tree + /// Serialize whole data tree /// TODO delete size_t serializeDataTree(KeeperStore & storage); - /// For snapshot version v3 + /// For snapshot version v2 size_t serializeDataTreeV2(KeeperStore & storage); - /// Serialize data tree by deep traversal. + /// Serialize data tree by deep traversal. /// TODO delete void serializeNode( ptr & out, ptr & batch, @@ -139,10 +139,10 @@ class KeeperSnapshotStore uint64_t & processed, uint32_t & checksum); - /// Append node to batch + /// Append node to batch /// TODO delete inline static void appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version); - /// For snapshot version v3 + /// For snapshot version v2 inline static void appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version); diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index d130495c7b..68b8ab88d0 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -19,7 +19,6 @@ namespace RK namespace ErrorCodes { - extern const int ILLEGAL_TYPE_OF_ARGUMENT; extern const int CORRUPTED_SNAPSHOT; } @@ -42,20 +41,8 @@ String toString(SnapshotVersion version) } } -int openFileForWrite(const String & obj_path) -{ - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - errno = 0; - int snap_fd = ::open(obj_path.c_str(), O_RDWR | O_CREAT, 0644); - if (snap_fd < 0) - { - LOG_ERROR(log, "Created new snapshot object {} failed, fd {}, error:{}", obj_path, snap_fd, strerror(errno)); - return -1; - } - return snap_fd; -} -bool isFileHeader(UInt64 magic) +bool isSnapshotFileHeader(UInt64 magic) { union { @@ -65,7 +52,7 @@ bool isFileHeader(UInt64 magic) return magic == magic_num; } -bool isFileTail(UInt64 magic) +bool isSnapshotFileTail(UInt64 magic) { union { @@ -75,7 +62,15 @@ bool isFileTail(UInt64 magic) return magic == magic_num; } -std::shared_ptr openFileAndWriteHeader(const String & path, const SnapshotVersion version) +int openFileForWrite(const String & path) +{ + int snap_fd = ::open(path.c_str(), O_RDWR | O_CREAT, 0644); + if (snap_fd < 0) + throwFromErrno("Opening snapshot object " + path + " failed", ErrorCodes::CORRUPTED_SNAPSHOT); + return snap_fd; +} + +ptr openFileAndWriteHeader(const String & path, SnapshotVersion version) { auto out = std::make_shared(path); out->write(MAGIC_SNAPSHOT_HEAD.data(), MAGIC_SNAPSHOT_HEAD.size()); @@ -83,23 +78,19 @@ std::shared_ptr openFileAndWriteHeader(const String & path, return out; } -int openFileForRead(String & obj_path) +int openFileForRead(String & path) { - Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - errno = 0; - int snap_fd = ::open(obj_path.c_str(), O_RDWR); + int snap_fd = ::open(path.c_str(), O_RDWR); if (snap_fd < 0) - { - LOG_ERROR(log, "Open snapshot object {} failed, fd {}, error:{}", obj_path, snap_fd, strerror(errno)); - return -1; - } + throwFromErrno("Opening snapshot object " + path + " failed", ErrorCodes::CORRUPTED_SNAPSHOT); return snap_fd; } -void writeTailAndClose(std::shared_ptr & out, UInt32 checksum) +void writeTailAndClose(ptr & out, UInt32 checksum) { out->write(MAGIC_SNAPSHOT_TAIL.data(), MAGIC_SNAPSHOT_TAIL.size()); writeIntBinary(checksum, *out); + out->next(); out->close(); } @@ -166,7 +157,7 @@ std::pair, Coordination::ACLs> parseKeeperNode(const Str } /// save batch data in snapshot object -std::pair saveBatch(std::shared_ptr & out, ptr & batch) +std::pair saveBatch(ptr & out, ptr & batch) { if (!batch) batch = cs_new(); @@ -188,7 +179,7 @@ std::pair saveBatch(std::shared_ptr & out, } std::pair -saveBatchAndUpdateCheckSum(std::shared_ptr & out, ptr & batch, UInt32 checksum) +saveBatchAndUpdateCheckSum(ptr & out, ptr & batch, UInt32 checksum) { auto [save_size, data_crc] = saveBatch(out, batch); /// rebuild batch @@ -398,7 +389,7 @@ void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, else if constexpr (std::is_same_v) batch->set_batch_type(SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP); else - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Only support string and int map."); + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Only support string and int map."); } /// append to batch @@ -441,14 +432,13 @@ void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, Snapshot path = std::move(parse_res.first->path); node = std::move(parse_res.first->node); acls = std::move(parse_res.second); + assert(node); } catch (...) { throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th node in batch", i + 1); } - assert(!node); - if (version == SnapshotVersion::V0) node->acl_id = store.acl_map.convertACLs(acls); @@ -476,16 +466,16 @@ void parseBatchSession(KeeperStore & store, const SnapshotBatchPB & batch, Snaps { const auto & item = batch.data(i); const String & data = item.data(); - + ReadBufferFromMemory in(data.data(), data.size()); int64_t session_id; int64_t timeout; - + try { Coordination::read(session_id, in); Coordination::read(timeout, in); - + if (version >= SnapshotVersion::V1) { Coordination::AuthIDs ids; @@ -514,7 +504,7 @@ void parseBatchAclMap(KeeperStore & store, const SnapshotBatchPB & batch, Snapsh const SnapshotItemPB & item_pb = batch.data(i); const String & data = item_pb.data(); ReadBufferFromMemory in(data.data(), data.size()); - + uint64_t acl_id; Coordination::ACLs acls; @@ -541,7 +531,7 @@ void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch, Snapsh const auto & item = batch.data(i); const String & data = item.data(); ReadBufferFromMemory in(data.data(), data.size()); - + String key; int64_t value; try @@ -551,7 +541,8 @@ void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch, Snapsh } catch (...) { - throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th element of int_map in batch", i + 1); + throw Exception( + ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th element of int_map in batch", i + 1); } int_map[key] = value; } @@ -565,7 +556,7 @@ void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch, Snapsh } } -std::pair saveBatchV2(std::shared_ptr & out, ptr & batch) +std::pair saveBatchV2(ptr & out, ptr & batch) { if (!batch) batch = cs_new(); @@ -586,7 +577,7 @@ std::pair saveBatchV2(std::shared_ptr & out } std::pair -saveBatchAndUpdateCheckSumV2(std::shared_ptr & out, ptr & batch, UInt32 checksum) +saveBatchAndUpdateCheckSumV2(ptr & out, ptr & batch, UInt32 checksum) { auto [save_size, data_crc] = saveBatchV2(out, batch); /// rebuild batch @@ -787,7 +778,7 @@ void serializeMapV2(T & snap_map, UInt32 save_batch_size, SnapshotVersion versio else if constexpr (std::is_same_v) batch->type = SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP; else - throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Only support string and int map."); + throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Only support string and int map."); } /// append to batch @@ -877,14 +868,13 @@ void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVe path = std::move(parse_res.first->path); node = std::move(parse_res.first->node); acls = std::move(parse_res.second); + assert(node); } catch (...) { throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th node in batch", i + 1); } - assert(!node); - if (version == SnapshotVersion::V0) node->acl_id = store.acl_map.convertACLs(acls); @@ -915,12 +905,12 @@ void parseBatchSessionV2(KeeperStore & store, SnapshotBatchBody & batch, Snapsho int64_t session_id; int64_t timeout; - + try { Coordination::read(session_id, in); Coordination::read(timeout, in); - + if (version >= SnapshotVersion::V1) { Coordination::AuthIDs ids; @@ -948,7 +938,7 @@ void parseBatchAclMapV2(KeeperStore & store, SnapshotBatchBody & batch, Snapshot { const String & data = batch[i]; ReadBufferFromMemory in(data.data(), data.size()); - + uint64_t acl_id; Coordination::ACLs acls; @@ -974,7 +964,7 @@ void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch, Snapshot { const String & data = batch[i]; ReadBufferFromMemory in(data.data(), data.size()); - + String key; int64_t value; try @@ -984,7 +974,8 @@ void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch, Snapshot } catch (...) { - throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th element of int_map in batch", i + 1); + throw Exception( + ErrorCodes::CORRUPTED_SNAPSHOT, "Snapshot is corrupted, can't parse the {}th element of int_map in batch", i + 1); } int_map[key] = value; } diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index 1f81680000..1b2cd369e0 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -89,17 +89,17 @@ struct SnapshotBatchBody static ptr parse(const String & data); }; -int openFileForWrite(const String & obj_path); -int openFileForRead(String & obj_path); +int openFileForWrite(const String & path); +int openFileForRead(String & path); /// snapshot object file header -bool isFileHeader(UInt64 magic); +bool isSnapshotFileHeader(UInt64 magic); /// snapshot object file tail -bool isFileTail(UInt64 magic); +bool isSnapshotFileTail(UInt64 magic); -std::shared_ptr openFileAndWriteHeader(const String & path, const SnapshotVersion version); -void writeTailAndClose(std::shared_ptr & out, UInt32 checksum); +ptr openFileAndWriteHeader(const String & path, SnapshotVersion version); +void writeTailAndClose(ptr & out, UInt32 checksum); UInt32 updateCheckSum(UInt32 checksum, UInt32 data_crc); @@ -110,9 +110,9 @@ std::pair, Coordination::ACLs> parseKeeperNode(const Str /// ----- For snapshot version 1 ----- // TODO delete /// save batch data in snapshot object -std::pair saveBatch(std::shared_ptr & out, ptr & batch); +std::pair saveBatch(ptr & out, ptr & batch); std::pair -saveBatchAndUpdateCheckSum(std::shared_ptr & out, ptr & batch, UInt32 checksum); +saveBatchAndUpdateCheckSum(ptr & out, ptr & batch, UInt32 checksum); void serializeAcls(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version); [[maybe_unused]] size_t serializeEphemerals(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size); @@ -124,18 +124,19 @@ int64_t serializeSessions(KeeperStore & store, UInt32 save_batch_size, SnapshotV template void serializeMap(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); -/// parse snapshot batch -void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch_pb, SnapshotVersion version); -void parseBatchSession(KeeperStore & store, const SnapshotBatchPB & batch_pb, SnapshotVersion version); -void parseBatchAclMap(KeeperStore & store, const SnapshotBatchPB & batch_pb, SnapshotVersion version); -void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch_pb, SnapshotVersion version); +/// Parse snapshot batch without protobuf +void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version); +void parseBatchSession(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version); +void parseBatchAclMap(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version); +void parseBatchIntMap(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version); + /// ----- For snapshot version 2 ----- /// save batch data in snapshot object -std::pair saveBatchV2(std::shared_ptr & out, ptr & batch); +std::pair saveBatchV2(ptr & out, ptr & batch); std::pair -saveBatchAndUpdateCheckSumV2(std::shared_ptr & out, ptr & batch, UInt32 checksum); +saveBatchAndUpdateCheckSumV2(ptr & out, ptr & batch, UInt32 checksum); void serializeAclsV2(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version); [[maybe_unused]] size_t From 265bc25ca584130926f822b9f24bb2abb5747b70 Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Wed, 21 Feb 2024 11:30:22 +0800 Subject: [PATCH 3/4] Fix parsing and serializing keeper node. Add tests to parsing and serializing keeper node. --- src/Service/SnapshotCommon.cpp | 42 +++++++++-------------- src/Service/SnapshotCommon.h | 14 ++++---- src/Service/tests/gtest_raft_snapshot.cpp | 35 +++++++++++++++++++ 3 files changed, 58 insertions(+), 33 deletions(-) diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 68b8ab88d0..13023b9e9e 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -129,31 +129,31 @@ String serializeKeeperNode(const String & path, const ptr & node, Sn return std::move(buf.str()); } -std::pair, Coordination::ACLs> parseKeeperNode(const String & buf, SnapshotVersion version) +ptrparseKeeperNode(const String & buf, SnapshotVersion version) { ReadBufferFromMemory in(buf.data(), buf.size()); ptr node_with_path = cs_new(); auto & node = node_with_path->node; node = std::make_shared(); - Coordination::ACLs acls; Coordination::read(node_with_path->path, in); Coordination::read(node->data, in); - if (version >= SnapshotVersion::V1) - { - Coordination::read(node->acl_id, in); - } - else if (version == SnapshotVersion::V0) + + if (version == SnapshotVersion::V0) { + /// Just ignore acls for snapshot V0 which is only used in JD /// TODO delete the compatibility code + Coordination::ACLs acls; Coordination::read(acls, in); } + else + Coordination::read(node->acl_id, in); Coordination::read(node->is_ephemeral, in); Coordination::read(node->is_sequential, in); Coordination::read(node->stat, in); - return {node_with_path, acls}; + return node_with_path; } /// save batch data in snapshot object @@ -298,10 +298,7 @@ void serializeAcls(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotV int64_t serializeSessions(KeeperStore & store, UInt32 save_batch_size, const SnapshotVersion version, String & path) { Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - auto out = openFileAndWriteHeader(path, version); - - LOG_INFO(log, "Begin create snapshot session object, session size {}, path {}", store.session_and_timeout.size(), path); std::lock_guard lock(store.session_mutex); @@ -424,14 +421,12 @@ void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, Snapshot String path; ptr node; - Coordination::ACLs acls; try { - auto parse_res = parseKeeperNode(data, version); - path = std::move(parse_res.first->path); - node = std::move(parse_res.first->node); - acls = std::move(parse_res.second); + auto node_with_path = parseKeeperNode(data, version); + path = std::move(node_with_path->path); + node = std::move(node_with_path->node); assert(node); } catch (...) @@ -440,7 +435,7 @@ void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, Snapshot } if (version == SnapshotVersion::V0) - node->acl_id = store.acl_map.convertACLs(acls); + node->acl_id = 0; /// Some strange ACLID during deserialization from ZooKeeper if (node->acl_id == std::numeric_limits::max()) @@ -691,10 +686,7 @@ void serializeAclsV2(ACLMap & acls, String path, UInt32 save_batch_size, Snapsho int64_t serializeSessionsV2(KeeperStore & store, UInt32 save_batch_size, const SnapshotVersion version, String & path) { Poco::Logger * log = &(Poco::Logger::get("KeeperSnapshotStore")); - auto out = openFileAndWriteHeader(path, version); - - LOG_INFO(log, "Begin create snapshot session object, session size {}, path {}", store.session_and_timeout.size(), path); std::lock_guard lock(store.session_mutex); @@ -860,14 +852,12 @@ void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVe String path; ptr node; - Coordination::ACLs acls; try { - auto parse_res = parseKeeperNode(data, version); - path = std::move(parse_res.first->path); - node = std::move(parse_res.first->node); - acls = std::move(parse_res.second); + auto node_with_path = parseKeeperNode(data, version); + path = std::move(node_with_path->path); + node = std::move(node_with_path->node); assert(node); } catch (...) @@ -876,7 +866,7 @@ void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVe } if (version == SnapshotVersion::V0) - node->acl_id = store.acl_map.convertACLs(acls); + node->acl_id = 0; /// Some strange ACLID during deserialization from ZooKeeper if (node->acl_id == std::numeric_limits::max()) diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index 1b2cd369e0..4147704eb8 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -103,9 +103,9 @@ void writeTailAndClose(ptr & out, UInt32 checksum); UInt32 updateCheckSum(UInt32 checksum, UInt32 data_crc); -/// serialize and parse keeper node +/// Serialize and parse keeper node. Please note that children is ignored for we build parent relationship after load all data. String serializeKeeperNode(const String & path, const ptr & node, SnapshotVersion version); -std::pair, Coordination::ACLs> parseKeeperNode(const String & buf, SnapshotVersion version); +ptr parseKeeperNode(const String & buf, SnapshotVersion version); /// ----- For snapshot version 1 ----- // TODO delete @@ -115,7 +115,7 @@ std::pair saveBatchAndUpdateCheckSum(ptr & out, ptr & batch, UInt32 checksum); void serializeAcls(ACLMap & acls, String path, UInt32 save_batch_size, SnapshotVersion version); -[[maybe_unused]] size_t serializeEphemerals(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size); +[[maybe_unused]] size_t serializeEphemerals(KeeperStore::Ephemerals & ephemerals, std::mutex & mutex, String path, UInt32 save_batch_size); /// TODO delete /// Serialize sessions and return the next_session_id before serialize int64_t serializeSessions(KeeperStore & store, UInt32 save_batch_size, SnapshotVersion version, String & path); @@ -150,8 +150,8 @@ template void serializeMapV2(T & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path); /// parse snapshot batch -void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch_pb, SnapshotVersion version); -void parseBatchSessionV2(KeeperStore & store, SnapshotBatchBody & batch_pb, SnapshotVersion version); -void parseBatchAclMapV2(KeeperStore & store, SnapshotBatchBody & batch_pb, SnapshotVersion version); -void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch_pb, SnapshotVersion version); +void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); +void parseBatchSessionV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); +void parseBatchAclMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); +void parseBatchIntMapV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version); } diff --git a/src/Service/tests/gtest_raft_snapshot.cpp b/src/Service/tests/gtest_raft_snapshot.cpp index 4fb480e9d4..9104e1ac21 100644 --- a/src/Service/tests/gtest_raft_snapshot.cpp +++ b/src/Service/tests/gtest_raft_snapshot.cpp @@ -301,6 +301,41 @@ void assertStateMachineEquals(KeeperStore & storage, KeeperStore & ano_storage) } +TEST(RaftSnapshot, parseAndSerializeKeeperNode) +{ + String path = "/parseAndSerializeKeeperNode"; + ptr node = cs_new(); + node->data = "some_data"; + node->acl_id = 0; + node->is_ephemeral = true; + node->children = {"1", "2"}; + node->is_sequential = false; + node->stat.czxid = 1; + node->stat.mzxid = 1; + node->stat.ctime = 1; + node->stat.mtime = 1; + node->stat.version = 1; + node->stat.cversion = 1; + node->stat.aversion = 1; + node->stat.ephemeralOwner = 1; + node->stat.dataLength = 9; + node->stat.numChildren = 2; + node->stat.pzxid = 1; + + auto test = [&path, &node](SnapshotVersion version) + { + String buf = serializeKeeperNode(path, node, version); + auto parsed = parseKeeperNode(buf, version); + /// when serialize keeper node children is ignored. + parsed->node->children = {"1", "2"}; + ASSERT_EQ(parsed->path, path); + ASSERT_EQ(*parsed->node, *node); + }; + + test(V0); + test(V1); + test(V2); +} TEST(RaftSnapshot, createSnapshot_1) { From 221719e384219dc16efc3d564e407170ddd1c20e Mon Sep 17 00:00:00 2001 From: JackyWoo Date: Wed, 21 Feb 2024 11:35:49 +0800 Subject: [PATCH 4/4] Fix typo --- src/Service/ConnectionHandler.h | 2 +- src/Service/SnapshotCommon.h | 2 ++ src/Service/ThreadSafeQueue.h | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/Service/ConnectionHandler.h b/src/Service/ConnectionHandler.h index 651940b9ed..51ef90ce95 100644 --- a/src/Service/ConnectionHandler.h +++ b/src/Service/ConnectionHandler.h @@ -37,7 +37,7 @@ using Poco::Thread; * User connection handler with TCP protocol. It is a core class who process * Zookeeper network protocol and send it to dispatcher. * - * We utilize a getWorkerReactor network programming model. We allocate a handler for + * We utilize reactor network programming model. We allocate a handler for * every connection and ensure that every handler run in the same network thread. * * So there is no multi-thread issues. diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index 4147704eb8..a80ed496d9 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -49,6 +49,7 @@ String toString(SnapshotVersion version); static constexpr auto CURRENT_SNAPSHOT_VERSION = SnapshotVersion::V2; +/// Batch data header in an snapshot object file. struct SnapshotBatchHeader { /// The length of the batch data (uncompressed) @@ -64,6 +65,7 @@ struct SnapshotBatchHeader static const size_t HEADER_SIZE = 8; }; +/// Batch data header in an snapshot object file. enum class SnapshotBatchType : int { SNAPSHOT_TYPE_DATA = 0, diff --git a/src/Service/ThreadSafeQueue.h b/src/Service/ThreadSafeQueue.h index 4a260ae27c..6c76f08b1f 100644 --- a/src/Service/ThreadSafeQueue.h +++ b/src/Service/ThreadSafeQueue.h @@ -6,7 +6,7 @@ namespace RK { -/// Queue with mutex and condvar. As simple as possible. +/// Queue with mutex and condition_variable. As simple as possible. template > class ThreadSafeQueue {