diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 2dc627e1..61a36c51 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -20,7 +20,7 @@ jobs: - windows-latest php-version: - - "7.4" + - "7.4.5" - "8.0" steps: diff --git a/composer.json b/composer.json index 9711250e..89c34a9c 100644 --- a/composer.json +++ b/composer.json @@ -66,9 +66,9 @@ "tests-app": [ "$common", "$console", - "tests/App/config/main.php", "tests/App/config/console.php" - ] + ], + "events-console": "config/events-console.php" } }, "config": { diff --git a/config/common.php b/config/common.php index bf333217..f8db1f67 100644 --- a/config/common.php +++ b/config/common.php @@ -15,6 +15,6 @@ EventDispatcherInterface::class => Dispatcher::class, WorkerInterface::class => QueueWorker::class, ListenerProviderInterface::class => Provider::class, - ContainerInterface::class => Container::class, + ContainerInterface::class => fn (ContainerInterface $container) => $container, LoopInterface::class => SignalLoop::class ]; diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 662e1775..337e4813 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -10,4 +10,10 @@ ./tests/unit + + + + ./src + + diff --git a/src/DriverInterface.php b/src/Driver/DriverInterface.php similarity index 95% rename from src/DriverInterface.php rename to src/Driver/DriverInterface.php index 3f46b7e2..38ef9340 100644 --- a/src/DriverInterface.php +++ b/src/Driver/DriverInterface.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Yiisoft\Yii\Queue; +namespace Yiisoft\Yii\Queue\Driver; use InvalidArgumentException; use Yiisoft\Yii\Queue\Enum\JobStatus; @@ -10,6 +10,7 @@ use Yiisoft\Yii\Queue\Job\JobInterface; use Yiisoft\Yii\Queue\Job\PrioritisedJobInterface; use Yiisoft\Yii\Queue\Job\RetryableJobInterface; +use Yiisoft\Yii\Queue\MessageInterface; interface DriverInterface { diff --git a/src/Driver/SynchronousDriver.php b/src/Driver/SynchronousDriver.php new file mode 100644 index 00000000..cf794828 --- /dev/null +++ b/src/Driver/SynchronousDriver.php @@ -0,0 +1,100 @@ +loop = $loop; + $this->worker = $worker; + } + + public function __destruct() + { + $this->run([$this->worker, 'process']); + } + + public function nextMessage(): ?MessageInterface + { + $message = null; + + if (isset($this->messages[$this->current])) { + $message = $this->messages[$this->current]; + unset($this->messages[$this->current]); + $this->current++; + } + + return $message; + } + + public function status(string $id): JobStatus + { + $id = (int) $id; + + if ($id < 0) { + throw new InvalidArgumentException('This driver ids starts with 0'); + } + + if ($id < $this->current) { + return JobStatus::done(); + } + + if (isset($this->messages[$id])) { + return JobStatus::waiting(); + } + + throw new InvalidArgumentException('There is no job with the given id.'); + } + + public function push(JobInterface $job): MessageInterface + { + $key = count($this->messages) + $this->current; + $message = new Message((string) $key, $job); + $this->messages[] = $message; + + return $message; + } + + public function subscribe(callable $handler): void + { + $this->run($handler); + } + + public function canPush(JobInterface $job): bool + { + return !($job instanceof DelayableJobInterface || $job instanceof PrioritisedJobInterface); + } + + public function setQueue(Queue $queue): void + { + $this->queue = $queue; + } + + private function run(callable $handler): void + { + while ($this->loop->canContinue() && $message = $this->nextMessage()) { + $handler($message, $this->queue); + } + } +} diff --git a/src/Exception/JobNotSupportedException.php b/src/Exception/JobNotSupportedException.php index 180477aa..5ab673d0 100644 --- a/src/Exception/JobNotSupportedException.php +++ b/src/Exception/JobNotSupportedException.php @@ -7,7 +7,7 @@ use Throwable; use UnexpectedValueException; use Yiisoft\FriendlyException\FriendlyExceptionInterface; -use Yiisoft\Yii\Queue\DriverInterface; +use Yiisoft\Yii\Queue\Driver\DriverInterface; use Yiisoft\Yii\Queue\Job\DelayableJobInterface; use Yiisoft\Yii\Queue\Job\JobInterface; use Yiisoft\Yii\Queue\Job\PrioritisedJobInterface; diff --git a/src/Job/RetryableJobInterface.php b/src/Job/RetryableJobInterface.php index 33c444a0..e57e12bf 100644 --- a/src/Job/RetryableJobInterface.php +++ b/src/Job/RetryableJobInterface.php @@ -1,10 +1,6 @@ getQueue() === $this && $job->canRetry($event->getException()) ) { + $event->preventThrowing(); $this->logger->debug('Retrying job "{job}".', ['job' => get_class($job)]); $job->retry(); $this->push($job); @@ -70,9 +72,9 @@ public function jobRetry(JobFailure $event): void * * @param JobInterface|mixed $job * - * @return string|null id of a job message + * @return string id of a job message */ - public function push(JobInterface $job): ?string + public function push(JobInterface $job): string { $this->logger->debug('Preparing to push job "{job}".', ['job' => get_class($job)]); $event = new BeforePush($this, $job); diff --git a/tests/App/DelayableJob.php b/tests/App/DelayableJob.php new file mode 100644 index 00000000..0ba6968a --- /dev/null +++ b/tests/App/DelayableJob.php @@ -0,0 +1,15 @@ + - */ -class PriorityJob extends BaseObject implements JobInterface -{ - public $number; - - public function __construct($number) - { - $this->number = $number; - } - - public function execute($queue) - { - file_put_contents(self::getFileName(), $this->number, FILE_APPEND); - } - - public static function getFileName() - { - return Yii::getAlias('@runtime/job-priority.log'); - } -} diff --git a/tests/App/RetryJob.php b/tests/App/RetryJob.php deleted file mode 100644 index 6b6edeb8..00000000 --- a/tests/App/RetryJob.php +++ /dev/null @@ -1,50 +0,0 @@ - - */ -class RetryJob extends BaseObject implements RetryableJobInterface -{ - public $uid; - - public function __construct($uid) - { - $this->uid = $uid; - } - - public function execute($queue) - { - file_put_contents($this->getFileName(), 'a', FILE_APPEND); - - throw new \Exception('Planned error.'); - } - - public function getFileName() - { - return Yii::getAlias("@runtime/job-{$this->uid}.lock"); - } - - public function getTtr(): int - { - return 2; - } - - public function canRetry($attempt, $error): bool - { - return $attempt < 2; - } -} diff --git a/tests/App/RetryableJob.php b/tests/App/RetryableJob.php new file mode 100644 index 00000000..ff246535 --- /dev/null +++ b/tests/App/RetryableJob.php @@ -0,0 +1,32 @@ +attemptsMax = $attemptsMax; + } + + public function getTtr(): int + { + return 1; + } + + public function execute(): void + { + if ($this->canRetry()) { + throw new RuntimeException('Test exception'); + } + + $this->executed = true; + } +} diff --git a/tests/App/SimpleJob.php b/tests/App/SimpleJob.php index 595f86c2..c0fa94d8 100644 --- a/tests/App/SimpleJob.php +++ b/tests/App/SimpleJob.php @@ -1,10 +1,6 @@ NullLogger::class, + DriverInterface::class => SynchronousDriver::class, ]; diff --git a/tests/App/config/main.php b/tests/App/config/main.php deleted file mode 100644 index b6251283..00000000 --- a/tests/App/config/main.php +++ /dev/null @@ -1,4 +0,0 @@ -container = new Container(require Builder::path('tests-app')); + $eventConfigurator = $this->container->get(EventConfigurator::class); + $eventConfigurator->registerListeners(require Builder::path('events-console')); } } diff --git a/tests/unit/QueueTest.php b/tests/unit/QueueTest.php new file mode 100644 index 00000000..019a062d --- /dev/null +++ b/tests/unit/QueueTest.php @@ -0,0 +1,100 @@ +eventManager = $this->createMock(EventManager::class); + + $configurator = $this->container->get(EventConfigurator::class); + $configurator->registerListeners([BeforePush::class => [[$this->eventManager, 'beforePushHandler']]]); + $configurator->registerListeners([AfterPush::class => [[$this->eventManager, 'afterPushHandler']]]); + $configurator->registerListeners([BeforeExecution::class => [[$this->eventManager, 'beforeExecutionHandler']]]); + $configurator->registerListeners([AfterExecution::class => [[$this->eventManager, 'afterExecutionHandler']]]); + $configurator->registerListeners([JobFailure::class => [[$this->eventManager, 'jobFailureHandler']]]); + } + + public function testPushSuccessful(): void + { + $this->eventManager->expects(self::once())->method('beforePushHandler'); + $this->eventManager->expects(self::once())->method('afterPushHandler'); + $this->eventManager->expects(self::never())->method('beforeExecutionHandler'); + $this->eventManager->expects(self::never())->method('afterExecutionHandler'); + $this->eventManager->expects(self::never())->method('jobFailureHandler'); + + $queue = $this->container->get(Queue::class); + $job = $this->container->get(SimpleJob::class); + $id = $queue->push($job); + + $this->assertNotEquals('', $id, 'Pushed message should has an id'); + } + + public function testPushNotSuccessful(): void + { + $this->expectException(JobNotSupportedException::class); + $this->eventManager->expects(self::once())->method('beforePushHandler'); + $this->eventManager->expects(self::never())->method('afterPushHandler'); + $this->eventManager->expects(self::never())->method('beforeExecutionHandler'); + $this->eventManager->expects(self::never())->method('afterExecutionHandler'); + $this->eventManager->expects(self::never())->method('jobFailureHandler'); + + $queue = $this->container->get(Queue::class); + $job = $this->container->get(DelayableJob::class); + $queue->push($job); + } + + public function testJobRetry(): void + { + $this->eventManager->expects(self::exactly(2))->method('beforePushHandler'); + $this->eventManager->expects(self::exactly(2))->method('afterPushHandler'); + $this->eventManager->expects(self::exactly(2))->method('beforeExecutionHandler'); + $this->eventManager->expects(self::once())->method('afterExecutionHandler'); + $this->eventManager->expects(self::once())->method('jobFailureHandler'); + + $queue = $this->container->get(Queue::class); + $job = $this->container->get(RetryableJob::class); + $queue->push($job); + $queue->run(); + + $this->assertTrue($job->executed); + } + + public function testStatus(): void + { + $queue = $this->container->get(Queue::class); + $job = $this->container->get(SimpleJob::class); + $id = $queue->push($job); + + $status = $queue->status($id); + $this->assertEquals(true, $status->isWaiting()); + + $queue->run(); + $status = $queue->status($id); + $this->assertEquals(true, $status->isDone()); + } +} diff --git a/tests/unit/SynchronousDriverTest.php b/tests/unit/SynchronousDriverTest.php new file mode 100644 index 00000000..b1dfb9ba --- /dev/null +++ b/tests/unit/SynchronousDriverTest.php @@ -0,0 +1,62 @@ +container->get(Queue::class); + $job = $this->container->get($class); + + if (!$available) { + $this->expectException(JobNotSupportedException::class); + } + + $id = $queue->push($job); + + if ($available) { + $this->assertTrue($id >= 0); + } + } + + public static function getJobTypes(): array + { + return [ + 'Simple job' => [ + SimpleJob::class, + true, + ], + DelayableJobInterface::class => [ + DelayableJob::class, + false, + ], + PrioritisedJobInterface::class => [ + PrioritizedJob::class, + false, + ], + RetryableJobInterface::class => [ + RetryableJob::class, + true, + ], + ]; + } +} diff --git a/tests/unit/WorkerTest.php b/tests/unit/WorkerTest.php index 33f40018..25a506b0 100644 --- a/tests/unit/WorkerTest.php +++ b/tests/unit/WorkerTest.php @@ -49,7 +49,6 @@ public function testJobExecuted(): void public function testJobNotExecuted(): void { $handler = fn (BeforeExecution $event) => $event->stopExecution(); - /** @var EventConfigurator $configurator */ $configurator = $this->container->get(EventConfigurator::class); $configurator->registerListeners([BeforeExecution::class => [$handler]]);