Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into remove-queue-channe…
Browse files Browse the repository at this point in the history
…l-name

# Conflicts:
#	src/Queue.php
#	src/QueueFactory.php
#	tests/Unit/QueueFactoryTest.php
#	tests/Unit/QueueTest.php
  • Loading branch information
viktorprogger committed Dec 8, 2024
2 parents 1a7c3c9 + 3602957 commit 864f58d
Show file tree
Hide file tree
Showing 45 changed files with 852 additions and 651 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/rector.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
on:
pull_request:
pull_request_target:
paths-ignore:
- 'docs/**'
- 'README.md'
Expand All @@ -17,6 +17,7 @@ jobs:
secrets:
token: ${{ secrets.YIISOFT_GITHUB_TOKEN }}
with:
repository: ${{ github.event.pull_request.head.repo.full_name }}
os: >-
['ubuntu-latest']
php: >-
Expand Down
40 changes: 24 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ The package could be installed with [Composer](https://getcomposer.org):
composer require yiisoft/queue
```

## Ready for yiisoft/config
## Ready for Yii Config

If you are using [yiisoft/config](https://github.com/yiisoft/config), you'll find out this package has some defaults
in the [`common`](config/di.php) and [`params`](config/params.php) configurations saving your time. Things you should
change to start working with the queue:

- Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation.
- And/or define channel-specific `AdapterInterface` implementations in the `channel-definitions` params key to be used
with the [queue factory](#different-queue-channels).
- And/or define channel-specific `AdapterInterface` implementations in the `channel` params key to be used
with the [queue provider](#different-queue-channels).
- Define [message handlers](docs/guide/worker.md#handler-format) in the `handlers` params key to be used with the `QueueWorker`.
- Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher).

Expand Down Expand Up @@ -159,15 +159,23 @@ $worker = new \Yiisoft\Queue\Worker\Worker(

## Different queue channels

Often we need to push to different queue channels with an only application. There is the `QueueFactory` class to make
different `Queue` objects creation for different channels. With this factory channel-specific `Queue` creation is as
simple as
Often we need to push to different queue channels with an only application. There is the `QueueProviderInterface`
interface that provides different `Queue` objects creation for different channels. With implementation of this interface
channel-specific `Queue` creation is as simple as

```php
$queue = $factory->get('channel-name');
$queue = $provider->get('channel-name');
```

The main usage strategy is with explicit definition of channel-specific adapters. Definitions are passed in
Out of the box, there are four implementations of the `QueueProviderInterface`:

- `AdapterFactoryQueueProvider`
- `PrototypeQueueProvider`
- `CompositeQueueProvider`

### `AdapterFactoryQueueProvider`

Provider based on definition of channel-specific adapters. Definitions are passed in
the `$definitions` constructor parameter of the factory, where keys are channel names and values are definitions
for the [`Yiisoft\Factory\Factory`](https://github.com/yiisoft/factory). Below are some examples:

Expand All @@ -186,19 +194,19 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter;

For more information about a definition formats available see the [factory](https://github.com/yiisoft/factory) documentation.

Another queue factory usage strategy is implicit adapter creation via `withChannel()` method call. To use this approach
you should pass some specific constructor parameters:
### `PrototypeQueueProvider`

- `true` to the `$enableRuntimeChannelDefinition`
- a default `AdapterInterface` implementation to the `$defaultAdapter`.

In this case `$factory->get('channel-name')` call will be converted
to `$this->queue->withAdapter($this->defaultAdapter->withChannel($channel))`, when there is no explicit adapter definition
in the `$definitions`.
Queue provider that only changes the channel name of the base queue. It can be useful when your queues used the same
adapter.

> Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes
> in channel names.
### `CompositeQueueProvider`

This provider allows you to combine multiple providers into one. It will try to get a queue from each provider in the
order they are passed to the constructor. The first queue found will be returned.

## Console execution

The exact way of task execution depends on the adapter used. Most adapters can be run using
Expand Down
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@
"psr/log": "^2.0|^3.0",
"symfony/console": "^5.4|^6.0",
"yiisoft/definitions": "^1.0|^2.0|^3.0",
"yiisoft/factory": "dev-strict",
"yiisoft/friendly-exception": "^1.0",
"yiisoft/injector": "^1.0"
},
"require-dev": {
"maglnet/composer-require-checker": "^4.7",
"phpbench/phpbench": "^1.3",
"phpunit/phpunit": "^10.5",
"rector/rector": "^1.0.0",
"rector/rector": "^1.2",
"roave/infection-static-analysis-plugin": "^1.34",
"spatie/phpunit-watcher": "^1.23",
"vimeo/psalm": "^5.20",
Expand All @@ -60,7 +61,8 @@
},
"autoload": {
"psr-4": {
"Yiisoft\\Queue\\": "src"
"Yiisoft\\Queue\\": "src",
"Yiisoft\\Queue\\Stubs\\": "stubs"
}
},
"autoload-dev": {
Expand Down
18 changes: 10 additions & 8 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPushInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\Worker as QueueWorker;
use Yiisoft\Queue\Worker\WorkerInterface;

/* @var array $params */

return [
AdapterFactoryQueueProvider::class => [
'__construct()' => [
'definitions' => $params['yiisoft/queue']['channels'],
],
],
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,
QueueWorker::class => [
'class' => QueueWorker::class,
'__construct()' => [$params['yiisoft/queue']['handlers']],
Expand All @@ -39,10 +45,6 @@
? $container->get(SignalLoop::class)
: $container->get(SimpleLoop::class);
},
QueueFactoryInterface::class => QueueFactory::class,
QueueFactory::class => [
'__construct()' => ['channelConfiguration' => $params['yiisoft/queue']['channel-definitions']],
],
QueueInterface::class => Queue::class,
MiddlewareFactoryPushInterface::class => MiddlewareFactoryPush::class,
MiddlewareFactoryConsumeInterface::class => MiddlewareFactoryConsume::class,
Expand All @@ -59,12 +61,12 @@
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
'channels' => array_keys($params['yiisoft/queue']['channels']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
'channels' => array_keys($params['yiisoft/queue']['channels']),
],
],
];
11 changes: 6 additions & 5 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
use Yiisoft\Queue\Command\ListenCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Debug\QueueCollector;
use Yiisoft\Queue\Debug\QueueFactoryInterfaceProxy;
use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy;
use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;

return [
Expand All @@ -22,8 +23,8 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channel-definitions' => [
QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
'channels' => [
QueueInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand All @@ -34,7 +35,7 @@
QueueCollector::class,
],
'trackedServices' => [
QueueFactoryInterface::class => [QueueFactoryInterfaceProxy::class, QueueCollector::class],
QueueProviderInterface::class => [QueueProviderInterfaceProxy::class, QueueCollector::class],
WorkerInterface::class => [QueueWorkerInterfaceProxy::class, QueueCollector::class],
],
],
Expand Down
2 changes: 0 additions & 2 deletions rector.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector;
use Rector\Config\RectorConfig;
use Rector\Php73\Rector\FuncCall\JsonThrowOnErrorRector;
use Rector\Php74\Rector\Closure\ClosureToArrowFunctionRector;
use Rector\Php81\Rector\Property\ReadOnlyPropertyRector;
use Rector\Set\ValueObject\LevelSetList;
Expand All @@ -25,7 +24,6 @@

$rectorConfig->skip([
ClosureToArrowFunctionRector::class,
JsonThrowOnErrorRector::class,
ReadOnlyPropertyRector::class,
]);
};
3 changes: 1 addition & 2 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use InvalidArgumentException;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;
Expand All @@ -20,7 +19,7 @@ final class SynchronousAdapter implements AdapterInterface
public function __construct(
private WorkerInterface $worker,
private QueueInterface $queue,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
private string $channel = QueueInterface::DEFAULT_CHANNEL_NAME,
) {
}

Expand Down
17 changes: 10 additions & 7 deletions src/Command/ListenAllCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class ListenAllCommand extends Command
{
Expand All @@ -20,8 +20,11 @@ final class ListenAllCommand extends Command
'Listens all configured queues by default in case you\'re using yiisoft/config. ' .
'Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory, private LoopInterface $loop, private array $channels)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels,
) {
parent::__construct();
}

Expand All @@ -45,7 +48,7 @@ public function configure(): void
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. ' .
'Default is 0 (no limits).',
'Default is 0 (no limits).',
0,
);

Expand All @@ -57,17 +60,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueFactory->get($channel);
$queues[] = $this->queueProvider->get($channel);
}

while ($this->loop->canContinue()) {
$hasMessages = false;
foreach ($queues as $queue) {
$hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages;
$hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages;
}

if (!$hasMessages) {
$pauseSeconds = (int)$input->getOption('pause');
$pauseSeconds = (int) $input->getOption('pause');
if ($pauseSeconds < 0) {
$pauseSeconds = 1;
}
Expand Down
13 changes: 7 additions & 6 deletions src/Command/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;

final class ListenCommand extends Command
{
protected static $defaultName = 'queue:listen';
protected static $defaultDescription = 'Listens the queue and executes messages as they come. Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider
) {
parent::__construct();
}

Expand All @@ -27,13 +28,13 @@ public function configure(): void
'channel',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueFactory::DEFAULT_CHANNEL_NAME
QueueInterface::DEFAULT_CHANNEL_NAME,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueFactory
$this->queueProvider
->get($input->getArgument('channel'))
->listen();

Expand Down
10 changes: 6 additions & 4 deletions src/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class RunCommand extends Command
{
protected static $defaultName = 'queue:run';
protected static $defaultDescription = 'Runs all the existing messages in the given queues. ' .
'Exits once messages are over.';

public function __construct(private QueueFactoryInterface $queueFactory, private array $channels)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly array $channels,
) {
parent::__construct();
}

Expand All @@ -45,7 +47,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");
$count = $this->queueFactory
$count = $this->queueProvider
->get($channel)
->run((int)$input->getOption('maximum'));

Expand Down
24 changes: 0 additions & 24 deletions src/Debug/QueueFactoryInterfaceProxy.php

This file was deleted.

Loading

0 comments on commit 864f58d

Please sign in to comment.