diff --git a/.github/workflows/rector.yml b/.github/workflows/rector.yml index 35411d0a..457772af 100644 --- a/.github/workflows/rector.yml +++ b/.github/workflows/rector.yml @@ -1,5 +1,5 @@ on: - pull_request: + pull_request_target: paths-ignore: - 'docs/**' - 'README.md' @@ -17,6 +17,7 @@ jobs: secrets: token: ${{ secrets.YIISOFT_GITHUB_TOKEN }} with: + repository: ${{ github.event.pull_request.head.repo.full_name }} os: >- ['ubuntu-latest'] php: >- diff --git a/README.md b/README.md index 1706eaf0..4914175e 100644 --- a/README.md +++ b/README.md @@ -28,15 +28,15 @@ The package could be installed with [Composer](https://getcomposer.org): composer require yiisoft/queue ``` -## Ready for yiisoft/config +## Ready for Yii Config If you are using [yiisoft/config](https://github.com/yiisoft/config), you'll find out this package has some defaults in the [`common`](config/di.php) and [`params`](config/params.php) configurations saving your time. Things you should change to start working with the queue: - Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation. -- And/or define channel-specific `AdapterInterface` implementations in the `channel-definitions` params key to be used - with the [queue factory](#different-queue-channels). +- And/or define channel-specific `AdapterInterface` implementations in the `channel` params key to be used + with the [queue provider](#different-queue-channels). - Define [message handlers](docs/guide/worker.md#handler-format) in the `handlers` params key to be used with the `QueueWorker`. - Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher). @@ -159,15 +159,23 @@ $worker = new \Yiisoft\Queue\Worker\Worker( ## Different queue channels -Often we need to push to different queue channels with an only application. There is the `QueueFactory` class to make -different `Queue` objects creation for different channels. With this factory channel-specific `Queue` creation is as -simple as +Often we need to push to different queue channels with an only application. There is the `QueueProviderInterface` +interface that provides different `Queue` objects creation for different channels. With implementation of this interface +channel-specific `Queue` creation is as simple as ```php -$queue = $factory->get('channel-name'); +$queue = $provider->get('channel-name'); ``` -The main usage strategy is with explicit definition of channel-specific adapters. Definitions are passed in +Out of the box, there are four implementations of the `QueueProviderInterface`: + +- `AdapterFactoryQueueProvider` +- `PrototypeQueueProvider` +- `CompositeQueueProvider` + +### `AdapterFactoryQueueProvider` + +Provider based on definition of channel-specific adapters. Definitions are passed in the `$definitions` constructor parameter of the factory, where keys are channel names and values are definitions for the [`Yiisoft\Factory\Factory`](https://github.com/yiisoft/factory). Below are some examples: @@ -186,19 +194,19 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter; For more information about a definition formats available see the [factory](https://github.com/yiisoft/factory) documentation. -Another queue factory usage strategy is implicit adapter creation via `withChannel()` method call. To use this approach -you should pass some specific constructor parameters: +### `PrototypeQueueProvider` -- `true` to the `$enableRuntimeChannelDefinition` -- a default `AdapterInterface` implementation to the `$defaultAdapter`. - -In this case `$factory->get('channel-name')` call will be converted -to `$this->queue->withAdapter($this->defaultAdapter->withChannel($channel))`, when there is no explicit adapter definition -in the `$definitions`. +Queue provider that only changes the channel name of the base queue. It can be useful when your queues used the same +adapter. > Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes > in channel names. +### `CompositeQueueProvider` + +This provider allows you to combine multiple providers into one. It will try to get a queue from each provider in the +order they are passed to the constructor. The first queue found will be returned. + ## Console execution The exact way of task execution depends on the adapter used. Most adapters can be run using diff --git a/composer.json b/composer.json index 664f5853..d03fd620 100644 --- a/composer.json +++ b/composer.json @@ -41,6 +41,7 @@ "psr/log": "^2.0|^3.0", "symfony/console": "^5.4|^6.0", "yiisoft/definitions": "^1.0|^2.0|^3.0", + "yiisoft/factory": "dev-strict", "yiisoft/friendly-exception": "^1.0", "yiisoft/injector": "^1.0" }, @@ -48,7 +49,7 @@ "maglnet/composer-require-checker": "^4.7", "phpbench/phpbench": "^1.3", "phpunit/phpunit": "^10.5", - "rector/rector": "^1.0.0", + "rector/rector": "^1.2", "roave/infection-static-analysis-plugin": "^1.34", "spatie/phpunit-watcher": "^1.23", "vimeo/psalm": "^5.20", @@ -60,7 +61,8 @@ }, "autoload": { "psr-4": { - "Yiisoft\\Queue\\": "src" + "Yiisoft\\Queue\\": "src", + "Yiisoft\\Queue\\Stubs\\": "stubs" } }, "autoload-dev": { diff --git a/config/di.php b/config/di.php index 3af1ad8b..b9b28033 100644 --- a/config/di.php +++ b/config/di.php @@ -19,9 +19,9 @@ use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush; use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPushInterface; use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; +use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\Queue; -use Yiisoft\Queue\QueueFactory; -use Yiisoft\Queue\QueueFactoryInterface; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Worker\Worker as QueueWorker; use Yiisoft\Queue\Worker\WorkerInterface; @@ -29,6 +29,12 @@ /* @var array $params */ return [ + AdapterFactoryQueueProvider::class => [ + '__construct()' => [ + 'definitions' => $params['yiisoft/queue']['channels'], + ], + ], + QueueProviderInterface::class => AdapterFactoryQueueProvider::class, QueueWorker::class => [ 'class' => QueueWorker::class, '__construct()' => [$params['yiisoft/queue']['handlers']], @@ -39,10 +45,6 @@ ? $container->get(SignalLoop::class) : $container->get(SimpleLoop::class); }, - QueueFactoryInterface::class => QueueFactory::class, - QueueFactory::class => [ - '__construct()' => ['channelConfiguration' => $params['yiisoft/queue']['channel-definitions']], - ], QueueInterface::class => Queue::class, MiddlewareFactoryPushInterface::class => MiddlewareFactoryPush::class, MiddlewareFactoryConsumeInterface::class => MiddlewareFactoryConsume::class, @@ -59,12 +61,12 @@ MessageSerializerInterface::class => JsonMessageSerializer::class, RunCommand::class => [ '__construct()' => [ - 'channels' => array_keys($params['yiisoft/queue']['channel-definitions']), + 'channels' => array_keys($params['yiisoft/queue']['channels']), ], ], ListenAllCommand::class => [ '__construct()' => [ - 'channels' => array_keys($params['yiisoft/queue']['channel-definitions']), + 'channels' => array_keys($params['yiisoft/queue']['channels']), ], ], ]; diff --git a/config/params.php b/config/params.php index 1251c408..fc87d078 100644 --- a/config/params.php +++ b/config/params.php @@ -7,9 +7,10 @@ use Yiisoft\Queue\Command\ListenCommand; use Yiisoft\Queue\Command\RunCommand; use Yiisoft\Queue\Debug\QueueCollector; -use Yiisoft\Queue\Debug\QueueFactoryInterfaceProxy; +use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy; use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; +use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Worker\WorkerInterface; return [ @@ -22,8 +23,8 @@ ], 'yiisoft/queue' => [ 'handlers' => [], - 'channel-definitions' => [ - QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class, + 'channels' => [ + QueueInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class, ], 'middlewares-push' => [], 'middlewares-consume' => [], @@ -34,7 +35,7 @@ QueueCollector::class, ], 'trackedServices' => [ - QueueFactoryInterface::class => [QueueFactoryInterfaceProxy::class, QueueCollector::class], + QueueProviderInterface::class => [QueueProviderInterfaceProxy::class, QueueCollector::class], WorkerInterface::class => [QueueWorkerInterfaceProxy::class, QueueCollector::class], ], ], diff --git a/rector.php b/rector.php index 90fea6ba..dd88e1b4 100644 --- a/rector.php +++ b/rector.php @@ -4,7 +4,6 @@ use Rector\CodeQuality\Rector\Class_\InlineConstructorDefaultToPropertyRector; use Rector\Config\RectorConfig; -use Rector\Php73\Rector\FuncCall\JsonThrowOnErrorRector; use Rector\Php74\Rector\Closure\ClosureToArrowFunctionRector; use Rector\Php81\Rector\Property\ReadOnlyPropertyRector; use Rector\Set\ValueObject\LevelSetList; @@ -25,7 +24,6 @@ $rectorConfig->skip([ ClosureToArrowFunctionRector::class, - JsonThrowOnErrorRector::class, ReadOnlyPropertyRector::class, ]); }; diff --git a/src/Adapter/SynchronousAdapter.php b/src/Adapter/SynchronousAdapter.php index 7c36f716..84148941 100644 --- a/src/Adapter/SynchronousAdapter.php +++ b/src/Adapter/SynchronousAdapter.php @@ -7,7 +7,6 @@ use InvalidArgumentException; use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\MessageInterface; -use Yiisoft\Queue\QueueFactory; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Worker\WorkerInterface; use Yiisoft\Queue\Message\IdEnvelope; @@ -20,7 +19,7 @@ final class SynchronousAdapter implements AdapterInterface public function __construct( private WorkerInterface $worker, private QueueInterface $queue, - private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME, + private string $channel = QueueInterface::DEFAULT_CHANNEL_NAME, ) { } diff --git a/src/Command/ListenAllCommand.php b/src/Command/ListenAllCommand.php index e94a77fe..8be6dead 100644 --- a/src/Command/ListenAllCommand.php +++ b/src/Command/ListenAllCommand.php @@ -10,7 +10,7 @@ use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Yiisoft\Queue\Cli\LoopInterface; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; final class ListenAllCommand extends Command { @@ -20,8 +20,11 @@ final class ListenAllCommand extends Command 'Listens all configured queues by default in case you\'re using yiisoft/config. ' . 'Needs to be stopped manually.'; - public function __construct(private QueueFactoryInterface $queueFactory, private LoopInterface $loop, private array $channels) - { + public function __construct( + private readonly QueueProviderInterface $queueProvider, + private readonly LoopInterface $loop, + private readonly array $channels, + ) { parent::__construct(); } @@ -45,7 +48,7 @@ public function configure(): void 'm', InputOption::VALUE_REQUIRED, 'Maximum number of messages to process in each channel before switching to another channel. ' . - 'Default is 0 (no limits).', + 'Default is 0 (no limits).', 0, ); @@ -57,17 +60,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int $queues = []; /** @var string $channel */ foreach ($input->getArgument('channel') as $channel) { - $queues[] = $this->queueFactory->get($channel); + $queues[] = $this->queueProvider->get($channel); } while ($this->loop->canContinue()) { $hasMessages = false; foreach ($queues as $queue) { - $hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages; + $hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages; } if (!$hasMessages) { - $pauseSeconds = (int)$input->getOption('pause'); + $pauseSeconds = (int) $input->getOption('pause'); if ($pauseSeconds < 0) { $pauseSeconds = 1; } diff --git a/src/Command/ListenCommand.php b/src/Command/ListenCommand.php index 21915c47..ecafb837 100644 --- a/src/Command/ListenCommand.php +++ b/src/Command/ListenCommand.php @@ -8,16 +8,17 @@ use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -use Yiisoft\Queue\QueueFactory; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; +use Yiisoft\Queue\QueueInterface; final class ListenCommand extends Command { protected static $defaultName = 'queue:listen'; protected static $defaultDescription = 'Listens the queue and executes messages as they come. Needs to be stopped manually.'; - public function __construct(private QueueFactoryInterface $queueFactory) - { + public function __construct( + private readonly QueueProviderInterface $queueProvider + ) { parent::__construct(); } @@ -27,13 +28,13 @@ public function configure(): void 'channel', InputArgument::OPTIONAL, 'Queue channel name to connect to', - QueueFactory::DEFAULT_CHANNEL_NAME + QueueInterface::DEFAULT_CHANNEL_NAME, ); } protected function execute(InputInterface $input, OutputInterface $output): int { - $this->queueFactory + $this->queueProvider ->get($input->getArgument('channel')) ->listen(); diff --git a/src/Command/RunCommand.php b/src/Command/RunCommand.php index f01a9e17..a6bd3c18 100644 --- a/src/Command/RunCommand.php +++ b/src/Command/RunCommand.php @@ -9,7 +9,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; final class RunCommand extends Command { @@ -17,8 +17,10 @@ final class RunCommand extends Command protected static $defaultDescription = 'Runs all the existing messages in the given queues. ' . 'Exits once messages are over.'; - public function __construct(private QueueFactoryInterface $queueFactory, private array $channels) - { + public function __construct( + private readonly QueueProviderInterface $queueProvider, + private readonly array $channels, + ) { parent::__construct(); } @@ -45,7 +47,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int /** @var string $channel */ foreach ($input->getArgument('channel') as $channel) { $output->write("Processing channel $channel... "); - $count = $this->queueFactory + $count = $this->queueProvider ->get($channel) ->run((int)$input->getOption('maximum')); diff --git a/src/Debug/QueueFactoryInterfaceProxy.php b/src/Debug/QueueFactoryInterfaceProxy.php deleted file mode 100644 index cbfe4e76..00000000 --- a/src/Debug/QueueFactoryInterfaceProxy.php +++ /dev/null @@ -1,24 +0,0 @@ -queueFactory->get($channel); - - return new QueueDecorator($queue, $this->collector); - } -} diff --git a/src/Debug/QueueProviderInterfaceProxy.php b/src/Debug/QueueProviderInterfaceProxy.php new file mode 100644 index 00000000..5f00096e --- /dev/null +++ b/src/Debug/QueueProviderInterfaceProxy.php @@ -0,0 +1,28 @@ +queueProvider->get($channel); + return new QueueDecorator($queue, $this->collector); + } + + public function has(string $channel): bool + { + return $this->queueProvider->has($channel); + } +} diff --git a/src/Exception/AdapterConfiguration/AdapterNotConfiguredException.php b/src/Exception/AdapterConfiguration/AdapterNotConfiguredException.php index 487fa5ca..68ae754a 100644 --- a/src/Exception/AdapterConfiguration/AdapterNotConfiguredException.php +++ b/src/Exception/AdapterConfiguration/AdapterNotConfiguredException.php @@ -6,8 +6,8 @@ use RuntimeException; use Yiisoft\FriendlyException\FriendlyExceptionInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\Queue; -use Yiisoft\Queue\QueueFactory; class AdapterNotConfiguredException extends RuntimeException implements FriendlyExceptionInterface { @@ -21,7 +21,7 @@ public function getName(): string public function getSolution(): ?string { $queueClass = Queue::class; - $factoryClass = QueueFactory::class; + $queueProviderInterface = QueueProviderInterface::class; return <<channel = $channel; - parent::__construct($message, $code, $previous); - } - - public function getName(): string - { - return 'Incorrect queue channel configuration'; - } - - public function getSolution(): ?string - { - $factoryClass = QueueFactory::class; - - return <<channel" which is incorrectly configured. - Please take a look to the documentation for the $factoryClass "\$definitions" constructor parameter. - SOLUTION; - } -} diff --git a/src/Exception/AdapterConfiguration/ChannelNotConfiguredException.php b/src/Exception/AdapterConfiguration/ChannelNotConfiguredException.php deleted file mode 100644 index ae417d60..00000000 --- a/src/Exception/AdapterConfiguration/ChannelNotConfiguredException.php +++ /dev/null @@ -1,39 +0,0 @@ -channel = $channel; - parent::__construct($message, $code, $previous); - } - - public function getName(): string - { - return 'Queue channel is not properly configured'; - } - - public function getSolution(): ?string - { - $factoryClass = QueueFactory::class; - - return <<channel" creation is not configured in the $factoryClass. - Please take a look to the documentation for the $factoryClass constructor. - The most important parameters are "\$definitions" and "\$enableRuntimeChannelDefinition". - - SOLUTION; - } -} diff --git a/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php b/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php index be8de7df..7cef6572 100644 --- a/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php +++ b/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php @@ -9,11 +9,11 @@ final class ConsumeMiddlewareDispatcher { /** - * Contains a middleware pipeline handler. + * Contains a middleware pipeline handlers. * - * @var MiddlewareConsumeStack|null The middleware stack. + * @var MiddlewareConsumeStack[] The middleware stack divided by message types. */ - private ?MiddlewareConsumeStack $stack = null; + private array $stack = []; /** * @var array[]|callable[]|MiddlewareConsumeInterface[]|string[] @@ -37,11 +37,12 @@ public function dispatch( ConsumeRequest $request, MessageHandlerConsumeInterface $finishHandler ): ConsumeRequest { - if ($this->stack === null) { - $this->stack = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler); + $handlerName = $request->getMessage()->getHandlerName(); + if (!array_key_exists($handlerName, $this->stack)) { + $this->stack[$handlerName] = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler); } - return $this->stack->handleConsume($request); + return $this->stack[$handlerName]->handleConsume($request); } /** @@ -68,7 +69,7 @@ public function withMiddlewares(array $middlewareDefinitions): self // Fixes a memory leak. unset($instance->stack); - $instance->stack = null; + $instance->stack = []; return $instance; } diff --git a/src/Provider/AdapterFactoryQueueProvider.php b/src/Provider/AdapterFactoryQueueProvider.php new file mode 100644 index 00000000..ffe777f5 --- /dev/null +++ b/src/Provider/AdapterFactoryQueueProvider.php @@ -0,0 +1,95 @@ + + */ + private array $queues = []; + + private readonly StrictFactory $factory; + + /** + * @param QueueInterface $baseQueue Base queue for queues creation. + * @param array $definitions Adapter definitions indexed by channel names. + * @param ContainerInterface|null $container Container to use for dependencies resolving. + * @param bool $validate If definitions should be validated when set. + * + * @psalm-param array $definitions + * @throws InvalidQueueConfigException + */ + public function __construct( + private readonly QueueInterface $baseQueue, + array $definitions, + ?ContainerInterface $container = null, + bool $validate = true, + ) { + try { + $this->factory = new StrictFactory($definitions, $container, $validate); + } catch (InvalidConfigException $exception) { + throw new InvalidQueueConfigException($exception->getMessage(), previous: $exception); + } + } + + public function get(string $channel): QueueInterface + { + $queue = $this->getOrTryToCreate($channel); + if ($queue === null) { + throw new ChannelNotFoundException($channel); + } + return $queue; + } + + public function has(string $channel): bool + { + return $this->factory->has($channel); + } + + /** + * @throws InvalidQueueConfigException + */ + private function getOrTryToCreate(string $channel): QueueInterface|null + { + if (array_key_exists($channel, $this->queues)) { + return $this->queues[$channel]; + } + + if ($this->factory->has($channel)) { + $adapter = $this->factory->create($channel); + if (!$adapter instanceof AdapterInterface) { + throw new InvalidQueueConfigException( + sprintf( + 'Adapter must implement "%s". For channel "%s" got "%s" instead.', + AdapterInterface::class, + $channel, + get_debug_type($adapter), + ), + ); + } + $this->queues[$channel] = $this->baseQueue->withAdapter($adapter)->withChannelName($channel); + } else { + $this->queues[$channel] = null; + } + + return $this->queues[$channel]; + } +} diff --git a/src/Provider/ChannelNotFoundException.php b/src/Provider/ChannelNotFoundException.php new file mode 100644 index 00000000..2538cda7 --- /dev/null +++ b/src/Provider/ChannelNotFoundException.php @@ -0,0 +1,25 @@ +providers = $providers; + } + + public function get(string $channel): QueueInterface + { + foreach ($this->providers as $provider) { + if ($provider->has($channel)) { + return $provider->get($channel); + } + } + throw new ChannelNotFoundException($channel); + } + + public function has(string $channel): bool + { + foreach ($this->providers as $provider) { + if ($provider->has($channel)) { + return true; + } + } + return false; + } +} diff --git a/src/Provider/InvalidQueueConfigException.php b/src/Provider/InvalidQueueConfigException.php new file mode 100644 index 00000000..136fe746 --- /dev/null +++ b/src/Provider/InvalidQueueConfigException.php @@ -0,0 +1,14 @@ +baseQueue->withChannelName($channel); + } + + public function has(string $channel): bool + { + return true; + } +} diff --git a/src/Provider/QueueProviderException.php b/src/Provider/QueueProviderException.php new file mode 100644 index 00000000..17fa3bc4 --- /dev/null +++ b/src/Provider/QueueProviderException.php @@ -0,0 +1,14 @@ + $channelConfiguration Configuration array in [channel_name => definition] format. - * "Definition" here is a {@see Factory} definition - * @param QueueInterface $queue A default queue implementation. `$queue->withAdapter()` will be returned - * with the `get` method - * @param bool $enableRuntimeChannelDefinition A flag whether to enable a such behavior when there is no - * explicit channel adapter definition: `return $this->queue->withAdapter($this->adapter->withChannel($channel)` - * When this flag is set to false, only explicit definitions from the $definition parameter are used. - * @param AdapterInterface|null $defaultAdapter A default adapter implementation. - * It must be set when $enableRuntimeChannelDefinition is true. - */ - public function __construct( - private array $channelConfiguration, - private QueueInterface $queue, - private ContainerInterface $container, - private CallableFactory $callableFactory, - private Injector $injector, - private bool $enableRuntimeChannelDefinition = false, - private ?AdapterInterface $defaultAdapter = null, - ) { - if ($enableRuntimeChannelDefinition === true && $defaultAdapter === null) { - $message = 'Either $enableRuntimeChannelDefinition must be false, or $defaultAdapter should be provided.'; - - throw new InvalidArgumentException($message); - } - } - - public function get(string $channel = self::DEFAULT_CHANNEL_NAME): QueueInterface - { - if ($channel === $this->queue->getChannelName()) { - return $this->queue; - } - - if (isset($this->queueCollection[$channel]) && $this->queueCollection[$channel]->get() !== null) { - $queue = $this->queueCollection[$channel]->get(); - } else { - $queue = $this->create($channel); - $this->queueCollection[$channel] = WeakReference::create($queue); - } - - return $queue; - } - - /** - * @throws ChannelIncorrectlyConfigured - * @return QueueInterface - */ - private function create(string $channel): QueueInterface - { - if (isset($this->channelConfiguration[$channel])) { - $definition = $this->channelConfiguration[$channel]; - $this->checkDefinitionType($channel, $definition); - $adapter = $this->createFromDefinition($channel, $definition)->withChannel($channel); - - return $this->queue->withAdapter($adapter); - } - - if ($this->enableRuntimeChannelDefinition === false) { - throw new ChannelNotConfiguredException($channel); - } - - /** @psalm-suppress PossiblyNullReference */ - return $this->queue->withAdapter($this->defaultAdapter->withChannel($channel)); - } - - private function checkDefinitionType(string $channel, mixed $definition): void - { - if ( - !$definition instanceof AdapterInterface - && !is_array($definition) - && !is_callable($definition) - && !is_string($definition) - ) { - throw new ChannelIncorrectlyConfigured($channel, $definition); - } - } - - public function createFromDefinition( - string $channel, - AdapterInterface|callable|array|string $definition - ): AdapterInterface { - if ($definition instanceof AdapterInterface) { - return $definition; - } - - if (is_string($definition)) { - return $this->getFromContainer($channel, $definition); - } - - return $this->tryGetFromCallable($channel, $definition) - ?? $this->tryGetFromArrayDefinition($channel, $definition) - ?? throw new ChannelIncorrectlyConfigured($channel, $definition); - } - - private function getFromContainer(string $channel, string $definition): AdapterInterface - { - if (class_exists($definition)) { - if (is_subclass_of($definition, AdapterInterface::class)) { - /** @var AdapterInterface */ - return $this->container->get($definition); - } - } elseif ($this->container->has($definition)) { - $middleware = $this->container->get($definition); - if ($middleware instanceof AdapterInterface) { - return $middleware; - } - } - - throw new ChannelIncorrectlyConfigured($channel, $definition); - } - - private function tryGetFromCallable( - string $channel, - callable|AdapterInterface|array|string $definition - ): ?AdapterInterface { - $callable = null; - - if ($definition instanceof Closure) { - $callable = $definition; - } - - if ( - is_array($definition) - && array_keys($definition) === [0, 1] - ) { - try { - $callable = $this->callableFactory->create($definition); - } catch (InvalidCallableConfigurationException $exception) { - throw new ChannelIncorrectlyConfigured($channel, $definition, previous: $exception); - } - } - - if ($callable !== null) { - $adapter = $this->injector->invoke($callable); - - if (!$adapter instanceof AdapterInterface) { - throw new ChannelIncorrectlyConfigured($channel, $definition); - } - } - - return null; - } - - private function tryGetFromArrayDefinition( - string $channel, - callable|AdapterInterface|array|string $definition - ): ?AdapterInterface { - if (!is_array($definition)) { - return null; - } - - try { - DefinitionValidator::validateArrayDefinition($definition); - - $middleware = ArrayDefinition::fromConfig($definition)->resolve($this->container); - if ($middleware instanceof AdapterInterface) { - return $middleware; - } - - throw new ChannelIncorrectlyConfigured($channel, $definition); - } catch (InvalidConfigException) { - } - - throw new ChannelIncorrectlyConfigured($channel, $definition); - } -} diff --git a/src/QueueFactoryInterface.php b/src/QueueFactoryInterface.php deleted file mode 100644 index 9761e583..00000000 --- a/src/QueueFactoryInterface.php +++ /dev/null @@ -1,18 +0,0 @@ -canContinue; + } +} diff --git a/stubs/StubQueue.php b/stubs/StubQueue.php new file mode 100644 index 00000000..8436b46a --- /dev/null +++ b/stubs/StubQueue.php @@ -0,0 +1,71 @@ +adapter; + } + + public function withAdapter(AdapterInterface $adapter): QueueInterface + { + $new = clone $this; + $new->adapter = $adapter; + return $new; + } + + public function getChannelName(): string + { + return $this->channelName; + } + + public function withChannelName(string $channel): QueueInterface + { + $new = clone $this; + $new->channelName = $channel; + if ($new->adapter !== null) { + $new->adapter = $new->adapter->withChannel($channel); + } + return $new; + } +} diff --git a/stubs/StubWorker.php b/stubs/StubWorker.php new file mode 100644 index 00000000..7a939006 --- /dev/null +++ b/stubs/StubWorker.php @@ -0,0 +1,20 @@ +messagesProcessed = []; + $this->messagesProcessedSecond = []; $container = $this->createMock(ContainerInterface::class); $worker = new Worker( - ['test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData()], + [ + 'test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData(), + 'test2' => fn (MessageInterface $message): mixed => $this->messagesProcessedSecond[] = $message->getData(), + ], new NullLogger(), new Injector($container), $container, @@ -38,9 +43,11 @@ public function testMessagesConsumed(): void $messages = [1, 'foo', 'bar-baz']; foreach ($messages as $message) { $worker->process(new Message('test', $message), $this->getQueue()); + $worker->process(new Message('test2', $message), $this->getQueue()); } $this->assertEquals($messages, $this->messagesProcessed); + $this->assertEquals($messages, $this->messagesProcessedSecond); } public function testMessagesConsumedByHandlerClass(): void diff --git a/tests/Integration/QueueFactoryTest.php b/tests/Integration/QueueFactoryTest.php deleted file mode 100644 index 50076896..00000000 --- a/tests/Integration/QueueFactoryTest.php +++ /dev/null @@ -1,82 +0,0 @@ -createMock(WorkerInterface::class); - $queue = $this->getDefaultQueue($worker); - $container = $this->createMock(ContainerInterface::class); - $factory = new QueueFactory( - [], - $queue, - $container, - new CallableFactory($container), - new Injector($container), - true, - new SynchronousAdapter($worker, $queue) - ); - - $adapter = $factory->get('test-channel'); - - self::assertEquals('test-channel', $adapter->getChannelName()); - } - - public function testConfiguredChange(): void - { - $worker = $this->createMock(WorkerInterface::class); - $queue = $this->getDefaultQueue($worker); - $container = $this->createMock(ContainerInterface::class); - $factory = new QueueFactory( - [ - 'test-channel' => [ - 'class' => FakeAdapter::class, - 'withChannel()' => ['test-channel'], - ], - QueueFactoryInterface::DEFAULT_CHANNEL_NAME => [ - 'class' => FakeAdapter::class, - 'withChannel()' => [QueueFactoryInterface::DEFAULT_CHANNEL_NAME], - ], - ], - $queue, - $container, - new CallableFactory($container), - new Injector($container), - true, - new SynchronousAdapter($worker, $queue) - ); - $queue = $factory->get('test-channel'); - - self::assertEquals('test-channel', $queue->getChannelName()); - self::assertEquals(QueueFactoryInterface::DEFAULT_CHANNEL_NAME, $factory->get()->getChannelName()); - } - - private function getDefaultQueue(WorkerInterface $worker): Queue - { - return new Queue( - $worker, - $this->createMock(LoopInterface::class), - $this->createMock(LoggerInterface::class), - new PushMiddlewareDispatcher($this->createMock(MiddlewareFactoryPushInterface::class)), - ); - } -} diff --git a/tests/Unit/Command/ListenCommandTest.php b/tests/Unit/Command/ListenCommandTest.php index f4a337a4..2e2971ea 100644 --- a/tests/Unit/Command/ListenCommandTest.php +++ b/tests/Unit/Command/ListenCommandTest.php @@ -8,14 +8,14 @@ use Symfony\Component\Console\Input\StringInput; use Symfony\Component\Console\Output\OutputInterface; use Yiisoft\Queue\Command\ListenCommand; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\QueueInterface; final class ListenCommandTest extends TestCase { public function testConfigure(): void { - $command = new ListenCommand($this->createMock(QueueFactoryInterface::class)); + $command = new ListenCommand($this->createMock(QueueProviderInterface::class)); $channelArgument = $command->getNativeDefinition()->getArgument('channel'); $this->assertEquals('channel', $channelArgument->getName()); } @@ -24,7 +24,7 @@ public function testExecute(): void { $queue = $this->createMock(QueueInterface::class); $queue->expects($this->once())->method('listen'); - $queueFactory = $this->createMock(QueueFactoryInterface::class); + $queueFactory = $this->createMock(QueueProviderInterface::class); $queueFactory->method('get')->willReturn($queue); $input = new StringInput('channel'); diff --git a/tests/Unit/Command/RunCommandTest.php b/tests/Unit/Command/RunCommandTest.php index fb70914a..3e38305e 100644 --- a/tests/Unit/Command/RunCommandTest.php +++ b/tests/Unit/Command/RunCommandTest.php @@ -8,14 +8,14 @@ use Symfony\Component\Console\Input\StringInput; use Symfony\Component\Console\Output\OutputInterface; use Yiisoft\Queue\Command\RunCommand; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\QueueInterface; final class RunCommandTest extends TestCase { public function testConfigure(): void { - $command = new RunCommand($this->createMock(QueueFactoryInterface::class), []); + $command = new RunCommand($this->createMock(QueueProviderInterface::class), []); $channelArgument = $command->getNativeDefinition()->getArgument('channel'); $this->assertEquals('channel', $channelArgument->getName()); } @@ -24,11 +24,11 @@ public function testExecute(): void { $queue = $this->createMock(QueueInterface::class); $queue->expects($this->once())->method('run'); - $queueFactory = $this->createMock(QueueFactoryInterface::class); - $queueFactory->method('get')->willReturn($queue); + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->method('get')->willReturn($queue); $input = new StringInput('channel'); - $command = new RunCommand($queueFactory, []); + $command = new RunCommand($queueProvider, []); $exitCode = $command->run($input, $this->createMock(OutputInterface::class)); $this->assertEquals(0, $exitCode); diff --git a/tests/Unit/Debug/QueueFactoryInterfaceProxyTest.php b/tests/Unit/Debug/QueueProviderInterfaceProxyTest.php similarity index 61% rename from tests/Unit/Debug/QueueFactoryInterfaceProxyTest.php rename to tests/Unit/Debug/QueueProviderInterfaceProxyTest.php index d8534aa3..2bde59c5 100644 --- a/tests/Unit/Debug/QueueFactoryInterfaceProxyTest.php +++ b/tests/Unit/Debug/QueueProviderInterfaceProxyTest.php @@ -7,20 +7,20 @@ use PHPUnit\Framework\TestCase; use Yiisoft\Queue\Debug\QueueCollector; use Yiisoft\Queue\Debug\QueueDecorator; -use Yiisoft\Queue\Debug\QueueFactoryInterfaceProxy; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\QueueInterface; -class QueueFactoryInterfaceProxyTest extends TestCase +class QueueProviderInterfaceProxyTest extends TestCase { public function testGet(): void { - $queueFactory = $this->createMock(QueueFactoryInterface::class); + $queueFactory = $this->createMock(QueueProviderInterface::class); $queue = $this->createMock(QueueInterface::class); $queueFactory->expects($this->once())->method('get')->willReturn($queue); $collector = new QueueCollector(); - $factory = new QueueFactoryInterfaceProxy($queueFactory, $collector); + $factory = new QueueProviderInterfaceProxy($queueFactory, $collector); - $this->assertInstanceOf(QueueDecorator::class, $factory->get()); + $this->assertInstanceOf(QueueDecorator::class, $factory->get('test')); } } diff --git a/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php b/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php new file mode 100644 index 00000000..24ca9096 --- /dev/null +++ b/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php @@ -0,0 +1,103 @@ + StubAdapter::class, + ], + ); + + $queue = $provider->get('channel1'); + + $this->assertInstanceOf(StubQueue::class, $queue); + $this->assertSame('channel1', $queue->getChannelName()); + $this->assertInstanceOf(StubAdapter::class, $queue->getAdapter()); + $this->assertTrue($provider->has('channel1')); + $this->assertFalse($provider->has('not-exist-channel')); + } + + public function testGetTwice(): void + { + $provider = new AdapterFactoryQueueProvider( + new StubQueue(), + [ + 'channel1' => StubAdapter::class, + ], + ); + + $queue1 = $provider->get('channel1'); + $queue2 = $provider->get('channel1'); + + $this->assertSame($queue1, $queue2); + } + + public function testGetNotExistChannel(): void + { + $provider = new AdapterFactoryQueueProvider( + new StubQueue(), + [ + 'channel1' => StubAdapter::class, + ], + ); + + $this->expectException(ChannelNotFoundException::class); + $this->expectExceptionMessage('Channel "not-exist-channel" not found.'); + $provider->get('not-exist-channel'); + } + + public function testInvalidQueueConfig(): void + { + $baseQueue = new StubQueue(); + $definitions = [ + 'channel1' => [ + 'class' => StubAdapter::class, + '__construct()' => 'hello', + ], + ]; + + $this->expectException(InvalidQueueConfigException::class); + $this->expectExceptionMessage( + 'Invalid definition: incorrect constructor arguments. Expected array, got string.' + ); + new AdapterFactoryQueueProvider($baseQueue, $definitions); + } + + public function testInvalidQueueConfigOnGet(): void + { + $provider = new AdapterFactoryQueueProvider( + new StubQueue(), + [ + 'channel1' => StubLoop::class, + ] + ); + + $this->expectException(InvalidQueueConfigException::class); + $this->expectExceptionMessage( + sprintf( + 'Adapter must implement "%s". For channel "channel1" got "%s" instead.', + AdapterInterface::class, + StubLoop::class, + ) + ); + $provider->get('channel1'); + } +} diff --git a/tests/Unit/Provider/CompositeQueueProviderTest.php b/tests/Unit/Provider/CompositeQueueProviderTest.php new file mode 100644 index 00000000..491017ee --- /dev/null +++ b/tests/Unit/Provider/CompositeQueueProviderTest.php @@ -0,0 +1,48 @@ + new StubAdapter()], + ), + new AdapterFactoryQueueProvider( + $queue, + ['channel2' => new StubAdapter()], + ), + ); + + $this->assertTrue($provider->has('channel1')); + $this->assertTrue($provider->has('channel2')); + $this->assertFalse($provider->has('channel3')); + + $this->assertSame('channel1', $provider->get('channel1')->getChannelName()); + $this->assertSame('channel2', $provider->get('channel2')->getChannelName()); + } + + public function testNotFound(): void + { + $provider = new CompositeQueueProvider( + new AdapterFactoryQueueProvider(new StubQueue('channel'), ['channel1' => new StubAdapter()]), + ); + + $this->expectException(ChannelNotFoundException::class); + $this->expectExceptionMessage('Channel "not-exists" not found.'); + $provider->get('not-exists'); + } +} diff --git a/tests/Unit/Provider/PrototypeQueueProviderTest.php b/tests/Unit/Provider/PrototypeQueueProviderTest.php new file mode 100644 index 00000000..601a0f24 --- /dev/null +++ b/tests/Unit/Provider/PrototypeQueueProviderTest.php @@ -0,0 +1,26 @@ +get('test-channel'); + + $this->assertInstanceOf(StubQueue::class, $queue); + $this->assertSame('test-channel', $queue->getChannelName()); + $this->assertTrue($provider->has('test-channel')); + $this->assertTrue($provider->has('yet-another-channel')); + } +} diff --git a/tests/Unit/QueueFactoryTest.php b/tests/Unit/QueueFactoryTest.php deleted file mode 100644 index a11de961..00000000 --- a/tests/Unit/QueueFactoryTest.php +++ /dev/null @@ -1,164 +0,0 @@ -createMock(QueueInterface::class); - $queue - ->expects(self::once()) - ->method('withAdapter') - ->willReturn($queue); - - $adapter = $this->createMock(AdapterInterface::class); - $adapter - ->expects(self::once()) - ->method('withChannel') - ->willReturn($adapter); - - $factory = new QueueFactory( - [], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - true, - $adapter - ); - - $factory->get('test'); - } - - public function testThrowExceptionOnEmptyAdapter(): void - { - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage( - 'Either $enableRuntimeChannelDefinition must be false, or $defaultAdapter should be provided.' - ); - - new QueueFactory( - [], - $this->createMock(QueueInterface::class), - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - true - ); - } - - public function testThrowExceptionOnEmptyDefinition(): void - { - try { - $queue = $this->createMock(QueueInterface::class); - $factory = new QueueFactory( - [], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - false - ); - - $factory->get('test'); - } catch (ChannelNotConfiguredException $exception) { - self::assertSame($exception::class, ChannelNotConfiguredException::class); - self::assertSame($exception->getName(), 'Queue channel is not properly configured'); - $this->assertMatchesRegularExpression('/"test"/', $exception->getSolution()); - } - } - - public function testThrowExceptionOnIncorrectDefinition(): void - { - try { - $queue = $this->createMock(QueueInterface::class); - $factory = new QueueFactory( - ['test' => new stdClass()], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - false - ); - - $factory->get('test'); - } catch (ChannelIncorrectlyConfigured $exception) { - self::assertSame($exception::class, ChannelIncorrectlyConfigured::class); - self::assertSame($exception->getName(), 'Incorrect queue channel configuration'); - $this->assertMatchesRegularExpression('/"test"/', $exception->getSolution()); - } - } - - public function testSuccessfulDefinitionWithDefaultAdapter(): void - { - $adapterDefault = $this->createMock(AdapterInterface::class); - $adapterDefault->method('withChannel')->willReturn($adapterDefault); - - $adapterNew = $this->createMock(AdapterInterface::class); - $adapterNew->method('withChannel')->willReturn($adapterNew); - - $queue = $this->createMock(QueueInterface::class); - $queue - ->expects(self::once()) - ->method('withAdapter') - ->with($adapterNew) - ->willReturn($queue); - - $factory = new QueueFactory( - ['test' => $adapterNew], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - false, - $adapterDefault - ); - - $factory->get('test'); - } - - public function testSuccessfulDefinitionWithoutDefaultAdapter(): void - { - $adapterNew = $this->createMock(AdapterInterface::class); - $adapterNew->method('withChannel')->willReturn($adapterNew); - - $queue = $this->createMock(QueueInterface::class); - $queue - ->expects(self::once()) - ->method('withAdapter') - ->with($adapterNew) - ->willReturn($queue); - - $factory = new QueueFactory( - ['test' => $adapterNew], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - false - ); - - $factory->get('test'); - } - - private function getContainer(array $instances = []): ContainerInterface - { - return new SimpleContainer($instances); - } -} diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 80403883..1bfe8001 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -131,7 +131,7 @@ public function testAdapterNotConfiguredExceptionForRun(): void public function testRunWithSignalLoop(): void { if (!extension_loaded('pcntl')) { - self::markTestSkipped('pcntl is not installed'); + $this->markTestSkipped('This rest requires PCNTL extension'); } $this->loop = new SignalLoop(); diff --git a/tests/Unit/Stubs/StubAdapterTest.php b/tests/Unit/Stubs/StubAdapterTest.php new file mode 100644 index 00000000..b7337967 --- /dev/null +++ b/tests/Unit/Stubs/StubAdapterTest.php @@ -0,0 +1,24 @@ +assertSame($message, $adapter->push($message)); + $this->assertTrue($adapter->status('test')->isDone()); + $this->assertNotSame($adapter, $adapter->withChannel('test')); + $adapter->runExisting(static fn() => null); + $adapter->subscribe(static fn() => null); + } +} diff --git a/tests/Unit/Stubs/StubLoopTest.php b/tests/Unit/Stubs/StubLoopTest.php new file mode 100644 index 00000000..c78d3af7 --- /dev/null +++ b/tests/Unit/Stubs/StubLoopTest.php @@ -0,0 +1,26 @@ + [true]; + yield 'false' => [false]; + } + + #[DataProvider('dataBase')] + public function testBase(bool $canContinue): void + { + $loop = new StubLoop($canContinue); + + $this->assertSame($canContinue, $loop->canContinue()); + } +} diff --git a/tests/Unit/Stubs/StubQueueTest.php b/tests/Unit/Stubs/StubQueueTest.php new file mode 100644 index 00000000..29efe050 --- /dev/null +++ b/tests/Unit/Stubs/StubQueueTest.php @@ -0,0 +1,48 @@ +assertSame($message, $queue->push($message)); + $this->assertSame(0, $queue->run()); + $this->assertTrue($queue->status('test')->isDone()); + $this->assertSame(QueueInterface::DEFAULT_CHANNEL_NAME, $queue->getChannelName()); + $this->assertNull($queue->getAdapter()); + $queue->listen(); + } + + public function testWithAdapter(): void + { + $sourceQueue = new StubQueue(); + + $queue = $sourceQueue->withAdapter(new StubAdapter()); + + $this->assertNotSame($queue, $sourceQueue); + $this->assertInstanceOf(StubAdapter::class, $queue->getAdapter()); + } + + public function testWithChannelName(): void + { + $sourceQueue = new StubQueue(); + + $queue = $sourceQueue->withChannelName('test'); + + $this->assertNotSame($queue, $sourceQueue); + $this->assertSame(QueueInterface::DEFAULT_CHANNEL_NAME, $sourceQueue->getChannelName()); + $this->assertSame('test', $queue->getChannelName()); + } +} diff --git a/tests/Unit/Stubs/StubWorkerTest.php b/tests/Unit/Stubs/StubWorkerTest.php new file mode 100644 index 00000000..e61021ec --- /dev/null +++ b/tests/Unit/Stubs/StubWorkerTest.php @@ -0,0 +1,27 @@ +process($sourceMessage, $this->createMock(QueueInterface::class)); + + $this->assertSame($sourceMessage, $message); + $this->assertSame('test', $message->getHandlerName()); + $this->assertSame(42, $message->getData()); + $this->assertSame([], $message->getMetadata()); + } +} diff --git a/tests/Unit/SynchronousAdapterTest.php b/tests/Unit/SynchronousAdapterTest.php index 31a43f50..6e913932 100644 --- a/tests/Unit/SynchronousAdapterTest.php +++ b/tests/Unit/SynchronousAdapterTest.php @@ -6,7 +6,7 @@ use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\Message; -use Yiisoft\Queue\QueueFactory; +use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Tests\TestCase; use Yiisoft\Queue\Message\IdEnvelope; @@ -51,7 +51,7 @@ public function testIdSetting(): void public function testWithSameChannel(): void { $adapter = $this->getAdapter(); - self::assertEquals($adapter, $adapter->withChannel(QueueFactory::DEFAULT_CHANNEL_NAME)); + self::assertEquals($adapter, $adapter->withChannel(QueueInterface::DEFAULT_CHANNEL_NAME)); } public function testWithAnotherChannel(): void