diff --git a/src/Components/ClientConfig.php b/src/Components/ClientConfig.php index 893471a..7a75405 100644 --- a/src/Components/ClientConfig.php +++ b/src/Components/ClientConfig.php @@ -8,6 +8,12 @@ class ClientConfig { + private $scheme; + private $host; + private $user; + private $password; + private $port; + private $timeout = WscCommonsContract::DEFAULT_TIMEOUT; private $headers = []; private $fragmentSize = WscCommonsContract::DEFAULT_FRAGMENT_SIZE; @@ -76,4 +82,84 @@ public function setContext($context) { $this->context = $context; } + + /** + * @return mixed + */ + public function getScheme(): string + { + return $this->scheme; + } + + /** + * @param void $scheme + */ + public function setScheme($scheme): void + { + $this->scheme = $scheme; + } + + /** + * @return string + */ + public function getHost(): string + { + return $this->host; + } + + /** + * @param void $host + */ + public function setHost($host): void + { + $this->host = $host; + } + + /** + * @return string + */ + public function getUser(): string + { + return $this->user; + } + + /** + * @param array $urlParts + */ + public function setUser(array $urlParts): void + { + $this->user = isset($urlParts['user']) ? $urlParts['user'] : ''; + } + + /** + * @return string + */ + public function getPassword(): string + { + return $this->password; + } + + /** + * @param array $urlParts + */ + public function setPassword(array $urlParts): void + { + $this->password = isset($urlParts['pass']) ? $urlParts['pass'] : ''; + } + + /** + * @return string + */ + public function getPort(): string + { + return $this->port; + } + + /** + * @param array $urlParts + */ + public function setPort(array $urlParts): void + { + $this->port = isset($urlParts['port']) ? $urlParts['port'] : ($this->scheme === 'wss' ? 443 : 80); + } } \ No newline at end of file diff --git a/src/Components/WSClientTrait.php b/src/Components/WSClientTrait.php new file mode 100644 index 0000000..25c3d39 --- /dev/null +++ b/src/Components/WSClientTrait.php @@ -0,0 +1,242 @@ +socket, self::DEFAULT_RESPONSE_HEADER, "\r\n\r\n"); + if (!preg_match(self::SEC_WEBSOCKET_ACCEPT_PTTRN, $response, $matches)) { + $address = $config->getScheme() . '://' . $config->getHost() . $pathWithQuery; + throw new ConnectionException( + "Connection to '{$address}' failed: Server sent invalid upgrade response:\n" + . $response, CommonsContract::CLIENT_INVALID_UPGRADE_RESPONSE + ); + } + + $keyAccept = trim($matches[1]); + $expectedResonse = base64_encode(pack('H*', sha1($key . self::SERVER_KEY_ACCEPT))); + if ($keyAccept !== $expectedResonse) { + throw new ConnectionException('Server sent bad upgrade response.', + CommonsContract::CLIENT_INVALID_UPGRADE_RESPONSE); + } + } + + /** + * Gets host uri based on protocol + * + * @param ClientConfig $config + * @return string + * @throws BadUriException + */ + private function getHostUri(ClientConfig $config): string + { + if (in_array($config->getScheme(), ['ws', 'wss'], true) === false) { + throw new BadUriException( + "Url should have scheme ws or wss, not '{$config->getScheme()}' from URI '$this->socketUrl' .", + CommonsContract::CLIENT_INCORRECT_SCHEME + ); + } + + return ($config->getScheme() === 'wss' ? 'ssl' : 'tcp') . '://' . $config->getHost(); + } + + /** + * @param string $data + * @return float|int + * @throws ConnectionException + */ + private function getPayloadLength(string $data) + { + $payloadLength = (int)ord($data[1]) & self::MASK_127; // Bits 1-7 in byte 1 + if ($payloadLength > self::MASK_125) { + if ($payloadLength === self::MASK_126) { + $data = $this->read(2); // 126: Payload is a 16-bit unsigned int + } else { + $data = $this->read(8); // 127: Payload is a 64-bit unsigned int + } + $payloadLength = bindec(self::sprintB($data)); + } + + return $payloadLength; + } + + /** + * @param string $data + * @param int $payloadLength + * @return string + * @throws ConnectionException + */ + private function getPayloadData(string $data, int $payloadLength): string + { + // Masking? + $mask = (bool)(ord($data[1]) >> 7); // Bit 0 in byte 1 + $payload = ''; + $maskingKey = ''; + + // Get masking key. + if ($mask) { + $maskingKey = $this->read(4); + } + + // Get the actual payload, if any (might not be for e.g. close frames. + if ($payloadLength > 0) { + $data = $this->read($payloadLength); + + if ($mask) { + // Unmask payload. + for ($i = 0; $i < $payloadLength; $i++) { + $payload .= ($data[$i] ^ $maskingKey[$i % 4]); + } + } else { + $payload = $data; + } + } + + return $payload; + } + + /** + * @return null|string + * @throws \WSSC\Exceptions\BadOpcodeException + * @throws \InvalidArgumentException + * @throws BadOpcodeException + * @throws BadUriException + * @throws ConnectionException + * @throws \Exception + */ + protected function receiveFragment() + { + // Just read the main fragment information first. + $data = $this->read(2); + + // Is this the final fragment? // Bit 0 in byte 0 + /// @todo Handle huge payloads with multiple fragments. + $final = (bool)(ord($data[0]) & 1 << 7); + + // Parse opcode + $opcode_int = ord($data[0]) & 31; // Bits 4-7 + $opcode_ints = array_flip(self::$opcodes); + if (!array_key_exists($opcode_int, $opcode_ints)) { + throw new ConnectionException("Bad opcode in websocket frame: $opcode_int", + CommonsContract::CLIENT_BAD_OPCODE); + } + + $opcode = $opcode_ints[$opcode_int]; + + // record the opcode if we are not receiving a continutation fragment + if ($opcode !== 'continuation') { + $this->lastOpcode = $opcode; + } + + $payloadLength = $this->getPayloadLength($data); + $payload = $this->getPayloadData($data, $payloadLength); + + if ($opcode === CommonsContract::EVENT_TYPE_CLOSE) { + // Get the close status. + if ($payloadLength >= 2) { + $statusBin = $payload[0] . $payload[1]; + $status = bindec(sprintf('%08b%08b', ord($payload[0]), ord($payload[1]))); + $this->closeStatus = $status; + $payload = substr($payload, 2); + + if (!$this->isClosing) { + $this->send($statusBin . 'Close acknowledged: ' . $status, + CommonsContract::EVENT_TYPE_CLOSE); // Respond. + } + } + + if ($this->isClosing) { + $this->isClosing = false; // A close response, all done. + } + + fclose($this->socket); + $this->isConnected = false; + } + + if (!$final) { + $this->hugePayload .= $payload; + + return NULL; + } // this is the last fragment, and we are processing a huge_payload + + if ($this->hugePayload) { + $payload = $this->hugePayload .= $payload; + $this->hugePayload = NULL; + } + + return $payload; + } + + /** + * @param $final + * @param $payload + * @param $opcode + * @param $masked + * @throws ConnectionException + * @throws \Exception + */ + protected function sendFragment($final, $payload, $opcode, $masked) + { + // Binary string for header. + $frameHeadBin = ''; + // Write FIN, final fragment bit. + $frameHeadBin .= (bool)$final ? '1' : '0'; + // RSV 1, 2, & 3 false and unused. + $frameHeadBin .= '000'; + // Opcode rest of the byte. + $frameHeadBin .= sprintf('%04b', self::$opcodes[$opcode]); + // Use masking? + $frameHeadBin .= $masked ? '1' : '0'; + + // 7 bits of payload length... + $payloadLen = strlen($payload); + if ($payloadLen > self::MAX_BYTES_READ) { + $frameHeadBin .= decbin(self::MASK_127); + $frameHeadBin .= sprintf('%064b', $payloadLen); + } else if ($payloadLen > self::MASK_125) { + $frameHeadBin .= decbin(self::MASK_126); + $frameHeadBin .= sprintf('%016b', $payloadLen); + } else { + $frameHeadBin .= sprintf('%07b', $payloadLen); + } + + $frame = ''; + + // Write frame head to frame. + foreach (str_split($frameHeadBin, 8) as $binstr) { + $frame .= chr(bindec($binstr)); + } + // Handle masking + if ($masked) { + // generate a random mask: + $mask = ''; + for ($i = 0; $i < 4; $i++) { + $mask .= chr(random_int(0, 255)); + } + $frame .= $mask; + } + + // Append payload to frame: + for ($i = 0; $i < $payloadLen; $i++) { + $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i]; + } + + $this->write($frame); + } +} \ No newline at end of file diff --git a/src/Components/WscMain.php b/src/Components/WscMain.php index fb6b5d3..beb4415 100755 --- a/src/Components/WscMain.php +++ b/src/Components/WscMain.php @@ -18,6 +18,8 @@ class WscMain implements WscCommonsContract { + use WSClientTrait; + private $socket; private $isConnected = false; private $isClosing = false; @@ -47,24 +49,25 @@ protected function connect() { $urlParts = parse_url($this->socketUrl); - $scheme = $urlParts['scheme']; - $host = $urlParts['host']; - $user = isset($urlParts['user']) ? $urlParts['user'] : ''; - $pass = isset($urlParts['pass']) ? $urlParts['pass'] : ''; - $port = isset($urlParts['port']) ? $urlParts['port'] : ($scheme === 'wss' ? 443 : 80); + $this->config->setScheme($urlParts['scheme']); + $this->config->setHost($urlParts['host']); + $this->config->setUser($urlParts); + $this->config->setPassword($urlParts); + $this->config->setPort($urlParts); $pathWithQuery = $this->getPathWithQuery($urlParts); - $hostUri = $this->getHostUri($scheme, $host); + $hostUri = $this->getHostUri($this->config); // Set the stream context options if they're already set in the config $context = $this->getStreamContext(); $this->socket = @stream_socket_client( - $hostUri . ':' . $port, $errno, $errstr, $this->config->getTimeout(), STREAM_CLIENT_CONNECT, $context + $hostUri . ':' . $this->config->getPort(), $errno, $errstr, $this->config->getTimeout(), + STREAM_CLIENT_CONNECT, $context ); if ($this->socket === false) { throw new ConnectionException( - "Could not open socket to \"$host:$port\": $errstr ($errno).", + "Could not open socket to \"{$this->config->getHost()}:{$this->config->getPort()}\": $errstr ($errno).", CommonsContract::CLIENT_COULD_NOT_OPEN_SOCKET ); } @@ -75,7 +78,7 @@ protected function connect() // Generate the WebSocket key. $key = $this->generateKey(); $headers = [ - 'Host' => $host . ':' . $port, + 'Host' => $this->config->getHost() . ':' . $this->config->getPort(), 'User-Agent' => 'websocket-client-php', 'Connection' => 'Upgrade', 'Upgrade' => 'WebSocket', @@ -84,8 +87,8 @@ protected function connect() ]; // Handle basic authentication. - if ($user || $pass) { - $headers['authorization'] = 'Basic ' . base64_encode($user . ':' . $pass) . "\r\n"; + if ($this->config->getUser() || $this->config->getPassword()) { + $headers['authorization'] = 'Basic ' . base64_encode($this->config->getUser() . ':' . $this->config->getPassword()) . "\r\n"; } // Add and override with headers from options. @@ -100,54 +103,10 @@ protected function connect() // Get server response header // @todo Handle version switching - $this->validateResponse($scheme, $host, $pathWithQuery, $key); + $this->validateResponse($this->config, $pathWithQuery, $key); $this->isConnected = true; } - /** - * @param string $scheme - * @param string $host - * @return string - * @throws BadUriException - */ - private function getHostUri(string $scheme, string $host): string - { - if (in_array($scheme, ['ws', 'wss'], true) === false) { - throw new BadUriException( - "Url should have scheme ws or wss, not '$scheme' from URI '$this->socketUrl' .", - CommonsContract::CLIENT_INCORRECT_SCHEME - ); - } - - return ($scheme === 'wss' ? 'ssl' : 'tcp') . '://' . $host; - } - - /** - * @param string $scheme - * @param string $host - * @param string $pathWithQuery - * @param string $key - * @throws ConnectionException - */ - private function validateResponse(string $scheme, string $host, string $pathWithQuery, string $key) - { - $response = stream_get_line($this->socket, self::DEFAULT_RESPONSE_HEADER, "\r\n\r\n"); - if (!preg_match(self::SEC_WEBSOCKET_ACCEPT_PTTRN, $response, $matches)) { - $address = $scheme . '://' . $host . $pathWithQuery; - throw new ConnectionException( - "Connection to '{$address}' failed: Server sent invalid upgrade response:\n" - . $response, CommonsContract::CLIENT_INVALID_UPGRADE_RESPONSE - ); - } - - $keyAccept = trim($matches[1]); - $expectedResonse = base64_encode(pack('H*', sha1($key . self::SERVER_KEY_ACCEPT))); - if ($keyAccept !== $expectedResonse) { - throw new ConnectionException('Server sent bad upgrade response.', - CommonsContract::CLIENT_INVALID_UPGRADE_RESPONSE); - } - } - /** * @return mixed|resource * @throws \InvalidArgumentException @@ -272,7 +231,7 @@ public function send($payload, $opcode = CommonsContract::EVENT_TYPE_TEXT) // while we have data to send while ($payloadLength > $fragmentCursor) { // get a fragment of the payload - $sub_payload = substr($payload, $fragmentCursor, $this->config->getFragmentSize()); + $subPayload = substr($payload, $fragmentCursor, $this->config->getFragmentSize()); // advance the cursor $fragmentCursor += $this->config->getFragmentSize(); @@ -281,70 +240,13 @@ public function send($payload, $opcode = CommonsContract::EVENT_TYPE_TEXT) $final = $payloadLength <= $fragmentCursor; // send the fragment - $this->sendFragment($final, $sub_payload, $opcode, true); + $this->sendFragment($final, $subPayload, $opcode, true); // all fragments after the first will be marked a continuation $opcode = 'continuation'; } } - /** - * @param $final - * @param $payload - * @param $opcode - * @param $masked - * @throws ConnectionException - * @throws \Exception - */ - protected function sendFragment($final, $payload, $opcode, $masked) - { - // Binary string for header. - $frameHeadBin = ''; - // Write FIN, final fragment bit. - $frameHeadBin .= (bool)$final ? '1' : '0'; - // RSV 1, 2, & 3 false and unused. - $frameHeadBin .= '000'; - // Opcode rest of the byte. - $frameHeadBin .= sprintf('%04b', self::$opcodes[$opcode]); - // Use masking? - $frameHeadBin .= $masked ? '1' : '0'; - - // 7 bits of payload length... - $payloadLen = strlen($payload); - if ($payloadLen > self::MAX_BYTES_READ) { - $frameHeadBin .= decbin(self::MASK_127); - $frameHeadBin .= sprintf('%064b', $payloadLen); - } else if ($payloadLen > self::MASK_125) { - $frameHeadBin .= decbin(self::MASK_126); - $frameHeadBin .= sprintf('%016b', $payloadLen); - } else { - $frameHeadBin .= sprintf('%07b', $payloadLen); - } - - $frame = ''; - - // Write frame head to frame. - foreach (str_split($frameHeadBin, 8) as $binstr) { - $frame .= chr(bindec($binstr)); - } - // Handle masking - if ($masked) { - // generate a random mask: - $mask = ''; - for ($i = 0; $i < 4; $i++) { - $mask .= chr(random_int(0, 255)); - } - $frame .= $mask; - } - - // Append payload to frame: - for ($i = 0; $i < $payloadLen; $i++) { - $frame .= ($masked === true) ? $payload[$i] ^ $mask[$i % 4] : $payload[$i]; - } - - $this->write($frame); - } - /** * Receives message client<-server * @@ -370,139 +272,17 @@ public function receive() return $response; } - /** - * @return null|string - * @throws \InvalidArgumentException - * @throws BadOpcodeException - * @throws BadUriException - * @throws ConnectionException - * @throws \Exception - */ - protected function receiveFragment() - { - // Just read the main fragment information first. - $data = $this->read(2); - - // Is this the final fragment? // Bit 0 in byte 0 - /// @todo Handle huge payloads with multiple fragments. - $final = (bool)(ord($data[0]) & 1 << 7); - - // Parse opcode - $opcode_int = ord($data[0]) & 31; // Bits 4-7 - $opcode_ints = array_flip(self::$opcodes); - if (!array_key_exists($opcode_int, $opcode_ints)) { - throw new ConnectionException("Bad opcode in websocket frame: $opcode_int", - CommonsContract::CLIENT_BAD_OPCODE); - } - - $opcode = $opcode_ints[$opcode_int]; - - // record the opcode if we are not receiving a continutation fragment - if ($opcode !== 'continuation') { - $this->lastOpcode = $opcode; - } - - $payloadLength = $this->getPayloadLength($data); - $payload = $this->getPayloadData($data, $payloadLength); - - if ($opcode === CommonsContract::EVENT_TYPE_CLOSE) { - // Get the close status. - if ($payloadLength >= 2) { - $statusBin = $payload[0] . $payload[1]; - $status = bindec(sprintf('%08b%08b', ord($payload[0]), ord($payload[1]))); - $this->closeStatus = $status; - $payload = substr($payload, 2); - - if (!$this->isClosing) { - $this->send($statusBin . 'Close acknowledged: ' . $status, - CommonsContract::EVENT_TYPE_CLOSE); // Respond. - } - } - - if ($this->isClosing) { - $this->isClosing = false; // A close response, all done. - } - - fclose($this->socket); - $this->isConnected = false; - } - - if (!$final) { - $this->hugePayload .= $payload; - - return NULL; - } // this is the last fragment, and we are processing a huge_payload - - if ($this->hugePayload) { - $payload = $this->hugePayload .= $payload; - $this->hugePayload = NULL; - } - - return $payload; - } - - /** - * @param string $data - * @param int $payloadLength - * @return string - * @throws ConnectionException - */ - private function getPayloadData(string $data, int $payloadLength): string - { - // Masking? - $mask = (bool)(ord($data[1]) >> 7); // Bit 0 in byte 1 - $payload = ''; - $maskingKey = ''; - - // Get masking key. - if ($mask) { - $maskingKey = $this->read(4); - } - - // Get the actual payload, if any (might not be for e.g. close frames. - if ($payloadLength > 0) { - $data = $this->read($payloadLength); - - if ($mask) { - // Unmask payload. - for ($i = 0; $i < $payloadLength; $i++) { - $payload .= ($data[$i] ^ $maskingKey[$i % 4]); - } - } else { - $payload = $data; - } - } - - return $payload; - } - - /** - * @param string $data - * @return float|int - * @throws ConnectionException - */ - private function getPayloadLength(string $data) - { - $payloadLength = (int)ord($data[1]) & self::MASK_127; // Bits 1-7 in byte 1 - if ($payloadLength > self::MASK_125) { - if ($payloadLength === self::MASK_126) { - $data = $this->read(2); // 126: Payload is a 16-bit unsigned int - } else { - $data = $this->read(8); // 127: Payload is a 64-bit unsigned int - } - $payloadLength = bindec(self::sprintB($data)); - } - - return $payloadLength; - } - /** * Tell the socket to close. * * @param integer $status http://tools.ietf.org/html/rfc6455#section-7.4 * @param string $message A closing message, max 125 bytes. * @return bool|null|string + * @throws \InvalidArgumentException * @throws BadOpcodeException + * @throws BadUriException + * @throws ConnectionException + * @throws \Exception */ public function close(int $status = 1000, string $message = 'ttfn') { @@ -602,5 +382,4 @@ private function generateKey(): string return base64_encode($key); } - }