Skip to content

Commit

Permalink
Fix request processor endless loop
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Oct 9, 2023
1 parent 21efc22 commit eee45f5
Show file tree
Hide file tree
Showing 29 changed files with 633 additions and 508 deletions.
10 changes: 5 additions & 5 deletions src/Service/ForwardRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ ForwardResponsePtr ForwardHandshakeRequest::makeResponse() const
return std::make_shared<ForwardHandshakeResponse>();
}

KeeperStore::RequestForSession ForwardHandshakeRequest::requestForSession() const
RequestForSession ForwardHandshakeRequest::requestForSession() const
{
RequestForSession reuqest_info;
return reuqest_info;
Expand All @@ -62,7 +62,7 @@ ForwardResponsePtr ForwardSessionRequest::makeResponse() const
return std::make_shared<ForwardSessionResponse>();
}

KeeperStore::RequestForSession ForwardSessionRequest::requestForSession() const
RequestForSession ForwardSessionRequest::requestForSession() const
{
RequestForSession reuqest_info;
return reuqest_info;
Expand Down Expand Up @@ -100,7 +100,7 @@ ForwardResponsePtr ForwardGetSessionRequest::makeResponse() const
return res;
}

KeeperStore::RequestForSession ForwardGetSessionRequest::requestForSession() const
RequestForSession ForwardGetSessionRequest::requestForSession() const
{
RequestForSession reuqest_info;
reuqest_info.request = request;
Expand Down Expand Up @@ -141,7 +141,7 @@ ForwardResponsePtr ForwardUpdateSessionRequest::makeResponse() const
return res;
}

KeeperStore::RequestForSession ForwardUpdateSessionRequest::requestForSession() const
RequestForSession ForwardUpdateSessionRequest::requestForSession() const
{
RequestForSession reuqest_info;
reuqest_info.request = request;
Expand Down Expand Up @@ -186,7 +186,7 @@ ForwardResponsePtr ForwardOpRequest::makeResponse() const
return res;
}

KeeperStore::RequestForSession ForwardOpRequest::requestForSession() const
RequestForSession ForwardOpRequest::requestForSession() const
{
return request;
}
Expand Down
16 changes: 8 additions & 8 deletions src/Service/ForwardRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct ForwardRequest

virtual ForwardResponsePtr makeResponse() const = 0;

virtual KeeperStore::RequestForSession requestForSession() const = 0;
virtual RequestForSession requestForSession() const = 0;

virtual String toString() const = 0;

Expand All @@ -51,7 +51,7 @@ struct ForwardHandshakeRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -78,7 +78,7 @@ struct ForwardSessionRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -99,7 +99,7 @@ struct ForwardGetSessionRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -120,7 +120,7 @@ struct ForwardUpdateSessionRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -132,7 +132,7 @@ struct ForwardUpdateSessionRequest : public ForwardRequest

