Skip to content

Commit

Permalink
Swap flushing outbox buffer to avoid copy on FS, ref #628
Browse files Browse the repository at this point in the history
  • Loading branch information
kaetemi committed Feb 24, 2023
1 parent baa450d commit 8310484
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 16 deletions.
2 changes: 1 addition & 1 deletion ryzom/server/src/frontend_service/fe_send_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ inline void CFeSendSub::CSendBuffer::sendOutBox( NLNET::CUdpSock *datasock )
{
if (QuicUser.get())
{
QuicUser->Transceiver->sendDatagram(QuicUser.get(), OutBox.buffer(), OutBox.length());
QuicUser->Transceiver->sendDatagramSwap(QuicUser.get(), OutBox);
}
else
{
Expand Down
66 changes: 53 additions & 13 deletions ryzom/server/src/frontend_service/quic_transceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,17 +407,20 @@ _Function_class_(QUIC_CONNECTION_CALLBACK)
status = QUIC_STATUS_SUCCESS;
break;
case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT:
user->MaxSendLength = 0;
nlinfo("Shutdown initiated by transport");
self->shutdownReceived(user);
status = QUIC_STATUS_SUCCESS;
break;
case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER:
user->MaxSendLength = 0;
nlinfo("Shutdown initiated by peer");
self->shutdownReceived(user);
status = QUIC_STATUS_SUCCESS;
break;
case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: {
CQuicUserContextRelease releaseUser(user); // Hopefully we only get QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE once!
user->MaxSendLength = 0;
nlinfo("Shutdown complete");
if (ev->SHUTDOWN_COMPLETE.AppCloseInProgress)
{
Expand All @@ -432,20 +435,27 @@ _Function_class_(QUIC_CONNECTION_CALLBACK)
break;
}
case QUIC_CONNECTION_EVENT_DATAGRAM_RECEIVED:
nlinfo("Datagram received");
nldebug("Datagram received");
// YES PLEASE
self->datagramReceived(user, ev->DATAGRAM_RECEIVED.Buffer->Buffer, ev->DATAGRAM_RECEIVED.Buffer->Length);
status = QUIC_STATUS_SUCCESS;
break;
case QUIC_CONNECTION_EVENT_DATAGRAM_STATE_CHANGED:
nlinfo("Datagram state changed");
user->MaxSendLength.store(ev->DATAGRAM_STATE_CHANGED.SendEnabled ? ev->DATAGRAM_STATE_CHANGED.MaxSendLength : 0, std::memory_order_release);
user->MaxSendLength = ev->DATAGRAM_STATE_CHANGED.SendEnabled ? ev->DATAGRAM_STATE_CHANGED.MaxSendLength : 0;
status = QUIC_STATUS_SUCCESS;
break;
case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED:
if (ev->DATAGRAM_SEND_STATE_CHANGED.State == QUIC_DATAGRAM_SEND_SENT
&& (ptrdiff_t)ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext == (ptrdiff_t)user->SentCount.load())
{
user->SendBusy.clear(); // release
}
status = QUIC_STATUS_SUCCESS;
break;
case QUIC_CONNECTION_EVENT_LOCAL_ADDRESS_CHANGED:
case QUIC_CONNECTION_EVENT_PEER_ADDRESS_CHANGED:
case QUIC_CONNECTION_EVENT_IDEAL_PROCESSOR_CHANGED:
case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED:
case QUIC_CONNECTION_EVENT_RESUMED:
case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED:
case QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE: // TODO: Match with msg.xml
Expand Down Expand Up @@ -527,18 +537,48 @@ NLMISC::CBufFIFO *CQuicTransceiver::swapWriteQueue(NLMISC::CBufFIFO *writeQueue)
return previous;
}

void CQuicTransceiver::sendDatagram(CQuicUserContext *user, const uint8 *buffer, uint32 size)
{
QUIC_BUFFER *buf = new QUIC_BUFFER(); // wow leak :)
uint8 *copy = new uint8[size];
memcpy(copy, buffer, size);
buf->Buffer = copy; // (uint8 *)buffer;
buf->Length = size;
QUIC_STATUS status = MsQuic->DatagramSend((HQUIC)user->Connection, buf, 1, QUIC_SEND_FLAG_NONE, (void *)user);
if (QUIC_FAILED(status))
// void CQuicTransceiver::sendDatagram(CQuicUserContext *user, const uint8 *buffer, uint32 size)
//{
// QUIC_BUFFER *buf = new QUIC_BUFFER(); // wow leak :)
// uint8 *copy = new uint8[size];
// memcpy(copy, buffer, size);
// buf->Buffer = copy; // (uint8 *)buffer;
// buf->Length = size;
// QUIC_STATUS status = MsQuic->DatagramSend((HQUIC)user->Connection, buf, 1, QUIC_SEND_FLAG_NONE, (void *)user);
// if (QUIC_FAILED(status))
// {
// nlwarning("MsQuic->ConnectionSendDatagram failed with status %d", status);
// }
// }

bool CQuicTransceiver::sendDatagramSwap(CQuicUserContext *user, NLMISC::CBitMemStream &buffer)
{
if (buffer.size() <= user->MaxSendLength.load())
{
nlwarning("MsQuic->ConnectionSendDatagram failed with status %d", status);
if (user->SendBusy.testAndSet())
{
// Already busy
return false;
}

// Swap buffers
++user->SentCount;
user->SendBuffer.swap(buffer);
static_assert(sizeof(CQuicBuffer) == sizeof(QUIC_BUFFER));
static_assert(offsetof(CQuicBuffer, Buffer) == offsetof(QUIC_BUFFER, Buffer));
static_assert(offsetof(CQuicBuffer, Length) == offsetof(QUIC_BUFFER, Length));
user->SendQuicBuffer.Buffer = (uint8 *)user->SendBuffer.buffer();
user->SendQuicBuffer.Length = user->SendBuffer.length();
QUIC_STATUS status = MsQuic->DatagramSend((HQUIC)user->Connection, (QUIC_BUFFER *)(&user->SendQuicBuffer), 1, QUIC_SEND_FLAG_NONE, (void *)(ptrdiff_t)user->SentCount.load());
if (QUIC_FAILED(status))
{
user->SendBusy.clear();
nlwarning("DatagramSend failed with %d", status);
return false;
}
return true;
}
return false;
}

void CQuicTransceiver::shutdown(CQuicUserContext *user)
Expand Down
23 changes: 21 additions & 2 deletions ryzom/server/src/frontend_service/quic_transceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,23 @@
#include <memory>
#include <atomic>

#include "nel/misc/atomic.h"
#include "nel/misc/bit_mem_stream.h"
#include "nel/net/inet_address.h"

#include "fe_receive_task.h"

class CClientHost;

class CQuicTransceiverImpl;
class CQuicTransceiver;

struct CQuicBuffer
{
uint32 Length;
uint8 *Buffer;
};

// User context for quic messages
class CQuicUserContext
{
Expand Down Expand Up @@ -68,7 +77,13 @@ class CQuicUserContext
CClientHost *ClientHost = nullptr;

// Set if datagrams can be sent (set on connection thread, read on service main thread)
std::atomic<uint16> MaxSendLength = 0;
NLMISC::CAtomicInt MaxSendLength;

// Send buffer, one being sent at a time, released as soon as it's sent out
NLMISC::CAtomicFlag SendBusy;
NLMISC::CBitMemStream SendBuffer = NLMISC::CBitMemStream(false, 512);
CQuicBuffer SendQuicBuffer;
NLMISC::CAtomicInt SentCount;

private:
std::atomic_int m_RefCount = 0;
Expand Down Expand Up @@ -167,7 +182,11 @@ class CQuicTransceiver
bool listening();

/// Send a datagram, fancier than a telegram, but not as reliable
void sendDatagram(CQuicUserContext *user, const uint8 *buffer, uint32 size);
// void sendDatagram(CQuicUserContext *user, const uint8 *buffer, uint32 size);

/// Send a datagram, this swaps the buffer with the previous one sent
/// Only one datagram may be in flight at a time
bool sendDatagramSwap(CQuicUserContext *user, NLMISC::CBitMemStream &buffer);

/// Shutdown a connection
void shutdown(CQuicUserContext *user);
Expand Down

0 comments on commit 8310484

Please sign in to comment.