Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into fork/holdmann/suppo…
Browse files Browse the repository at this point in the history
…ring_of_many_channels_many_type_of_messages
  • Loading branch information
viktorprogger committed Dec 7, 2024
2 parents 8f21a91 + 3dd8a7b commit 0a08e2b
Show file tree
Hide file tree
Showing 52 changed files with 1,041 additions and 678 deletions.
71 changes: 51 additions & 20 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ The package could be installed with [Composer](https://getcomposer.org):
composer require yiisoft/queue
```

## Ready for yiisoft/config
## Ready for Yii Config

If you are using [yiisoft/config](https://github.com/yiisoft/config), you'll find out this package has some defaults
in the [`common`](config/di.php) and [`params`](config/params.php) configurations saving your time. Things you should
change to start working with the queue:

- Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation.
- And/or define channel-specific `AdapterInterface` implementations in the `channel-definitions` params key to be used
with the [queue factory](#different-queue-channels).
- And/or define channel-specific `AdapterInterface` implementations in the `channel` params key to be used
with the [queue provider](#different-queue-channels).
- Define [message handlers](docs/guide/worker.md#handler-format) in the `handlers` params key to be used with the `QueueWorker`.
- Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher).

Expand All @@ -53,14 +53,16 @@ Each queue task consists of two parts:
`Yiisoft\Queue\Message\Message`. For more complex cases you should implement the interface by your own.
2. A message handler is a callable called by a `Yiisoft\Queue\Worker\Worker`. The handler handles each queue message.

For example, if you need to download and save a file, your message may look like the following:
For example, if you need to download and save a file, your message creation may look like the following:
- Message handler as the first parameter
- Message data as the second parameter

```php
$data = [
'url' => $url,
'destinationFile' => $filename,
];
$message = new \Yiisoft\Queue\Message\Message('file-download', $data);
$message = new \Yiisoft\Queue\Message\Message(FileDownloader::class, $data);
```

Then you should push it to the queue:
Expand Down Expand Up @@ -93,9 +95,8 @@ class FileDownloader
The last thing we should do is to create a configuration for the `Yiisoft\Queue\Worker\Worker`:

```php
$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']];
$worker = new \Yiisoft\Queue\Worker\Worker(
$handlers, // Here it is
[],
$logger,
$injector,
$container
Expand Down Expand Up @@ -134,17 +135,47 @@ $status->isReserved();
$status->isDone();
```

## Custom handler names
### Custom handler names

By default, when you push a message to the queue, the message handler name is the fully qualified class name of the handler.
This can be useful for most cases, but sometimes you may want to use a shorter name or arbitrary string as the handler name.
This can be useful when you want to reduce the amount of data being passed or when you communicate with external systems.

To use a custom handler name before message push, you can pass it as the first argument `Message` when creating it:
```php
new Message('handler-name', $data);
```

To use a custom handler name on message consume, you should configure handler mapping for the `Worker` class:
```php
$worker = new \Yiisoft\Queue\Worker\Worker(
['handler-name' => FooHandler::class],
$logger,
$injector,
$container
);
```

## Different queue channels

Often we need to push to different queue channels with an only application. There is the `QueueFactory` class to make
different `Queue` objects creation for different channels. With this factory channel-specific `Queue` creation is as
simple as
Often we need to push to different queue channels with an only application. There is the `QueueProviderInterface`
interface that provides different `Queue` objects creation for different channels. With implementation of this interface
channel-specific `Queue` creation is as simple as

```php
$queue = $factory->get('channel-name');
$queue = $provider->get('channel-name');
```

The main usage strategy is with explicit definition of channel-specific adapters. Definitions are passed in
Out of the box, there are four implementations of the `QueueProviderInterface`:

- `AdapterFactoryQueueProvider`
- `PrototypeQueueProvider`
- `CompositeQueueProvider`

### `AdapterFactoryQueueProvider`

Provider based on definition of channel-specific adapters. Definitions are passed in
the `$definitions` constructor parameter of the factory, where keys are channel names and values are definitions
for the [`Yiisoft\Factory\Factory`](https://github.com/yiisoft/factory). Below are some examples:

Expand All @@ -163,19 +194,19 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter;

For more information about a definition formats available see the [factory](https://github.com/yiisoft/factory) documentation.

Another queue factory usage strategy is implicit adapter creation via `withChannel()` method call. To use this approach
you should pass some specific constructor parameters:

- `true` to the `$enableRuntimeChannelDefinition`
- a default `AdapterInterface` implementation to the `$defaultAdapter`.
### `PrototypeQueueProvider`

In this case `$factory->get('channel-name')` call will be converted
to `$this->queue->withAdapter($this->defaultAdapter->withChannel($channel))`, when there is no explicit adapter definition
in the `$definitions`.
Queue provider that only changes the channel name of the base queue. It can be useful when your queues used the same
adapter.

> Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes
> in channel names.
### `CompositeQueueProvider`

This provider allows you to combine multiple providers into one. It will try to get a queue from each provider in the
order they are passed to the constructor. The first queue found will be returned.

## Console execution

The exact way of task execution depends on the adapter used. Most adapters can be run using
Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"psr/log": "^2.0|^3.0",
"symfony/console": "^5.4|^6.0",
"yiisoft/definitions": "^1.0|^2.0|^3.0",
"yiisoft/factory": "dev-strict",
"yiisoft/friendly-exception": "^1.0",
"yiisoft/injector": "^1.0"
},
Expand All @@ -60,7 +61,8 @@
},
"autoload": {
"psr-4": {
"Yiisoft\\Queue\\": "src"
"Yiisoft\\Queue\\": "src",
"Yiisoft\\Queue\\Stubs\\": "stubs"
}
},
"autoload-dev": {
Expand Down
18 changes: 10 additions & 8 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPushInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\Worker as QueueWorker;
use Yiisoft\Queue\Worker\WorkerInterface;

/* @var array $params */

return [
AdapterFactoryQueueProvider::class => [
'__construct()' => [
'definitions' => $params['yiisoft/queue']['channels'],
],
],
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,
QueueWorker::class => [
'class' => QueueWorker::class,
'__construct()' => [$params['yiisoft/queue']['handlers']],
Expand All @@ -39,10 +45,6 @@
? $container->get(SignalLoop::class)
: $container->get(SimpleLoop::class);
},
QueueFactoryInterface::class => QueueFactory::class,
QueueFactory::class => [
'__construct()' => ['channelConfiguration' => $params['yiisoft/queue']['channel-definitions']],
],
QueueInterface::class => Queue::class,
MiddlewareFactoryPushInterface::class => MiddlewareFactoryPush::class,
MiddlewareFactoryConsumeInterface::class => MiddlewareFactoryConsume::class,
Expand All @@ -59,12 +61,12 @@
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
'channels' => array_keys($params['yiisoft/queue']['channels']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
'channels' => array_keys($params['yiisoft/queue']['channels']),
],
],
];
11 changes: 6 additions & 5 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
use Yiisoft\Queue\Command\ListenCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Debug\QueueCollector;
use Yiisoft\Queue\Debug\QueueFactoryInterfaceProxy;
use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy;
use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;

return [
Expand All @@ -22,8 +23,8 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channel-definitions' => [
QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
'channels' => [
QueueInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand All @@ -34,7 +35,7 @@
QueueCollector::class,
],
'trackedServices' => [
QueueFactoryInterface::class => [QueueFactoryInterfaceProxy::class, QueueCollector::class],
QueueProviderInterface::class => [QueueProviderInterfaceProxy::class, QueueCollector::class],
WorkerInterface::class => [QueueWorkerInterfaceProxy::class, QueueCollector::class],
],
],
Expand Down
3 changes: 1 addition & 2 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
use InvalidArgumentException;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;
Expand All @@ -20,7 +19,7 @@ final class SynchronousAdapter implements AdapterInterface
public function __construct(
private WorkerInterface $worker,
private QueueInterface $queue,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
private string $channel = QueueInterface::DEFAULT_CHANNEL_NAME,
) {
}

Expand Down
17 changes: 10 additions & 7 deletions src/Command/ListenAllCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class ListenAllCommand extends Command
{
Expand All @@ -20,8 +20,11 @@ final class ListenAllCommand extends Command
'Listens all configured queues by default in case you\'re using yiisoft/config. ' .
'Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory, private LoopInterface $loop, private array $channels)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels,
) {
parent::__construct();
}

Expand All @@ -45,7 +48,7 @@ public function configure(): void
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. ' .
'Default is 0 (no limits).',
'Default is 0 (no limits).',
0,
);

Expand All @@ -57,17 +60,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueFactory->get($channel);
$queues[] = $this->queueProvider->get($channel);
}

while ($this->loop->canContinue()) {
$hasMessages = false;
foreach ($queues as $queue) {
$hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages;
$hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages;
}

if (!$hasMessages) {
$pauseSeconds = (int)$input->getOption('pause');
$pauseSeconds = (int) $input->getOption('pause');
if ($pauseSeconds < 0) {
$pauseSeconds = 1;
}
Expand Down
13 changes: 7 additions & 6 deletions src/Command/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\QueueInterface;

final class ListenCommand extends Command
{
protected static $defaultName = 'queue:listen';
protected static $defaultDescription = 'Listens the queue and executes messages as they come. Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider
) {
parent::__construct();
}

Expand All @@ -27,13 +28,13 @@ public function configure(): void
'channel',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueFactory::DEFAULT_CHANNEL_NAME
QueueInterface::DEFAULT_CHANNEL_NAME,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueFactory
$this->queueProvider
->get($input->getArgument('channel'))
->listen();

Expand Down
10 changes: 6 additions & 4 deletions src/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class RunCommand extends Command
{
protected static $defaultName = 'queue:run';
protected static $defaultDescription = 'Runs all the existing messages in the given queues. ' .
'Exits once messages are over.';

public function __construct(private QueueFactoryInterface $queueFactory, private array $channels)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly array $channels,
) {
parent::__construct();
}

Expand All @@ -45,7 +47,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");
$count = $this->queueFactory
$count = $this->queueProvider
->get($channel)
->run((int)$input->getOption('maximum'));

Expand Down
24 changes: 0 additions & 24 deletions src/Debug/QueueFactoryInterfaceProxy.php

This file was deleted.

Loading

0 comments on commit 0a08e2b

Please sign in to comment.