From db55b2c55b9ca32b5c56445a908371f55533a450 Mon Sep 17 00:00:00 2001 From: lzydmxy <13126752315@163.com> Date: Tue, 4 Jun 2024 16:15:48 +0800 Subject: [PATCH] Fix some code style --- src/Service/ConnectionHandler.cpp | 14 ++++++++------ src/Service/ConnectionHandler.h | 2 +- src/Service/ForwardConnectionHandler.cpp | 6 +++--- src/Service/ForwardConnectionHandler.h | 2 +- src/ZooKeeper/ZooKeeperCommon.cpp | 23 +++++++++++------------ src/ZooKeeper/ZooKeeperCommon.h | 6 ++++-- 6 files changed, 28 insertions(+), 25 deletions(-) diff --git a/src/Service/ConnectionHandler.cpp b/src/Service/ConnectionHandler.cpp index d976bf2dde..eabd3a0aa5 100644 --- a/src/Service/ConnectionHandler.cpp +++ b/src/Service/ConnectionHandler.cpp @@ -264,7 +264,7 @@ void ConnectionHandler::onSocketWritable(const Notification &) if (responses->empty() && send_buf.isEmpty() && !out_buffer) { LOG_TRACE(log, "Remove socket writable event handler for peer {}", peer); - on_socket_writable = false; + socket_writable_event_registered = false; reactor.removeEventHandler( sock, Observer(*this, &ConnectionHandler::onSocketWritable)); } @@ -323,7 +323,9 @@ void ConnectionHandler::onSocketWritable(const Notification &) } else { - out_buffer = response->getBuffer(); + WriteBufferFromOwnString buf; + response->writeNoCopy(buf); + out_buffer = std::make_shared(std::move(buf.str())); copy_buffer_to_send(); } packageSent(); @@ -636,9 +638,9 @@ void ConnectionHandler::sendSessionResponseToClient(const Coordination::ZooKeepe responses->push(response); /// We should register write events. - if (!on_socket_writable) + if (!socket_writable_event_registered) { - on_socket_writable = true; + socket_writable_event_registered = true; reactor.addEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketWritable)); /// We must wake up getWorkerReactor to interrupt it's sleeping. reactor.wakeUp(); @@ -657,9 +659,9 @@ void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKe responses->push(response); /// We should register write events. - if (!on_socket_writable) + if (!socket_writable_event_registered) { - on_socket_writable = true; + socket_writable_event_registered = true; reactor.addEventHandler(sock, Observer(*this, &ConnectionHandler::onSocketWritable)); /// We must wake up getWorkerReactor to interrupt it's sleeping. reactor.wakeUp(); diff --git a/src/Service/ConnectionHandler.h b/src/Service/ConnectionHandler.h index 09fbb219e1..ccbbed4561 100644 --- a/src/Service/ConnectionHandler.h +++ b/src/Service/ConnectionHandler.h @@ -145,7 +145,7 @@ class ConnectionHandler ConnectionStats conn_stats; mutable std::mutex send_response_mutex; - bool on_socket_writable = false; + bool socket_writable_event_registered = false; }; } diff --git a/src/Service/ForwardConnectionHandler.cpp b/src/Service/ForwardConnectionHandler.cpp index 00989be375..cb6f651d2f 100644 --- a/src/Service/ForwardConnectionHandler.cpp +++ b/src/Service/ForwardConnectionHandler.cpp @@ -274,7 +274,7 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &) if (responses->empty() && send_buf.isEmpty() && !out_buffer) { LOG_TRACE(log, "Remove forwarder socket writable event handler for server {} client {}", server_id, client_id); - on_socket_writable = false; + socket_writable_event_registered = false; reactor.removeEventHandler( sock, Observer( @@ -360,9 +360,9 @@ void ForwardConnectionHandler::sendResponse(ForwardResponsePtr response) responses->push(response); /// We should register write events. - if (!on_socket_writable) + if (!socket_writable_event_registered) { - on_socket_writable = true; + socket_writable_event_registered = true; reactor.addEventHandler(sock, Observer(*this, &ForwardConnectionHandler::onSocketWritable)); /// We must wake up getWorkerReactor to interrupt it's sleeping. reactor.wakeUp(); diff --git a/src/Service/ForwardConnectionHandler.h b/src/Service/ForwardConnectionHandler.h index da5b6d9a5e..575abd6736 100644 --- a/src/Service/ForwardConnectionHandler.h +++ b/src/Service/ForwardConnectionHandler.h @@ -73,7 +73,7 @@ class ForwardConnectionHandler ThreadSafeForwardResponseQueuePtr responses; std::mutex send_response_mutex; - bool on_socket_writable = false; + bool socket_writable_event_registered = false; /// server id in client endpoint which actually is my_id int32_t server_id{-1}; diff --git a/src/ZooKeeper/ZooKeeperCommon.cpp b/src/ZooKeeper/ZooKeeperCommon.cpp index a4a3455b01..be347ce37d 100644 --- a/src/ZooKeeper/ZooKeeperCommon.cpp +++ b/src/ZooKeeper/ZooKeeperCommon.cpp @@ -22,22 +22,21 @@ void ZooKeeperResponse::write(WriteBuffer & out) const out.next(); } -std::shared_ptr ZooKeeperResponse::getBuffer() const +void ZooKeeperResponse::writeNoCopy(WriteBufferFromOwnString & out) const { - WriteBufferFromOwnString buf; - Coordination::write(int32_t(0), buf); - Coordination::write(xid, buf); - Coordination::write(zxid, buf); - Coordination::write(error, buf); + auto pre_size = out.offset(); + /// Prepended length + Coordination::write(static_cast(0), out); + Coordination::write(xid, out); + Coordination::write(zxid, out); + Coordination::write(error, out); if (error == Error::ZOK) - writeImpl(buf); - String & result = buf.str(); + writeImpl(out); + String & result = out.str(); // write data length at begin of string - int32_t size = __builtin_bswap32(result.size() - sizeof(int32_t)); - memcpy(result.data(), reinterpret_cast(&size), sizeof(int32_t)); - - return std::make_shared(std::move(result)); + int32_t len = __builtin_bswap32(static_cast(result.size() - pre_size - sizeof(int32_t))); + memcpy(result.data() + pre_size, reinterpret_cast(&len), sizeof(int32_t)); } void ZooKeeperRequest::write(WriteBuffer & out) const diff --git a/src/ZooKeeper/ZooKeeperCommon.h b/src/ZooKeeper/ZooKeeperCommon.h index 9e696bde0c..2c2309d9e9 100644 --- a/src/ZooKeeper/ZooKeeperCommon.h +++ b/src/ZooKeeper/ZooKeeperCommon.h @@ -19,7 +19,7 @@ #include #include #include -#include "Common/IO/ReadBufferFromString.h" +#include namespace Coordination @@ -37,7 +37,9 @@ struct ZooKeeperResponse : virtual Response virtual void readImpl(ReadBuffer &) = 0; virtual void writeImpl(WriteBuffer &) const = 0; virtual void write(WriteBuffer & out) const; - virtual std::shared_ptr getBuffer() const; + + /// Prepended length to avoid copy + virtual void writeNoCopy(WriteBufferFromOwnString & out) const; virtual OpNum getOpNum() const = 0; virtual bool operator==(const ZooKeeperResponse & response) const