diff --git a/src/Service/ConnectionHandler.cpp b/src/Service/ConnectionHandler.cpp index 2f2c074c5d..8e84c1bf23 100644 --- a/src/Service/ConnectionHandler.cpp +++ b/src/Service/ConnectionHandler.cpp @@ -77,7 +77,6 @@ ConnectionHandler::ConnectionHandler(Context & global_context_, StreamSocket & s { LOG_INFO(log, "New connection from {}", peer); registerConnection(this); - send_buf.emplace(sock); auto read_handler = Observer<ConnectionHandler, ReadableNotification>(*this, &ConnectionHandler::onSocketReadable); auto error_handler = Observer<ConnectionHandler, ErrorNotification>(*this, &ConnectionHandler::onSocketError); @@ -249,6 +248,7 @@ void ConnectionHandler::onSocketReadable(const Notification &) } } + void ConnectionHandler::onSocketWritable(const Notification &) { LOG_TRACE(log, "Peer {}#{} is writable", peer, toHexString(session_id.load())); @@ -256,15 +256,13 @@ void ConnectionHandler::onSocketWritable(const Notification &) auto remove_event_handler_if_needed = [this] { /// Double check to avoid dead lock - if (responses->empty()) + if (responses->empty() && send_buf.isEmpty() && !out_buffer) { std::lock_guard lock(send_response_mutex); { - /// If all sent unregister writable event. - if (responses->empty()) + /// If all sent, unregister writable event. + if (responses->empty() && send_buf.isEmpty() && !out_buffer) { - send_buf->next(); - LOG_TRACE(log, "Remove socket writable event handler for peer {}", peer); reactor.removeEventHandler( @@ -274,9 +272,28 @@ void ConnectionHandler::onSocketWritable(const Notification &) } }; + auto copy_buffer_to_send = [this] + { + auto used = send_buf.used(); + if (used + out_buffer->used() <= SENT_BUFFER_SIZE) + { + send_buf.write(out_buffer->begin(), out_buffer->used()); + out_buffer.reset(); + } + else + { + send_buf.write(out_buffer->begin(), SENT_BUFFER_SIZE - used); + out_buffer->drain(SENT_BUFFER_SIZE - used); + } + }; + try { - while (!responses->empty()) + /// If the buffer was not completely sent last time, continue sending. + if (out_buffer) + copy_buffer_to_send(); + + while (!responses->empty() && send_buf.available()) { Coordination::ZooKeeperResponsePtr response; @@ -285,8 +302,6 @@ void ConnectionHandler::onSocketWritable(const Notification &) if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close) { - send_buf->next(); - LOG_DEBUG(log, "Received close event for session_id {}, internal_id {}", toHexString(session_id.load()), toHexString(internal_id.load())); destroyMe(); return; @@ -300,19 +315,21 @@ void ConnectionHandler::onSocketWritable(const Notification &) destroyMe(); return; } - continue; + copy_buffer_to_send(); } - - response->write(*send_buf); - - packageSent(); - - if (send_buf->available() >= SENT_BUFFER_SIZE) + else { - send_buf->next(); + WriteBufferFromFiFoBuffer buf; + response->write(buf); + out_buffer = buf.getBuffer(); + copy_buffer_to_send(); } + packageSent(); } - /// If all sent remove writable event. + + size_t sent = sock.sendBytes(send_buf); + Metrics::getMetrics().response_socket_send_size->add(sent); + remove_event_handler_if_needed(); } catch (...) @@ -470,6 +487,7 @@ bool ConnectionHandler::sendHandshake(const Coordination::ZooKeeperResponsePtr & { bool success; uint64_t sid; + WriteBufferFromFiFoBuffer buf; if (const auto * new_session_resp = dynamic_cast<const ZooKeeperNewSessionResponse *>(response.get())) { @@ -486,23 +504,22 @@ bool ConnectionHandler::sendHandshake(const Coordination::ZooKeeperResponsePtr & throw Exception(ErrorCodes::LOGICAL_ERROR, "Bad session response {}", response->toString()); } - Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, *send_buf); + Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, buf); if (success) - Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, *send_buf); + Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, buf); else - Coordination::write(42, *send_buf); + Coordination::write(42, buf); /// Session timeout -1 represent session expired in Zookeeper int32_t negotiated_session_timeout = !success && response->error == Coordination::Error::ZSESSIONEXPIRED ? -1 : session_timeout.totalMilliseconds(); - Coordination::write(negotiated_session_timeout, *send_buf); + Coordination::write(negotiated_session_timeout, buf); - Coordination::write(sid, *send_buf); + Coordination::write(sid, buf); std::array<char, Coordination::PASSWORD_LENGTH> passwd{}; - Coordination::write(passwd, *send_buf); + Coordination::write(passwd, buf); - send_buf->next(); - packageSent(); + out_buffer = buf.getBuffer(); return success; } @@ -615,9 +632,13 @@ void ConnectionHandler::sendSessionResponseToClient(const Coordination::ZooKeepe { std::lock_guard lock(send_response_mutex); responses->push(response); - reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable)); + + if (responses->size() == 1) + { + reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable)); + reactor.wakeUp(); + } } - reactor.wakeUp(); } void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKeeperResponsePtr & response) @@ -630,12 +651,15 @@ void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKe std::lock_guard lock(send_response_mutex); responses->push(response); - /// Trigger socket writable event - reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable)); + /// If + if (responses->size() == 1) + { + /// We must wake up getWorkerReactor to interrupt it's sleeping. + reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable)); + /// We must wake up getWorkerReactor to interrupt it's sleeping. + reactor.wakeUp(); + } } - - /// We must wake up getWorkerReactor to interrupt it's sleeping. - reactor.wakeUp(); } void ConnectionHandler::packageSent() diff --git a/src/Service/ConnectionHandler.h b/src/Service/ConnectionHandler.h index 757d80f75b..1dc5845ba1 100644 --- a/src/Service/ConnectionHandler.h +++ b/src/Service/ConnectionHandler.h @@ -92,11 +92,14 @@ class ConnectionHandler /// destroy connection void destroyMe(); - static constexpr size_t SENT_BUFFER_SIZE = 1024; - std::optional<WriteBufferFromPocoSocket> send_buf; -// FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE); - - std::shared_ptr<FIFOBuffer> is_close = nullptr; + // Todo Add configuration sent_buffer_size + static constexpr size_t SENT_BUFFER_SIZE = 16384; + FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE); + + /// Storing the result of the response serialization temporarily, + /// We cannot directly serialize it onto send_buf, + /// because `send_buf` maybe too small to hold a large size response. + std::shared_ptr<Poco::BasicFIFOBuffer<char>> out_buffer; Logger * log; diff --git a/src/Service/ForwardConnectionHandler.cpp b/src/Service/ForwardConnectionHandler.cpp index 5feac7bebc..f67a22dcfa 100644 --- a/src/Service/ForwardConnectionHandler.cpp +++ b/src/Service/ForwardConnectionHandler.cpp @@ -24,7 +24,6 @@ ForwardConnectionHandler::ForwardConnectionHandler(Context & global_context_, St , responses(std::make_unique<ThreadSafeForwardResponseQueue>()) { LOG_INFO(log, "New forward connection from {}", sock.peerAddress().toString()); - send_buf.emplace(sock); auto read_handler = Observer<ForwardConnectionHandler, ReadableNotification>(*this, &ForwardConnectionHandler::onSocketReadable); auto error_handler = Observer<ForwardConnectionHandler, ErrorNotification>(*this, &ForwardConnectionHandler::onSocketError); @@ -267,15 +266,13 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &) auto remove_event_handler_if_needed = [this] { /// Double check to avoid dead lock - if (responses->empty()) + if (responses->empty() && send_buf.isEmpty() && !out_buffer) { std::lock_guard lock(send_response_mutex); { /// If all sent unregister writable event. - if (responses->empty()) + if (responses->empty() && send_buf.isEmpty() && !out_buffer) { - send_buf->next(); - LOG_TRACE(log, "Remove forwarder socket writable event handler for server {} client {}", server_id, client_id); reactor.removeEventHandler( @@ -287,9 +284,24 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &) } }; + auto copy_buffer_to_send = [this] + { + auto used = send_buf.used(); + if (used + out_buffer->used() <= SENT_BUFFER_SIZE) + { + send_buf.write(out_buffer->begin(), out_buffer->used()); + out_buffer.reset(); + } + else + { + send_buf.write(out_buffer->begin(), SENT_BUFFER_SIZE - used); + out_buffer->drain(SENT_BUFFER_SIZE - used); + } + }; + try { - while (!responses->empty()) + while (!responses->empty() && send_buf.available()) { ForwardResponsePtr response; @@ -300,18 +312,19 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &) if (response->forwardType() == ForwardType::Destroy) { LOG_WARNING(log, "The connection for server {} client {} is stale, will close it", server_id, client_id); - delete this; + destroyMe(); return; } - response->write(*send_buf); - - if (send_buf->available() >= SENT_BUFFER_SIZE) - { - send_buf->next(); - } + WriteBufferFromFiFoBuffer buf; + response->write(buf); + out_buffer = buf.getBuffer(); + copy_buffer_to_send(); } + size_t sent = sock.sendBytes(send_buf); + Metrics::getMetrics().forward_response_socket_send_size->add(sent); + remove_event_handler_if_needed(); } catch (...) diff --git a/src/Service/ForwardConnectionHandler.h b/src/Service/ForwardConnectionHandler.h index 0aeddb70f3..f73bdbe443 100644 --- a/src/Service/ForwardConnectionHandler.h +++ b/src/Service/ForwardConnectionHandler.h @@ -40,9 +40,13 @@ class ForwardConnectionHandler /// destroy connection void destroyMe(); - static constexpr size_t SENT_BUFFER_SIZE = 1024; - std::optional<WriteBufferFromPocoSocket> send_buf; -// FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE); + static constexpr size_t SENT_BUFFER_SIZE = 16384; + FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE); + + /// Storing the result of the response serialization temporarily, + /// We cannot directly serialize it onto send_buf, + /// because `send_buf` maybe too small to hold a large size response. + std::shared_ptr<Poco::BasicFIFOBuffer<char>> out_buffer; Logger * log; diff --git a/src/Service/Metrics.cpp b/src/Service/Metrics.cpp index 41a2e55cc4..c3fbb8e449 100644 --- a/src/Service/Metrics.cpp +++ b/src/Service/Metrics.cpp @@ -120,6 +120,8 @@ Metrics::Metrics() { push_request_queue_time_ms = getSummary("push_request_queue_time_ms", SummaryLevel::ADVANCED); log_replication_batch_size = getSummary("log_replication_batch_size", SummaryLevel::BASIC); + response_socket_send_size = getSummary("response_socket_send_size", SummaryLevel::BASIC); + forward_response_socket_send_size = getSummary("forward_response_socket_send_size", SummaryLevel::BASIC); apply_write_request_time_ms = getSummary("apply_write_request_time_ms", SummaryLevel::ADVANCED); apply_read_request_time_ms = getSummary("apply_read_request_time_ms", SummaryLevel::ADVANCED); read_latency = getSummary("readlatency", SummaryLevel::ADVANCED); diff --git a/src/Service/Metrics.h b/src/Service/Metrics.h index 005fc0322d..65f15f1b78 100644 --- a/src/Service/Metrics.h +++ b/src/Service/Metrics.h @@ -196,6 +196,8 @@ class Metrics SummaryPtr push_request_queue_time_ms; SummaryPtr log_replication_batch_size; + SummaryPtr response_socket_send_size; + SummaryPtr forward_response_socket_send_size; SummaryPtr apply_write_request_time_ms; SummaryPtr apply_read_request_time_ms; SummaryPtr read_latency;