Skip to content

Commit

Permalink
Fix request processor endless loop
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Oct 10, 2023
1 parent 21efc22 commit 8ebc875
Show file tree
Hide file tree
Showing 31 changed files with 727 additions and 583 deletions.
1 change: 1 addition & 0 deletions src/Service/ConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,7 @@ void ConnectionHandler::sendResponse(const Coordination::ZooKeeperResponsePtr &
/// TODO should invoked after response sent to client.
updateStats(response);

/// We do not need send anything for close request to client.
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
responses->push(ptr<FIFOBuffer>());
Expand Down
19 changes: 12 additions & 7 deletions src/Service/ForwardRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ForwardResponsePtr ForwardHandshakeRequest::makeResponse() const
return std::make_shared<ForwardHandshakeResponse>();
}

KeeperStore::RequestForSession ForwardHandshakeRequest::requestForSession() const
RequestForSession ForwardHandshakeRequest::requestForSession() const
{
RequestForSession reuqest_info;
return reuqest_info;
Expand All @@ -62,10 +62,10 @@ ForwardResponsePtr ForwardSessionRequest::makeResponse() const
return std::make_shared<ForwardSessionResponse>();
}

KeeperStore::RequestForSession ForwardSessionRequest::requestForSession() const
RequestForSession ForwardSessionRequest::requestForSession() const
{
RequestForSession reuqest_info;
return reuqest_info;
RequestForSession request_info;
return request_info;
}

void ForwardGetSessionRequest::readImpl(ReadBuffer & buf)
Expand Down Expand Up @@ -100,7 +100,7 @@ ForwardResponsePtr ForwardGetSessionRequest::makeResponse() const
return res;
}

KeeperStore::RequestForSession ForwardGetSessionRequest::requestForSession() const
RequestForSession ForwardGetSessionRequest::requestForSession() const
{
RequestForSession reuqest_info;
reuqest_info.request = request;
Expand Down Expand Up @@ -141,7 +141,7 @@ ForwardResponsePtr ForwardUpdateSessionRequest::makeResponse() const
return res;
}

KeeperStore::RequestForSession ForwardUpdateSessionRequest::requestForSession() const
RequestForSession ForwardUpdateSessionRequest::requestForSession() const
{
RequestForSession reuqest_info;
reuqest_info.request = request;
Expand All @@ -162,6 +162,10 @@ void ForwardOpRequest::readImpl(ReadBuffer & buf)
request.request = Coordination::ZooKeeperRequestFactory::instance().get(opnum);
request.request->xid = xid;
request.request->readImpl(buf);

// bool is_internal;
// Coordination::read(is_internal, buf);
// request.is_internal = is_internal;
}

void ForwardOpRequest::writeImpl(WriteBuffer & buf) const
Expand All @@ -172,6 +176,7 @@ void ForwardOpRequest::writeImpl(WriteBuffer & buf) const
Coordination::write(request.request->getOpNum(), out_buf);
request.request->writeImpl(out_buf);
Coordination::write(out_buf.str(), buf);
// Coordination::write(request.is_internal, buf);
}


Expand All @@ -186,7 +191,7 @@ ForwardResponsePtr ForwardOpRequest::makeResponse() const
return res;
}

KeeperStore::RequestForSession ForwardOpRequest::requestForSession() const
RequestForSession ForwardOpRequest::requestForSession() const
{
return request;
}
Expand Down
16 changes: 8 additions & 8 deletions src/Service/ForwardRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct ForwardRequest

virtual ForwardResponsePtr makeResponse() const = 0;

virtual KeeperStore::RequestForSession requestForSession() const = 0;
virtual RequestForSession requestForSession() const = 0;

virtual String toString() const = 0;

Expand All @@ -51,7 +51,7 @@ struct ForwardHandshakeRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -78,7 +78,7 @@ struct ForwardSessionRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -99,7 +99,7 @@ struct ForwardGetSessionRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -120,7 +120,7 @@ struct ForwardUpdateSessionRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -132,7 +132,7 @@ struct ForwardUpdateSessionRequest : public ForwardRequest

