Skip to content

Commit

Permalink
Fix some code style
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy committed Jun 4, 2024
1 parent 3326715 commit db55b2c
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 25 deletions.
14 changes: 8 additions & 6 deletions src/Service/ConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ void ConnectionHandler::onSocketWritable(const Notification &)
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
LOG_TRACE(log, "Remove socket writable event handler for peer {}", peer);
on_socket_writable = false;
socket_writable_event_registered = false;
reactor.removeEventHandler(
sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
}
Expand Down Expand Up @@ -323,7 +323,9 @@ void ConnectionHandler::onSocketWritable(const Notification &)
}
else
{
out_buffer = response->getBuffer();
WriteBufferFromOwnString buf;
response->writeNoCopy(buf);
out_buffer = std::make_shared<ReadBufferFromOwnString>(std::move(buf.str()));
copy_buffer_to_send();
}
packageSent();
Expand Down Expand Up @@ -636,9 +638,9 @@ void ConnectionHandler::sendSessionResponseToClient(const Coordination::ZooKeepe
responses->push(response);

/// We should register write events.
if (!on_socket_writable)
if (!socket_writable_event_registered)
{
on_socket_writable = true;
socket_writable_event_registered = true;
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
Expand All @@ -657,9 +659,9 @@ void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKe
responses->push(response);

/// We should register write events.
if (!on_socket_writable)
if (!socket_writable_event_registered)
{
on_socket_writable = true;
socket_writable_event_registered = true;
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
Expand Down
2 changes: 1 addition & 1 deletion src/Service/ConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class ConnectionHandler
ConnectionStats conn_stats;

mutable std::mutex send_response_mutex;
bool on_socket_writable = false;
bool socket_writable_event_registered = false;
};

}
6 changes: 3 additions & 3 deletions src/Service/ForwardConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &)
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
LOG_TRACE(log, "Remove forwarder socket writable event handler for server {} client {}", server_id, client_id);
on_socket_writable = false;
socket_writable_event_registered = false;
reactor.removeEventHandler(
sock,
Observer<ForwardConnectionHandler, WritableNotification>(
Expand Down Expand Up @@ -360,9 +360,9 @@ void ForwardConnectionHandler::sendResponse(ForwardResponsePtr response)
responses->push(response);

/// We should register write events.
if (!on_socket_writable)
if (!socket_writable_event_registered)
{
on_socket_writable = true;
socket_writable_event_registered = true;
reactor.addEventHandler(sock, Observer<ForwardConnectionHandler, WritableNotification>(*this, &ForwardConnectionHandler::onSocketWritable));
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
Expand Down
2 changes: 1 addition & 1 deletion src/Service/ForwardConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ForwardConnectionHandler
ThreadSafeForwardResponseQueuePtr responses;

std::mutex send_response_mutex;
bool on_socket_writable = false;
bool socket_writable_event_registered = false;

/// server id in client endpoint which actually is my_id
int32_t server_id{-1};
Expand Down
23 changes: 11 additions & 12 deletions src/ZooKeeper/ZooKeeperCommon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,21 @@ void ZooKeeperResponse::write(WriteBuffer & out) const
out.next();
}

std::shared_ptr<ReadBufferFromOwnString> ZooKeeperResponse::getBuffer() const
void ZooKeeperResponse::writeNoCopy(WriteBufferFromOwnString & out) const
{
WriteBufferFromOwnString buf;
Coordination::write(int32_t(0), buf);
Coordination::write(xid, buf);
Coordination::write(zxid, buf);
Coordination::write(error, buf);
auto pre_size = out.offset();
/// Prepended length
Coordination::write(static_cast<int32_t>(0), out);
Coordination::write(xid, out);
Coordination::write(zxid, out);
Coordination::write(error, out);
if (error == Error::ZOK)
writeImpl(buf);
String & result = buf.str();
writeImpl(out);
String & result = out.str();

// write data length at begin of string
int32_t size = __builtin_bswap32(result.size() - sizeof(int32_t));
memcpy(result.data(), reinterpret_cast<const char *>(&size), sizeof(int32_t));

return std::make_shared<ReadBufferFromOwnString>(std::move(result));
int32_t len = __builtin_bswap32(static_cast<int32_t>(result.size() - pre_size - sizeof(int32_t)));
memcpy(result.data() + pre_size, reinterpret_cast<const char *>(&len), sizeof(int32_t));
}

void ZooKeeperRequest::write(WriteBuffer & out) const
Expand Down
6 changes: 4 additions & 2 deletions src/ZooKeeper/ZooKeeperCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <Common/IO/WriteHelpers.h>
#include <boost/noncopyable.hpp>
#include <Common/IO//Operators.h>
#include "Common/IO/ReadBufferFromString.h"
#include <Common/IO/ReadBufferFromString.h>


namespace Coordination
Expand All @@ -37,7 +37,9 @@ struct ZooKeeperResponse : virtual Response
virtual void readImpl(ReadBuffer &) = 0;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual void write(WriteBuffer & out) const;
virtual std::shared_ptr<ReadBufferFromOwnString> getBuffer() const;

/// Prepended length to avoid copy
virtual void writeNoCopy(WriteBufferFromOwnString & out) const;
virtual OpNum getOpNum() const = 0;

virtual bool operator==(const ZooKeeperResponse & response) const
Expand Down

0 comments on commit db55b2c

Please sign in to comment.