Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor handlers #182

Closed
wants to merge 11 commits into from
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ $data = [
'url' => $url,
'destinationFile' => $filename,
];
$message = new \Yiisoft\Queue\Message\Message('file-download', $data);
$message = new \Yiisoft\Queue\Message\Message(FileDownloader::class, $data);
```

Then you should push it to the queue:
Expand All @@ -80,7 +80,10 @@ $queue->push($message);
Its handler may look like the following:

```php
class FileDownloader
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageHandlerInterface;

class FileDownloader implements MessageHandlerInterface
{
private string $absolutePath;

Expand All @@ -89,21 +92,21 @@ class FileDownloader
$this->absolutePath = $absolutePath;
}

public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void
public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): MessageInterface
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): MessageInterface
public function handle(\Yiisoft\Queue\Message\MessageInterface $downloadMessage): void

{
$fileName = $downloadMessage->getData()['destinationFile'];
$fileName = $message->getData()['destinationFile'];
$path = "$this->absolutePath/$fileName";
file_put_contents($path, file_get_contents($downloadMessage->getData()['url']));
file_put_contents($path, file_get_contents($message->getData()['url']));

return $message;
Comment on lines +100 to +101
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return $message;

}
}
```

The last thing we should do is to create a configuration for the `Yiisoft\Queue\Worker\Worker`:

```php
$handlers = ['file-download' => [new FileDownloader('/path/to/save/files'), 'handle']];
$worker = new \Yiisoft\Queue\Worker\Worker(
$handlers, // Here it is
$logger,
$injector,
$container
Expand Down
12 changes: 5 additions & 7 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@
/* @var array $params */

return [
QueueWorker::class => [
'class' => QueueWorker::class,
'__construct()' => [$params['yiisoft/queue']['handlers']],
],
WorkerInterface::class => QueueWorker::class,
LoopInterface::class => static function (ContainerInterface $container): LoopInterface {
return extension_loaded('pcntl')
? $container->get(SignalLoop::class)
: $container->get(SimpleLoop::class);
return $container->get(
extension_loaded('pcntl')
? SignalLoop::class
: SimpleLoop::class
);

Check warning on line 36 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L32-L36

Added lines #L32 - L36 were not covered by tests
},
QueueFactoryInterface::class => QueueFactory::class,
QueueFactory::class => [
Expand Down
7 changes: 5 additions & 2 deletions src/Message/EnvelopeTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@ public function withMessage(MessageInterface $message): self
return $instance;
}

public function getHandlerName(): string
/**
* @return class-string<MessageHandlerInterface>
*/
public function getHandler(): string
{
return $this->message->getHandlerName();
return $this->message->getHandler();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There may be a problem here, when handler support only original message and don't support envelope.

}

public function getData(): mixed
Expand Down
7 changes: 4 additions & 3 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,21 @@
final class Message implements MessageInterface
{
/**
* @param class-string<MessageHandlerInterface> $handler Message handler class name
* @param mixed $data Message data, encodable by a queue adapter
* @param array $metadata Message metadata, encodable by a queue adapter
* @param string|null $id Message id
*/
public function __construct(
private string $handlerName,
private string $handler,
private mixed $data,
private array $metadata = [],
) {
}

public function getHandlerName(): string
public function getHandler(): string
{
return $this->handlerName;
return $this->handler;
}

public function getData(): mixed
Expand Down
10 changes: 10 additions & 0 deletions src/Message/MessageHandlerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

interface MessageHandlerInterface
{
public function handle(MessageInterface $message): void;
}
3 changes: 2 additions & 1 deletion src/Message/MessageInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ interface MessageInterface
* Returns handler name.
*
* @return string
* @psalm-return class-string<MessageHandlerInterface>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any string can be here (for example, ID for container), but class string only.

*/
public function getHandlerName(): string;
public function getHandler(): string;

/**
* Returns payload data.
Expand Down
4 changes: 2 additions & 2 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function push(
): MessageInterface {
$this->logger->debug(
'Preparing to push message with handler name "{handlerName}".',
['handlerName' => $message->getHandlerName()]
['handlerName' => $message->getHandler()]
xepozz marked this conversation as resolved.
Show resolved Hide resolved
);

$request = new PushRequest($message, $this->adapter);
Expand All @@ -61,7 +61,7 @@ public function push(
$messageId = $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null';
$this->logger->info(
'Pushed message with handler name "{handlerName}" to the queue. Assigned ID #{id}.',
['handlerName' => $message->getHandlerName(), 'id' => $messageId]
['handlerName' => $message->getHandler(), 'id' => $messageId]
);

return $message;
Expand Down
92 changes: 14 additions & 78 deletions src/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@
namespace Yiisoft\Queue\Worker;

use Closure;
use Psr\Container\ContainerExceptionInterface;
use Psr\Container\ContainerInterface;
use Psr\Container\NotFoundExceptionInterface;
use Psr\Log\LoggerInterface;
use ReflectionException;
use ReflectionMethod;
use RuntimeException;
use Throwable;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Exception\JobFailureException;
use Yiisoft\Queue\Message\MessageHandlerInterface;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\Consume\ConsumeFinalHandler;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
Expand All @@ -29,10 +26,7 @@

final class Worker implements WorkerInterface
{
private array $handlersCached = [];

public function __construct(
private array $handlers,
private LoggerInterface $logger,
private Injector $injector,
private ContainerInterface $container,
Expand All @@ -48,14 +42,23 @@
{
$this->logger->info('Processing message #{message}.', ['message' => $message->getMetadata()[IdEnvelope::MESSAGE_ID_KEY] ?? 'null']);

$name = $message->getHandlerName();
$handler = $this->getHandler($name);
$handlerClass = $message->getHandler();

if (!is_subclass_of($handlerClass, MessageHandlerInterface::class, true)) {
throw new RuntimeException(sprintf(
'Message handler "%s" for "%s" must implement "%s".',
$handlerClass,
$message::class,
MessageHandlerInterface::class,
));

Check warning on line 53 in src/Worker/Worker.php

View check run for this annotation

Codecov / codecov/patch

src/Worker/Worker.php#L48-L53

Added lines #L48 - L53 were not covered by tests
}
$handler = $this->container->get($handlerClass);
if ($handler === null) {
throw new RuntimeException("Queue handler with name $name doesn't exist");
throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $handlerClass));

Check warning on line 57 in src/Worker/Worker.php

View check run for this annotation

Codecov / codecov/patch

src/Worker/Worker.php#L57

Added line #L57 was not covered by tests
}

$request = new ConsumeRequest($message, $queue);
$closure = fn (MessageInterface $message): mixed => $this->injector->invoke($handler, [$message]);
$closure = fn (MessageInterface $message): mixed => $this->injector->invoke([$handler, 'handle'], [$message]);
try {
return $this->consumeMiddlewareDispatcher->dispatch($request, $this->createConsumeHandler($closure))->getMessage();
} catch (Throwable $exception) {
Expand All @@ -74,73 +77,6 @@
}
}

private function getHandler(string $name): ?callable
{
if (!array_key_exists($name, $this->handlersCached)) {
$this->handlersCached[$name] = $this->prepare($this->handlers[$name] ?? null);
}

return $this->handlersCached[$name];
}

/**
* Checks if the handler is a DI container alias
*
* @param array|callable|object|string|null $definition
*
* @throws ContainerExceptionInterface
* @throws NotFoundExceptionInterface
*/
private function prepare(callable|object|array|string|null $definition): callable|null
{
if (is_string($definition) && $this->container->has($definition)) {
return $this->container->get($definition);
}

if (
is_array($definition)
&& array_keys($definition) === [0, 1]
&& is_string($definition[0])
&& is_string($definition[1])
) {
[$className, $methodName] = $definition;

if (!class_exists($className) && $this->container->has($className)) {
return [
$this->container->get($className),
$methodName,
];
}

if (!class_exists($className)) {
$this->logger->error("$className doesn't exist.");

return null;
}

try {
$reflection = new ReflectionMethod($className, $methodName);
} catch (ReflectionException $e) {
$this->logger->error($e->getMessage());

return null;
}
if ($reflection->isStatic()) {
return [$className, $methodName];
}
if ($this->container->has($className)) {
return [
$this->container->get($className),
$methodName,
];
}

return null;
}

return $definition;
}

private function createConsumeHandler(Closure $handler): MessageHandlerConsumeInterface
{
return new ConsumeFinalHandler($handler);
Expand Down
11 changes: 6 additions & 5 deletions tests/App/FakeHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@

use Yiisoft\Queue\Message\MessageInterface;
use RuntimeException;
use Yiisoft\Queue\Message\MessageHandlerInterface;

final class FakeHandler
final class FakeHandler implements MessageHandlerInterface
{
public static array $processedMessages = [];

Expand All @@ -26,13 +27,13 @@ public function execute(MessageInterface $message): void
self::$processedMessages[] = $message;
}

public static function staticExecute(MessageInterface $message): void
public function executeWithException(MessageInterface $message): never
{
self::$processedMessages[] = $message;
throw new RuntimeException('Test exception');
}

public function executeWithException(MessageInterface $message): never
public function handle(MessageInterface $message): void
{
throw new RuntimeException('Test exception');
self::$processedMessages[] = $message;
}
}
16 changes: 7 additions & 9 deletions tests/Integration/MessageConsumingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

namespace Yiisoft\Queue\Tests\Integration;

use Psr\Container\ContainerInterface;
use Psr\Log\NullLogger;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Message\Message;
Expand All @@ -13,20 +12,18 @@
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
use Yiisoft\Queue\Middleware\FailureHandling\FailureMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\FailureHandling\MiddlewareFactoryFailureInterface;
use Yiisoft\Queue\Tests\Support\StackMessageHandler;
use Yiisoft\Queue\Tests\TestCase;
use Yiisoft\Queue\Worker\Worker;
use Yiisoft\Test\Support\Container\SimpleContainer;

final class MessageConsumingTest extends TestCase
{
private array $messagesProcessed;

public function testMessagesConsumed(): void
{
$this->messagesProcessed = [];

$container = $this->createMock(ContainerInterface::class);
$stackMessageHandler = new StackMessageHandler();
$container = new SimpleContainer([StackMessageHandler::class => $stackMessageHandler]);
$worker = new Worker(
['test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData()],
new NullLogger(),
new Injector($container),
$container,
Expand All @@ -36,9 +33,10 @@ public function testMessagesConsumed(): void

$messages = [1, 'foo', 'bar-baz'];
foreach ($messages as $message) {
$worker->process(new Message('test', $message), $this->getQueue());
$worker->process(new Message(StackMessageHandler::class, $message), $this->getQueue());
}

$this->assertEquals($messages, $this->messagesProcessed);
$data = array_map(fn (MessageInterface $message) => $message->getData(), $stackMessageHandler->processedMessages);
$this->assertEquals($messages, $data);
}
}
8 changes: 4 additions & 4 deletions tests/Integration/MiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Yiisoft\Injector\Injector;
use Yiisoft\Queue\Tests\Support\NullMessageHandler;
use Yiisoft\Test\Support\Container\SimpleContainer;
use Yiisoft\Test\Support\Log\SimpleLogger;
use Yiisoft\Queue\Adapter\SynchronousAdapter;
Expand Down Expand Up @@ -90,7 +91,7 @@ public function testFullStackConsume(): void
'common 1',
'common 2',
];
$container = new SimpleContainer();
$container = new SimpleContainer([NullMessageHandler::class => new NullMessageHandler()]);
$callableFactory = new CallableFactory($container);

$consumeMiddlewareDispatcher = new ConsumeMiddlewareDispatcher(
Expand All @@ -110,15 +111,14 @@ public function testFullStackConsume(): void
);

$worker = new Worker(
['test' => static fn () => true],
new SimpleLogger(),
new Injector($container),
$container,
$consumeMiddlewareDispatcher,
$failureMiddlewareDispatcher,
);

$message = new Message('test', ['initial']);
$message = new Message(NullMessageHandler::class, ['initial']);
$messageConsumed = $worker->process($message, $this->createMock(QueueInterface::class));

self::assertEquals($stack, $messageConsumed->getData());
Expand All @@ -129,7 +129,7 @@ public function testFullStackFailure(): void
$exception = new InvalidArgumentException('test');
$this->expectExceptionObject($exception);

$message = new Message('simple', null, []);
$message = new Message(NullMessageHandler::class, null, []);
$queueCallback = static fn (MessageInterface $message): MessageInterface => $message;
$queue = $this->createMock(QueueInterface::class);
$container = new SimpleContainer([SendAgainMiddleware::class => new SendAgainMiddleware('test-container', 1, $queue)]);
Expand Down
Loading
Loading