Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail fast for log error #340

Merged
merged 4 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion src/Common/Exception.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ std::string Exception::getStackTraceString() const
#endif
}


void throwFromErrno(const std::string & s, int code, int the_errno)
{
throw ErrnoException(s + ", " + errnoToString(code, the_errno), code, the_errno);
Expand Down
7 changes: 7 additions & 0 deletions src/Common/Exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <Poco/Version.h>
#include <Poco/Exception.h>

#include <common/errnoToString.h>
#include <Common/StackTrace.h>

#include <fmt/format.h>
Expand Down Expand Up @@ -139,6 +140,12 @@ class ParsingException : public Exception
using Exceptions = std::vector<std::exception_ptr>;


template <typename ...Args>
void throwFromErrno(int code, const std::string & fmt, Args&&... args)
{
throw ErrnoException(fmt::format(fmt, std::forward<Args>(args)...) + ", " + errnoToString(code, errno), code, errno);
}

[[noreturn]] void throwFromErrno(const std::string & s, int code, int the_errno = errno);
/// Useful to produce some extra information about available space and inodes on device
[[noreturn]] void throwFromErrnoWithPath(const std::string & s, const std::string & path, int code,
Expand Down
3 changes: 2 additions & 1 deletion src/Service/KeeperCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ using ThreadPoolPtr = std::shared_ptr<ThreadPool>;

struct RequestId;

/// Attached session id to request
/// Attached session id and forwarding info to request
struct RequestForSession
{
int64_t session_id;
Expand Down Expand Up @@ -107,4 +107,5 @@ bool isSessionRequest(Coordination::OpNum opnum);
bool isSessionRequest(const Coordination::ZooKeeperRequestPtr & request);

bool isNewSessionRequest(Coordination::OpNum opnum);

}
8 changes: 4 additions & 4 deletions src/Service/KeeperServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ KeeperServer::KeeperServer(
new_session_id_callback_mutex,
new_session_id_callback,
state_manager->load_log_store(),
checkAndGetSuperdigest(settings->super_digest),
checkAndGetSuperDigest(settings->super_digest),
MAX_OBJECT_NODE_SIZE,
request_processor_);

Expand Down Expand Up @@ -122,8 +122,8 @@ void KeeperServer::shutdown()
LOG_WARNING(log, "Failed to shutdown NuRaft core in {}ms", settings->raft_settings->shutdown_timeout);

LOG_INFO(log, "Flush Log store.");
if (state_manager->load_log_store() && !state_manager->load_log_store()->flush())
LOG_WARNING(log, "Log store flush error while server shutdown.");
if (state_manager->load_log_store())
state_manager->load_log_store()->flush();

dynamic_cast<NuRaftFileLogStore &>(*state_manager->load_log_store()).shutdown();
state_machine->shutdown();
Expand All @@ -138,7 +138,7 @@ ptr<nuraft::cmd_result<ptr<buffer>>> KeeperServer::pushRequestBatch(const std::v
for (const auto & request_session : request_batch)
{
LOG_TRACE(log, "Push request {}", request_session.toSimpleString());
entries.push_back(getZooKeeperLogEntry(request_session.session_id, request_session.create_time, request_session.request));
entries.push_back(serializeKeeperRequest(request_session));
}
/// append_entries write request
ptr<nuraft::cmd_result<ptr<buffer>>> result = raft_instance->append_entries(entries);
Expand Down
66 changes: 53 additions & 13 deletions src/Service/KeeperUtils.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#include <Service/KeeperUtils.h>

#include <Poco/Base64Encoder.h>
#include <Poco/File.h>
#include <Poco/SHA1Engine.h>

#include <Common/IO/ReadHelpers.h>
#include <Common/IO/WriteHelpers.h>
#include <boost/algorithm/string/split.hpp>

#include <Service/KeeperUtils.h>
#include <Service/formatHex.h>
#include <Service/ReadBufferFromNuRaftBuffer.h>
#include <Service/WriteBufferFromNuraftBuffer.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <ZooKeeper/ZooKeeperIO.h>
Expand All @@ -20,7 +23,7 @@ namespace ErrorCodes
extern const int INVALID_CONFIG_PARAMETER;
}

String checkAndGetSuperdigest(const String & user_and_digest)
String checkAndGetSuperDigest(const String & user_and_digest)
{
if (user_and_digest.empty())
return "";
Expand All @@ -29,25 +32,62 @@ String checkAndGetSuperdigest(const String & user_and_digest)
boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; });
if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super")
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'");
ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect super digest in keeper_server config. Must be 'super:base64string'");

return user_and_digest;
}

nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request)
ptr<buffer> serializeKeeperRequest(const RequestForSession & request)
{
RK::WriteBufferFromNuraftBuffer buf;
RK::writeIntBinary(session_id, buf);
request->write(buf);
Coordination::write(time, buf);
return buf.getBuffer();
WriteBufferFromNuraftBuffer out;
writeIntBinary(request.session_id, out);
request.request->write(out);
Coordination::write(request.create_time, out);
return out.getBuffer();
}

RequestForSession deserializeKeeperRequest(nuraft::buffer & data)
{
ReadBufferFromNuRaftBuffer buffer(data);
RequestForSession request_for_session;
readIntBinary(request_for_session.session_id, buffer);

int32_t length;
Coordination::read(length, buffer);

int32_t xid;
Coordination::read(xid, buffer);

Coordination::OpNum opnum;
Coordination::read(opnum, buffer);

// bool is_internal;
// Coordination::read(is_internal, buffer);

request_for_session.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request_for_session.request->xid = xid;
request_for_session.request->readImpl(buffer);

if (!buffer.eof())
Coordination::read(request_for_session.create_time, buffer);
else /// backward compatibility
request_for_session.create_time
= std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();

return request_for_session;
}

ptr<log_entry> makeClone(const ptr<log_entry> & entry)
ptr<log_entry> cloneLogEntry(const ptr<log_entry> & entry)
{
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type());
return clone;
ptr<log_entry> cloned = cs_new<log_entry>(
entry->get_term(),
buffer::clone(entry->get_buf()),
entry->get_val_type(),
entry->get_timestamp(),
entry->has_crc32(),
entry->get_crc32(),
false);
return cloned;
}

String getBaseName(const String & path)
Expand Down
10 changes: 7 additions & 3 deletions src/Service/KeeperUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,17 @@
#include <libnuraft/log_entry.hxx>
#include <libnuraft/nuraft.hxx>
#include <common/logger_useful.h>
#include <Service/KeeperCommon.h>


namespace RK
{

nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request);
nuraft::ptr<nuraft::log_entry> makeClone(const nuraft::ptr<nuraft::log_entry> & entry);
/// Serialize and deserialize ZooKeeper request to log
nuraft::ptr<nuraft::buffer> serializeKeeperRequest(const RequestForSession & request);
RequestForSession deserializeKeeperRequest(nuraft::buffer & data);

nuraft::ptr<nuraft::log_entry> cloneLogEntry(const nuraft::ptr<nuraft::log_entry> & entry);

/// Parent of a path, for example: got '/a/b' from '/a/b/c'
String getParentPath(const String & path);
Expand All @@ -24,7 +28,7 @@ String getBaseName(const String & path);
String base64Encode(const String & decoded);
String getSHA1(const String & userdata);
String generateDigest(const String & userdata);
String checkAndGetSuperdigest(const String & user_and_digest);
String checkAndGetSuperDigest(const String & user_and_digest);

inline int readUInt32(nuraft::ptr<std::fstream> & fs, UInt32 & x)
{
Expand Down
14 changes: 7 additions & 7 deletions src/Service/LogEntry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace RK
using nuraft::byte;
using nuraft::cs_new;


ptr<buffer> LogEntryBody::serialize(ptr<log_entry> & entry)
/// Add entry type to the log entry
ptr<buffer> LogEntryBody::serialize(const ptr<log_entry> & entry)
{
ptr<buffer> entry_buf;
ptr<buffer> data = entry->get_buf_ptr();
Expand All @@ -22,13 +22,13 @@ ptr<buffer> LogEntryBody::serialize(ptr<log_entry> & entry)
return entry_buf;
}

ptr<log_entry> LogEntryBody::parse(const char * entry_str, size_t buf_size)
ptr<log_entry> LogEntryBody::deserialize(const ptr<buffer> & serialized_entry)
{
nuraft::log_val_type tp = static_cast<nuraft::log_val_type>(entry_str[0]);
auto data = buffer::alloc(buf_size - 1);
data->put_raw(reinterpret_cast<const byte *>(entry_str + 1), buf_size - 1);
nuraft::log_val_type type = static_cast<nuraft::log_val_type>(*serialized_entry->data_begin());
auto data = buffer::alloc(serialized_entry->size() - 1);
data->put_raw(serialized_entry->data_begin() + 1, serialized_entry->size() - 1);
data->pos(0);
return cs_new<log_entry>(0, data, tp); /// term is set latter
return cs_new<log_entry>(0, data, type); /// term is set latter
}

}
4 changes: 2 additions & 2 deletions src/Service/LogEntry.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ struct LogEntryHeader
class LogEntryBody
{
public:
static ptr<buffer> serialize(ptr<log_entry> & entry);
static ptr<log_entry> parse(const char * entry_str, size_t buf_size);
static ptr<buffer> serialize(const ptr<log_entry> & entry);
static ptr<log_entry> deserialize(const ptr<buffer> & serialized_entry);
};

}
Loading
Loading