Skip to content

Commit

Permalink
Add crc info when cloning log entry
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Aug 19, 2024
1 parent c0b702d commit 25857d3
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 65 deletions.
3 changes: 2 additions & 1 deletion src/Service/KeeperCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using ThreadPoolPtr = std::shared_ptr<ThreadPool>;

struct RequestId;

/// Attached session id to request
/// Attached session id and forwarding info to request
struct RequestForSession
{
int64_t session_id;
Expand Down Expand Up @@ -107,4 +107,5 @@ bool isSessionRequest(Coordination::OpNum opnum);
bool isSessionRequest(const Coordination::ZooKeeperRequestPtr & request);

bool isNewSessionRequest(Coordination::OpNum opnum);

}
2 changes: 1 addition & 1 deletion src/Service/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ KeeperServer::KeeperServer(
new_session_id_callback_mutex,
new_session_id_callback,
state_manager->load_log_store(),
checkAndGetSuperdigest(settings->super_digest),
checkAndGetSuperDigest(settings->super_digest),
MAX_OBJECT_NODE_SIZE,
request_processor_);

Expand Down
36 changes: 16 additions & 20 deletions src/Service/KeeperUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace ErrorCodes
extern const int INVALID_CONFIG_PARAMETER;
}

String checkAndGetSuperdigest(const String & user_and_digest)
String checkAndGetSuperDigest(const String & user_and_digest)
{
if (user_and_digest.empty())
return "";
Expand All @@ -32,26 +32,24 @@ String checkAndGetSuperdigest(const String & user_and_digest)
boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; });
if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super")
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'");
ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect super digest in keeper_server config. Must be 'super:base64string'");

return user_and_digest;
}

ptr<buffer> serializeKeeperRequest(const RequestForSession & session_request)
ptr<buffer> serializeKeeperRequest(const RequestForSession & request)
{
WriteBufferFromNuraftBuffer out;
/// TODO unify digital encoding mode, see deserializeKeeperRequest
writeIntBinary(session_request.session_id, out);
session_request.request->write(out);
Coordination::write(session_request.create_time, out);
writeIntBinary(request.session_id, out);
request.request->write(out);
Coordination::write(request.create_time, out);
return out.getBuffer();
}

RequestForSession deserializeKeeperRequest(nuraft::buffer & data)
{
ReadBufferFromNuRaftBuffer buffer(data);
RequestForSession request_for_session;
/// TODO unify digital encoding mode
readIntBinary(request_for_session.session_id, buffer);

int32_t length;
Expand All @@ -76,22 +74,20 @@ RequestForSession deserializeKeeperRequest(nuraft::buffer & data)
request_for_session.create_time
= std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();

auto * log = &(Poco::Logger::get("NuRaftStateMachine"));
LOG_TRACE(
log,
"Parsed request session id {}, length {}, xid {}, opnum {}",
toHexString(request_for_session.session_id),
length,
xid,
Coordination::toString(opnum));

return request_for_session;
}

ptr<log_entry> makeClone(const ptr<log_entry> & entry)
ptr<log_entry> cloneLogEntry(const ptr<log_entry> & entry)
{
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type());
return clone;
ptr<log_entry> cloned = cs_new<log_entry>(
entry->get_term(),
buffer::clone(entry->get_buf()),
entry->get_val_type(),
entry->get_timestamp(),
entry->has_crc32(),
entry->get_crc32(),
false);
return cloned;
}

String getBaseName(const String & path)
Expand Down
4 changes: 2 additions & 2 deletions src/Service/KeeperUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace RK
nuraft::ptr<nuraft::buffer> serializeKeeperRequest(const RequestForSession & request);
RequestForSession deserializeKeeperRequest(nuraft::buffer & data);

nuraft::ptr<nuraft::log_entry> makeClone(const nuraft::ptr<nuraft::log_entry> & entry);
nuraft::ptr<nuraft::log_entry> cloneLogEntry(const nuraft::ptr<nuraft::log_entry> & entry);

/// Parent of a path, for example: got '/a/b' from '/a/b/c'
String getParentPath(const String & path);
Expand All @@ -28,7 +28,7 @@ String getBaseName(const String & path);
String base64Encode(const String & decoded);
String getSHA1(const String & userdata);
String generateDigest(const String & userdata);
String checkAndGetSuperdigest(const String & user_and_digest);
String checkAndGetSuperDigest(const String & user_and_digest);

