Skip to content

Commit

Permalink
Rename forwarding to forward
Browse files Browse the repository at this point in the history
  • Loading branch information
JackyWoo committed Jan 4, 2024
1 parent d036953 commit 3d613e7
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 47 deletions.
4 changes: 2 additions & 2 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
= global_context.getConfigRef().getUInt("keeper.raft_settings.operation_timeout_ms", Coordination::DEFAULT_OPERATION_TIMEOUT_MS);

/// start server
std::shared_ptr<SvsSocketReactor<SocketReactor>> server;
std::shared_ptr<SvsSocketAcceptor<ConnectionHandler, SocketReactor>> conn_acceptor;
AsyncSocketReactorPtr server;
std::shared_ptr<SocketAcceptor<ConnectionHandler>> conn_acceptor;
int32_t port = config().getInt("keeper.port", 8101);

auto cpu_core_size = getNumberOfPhysicalCPUCores();
Expand Down
4 changes: 2 additions & 2 deletions src/Common/ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,13 @@
M(106, INVALID_LOG_LEVEL) \
M(107, UNEXPECTED_ZOOKEEPER_ERROR) \
M(108, UNEXPECTED_NODE_IN_ZOOKEEPER) \
M(109, RAFT_FORWARDING_ERROR) \
M(109, RAFT_FORWARD_ERROR) \
M(110, CANNOT_PTHREAD_ATTR) \
M(111, UNEXPECTED_FORWARD_PACKET) \
M(112, RAFT_IS_LEADER) \
M(113, RAFT_NO_LEADER) \
M(114, RAFT_FWD_NO_CONN) \
M(115, FORWARDING_DISCONNECTED) \
M(115, FORWARD_NOT_CONNECTED) \
M(116, ILLEGAL_SETTING_VALUE) \
/* See END */

Expand Down
10 changes: 5 additions & 5 deletions src/Service/ForwardConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ namespace ErrorCodes
extern const int ALL_CONNECTION_TRIES_FAILED;
extern const int NETWORK_ERROR;
extern const int UNEXPECTED_FORWARD_PACKET;
extern const int RAFT_FORWARDING_ERROR;
extern const int FORWARDING_DISCONNECTED;
extern const int RAFT_FORWARD_ERROR;
extern const int FORWARD_NOT_CONNECTED;
}

void ForwardConnection::connect()
Expand Down Expand Up @@ -44,7 +44,7 @@ void ForwardConnection::connect()
LOG_TRACE(log, "Sent handshake to {}", endpoint);

if (!receiveHandshake())
throw Exception(ErrorCodes::RAFT_FORWARDING_ERROR, "Handshake with {} failed", endpoint);
throw Exception(ErrorCodes::RAFT_FORWARD_ERROR, "Handshake with {} failed", endpoint);

connected = true;
LOG_TRACE(log, "Connect to {} success", endpoint);
Expand Down Expand Up @@ -76,7 +76,7 @@ void ForwardConnection::send(ForwardRequestPtr request)
if (!connected)
throw Exception("Connect to server failed", ErrorCodes::ALL_CONNECTION_TRIES_FAILED);

LOG_TRACE(log, "Forwarding request {} to endpoint {}", request->toString(), endpoint);
LOG_TRACE(log, "Forward request {} to endpoint {}", request->toString(), endpoint);

