Skip to content

Commit

Permalink
Manage number of spare processes and kill processes in destructor
Browse files Browse the repository at this point in the history
  • Loading branch information
ssigwart committed Sep 5, 2020
1 parent e7befe1 commit 9a7dff4
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 12 deletions.
4 changes: 4 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
"autoload": {
"psr-4": { "ssigwart\\ProcessPool\\": "src/" }
},
"readme": "README.md",
"require": {
"php": ">=7.0.0"
},
"require-dev": {
"phpunit/phpunit": "^9"
}
Expand Down
65 changes: 61 additions & 4 deletions src/ProcessPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ class ProcessPool
/** Max number of processes */
private $maxNumProcs = 0;

/** Max number of unassigned processes */
private $maxNumUnassignedProcs = 0;

/** @var ProcessPoolRequest[] Running process pool */
private $runningProcs = [];

Expand Down Expand Up @@ -39,6 +42,7 @@ public function __construct(int $minNumProcs, int $maxNumProcs, string $cmd, ?st
{
$this->minNumProcs = $minNumProcs;
$this->maxNumProcs = $maxNumProcs;
$this->maxNumUnassignedProcs = min($this->minNumProcs + 5, $this->maxNumProcs);
$this->cmd = $cmd;
$this->cwd = $cwd;
$this->env = $env;
Expand All @@ -48,10 +52,63 @@ public function __construct(int $minNumProcs, int $maxNumProcs, string $cmd, ?st
$this->addProcess();
}

/**
* Destructor
*/
public function __destruct()
{
// Close processes
foreach ($this->runningProcs as $proc)
{
try {
$proc->close();
} catch (Throwable $e) {}
}
foreach ($this->unassignedProcs as $proc)
{
try {
$proc->close();
} catch (Throwable $e) {}
}
}

/**
* Set max number of space processes
*
* @param int $maxNumUnassignedProcs Max number of spare processes. Must be at least min number of processes
*
* @throws ProcessPoolException
*/
public function setMaxNumSpareProcesses(int $maxNumUnassignedProcs): void
{
if ($maxNumUnassignedProcs < $this->minNumProcs)
throw new ProcessPoolException('Number of spare servers cannot be less than minimum number of processes (' . $this->minNumProcs . ').');
}

/**
* Get number of processes running
*
* @return int Number of processes
*/
public function getNumRunningProcesses(): int
{
return count($this->runningProcs);
}

/**
* Get number of processes started, but not service a request
*
* @return int Number of processes
*/
public function getNumUnassignedProcesses(): int
{
return count($this->unassignedProcs);
}

/**
* Add a process
*/
private function addProcess()
private function addProcess(): void
{
$this->unassignedProcs[] = new ProcessPoolRequest($this->cmd, $this->cwd, $this->env);
}
Expand Down Expand Up @@ -86,7 +143,7 @@ public function startProcess(): ProcessPoolRequest
* @param ProcessPoolRequest $process Process
* @throws ProcessPoolException
*/
public function releaseProcess(ProcessPoolRequest $process)
public function releaseProcess(ProcessPoolRequest $process): void
{
// Find process
$procIdx = null;
Expand All @@ -106,8 +163,8 @@ public function releaseProcess(ProcessPoolRequest $process)
// Start a new process on failure
if ($process->hasFailed())
$this->addProcess();
// Add this process back to the pull
else
// Add this process back to the pool if needed
else if ($this->getNumUnassignedProcesses() + 1 < $this->maxNumUnassignedProcs)
$this->unassignedProcs[] = $process;
}
}
17 changes: 10 additions & 7 deletions src/ProcessPoolRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public function __destruct()
* @param string $data Data to send
* @throws ProcessPoolException
*/
public function sendRequest(string $data)
public function sendRequest(string $data): void
{
if ($this->process === null)
throw new ProcessPoolResourceFailedException();
Expand All @@ -63,19 +63,22 @@ public function sendRequest(string $data)
*
* @throws ProcessPoolException
*/
public function freeRequest()
public function freeRequest(): void
{
// Make sure we read all data
if ($this->stdoutBuffer === null)
$this->getStdoutResponse();
if ($this->hasStderrData())
$this->getStderrResponse();
if ($this->process !== null)
{
if ($this->stdoutBuffer === null)
$this->getStdoutResponse();
if ($this->hasStderrData())
$this->getStderrResponse();
}
}

/**
* Close process
*/
private function close()
public function close(): void
{
if ($this->process !== null)
{
Expand Down
39 changes: 38 additions & 1 deletion tests/TestCases/ProcessPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use PHPUnit\Framework\TestCase;

use ssigwart\ProcessPool\ProcessPool;
use ssigwart\ProcessPool\ProcessPoolException;
use ssigwart\ProcessPool\ProcessPoolPoolExhaustedException;
use ssigwart\ProcessPool\ProcessPoolUnexpectedEOFException;

Expand All @@ -20,21 +21,47 @@ public function testInvalidResponse(): void
{
$this->expectException(ProcessPoolUnexpectedEOFException::class);
$pool = new ProcessPool(1, 1, 'sleep 0', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..'));
$req1 = $pool->startProcess();
$req1->sendRequest('');
$req1->getStdoutResponse();
}

/**
* Test invalid max spares
*/
public function testInvalidMaxSpares(): void
{
$pool = new ProcessPool(5, 10, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..'));
$this->expectException(ProcessPoolException::class);
$pool->setMaxNumSpareProcesses(3);
}

/**
* Test process pool
*/
public function testProcessPool(): void
{
$minPoolSize = 1;
$poolSize = 3;
$pool = new ProcessPool(1, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..'));
$pool = new ProcessPool($minPoolSize, $poolSize, 'php processes/phpUnitProcesses.php', realpath(__DIR__ . DIRECTORY_SEPARATOR . '..'));
$maxSpares = 2;
$pool->setMaxNumSpareProcesses($maxSpares);

// Single process
$numRunning = 0;
$numUnassigned = $minPoolSize;
$req1 = $pool->startProcess();
$req1->sendRequest('Testing 1');
$numRunning++;
$numUnassigned--;
if ($numUnassigned < 0)
$numUnassigned = 0;
$this->assertEquals($numRunning, $pool->getNumRunningProcesses(), 'Number of processes running incorrect.');
$this->assertEquals($numUnassigned, $pool->getNumUnassignedProcesses(), 'Number of processes unassigned incorrect.');
$this->assertEquals('3560b3b3658d3f95d320367b007ee2b6', $req1->getStdoutResponse(), 'MD5 incorrect.');
$pool->releaseProcess($req1);
$numRunning--;
$numUnassigned++;

// Multiple processes
$msgs = [
Expand All @@ -47,6 +74,12 @@ public function testProcessPool(): void
{
$req = $pool->startProcess();
$req->sendRequest($msg);
$numRunning++;
$numUnassigned--;
if ($numUnassigned < 0)
$numUnassigned = 0;
$this->assertEquals($numRunning, $pool->getNumRunningProcesses(), 'Number of processes running incorrect.');
$this->assertEquals($numUnassigned, $pool->getNumUnassignedProcesses(), 'Number of processes unassigned incorrect.');
$requests[] = $req;
}
reset($requests);
Expand All @@ -55,6 +88,10 @@ public function testProcessPool(): void
{
$this->assertEquals($md5, $req->getStdoutResponse(), 'MD5 incorrect.');
$pool->releaseProcess($req);
$numRunning--;
$numUnassigned++;
$this->assertEquals($numRunning, $pool->getNumRunningProcesses(), 'Number of processes running incorrect.');
$this->assertEquals(min($maxSpares, $numUnassigned), $pool->getNumUnassignedProcesses(), 'Number of processes unassigned incorrect.');
$req = next($requests);
}

Expand Down

0 comments on commit 9a7dff4

Please sign in to comment.