Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update due to yiisoft/queue changes #111

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions config/di.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

declare(strict_types=1);

use Yiisoft\Queue\AMQP\MessageSerializer;
use Yiisoft\Queue\AMQP\MessageSerializerInterface;
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\QueueProviderInterface;
use Yiisoft\Queue\AMQP\Settings\Queue;
use Yiisoft\Queue\AMQP\Settings\QueueSettingsInterface;

return [
MessageSerializerInterface::class => MessageSerializer::class,
QueueProviderInterface::class => QueueProvider::class,
QueueSettingsInterface::class => Queue::class,
];
26 changes: 18 additions & 8 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,31 @@

namespace Yiisoft\Queue\AMQP;

use BackedEnum;
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Exception\NotImplementedException;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;

final class Adapter implements AdapterInterface
{
public function __construct(
private QueueProviderInterface $queueProvider,
private MessageSerializerInterface $serializer,
private LoopInterface $loop,
private readonly MessageSerializerInterface $serializer,
private readonly LoopInterface $loop,
) {
}

public function withChannel(string $channel): self
public function withChannel(BackedEnum|string $channel): self
{
$instance = clone $this;
$instance->queueProvider = $this->queueProvider->withChannelName($channel);
$channelName = is_string($channel) ? $channel : (string) $channel->value;
$instance->queueProvider = $this->queueProvider->withChannelName($channelName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better add enumeration support to QueueProviderInterface


return $instance;
}
Expand All @@ -44,12 +48,12 @@
/**
* @return never
*/
public function status(string $id): JobStatus
public function status(int|string $id): JobStatus
{
throw new NotImplementedException('Status check is not supported by the adapter ' . self::class . '.');
}

public function push(MessageInterface $message): void
public function push(MessageInterface $message): MessageInterface
{
$payload = $this->serializer->serialize($message);
$amqpMessage = new AMQPMessage(
Expand All @@ -68,7 +72,8 @@
);
/** @var string $messageId */
$messageId = $amqpMessage->get('message_id');
$message->setId($messageId);

return new IdEnvelope($message, $messageId);
}

public function subscribe(callable $handlerCallback): void
Expand All @@ -81,18 +86,18 @@
$this->queueProvider
->getQueueSettings()
->getName(),
false,

Check warning on line 89 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.3-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": --- Original +++ New @@ @@ public function subscribe(callable $handlerCallback) : void { $channel = $this->queueProvider->getChannel(); - $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), false, false, false, true, function (AMQPMessage $amqpMessage) use($handlerCallback, $channel) : void { + $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), true, false, false, true, function (AMQPMessage $amqpMessage) use($handlerCallback, $channel) : void { try { $handlerCallback($this->serializer->unserialize($amqpMessage->getBody())); $channel->basic_ack($amqpMessage->getDeliveryTag());
false,
false,

Check warning on line 91 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.3-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": --- Original +++ New @@ @@ public function subscribe(callable $handlerCallback) : void { $channel = $this->queueProvider->getChannel(); - $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), false, false, false, true, function (AMQPMessage $amqpMessage) use($handlerCallback, $channel) : void { + $channel->basic_consume($this->queueProvider->getQueueSettings()->getName(), $this->queueProvider->getQueueSettings()->getName(), false, false, true, true, function (AMQPMessage $amqpMessage) use($handlerCallback, $channel) : void { try { $handlerCallback($this->serializer->unserialize($amqpMessage->getBody())); $channel->basic_ack($amqpMessage->getDeliveryTag());
true,
function (AMQPMessage $amqpMessage) use ($handlerCallback, $channel): void {
try {
$handlerCallback($this->serializer->unserialize($amqpMessage->body));
$handlerCallback($this->serializer->unserialize($amqpMessage->getBody()));
$channel->basic_ack($amqpMessage->getDeliveryTag());
} catch (Throwable $exception) {
$consumerTag = $amqpMessage->getConsumerTag();
if ($consumerTag !== null) {
$channel->basic_cancel($consumerTag);

Check warning on line 100 in src/Adapter.php

View workflow job for this annotation

GitHub Actions / PHP 8.3-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ } catch (Throwable $exception) { $consumerTag = $amqpMessage->getConsumerTag(); if ($consumerTag !== null) { - $channel->basic_cancel($consumerTag); + } throw $exception; }
}

throw $exception;
Expand All @@ -117,4 +122,9 @@

return $new;
}

public function getChannel(): string
{
return $this->queueProvider->getQueueSettings()->getName();
}
}
54 changes: 0 additions & 54 deletions src/Exception/NoKeyInPayloadException.php

This file was deleted.

9 changes: 5 additions & 4 deletions src/ExistingMessagesConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use PhpAmqpLib\Message\AMQPMessage;
use Throwable;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface as MessageSerializerInterfaceAlias;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
use Yiisoft\Queue\Message\MessageSerializerInterface as MessageSerializerInterfaceAlias;
use Yiisoft\Queue\Message\MessageSerializerInterface;


/**
* @internal
Expand All @@ -17,9 +18,9 @@
private bool $messageConsumed = false;

public function __construct(
private AMQPChannel $channel,
private string $queueName,
private MessageSerializerInterface $serializer
private readonly AMQPChannel $channel,
private readonly string $queueName,
private readonly MessageSerializerInterfaceAlias $serializer
) {
}

Expand All @@ -31,15 +32,15 @@
$this->channel->basic_consume(
$this->queueName,
'',
false,

Check warning on line 35 in src/ExistingMessagesConsumer.php

View workflow job for this annotation

GitHub Actions / PHP 8.3-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": --- Original +++ New @@ @@ */ public function consume(callable $callback) : void { - $this->channel->basic_consume($this->queueName, '', false, false, false, false, function (AMQPMessage $amqpMessage) use($callback) : void { + $this->channel->basic_consume($this->queueName, '', true, false, false, false, function (AMQPMessage $amqpMessage) use($callback) : void { try { $message = $this->serializer->unserialize($amqpMessage->getBody()); if ($this->messageConsumed = $callback($message)) {
false,
false,

Check warning on line 37 in src/ExistingMessagesConsumer.php

View workflow job for this annotation

GitHub Actions / PHP 8.3-ubuntu-latest

Escaped Mutant for Mutator "FalseValue": --- Original +++ New @@ @@ */ public function consume(callable $callback) : void { - $this->channel->basic_consume($this->queueName, '', false, false, false, false, function (AMQPMessage $amqpMessage) use($callback) : void { + $this->channel->basic_consume($this->queueName, '', false, false, true, false, function (AMQPMessage $amqpMessage) use($callback) : void { try { $message = $this->serializer->unserialize($amqpMessage->getBody()); if ($this->messageConsumed = $callback($message)) {
false,
function (AMQPMessage $amqpMessage) use ($callback): void {
try {
$message = $this->serializer->unserialize($amqpMessage->body);
$message = $this->serializer->unserialize($amqpMessage->getBody());
if ($this->messageConsumed = $callback($message)) {
$this->channel->basic_ack($amqpMessage->getDeliveryTag());

Check warning on line 43 in src/ExistingMessagesConsumer.php

View workflow job for this annotation

GitHub Actions / PHP 8.3-ubuntu-latest

Escaped Mutant for Mutator "MethodCallRemoval": --- Original +++ New @@ @@ try { $message = $this->serializer->unserialize($amqpMessage->getBody()); if ($this->messageConsumed = $callback($message)) { - $this->channel->basic_ack($amqpMessage->getDeliveryTag()); + } } catch (Throwable $exception) { $this->messageConsumed = false;
}
} catch (Throwable $exception) {
$this->messageConsumed = false;
Expand All @@ -53,7 +54,7 @@
}
);

do {

Check warning on line 57 in src/ExistingMessagesConsumer.php

View workflow job for this annotation

GitHub Actions / PHP 8.3-ubuntu-latest

Escaped Mutant for Mutator "DoWhile": --- Original +++ New @@ @@ do { $this->messageConsumed = false; $this->channel->wait(null, true); - } while ($this->messageConsumed === true); + } while (false); } }
$this->messageConsumed = false;
$this->channel->wait(null, true);
} while ($this->messageConsumed === true);
Expand Down
64 changes: 0 additions & 64 deletions src/MessageSerializer.php

This file was deleted.

14 changes: 0 additions & 14 deletions src/MessageSerializerInterface.php

This file was deleted.

2 changes: 1 addition & 1 deletion src/Middleware/DelayMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

final class DelayMiddleware implements DelayMiddlewareInterface
{
public function __construct(private float $delayInSeconds, private bool $forcePersistentMessages = true)
public function __construct(private float $delayInSeconds, private readonly bool $forcePersistentMessages = true)
{
}

Expand Down
2 changes: 1 addition & 1 deletion src/QueueProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ final class QueueProvider implements QueueProviderInterface
private ?AMQPChannel $channel = null;

public function __construct(
private AbstractConnection $connection,
private readonly AbstractConnection $connection,
private QueueSettingsInterface $queueSettings,
private ?ExchangeSettingsInterface $exchangeSettings = null,
private array $messageProperties = [],
Expand Down
4 changes: 2 additions & 2 deletions src/Settings/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
namespace Yiisoft\Queue\AMQP\Settings;

use PhpAmqpLib\Wire\AMQPTable;
use Yiisoft\Queue\QueueFactoryInterface;
use Yiisoft\Queue\QueueInterface;

final class Queue implements QueueSettingsInterface
{
public function __construct(
private string $queueName = QueueFactoryInterface::DEFAULT_CHANNEL_NAME,
private string $queueName = QueueInterface::DEFAULT_CHANNEL,
private bool $passive = false,
private bool $durable = false,
private bool $exclusive = false,
Expand Down
6 changes: 3 additions & 3 deletions tests/Integration/DelayMiddlewareTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
use Psr\Log\LoggerInterface;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\AMQP\Adapter;
use Yiisoft\Queue\AMQP\MessageSerializer;
use Yiisoft\Queue\AMQP\Middleware\DelayMiddleware;
use Yiisoft\Queue\AMQP\QueueProvider;
use Yiisoft\Queue\AMQP\Settings\Queue as QueueSettings;
Expand All @@ -18,6 +17,7 @@
use Yiisoft\Queue\AMQP\Tests\Support\SimpleMessageHandler;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Cli\SignalLoop;
use Yiisoft\Queue\Message\JsonMessageSerializer;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Middleware\CallableFactory;
use Yiisoft\Queue\Middleware\Push\MiddlewareFactoryPush;
Expand All @@ -44,7 +44,7 @@ public function testMainFlow(): void
$this->createConnection(),
new QueueSettings(),
),
new MessageSerializer(),
new JsonMessageSerializer(),
new SignalLoop(),
);
$queue = $this->makeQueue($adapter);
Expand Down Expand Up @@ -75,7 +75,7 @@ public function testMainFlowWithFakeAdapter(): void
$this->createConnection(),
new QueueSettings(),
),
new MessageSerializer(),
new JsonMessageSerializer(),
new SignalLoop(),
);
$queue = $this->makeQueue($adapter);
Expand Down
5 changes: 5 additions & 0 deletions tests/Integration/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,10 @@ protected function queueListen(?string $queue = null): void
$process = new Process($command);
$this->processes[] = $process;
$process->start();

usleep(500000);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really good to sleep during the tests

if (!$process->isRunning()) {
throw new \Exception('Failed to start queue listener process');
}
}
}
2 changes: 1 addition & 1 deletion tests/Support/ExtendedSimpleMessageHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
*/
final class ExtendedSimpleMessageHandler
{
public function __construct(private FileHelper $fileHelper)
public function __construct(private readonly FileHelper $fileHelper)
{
}

Expand Down
Loading
Loading