Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle response serialization in the IO thread #250

Merged
merged 5 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/Service/ConnCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <Service/Context.h>
#include <Service/ThreadSafeQueue.h>
#include <Service/WriteBufferFromFiFoBuffer.h>
#include <Service/ForwardResponse.h>
#include <ZooKeeper/ZooKeeperCommon.h>
#include <ZooKeeper/ZooKeeperConstants.h>
#include <Poco/Net/TCPServerConnection.h>
Expand Down Expand Up @@ -56,10 +57,12 @@ struct ConnectRequest
struct SocketInterruptablePollWrapper;
using SocketInterruptablePollWrapperPtr = std::unique_ptr<SocketInterruptablePollWrapper>;

using ThreadSafeResponseQueue = ThreadSafeQueue<std::shared_ptr<FIFOBuffer>>;

using ThreadSafeResponseQueue = ThreadSafeQueue<Coordination::ZooKeeperResponsePtr>;
using ThreadSafeResponseQueuePtr = std::unique_ptr<ThreadSafeResponseQueue>;

using ThreadSafeForwardResponseQueue = ThreadSafeQueue<ForwardResponsePtr>;
using ThreadSafeForwardResponseQueuePtr = std::unique_ptr<ThreadSafeForwardResponseQueue>;

struct LastOp;
using LastOpMultiVersion = MultiVersion<LastOp>;
using LastOpPtr = LastOpMultiVersion::Version;
Expand Down
220 changes: 108 additions & 112 deletions src/Service/ConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,100 +248,88 @@ void ConnectionHandler::onSocketReadable(const Notification &)
}
}


void ConnectionHandler::onSocketWritable(const Notification &)
{
LOG_TRACE(log, "Peer {}#{} is writable", peer, toHexString(session_id.load()));

auto remove_event_handler_if_needed = [this]
{
/// Double check to avoid dead lock
if (responses->empty() && send_buf.used() == 0)
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
std::lock_guard lock(send_response_mutex);
{
/// If all sent unregister writable event.
if (responses->empty() && send_buf.used() == 0)
/// If all sent, unregister writable event.
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
LOG_TRACE(log, "Remove socket writable event handler for peer {}", peer);

reactor.removeEventHandler(
sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
}
}
}
};

try
auto copy_buffer_to_send = [this]
{
if (responses->empty() && send_buf.used() == 0)
auto used = send_buf.used();
if (used + out_buffer->used() <= SENT_BUFFER_SIZE)
{
remove_event_handler_if_needed();
LOG_DEBUG(log, "Peer {} is writable, but there is nothing to send, will remove event handler.", peer);
return;
send_buf.write(out_buffer->begin(), out_buffer->used());
out_buffer.reset();
}
else
{
send_buf.write(out_buffer->begin(), SENT_BUFFER_SIZE - used);
out_buffer->drain(SENT_BUFFER_SIZE - used);
}
};

/// TODO use zero copy buffer
size_t size_to_sent = 0;

/// 1. accumulate data into tmp_buf
responses->forEach(
[&size_to_sent, this](const auto & resp) -> bool
{
if (resp == is_close)
return false;
try
{
/// If the buffer was not completely sent last time, continue sending.
if (out_buffer)
copy_buffer_to_send();

if (size_to_sent + resp->used() < SENT_BUFFER_SIZE)
{
/// add whole resp to send_buf
send_buf.write(resp->begin(), resp->used());
size_to_sent += resp->used();
}
else if (size_to_sent + resp->used() == SENT_BUFFER_SIZE)
{
/// add whole resp to send_buf
send_buf.write(resp->begin(), resp->used());
size_to_sent += resp->used();
}
else
{
/// add part of resp to send_buf
send_buf.write(resp->begin(), SENT_BUFFER_SIZE - size_to_sent);
}
return size_to_sent < SENT_BUFFER_SIZE;
});
while (!responses->empty() && send_buf.available())
{
Coordination::ZooKeeperResponsePtr response;

/// 2. send data
size_t sent = sock.sendBytes(send_buf);
if (!responses->tryPop(response))
throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug.");

/// 3. remove sent responses
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
LOG_DEBUG(log, "Received close event for session_id {}, internal_id {}", toHexString(session_id.load()), toHexString(internal_id.load()));
destroyMe();
return;
}

ptr<FIFOBuffer> resp;
while (responses->peek(resp) && sent > 0)
{
if (sent >= resp->used())
if (response->getOpNum() == OpNum::NewSession || response->getOpNum() == OpNum::UpdateSession)
{
sent -= resp->used();
responses->remove();
/// package sent
packageSent();
LOG_TRACE(log, "Sent response to {}#{}", peer, toHexString(session_id.load()));
if (!sendHandshake(response))
{
LOG_ERROR(log, "Failed to establish session, close connection.");
destroyMe();
return;
}
copy_buffer_to_send();
}
else
{
resp->drain(sent);
/// move data to begin
resp->begin();
sent = 0;
WriteBufferFromFiFoBuffer buf;
lzydmxy marked this conversation as resolved.
Show resolved Hide resolved
response->write(buf);
out_buffer = buf.getBuffer();
copy_buffer_to_send();
}
packageSent();
}

if (responses->peek(resp) && resp == is_close)
{
LOG_DEBUG(log, "Received close event for session_id {}, internal_id {}", toHexString(session_id.load()), toHexString(internal_id.load()));
destroyMe();
return;
}
size_t sent = sock.sendBytes(send_buf);
lzydmxy marked this conversation as resolved.
Show resolved Hide resolved
Metrics::getMetrics().response_socket_send_size->add(sent);

/// If all sent remove writable event.
remove_event_handler_if_needed();
}
catch (...)
Expand Down Expand Up @@ -495,6 +483,47 @@ Coordination::OpNum ConnectionHandler::receiveHandshake(int32_t handshake_req_le
return opnum;
}

