Skip to content

Commit

Permalink
use Non-blocking IO
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy committed May 31, 2024
1 parent 3fdb13c commit f60227b
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 54 deletions.
90 changes: 57 additions & 33 deletions src/Service/ConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ ConnectionHandler::ConnectionHandler(Context & global_context_, StreamSocket & s
{
LOG_INFO(log, "New connection from {}", peer);
registerConnection(this);
send_buf.emplace(sock);

auto read_handler = Observer<ConnectionHandler, ReadableNotification>(*this, &ConnectionHandler::onSocketReadable);
auto error_handler = Observer<ConnectionHandler, ErrorNotification>(*this, &ConnectionHandler::onSocketError);
Expand Down Expand Up @@ -249,22 +248,21 @@ void ConnectionHandler::onSocketReadable(const Notification &)
}
}


void ConnectionHandler::onSocketWritable(const Notification &)
{
LOG_TRACE(log, "Peer {}#{} is writable", peer, toHexString(session_id.load()));

auto remove_event_handler_if_needed = [this]
{
/// Double check to avoid dead lock
if (responses->empty())
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
std::lock_guard lock(send_response_mutex);
{
/// If all sent unregister writable event.
if (responses->empty())
/// If all sent, unregister writable event.
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
send_buf->next();

LOG_TRACE(log, "Remove socket writable event handler for peer {}", peer);

reactor.removeEventHandler(
Expand All @@ -274,9 +272,28 @@ void ConnectionHandler::onSocketWritable(const Notification &)
}
};

auto copy_buffer_to_send = [this]
{
auto used = send_buf.used();
if (used + out_buffer->used() <= SENT_BUFFER_SIZE)
{
send_buf.write(out_buffer->begin(), out_buffer->used());
out_buffer.reset();
}
else
{
send_buf.write(out_buffer->begin(), SENT_BUFFER_SIZE - used);
out_buffer->drain(SENT_BUFFER_SIZE - used);
}
};

try
{
while (!responses->empty())
/// If the buffer was not completely sent last time, continue sending.
if (out_buffer)
copy_buffer_to_send();

while (!responses->empty() && send_buf.available())
{
Coordination::ZooKeeperResponsePtr response;

Expand All @@ -285,8 +302,6 @@ void ConnectionHandler::onSocketWritable(const Notification &)

if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close)
{
send_buf->next();

LOG_DEBUG(log, "Received close event for session_id {}, internal_id {}", toHexString(session_id.load()), toHexString(internal_id.load()));
destroyMe();
return;
Expand All @@ -300,19 +315,21 @@ void ConnectionHandler::onSocketWritable(const Notification &)
destroyMe();
return;
}
continue;
copy_buffer_to_send();
}

response->write(*send_buf);

packageSent();

if (send_buf->available() >= SENT_BUFFER_SIZE)
else
{
send_buf->next();
WriteBufferFromFiFoBuffer buf;
response->write(buf);
out_buffer = buf.getBuffer();
copy_buffer_to_send();
}
packageSent();
}
/// If all sent remove writable event.

size_t sent = sock.sendBytes(send_buf);
Metrics::getMetrics().response_socket_send_size->add(sent);

remove_event_handler_if_needed();
}
catch (...)
Expand Down Expand Up @@ -470,6 +487,7 @@ bool ConnectionHandler::sendHandshake(const Coordination::ZooKeeperResponsePtr &
{
bool success;
uint64_t sid;
WriteBufferFromFiFoBuffer buf;

if (const auto * new_session_resp = dynamic_cast<const ZooKeeperNewSessionResponse *>(response.get()))
{
Expand All @@ -486,23 +504,22 @@ bool ConnectionHandler::sendHandshake(const Coordination::ZooKeeperResponsePtr &
throw Exception(ErrorCodes::LOGICAL_ERROR, "Bad session response {}", response->toString());
}

Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *send_buf);
Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, buf);
if (success)
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *send_buf);
Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, buf);
else
Coordination::write(42, *send_buf);
Coordination::write(42, buf);

/// Session timeout -1 represent session expired in Zookeeper
int32_t negotiated_session_timeout
= !success && response->error == Coordination::Error::ZSESSIONEXPIRED ? -1 : session_timeout.totalMilliseconds();
Coordination::write(negotiated_session_timeout, *send_buf);
Coordination::write(negotiated_session_timeout, buf);

Coordination::write(sid, *send_buf);
Coordination::write(sid, buf);
std::array<char, Coordination::PASSWORD_LENGTH> passwd{};
Coordination::write(passwd, *send_buf);
Coordination::write(passwd, buf);

send_buf->next();
packageSent();
out_buffer = buf.getBuffer();

return success;
}
Expand Down Expand Up @@ -615,9 +632,13 @@ void ConnectionHandler::sendSessionResponseToClient(const Coordination::ZooKeepe
{
std::lock_guard lock(send_response_mutex);
responses->push(response);
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));

if (responses->size() == 1)
{
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
reactor.wakeUp();
}
}
reactor.wakeUp();
}

void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKeeperResponsePtr & response)
Expand All @@ -630,12 +651,15 @@ void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKe
std::lock_guard lock(send_response_mutex);
responses->push(response);

/// Trigger socket writable event
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
/// If
if (responses->size() == 1)
{
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
}
}