inline int readUInt32(nuraft::ptr<std::fstream> & fs, UInt32 & x)
{
Expand Down
4 changes: 2 additions & 2 deletions src/Service/LogEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ using nuraft::byte;
using nuraft::cs_new;

/// Add entry type to the log entry
ptr<buffer> LogEntryBody::serialize(ptr<log_entry> & entry)
ptr<buffer> LogEntryBody::serialize(const ptr<log_entry> & entry)
{
ptr<buffer> entry_buf;
ptr<buffer> data = entry->get_buf_ptr();
Expand All @@ -22,7 +22,7 @@ ptr<buffer> LogEntryBody::serialize(ptr<log_entry> & entry)
return entry_buf;
}

ptr<log_entry> LogEntryBody::deserialize(ptr<buffer> serialized_entry)
ptr<log_entry> LogEntryBody::deserialize(const ptr<buffer> & serialized_entry)
{
nuraft::log_val_type type = static_cast<nuraft::log_val_type>(*serialized_entry->data_begin());
auto data = buffer::alloc(serialized_entry->size() - 1);
Expand Down
4 changes: 2 additions & 2 deletions src/Service/LogEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ struct LogEntryHeader
class LogEntryBody
{
public:
static ptr<buffer> serialize(ptr<log_entry> & entry);
static ptr<log_entry> deserialize(ptr<buffer> serialized_entry);
static ptr<buffer> serialize(const ptr<log_entry> & entry);
static ptr<log_entry> deserialize(const ptr<buffer> & serialized_entry);
};

}
39 changes: 11 additions & 28 deletions src/Service/NuRaftFileLogStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ ptr<log_entry> LogEntryQueue::getEntry(const UInt64 & index)
return nullptr;
}

void LogEntryQueue::putEntry(UInt64 & index, ptr<log_entry> & entry)
void LogEntryQueue::putEntry(UInt64 & index, const ptr<log_entry> & entry)
{
LOG_TRACE(log, "put entry {}, index {}, batch {}", index, index & (MAX_VECTOR_SIZE - 1), batch_index);
std::lock_guard write_lock(queue_mutex);
Expand All @@ -32,19 +32,6 @@ void LogEntryQueue::putEntry(UInt64 & index, ptr<log_entry> & entry)
max_index = std::max(max_index, index);
}

[[maybe_unused]] void LogEntryQueue::putEntryOrClear(UInt64 & index, ptr<log_entry> & entry)
{
std::lock_guard write_lock(queue_mutex);
if (index >> BIT_SIZE == batch_index || index >> BIT_SIZE == batch_index - 1)
{
entry_vec[index & (MAX_VECTOR_SIZE - 1)] = entry;
max_index = index;
return;
}
/// next cycle
clear();
}

