Skip to content

Commit

Permalink
[235] Auto calculate the amount of concurrent processes for the broker.
Browse files Browse the repository at this point in the history
  • Loading branch information
DumitracheAdrian committed Mar 19, 2024
1 parent 4c15ee5 commit 97ec48a
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use Draw\Component\Messenger\Broker\Command\StartMessengerBrokerCommand;
use Draw\Component\Messenger\Broker\EventListener\BrokerDefaultValuesListener;
use Draw\Component\Messenger\Broker\EventListener\StopBrokerOnSigtermSignalListener;
use Draw\Component\Messenger\Counter\CpuCounter;
use Draw\Component\Messenger\DoctrineEnvelopeEntityReference\EventListener\PropertyReferenceEncodingListener;
use Draw\Component\Messenger\DoctrineMessageBusHook\EnvelopeFactory\BasicEnvelopeFactory;
use Draw\Component\Messenger\DoctrineMessageBusHook\EnvelopeFactory\EnvelopeFactoryInterface;
Expand Down Expand Up @@ -168,6 +169,10 @@ public static function provideTestLoad(): iterable
'draw.messenger.manual_trigger.action.click_message_action',
[ClickMessageAction::class]
),
new ServiceConfiguration(
'draw.messenger.counter.cpu_counter',
[CpuCounter::class]
),
];

