Skip to content

Commit

Permalink
net: move message conversion to wire bytes from PushMessage to Socket…
Browse files Browse the repository at this point in the history
…SendData

This furthers transport abstraction by removing the assumption that a message
can always immediately be converted to wire bytes. This assumption does not hold
for the v2 transport proposed by BIP324, as no messages can be sent before the
handshake completes.

This is done by only keeping (complete) CSerializedNetMsg objects in vSendMsg,
rather than the resulting bytes (for header and payload) that need to be sent.
In SocketSendData, these objects are handed to the transport as permitted by it,
and sending out the bytes the transport tells us to send. This also removes the
nSendOffset member variable in CNode, as keeping track of how much has been sent
is now a responsability of the transport.

This is not a pure refactor, and has the following effects even for the current
v1 transport:

* Checksum calculation now happens in SocketSendData rather than PushMessage.
  For non-optimistic-send messages, that means this computation now happens in
  the network thread rather than the message handler thread (generally a good
  thing, as the message handler thread is more of a computational bottleneck).
* Checksum calculation now happens while holding the cs_vSend lock. This is
  technically unnecessary for the v1 transport, as messages are encoded
  independent from one another, but is untenable for the v2 transport anyway.
* Statistics updates about per-message sent bytes now happen when those bytes
  are actually handed to the OS, rather than at PushMessage time.
  • Loading branch information
sipa committed Aug 24, 2023
1 parent a1a1060 commit bb4aab9
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 58 deletions.
96 changes: 46 additions & 50 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -915,35 +915,49 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;

while (it != node.vSendMsg.end()) {
const auto& data = *it;
assert(data.size() > node.nSendOffset);
bool data_left{false}; //!< second return value (whether unsent data remains)

while (true) {
if (it != node.vSendMsg.end()) {
// If possible, move one message from the send queue to the transport. This fails when
// there is an existing message still being sent.
size_t memusage = it->GetMemoryUsage();
if (node.m_transport->SetMessageToSend(*it)) {
// Update memory usage of send buffer (as *it will be deleted).
node.m_send_memusage -= memusage;
++it;
}
}
const auto& [data, more, msg_type] = node.m_transport->GetBytesToSend();
data_left = !data.empty(); // will be overwritten on next loop if all of data gets sent
int nBytes = 0;
{
if (!data.empty()) {
LOCK(node.m_sock_mutex);
// There is no socket in case we've already disconnected, or in test cases without
// real connections. In these cases, we bail out immediately and just leave things
// in the send queue and transport.
if (!node.m_sock) {
break;
}
int flags = MSG_NOSIGNAL | MSG_DONTWAIT;
#ifdef MSG_MORE
if (it + 1 != node.vSendMsg.end()) {
// We have more to send if either the transport itself has more, or if we have more
// messages to send.
if (more || it != node.vSendMsg.end()) {
flags |= MSG_MORE;
}
#endif
nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()) + node.nSendOffset, data.size() - node.nSendOffset, flags);
nBytes = node.m_sock->Send(reinterpret_cast<const char*>(data.data()), data.size(), flags);
}
if (nBytes > 0) {
node.m_last_send = GetTime<std::chrono::seconds>();
node.nSendBytes += nBytes;
node.nSendOffset += nBytes;
// Notify transport that bytes have been processed.
node.m_transport->MarkBytesSent(nBytes);
// Update statistics per message type.
node.AccountForSentBytes(msg_type, nBytes);
nSentSize += nBytes;
if (node.nSendOffset == data.size()) {
node.nSendOffset = 0;
// Update memory usage of send buffer (as *it will be deleted).
node.m_send_memusage -= sizeof(data) + memusage::DynamicUsage(data);
it++;
} else {
if ((size_t)nBytes != data.size()) {
// could not send full message; stop sending more
break;
}
Expand All @@ -956,19 +970,17 @@ std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
node.CloseSocketDisconnect();
}
}
// couldn't send anything at all
break;
}
}

node.fPauseSend = node.m_send_memusage + node.m_transport->GetSendMemoryUsage() > nSendBufferMaxSize;

