Skip to content

Commit

Permalink
First look at per-session inter-thread channels
Browse files Browse the repository at this point in the history
this is a working implementation of #52.

I have my doubts that this gigantic refactor was worth the effort, because there's still lock contention on Snooze to consider; however, this does substantially reduce the problem described in pmmp/ext-pmmpthread#42, as well as reducing the overhead of inter-thread communication by removing the need to transmit session IDs.

This currently relies on each end of the IPC channels to roundtrip session open/close notifications to setup/cleanup stuff; I'll try to improve this before landing this in main.
  • Loading branch information
dktapps committed Apr 29, 2021
1 parent bd8b753 commit b1f1d6b
Show file tree
Hide file tree
Showing 21 changed files with 726 additions and 264 deletions.
30 changes: 9 additions & 21 deletions src/server/Server.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
use raklib\generic\SocketException;
use raklib\protocol\ACK;
use raklib\protocol\Datagram;
use raklib\protocol\EncapsulatedPacket;
use raklib\protocol\NACK;
use raklib\protocol\Packet;
use raklib\protocol\PacketSerializer;
Expand Down Expand Up @@ -152,10 +151,12 @@ public function tickProcessor() : void{
* The below code is designed to allow co-op between sending and receiving to avoid slowing down either one
* when high traffic is coming either way. Yielding will occur after 100 messages.
*/
$eventGenerator = $this->eventSource->process($this);
do{
$stream = !$this->shutdown;
for($i = 0; $i < 100 && $stream && !$this->shutdown; ++$i){ //if we received a shutdown event, we don't care about any more messages from the event source
$stream = $this->eventSource->process($this);
$eventGenerator->next();
$stream = $eventGenerator->valid();
}

$socket = true;
Expand Down Expand Up @@ -327,13 +328,6 @@ public function getEventListener() : ServerEventListener{
return $this->eventListener;
}

public function sendEncapsulated(int $sessionId, EncapsulatedPacket $packet, bool $immediate = false) : void{
$session = $this->sessions[$sessionId] ?? null;
if($session !== null and $session->isConnected()){
$session->addEncapsulatedToQueue($packet, $immediate);
}
}

public function sendRaw(string $address, int $port, string $payload) : void{
try{
$this->socket->writePacket($payload, $address, $port);
Expand All @@ -342,12 +336,6 @@ public function sendRaw(string $address, int $port, string $payload) : void{
}
}

public function closeSession(int $sessionId) : void{
if(isset($this->sessions[$sessionId])){
$this->sessions[$sessionId]->initiateDisconnect("server disconnect");
}
}

public function setName(string $name) : void{
$this->name = $name;
}
Expand Down Expand Up @@ -383,6 +371,10 @@ public function addRawPacketFilter(string $regex) : void{
$this->rawPacketFilters[] = $regex;
}

public function getSession(int $id) : ?SessionInterface{
return $this->sessions[$id] ?? null;
}

public function getSessionByAddress(InternetAddress $address) : ?Session{
return $this->sessionsByAddress[$address->toString()] ?? null;
}
Expand Down Expand Up @@ -411,9 +403,9 @@ private function removeSessionInternal(Session $session) : void{
unset($this->sessionsByAddress[$session->getAddress()->toString()], $this->sessions[$session->getInternalId()]);
}

public function openSession(Session $session) : void{
public function openSession(Session $session) : SessionEventListener{
$address = $session->getAddress();
$this->eventListener->onClientConnect($session->getInternalId(), $address->ip, $address->port, $session->getID());
return $this->eventListener->onClientConnect($session->getInternalId(), $address->ip, $address->port, $session->getID());
}

private function checkSessions() : void{
Expand All @@ -429,10 +421,6 @@ private function checkSessions() : void{
}
}

public function notifyACK(Session $session, int $identifierACK) : void{
$this->eventListener->onPacketAck($session->getInternalId(), $identifierACK);
}

public function getName() : string{
return $this->name;
}
Expand Down
10 changes: 1 addition & 9 deletions src/server/ServerEventListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,9 @@

interface ServerEventListener{

public function onClientConnect(int $sessionId, string $address, int $port, int $clientID) : void;

public function onClientDisconnect(int $sessionId, string $reason) : void;

public function onPacketReceive(int $sessionId, string $packet) : void;
public function onClientConnect(int $sessionId, string $address, int $port, int $clientID) : SessionEventListener;

public function onRawPacketReceive(string $address, int $port, string $payload) : void;

public function onPacketAck(int $sessionId, int $identifierACK) : void;

public function onBandwidthStatsUpdate(int $bytesSentDiff, int $bytesReceivedDiff) : void;

public function onPingMeasure(int $sessionId, int $pingMS) : void;
}
5 changes: 4 additions & 1 deletion src/server/ServerEventSource.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@

interface ServerEventSource{

public function process(ServerInterface $server) : bool;
/**
* @phpstan-return \Generator<int, null, void, void>
*/
public function process(ServerInterface $server) : \Generator;
}
6 changes: 1 addition & 5 deletions src/server/ServerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,12 @@

namespace raklib\server;

use raklib\protocol\EncapsulatedPacket;

interface ServerInterface{

public function sendEncapsulated(int $sessionId, EncapsulatedPacket $packet, bool $immediate = false) : void;
public function getSession(int $id) : ?SessionInterface;

public function sendRaw(string $address, int $port, string $payload) : void;

public function closeSession(int $sessionId) : void;

public function setName(string $name) : void;

public function setPortCheck(bool $value) : void;
Expand Down
30 changes: 21 additions & 9 deletions src/server/Session.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
use function microtime;
use function ord;

class Session{
class Session implements SessionInterface{
public const MAX_SPLIT_PART_COUNT = 128;
public const MAX_CONCURRENT_SPLIT_COUNT = 4;

Expand Down Expand Up @@ -90,6 +90,8 @@ class Session{
/** @var SendReliabilityLayer */
private $sendLayer;

private ?SessionEventListener $eventListener = null;

public function __construct(Server $server, \Logger $logger, InternetAddress $address, int $clientId, int $mtuSize, int $internalId){
if($mtuSize < self::MIN_MTU_SIZE){
throw new \InvalidArgumentException("MTU size must be at least " . self::MIN_MTU_SIZE . ", got $mtuSize");
Expand Down Expand Up @@ -120,7 +122,9 @@ function(Datagram $datagram) : void{
$this->sendPacket($datagram);
},
function(int $identifierACK) : void{
$this->server->getEventListener()->onPacketAck($this->internalId, $identifierACK);
if($this->eventListener !== null){
$this->eventListener->onPacketAck($identifierACK);
}
}
);
}
Expand Down Expand Up @@ -195,7 +199,7 @@ private function queueConnectedPacket(ConnectedPacket $packet, int $reliability,
$this->sendLayer->addEncapsulatedToQueue($encapsulated, $immediate);
}

public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immediate) : void{
public function sendEncapsulated(EncapsulatedPacket $packet, bool $immediate = false) : void{
$this->sendLayer->addEncapsulatedToQueue($packet, $immediate);
}

Expand Down Expand Up @@ -231,7 +235,7 @@ private function handleEncapsulatedPacketRoute(EncapsulatedPacket $packet) : voi
if($dataPacket->address->port === $this->server->getPort() or !$this->server->portChecking){
$this->state = self::STATE_CONNECTED; //FINALLY!
$this->isTemporal = false;
$this->server->openSession($this);
$this->eventListener = $this->server->openSession($this);

//$this->handlePong($dataPacket->sendPingTime, $dataPacket->sendPongTime); //can't use this due to system-address count issues in MCPE >.<
$this->sendPing();
Expand All @@ -252,8 +256,8 @@ private function handleEncapsulatedPacketRoute(EncapsulatedPacket $packet) : voi

$this->handlePong($dataPacket->sendPingTime, $dataPacket->sendPongTime);
}
}elseif($this->state === self::STATE_CONNECTED){
$this->server->getEventListener()->onPacketReceive($this->internalId, $packet->buffer);
}elseif($this->eventListener !== null){
$this->eventListener->onPacketReceive($packet->buffer);
}else{
//$this->logger->notice("Received packet before connection: " . bin2hex($packet->buffer));
}
Expand All @@ -264,7 +268,9 @@ private function handleEncapsulatedPacketRoute(EncapsulatedPacket $packet) : voi
*/
private function handlePong(int $sendPingTime, int $sendPongTime) : void{
$this->lastPingMeasure = $this->server->getRakNetTimeMS() - $sendPingTime;
$this->server->getEventListener()->onPingMeasure($this->internalId, $this->lastPingMeasure);
if($this->eventListener !== null){
$this->eventListener->onPingMeasure($this->lastPingMeasure);
}
}

public function handlePacket(Packet $packet) : void{
Expand All @@ -288,7 +294,10 @@ public function initiateDisconnect(string $reason) : void{
$this->state = self::STATE_DISCONNECTING;
$this->disconnectionTime = microtime(true);
$this->queueConnectedPacket(new DisconnectionNotification(), PacketReliability::RELIABLE_ORDERED, 0, true);
$this->server->getEventListener()->onClientDisconnect($this->internalId, $reason);
if($this->eventListener !== null){
$this->eventListener->onDisconnect($reason);
$this->eventListener = null;
}
$this->logger->debug("Requesting graceful disconnect because \"$reason\"");
}
}
Expand All @@ -298,7 +307,10 @@ public function initiateDisconnect(string $reason) : void{
*/
public function forciblyDisconnect(string $reason) : void{
$this->state = self::STATE_DISCONNECTED;
$this->server->getEventListener()->onClientDisconnect($this->internalId, $reason);
if($this->eventListener !== null){
$this->eventListener->onDisconnect($reason);
$this->eventListener = null;
}
$this->logger->debug("Forcibly disconnecting session due to \"$reason\"");
}

Expand Down
47 changes: 47 additions & 0 deletions src/server/SessionEventListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/

declare(strict_types=1);

namespace raklib\server;

interface SessionEventListener{

/**
* Called when the client disconnects, or when RakLib terminates the connection (e.g. due to a timeout).
*/
public function onDisconnect(string $reason) : void;

/**
* Called when a non-RakNet packet is received (user packet).
*/
public function onPacketReceive(string $payload) : void;

/**
* Called when a packet that was sent with a requested ACK receipt is ACKed by the recipient.
*/
public function onPacketAck(int $identifierACK) : void;

/**
* Called when RakLib records a new ping measurement for the session.
*/
public function onPingMeasure(int $pingMS) : void;
}
33 changes: 33 additions & 0 deletions src/server/SessionInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/

declare(strict_types=1);

namespace raklib\server;

use raklib\protocol\EncapsulatedPacket;

interface SessionInterface{

public function sendEncapsulated(EncapsulatedPacket $packet, bool $immediate = false) : void;

public function initiateDisconnect(string $reason) : void;
}
36 changes: 36 additions & 0 deletions src/server/ipc/InterThreadChannelFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?php

/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/

declare(strict_types=1);

namespace raklib\server\ipc;

interface InterThreadChannelFactory{
/**
* Returns an array of two parts: the first is a serialized representation of a channel reader that will be passed
* to an InterThreadChannelReaderDeserializer, and the second is an InterThreadChannelWriter that will be used by
* this thread.
*
* @see InterThreadChannelReaderDeserializer
* @phpstan-return array{string, InterThreadChannelWriter}
*/
public function createChannel() : array;
}
29 changes: 29 additions & 0 deletions src/server/ipc/InterThreadChannelReaderDeserializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

/*
*
* ____ _ _ __ __ _ __ __ ____
* | _ \ ___ ___| | _____| |_| \/ (_)_ __ ___ | \/ | _ \
* | |_) / _ \ / __| |/ / _ \ __| |\/| | | '_ \ / _ \_____| |\/| | |_) |
* | __/ (_) | (__| < __/ |_| | | | | | | | __/_____| | | | __/
* |_| \___/ \___|_|\_\___|\__|_| |_|_|_| |_|\___| |_| |_|_|
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* @author PocketMine Team
* @link http://www.pocketmine.net/
*
*
*/

declare(strict_types=1);

namespace raklib\server\ipc;

interface InterThreadChannelReaderDeserializer{

public function deserialize(string $channelInfo) : ?InterThreadChannelReader;
}
Loading

0 comments on commit b1f1d6b

Please sign in to comment.