Skip to content

Commit

Permalink
Merge pull request #61 from ostark/test-rate-limiter
Browse files Browse the repository at this point in the history
Fix & improve RateLimiter
  • Loading branch information
Oliver Stark authored Sep 22, 2022
2 parents abab77c + 6eddc51 commit 9e225aa
Show file tree
Hide file tree
Showing 15 changed files with 204 additions and 50 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

All notable changes to this project will be documented in this file.

## [3.1.0] - 2022-09-22

> {note} Upgrading is highly recommended. The previous version did not limit concurrency of queue runners. `ASYNC_QUEUE_CONCURRENCY` defaults to `1` now.
- Fixed RateLimiter
- Added tests for RateLimiter / concurrency
- Concurrency of queue runners defaults to `1` now


## [3.0.0] - 2022-05-12
- Craft 4 support
- PHP 8 syntax
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ PHP_BINARY="/usr/local/Cellar/php71/7.1.0_11/bin/php"
```


By default `2` background processes handle the queue. With the `ASYNC_QUEUE_CONCURRENCY` ENV var you can modify this behaviour.
By default `1` background process handles the queue. With the `ASYNC_QUEUE_CONCURRENCY` ENV var you can modify this behaviour.
```
# No concurrency
ASYNC_QUEUE_CONCURRENCY=1
Expand Down
7 changes: 2 additions & 5 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,14 @@
"require": {
"php": "^8.0",
"craftcms/cms": "^4.0.0",
"symfony/process": "^5.0",
"treeware/plant": "^0.1.0"
"symfony/process": "^5.0"
},
"autoload": {
"psr-4": {
"ostark\\AsyncQueue\\": "src/"
}
},
"extra": {
"treeware": {},
"name": "AsyncQueue",
"handle": "async-queue",
"hasCpSettings": false,
Expand All @@ -51,8 +49,7 @@
"config": {
"allow-plugins": {
"yiisoft/yii2-composer": true,
"craftcms/plugin-installer": true,
"treeware/plant": true
"craftcms/plugin-installer": true
}
},
"prefer-stable": true,
Expand Down
6 changes: 2 additions & 4 deletions src/BackgroundProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,10 @@ public function __construct(QueueCommand $command = null)
public function start()
{
$cmd = $this->command->getPreparedCommand();
$cwd = realpath(CRAFT_BASE_PATH);

$process = Process::fromShellCommandline($cmd, $cwd);
$process = Process::fromShellCommandline($cmd);

try {
$process->run();
$process->start();
} catch (\Symfony\Component\Process\Exception\RuntimeException $runtimeException) {
$runtimeException = new RuntimeException($runtimeException->getMessage());
$runtimeException->setProcess($process);
Expand Down
13 changes: 8 additions & 5 deletions src/Handlers/BackgroundQueueHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,24 +26,28 @@ public function __construct(Plugin $plugin)

public function __invoke(PushEvent $event): void
{

$context = ($event->job instanceof JobInterface)
? $event->job->getDescription()
: 'Not instanceof craft\queue\JobInterface';

// Run queue in the background
if ($this->plugin->getRateLimiter()->canIUse($context)) {

try {
$this->plugin->getProcess()->start();
$process = $this->plugin->getProcess()->start();
$this->plugin->getRateLimiter()->increment();
$handled = true;

$process->wait();

} catch (PhpExecutableNotFound) {
Craft::debug(
Craft::error(
'QueueHandler::startBackgroundProcess() (PhpExecutableNotFound)',
'async-queue'
);
} catch (RuntimeException | LogicException $e) {
Craft::debug(
Craft::error(
Craft::t(
'async-queue',
'QueueHandler::startBackgroundProcess() (Job status: {status}. Exit code: {code})', [
Expand All @@ -54,7 +58,6 @@ public function __invoke(PushEvent $event): void
'async-queue'
);
}

}

// Log what's going on
Expand All @@ -69,7 +72,7 @@ protected function logPushEvent(PushEvent $event, bool $handled = false): void
}

if ($event->job instanceof BaseJob) {
Craft::debug(
Craft::info(
Craft::t(
'async-queue',
'New PushEvent for {job} job - ({handled})', [
Expand Down
7 changes: 5 additions & 2 deletions src/QueueCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ class QueueCommand extends Component
{
public const DEFAULT_SCRIPT = "craft";

public const DEFAULT_ARGS = "queue/run";
public const DEFAULT_ARGS = "queue/run -v";

public const EVENT_PREPARE_COMMAND = 'prepareCommand';

Expand Down Expand Up @@ -45,7 +45,9 @@ public function getPreparedCommand(callable $wrapper = null): string
throw new PhpExecutableNotFound('Unable to find php executable.');
}

$commandLine = implode(" ", [$php, $this->scriptName, $this->scriptArgs]);
$path = realpath(CRAFT_BASE_PATH);
$script = $path . DIRECTORY_SEPARATOR . $this->scriptName;
$commandLine = implode(" ", [$php, $script, $this->scriptArgs]);

return $this->decorate($commandLine);
}
Expand All @@ -67,6 +69,7 @@ protected function decorate(string $commandLine): string

// default decoration
return "nice -n 15 {$commandLine} > /dev/null 2>&1 &";

}

}
10 changes: 10 additions & 0 deletions src/RateLimiter.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,12 @@ public function __construct(Queue $queue, Settings $settings)
*/
public function canIUse(string $context = null): bool
{
if ($this->internalCount >= $this->maxItems) {
return false;
}

try {
$this->queue->channel = 'queue';
$reserved = $this->queue->getTotalReserved();
} catch (\Exception) {
$reserved = 0;
Expand All @@ -54,6 +59,11 @@ public function increment(): void
$this->internalCount++;
}

public function getInternalCount(): int
{
return $this->internalCount;
}


protected function logAttempt(int $currentUsage, string $context = null): void
{
Expand Down
21 changes: 3 additions & 18 deletions src/Settings.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,9 @@

class Settings extends Model
{
// Public Properties
// =========================================================================
public int $concurrency;

/**
* @var integer
*/
public $concurrency;

/**
* @var integer
*/
public $poolLifetime;

/**
* @var bool
*/
public $enabled = true;
public bool $enabled = true;


/**
Expand All @@ -29,8 +15,7 @@ class Settings extends Model
public function __construct(array $config = [])
{
$config = array_merge([
'concurrency' => (int)$this->env('ASYNC_QUEUE_CONCURRENCY', 2),
'poolLifetime' => (int)$this->env('ASYNC_QUEUE_POOL_LIFETIME', 3600),
'concurrency' => (int) $this->env('ASYNC_QUEUE_CONCURRENCY', 1),
'enabled' => ($this->env('DISABLE_ASYNC_QUEUE', '0') == '1') ? false : true
], $config);

Expand Down
2 changes: 1 addition & 1 deletion src/TestUtility/TestJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class TestJob extends BaseJob
*/
public function execute($queue): void
{
sleep(10);
sleep(3);
}


Expand Down
30 changes: 24 additions & 6 deletions tests/BackgroundProcessRunTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,40 @@ public function test_start_default_dummy_script_success(): void
$bgProcess = new BackgroundProcess($command);
$process = $bgProcess->start();

$process->wait();

// give it some time to write the test file
usleep(150000);

$this->assertEquals(0, $process->getExitCode());
$this->assertTrue($process->isSuccessful());
$this->assertEquals(\Symfony\Component\Process\Process::STATUS_TERMINATED, $process->getStatus());

// Wait 0.25 seconds
usleep(250000);
$this->assertFileExists(TEST_FILE);

$content = json_decode(file_get_contents(TEST_FILE), true);

$this->assertTrue(is_array($content), 'Unable to read and json_decode test file.');
$this->assertContains('craft.php', $content['$argv']);
$this->assertContains('queue/run', $content['$argv']);
$this->assertStringContainsString('craft.php', $content['$argv'][0]);
$this->assertStringContainsString('queue/run', $content['$argv'][1]);
$this->assertGreaterThanOrEqual($content['timestamp'], time());
}

/**
* @covers \ostark\AsyncQueue\BackgroundProcess::start
*/
public function test_process_does_not_block(): void
{
$command = new \ostark\AsyncQueue\QueueCommand('craft.php', '--sleep');
$bgProcess = new BackgroundProcess($command);
$process = $bgProcess->start();

}
// give it some time to write the test file
usleep(150000);

$this->assertFileExists(TEST_FILE);

$content = json_decode(file_get_contents(TEST_FILE), true);
$this->assertGreaterThanOrEqual($content['timestamp'], time());

}
}
73 changes: 73 additions & 0 deletions tests/RateLimiterTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
<?php

use ostark\AsyncQueue\BackgroundProcess;
use PHPUnit\Framework\TestCase;
use yii\queue\PushEvent;

/**
* @covers \ostark\AsyncQueue\RateLimiter
* @covers \ostark\AsyncQueue\Handlers\BackgroundQueueHandler
*/
class RateLimiterTest extends TestCase
{
public $plugin;

public function setUp(): void
{
parent::setUp();

$dummyCommand = new \ostark\AsyncQueue\QueueCommand('craft.php', '--sleep');
$this->plugin = new \ostark\AsyncQueue\Plugin('async-queue', null, []);
$this->plugin->set('async_process', new BackgroundProcess($dummyCommand));
}

public function tearDown(): void
{
parent::tearDown();
@unlink(TEST_FILE);
}


public function test_can_setup_handler_and_invoke_event(): void
{
$this->plugin = new \ostark\AsyncQueue\Plugin('async-queue', null, []);
$handler = new \ostark\AsyncQueue\Handlers\BackgroundQueueHandler($this->plugin);

$handler->__invoke(new PushEvent());

$this->assertSame(1, $this->plugin->getRateLimiter()->getInternalCount());
}

public function test_respect_the_limit_when_adding_multiple_jobs_in_one_request(): void
{
$handler = new \ostark\AsyncQueue\Handlers\BackgroundQueueHandler($this->plugin);

$handler->__invoke(new PushEvent());
$handler->__invoke(new PushEvent());
$handler->__invoke(new PushEvent());
$handler->__invoke(new PushEvent());

$this->assertSame(
$this->plugin->getRateLimiter()->maxItems,
$this->plugin->getRateLimiter()->getInternalCount()
);
}

public function test_stop_spawning_processes_when_too_many_jobs_are_reserved(): void
{
// Fake the reserved count
$queue = new class extends \craft\queue\Queue {
public function getTotalReserved(): int
{
return 5;
}
};

$this->plugin->set('async_rate_limiter', new \ostark\AsyncQueue\RateLimiter($queue, $this->plugin->getSettings()));
$handler = new \ostark\AsyncQueue\Handlers\BackgroundQueueHandler($this->plugin);

$handler->__invoke(new PushEvent());

$this->assertSame(0, $this->plugin->getRateLimiter()->getInternalCount());
}
}
Empty file added tests/_craft/storage/.gitkeep
Empty file.
2 changes: 2 additions & 0 deletions tests/_craft/storage/runtime/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
*
!.gitignore
Loading

0 comments on commit 9e225aa

Please sign in to comment.