Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor handlers #182

Closed
wants to merge 11 commits into from
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ $data = [
'url' => $url,
'destinationFile' => $filename,
];
$message = new \Yiisoft\Queue\Message\Message('file-download', $data);
$message = new \Yiisoft\Queue\Message\Message(FileDownloader::class, $data);
```

Then you should push it to the queue:
Expand All @@ -80,7 +80,10 @@ $queue->push($message);
Its handler may look like the following:

```php
class FileDownloader
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageHandlerInterface;

class FileDownloader implements MessageHandlerInterface
{
private string $absolutePath;

Expand All @@ -89,21 +92,21 @@ class FileDownloader
$this->absolutePath = $absolutePath;
}

public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void
public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): MessageInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): MessageInterface
public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void

{
$fileName = $downloadMessage->getData()['destinationFile'];
$fileName = $message->getData()['destinationFile'];
$path = "$this->absolutePath/$fileName";
file_put_contents($path, file_get_contents($downloadMessage->getData()['url']));
file_put_contents($path, file_get_contents($message->getData()['url']));

return $message;
Comment on lines +100 to +101
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return $message;

}
}
```

The last thing we should do is to create a configuration for the `Yiisoft\Queue\Worker\Worker`:

```php
$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']];
$worker = new \Yiisoft\Queue\Worker\Worker(
$handlers, // Here it is
$logger,
$injector,
$container
Expand Down
12 changes: 5 additions & 7 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,13 @@
/* @var array $params */

return [
QueueWorker::class => [
'class' => QueueWorker::class,
'__construct()' => [$params['yiisoft/queue']['handlers']],
],
WorkerInterface::class => QueueWorker::class,
LoopInterface::class => static function (ContainerInterface $container): LoopInterface {
return extension_loaded('pcntl')
? $container->get(SignalLoop::class)
: $container->get(SimpleLoop::class);
return $container->get(
extension_loaded('pcntl')
? SignalLoop::class
: SimpleLoop::class
);
},
QueueFactoryInterface::class => QueueFactory::class,
QueueFactory::class => [
Expand Down
1 change: 0 additions & 1 deletion config/params.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
],
],
'yiisoft/queue' => [
'handlers' => [],
'channel-definitions' => [],
'middlewares-push' => [],
'middlewares-consume' => [],
Expand Down
7 changes: 5 additions & 2 deletions src/Message/EnvelopeTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ public function withMessage(MessageInterface $message): self
return $instance;
}

public function getHandlerName(): string
/**
* @return class-string<MessageHandlerInterface>
*/
public function getHandler(): string
{
return $this->message->getHandlerName();
return $this->message->getHandler();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be a problem here, when handler support only original message and don't support envelope.

}

public function getData(): mixed
Expand Down
7 changes: 4 additions & 3 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@
final class Message implements MessageInterface
{
/**
* @param class-string<MessageHandlerInterface> $handler Message handler class name
* @param mixed $data Message data, encodable by a queue adapter
* @param array $metadata Message metadata, encodable by a queue adapter
* @param string|null $id Message id
*/
public function __construct(
private string $handlerName,
private string $handler,
private mixed $data,
private array $metadata = [],
) {
}

public function getHandlerName(): string
public function getHandler(): string
{
return $this->handlerName;
return $this->handler;
}

public function getData(): mixed
Expand Down
10 changes: 10 additions & 0 deletions src/Message/MessageHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

interface MessageHandlerInterface
{
public function handle(MessageInterface $message): void;
}
3 changes: 2 additions & 1 deletion src/Message/MessageInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ interface MessageInterface
* Returns handler name.
*
* @return string
* @psalm-return class-string<MessageHandlerInterface>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any string can be here (for example, ID for container), but class string only.

*/
public function getHandlerName(): string;
public function getHandler(): string;

/**
* Returns payload data.
Expand Down
4 changes: 2 additions & 2 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function push(
): MessageInterface {
$this->logger->debug(
'Preparing to push message with handler name "{handlerName}".',
['handlerName' => $message->getHandlerName()]
['handlerName' => $message->getHandler()]
xepozz marked this conversation as resolved.
Show resolved Hide resolved
);

$request = new PushRequest($message, $this->adapter);
Expand All @@ -61,7 +61,7 @@ public function push(
$messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
$this->logger->info(
'Pushed message with handler name "{handlerName}" to the queue. Assigned ID #{id}.',
['handlerName' => $message->getHandlerName(), 'id' => $messageId]
['handlerName' => $message->getHandler(), 'id' => $messageId]
);

return $message;
Expand Down
92 changes: 14 additions & 78 deletions src/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
namespace Yiisoft\Queue\Worker;

use Closure;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use ReflectionException;
use ReflectionMethod;
use RuntimeException;
use Throwable;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Exception\JobFailureException;
use Yiisoft\Queue\Message\MessageHandlerInterface;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
Expand All @@ -29,10 +26,7 @@

final class Worker implements WorkerInterface
{
private array $handlersCached = [];

public function __construct(
private array $handlers,
private LoggerInterface $logger,
private Injector $injector,
private ContainerInterface $container,
Expand All @@ -48,14 +42,23 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa
{
$this->logger->info('Processing message #{message}.', ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']);

$name = $message->getHandlerName();
$handler = $this->getHandler($name);
$handlerClass = $message->getHandler();

if (!is_subclass_of($handlerClass, MessageHandlerInterface::class, true)) {
throw new RuntimeException(sprintf(
'Message handler "%s" for "%s" must implement "%s".',
$handlerClass,
$message::class,
MessageHandlerInterface::class,
));
}
$handler = $this->container->get($handlerClass);
if ($handler === null) {
throw new RuntimeException("Queue handler with name $name doesn't exist");
throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $handlerClass));
}

$request = new ConsumeRequest($message, $queue);
$closure = fn (MessageInterface $message): mixed => $this->injector->invoke($handler, [$message]);
$closure = fn (MessageInterface $message): mixed => $this->injector->invoke([$handler, 'handle'], [$message]);
try {
return $this->consumeMiddlewareDispatcher->dispatch($request, $this->createConsumeHandler($closure))->getMessage();
} catch (Throwable $exception) {
Expand All @@ -74,73 +77,6 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa
}
}

private function getHandler(string $name): ?callable
{
if (!array_key_exists($name, $this->handlersCached)) {
$this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null);
}

return $this->handlersCached[$name];
}

/**
* Checks if the handler is a DI container alias
*
* @param array|callable|object|string|null $definition
*
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
private function prepare(callable|object|array|string|null $definition): callable|null
{
if (is_string($definition) && $this->container->has($definition)) {
return $this->container->get($definition);
}

if (
is_array($definition)
&& array_keys($definition) === [0, 1]
&& is_string($definition[0])
&& is_string($definition[1])
) {
[$className, $methodName] = $definition;

if (!class_exists($className) && $this->container->has($className)) {
return [
$this->container->get($className),
$methodName,
];
}

if (!class_exists($className)) {
$this->logger->error("$className doesn't exist.");

return null;
}

try {
$reflection = new ReflectionMethod($className, $methodName);
} catch (ReflectionException $e) {
$this->logger->error($e->getMessage());

return null;
}
if ($reflection->isStatic()) {
return [$className, $methodName];
}
if ($this->container->has($className)) {
return [
$this->container->get($className),
$methodName,
];
}

return null;
}

return $definition;
}

private function createConsumeHandler(Closure $handler): MessageHandlerConsumeInterface
{
return new ConsumeFinalHandler($handler);
Expand Down
11 changes: 6 additions & 5 deletions tests/App/FakeHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

use Yiisoft\Queue\Message\MessageInterface;
use RuntimeException;
use Yiisoft\Queue\Message\MessageHandlerInterface;

final class FakeHandler
final class FakeHandler implements MessageHandlerInterface
{
public static array $processedMessages = [];

Expand All @@ -26,13 +27,13 @@ public function execute(MessageInterface $message): void
self::$processedMessages[] = $message;
}

public static function staticExecute(MessageInterface $message): void
public function executeWithException(MessageInterface $message): void
{
self::$processedMessages[] = $message;
throw new RuntimeException('Test exception');
}

public function executeWithException(MessageInterface $message): void
public function handle(MessageInterface $message): void
{
throw new RuntimeException('Test exception');
self::$processedMessages[] = $message;
}
}
16 changes: 7 additions & 9 deletions tests/Integration/MessageConsumingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Yiisoft\Queue\Tests\Integration;

use Psr\Container\ContainerInterface;
use Psr\Log\NullLogger;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Message\Message;
Expand All @@ -13,20 +12,18 @@
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFactoryFailureInterface;
use Yiisoft\Queue\Tests\Support\StackMessageHandler;
use Yiisoft\Queue\Tests\TestCase;
use Yiisoft\Queue\Worker\Worker;
use Yiisoft\Test\Support\Container\SimpleContainer;

final class MessageConsumingTest extends TestCase
{
private array $messagesProcessed;

public function testMessagesConsumed(): void
{
$this->messagesProcessed = [];

$container = $this->createMock(ContainerInterface::class);
$stackMessageHandler = new StackMessageHandler();
$container = new SimpleContainer([StackMessageHandler::class => $stackMessageHandler]);
$worker = new Worker(
['test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData()],
new NullLogger(),
new Injector($container),
$container,
Expand All @@ -36,9 +33,10 @@ public function testMessagesConsumed(): void

$messages = [1, 'foo', 'bar-baz'];
foreach ($messages as $message) {
$worker->process(new Message('test', $message), $this->getQueue());
$worker->process(new Message(StackMessageHandler::class, $message), $this->getQueue());
}

$this->assertEquals($messages, $this->messagesProcessed);
$data = array_map(fn (MessageInterface $message) => $message->getData(), $stackMessageHandler->processedMessages);
$this->assertEquals($messages, $data);
}
}
Loading
Loading