Skip to content

Commit

Permalink
Implementing QUIC support on the server side, ref #628
Browse files Browse the repository at this point in the history
  • Loading branch information
kaetemi committed Feb 21, 2023
1 parent ecfea66 commit 312be3c
Show file tree
Hide file tree
Showing 7 changed files with 636 additions and 39 deletions.
2 changes: 2 additions & 0 deletions nel/include/nel/misc/types_nl.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@
# define NL_CPP11
#endif

#define null nullptr

#ifdef NL_CPP14
#define NL_ALIGNLIKE(type) alignas(alignof(type))
#else
Expand Down
68 changes: 42 additions & 26 deletions ryzom/server/src/frontend_service/fe_receive_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include "uid_impulsions.h"
#include "id_impulsions.h"
#include "quic_transceiver.h"

#include "game_share/ryzom_entity_id.h"
#include "game_share/system_message.h"
Expand Down Expand Up @@ -124,15 +125,22 @@ void CFeReceiveSub::init( uint16 firstAcceptableFrontendPort, uint16 lastAccepta
nlassert( (_ReceiveTask==NULL) && (_ReceiveThread==NULL) );

// Start external datagram socket
nlinfo( "FERECV: Starting external datagram socket" );
_ReceiveTask = new CFEReceiveTask( firstAcceptableFrontendPort, lastAcceptableFrontendPort, dgrammaxlength );
_CurrentReadQueue = &_Queue2;
_ReceiveTask->setWriteQueue( &_Queue1 );
nlassert( _ReceiveTask != NULL );
_ReceiveThread = IThread::create( _ReceiveTask );
nlassert( _ReceiveThread != NULL );
nlinfo("FERECV: Starting external datagram socket");
_ReceiveTask = new CFEReceiveTask(firstAcceptableFrontendPort, lastAcceptableFrontendPort, dgrammaxlength);
nlassert(_ReceiveTask != NULL);
m_CurrentReadQueue = &m_Queues[1];
_ReceiveTask->swapWriteQueue(&m_Queues[0]);
_ReceiveThread = IThread::create(_ReceiveTask);
nlassert(_ReceiveThread != NULL);
_ReceiveThread->start();

// Start QUIC transceiver
nlinfo("FERECV: Starting QUIC transceiver");
m_QuicTransceiver = new CQuicTransceiver(firstAcceptableFrontendPort - 5000, lastAcceptableFrontendPort - 5000, dgrammaxlength);
m_CurrentQuicReadQueue = &m_Queues[3];
m_QuicTransceiver->swapWriteQueue(&m_Queues[2]);
m_QuicTransceiver->start();

// Setup current message placeholder
_CurrentInMsg = new TReceivedMessage();

Expand Down Expand Up @@ -237,14 +245,32 @@ void CFeReceiveSub::readIncomingData()
}

// Read queue of messages received from clients
while ( ! _CurrentReadQueue->empty() )
_CurrentInMsg->AddrFrom.setNull(); // FIXME: QUIC
while (!m_CurrentQuicReadQueue->empty())
{
//nlinfo( "Read queue size = %u", _CurrentReadQueue->size() );
_CurrentReadQueue->front( _CurrentInMsg->data() );
_CurrentReadQueue->pop();
nlassert( ! _CurrentReadQueue->empty() );
_CurrentReadQueue->front( _CurrentInMsg->VAddrFrom );
_CurrentReadQueue->pop();
// nlinfo( "Read queue size = %u", _CurrentReadQueue->size() );
m_CurrentQuicReadQueue->front(_CurrentInMsg->data());
m_CurrentQuicReadQueue->pop();
nlassert(!m_CurrentQuicReadQueue->empty());
m_CurrentQuicReadQueue->front(_CurrentInMsg->VAddrFrom);
m_CurrentQuicReadQueue->pop();
// _CurrentInMsg->vectorToAddress(); // FIXME: QUIC

#ifndef MEASURE_RECEIVE_TASK
handleIncomingMsg();
#endif
}

