Skip to content

Commit

Permalink
Update due to yiisoft/queue changes
Browse files Browse the repository at this point in the history
  • Loading branch information
viktorprogger committed Jan 8, 2025
1 parent 3e6b3c2 commit 3f39575
Show file tree
Hide file tree
Showing 13 changed files with 49 additions and 104 deletions.
25 changes: 17 additions & 8 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,30 @@

namespace Yiisoft\Queue\AMQP;

use BackedEnum;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;

final class Adapter implements AdapterInterface
{
public function __construct(
private QueueProviderInterface $queueProvider,
private MessageSerializerInterface $serializer,
private LoopInterface $loop,
private readonly MessageSerializerInterface $serializer,
private readonly LoopInterface $loop,
) {
}

public function withChannel(string $channel): self
public function withChannel(BackedEnum|string $channel): self
{
$instance = clone $this;
$instance->queueProvider = $this->queueProvider->withChannelName($channel);
$channelName = is_string($channel) ? $channel : (string) $channel->value;
$instance->queueProvider = $this->queueProvider->withChannelName($channelName);

return $instance;
}
Expand All @@ -44,12 +47,12 @@ public function runExisting(callable $handlerCallback): void
/**
* @return never
*/
public function status(string $id): JobStatus
public function status(int|string $id): JobStatus
{
throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
Expand All @@ -68,7 +71,8 @@ public function push(MessageInterface $message): void
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);

return new IdEnvelope($message, $messageId);
}

public function subscribe(callable $handlerCallback): void
Expand All @@ -87,7 +91,7 @@ public function subscribe(callable $handlerCallback): void
true,
function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
try {
$handlerCallback($this->serializer->unserialize($amqpMessage->body));
$handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
$channel->basic_ack($amqpMessage->getDeliveryTag());
} catch (Throwable $exception) {
$consumerTag = $amqpMessage->getConsumerTag();
Expand Down Expand Up @@ -117,4 +121,9 @@ public function withQueueProvider(QueueProviderInterface $queueProvider): self

return $new;
}

public function getChannel(): string
{
return $this->queueProvider->getQueueSettings()->getName();
}
}
2 changes: 0 additions & 2 deletions src/Exception/NoKeyInPayloadException.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public function getName(): string
}

/**
* @return string
*
* @infection-ignore-all
*/
public function getSolution(): ?string
Expand Down
8 changes: 4 additions & 4 deletions src/ExistingMessagesConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ final class ExistingMessagesConsumer
private bool $messageConsumed = false;

public function __construct(
private AMQPChannel $channel,
private string $queueName,
private MessageSerializerInterface $serializer
private readonly AMQPChannel $channel,
private readonly string $queueName,
private readonly MessageSerializerInterface $serializer
) {
}