struct ForwardOpRequest : public ForwardRequest
{
KeeperStore::RequestForSession request;
RequestForSession request;

ForwardType forwardType() const override { return ForwardType::Operation; }

Expand All @@ -142,7 +142,7 @@ struct ForwardOpRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -162,7 +162,7 @@ class ForwardRequestFactory final : private boost::noncopyable

ForwardRequestPtr get(ForwardType op_num) const;

static ForwardRequestPtr convertFromRequest(const KeeperStore::RequestForSession & request_for_session)
static ForwardRequestPtr convertFromRequest(const RequestForSession & request_for_session)
{
auto opnum = request_for_session.request->getOpNum();
switch (opnum)
Expand Down
76 changes: 45 additions & 31 deletions src/Service/KeeperCommon.cpp
Original file line number Diff line number Diff line change
@@ -1,55 +1,69 @@
#include <Service/KeeperCommon.h>
#include <Service/WriteBufferFromNuraftBuffer.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <ZooKeeper/ZooKeeperIO.h>
#include <boost/algorithm/string/split.hpp>
#include <Poco/File.h>
#include <Common/IO/WriteHelpers.h>

using namespace nuraft;
#include <Service/formatHex.h>

namespace RK
{

namespace ErrorCodes
String ErrorRequest::toString() const
{
extern const int INVALID_CONFIG_PARAMETER;
return fmt::format(
"[session_id:{}, xid:{}, opnum:{}, accepted:{}, error_code:{}]",
toHexString(session_id),
xid,
Coordination::toString(opnum),
accepted,
error_code);
}

int Directory::createDir(const std::string & dir)
String ErrorRequest::getRequestId() const
{
Poco::File(dir).createDirectories();
return 0;
return RequestId{session_id, xid}.toString();
}

std::string checkAndGetSuperdigest(const String & user_and_digest)
String RequestId::toString() const
{
if (user_and_digest.empty())
return "";
return fmt::format("[session_id:{}, xid:{}]", toHexString(session_id), xid);
}

std::vector<std::string> scheme_and_id;
boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; });
if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super")
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'");
bool RequestId::operator==(const RequestId & other) const
{
return session_id == other.session_id && xid == other.xid;
}

return user_and_digest;
std::size_t RequestId::RequestIdHash::operator()(const RequestId & request_id) const
{
std::size_t seed = 0;
std::hash<int64_t> hash64;
std::hash<int32_t> hash32;

seed ^= hash64(request_id.session_id);
seed ^= hash32(request_id.xid);

return seed;
}

nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request)
String RequestForSession::toString() const
{
RK::WriteBufferFromNuraftBuffer buf;
RK::writeIntBinary(session_id, buf);
request->write(buf);
Coordination::write(time, buf);
return buf.getBuffer();
return fmt::format(
"[session_id: {}, xid:{}, opnum:{}, create_time:{}, server_id:{}, client_id:{}, request:{}]",
toHexString(session_id),
request->xid,
Coordination::toString(request->getOpNum()),
create_time,
server_id,
client_id,
request->toString());
}

String RequestForSession::toSimpleString() const
{
return fmt::format(
"[session_id:{}, xid:{}, opnum:{}]", toHexString(session_id), request->xid, Coordination::toString(request->getOpNum()));
}

ptr<log_entry> makeClone(const ptr<log_entry> & entry)
RequestId RequestForSession::getRequestId() const
{
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type());
return clone;
return {session_id, request->xid};
}

}
111 changes: 49 additions & 62 deletions src/Service/KeeperCommon.h
Original file line number Diff line number Diff line change
@@ -1,91 +1,78 @@
#pragma once

#include <fstream>
#include <time.h>
#include <Service/Crc32.h>
#include <ZooKeeper/IKeeper.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <libnuraft/log_entry.hxx>
#include <libnuraft/nuraft.hxx>
#include <common/logger_useful.h>


