Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make snapshot load faster #222

Merged
merged 3 commits into from
Mar 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build-and-test-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ jobs:
submodules: recursive

- name: Install tools
run: brew install ninja ccache
run: brew install ninja ccache cmake llvm@13

- name: Generate Makefile
run: export CC=`which clang` CXX=`which clang++` && cmake -G Ninja -B ./build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}}
run: export CC=$(brew --prefix llvm@13)/bin/clang CXX=$(brew --prefix llvm@13)/bin/clang++ && cmake -G Ninja -B ./build -DCMAKE_BUILD_TYPE=${{env.BUILD_TYPE}}

- name: Build
working-directory: ${{github.workspace}}/build
Expand Down
22 changes: 20 additions & 2 deletions src/Service/KeeperStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1635,9 +1635,9 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot)
{
LOG_INFO(log, "build path children in keeper storage {}", container.size());
/// build children
for (UInt32 block_idx = 0; block_idx < container.getBlockNum(); block_idx++)
for (UInt32 bucket_idx = 0; bucket_idx < container.getBucketNum(); bucket_idx++)
{
for (const auto & it : container.getMap(block_idx).getMap())
for (const auto & it : container.getMap(bucket_idx).getMap())
{
if (it.first == "/")
continue;
Expand All @@ -1659,6 +1659,24 @@ void KeeperStore::buildPathChildren(bool from_zk_snapshot)
}
}

/// Rebuild the children set for every parent node that falls into the given bucket.
/// Each snapshot object contributed one BucketEdges during parsing; this walks the
/// edges of bucket `bucket_idx` across all objects. Buckets are disjoint, so one
/// thread per bucket can run this concurrently without locking the container.
///
/// @param all_objects_edges  per-snapshot-object edges, grouped by parent bucket
/// @param bucket_idx         index of the bucket to build
/// @throws RK::Exception(LOGICAL_ERROR) if an edge references a parent path that is
///         missing from the container (internally inconsistent snapshot)
void KeeperStore::buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_idx)
{
    for (const auto & object_edges : all_objects_edges)
    {
        for (const auto & [parent_path, path] : object_edges[bucket_idx])
        {
            auto parent = container.get(parent_path);

            if (unlikely(parent == nullptr))
            {
                throw RK::Exception("Logical error: Build : can not find parent for node " + path, ErrorCodes::LOGICAL_ERROR);
            }

            /// NOTE: `path` is bound through a const range, so the original
            /// `std::move(path)` was a silent copy (performance-move-const-arg);
            /// copy explicitly to make the cost visible.
            parent->children.emplace(path);
        }
    }
}

