-
Notifications
You must be signed in to change notification settings - Fork 37
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix request processor endless loop (#90)
* Fix request processor endless loop * Fix typo * Use list for errors
- Loading branch information
Showing
31 changed files
with
745 additions
and
591 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,55 +1,69 @@ | ||
#include <Service/KeeperCommon.h> | ||
#include <Service/WriteBufferFromNuraftBuffer.h> | ||
#include <ZooKeeper/ZooKeeperCommon.h> | ||
#include <ZooKeeper/ZooKeeperIO.h> | ||
#include <boost/algorithm/string/split.hpp> | ||
#include <Poco/File.h> | ||
#include <Common/IO/WriteHelpers.h> | ||
|
||
using namespace nuraft; | ||
#include <Service/formatHex.h> | ||
|
||
namespace RK | ||
{ | ||
|
||
namespace ErrorCodes | ||
String ErrorRequest::toString() const | ||
{ | ||
extern const int INVALID_CONFIG_PARAMETER; | ||
return fmt::format( | ||
"[session_id:{}, xid:{}, opnum:{}, accepted:{}, error_code:{}]", | ||
toHexString(session_id), | ||
xid, | ||
Coordination::toString(opnum), | ||
accepted, | ||
error_code); | ||
} | ||
|
||
int Directory::createDir(const std::string & dir) | ||
RequestId ErrorRequest::getRequestId() const | ||
{ | ||
Poco::File(dir).createDirectories(); | ||
return 0; | ||
return {session_id, xid}; | ||
} | ||
|
||
std::string checkAndGetSuperdigest(const String & user_and_digest) | ||
String RequestId::toString() const | ||
{ | ||
if (user_and_digest.empty()) | ||
return ""; | ||
return fmt::format("[session_id:{}, xid:{}]", toHexString(session_id), xid); | ||
} | ||
|
||
std::vector<std::string> scheme_and_id; | ||
boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; }); | ||
if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super") | ||
throw Exception( | ||
ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'"); | ||
bool RequestId::operator==(const RequestId & other) const | ||
{ | ||
return session_id == other.session_id && xid == other.xid; | ||
} | ||
|
||
return user_and_digest; | ||
std::size_t RequestId::RequestIdHash::operator()(const RequestId & request_id) const | ||
{ | ||
std::size_t seed = 0; | ||
std::hash<int64_t> hash64; | ||
std::hash<int32_t> hash32; | ||
|
||
seed ^= hash64(request_id.session_id); | ||
seed ^= hash32(request_id.xid); | ||
|
||
return seed; | ||
} | ||
|
||
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request) | ||
String RequestForSession::toString() const | ||
{ | ||
RK::WriteBufferFromNuraftBuffer buf; | ||
RK::writeIntBinary(session_id, buf); | ||
request->write(buf); | ||
Coordination::write(time, buf); | ||
return buf.getBuffer(); | ||
return fmt::format( | ||
"[session_id: {}, xid:{}, opnum:{}, create_time:{}, server_id:{}, client_id:{}, request:{}]", | ||
toHexString(session_id), | ||
request->xid, | ||
Coordination::toString(request->getOpNum()), | ||
create_time, | ||
server_id, | ||
client_id, | ||
request->toString()); | ||
} | ||
|
||
String RequestForSession::toSimpleString() const | ||
{ | ||
return fmt::format( | ||
"[session_id:{}, xid:{}, opnum:{}]", toHexString(session_id), request->xid, Coordination::toString(request->getOpNum())); | ||
} | ||
|
||
ptr<log_entry> makeClone(const ptr<log_entry> & entry) | ||
RequestId RequestForSession::getRequestId() const | ||
{ | ||
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type()); | ||
return clone; | ||
return {session_id, request->xid}; | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,91 +1,80 @@ | ||
#pragma once | ||
|
||
#include <fstream> | ||
#include <time.h> | ||
#include <Service/Crc32.h> | ||
#include <ZooKeeper/IKeeper.h> | ||
#include <ZooKeeper/ZooKeeperCommon.h> | ||
#include <libnuraft/log_entry.hxx> | ||
#include <libnuraft/nuraft.hxx> | ||
#include <common/logger_useful.h> | ||
|
||
|
||
namespace RK | ||
{ | ||
|
||
std::string checkAndGetSuperdigest(const String & user_and_digest); | ||
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request); | ||
nuraft::ptr<nuraft::log_entry> makeClone(const nuraft::ptr<nuraft::log_entry> & entry); | ||
struct RequestId; | ||
|
||
struct BackendTimer | ||
/// Attached session id to request | ||
struct RequestForSession | ||
{ | ||
static constexpr char TIME_FMT[] = "%Y%m%d%H%M%S"; | ||
int64_t session_id; | ||
Coordination::ZooKeeperRequestPtr request; | ||
|
||
/// default min interval is 1 hour | ||
UInt32 interval = 1 * 3600; | ||
UInt32 random_window = 1200; //20 minutes | ||
/// measured in millisecond | ||
int64_t create_time{}; | ||
|
||
static void getCurrentTime(std::string & date_str) | ||
{ | ||
time_t curr_time; | ||
time(&curr_time); | ||
char tmp_buf[24]; | ||
std::strftime(tmp_buf, sizeof(tmp_buf), TIME_FMT, localtime(&curr_time)); | ||
date_str = tmp_buf; | ||
} | ||
/// for forward request | ||
int32_t server_id{-1}; | ||
int32_t client_id{-1}; | ||
|
||
static time_t parseTime(const std::string & date_str) | ||
{ | ||
struct tm prev_tm; | ||
memset(&prev_tm, 0, sizeof(tm)); | ||
strptime(date_str.data(), TIME_FMT, &prev_tm); | ||
time_t prev_time = mktime(&prev_tm); | ||
return prev_time; | ||
} | ||
// /// RaftKeeper can generate request, for example: sessionCleanerTask | ||
// bool is_internal{false}; | ||
|
||
explicit RequestForSession() = default; | ||
|
||
bool isActionTime(const time_t & prev_time, time_t curr_time) const | ||
RequestForSession(Coordination::ZooKeeperRequestPtr request_, int64_t session_id_, int64_t create_time_) | ||
: session_id(session_id_), request(request_), create_time(create_time_) | ||
{ | ||
if (curr_time == 0L) | ||
time(&curr_time); | ||
return difftime(curr_time, prev_time) >= (interval + rand() % random_window); | ||
} | ||
}; | ||
|
||
bool isForwardRequest() const { return server_id > -1 && client_id > -1; } | ||
RequestId getRequestId() const; | ||
|
||
String toString() const; | ||
String toSimpleString() const; | ||
|
||
class Directory | ||
{ | ||
public: | ||
static int createDir(const std::string & path); | ||
}; | ||
|
||
inline int readUInt32(nuraft::ptr<std::fstream> & fs, UInt32 & x) | ||
/// Attached session id to response | ||
struct ResponseForSession | ||
{ | ||
errno = 0; | ||
char * buf = reinterpret_cast<char *>(&x); | ||
fs->read(buf, sizeof(UInt32)); | ||
return fs->good() ? 0 : -1; | ||
} | ||
int64_t session_id; | ||
Coordination::ZooKeeperResponsePtr response; | ||
}; | ||
|
||
inline int writeUInt32(nuraft::ptr<std::fstream> & fs, const UInt32 & x) | ||
/// Global client request id. | ||
struct RequestId | ||
{ | ||
errno = 0; | ||
fs->write(reinterpret_cast<const char *>(&x), sizeof(UInt32)); | ||
return fs->good() ? 0 : -1; | ||
} | ||
int64_t session_id; | ||
Coordination::XID xid; | ||
|
||
inline int readUInt64(nuraft::ptr<std::fstream> & fs, UInt64 & x) | ||
{ | ||
errno = 0; | ||
char * buf = reinterpret_cast<char *>(&x); | ||
fs->read(buf, sizeof(UInt64)); | ||
return fs->good() ? 0 : -1; | ||
} | ||
String toString() const; | ||
bool operator==(const RequestId & other) const; | ||
|
||
inline int writeUInt64(nuraft::ptr<std::fstream> & fs, const UInt64 & x) | ||
struct RequestIdHash | ||
{ | ||
std::size_t operator()(const RequestId & request_id) const; | ||
}; | ||
}; | ||
|
||
/// Simple error request info. | ||
struct ErrorRequest | ||
{ | ||
errno = 0; | ||
fs->write(reinterpret_cast<const char *>(&x), sizeof(UInt64)); | ||
return fs->good() ? 0 : -1; | ||
} | ||
bool accepted; | ||
nuraft::cmd_result_code error_code; /// TODO new error code instead of NuRaft error code | ||
int64_t session_id; | ||
Coordination::XID xid; | ||
Coordination::OpNum opnum; | ||
|
||
String toString() const; | ||
RequestId getRequestId() const; | ||
}; | ||
|
||
using ErrorRequests = std::list<ErrorRequest>; | ||
|
||
} |
Oops, something went wrong.