Skip to content

Commit

Permalink
Rework receive buffer pushback
Browse files Browse the repository at this point in the history
Co-authored-by: Anthony Towns <[email protected]>
  • Loading branch information
sipa and ajtowns committed Jul 20, 2023
1 parent 296735f commit 3388e52
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 37 deletions.
63 changes: 27 additions & 36 deletions src/net.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ void V1TransportSerializer::prepareForTransport(CSerializedNetMsg& msg, std::vec
CVectorWriter{SER_NETWORK, INIT_PROTO_VERSION, header, 0, hdr};
}

size_t CConnman::SocketSendData(CNode& node) const
std::pair<size_t, bool> CConnman::SocketSendData(CNode& node) const
{
auto it = node.vSendMsg.begin();
size_t nSentSize = 0;
Expand Down Expand Up @@ -882,7 +882,7 @@ size_t CConnman::SocketSendData(CNode& node) const
assert(node.nSendSize == 0);
}
node.vSendMsg.erase(node.vSendMsg.begin(), it);
return nSentSize;
return {nSentSize, !node.vSendMsg.empty()};
}

/** Try to find a connection to evict when the node is full.
Expand Down Expand Up @@ -1217,37 +1217,15 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span<CNode* const> nodes)
}

for (CNode* pnode : nodes) {
// Implement the following logic:
// * If there is data to send, select() for sending data. As this only
// happens when optimistic write failed, we choose to first drain the
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
// * Otherwise, if there is space left in the receive buffer, select() for
// receiving data.
// * Hand off all complete messages to the processor, to be handled without
// blocking here.

bool select_recv = !pnode->fPauseRecv;
bool select_send;
{
LOCK(pnode->cs_vSend);
select_send = !pnode->vSendMsg.empty();
}
bool select_send = WITH_LOCK(pnode->cs_vSend, return !pnode->vSendMsg.empty());
if (!select_recv && !select_send) continue;

LOCK(pnode->m_sock_mutex);
if (!pnode->m_sock) {
continue;
if (pnode->m_sock) {
Sock::Event event = (select_send ? Sock::SEND : 0) | (select_recv ? Sock::RECV : 0);
events_per_sock.emplace(pnode->m_sock, Sock::Events{event});
}

Sock::Event requested{0};
if (select_send) {
requested = Sock::SEND;
} else if (select_recv) {
requested = Sock::RECV;
}

events_per_sock.emplace(pnode->m_sock, Sock::Events{requested});
}

return events_per_sock;
Expand Down Expand Up @@ -1308,6 +1286,24 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
errorSet = it->second.occurred & Sock::ERR;
}
}

if (sendSet) {
// Send data
auto [bytes_sent, data_left] = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) {
RecordBytesSent(bytes_sent);

// If both receiving and (non-optimistic) sending were possible, we first attempt
// sending. If that succeeds, but does not fully drain the send queue, do not
// attempt to receive. This avoids needlessly queueing data if the remote peer
// is slow at receiving data, by means of TCP flow control. We only do this when
// sending actually succeeded to make sure progress is always made; otherwise a
// deadlock would be possible when both sides have data to send, but neither is
// receiving.
if (data_left) recvSet = false;
}
}

if (recvSet || errorSet)
{
// typical socket buffer is 8K-64K
Expand Down Expand Up @@ -1354,12 +1350,6 @@ void CConnman::SocketHandlerConnected(const std::vector<CNode*>& nodes,
}
}

if (sendSet) {
// Send data
size_t bytes_sent = WITH_LOCK(pnode->cs_vSend, return SocketSendData(*pnode));
if (bytes_sent) RecordBytesSent(bytes_sent);
}

if (InactivityCheck(*pnode)) pnode->fDisconnect = true;
}
}
Expand Down Expand Up @@ -2887,7 +2877,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data));

// If write queue empty, attempt "optimistic write"
if (optimisticSend) nBytesSent = SocketSendData(*pnode);
bool data_left;
if (optimisticSend) std::tie(nBytesSent, data_left) = SocketSendData(*pnode);
}
if (nBytesSent) RecordBytesSent(nBytesSent);
}
Expand Down
4 changes: 3 additions & 1 deletion src/net.h
Original file line number Diff line number Diff line change
Expand Up @@ -992,7 +992,9 @@ class CConnman

NodeId GetNewNodeId();

size_t SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);
/** (Try to) send data from node's vSendMsg. Returns (bytes_sent, data_left). */
std::pair<size_t, bool> SocketSendData(CNode& node) const EXCLUSIVE_LOCKS_REQUIRED(node.cs_vSend);

void DumpAddresses();

// Network stats
Expand Down

0 comments on commit 3388e52

Please sign in to comment.