From 9a7dff4a7fc9e9dcc79959e3af7c872b02fe6087 Mon Sep 17 00:00:00 2001 From: Stephen Sigwart Date: Sat, 5 Sep 2020 10:44:26 -0400 Subject: [PATCH] Manage number of spare processes and kill processes in destructor --- composer.json | 4 ++ src/ProcessPool.php | 65 +++++++++++++++++++++++++++-- src/ProcessPoolRequest.php | 17 ++++---- tests/TestCases/ProcessPoolTest.php | 39 ++++++++++++++++- 4 files changed, 113 insertions(+), 12 deletions(-) diff --git a/composer.json b/composer.json index f072b46..1a769c0 100644 --- a/composer.json +++ b/composer.json @@ -12,6 +12,10 @@ "autoload": { "psr-4": { "ssigwart\\ProcessPool\\": "src/" } }, + "readme": "README.md", + "require": { + "php": ">=7.0.0" + }, "require-dev": { "phpunit/phpunit": "^9" } diff --git a/src/ProcessPool.php b/src/ProcessPool.php index d3ae282..e4b386b 100644 --- a/src/ProcessPool.php +++ b/src/ProcessPool.php @@ -11,6 +11,9 @@ class ProcessPool /** Max number of processes */ private $maxNumProcs = 0; + /** Max number of unassigned processes */ + private $maxNumUnassignedProcs = 0; + /** @var ProcessPoolRequest[] Running process pool */ private $runningProcs = []; @@ -39,6 +42,7 @@ public function __construct(int $minNumProcs, int $maxNumProcs, string $cmd, ?st { $this->minNumProcs = $minNumProcs; $this->maxNumProcs = $maxNumProcs; + $this->maxNumUnassignedProcs = min($this->minNumProcs + 5, $this->maxNumProcs); $this->cmd = $cmd; $this->cwd = $cwd; $this->env = $env; @@ -48,10 +52,63 @@ public function __construct(int $minNumProcs, int $maxNumProcs, string $cmd, ?st $this->addProcess(); } + /** + * Destructor + */ + public function __destruct() + { + // Close processes + foreach ($this->runningProcs as $proc) + { + try { + $proc->close(); + } catch (Throwable $e) {} + } + foreach ($this->unassignedProcs as $proc) + { + try { + $proc->close(); + } catch (Throwable $e) {} + } + } + + /** + * Set max number of space processes + * + * @param int $maxNumUnassignedProcs Max number of spare processes. Must be at least min number of processes + * + * @throws ProcessPoolException + */ + public function setMaxNumSpareProcesses(int $maxNumUnassignedProcs): void + { + if ($maxNumUnassignedProcs < $this->minNumProcs) + throw new ProcessPoolException('Number of spare servers cannot be less than minimum number of processes (' . $this->minNumProcs . ').'); + } + + /** + * Get number of processes running + * + * @return int Number of processes + */ + public function getNumRunningProcesses(): int + { + return count($this->runningProcs); + } + + /** + * Get number of processes started, but not service a request + * + * @return int Number of processes + */ + public function getNumUnassignedProcesses(): int + { + return count($this->unassignedProcs); + } + /** * Add a process */ - private function addProcess() + private function addProcess(): void { $this->unassignedProcs[] = new ProcessPoolRequest($this->cmd, $this->cwd, $this->env); } @@ -86,7 +143,7 @@ public function startProcess(): ProcessPoolRequest * @param ProcessPoolRequest $process Process * @throws ProcessPoolException */ - public function releaseProcess(ProcessPoolRequest $process) + public function releaseProcess(ProcessPoolRequest $process): void { // Find process $procIdx = null; @@ -106,8 +163,8 @@ public function releaseProcess(ProcessPoolRequest $process) // Start a new process on failure if ($process->hasFailed()) $this->addProcess(); - // Add this process back to the pull - else + // Add this process back to the pool if needed + else if ($this->getNumUnassignedProcesses() + 1 < $this->maxNumUnassignedProcs) $this->unassignedProcs[] = $process; } } diff --git a/src/ProcessPoolRequest.php b/src/ProcessPoolRequest.php index 1f44969..3e2a28d 100644 --- a/src/ProcessPoolRequest.php +++ b/src/ProcessPoolRequest.php @@ -48,7 +48,7 @@ public function __destruct() * @param string $data Data to send * @throws ProcessPoolException */ - public function sendRequest(string $data) + public function sendRequest(string $data): void { if ($this->process === null) throw new ProcessPoolResourceFailedException(); @@ -63,19 +63,22 @@ public function sendRequest(string $data) * * @throws ProcessPoolException */ - public function freeRequest() + public function freeRequest(): void { // Make sure we read all data - if ($this->stdoutBuffer === null) - $this->getStdoutResponse(); - if ($this->hasStderrData()) - $this->getStderrResponse(); + if ($this->process !== null) + { + if ($this->stdoutBuffer === null) + $this->getStdoutResponse(); + if ($this->hasStderrData()) + $this->getStderrResponse(); + } } /** * Close process */ - private function close() + public function close(): void { if ($this->process !== null) { diff --git a/tests/TestCases/ProcessPoolTest.php b/tests/TestCases/ProcessPoolTest.php index 826552b..86c7752 100644 --- a/tests/TestCases/ProcessPoolTest.php +++ b/tests/TestCases/ProcessPoolTest.php @@ -5,6 +5,7 @@ use PHPUnit\Framework\TestCase; use ssigwart\ProcessPool\ProcessPool; +use ssigwart\ProcessPool\ProcessPoolException; use ssigwart\ProcessPool\ProcessPoolPoolExhaustedException; use ssigwart\ProcessPool\ProcessPoolUnexpectedEOFException; @@ -20,6 +21,19 @@ public function testInvalidResponse(): void { $this->expectException(ProcessPoolUnexpectedEOFException::class); $pool = new ProcessPool(1, 1, 'sleep 0', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..')); + $req1 = $pool->startProcess(); + $req1->sendRequest(''); + $req1->getStdoutResponse(); + } + + /** + * Test invalid max spares + */ + public function testInvalidMaxSpares(): void + { + $pool = new ProcessPool(5, 10, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..')); + $this->expectException(ProcessPoolException::class); + $pool->setMaxNumSpareProcesses(3); } /** @@ -27,14 +41,27 @@ public function testInvalidResponse(): void */ public function testProcessPool(): void { + $minPoolSize = 1; $poolSize = 3; - $pool = new ProcessPool(1, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..')); + $pool = new ProcessPool($minPoolSize, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..')); + $maxSpares = 2; + $pool->setMaxNumSpareProcesses($maxSpares); // Single process + $numRunning = 0; + $numUnassigned = $minPoolSize; $req1 = $pool->startProcess(); $req1->sendRequest('Testing 1'); + $numRunning++; + $numUnassigned--; + if ($numUnassigned < 0) + $numUnassigned = 0; + $this->assertEquals($numRunning, $pool->getNumRunningProcesses(), 'Number of processes running incorrect.'); + $this->assertEquals($numUnassigned, $pool->getNumUnassignedProcesses(), 'Number of processes unassigned incorrect.'); $this->assertEquals('3560b3b3658d3f95d320367b007ee2b6', $req1->getStdoutResponse(), 'MD5 incorrect.'); $pool->releaseProcess($req1); + $numRunning--; + $numUnassigned++; // Multiple processes $msgs = [ @@ -47,6 +74,12 @@ public function testProcessPool(): void { $req = $pool->startProcess(); $req->sendRequest($msg); + $numRunning++; + $numUnassigned--; + if ($numUnassigned < 0) + $numUnassigned = 0; + $this->assertEquals($numRunning, $pool->getNumRunningProcesses(), 'Number of processes running incorrect.'); + $this->assertEquals($numUnassigned, $pool->getNumUnassignedProcesses(), 'Number of processes unassigned incorrect.'); $requests[] = $req; } reset($requests); @@ -55,6 +88,10 @@ public function testProcessPool(): void { $this->assertEquals($md5, $req->getStdoutResponse(), 'MD5 incorrect.'); $pool->releaseProcess($req); + $numRunning--; + $numUnassigned++; + $this->assertEquals($numRunning, $pool->getNumRunningProcesses(), 'Number of processes running incorrect.'); + $this->assertEquals(min($maxSpares, $numUnassigned), $pool->getNumUnassignedProcesses(), 'Number of processes unassigned incorrect.'); $req = next($requests); }