// _CurrentInMsg->AddrFrom.setNull(); // FIXME: QUIC
// Read queue of messages received from clients
while (!m_CurrentReadQueue->empty())
{
// nlinfo( "Read queue size = %u", m_CurrentReadQueue->size() );
m_CurrentReadQueue->front(_CurrentInMsg->data());
m_CurrentReadQueue->pop();
nlassert(!m_CurrentReadQueue->empty());
m_CurrentReadQueue->front(_CurrentInMsg->VAddrFrom);
m_CurrentReadQueue->pop();
_CurrentInMsg->vectorToAddress();

#ifndef MEASURE_RECEIVE_TASK
Expand Down Expand Up @@ -299,18 +325,8 @@ void CFeReceiveSub::readIncomingData()
*/
void CFeReceiveSub::swapReadQueues()
{
if ( _CurrentReadQueue == &_Queue1 )
{
_CurrentReadQueue = &_Queue2;
_ReceiveTask->setWriteQueue( &_Queue1 );
//nlinfo( "** Write1 Read2 ** Read=%p Write=%p", &_Queue2, &_Queue1 );
}
else
{
_CurrentReadQueue = &_Queue1;
_ReceiveTask->setWriteQueue( &_Queue2 );
//nlinfo( "** Read1 Write2 ** Read=%p Write=%p", &_Queue1, &_Queue2 );
}
m_CurrentReadQueue = _ReceiveTask->swapWriteQueue(m_CurrentReadQueue);
m_CurrentQuicReadQueue = m_QuicTransceiver->swapWriteQueue(m_CurrentQuicReadQueue);
}


Expand Down
21 changes: 12 additions & 9 deletions ryzom/server/src/frontend_service/fe_receive_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class CHistory;
class CVision;
class CVisionData;

class CQuicTransceiver;

/// Type of remove list
typedef std::list< std::pair<TClientId,uint8> > TClientsToRemove;
Expand Down Expand Up @@ -94,11 +95,11 @@ class CFeReceiveSub
EntityToClient(),
_ReceiveTask(NULL),
_ReceiveThread(NULL),
m_QuicTransceiver(NULL),
_ClientMap(),
_ClientIdCont(NULL),
_Queue1(),
_Queue2(),
_CurrentReadQueue(NULL),
m_CurrentReadQueue(NULL),
m_CurrentQuicReadQueue(NULL),
_CurrentInMsg(),
_RcvCounter(0),
_RcvBytes(0),
Expand Down Expand Up @@ -214,21 +215,23 @@ class CFeReceiveSub

/// Receive thread
NLMISC::IThread *_ReceiveThread;

/// QUIC receiver and sender
CQuicTransceiver *m_QuicTransceiver;

/// Client map by address
THostMap _ClientMap;

/// Client map by id (belonging to the send subsystem)
TClientIdCont *_ClientIdCont;

/// First queue
NLMISC::CBufFIFO _Queue1;

/// Second queue
NLMISC::CBufFIFO _Queue2;
/// Queues
NLMISC::CBufFIFO m_Queues[4];

/// Current read queue
NLMISC::CBufFIFO *_CurrentReadQueue;
NLMISC::CBufFIFO *m_CurrentReadQueue;

NLMISC::CBufFIFO *m_CurrentQuicReadQueue;

/// Current incoming message
TReceivedMessage *_CurrentInMsg;
Expand Down
9 changes: 6 additions & 3 deletions ryzom/server/src/frontend_service/fe_receive_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,9 +205,12 @@ void CFEReceiveTask::run()
/*
* Set new write queue
*/
void CFEReceiveTask::setWriteQueue( CBufFIFO *writequeue )
CBufFIFO *CFEReceiveTask::swapWriteQueue(CBufFIFO *writeQueue)
{
CSynchronized<CBufFIFO*>::CAccessor wq( &_WriteQueue );
wq.value() = writequeue;
CSynchronized<CBufFIFO *>::CAccessor wq(&_WriteQueue);
CBufFIFO *previous = wq.value();
wq.value() = writeQueue;
return previous;
}

/* end of file */
2 changes: 1 addition & 1 deletion ryzom/server/src/frontend_service/fe_receive_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class CFEReceiveTask : public NLMISC::IRunnable
virtual void run();

/// Set new write queue (thread-safe because mutexed)
void setWriteQueue( NLMISC::CBufFIFO *writequeue );
CBufFIFO *swapWriteQueue(NLMISC::CBufFIFO *writeQueue);

/// Require exit (thread-safe because atomic assignment)
void requireExit() { _ExitRequired = true; }
Expand Down
Loading

0 comments on commit 312be3c

Please sign in to comment.