/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
}

void ConnectionHandler::packageSent()
Expand Down
13 changes: 8 additions & 5 deletions src/Service/ConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,14 @@ class ConnectionHandler
/// destroy connection
void destroyMe();

static constexpr size_t SENT_BUFFER_SIZE = 1024;
std::optional<WriteBufferFromPocoSocket> send_buf;
// FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE);

std::shared_ptr<FIFOBuffer> is_close = nullptr;
// Todo Add configuration sent_buffer_size
static constexpr size_t SENT_BUFFER_SIZE = 16384;
FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE);

/// Storing the result of the response serialization temporarily,
/// We cannot directly serialize it onto send_buf,
/// because `send_buf` maybe too small to hold a large size response.
std::shared_ptr<Poco::BasicFIFOBuffer<char>> out_buffer;

Logger * log;

Expand Down
39 changes: 26 additions & 13 deletions src/Service/ForwardConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ ForwardConnectionHandler::ForwardConnectionHandler(Context & global_context_, St
, responses(std::make_unique<ThreadSafeForwardResponseQueue>())
{
LOG_INFO(log, "New forward connection from {}", sock.peerAddress().toString());
send_buf.emplace(sock);

auto read_handler = Observer<ForwardConnectionHandler, ReadableNotification>(*this, &ForwardConnectionHandler::onSocketReadable);
auto error_handler = Observer<ForwardConnectionHandler, ErrorNotification>(*this, &ForwardConnectionHandler::onSocketError);
Expand Down Expand Up @@ -267,15 +266,13 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &)
auto remove_event_handler_if_needed = [this]
{
/// Double check to avoid dead lock
if (responses->empty())
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
std::lock_guard lock(send_response_mutex);
{
/// If all sent unregister writable event.
if (responses->empty())
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
send_buf->next();

LOG_TRACE(log, "Remove forwarder socket writable event handler for server {} client {}", server_id, client_id);

reactor.removeEventHandler(
Expand All @@ -287,9 +284,24 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &)
}
};

auto copy_buffer_to_send = [this]
{
auto used = send_buf.used();
if (used + out_buffer->used() <= SENT_BUFFER_SIZE)
{
send_buf.write(out_buffer->begin(), out_buffer->used());
out_buffer.reset();
}
else
{
send_buf.write(out_buffer->begin(), SENT_BUFFER_SIZE - used);
out_buffer->drain(SENT_BUFFER_SIZE - used);
}
};

try
{
while (!responses->empty())
while (!responses->empty() && send_buf.available())
{
ForwardResponsePtr response;

Expand All @@ -300,18 +312,19 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &)
if (response->forwardType() == ForwardType::Destroy)
{
LOG_WARNING(log, "The connection for server {} client {} is stale, will close it", server_id, client_id);
delete this;
destroyMe();
return;
}

response->write(*send_buf);

if (send_buf->available() >= SENT_BUFFER_SIZE)
{
send_buf->next();
}
WriteBufferFromFiFoBuffer buf;
response->write(buf);
out_buffer = buf.getBuffer();
copy_buffer_to_send();
}

size_t sent = sock.sendBytes(send_buf);
Metrics::getMetrics().forward_response_socket_send_size->add(sent);

remove_event_handler_if_needed();
}
catch (...)
Expand Down
10 changes: 7 additions & 3 deletions src/Service/ForwardConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,13 @@ class ForwardConnectionHandler
/// destroy connection
void destroyMe();

static constexpr size_t SENT_BUFFER_SIZE = 1024;
std::optional<WriteBufferFromPocoSocket> send_buf;
// FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE);
static constexpr size_t SENT_BUFFER_SIZE = 16384;
FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE);

/// Storing the result of the response serialization temporarily,
/// We cannot directly serialize it onto send_buf,
/// because `send_buf` maybe too small to hold a large size response.
std::shared_ptr<Poco::BasicFIFOBuffer<char>> out_buffer;

Logger * log;

Expand Down
2 changes: 2 additions & 0 deletions src/Service/Metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ Metrics::Metrics()
{
push_request_queue_time_ms = getSummary("push_request_queue_time_ms", SummaryLevel::ADVANCED);
log_replication_batch_size = getSummary("log_replication_batch_size", SummaryLevel::BASIC);
response_socket_send_size = getSummary("response_socket_send_size", SummaryLevel::BASIC);
forward_response_socket_send_size = getSummary("forward_response_socket_send_size", SummaryLevel::BASIC);
apply_write_request_time_ms = getSummary("apply_write_request_time_ms", SummaryLevel::ADVANCED);
apply_read_request_time_ms = getSummary("apply_read_request_time_ms", SummaryLevel::ADVANCED);
read_latency = getSummary("readlatency", SummaryLevel::ADVANCED);
Expand Down
2 changes: 2 additions & 0 deletions src/Service/Metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ class Metrics

SummaryPtr push_request_queue_time_ms;
SummaryPtr log_replication_batch_size;
SummaryPtr response_socket_send_size;
SummaryPtr forward_response_socket_send_size;
SummaryPtr apply_write_request_time_ms;
SummaryPtr apply_read_request_time_ms;
SummaryPtr read_latency;
Expand Down

0 comments on commit f60227b

Please sign in to comment.