struct ForwardOpRequest : public ForwardRequest
{
KeeperStore::RequestForSession request;
RequestForSession request;

ForwardType forwardType() const override { return ForwardType::Operation; }

Expand All @@ -142,7 +142,7 @@ struct ForwardOpRequest : public ForwardRequest

ForwardResponsePtr makeResponse() const override;

KeeperStore::RequestForSession requestForSession() const override;
RequestForSession requestForSession() const override;

String toString() const override
{
Expand All @@ -162,7 +162,7 @@ class ForwardRequestFactory final : private boost::noncopyable

ForwardRequestPtr get(ForwardType op_num) const;

static ForwardRequestPtr convertFromRequest(const KeeperStore::RequestForSession & request_for_session)
static ForwardRequestPtr convertFromRequest(const RequestForSession & request_for_session)
{
auto opnum = request_for_session.request->getOpNum();
switch (opnum)
Expand Down
76 changes: 45 additions & 31 deletions src/Service/KeeperCommon.cpp
Original file line number Diff line number Diff line change
@@ -1,55 +1,69 @@
#include <Service/KeeperCommon.h>
#include <Service/WriteBufferFromNuraftBuffer.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <ZooKeeper/ZooKeeperIO.h>
#include <boost/algorithm/string/split.hpp>
#include <Poco/File.h>
#include <Common/IO/WriteHelpers.h>

using namespace nuraft;
#include <Service/formatHex.h>

namespace RK
{

namespace ErrorCodes
String ErrorRequest::toString() const
{
extern const int INVALID_CONFIG_PARAMETER;
return fmt::format(
"session_id:{}, xid:{}, opnum:{}, accepted:{}, error_code:{}",
toHexString(session_id),
xid,
Coordination::toString(opnum),
accepted,
error_code);
}

int Directory::createDir(const std::string & dir)
String ErrorRequest::getRequestId() const
{
Poco::File(dir).createDirectories();
return 0;
return RequestId{session_id, xid}.toString();
}

std::string checkAndGetSuperdigest(const String & user_and_digest)
String RequestId::toString() const
{
if (user_and_digest.empty())
return "";
return fmt::format("session_id:{}, xid:{}", toHexString(session_id), xid);
}

std::vector<std::string> scheme_and_id;
boost::split(scheme_and_id, user_and_digest, [](char c) { return c == ':'; });
if (scheme_and_id.size() != 2 || scheme_and_id[0] != "super")
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER, "Incorrect superdigest in keeper_server config. Must be 'super:base64string'");
bool RequestId::operator==(const RequestId & other) const
{
return session_id == other.session_id && xid == other.xid;
}

return user_and_digest;
std::size_t RequestId::RequestIdHash::operator()(const RequestId & request_id) const
{
std::size_t seed = 0;
std::hash<int64_t> hash64;
std::hash<int32_t> hash32;

seed ^= hash64(request_id.session_id) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
seed ^= hash32(request_id.xid) + 0x9e3779b9 + (seed << 6) + (seed >> 2);

return seed;
}

nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request)
String RequestForSession::toString() const
{
RK::WriteBufferFromNuraftBuffer buf;
RK::writeIntBinary(session_id, buf);
request->write(buf);
Coordination::write(time, buf);
return buf.getBuffer();
return fmt::format(
"session_id: {}, xid:{}, opnum:{}, create_time:{}, server_id:{}, client_id:{}, request:{}",
toHexString(session_id),
request->xid,
Coordination::toString(request->getOpNum()),
create_time,
server_id,
client_id,
request->toString());
}

String RequestForSession::toSimpleString() const
{
return fmt::format(
"session_id: {}, xid:{}, opnum:{}", toHexString(session_id), request->xid, Coordination::toString(request->getOpNum()));
}

ptr<log_entry> makeClone(const ptr<log_entry> & entry)
RequestId RequestForSession::getRequestId() const
{
ptr<log_entry> clone = cs_new<log_entry>(entry->get_term(), buffer::clone(entry->get_buf()), entry->get_val_type());
return clone;
return {session_id, request->xid};
}

}
109 changes: 42 additions & 67 deletions src/Service/KeeperCommon.h
Original file line number Diff line number Diff line change
@@ -1,91 +1,66 @@
#pragma once

#include <fstream>
#include <time.h>
#include <Service/Crc32.h>
#include <ZooKeeper/IKeeper.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <libnuraft/log_entry.hxx>
#include <libnuraft/nuraft.hxx>
#include <common/logger_useful.h>


namespace RK
{

std::string checkAndGetSuperdigest(const String & user_and_digest);
nuraft::ptr<nuraft::buffer> getZooKeeperLogEntry(int64_t session_id, int64_t time, const Coordination::ZooKeeperRequestPtr & request);
nuraft::ptr<nuraft::log_entry> makeClone(const nuraft::ptr<nuraft::log_entry> & entry);
/// Simple error request info.
struct ErrorRequest
{
bool accepted;
nuraft::cmd_result_code error_code; /// TODO new error code instead of NuRaft error code
int64_t session_id;
Coordination::XID xid;
Coordination::OpNum opnum;

String toString() const;
String getRequestId() const;
};

struct BackendTimer
/// Global client request id.
struct RequestId
{
static constexpr char TIME_FMT[] = "%Y%m%d%H%M%S";
int64_t session_id;
Coordination::XID xid;

/// default min interval is 1 hour
UInt32 interval = 1 * 3600;
UInt32 random_window = 1200; //20 minutes
String toString() const;
bool operator==(const RequestId & other) const;

static void getCurrentTime(std::string & date_str)
struct RequestIdHash
{
time_t curr_time;
time(&curr_time);
char tmp_buf[24];
std::strftime(tmp_buf, sizeof(tmp_buf), TIME_FMT, localtime(&curr_time));
date_str = tmp_buf;
}
std::size_t operator()(const RequestId & request_id) const;
};
};

static time_t parseTime(const std::string & date_str)
{
struct tm prev_tm;
memset(&prev_tm, 0, sizeof(tm));
strptime(date_str.data(), TIME_FMT, &prev_tm);
time_t prev_time = mktime(&prev_tm);
return prev_time;
}
/// Attached session id to request
struct RequestForSession
{
int64_t session_id;
Coordination::ZooKeeperRequestPtr request;

bool isActionTime(const time_t & prev_time, time_t curr_time) const
{
if (curr_time == 0L)
time(&curr_time);
return difftime(curr_time, prev_time) >= (interval + rand() % random_window);
}
};
/// measured in millisecond
int64_t create_time{};

/// for forward request
int32_t server_id{-1};
int32_t client_id{-1};

class Directory
{
public:
static int createDir(const std::string & path);
};
explicit RequestForSession() = default;

inline int readUInt32(nuraft::ptr<std::fstream> & fs, UInt32 & x)
{
errno = 0;
char * buf = reinterpret_cast<char *>(&x);
fs->read(buf, sizeof(UInt32));
return fs->good() ? 0 : -1;
}
RequestForSession(Coordination::ZooKeeperRequestPtr request_, int64_t session_id_, int64_t create_time_)
: session_id(session_id_), request(request_), create_time(create_time_)
{
}

inline int writeUInt32(nuraft::ptr<std::fstream> & fs, const UInt32 & x)
{
errno = 0;
fs->write(reinterpret_cast<const char *>(&x), sizeof(UInt32));
return fs->good() ? 0 : -1;
}
bool isForwardRequest() const { return server_id > -1 && client_id > -1; }
RequestId getRequestId() const;

inline int readUInt64(nuraft::ptr<std::fstream> & fs, UInt64 & x)
{
errno = 0;
char * buf = reinterpret_cast<char *>(&x);
fs->read(buf, sizeof(UInt64));
return fs->good() ? 0 : -1;
}
String toString() const;
String toSimpleString() const;

inline int writeUInt64(nuraft::ptr<std::fstream> & fs, const UInt64 & x)
{
errno = 0;
fs->write(reinterpret_cast<const char *>(&x), sizeof(UInt64));
return fs->good() ? 0 : -1;
}
};

}
10 changes: 5 additions & 5 deletions src/Service/KeeperDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ void KeeperDispatcher::requestThread(RunnerId runner_id)

