Skip to content

Commit

Permalink
Fix blocking reads; Better error handling
Browse files Browse the repository at this point in the history
- Fix process not being closed on exit message failure.
- Fix blocking reads.
- Add error handling to writes.
  • Loading branch information
ssigwart committed Jun 6, 2024
1 parent d4f588d commit 09d855d
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 18 deletions.
22 changes: 18 additions & 4 deletions src/ProcessPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,26 @@ public function __destruct()
foreach ($this->runningProcs as $proc)
{
try {
$proc->sendExitRequest();
if (!$proc->hasFailed())
$proc->sendExitRequest();
} catch (Throwable $e) {
}
try {
$proc->close();
} catch (Throwable $e) {}
} catch (Throwable $e) {
}
}
foreach ($this->unassignedProcs as $proc)
{
try {
$proc->sendExitRequest();
if (!$proc->hasFailed())
$proc->sendExitRequest();
} catch (Throwable $e) {
}
try {
$proc->close();
} catch (Throwable $e) {}
} catch (Throwable $e) {
}
}
}

Expand Down Expand Up @@ -181,6 +191,10 @@ public function releaseProcess(ProcessPoolRequest $process): void
{
try {
$process->sendExitRequest();
} catch (Throwable $e) {
// Suppress error
}
try {
$process->close();
} catch (Throwable $e) {
// Suppress error
Expand Down
10 changes: 6 additions & 4 deletions src/ProcessPoolProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,14 @@ private function waitForRequest()
if ($msgType === ProcessPoolMessageTypes::MSG_START_REQUEST)
{
$length = $this->_waitForNumberWithEndChar(PHP_EOL);
while (strlen($this->inputBuffer) < $length)
$numBytesMoreToRead = $length - strlen($this->inputBuffer);
while ($numBytesMoreToRead > 0)
{
$input = fread(STDIN, 1024);
$input = fread(STDIN, min($numBytesMoreToRead, 1024));
if ($input === false)
throw new ProcessPoolUnexpectedEOFException();
$this->inputBuffer .= $input;
$numBytesMoreToRead = $length - strlen($this->inputBuffer);
}
$data = substr($this->inputBuffer, 0, $length);
$this->inputBuffer = substr($this->inputBuffer, $length);
Expand All @@ -73,7 +75,7 @@ private function waitForRequest()
}

/**
* Wait for a number followed by an exd character
* Wait for a number followed by a delimiter character
*
* @return int Number
* @throws ProcessPoolException
Expand All @@ -99,7 +101,7 @@ private function _waitForNumberWithEndChar(string $char)
throw new ProcessPoolUnexpectedMessageException();
}

// Get more input
// Get more input. Note that we expect a new ling after message types, so we can expect fread to exit before 1024 characters.
$input = fread(STDIN, 1024);
if ($input === false || ($input === '' && feof(STDIN)))
throw new ProcessPoolUnexpectedEOFException();
Expand Down
48 changes: 40 additions & 8 deletions src/ProcessPoolRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ public function __construct(string $cmd, ?string $cwd = null, ?array $env = null
];
$proc = proc_open($cmd, $descriptorSpec, $this->pipes, $cwd, $env);
if ($proc !== false)
{
$this->process = $proc;

// Make sure `fread` doesn't block for stderr reads
stream_set_blocking($this->pipes[2], false);
}
}

/**
Expand All @@ -42,6 +47,32 @@ public function __destruct()
$this->close();
}

/**
* Write message
*
* @param string $msg Message
*/
private function writeMsg(string $msg): void
{
// Write data
$bytesWritten = fwrite($this->pipes[0], $msg);
if ($bytesWritten === false || $bytesWritten === 0)
{
$exceptionMsg = 'Failed to write message.';
$error = error_get_last();
if ($error !== null)
$exceptionMsg .= ' ' . $error['message'];
throw new ProcessPoolException($exceptionMsg, $error['type'] ?? 0);
}

// Write more data
if ($bytesWritten < strlen($msg))
$this->writeMsg(substr($msg, $bytesWritten));
// Flush when done
else
fflush($this->pipes[0]);
}

/**
* Send a request
*
Expand All @@ -54,8 +85,7 @@ public function sendRequest(string $data): void
throw new ProcessPoolResourceFailedException();

// Write data
fwrite($this->pipes[0], ProcessPoolMessageTypes::MSG_START_REQUEST . ';' . strlen($data) . PHP_EOL . $data);
fflush($this->pipes[0]);
$this->writeMsg(ProcessPoolMessageTypes::MSG_START_REQUEST . ';' . strlen($data) . PHP_EOL . $data);
}

/**
Expand All @@ -69,8 +99,7 @@ public function sendExitRequest(): void
throw new ProcessPoolResourceFailedException();

// Write data
fwrite($this->pipes[0], ProcessPoolMessageTypes::MSG_EXIT . ';');
fflush($this->pipes[0]);
$this->writeMsg(ProcessPoolMessageTypes::MSG_EXIT . ';' . PHP_EOL);
}

/**
Expand Down Expand Up @@ -184,10 +213,12 @@ public function getStdoutResponse(): string
$this->stdoutBuffer = substr($this->stdoutBuffer, strlen($match[1]) + 1);

// Get response
while (strlen($this->stdoutBuffer) < $length)
$numBytesMoreToRead = $length - strlen($this->stdoutBuffer);
while ($numBytesMoreToRead > 0)
{
$newInput = $this->_getResponseFromPipe(1);
$newInput = $this->_getResponseFromPipe(1, $numBytesMoreToRead);
$this->stdoutBuffer .= $newInput;
$numBytesMoreToRead = $length - strlen($this->stdoutBuffer);

// Make sure not at EOF
if ($newInput === '')
Expand Down Expand Up @@ -219,17 +250,18 @@ public function getStderrResponse(): string
* Get response from pipe
*
* @param int $pipeIdx Pipe index
* @param int|null $maxNumBytes Max number of bytes
*
* @return string Response
* @throws ProcessPoolException
*/
private function _getResponseFromPipe(int $pipeIdx): string
private function _getResponseFromPipe(int $pipeIdx, ?int $maxNumBytes = null): string
{
if ($this->process === null)
throw new ProcessPoolResourceFailedException();

// Read some data
$input = fread($this->pipes[$pipeIdx], 1024);
$input = fread($this->pipes[$pipeIdx], min($maxNumBytes ?? 1024, 1024));
if ($input === false)
{
// Don't let the process be reuse
Expand Down
13 changes: 11 additions & 2 deletions tests/TestAuxFiles/PhpUnitTestProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,16 @@ public function handleRequest(string $data)
if (preg_match('/^Error/A', $data, $match))
error_log('Error-' . md5($data));

// Return MD5 of data
print md5($data);
// Check if we should echo data
if (preg_match('/^echo/A', $data, $match))
print $data;
// Check if we should echo data to stderr
else if (preg_match('/^stderr echo/A', $data, $match))
fwrite(STDERR, $data);
else
{
// Output MD5 of data
print md5($data);
}
}
}
48 changes: 48 additions & 0 deletions tests/TestCases/ProcessPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -169,4 +169,52 @@ public function testProcessPoolNoRelease(): void
self::assertEquals('', $req1->getStderrResponse(), 'Stderr should be empty.');
$pool->releaseProcess($req1);
}

/**
* Test pool release with long message
*/
public function testProcessPoolWithLongMessage(): void
{
$poolSize = 1;
$pool = new ProcessPool($poolSize, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..'));

$req1 = $pool->startProcess();
$data = str_repeat('0123456789', 120);
$req1->sendRequest($data);
self::assertEquals(md5($data), $req1->getStdoutResponse(), 'MD5 incorrect.');
self::assertEquals('', $req1->getStderrResponse(), 'Stderr should be empty.');
$pool->releaseProcess($req1);
}

/**
* Test pool release with long response
*/
public function testProcessPoolWithLongResponse(): void
{
$poolSize = 1;
$pool = new ProcessPool($poolSize, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..'));

$req1 = $pool->startProcess();
$data = 'echo ' . str_repeat('0123456789', 120);
$req1->sendRequest($data);
self::assertEquals($data, $req1->getStdoutResponse(), 'MD5 incorrect.');
self::assertEquals('', $req1->getStderrResponse(), 'Stderr should be empty.');
$pool->releaseProcess($req1);
}

/**
* Test pool release with long STDERR response
*/
public function testProcessPoolWithLongStderrResponse(): void
{
$poolSize = 1;
$pool = new ProcessPool($poolSize, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..'));

$req1 = $pool->startProcess();
$data = 'stderr echo ' . str_repeat('0123456789', 120);
$req1->sendRequest($data);
self::assertEquals('', $req1->getStdoutResponse(), 'MD5 incorrect.');
self::assertEquals($data, $req1->getStderrResponse(), 'Stderr should be empty.');
$pool->releaseProcess($req1);
}
}

0 comments on commit 09d855d

Please sign in to comment.