From 1b5cc9d70a8ee9fecb48abd7138b337bae65b838 Mon Sep 17 00:00:00 2001 From: kaetemi Date: Fri, 24 Feb 2023 10:33:55 +0800 Subject: [PATCH] Asynchronous QUIC connection and disconnection in client, ref ryzom/ryzomcore#628 --- ryzom/client/src/quic_connection.cpp | 299 ++++++++++++++++----------- ryzom/client/src/quic_connection.h | 9 + 2 files changed, 190 insertions(+), 118 deletions(-) diff --git a/ryzom/client/src/quic_connection.cpp b/ryzom/client/src/quic_connection.cpp index 45716b1a47..72cad2aa2a 100644 --- a/ryzom/client/src/quic_connection.cpp +++ b/ryzom/client/src/quic_connection.cpp @@ -43,13 +43,23 @@ using namespace NLNET; class CQuicConnectionImpl { public: + // The desired state change + enum TDesiredStateChange + { + NoChange, + Connect, + Disconnect + }; + CQuicConnectionImpl() : Api(NULL) , Registration(NULL) , Configuration(NULL) + , Connection(NULL) + , DesiredStateChange(NoChange) + , BufferReads(0) , State(CQuicConnection::Disconnected) - , ShutdownFlag(false) - , MaxSendLength(NULL) + , MaxSendLength(0) { } @@ -58,26 +68,35 @@ class CQuicConnectionImpl HQUIC Configuration; HQUIC Connection; - bool ConnectingAddrSet; - NLNET::CInetHost ConnectingAddr; - - CMutex BufferMutex; - NLMISC::CBufFIFO Buffer; - - CMutex StateMutex; - CQuicConnection::TState State; - - CMutex ShutdownMutex; // Don't have fancy atomics and synchronization primitives here, so mutex everything! - bool ShutdownFlag; - - uint32 MaxSendLength; - - + // This value is set by connect and disconnect, + // and checked from the update call. (main thread only) + TDesiredStateChange DesiredStateChange; + CInetHost DesiredAddr; + + // This is the FIFO buffer for all datagrams that are received. + // It is filled by the receive callback (any thread), and emptied by the receiveDatagram call (main thread only). + CSynchronizedFIFO Buffer; // (any thread) + CAtomicInt BufferWrites; // (any thread) + int BufferReads; // If BufferReads == BufferWrites, then there's no more datagrams (main thread only) + + // Current state of the connection + CQuicConnection::TState State; // Updated by the update call (main thread only) + CAtomicFlag ConnectedFlag; // Set by the callback thread, checked by update (main) + CAtomicFlag ShuttingDownFlag; // Set by the callback thread, checked by the update call (main thread), reset by update on complete disconnection + CAtomicFlag ShutdownFlag; // Set by the callback thread, checked by the update call (main thread), reset by update on complete disconnection + + CAtomicInt MaxSendLength; // Set by the callback thread, not used currently + static QUIC_STATUS #ifdef _Function_class_ _Function_class_(QUIC_CONNECTION_CALLBACK) #endif connectionCallback(HQUIC connection, void *context, QUIC_CONNECTION_EVENT *ev); + + void update(); + void connect(); + void shutdown(); + void close(); }; CQuicConnection::CQuicConnection() @@ -116,30 +135,39 @@ CQuicConnection::~CQuicConnection() void CQuicConnection::connect(const NLNET::CInetHost &addr) { - disconnect(false); + m->DesiredStateChange = CQuicConnectionImpl::Connect; + m->DesiredAddr = addr; + update(); +} + +void CQuicConnectionImpl::connect() +{ + CQuicConnectionImpl *m = this; + + nlassert(m->State == CQuicConnection::Disconnected); + nlassert(m->DesiredStateChange == Connect); + + NLNET::CInetHost addr = m->DesiredAddr; + + // Reset state + m->DesiredStateChange = NoChange; + m->ConnectedFlag.clear(); + m->ShuttingDownFlag.clear(); + m->ShutdownFlag.clear(); + m->MaxSendLength = 0; + m->BufferReads = m->BufferWrites; if (!MsQuic) { nlwarning("QUIC API not available"); + return; } if (!addr.isValid()) { return; } - - { - CUnlockableAutoMutex lock(m->StateMutex); - if (m->State != Disconnected) - { - lock.unlock(); - m->ConnectingAddr = addr; - m->ConnectingAddrSet = true; - return; // Try again in update() - } - m->State = Connecting; - } - + static const char *protocolName = "ryzomcore4"; static const QUIC_BUFFER alpn = { sizeof(protocolName) - 1, (uint8_t *)protocolName }; @@ -165,8 +193,7 @@ void CQuicConnection::connect(const NLNET::CInetHost &addr) status = MsQuic->ConfigurationOpen(m->Registration, &alpn, 1, &settings, sizeof(settings), NULL, &m->Configuration); if (QUIC_FAILED(status)) { - m->ShutdownFlag = true; - disconnect(false); + close(); nlwarning("MsQuic->ConfigurationOpen failed with status 0x%x", status); return; } @@ -179,8 +206,7 @@ void CQuicConnection::connect(const NLNET::CInetHost &addr) status = MsQuic->ConfigurationLoadCredential(m->Configuration, &credConfig); if (QUIC_FAILED(status)) { - m->ShutdownFlag = true; - disconnect(false); + close(); MsQuic->ConfigurationClose(m->Configuration); m->Configuration = nullptr; nlwarning("MsQuic->ConfigurationLoadCredential failed with status 0x%x", status); @@ -192,8 +218,7 @@ void CQuicConnection::connect(const NLNET::CInetHost &addr) status = MsQuic->ConnectionOpen(m->Registration, CQuicConnectionImpl::connectionCallback, (void *)this, &m->Connection); if (QUIC_FAILED(status)) { - m->ShutdownFlag = true; - disconnect(false); + close(); nlwarning("MsQuic->ConnectionOpen failed with status 0x%x", status); return; } @@ -202,79 +227,113 @@ void CQuicConnection::connect(const NLNET::CInetHost &addr) status = MsQuic->ConnectionStart(m->Connection, m->Configuration, QUIC_ADDRESS_FAMILY_UNSPEC, nlUtf8ToMbcs(addr.hostname()), addr.port()); if (QUIC_FAILED(status)) { - m->ShutdownFlag = true; - disconnect(false); + close(); nlwarning("MsQuic->ConnectionStart to %s failed with status 0x%x", addr.toStringLong().c_str(), status); return; } // Check + m->State = CQuicConnection::Connecting; update(); } void CQuicConnection::disconnect(bool blocking) { - // Stop connection - if (m->Connection) + m->DesiredStateChange = CQuicConnectionImpl::Disconnect; + if (blocking) { - MsQuic->ConnectionShutdown(m->Connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); - if (blocking) + try { - try + while (m->State != CQuicConnection::Disconnected) { - for (;;) // Spin wait because we don't have fancy mechanisms when supporting legacy code base - { - CAutoMutex lock(m->ShutdownMutex); - if (m->ShutdownFlag) - break; - nlSleep(1); - } - } - catch (const std::exception &e) - { - nlwarning("Exception while waiting for connection shutdown: %s", e.what()); + nlSleep(1); + update(); } + return; + } + catch (const std::exception &e) + { + m->close(); // Not very safe safety fallback + nlwarning("Exception while waiting for connection shutdown: %s", e.what()); } } - m->ConnectingAddr.clear(); - m->ConnectingAddrSet = false; +} - // Check +void CQuicConnectionImpl::shutdown() +{ + CQuicConnectionImpl *m = this; + if (m->Connection) + { + MsQuic->ConnectionShutdown(m->Connection, QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, 0); + } update(); } -void CQuicConnection::update() +void CQuicConnectionImpl::close() { - bool shutdownFlag; + CQuicConnectionImpl *m = this; + if (m->Connection) { - CAutoMutex lock(m->ShutdownMutex); - shutdownFlag = m->ShutdownFlag; + MsQuic->ConnectionClose(m->Connection); + m->Connection = NULL; } - if (shutdownFlag) +} + +// All the state updates and switches are done here +void CQuicConnectionImpl::update() +{ + CQuicConnectionImpl *m = this; + + // Simple case, initial state, user wants to connect and we're disconnected + if (m->DesiredStateChange == Connect + && m->State == CQuicConnection::Disconnected) { + nlassert(!m->Connection); + connect(); + return; + } + + // Also simple case + if (m->DesiredStateChange == Disconnect + && m->State == CQuicConnection::Connected) + { + m->State = CQuicConnection::Disconnecting; + shutdown(); + return; + } + + // Connected! + if (m->ConnectedFlag.test()) + { + nlassert(m->State == CQuicConnection::Connecting); + m->State = CQuicConnection::Connected; + // We're good! + } + + // Shutting down, this is just a limbo state + if (m->ShuttingDownFlag.test()) + { + if (m->State == CQuicConnection::Connected) { - CAutoMutex lock(m->StateMutex); - m->State = Disconnected; - } - if (m->Connection) - { - MsQuic->ConnectionClose(m->Connection); - m->Connection = NULL; - } - { - CAutoMutex lock(m->ShutdownMutex); - shutdownFlag = false; + m->State = CQuicConnection::Disconnecting; } } - if (m->ConnectingAddrSet) + + // Shutdown complete, now close! + if (m->ShutdownFlag.test()) { - CInetHost addr = m->ConnectingAddr; - m->ConnectingAddr.clear(); - m->ConnectingAddrSet = false; - connect(addr); + nlassert(m->State != CQuicConnection::Disconnected); + nlassert(m->Connection); + close(); + m->State = CQuicConnection::Disconnected; } } +void CQuicConnection::update() +{ + m->update(); +} + void CQuicConnection::release() { // Close configuration @@ -301,10 +360,14 @@ void CQuicConnection::release() CQuicConnection::TState CQuicConnection::state() const { - CAutoMutex lock(m->StateMutex); return m->State; } +uint32 CQuicConnection::maxSendLength() const +{ + return m->MaxSendLength; +} + QUIC_STATUS #ifdef _Function_class_ _Function_class_(QUIC_CONNECTION_CALLBACK) @@ -317,44 +380,27 @@ _Function_class_(QUIC_CONNECTION_CALLBACK) switch (ev->Type) { case QUIC_CONNECTION_EVENT_CONNECTED: { + m->ConnectedFlag.testAndSet(); nlinfo("Connected"); nlassert(CStringView((const char *)ev->CONNECTED.NegotiatedAlpn, ev->CONNECTED.NegotiatedAlpnLength) == "ryzomcore4"); - // MsQuic->ConnectionSendResumptionTicket(connection, QUIC_SEND_RESUMPTION_FLAG_NONE, 0, NULL); // What does this even do? - { - CAutoMutex lock(m->StateMutex); - m->State = CQuicConnection::Connected; - } status = QUIC_STATUS_SUCCESS; break; } case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT: { - nlinfo("Shutdown initiated by transport"); - { - CAutoMutex lock(m->StateMutex); - m->State = CQuicConnection::Disconnecting; - } + m->ShuttingDownFlag.testAndSet(); + m->MaxSendLength = 0; status = QUIC_STATUS_SUCCESS; break; } case QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_PEER: { - nlinfo("Shutdown initiated by peer"); - { - CAutoMutex lock(m->StateMutex); - m->State = CQuicConnection::Disconnecting; - } + m->ShuttingDownFlag.testAndSet(); + m->MaxSendLength = 0; status = QUIC_STATUS_SUCCESS; break; } case QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE: { - nlinfo("Shutdown complete"); - { - CAutoMutex lock(m->StateMutex); - m->State = CQuicConnection::Disconnecting; - } - { - CAutoMutex lock(m->ShutdownMutex); - m->ShutdownFlag = true; - } + m->ShutdownFlag.testAndSet(); + m->MaxSendLength = 0; status = QUIC_STATUS_SUCCESS; break; } @@ -390,7 +436,7 @@ _Function_class_(QUIC_CONNECTION_CALLBACK) void CQuicConnection::sendDatagram(const uint8 *buffer, uint32 size) { - if (m->Connection) + if (m->Connection && size < m->MaxSendLength) { QUIC_BUFFER buf; buf.Buffer = (uint8 *)buffer; @@ -401,27 +447,44 @@ void CQuicConnection::sendDatagram(const uint8 *buffer, uint32 size) bool CQuicConnection::datagramAvailable() { - CAutoMutex lock(m->BufferMutex); - return !m->Buffer.empty(); + // CFifoAccessor access(&m->Buffer); + // return !access.value().empty(); + return m->BufferWrites != m->BufferReads; } bool CQuicConnection::receiveDatagram(NLMISC::CBitMemStream &msgin) { - CAutoMutex lock(m->BufferMutex); - if (!m->Buffer.empty()) - return false; - uint8 *buffer; - uint32 size; - m->Buffer.front(buffer, size); - msgin.clear(); - memcpy(msgin.bufferToFill(size), buffer, size); - return true; + int writes = m->BufferWrites; + int reads = m->BufferReads; + if (writes != reads); + { + CFifoAccessor access(&m->Buffer); + CBufFIFO *fifo = &access.value(); + if (fifo->empty()) + { + m->BufferReads = writes; // Update value (not a real counter) + return false; + } + uint8 *buffer; + uint32 size; + fifo->front(buffer, size); + msgin.clear(); + memcpy(msgin.bufferToFill(size), buffer, size); + fifo->pop(); + if (fifo->empty()) + { + m->BufferReads = writes; // Update value (not a real counter) + } + return true; + } } void CQuicConnection::datagramReceived(const uint8 *buffer, uint32 length) { - CAutoMutex lock(m->BufferMutex); - m->Buffer.push(buffer, length); + CFifoAccessor access(&m->Buffer); + CBufFIFO *fifo = &access.value(); + fifo->push(buffer, length); + ++m->BufferWrites; } #else diff --git a/ryzom/client/src/quic_connection.h b/ryzom/client/src/quic_connection.h index 286d35ca34..e99533fb96 100644 --- a/ryzom/client/src/quic_connection.h +++ b/ryzom/client/src/quic_connection.h @@ -70,6 +70,9 @@ class CQuicConnection /// Check if still connecting or connected TState state() const; + + /// Check the maximum datagram length + uint32 maxSendLength() const; /// Check if the connection is in a limbo state inline bool limbo() const @@ -77,6 +80,12 @@ class CQuicConnection TState s = state(); return s == Connecting || s == Disconnecting; } + + /// Check if we can send + inline bool canSend() const + { + return state() == Connected && maxSendLength() > 0; + } /// Check if the connection is connected inline bool connected() const { return state() == Connected; }