Make snapshot load faster #222

Merged: 3 commits merged on Mar 29, 2024
Changes from 1 commit
18 changes: 18 additions & 0 deletions src/Service/KeeperStore.cpp
@@ -1659,6 +1659,24 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot)
}
}

void KeeperStore::buildBlockChildren(const std::vector<BlocksEdges> & all_objects_edges, UInt32 block_idx)
{
for (auto & object_edges : all_objects_edges)
{
for (auto & [parent_path, path] : object_edges[block_idx])
{
auto parent = container.get(parent_path);

if (unlikely(parent == nullptr))
{
throw RK::Exception("Logical error: buildBlockChildren: cannot find parent for node " + path, ErrorCodes::LOGICAL_ERROR);
}

parent->children.emplace(std::move(path));
}
}
}

void KeeperStore::cleanEphemeralNodes(int64_t session_id, ThreadSafeQueue<ResponseForSession> & responses_queue, bool ignore_response)
{
LOG_DEBUG(log, "Clean ephemeral nodes for session {}", toHexString(session_id));
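Note: buildBlockChildren is the consumer half of a bucket-partitioned build. While the snapshot objects are parsed, every (parent path, child name) edge is queued under the block index of the parent node, so the children sets of one block can later be filled by a single thread without locking. A minimal, self-contained sketch of that layout and of the per-block build step follows; Edge, BlocksEdges, NUM_BLOCKS and ChildrenIndex are simplified stand-ins, not the repository's real types.

#include <array>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

constexpr size_t NUM_BLOCKS = 16;                   // stand-in for MAP_BLOCK_NUM
using Edge = std::pair<std::string, std::string>;   // (parent path, child name)
using Edges = std::vector<Edge>;
using BlocksEdges = std::array<Edges, NUM_BLOCKS>;  // one edge list per map block

// Simplified per-block children index standing in for the node map.
using ChildrenIndex = std::unordered_map<std::string, std::unordered_set<std::string>>;

// Build the children sets of a single block. Safe to run for different
// block_idx values on different threads, because each block is written
// by exactly one thread.
void buildBlockChildrenSketch(const std::vector<BlocksEdges> & all_objects_edges,
                              size_t block_idx,
                              std::array<ChildrenIndex, NUM_BLOCKS> & children)
{
    for (const auto & object_edges : all_objects_edges)
        for (const auto & [parent_path, child_name] : object_edges[block_idx])
            children[block_idx][parent_path].insert(child_name);
}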
7 changes: 7 additions & 0 deletions src/Service/KeeperStore.h
@@ -25,6 +25,8 @@ namespace RK
struct StoreRequest;
using StoreRequestPtr = std::shared_ptr<StoreRequest>;
using ChildrenSet = std::unordered_set<String>;
using Edge = std::pair<String, String>;
using Edges = std::vector<Edge>;

/**
* Represent an entry in data tree.
@@ -160,6 +162,7 @@ class ConcurrentMap
bool erase(String const & key) { return mapFor(key).erase(key); }

InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBlocks]; }
UInt32 getBlockIndex(String const & key) { return hash_(key) % NumBlocks; }
UInt32 getBlockNum() const { return NumBlocks; }
InnerMap & getMap(const UInt32 & index) { return maps_[index]; }

@@ -200,6 +203,7 @@ class KeeperStore
using SessionIDs = std::vector<int64_t>;

using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>;
using BlocksEdges = std::array<Edges, MAP_BLOCK_NUM>;

/// global session id counter, used to allocate new session id.
/// It should be same across all nodes.
@@ -297,6 +301,9 @@ class KeeperStore
/// Build path children after load data from snapshot
void buildPathChildren(bool from_zk_snapshot = false);

// Build the children set for nodes in the specified block after loading data from a snapshot.
void buildBlockChildren(const std::vector<BlocksEdges> & all_objects_edges, UInt32 block_idx);
Contributor: from_zk_snapshot is omitted.


void finalize();

/// Add session id. Used when restoring KeeperStorage from snapshot.
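Note: getBlockIndex simply exposes the same hash-and-modulo mapping that mapFor already uses internally, which is what lets snapshot parsing decide up front which block will own a given parent path. A tiny model of that contract, assuming a ConcurrentMap-like wrapper (the class below is an illustrative stand-in, not the real ConcurrentMap):

#include <array>
#include <functional>
#include <string>
#include <unordered_map>

template <typename V, size_t NumBlocks = 16>
class BlockedMap
{
public:
    // Bucket routing and lookups must use the same hash and modulus;
    // otherwise edges would be queued for a block that does not own the node.
    size_t getBlockIndex(const std::string & key) const { return hash_(key) % NumBlocks; }

    V & operator[](const std::string & key) { return maps_[getBlockIndex(key)][key]; }

    static constexpr size_t getBlockNum() { return NumBlocks; }

private:
    std::hash<std::string> hash_;
    std::array<std::unordered_map<std::string, V>, NumBlocks> maps_;
};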
80 changes: 51 additions & 29 deletions src/Service/NuRaftLogSnapshot.cpp
@@ -461,7 +461,7 @@ void KeeperSnapshotStore::parseBatchHeader(ptr<std::fstream> fs, SnapshotBatchHe
}
}

void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, BlocksEdges & blocks_edges)
{
ptr<std::fstream> snap_fs = cs_new<std::fstream>();
snap_fs->open(obj_path, std::ios::in | std::ios::binary);
@@ -491,7 +491,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
{
if (snap_fs->eof() && version_from_obj == SnapshotVersion::V0)
{
LOG_INFO(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj));
LOG_DEBUG(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj));
break;
}
throw Exception(
@@ -504,7 +504,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
char * buf = reinterpret_cast<char *>(&version_from_obj);
snap_fs->read(buf, sizeof(uint8_t));
read_size += 1;
LOG_INFO(log, "Got snapshot file header with version {}", toString(version_from_obj));
LOG_DEBUG(log, "Got snapshot file header with version {}", toString(version_from_obj));
if (version_from_obj > CURRENT_SNAPSHOT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version_from_obj);
}
@@ -514,7 +514,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
char * buf = reinterpret_cast<char *>(&file_checksum);
snap_fs->read(buf, sizeof(UInt32));
read_size += 4;
LOG_INFO(log, "obj_path {}, file_checksum {}, checksum {}.", obj_path, file_checksum, checksum);
LOG_DEBUG(log, "obj_path {}, file_checksum {}, checksum {}.", obj_path, file_checksum, checksum);
if (file_checksum != checksum)
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "snapshot {} checksum doesn't match", obj_path);
break;
@@ -527,20 +527,20 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
LOG_INFO(log, "snapshot has no version, set to V0", obj_path);
}

LOG_INFO(log, "obj_path {}, didn't read the header and tail of the file", obj_path);
LOG_DEBUG(log, "obj_path {}, didn't read the header and tail of the file", obj_path);
snap_fs->seekg(cur_read_size);
read_size = cur_read_size;
}

parseBatchHeader(snap_fs, header);

checksum = updateCheckSum(checksum, header.data_crc);
char * body_buf = new char[header.data_length];
String body_string(header.data_length, '0');
char * body_buf = body_string.data();
read_size += (SnapshotBatchHeader::HEADER_SIZE + header.data_length);

if (!snap_fs->read(body_buf, header.data_length))
{
delete[] body_buf;
throwFromErrno(
"Can't read snapshot object file " + obj_path + ", batch size " + std::to_string(header.data_length) + ", only "
+ std::to_string(snap_fs->gcount()) + " could be read",
@@ -549,41 +549,39 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)

if (!verifyCRC32(body_buf, header.data_length, header.data_crc))
{
delete[] body_buf;
throwFromErrno("Can't read snapshot object file " + obj_path + ", batch crc not match.", ErrorCodes::CORRUPTED_SNAPSHOT);
}

if (version_from_obj < SnapshotVersion::V2)
parseBatchBody(store, body_buf, header.data_length, version_from_obj);
parseBatchBody(store, body_string, blocks_edges, version_from_obj);
else
parseBatchBodyV2(store, body_buf, header.data_length, version_from_obj);
delete[] body_buf;
parseBatchBodyV2(store, body_string, blocks_edges, version_from_obj);
}
}

void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_)
void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, const String & body_string, BlocksEdges & blocks_edges, SnapshotVersion version_)
{
SnapshotBatchPB batch_pb;
batch_pb.ParseFromString(String(batch_buf, length));
batch_pb.ParseFromString(body_string);
switch (batch_pb.batch_type())
{
case SnapshotTypePB::SNAPSHOT_TYPE_DATA:
LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size());
parseBatchData(store, batch_pb, version_);
LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size());
parseBatchData(store, batch_pb, blocks_edges, version_);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: {
LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size());
parseBatchSession(store, batch_pb, version_);
}
break;
case SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP:
LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size());
parseBatchAclMap(store, batch_pb, version_);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP:
LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size());
parseBatchIntMap(store, batch_pb, version_);
LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
LOG_DEBUG(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG:
case SnapshotTypePB::SNAPSHOT_TYPE_SERVER:
@@ -593,29 +591,29 @@ void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf,
}
}

void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_)
void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & body_string, BlocksEdges & blocks_edges, SnapshotVersion version_)
{
ptr<SnapshotBatchBody> batch;
batch = SnapshotBatchBody::parse(String(batch_buf, length));
batch = SnapshotBatchBody::parse(body_string);
switch (batch->type)
{
case SnapshotBatchType::SNAPSHOT_TYPE_DATA:
LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch->size());
parseBatchDataV2(store, *batch, version_);
LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch->size());
parseBatchDataV2(store, *batch, blocks_edges, version_);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: {
LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch->size());
LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch->size());
parseBatchSessionV2(store, *batch, version_);
}
break;
case SnapshotBatchType::SNAPSHOT_TYPE_ACLMAP:
LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch->size());
LOG_DEBUG(log, "Parsing batch acl from snapshot, acl count {}", batch->size());
parseBatchAclMapV2(store, *batch, version_);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP:
LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch->size());
LOG_DEBUG(log, "Parsing batch int_map from snapshot, element count {}", batch->size());
parseBatchIntMapV2(store, *batch, version_);
LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
LOG_DEBUG(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_CONFIG:
case SnapshotBatchType::SNAPSHOT_TYPE_SERVER:
@@ -628,6 +626,10 @@ void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf
void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
{
ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM);
all_objects_edges = std::vector<BlocksEdges>(objects_path.size());

LOG_INFO(log, "Parse object from disk");

for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++)
{
object_thread_pool.trySchedule(
@@ -648,11 +650,12 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
this->objects_path.size());
try
{
this->parseObject(store, it->second);
this->parseObject(store, it->second, all_objects_edges[obj_idx]);
}
catch (Exception & e)
{
LOG_ERROR(log, "parseObject error {}, {}", it->second, getExceptionMessage(e, true));
throw;
Contributor: Please throw Exception and add an error code.

}
}
obj_idx++;
@@ -661,8 +664,27 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
}
object_thread_pool.wait();

LOG_INFO(log, "Build ChildrenSet for nodes");

/// Build node tree relationship
store.buildPathChildren();
for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++)
{
object_thread_pool.trySchedule(
[this, thread_idx, &store]
{
Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildChildren"));
for (UInt32 block_idx = 0; block_idx < store.container.getBlockNum(); block_idx++)
{
if (block_idx % SNAPSHOT_THREAD_NUM == thread_idx)
{
LOG_INFO(thread_log, "Build ChildrenSet for nodes in block {}, thread_idx {}", block_idx, thread_idx);
store.buildBlockChildren(this->all_objects_edges, block_idx);
}
}
});
}

object_thread_pool.wait();

LOG_INFO(
log,
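Note: loading is now two parallel phases. Phase one parses the snapshot objects concurrently, each worker recording edges into its own per-object BlocksEdges; phase two builds the children sets, with blocks handed out round-robin so that no two threads ever touch the same block. A rough sketch of the phase-two scheduling with plain std::thread (NUM_THREADS, NUM_BLOCKS and buildBlock are placeholders, not the project's names):

#include <cstdint>
#include <functional>
#include <thread>
#include <vector>

constexpr uint32_t NUM_THREADS = 4;   // stands in for SNAPSHOT_THREAD_NUM
constexpr uint32_t NUM_BLOCKS = 16;   // stands in for the map's block count

// Every thread claims the blocks whose index is congruent to its own index
// modulo NUM_THREADS, so the block sets are disjoint and need no locking.
void buildAllBlocks(const std::function<void(uint32_t)> & buildBlock)
{
    std::vector<std::thread> workers;
    for (uint32_t thread_idx = 0; thread_idx < NUM_THREADS; ++thread_idx)
    {
        workers.emplace_back([thread_idx, &buildBlock]
        {
            for (uint32_t block_idx = 0; block_idx < NUM_BLOCKS; ++block_idx)
                if (block_idx % NUM_THREADS == thread_idx)
                    buildBlock(block_idx);   // e.g. store.buildBlockChildren(edges, block_idx)
        });
    }
    for (auto & worker : workers)
        worker.join();
}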
8 changes: 5 additions & 3 deletions src/Service/NuRaftLogSnapshot.h
@@ -104,16 +104,16 @@ class KeeperSnapshotStore
void getObjectPath(ulong object_id, String & path);

/// Parse a snapshot object. We should take the version from the snapshot in general.
void parseObject(KeeperStore & store, String obj_path);
void parseObject(KeeperStore & store, String obj_path, BlocksEdges &);

/// Parse batch header in an object
/// TODO use internal buffer
void parseBatchHeader(ptr<std::fstream> fs, SnapshotBatchHeader & head);

/// Parse a batch /// TODO delete
void parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_);
void parseBatchBody(KeeperStore & store, const String &, BlocksEdges &, SnapshotVersion version_);
/// Parse a batch
void parseBatchBodyV2(KeeperStore & store, char * buf, size_t length, SnapshotVersion version_);
void parseBatchBodyV2(KeeperStore & store, const String &, BlocksEdges &, SnapshotVersion version_);

/// Serialize whole data tree /// TODO delete
size_t serializeDataTree(KeeperStore & storage);
@@ -168,6 +168,8 @@ class KeeperSnapshotStore

std::map<ulong, String> objects_path;

std::vector<BlocksEdges> all_objects_edges;

/// Appended to snapshot file name
String curr_time;

45 changes: 39 additions & 6 deletions src/Service/SnapshotCommon.cpp
@@ -153,6 +153,8 @@ ptr<KeeperNodeWithPath> parseKeeperNode(const String & buf, SnapshotVersion versi
Coordination::read(node->is_sequential, in);
Coordination::read(node->stat, in);

node->children.reserve(node->stat.numChildren);

return node_with_path;
}
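Note: reserving the children set with the child count already stored in the node's stat avoids repeated rehashing when the set is filled during the build phase. A small illustration with a plain std::unordered_set (the function name and counts are illustrative only):

#include <string>
#include <unordered_set>

void fillChildren(std::unordered_set<std::string> & children, int num_children)
{
    // Without reserve() the set rehashes several times while it grows;
    // with it, every insertion lands in a table that already has the final size.
    children.reserve(num_children);
    for (int i = 0; i < num_children; ++i)
        children.insert("child-" + std::to_string(i));
}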

Expand Down Expand Up @@ -412,7 +414,7 @@ template void serializeMap<StringMap>(StringMap & snap_map, UInt32 save_batch_si
template void serializeMap<IntMap>(IntMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path);


void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version)
void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BlocksEdges & blocks_edges, SnapshotVersion version)
{
for (int i = 0; i < batch.data_size(); i++)
{
@@ -444,14 +446,29 @@ void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, Snapshot
store.acl_map.addUsage(node->acl_id);

auto ephemeral_owner = node->stat.ephemeralOwner;
store.container.emplace(path, std::move(node));

if (ephemeral_owner != 0)
{
std::lock_guard l(store.ephemerals_mutex);
auto & ephemeral_nodes = store.ephemerals[ephemeral_owner];
ephemeral_nodes.emplace(path);
}

store.container.emplace(path, std::move(node));

if (unlikely(path == "/"))
continue;

auto rslash_pos = path.rfind('/');

if (unlikely(rslash_pos == String::npos))
throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path);

auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos);
auto idx = store.container.getBlockIndex(parent_path);

// Store edges in different buckets according to the block index of the parent node.
// This allows child paths for all nodes to be inserted in parallel.
blocks_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1));
}
}
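Note: the parent path is recovered from the node path with a single rfind('/') (direct children of the root keep "/" as their parent), and the edge is routed to the block that owns the parent, so phase two can insert children without synchronization. A self-contained sketch of that split-and-route step; splitPath, bucketFor and NUM_BLOCKS are illustrative names, not the repository's:

#include <functional>
#include <stdexcept>
#include <string>
#include <utility>

constexpr size_t NUM_BLOCKS = 16; // illustrative block count

// Split "/a/b/c" into parent "/a/b" and child name "c"; "/a" keeps "/" as parent.
std::pair<std::string, std::string> splitPath(const std::string & path)
{
    auto pos = path.rfind('/');
    if (pos == std::string::npos)
        throw std::invalid_argument("path has no '/': " + path);
    std::string parent = (pos == 0) ? "/" : path.substr(0, pos);
    return {std::move(parent), path.substr(pos + 1)};
}

// Route the edge to the block that owns the parent node.
size_t bucketFor(const std::string & parent)
{
    return std::hash<std::string>{}(parent) % NUM_BLOCKS;
}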

Expand Down Expand Up @@ -844,7 +861,7 @@ ptr<SnapshotBatchBody> SnapshotBatchBody::parse(const String & data)
return batch_body;
}

void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version)
void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BlocksEdges & blocks_edges, SnapshotVersion version)
{
for (size_t i = 0; i < batch.size(); i++)
{
@@ -875,14 +892,30 @@ void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVe
store.acl_map.addUsage(node->acl_id);

auto ephemeral_owner = node->stat.ephemeralOwner;
store.container.emplace(path, std::move(node));

if (ephemeral_owner != 0)
{
std::lock_guard l(store.ephemerals_mutex);
auto & ephemeral_nodes = store.ephemerals[ephemeral_owner];
ephemeral_nodes.emplace(path);
}

store.container.emplace(path, std::move(node));

if (unlikely(path == "/"))
continue;

auto rslash_pos = path.rfind('/');

if (unlikely(rslash_pos == String::npos))
throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path);

auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos);
auto idx = store.container.getBlockIndex(parent_path);

// Store edges in different buckets according to the block index of the parent node.
// This allows child paths for all nodes to be inserted in parallel.
blocks_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1));

}
}
