diff --git a/ryzom/server/src/frontend_service/fe_send_sub.cpp b/ryzom/server/src/frontend_service/fe_send_sub.cpp index 1d5214d7fe..491888c10f 100644 --- a/ryzom/server/src/frontend_service/fe_send_sub.cpp +++ b/ryzom/server/src/frontend_service/fe_send_sub.cpp @@ -333,7 +333,7 @@ inline void CFeSendSub::CSendBuffer::sendOutBox( NLNET::CUdpSock *datasock ) { if (QuicUser.get()) { - QuicUser->Transceiver->sendDatagram(QuicUser.get(), OutBox.buffer(), OutBox.length()); + QuicUser->Transceiver->sendDatagramSwap(QuicUser.get(), OutBox); } else { diff --git a/ryzom/server/src/frontend_service/quic_transceiver.cpp b/ryzom/server/src/frontend_service/quic_transceiver.cpp index 6398c4d8ca..b74cbe68fe 100644 --- a/ryzom/server/src/frontend_service/quic_transceiver.cpp +++ b/ryzom/server/src/frontend_service/quic_transceiver.cpp @@ -407,17 +407,20 @@ _Function_class_(QUIC_CONNECTION_CALLBACK) status = QUIC_STATUS_SUCCESS; break; case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: + user->MaxSendLength = 0; nlinfo("Shutdown initiated by transport"); self->shutdownReceived(user); status = QUIC_STATUS_SUCCESS; break; case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: + user->MaxSendLength = 0; nlinfo("Shutdown initiated by peer"); self->shutdownReceived(user); status = QUIC_STATUS_SUCCESS; break; case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: { CQuicUserContextRelease releaseUser(user); // Hopefully we only get QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE once! + user->MaxSendLength = 0; nlinfo("Shutdown complete"); if (ev->SHUTDOWN_COMPLETE.AppCloseInProgress) { @@ -432,20 +435,27 @@ _Function_class_(QUIC_CONNECTION_CALLBACK) break; } case QUIC_CONNECTION_EVENT_DATAGRAM_RECEIVED: - nlinfo("Datagram received"); + nldebug("Datagram received"); // YES PLEASE self->datagramReceived(user, ev->DATAGRAM_RECEIVED.Buffer->Buffer, ev->DATAGRAM_RECEIVED.Buffer->Length); status = QUIC_STATUS_SUCCESS; break; case QUIC_CONNECTION_EVENT_DATAGRAM_STATE_CHANGED: nlinfo("Datagram state changed"); - user->MaxSendLength.store(ev->DATAGRAM_STATE_CHANGED.SendEnabled ? ev->DATAGRAM_STATE_CHANGED.MaxSendLength : 0, std::memory_order_release); + user->MaxSendLength = ev->DATAGRAM_STATE_CHANGED.SendEnabled ? ev->DATAGRAM_STATE_CHANGED.MaxSendLength : 0; + status = QUIC_STATUS_SUCCESS; + break; + case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED: + if (ev->DATAGRAM_SEND_STATE_CHANGED.State == QUIC_DATAGRAM_SEND_SENT + && (ptrdiff_t)ev->DATAGRAM_SEND_STATE_CHANGED.ClientContext == (ptrdiff_t)user->SentCount.load()) + { + user->SendBusy.clear(); // release + } status = QUIC_STATUS_SUCCESS; break; case QUIC_CONNECTION_EVENT_LOCAL_ADDRESS_CHANGED: case QUIC_CONNECTION_EVENT_PEER_ADDRESS_CHANGED: case QUIC_CONNECTION_EVENT_IDEAL_PROCESSOR_CHANGED: - case QUIC_CONNECTION_EVENT_DATAGRAM_SEND_STATE_CHANGED: case QUIC_CONNECTION_EVENT_RESUMED: case QUIC_CONNECTION_EVENT_PEER_CERTIFICATE_RECEIVED: case QUIC_CONNECTION_EVENT_STREAMS_AVAILABLE: // TODO: Match with msg.xml @@ -527,18 +537,48 @@ NLMISC::CBufFIFO *CQuicTransceiver::swapWriteQueue(NLMISC::CBufFIFO *writeQueue) return previous; } -void CQuicTransceiver::sendDatagram(CQuicUserContext *user, const uint8 *buffer, uint32 size) -{ - QUIC_BUFFER *buf = new QUIC_BUFFER(); // wow leak :) - uint8 *copy = new uint8[size]; - memcpy(copy, buffer, size); - buf->Buffer = copy; // (uint8 *)buffer; - buf->Length = size; - QUIC_STATUS status = MsQuic->DatagramSend((HQUIC)user->Connection, buf, 1, QUIC_SEND_FLAG_NONE, (void *)user); - if (QUIC_FAILED(status)) +// void CQuicTransceiver::sendDatagram(CQuicUserContext *user, const uint8 *buffer, uint32 size) +//{ +// QUIC_BUFFER *buf = new QUIC_BUFFER(); // wow leak :) +// uint8 *copy = new uint8[size]; +// memcpy(copy, buffer, size); +// buf->Buffer = copy; // (uint8 *)buffer; +// buf->Length = size; +// QUIC_STATUS status = MsQuic->DatagramSend((HQUIC)user->Connection, buf, 1, QUIC_SEND_FLAG_NONE, (void *)user); +// if (QUIC_FAILED(status)) +// { +// nlwarning("MsQuic->ConnectionSendDatagram failed with status %d", status); +// } +// } + +bool CQuicTransceiver::sendDatagramSwap(CQuicUserContext *user, NLMISC::CBitMemStream &buffer) +{ + if (buffer.size() <= user->MaxSendLength.load()) { - nlwarning("MsQuic->ConnectionSendDatagram failed with status %d", status); + if (user->SendBusy.testAndSet()) + { + // Already busy + return false; + } + + // Swap buffers + ++user->SentCount; + user->SendBuffer.swap(buffer); + static_assert(sizeof(CQuicBuffer) == sizeof(QUIC_BUFFER)); + static_assert(offsetof(CQuicBuffer, Buffer) == offsetof(QUIC_BUFFER, Buffer)); + static_assert(offsetof(CQuicBuffer, Length) == offsetof(QUIC_BUFFER, Length)); + user->SendQuicBuffer.Buffer = (uint8 *)user->SendBuffer.buffer(); + user->SendQuicBuffer.Length = user->SendBuffer.length(); + QUIC_STATUS status = MsQuic->DatagramSend((HQUIC)user->Connection, (QUIC_BUFFER *)(&user->SendQuicBuffer), 1, QUIC_SEND_FLAG_NONE, (void *)(ptrdiff_t)user->SentCount.load()); + if (QUIC_FAILED(status)) + { + user->SendBusy.clear(); + nlwarning("DatagramSend failed with %d", status); + return false; + } + return true; } + return false; } void CQuicTransceiver::shutdown(CQuicUserContext *user) diff --git a/ryzom/server/src/frontend_service/quic_transceiver.h b/ryzom/server/src/frontend_service/quic_transceiver.h index bf925c34cb..519b92cfd1 100644 --- a/ryzom/server/src/frontend_service/quic_transceiver.h +++ b/ryzom/server/src/frontend_service/quic_transceiver.h @@ -22,7 +22,10 @@ #include #include +#include "nel/misc/atomic.h" +#include "nel/misc/bit_mem_stream.h" #include "nel/net/inet_address.h" + #include "fe_receive_task.h" class CClientHost; @@ -30,6 +33,12 @@ class CClientHost; class CQuicTransceiverImpl; class CQuicTransceiver; +struct CQuicBuffer +{ + uint32 Length; + uint8 *Buffer; +}; + // User context for quic messages class CQuicUserContext { @@ -68,7 +77,13 @@ class CQuicUserContext CClientHost *ClientHost = nullptr; // Set if datagrams can be sent (set on connection thread, read on service main thread) - std::atomic MaxSendLength = 0; + NLMISC::CAtomicInt MaxSendLength; + + // Send buffer, one being sent at a time, released as soon as it's sent out + NLMISC::CAtomicFlag SendBusy; + NLMISC::CBitMemStream SendBuffer = NLMISC::CBitMemStream(false, 512); + CQuicBuffer SendQuicBuffer; + NLMISC::CAtomicInt SentCount; private: std::atomic_int m_RefCount = 0; @@ -167,7 +182,11 @@ class CQuicTransceiver bool listening(); /// Send a datagram, fancier than a telegram, but not as reliable - void sendDatagram(CQuicUserContext *user, const uint8 *buffer, uint32 size); + // void sendDatagram(CQuicUserContext *user, const uint8 *buffer, uint32 size); + + /// Send a datagram, this swaps the buffer with the previous one sent + /// Only one datagram may be in flight at a time + bool sendDatagramSwap(CQuicUserContext *user, NLMISC::CBitMemStream &buffer); /// Shutdown a connection void shutdown(CQuicUserContext *user);