Skip to content

Commit

Permalink
Swap message buffer to avoid copy, ref #628
Browse files Browse the repository at this point in the history
  • Loading branch information
kaetemi committed Feb 24, 2023
1 parent 37b2127 commit e94d702
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 17 deletions.
13 changes: 7 additions & 6 deletions ryzom/client/src/network_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -660,6 +660,7 @@ bool CNetworkConnection::connect(string &result)
// then connect to the frontend using the udp sock
//ace faut faire la nouveau login client result = CLoginClient::connectToShard (_FrontendAddress, _Connection);

m_QuicConnection.update();
nlassert((!m_Connection.connected()) && !(m_QuicConnection.state() == CQuicConnection::Connected));

try
Expand Down Expand Up @@ -1118,7 +1119,7 @@ void CNetworkConnection::sendSystemLogin()
{
//sendUDP (&(_Connection), message.buffer(), length);
if (m_UseQuic)
m_QuicConnection.sendDatagram(message.buffer(), length);
m_QuicConnection.sendDatagramSwap(message, length);
else
m_Connection.send(message.buffer(), length);
}
Expand Down Expand Up @@ -1350,7 +1351,7 @@ void CNetworkConnection::sendSystemAckSync()

uint32 length = message.length();
if (m_UseQuic)
m_QuicConnection.sendDatagram(message.buffer(), length);
m_QuicConnection.sendDatagramSwap(message, length);
else
m_Connection.send (message.buffer(), length);
//sendUDP (&(_Connection), message.buffer(), length);
Expand Down Expand Up @@ -2239,7 +2240,7 @@ void CNetworkConnection::sendNormalMessage()
//_PropertyDecoder.send (_CurrentSendNumber, _LastReceivedNumber);
uint32 length = message.length();
if (m_UseQuic)
m_QuicConnection.sendDatagram(message.buffer(), length);
m_QuicConnection.sendDatagramSwap(message, length);
else
m_Connection.send(message.buffer(), length);
//sendUDP (&(_Connection), message.buffer(), length);
Expand Down Expand Up @@ -2403,7 +2404,7 @@ void CNetworkConnection::sendSystemAckProbe()

uint32 length = message.length();
if (m_UseQuic)
m_QuicConnection.sendDatagram(message.buffer(), length);
m_QuicConnection.sendDatagramSwap(message, length);
else
m_Connection.send(message.buffer(), length);
//sendUDP (&(_Connection), message.buffer(), length);
Expand Down Expand Up @@ -2946,7 +2947,7 @@ void CNetworkConnection::sendSystemDisconnection()
}
if (m_QuicConnection.canSend())
{
m_QuicConnection.sendDatagram(message.buffer(), length);
m_QuicConnection.sendDatagramSwap(message, length);
}
//sendUDP (&(_Connection), message.buffer(), length);
statsSend(length);
Expand Down Expand Up @@ -3186,7 +3187,7 @@ void CNetworkConnection::sendSystemQuit()

uint32 length = message.length();
if (m_UseQuic)
m_QuicConnection.sendDatagram(message.buffer(), length);
m_QuicConnection.sendDatagramSwap(message, length);
else
m_Connection.send(message.buffer(), length);
//sendUDP (&(_Connection), message.buffer(), length);
Expand Down
67 changes: 57 additions & 10 deletions ryzom/client/src/quic_connection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ class CQuicConnectionImpl
bool ReceiveEnable; // Set when datagrams may be received, otherwise they'll be ignored, this is to avoid stray messages after disconnecting (set and accessed by main thread only)
CAtomicInt MaxSendLength; // Set by the callback thread, used to check if we can send

CAtomicFlag SendBusy;
CBitMemStream SendBuffer;
QUIC_BUFFER SendQuicBuffer;
CAtomicInt SentCount;

static QUIC_STATUS
#ifdef _Function_class_
_Function_class_(QUIC_CONNECTION_CALLBACK)
Expand Down Expand Up @@ -136,6 +141,7 @@ void CQuicConnectionImpl::init()
m->ConnectedFlag.testAndSet();
m->ShuttingDownFlag.testAndSet();
m->ShutdownFlag.testAndSet();
m->SendBusy.clear(); // release order

// Open library
QUIC_STATUS status = MsQuicOpenVersion(QUIC_API_VERSION_2, (const void **)&MsQuic);
Expand Down Expand Up @@ -414,6 +420,8 @@ void CQuicConnectionImpl::release()
fifo->clear();
}

m->SendBuffer.clear();