Expand All @@ -37,7 +37,7 @@ public function consume(callable $callback): void
false,
function (AMQPMessage $amqpMessage) use ($callback): void {
try {
$message = $this->serializer->unserialize($amqpMessage->body);
$message = $this->serializer->unserialize($amqpMessage->getBody());
if ($this->messageConsumed = $callback($message)) {
$this->channel->basic_ack($amqpMessage->getDeliveryTag());
}
Expand Down
12 changes: 7 additions & 5 deletions src/MessageSerializer.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use InvalidArgumentException;
use JsonException;
use Yiisoft\Queue\AMQP\Exception\NoKeyInPayloadException;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Message\MessageInterface;

Expand All @@ -18,7 +19,7 @@ class MessageSerializer implements MessageSerializerInterface
public function serialize(MessageInterface $message): string
{
$payload = [
'id' => $message->getId(),
'id' => IdEnvelope::fromMessage($message)->getId(),
'name' => $message->getHandlerName(),
'data' => $message->getData(),
'meta' => $message->getMetadata(),
Expand All @@ -32,7 +33,7 @@ public function serialize(MessageInterface $message): string
* @throws NoKeyInPayloadException
* @throws InvalidArgumentException
*/
public function unserialize(string $value): Message
public function unserialize(string $value): MessageInterface
{
$payload = json_decode($value, true, 512, JSON_THROW_ON_ERROR);
if (!is_array($payload)) {
Expand All @@ -45,7 +46,7 @@ public function unserialize(string $value): Message
}

$id = $payload['id'] ?? null;
if ($id !== null && !is_string($id)) {
if ($id !== null && !is_string($id) && !is_int($id)) {
throw new NoKeyInPayloadException('id', $payload);
}

Expand All @@ -54,11 +55,12 @@ public function unserialize(string $value): Message
throw new NoKeyInPayloadException('meta', $payload);
}

return new Message(
$message = new Message(
$name,
$payload['data'] ?? null,
$meta,
$id,
);

return $id === null ? $message : new IdEnvelope($message, $id);
}
}
2 changes: 1 addition & 1 deletion src/Middleware/DelayMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

final class DelayMiddleware implements DelayMiddlewareInterface
{
public function __construct(private float $delayInSeconds, private bool $forcePersistentMessages = true)
public function __construct(private float $delayInSeconds, private readonly bool $forcePersistentMessages = true)
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ final class QueueProvider implements QueueProviderInterface
private ?AMQPChannel $channel = null;

public function __construct(
private AbstractConnection $connection,
private readonly AbstractConnection $connection,
private QueueSettingsInterface $queueSettings,
private ?ExchangeSettingsInterface $exchangeSettings = null,
private array $messageProperties = [],
Expand Down
4 changes: 2 additions & 2 deletions src/Settings/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
namespace Yiisoft\Queue\AMQP\Settings;

use PhpAmqpLib\Wire\AMQPTable;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\QueueInterface;

final class Queue implements QueueSettingsInterface
{
public function __construct(
private string $queueName = QueueFactoryInterface::DEFAULT_CHANNEL_NAME,
private string $queueName = QueueInterface::DEFAULT_CHANNEL,
private bool $passive = false,
private bool $durable = false,
private bool $exclusive = false,
Expand Down
2 changes: 1 addition & 1 deletion tests/Support/ExtendedSimpleMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
*/
final class ExtendedSimpleMessageHandler
{
public function __construct(private FileHelper $fileHelper)
public function __construct(private readonly FileHelper $fileHelper)
{
}

Expand Down
18 changes: 12 additions & 6 deletions tests/Support/FakeAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Yiisoft\Queue\AMQP\Tests\Support;

use BackedEnum;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
Expand All @@ -14,9 +15,9 @@
final class FakeAdapter implements AdapterInterface
{
public function __construct(
private QueueProviderInterface $queueProvider,
private MessageSerializerInterface $serializer,
private LoopInterface $loop,
private readonly QueueProviderInterface $queueProvider,
private readonly MessageSerializerInterface $serializer,
private readonly LoopInterface $loop,
) {
}

Expand All @@ -25,12 +26,12 @@ public function runExisting(callable $handlerCallback): void
// TODO: Implement runExisting() method.
}

public function status(string $id): JobStatus
public function status(int|string $id): JobStatus
{
// TODO: Implement status() method.
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
// TODO: Implement push() method.
}
Expand All @@ -40,8 +41,13 @@ public function subscribe(callable $handlerCallback): void
// TODO: Implement subscribe() method.
}

public function withChannel(string $channel): AdapterInterface
public function withChannel(BackedEnum|string $channel): AdapterInterface
{
// TODO: Implement withChannel() method.
}

public function getChannel(): string
{
// TODO: Implement getChannel() method.
}
}
2 changes: 1 addition & 1 deletion tests/Support/SimpleMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

final class SimpleMessageHandler
{
public function __construct(private FileHelper $fileHelper)
public function __construct(private readonly FileHelper $fileHelper)
{
}

Expand Down
71 changes: 0 additions & 71 deletions tests/Unit/QueueFactoryTest.php

This file was deleted.

2 changes: 1 addition & 1 deletion tests/Unit/QueueProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public function testWithQueueAndExchangeSettings(): void
self::assertNotNull($result);
self::assertEquals($time, $result);

$messageBody = json_decode($message->body, true, 512, JSON_THROW_ON_ERROR);
$messageBody = json_decode($message->getBody(), true, 512, JSON_THROW_ON_ERROR);
self::assertEquals($messageBody['data']['payload']['time'], $result);
}

Expand Down
3 changes: 2 additions & 1 deletion tests/Unit/QueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Yiisoft\Queue\AMQP\Tests\Support\FileHelper;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Exception\JobFailureException;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Queue;

Expand All @@ -41,7 +42,7 @@ public function testStatus(): void

$this->expectException(NotImplementedException::class);
$this->expectExceptionMessage("Status check is not supported by the adapter $adapterClass.");
$adapter->status($message->getId());
$adapter->status(IdEnvelope::fromMessage($message)->getId() ?? '');
}

/**
Expand Down

0 comments on commit 3f39575

Please sign in to comment.