Implement asynchronous snapshot by copying dataTree #247

Merged 6 commits on May 8, 2024
3 changes: 3 additions & 0 deletions programs/server/config.xml
@@ -89,6 +89,9 @@
<!-- Keeper will keep at least this many log items, default 1_000_000. -->
<!-- <reserved_log_items>1000000</reserved_log_items> -->

<!-- Whether to create snapshots asynchronously; default is true (async). Set to false to disable. -->
<!-- <async_snapshot>true</async_snapshot> -->

<!-- Create snapshot in this log size, default is 3_000_000. -->
<!-- <snapshot_distance>3000000</snapshot_distance> -->

32 changes: 32 additions & 0 deletions src/Service/KeeperStore.cpp
@@ -1888,4 +1888,36 @@ void KeeperStore::reset()
}
}

std::shared_ptr<KeeperStore::BucketNodes> KeeperStore::dumpDataTree()
{
auto result = std::make_shared<KeeperStore::BucketNodes>();
ThreadPool object_thread_pool(MAP_BUCKET_NUM);

for (UInt32 thread_idx = 0; thread_idx < MAP_BUCKET_NUM; thread_idx++)
{
object_thread_pool.trySchedule(
[thread_idx, this, &result]
{
for (UInt32 bucket_idx = 0; bucket_idx < MAP_BUCKET_NUM; bucket_idx++)
{
if (bucket_idx % MAP_BUCKET_NUM != thread_idx)
continue;
LOG_INFO(log, "Dump datatree index {}", bucket_idx);
auto && bucket = this->container.getMap(bucket_idx).getMap();
(*result)[bucket_idx].reserve(bucket.size());
size_t key_size = 0;
for (auto && [path, node] : bucket)
{
key_size += path.size();
(*result)[bucket_idx].emplace_back(path, node->cloneWithoutChildren());
}
LOG_INFO(log, "Dump datatree done index {}, key_size {}, result size {}", bucket_idx, key_size, (*result)[bucket_idx].size());
}
});
}

object_thread_pool.wait();
return result;
}

}
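Note on the new `dumpDataTree()` above: the data tree is split into `MAP_BUCKET_NUM` buckets, and a thread pool of the same size clones each bucket into a plain vector (via `cloneWithoutChildren()`), so the snapshot writer can later serialize the copy without blocking request processing. Below is a minimal, self-contained sketch of the same copy-per-bucket pattern; `Node`, `Bucket` and `dumpBuckets` are simplified stand-ins for illustration, not the project's real `KeeperStore`/`ThreadPool` types.

```cpp
#include <map>
#include <memory>
#include <string>
#include <thread>
#include <utility>
#include <vector>

struct Node { std::string data; };  // stand-in for KeeperNode (children omitted)

using Bucket = std::map<std::string, std::shared_ptr<Node>>;
using BucketNodes = std::vector<std::vector<std::pair<std::string, std::shared_ptr<Node>>>>;

/// Copy every bucket into a flat vector, one worker thread per bucket,
/// cloning each node so the caller owns an immutable snapshot of the tree.
BucketNodes dumpBuckets(const std::vector<Bucket> & buckets)
{
    BucketNodes result(buckets.size());
    std::vector<std::thread> workers;
    workers.reserve(buckets.size());
    for (size_t i = 0; i < buckets.size(); ++i)
    {
        workers.emplace_back([i, &buckets, &result]
        {
            result[i].reserve(buckets[i].size());
            for (const auto & [path, node] : buckets[i])
                result[i].emplace_back(path, std::make_shared<Node>(*node));  // clone without children
        });
    }
    for (auto & worker : workers)
        worker.join();
    return result;
}
```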
26 changes: 26 additions & 0 deletions src/Service/KeeperStore.h
@@ -54,6 +54,17 @@ struct KeeperNode
return node;
}

std::shared_ptr<KeeperNode> cloneWithoutChildren() const
{
auto node = std::make_shared<KeeperNode>();
node->data = data;
node->acl_id = acl_id;
node->is_ephemeral = is_ephemeral;
node->is_sequential = is_sequential;
node->stat = stat;
return node;
}

/// All stat for client should be generated by this function.
/// This method will remove numChildren from persisted stat.
Coordination::Stat statForResponse() const
@@ -308,6 +319,19 @@ class KeeperStore
return session_and_timeout.size();
}

SessionAndTimeout getSessionTimeOut() const
{
std::lock_guard lock(session_mutex);
return session_and_timeout;
}

SessionAndAuth getSessionAuth() const
{
std::lock_guard lock(session_mutex);
return session_and_auth;
}


bool updateSessionTimeout(int64_t session_id, int64_t session_timeout_ms); /// TODO delete

/// process request
@@ -395,6 +419,8 @@ class KeeperStore
/// clear whole store and set to initial state.
void reset();

std::shared_ptr<BucketNodes> dumpDataTree();

private:
int64_t fetchAndGetZxid() { return zxid++; }

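Note on the new accessors: `getSessionTimeOut()` and `getSessionAuth()` return the session maps by value while holding `session_mutex`, so the snapshot code serializes a private, consistent copy rather than the live maps. A minimal sketch of that copy-under-lock pattern follows; `SessionRegistry` is a hypothetical stand-in, not a type from this PR.

```cpp
#include <cstdint>
#include <mutex>
#include <unordered_map>

/// Hypothetical stand-in illustrating the copy-under-lock accessor pattern.
class SessionRegistry
{
public:
    using SessionAndTimeout = std::unordered_map<int64_t, int64_t>;

    SessionAndTimeout getSessionTimeOut() const
    {
        std::lock_guard lock(mutex);
        return session_and_timeout;  // returned by value: caller gets a consistent copy
    }

private:
    mutable std::mutex mutex;
    SessionAndTimeout session_and_timeout;
};
```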
185 changes: 183 additions & 2 deletions src/Service/NuRaftLogSnapshot.cpp
@@ -70,6 +70,21 @@ size_t KeeperSnapshotStore::serializeDataTreeV2(KeeperStore & storage)
return getObjectIdx(out->getFileName());
}

size_t KeeperSnapshotStore::serializeDataTreeAsync(SnapTask & snap_task)
{
std::shared_ptr<WriteBufferFromFile> out;
ptr<SnapshotBatchBody> batch;

auto checksum = serializeNodeAsync(out, batch, *snap_task.buckets_nodes);
auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum);
checksum = new_checksum;

writeTailAndClose(out, checksum);
LOG_INFO(log, "Creating snapshot processed data size {}, current zxid {}", snap_task.nodes_count, snap_task.next_zxid);

return getObjectIdx(out->getFileName());
}

void KeeperSnapshotStore::serializeNodeV2(
ptr<WriteBufferFromFile> & out,
ptr<SnapshotBatchBody> & batch,
@@ -139,6 +154,66 @@ void KeeperSnapshotStore::serializeNodeV2(
serializeNodeV2(out, batch, store, path_with_slash + child, processed, checksum);
}

uint32_t KeeperSnapshotStore::serializeNodeAsync(
ptr<WriteBufferFromFile> & out,
ptr<SnapshotBatchBody> & batch,
BucketNodes & bucket_nodes)
{
uint64_t processed = 0;
uint32_t checksum = 0;
for (auto && bucket : bucket_nodes)
{
for (auto && [path, node] : bucket)
{
if (processed % max_object_node_size == 0)
{
/// time to create new snapshot object
uint64_t obj_id = processed / max_object_node_size;

if (obj_id != 0)
{
/// flush last batch data
auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum);
checksum = new_checksum;

/// close current object file
writeTailAndClose(out, checksum);
/// reset checksum
checksum = 0;
}
String new_obj_path;
/// data objects start at index 4: objects 1-3 hold the int map, sessions and ACLs
getObjectPath(obj_id + 4, new_obj_path);

LOG_INFO(log, "Create new snapshot object {}, path {}", obj_id + 4, new_obj_path);
out = openFileAndWriteHeader(new_obj_path, version);
}

/// flush and rebuild batch
if (processed % save_batch_size == 0)
{
/// skip flushing the first batch
if (processed != 0)
{
/// flush data in batch to file
auto [save_size, new_checksum] = saveBatchAndUpdateCheckSumV2(out, batch, checksum);
checksum = new_checksum;
}
else
{
if (!batch)
batch = cs_new<SnapshotBatchBody>();
}
}

LOG_TRACE(log, "Append node path {}", path);
appendNodeToBatchV2(batch, path, node, version);
processed++;
}
}
return checksum;
}
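The rotation in `serializeNodeAsync()` above works at two granularities: a new snapshot object file is opened every `max_object_node_size` nodes (data object ids start at 4 because objects 1-3 hold the int map, the sessions and the ACLs), and the in-memory batch is flushed every `save_batch_size` nodes. A small standalone sketch of that arithmetic, using illustrative constants rather than the project's real defaults:

```cpp
#include <cstdint>
#include <cstdio>

int main()
{
    const uint64_t max_object_node_size = 1000000;  // illustrative, not the real default
    const uint64_t save_batch_size = 10000;         // illustrative, not the real default
    const uint64_t total_nodes = 2500000;

    for (uint64_t processed = 0; processed < total_nodes; ++processed)
    {
        if (processed % max_object_node_size == 0)
        {
            const uint64_t obj_id = processed / max_object_node_size;
            // +4: objects 1-3 are the int map, sessions and ACLs
            std::printf("open data object %llu\n", static_cast<unsigned long long>(obj_id + 4));
        }
        if (processed % save_batch_size == 0 && processed != 0)
        {
            // flush the current batch to the open object file
        }
        // append node `processed` to the batch here
    }
    return 0;
}
```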

void KeeperSnapshotStore::appendNodeToBatchV2(
ptr<SnapshotBatchBody> batch, const String & path, std::shared_ptr<KeeperNode> node, SnapshotVersion version)
{
@@ -168,6 +243,12 @@ size_t KeeperSnapshotStore::createObjects(KeeperStore & store, int64_t next_zxid
return createObjectsV2(store, next_zxid, next_session_id);
}

size_t KeeperSnapshotStore::createObjectsAsync(SnapTask & snap_task)
{
return createObjectsAsyncImpl(snap_task);
}


size_t KeeperSnapshotStore::createObjectsV2(KeeperStore & store, int64_t next_zxid, int64_t next_session_id)
{
if (snap_meta->size() == 0)
@@ -209,7 +290,12 @@ size_t KeeperSnapshotStore::createObjectsV2(KeeperStore & store, int64_t next_zx
String session_path;
/// object index should start from 1
getObjectPath(2, session_path);
int64_t serialized_next_session_id = serializeSessionsV2(store, save_batch_size, version, session_path);

auto session_and_timeout = store.getSessionTimeOut();
auto session_and_auth = store.getSessionAuth();
auto serialized_next_session_id = store.session_id_counter;

serializeSessionsV2(session_and_timeout, session_and_auth, save_batch_size, version, session_path);
LOG_INFO(
log,
"Creating snapshot nex_session_id {}, serialized_next_session_id {}",
@@ -220,7 +306,7 @@ size_t KeeperSnapshotStore::createObjectsV2(KeeperStore & store, int64_t next_zx
String acl_path;
/// object index should start from 1
getObjectPath(3, acl_path);
serializeAclsV2(store.acl_map, acl_path, save_batch_size, version);
serializeAclsV2(store.acl_map.getMapping(), acl_path, save_batch_size, version);

/// 4. Save data tree
size_t last_id = serializeDataTreeV2(store);
@@ -239,6 +325,77 @@ size_t KeeperSnapshotStore::createObjectsV2(KeeperStore & store, int64_t next_zx
return total_obj_count;
}


size_t KeeperSnapshotStore::createObjectsAsyncImpl(SnapTask & snap_task)
{
if (snap_meta->size() == 0)
{
return 0;
}

Poco::File(snap_dir).createDirectories();

size_t data_object_count = (snap_task.nodes_count + max_object_node_size - 1) / max_object_node_size;

/// uint map, sessions, ACLs and normal node objects
size_t total_obj_count = data_object_count + 3;

LOG_INFO(
log,
"Creating async snapshot v3 with approximately data_object_count {}, total_obj_count {}, next zxid {}, next session id {}",
data_object_count,
total_obj_count,
snap_task.next_zxid,
snap_task.next_session_id);

/// 1. Save uint map before nodes
IntMap int_map;
/// Next transaction id
int_map["ZXID"] = snap_task.next_zxid;
/// Next session id
int_map["SESSIONID"] = snap_task.next_session_id;

String map_path;
getObjectPath(1, map_path);
serializeMapV2(int_map, save_batch_size, version, map_path);

/// 2. Save sessions
String session_path;
/// object index should start from 1
getObjectPath(2, session_path);

serializeSessionsV2(snap_task.session_and_timeout, snap_task.session_and_auth, save_batch_size, version, session_path);

int64_t serialized_next_session_id = snap_task.next_session_id;
LOG_INFO(
log,
"Creating snapshot nex_session_id {}, serialized_next_session_id {}",
toHexString(snap_task.next_session_id),
toHexString(serialized_next_session_id));

/// 3. Save acls
String acl_path;
/// object index should start from 1
getObjectPath(3, acl_path);
serializeAclsV2(snap_task.acl_map, acl_path, save_batch_size, version);

/// 4. Save data tree
size_t last_id = serializeDataTreeAsync(snap_task);

total_obj_count = last_id;
LOG_INFO(log, "Creating snapshot real data_object_count {}, total_obj_count {}", total_obj_count - 3, total_obj_count);

/// add all path to objects_path
for (size_t i = 1; i < total_obj_count + 1; i++)
{
String path;
getObjectPath(i, path);
addObjectPath(i, path);
}

return total_obj_count;
}

void KeeperSnapshotStore::init(String create_time = "")
{
if (create_time.empty())
@@ -581,6 +738,30 @@ void KeeperSnapshotStore::addObjectPath(ulong obj_id, String & path)
objects_path[obj_id] = path;
}

size_t KeeperSnapshotManager::createSnapshotAsync(SnapTask & snap_task, SnapshotVersion version)
{
// size_t store_size = store.container.size();
auto && meta = snap_task.s;
meta->set_size(snap_task.nodes_count);
ptr<KeeperSnapshotStore> snap_store = cs_new<KeeperSnapshotStore>(snap_dir, *meta, object_node_size, SAVE_BATCH_SIZE, version);
snap_store->init();
LOG_INFO(
log,
"Create snapshot last_log_term {}, last_log_idx {}, size {}, nodes {}, ephemeral nodes {}, sessions {}, session_id_counter {}, "
"zxid {}",
meta->get_last_log_term(),
meta->get_last_log_idx(),
meta->size(),
snap_task.nodes_count,
snap_task.ephemeral_nodes_count,
snap_task.session_count,
snap_task.next_session_id,
snap_task.next_zxid);
size_t obj_size = snap_store->createObjectsAsync(snap_task);
snapshots[meta->get_last_log_idx()] = snap_store;
return obj_size;
}

size_t KeeperSnapshotManager::createSnapshot(
snapshot & meta, KeeperStore & store, int64_t next_zxid, int64_t next_session_id, SnapshotVersion version)
{
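Putting the pieces together, `createSnapshotAsync()` receives a `SnapTask` that already holds copies of everything the snapshot needs (data tree buckets, sessions, ACLs, counters), so the actual serialization can run off the apply path. The snippet below is a generic, self-contained illustration of that hand-off, not the project's actual snapshot scheduling code; `Task` and the thread setup are assumptions made for the example.

```cpp
#include <condition_variable>
#include <cstdio>
#include <memory>
#include <mutex>
#include <thread>

/// Generic illustration of the async-snapshot hand-off: the apply thread
/// captures a task object and a background thread consumes it, so snapshot
/// creation never blocks request processing.
struct Task { int next_zxid = 0; };

int main()
{
    std::mutex mutex;
    std::condition_variable cv;
    std::shared_ptr<Task> pending;

    std::thread snapshot_thread([&]
    {
        std::unique_lock lock(mutex);
        cv.wait(lock, [&] { return pending != nullptr; });
        std::printf("creating snapshot up to zxid %d\n", pending->next_zxid);
    });

    {
        std::lock_guard lock(mutex);
        pending = std::make_shared<Task>();  // in the real code: copied tree, sessions, ACLs
        pending->next_zxid = 42;
    }
    cv.notify_one();
    snapshot_thread.join();
    return 0;
}
```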