Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add envelope stack + MessageSerializer #188

Merged
merged 15 commits into from
Jan 22, 2024
3 changes: 3 additions & 0 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
use Yiisoft\Queue\Cli\SimpleLoop;
use Yiisoft\Queue\Command\ListenAllCommand;
use Yiisoft\Queue\Command\RunCommand;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\MessageSerializerInterface;
use Yiisoft\Queue\Middleware\Consume\ConsumeMiddlewareDispatcher;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsume;
use Yiisoft\Queue\Middleware\Consume\MiddlewareFactoryConsumeInterface;
Expand Down Expand Up @@ -54,6 +56,7 @@
FailureMiddlewareDispatcher::class => [
'__construct()' => ['middlewareDefinitions' => $params['yiisoft/queue']['middlewares-fail']],
],
MessageSerializerInterface::class => JsonMessageSerializer::class,

Check warning on line 59 in config/di.php

View check run for this annotation

Codecov / codecov/patch

config/di.php#L59

Added line #L59 was not covered by tests
RunCommand::class => [
'__construct()' => [
'channels' => array_keys($params['yiisoft/yii-queue']['channel-definitions']),
Expand Down
4 changes: 4 additions & 0 deletions src/Message/EnvelopeInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
*/
interface EnvelopeInterface extends MessageInterface
{
public const ENVELOPE_STACK_KEY = 'envelopes';

public static function fromMessage(MessageInterface $message): self;

public function getMessage(): MessageInterface;

public function withMessage(MessageInterface $message): self;
Expand Down
21 changes: 20 additions & 1 deletion src/Message/EnvelopeTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,27 @@
return $this->message->getData();
}

public static function fromMessage(MessageInterface $message): self
{
return new static($message);
}

public function getMetadata(): array
{
return $this->message->getMetadata();
return array_merge(
$this->message->getMetadata(),
[
self::ENVELOPE_STACK_KEY => array_merge(
$this->message->getMetadata()[self::ENVELOPE_STACK_KEY] ?? [],
[self::class],
),
],
$this->getEnvelopeMetadata(),
);
}

public function getEnvelopeMetadata(): array

Check warning on line 53 in src/Message/EnvelopeTrait.php

View check run for this annotation

Codecov / codecov/patch

src/Message/EnvelopeTrait.php#L53

Added line #L53 was not covered by tests
{
return [];

Check warning on line 55 in src/Message/EnvelopeTrait.php

View check run for this annotation

Codecov / codecov/patch

src/Message/EnvelopeTrait.php#L55

Added line #L55 was not covered by tests
}
}
6 changes: 2 additions & 4 deletions src/Message/IdEnvelope.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ public function getId(): string|int|null
return $this->id ?? $this->message->getMetadata()[self::MESSAGE_ID_KEY] ?? null;
}

public function getMetadata(): array
private function getEnvelopeMetadata(): array
{
return array_merge($this->message->getMetadata(), [
self::MESSAGE_ID_KEY => $this->getId(),
]);
return [self::MESSAGE_ID_KEY => $this->getId()];
}
}
59 changes: 59 additions & 0 deletions src/Message/JsonMessageSerializer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

use InvalidArgumentException;
use JsonException;