bool ConnectionHandler::sendHandshake(const Coordination::ZooKeeperResponsePtr & response)
{
bool success;
uint64_t sid;
WriteBufferFromFiFoBuffer buf;

if (const auto * new_session_resp = dynamic_cast<const ZooKeeperNewSessionResponse *>(response.get()))
{
success = new_session_resp->success;
sid = new_session_resp->session_id;
}
else if (const auto * update_session_resp = dynamic_cast<const ZooKeeperUpdateSessionResponse *>(response.get()))
{
success = update_session_resp->success;
sid = update_session_resp->session_id;
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Bad session response {}", response->toString());
}

Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, buf);
if (success)
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, buf);
else
Coordination::write(42, buf);

/// Session timeout -1 represent session expired in Zookeeper
int32_t negotiated_session_timeout
= !success && response->error == Coordination::Error::ZSESSIONEXPIRED ? -1 : session_timeout.totalMilliseconds();
Coordination::write(negotiated_session_timeout, buf);

Coordination::write(sid, buf);
std::array<char, Coordination::PASSWORD_LENGTH> passwd{};
Coordination::write(passwd, buf);

out_buffer = buf.getBuffer();

return success;
}


bool ConnectionHandler::isHandShake(Int32 & handshake_length)
{
Expand Down Expand Up @@ -586,54 +615,29 @@ void ConnectionHandler::sendSessionResponseToClient(const Coordination::ZooKeepe
throw Exception(ErrorCodes::LOGICAL_ERROR, "Bad session response {}", response->toString());
}

WriteBufferFromFiFoBuffer buf;
Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, buf);
/// Register callback
if (success)
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, buf);
else
Coordination::write(42, buf);
{
session_id = sid;
handshake_done = true;

/// Session timeout -1 represent session expired in Zookeeper
int32_t negotiated_session_timeout
= !success && response->error == Coordination::Error::ZSESSIONEXPIRED ? -1 : session_timeout.totalMilliseconds();
Coordination::write(negotiated_session_timeout, buf);
keeper_dispatcher->unRegisterSessionResponseCallbackWithoutLock(id);
auto response_callback = [this](const Coordination::ZooKeeperResponsePtr & response_) { pushUserResponseToSendingQueue(response_); };

Coordination::write(sid, buf);
std::array<char, Coordination::PASSWORD_LENGTH> passwd{};
Coordination::write(passwd, buf);
bool is_reconnected = response->getOpNum() == Coordination::OpNum::UpdateSession;
keeper_dispatcher->registerUserResponseCallBackWithoutLock(sid, response_callback, is_reconnected);
}

// Send response to client
{
std::lock_guard lock(send_response_mutex);
responses->push(buf.getBuffer());
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
}
reactor.wakeUp();

if (!success)
{
LOG_ERROR(log, "Failed to establish session, close connection.");
responses->push(response);

// Send empty FIFOBuffer to close connection
if (responses->size() == 1)
{
std::lock_guard lock(send_response_mutex);
responses->push(ptr<FIFOBuffer>());
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
reactor.wakeUp();
}
reactor.wakeUp();
}
else
{
session_id = sid;
handshake_done = true;

/// 2. Register callback

keeper_dispatcher->unRegisterSessionResponseCallbackWithoutLock(id);
auto response_callback = [this](const Coordination::ZooKeeperResponsePtr & response_) { pushUserResponseToSendingQueue(response_); };

bool is_reconnected = response->getOpNum() == Coordination::OpNum::UpdateSession;
keeper_dispatcher->registerUserResponseCallBackWithoutLock(sid, response_callback, is_reconnected);
}
}

Expand All @@ -645,25 +649,17 @@ void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKe
/// Lock to avoid data condition which will lead response leak
{
std::lock_guard lock(send_response_mutex);
/// We do not need send anything for close request to client.
if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
responses->push(ptr<FIFOBuffer>());
}
else
responses->push(response);

/// If
if (responses->size() == 1)
{
WriteBufferFromFiFoBuffer buf;
response->write(buf);
/// TODO handle push timeout
responses->push(buf.getBuffer());
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
}

/// Trigger socket writable event
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
}

/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
}

void ConnectionHandler::packageSent()
Expand Down
9 changes: 7 additions & 2 deletions src/Service/ConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class ConnectionHandler

private:
Coordination::OpNum receiveHandshake(int32_t handshake_length);
bool sendHandshake(const Coordination::ZooKeeperResponsePtr & response);
static bool isHandShake(Int32 & handshake_length);

void tryExecuteFourLetterWordCmd(int32_t four_letter_cmd);
Expand All @@ -91,10 +92,14 @@ class ConnectionHandler
/// destroy connection
void destroyMe();

static constexpr size_t SENT_BUFFER_SIZE = 1024;
// Todo Add configuration sent_buffer_size
static constexpr size_t SENT_BUFFER_SIZE = 16384;
FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE);

std::shared_ptr<FIFOBuffer> is_close = nullptr;
/// Storing the result of the response serialization temporarily,
/// We cannot directly serialize it onto send_buf,
/// because `send_buf` maybe too small to hold a large size response.
std::shared_ptr<Poco::BasicFIFOBuffer<char>> out_buffer;

Logger * log;

Expand Down
Loading
Loading