Commit 71ddfce

Make snapshot load faster

lzydmxy committed Mar 25, 2024
1 parent 96935ad commit 71ddfce
Showing 6 changed files with 123 additions and 40 deletions.
18 changes: 18 additions & 0 deletions src/Service/KeeperStore.cpp
@@ -1659,6 +1659,24 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot)
}
}

void KeeperStore::buildBlockChildren(const std::vector<BlocksEdges> & all_objects_edges, UInt32 block_idx)
{
for (auto & object_edges : all_objects_edges)
{
for (auto & [parent_path, path] : object_edges[block_idx])
{
auto parent = container.get(parent_path);

if (unlikely(parent == nullptr))
{
throw RK::Exception("Logical error: buildBlockChildren cannot find parent for node " + path, ErrorCodes::LOGICAL_ERROR);
}

parent->children.emplace(std::move(path));
}
}
}

void KeeperStore::cleanEphemeralNodes(int64_t session_id, ThreadSafeQueue<ResponseForSession> & responses_queue, bool ignore_response)
{
LOG_DEBUG(log, "Clean ephemeral nodes for session {}", toHexString(session_id));
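This is the consumer side of the speedup: edges are pre-bucketed by the block that holds the parent node, so attaching children never needs to look outside one block. A minimal standalone sketch of the two-phase idea follows; NUM_BLOCKS, Node, and the plain std::hash are stand-ins for the sketch, not the project's ConcurrentMap or KeeperNode.

#include <array>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

constexpr size_t NUM_BLOCKS = 4;                        // stand-in for MAP_BLOCK_NUM

struct Node { std::unordered_set<std::string> children; };

using Edge = std::pair<std::string, std::string>;       // (parent_path, child_name)
using Edges = std::vector<Edge>;
using BlocksEdges = std::array<Edges, NUM_BLOCKS>;

static std::hash<std::string> hasher;
static size_t getBlockIndex(const std::string & path) { return hasher(path) % NUM_BLOCKS; }

int main()
{
    // One inner map per block, as in a sharded container.
    std::array<std::unordered_map<std::string, Node>, NUM_BLOCKS> blocks;

    // Phase 1 (during snapshot parsing): insert nodes and record edges,
    // bucketed by the parent's block index.
    BlocksEdges edges;
    std::vector<std::string> paths = {"/", "/a", "/a/b", "/c"};
    for (const auto & path : paths)
    {
        blocks[getBlockIndex(path)].emplace(path, Node{});
        if (path == "/")
            continue;
        auto pos = path.rfind('/');
        std::string parent = pos == 0 ? "/" : path.substr(0, pos);
        edges[getBlockIndex(parent)].emplace_back(parent, path.substr(pos + 1));
    }

    // Phase 2 (the buildBlockChildren pass): every edge in bucket i has its
    // parent in block i, so each block can be handled by an independent thread.
    for (size_t block_idx = 0; block_idx < NUM_BLOCKS; ++block_idx)
        for (const auto & [parent, child] : edges[block_idx])
            blocks[block_idx].at(parent).children.insert(child);

    std::cout << "/a has " << blocks[getBlockIndex("/a")].at("/a").children.size() << " child\n";
}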
7 changes: 7 additions & 0 deletions src/Service/KeeperStore.h
@@ -25,6 +25,8 @@ namespace RK
struct StoreRequest;
using StoreRequestPtr = std::shared_ptr<StoreRequest>;
using ChildrenSet = std::unordered_set<String>;
using Edge = std::pair<String, String>;
using Edges = std::vector<Edge>;

/**
* Represent an entry in data tree.
@@ -160,6 +162,7 @@ class ConcurrentMap
bool erase(String const & key) { return mapFor(key).erase(key); }

InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBlocks]; }
UInt32 getBlockIndex(String const & key) { return hash_(key) % NumBlocks; }
UInt32 getBlockNum() const { return NumBlocks; }
InnerMap & getMap(const UInt32 & index) { return maps_[index]; }

@@ -200,6 +203,7 @@ class KeeperStore
using SessionIDs = std::vector<int64_t>;

using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>;
using BlocksEdges = std::array<Edges, MAP_BLOCK_NUM>;

/// global session id counter, used to allocate new session id.
/// It should be same across all nodes.
@@ -297,6 +301,9 @@ class KeeperStore
/// Build path children after load data from snapshot
void buildPathChildren(bool from_zk_snapshot = false);

// Build the children set for nodes in the specified block after loading data from snapshot.
void buildBlockChildren(const std::vector<BlocksEdges> & all_objects_edges, UInt32 block_idx);

void finalize();

/// Add session id. Used when restoring KeeperStorage from snapshot.
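The new getBlockIndex helper exposes the same key-to-block mapping that mapFor already uses, which is what lets the snapshot loader partition work by block. Below is a reduced stand-in for the sharded-map pattern; ShardedMap, its mutex-per-shard layout, and the int payload are assumptions made for the sketch, not the real ConcurrentMap API.

#include <array>
#include <functional>
#include <iostream>
#include <mutex>
#include <string>
#include <unordered_map>

// A reduced stand-in for a sharded map: NumBlocks inner maps, each guarded
// by its own mutex, with keys routed by hash.
template <unsigned NumBlocks>
class ShardedMap
{
public:
    unsigned getBlockIndex(const std::string & key) const { return hash_(key) % NumBlocks; }

    void emplace(const std::string & key, int value)
    {
        auto & shard = shards_[getBlockIndex(key)];
        std::lock_guard lock(shard.mutex);
        shard.map[key] = value;
    }

    int get(const std::string & key)
    {
        auto & shard = shards_[getBlockIndex(key)];
        std::lock_guard lock(shard.mutex);
        auto it = shard.map.find(key);
        return it == shard.map.end() ? -1 : it->second;
    }

private:
    struct Shard
    {
        std::mutex mutex;
        std::unordered_map<std::string, int> map;
    };
    std::array<Shard, NumBlocks> shards_;
    std::hash<std::string> hash_;
};

int main()
{
    ShardedMap<16> map;
    map.emplace("/a", 1);
    map.emplace("/a/b", 2);
    // Exposing the block index lets callers partition work so that threads
    // working on different blocks never contend on the same shard.
    std::cout << map.get("/a/b") << " lives in block " << map.getBlockIndex("/a/b") << '\n';
}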
80 changes: 51 additions & 29 deletions src/Service/NuRaftLogSnapshot.cpp
@@ -461,7 +461,7 @@ void KeeperSnapshotStore::parseBatchHeader(ptr<std::fstream> fs, SnapshotBatchHe
}
}

void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, BlocksEdges & blocks_edges)
{
ptr<std::fstream> snap_fs = cs_new<std::fstream>();
snap_fs->open(obj_path, std::ios::in | std::ios::binary);
@@ -491,7 +491,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
{
if (snap_fs->eof() && version_from_obj == SnapshotVersion::V0)
{
LOG_INFO(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj));
LOG_DEBUG(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj));
break;
}
throw Exception(
@@ -504,7 +504,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
char * buf = reinterpret_cast<char *>(&version_from_obj);
snap_fs->read(buf, sizeof(uint8_t));
read_size += 1;
LOG_INFO(log, "Got snapshot file header with version {}", toString(version_from_obj));
LOG_DEBUG(log, "Got snapshot file header with version {}", toString(version_from_obj));
if (version_from_obj > CURRENT_SNAPSHOT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version_from_obj);
}
@@ -514,7 +514,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
char * buf = reinterpret_cast<char *>(&file_checksum);
snap_fs->read(buf, sizeof(UInt32));
read_size += 4;
LOG_INFO(log, "obj_path {}, file_checksum {}, checksum {}.", obj_path, file_checksum, checksum);
LOG_DEBUG(log, "obj_path {}, file_checksum {}, checksum {}.", obj_path, file_checksum, checksum);
if (file_checksum != checksum)
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "snapshot {} checksum doesn't match", obj_path);
break;
@@ -527,20 +527,20 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
LOG_INFO(log, "snapshot has no version, set to V0", obj_path);
}

LOG_INFO(log, "obj_path {}, didn't read the header and tail of the file", obj_path);
LOG_DEBUG(log, "obj_path {}, didn't read the header and tail of the file", obj_path);
snap_fs->seekg(cur_read_size);
read_size = cur_read_size;
}

parseBatchHeader(snap_fs, header);

checksum = updateCheckSum(checksum, header.data_crc);
char * body_buf = new char[header.data_length];
String body_string(header.data_length, '0');
char * body_buf = body_string.data();
read_size += (SnapshotBatchHeader::HEADER_SIZE + header.data_length);

if (!snap_fs->read(body_buf, header.data_length))
{
delete[] body_buf;
throwFromErrno(
"Can't read snapshot object file " + obj_path + ", batch size " + std::to_string(header.data_length) + ", only "
+ std::to_string(snap_fs->gcount()) + " could be read",
@@ -549,41 +549,39 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)

if (!verifyCRC32(body_buf, header.data_length, header.data_crc))
{
delete[] body_buf;
throwFromErrno("Can't read snapshot object file " + obj_path + ", batch crc not match.", ErrorCodes::CORRUPTED_SNAPSHOT);
}

if (version_from_obj < SnapshotVersion::V2)
parseBatchBody(store, body_buf, header.data_length, version_from_obj);
parseBatchBody(store, body_string, blocks_edges, version_from_obj);
else
parseBatchBodyV2(store, body_buf, header.data_length, version_from_obj);
delete[] body_buf;
parseBatchBodyV2(store, body_string, blocks_edges, version_from_obj);
}
}

void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_)
void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, const String & body_string, BlocksEdges & blocks_edges, SnapshotVersion version_)
{
SnapshotBatchPB batch_pb;
batch_pb.ParseFromString(String(batch_buf, length));
batch_pb.ParseFromString(body_string);
switch (batch_pb.batch_type())
{
case SnapshotTypePB::SNAPSHOT_TYPE_DATA:
LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size());
parseBatchData(store, batch_pb, version_);
LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size());
parseBatchData(store, batch_pb, blocks_edges, version_);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: {
LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size());
parseBatchSession(store, batch_pb, version_);
}
break;
case SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP:
LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size());
parseBatchAclMap(store, batch_pb, version_);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP:
LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size());
parseBatchIntMap(store, batch_pb, version_);
LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
LOG_DEBUG(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG:
case SnapshotTypePB::SNAPSHOT_TYPE_SERVER:
@@ -593,29 +591,29 @@ void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf,
}
}

void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_)
void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & body_string, BlocksEdges & blocks_edges, SnapshotVersion version_)
{
ptr<SnapshotBatchBody> batch;
batch = SnapshotBatchBody::parse(String(batch_buf, length));
batch = SnapshotBatchBody::parse(body_string);
switch (batch->type)
{
case SnapshotBatchType::SNAPSHOT_TYPE_DATA:
LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch->size());
parseBatchDataV2(store, *batch, version_);
LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch->size());
parseBatchDataV2(store, *batch, blocks_edges, version_);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: {
LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch->size());
LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch->size());
parseBatchSessionV2(store, *batch, version_);
}
break;
case SnapshotBatchType::SNAPSHOT_TYPE_ACLMAP:
LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch->size());
LOG_DEBUG(log, "Parsing batch acl from snapshot, acl count {}", batch->size());
parseBatchAclMapV2(store, *batch, version_);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP:
LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch->size());
LOG_DEBUG(log, "Parsing batch int_map from snapshot, element count {}", batch->size());
parseBatchIntMapV2(store, *batch, version_);
LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
LOG_DEBUG(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_CONFIG:
case SnapshotBatchType::SNAPSHOT_TYPE_SERVER:
@@ -628,6 +626,10 @@ void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf
void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
{
ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM);
all_objects_edges = std::vector<BlocksEdges>(objects_path.size());

LOG_INFO(log, "Parse object from disk");

for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++)
{
object_thread_pool.trySchedule(
@@ -648,11 +650,12 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
this->objects_path.size());
try
{
this->parseObject(store, it->second);
this->parseObject(store, it->second, all_objects_edges[obj_idx]);
}
catch (Exception & e)
{
LOG_ERROR(log, "parseObject error {}, {}", it->second, getExceptionMessage(e, true));
throw;
}
}
obj_idx++;
@@ -661,8 +664,27 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
}
object_thread_pool.wait();

LOG_INFO(log, "Build ChildrenSet for nodes");

/// Build node tree relationship
store.buildPathChildren();
for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++)
{
object_thread_pool.trySchedule(
[this, thread_idx, &store]
{
Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildChildren"));
for (UInt32 block_idx = 0; block_idx < store.container.getBlockNum(); block_idx++)
{
if (block_idx % SNAPSHOT_THREAD_NUM == thread_idx)
{
LOG_INFO(thread_log, "Build ChildrenSet for nodes in block {}, thread_idx {}", block_idx, thread_idx);
store.buildBlockChildren(this->all_objects_edges, block_idx);
}
}
});
}

object_thread_pool.wait();

LOG_INFO(
log,
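Loading is now two parallel passes: the existing one over snapshot objects, and a new one over container blocks, where a worker owns every block with block_idx % SNAPSHOT_THREAD_NUM == thread_idx. A self-contained sketch of that ownership scheme follows, using plain std::thread instead of the project's ThreadPool; BLOCK_NUM, THREAD_NUM, and the buildBlockChildren stub are invented for the sketch.

#include <cstddef>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

constexpr size_t BLOCK_NUM = 32;   // stand-in for store.container.getBlockNum()
constexpr size_t THREAD_NUM = 4;   // stand-in for SNAPSHOT_THREAD_NUM

std::mutex log_mutex;

// Stub for store.buildBlockChildren(all_objects_edges, block_idx).
void buildBlockChildren(size_t block_idx, size_t thread_idx)
{
    std::lock_guard lock(log_mutex);
    std::cout << "thread " << thread_idx << " builds block " << block_idx << '\n';
}

int main()
{
    std::vector<std::thread> workers;
    for (size_t thread_idx = 0; thread_idx < THREAD_NUM; ++thread_idx)
    {
        // Each worker owns the blocks congruent to its index, mirroring the
        // block_idx % SNAPSHOT_THREAD_NUM == thread_idx check in the diff.
        workers.emplace_back(
            [thread_idx]
            {
                for (size_t block_idx = thread_idx; block_idx < BLOCK_NUM; block_idx += THREAD_NUM)
                    buildBlockChildren(block_idx, thread_idx);
            });
    }
    for (auto & worker : workers)
        worker.join();
}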
8 changes: 5 additions & 3 deletions src/Service/NuRaftLogSnapshot.h
@@ -104,16 +104,16 @@ class KeeperSnapshotStore
void getObjectPath(ulong object_id, String & path);

/// Parse a snapshot object. We should take the version from the snapshot in general.
void parseObject(KeeperStore & store, String obj_path);
void parseObject(KeeperStore & store, String obj_path, BlocksEdges &);

/// Parse batch header in an object
/// TODO use internal buffer
void parseBatchHeader(ptr<std::fstream> fs, SnapshotBatchHeader & head);

/// Parse a batch /// TODO delete
void parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_);
void parseBatchBody(KeeperStore & store, const String &, BlocksEdges &, SnapshotVersion version_);
/// Parse a batch
void parseBatchBodyV2(KeeperStore & store, char * buf, size_t length, SnapshotVersion version_);
void parseBatchBodyV2(KeeperStore & store, const String &, BlocksEdges &, SnapshotVersion version_);

/// Serialize whole data tree /// TODO delete
size_t serializeDataTree(KeeperStore & storage);
@@ -168,6 +168,8 @@ class KeeperSnapshotStore

std::map<ulong, String> objects_path;

std::vector<BlocksEdges> all_objects_edges;

/// Appended to snapshot file name
String curr_time;

45 changes: 39 additions & 6 deletions src/Service/SnapshotCommon.cpp
@@ -153,6 +153,8 @@ ptr<KeeperNodeWithPath>parseKeeperNode(const String & buf, SnapshotVersi
Coordination::read(node->is_sequential, in);
Coordination::read(node->stat, in);

node->children.reserve(node->stat.numChildren);

return node_with_path;
}

@@ -412,7 +414,7 @@ template void serializeMap<StringMap>(StringMap & snap_map, UInt32 save_batch_si
template void serializeMap<IntMap>(IntMap & snap_map, UInt32 save_batch_size, SnapshotVersion version, String & path);


void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, SnapshotVersion version)
void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, BlocksEdges & blocks_edges, SnapshotVersion version)
{
for (int i = 0; i < batch.data_size(); i++)
{
@@ -444,14 +446,29 @@ void parseBatchData(KeeperStore & store, const SnapshotBatchPB & batch, Snapshot
store.acl_map.addUsage(node->acl_id);

auto ephemeral_owner = node->stat.ephemeralOwner;
store.container.emplace(path, std::move(node));

if (ephemeral_owner != 0)
{
std::lock_guard l(store.ephemerals_mutex);
auto & ephemeral_nodes = store.ephemerals[ephemeral_owner];
ephemeral_nodes.emplace(path);
}

store.container.emplace(path, std::move(node));

if (unlikely(path == "/"))
continue;

auto rslash_pos = path.rfind('/');

if (unlikely(rslash_pos == String::npos))
throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path);

auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos);
auto idx = store.container.getBlockIndex(parent_path);

// Store edges in different buckets according to the block index of the parent node.
// This allows us to insert child paths for all nodes in parallel.
blocks_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1));
}
}

@@ -844,7 +861,7 @@ ptr<SnapshotBatchBody> SnapshotBatchBody::parse(const String & data)
return batch_body;
}

void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVersion version)
void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, BlocksEdges & blocks_edges, SnapshotVersion version)
{
for (size_t i = 0; i < batch.size(); i++)
{
@@ -875,14 +892,30 @@ void parseBatchDataV2(KeeperStore & store, SnapshotBatchBody & batch, SnapshotVe
store.acl_map.addUsage(node->acl_id);

auto ephemeral_owner = node->stat.ephemeralOwner;
store.container.emplace(path, std::move(node));

if (ephemeral_owner != 0)
{
std::lock_guard l(store.ephemerals_mutex);
auto & ephemeral_nodes = store.ephemerals[ephemeral_owner];
ephemeral_nodes.emplace(path);
}

store.container.emplace(path, std::move(node));

if (unlikely(path == "/"))
continue;

auto rslash_pos = path.rfind('/');

if (unlikely(rslash_pos == String::npos))
throw Exception(ErrorCodes::CORRUPTED_SNAPSHOT, "Can't find parent path for path {}", path);

auto parent_path = rslash_pos == 0 ? "/" : path.substr(0, rslash_pos);
auto idx = store.container.getBlockIndex(parent_path);

// Store edges in different buckets according to the block index of the parent node.
// This allows us to insert child paths for all nodes in parallel.
blocks_edges[idx].emplace_back(std::move(parent_path), path.substr(rslash_pos + 1));

}
}

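Both parseBatchData and parseBatchDataV2 derive the edge from the node's own path: everything before the last '/' is the parent path, everything after it is the child name, with the root as a special case. A hedged sketch of just that split follows; splitParentChild is an invented helper name, while the real code does this inline during parsing.

#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Invented helper; the real code performs this split inline while parsing.
std::pair<std::string, std::string> splitParentChild(const std::string & path)
{
    auto pos = path.rfind('/');
    // rfind returns an unsigned size_t, so a "< 0" test can never fire;
    // std::string::npos is the value that means "no '/' found".
    if (pos == std::string::npos)
        throw std::runtime_error("Can't find parent path for path " + path);
    std::string parent = pos == 0 ? "/" : path.substr(0, pos);
    return {parent, path.substr(pos + 1)};
}

int main()
{
    std::vector<std::string> paths = {"/a", "/a/b/c"};
    for (const auto & path : paths)
    {
        auto [parent, child] = splitParentChild(path);
        std::cout << path << " -> parent " << parent << ", child " << child << '\n';
    }
}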