diff --git a/README.md b/README.md index 25f94f74..4914175e 100644 --- a/README.md +++ b/README.md @@ -28,15 +28,15 @@ The package could be installed with [Composer](https://getcomposer.org): composer require yiisoft/queue ``` -## Ready for yiisoft/config +## Ready for Yii Config If you are using [yiisoft/config](https://github.com/yiisoft/config), you'll find out this package has some defaults in the [`common`](config/di.php) and [`params`](config/params.php) configurations saving your time. Things you should change to start working with the queue: - Optionally: define default `\Yiisoft\Queue\Adapter\AdapterInterface` implementation. -- And/or define channel-specific `AdapterInterface` implementations in the `channel-definitions` params key to be used - with the [queue factory](#different-queue-channels). +- And/or define channel-specific `AdapterInterface` implementations in the `channel` params key to be used + with the [queue provider](#different-queue-channels). - Define [message handlers](docs/guide/worker.md#handler-format) in the `handlers` params key to be used with the `QueueWorker`. - Resolve other `\Yiisoft\Queue\Queue` dependencies (psr-compliant event dispatcher). @@ -53,14 +53,16 @@ Each queue task consists of two parts: `Yiisoft\Queue\Message\Message`. For more complex cases you should implement the interface by your own. 2. A message handler is a callable called by a `Yiisoft\Queue\Worker\Worker`. The handler handles each queue message. -For example, if you need to download and save a file, your message may look like the following: +For example, if you need to download and save a file, your message creation may look like the following: +- Message handler as the first parameter +- Message data as the second parameter ```php $data = [ 'url' => $url, 'destinationFile' => $filename, ]; -$message = new \Yiisoft\Queue\Message\Message('file-download', $data); +$message = new \Yiisoft\Queue\Message\Message(FileDownloader::class, $data); ``` Then you should push it to the queue: @@ -93,9 +95,8 @@ class FileDownloader The last thing we should do is to create a configuration for the `Yiisoft\Queue\Worker\Worker`: ```php -$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']]; $worker = new \Yiisoft\Queue\Worker\Worker( - $handlers, // Here it is + [], $logger, $injector, $container @@ -134,17 +135,47 @@ $status->isReserved(); $status->isDone(); ``` +## Custom handler names +### Custom handler names + +By default, when you push a message to the queue, the message handler name is the fully qualified class name of the handler. +This can be useful for most cases, but sometimes you may want to use a shorter name or arbitrary string as the handler name. +This can be useful when you want to reduce the amount of data being passed or when you communicate with external systems. + +To use a custom handler name before message push, you can pass it as the first argument `Message` when creating it: +```php +new Message('handler-name', $data); +``` + +To use a custom handler name on message consume, you should configure handler mapping for the `Worker` class: +```php +$worker = new \Yiisoft\Queue\Worker\Worker( + ['handler-name' => FooHandler::class], + $logger, + $injector, + $container +); +``` + ## Different queue channels -Often we need to push to different queue channels with an only application. There is the `QueueFactory` class to make -different `Queue` objects creation for different channels. With this factory channel-specific `Queue` creation is as -simple as +Often we need to push to different queue channels with an only application. There is the `QueueProviderInterface` +interface that provides different `Queue` objects creation for different channels. With implementation of this interface +channel-specific `Queue` creation is as simple as ```php -$queue = $factory->get('channel-name'); +$queue = $provider->get('channel-name'); ``` -The main usage strategy is with explicit definition of channel-specific adapters. Definitions are passed in +Out of the box, there are four implementations of the `QueueProviderInterface`: + +- `AdapterFactoryQueueProvider` +- `PrototypeQueueProvider` +- `CompositeQueueProvider` + +### `AdapterFactoryQueueProvider` + +Provider based on definition of channel-specific adapters. Definitions are passed in the `$definitions` constructor parameter of the factory, where keys are channel names and values are definitions for the [`Yiisoft\Factory\Factory`](https://github.com/yiisoft/factory). Below are some examples: @@ -163,19 +194,19 @@ use Yiisoft\Queue\Adapter\SynchronousAdapter; For more information about a definition formats available see the [factory](https://github.com/yiisoft/factory) documentation. -Another queue factory usage strategy is implicit adapter creation via `withChannel()` method call. To use this approach -you should pass some specific constructor parameters: - -- `true` to the `$enableRuntimeChannelDefinition` -- a default `AdapterInterface` implementation to the `$defaultAdapter`. +### `PrototypeQueueProvider` -In this case `$factory->get('channel-name')` call will be converted -to `$this->queue->withAdapter($this->defaultAdapter->withChannel($channel))`, when there is no explicit adapter definition -in the `$definitions`. +Queue provider that only changes the channel name of the base queue. It can be useful when your queues used the same +adapter. > Warning: This strategy is not recommended as it does not give you any protection against typos and mistakes > in channel names. +### `CompositeQueueProvider` + +This provider allows you to combine multiple providers into one. It will try to get a queue from each provider in the +order they are passed to the constructor. The first queue found will be returned. + ## Console execution The exact way of task execution depends on the adapter used. Most adapters can be run using diff --git a/composer.json b/composer.json index 664f5853..0a7ad970 100644 --- a/composer.json +++ b/composer.json @@ -41,6 +41,7 @@ "psr/log": "^2.0|^3.0", "symfony/console": "^5.4|^6.0", "yiisoft/definitions": "^1.0|^2.0|^3.0", + "yiisoft/factory": "dev-strict", "yiisoft/friendly-exception": "^1.0", "yiisoft/injector": "^1.0" }, @@ -60,7 +61,8 @@ }, "autoload": { "psr-4": { - "Yiisoft\\Queue\\": "src" + "Yiisoft\\Queue\\": "src", + "Yiisoft\\Queue\\Stubs\\": "stubs" } }, "autoload-dev": { diff --git a/config/di.php b/config/di.php index 3af1ad8b..b9b28033 100644 --- a/config/di.php +++ b/config/di.php @@ -19,9 +19,9 @@ use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush; use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPushInterface; use Yiisoft\Queue\Middleware\Push\PushMiddlewareDispatcher; +use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\Queue; -use Yiisoft\Queue\QueueFactory; -use Yiisoft\Queue\QueueFactoryInterface; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Worker\Worker as QueueWorker; use Yiisoft\Queue\Worker\WorkerInterface; @@ -29,6 +29,12 @@ /* @var array $params */ return [ + AdapterFactoryQueueProvider::class => [ + '__construct()' => [ + 'definitions' => $params['yiisoft/queue']['channels'], + ], + ], + QueueProviderInterface::class => AdapterFactoryQueueProvider::class, QueueWorker::class => [ 'class' => QueueWorker::class, '__construct()' => [$params['yiisoft/queue']['handlers']], @@ -39,10 +45,6 @@ ? $container->get(SignalLoop::class) : $container->get(SimpleLoop::class); }, - QueueFactoryInterface::class => QueueFactory::class, - QueueFactory::class => [ - '__construct()' => ['channelConfiguration' => $params['yiisoft/queue']['channel-definitions']], - ], QueueInterface::class => Queue::class, MiddlewareFactoryPushInterface::class => MiddlewareFactoryPush::class, MiddlewareFactoryConsumeInterface::class => MiddlewareFactoryConsume::class, @@ -59,12 +61,12 @@ MessageSerializerInterface::class => JsonMessageSerializer::class, RunCommand::class => [ '__construct()' => [ - 'channels' => array_keys($params['yiisoft/queue']['channel-definitions']), + 'channels' => array_keys($params['yiisoft/queue']['channels']), ], ], ListenAllCommand::class => [ '__construct()' => [ - 'channels' => array_keys($params['yiisoft/queue']['channel-definitions']), + 'channels' => array_keys($params['yiisoft/queue']['channels']), ], ], ]; diff --git a/config/params.php b/config/params.php index 1251c408..fc87d078 100644 --- a/config/params.php +++ b/config/params.php @@ -7,9 +7,10 @@ use Yiisoft\Queue\Command\ListenCommand; use Yiisoft\Queue\Command\RunCommand; use Yiisoft\Queue\Debug\QueueCollector; -use Yiisoft\Queue\Debug\QueueFactoryInterfaceProxy; +use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy; use Yiisoft\Queue\Debug\QueueWorkerInterfaceProxy; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; +use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Worker\WorkerInterface; return [ @@ -22,8 +23,8 @@ ], 'yiisoft/queue' => [ 'handlers' => [], - 'channel-definitions' => [ - QueueFactoryInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class, + 'channels' => [ + QueueInterface::DEFAULT_CHANNEL_NAME => AdapterInterface::class, ], 'middlewares-push' => [], 'middlewares-consume' => [], @@ -34,7 +35,7 @@ QueueCollector::class, ], 'trackedServices' => [ - QueueFactoryInterface::class => [QueueFactoryInterfaceProxy::class, QueueCollector::class], + QueueProviderInterface::class => [QueueProviderInterfaceProxy::class, QueueCollector::class], WorkerInterface::class => [QueueWorkerInterfaceProxy::class, QueueCollector::class], ], ], diff --git a/src/Adapter/SynchronousAdapter.php b/src/Adapter/SynchronousAdapter.php index e0430733..d2ee489b 100644 --- a/src/Adapter/SynchronousAdapter.php +++ b/src/Adapter/SynchronousAdapter.php @@ -7,7 +7,6 @@ use InvalidArgumentException; use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\MessageInterface; -use Yiisoft\Queue\QueueFactory; use Yiisoft\Queue\QueueInterface; use Yiisoft\Queue\Worker\WorkerInterface; use Yiisoft\Queue\Message\IdEnvelope; @@ -20,7 +19,7 @@ final class SynchronousAdapter implements AdapterInterface public function __construct( private WorkerInterface $worker, private QueueInterface $queue, - private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME, + private string $channel = QueueInterface::DEFAULT_CHANNEL_NAME, ) { } diff --git a/src/Command/ListenAllCommand.php b/src/Command/ListenAllCommand.php index e94a77fe..8be6dead 100644 --- a/src/Command/ListenAllCommand.php +++ b/src/Command/ListenAllCommand.php @@ -10,7 +10,7 @@ use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; use Yiisoft\Queue\Cli\LoopInterface; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; final class ListenAllCommand extends Command { @@ -20,8 +20,11 @@ final class ListenAllCommand extends Command 'Listens all configured queues by default in case you\'re using yiisoft/config. ' . 'Needs to be stopped manually.'; - public function __construct(private QueueFactoryInterface $queueFactory, private LoopInterface $loop, private array $channels) - { + public function __construct( + private readonly QueueProviderInterface $queueProvider, + private readonly LoopInterface $loop, + private readonly array $channels, + ) { parent::__construct(); } @@ -45,7 +48,7 @@ public function configure(): void 'm', InputOption::VALUE_REQUIRED, 'Maximum number of messages to process in each channel before switching to another channel. ' . - 'Default is 0 (no limits).', + 'Default is 0 (no limits).', 0, ); @@ -57,17 +60,17 @@ protected function execute(InputInterface $input, OutputInterface $output): int $queues = []; /** @var string $channel */ foreach ($input->getArgument('channel') as $channel) { - $queues[] = $this->queueFactory->get($channel); + $queues[] = $this->queueProvider->get($channel); } while ($this->loop->canContinue()) { $hasMessages = false; foreach ($queues as $queue) { - $hasMessages = $queue->run((int)$input->getOption('maximum')) > 0 || $hasMessages; + $hasMessages = $queue->run((int) $input->getOption('maximum')) > 0 || $hasMessages; } if (!$hasMessages) { - $pauseSeconds = (int)$input->getOption('pause'); + $pauseSeconds = (int) $input->getOption('pause'); if ($pauseSeconds < 0) { $pauseSeconds = 1; } diff --git a/src/Command/ListenCommand.php b/src/Command/ListenCommand.php index 21915c47..ecafb837 100644 --- a/src/Command/ListenCommand.php +++ b/src/Command/ListenCommand.php @@ -8,16 +8,17 @@ use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; -use Yiisoft\Queue\QueueFactory; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; +use Yiisoft\Queue\QueueInterface; final class ListenCommand extends Command { protected static $defaultName = 'queue:listen'; protected static $defaultDescription = 'Listens the queue and executes messages as they come. Needs to be stopped manually.'; - public function __construct(private QueueFactoryInterface $queueFactory) - { + public function __construct( + private readonly QueueProviderInterface $queueProvider + ) { parent::__construct(); } @@ -27,13 +28,13 @@ public function configure(): void 'channel', InputArgument::OPTIONAL, 'Queue channel name to connect to', - QueueFactory::DEFAULT_CHANNEL_NAME + QueueInterface::DEFAULT_CHANNEL_NAME, ); } protected function execute(InputInterface $input, OutputInterface $output): int { - $this->queueFactory + $this->queueProvider ->get($input->getArgument('channel')) ->listen(); diff --git a/src/Command/RunCommand.php b/src/Command/RunCommand.php index f01a9e17..a6bd3c18 100644 --- a/src/Command/RunCommand.php +++ b/src/Command/RunCommand.php @@ -9,7 +9,7 @@ use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Output\OutputInterface; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; final class RunCommand extends Command { @@ -17,8 +17,10 @@ final class RunCommand extends Command protected static $defaultDescription = 'Runs all the existing messages in the given queues. ' . 'Exits once messages are over.'; - public function __construct(private QueueFactoryInterface $queueFactory, private array $channels) - { + public function __construct( + private readonly QueueProviderInterface $queueProvider, + private readonly array $channels, + ) { parent::__construct(); } @@ -45,7 +47,7 @@ protected function execute(InputInterface $input, OutputInterface $output): int /** @var string $channel */ foreach ($input->getArgument('channel') as $channel) { $output->write("Processing channel $channel... "); - $count = $this->queueFactory + $count = $this->queueProvider ->get($channel) ->run((int)$input->getOption('maximum')); diff --git a/src/Debug/QueueFactoryInterfaceProxy.php b/src/Debug/QueueFactoryInterfaceProxy.php deleted file mode 100644 index cbfe4e76..00000000 --- a/src/Debug/QueueFactoryInterfaceProxy.php +++ /dev/null @@ -1,24 +0,0 @@ -queueFactory->get($channel); - - return new QueueDecorator($queue, $this->collector); - } -} diff --git a/src/Debug/QueueProviderInterfaceProxy.php b/src/Debug/QueueProviderInterfaceProxy.php new file mode 100644 index 00000000..5f00096e --- /dev/null +++ b/src/Debug/QueueProviderInterfaceProxy.php @@ -0,0 +1,28 @@ +queueProvider->get($channel); + return new QueueDecorator($queue, $this->collector); + } + + public function has(string $channel): bool + { + return $this->queueProvider->has($channel); + } +} diff --git a/src/Exception/AdapterConfiguration/AdapterNotConfiguredException.php b/src/Exception/AdapterConfiguration/AdapterNotConfiguredException.php index 487fa5ca..68ae754a 100644 --- a/src/Exception/AdapterConfiguration/AdapterNotConfiguredException.php +++ b/src/Exception/AdapterConfiguration/AdapterNotConfiguredException.php @@ -6,8 +6,8 @@ use RuntimeException; use Yiisoft\FriendlyException\FriendlyExceptionInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\Queue; -use Yiisoft\Queue\QueueFactory; class AdapterNotConfiguredException extends RuntimeException implements FriendlyExceptionInterface { @@ -21,7 +21,7 @@ public function getName(): string public function getSolution(): ?string { $queueClass = Queue::class; - $factoryClass = QueueFactory::class; + $queueProviderInterface = QueueProviderInterface::class; return <<channel = $channel; - parent::__construct($message, $code, $previous); - } - - public function getName(): string - { - return 'Incorrect queue channel configuration'; - } - - public function getSolution(): ?string - { - $factoryClass = QueueFactory::class; - - return <<channel" which is incorrectly configured. - Please take a look to the documentation for the $factoryClass "\$definitions" constructor parameter. - SOLUTION; - } -} diff --git a/src/Exception/AdapterConfiguration/ChannelNotConfiguredException.php b/src/Exception/AdapterConfiguration/ChannelNotConfiguredException.php deleted file mode 100644 index ae417d60..00000000 --- a/src/Exception/AdapterConfiguration/ChannelNotConfiguredException.php +++ /dev/null @@ -1,39 +0,0 @@ -channel = $channel; - parent::__construct($message, $code, $previous); - } - - public function getName(): string - { - return 'Queue channel is not properly configured'; - } - - public function getSolution(): ?string - { - $factoryClass = QueueFactory::class; - - return <<channel" creation is not configured in the $factoryClass. - Please take a look to the documentation for the $factoryClass constructor. - The most important parameters are "\$definitions" and "\$enableRuntimeChannelDefinition". - - SOLUTION; - } -} diff --git a/src/Message/EnvelopeTrait.php b/src/Message/EnvelopeTrait.php index cc52cf15..e8d19d65 100644 --- a/src/Message/EnvelopeTrait.php +++ b/src/Message/EnvelopeTrait.php @@ -8,6 +8,16 @@ trait EnvelopeTrait { private MessageInterface $message; + /** + * A mirror of {@see MessageInterface::fromData()} + */ + abstract public static function fromMessage(MessageInterface $message): self; + + public static function fromData(string $handlerName, mixed $data, array $metadata = []): MessageInterface + { + return self::fromMessage(Message::fromData($handlerName, $data, $metadata)); + } + public function getMessage(): MessageInterface { return $this->message; diff --git a/src/Message/JsonMessageSerializer.php b/src/Message/JsonMessageSerializer.php index 81a6220c..d9b3a92e 100644 --- a/src/Message/JsonMessageSerializer.php +++ b/src/Message/JsonMessageSerializer.php @@ -19,6 +19,11 @@ public function serialize(MessageInterface $message): string 'data' => $message->getData(), 'meta' => $message->getMetadata(), ]; + if (!isset($payload['meta']['message-class'])) { + $payload['meta']['message-class'] = $message instanceof EnvelopeInterface + ? $message->getMessage()::class + : $message::class; + } return json_encode($payload, JSON_THROW_ON_ERROR); } @@ -34,25 +39,38 @@ public function unserialize(string $value): MessageInterface throw new InvalidArgumentException('Payload must be array. Got ' . get_debug_type($payload) . '.'); } + $name = $payload['name'] ?? null; + if (!isset($name) || !is_string($name)) { + throw new InvalidArgumentException('Handler name must be a string. Got ' . get_debug_type($name) . '.'); + } + $meta = $payload['meta'] ?? []; if (!is_array($meta)) { - throw new InvalidArgumentException('Metadata must be array. Got ' . get_debug_type($meta) . '.'); + throw new InvalidArgumentException('Metadata must be an array. Got ' . get_debug_type($meta) . '.'); } - // TODO: will be removed later - $message = new Message($payload['name'] ?? '$name', $payload['data'] ?? null, $meta); - + $envelopes = []; if (isset($meta[EnvelopeInterface::ENVELOPE_STACK_KEY]) && is_array($meta[EnvelopeInterface::ENVELOPE_STACK_KEY])) { - $message = $message->withMetadata( - array_merge($message->getMetadata(), [EnvelopeInterface::ENVELOPE_STACK_KEY => []]), - ); - foreach ($meta[EnvelopeInterface::ENVELOPE_STACK_KEY] as $envelope) { - if (is_string($envelope) && class_exists($envelope) && is_subclass_of($envelope, EnvelopeInterface::class)) { - $message = $envelope::fromMessage($message); - } - } + $envelopes = $meta[EnvelopeInterface::ENVELOPE_STACK_KEY]; } + $meta[EnvelopeInterface::ENVELOPE_STACK_KEY] = []; + $class = $payload['meta']['message-class'] ?? Message::class; + // Don't check subclasses when it's a default class: that's faster + if ($class !== Message::class && !is_subclass_of($class, MessageInterface::class)) { + $class = Message::class; + } + + /** + * @var class-string $class + */ + $message = $class::fromData($name, $payload['data'] ?? null, $meta); + + foreach ($envelopes as $envelope) { + if (is_string($envelope) && class_exists($envelope) && is_subclass_of($envelope, EnvelopeInterface::class)) { + $message = $envelope::fromMessage($message); + } + } return $message; } diff --git a/src/Message/Message.php b/src/Message/Message.php index a414ffb0..ab85d069 100644 --- a/src/Message/Message.php +++ b/src/Message/Message.php @@ -18,6 +18,11 @@ public function __construct( ) { } + public static function fromData(string $handlerName, mixed $data, array $metadata = []): MessageInterface + { + return new self($handlerName, $data, $metadata); + } + public function getHandlerName(): string { return $this->handlerName; diff --git a/src/Message/MessageHandlerInterface.php b/src/Message/MessageHandlerInterface.php new file mode 100644 index 00000000..7a8ab1f9 --- /dev/null +++ b/src/Message/MessageHandlerInterface.php @@ -0,0 +1,10 @@ + + */ + private array $queues = []; + + private readonly StrictFactory $factory; + + /** + * @param QueueInterface $baseQueue Base queue for queues creation. + * @param array $definitions Adapter definitions indexed by channel names. + * @param ContainerInterface|null $container Container to use for dependencies resolving. + * @param bool $validate If definitions should be validated when set. + * + * @psalm-param array $definitions + * @throws InvalidQueueConfigException + */ + public function __construct( + private readonly QueueInterface $baseQueue, + array $definitions, + ?ContainerInterface $container = null, + bool $validate = true, + ) { + try { + $this->factory = new StrictFactory($definitions, $container, $validate); + } catch (InvalidConfigException $exception) { + throw new InvalidQueueConfigException($exception->getMessage(), previous: $exception); + } + } + + public function get(string $channel): QueueInterface + { + $queue = $this->getOrTryToCreate($channel); + if ($queue === null) { + throw new ChannelNotFoundException($channel); + } + return $queue; + } + + public function has(string $channel): bool + { + return $this->factory->has($channel); + } + + /** + * @throws InvalidQueueConfigException + */ + private function getOrTryToCreate(string $channel): QueueInterface|null + { + if (array_key_exists($channel, $this->queues)) { + return $this->queues[$channel]; + } + + if ($this->factory->has($channel)) { + $adapter = $this->factory->create($channel); + if (!$adapter instanceof AdapterInterface) { + throw new InvalidQueueConfigException( + sprintf( + 'Adapter must implement "%s". For channel "%s" got "%s" instead.', + AdapterInterface::class, + $channel, + get_debug_type($adapter), + ), + ); + } + $this->queues[$channel] = $this->baseQueue->withAdapter($adapter)->withChannelName($channel); + } else { + $this->queues[$channel] = null; + } + + return $this->queues[$channel]; + } +} diff --git a/src/Provider/ChannelNotFoundException.php b/src/Provider/ChannelNotFoundException.php new file mode 100644 index 00000000..2538cda7 --- /dev/null +++ b/src/Provider/ChannelNotFoundException.php @@ -0,0 +1,25 @@ +providers = $providers; + } + + public function get(string $channel): QueueInterface + { + foreach ($this->providers as $provider) { + if ($provider->has($channel)) { + return $provider->get($channel); + } + } + throw new ChannelNotFoundException($channel); + } + + public function has(string $channel): bool + { + foreach ($this->providers as $provider) { + if ($provider->has($channel)) { + return true; + } + } + return false; + } +} diff --git a/src/Provider/InvalidQueueConfigException.php b/src/Provider/InvalidQueueConfigException.php new file mode 100644 index 00000000..136fe746 --- /dev/null +++ b/src/Provider/InvalidQueueConfigException.php @@ -0,0 +1,14 @@ +baseQueue->withChannelName($channel); + } + + public function has(string $channel): bool + { + return true; + } +} diff --git a/src/Provider/QueueProviderException.php b/src/Provider/QueueProviderException.php new file mode 100644 index 00000000..17fa3bc4 --- /dev/null +++ b/src/Provider/QueueProviderException.php @@ -0,0 +1,14 @@ +middlewareDefinitions = $middlewareDefinitions; @@ -140,7 +140,7 @@ public function withChannelName(string $channel): self { $instance = clone $this; $instance->channelName = $channel; - + $instance->adapter = $this->adapter?->withChannel($channel); return $instance; } diff --git a/src/QueueFactory.php b/src/QueueFactory.php deleted file mode 100644 index e6b963d0..00000000 --- a/src/QueueFactory.php +++ /dev/null @@ -1,192 +0,0 @@ - $channelConfiguration Configuration array in [channel_name => definition] format. - * "Definition" here is a {@see Factory} definition - * @param QueueInterface $queue A default queue implementation. `$queue->withAdapter()` will be returned - * with the `get` method - * @param bool $enableRuntimeChannelDefinition A flag whether to enable a such behavior when there is no - * explicit channel adapter definition: `return $this->queue->withAdapter($this->adapter->withChannel($channel)` - * When this flag is set to false, only explicit definitions from the $definition parameter are used. - * @param AdapterInterface|null $defaultAdapter A default adapter implementation. - * It must be set when $enableRuntimeChannelDefinition is true. - */ - public function __construct( - private array $channelConfiguration, - private QueueInterface $queue, - private ContainerInterface $container, - private CallableFactory $callableFactory, - private Injector $injector, - private bool $enableRuntimeChannelDefinition = false, - private ?AdapterInterface $defaultAdapter = null, - ) { - if ($enableRuntimeChannelDefinition === true && $defaultAdapter === null) { - $message = 'Either $enableRuntimeChannelDefinition must be false, or $defaultAdapter should be provided.'; - - throw new InvalidArgumentException($message); - } - } - - public function get(string $channel = self::DEFAULT_CHANNEL_NAME): QueueInterface - { - if ($channel === $this->queue->getChannelName()) { - return $this->queue; - } - - if (isset($this->queueCollection[$channel]) && $this->queueCollection[$channel]->get() !== null) { - $queue = $this->queueCollection[$channel]->get(); - } else { - $queue = $this->create($channel); - $this->queueCollection[$channel] = WeakReference::create($queue); - } - - return $queue; - } - - /** - * @throws ChannelIncorrectlyConfigured - * @return QueueInterface - */ - private function create(string $channel): QueueInterface - { - if (isset($this->channelConfiguration[$channel])) { - $definition = $this->channelConfiguration[$channel]; - $this->checkDefinitionType($channel, $definition); - $adapter = $this->createFromDefinition($channel, $definition)->withChannel($channel); - - return $this->queue->withChannelName($channel)->withAdapter($adapter); - } - - if ($this->enableRuntimeChannelDefinition === false) { - throw new ChannelNotConfiguredException($channel); - } - - /** @psalm-suppress PossiblyNullReference */ - return $this->queue->withChannelName($channel)->withAdapter($this->defaultAdapter->withChannel($channel)); - } - - private function checkDefinitionType(string $channel, mixed $definition): void - { - if ( - !$definition instanceof AdapterInterface - && !is_array($definition) - && !is_callable($definition) - && !is_string($definition) - ) { - throw new ChannelIncorrectlyConfigured($channel, $definition); - } - } - - public function createFromDefinition( - string $channel, - AdapterInterface|callable|array|string $definition - ): AdapterInterface { - if ($definition instanceof AdapterInterface) { - return $definition; - } - - if (is_string($definition)) { - return $this->getFromContainer($channel, $definition); - } - - return $this->tryGetFromCallable($channel, $definition) - ?? $this->tryGetFromArrayDefinition($channel, $definition) - ?? throw new ChannelIncorrectlyConfigured($channel, $definition); - } - - private function getFromContainer(string $channel, string $definition): AdapterInterface - { - if (class_exists($definition)) { - if (is_subclass_of($definition, AdapterInterface::class)) { - /** @var AdapterInterface */ - return $this->container->get($definition); - } - } elseif ($this->container->has($definition)) { - $middleware = $this->container->get($definition); - if ($middleware instanceof AdapterInterface) { - return $middleware; - } - } - - throw new ChannelIncorrectlyConfigured($channel, $definition); - } - - private function tryGetFromCallable( - string $channel, - callable|AdapterInterface|array|string $definition - ): ?AdapterInterface { - $callable = null; - - if ($definition instanceof Closure) { - $callable = $definition; - } - - if ( - is_array($definition) - && array_keys($definition) === [0, 1] - ) { - try { - $callable = $this->callableFactory->create($definition); - } catch (InvalidCallableConfigurationException $exception) { - throw new ChannelIncorrectlyConfigured($channel, $definition, previous: $exception); - } - } - - if ($callable !== null) { - $adapter = $this->injector->invoke($callable); - - if (!$adapter instanceof AdapterInterface) { - throw new ChannelIncorrectlyConfigured($channel, $definition); - } - } - - return null; - } - - private function tryGetFromArrayDefinition( - string $channel, - callable|AdapterInterface|array|string $definition - ): ?AdapterInterface { - if (!is_array($definition)) { - return null; - } - - try { - DefinitionValidator::validateArrayDefinition($definition); - - $middleware = ArrayDefinition::fromConfig($definition)->resolve($this->container); - if ($middleware instanceof AdapterInterface) { - return $middleware; - } - - throw new ChannelIncorrectlyConfigured($channel, $definition); - } catch (InvalidConfigException) { - } - - throw new ChannelIncorrectlyConfigured($channel, $definition); - } -} diff --git a/src/QueueFactoryInterface.php b/src/QueueFactoryInterface.php deleted file mode 100644 index 9761e583..00000000 --- a/src/QueueFactoryInterface.php +++ /dev/null @@ -1,18 +0,0 @@ -handlersCached)) { + $definition = $this->handlers[$name] ?? null; + if ($definition === null && $this->container->has($name)) { + $handler = $this->container->get($name); + if ($handler instanceof MessageHandlerInterface) { + $this->handlersCached[$name] = $handler->handle(...); + + return $this->handlersCached[$name]; + } + + return null; + } + $this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null); } diff --git a/stubs/StubAdapter.php b/stubs/StubAdapter.php new file mode 100644 index 00000000..b47c850b --- /dev/null +++ b/stubs/StubAdapter.php @@ -0,0 +1,38 @@ +canContinue; + } +} diff --git a/stubs/StubQueue.php b/stubs/StubQueue.php new file mode 100644 index 00000000..8436b46a --- /dev/null +++ b/stubs/StubQueue.php @@ -0,0 +1,71 @@ +adapter; + } + + public function withAdapter(AdapterInterface $adapter): QueueInterface + { + $new = clone $this; + $new->adapter = $adapter; + return $new; + } + + public function getChannelName(): string + { + return $this->channelName; + } + + public function withChannelName(string $channel): QueueInterface + { + $new = clone $this; + $new->channelName = $channel; + if ($new->adapter !== null) { + $new->adapter = $new->adapter->withChannel($channel); + } + return $new; + } +} diff --git a/stubs/StubWorker.php b/stubs/StubWorker.php new file mode 100644 index 00000000..7a939006 --- /dev/null +++ b/stubs/StubWorker.php @@ -0,0 +1,20 @@ +assertEquals($messages, $this->messagesProcessed); $this->assertEquals($messages, $this->messagesProcessedSecond); } + + public function testMessagesConsumedByHandlerClass(): void + { + $handler = new TestHandler(); + $container = $this->createMock(ContainerInterface::class); + $container->method('get')->with(TestHandler::class)->willReturn($handler); + $container->method('has')->with(TestHandler::class)->willReturn(true); + $worker = new Worker( + [], + new NullLogger(), + new Injector($container), + $container, + new ConsumeMiddlewareDispatcher($this->createMock(MiddlewareFactoryConsumeInterface::class)), + new FailureMiddlewareDispatcher($this->createMock(MiddlewareFactoryFailureInterface::class), []) + ); + + $messages = [1, 'foo', 'bar-baz']; + foreach ($messages as $message) { + $worker->process(new Message(TestHandler::class, $message), $this->getQueue()); + } + + $this->assertEquals($messages, $handler->messagesProcessed); + } } diff --git a/tests/Integration/QueueFactoryTest.php b/tests/Integration/QueueFactoryTest.php deleted file mode 100644 index 50076896..00000000 --- a/tests/Integration/QueueFactoryTest.php +++ /dev/null @@ -1,82 +0,0 @@ -createMock(WorkerInterface::class); - $queue = $this->getDefaultQueue($worker); - $container = $this->createMock(ContainerInterface::class); - $factory = new QueueFactory( - [], - $queue, - $container, - new CallableFactory($container), - new Injector($container), - true, - new SynchronousAdapter($worker, $queue) - ); - - $adapter = $factory->get('test-channel'); - - self::assertEquals('test-channel', $adapter->getChannelName()); - } - - public function testConfiguredChange(): void - { - $worker = $this->createMock(WorkerInterface::class); - $queue = $this->getDefaultQueue($worker); - $container = $this->createMock(ContainerInterface::class); - $factory = new QueueFactory( - [ - 'test-channel' => [ - 'class' => FakeAdapter::class, - 'withChannel()' => ['test-channel'], - ], - QueueFactoryInterface::DEFAULT_CHANNEL_NAME => [ - 'class' => FakeAdapter::class, - 'withChannel()' => [QueueFactoryInterface::DEFAULT_CHANNEL_NAME], - ], - ], - $queue, - $container, - new CallableFactory($container), - new Injector($container), - true, - new SynchronousAdapter($worker, $queue) - ); - $queue = $factory->get('test-channel'); - - self::assertEquals('test-channel', $queue->getChannelName()); - self::assertEquals(QueueFactoryInterface::DEFAULT_CHANNEL_NAME, $factory->get()->getChannelName()); - } - - private function getDefaultQueue(WorkerInterface $worker): Queue - { - return new Queue( - $worker, - $this->createMock(LoopInterface::class), - $this->createMock(LoggerInterface::class), - new PushMiddlewareDispatcher($this->createMock(MiddlewareFactoryPushInterface::class)), - ); - } -} diff --git a/tests/Integration/Support/TestHandler.php b/tests/Integration/Support/TestHandler.php new file mode 100644 index 00000000..ce71f7fb --- /dev/null +++ b/tests/Integration/Support/TestHandler.php @@ -0,0 +1,20 @@ +messagesProcessed[] = $message->getData(); + } +} diff --git a/tests/Unit/Command/ListenCommandTest.php b/tests/Unit/Command/ListenCommandTest.php index f4a337a4..2e2971ea 100644 --- a/tests/Unit/Command/ListenCommandTest.php +++ b/tests/Unit/Command/ListenCommandTest.php @@ -8,14 +8,14 @@ use Symfony\Component\Console\Input\StringInput; use Symfony\Component\Console\Output\OutputInterface; use Yiisoft\Queue\Command\ListenCommand; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\QueueInterface; final class ListenCommandTest extends TestCase { public function testConfigure(): void { - $command = new ListenCommand($this->createMock(QueueFactoryInterface::class)); + $command = new ListenCommand($this->createMock(QueueProviderInterface::class)); $channelArgument = $command->getNativeDefinition()->getArgument('channel'); $this->assertEquals('channel', $channelArgument->getName()); } @@ -24,7 +24,7 @@ public function testExecute(): void { $queue = $this->createMock(QueueInterface::class); $queue->expects($this->once())->method('listen'); - $queueFactory = $this->createMock(QueueFactoryInterface::class); + $queueFactory = $this->createMock(QueueProviderInterface::class); $queueFactory->method('get')->willReturn($queue); $input = new StringInput('channel'); diff --git a/tests/Unit/Command/RunCommandTest.php b/tests/Unit/Command/RunCommandTest.php index fb70914a..3e38305e 100644 --- a/tests/Unit/Command/RunCommandTest.php +++ b/tests/Unit/Command/RunCommandTest.php @@ -8,14 +8,14 @@ use Symfony\Component\Console\Input\StringInput; use Symfony\Component\Console\Output\OutputInterface; use Yiisoft\Queue\Command\RunCommand; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\QueueInterface; final class RunCommandTest extends TestCase { public function testConfigure(): void { - $command = new RunCommand($this->createMock(QueueFactoryInterface::class), []); + $command = new RunCommand($this->createMock(QueueProviderInterface::class), []); $channelArgument = $command->getNativeDefinition()->getArgument('channel'); $this->assertEquals('channel', $channelArgument->getName()); } @@ -24,11 +24,11 @@ public function testExecute(): void { $queue = $this->createMock(QueueInterface::class); $queue->expects($this->once())->method('run'); - $queueFactory = $this->createMock(QueueFactoryInterface::class); - $queueFactory->method('get')->willReturn($queue); + $queueProvider = $this->createMock(QueueProviderInterface::class); + $queueProvider->method('get')->willReturn($queue); $input = new StringInput('channel'); - $command = new RunCommand($queueFactory, []); + $command = new RunCommand($queueProvider, []); $exitCode = $command->run($input, $this->createMock(OutputInterface::class)); $this->assertEquals(0, $exitCode); diff --git a/tests/Unit/Debug/QueueFactoryInterfaceProxyTest.php b/tests/Unit/Debug/QueueProviderInterfaceProxyTest.php similarity index 61% rename from tests/Unit/Debug/QueueFactoryInterfaceProxyTest.php rename to tests/Unit/Debug/QueueProviderInterfaceProxyTest.php index d8534aa3..2bde59c5 100644 --- a/tests/Unit/Debug/QueueFactoryInterfaceProxyTest.php +++ b/tests/Unit/Debug/QueueProviderInterfaceProxyTest.php @@ -7,20 +7,20 @@ use PHPUnit\Framework\TestCase; use Yiisoft\Queue\Debug\QueueCollector; use Yiisoft\Queue\Debug\QueueDecorator; -use Yiisoft\Queue\Debug\QueueFactoryInterfaceProxy; -use Yiisoft\Queue\QueueFactoryInterface; +use Yiisoft\Queue\Debug\QueueProviderInterfaceProxy; +use Yiisoft\Queue\Provider\QueueProviderInterface; use Yiisoft\Queue\QueueInterface; -class QueueFactoryInterfaceProxyTest extends TestCase +class QueueProviderInterfaceProxyTest extends TestCase { public function testGet(): void { - $queueFactory = $this->createMock(QueueFactoryInterface::class); + $queueFactory = $this->createMock(QueueProviderInterface::class); $queue = $this->createMock(QueueInterface::class); $queueFactory->expects($this->once())->method('get')->willReturn($queue); $collector = new QueueCollector(); - $factory = new QueueFactoryInterfaceProxy($queueFactory, $collector); + $factory = new QueueProviderInterfaceProxy($queueFactory, $collector); - $this->assertInstanceOf(QueueDecorator::class, $factory->get()); + $this->assertInstanceOf(QueueDecorator::class, $factory->get('test')); } } diff --git a/tests/Unit/Message/JsonMessageSerializerTest.php b/tests/Unit/Message/JsonMessageSerializerTest.php index 776a9835..18bc5ed9 100644 --- a/tests/Unit/Message/JsonMessageSerializerTest.php +++ b/tests/Unit/Message/JsonMessageSerializerTest.php @@ -11,6 +11,7 @@ use Yiisoft\Queue\Message\JsonMessageSerializer; use Yiisoft\Queue\Message\Message; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Tests\Unit\Support\TestMessage; /** * Testing message serialization options @@ -42,10 +43,10 @@ public static function dataUnsupportedPayloadFormat(): iterable */ public function testMetadataFormat(mixed $meta): void { - $payload = ['data' => 'test', 'meta' => $meta]; + $payload = ['name' => 'handler', 'data' => 'test', 'meta' => $meta]; $serializer = $this->createSerializer(); - $this->expectExceptionMessage(sprintf('Metadata must be array. Got %s.', get_debug_type($meta))); + $this->expectExceptionMessage(sprintf('Metadata must be an array. Got %s.', get_debug_type($meta))); $this->expectException(InvalidArgumentException::class); $serializer->unserialize(json_encode($payload)); } @@ -59,31 +60,32 @@ public static function dataUnsupportedMetadataFormat(): iterable public function testUnserializeFromData(): void { - $payload = ['data' => 'test']; + $payload = ['name' => 'handler', 'data' => 'test']; $serializer = $this->createSerializer(); $message = $serializer->unserialize(json_encode($payload)); $this->assertInstanceOf(MessageInterface::class, $message); $this->assertEquals($payload['data'], $message->getData()); - $this->assertEquals([], $message->getMetadata()); + $this->assertEquals([EnvelopeInterface::ENVELOPE_STACK_KEY => []], $message->getMetadata()); } public function testUnserializeWithMetadata(): void { - $payload = ['data' => 'test', 'meta' => ['int' => 1, 'str' => 'string', 'bool' => true]]; + $payload = ['name' => 'handler', 'data' => 'test', 'meta' => ['int' => 1, 'str' => 'string', 'bool' => true]]; $serializer = $this->createSerializer(); $message = $serializer->unserialize(json_encode($payload)); $this->assertInstanceOf(MessageInterface::class, $message); $this->assertEquals($payload['data'], $message->getData()); - $this->assertEquals(['int' => 1, 'str' => 'string', 'bool' => true], $message->getMetadata()); + $this->assertEquals(['int' => 1, 'str' => 'string', 'bool' => true, EnvelopeInterface::ENVELOPE_STACK_KEY => []], $message->getMetadata()); } public function testUnserializeEnvelopeStack(): void { $payload = [ + 'name' => 'handler', 'data' => 'test', 'meta' => [ EnvelopeInterface::ENVELOPE_STACK_KEY => [ @@ -113,7 +115,7 @@ public function testSerialize(): void $json = $serializer->serialize($message); $this->assertEquals( - '{"name":"handler","data":"test","meta":[]}', + '{"name":"handler","data":"test","meta":{"message-class":"Yiisoft\\\\Queue\\\\Message\\\\Message"}}', $json, ); } @@ -129,9 +131,10 @@ public function testSerializeEnvelopeStack(): void $this->assertEquals( sprintf( - '{"name":"handler","data":"test","meta":{"envelopes":["%s"],"%s":"test-id"}}', + '{"name":"handler","data":"test","meta":{"envelopes":["%s"],"%s":"test-id","message-class":"%s"}}', str_replace('\\', '\\\\', IdEnvelope::class), IdEnvelope::MESSAGE_ID_KEY, + str_replace('\\', '\\\\', Message::class), ), $json, ); @@ -145,14 +148,35 @@ public function testSerializeEnvelopeStack(): void IdEnvelope::class, ], IdEnvelope::MESSAGE_ID_KEY => 'test-id', + 'message-class' => Message::class, ], $message->getMetadata()); $this->assertEquals([ EnvelopeInterface::ENVELOPE_STACK_KEY => [], IdEnvelope::MESSAGE_ID_KEY => 'test-id', + 'message-class' => Message::class, ], $message->getMessage()->getMetadata()); } + public function testRestoreOriginalMessageClass(): void + { + $message = new TestMessage(); + $serializer = $this->createSerializer(); + $serializer->unserialize($serializer->serialize($message)); + + $this->assertInstanceOf(TestMessage::class, $message); + } + + public function testRestoreOriginalMessageClassWithEnvelope(): void + { + $message = new IdEnvelope(new TestMessage()); + $serializer = $this->createSerializer(); + $serializer->unserialize($serializer->serialize($message)); + + $this->assertInstanceOf(IdEnvelope::class, $message); + $this->assertInstanceOf(TestMessage::class, $message->getMessage()); + } + private function createSerializer(): JsonMessageSerializer { return new JsonMessageSerializer(); diff --git a/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php b/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php new file mode 100644 index 00000000..24ca9096 --- /dev/null +++ b/tests/Unit/Provider/AdapterFactoryQueueProviderTest.php @@ -0,0 +1,103 @@ + StubAdapter::class, + ], + ); + + $queue = $provider->get('channel1'); + + $this->assertInstanceOf(StubQueue::class, $queue); + $this->assertSame('channel1', $queue->getChannelName()); + $this->assertInstanceOf(StubAdapter::class, $queue->getAdapter()); + $this->assertTrue($provider->has('channel1')); + $this->assertFalse($provider->has('not-exist-channel')); + } + + public function testGetTwice(): void + { + $provider = new AdapterFactoryQueueProvider( + new StubQueue(), + [ + 'channel1' => StubAdapter::class, + ], + ); + + $queue1 = $provider->get('channel1'); + $queue2 = $provider->get('channel1'); + + $this->assertSame($queue1, $queue2); + } + + public function testGetNotExistChannel(): void + { + $provider = new AdapterFactoryQueueProvider( + new StubQueue(), + [ + 'channel1' => StubAdapter::class, + ], + ); + + $this->expectException(ChannelNotFoundException::class); + $this->expectExceptionMessage('Channel "not-exist-channel" not found.'); + $provider->get('not-exist-channel'); + } + + public function testInvalidQueueConfig(): void + { + $baseQueue = new StubQueue(); + $definitions = [ + 'channel1' => [ + 'class' => StubAdapter::class, + '__construct()' => 'hello', + ], + ]; + + $this->expectException(InvalidQueueConfigException::class); + $this->expectExceptionMessage( + 'Invalid definition: incorrect constructor arguments. Expected array, got string.' + ); + new AdapterFactoryQueueProvider($baseQueue, $definitions); + } + + public function testInvalidQueueConfigOnGet(): void + { + $provider = new AdapterFactoryQueueProvider( + new StubQueue(), + [ + 'channel1' => StubLoop::class, + ] + ); + + $this->expectException(InvalidQueueConfigException::class); + $this->expectExceptionMessage( + sprintf( + 'Adapter must implement "%s". For channel "channel1" got "%s" instead.', + AdapterInterface::class, + StubLoop::class, + ) + ); + $provider->get('channel1'); + } +} diff --git a/tests/Unit/Provider/CompositeQueueProviderTest.php b/tests/Unit/Provider/CompositeQueueProviderTest.php new file mode 100644 index 00000000..491017ee --- /dev/null +++ b/tests/Unit/Provider/CompositeQueueProviderTest.php @@ -0,0 +1,48 @@ + new StubAdapter()], + ), + new AdapterFactoryQueueProvider( + $queue, + ['channel2' => new StubAdapter()], + ), + ); + + $this->assertTrue($provider->has('channel1')); + $this->assertTrue($provider->has('channel2')); + $this->assertFalse($provider->has('channel3')); + + $this->assertSame('channel1', $provider->get('channel1')->getChannelName()); + $this->assertSame('channel2', $provider->get('channel2')->getChannelName()); + } + + public function testNotFound(): void + { + $provider = new CompositeQueueProvider( + new AdapterFactoryQueueProvider(new StubQueue('channel'), ['channel1' => new StubAdapter()]), + ); + + $this->expectException(ChannelNotFoundException::class); + $this->expectExceptionMessage('Channel "not-exists" not found.'); + $provider->get('not-exists'); + } +} diff --git a/tests/Unit/Provider/PrototypeQueueProviderTest.php b/tests/Unit/Provider/PrototypeQueueProviderTest.php new file mode 100644 index 00000000..601a0f24 --- /dev/null +++ b/tests/Unit/Provider/PrototypeQueueProviderTest.php @@ -0,0 +1,26 @@ +get('test-channel'); + + $this->assertInstanceOf(StubQueue::class, $queue); + $this->assertSame('test-channel', $queue->getChannelName()); + $this->assertTrue($provider->has('test-channel')); + $this->assertTrue($provider->has('yet-another-channel')); + } +} diff --git a/tests/Unit/QueueFactoryTest.php b/tests/Unit/QueueFactoryTest.php deleted file mode 100644 index a72f9751..00000000 --- a/tests/Unit/QueueFactoryTest.php +++ /dev/null @@ -1,178 +0,0 @@ -createMock(QueueInterface::class); - $queue - ->expects(self::once()) - ->method('withAdapter') - ->willReturn($queue); - $queue - ->expects(self::once()) - ->method('withChannelName') - ->willReturn($queue); - - $adapter = $this->createMock(AdapterInterface::class); - $adapter - ->expects(self::once()) - ->method('withChannel') - ->willReturn($adapter); - - $factory = new QueueFactory( - [], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - true, - $adapter - ); - - $factory->get('test'); - } - - public function testThrowExceptionOnEmptyAdapter(): void - { - $this->expectException(InvalidArgumentException::class); - $this->expectExceptionMessage( - 'Either $enableRuntimeChannelDefinition must be false, or $defaultAdapter should be provided.' - ); - - new QueueFactory( - [], - $this->createMock(QueueInterface::class), - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - true - ); - } - - public function testThrowExceptionOnEmptyDefinition(): void - { - try { - $queue = $this->createMock(QueueInterface::class); - $factory = new QueueFactory( - [], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - false - ); - - $factory->get('test'); - } catch (ChannelNotConfiguredException $exception) { - self::assertSame($exception::class, ChannelNotConfiguredException::class); - self::assertSame($exception->getName(), 'Queue channel is not properly configured'); - $this->assertMatchesRegularExpression('/"test"/', $exception->getSolution()); - } - } - - public function testThrowExceptionOnIncorrectDefinition(): void - { - try { - $queue = $this->createMock(QueueInterface::class); - $factory = new QueueFactory( - ['test' => new stdClass()], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - false - ); - - $factory->get('test'); - } catch (ChannelIncorrectlyConfigured $exception) { - self::assertSame($exception::class, ChannelIncorrectlyConfigured::class); - self::assertSame($exception->getName(), 'Incorrect queue channel configuration'); - $this->assertMatchesRegularExpression('/"test"/', $exception->getSolution()); - } - } - - public function testSuccessfulDefinitionWithDefaultAdapter(): void - { - $adapterDefault = $this->createMock(AdapterInterface::class); - $adapterDefault->method('withChannel')->willReturn($adapterDefault); - - $adapterNew = $this->createMock(AdapterInterface::class); - $adapterNew->method('withChannel')->willReturn($adapterNew); - - $queue = $this->createMock(QueueInterface::class); - $queue - ->expects(self::once()) - ->method('withAdapter') - ->with($adapterNew) - ->willReturn($queue); - $queue - ->expects(self::once()) - ->method('withChannelName') - ->with('test') - ->willReturn($queue); - - $factory = new QueueFactory( - ['test' => $adapterNew], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - false, - $adapterDefault - ); - - $factory->get('test'); - } - - public function testSuccessfulDefinitionWithoutDefaultAdapter(): void - { - $adapterNew = $this->createMock(AdapterInterface::class); - $adapterNew->method('withChannel')->willReturn($adapterNew); - - $queue = $this->createMock(QueueInterface::class); - $queue - ->expects(self::once()) - ->method('withAdapter') - ->with($adapterNew) - ->willReturn($queue); - $queue - ->expects(self::once()) - ->method('withChannelName') - ->with('test') - ->willReturn($queue); - - $factory = new QueueFactory( - ['test' => $adapterNew], - $queue, - $this->getContainer(), - new CallableFactory($this->getContainer()), - new Injector($this->getContainer()), - false - ); - - $factory->get('test'); - } - - private function getContainer(array $instances = []): ContainerInterface - { - return new SimpleContainer($instances); - } -} diff --git a/tests/Unit/QueueTest.php b/tests/Unit/QueueTest.php index 7a477e66..1bfe8001 100644 --- a/tests/Unit/QueueTest.php +++ b/tests/Unit/QueueTest.php @@ -130,6 +130,10 @@ public function testAdapterNotConfiguredExceptionForRun(): void public function testRunWithSignalLoop(): void { + if (!extension_loaded('pcntl')) { + $this->markTestSkipped('This rest requires PCNTL extension'); + } + $this->loop = new SignalLoop(); $queue = $this ->getQueue() diff --git a/tests/Unit/Stubs/StubAdapterTest.php b/tests/Unit/Stubs/StubAdapterTest.php new file mode 100644 index 00000000..b7337967 --- /dev/null +++ b/tests/Unit/Stubs/StubAdapterTest.php @@ -0,0 +1,24 @@ +assertSame($message, $adapter->push($message)); + $this->assertTrue($adapter->status('test')->isDone()); + $this->assertNotSame($adapter, $adapter->withChannel('test')); + $adapter->runExisting(static fn() => null); + $adapter->subscribe(static fn() => null); + } +} diff --git a/tests/Unit/Stubs/StubLoopTest.php b/tests/Unit/Stubs/StubLoopTest.php new file mode 100644 index 00000000..c78d3af7 --- /dev/null +++ b/tests/Unit/Stubs/StubLoopTest.php @@ -0,0 +1,26 @@ + [true]; + yield 'false' => [false]; + } + + #[DataProvider('dataBase')] + public function testBase(bool $canContinue): void + { + $loop = new StubLoop($canContinue); + + $this->assertSame($canContinue, $loop->canContinue()); + } +} diff --git a/tests/Unit/Stubs/StubQueueTest.php b/tests/Unit/Stubs/StubQueueTest.php new file mode 100644 index 00000000..29efe050 --- /dev/null +++ b/tests/Unit/Stubs/StubQueueTest.php @@ -0,0 +1,48 @@ +assertSame($message, $queue->push($message)); + $this->assertSame(0, $queue->run()); + $this->assertTrue($queue->status('test')->isDone()); + $this->assertSame(QueueInterface::DEFAULT_CHANNEL_NAME, $queue->getChannelName()); + $this->assertNull($queue->getAdapter()); + $queue->listen(); + } + + public function testWithAdapter(): void + { + $sourceQueue = new StubQueue(); + + $queue = $sourceQueue->withAdapter(new StubAdapter()); + + $this->assertNotSame($queue, $sourceQueue); + $this->assertInstanceOf(StubAdapter::class, $queue->getAdapter()); + } + + public function testWithChannelName(): void + { + $sourceQueue = new StubQueue(); + + $queue = $sourceQueue->withChannelName('test'); + + $this->assertNotSame($queue, $sourceQueue); + $this->assertSame(QueueInterface::DEFAULT_CHANNEL_NAME, $sourceQueue->getChannelName()); + $this->assertSame('test', $queue->getChannelName()); + } +} diff --git a/tests/Unit/Stubs/StubWorkerTest.php b/tests/Unit/Stubs/StubWorkerTest.php new file mode 100644 index 00000000..e61021ec --- /dev/null +++ b/tests/Unit/Stubs/StubWorkerTest.php @@ -0,0 +1,27 @@ +process($sourceMessage, $this->createMock(QueueInterface::class)); + + $this->assertSame($sourceMessage, $message); + $this->assertSame('test', $message->getHandlerName()); + $this->assertSame(42, $message->getData()); + $this->assertSame([], $message->getMetadata()); + } +} diff --git a/tests/Unit/Support/TestMessage.php b/tests/Unit/Support/TestMessage.php new file mode 100644 index 00000000..a4c0ab8d --- /dev/null +++ b/tests/Unit/Support/TestMessage.php @@ -0,0 +1,30 @@ +getAdapter(); - self::assertEquals($adapter, $adapter->withChannel(QueueFactory::DEFAULT_CHANNEL_NAME)); + self::assertEquals($adapter, $adapter->withChannel(QueueInterface::DEFAULT_CHANNEL_NAME)); } public function testWithAnotherChannel(): void