try
{
Expand All @@ -99,7 +99,7 @@ bool ForwardConnection::poll(UInt64 timeout_microseconds)
void ForwardConnection::receive(ForwardResponsePtr & response)
{
if (!connected)
throw Exception("Forwarding connection disconnected", ErrorCodes::FORWARDING_DISCONNECTED);
throw Exception(ErrorCodes::FORWARD_NOT_CONNECTED, "Forwarding connection disconnected");

/// There are two situations,
/// 1. Feedback not accepted.
Expand Down
6 changes: 3 additions & 3 deletions src/Service/ForwardConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void ForwardConnectionHandler::onSocketReadable(const Notification &)
{
try
{
LOG_TRACE(log, "Forwarding handler socket readable");
LOG_TRACE(log, "Forward handler socket readable");
if (!sock.available())
{
LOG_INFO(log, "Client close connection!");
Expand All @@ -68,7 +68,7 @@ void ForwardConnectionHandler::onSocketReadable(const Notification &)

while (sock.available())
{
LOG_TRACE(log, "forwarding handler socket available");
LOG_TRACE(log, "Forward handler socket available");

if (current_package.is_done)
{
Expand Down Expand Up @@ -235,7 +235,7 @@ void ForwardConnectionHandler::processUserOrSessionRequest(ForwardRequestPtr req
{
ReadBufferFromMemory body(req_body_buf->begin(), req_body_buf->used());
request->readImpl(body);
keeper_dispatcher->pushForwardingRequest(server_id, client_id, request);
keeper_dispatcher->pushForwardRequest(server_id, client_id, request);
}

void ForwardConnectionHandler::processHandshake()
Expand Down
4 changes: 2 additions & 2 deletions src/Service/ForwardConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Common/NIO/SocketReactor.h>

#include <Service/ConnCommon.h>
#include <Service/ForwardingConnection.h>
#include <Service/ForwardConnection.h>
#include <Service/WriteBufferFromFiFoBuffer.h>


Expand All @@ -32,7 +32,7 @@ using Poco::Logger;
using Poco::Thread;

/**
* Server endpoint for forwarding request.
* Server endpoint for forward request.
*/
class ForwardConnectionHandler
{
Expand Down
14 changes: 7 additions & 7 deletions src/Service/KeeperDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void KeeperDispatcher::invokeResponseCallBack(int64_t session_id, const Coordina
}
}

void KeeperDispatcher::invokeForwardResponseCallBack(ForwardingClientId client_id, ForwardResponsePtr response)
void KeeperDispatcher::invokeForwardResponseCallBack(ForwardClientId client_id, ForwardResponsePtr response)
{
std::lock_guard lock(forward_response_callbacks_mutex);
auto forward_response_writer = forward_response_callbacks.find(client_id);
Expand Down Expand Up @@ -204,7 +204,7 @@ bool KeeperDispatcher::pushRequest(const Coordination::ZooKeeperRequestPtr & req
}


bool KeeperDispatcher::pushForwardingRequest(size_t server_id, size_t client_id, ForwardRequestPtr request)
bool KeeperDispatcher::pushForwardRequest(size_t server_id, size_t client_id, ForwardRequestPtr request)
{
RequestForSession && request_info = request->requestForSession();

Expand All @@ -216,7 +216,7 @@ bool KeeperDispatcher::pushForwardingRequest(size_t server_id, size_t client_id,

LOG_TRACE(
log,
"Push forwarding request #{}#{}#{} which is from server {} client {}",
"Push forward request #{}#{}#{} which is from server {} client {}",
toHexString(request_info.session_id),
request_info.request->xid,
Coordination::toString(request_info.request->getOpNum()),
Expand All @@ -230,7 +230,7 @@ bool KeeperDispatcher::pushForwardingRequest(size_t server_id, size_t client_id,
throw Exception(ErrorCodes::SYSTEM_ERROR, "Cannot push request to queue");
}
else if (!requests_queue->tryPush(std::move(request_info), configuration_and_settings->raft_settings->operation_timeout_ms))
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push forwarding request to queue within operation timeout");
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED, "Cannot push forward request to queue within operation timeout");
return true;
}

Expand Down Expand Up @@ -398,15 +398,15 @@ void KeeperDispatcher::unregisterUserResponseCallBackWithoutLock(int64_t session
user_response_callbacks.erase(it);
}

void KeeperDispatcher::registerForwarderResponseCallBack(ForwardingClientId client_id, ForwardResponseCallback callback)
void KeeperDispatcher::registerForwarderResponseCallBack(ForwardClientId client_id, ForwardResponseCallback callback)
{
std::lock_guard lock(forward_response_callbacks_mutex);

if (forward_response_callbacks.contains(client_id))
{
LOG_WARNING(
log,
"Receive new forwarding connection from server_id {}, client_id {}, will destroy the older one",
"Receive new forward connection from server_id {}, client_id {}, will destroy the older one",
client_id.first,
client_id.second);
auto & call_back = forward_response_callbacks[client_id];
Expand All @@ -418,7 +418,7 @@ void KeeperDispatcher::registerForwarderResponseCallBack(ForwardingClientId clie
forward_response_callbacks.emplace(client_id, callback);
}

void KeeperDispatcher::unRegisterForwarderResponseCallBack(ForwardingClientId client_id)
void KeeperDispatcher::unRegisterForwarderResponseCallBack(ForwardClientId client_id)
{
std::lock_guard lock(forward_response_callbacks_mutex);
auto forward_response_writer = forward_response_callbacks.find(client_id);
Expand Down
16 changes: 8 additions & 8 deletions src/Service/KeeperDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ class KeeperDispatcher : public std::enable_shared_from_this<KeeperDispatcher>
};

/// <server id, client id>
using ForwardingClientId = std::pair<int32_t, int32_t>;
using ForwardResponseCallbacks = std::unordered_map<ForwardingClientId, ForwardResponseCallback, PairHash>;
using ForwardClientId = std::pair<int32_t, int32_t>;
using ForwardResponseCallbacks = std::unordered_map<ForwardClientId, ForwardResponseCallback, PairHash>;

ForwardResponseCallbacks forward_response_callbacks;
std::mutex forward_response_callbacks_mutex;
Expand Down Expand Up @@ -122,8 +122,8 @@ class KeeperDispatcher : public std::enable_shared_from_this<KeeperDispatcher>
/// Push new session or update session request
bool pushSessionRequest(const Coordination::ZooKeeperRequestPtr & request, int64_t internal_id);

/// Push forwarding request
bool pushForwardingRequest(size_t server_id, size_t client_id, ForwardRequestPtr request);
/// Push forward request
bool pushForwardRequest(size_t server_id, size_t client_id, ForwardRequestPtr request);

/// TODO remove
int64_t newSession(int64_t session_timeout_ms) { return server->newSession(session_timeout_ms); }
Expand All @@ -133,8 +133,8 @@ class KeeperDispatcher : public std::enable_shared_from_this<KeeperDispatcher>
}

/// Register response callback for forwarder
void registerForwarderResponseCallBack(ForwardingClientId client_id, ForwardResponseCallback callback);
void unRegisterForwarderResponseCallBack(ForwardingClientId client_id);
void registerForwarderResponseCallBack(ForwardClientId client_id, ForwardResponseCallback callback);
void unRegisterForwarderResponseCallBack(ForwardClientId client_id);

/// Register response callback for user request
[[maybe_unused]] void registerUserResponseCallBack(int64_t session_id, ZooKeeperResponseCallback callback, bool is_reconnected = false);
Expand Down Expand Up @@ -163,8 +163,8 @@ class KeeperDispatcher : public std::enable_shared_from_this<KeeperDispatcher>
/// Invoked when a request completes.
void updateKeeperStatLatency(uint64_t process_time_ms);

/// Send forwarding response
void invokeForwardResponseCallBack(ForwardingClientId client_id, ForwardResponsePtr response);
/// Send forward response
void invokeForwardResponseCallBack(ForwardClientId client_id, ForwardResponsePtr response);

/// Are we leader
bool isLeader() const { return server->isLeader(); }
Expand Down
29 changes: 14 additions & 15 deletions src/Service/RequestForwarder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ namespace RK

namespace ErrorCodes
{
extern const int RAFT_FORWARDING_ERROR;
extern const int RAFT_FORWARD_ERROR;
extern const int RAFT_IS_LEADER;
extern const int RAFT_NO_LEADER;
extern const int RAFT_FWD_NO_CONN;
Expand All @@ -23,7 +23,7 @@ void RequestForwarder::runSend(RunnerId runner_id)
{
setThreadName(("ReqFwdSend#" + toString(runner_id)).c_str());

LOG_DEBUG(log, "Starting forwarding request sending thread.");
LOG_DEBUG(log, "Starting forward request sending thread.");
while (!shutdown_called)
{
UInt64 max_wait = session_sync_period_ms;
Expand Down Expand Up @@ -62,7 +62,7 @@ void RequestForwarder::runSend(RunnerId runner_id)
forward_request->send_time = clock::now();
connection->send(forward_request);

forwarding_queues[runner_id]->push(std::move(forward_request));
forward_request_queue[runner_id]->push(std::move(forward_request));
}
catch (...)
{
Expand Down Expand Up @@ -104,14 +104,13 @@ void RequestForwarder::runSend(RunnerId runner_id)
ForwardRequestPtr forward_request = std::make_shared<ForwardSyncSessionsRequest>(std::move(session_to_expiration_time));
forward_request->send_time = clock::now();
connection->send(forward_request);
forwarding_queues[runner_id]->push(std::move(forward_request));
forward_request_queue[runner_id]->push(std::move(forward_request));
}
}
else
{
throw Exception(
"Not found connection when sending sessions for runner " + std::to_string(runner_id),
ErrorCodes::RAFT_FORWARDING_ERROR);
ErrorCodes::RAFT_FORWARD_ERROR, "Not found connection when sending sessions for runner {}", runner_id);
}
}
catch (...)
Expand All @@ -130,7 +129,7 @@ void RequestForwarder::runReceive(RunnerId runner_id)
{
setThreadName(("ReqFwdRecv#" + toString(runner_id)).c_str());

LOG_DEBUG(log, "Starting forwarding response receiving thread.");
LOG_DEBUG(log, "Starting forward response receiving thread.");
while (!shutdown_called)
{
try
Expand All @@ -140,7 +139,7 @@ void RequestForwarder::runReceive(RunnerId runner_id)

/// Check if the earliest request has timed out. And handle all timed out requests.
ForwardRequestPtr earliest_request;
if (forwarding_queues[runner_id]->peek(earliest_request))
if (forward_request_queue[runner_id]->peek(earliest_request))
{
auto earliest_request_deadline = earliest_request->send_time + std::chrono::microseconds(operation_timeout.totalMicroseconds());
if (now > earliest_request_deadline)
Expand Down Expand Up @@ -195,15 +194,15 @@ void RequestForwarder::runReceive(RunnerId runner_id)
}
catch (...)
{
tryLogCurrentException(log, "Error when receiving forwarding response, runner " + std::to_string(runner_id));
tryLogCurrentException(log, "Error when receiving forward response, runner " + std::to_string(runner_id));
std::this_thread::sleep_for(std::chrono::milliseconds(session_sync_period_ms));
}
}
}

bool RequestForwarder::processTimeoutRequest(RunnerId runner_id, ForwardRequestPtr newFront)
{
LOG_INFO(log, "Process timeout request for runner {} queue size {}", runner_id, forwarding_queues[runner_id]->size());
LOG_INFO(log, "Process timeout request for runner {} queue size {}", runner_id, forward_request_queue[runner_id]->size());

clock::time_point now = clock::now();

Expand All @@ -226,13 +225,13 @@ bool RequestForwarder::processTimeoutRequest(RunnerId runner_id, ForwardRequestP
}
};

return forwarding_queues[runner_id]->removeFrontIf(func, newFront);
return forward_request_queue[runner_id]->removeFrontIf(func, newFront);
}


bool RequestForwarder::removeFromQueue(RunnerId runner_id, ForwardResponsePtr forward_response_ptr)
{
return forwarding_queues[runner_id]->findAndRemove([forward_response_ptr](const ForwardRequestPtr & request) -> bool
return forward_request_queue[runner_id]->findAndRemove([forward_response_ptr](const ForwardRequestPtr & request) -> bool
{
if (request->forwardType() != forward_response_ptr->forwardType())
return false;
Expand Down Expand Up @@ -266,9 +265,9 @@ void RequestForwarder::shutdown()
request_thread->wait();
response_thread->wait();

for (auto & forwarding_queue : forwarding_queues)
for (auto & queue : forward_request_queue)
{
forwarding_queue->forEach([this](const ForwardRequestPtr & request) -> bool
queue->forEach([this](const ForwardRequestPtr & request) -> bool
{
ForwardResponsePtr response = request->makeResponse();
response->setAppendEntryResult(false, nuraft::cmd_result_code::FAILED);
Expand Down Expand Up @@ -380,7 +379,7 @@ void RequestForwarder::initialize(

for (RunnerId runner_id = 0; runner_id < thread_count; runner_id++)
{
forwarding_queues.push_back(std::make_unique<ForwardingQueue>());
forward_request_queue.push_back(std::make_unique<ForwardRequestQueue>());
}

initConnections();
Expand Down
6 changes: 3 additions & 3 deletions src/Service/RequestForwarder.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class RequestForwarder
std::atomic<UInt64> session_sync_idx{0};
Stopwatch session_sync_time_watch;

using ForwardingQueue = ThreadSafeQueue<ForwardRequestPtr, std::list<ForwardRequestPtr>>;
using ForwardingQueuePtr = std::unique_ptr<ForwardingQueue>;
std::vector<ForwardingQueuePtr> forwarding_queues;
using ForwardRequestQueue = ThreadSafeQueue<ForwardRequestPtr, std::list<ForwardRequestPtr>>;
using ForwardRequestQueuePtr = std::unique_ptr<ForwardRequestQueue>;
std::vector<ForwardRequestQueuePtr> forward_request_queue;

Poco::Timespan operation_timeout;

Expand Down

0 comments on commit 3d613e7

Please sign in to comment.