final class JsonMessageSerializer implements MessageSerializerInterface
{
/**
* @throws JsonException
*/
public function serialize(MessageInterface $message): string
{
$payload = [
'name' => $message->getHandlerName(),
'data' => $message->getData(),
'meta' => $message->getMetadata(),
];

return json_encode($payload, JSON_THROW_ON_ERROR);
}

/**
* @throws JsonException
* @throws InvalidArgumentException
*/
public function unserialize(string $value): MessageInterface
{
$payload = json_decode($value, true, 512, JSON_THROW_ON_ERROR);
if (!is_array($payload)) {
throw new InvalidArgumentException('Payload must be array. Got ' . get_debug_type($payload) . '.');
}

$meta = $payload['meta'] ?? [];
if (!is_array($meta)) {
throw new InvalidArgumentException('Metadata must be array. Got ' . get_debug_type($meta) . '.');
}

// TODO: will be removed later
$message = new Message($payload['name'] ?? '$name', $payload['data'] ?? null, $meta);

if (isset($meta[EnvelopeInterface::ENVELOPE_STACK_KEY]) && is_array($meta[EnvelopeInterface::ENVELOPE_STACK_KEY])) {
$message = $message->withMetadata(
array_merge($message->getMetadata(), [EnvelopeInterface::ENVELOPE_STACK_KEY => []]),
);
foreach ($meta[EnvelopeInterface::ENVELOPE_STACK_KEY] as $envelope) {
if (is_string($envelope) && class_exists($envelope) && is_subclass_of($envelope, EnvelopeInterface::class)) {
$message = $envelope::fromMessage($message);
}
}
}


return $message;
}
}
8 changes: 8 additions & 0 deletions src/Message/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,12 @@ public function getMetadata(): array
{
return $this->metadata;
}

public function withMetadata(array $metadata): self
{
$instance = clone $this;
$instance->metadata = $metadata;

return $instance;
}
}
12 changes: 12 additions & 0 deletions src/Message/MessageSerializerInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Message;

interface MessageSerializerInterface
{
public function serialize(MessageInterface $message): string;

public function unserialize(string $value): MessageInterface;
}
2 changes: 1 addition & 1 deletion src/Middleware/FailureHandling/FailureEnvelope.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ final class FailureEnvelope implements EnvelopeInterface

public function __construct(
private MessageInterface $message,
private array $meta,
private array $meta = [],
) {
}

Expand Down
4 changes: 0 additions & 4 deletions src/QueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@ interface QueueInterface
/**
* Pushes a message into the queue.
*
* @param MessageInterface $message
* @param array|callable|MiddlewarePushInterface|string ...$middlewareDefinitions
*
* @return MessageInterface
*/
public function push(MessageInterface $message, MiddlewarePushInterface|callable|array|string ...$middlewareDefinitions): MessageInterface;

/**
* Execute all existing jobs and exit
*
* @param int $max
*
* @return int How many messages were processed
*/
public function run(int $max = 0): int;
Expand Down
2 changes: 1 addition & 1 deletion src/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa
$name = $message->getHandlerName();
$handler = $this->getHandler($name);
if ($handler === null) {
throw new RuntimeException("Queue handler with name $name doesn't exist");
throw new RuntimeException(sprintf('Queue handler with name "%s" does not exist', $name));
}

$request = new ConsumeRequest($message, $queue);
Expand Down
47 changes: 47 additions & 0 deletions tests/Unit/EnvelopeTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php

declare(strict_types=1);

namespace Unit;

use PHPUnit\Framework\TestCase;
use Yiisoft\Queue\Message\EnvelopeInterface;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\Message;

final class EnvelopeTest extends TestCase
{
public function testEnvelopeStack(): void
{
$message = new Message('handler', 'test');
$message = new IdEnvelope($message, 'test-id');

$this->assertEquals('test', $message->getMessage()->getData());

$stack = $message->getMetadata()[EnvelopeInterface::ENVELOPE_STACK_KEY];
$this->assertIsArray($stack);

$this->assertEquals([
IdEnvelope::class,
], $stack);
}

public function testEnvelopeDuplicates(): void
{
$message = new Message('handler', 'test');
$message = new IdEnvelope($message, 'test-id');
$message = new IdEnvelope($message, 'test-id');
$message = new IdEnvelope($message, 'test-id');

$this->assertEquals('test', $message->getMessage()->getData());

$stack = $message->getMetadata()[EnvelopeInterface::ENVELOPE_STACK_KEY];
$this->assertIsArray($stack);

$this->assertEquals([
IdEnvelope::class,
IdEnvelope::class,
IdEnvelope::class,
], $stack);
}
}
Loading
Loading