$defaultAliases = [
Expand Down
75 changes: 69 additions & 6 deletions packages/messenger/Broker/Command/StartMessengerBrokerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Draw\Component\Messenger\Broker\Command;

use Draw\Component\Messenger\Broker\Broker;
use Draw\Component\Messenger\Counter\CpuCounter;
use Draw\Contracts\Process\ProcessFactoryInterface;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Exception\InvalidOptionException;
Expand All @@ -14,10 +15,13 @@

class StartMessengerBrokerCommand extends Command
{
private const OPTION_VALUE_CONCURRENT_AUTO = 'auto';

public function __construct(
private string $consolePath,
private ProcessFactoryInterface $processFactory,
private EventDispatcherInterface $eventDispatcher
private EventDispatcherInterface $eventDispatcher,
private CpuCounter $cpuCounter,
) {
parent::__construct();
}
Expand All @@ -38,7 +42,30 @@ protected function configure(): void
'concurrent',
null,
InputOption::VALUE_REQUIRED,
'The number of concurrent consumer you want to run',
sprintf(
'The number of concurrent consumers you want to run; use "%s" to use the auto calculation of CPU',
self::OPTION_VALUE_CONCURRENT_AUTO
),
1
)
->addOption(
'processes-per-core',
null,
InputOption::VALUE_REQUIRED,
sprintf(
'The number of processes per CPU (used only if "concurrent" is set to "%s")',
self::OPTION_VALUE_CONCURRENT_AUTO
),
1
)
->addOption(
'minimum-processes',
null,
InputOption::VALUE_REQUIRED,
sprintf(
'Minimum number of processes (used only if "concurrent" is set to "%s")',
self::OPTION_VALUE_CONCURRENT_AUTO
),
1
)
->addOption(
Expand All @@ -54,10 +81,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
{
$io = new SymfonyStyle($input, $output);

$concurrent = (int) $input->getOption('concurrent');
if ($concurrent <= 0) {
throw new InvalidOptionException('Concurrent value ['.$concurrent.'] is invalid. Must be 1 or greater');
}
$concurrent = $this->getConcurrent($input);

$timeout = (int) $input->getOption('timeout');
if ($timeout < 0) {
Expand All @@ -79,4 +103,43 @@ protected function createBroker(string $context): Broker
{
return new Broker($context, $this->consolePath, $this->processFactory, $this->eventDispatcher);
}

private function getConcurrent(InputInterface $input): int
{
$concurrent = $input->getOption('concurrent');
if (self::OPTION_VALUE_CONCURRENT_AUTO === $concurrent) {
return $this->calculateAutoConcurrent($input);
}

$concurrent = (int) $concurrent;
if ($concurrent <= 0) {
throw new InvalidOptionException(sprintf(
'Concurrent value [%d] is invalid. Must be 1 or greater',
$concurrent
));
}

return $concurrent;
}

private function calculateAutoConcurrent(InputInterface $input): int
{
$processesPerCore = (float) $input->getOption('processes-per-core');
if ($processesPerCore <= 0) {
throw new InvalidOptionException(sprintf(
'Processes per core value [%f] is invalid. Must be greater than 0',
$processesPerCore
));
}

$minProcesses = (int) $input->getOption('minimum-processes');
if ($minProcesses <= 0) {
throw new InvalidOptionException(sprintf(
'Minimum processes value [%d] is invalid. Must be greater than 0',
$minProcesses
));
}

return max([$minProcesses, (int) round($processesPerCore * $this->cpuCounter->count())]);
}
}
24 changes: 24 additions & 0 deletions packages/messenger/Counter/CpuCounter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

namespace Draw\Component\Messenger\Counter;

use Fidry\CpuCoreCounter\CpuCoreCounter;
use Fidry\CpuCoreCounter\NumberOfCpuCoreNotFound;

class CpuCounter
{
public function count(): int
{
if (!class_exists(CpuCoreCounter::class) || !class_exists(NumberOfCpuCoreNotFound::class)) {
throw new \RuntimeException('"fidry/cpu-core-counter" must be installed');
}

$counter = new CpuCoreCounter();

try {
return $counter->getCount();
} catch (NumberOfCpuCoreNotFound) {
return 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
use Draw\Component\Core\Reflection\ReflectionAccessor;
use Draw\Component\Messenger\Broker\Command\StartMessengerBrokerCommand;
use Draw\Component\Messenger\Broker\Event\BrokerStartedEvent;
use Draw\Component\Messenger\Counter\CpuCounter;
use Draw\Component\Tester\Application\CommandDataTester;
use Draw\Component\Tester\Application\CommandTestTrait;
use Draw\Contracts\Process\ProcessFactoryInterface;
use PHPUnit\Framework\Attributes\CoversClass;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Exception\InvalidOptionException;
Expand All @@ -24,14 +26,17 @@ class StartMessengerBrokerCommandTest extends TestCase

private EventDispatcher $eventDispatcher;

private CpuCounter&MockObject $cpuCounter;

private string $consolePath;

public function createCommand(): Command
{
return new StartMessengerBrokerCommand(
$this->consolePath = uniqid('console-path-'),
$this->processFactory = $this->createMock(ProcessFactoryInterface::class),
$this->eventDispatcher = new EventDispatcher()
$this->eventDispatcher = new EventDispatcher(),
$this->cpuCounter = $this->createMock(CpuCounter::class)
);
}

Expand Down Expand Up @@ -61,6 +66,20 @@ public static function provideTestOption(): iterable
1,
];

yield [
'processes-per-core',
null,
InputOption::VALUE_REQUIRED,
1,
];

yield [
'minimum-processes',
null,
InputOption::VALUE_REQUIRED,
1,
];

yield [
'timeout',
null,
Expand All @@ -87,6 +106,51 @@ public function testExecuteInvalidTimeout(): void
$this->execute(['--timeout' => $timeout]);
}

public function testExecuteInvalidProcessesPerCoreWithAutoConcurrent(): void
{
$processesPerCore = random_int(\PHP_INT_MIN, 0);
$this->expectException(InvalidOptionException::class);
$this->expectExceptionMessage(sprintf(
'Processes per core value [%f] is invalid. Must be greater than 0',
$processesPerCore
));

$this->execute([
'--concurrent' => 'auto',
'--processes-per-core' => $processesPerCore,
]);
}

public function testExecuteInvalidMinimumProcessesWithAutoConcurrent(): void
{
$minProcesses = random_int(\PHP_INT_MIN, 0);
$this->expectException(InvalidOptionException::class);
$this->expectExceptionMessage(sprintf(
'Minimum processes value [%d] is invalid. Must be greater than 0',
$minProcesses
));

$this->execute([
'--concurrent' => 'auto',
'--minimum-processes' => $minProcesses,
]);
}

public function testExecuteInvalidAutoConcurrent(): void
{
$exception = new \RuntimeException('"fidry/cpu-core-counter" must be installed');

$this
->cpuCounter
->method('count')
->willThrowException($exception);

$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage($exception->getMessage());

$this->execute(['--concurrent' => 'auto']);
}

public function testExecute(): void
{
$concurrent = random_int(1, 10);
Expand Down Expand Up @@ -136,6 +200,8 @@ function (BrokerStartedEvent $event) use ($concurrent, $timeout, $context): void
'--context' => $context,
'--concurrent' => $concurrent,
'--timeout' => $timeout,
'--processes-per-core' => random_int(\PHP_INT_MIN, 0),
'--minimum-processes' => random_int(\PHP_INT_MIN, 0),
])->test(
CommandDataTester::create(
0,
Expand All @@ -148,4 +214,103 @@ function (BrokerStartedEvent $event) use ($concurrent, $timeout, $context): void
)
);
}

/**
* @dataProvider provideDataForTestExecuteWithAutoConcurrent
*/
public function testExecuteWithAutoConcurrent(
int $numCpus,
float $processesPerCore,
int $minProcesses,
int $concurrent
): void {
$this
->cpuCounter
->method('count')
->willReturn($numCpus);

$timeout = random_int(1, 10);
$context = uniqid('context-');

$this->eventDispatcher->addListener(
BrokerStartedEvent::class,
function (BrokerStartedEvent $event) use ($concurrent, $timeout, $context): void {
$this->assertSame(
$context,
$event->getBroker()->getContext()
);

$this->assertSame(
$concurrent,
$event->getConcurrent()
);

$this->assertSame(
$timeout,
$event->getTimeout()
);

$broker = $event->getBroker();

$this->assertSame(
$this->processFactory,
ReflectionAccessor::getPropertyValue($broker, 'processFactory')
);

$this->assertSame(
$this->eventDispatcher,
ReflectionAccessor::getPropertyValue($broker, 'eventDispatcher')
);

$this->assertSame(
$this->consolePath,
ReflectionAccessor::getPropertyValue($broker, 'consolePath')
);

$broker->stop();
}
);

$this
->execute([
'--context' => $context,
'--concurrent' => 'auto',
'--timeout' => $timeout,
'--processes-per-core' => $processesPerCore,
'--minimum-processes' => $minProcesses,
])
->test(CommandDataTester::create(
0,
[
'[OK] Broker starting.',
sprintf('! [NOTE] Concurrency %d', $concurrent),
sprintf('! [NOTE] Timeout %d', $timeout),
'[OK] Broker stopped. ',
]
));
}

public static function provideDataForTestExecuteWithAutoConcurrent(): iterable
{
yield [
'$numCpus' => 4,
'$processesPerCore' => 1.0,
'$minProcesses' => 2,
'$concurrent' => 4,
];

yield [
'$numCpus' => 4,
'$processesPerCore' => 0.8,
'$minProcesses' => 1,
'$concurrent' => 3,
];

yield [
'$numCpus' => 2,
'$processesPerCore' => 0.8,
'$minProcesses' => 5,
'$concurrent' => 5,
];
}
}
3 changes: 2 additions & 1 deletion packages/messenger/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
"symfony/process": "^6.4.0",
"symfony/routing": "^6.4.0",
"symfony/translation": "^6.4.0",
"ramsey/uuid": "^4.2"
"ramsey/uuid": "^4.2",
"fidry/cpu-core-counter": "^1.1"
},
"require-dev": {
"ext-pcntl": "*",
Expand Down

0 comments on commit 97ec48a

Please sign in to comment.