Skip to content

Commit

Permalink
Use WriteBufferFromOwnString to avoid str copy
Browse files Browse the repository at this point in the history
  • Loading branch information
lzydmxy committed Jun 3, 2024
1 parent f60227b commit 3326715
Show file tree
Hide file tree
Showing 7 changed files with 73 additions and 27 deletions.
9 changes: 9 additions & 0 deletions src/Common/IO/ReadBufferFromString.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,13 @@ class ReadBufferFromString : public ReadBufferFromMemory
ReadBufferFromString(const S & s) : ReadBufferFromMemory(s.data(), s.size()) {}
};

class ReadBufferFromOwnString : public String, public ReadBufferFromString
{
public:
template <typename S>
explicit ReadBufferFromOwnString(S && s_) : String(std::forward<S>(s_)), ReadBufferFromString(*this)
{
}
};

}
33 changes: 19 additions & 14 deletions src/Service/ConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ void ConnectionHandler::onSocketWritable(const Notification &)
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
LOG_TRACE(log, "Remove socket writable event handler for peer {}", peer);

on_socket_writable = false;
reactor.removeEventHandler(
sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
}
Expand All @@ -275,15 +275,15 @@ void ConnectionHandler::onSocketWritable(const Notification &)
auto copy_buffer_to_send = [this]
{
auto used = send_buf.used();
if (used + out_buffer->used() <= SENT_BUFFER_SIZE)
if (used + out_buffer->available() <= SENT_BUFFER_SIZE)
{
send_buf.write(out_buffer->begin(), out_buffer->used());
send_buf.write(out_buffer->position(), out_buffer->available());
out_buffer.reset();
}
else
{
send_buf.write(out_buffer->begin(), SENT_BUFFER_SIZE - used);
out_buffer->drain(SENT_BUFFER_SIZE - used);
send_buf.write(out_buffer->position(), SENT_BUFFER_SIZE - used);
out_buffer->seek(SENT_BUFFER_SIZE - used, SEEK_CUR);
}
};

