diff --git a/src/Service/NuRaftLogSnapshot.cpp b/src/Service/NuRaftLogSnapshot.cpp index 386de4c3b1..bc7e6c58c0 100644 --- a/src/Service/NuRaftLogSnapshot.cpp +++ b/src/Service/NuRaftLogSnapshot.cpp @@ -1,9 +1,12 @@ #include +#include #include #include #include #include +#include + #include #include #include @@ -40,10 +43,8 @@ using Poco::NumberFormatter; void KeeperSnapshotStore::getObjectPath(ulong object_id, String & obj_path) { - char path_buf[1024]; - snprintf(path_buf, 1024, SNAPSHOT_FILE_NAME, curr_time.c_str(), last_log_index, object_id); - obj_path = path_buf; - obj_path = snap_dir + "/" + obj_path; + SnapObject s_obj(curr_time.c_str(), last_log_term, last_log_index, object_id); + obj_path = snap_dir + "/" + s_obj.getObjectName(); } size_t KeeperSnapshotStore::getObjectIdx(const String & file_name) @@ -757,7 +758,7 @@ size_t KeeperSnapshotManager::createSnapshotAsync(SnapTask & snap_task, Snapshot snap_task.next_session_id, snap_task.next_zxid); size_t obj_size = snap_store->createObjectsAsync(snap_task); - snapshots[meta->get_last_log_idx()] = snap_store; + snapshots[getSnapshotStoreMapKey(*meta)] = snap_store; return obj_size; } @@ -781,7 +782,7 @@ size_t KeeperSnapshotManager::createSnapshot( next_session_id, next_zxid); size_t obj_size = snap_store->createObjects(store, next_zxid, next_session_id); - snapshots[meta.get_last_log_idx()] = snap_store; + snapshots[getSnapshotStoreMapKey(meta)] = snap_store; return obj_size; } @@ -789,18 +790,18 @@ bool KeeperSnapshotManager::receiveSnapshotMeta(snapshot & meta) { ptr snap_store = cs_new(snap_dir, meta, object_node_size); snap_store->init(); - snapshots[meta.get_last_log_idx()] = snap_store; + snapshots[getSnapshotStoreMapKey(meta)] = snap_store; return true; } bool KeeperSnapshotManager::existSnapshot(const snapshot & meta) { - return snapshots.find(meta.get_last_log_idx()) != snapshots.end(); + return snapshots.find(getSnapshotStoreMapKey(meta)) != snapshots.end(); } bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj_id) { - auto it = snapshots.find(meta.get_last_log_idx()); + auto it = snapshots.find(getSnapshotStoreMapKey(meta)); if (it == snapshots.end()) { LOG_INFO(log, "Not exists snapshot last_log_idx {}", meta.get_last_log_idx()); @@ -814,7 +815,7 @@ bool KeeperSnapshotManager::existSnapshotObject(const snapshot & meta, ulong obj bool KeeperSnapshotManager::loadSnapshotObject(const snapshot & meta, ulong obj_id, ptr & buffer) { - auto it = snapshots.find(meta.get_last_log_idx()); + auto it = snapshots.find(getSnapshotStoreMapKey(meta)); if (it == snapshots.end()) throw Exception( @@ -831,14 +832,14 @@ bool KeeperSnapshotManager::loadSnapshotObject(const snapshot & meta, ulong obj_ bool KeeperSnapshotManager::saveSnapshotObject(snapshot & meta, ulong obj_id, buffer & buffer) { - auto it = snapshots.find(meta.get_last_log_idx()); + auto it = snapshots.find(getSnapshotStoreMapKey(meta)); ptr store; if (it == snapshots.end()) { meta.set_size(0); store = cs_new(snap_dir, meta); store->init(); - snapshots[meta.get_last_log_idx()] = store; + snapshots[getSnapshotStoreMapKey(meta)] = store; } else { @@ -850,7 +851,7 @@ bool KeeperSnapshotManager::saveSnapshotObject(snapshot & meta, ulong obj_id, bu bool KeeperSnapshotManager::parseSnapshot(const snapshot & meta, KeeperStore & storage) { - auto it = snapshots.find(meta.get_last_log_idx()); + auto it = snapshots.find(getSnapshotStoreMapKey(meta)); if (it == snapshots.end()) { throw Exception(ErrorCodes::SNAPSHOT_NOT_EXISTS, "Error when parsing snapshot {}, for it does not exist", meta.get_last_log_idx()); @@ -869,31 +870,35 @@ size_t KeeperSnapshotManager::loadSnapshotMetas() std::vector file_vec; file_dir.list(file_vec); - char time_str[128]; - - unsigned long log_last_index; - unsigned long object_id; for (const auto & file : file_vec) { if (file.find("snapshot_") == file.npos) { - LOG_INFO(log, "Skip non-snapshot file {}", file); + LOG_WARNING(log, "Skip non-snapshot file {}", file); continue; } - sscanf(file.c_str(), "snapshot_%[^_]_%lu_%lu", time_str, &log_last_index, &object_id); - if (snapshots.find(log_last_index) == snapshots.end()) + SnapObject s_obj; + if (!s_obj.parseInfoFromObjectName(file)) { - ptr config = cs_new(log_last_index, log_last_index - 1); - nuraft::snapshot meta(log_last_index, 1, config); + LOG_ERROR(log, "Can't parse object info from file name {}", file); + continue; + } + + auto key = getSnapshotStoreMapKey(s_obj); + + if (snapshots.find(key) == snapshots.end()) + { + ptr config = cs_new(s_obj.log_last_index, s_obj.log_last_index - 1); + nuraft::snapshot meta(s_obj.log_last_index, s_obj.log_last_term, config); ptr snap_store = cs_new(snap_dir, meta, object_node_size); - snap_store->init(time_str); - snapshots[meta.get_last_log_idx()] = snap_store; - LOG_INFO(log, "Load filename {}, time {}, index {}, object id {}", file, time_str, log_last_index, object_id); + snap_store->init(s_obj.create_time); + snapshots[key] = snap_store; } String full_path = snap_dir + "/" + file; - snapshots[log_last_index]->addObjectPath(object_id, full_path); + LOG_INFO(log, "Load filename {}, term {}, index {}, object id {}", file, s_obj.log_last_term, s_obj.log_last_index, s_obj.object_id); + snapshots[key]->addObjectPath(s_obj.object_id, full_path); } LOG_INFO(log, "Load snapshot metas {} from snapshot directory {}", snapshots.size(), snap_dir); return snapshots.size(); @@ -911,10 +916,6 @@ ptr KeeperSnapshotManager::lastSnapshot() size_t KeeperSnapshotManager::removeSnapshots() { Int64 remove_count = static_cast(snapshots.size()) - static_cast(keep_max_snapshot_count); - char time_str[128]; - - unsigned long log_last_index; - unsigned long object_id; while (remove_count > 0) { @@ -932,8 +933,13 @@ size_t KeeperSnapshotManager::removeSnapshots() LOG_INFO(log, "Skip no snapshot file {}", file); continue; } - sscanf(file.c_str(), "snapshot_%[^_]_%lu_%lu", time_str, &log_last_index, &object_id); - if (remove_log_index == log_last_index) + SnapObject s_obj; + if (!s_obj.parseInfoFromObjectName(file)) + { + LOG_ERROR(log, "Can't parse object info from file name {}", file); + continue; + } + if (remove_log_index == s_obj.log_last_index) { LOG_INFO( log, diff --git a/src/Service/NuRaftLogSnapshot.h b/src/Service/NuRaftLogSnapshot.h index 1f46a1c289..f4b6901cbf 100644 --- a/src/Service/NuRaftLogSnapshot.h +++ b/src/Service/NuRaftLogSnapshot.h @@ -3,6 +3,7 @@ #include #include #include +#include namespace RK @@ -49,6 +50,70 @@ struct SnapTask } }; +struct SnapObject +{ + /// create_time, last_log_index, object_id + static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_{}_{}_{}"; + /// create_time, last_log_term, last_log_index, object_id + static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_{}_{}_{}_{}"; + + String create_time; + UInt64 log_last_term; + UInt64 log_last_index; + UInt64 object_id; + + SnapObject(String _create_time = "", UInt64 _log_last_term = 1, UInt64 _log_last_index = 1, UInt64 _object_id = 1) + :create_time(_create_time), log_last_term(_log_last_term), log_last_index(_log_last_index), object_id(_object_id) + { + } + + String getObjectName() + { + return fmt::format(SNAPSHOT_FILE_NAME_V1, create_time, log_last_term, log_last_index, object_id); + } + + bool parseInfoFromObjectName(const String & object_name) + { + auto tryReadUint64Text = [] (const String & str, UInt64 & num) + { + auto [_, ec] = std::from_chars(str.data(), str.data() + str.size(), num); + return ec == std::errc(); + }; + + Strings tokens; + + splitInto<'_'>(tokens, object_name); + + if (tokens.size() == 4) + { + if (!tryReadUint64Text(tokens[2], log_last_index)) + return false; + + if (!tryReadUint64Text(tokens[3], object_id)) + return false; + } + else if (tokens.size() == 5) + { + if (!tryReadUint64Text(tokens[2], log_last_term)) + return false; + + if (!tryReadUint64Text(tokens[3], log_last_index)) + return false; + + if (!tryReadUint64Text(tokens[4], object_id)) + return false; + } + else + { + return false; + } + + create_time = std::move(tokens[1]); + + return true; + } +}; + /** * Operate a snapshot, when the current snapshot is down, we should renew a store. * @@ -121,18 +186,6 @@ class KeeperSnapshotStore /// parse object id from file name static size_t getObjectIdx(const String & file_name); -#ifdef __APPLE__ - /// create_time, last_log_index, object_id - static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_%s_%llu_%llu"; - /// create_time, last_log_index, last_log_term, object_id - static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_%s_%llu_%llu_%llu"; -#else - /// create_time, last_log_index, object_id - static constexpr char SNAPSHOT_FILE_NAME[] = "snapshot_%s_%lu_%lu"; - /// create_time, last_log_index, last_log_term, object_id - static constexpr char SNAPSHOT_FILE_NAME_V1[] = "snapshot_%s_%lu_%lu_%lu"; -#endif - static constexpr int SNAPSHOT_THREAD_NUM = 8; static constexpr int IO_BUFFER_SIZE = 16384; /// 16K @@ -216,7 +269,24 @@ class KeeperSnapshotStore std::shared_ptr snapshot_thread; }; -using KeeperSnapshotStoreMap = std::map>; +// In Raft, each log entry can be uniquely identified by the combination of its Log Index and Term. +inline uint128_t getSnapshotStoreMapKeyImpl(UInt64 log_term, UInt64 log_idx) +{ + return static_cast(log_term) << 64 | log_idx; +} + +inline uint128_t getSnapshotStoreMapKey(const snapshot & meta) +{ + return getSnapshotStoreMapKeyImpl(meta.get_last_log_term(), meta.get_last_log_idx()); +} + +inline uint128_t getSnapshotStoreMapKey(const SnapObject & obj) +{ + return getSnapshotStoreMapKeyImpl(obj.log_last_term, obj.log_last_index); +} + +// Map key is term << 64 | log +using KeeperSnapshotStoreMap = std::map>; /** * Snapshots manager who may create, remove snapshots. diff --git a/src/Service/SnapshotCommon.cpp b/src/Service/SnapshotCommon.cpp index 7f9265d17d..f753e8dbbd 100644 --- a/src/Service/SnapshotCommon.cpp +++ b/src/Service/SnapshotCommon.cpp @@ -34,8 +34,6 @@ String toString(SnapshotVersion version) return "v1"; case SnapshotVersion::V2: return "v2"; - case SnapshotVersion::V3: - return "v3"; case SnapshotVersion::None: return "none"; } diff --git a/src/Service/SnapshotCommon.h b/src/Service/SnapshotCommon.h index b96886b152..6e80f325e4 100644 --- a/src/Service/SnapshotCommon.h +++ b/src/Service/SnapshotCommon.h @@ -44,7 +44,6 @@ enum SnapshotVersion : uint8_t V0 = 0, V1 = 1, /// Add ACL map V2 = 2, /// Replace protobuf - V3 = 3, /// Add last_log_term to file name None = 255, };