From b1f1d6b07d6462081739d8c6af5357cef4dfc223 Mon Sep 17 00:00:00 2001 From: "Dylan K. Taylor" Date: Thu, 29 Apr 2021 01:03:47 +0100 Subject: [PATCH] First look at per-session inter-thread channels this is a working implementation of #52. I have my doubts that this gigantic refactor was worth the effort, because there's still lock contention on Snooze to consider; however, this does substantially reduce the problem described in pmmp/pthreads#42, as well as reducing the overhead of inter-thread communication by removing the need to transmit session IDs. This currently relies on each end of the IPC channels to roundtrip session open/close notifications to setup/cleanup stuff; I'll try to improve this before landing this in main. --- src/server/Server.php | 30 ++-- src/server/ServerEventListener.php | 10 +- src/server/ServerEventSource.php | 5 +- src/server/ServerInterface.php | 6 +- src/server/Session.php | 30 ++-- src/server/SessionEventListener.php | 47 ++++++ src/server/SessionInterface.php | 33 +++++ src/server/ipc/InterThreadChannelFactory.php | 36 +++++ .../InterThreadChannelReaderDeserializer.php | 29 ++++ .../ipc/RakLibToUserThreadMessageProtocol.php | 29 ---- .../ipc/RakLibToUserThreadMessageReceiver.php | 127 +++++++++------- .../ipc/RakLibToUserThreadMessageSender.php | 46 ++---- ...kLibToUserThreadSessionMessageProtocol.php | 52 +++++++ ...kLibToUserThreadSessionMessageReceiver.php | 68 +++++++++ ...RakLibToUserThreadSessionMessageSender.php | 67 +++++++++ .../ipc/UserToRakLibThreadMessageProtocol.php | 21 +-- .../ipc/UserToRakLibThreadMessageReceiver.php | 137 ++++++++++-------- .../ipc/UserToRakLibThreadMessageSender.php | 39 +++-- ...erToRakLibThreadSessionMessageProtocol.php | 42 ++++++ ...erToRakLibThreadSessionMessageReceiver.php | 78 ++++++++++ ...UserToRakLibThreadSessionMessageSender.php | 58 ++++++++ 21 files changed, 726 insertions(+), 264 deletions(-) create mode 100644 src/server/SessionEventListener.php create mode 100644 src/server/SessionInterface.php create mode 100644 src/server/ipc/InterThreadChannelFactory.php create mode 100644 src/server/ipc/InterThreadChannelReaderDeserializer.php create mode 100644 src/server/ipc/RakLibToUserThreadSessionMessageProtocol.php create mode 100644 src/server/ipc/RakLibToUserThreadSessionMessageReceiver.php create mode 100644 src/server/ipc/RakLibToUserThreadSessionMessageSender.php create mode 100644 src/server/ipc/UserToRakLibThreadSessionMessageProtocol.php create mode 100644 src/server/ipc/UserToRakLibThreadSessionMessageReceiver.php create mode 100644 src/server/ipc/UserToRakLibThreadSessionMessageSender.php diff --git a/src/server/Server.php b/src/server/Server.php index 4fc74c0..6db3668 100644 --- a/src/server/Server.php +++ b/src/server/Server.php @@ -22,7 +22,6 @@ use raklib\generic\SocketException; use raklib\protocol\ACK; use raklib\protocol\Datagram; -use raklib\protocol\EncapsulatedPacket; use raklib\protocol\NACK; use raklib\protocol\Packet; use raklib\protocol\PacketSerializer; @@ -152,10 +151,12 @@ public function tickProcessor() : void{ * The below code is designed to allow co-op between sending and receiving to avoid slowing down either one * when high traffic is coming either way. Yielding will occur after 100 messages. */ + $eventGenerator = $this->eventSource->process($this); do{ $stream = !$this->shutdown; for($i = 0; $i < 100 && $stream && !$this->shutdown; ++$i){ //if we received a shutdown event, we don't care about any more messages from the event source - $stream = $this->eventSource->process($this); + $eventGenerator->next(); + $stream = $eventGenerator->valid(); } $socket = true; @@ -327,13 +328,6 @@ public function getEventListener() : ServerEventListener{ return $this->eventListener; } - public function sendEncapsulated(int $sessionId, EncapsulatedPacket $packet, bool $immediate = false) : void{ - $session = $this->sessions[$sessionId] ?? null; - if($session !== null and $session->isConnected()){ - $session->addEncapsulatedToQueue($packet, $immediate); - } - } - public function sendRaw(string $address, int $port, string $payload) : void{ try{ $this->socket->writePacket($payload, $address, $port); @@ -342,12 +336,6 @@ public function sendRaw(string $address, int $port, string $payload) : void{ } } - public function closeSession(int $sessionId) : void{ - if(isset($this->sessions[$sessionId])){ - $this->sessions[$sessionId]->initiateDisconnect("server disconnect"); - } - } - public function setName(string $name) : void{ $this->name = $name; } @@ -383,6 +371,10 @@ public function addRawPacketFilter(string $regex) : void{ $this->rawPacketFilters[] = $regex; } + public function getSession(int $id) : ?SessionInterface{ + return $this->sessions[$id] ?? null; + } + public function getSessionByAddress(InternetAddress $address) : ?Session{ return $this->sessionsByAddress[$address->toString()] ?? null; } @@ -411,9 +403,9 @@ private function removeSessionInternal(Session $session) : void{ unset($this->sessionsByAddress[$session->getAddress()->toString()], $this->sessions[$session->getInternalId()]); } - public function openSession(Session $session) : void{ + public function openSession(Session $session) : SessionEventListener{ $address = $session->getAddress(); - $this->eventListener->onClientConnect($session->getInternalId(), $address->ip, $address->port, $session->getID()); + return $this->eventListener->onClientConnect($session->getInternalId(), $address->ip, $address->port, $session->getID()); } private function checkSessions() : void{ @@ -429,10 +421,6 @@ private function checkSessions() : void{ } } - public function notifyACK(Session $session, int $identifierACK) : void{ - $this->eventListener->onPacketAck($session->getInternalId(), $identifierACK); - } - public function getName() : string{ return $this->name; } diff --git a/src/server/ServerEventListener.php b/src/server/ServerEventListener.php index 24cc417..d2b4d8f 100644 --- a/src/server/ServerEventListener.php +++ b/src/server/ServerEventListener.php @@ -19,17 +19,9 @@ interface ServerEventListener{ - public function onClientConnect(int $sessionId, string $address, int $port, int $clientID) : void; - - public function onClientDisconnect(int $sessionId, string $reason) : void; - - public function onPacketReceive(int $sessionId, string $packet) : void; + public function onClientConnect(int $sessionId, string $address, int $port, int $clientID) : SessionEventListener; public function onRawPacketReceive(string $address, int $port, string $payload) : void; - public function onPacketAck(int $sessionId, int $identifierACK) : void; - public function onBandwidthStatsUpdate(int $bytesSentDiff, int $bytesReceivedDiff) : void; - - public function onPingMeasure(int $sessionId, int $pingMS) : void; } diff --git a/src/server/ServerEventSource.php b/src/server/ServerEventSource.php index 2f175fa..45d2ae9 100644 --- a/src/server/ServerEventSource.php +++ b/src/server/ServerEventSource.php @@ -19,5 +19,8 @@ interface ServerEventSource{ - public function process(ServerInterface $server) : bool; + /** + * @phpstan-return \Generator + */ + public function process(ServerInterface $server) : \Generator; } diff --git a/src/server/ServerInterface.php b/src/server/ServerInterface.php index 3bfad68..e86a506 100644 --- a/src/server/ServerInterface.php +++ b/src/server/ServerInterface.php @@ -17,16 +17,12 @@ namespace raklib\server; -use raklib\protocol\EncapsulatedPacket; - interface ServerInterface{ - public function sendEncapsulated(int $sessionId, EncapsulatedPacket $packet, bool $immediate = false) : void; + public function getSession(int $id) : ?SessionInterface; public function sendRaw(string $address, int $port, string $payload) : void; - public function closeSession(int $sessionId) : void; - public function setName(string $name) : void; public function setPortCheck(bool $value) : void; diff --git a/src/server/Session.php b/src/server/Session.php index c9e5448..c18e0de 100644 --- a/src/server/Session.php +++ b/src/server/Session.php @@ -39,7 +39,7 @@ use function microtime; use function ord; -class Session{ +class Session implements SessionInterface{ public const MAX_SPLIT_PART_COUNT = 128; public const MAX_CONCURRENT_SPLIT_COUNT = 4; @@ -90,6 +90,8 @@ class Session{ /** @var SendReliabilityLayer */ private $sendLayer; + private ?SessionEventListener $eventListener = null; + public function __construct(Server $server, \Logger $logger, InternetAddress $address, int $clientId, int $mtuSize, int $internalId){ if($mtuSize < self::MIN_MTU_SIZE){ throw new \InvalidArgumentException("MTU size must be at least " . self::MIN_MTU_SIZE . ", got $mtuSize"); @@ -120,7 +122,9 @@ function(Datagram $datagram) : void{ $this->sendPacket($datagram); }, function(int $identifierACK) : void{ - $this->server->getEventListener()->onPacketAck($this->internalId, $identifierACK); + if($this->eventListener !== null){ + $this->eventListener->onPacketAck($identifierACK); + } } ); } @@ -195,7 +199,7 @@ private function queueConnectedPacket(ConnectedPacket $packet, int $reliability, $this->sendLayer->addEncapsulatedToQueue($encapsulated, $immediate); } - public function addEncapsulatedToQueue(EncapsulatedPacket $packet, bool $immediate) : void{ + public function sendEncapsulated(EncapsulatedPacket $packet, bool $immediate = false) : void{ $this->sendLayer->addEncapsulatedToQueue($packet, $immediate); } @@ -231,7 +235,7 @@ private function handleEncapsulatedPacketRoute(EncapsulatedPacket $packet) : voi if($dataPacket->address->port === $this->server->getPort() or !$this->server->portChecking){ $this->state = self::STATE_CONNECTED; //FINALLY! $this->isTemporal = false; - $this->server->openSession($this); + $this->eventListener = $this->server->openSession($this); //$this->handlePong($dataPacket->sendPingTime, $dataPacket->sendPongTime); //can't use this due to system-address count issues in MCPE >.< $this->sendPing(); @@ -252,8 +256,8 @@ private function handleEncapsulatedPacketRoute(EncapsulatedPacket $packet) : voi $this->handlePong($dataPacket->sendPingTime, $dataPacket->sendPongTime); } - }elseif($this->state === self::STATE_CONNECTED){ - $this->server->getEventListener()->onPacketReceive($this->internalId, $packet->buffer); + }elseif($this->eventListener !== null){ + $this->eventListener->onPacketReceive($packet->buffer); }else{ //$this->logger->notice("Received packet before connection: " . bin2hex($packet->buffer)); } @@ -264,7 +268,9 @@ private function handleEncapsulatedPacketRoute(EncapsulatedPacket $packet) : voi */ private function handlePong(int $sendPingTime, int $sendPongTime) : void{ $this->lastPingMeasure = $this->server->getRakNetTimeMS() - $sendPingTime; - $this->server->getEventListener()->onPingMeasure($this->internalId, $this->lastPingMeasure); + if($this->eventListener !== null){ + $this->eventListener->onPingMeasure($this->lastPingMeasure); + } } public function handlePacket(Packet $packet) : void{ @@ -288,7 +294,10 @@ public function initiateDisconnect(string $reason) : void{ $this->state = self::STATE_DISCONNECTING; $this->disconnectionTime = microtime(true); $this->queueConnectedPacket(new DisconnectionNotification(), PacketReliability::RELIABLE_ORDERED, 0, true); - $this->server->getEventListener()->onClientDisconnect($this->internalId, $reason); + if($this->eventListener !== null){ + $this->eventListener->onDisconnect($reason); + $this->eventListener = null; + } $this->logger->debug("Requesting graceful disconnect because \"$reason\""); } } @@ -298,7 +307,10 @@ public function initiateDisconnect(string $reason) : void{ */ public function forciblyDisconnect(string $reason) : void{ $this->state = self::STATE_DISCONNECTED; - $this->server->getEventListener()->onClientDisconnect($this->internalId, $reason); + if($this->eventListener !== null){ + $this->eventListener->onDisconnect($reason); + $this->eventListener = null; + } $this->logger->debug("Forcibly disconnecting session due to \"$reason\""); } diff --git a/src/server/SessionEventListener.php b/src/server/SessionEventListener.php new file mode 100644 index 0000000..22f6eb0 --- /dev/null +++ b/src/server/SessionEventListener.php @@ -0,0 +1,47 @@ + + */ + private array $sessionMap = []; + + public function __construct(InterThreadChannelReader $channel, InterThreadChannelReaderDeserializer $channelFactory){ $this->channel = $channel; + $this->channelFactory = $channelFactory; } - public function handle(ServerEventListener $listener) : bool{ - if(($packet = $this->channel->read()) !== null){ - $id = ord($packet[0]); - $offset = 1; - if($id === ITCProtocol::PACKET_ENCAPSULATED){ - $sessionId = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $buffer = substr($packet, $offset); - $listener->onPacketReceive($sessionId, $buffer); - }elseif($id === ITCProtocol::PACKET_RAW){ - $len = ord($packet[$offset++]); - $address = substr($packet, $offset, $len); - $offset += $len; - $port = Binary::readShort(substr($packet, $offset, 2)); - $offset += 2; - $payload = substr($packet, $offset); - $listener->onRawPacketReceive($address, $port, $payload); - }elseif($id === ITCProtocol::PACKET_REPORT_BANDWIDTH_STATS){ - $sentBytes = Binary::readLong(substr($packet, $offset, 8)); - $offset += 8; - $receivedBytes = Binary::readLong(substr($packet, $offset, 8)); - $listener->onBandwidthStatsUpdate($sentBytes, $receivedBytes); - }elseif($id === ITCProtocol::PACKET_OPEN_SESSION){ - $sessionId = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $len = ord($packet[$offset++]); - $rawAddr = substr($packet, $offset, $len); - $offset += $len; - $address = inet_ntop($rawAddr); - if($address === false){ - throw new \RuntimeException("Unexpected invalid IP address in inter-thread message"); + /** + * @phpstan-return \Generator + */ + public function handle(ServerEventListener $listener) : \Generator{ + do{ + $processed = false; + if(($packet = $this->channel->read()) !== null){ + $id = ord($packet[0]); + $offset = 1; + if($id === ITCProtocol::PACKET_RAW){ + $len = ord($packet[$offset++]); + $address = substr($packet, $offset, $len); + $offset += $len; + $port = Binary::readShort(substr($packet, $offset, 2)); + $offset += 2; + $payload = substr($packet, $offset); + $listener->onRawPacketReceive($address, $port, $payload); + }elseif($id === ITCProtocol::PACKET_REPORT_BANDWIDTH_STATS){ + $sentBytes = Binary::readLong(substr($packet, $offset, 8)); + $offset += 8; + $receivedBytes = Binary::readLong(substr($packet, $offset, 8)); + $listener->onBandwidthStatsUpdate($sentBytes, $receivedBytes); + }elseif($id === ITCProtocol::PACKET_OPEN_SESSION){ + $sessionId = Binary::readInt(substr($packet, $offset, 4)); + $offset += 4; + $len = ord($packet[$offset++]); + $rawAddr = substr($packet, $offset, $len); + $offset += $len; + $address = inet_ntop($rawAddr); + if($address === false){ + throw new \RuntimeException("Unexpected invalid IP address in inter-thread message"); + } + $port = Binary::readShort(substr($packet, $offset, 2)); + $offset += 2; + $clientID = Binary::readLong(substr($packet, $offset, 8)); + $offset += 8; + + $channelReaderInfo = substr($packet, $offset); + $channelReader = $this->channelFactory->deserialize($channelReaderInfo); + if($channelReader !== null){ //the channel may have been destroyed before we could deserialize it + $receiver = new RakLibToUserThreadSessionMessageReceiver($channelReader); + $sessionListener = $listener->onClientConnect($sessionId, $address, $port, $clientID); + $this->sessionMap[$sessionId] = [$receiver, $sessionListener]; + } } - $port = Binary::readShort(substr($packet, $offset, 2)); - $offset += 2; - $clientID = Binary::readLong(substr($packet, $offset, 8)); - $listener->onClientConnect($sessionId, $address, $port, $clientID); - }elseif($id === ITCProtocol::PACKET_CLOSE_SESSION){ - $sessionId = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $len = ord($packet[$offset++]); - $reason = substr($packet, $offset, $len); - $listener->onClientDisconnect($sessionId, $reason); - }elseif($id === ITCProtocol::PACKET_ACK_NOTIFICATION){ - $sessionId = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $identifierACK = Binary::readInt(substr($packet, $offset, 4)); - $listener->onPacketAck($sessionId, $identifierACK); - }elseif($id === ITCProtocol::PACKET_REPORT_PING){ - $sessionId = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $pingMS = Binary::readInt(substr($packet, $offset, 4)); - $listener->onPingMeasure($sessionId, $pingMS); - } - return true; - } + $processed = true; + yield; + } - return false; + foreach($this->sessionMap as $sessionId => [$receiver, $sessionListener]){ + try{ + if($receiver->process($sessionListener)){ + $processed = true; + yield; + } + }finally{ + if($receiver->isClosed()){ + unset($this->sessionMap[$sessionId]); + } + } + } + }while($processed); } } diff --git a/src/server/ipc/RakLibToUserThreadMessageSender.php b/src/server/ipc/RakLibToUserThreadMessageSender.php index 24d1f6c..102d45b 100644 --- a/src/server/ipc/RakLibToUserThreadMessageSender.php +++ b/src/server/ipc/RakLibToUserThreadMessageSender.php @@ -20,6 +20,7 @@ use pocketmine\utils\Binary; use raklib\server\ipc\RakLibToUserThreadMessageProtocol as ITCProtocol; use raklib\server\ServerEventListener; +use raklib\server\SessionEventListener; use function chr; use function inet_pton; use function strlen; @@ -29,38 +30,29 @@ final class RakLibToUserThreadMessageSender implements ServerEventListener{ /** @var InterThreadChannelWriter */ private $channel; - public function __construct(InterThreadChannelWriter $channel){ + private InterThreadChannelFactory $channelFactory; + + public function __construct(InterThreadChannelWriter $channel, InterThreadChannelFactory $channelFactory){ $this->channel = $channel; + $this->channelFactory = $channelFactory; } - public function onClientConnect(int $sessionId, string $address, int $port, int $clientId) : void{ + public function onClientConnect(int $sessionId, string $address, int $port, int $clientID) : SessionEventListener{ $rawAddr = inet_pton($address); if($rawAddr === false){ throw new \InvalidArgumentException("Invalid IP address"); } + + [$channelReaderInfo, $channelWriter] = $this->channelFactory->createChannel(); $this->channel->write( chr(ITCProtocol::PACKET_OPEN_SESSION) . Binary::writeInt($sessionId) . chr(strlen($rawAddr)) . $rawAddr . Binary::writeShort($port) . - Binary::writeLong($clientId) - ); - } - - public function onClientDisconnect(int $sessionId, string $reason) : void{ - $this->channel->write( - chr(ITCProtocol::PACKET_CLOSE_SESSION) . - Binary::writeInt($sessionId) . - chr(strlen($reason)) . $reason - ); - } - - public function onPacketReceive(int $sessionId, string $packet) : void{ - $this->channel->write( - chr(ITCProtocol::PACKET_ENCAPSULATED) . - Binary::writeInt($sessionId) . - $packet + Binary::writeLong($clientID) . + $channelReaderInfo ); + return new RakLibToUserThreadSessionMessageSender($channelWriter); } public function onRawPacketReceive(string $address, int $port, string $payload) : void{ @@ -72,14 +64,6 @@ public function onRawPacketReceive(string $address, int $port, string $payload) ); } - public function onPacketAck(int $sessionId, int $identifierACK) : void{ - $this->channel->write( - chr(ITCProtocol::PACKET_ACK_NOTIFICATION) . - Binary::writeInt($sessionId) . - Binary::writeInt($identifierACK) - ); - } - public function onBandwidthStatsUpdate(int $bytesSentDiff, int $bytesReceivedDiff) : void{ $this->channel->write( chr(ITCProtocol::PACKET_REPORT_BANDWIDTH_STATS) . @@ -87,12 +71,4 @@ public function onBandwidthStatsUpdate(int $bytesSentDiff, int $bytesReceivedDif Binary::writeLong($bytesReceivedDiff) ); } - - public function onPingMeasure(int $sessionId, int $pingMS) : void{ - $this->channel->write( - chr(ITCProtocol::PACKET_REPORT_PING) . - Binary::writeInt($sessionId) . - Binary::writeInt($pingMS) - ); - } } diff --git a/src/server/ipc/RakLibToUserThreadSessionMessageProtocol.php b/src/server/ipc/RakLibToUserThreadSessionMessageProtocol.php new file mode 100644 index 0000000..f645892 --- /dev/null +++ b/src/server/ipc/RakLibToUserThreadSessionMessageProtocol.php @@ -0,0 +1,52 @@ +channel = $channel; + } + + public function process(SessionEventListener $listener) : bool{ + if(($packet = $this->channel->read()) !== null){ + $id = ord($packet[0]); + $offset = 1; + if($id === ITCSessionProtocol::PACKET_ENCAPSULATED){ + $buffer = substr($packet, $offset); + $listener->onPacketReceive($buffer); + }elseif($id === ITCSessionProtocol::PACKET_CLOSE_SESSION){ + $len = ord($packet[$offset++]); + $reason = substr($packet, $offset, $len); + $listener->onDisconnect($reason); + $this->closed = true; + }elseif($id === ITCSessionProtocol::PACKET_ACK_NOTIFICATION){ + $identifierACK = Binary::readInt(substr($packet, $offset, 4)); + $listener->onPacketAck($identifierACK); + }elseif($id === ITCSessionProtocol::PACKET_REPORT_PING){ + $pingMS = Binary::readInt(substr($packet, $offset, 4)); + $listener->onPingMeasure($pingMS); + } + + return true; + } + + return false; + } + + public function isClosed() : bool{ return $this->closed; } +} diff --git a/src/server/ipc/RakLibToUserThreadSessionMessageSender.php b/src/server/ipc/RakLibToUserThreadSessionMessageSender.php new file mode 100644 index 0000000..139fbfb --- /dev/null +++ b/src/server/ipc/RakLibToUserThreadSessionMessageSender.php @@ -0,0 +1,67 @@ +channel = $channel; + } + + public function onDisconnect(string $reason) : void{ + $this->channel->write( + chr(ITCSessionProtocol::PACKET_CLOSE_SESSION) . + chr(strlen($reason)) . $reason + ); + } + + public function onPacketReceive(string $payload) : void{ + $this->channel->write( + chr(ITCSessionProtocol::PACKET_ENCAPSULATED) . + $payload + ); + } + + public function onPacketAck(int $identifierACK) : void{ + $this->channel->write( + chr(ITCSessionProtocol::PACKET_ACK_NOTIFICATION) . + Binary::writeInt($identifierACK) + ); + } + + public function onPingMeasure(int $pingMS) : void{ + $this->channel->write( + chr(ITCSessionProtocol::PACKET_REPORT_PING) . + Binary::writeInt($pingMS) + ); + } +} diff --git a/src/server/ipc/UserToRakLibThreadMessageProtocol.php b/src/server/ipc/UserToRakLibThreadMessageProtocol.php index da52e80..9957265 100644 --- a/src/server/ipc/UserToRakLibThreadMessageProtocol.php +++ b/src/server/ipc/UserToRakLibThreadMessageProtocol.php @@ -40,24 +40,11 @@ private function __construct(){ */ /* - * ENCAPSULATED payload: - * int32 (internal session ID) - * byte (flags, last 3 bits, priority) - * byte (reliability) - * int32 (ack identifier) - * byte? (order channel, only when sequenced or ordered reliability) - * byte[] (user packet payload) + * PACKET_OPEN_SESSION_RESPONSE payload: + * int32 (session ID) + * byte[] (serialized channel information) */ - public const PACKET_ENCAPSULATED = 0x01; - - public const ENCAPSULATED_FLAG_NEED_ACK = 1 << 0; - public const ENCAPSULATED_FLAG_IMMEDIATE = 1 << 1; - - /* - * CLOSE_SESSION payload: - * int32 (internal session ID) - */ - public const PACKET_CLOSE_SESSION = 0x02; + public const PACKET_OPEN_SESSION_RESPONSE = 0x01; /* * RAW payload: diff --git a/src/server/ipc/UserToRakLibThreadMessageReceiver.php b/src/server/ipc/UserToRakLibThreadMessageReceiver.php index 64b6bbf..fdbc07b 100644 --- a/src/server/ipc/UserToRakLibThreadMessageReceiver.php +++ b/src/server/ipc/UserToRakLibThreadMessageReceiver.php @@ -18,11 +18,10 @@ namespace raklib\server\ipc; use pocketmine\utils\Binary; -use raklib\protocol\EncapsulatedPacket; -use raklib\protocol\PacketReliability; use raklib\server\ipc\UserToRakLibThreadMessageProtocol as ITCProtocol; use raklib\server\ServerEventSource; use raklib\server\ServerInterface; +use raklib\server\SessionInterface; use function ord; use function substr; @@ -30,73 +29,87 @@ final class UserToRakLibThreadMessageReceiver implements ServerEventSource{ /** @var InterThreadChannelReader */ private $channel; - public function __construct(InterThreadChannelReader $channel){ - $this->channel = $channel; - } + private InterThreadChannelReaderDeserializer $channelReaderDeserializer; - public function process(ServerInterface $server) : bool{ - if(($packet = $this->channel->read()) !== null){ - $id = ord($packet[0]); - $offset = 1; - if($id === ITCProtocol::PACKET_ENCAPSULATED){ - $sessionId = Binary::readInt(substr($packet, $offset, 4)); - $offset += 4; - $flags = ord($packet[$offset++]); - $immediate = ($flags & ITCProtocol::ENCAPSULATED_FLAG_IMMEDIATE) !== 0; - $needACK = ($flags & ITCProtocol::ENCAPSULATED_FLAG_NEED_ACK) !== 0; + /** + * @var SessionInterface[][]|UserToRakLibThreadSessionMessageReceiver[][] + * @phpstan-var array + */ + private array $sessionMap = []; - $encapsulated = new EncapsulatedPacket(); - $encapsulated->reliability = ord($packet[$offset++]); + public function __construct(InterThreadChannelReader $channel, InterThreadChannelReaderDeserializer $channelReaderDeserializer){ + $this->channel = $channel; + $this->channelReaderDeserializer = $channelReaderDeserializer; + } - if($needACK){ - $encapsulated->identifierACK = Binary::readInt(substr($packet, $offset, 4)); + /** + * @phpstan-return \Generator + */ + public function process(ServerInterface $server) : \Generator{ + do{ + $processed = false; + if(($packet = $this->channel->read()) !== null){ + $id = ord($packet[0]); + $offset = 1; + if($id === ITCProtocol::PACKET_RAW){ + $len = ord($packet[$offset++]); + $address = substr($packet, $offset, $len); + $offset += $len; + $port = Binary::readShort(substr($packet, $offset, 2)); + $offset += 2; + $payload = substr($packet, $offset); + $server->sendRaw($address, $port, $payload); + }elseif($id === ITCProtocol::PACKET_SET_NAME){ + $server->setName(substr($packet, $offset)); + }elseif($id === ITCProtocol::PACKET_ENABLE_PORT_CHECK){ + $server->setPortCheck(true); + }elseif($id === ITCProtocol::PACKET_DISABLE_PORT_CHECK){ + $server->setPortCheck(false); + }elseif($id === ITCProtocol::PACKET_SET_PACKETS_PER_TICK_LIMIT){ + $limit = Binary::readLong(substr($packet, $offset, 8)); + $server->setPacketsPerTickLimit($limit); + }elseif($id === ITCProtocol::PACKET_BLOCK_ADDRESS){ + $len = ord($packet[$offset++]); + $address = substr($packet, $offset, $len); + $offset += $len; + $timeout = Binary::readInt(substr($packet, $offset, 4)); + $server->blockAddress($address, $timeout); + }elseif($id === ITCProtocol::PACKET_UNBLOCK_ADDRESS){ + $len = ord($packet[$offset++]); + $address = substr($packet, $offset, $len); + $server->unblockAddress($address); + }elseif($id === ITCProtocol::PACKET_RAW_FILTER){ + $pattern = substr($packet, $offset); + $server->addRawPacketFilter($pattern); + }elseif($id === ITCProtocol::PACKET_OPEN_SESSION_RESPONSE){ + $sessionId = Binary::readInt(substr($packet, $offset, 4)); $offset += 4; + $session = $server->getSession($sessionId); + if($session !== null){ + $channelInfo = substr($packet, $offset); + $channel = $this->channelReaderDeserializer->deserialize($channelInfo); + if($channel !== null){ + $this->sessionMap[$sessionId] = [new UserToRakLibThreadSessionMessageReceiver($channel), $session]; + } + } } - if(PacketReliability::isSequencedOrOrdered($encapsulated->reliability)){ - $encapsulated->orderChannel = ord($packet[$offset++]); - } - - $encapsulated->buffer = substr($packet, $offset); - $server->sendEncapsulated($sessionId, $encapsulated, $immediate); - }elseif($id === ITCProtocol::PACKET_RAW){ - $len = ord($packet[$offset++]); - $address = substr($packet, $offset, $len); - $offset += $len; - $port = Binary::readShort(substr($packet, $offset, 2)); - $offset += 2; - $payload = substr($packet, $offset); - $server->sendRaw($address, $port, $payload); - }elseif($id === ITCProtocol::PACKET_CLOSE_SESSION){ - $sessionId = Binary::readInt(substr($packet, $offset, 4)); - $server->closeSession($sessionId); - }elseif($id === ITCProtocol::PACKET_SET_NAME){ - $server->setName(substr($packet, $offset)); - }elseif($id === ITCProtocol::PACKET_ENABLE_PORT_CHECK){ - $server->setPortCheck(true); - }elseif($id === ITCProtocol::PACKET_DISABLE_PORT_CHECK){ - $server->setPortCheck(false); - }elseif($id === ITCProtocol::PACKET_SET_PACKETS_PER_TICK_LIMIT){ - $limit = Binary::readLong(substr($packet, $offset, 8)); - $server->setPacketsPerTickLimit($limit); - }elseif($id === ITCProtocol::PACKET_BLOCK_ADDRESS){ - $len = ord($packet[$offset++]); - $address = substr($packet, $offset, $len); - $offset += $len; - $timeout = Binary::readInt(substr($packet, $offset, 4)); - $server->blockAddress($address, $timeout); - }elseif($id === ITCProtocol::PACKET_UNBLOCK_ADDRESS){ - $len = ord($packet[$offset++]); - $address = substr($packet, $offset, $len); - $server->unblockAddress($address); - }elseif($id === ITCProtocol::PACKET_RAW_FILTER){ - $pattern = substr($packet, $offset); - $server->addRawPacketFilter($pattern); + $processed = true; + yield; } - return true; - } - - return false; + foreach($this->sessionMap as $sessionId => [$receiver, $session]){ + try{ + if($receiver->process($session)){ + $processed = true; + yield; + } + }finally{ + if($receiver->isClosed()){ + unset($this->sessionMap[$sessionId]); + } + } + } + }while($processed); } } diff --git a/src/server/ipc/UserToRakLibThreadMessageSender.php b/src/server/ipc/UserToRakLibThreadMessageSender.php index ed331c5..2797372 100644 --- a/src/server/ipc/UserToRakLibThreadMessageSender.php +++ b/src/server/ipc/UserToRakLibThreadMessageSender.php @@ -18,10 +18,9 @@ namespace raklib\server\ipc; use pocketmine\utils\Binary; -use raklib\protocol\EncapsulatedPacket; -use raklib\protocol\PacketReliability; use raklib\server\ipc\UserToRakLibThreadMessageProtocol as ITCProtocol; use raklib\server\ServerInterface; +use raklib\server\SessionInterface; use function chr; use function strlen; @@ -29,32 +28,32 @@ class UserToRakLibThreadMessageSender implements ServerInterface{ /** @var InterThreadChannelWriter */ private $channel; - public function __construct(InterThreadChannelWriter $channel){ + private InterThreadChannelFactory $channelFactory; + + public function __construct(InterThreadChannelWriter $channel, InterThreadChannelFactory $channelFactory){ $this->channel = $channel; + $this->channelFactory = $channelFactory; } - public function sendEncapsulated(int $sessionId, EncapsulatedPacket $packet, bool $immediate = false) : void{ - $flags = - ($immediate ? ITCProtocol::ENCAPSULATED_FLAG_IMMEDIATE : 0) | - ($packet->identifierACK !== null ? ITCProtocol::ENCAPSULATED_FLAG_NEED_ACK : 0); - - $buffer = chr(ITCProtocol::PACKET_ENCAPSULATED) . + /** + * Opens an inter-thread channel to the RakLib thread for the given session. + */ + public function openSessionChannel(int $sessionId) : SessionInterface{ + [$channelReaderInfo, $channelWriter] = $this->channelFactory->createChannel(); + $this->channel->write( + chr(ITCProtocol::PACKET_OPEN_SESSION_RESPONSE) . Binary::writeInt($sessionId) . - chr($flags) . - chr($packet->reliability) . - ($packet->identifierACK !== null ? Binary::writeInt($packet->identifierACK) : "") . - (PacketReliability::isSequencedOrOrdered($packet->reliability) ? chr($packet->orderChannel) : "") . - $packet->buffer; - $this->channel->write($buffer); + $channelReaderInfo + ); + return new UserToRakLibThreadSessionMessageSender($channelWriter); } - public function sendRaw(string $address, int $port, string $payload) : void{ - $buffer = chr(ITCProtocol::PACKET_RAW) . chr(strlen($address)) . $address . Binary::writeShort($port) . $payload; - $this->channel->write($buffer); + public function getSession(int $id) : ?SessionInterface{ + return null; } - public function closeSession(int $sessionId) : void{ - $buffer = chr(ITCProtocol::PACKET_CLOSE_SESSION) . Binary::writeInt($sessionId); + public function sendRaw(string $address, int $port, string $payload) : void{ + $buffer = chr(ITCProtocol::PACKET_RAW) . chr(strlen($address)) . $address . Binary::writeShort($port) . $payload; $this->channel->write($buffer); } diff --git a/src/server/ipc/UserToRakLibThreadSessionMessageProtocol.php b/src/server/ipc/UserToRakLibThreadSessionMessageProtocol.php new file mode 100644 index 0000000..6e767d4 --- /dev/null +++ b/src/server/ipc/UserToRakLibThreadSessionMessageProtocol.php @@ -0,0 +1,42 @@ +channel = $channel; + } + + public function process(SessionInterface $session) : bool{ + if(($packet = $this->channel->read()) !== null){ + $id = ord($packet[0]); + $offset = 1; + if($id === ITCSessionProtocol::PACKET_ENCAPSULATED){ + $flags = ord($packet[$offset++]); + $immediate = ($flags & UserToRakLibThreadSessionMessageProtocol::ENCAPSULATED_FLAG_IMMEDIATE) !== 0; + $needACK = ($flags & UserToRakLibThreadSessionMessageProtocol::ENCAPSULATED_FLAG_NEED_ACK) !== 0; + + $encapsulated = new EncapsulatedPacket(); + $encapsulated->reliability = ord($packet[$offset++]); + + if($needACK){ + $encapsulated->identifierACK = Binary::readInt(substr($packet, $offset, 4)); + $offset += 4; + } + + if(PacketReliability::isSequencedOrOrdered($encapsulated->reliability)){ + $encapsulated->orderChannel = ord($packet[$offset++]); + } + + $encapsulated->buffer = substr($packet, $offset); + $session->sendEncapsulated($encapsulated, $immediate); + }elseif($id === ITCSessionProtocol::PACKET_CLOSE_SESSION){ + $session->initiateDisconnect("server disconnect"); + $this->closed = true; + } + + return true; + } + + return false; + } + + public function isClosed() : bool{ return $this->closed; } +} diff --git a/src/server/ipc/UserToRakLibThreadSessionMessageSender.php b/src/server/ipc/UserToRakLibThreadSessionMessageSender.php new file mode 100644 index 0000000..c517b97 --- /dev/null +++ b/src/server/ipc/UserToRakLibThreadSessionMessageSender.php @@ -0,0 +1,58 @@ +channel = $channel; + } + + public function sendEncapsulated(EncapsulatedPacket $packet, bool $immediate = false) : void{ + $flags = + ($immediate ? ITCSessionProtocol::ENCAPSULATED_FLAG_IMMEDIATE : 0) | + ($packet->identifierACK !== null ? ITCSessionProtocol::ENCAPSULATED_FLAG_NEED_ACK : 0); + + $buffer = chr(ITCSessionProtocol::PACKET_ENCAPSULATED) . + chr($flags) . + chr($packet->reliability) . + ($packet->identifierACK !== null ? Binary::writeInt($packet->identifierACK) : "") . + (PacketReliability::isSequencedOrOrdered($packet->reliability) ? chr($packet->orderChannel) : "") . + $packet->buffer; + $this->channel->write($buffer); + } + + public function initiateDisconnect(string $reason) : void{ + $this->channel->write(chr(ITCSessionProtocol::PACKET_CLOSE_SESSION)); + } +}