void LogEntryQueue::clear()
{
LOG_INFO(log, "clear log queue.");
Expand Down Expand Up @@ -137,18 +124,17 @@ ulong NuRaftFileLogStore::start_index() const
ptr<log_entry> NuRaftFileLogStore::last_entry() const
{
if (last_log_entry)
return makeClone(last_log_entry);
else
return nullptr;
return cloneLogEntry(last_log_entry);
return cs_new<log_entry>(0, nuraft::buffer::alloc(0));
}

ulong NuRaftFileLogStore::append(ptr<log_entry> & entry)
{
ptr<log_entry> clone = makeClone(entry);
ptr<log_entry> cloned = cloneLogEntry(entry);
UInt64 log_index = segment_store->appendEntry(entry);
log_queue.putEntry(log_index, clone);
log_queue.putEntry(log_index, cloned);

last_log_entry = clone;
last_log_entry = cloned;

if (log_fsync_mode == FsyncMode::FSYNC_PARALLEL && entry->get_val_type() != log_val_type::app_log)
parallel_fsync_event->set();
Expand Down Expand Up @@ -209,15 +195,14 @@ ptr<std::vector<ptr<log_entry>>> NuRaftFileLogStore::log_entries_ext(ulong start
ptr<std::vector<ptr<log_entry>>> ret = cs_new<std::vector<ptr<log_entry>>>();

int64 get_size = 0;
int64 entry_size;

for (auto i = start; i < end; i++)
{
auto entry = entry_at(i);
if (!entry)
return nullptr;

entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char);
int64_t entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char);

if (batch_size_hint_in_bytes > 0 && get_size + entry_size > batch_size_hint_in_bytes)
break;
Expand All @@ -234,15 +219,14 @@ ptr<std::vector<LogEntryWithVersion>> NuRaftFileLogStore::log_entries_version_ex
ptr<std::vector<LogEntryWithVersion>> ret = cs_new<std::vector<LogEntryWithVersion>>();

int64 get_size = 0;
int64 entry_size;

for (auto i = start; i < end; i++)
{
auto entry = entry_at(i);
if (!entry)
return nullptr;

entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char);
int64 entry_size = entry->get_buf().size() + sizeof(ulong) + sizeof(char);

if (batch_size_hint_in_bytes > 0 && get_size + entry_size > batch_size_hint_in_bytes)
break;
Expand All @@ -256,7 +240,7 @@ ptr<std::vector<LogEntryWithVersion>> NuRaftFileLogStore::log_entries_version_ex

ptr<log_entry> NuRaftFileLogStore::entry_at(ulong index)
{
ptr<nuraft::log_entry> res = log_queue.getEntry(index);
auto res = log_queue.getEntry(index);
if (res)
{
LOG_TRACE(log, "Get log {} from queue", index);
Expand All @@ -266,15 +250,14 @@ ptr<log_entry> NuRaftFileLogStore::entry_at(ulong index)
LOG_TRACE(log, "Get log {} from disk", index);
res = segment_store->getEntry(index);
}
return res ? makeClone(res) : nullptr;
return res ? cloneLogEntry(res) : nullptr;
}

ulong NuRaftFileLogStore::term_at(ulong index)
{
if (entry_at(index))
return entry_at(index)->get_term();
else
return 0;
return 0;
}

ptr<buffer> NuRaftFileLogStore::pack(ulong index, int32 cnt)
Expand Down
4 changes: 1 addition & 3 deletions src/Service/NuRaftFileLogStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ class LogEntryQueue
ptr<log_entry> getEntry(const UInt64 & index);

/// put log into the queue
void putEntry(UInt64 & index, ptr<log_entry> & entry);

[[maybe_unused]] void putEntryOrClear(UInt64 & index, ptr<log_entry> & entry);
void putEntry(UInt64 & index, const ptr<log_entry> & entry);

/// clean all log
void clear();
Expand Down
6 changes: 3 additions & 3 deletions src/Service/NuRaftLogSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ void NuRaftLogSegment::remove()
f.remove();
}

UInt64 NuRaftLogSegment::appendEntry(ptr<log_entry> entry, std::atomic<UInt64> & last_log_index)
UInt64 NuRaftLogSegment::appendEntry(const ptr<log_entry> & entry, std::atomic<UInt64> & last_log_index)
{
LogEntryHeader header;
struct iovec vec[2];
Expand Down Expand Up @@ -663,14 +663,14 @@ LogVersion LogSegmentStore::getVersion(UInt64 index)
return seg->getVersion();
}

UInt64 LogSegmentStore::appendEntry(ptr<log_entry> entry)
UInt64 LogSegmentStore::appendEntry(const ptr<log_entry> & entry)
{
openNewSegmentIfNeeded();
std::shared_lock read_lock(seg_mutex);
return open_segment->appendEntry(entry, last_log_index);
}

UInt64 LogSegmentStore::writeAt(UInt64 index, ptr<log_entry> entry)
UInt64 LogSegmentStore::writeAt(UInt64 index, const ptr<log_entry> & entry)
{
truncateLog(index - 1);
if (index == lastLogIndex() + 1)
Expand Down
6 changes: 3 additions & 3 deletions src/Service/NuRaftLogSegment.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class NuRaftLogSegment
LogVersion getVersion() const { return version; }

/// serialize entry, and append to open segment, return appended log index
UInt64 appendEntry(ptr<log_entry> entry, std::atomic<UInt64> & last_log_index);
UInt64 appendEntry(const ptr<log_entry> & entry, std::atomic<UInt64> & last_log_index);

/// get entry by index, return null if not exist.
ptr<log_entry> getEntry(UInt64 index);
Expand Down Expand Up @@ -219,10 +219,10 @@ class LogSegmentStore
UInt64 lastLogIndex() { return last_log_index.load(std::memory_order_acquire); }

/// Append entry to log store
UInt64 appendEntry(ptr<log_entry> entry);
UInt64 appendEntry(const ptr<log_entry> & entry);

/// First truncate log whose index is large than or equals with index of entry, then append it.
UInt64 writeAt(UInt64 index, ptr<log_entry> entry);
UInt64 writeAt(UInt64 index, const ptr<log_entry> & entry);
ptr<log_entry> getEntry(UInt64 index);

/// Just for test, collection entries in [start_index, end_index]
Expand Down

0 comments on commit 25857d3

Please sign in to comment.