Skip to content

Commit

Permalink
Implement QUIC receive and send for server, draft, ref #628
Browse files Browse the repository at this point in the history
  • Loading branch information
kaetemi committed Feb 21, 2023
1 parent 312be3c commit 0167391
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 65 deletions.
2 changes: 0 additions & 2 deletions nel/include/nel/misc/types_nl.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,6 @@
# define NL_CPP11
#endif

#define null nullptr

#ifdef NL_CPP14
#define NL_ALIGNLIKE(type) alignas(alignof(type))
#else
Expand Down
5 changes: 5 additions & 0 deletions ryzom/server/src/frontend_service/client_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,11 @@ CClientHost::~CClientHost()
{
//REMOVE_PROPERTY_FROM_EMITER( _Id, uint16, "AvailImpulseBitsize" );
//_Id = CEntityId::Unknown;

if (QuicUser.get() && QuicUser->ClientHost == this)
{
QuicUser->ClientHost = nullptr;
}
}


Expand Down
17 changes: 12 additions & 5 deletions ryzom/server/src/frontend_service/client_host.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
#include "game_share/ryzom_entity_id.h"
#include "game_share/entity_types.h"
#include "game_share/welcome_service_itf.h"
#include "quic_transceiver.h"

#include <vector>
#include <deque>


const uint32 FirstClientId = 1;
const uint16 InvalidClientId = 0xFFFF;

Expand All @@ -49,7 +49,6 @@ namespace NLNET

struct TPairState;


