Skip to content

Commit

Permalink
Fix error when parse snapshot data of v0
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Feb 20, 2024
1 parent b16c2db commit 86edd35
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 14 deletions.
34 changes: 25 additions & 9 deletions src/Service/NuRaftLogSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void KeeperSnapshotStore::serializeNode(
}

LOG_TRACE(log, "Append node path {}", path);
appendNodeToBatch(batch, path, node_copy);
appendNodeToBatch(batch, path, node_copy, version);
processed++;

String path_with_slash = path;
Expand Down Expand Up @@ -222,7 +222,7 @@ void KeeperSnapshotStore::serializeNodeV2(
}

LOG_TRACE(log, "Append node path {}", path);
appendNodeToBatchV2(batch, path, node_copy);
appendNodeToBatchV2(batch, path, node_copy, version);
processed++;

String path_with_slash = path;
Expand All @@ -233,14 +233,21 @@ void KeeperSnapshotStore::serializeNodeV2(
serializeNodeV2(out, batch, store, path_with_slash + child, processed, checksum);
}

void KeeperSnapshotStore::appendNodeToBatch(ptr<SnapshotBatchPB> batch, const String & path, std::shared_ptr<KeeperNode> node)
void KeeperSnapshotStore::appendNodeToBatch(ptr<SnapshotBatchPB> batch, const String & path, std::shared_ptr<KeeperNode> node, SnapshotVersion version)
{
SnapshotItemPB * entry = batch->add_data();
WriteBufferFromNuraftBuffer buf;

Coordination::write(path, buf);
Coordination::write(node->data, buf);
Coordination::write(node->acl_id, buf);
if (version == SnapshotVersion::V0)
{
/// Just ignore acls for snapshot V0 /// TODO delete
Coordination::ACLs acls;
Coordination::write(acls, buf);
}
else
Coordination::write(node->acl_id, buf);
Coordination::write(node->is_ephemeral, buf);
Coordination::write(node->is_sequential, buf);
Coordination::write(node->stat, buf);
Expand All @@ -250,13 +257,20 @@ void KeeperSnapshotStore::appendNodeToBatch(ptr<SnapshotBatchPB> batch, const St
entry->set_data(String(reinterpret_cast<char *>(data->data_begin()), data->size()));
}

void KeeperSnapshotStore::appendNodeToBatchV2(ptr<SnapshotBatchBody> batch, const String & path, std::shared_ptr<KeeperNode> node)
void KeeperSnapshotStore::appendNodeToBatchV2(ptr<SnapshotBatchBody> batch, const String & path, std::shared_ptr<KeeperNode> node, SnapshotVersion version)
{
WriteBufferFromNuraftBuffer buf;

Coordination::write(path, buf);
Coordination::write(node->data, buf);
Coordination::write(node->acl_id, buf);
if (version == SnapshotVersion::V0)
{
/// Just ignore acls for snapshot V0 /// TODO delete
Coordination::ACLs acls;
Coordination::write(acls, buf);
}
else
Coordination::write(node->acl_id, buf);
Coordination::write(node->is_ephemeral, buf);
Coordination::write(node->is_sequential, buf);
Coordination::write(node->stat, buf);
Expand Down Expand Up @@ -606,6 +620,8 @@ bool KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf,
LOG_TRACE(log, "Load snapshot read key {}, node stat {}", key, node->stat.toString());
store.container.emplace(key, std::move(node));

if (store.container.get(key)->is_ephemeral) // TODO
LOG_INFO(log, "{} is ephemeral and owner is {}", key, ephemeral_owner);
if (ephemeral_owner != 0)
{
LOG_TRACE(log, "Load snapshot find ephemeral node {} - {}", ephemeral_owner, key);
Expand Down Expand Up @@ -924,8 +940,8 @@ bool KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf

void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
{
ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM);
for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++)
ThreadPool object_thread_pool(1);/// TODO SNAPSHOT_THREAD_NUM
for (UInt32 thread_idx = 0; thread_idx < 1; thread_idx++) // TODO
{
object_thread_pool.trySchedule(
[this, thread_idx, &store]
Expand All @@ -934,7 +950,7 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
UInt32 obj_idx = 0;
for (auto it = this->objects_path.begin(); it != this->objects_path.end(); it++)
{
if (obj_idx % SNAPSHOT_THREAD_NUM == thread_idx)
if (obj_idx % 1 == thread_idx) // TODO
{
LOG_INFO(
thread_log,
Expand Down
13 changes: 10 additions & 3 deletions src/Service/NuRaftLogSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,11 @@ class KeeperSnapshotStore
uint32_t & checksum);

/// Append node to batch
inline static void appendNodeToBatch(ptr<SnapshotBatchPB> batch, const String & path, std::shared_ptr<KeeperNode> node);
inline static void
appendNodeToBatch(ptr<SnapshotBatchPB> batch, const String & path, std::shared_ptr<KeeperNode> node, SnapshotVersion version);
/// For snapshot version v3
inline static void appendNodeToBatchV2(ptr<SnapshotBatchBody> batch, const String & path, std::shared_ptr<KeeperNode> node);
inline static void
appendNodeToBatchV2(ptr<SnapshotBatchBody> batch, const String & path, std::shared_ptr<KeeperNode> node, SnapshotVersion version);

/// Snapshot directory, note than the directory may contain more than one snapshot.
String snap_dir;
Expand Down Expand Up @@ -192,7 +194,12 @@ class KeeperSnapshotManager

~KeeperSnapshotManager() = default;

size_t createSnapshot(snapshot & meta, KeeperStore & store, int64_t next_zxid = 0, int64_t next_session_id = 0, SnapshotVersion version = CURRENT_SNAPSHOT_VERSION);
size_t createSnapshot(
snapshot & meta,
KeeperStore & store,
int64_t next_zxid = 0,
int64_t next_session_id = 0,
SnapshotVersion version = CURRENT_SNAPSHOT_VERSION);

/// save snapshot meta, invoked when we receive an snapshot from leader.
bool receiveSnapshotMeta(snapshot & meta);
Expand Down
3 changes: 1 addition & 2 deletions src/Service/tests/gtest_raft_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ void compareKeeperStore(KeeperStore & store, KeeperStore & new_store, bool compa
ASSERT_EQ(new_node->acl_id, it->second->acl_id);
}

ASSERT_EQ(new_node->is_ephemeral, it->second->is_ephemeral);
ASSERT_EQ(new_node->is_ephemeral, it->second->is_ephemeral) << "Ephemeral not equals for path " << it->first;
ASSERT_EQ(new_node->is_sequential, it->second->is_sequential);
ASSERT_EQ(new_node->stat, it->second->stat);
ASSERT_EQ(new_node->children, it->second->children);
Expand Down Expand Up @@ -552,7 +552,6 @@ void parseSnapshot(const SnapshotVersion version1, const SnapshotVersion version
}

for (int i = 0; i < 1024; i++)

{
String key = std::to_string(i);
String value = "table_" + key;
Expand Down

0 comments on commit 86edd35

Please sign in to comment.