diff --git a/src/Common/IO/ReadBufferFromString.h b/src/Common/IO/ReadBufferFromString.h index abb4445e7e..c56d1e76c1 100644 --- a/src/Common/IO/ReadBufferFromString.h +++ b/src/Common/IO/ReadBufferFromString.h @@ -16,4 +16,13 @@ class ReadBufferFromString : public ReadBufferFromMemory ReadBufferFromString(const S & s) : ReadBufferFromMemory(s.data(), s.size()) {} }; +class ReadBufferFromOwnString : public String, public ReadBufferFromString +{ +public: + template + explicit ReadBufferFromOwnString(S && s_) : String(std::forward(s_)), ReadBufferFromString(*this) + { + } +}; + } diff --git a/src/Service/ConnCommon.h b/src/Service/ConnCommon.h index 45a1348aa5..fe6d54ac14 100644 --- a/src/Service/ConnCommon.h +++ b/src/Service/ConnCommon.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -56,10 +57,12 @@ struct ConnectRequest struct SocketInterruptablePollWrapper; using SocketInterruptablePollWrapperPtr = std::unique_ptr; -using ThreadSafeResponseQueue = ThreadSafeQueue>; - +using ThreadSafeResponseQueue = ThreadSafeQueue; using ThreadSafeResponseQueuePtr = std::unique_ptr; +using ThreadSafeForwardResponseQueue = ThreadSafeQueue; +using ThreadSafeForwardResponseQueuePtr = std::unique_ptr; + struct LastOp; using LastOpMultiVersion = MultiVersion; using LastOpPtr = LastOpMultiVersion::Version; diff --git a/src/Service/ConnectionHandler.cpp b/src/Service/ConnectionHandler.cpp index 3fb4e2f1f4..eabd3a0aa5 100644 --- a/src/Service/ConnectionHandler.cpp +++ b/src/Service/ConnectionHandler.cpp @@ -248,6 +248,7 @@ void ConnectionHandler::onSocketReadable(const Notification &) } } + void ConnectionHandler::onSocketWritable(const Notification &) { LOG_TRACE(log, "Peer {}#{} is writable", peer, toHexString(session_id.load())); @@ -255,14 +256,15 @@ void ConnectionHandler::onSocketWritable(const Notification &) auto remove_event_handler_if_needed = [this] { /// Double check to avoid dead lock - if (responses->empty() && send_buf.used() == 0) + if (responses->empty() && send_buf.isEmpty() && !out_buffer) { std::lock_guard lock(send_response_mutex); { - /// If all sent unregister writable event. - if (responses->empty() && send_buf.used() == 0) + /// If all sent, unregister writable event. + if (responses->empty() && send_buf.isEmpty() && !out_buffer) { LOG_TRACE(log, "Remove socket writable event handler for peer {}", peer); + socket_writable_event_registered = false; reactor.removeEventHandler( sock, Observer(*this, &ConnectionHandler::onSocketWritable)); } @@ -270,78 +272,68 @@ void ConnectionHandler::onSocketWritable(const Notification &) } }; - try + auto copy_buffer_to_send = [this] { - if (responses->empty() && send_buf.used() == 0) + auto used = send_buf.used(); + if (used + out_buffer->available() <= SENT_BUFFER_SIZE) { - remove_event_handler_if_needed(); - LOG_DEBUG(log, "Peer {} is writable, but there is nothing to send, will remove event handler.", peer); - return; + send_buf.write(out_buffer->position(), out_buffer->available()); + out_buffer.reset(); } + else + { + send_buf.write(out_buffer->position(), SENT_BUFFER_SIZE - used); + out_buffer->seek(SENT_BUFFER_SIZE - used, SEEK_CUR); + } + }; - /// TODO use zero copy buffer - size_t size_to_sent = 0; - - /// 1. accumulate data into tmp_buf - responses->forEach( - [&size_to_sent, this](const auto & resp) -> bool - { - if (resp == is_close) - return false; + try + { + /// If the buffer was not completely sent last time, continue sending. + if (out_buffer) + copy_buffer_to_send(); - if (size_to_sent + resp->used() < SENT_BUFFER_SIZE) - { - /// add whole resp to send_buf - send_buf.write(resp->begin(), resp->used()); - size_to_sent += resp->used(); - } - else if (size_to_sent + resp->used() == SENT_BUFFER_SIZE) - { - /// add whole resp to send_buf - send_buf.write(resp->begin(), resp->used()); - size_to_sent += resp->used(); - } - else - { - /// add part of resp to send_buf - send_buf.write(resp->begin(), SENT_BUFFER_SIZE - size_to_sent); - } - return size_to_sent < SENT_BUFFER_SIZE; - }); + while (!responses->empty() && send_buf.available()) + { + Coordination::ZooKeeperResponsePtr response; - /// 2. send data - size_t sent = sock.sendBytes(send_buf); + if (!responses->tryPop(response)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug."); - /// 3. remove sent responses + if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close) + { + LOG_DEBUG(log, "Received close event for session_id {}, internal_id {}", toHexString(session_id.load()), toHexString(internal_id.load())); + destroyMe(); + return; + } - ptr resp; - while (responses->peek(resp) && sent > 0) - { - if (sent >= resp->used()) + if (response->getOpNum() == OpNum::NewSession || response->getOpNum() == OpNum::UpdateSession) { - sent -= resp->used(); - responses->remove(); - /// package sent - packageSent(); - LOG_TRACE(log, "Sent response to {}#{}", peer, toHexString(session_id.load())); + if (!sendHandshake(response)) + { + LOG_ERROR(log, "Failed to establish session, close connection."); + sock.setBlocking(true); + sock.sendBytes(send_buf); + sock.sendBytes(out_buffer->position(), out_buffer->available()); + + destroyMe(); + return; + } + copy_buffer_to_send(); } else { - resp->drain(sent); - /// move data to begin - resp->begin(); - sent = 0; + WriteBufferFromOwnString buf; + response->writeNoCopy(buf); + out_buffer = std::make_shared(std::move(buf.str())); + copy_buffer_to_send(); } + packageSent(); } - if (responses->peek(resp) && resp == is_close) - { - LOG_DEBUG(log, "Received close event for session_id {}, internal_id {}", toHexString(session_id.load()), toHexString(internal_id.load())); - destroyMe(); - return; - } + size_t sent = sock.sendBytes(send_buf); + Metrics::getMetrics().response_socket_send_size->add(sent); - /// If all sent remove writable event. remove_event_handler_if_needed(); } catch (...) @@ -495,6 +487,47 @@ Coordination::OpNum ConnectionHandler::receiveHandshake(int32_t handshake_req_le return opnum; } +bool ConnectionHandler::sendHandshake(const Coordination::ZooKeeperResponsePtr & response) +{ + bool success; + uint64_t sid; + WriteBufferFromOwnString buf; + + if (const auto * new_session_resp = dynamic_cast(response.get())) + { + success = new_session_resp->success; + sid = new_session_resp->session_id; + } + else if (const auto * update_session_resp = dynamic_cast(response.get())) + { + success = update_session_resp->success; + sid = update_session_resp->session_id; + } + else + { + throw Exception(ErrorCodes::LOGICAL_ERROR, "Bad session response {}", response->toString()); + } + + Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, buf); + if (success) + Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, buf); + else + Coordination::write(42, buf); + + /// Session timeout -1 represent session expired in Zookeeper + int32_t negotiated_session_timeout + = !success && response->error == Coordination::Error::ZSESSIONEXPIRED ? -1 : session_timeout.totalMilliseconds(); + Coordination::write(negotiated_session_timeout, buf); + + Coordination::write(sid, buf); + std::array passwd{}; + Coordination::write(passwd, buf); + + out_buffer = std::make_shared(std::move(buf.str())); + + return success; +} + bool ConnectionHandler::isHandShake(Int32 & handshake_length) { @@ -586,54 +619,32 @@ void ConnectionHandler::sendSessionResponseToClient(const Coordination::ZooKeepe throw Exception(ErrorCodes::LOGICAL_ERROR, "Bad session response {}", response->toString()); } - WriteBufferFromFiFoBuffer buf; - Coordination::write(Coordination::SERVER_HANDSHAKE_LENGTH, buf); + /// Register callback if (success) - Coordination::write(Coordination::ZOOKEEPER_PROTOCOL_VERSION, buf); - else - Coordination::write(42, buf); + { + session_id = sid; + handshake_done = true; - /// Session timeout -1 represent session expired in Zookeeper - int32_t negotiated_session_timeout - = !success && response->error == Coordination::Error::ZSESSIONEXPIRED ? -1 : session_timeout.totalMilliseconds(); - Coordination::write(negotiated_session_timeout, buf); + keeper_dispatcher->unRegisterSessionResponseCallbackWithoutLock(id); + auto response_callback = [this](const Coordination::ZooKeeperResponsePtr & response_) { pushUserResponseToSendingQueue(response_); }; - Coordination::write(sid, buf); - std::array passwd{}; - Coordination::write(passwd, buf); + bool is_reconnected = response->getOpNum() == Coordination::OpNum::UpdateSession; + keeper_dispatcher->registerUserResponseCallBackWithoutLock(sid, response_callback, is_reconnected); + } // Send response to client { std::lock_guard lock(send_response_mutex); - responses->push(buf.getBuffer()); - reactor.addEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketWritable)); - } - reactor.wakeUp(); - - if (!success) - { - LOG_ERROR(log, "Failed to establish session, close connection."); + responses->push(response); - // Send empty FIFOBuffer to close connection + /// We should register write events. + if (!socket_writable_event_registered) { - std::lock_guard lock(send_response_mutex); - responses->push(ptr()); + socket_writable_event_registered = true; reactor.addEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketWritable)); + /// We must wake up getWorkerReactor to interrupt it's sleeping. + reactor.wakeUp(); } - reactor.wakeUp(); - } - else - { - session_id = sid; - handshake_done = true; - - /// 2. Register callback - - keeper_dispatcher->unRegisterSessionResponseCallbackWithoutLock(id); - auto response_callback = [this](const Coordination::ZooKeeperResponsePtr & response_) { pushUserResponseToSendingQueue(response_); }; - - bool is_reconnected = response->getOpNum() == Coordination::OpNum::UpdateSession; - keeper_dispatcher->registerUserResponseCallBackWithoutLock(sid, response_callback, is_reconnected); } } @@ -645,25 +656,17 @@ void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKe /// Lock to avoid data condition which will lead response leak { std::lock_guard lock(send_response_mutex); - /// We do not need send anything for close request to client. - if (response->xid != Coordination::WATCH_XID && response->getOpNum() == Coordination::OpNum::Close) - { - responses->push(ptr()); - } - else + responses->push(response); + + /// We should register write events. + if (!socket_writable_event_registered) { - WriteBufferFromFiFoBuffer buf; - response->write(buf); - /// TODO handle push timeout - responses->push(buf.getBuffer()); + socket_writable_event_registered = true; + reactor.addEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketWritable)); + /// We must wake up getWorkerReactor to interrupt it's sleeping. + reactor.wakeUp(); } - - /// Trigger socket writable event - reactor.addEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketWritable)); } - - /// We must wake up getWorkerReactor to interrupt it's sleeping. - reactor.wakeUp(); } void ConnectionHandler::packageSent() diff --git a/src/Service/ConnectionHandler.h b/src/Service/ConnectionHandler.h index 7b92d74a42..ccbbed4561 100644 --- a/src/Service/ConnectionHandler.h +++ b/src/Service/ConnectionHandler.h @@ -70,6 +70,7 @@ class ConnectionHandler private: Coordination::OpNum receiveHandshake(int32_t handshake_length); + bool sendHandshake(const Coordination::ZooKeeperResponsePtr & response); static bool isHandShake(Int32 & handshake_length); void tryExecuteFourLetterWordCmd(int32_t four_letter_cmd); @@ -91,10 +92,14 @@ class ConnectionHandler /// destroy connection void destroyMe(); - static constexpr size_t SENT_BUFFER_SIZE = 1024; + // Todo Add configuration sent_buffer_size + static constexpr size_t SENT_BUFFER_SIZE = 16384; FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE); - std::shared_ptr is_close = nullptr; + /// Storing the result of the response serialization temporarily, + /// We cannot directly serialize it onto send_buf, + /// because `send_buf` maybe too small to hold a large size response. + std::shared_ptr out_buffer; Logger * log; @@ -140,6 +145,7 @@ class ConnectionHandler ConnectionStats conn_stats; mutable std::mutex send_response_mutex; + bool socket_writable_event_registered = false; }; } diff --git a/src/Service/ForwardConnectionHandler.cpp b/src/Service/ForwardConnectionHandler.cpp index 30fa9b5c2c..cb6f651d2f 100644 --- a/src/Service/ForwardConnectionHandler.cpp +++ b/src/Service/ForwardConnectionHandler.cpp @@ -10,6 +10,10 @@ namespace RK { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} ForwardConnectionHandler::ForwardConnectionHandler(Context & global_context_, StreamSocket & socket_, SocketReactor & reactor_) : log(&Logger::get("ForwardConnectionHandler")) @@ -17,8 +21,7 @@ ForwardConnectionHandler::ForwardConnectionHandler(Context & global_context_, St , reactor(reactor_) , global_context(global_context_) , keeper_dispatcher(global_context.getDispatcher()) - , responses(std::make_unique()) - , need_destroy(false) + , responses(std::make_unique()) { LOG_INFO(log, "New forward connection from {}", sock.peerAddress().toString()); @@ -263,14 +266,15 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &) auto remove_event_handler_if_needed = [this] { /// Double check to avoid dead lock - if (responses->empty() && send_buf.used() == 0) + if (responses->empty() && send_buf.isEmpty() && !out_buffer) { std::lock_guard lock(send_response_mutex); { /// If all sent unregister writable event. - if (responses->empty() && send_buf.used() == 0) + if (responses->empty() && send_buf.isEmpty() && !out_buffer) { - LOG_TRACE(log, "Remove socket writable event handler"); + LOG_TRACE(log, "Remove forwarder socket writable event handler for server {} client {}", server_id, client_id); + socket_writable_event_registered = false; reactor.removeEventHandler( sock, Observer( @@ -280,70 +284,51 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &) } }; - try + auto copy_buffer_to_send = [this] { - if (need_destroy) + auto used = send_buf.used(); + if (used + out_buffer->available() <= SENT_BUFFER_SIZE) { - LOG_WARNING(log, "The connection for server {} client {} is stale, will close it", server_id, client_id); - delete this; - return; + send_buf.write(out_buffer->position(), out_buffer->available()); + out_buffer.reset(); } - - if (responses->empty() && send_buf.used() == 0) + else { - remove_event_handler_if_needed(); - return; + send_buf.write(out_buffer->position(), SENT_BUFFER_SIZE - used); + out_buffer->seek(SENT_BUFFER_SIZE - used, SEEK_CUR); } + }; - /// TODO use zero copy buffer - size_t size_to_sent = 0; - - /// 1. accumulate data into tmp_buf - responses->forEach( - [&size_to_sent, this](const auto & resp) -> bool - { - if (size_to_sent + resp->used() < SENT_BUFFER_SIZE) - { - /// add whole resp to send_buf - send_buf.write(resp->begin(), resp->used()); - size_to_sent += resp->used(); - } - else if (size_to_sent + resp->used() == SENT_BUFFER_SIZE) - { - /// add whole resp to send_buf - send_buf.write(resp->begin(), resp->used()); - size_to_sent += resp->used(); - } - else - { - /// add part of resp to send_buf - send_buf.write(resp->begin(), SENT_BUFFER_SIZE - size_to_sent); - } - return size_to_sent < SENT_BUFFER_SIZE; - }); + try + { + /// If the buffer was not completely sent last time, continue sending. + if (out_buffer) + copy_buffer_to_send(); - /// 2. send data - size_t sent = sock.sendBytes(send_buf); + while (!responses->empty() && send_buf.available()) + { + ForwardResponsePtr response; - /// 3. remove sent responses + if (!responses->tryPop(response)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "We must have ready response, but queue is empty. It's a bug."); - ptr resp; - while (responses->peek(resp) && sent > 0) - { - if (sent >= resp->used()) + /// The connection is stale and need destroyed, receive from keeper_dispatcher + if (response->forwardType() == ForwardType::Destroy) { - sent -= resp->used(); - responses->remove(); - } - else - { - resp->drain(sent); - /// move data to begin - resp->begin(); - sent = 0; + LOG_WARNING(log, "The connection for server {} client {} is stale, will close it", server_id, client_id); + destroyMe(); + return; } + + WriteBufferFromOwnString buf; + response->write(buf); + out_buffer = std::make_shared(std::move(buf.str())); + copy_buffer_to_send(); } + size_t sent = sock.sendBytes(send_buf); + Metrics::getMetrics().forward_response_socket_send_size->add(sent); + remove_event_handler_if_needed(); } catch (...) @@ -368,22 +353,20 @@ void ForwardConnectionHandler::sendResponse(ForwardResponsePtr response) { LOG_TRACE(log, "Send response {}", response->toString()); - if (response->forwardType() == ForwardType::Destroy) - { - need_destroy = true; - } - - WriteBufferFromFiFoBuffer buf; - response->write(buf); - { /// Lock to avoid data condition which will lead response leak std::lock_guard lock(send_response_mutex); /// TODO handle timeout - responses->push(buf.getBuffer()); - /// Trigger socket writable event - reactor.addEventHandler( - sock, Observer(*this, &ForwardConnectionHandler::onSocketWritable)); + responses->push(response); + + /// We should register write events. + if (!socket_writable_event_registered) + { + socket_writable_event_registered = true; + reactor.addEventHandler(sock, Observer(*this, &ForwardConnectionHandler::onSocketWritable)); + /// We must wake up getWorkerReactor to interrupt it's sleeping. + reactor.wakeUp(); + } } /// We must wake up getWorkerReactor to interrupt it's sleeping. diff --git a/src/Service/ForwardConnectionHandler.h b/src/Service/ForwardConnectionHandler.h index 8a714fa9b6..575abd6736 100644 --- a/src/Service/ForwardConnectionHandler.h +++ b/src/Service/ForwardConnectionHandler.h @@ -40,9 +40,14 @@ class ForwardConnectionHandler /// destroy connection void destroyMe(); - static constexpr size_t SENT_BUFFER_SIZE = 1024; + static constexpr size_t SENT_BUFFER_SIZE = 16384; FIFOBuffer send_buf = FIFOBuffer(SENT_BUFFER_SIZE); + /// Storing the result of the response serialization temporarily, + /// We cannot directly serialize it onto send_buf, + /// because `send_buf` maybe too small to hold a large size response. + std::shared_ptr out_buffer; + Logger * log; StreamSocket sock; @@ -65,9 +70,10 @@ class ForwardConnectionHandler Context & global_context; std::shared_ptr keeper_dispatcher; - ThreadSafeResponseQueuePtr responses; + ThreadSafeForwardResponseQueuePtr responses; std::mutex send_response_mutex; + bool socket_writable_event_registered = false; /// server id in client endpoint which actually is my_id int32_t server_id{-1}; @@ -78,9 +84,6 @@ class ForwardConnectionHandler void processHandshake(); void processUserOrSessionRequest(ForwardRequestPtr request); void processSyncSessionsRequest(ForwardRequestPtr request); - - /// The connection is stale and need destroyed - std::atomic need_destroy; }; } diff --git a/src/Service/Metrics.cpp b/src/Service/Metrics.cpp index 41a2e55cc4..c3fbb8e449 100644 --- a/src/Service/Metrics.cpp +++ b/src/Service/Metrics.cpp @@ -120,6 +120,8 @@ Metrics::Metrics() { push_request_queue_time_ms = getSummary("push_request_queue_time_ms", SummaryLevel::ADVANCED); log_replication_batch_size = getSummary("log_replication_batch_size", SummaryLevel::BASIC); + response_socket_send_size = getSummary("response_socket_send_size", SummaryLevel::BASIC); + forward_response_socket_send_size = getSummary("forward_response_socket_send_size", SummaryLevel::BASIC); apply_write_request_time_ms = getSummary("apply_write_request_time_ms", SummaryLevel::ADVANCED); apply_read_request_time_ms = getSummary("apply_read_request_time_ms", SummaryLevel::ADVANCED); read_latency = getSummary("readlatency", SummaryLevel::ADVANCED); diff --git a/src/Service/Metrics.h b/src/Service/Metrics.h index 005fc0322d..65f15f1b78 100644 --- a/src/Service/Metrics.h +++ b/src/Service/Metrics.h @@ -196,6 +196,8 @@ class Metrics SummaryPtr push_request_queue_time_ms; SummaryPtr log_replication_batch_size; + SummaryPtr response_socket_send_size; + SummaryPtr forward_response_socket_send_size; SummaryPtr apply_write_request_time_ms; SummaryPtr apply_read_request_time_ms; SummaryPtr read_latency; diff --git a/src/ZooKeeper/ZooKeeperCommon.cpp b/src/ZooKeeper/ZooKeeperCommon.cpp index 9401bef55f..be347ce37d 100644 --- a/src/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/ZooKeeper/ZooKeeperCommon.cpp @@ -1,7 +1,6 @@ #include "ZooKeeperCommon.h" #include #include "common/logger_useful.h" -#include #include "ZooKeeperIO.h" @@ -23,6 +22,23 @@ void ZooKeeperResponse::write(WriteBuffer & out) const out.next(); } +void ZooKeeperResponse::writeNoCopy(WriteBufferFromOwnString & out) const +{ + auto pre_size = out.offset(); + /// Prepended length + Coordination::write(static_cast(0), out); + Coordination::write(xid, out); + Coordination::write(zxid, out); + Coordination::write(error, out); + if (error == Error::ZOK) + writeImpl(out); + String & result = out.str(); + + // write data length at begin of string + int32_t len = __builtin_bswap32(static_cast(result.size() - pre_size - sizeof(int32_t))); + memcpy(result.data() + pre_size, reinterpret_cast(&len), sizeof(int32_t)); +} + void ZooKeeperRequest::write(WriteBuffer & out) const { /// Excessive copy to calculate length. diff --git a/src/ZooKeeper/ZooKeeperCommon.h b/src/ZooKeeper/ZooKeeperCommon.h index 4562b68c9a..2c2309d9e9 100644 --- a/src/ZooKeeper/ZooKeeperCommon.h +++ b/src/ZooKeeper/ZooKeeperCommon.h @@ -18,6 +18,8 @@ #include #include #include +#include +#include namespace Coordination @@ -35,6 +37,9 @@ struct ZooKeeperResponse : virtual Response virtual void readImpl(ReadBuffer &) = 0; virtual void writeImpl(WriteBuffer &) const = 0; virtual void write(WriteBuffer & out) const; + + /// Prepended length to avoid copy + virtual void writeNoCopy(WriteBufferFromOwnString & out) const; virtual OpNum getOpNum() const = 0; virtual bool operator==(const ZooKeeperResponse & response) const