Skip to content

Commit

Permalink
Save term info in snapshot filename
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy authored and JackyWoo committed Jun 5, 2024
1 parent f6dc73d commit 87c638d
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 45 deletions.
70 changes: 38 additions & 32 deletions src/Service/NuRaftLogSnapshot.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#include <algorithm>
#include <fmt/format.h>
#include <fcntl.h>
#include <filesystem>
#include <stdio.h>
#include <unistd.h>

#include "common/find_symbols.h"

#include <Poco/DateTime.h>
#include <Poco/DateTimeFormatter.h>
#include <Poco/File.h>
Expand Down Expand Up @@ -40,10 +43,8 @@ using Poco::NumberFormatter;

void KeeperSnapshotStore::getObjectPath(ulong object_id, String & obj_path)
{
char path_buf[1024];
snprintf(path_buf, 1024, SNAPSHOT_FILE_NAME, curr_time.c_str(), last_log_index, object_id);
obj_path = path_buf;
obj_path = snap_dir + "/" + obj_path;
SnapObject s_obj(curr_time.c_str(), last_log_term, last_log_index, object_id);
obj_path = snap_dir + "/" + s_obj.getObjectName();
}

size_t KeeperSnapshotStore::getObjectIdx(const String & file_name)
Expand Down Expand Up @@ -757,7 +758,7 @@ size_t KeeperSnapshotManager::createSnapshotAsync(SnapTask & snap_task, Snapshot
snap_task.next_session_id,
snap_task.next_zxid);
size_t obj_size = snap_store->createObjectsAsync(snap_task);
snapshots[meta->get_last_log_idx()] = snap_store;
snapshots[getSnapshotStoreMapKey(*meta)] = snap_store;
return obj_size;
}

Expand All @@ -781,26 +782,26 @@ size_t KeeperSnapshotManager::createSnapshot(
next_session_id,
next_zxid);
size_t obj_size = snap_store->createObjects(store, next_zxid, next_session_id);
snapshots[meta.get_last_log_idx()] = snap_store;
snapshots[getSnapshotStoreMapKey(meta)] = snap_store;
return obj_size;
}

bool KeeperSnapshotManager::receiveSnapshotMeta(snapshot & meta)
{
ptr<KeeperSnapshotStore> snap_store = cs_new<KeeperSnapshotStore>(snap_dir, meta, object_node_size);
snap_store->init();
snapshots[meta.get_last_log_idx()] = snap_store;
snapshots[getSnapshotStoreMapKey(meta)] = snap_store;
return true;
}

bool KeeperSnapshotManager::existSnapshot(const snapshot & meta)
{
return snapshots.find(meta.get_last_log_idx()) != snapshots.end();
return snapshots.find(getSnapshotStoreMapKey(meta)) != snapshots.end();
}

bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id)
{
auto it = snapshots.find(meta.get_last_log_idx());
auto it = snapshots.find(getSnapshotStoreMapKey(meta));
if (it == snapshots.end())
{
LOG_INFO(log, "Not exists snapshot last_log_idx {}", meta.get_last_log_idx());
Expand All @@ -814,7 +815,7 @@ bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj

bool KeeperSnapshotManager::loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr<buffer> & buffer)
{
auto it = snapshots.find(meta.get_last_log_idx());
auto it = snapshots.find(getSnapshotStoreMapKey(meta));

if (it == snapshots.end())
throw Exception(
Expand All @@ -831,14 +832,14 @@ bool KeeperSnapshotManager::loadSnapshotObject(const snapshot & meta, ulong obj_

bool KeeperSnapshotManager::saveSnapshotObject(snapshot & meta, ulong obj_id, buffer & buffer)
{
auto it = snapshots.find(meta.get_last_log_idx());
auto it = snapshots.find(getSnapshotStoreMapKey(meta));
ptr<KeeperSnapshotStore> store;
if (it == snapshots.end())
{
meta.set_size(0);
store = cs_new<KeeperSnapshotStore>(snap_dir, meta);
store->init();
snapshots[meta.get_last_log_idx()] = store;
snapshots[getSnapshotStoreMapKey(meta)] = store;
}
else
{
Expand All @@ -850,7 +851,7 @@ bool KeeperSnapshotManager::saveSnapshotObject(snapshot & meta, ulong obj_id, bu

bool KeeperSnapshotManager::parseSnapshot(const snapshot & meta, KeeperStore & storage)
{
auto it = snapshots.find(meta.get_last_log_idx());
auto it = snapshots.find(getSnapshotStoreMapKey(meta));
if (it == snapshots.end())
{
throw Exception(ErrorCodes::SNAPSHOT_NOT_EXISTS, "Error when parsing snapshot {}, for it does not exist", meta.get_last_log_idx());
Expand All @@ -869,31 +870,35 @@ size_t KeeperSnapshotManager::loadSnapshotMetas()

std::vector<String> file_vec;
file_dir.list(file_vec);
char time_str[128];

unsigned long log_last_index;
unsigned long object_id;

for (const auto & file : file_vec)
{
if (file.find("snapshot_") == file.npos)
{
LOG_INFO(log, "Skip non-snapshot file {}", file);
LOG_WARNING(log, "Skip non-snapshot file {}", file);
continue;
}
sscanf(file.c_str(), "snapshot_%[^_]_%lu_%lu", time_str, &log_last_index, &object_id);

if (snapshots.find(log_last_index) == snapshots.end())
SnapObject s_obj;
if (!s_obj.parseInfoFromObjectName(file))
{
ptr<nuraft::cluster_config> config = cs_new<nuraft::cluster_config>(log_last_index, log_last_index - 1);
nuraft::snapshot meta(log_last_index, 1, config);
LOG_ERROR(log, "Can't parse object info from file name {}", file);
continue;
}

auto key = static_cast<uint128_t>(s_obj.log_last_term) << 64 | s_obj.log_last_index;

if (snapshots.find(key) == snapshots.end())
{
ptr<nuraft::cluster_config> config = cs_new<nuraft::cluster_config>(s_obj.log_last_index, s_obj.log_last_index - 1);
nuraft::snapshot meta(s_obj.log_last_index, s_obj.log_last_term, config);
ptr<KeeperSnapshotStore> snap_store = cs_new<KeeperSnapshotStore>(snap_dir, meta, object_node_size);
snap_store->init(time_str);
snapshots[meta.get_last_log_idx()] = snap_store;
LOG_INFO(log, "Load filename {}, time {}, index {}, object id {}", file, time_str, log_last_index, object_id);
snap_store->init(s_obj.time_str);
snapshots[key] = snap_store;
LOG_INFO(log, "Load filename {}, term {}, index {}, object id {}", file, s_obj.log_last_term, s_obj.log_last_index, s_obj.object_id);
}
String full_path = snap_dir + "/" + file;
snapshots[log_last_index]->addObjectPath(object_id, full_path);
snapshots[key]->addObjectPath(s_obj.object_id, full_path);
}
LOG_INFO(log, "Load snapshot metas {} from snapshot directory {}", snapshots.size(), snap_dir);
return snapshots.size();
Expand All @@ -911,10 +916,6 @@ ptr<snapshot> KeeperSnapshotManager::lastSnapshot()
size_t KeeperSnapshotManager::removeSnapshots()
{
Int64 remove_count = static_cast<Int64>(snapshots.size()) - static_cast<Int64>(keep_max_snapshot_count);
char time_str[128];

unsigned long log_last_index;
unsigned long object_id;

while (remove_count > 0)
{
Expand All @@ -932,8 +933,13 @@ size_t KeeperSnapshotManager::removeSnapshots()
LOG_INFO(log, "Skip no snapshot file {}", file);
continue;
}
sscanf(file.c_str(), "snapshot_%[^_]_%lu_%lu", time_str, &log_last_index, &object_id);
if (remove_log_index == log_last_index)
SnapObject s_obj;
if (!s_obj.parseInfoFromObjectName(file))
{
LOG_ERROR(log, "Can't parse object info from file name {}", file);
continue;
}
if (remove_log_index == s_obj.log_last_index)
{
LOG_INFO(
log,
Expand Down
86 changes: 73 additions & 13 deletions src/Service/NuRaftLogSnapshot.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <Service/SnapshotCommon.h>
#include <Service/Metrics.h>
#include <Common/Stopwatch.h>
#include <charconv>


namespace RK
Expand Down Expand Up @@ -49,6 +50,70 @@ struct SnapTask
}
};

struct SnapObject
{
/// create_time, last_log_index, object_id
static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_{}_{}_{}";
/// create_time, last_log_term, last_log_index, object_id
static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_{}_{}_{}_{}";

String time_str;
UInt64 log_last_term;
UInt64 log_last_index;
UInt64 object_id;

SnapObject(String _time_str = "", UInt64 _log_last_term = 1, UInt64 _log_last_index = 1, UInt64 _object_id = 1)
:time_str(_time_str), log_last_term(_log_last_term), log_last_index(_log_last_index), object_id(_object_id)
{
}

String getObjectName()
{
return fmt::format(SNAPSHOT_FILE_NAME_V1, time_str, log_last_term, log_last_index, object_id);
}

bool parseInfoFromObjectName(const String & object_name)
{
auto tryReadUint64Text = [] (const String & str, UInt64 & num)
{
auto [_, ec] = std::from_chars(str.data(), str.data() + str.size(), num);
return ec == std::errc();
};

Strings tokens;

splitInto<'_'>(tokens, object_name);

if (tokens.size() == 4)
{
if (!tryReadUint64Text(tokens[2], log_last_index))
return false;

if (!tryReadUint64Text(tokens[3], object_id))
return false;
}
else if (tokens.size() == 5)
{
if (!tryReadUint64Text(tokens[2], log_last_term))
return false;

if (!tryReadUint64Text(tokens[3], log_last_index))
return false;

if (!tryReadUint64Text(tokens[2], object_id))
return false;
}
else
{
return false;
}

time_str = std::move(tokens[1]);

return true;
}
};

/**
* Operate a snapshot, when the current snapshot is down, we should renew a store.
*
Expand Down Expand Up @@ -121,18 +186,6 @@ class KeeperSnapshotStore
/// parse object id from file name
static size_t getObjectIdx(const String & file_name);

#ifdef __APPLE__
/// create_time, last_log_index, object_id
static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_%s_%llu_%llu";
/// create_time, last_log_index, last_log_term, object_id
static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_%s_%llu_%llu_%llu";
#else
/// create_time, last_log_index, object_id
static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_%s_%lu_%lu";
/// create_time, last_log_index, last_log_term, object_id
static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_%s_%lu_%lu_%lu";
#endif

static constexpr int SNAPSHOT_THREAD_NUM = 8;
static constexpr int IO_BUFFER_SIZE = 16384; /// 16K

Expand Down Expand Up @@ -216,7 +269,14 @@ class KeeperSnapshotStore
std::shared_ptr<ThreadPool> snapshot_thread;
};

using KeeperSnapshotStoreMap = std::map<uint64_t, ptr<KeeperSnapshotStore>>;
// In Raft, each log entry can be uniquely identified by the combination of its Log Index and Term.
inline uint128_t getSnapshotStoreMapKey(const snapshot & meta)
{
return static_cast<uint128_t>(meta.get_last_log_term()) << 64 | meta.get_last_log_idx();
}

// Map key is term << 64 | log
using KeeperSnapshotStoreMap = std::map<uint128_t, ptr<KeeperSnapshotStore>>;

/**
* Snapshots manager who may create, remove snapshots.
Expand Down

0 comments on commit 87c638d

Please sign in to comment.