void KeeperStore::cleanEphemeralNodes(int64_t session_id, ThreadSafeQueue<ResponseForSession> & responses_queue, bool ignore_response)
{
LOG_DEBUG(log, "Clean ephemeral nodes for session {}", toHexString(session_id));
Expand Down
26 changes: 18 additions & 8 deletions src/Service/KeeperStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ namespace RK
struct StoreRequest;
using StoreRequestPtr = std::shared_ptr<StoreRequest>;
using ChildrenSet = std::unordered_set<String>;
using Edge = std::pair<String, String>;
using Edges = std::vector<Edge>;

/**
* Represent an entry in data tree.
Expand Down Expand Up @@ -82,7 +84,7 @@ struct KeeperNodeWithPath
/// Map for data tree, it is thread safe with read write lock.
/// ConcurrentMap is a two-level unordered_map which is designed
/// to reduce latency for unordered_map scales.
template <typename Element, unsigned NumBlocks>
template <typename Element, unsigned NumBuckets>
class ConcurrentMap
{
public:
Expand Down Expand Up @@ -147,7 +149,7 @@ class ConcurrentMap
};

private:
std::array<InnerMap, NumBlocks> maps_;
std::array<InnerMap, NumBuckets> maps_;
std::hash<String> hash_;

public:
Expand All @@ -159,8 +161,9 @@ class ConcurrentMap
size_t count(const String & key) { return get(key) != nullptr ? 1 : 0; }
bool erase(String const & key) { return mapFor(key).erase(key); }

InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBlocks]; }
UInt32 getBlockNum() const { return NumBlocks; }
InnerMap & mapFor(String const & key) { return maps_[hash_(key) % NumBuckets]; }
UInt32 getBucketIndex(String const & key) { return hash_(key) % NumBuckets; }
UInt32 getBucketNum() const { return NumBuckets; }
InnerMap & getMap(const UInt32 & index) { return maps_[index]; }

void clear()
Expand All @@ -182,9 +185,9 @@ class ConcurrentMap
class KeeperStore
{
public:
/// block num for ConcurrentMap
static constexpr int MAP_BLOCK_NUM = 16;
using Container = ConcurrentMap<KeeperNode, MAP_BLOCK_NUM>;
/// bucket num for ConcurrentMap
static constexpr int MAP_BUCKET_NUM = 16;
using Container = ConcurrentMap<KeeperNode, MAP_BUCKET_NUM>;

using ResponsesForSessions = std::vector<ResponseForSession>;
using KeeperResponsesQueue = ThreadSafeQueue<ResponseForSession>;
Expand All @@ -201,6 +204,10 @@ class KeeperStore

using Watches = std::unordered_map<String /* path, relative of root_path */, SessionIDs>;

/// Holds Edges grouped into Buckets by the parent node's bucket index.
/// Used when loading a snapshot to build each node's childrenSet in parallel without locking.
using BucketEdges = std::array<Edges, MAP_BUCKET_NUM>;

/// global session id counter, used to allocate new session id.
/// It should be same across all nodes.
int64_t session_id_counter{1};
Expand Down Expand Up @@ -297,6 +304,9 @@ class KeeperStore
/// Build path children after load data from snapshot
void buildPathChildren(bool from_zk_snapshot = false);

// Build the childrenSet of the nodes in the specified bucket after loading data from a snapshot.
void buildBucketChildren(const std::vector<BucketEdges> & all_objects_edges, UInt32 bucket_idx);

void finalize();

/// Add session id. Used when restoring KeeperStorage from snapshot.
Expand Down Expand Up @@ -335,7 +345,7 @@ class KeeperStore
uint64_t getApproximateDataSize() const
{
UInt64 node_count = container.size();
UInt64 size_bytes = container.getBlockNum() * sizeof(Container::InnerMap) /* Inner map size */
UInt64 size_bytes = container.getBucketNum() * sizeof(Container::InnerMap) /* Inner map size */
+ node_count * 8 / 0.75 /*hash map array size*/
+ node_count * sizeof(KeeperNode) /*node size*/
+ node_count * 100; /*path and child of node size*/
Expand Down
80 changes: 51 additions & 29 deletions src/Service/NuRaftLogSnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ void KeeperSnapshotStore::parseBatchHeader(ptr<std::fstream> fs, SnapshotBatchHe
}
}

void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path, BucketEdges & buckets_edges)
{
ptr<std::fstream> snap_fs = cs_new<std::fstream>();
snap_fs->open(obj_path, std::ios::in | std::ios::binary);
Expand Down Expand Up @@ -491,7 +491,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
{
if (snap_fs->eof() && version_from_obj == SnapshotVersion::V0)
{
LOG_INFO(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj));
LOG_DEBUG(log, "obj_path {}, read file tail, version {}", obj_path, uint8_t(version_from_obj));
break;
}
throw Exception(
Expand All @@ -504,7 +504,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
char * buf = reinterpret_cast<char *>(&version_from_obj);
snap_fs->read(buf, sizeof(uint8_t));
read_size += 1;
LOG_INFO(log, "Got snapshot file header with version {}", toString(version_from_obj));
LOG_DEBUG(log, "Got snapshot file header with version {}", toString(version_from_obj));
if (version_from_obj > CURRENT_SNAPSHOT_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported snapshot version {}", version_from_obj);
}
Expand All @@ -514,7 +514,7 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
char * buf = reinterpret_cast<char *>(&file_checksum);
snap_fs->read(buf, sizeof(UInt32));
read_size += 4;
LOG_INFO(log, "obj_path {}, file_checksum {}, checksum {}.", obj_path, file_checksum, checksum);
LOG_DEBUG(log, "obj_path {}, file_checksum {}, checksum {}.", obj_path, file_checksum, checksum);
if (file_checksum != checksum)
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "snapshot {} checksum doesn't match", obj_path);
break;
Expand All @@ -527,20 +527,20 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)
LOG_INFO(log, "snapshot has no version, set to V0", obj_path);
}