Expand Down Expand Up @@ -312,16 +312,18 @@ void ConnectionHandler::onSocketWritable(const Notification &)
if (!sendHandshake(response))
{
LOG_ERROR(log, "Failed to establish session, close connection.");
sock.setBlocking(true);
sock.sendBytes(send_buf);
sock.sendBytes(out_buffer->position(), out_buffer->available());

destroyMe();
return;
}
copy_buffer_to_send();
}
else
{
WriteBufferFromFiFoBuffer buf;
response->write(buf);
out_buffer = buf.getBuffer();
out_buffer = response->getBuffer();
copy_buffer_to_send();
}
packageSent();
Expand Down Expand Up @@ -487,7 +489,7 @@ bool ConnectionHandler::sendHandshake(const Coordination::ZooKeeperResponsePtr &
{
bool success;
uint64_t sid;
WriteBufferFromFiFoBuffer buf;
WriteBufferFromOwnString buf;

if (const auto * new_session_resp = dynamic_cast<const ZooKeeperNewSessionResponse *>(response.get()))
{
Expand Down Expand Up @@ -519,7 +521,7 @@ bool ConnectionHandler::sendHandshake(const Coordination::ZooKeeperResponsePtr &
std::array<char, Coordination::PASSWORD_LENGTH> passwd{};
Coordination::write(passwd, buf);

out_buffer = buf.getBuffer();
out_buffer = std::make_shared<ReadBufferFromOwnString>(std::move(buf.str()));

return success;
}
Expand Down Expand Up @@ -633,9 +635,12 @@ void ConnectionHandler::sendSessionResponseToClient(const Coordination::ZooKeepe
std::lock_guard lock(send_response_mutex);
responses->push(response);

if (responses->size() == 1)
/// We should register write events.
if (!on_socket_writable)
{
on_socket_writable = true;
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
}
}
Expand All @@ -651,10 +656,10 @@ void ConnectionHandler::pushUserResponseToSendingQueue(const Coordination::ZooKe
std::lock_guard lock(send_response_mutex);
responses->push(response);

/// If
if (responses->size() == 1)
/// We should register write events.
if (!on_socket_writable)
{
/// We must wake up getWorkerReactor to interrupt it's sleeping.
on_socket_writable = true;
reactor.addEventHandler(sock, Observer<ConnectionHandler, WritableNotification>(*this, &ConnectionHandler::onSocketWritable));
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
Expand Down
3 changes: 2 additions & 1 deletion src/Service/ConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class ConnectionHandler
/// Storing the result of the response serialization temporarily,
/// We cannot directly serialize it onto send_buf,
/// because `send_buf` maybe too small to hold a large size response.
std::shared_ptr<Poco::BasicFIFOBuffer<char>> out_buffer;
std::shared_ptr<ReadBufferFromOwnString> out_buffer;

Logger * log;

Expand Down Expand Up @@ -145,6 +145,7 @@ class ConnectionHandler
ConnectionStats conn_stats;

mutable std::mutex send_response_mutex;
bool on_socket_writable = false;
};

}
30 changes: 20 additions & 10 deletions src/Service/ForwardConnectionHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &)
if (responses->empty() && send_buf.isEmpty() && !out_buffer)
{
LOG_TRACE(log, "Remove forwarder socket writable event handler for server {} client {}", server_id, client_id);

on_socket_writable = false;
reactor.removeEventHandler(
sock,
Observer<ForwardConnectionHandler, WritableNotification>(
Expand All @@ -287,20 +287,24 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &)
auto copy_buffer_to_send = [this]
{
auto used = send_buf.used();
if (used + out_buffer->used() <= SENT_BUFFER_SIZE)
if (used + out_buffer->available() <= SENT_BUFFER_SIZE)
{
send_buf.write(out_buffer->begin(), out_buffer->used());
send_buf.write(out_buffer->position(), out_buffer->available());
out_buffer.reset();
}
else
{
send_buf.write(out_buffer->begin(), SENT_BUFFER_SIZE - used);
out_buffer->drain(SENT_BUFFER_SIZE - used);
send_buf.write(out_buffer->position(), SENT_BUFFER_SIZE - used);
out_buffer->seek(SENT_BUFFER_SIZE - used, SEEK_CUR);
}
};

try
{
/// If the buffer was not completely sent last time, continue sending.
if (out_buffer)
copy_buffer_to_send();

while (!responses->empty() && send_buf.available())
{
ForwardResponsePtr response;
Expand All @@ -316,9 +320,9 @@ void ForwardConnectionHandler::onSocketWritable(const Notification &)
return;
}

WriteBufferFromFiFoBuffer buf;
WriteBufferFromOwnString buf;
response->write(buf);
out_buffer = buf.getBuffer();
out_buffer = std::make_shared<ReadBufferFromOwnString>(std::move(buf.str()));
copy_buffer_to_send();
}

Expand Down Expand Up @@ -354,9 +358,15 @@ void ForwardConnectionHandler::sendResponse(ForwardResponsePtr response)
std::lock_guard lock(send_response_mutex);
/// TODO handle timeout
responses->push(response);
/// Trigger socket writable event
reactor.addEventHandler(
sock, Observer<ForwardConnectionHandler, WritableNotification>(*this, &ForwardConnectionHandler::onSocketWritable));

/// We should register write events.
if (!on_socket_writable)
{
on_socket_writable = true;
reactor.addEventHandler(sock, Observer<ForwardConnectionHandler, WritableNotification>(*this, &ForwardConnectionHandler::onSocketWritable));
/// We must wake up getWorkerReactor to interrupt it's sleeping.
reactor.wakeUp();
}
}

/// We must wake up getWorkerReactor to interrupt it's sleeping.
Expand Down
3 changes: 2 additions & 1 deletion src/Service/ForwardConnectionHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ForwardConnectionHandler
/// Storing the result of the response serialization temporarily,
/// We cannot directly serialize it onto send_buf,
/// because `send_buf` maybe too small to hold a large size response.
std::shared_ptr<Poco::BasicFIFOBuffer<char>> out_buffer;
std::shared_ptr<ReadBufferFromOwnString> out_buffer;

Logger * log;

Expand All @@ -73,6 +73,7 @@ class ForwardConnectionHandler
ThreadSafeForwardResponseQueuePtr responses;

std::mutex send_response_mutex;
bool on_socket_writable = false;

/// server id in client endpoint which actually is my_id
int32_t server_id{-1};
Expand Down
19 changes: 18 additions & 1 deletion src/ZooKeeper/ZooKeeperCommon.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "ZooKeeperCommon.h"
#include <array>
#include "common/logger_useful.h"
#include <Common/IO//Operators.h>
#include "ZooKeeperIO.h"


Expand All @@ -23,6 +22,24 @@ void ZooKeeperResponse::write(WriteBuffer & out) const
out.next();
}

std::shared_ptr<ReadBufferFromOwnString> ZooKeeperResponse::getBuffer() const
{
WriteBufferFromOwnString buf;
Coordination::write(int32_t(0), buf);
Coordination::write(xid, buf);
Coordination::write(zxid, buf);
Coordination::write(error, buf);
if (error == Error::ZOK)
writeImpl(buf);
String & result = buf.str();

// write data length at begin of string
int32_t size = __builtin_bswap32(result.size() - sizeof(int32_t));
memcpy(result.data(), reinterpret_cast<const char *>(&size), sizeof(int32_t));

return std::make_shared<ReadBufferFromOwnString>(std::move(result));
}

void ZooKeeperRequest::write(WriteBuffer & out) const
{
/// Excessive copy to calculate length.
Expand Down
3 changes: 3 additions & 0 deletions src/ZooKeeper/ZooKeeperCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include <Common/IO/WriteBuffer.h>
#include <Common/IO/WriteHelpers.h>
#include <boost/noncopyable.hpp>
#include <Common/IO//Operators.h>
#include "Common/IO/ReadBufferFromString.h"


namespace Coordination
Expand All @@ -35,6 +37,7 @@ struct ZooKeeperResponse : virtual Response
virtual void readImpl(ReadBuffer &) = 0;
virtual void writeImpl(WriteBuffer &) const = 0;
virtual void write(WriteBuffer & out) const;
virtual std::shared_ptr<ReadBufferFromOwnString> getBuffer() const;
virtual OpNum getOpNum() const = 0;

virtual bool operator==(const ZooKeeperResponse & response) const
Expand Down

0 comments on commit 3326715

Please sign in to comment.