namespace RK
{

std::string checkAndGetSuperdigest(const String & user_and_digest);
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request);
nuraft::ptr<nuraft::log_entry> makeClone(const nuraft::ptr<nuraft::log_entry> & entry);
struct RequestId;

struct BackendTimer
/// Attached session id to request
struct RequestForSession
{
static constexpr char TIME_FMT[] = "%Y%m%d%H%M%S";
int64_t session_id;
Coordination::ZooKeeperRequestPtr request;

/// default min interval is 1 hour
UInt32 interval = 1 * 3600;
UInt32 random_window = 1200; //20 minutes
/// measured in millisecond
int64_t create_time{};

static void getCurrentTime(std::string & date_str)
{
time_t curr_time;
time(&curr_time);
char tmp_buf[24];
std::strftime(tmp_buf, sizeof(tmp_buf), TIME_FMT, localtime(&curr_time));
date_str = tmp_buf;
}
/// for forward request
int32_t server_id{-1};
int32_t client_id{-1};

static time_t parseTime(const std::string & date_str)
{
struct tm prev_tm;
memset(&prev_tm, 0, sizeof(tm));
strptime(date_str.data(), TIME_FMT, &prev_tm);
time_t prev_time = mktime(&prev_tm);
return prev_time;
}
// /// RaftKeeper can generate request, for example: sessionCleanerTask
// bool is_internal{false};

explicit RequestForSession() = default;

bool isActionTime(const time_t & prev_time, time_t curr_time) const
RequestForSession(Coordination::ZooKeeperRequestPtr request_, int64_t session_id_, int64_t create_time_)
: session_id(session_id_), request(request_), create_time(create_time_)
{
if (curr_time == 0L)
time(&curr_time);
return difftime(curr_time, prev_time) >= (interval + rand() % random_window);
}
};

bool isForwardRequest() const { return server_id > -1 && client_id > -1; }
RequestId getRequestId() const;

String toString() const;
String toSimpleString() const;

class Directory
{
public:
static int createDir(const std::string & path);
};

inline int readUInt32(nuraft::ptr<std::fstream> & fs, UInt32 & x)
/// Attached session id to response
struct ResponseForSession
{
errno = 0;
char * buf = reinterpret_cast<char *>(&x);
fs->read(buf, sizeof(UInt32));
return fs->good() ? 0 : -1;
}
int64_t session_id;
Coordination::ZooKeeperResponsePtr response;
};

inline int writeUInt32(nuraft::ptr<std::fstream> & fs, const UInt32 & x)
/// Simple error request info.
struct ErrorRequest
{
errno = 0;
fs->write(reinterpret_cast<const char *>(&x), sizeof(UInt32));
return fs->good() ? 0 : -1;
}
bool accepted;
nuraft::cmd_result_code error_code; /// TODO new error code instead of NuRaft error code
int64_t session_id;
Coordination::XID xid;
Coordination::OpNum opnum;

String toString() const;
String getRequestId() const;
};

inline int readUInt64(nuraft::ptr<std::fstream> & fs, UInt64 & x)
/// Global client request id.
struct RequestId
{
errno = 0;
char * buf = reinterpret_cast<char *>(&x);
fs->read(buf, sizeof(UInt64));
return fs->good() ? 0 : -1;
}
int64_t session_id;
Coordination::XID xid;

inline int writeUInt64(nuraft::ptr<std::fstream> & fs, const UInt64 & x)
{
errno = 0;
fs->write(reinterpret_cast<const char *>(&x), sizeof(UInt64));
return fs->good() ? 0 : -1;
}
String toString() const;
bool operator==(const RequestId & other) const;

struct RequestIdHash
{
std::size_t operator()(const RequestId & request_id) const;
};
};

}
Loading

0 comments on commit 8ebc875

Please sign in to comment.