From 09d855d211d0e99920f4d005363d4868972c6688 Mon Sep 17 00:00:00 2001 From: Stephen Sigwart Date: Thu, 6 Jun 2024 16:49:43 -0400 Subject: [PATCH] Fix blocking reads; Better error handling - Fix process not being closed on exit message failure. - Fix blocking reads. - Add error handling to writes. --- src/ProcessPool.php | 22 +++++++++-- src/ProcessPoolProcess.php | 10 +++-- src/ProcessPoolRequest.php | 48 +++++++++++++++++++---- tests/TestAuxFiles/PhpUnitTestProcess.php | 13 +++++- tests/TestCases/ProcessPoolTest.php | 48 +++++++++++++++++++++++ 5 files changed, 123 insertions(+), 18 deletions(-) diff --git a/src/ProcessPool.php b/src/ProcessPool.php index 00acc51..ca6b740 100644 --- a/src/ProcessPool.php +++ b/src/ProcessPool.php @@ -63,16 +63,26 @@ public function __destruct() foreach ($this->runningProcs as $proc) { try { - $proc->sendExitRequest(); + if (!$proc->hasFailed()) + $proc->sendExitRequest(); + } catch (Throwable $e) { + } + try { $proc->close(); - } catch (Throwable $e) {} + } catch (Throwable $e) { + } } foreach ($this->unassignedProcs as $proc) { try { - $proc->sendExitRequest(); + if (!$proc->hasFailed()) + $proc->sendExitRequest(); + } catch (Throwable $e) { + } + try { $proc->close(); - } catch (Throwable $e) {} + } catch (Throwable $e) { + } } } @@ -181,6 +191,10 @@ public function releaseProcess(ProcessPoolRequest $process): void { try { $process->sendExitRequest(); + } catch (Throwable $e) { + // Suppress error + } + try { $process->close(); } catch (Throwable $e) { // Suppress error diff --git a/src/ProcessPoolProcess.php b/src/ProcessPoolProcess.php index 8d7044c..287815e 100644 --- a/src/ProcessPoolProcess.php +++ b/src/ProcessPoolProcess.php @@ -51,12 +51,14 @@ private function waitForRequest() if ($msgType === ProcessPoolMessageTypes::MSG_START_REQUEST) { $length = $this->_waitForNumberWithEndChar(PHP_EOL); - while (strlen($this->inputBuffer) < $length) + $numBytesMoreToRead = $length - strlen($this->inputBuffer); + while ($numBytesMoreToRead > 0) { - $input = fread(STDIN, 1024); + $input = fread(STDIN, min($numBytesMoreToRead, 1024)); if ($input === false) throw new ProcessPoolUnexpectedEOFException(); $this->inputBuffer .= $input; + $numBytesMoreToRead = $length - strlen($this->inputBuffer); } $data = substr($this->inputBuffer, 0, $length); $this->inputBuffer = substr($this->inputBuffer, $length); @@ -73,7 +75,7 @@ private function waitForRequest() } /** - * Wait for a number followed by an exd character + * Wait for a number followed by a delimiter character * * @return int Number * @throws ProcessPoolException @@ -99,7 +101,7 @@ private function _waitForNumberWithEndChar(string $char) throw new ProcessPoolUnexpectedMessageException(); } - // Get more input + // Get more input. Note that we expect a new ling after message types, so we can expect fread to exit before 1024 characters. $input = fread(STDIN, 1024); if ($input === false || ($input === '' && feof(STDIN))) throw new ProcessPoolUnexpectedEOFException(); diff --git a/src/ProcessPoolRequest.php b/src/ProcessPoolRequest.php index c5afd52..b0b7b59 100644 --- a/src/ProcessPoolRequest.php +++ b/src/ProcessPoolRequest.php @@ -30,7 +30,12 @@ public function __construct(string $cmd, ?string $cwd = null, ?array $env = null ]; $proc = proc_open($cmd, $descriptorSpec, $this->pipes, $cwd, $env); if ($proc !== false) + { $this->process = $proc; + + // Make sure `fread` doesn't block for stderr reads + stream_set_blocking($this->pipes[2], false); + } } /** @@ -42,6 +47,32 @@ public function __destruct() $this->close(); } + /** + * Write message + * + * @param string $msg Message + */ + private function writeMsg(string $msg): void + { + // Write data + $bytesWritten = fwrite($this->pipes[0], $msg); + if ($bytesWritten === false || $bytesWritten === 0) + { + $exceptionMsg = 'Failed to write message.'; + $error = error_get_last(); + if ($error !== null) + $exceptionMsg .= ' ' . $error['message']; + throw new ProcessPoolException($exceptionMsg, $error['type'] ?? 0); + } + + // Write more data + if ($bytesWritten < strlen($msg)) + $this->writeMsg(substr($msg, $bytesWritten)); + // Flush when done + else + fflush($this->pipes[0]); + } + /** * Send a request * @@ -54,8 +85,7 @@ public function sendRequest(string $data): void throw new ProcessPoolResourceFailedException(); // Write data - fwrite($this->pipes[0], ProcessPoolMessageTypes::MSG_START_REQUEST . ';' . strlen($data) . PHP_EOL . $data); - fflush($this->pipes[0]); + $this->writeMsg(ProcessPoolMessageTypes::MSG_START_REQUEST . ';' . strlen($data) . PHP_EOL . $data); } /** @@ -69,8 +99,7 @@ public function sendExitRequest(): void throw new ProcessPoolResourceFailedException(); // Write data - fwrite($this->pipes[0], ProcessPoolMessageTypes::MSG_EXIT . ';'); - fflush($this->pipes[0]); + $this->writeMsg(ProcessPoolMessageTypes::MSG_EXIT . ';' . PHP_EOL); } /** @@ -184,10 +213,12 @@ public function getStdoutResponse(): string $this->stdoutBuffer = substr($this->stdoutBuffer, strlen($match[1]) + 1); // Get response - while (strlen($this->stdoutBuffer) < $length) + $numBytesMoreToRead = $length - strlen($this->stdoutBuffer); + while ($numBytesMoreToRead > 0) { - $newInput = $this->_getResponseFromPipe(1); + $newInput = $this->_getResponseFromPipe(1, $numBytesMoreToRead); $this->stdoutBuffer .= $newInput; + $numBytesMoreToRead = $length - strlen($this->stdoutBuffer); // Make sure not at EOF if ($newInput === '') @@ -219,17 +250,18 @@ public function getStderrResponse(): string * Get response from pipe * * @param int $pipeIdx Pipe index + * @param int|null $maxNumBytes Max number of bytes * * @return string Response * @throws ProcessPoolException */ - private function _getResponseFromPipe(int $pipeIdx): string + private function _getResponseFromPipe(int $pipeIdx, ?int $maxNumBytes = null): string { if ($this->process === null) throw new ProcessPoolResourceFailedException(); // Read some data - $input = fread($this->pipes[$pipeIdx], 1024); + $input = fread($this->pipes[$pipeIdx], min($maxNumBytes ?? 1024, 1024)); if ($input === false) { // Don't let the process be reuse diff --git a/tests/TestAuxFiles/PhpUnitTestProcess.php b/tests/TestAuxFiles/PhpUnitTestProcess.php index 175ff96..1902c5e 100644 --- a/tests/TestAuxFiles/PhpUnitTestProcess.php +++ b/tests/TestAuxFiles/PhpUnitTestProcess.php @@ -30,7 +30,16 @@ public function handleRequest(string $data) if (preg_match('/^Error/A', $data, $match)) error_log('Error-' . md5($data)); - // Return MD5 of data - print md5($data); + // Check if we should echo data + if (preg_match('/^echo/A', $data, $match)) + print $data; + // Check if we should echo data to stderr + else if (preg_match('/^stderr echo/A', $data, $match)) + fwrite(STDERR, $data); + else + { + // Output MD5 of data + print md5($data); + } } } diff --git a/tests/TestCases/ProcessPoolTest.php b/tests/TestCases/ProcessPoolTest.php index dc139a1..9a47a24 100644 --- a/tests/TestCases/ProcessPoolTest.php +++ b/tests/TestCases/ProcessPoolTest.php @@ -169,4 +169,52 @@ public function testProcessPoolNoRelease(): void self::assertEquals('', $req1->getStderrResponse(), 'Stderr should be empty.'); $pool->releaseProcess($req1); } + + /** + * Test pool release with long message + */ + public function testProcessPoolWithLongMessage(): void + { + $poolSize = 1; + $pool = new ProcessPool($poolSize, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..')); + + $req1 = $pool->startProcess(); + $data = str_repeat('0123456789', 120); + $req1->sendRequest($data); + self::assertEquals(md5($data), $req1->getStdoutResponse(), 'MD5 incorrect.'); + self::assertEquals('', $req1->getStderrResponse(), 'Stderr should be empty.'); + $pool->releaseProcess($req1); + } + + /** + * Test pool release with long response + */ + public function testProcessPoolWithLongResponse(): void + { + $poolSize = 1; + $pool = new ProcessPool($poolSize, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..')); + + $req1 = $pool->startProcess(); + $data = 'echo ' . str_repeat('0123456789', 120); + $req1->sendRequest($data); + self::assertEquals($data, $req1->getStdoutResponse(), 'MD5 incorrect.'); + self::assertEquals('', $req1->getStderrResponse(), 'Stderr should be empty.'); + $pool->releaseProcess($req1); + } + + /** + * Test pool release with long STDERR response + */ + public function testProcessPoolWithLongStderrResponse(): void + { + $poolSize = 1; + $pool = new ProcessPool($poolSize, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..')); + + $req1 = $pool->startProcess(); + $data = 'stderr echo ' . str_repeat('0123456789', 120); + $req1->sendRequest($data); + self::assertEquals('', $req1->getStdoutResponse(), 'MD5 incorrect.'); + self::assertEquals($data, $req1->getStderrResponse(), 'Stderr should be empty.'); + $pool->releaseProcess($req1); + } }