if (it == node.vSendMsg.end()) {
assert(node.nSendOffset == 0);
assert(node.m_send_memusage == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
return {nSentSize, !node.vSendMsg.empty()};
return {nSentSize, data_left};
}

/** Try to find a connection to evict when the node is full.
Expand Down Expand Up @@ -1307,7 +1319,14 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)

for (CNode* pnode : nodes) {
bool select_recv = !pnode->fPauseRecv;
bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
bool select_send;
{
LOCK(pnode->cs_vSend);
// Sending is possible if either there are bytes to send right now, or if there will be
// once a potential message from vSendMsg is handed to the transport.
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
select_send = !to_send.empty() || !pnode->vSendMsg.empty();
}
if (!select_recv && !select_send) continue;

LOCK(pnode->m_sock_mutex);
Expand Down Expand Up @@ -2988,42 +3007,19 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
size_t nBytesSent = 0;
{
LOCK(pnode->cs_vSend);
const bool queue_was_empty{pnode->vSendMsg.empty()};

// Give the message to the transport, and add all bytes it wants us to send out as byte
// vectors to vSendMsg. This is temporary code that exists to support the new transport
// sending interface using the old way of queueing data. In a future commit vSendMsg will
// be replaced with a queue of CSerializedNetMsg objects to be sent instead, and this code
// will disappear.
bool queued = pnode->m_transport->SetMessageToSend(msg);
assert(queued);
// In the current transport (V1Transport), GetBytesToSend first returns a header to send,
// and then the payload data (if any), necessitating a loop.
while (true) {
const auto& [bytes, _more, msg_type] = pnode->m_transport->GetBytesToSend();
if (bytes.empty()) break;
// Update statistics per message type.
pnode->AccountForSentBytes(msg_type, bytes.size());
pnode->vSendMsg.push_back({bytes.begin(), bytes.end()});
// Update memory usage of send buffer. For now, use static + dynamic memory usage of
// byte vectors in vSendMsg as send memory. In a future commit, vSendMsg will be
// replaced with a queue of CSerializedNetMsg objects, and we'll use their memory usage
// instead.
pnode->m_send_memusage += sizeof(pnode->vSendMsg.back()) + memusage::DynamicUsage(pnode->vSendMsg.back());
// Notify transport that bytes have been processed (they're not actually sent yet,
// but pushed onto the vSendMsg queue of bytes to send).
pnode->m_transport->MarkBytesSent(bytes.size());
}
// At this point, m_transport->GetSendMemoryUsage() isn't very interesting as the
// transport's message is fully flushed (and converted to byte arrays). It's still included
// here for correctness, and will become relevant in a future commit when a queued message
// inside the transport may survive PushMessage calls.
const auto& [to_send, _more, _msg_type] = pnode->m_transport->GetBytesToSend();
const bool queue_was_empty{to_send.empty() && pnode->vSendMsg.empty()};

// Update memory usage of send buffer.
pnode->m_send_memusage += msg.GetMemoryUsage();
if (pnode->m_send_memusage + pnode->m_transport->GetSendMemoryUsage() > nSendBufferMaxSize) pnode->fPauseSend = true;
// Move message to vSendMsg queue.
pnode->vSendMsg.push_back(std::move(msg));

// If the write queue was empty before and isn't now, attempt "optimistic write":
// If there was nothing to send before, attempt "optimistic write":
// because the poll/select loop may pause for SELECT_TIMEOUT_MILLISECONDS before actually
// doing a send, try sending from the calling thread if the queue was empty before.
if (queue_was_empty && !pnode->vSendMsg.empty()) {
if (queue_was_empty) {
std::tie(nBytesSent, std::ignore) = SocketSendData(*pnode);
}
}
Expand Down
9 changes: 4 additions & 5 deletions src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -436,13 +436,12 @@ class CNode
*/
std::shared_ptr<Sock> m_sock GUARDED_BY(m_sock_mutex);

/** Total memory usage of vSendMsg (counting the vectors and their dynamic usage, but not the
* deque overhead). */
/** Sum of GetMemoryUsage of all vSendMsg entries. */
size_t m_send_memusage GUARDED_BY(cs_vSend){0};
/** Offset inside the first vSendMsg already sent */
size_t nSendOffset GUARDED_BY(cs_vSend){0};
/** Total number of bytes sent on the wire to this peer. */
uint64_t nSendBytes GUARDED_BY(cs_vSend){0};
std::deque<std::vector<unsigned char>> vSendMsg GUARDED_BY(cs_vSend);
/** Messages still to be fed to m_transport->SetMessageToSend. */
std::deque<CSerializedNetMsg> vSendMsg GUARDED_BY(cs_vSend);
Mutex cs_vSend;
Mutex m_sock_mutex;
Mutex cs_vRecv;
Expand Down
8 changes: 5 additions & 3 deletions src/test/denialofservice_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,19 @@ BOOST_AUTO_TEST_CASE(outbound_slow_chain_eviction)

{
LOCK(dummyNode1.cs_vSend);
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
dummyNode1.vSendMsg.clear();
const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
BOOST_CHECK(!to_send.empty());
}
connman.FlushSendBuffer(dummyNode1);

int64_t nStartTime = GetTime();
// Wait 21 minutes
SetMockTime(nStartTime+21*60);
BOOST_CHECK(peerman.SendMessages(&dummyNode1)); // should result in getheaders
{
LOCK(dummyNode1.cs_vSend);
BOOST_CHECK(dummyNode1.vSendMsg.size() > 0);
const auto& [to_send, _more, _msg_type] = dummyNode1.m_transport->GetBytesToSend();
BOOST_CHECK(!to_send.empty());
}
// Wait 3 more minutes
SetMockTime(nStartTime+24*60);
Expand Down
1 change: 1 addition & 0 deletions src/test/fuzz/process_messages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ FUZZ_TARGET(process_messages, .init = initialize_process_messages)

CNode& random_node = *PickValue(fuzzed_data_provider, peers);

connman.FlushSendBuffer(random_node);
(void)connman.ReceiveMsgFrom(random_node, std::move(net_msg));
random_node.fPauseSend = false;

Expand Down
14 changes: 14 additions & 0 deletions src/test/util/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
const CNetMsgMaker mm{0};

peerman.InitializeNode(node, local_services);
FlushSendBuffer(node); // Drop the version message added by InitializeNode.

CSerializedNetMsg msg_version{
mm.Make(NetMsgType::VERSION,
Expand All @@ -45,6 +46,7 @@ void ConnmanTestMsg::Handshake(CNode& node,
node.fPauseSend = false;
connman.ProcessMessagesOnce(node);
peerman.SendMessages(&node);
FlushSendBuffer(node); // Drop the verack message added by SendMessages.
if (node.fDisconnect) return;
assert(node.nVersion == version);
assert(node.GetCommonVersion() == std::min(version, PROTOCOL_VERSION));
Expand All @@ -70,6 +72,18 @@ void ConnmanTestMsg::NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_by
}
}

void ConnmanTestMsg::FlushSendBuffer(CNode& node) const
{
LOCK(node.cs_vSend);
node.vSendMsg.clear();
node.m_send_memusage = 0;
while (true) {
const auto& [to_send, _more, _msg_type] = node.m_transport->GetBytesToSend();
if (to_send.empty()) break;
node.m_transport->MarkBytesSent(to_send.size());
}
}

bool ConnmanTestMsg::ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const
{
bool queued = node.m_transport->SetMessageToSend(ser_msg);
Expand Down
1 change: 1 addition & 0 deletions src/test/util/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct ConnmanTestMsg : public CConnman {
void NodeReceiveMsgBytes(CNode& node, Span<const uint8_t> msg_bytes, bool& complete) const;

bool ReceiveMsgFrom(CNode& node, CSerializedNetMsg&& ser_msg) const;
void FlushSendBuffer(CNode& node) const;
};

constexpr ServiceFlags ALL_SERVICE_FLAGS[]{
Expand Down

0 comments on commit bb4aab9

Please sign in to comment.