while (!shutdown_called)
{
KeeperStore::RequestForSession request_for_session;
RequestForSession request_for_session;

UInt64 max_wait = configuration_and_settings->raft_settings->operation_timeout_ms;

Expand Down Expand Up @@ -161,7 +161,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ
return false;
}

KeeperStore::RequestForSession request_info;
RequestForSession request_info;
request_info.request = request;
request_info.session_id = session_id;
using namespace std::chrono;
Expand Down Expand Up @@ -191,7 +191,7 @@ bool KeeperDispatcher::putRequest(const Coordination::ZooKeeperRequestPtr & requ

bool KeeperDispatcher::putForwardingRequest(size_t server_id, size_t client_id, ForwardRequestPtr request)
{
KeeperStore::RequestForSession && request_info = request->requestForSession();
RequestForSession && request_info = request->requestForSession();

using namespace std::chrono;
request_info.create_time = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
Expand Down Expand Up @@ -316,7 +316,7 @@ void KeeperDispatcher::shutdown()
}

LOG_INFO(log, "for unhandled requests sending session expired error to client.");
KeeperStore::RequestForSession request_for_session;
RequestForSession request_for_session;
while (requests_queue->tryPopAny(request_for_session))
{
auto response = request_for_session.request->makeResponse();
Expand Down Expand Up @@ -375,7 +375,7 @@ void KeeperDispatcher::sessionCleanerTask()
Coordination::ZooKeeperRequestPtr request
= Coordination::ZooKeeperRequestFactory::instance().get(Coordination::OpNum::Close);
request->xid = Coordination::CLOSE_XID;
KeeperStore::RequestForSession request_info;
RequestForSession request_info;
request_info.request = request;
request_info.session_id = dead_session;
using namespace std::chrono;
Expand Down
Loading

0 comments on commit eee45f5

Please sign in to comment.