Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace queue factory with queue providers #222

Merged
merged 39 commits into from
Dec 7, 2024
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
fdbf137
Remove `@internal` from `QueueInterface`
vjik Oct 31, 2024
a268694
queue provider
vjik Nov 1, 2024
fed6a1c
improve
vjik Nov 1, 2024
c8953c4
`QueueProviderInterfaceProxy`
vjik Nov 5, 2024
372aeca
remove exceptions
vjik Nov 5, 2024
3634ab1
config
vjik Nov 5, 2024
7384ad0
Remove factory
vjik Nov 5, 2024
fe55578
fix tests
vjik Nov 5, 2024
47c6940
test `PrototypeQueueProvider`
vjik Nov 5, 2024
48ad3af
test `FactoryQueueProvider`
vjik Nov 5, 2024
0fb0d45
test `CompositeQueueProvider`
vjik Nov 5, 2024
bfd3445
Apply fixes from StyleCI
StyleCIBot Nov 5, 2024
97689da
Improve `QueueFactoryQueueProvider`
vjik Nov 6, 2024
9bbe358
Add `AdapterFactoryQueueProvider`
vjik Nov 6, 2024
4b25f4b
improve tests
vjik Nov 6, 2024
06a3fb8
Merge remote-tracking branch 'origin/queue-factory' into queue-factory
vjik Nov 6, 2024
8f593d9
Rename "channel-definitions" to "channels"
vjik Nov 7, 2024
394c50f
stubs phpdoc
vjik Nov 7, 2024
7a8601c
phpdoc
vjik Nov 7, 2024
f90b3df
Apply fixes from StyleCI
StyleCIBot Nov 7, 2024
cf9dd96
readme
vjik Nov 7, 2024
45fd659
Merge remote-tracking branch 'origin/queue-factory' into queue-factory
vjik Nov 7, 2024
03b742d
fix cs
vjik Nov 11, 2024
5cd929b
Update src/Command/ListenAllCommand.php
vjik Nov 11, 2024
00ee57d
Move `DEFAULT_CHANNEL_NAME` to `QueueInterface`
vjik Nov 11, 2024
addfd5e
Merge remote-tracking branch 'origin/queue-factory' into queue-factory
vjik Nov 11, 2024
80cc195
Apply fixes from StyleCI
StyleCIBot Nov 11, 2024
808bc5a
Extract stubs to separate namespace
vjik Nov 11, 2024
341f15e
Merge remote-tracking branch 'origin/queue-factory' into queue-factory
vjik Nov 11, 2024
0354ea8
Merge branch 'master' into queue-factory
vjik Nov 11, 2024
305d64f
Fix `Queue`
vjik Nov 18, 2024
136b584
fix exception
vjik Nov 18, 2024
dc95ac7
fix
vjik Nov 18, 2024
3699e6f
Merge branch 'master' into queue-factory
vjik Nov 25, 2024
94d8540
Remove QueueFactoryQueueProvider
viktorprogger Dec 7, 2024
14401e1
Fix docs
viktorprogger Dec 7, 2024
1cc4438
Fix config
viktorprogger Dec 7, 2024
f982680
Bugfixes
viktorprogger Dec 7, 2024
88370f9
Apply fixes from StyleCI
StyleCIBot Dec 7, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ The package could be installed with [Composer](https://getcomposer.org):
composer require yiisoft/queue
```

## Ready for yiisoft/config
## Ready for Yii Config

If you are using [yiisoft/config](https://github.com/yiisoft/config), you'll find out this package has some defaults
in the [`common`](config/di.php) and [`params`](config/params.php) configurations saving your time. Things you should
change to start working with the queue:

- Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation.
- And/or define channel-specific `AdapterInterface` implementations in the `channel-definitions` params key to be used
with the [queue factory](#different-queue-channels).
- And/or define channel-specific `AdapterInterface` implementations in the `channel` params key to be used
with the [queue provider](#different-queue-channels).
- Define [message handlers](docs/guide/worker.md#handler-format) in the `handlers` params key to be used with the `QueueWorker`.
- Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher).

Expand Down Expand Up @@ -136,15 +136,24 @@ $status->isDone();

## Different queue channels

Often we need to push to different queue channels with an only application. There is the `QueueFactory` class to make
different `Queue` objects creation for different channels. With this factory channel-specific `Queue` creation is as
simple as
Often we need to push to different queue channels with an only application. There is the `QueueProviderInterface`
interface that provides different `Queue` objects creation for different channels. With implementation of this interface
channel-specific `Queue` creation is as simple as

```php
$queue = $factory->get('channel-name');
$queue = $provider->get('channel-name');
```

The main usage strategy is with explicit definition of channel-specific adapters. Definitions are passed in
Out of the box, there are four implementations of the `QueueProviderInterface`:

- `AdapterFactoryQueueProvider`
- `QueueFactoryQueueProvider`
- `PrototypeQueueProvider`
- `CompositeQueueProvider`

### `AdapterFactoryQueueProvider`

Provider based on definition of channel-specific adapters. Definitions are passed in
the `$definitions` constructor parameter of the factory, where keys are channel names and values are definitions
for the [`Yiisoft\Factory\Factory`](https://github.com/yiisoft/factory). Below are some examples:

Expand All @@ -163,19 +172,23 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter;

For more information about a definition formats available see the [factory](https://github.com/yiisoft/factory) documentation.

Another queue factory usage strategy is implicit adapter creation via `withChannel()` method call. To use this approach
you should pass some specific constructor parameters:
### `QueueFactoryQueueProvider`

- `true` to the `$enableRuntimeChannelDefinition`
- a default `AdapterInterface` implementation to the `$defaultAdapter`.
Provider is similar to `AdapterFactoryQueueProvider`, but it uses definitions of channel-specific queues.

In this case `$factory->get('channel-name')` call will be converted
to `$this->queue->withAdapter($this->defaultAdapter->withChannel($channel))`, when there is no explicit adapter definition
in the `$definitions`.
### `PrototypeQueueProvider`

Queue provider that only changes the channel name of the base queue. It can be useful when your queues used the same
adapter.

> Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes
> in channel names.

### `CompositeQueueProvider`

This provider allows you to combine multiple providers into one. It will try to get a queue from each provider in the
order they are passed to the constructor. The first queue found will be returned.

## Console execution

The exact way of task execution depends on the adapter used. Most adapters can be run using
Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
"psr/log": "^2.0|^3.0",
"symfony/console": "^5.4|^6.0",
"yiisoft/definitions": "^1.0|^2.0|^3.0",
"yiisoft/factory": "dev-strict",
"yiisoft/friendly-exception": "^1.0",
"yiisoft/injector": "^1.0"
},
Expand Down
18 changes: 10 additions & 8 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,22 @@
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPushInterface;
use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher;
use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\Worker as QueueWorker;
use Yiisoft\Queue\Worker\WorkerInterface;

/* @var array $params */

return [
QueueProviderInterface::class => [
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add definitions for all the providers, so if users want to use one of them, they can do it easily with just setting a definition for the provider interface.

AdapterFactoryQueueProvider::class => [...],
QueueFactoryQueueProvider::class => [...],

// and set the default one
QueueProviderInterface::class => AdapterFactoryQueueProvider::class,

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now the configuration is as it was before. I want to improve package configuration in one of the next PRs.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no guarantee those future PRs will be merged into master soon. Those who already use this package should have a convenient configuration now. Let's do it before the PR is merged.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently package in dev status. It's OK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For a not released package I'm for making a breaking change, but I'm against lack of configuration. I have two arguments for this:

  • Some people already use this package. I'd prefer to make its usage as comfortable as we can. Despite there are breaking changes.
  • If a change needs to have a default configuration, it should have a default configuration from the very beginning. This way we'll avoid decisions, which are good at architectural point, but are hard to be used in real-life projects. The goal is to provide some default configuration, document it and see if it's really comfortable to use in a project. If it's impossible to describe it in simple words - than the decision is likely to be reimagined and simplified for the end user.

'class' => AdapterFactoryQueueProvider::class,
'__construct()' => [
'definitions' => $params['yiisoft/queue']['channels'],
],
],
QueueWorker::class => [
'class' => QueueWorker::class,
'__construct()' => [$params['yiisoft/queue']['handlers']],
Expand All @@ -39,10 +45,6 @@
? $container->get(SignalLoop::class)
: $container->get(SimpleLoop::class);
},
QueueFactoryInterface::class => QueueFactory::class,
QueueFactory::class => [
'__construct()' => ['channelConfiguration' => $params['yiisoft/queue']['channel-definitions']],
],
QueueInterface::class => Queue::class,
MiddlewareFactoryPushInterface::class => MiddlewareFactoryPush::class,
MiddlewareFactoryConsumeInterface::class => MiddlewareFactoryConsume::class,
Expand All @@ -59,12 +61,12 @@
MessageSerializerInterface::class => JsonMessageSerializer::class,
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
'channels' => array_keys($params['yiisoft/queue']['channels']),
],
],
ListenAllCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/queue']['channel-definitions']),
'channels' => array_keys($params['yiisoft/queue']['channels']),
],
],
];
11 changes: 6 additions & 5 deletions config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
use Yiisoft\Queue\Command\ListenCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Debug\QueueCollector;
use Yiisoft\Queue\Debug\QueueFactoryInterfaceProxy;
use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy;
use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\Worker\WorkerInterface;

return [
Expand All @@ -22,8 +23,8 @@
],
'yiisoft/queue' => [
'handlers' => [],
'channel-definitions' => [
QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
'channels' => [
Queue::DEFAULT_CHANNEL_NAME => AdapterInterface::class,
],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand All @@ -34,7 +35,7 @@
QueueCollector::class,
],
'trackedServices' => [
QueueFactoryInterface::class => [QueueFactoryInterfaceProxy::class, QueueCollector::class],
QueueProviderInterface::class => [QueueProviderInterfaceProxy::class, QueueCollector::class],
WorkerInterface::class => [QueueWorkerInterfaceProxy::class, QueueCollector::class],
],
],
Expand Down
37 changes: 37 additions & 0 deletions src/Adapter/StubAdapter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Adapter;

use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;

/**
* Stub adapter that does nothing. Job status is always "done".
viktorprogger marked this conversation as resolved.
Show resolved Hide resolved
*/
final class StubAdapter implements AdapterInterface
{
public function runExisting(callable $handlerCallback): void
{
}

public function status(int|string $id): JobStatus
{
return JobStatus::done();
}

public function push(MessageInterface $message): MessageInterface
{
return $message;
}

public function subscribe(callable $handlerCallback): void
{
}

public function withChannel(string $channel): AdapterInterface
{
return clone $this;
}
}
4 changes: 2 additions & 2 deletions src/Adapter/SynchronousAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use InvalidArgumentException;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\Queue;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Worker\WorkerInterface;
use Yiisoft\Queue\Message\IdEnvelope;
Expand All @@ -20,7 +20,7 @@ final class SynchronousAdapter implements AdapterInterface
public function __construct(
private WorkerInterface $worker,
private QueueInterface $queue,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
private string $channel = Queue::DEFAULT_CHANNEL_NAME,
) {
}

Expand Down
21 changes: 21 additions & 0 deletions src/Cli/StubLoop.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Cli;

/**
* Stub loop.
viktorprogger marked this conversation as resolved.
Show resolved Hide resolved
*/
final class StubLoop implements LoopInterface
{
public function __construct(
private readonly bool $canContinue = true,
) {
}

public function canContinue(): bool
{
return $this->canContinue;
}
}
17 changes: 10 additions & 7 deletions src/Command/ListenAllCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class ListenAllCommand extends Command
{
Expand All @@ -20,8 +20,11 @@ final class ListenAllCommand extends Command
'Listens all configured queues by default in case you\'re using yiisoft/config. ' .
'Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory, private LoopInterface $loop, private array $channels)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly LoopInterface $loop,
private readonly array $channels
vjik marked this conversation as resolved.
Show resolved Hide resolved
) {
parent::__construct();
}

Expand All @@ -45,7 +48,7 @@ public function configure(): void
'm',
InputOption::VALUE_REQUIRED,
'Maximum number of messages to process in each channel before switching to another channel. ' .
'Default is 0 (no limits).',
'Default is 0 (no limits).',
viktorprogger marked this conversation as resolved.
Show resolved Hide resolved
0,
);

Expand All @@ -57,17 +60,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int
$queues = [];
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$queues[] = $this->queueFactory->get($channel);
$queues[] = $this->queueProvider->get($channel);
}

while ($this->loop->canContinue()) {
$hasMessages = false;
foreach ($queues as $queue) {
$hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages;
$hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages;
}

if (!$hasMessages) {
$pauseSeconds = (int)$input->getOption('pause');
$pauseSeconds = (int) $input->getOption('pause');
if ($pauseSeconds < 0) {
$pauseSeconds = 1;
}
Expand Down
13 changes: 7 additions & 6 deletions src/Command/ListenCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;
use Yiisoft\Queue\Queue;

final class ListenCommand extends Command
{
protected static $defaultName = 'queue:listen';
protected static $defaultDescription = 'Listens the queue and executes messages as they come. Needs to be stopped manually.';

public function __construct(private QueueFactoryInterface $queueFactory)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider
) {
parent::__construct();
}

Expand All @@ -27,13 +28,13 @@ public function configure(): void
'channel',
InputArgument::OPTIONAL,
'Queue channel name to connect to',
QueueFactory::DEFAULT_CHANNEL_NAME
Queue::DEFAULT_CHANNEL_NAME,
);
}

protected function execute(InputInterface $input, OutputInterface $output): int
{
$this->queueFactory
$this->queueProvider
->get($input->getArgument('channel'))
->listen();

Expand Down
10 changes: 6 additions & 4 deletions src/Command/RunCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Output\OutputInterface;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\Provider\QueueProviderInterface;

final class RunCommand extends Command
{
protected static $defaultName = 'queue:run';
protected static $defaultDescription = 'Runs all the existing messages in the given queues. ' .
'Exits once messages are over.';

public function __construct(private QueueFactoryInterface $queueFactory, private array $channels)
{
public function __construct(
private readonly QueueProviderInterface $queueProvider,
private readonly array $channels,
) {
parent::__construct();
}

Expand All @@ -45,7 +47,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int
/** @var string $channel */
foreach ($input->getArgument('channel') as $channel) {
$output->write("Processing channel $channel... ");
$count = $this->queueFactory
$count = $this->queueProvider
->get($channel)
->run((int)$input->getOption('maximum'));

Expand Down
24 changes: 0 additions & 24 deletions src/Debug/QueueFactoryInterfaceProxy.php

This file was deleted.

Loading
Loading