Skip to content

Commit

Permalink
Add AdapterFactoryQueueProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
vjik committed Nov 6, 2024
1 parent 97689da commit 9bbe358
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 2 deletions.
84 changes: 84 additions & 0 deletions src/Provider/AdapterFactoryQueueProvider.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Provider;

use Psr\Container\ContainerInterface;
use Yiisoft\Definitions\Exception\InvalidConfigException;
use Yiisoft\Factory\StrictFactory;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\QueueInterface;

use function array_key_exists;
use function sprintf;

final class AdapterFactoryQueueProvider implements QueueProviderInterface
{
/**
* @psalm-var array<string, QueueInterface|null>
*/
private array $queues = [];

private readonly StrictFactory $factory;

/**
* @psalm-param array<string, mixed> $definitions
* @throws InvalidQueueConfigException
*/
public function __construct(
private readonly QueueInterface $baseQueue,
array $definitions,
?ContainerInterface $container = null,
bool $validate = true,
) {
try {
$this->factory = new StrictFactory($definitions, $container, $validate);
} catch (InvalidConfigException $exception) {
throw new InvalidQueueConfigException($exception->getMessage(), previous: $exception);
}
}

public function get(string $channel): QueueInterface
{
$queue = $this->getOrTryCreate($channel);
if ($queue === null) {
throw new ChannelNotFoundException($channel);
}
return $queue;
}

public function has(string $channel): bool
{
return $this->factory->has($channel);
}

/**
* @throws InvalidQueueConfigException
*/
private function getOrTryCreate(string $channel): QueueInterface|null
{
if (array_key_exists($channel, $this->queues)) {
return $this->queues[$channel];
}

if ($this->factory->has($channel)) {
$adapter = $this->factory->create($channel);
if (!$adapter instanceof AdapterInterface) {
throw new InvalidQueueConfigException(
sprintf(
'Adapter must implement "%s". For channel "%s" got "%s" instead.',
AdapterInterface::class,
$channel,
get_debug_type($adapter),
),
);
}
$this->queues[$channel] = $this->baseQueue->withAdapter($adapter)->withChannelName($channel);
} else {
$this->queues[$channel] = null;
}

return $this->queues[$channel];
}
}
13 changes: 12 additions & 1 deletion src/StubQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ final class StubQueue implements QueueInterface
{
public function __construct(
private string $channelName = Queue::DEFAULT_CHANNEL_NAME,
private ?AdapterInterface $adapter = null,
) {
}

Expand All @@ -37,9 +38,16 @@ public function status(int|string $id): JobStatus
return JobStatus::done();
}

public function getAdapter(): ?AdapterInterface
{
return $this->adapter;
}

public function withAdapter(AdapterInterface $adapter): QueueInterface
{
return clone $this;
$new = clone $this;
$new->adapter = $adapter;
return $new;
}

public function getChannelName(): string
Expand All @@ -51,6 +59,9 @@ public function withChannelName(string $channel): QueueInterface
{
$new = clone $this;
$new->channelName = $channel;
if ($new->adapter !== null) {
$new->adapter = $new->adapter->withChannel($channel);
}
return $new;
}
}
103 changes: 103 additions & 0 deletions tests/Unit/Provider/AdapterFactoryQueueProviderTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
<?php

declare(strict_types=1);

namespace Provider;

use PHPUnit\Framework\TestCase;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Adapter\StubAdapter;
use Yiisoft\Queue\Cli\StubLoop;
use Yiisoft\Queue\Provider\AdapterFactoryQueueProvider;
use Yiisoft\Queue\Provider\ChannelNotFoundException;
use Yiisoft\Queue\Provider\InvalidQueueConfigException;
use Yiisoft\Queue\StubQueue;

use function sprintf;

final class AdapterFactoryQueueProviderTest extends TestCase
{
public function testBase(): void
{
$provider = new AdapterFactoryQueueProvider(
new StubQueue(),
[
'channel1' => StubAdapter::class,
],
);

$queue = $provider->get('channel1');

$this->assertInstanceOf(StubQueue::class, $queue);
$this->assertSame('channel1', $queue->getChannelName());
$this->assertInstanceOf(StubAdapter::class, $queue->getAdapter());
$this->assertTrue($provider->has('channel1'));
$this->assertFalse($provider->has('not-exist-channel'));
}

public function testGetTwice(): void
{
$provider = new AdapterFactoryQueueProvider(
new StubQueue(),
[
'channel1' => StubAdapter::class,
],
);

$queue1 = $provider->get('channel1');
$queue2 = $provider->get('channel1');

$this->assertSame($queue1, $queue2);
}

public function testGetNotExistChannel(): void
{
$provider = new AdapterFactoryQueueProvider(
new StubQueue(),
[
'channel1' => StubAdapter::class,
],
);

$this->expectException(ChannelNotFoundException::class);
$this->expectExceptionMessage('Channel "not-exist-channel" not found.');
$provider->get('not-exist-channel');
}

public function testInvalidQueueConfig(): void
{
$baseQueue = new StubQueue();
$definitions = [
'channel1' => [
'class' => StubAdapter::class,
'__construct()' => 'hello',
],
];

$this->expectException(InvalidQueueConfigException::class);
$this->expectExceptionMessage(
'Invalid definition: incorrect constructor arguments. Expected array, got string.'
);
new AdapterFactoryQueueProvider($baseQueue, $definitions);
}

public function testInvalidQueueConfigOnGet(): void
{
$provider = new AdapterFactoryQueueProvider(
new StubQueue(),
[
'channel1' => StubLoop::class,
]
);

$this->expectException(InvalidQueueConfigException::class);
$this->expectExceptionMessage(
sprintf(
'Adapter must implement "%s". For channel "channel1" got "%s" instead.',
AdapterInterface::class,
StubLoop::class,
)
);
$provider->get('channel1');
}
}
4 changes: 3 additions & 1 deletion tests/Unit/Provider/QueueFactoryQueueProviderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\StubQueue;

use function sprintf;

final class QueueFactoryQueueProviderTest extends TestCase
{
public function testBase(): void
Expand Down Expand Up @@ -44,7 +46,7 @@ public function testGetTwice(): void
$this->assertSame($queue1, $queue2);
}

public function testGetNotExistChannel()
public function testGetNotExistChannel(): void
{
$provider = new QueueFactoryQueueProvider(
[
Expand Down

0 comments on commit 9bbe358

Please sign in to comment.