Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Save term info in snapshot filename #296

Merged
merged 4 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 38 additions & 32 deletions src/Service/NuRaftLogSnapshot.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#include <algorithm>
#include <fmt/format.h>
#include <fcntl.h>
#include <filesystem>
#include <stdio.h>
#include <unistd.h>

#include <common/find_symbols.h>

#include <Poco/DateTime.h>
#include <Poco/DateTimeFormatter.h>
#include <Poco/File.h>
Expand Down Expand Up @@ -40,10 +43,8 @@ using Poco::NumberFormatter;

void KeeperSnapshotStore::getObjectPath(ulong object_id, String & obj_path)
{
char path_buf[1024];
snprintf(path_buf, 1024, SNAPSHOT_FILE_NAME, curr_time.c_str(), last_log_index, object_id);
obj_path = path_buf;
obj_path = snap_dir + "/" + obj_path;
SnapObject s_obj(curr_time.c_str(), last_log_term, last_log_index, object_id);
obj_path = snap_dir + "/" + s_obj.getObjectName();
}

size_t KeeperSnapshotStore::getObjectIdx(const String & file_name)
Expand Down Expand Up @@ -757,7 +758,7 @@ size_t KeeperSnapshotManager::createSnapshotAsync(SnapTask & snap_task, Snapshot
snap_task.next_session_id,
snap_task.next_zxid);
size_t obj_size = snap_store->createObjectsAsync(snap_task);
snapshots[meta->get_last_log_idx()] = snap_store;
snapshots[getSnapshotStoreMapKey(*meta)] = snap_store;
return obj_size;
}

Expand All @@ -781,26 +782,26 @@ size_t KeeperSnapshotManager::createSnapshot(
next_session_id,
next_zxid);
size_t obj_size = snap_store->createObjects(store, next_zxid, next_session_id);
snapshots[meta.get_last_log_idx()] = snap_store;
snapshots[getSnapshotStoreMapKey(meta)] = snap_store;
return obj_size;
}

bool KeeperSnapshotManager::receiveSnapshotMeta(snapshot & meta)
{
ptr<KeeperSnapshotStore> snap_store = cs_new<KeeperSnapshotStore>(snap_dir, meta, object_node_size);
snap_store->init();
snapshots[meta.get_last_log_idx()] = snap_store;
snapshots[getSnapshotStoreMapKey(meta)] = snap_store;
return true;
}

bool KeeperSnapshotManager::existSnapshot(const snapshot & meta)
{
return snapshots.find(meta.get_last_log_idx()) != snapshots.end();
return snapshots.find(getSnapshotStoreMapKey(meta)) != snapshots.end();
}

bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id)
{
auto it = snapshots.find(meta.get_last_log_idx());
auto it = snapshots.find(getSnapshotStoreMapKey(meta));
if (it == snapshots.end())
{
LOG_INFO(log, "Not exists snapshot last_log_idx {}", meta.get_last_log_idx());
Expand All @@ -814,7 +815,7 @@ bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj

bool KeeperSnapshotManager::loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr<buffer> & buffer)
{
auto it = snapshots.find(meta.get_last_log_idx());
auto it = snapshots.find(getSnapshotStoreMapKey(meta));

if (it == snapshots.end())
throw Exception(
Expand All @@ -831,14 +832,14 @@ bool KeeperSnapshotManager::loadSnapshotObject(const snapshot & meta, ulong obj_

bool KeeperSnapshotManager::saveSnapshotObject(snapshot & meta, ulong obj_id, buffer & buffer)
{
auto it = snapshots.find(meta.get_last_log_idx());
auto it = snapshots.find(getSnapshotStoreMapKey(meta));
ptr<KeeperSnapshotStore> store;
if (it == snapshots.end())
{
meta.set_size(0);
store = cs_new<KeeperSnapshotStore>(snap_dir, meta);
store->init();
snapshots[meta.get_last_log_idx()] = store;
snapshots[getSnapshotStoreMapKey(meta)] = store;
}
else
{
Expand All @@ -850,7 +851,7 @@ bool KeeperSnapshotManager::saveSnapshotObject(snapshot & meta, ulong obj_id, bu

bool KeeperSnapshotManager::parseSnapshot(const snapshot & meta, KeeperStore & storage)
{
auto it = snapshots.find(meta.get_last_log_idx());
auto it = snapshots.find(getSnapshotStoreMapKey(meta));
if (it == snapshots.end())
{
throw Exception(ErrorCodes::SNAPSHOT_NOT_EXISTS, "Error when parsing snapshot {}, for it does not exist", meta.get_last_log_idx());
Expand All @@ -869,31 +870,35 @@ size_t KeeperSnapshotManager::loadSnapshotMetas()

std::vector<String> file_vec;
file_dir.list(file_vec);
char time_str[128];

unsigned long log_last_index;
unsigned long object_id;

for (const auto & file : file_vec)
{
if (file.find("snapshot_") == file.npos)
{
LOG_INFO(log, "Skip non-snapshot file {}", file);
LOG_WARNING(log, "Skip non-snapshot file {}", file);
continue;
}
sscanf(file.c_str(), "snapshot_%[^_]_%lu_%lu", time_str, &log_last_index, &object_id);

if (snapshots.find(log_last_index) == snapshots.end())
SnapObject s_obj;
if (!s_obj.parseInfoFromObjectName(file))
{
ptr<nuraft::cluster_config> config = cs_new<nuraft::cluster_config>(log_last_index, log_last_index - 1);
nuraft::snapshot meta(log_last_index, 1, config);
LOG_ERROR(log, "Can't parse object info from file name {}", file);
continue;
}

auto key = getSnapshotStoreMapKey(s_obj);

if (snapshots.find(key) == snapshots.end())
{
ptr<nuraft::cluster_config> config = cs_new<nuraft::cluster_config>(s_obj.log_last_index, s_obj.log_last_index - 1);
nuraft::snapshot meta(s_obj.log_last_index, s_obj.log_last_term, config);
ptr<KeeperSnapshotStore> snap_store = cs_new<KeeperSnapshotStore>(snap_dir, meta, object_node_size);
snap_store->init(time_str);
snapshots[meta.get_last_log_idx()] = snap_store;
LOG_INFO(log, "Load filename {}, time {}, index {}, object id {}", file, time_str, log_last_index, object_id);
snap_store->init(s_obj.create_time);
snapshots[key] = snap_store;
}
String full_path = snap_dir + "/" + file;
snapshots[log_last_index]->addObjectPath(object_id, full_path);
LOG_INFO(log, "Load filename {}, term {}, index {}, object id {}", file, s_obj.log_last_term, s_obj.log_last_index, s_obj.object_id);
snapshots[key]->addObjectPath(s_obj.object_id, full_path);
}
LOG_INFO(log, "Load snapshot metas {} from snapshot directory {}", snapshots.size(), snap_dir);
return snapshots.size();
Expand All @@ -911,10 +916,6 @@ ptr<snapshot> KeeperSnapshotManager::lastSnapshot()
size_t KeeperSnapshotManager::removeSnapshots()
{
Int64 remove_count = static_cast<Int64>(snapshots.size()) - static_cast<Int64>(keep_max_snapshot_count);
char time_str[128];

unsigned long log_last_index;
unsigned long object_id;

while (remove_count > 0)
{
Expand All @@ -932,8 +933,13 @@ size_t KeeperSnapshotManager::removeSnapshots()
LOG_INFO(log, "Skip no snapshot file {}", file);
continue;
}
sscanf(file.c_str(), "snapshot_%[^_]_%lu_%lu", time_str, &log_last_index, &object_id);
if (remove_log_index == log_last_index)
SnapObject s_obj;
if (!s_obj.parseInfoFromObjectName(file))
{
LOG_ERROR(log, "Can't parse object info from file name {}", file);
continue;
}
if (remove_log_index == s_obj.log_last_index)
{
LOG_INFO(
log,
Expand Down
96 changes: 83 additions & 13 deletions src/Service/NuRaftLogSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Service/SnapshotCommon.h>
#include <Service/Metrics.h>
#include <Common/Stopwatch.h>
#include <charconv>


namespace RK
Expand Down Expand Up @@ -49,6 +50,70 @@ struct SnapTask
}
};

struct SnapObject
{
/// create_time, last_log_index, object_id
static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_{}_{}_{}";
/// create_time, last_log_term, last_log_index, object_id
static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_{}_{}_{}_{}";

String create_time;
UInt64 log_last_term;
UInt64 log_last_index;
UInt64 object_id;

SnapObject(String _create_time = "", UInt64 _log_last_term = 1, UInt64 _log_last_index = 1, UInt64 _object_id = 1)
:create_time(_create_time), log_last_term(_log_last_term), log_last_index(_log_last_index), object_id(_object_id)
{
}

String getObjectName()
{
return fmt::format(SNAPSHOT_FILE_NAME_V1, create_time, log_last_term, log_last_index, object_id);
}

bool parseInfoFromObjectName(const String & object_name)
{
auto tryReadUint64Text = [] (const String & str, UInt64 & num)
{
auto [_, ec] = std::from_chars(str.data(), str.data() + str.size(), num);
return ec == std::errc();
};

Strings tokens;

splitInto<'_'>(tokens, object_name);

if (tokens.size() == 4)
{
if (!tryReadUint64Text(tokens[2], log_last_index))
return false;

if (!tryReadUint64Text(tokens[3], object_id))
return false;
}
else if (tokens.size() == 5)
{
if (!tryReadUint64Text(tokens[2], log_last_term))
return false;

if (!tryReadUint64Text(tokens[3], log_last_index))
return false;

if (!tryReadUint64Text(tokens[4], object_id))
return false;
}
else
{
return false;
}

create_time = std::move(tokens[1]);

return true;
}
};

/**
* Operate a snapshot, when the current snapshot is down, we should renew a store.
*
Expand Down Expand Up @@ -121,18 +186,6 @@ class KeeperSnapshotStore
/// parse object id from file name
static size_t getObjectIdx(const String & file_name);

#ifdef __APPLE__
/// create_time, last_log_index, object_id
static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_%s_%llu_%llu";
/// create_time, last_log_index, last_log_term, object_id
static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_%s_%llu_%llu_%llu";
#else
/// create_time, last_log_index, object_id
static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_%s_%lu_%lu";
/// create_time, last_log_index, last_log_term, object_id
static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_%s_%lu_%lu_%lu";
#endif

static constexpr int SNAPSHOT_THREAD_NUM = 8;
static constexpr int IO_BUFFER_SIZE = 16384; /// 16K

Expand Down Expand Up @@ -216,7 +269,24 @@ class KeeperSnapshotStore
std::shared_ptr<ThreadPool> snapshot_thread;
};

using KeeperSnapshotStoreMap = std::map<uint64_t, ptr<KeeperSnapshotStore>>;
// In Raft, each log entry can be uniquely identified by the combination of its Log Index and Term.
inline uint128_t getSnapshotStoreMapKeyImpl(UInt64 log_term, UInt64 log_idx)
{
return static_cast<uint128_t>(log_term) << 64 | log_idx;
}

inline uint128_t getSnapshotStoreMapKey(const snapshot & meta)
{
return getSnapshotStoreMapKeyImpl(meta.get_last_log_term(), meta.get_last_log_idx());
}

inline uint128_t getSnapshotStoreMapKey(const SnapObject & obj)
{
return getSnapshotStoreMapKeyImpl(obj.log_last_term, obj.log_last_index);
}

// Map key is term << 64 | log
using KeeperSnapshotStoreMap = std::map<uint128_t, ptr<KeeperSnapshotStore>>;

/**
* Snapshots manager who may create, remove snapshots.
Expand Down
2 changes: 0 additions & 2 deletions src/Service/SnapshotCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ String toString(SnapshotVersion version)
return "v1";
case SnapshotVersion::V2:
return "v2";
case SnapshotVersion::V3:
return "v3";
case SnapshotVersion::None:
return "none";
}
Expand Down
1 change: 0 additions & 1 deletion src/Service/SnapshotCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ enum SnapshotVersion : uint8_t
V0 = 0,
V1 = 1, /// Add ACL map
V2 = 2, /// Replace protobuf
V3 = 3, /// Add last_log_term to file name
None = 255,
};

Expand Down
Loading