LOG_INFO(log, "obj_path {}, didn't read the header and tail of the file", obj_path);
LOG_DEBUG(log, "obj_path {}, didn't read the header and tail of the file", obj_path);
snap_fs->seekg(cur_read_size);
read_size = cur_read_size;
}

parseBatchHeader(snap_fs, header);

checksum = updateCheckSum(checksum, header.data_crc);
char * body_buf = new char[header.data_length];
String body_string(header.data_length, '0');
char * body_buf = body_string.data();
read_size += (SnapshotBatchHeader::HEADER_SIZE + header.data_length);

if (!snap_fs->read(body_buf, header.data_length))
{
delete[] body_buf;
throwFromErrno(
"Can't read snapshot object file " + obj_path + ", batch size " + std::to_string(header.data_length) + ", only "
+ std::to_string(snap_fs->gcount()) + " could be read",
Expand All @@ -549,41 +549,39 @@ void KeeperSnapshotStore::parseObject(KeeperStore & store, String obj_path)

if (!verifyCRC32(body_buf, header.data_length, header.data_crc))
{
delete[] body_buf;
throwFromErrno("Can't read snapshot object file " + obj_path + ", batch crc not match.", ErrorCodes::CORRUPTED_SNAPSHOT);
}

if (version_from_obj < SnapshotVersion::V2)
parseBatchBody(store, body_buf, header.data_length, version_from_obj);
parseBatchBody(store, body_string, buckets_edges, version_from_obj);
else
parseBatchBodyV2(store, body_buf, header.data_length, version_from_obj);
delete[] body_buf;
parseBatchBodyV2(store, body_string, buckets_edges, version_from_obj);
}
}

void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_)
void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, const String & body_string, BucketEdges & buckets_edges, SnapshotVersion version_)
{
SnapshotBatchPB batch_pb;
batch_pb.ParseFromString(String(batch_buf, length));
batch_pb.ParseFromString(body_string);
switch (batch_pb.batch_type())
{
case SnapshotTypePB::SNAPSHOT_TYPE_DATA:
LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size());
parseBatchData(store, batch_pb, version_);
LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch_pb.data_size());
parseBatchData(store, batch_pb, buckets_edges, version_);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_SESSION: {
LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch_pb.data_size());
parseBatchSession(store, batch_pb, version_);
}
break;
case SnapshotTypePB::SNAPSHOT_TYPE_ACLMAP:
LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch acl from snapshot, acl count {}", batch_pb.data_size());
parseBatchAclMap(store, batch_pb, version_);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_UINTMAP:
LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size());
LOG_DEBUG(log, "Parsing batch int_map from snapshot, element count {}", batch_pb.data_size());
parseBatchIntMap(store, batch_pb, version_);
LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
LOG_DEBUG(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
break;
case SnapshotTypePB::SNAPSHOT_TYPE_CONFIG:
case SnapshotTypePB::SNAPSHOT_TYPE_SERVER:
Expand All @@ -593,29 +591,29 @@ void KeeperSnapshotStore::parseBatchBody(KeeperStore & store, char * batch_buf,
}
}

