diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index f1aa0dcfc1..55af553b1f 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -149,7 +149,7 @@ void KeeperSnapshotStore::serializeNode( } LOG_TRACE(log, "Append node path {}", path); - appendNodeToBatch(batch, path, node_copy); + appendNodeToBatch(batch, path, node_copy, version); processed++; String path_with_slash = path; @@ -222,7 +222,7 @@ void KeeperSnapshotStore::serializeNodeV2( } LOG_TRACE(log, "Append node path {}", path); - appendNodeToBatchV2(batch, path, node_copy); + appendNodeToBatchV2(batch, path, node_copy, version); processed++; String path_with_slash = path; @@ -233,14 +233,21 @@ void KeeperSnapshotStore::serializeNodeV2( serializeNodeV2(out, batch, store, path_with_slash + child, processed, checksum); } -void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node) +void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version) { SnapshotItemPB * entry = batch->add_data(); WriteBufferFromNuraftBuffer buf; Coordination::write(path, buf); Coordination::write(node->data, buf); - Coordination::write(node->acl_id, buf); + if (version == SnapshotVersion::V0) + { + /// Just ignore acls for snapshot V0 /// TODO delete + Coordination::ACLs acls; + Coordination::write(acls, buf); + } + else + Coordination::write(node->acl_id, buf); Coordination::write(node->is_ephemeral, buf); Coordination::write(node->is_sequential, buf); Coordination::write(node->stat, buf); @@ -250,13 +257,20 @@ void KeeperSnapshotStore::appendNodeToBatch(ptr batch, const St entry->set_data(String(reinterpret_cast(data->data_begin()), data->size())); } -void KeeperSnapshotStore::appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node) +void KeeperSnapshotStore::appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version) { WriteBufferFromNuraftBuffer buf; Coordination::write(path, buf); Coordination::write(node->data, buf); - Coordination::write(node->acl_id, buf); + if (version == SnapshotVersion::V0) + { + /// Just ignore acls for snapshot V0 /// TODO delete + Coordination::ACLs acls; + Coordination::write(acls, buf); + } + else + Coordination::write(node->acl_id, buf); Coordination::write(node->is_ephemeral, buf); Coordination::write(node->is_sequential, buf); Coordination::write(node->stat, buf); @@ -606,6 +620,8 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, LOG_TRACE(log, "Load snapshot read key {}, node stat {}", key, node->stat.toString()); store.container.emplace(key, std::move(node)); + if (store.container.get(key)->is_ephemeral) // TODO + LOG_INFO(log, "{} is ephemeral and owner is {}", key, ephemeral_owner); if (ephemeral_owner != 0) { LOG_TRACE(log, "Load snapshot find ephemeral node {} - {}", ephemeral_owner, key); @@ -924,8 +940,8 @@ bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) { - ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM); - for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++) + ThreadPool object_thread_pool(1);/// TODO SNAPSHOT_THREAD_NUM + for (UInt32 thread_idx = 0; thread_idx < 1; thread_idx++) // TODO { object_thread_pool.trySchedule( [this, thread_idx, &store] @@ -934,7 +950,7 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store) UInt32 obj_idx = 0; for (auto it = this->objects_path.begin(); it != this->objects_path.end(); it++) { - if (obj_idx % SNAPSHOT_THREAD_NUM == thread_idx) + if (obj_idx % 1 == thread_idx) // TODO { LOG_INFO( thread_log, diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index f6584413f2..3488c3de2d 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -140,9 +140,11 @@ class KeeperSnapshotStore uint32_t & checksum); /// Append node to batch - inline static void appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node); + inline static void + appendNodeToBatch(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version); /// For snapshot version v3 - inline static void appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node); + inline static void + appendNodeToBatchV2(ptr batch, const String & path, std::shared_ptr node, SnapshotVersion version); /// Snapshot directory, note than the directory may contain more than one snapshot. String snap_dir; @@ -192,7 +194,12 @@ class KeeperSnapshotManager ~KeeperSnapshotManager() = default; - size_t createSnapshot(snapshot & meta, KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0, SnapshotVersion version = CURRENT_SNAPSHOT_VERSION); + size_t createSnapshot( + snapshot & meta, + KeeperStore & store, + int64_t next_zxid = 0, + int64_t next_session_id = 0, + SnapshotVersion version = CURRENT_SNAPSHOT_VERSION); /// save snapshot meta, invoked when we receive an snapshot from leader. bool receiveSnapshotMeta(snapshot & meta); diff --git a/src/Service/tests/gtest_raft_snapshot.cpp b/src/Service/tests/gtest_raft_snapshot.cpp index e6eb00f32c..4fb480e9d4 100644 --- a/src/Service/tests/gtest_raft_snapshot.cpp +++ b/src/Service/tests/gtest_raft_snapshot.cpp @@ -412,7 +412,7 @@ void compareKeeperStore(KeeperStore & store, KeeperStore & new_store, bool compa ASSERT_EQ(new_node->acl_id, it->second->acl_id); } - ASSERT_EQ(new_node->is_ephemeral, it->second->is_ephemeral); + ASSERT_EQ(new_node->is_ephemeral, it->second->is_ephemeral) << "Ephemeral not equals for path " << it->first; ASSERT_EQ(new_node->is_sequential, it->second->is_sequential); ASSERT_EQ(new_node->stat, it->second->stat); ASSERT_EQ(new_node->children, it->second->children); @@ -552,7 +552,6 @@ void parseSnapshot(const SnapshotVersion version1, const SnapshotVersion version } for (int i = 0; i < 1024; i++) - { String key = std::to_string(i); String value = "table_" + key;