// Close configuration
if (m->Configuration)
{
Expand Down Expand Up @@ -509,10 +517,17 @@ _Function_class_(QUIC_CONNECTION_CALLBACK)
m->MaxSendLength = ev->DATAGRAM_STATE_CHANGED.SendEnabled ? ev->DATAGRAM_STATE_CHANGED.MaxSendLength : 0;
status = QUIC_STATUS_SUCCESS;
break;
case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED:
if (ev->DATAGRAM_SEND_STATE_CHANGED.State == QUIC_DATAGRAM_SEND_SENT
&& (ptrdiff_t)ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext == (ptrdiff_t)m->SentCount.load())
{
m->SendBusy.clear(); // release
}
status = QUIC_STATUS_SUCCESS;
break;
case QUIC_CONNECTION_EVENT_LOCAL_ADDRESS_CHANGED:
case QUIC_CONNECTION_EVENT_PEER_ADDRESS_CHANGED:
case QUIC_CONNECTION_EVENT_IDEAL_PROCESSOR_CHANGED:
case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED:
case QUIC_CONNECTION_EVENT_RESUMED:
case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED:
case QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE: // TODO: Match with msg.xml
Expand All @@ -530,18 +545,45 @@ _Function_class_(QUIC_CONNECTION_CALLBACK)
return status;
}

bool CQuicConnection::sendDatagram(const uint8 *buffer, uint32 size)
//bool CQuicConnection::sendDatagram(const uint8 *buffer, uint32 size)
//{
// if (m->Connection && m->State && CQuicConnection::Connected && size <= m->MaxSendLength.load())
// {
// QUIC_BUFFER *buf = new QUIC_BUFFER(); // wow leak :)
// uint8 *copy = new uint8[size];
// memcpy(copy, buffer, size);
// buf->Buffer = copy; // (uint8 *)buffer;
// buf->Length = size;
// QUIC_STATUS status = MsQuic->DatagramSend(m->Connection, buf, 1, QUIC_SEND_FLAG_NONE, this);
// if (QUIC_FAILED(status))
// {
// nlwarning("DatagramSend failed with %d", status);
// return false;
// }
// return true;
// }
// return false;
//}

bool CQuicConnection::sendDatagramSwap(NLMISC::CBitMemStream &buffer, uint32 size)
{
if (m->Connection && m->State && CQuicConnection::Connected && size <= m->MaxSendLength.load())
if (m->Connection && m->State && CQuicConnection::Connected && size <= m->MaxSendLength.load() && size <= buffer.length())
{
QUIC_BUFFER *buf = new QUIC_BUFFER(); // wow leak :)
uint8 *copy = new uint8[size];
memcpy(copy, buffer, size);
buf->Buffer = copy; // (uint8 *)buffer;
buf->Length = size;
QUIC_STATUS status = MsQuic->DatagramSend(m->Connection, buf, 1, QUIC_SEND_FLAG_NONE, this);
if (m->SendBusy.testAndSet())
{
// Already busy
return false;
}

// Swap buffers
++m->SentCount;
m->SendBuffer.swap(buffer);
m->SendQuicBuffer.Buffer = (uint8 *)m->SendBuffer.buffer();
m->SendQuicBuffer.Length = size;
QUIC_STATUS status = MsQuic->DatagramSend(m->Connection, &m->SendQuicBuffer, 1, QUIC_SEND_FLAG_NONE, (void *)(ptrdiff_t)m->SentCount.load());
if (QUIC_FAILED(status))
{
m->SendBusy.clear();
nlwarning("DatagramSend failed with %d", status);
return false;
}
Expand Down Expand Up @@ -638,8 +680,13 @@ CQuicConnection::TState CQuicConnection::state() const
return Disconnected;
}

void CQuicConnection::sendDatagram(const uint8 *buffer, uint32 size)
//bool CQuicConnection::sendDatagram(const uint8 *buffer, uint32 size)
//{
//}

bool CQuicConnection::sendDatagramSwap(NLMISC::CBitMemStream &buffer, uint32 size)
{

}

bool CQuicConnection::datagramAvailable()
Expand Down
6 changes: 5 additions & 1 deletion ryzom/client/src/quic_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@ class CQuicConnection
inline bool connected() const { return state() == Connected; }

/// Send a datagram, fancier than a telegram, but not as reliable
bool sendDatagram(const uint8 *buffer, uint32 size);
// bool sendDatagram(const uint8 *buffer, uint32 size);

/// Send a datagram, this swaps the buffer with the previous one sent
/// Only one datagram may be in flight at a time
bool sendDatagramSwap(NLMISC::CBitMemStream &buffer, uint32 size = 0);

/// Check if any datagram has been received
bool datagramAvailable() const;
Expand Down

0 comments on commit e94d702

Please sign in to comment.