From 4cf8e5eef0d5c7259d22e754e9fa8f5bd350abe3 Mon Sep 17 00:00:00 2001 From: kaetemi Date: Fri, 24 Feb 2023 11:54:42 +0800 Subject: [PATCH] Hooking up CQuicConnection to CNetworkConnection, ref ryzom/ryzomcore#628 --- nel/include/nel/misc/atomic.h | 6 + ryzom/client/src/network_connection.cpp | 170 +++++++++++++++++------- ryzom/client/src/network_connection.h | 13 +- ryzom/client/src/quic_connection.cpp | 102 +++++++++++--- ryzom/client/src/quic_connection.h | 8 +- 5 files changed, 224 insertions(+), 75 deletions(-) diff --git a/nel/include/nel/misc/atomic.h b/nel/include/nel/misc/atomic.h index d6de7ada2e..987b8295b5 100644 --- a/nel/include/nel/misc/atomic.h +++ b/nel/include/nel/misc/atomic.h @@ -629,6 +629,12 @@ class CAtomicEnum } }; +#ifdef NL_CPP11 +using CAtomicBool = CAtomicEnum; +#else +typedef CAtomicEnum CAtomicBool; +#endif + /// Hold a spinlock on an atomic flag class CAtomicLockSpin { diff --git a/ryzom/client/src/network_connection.cpp b/ryzom/client/src/network_connection.cpp index 6557c75bc4..a1645092db 100644 --- a/ryzom/client/src/network_connection.cpp +++ b/ryzom/client/src/network_connection.cpp @@ -660,8 +660,8 @@ bool CNetworkConnection::connect(string &result) // then connect to the frontend using the udp sock //ace faut faire la nouveau login client result = CLoginClient::connectToShard (_FrontendAddress, _Connection); - nlassert (!_Connection.connected()); - + nlassert((!m_Connection.connected()) && !(m_QuicConnection.state() == CQuicConnection::Connected)); + try { // @@ -671,42 +671,53 @@ bool CNetworkConnection::connect(string &result) // Under UDP we need to connect one by one by ourselves _FrontendHost = frontendHost; _FrontendHostIndex = 0; - std::string tres = "No remaining FS connection addresses"; - while (_FrontendHostIndex < _FrontendHost.size()) + if (m_UseQuic) { - try - { - // Connect - _Connection.connect(_FrontendHost.at(_FrontendHostIndex)); - tres.clear(); - break; - } - catch (const ESocket &e) + // Only need one connection attempt on QUIC as it handles migration + _FrontendHost.setPort(_FrontendHost.port() - 5000); // Same trick as SBS, just use a port offset + nlinfo("Connecting to QUIC at port instead %d", _FrontendHost.port()); + m_QuicConnection.connect(_FrontendHost); + } + else + { + m_QuicConnection.release(); // Do clean up any resources if we're no longer using QUIC + std::string tres = "No remaining FS connection addresses"; + while (_FrontendHostIndex < _FrontendHost.size()) { - tres = toString ("FS refused the connection (%s)", e.what()); - } - ++_FrontendHostIndex; - - // Reuse the udp socket - _Connection.~CUdpSimSock(); + try + { + // Connect + m_Connection.connect(_FrontendHost.at(_FrontendHostIndex)); + tres.clear(); + break; + } + catch (const ESocket &e) + { + tres = toString("FS refused the connection (%s)", e.what()); + } + ++_FrontendHostIndex; + + // Reuse the udp socket + m_Connection.~CUdpSimSock(); #ifdef new #undef new #endif - new (&_Connection)CUdpSimSock(); + new (&m_Connection) CUdpSimSock(); #ifdef DEBUG_NEW #define new DEBUG_NEW #endif - } - if (!tres.empty()) - { - result = tres; - return false; + } + if (!tres.empty()) + { + result = tres; + return false; + } } } catch (const ESocket &e) { - result = toString ("FS hostname resolution failed (%s)", e.what()); + result = toString("FS hostname resolution failed (%s)", e.what()); return false; } @@ -779,6 +790,7 @@ bool CNetworkConnection::update() _TotalMessages = 0; //nldebug("CNET[%d]: begin update()", this); + m_QuicConnection.update(); // Always update QUIC state // If we are disconnected, bypass the real network update if ( _ConnectionState == Disconnect ) @@ -822,19 +834,25 @@ bool CNetworkConnection::update() return res; #endif + if (m_UseQuic && _ConnectionState == NextAddress) + { + // Early exit this attempt since we're switching to QUIC + _ConnectionState = Disconnect; + } + if (_ConnectionState == NextAddress) { - nlassert(!_Connection.connected()); + nlassert(!m_Connection.connected()); std::string tres = "No remaining FS connection addresses"; while (_FrontendHostIndex < _FrontendHost.size()) { // Reuse the udp socket - _Connection.~CUdpSimSock(); + m_Connection.~CUdpSimSock(); #ifdef new #undef new #endif - new (&_Connection)CUdpSimSock(); + new (&m_Connection)CUdpSimSock(); #ifdef DEBUG_NEW #define new DEBUG_NEW @@ -844,7 +862,7 @@ bool CNetworkConnection::update() try { // Connect - _Connection.connect(frontendHost); + m_Connection.connect(frontendHost); tres.clear(); break; } @@ -873,7 +891,7 @@ bool CNetworkConnection::update() } } - if (!_Connection.connected()) + if (m_UseQuic ? (m_QuicConnection.state() == CQuicConnection::Disconnected) : !m_Connection.connected()) { //if(!ClientCfg.Local) // nlwarning("CNET[%p]: update() attempted whereas socket is not connected !", this); @@ -957,6 +975,12 @@ bool CNetworkConnection::update() } } + if (m_UseQuic && m_QuicConnection.state() == CQuicConnection::Disconnected) + { + // Bye + _ConnectionState = Disconnect; + } + //updateBufferizedPackets (); PacketLossGraph.addOneValue (getMeanPacketLoss ()); @@ -1000,8 +1024,19 @@ bool CNetworkConnection::buildStream( CBitMemStream &msgin ) } #endif + if (m_UseQuic) + { + if (m_QuicConnection.receiveDatagram(msgin)) + { + return true; + } + // Under quic receiving a datagram never fails if a datagram is flagged as available + nlwarning("QUIC datagram available but receiving datagram failed, this is a bug."); + return false; + } + uint32 len = 65536; - if ( _Connection.receive( (uint8*)_ReceiveBuffer, len, false ) ) + if ( m_Connection.receive( (uint8*)_ReceiveBuffer, len, false ) ) { // Compute some statistics statsReceive( len ); @@ -1082,7 +1117,10 @@ void CNetworkConnection::sendSystemLogin() try { //sendUDP (&(_Connection), message.buffer(), length); - _Connection.send( message.buffer(), length ); + if (m_UseQuic) + m_QuicConnection.sendDatagram(message.buffer(), length); + else + m_Connection.send(message.buffer(), length); } catch (const ESocket &e) { @@ -1132,7 +1170,7 @@ bool CNetworkConnection::stateLogin() while ( (_ReplayIncomingMessagesOn && dataToReplayAvailable()) || _Connection.dataAvailable() ) #else - while ( _Connection.dataAvailable() )// && _TotalMessages<5) + while ( m_UseQuic ? m_QuicConnection.datagramAvailable() : m_Connection.dataAvailable() )// && _TotalMessages<5) #endif { _DecodedHeader = false; @@ -1192,7 +1230,8 @@ bool CNetworkConnection::stateLogin() { sendSystemLogin(); _LatestLoginTime = _UpdateTime; - if (m_LoginAttempts > 24) + // On UDP time out the login after 24 attempts (every 300ms, so after 7.2 seconds) + if ((!m_UseQuic) && (m_LoginAttempts > 24)) { m_LoginAttempts = 0; disconnect(); // will send disconnection message @@ -1308,7 +1347,10 @@ void CNetworkConnection::sendSystemAckSync() message.serial(_LatestSync); uint32 length = message.length(); - _Connection.send (message.buffer(), length); + if (m_UseQuic) + m_QuicConnection.sendDatagram(message.buffer(), length); + else + m_Connection.send (message.buffer(), length); //sendUDP (&(_Connection), message.buffer(), length); statsSend(length); @@ -1350,7 +1392,7 @@ bool CNetworkConnection::stateSynchronize() while ( (_ReplayIncomingMessagesOn && dataToReplayAvailable()) || _Connection.dataAvailable() ) #else - while (_Connection.dataAvailable())// && _TotalMessages<5) + while (m_UseQuic ? m_QuicConnection.datagramAvailable() : m_Connection.dataAvailable())// && _TotalMessages<5) #endif { _DecodedHeader = false; @@ -2194,7 +2236,10 @@ void CNetworkConnection::sendNormalMessage() //_PropertyDecoder.send (_CurrentSendNumber, _LastReceivedNumber); uint32 length = message.length(); - _Connection.send (message.buffer(), length); + if (m_UseQuic) + m_QuicConnection.sendDatagram(message.buffer(), length); + else + m_Connection.send(message.buffer(), length); //sendUDP (&(_Connection), message.buffer(), length); statsSend(length); // remember send time @@ -2221,7 +2266,7 @@ bool CNetworkConnection::stateConnected() TTime now = ryzomGetLocalTime (); TTime diff = now - previousTime; previousTime = now; - if ( (diff > 3000) && (! _Connection.dataAvailable()) ) + if ( (diff > 3000) && (! (m_UseQuic ? m_QuicConnection.datagramAvailable() : m_Connection.dataAvailable())) ) { return false; } @@ -2237,7 +2282,7 @@ bool CNetworkConnection::stateConnected() _MachineTicksAtTick = _UpdateTicks; } - if (_CurrentClientTick >= _CurrentServerTick && !_Connection.dataAvailable()) + if (_CurrentClientTick >= _CurrentServerTick && !(m_UseQuic ? m_QuicConnection.datagramAvailable() : m_Connection.dataAvailable())) { return false; } @@ -2249,7 +2294,7 @@ bool CNetworkConnection::stateConnected() while ( (_ReplayIncomingMessagesOn && dataToReplayAvailable()) || _Connection.dataAvailable() ) #else - while (_Connection.dataAvailable())// && _TotalMessages<5) + while (m_UseQuic ? m_QuicConnection.datagramAvailable() : m_Connection.dataAvailable())// && _TotalMessages<5) #endif { _DecodedHeader = false; @@ -2355,7 +2400,10 @@ void CNetworkConnection::sendSystemAckProbe() _LatestProbes.clear(); uint32 length = message.length(); - _Connection.send (message.buffer(), length); + if (m_UseQuic) + m_QuicConnection.sendDatagram(message.buffer(), length); + else + m_Connection.send(message.buffer(), length); //sendUDP (&(_Connection), message.buffer(), length); statsSend(length); @@ -2375,7 +2423,7 @@ bool CNetworkConnection::stateProbe() while ( (_ReplayIncomingMessagesOn && dataToReplayAvailable()) || _Connection.dataAvailable() ) #else - while (_Connection.dataAvailable())// && _TotalMessages<5) + while (m_UseQuic ? m_QuicConnection.datagramAvailable() : m_Connection.dataAvailable())// && _TotalMessages<5) #endif { _DecodedHeader = false; @@ -2464,7 +2512,7 @@ bool CNetworkConnection::stateStalled() while ( (_ReplayIncomingMessagesOn && dataToReplayAvailable()) || _Connection.dataAvailable() ) #else - while (_Connection.dataAvailable())// && _TotalMessages<5) + while (m_UseQuic ? m_QuicConnection.datagramAvailable() : m_Connection.dataAvailable())// && _TotalMessages<5) #endif { _DecodedHeader = false; @@ -2883,17 +2931,21 @@ void CNetworkConnection::sendSystemDisconnection() uint32 length = message.length(); - if (_Connection.connected()) + if (m_Connection.connected()) { try { - _Connection.send(message.buffer(), length); + m_Connection.send(message.buffer(), length); } catch (const ESocket &e) { nlwarning("Socket exception: %s", e.what()); } } + if (m_QuicConnection.canSend()) + { + m_QuicConnection.sendDatagram(message.buffer(), length); + } //sendUDP (&(_Connection), message.buffer(), length); statsSend(length); @@ -2916,12 +2968,18 @@ void CNetworkConnection::disconnect() _ConnectionState == Disconnect) { //nlwarning("Unable to disconnect(): not connected yet, or already disconnected."); - _Connection.close(); + m_QuicConnection.disconnect(); + if (!m_UseQuic) + m_QuicConnection.release(); + m_Connection.close(); return; } sendSystemDisconnection(); - _Connection.close(); + m_QuicConnection.disconnect(); + if (!m_UseQuic) + m_QuicConnection.release(); + m_Connection.close(); _ConnectionState = Disconnect; } @@ -2947,7 +3005,7 @@ bool CNetworkConnection::stateQuit() while ( (_ReplayIncomingMessagesOn && dataToReplayAvailable()) || _Connection.dataAvailable() ) #else - while (_Connection.dataAvailable())// && _TotalMessages<5) + while (m_UseQuic ? m_QuicConnection.datagramAvailable() : m_Connection.dataAvailable())// && _TotalMessages<5) #endif { _DecodedHeader = false; @@ -3065,6 +3123,8 @@ void CNetworkConnection::reset() _TotalLostPackets = 0; _ConnectionQuality = false; + m_UseQuic = false; + _CurrentSmoothServerTick= 0; _SSTLastLocalTime= 0; } @@ -3093,16 +3153,21 @@ void CNetworkConnection::reinit() initTicks(); // Reuse the udp socket - _Connection.~CUdpSimSock(); + m_Connection.~CUdpSimSock(); #ifdef new #undef new #endif - new (&_Connection) CUdpSimSock(); + new (&m_Connection) CUdpSimSock(); #ifdef DEBUG_NEW #define new DEBUG_NEW #endif + + // Disconnect QUIC + m_QuicConnection.disconnect(); + if (!m_UseQuic) + m_QuicConnection.release(); } // sends system sync acknowledge @@ -3118,7 +3183,10 @@ void CNetworkConnection::sendSystemQuit() message.serial(_QuitId); uint32 length = message.length(); - _Connection.send (message.buffer(), length); + if (m_UseQuic) + m_QuicConnection.sendDatagram(message.buffer(), length); + else + m_Connection.send(message.buffer(), length); //sendUDP (&(_Connection), message.buffer(), length); statsSend(length); diff --git a/ryzom/client/src/network_connection.h b/ryzom/client/src/network_connection.h index 6877c2602c..e4162b6f81 100644 --- a/ryzom/client/src/network_connection.h +++ b/ryzom/client/src/network_connection.h @@ -40,6 +40,7 @@ #include #include +#include "quic_connection.h" #include "time_client.h" #include "game_share/entity_types.h" #include "property_decoder.h" @@ -512,13 +513,13 @@ class CNetworkConnection /// Get local IP address const NLNET::CInetAddress& getAddress() { - return _Connection.localAddr(); + return m_Connection.localAddr(); // FIXME: QUIC } /// Get socket NLNET::CUdpSock& getConnection() { - return _Connection.UdpSock; + return m_Connection.UdpSock; // FIXME: QUIC } /// Get userId @@ -575,7 +576,13 @@ class CNetworkConnection NLNET::CLoginCookie _LoginCookie; /// The UDP connection to the frontend - NLNET::CUdpSimSock _Connection; + NLNET::CUdpSimSock m_Connection; + + /// The QUIC connection to the frontend + CQuicConnection m_QuicConnection; + + /// Whether to use QUIC or not + bool m_UseQuic; /// The receive buffer uint32 _ReceiveBuffer[65536]; diff --git a/ryzom/client/src/quic_connection.cpp b/ryzom/client/src/quic_connection.cpp index d3c4757f8b..e5a9d8ef07 100644 --- a/ryzom/client/src/quic_connection.cpp +++ b/ryzom/client/src/quic_connection.cpp @@ -48,7 +48,8 @@ class CQuicConnectionImpl { NoChange, Connect, - Disconnect + Disconnect, + Release, }; CQuicConnectionImpl() @@ -59,7 +60,7 @@ class CQuicConnectionImpl , DesiredStateChange(NoChange) , BufferReads(0) , State(CQuicConnection::Disconnected) - , MaxSendLength(0) + , ReceiveEnable(0) { } @@ -85,7 +86,8 @@ class CQuicConnectionImpl CAtomicFlag ShuttingDownFlag; // Set by the callback thread, checked by the update call (main thread), reset by update on complete disconnection CAtomicFlag ShutdownFlag; // Set by the callback thread, checked by the update call (main thread), reset by update on complete disconnection - CAtomicInt MaxSendLength; // Set by the callback thread, not used currently + bool ReceiveEnable; // Set when datagrams may be received, otherwise they'll be ignored, this is to avoid stray messages after disconnecting (set and accessed by main thread only) + CAtomicInt MaxSendLength; // Set by the callback thread, used to check if we can send static QUIC_STATUS #ifdef _Function_class_ @@ -93,10 +95,12 @@ class CQuicConnectionImpl #endif connectionCallback(HQUIC connection, void *context, QUIC_CONNECTION_EVENT *ev); + void init(); // Delayed init void update(); void connect(); void shutdown(); void close(); + void release(); }; CQuicConnection::CQuicConnection() @@ -106,6 +110,23 @@ CQuicConnection::CQuicConnection() : m(new CQuicConnectionImpl()) #endif { +} + +CQuicConnection::~CQuicConnection() +{ + disconnect(true); + release(); +} + +void CQuicConnectionImpl::init() +{ + CQuicConnectionImpl *const m = this; + + if (MsQuic) + { + return; + } + // TestAndSet has acquire semantics, clear has release semantics, so we use clear to flag the events m->ConnectedFlag.testAndSet(); m->ShuttingDownFlag.testAndSet(); @@ -132,12 +153,6 @@ CQuicConnection::CQuicConnection() } } -CQuicConnection::~CQuicConnection() -{ - disconnect(true); - release(); -} - void CQuicConnection::connect(const NLNET::CInetHost &addr) { m->DesiredStateChange = CQuicConnectionImpl::Connect; @@ -149,6 +164,8 @@ void CQuicConnectionImpl::connect() { CQuicConnectionImpl *const m = this; + init(); + nlassert(m->State == CQuicConnection::Disconnected); nlassert(m->DesiredStateChange == Connect); @@ -161,6 +178,14 @@ void CQuicConnectionImpl::connect() m->ShutdownFlag.testAndSet(); m->MaxSendLength = 0; m->BufferReads = m->BufferWrites; + m->ReceiveEnable = true; + + // Clear FIFO + { + CFifoAccessor access(&m->Buffer); + CBufFIFO *fifo = &access.value(); + fifo->clear(); + } if (!MsQuic) { @@ -228,7 +253,7 @@ void CQuicConnectionImpl::connect() return; } - // Start the connection + // Start the connection (NOTE: The hostname lookup is a bit redundant here, but it's okay I suppose...) status = MsQuic->ConnectionStart(m->Connection, m->Configuration, QUIC_ADDRESS_FAMILY_UNSPEC, nlUtf8ToMbcs(addr.hostname()), addr.port()); if (QUIC_FAILED(status)) { @@ -245,7 +270,8 @@ void CQuicConnectionImpl::connect() void CQuicConnection::disconnect(bool blocking) { m->DesiredStateChange = CQuicConnectionImpl::Disconnect; - if (blocking) + m->ReceiveEnable = false; + if (MsQuic && blocking) { try { @@ -289,6 +315,17 @@ void CQuicConnectionImpl::update() { CQuicConnectionImpl *const m = this; + // Asynchronous release + if (!m->Connection + && m->DesiredStateChange == CQuicConnectionImpl::Release) + { + nlassert(m->State == CQuicConnection::Disconnected); + m->release(); + m->DesiredStateChange = CQuicConnectionImpl::NoChange; + update(); + return; + } + // Early exit if (m->State == CQuicConnection::Disconnected && m->DesiredStateChange != CQuicConnectionImpl::Connect) @@ -327,6 +364,8 @@ void CQuicConnectionImpl::update() { nlwarning("Connection is connected, but state is %d", m->State); } + update(); + return; } // Shutting down, this is just a limbo state @@ -336,6 +375,8 @@ void CQuicConnectionImpl::update() { m->State = CQuicConnection::Disconnecting; } + update(); + return; } // Shutdown complete, now close! @@ -345,6 +386,8 @@ void CQuicConnectionImpl::update() nlassert(m->Connection); close(); m->State = CQuicConnection::Disconnected; + update(); + return; } } @@ -353,8 +396,17 @@ void CQuicConnection::update() m->update(); } -void CQuicConnection::release() +void CQuicConnectionImpl::release() { + CQuicConnectionImpl *const m = this; + + // Clear FIFO + { + CFifoAccessor access(&m->Buffer); + CBufFIFO *fifo = &access.value(); + fifo->clear(); + } + // Close configuration if (m->Configuration) { @@ -377,6 +429,13 @@ void CQuicConnection::release() } } +void CQuicConnection::release() +{ + m->DesiredStateChange = CQuicConnectionImpl::Release; + m->ReceiveEnable = false; + update(); +} + CQuicConnection::TState CQuicConnection::state() const { return m->State; @@ -453,30 +512,39 @@ _Function_class_(QUIC_CONNECTION_CALLBACK) return status; } -void CQuicConnection::sendDatagram(const uint8 *buffer, uint32 size) +bool CQuicConnection::sendDatagram(const uint8 *buffer, uint32 size) { - if (m->Connection && size < m->MaxSendLength) + if (m->Connection && size <= m->MaxSendLength) { QUIC_BUFFER buf; buf.Buffer = (uint8 *)buffer; buf.Length = size; - MsQuic->DatagramSend(m->Connection, &buf, 1, QUIC_SEND_FLAG_NONE, this); + QUIC_STATUS status = MsQuic->DatagramSend(m->Connection, &buf, 1, QUIC_SEND_FLAG_NONE, this); + if (QUIC_FAILED(status)) + { + nlwarning("DatagramSend failed with %d", status); + return false; + } + return true; } + return false; } bool CQuicConnection::datagramAvailable() const { // CFifoAccessor access(&m->Buffer); // return !access.value().empty(); - return m->BufferWrites != m->BufferReads; + return m->ReceiveEnable && m->BufferWrites != m->BufferReads; } bool CQuicConnection::receiveDatagram(NLMISC::CBitMemStream &msgin) { + if (!m->ReceiveEnable) + return false; + int writes = m->BufferWrites; int reads = m->BufferReads; if (writes != reads) - ; { CFifoAccessor access(&m->Buffer); CBufFIFO *fifo = &access.value(); diff --git a/ryzom/client/src/quic_connection.h b/ryzom/client/src/quic_connection.h index e3168670a0..7521325c2c 100644 --- a/ryzom/client/src/quic_connection.h +++ b/ryzom/client/src/quic_connection.h @@ -59,13 +59,13 @@ class CQuicConnection /// Connect void connect(const NLNET::CInetHost &addr); // const CInetAddress &addr); - /// Shutdown and close gracefully, this object can be reused immediately for a new connection - void disconnect(bool blocking); + /// Shutdown and close gracefully, this object can be reused immediately for a new connection even if non-blocking + void disconnect(bool blocking = false); /// Update connection state void update(); - /// Release. Instance is useless after this call + /// Release QUIC resources asynchronously (during update), library can be safely reused even after this call void release(); /// Check if still connecting or connected @@ -91,7 +91,7 @@ class CQuicConnection inline bool connected() const { return state() == Connected; } /// Send a datagram, fancier than a telegram, but not as reliable - void sendDatagram(const uint8 *buffer, uint32 size); + bool sendDatagram(const uint8 *buffer, uint32 size); /// Check if any datagram has been received bool datagramAvailable() const;