/**
* CClientIdPool
*/
Expand Down Expand Up @@ -108,7 +107,7 @@ class CClientHost
{
public:
/// Constructor
CClientHost( const NLNET::CInetAddress& addr, TClientId id ) :
CClientHost( const NLNET::CInetAddress& addr, CQuicUserContext *quicUser, TClientId id ) :
Uid (0xFFFFFFFF),
InstanceId(0xFFFFFFFF),
StartupRole(WS::TUserRole::ur_player),
Expand Down Expand Up @@ -145,12 +144,17 @@ class CClientHost
//
//AvailableImpulseBitsize( "AvailImpulseBitsize", MaxImpulseBitSizes[2] ),
NbActionsSentAtCycle(0),
QuitId(0)
QuitId(0),
QuicUser(quicUser)
{
IdTranslator.setId( id );
ImpulseEncoder.setClientHost( this );
ConnectionState = Synchronize;
initClientBandwidth();
if (quicUser)
{
quicUser->ClientHost = this;
}
}

/// Destructor
Expand Down Expand Up @@ -495,6 +499,8 @@ class CClientHost
/// Quit Id
uint32 QuitId;

CQuicUserContextPtr QuicUser;

private:

/// Client IP and port
Expand Down Expand Up @@ -591,7 +597,7 @@ class CLimboClient
{
public:
CLimboClient( CClientHost* client ) :
AddrFrom(client->address()), Uid(client->Uid), UserName(client->UserName), UserPriv(client->UserPriv), UserExtended(client->UserExtended),
AddrFrom(client->address()), QuicUser(client->QuicUser), Uid(client->Uid), UserName(client->UserName), UserPriv(client->UserPriv), UserExtended(client->UserExtended),
LanguageId(client->LanguageId), QuitId(client->QuitId)
{
// Set limbo timeout start
Expand All @@ -600,6 +606,7 @@ class CLimboClient
}

NLNET::CInetAddress AddrFrom;
CQuicUserContextPtr QuicUser;
TUid Uid;
std::string UserName, UserPriv, UserExtended, LanguageId;
uint32 QuitId;
Expand Down
37 changes: 26 additions & 11 deletions ryzom/server/src/frontend_service/fe_receive_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,14 @@ void CFeReceiveSub::init( uint16 firstAcceptableFrontendPort, uint16 lastAccepta
_ReceiveThread = IThread::create(_ReceiveTask);
nlassert(_ReceiveThread != NULL);
_ReceiveThread->start();
uint16 udpPort = _ReceiveTask->DataSock->localAddr().port();

// Start QUIC transceiver
nlinfo("FERECV: Starting QUIC transceiver");
m_QuicTransceiver = new CQuicTransceiver(firstAcceptableFrontendPort - 5000, lastAcceptableFrontendPort - 5000, dgrammaxlength);
m_QuicTransceiver = new CQuicTransceiver(dgrammaxlength);
m_CurrentQuicReadQueue = &m_Queues[3];
m_QuicTransceiver->swapWriteQueue(&m_Queues[2]);
m_QuicTransceiver->start();
m_QuicTransceiver->start((udpPort ? udpPort : firstAcceptableFrontendPort) - 5000);

// Setup current message placeholder
_CurrentInMsg = new TReceivedMessage();
Expand Down Expand Up @@ -245,24 +246,31 @@ void CFeReceiveSub::readIncomingData()
}

// Read queue of messages received from clients
_CurrentInMsg->AddrFrom.setNull(); // FIXME: QUIC
_CurrentInMsg->AddrFrom.setNull();
while (!m_CurrentQuicReadQueue->empty())
{
// nlinfo( "Read queue size = %u", _CurrentReadQueue->size() );
m_CurrentQuicReadQueue->front(_CurrentInMsg->data());
m_CurrentQuicReadQueue->pop();
nlassert(!m_CurrentQuicReadQueue->empty());
m_CurrentQuicReadQueue->front(_CurrentInMsg->VAddrFrom);
uint8 *buffer;
uint32 size;
m_CurrentQuicReadQueue->front(buffer, size);
nlassert(size == sizeof(_CurrentInMsg->QuicUser));
memcpy(&_CurrentInMsg->QuicUser, buffer, size);
CQuicUserContextRelease(_CurrentInMsg->QuicUser); // Decrease ref count after handling message
m_CurrentQuicReadQueue->pop();
// _CurrentInMsg->vectorToAddress(); // FIXME: QUIC

// Use token address for user mapping, since it'll stay constant
_CurrentInMsg->AddrFrom = _CurrentInMsg->QuicUser->TokenAddr;

#ifndef MEASURE_RECEIVE_TASK
handleIncomingMsg();
#endif
}

// _CurrentInMsg->AddrFrom.setNull(); // FIXME: QUIC
// Read queue of messages received from clients
_CurrentInMsg->QuicUser = nullptr;
while (!m_CurrentReadQueue->empty())
{
// nlinfo( "Read queue size = %u", m_CurrentReadQueue->size() );
Expand Down Expand Up @@ -341,6 +349,13 @@ void CFeReceiveSub::handleIncomingMsg()
#ifndef SIMUL_CLIENTS

//nldebug( "FERECV: Handling incoming message" );

if (_CurrentInMsg->QuicUser)
{
// TODO: Bypass _ClientMap lookup
// _CurrentInMsg->QuicUser->ClientHost
// _CurrentInMsg->QuicUser->ClientHost->QuicUser
}

// Retrieve client info or add one
THostMap::iterator ihm = _ClientMap.find( _CurrentInMsg->AddrFrom );
Expand Down Expand Up @@ -404,7 +419,7 @@ bool CFeReceiveSub::acceptUserIdConnection( TUid userId )
/*
* Add client
*/
CClientHost *CFeReceiveSub::addClient( const NLNET::CInetAddress& addrfrom, TUid userId, const string &userName, const string &userPriv, const std::string & userExtended, const std::string & languageId, const NLNET::CLoginCookie &cookie, uint32 instanceId, uint8 authorizedCharSlot, bool sendCLConnect )
CClientHost *CFeReceiveSub::addClient( const NLNET::CInetAddress& addrfrom, CQuicUserContext *quicUser, TUid userId, const string &userName, const string &userPriv, const std::string & userExtended, const std::string & languageId, const NLNET::CLoginCookie &cookie, uint32 instanceId, uint8 authorizedCharSlot, bool sendCLConnect )
{
MEM_DELTA_MULTI_LAST2(Client,AddClient);
if ( ! acceptUserIdConnection( userId ) )
Expand All @@ -430,7 +445,7 @@ CClientHost *CFeReceiveSub::addClient( const NLNET::CInetAddress& addrfrom, TUid
THostMap::iterator cmPreviousEnd = _ClientMap.end();
{
MEM_DELTA_MULTI2(CClientHost,New);
clienthost = new CClientHost( addrfrom, clientid );
clienthost = new CClientHost( addrfrom, quicUser, clientid );
}
nlassert( clienthost );
nlinfo( "Adding client %u (uid %u name %s priv '%s') at %s", clientid, userId, userName.c_str(), userPriv.c_str(), addrfrom.asIPString().c_str() );
Expand Down Expand Up @@ -462,7 +477,7 @@ CClientHost *CFeReceiveSub::addClient( const NLNET::CInetAddress& addrfrom, TUid
clienthost->AuthorizedCharSlot = authorizedCharSlot;

clienthost->initSendCycle( false );
CFrontEndService::instance()->sendSub()->setSendBufferAddress( clientid, &addrfrom );
CFrontEndService::instance()->sendSub()->setSendBufferAddress( clientid, &addrfrom, quicUser );

//This must be commented out when the GPMS always activates slot 0
//CFrontEndService::instance()->PrioSub.Prioritizer.addEntitySeenByClient( clientid, 0 );
Expand Down Expand Up @@ -572,7 +587,7 @@ CClientHost *CFeReceiveSub::exitFromLimboMode( const CLimboClient& lc )
{
nldebug( "Restoring user %u from limbo mode", lc.Uid );

CClientHost *client = addClient( lc.AddrFrom, lc.Uid, lc.UserName, lc.UserPriv, lc.UserExtended, lc.LanguageId, lc.LoginCookie, 0xF, false );
CClientHost *client = addClient(lc.AddrFrom, lc.QuicUser.get(), lc.Uid, lc.UserName, lc.UserPriv, lc.UserExtended, lc.LanguageId, lc.LoginCookie, 0xF, false);
NLMISC::TGameCycle tick = CTickEventHandler::getGameCycle();
client->setFirstSentPacket( client->sendNumber()+1, tick );
client->setSynchronizeState();
Expand Down Expand Up @@ -935,7 +950,7 @@ void CFeReceiveSub::handleReceivedMsg( CClientHost *clienthost )
// ALWAYS REMOVE CLIENT FROM LIMBO!
LimboClients.erase(uid);

clienthost = addClient( _CurrentInMsg->AddrFrom, uid, userName, userPriv, userExtended, languageId, lc, instanceId, (uint8)charSlot );
clienthost = addClient( _CurrentInMsg->AddrFrom, _CurrentInMsg->QuicUser, uid, userName, userPriv, userExtended, languageId, lc, instanceId, (uint8)charSlot );

// Check if the addition worked
if ( clienthost != NULL )
Expand Down
2 changes: 1 addition & 1 deletion ryzom/server/src/frontend_service/fe_receive_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class CFeReceiveSub
void release();

/// Add client
CClientHost *addClient( const NLNET::CInetAddress& addrfrom, TUid userId, const std::string &userName, const std::string &userPriv, const std::string & userExtended, const std::string & languageId, const NLNET::CLoginCookie &cookie, uint32 instanceId, uint8 authorisedCharSlot, bool sendCLConnect=true );
CClientHost *addClient( const NLNET::CInetAddress& addrfrom, CQuicUserContext *quicUser, TUid userId, const std::string &userName, const std::string &userPriv, const std::string & userExtended, const std::string & languageId, const NLNET::CLoginCookie &cookie, uint32 instanceId, uint8 authorisedCharSlot, bool sendCLConnect=true );

/// Add to the list of clients which will be removed by addr at the three cycles later (leaving the time to send an impulsion to the client)
void addToRemoveList( TClientId clientid ) { _ClientsToRemove.push_back( std::make_pair(clientid,3) ); }
Expand Down
2 changes: 1 addition & 1 deletion ryzom/server/src/frontend_service/fe_receive_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ volatile uint32 CFEReceiveTask::LastUDPPacketReceived = 0;
*/

/// Constructor
TReceivedMessage::TReceivedMessage()
TReceivedMessage::TReceivedMessage() : QuicUser(nullptr)
{
VAddrFrom.resize(sizeof(sockaddr_in6));
}
Expand Down
6 changes: 5 additions & 1 deletion ryzom/server/src/frontend_service/fe_receive_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

const uint32 MsgHeaderSize = 1;

class CQuicUserContext;

/**
* Placeholder for received messages
Expand Down Expand Up @@ -85,6 +86,9 @@ struct TReceivedMessage

/// Placeholder vector for address info
std::vector<uint8> VAddrFrom;

/// QUIC user context, no need to refcount since it's already held in the FIFO readout
CQuicUserContext *QuicUser;
};


Expand All @@ -108,7 +112,7 @@ class CFEReceiveTask : public NLMISC::IRunnable
virtual void run();

/// Set new write queue (thread-safe because mutexed)
CBufFIFO *swapWriteQueue(NLMISC::CBufFIFO *writeQueue);
NLMISC::CBufFIFO *swapWriteQueue(NLMISC::CBufFIFO *writeQueue);

/// Require exit (thread-safe because atomic assignment)
void requireExit() { _ExitRequired = true; }
Expand Down
11 changes: 9 additions & 2 deletions ryzom/server/src/frontend_service/fe_send_sub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,15 @@ inline void CFeSendSub::CSendBuffer::sendOutBox( NLNET::CUdpSock *datasock )
{
if ( OutBox.length() != 0 )
{
sendUDP( datasock, OutBox.buffer(), OutBox.length(), &DestAddress );
//nlinfo( "Sent %u bytes to %s", OutBox.length(), DestAddress.asString().c_str() );
if (QuicUser.get())
{
QuicUser->Transceiver->sendDatagram(QuicUser.get(), OutBox.buffer(), OutBox.length());
}
else
{
sendUDP(datasock, OutBox.buffer(), OutBox.length(), &DestAddress);
//nlinfo( "Sent %u bytes to %s", OutBox.length(), DestAddress.asString().c_str() );
}
}

#ifdef MEASURE_SENDING
Expand Down
12 changes: 7 additions & 5 deletions ryzom/server/src/frontend_service/fe_send_sub.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class CFeSendSub

/// Destination address
NLNET::CInetAddress DestAddress;
CQuicUserContextPtr QuicUser;

/// Used (connected) or not
volatile TSBState SBState;
Expand All @@ -59,13 +60,14 @@ class CFeSendSub
TOutBox OutBox;

/// Default constructor
CSendBuffer() : SBState(false), OutBox(false, 512) {} // prealloc 512 bytes to avoid bitmemstream reallocation
CSendBuffer() : SBState(false), OutBox(false, 512), QuicUser(nullptr) {} // prealloc 512 bytes to avoid bitmemstream reallocation

/// Set the new address. state should be SBReady or SBNotReady only.
void setAddress( const NLNET::CInetAddress *addr, TSBState state )
void setAddress( const NLNET::CInetAddress *addr, CQuicUserContext *quicUser, TSBState state )
{
// Copy the address (just a link would not work)
DestAddress = *addr;
QuicUser = quicUser;
SBState = state;
}

Expand Down Expand Up @@ -127,11 +129,11 @@ class CFeSendSub
volatile uint32 &sendCounter() { return _SendCounter; }

/// Set the address for send buffer, to match a connected client
void setSendBufferAddress( TClientId id, const NLNET::CInetAddress *addr )
void setSendBufferAddress( TClientId id, const NLNET::CInetAddress *addr, CQuicUserContext *quicUser )
{
// We set the address for both, but we can't enable the buffer yet
((*_CurrentFillingBuffers)[id]).setAddress( addr, false /*true*/ );
((*_CurrentFlushingBuffers)[id]).setAddress( addr, false );
((*_CurrentFillingBuffers)[id]).setAddress( addr, quicUser, false /*true*/ );
((*_CurrentFlushingBuffers)[id]).setAddress( addr, quicUser, false );
//_BuffersToEnable.insert( id ); // OBSOLETE: now it is enabled/disabled in CFeSendSub::fillPrioritizedActions
}

Expand Down
Loading

0 comments on commit 0167391

Please sign in to comment.