void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_)
void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, const String & body_string, BucketEdges & buckets_edges, SnapshotVersion version_)
{
ptr<SnapshotBatchBody> batch;
batch = SnapshotBatchBody::parse(String(batch_buf, length));
batch = SnapshotBatchBody::parse(body_string);
switch (batch->type)
{
case SnapshotBatchType::SNAPSHOT_TYPE_DATA:
LOG_INFO(log, "Parsing batch data from snapshot, data count {}", batch->size());
parseBatchDataV2(store, *batch, version_);
LOG_DEBUG(log, "Parsing batch data from snapshot, data count {}", batch->size());
parseBatchDataV2(store, *batch, buckets_edges, version_);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_SESSION: {
LOG_INFO(log, "Parsing batch session from snapshot, session count {}", batch->size());
LOG_DEBUG(log, "Parsing batch session from snapshot, session count {}", batch->size());
parseBatchSessionV2(store, *batch, version_);
}
break;
case SnapshotBatchType::SNAPSHOT_TYPE_ACLMAP:
LOG_INFO(log, "Parsing batch acl from snapshot, acl count {}", batch->size());
LOG_DEBUG(log, "Parsing batch acl from snapshot, acl count {}", batch->size());
parseBatchAclMapV2(store, *batch, version_);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_UINTMAP:
LOG_INFO(log, "Parsing batch int_map from snapshot, element count {}", batch->size());
LOG_DEBUG(log, "Parsing batch int_map from snapshot, element count {}", batch->size());
parseBatchIntMapV2(store, *batch, version_);
LOG_INFO(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
LOG_DEBUG(log, "Parsed zxid {}, session_id_counter {}", store.zxid, store.session_id_counter);
break;
case SnapshotBatchType::SNAPSHOT_TYPE_CONFIG:
case SnapshotBatchType::SNAPSHOT_TYPE_SERVER:
Expand All @@ -628,6 +626,10 @@ void KeeperSnapshotStore::parseBatchBodyV2(KeeperStore & store, char * batch_buf
void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
{
ThreadPool object_thread_pool(SNAPSHOT_THREAD_NUM);
all_objects_edges = std::vector<BucketEdges>(objects_path.size());

LOG_INFO(log, "Parse object from disk");

for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++)
{
object_thread_pool.trySchedule(
Expand All @@ -648,11 +650,12 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
this->objects_path.size());
try
{
this->parseObject(store, it->second);
this->parseObject(store, it->second, all_objects_edges[obj_idx]);
}
catch (Exception & e)
{
LOG_ERROR(log, "parseObject error {}, {}", it->second, getExceptionMessage(e, true));
throw;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please throw Exception and add an error code

}
}
obj_idx++;
Expand All @@ -661,8 +664,27 @@ void KeeperSnapshotStore::loadLatestSnapshot(KeeperStore & store)
}
object_thread_pool.wait();

LOG_INFO(log, "Build ChildrenSet for nodes");

/// Build node tree relationship
store.buildPathChildren();
for (UInt32 thread_idx = 0; thread_idx < SNAPSHOT_THREAD_NUM; thread_idx++)
{
object_thread_pool.trySchedule(
[this, thread_idx, &store]
{
Poco::Logger * thread_log = &(Poco::Logger::get("KeeperSnapshotStore.buildChildren"));
for (UInt32 bucket_idx = 0; bucket_idx < store.container.getBucketNum(); bucket_idx++)
{
if (bucket_idx % SNAPSHOT_THREAD_NUM == thread_idx)
{
LOG_INFO(thread_log, "Build ChildrenSet for nodes in bucket {}, thread_idx {}", bucket_idx, thread_idx);
store.buildBucketChildren(this->all_objects_edges, bucket_idx);
}
}
});
}

object_thread_pool.wait();

LOG_INFO(
log,
Expand Down
8 changes: 5 additions & 3 deletions src/Service/NuRaftLogSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ class KeeperSnapshotStore
void getObjectPath(ulong object_id, String & path);

/// Parse an snapshot object. We should take the version from snapshot in general.
void parseObject(KeeperStore & store, String obj_path);
void parseObject(KeeperStore & store, String obj_path, BucketEdges &);

/// Parse batch header in an object
/// TODO use internal buffer
void parseBatchHeader(ptr<std::fstream> fs, SnapshotBatchHeader & head);

/// Parse a batch /// TODO delete
void parseBatchBody(KeeperStore & store, char * batch_buf, size_t length, SnapshotVersion version_);
void parseBatchBody(KeeperStore & store, const String &, BucketEdges &, SnapshotVersion version_);
/// Parse a batch
void parseBatchBodyV2(KeeperStore & store, char * buf, size_t length, SnapshotVersion version_);
void parseBatchBodyV2(KeeperStore & store, const String &, BucketEdges &, SnapshotVersion version_);

/// Serialize whole data tree /// TODO delete
size_t serializeDataTree(KeeperStore & storage);
Expand Down Expand Up @@ -168,6 +168,8 @@ class KeeperSnapshotStore

std::map<ulong, String> objects_path;

std::vector<BucketEdges> all_objects_edges;

/// Appended to snapshot file name
String curr_time;

Expand Down
Loading
Loading