Skip to content

Commit

Permalink
Replace Adapter with Queue
Browse files Browse the repository at this point in the history
  • Loading branch information
xepozz committed Jan 14, 2024
1 parent beaf64b commit cc5309b
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 65 deletions.
2 changes: 1 addition & 1 deletion src/Middleware/AdapterHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ final class AdapterHandler implements MessageHandlerInterface
{
public function handle(Request $request): Request
{
if (($adapter = $request->getAdapter()) === null) {
if (($adapter = $request->getQueue()?->getAdapter()) === null) {
throw new AdapterNotConfiguredException();
}
return $request->withMessage($adapter->push($request->getMessage()));
Expand Down
21 changes: 0 additions & 21 deletions src/Middleware/FailureHandling/FailureHandlingRequest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,4 @@ public function getException(): ?Throwable
{
return $this->exception;
}

public function getQueue(): QueueInterface
{
return $this->queue;
}

public function withException(Throwable $exception): self
{
$instance = clone $this;
$instance->exception = $exception;

return $instance;
}

public function withQueue(QueueInterface $queue): self
{
$instance = clone $this;
$instance->queue = $queue;

return $instance;
}
}
15 changes: 3 additions & 12 deletions src/Middleware/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@
//final class Request
class Request
{
protected ?QueueInterface $queue = null;
public function __construct(private MessageInterface $message, private ?AdapterInterface $adapter)
public function __construct(private MessageInterface $message, private ?QueueInterface $queue)
{
}

Expand All @@ -22,11 +21,6 @@ public function getMessage(): MessageInterface
return $this->message;
}

public function getAdapter(): ?AdapterInterface
{
return $this->adapter;
}

public function withMessage(MessageInterface $message): self
{
$instance = clone $this;
Expand All @@ -35,12 +29,9 @@ public function withMessage(MessageInterface $message): self
return $instance;
}

public function withAdapter(AdapterInterface $adapter): self
public function getQueue(): ?QueueInterface
{
$instance = clone $this;
$instance->adapter = $adapter;

return $instance;
return $this->queue;
}

public function withQueue(QueueInterface $queue): self
Expand Down
4 changes: 2 additions & 2 deletions src/Middleware/SendAgainMiddleware.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ public function process(Request $request, MessageHandlerInterface $handler): Req
$envelope = new FailureEnvelope($message, $this->createMeta($message));
$envelope = $this->queue->push($envelope);

$request1 = $request->withMessage($envelope);
return $request1->withQueue($this->queue);
$request = $request->withMessage($envelope);
return $request->withQueue($this->queue);
}

return $handler->handle($request);
Expand Down
2 changes: 1 addition & 1 deletion src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function push(
['data' => $message->getData(), 'metadata' => $message->getMetadata()]
);

$request = new Request($message, $this->adapter);
$request = new Request($message, $this);
$message = $this->pushMiddlewareDispatcher
->dispatch($request, $this->createHandler($middlewareDefinitions))
->getMessage();
Expand Down
2 changes: 1 addition & 1 deletion src/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public function process(MessageInterface $message, QueueInterface $queue): Messa
));
}

$request = new Request($message, $queue->getAdapter());
$request = new Request($message, $queue);
$closure = fn (MessageInterface $message): mixed => $this->injector->invoke([$handler, 'handle'], [$message]);
try {
$result = $this->consumeMiddlewareDispatcher->dispatch($request, new ConsumeFinalHandler($closure));
Expand Down
64 changes: 64 additions & 0 deletions tests/App/FakeQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?php

declare(strict_types=1);

namespace Yiisoft\Queue\Tests\App;

use Exception;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Middleware\MiddlewareInterface;
use Yiisoft\Queue\QueueInterface;

final class FakeQueue implements QueueInterface
{
private ?AdapterInterface $adapter = null;

public function __construct(private string $channelName)
{
}

public function push(
MessageInterface $message,
string|array|callable|MiddlewareInterface ...$middlewareDefinitions
): MessageInterface {
return $message;
}

public function run(int $max = 0): void
{
}

public function listen(): void
{
}

public function status(string|int $id): JobStatus
{
throw new Exception('`status()` method is not implemented yet.');
}

public function withAdapter(AdapterInterface $adapter): self
{
$new = clone $this;
$new->adapter = $adapter;

return $new;
}

public function getChannelName(): string
{
return $this->channelName;
}

public function withChannelName(string $channel): self
{
throw new Exception('`withChannelName()` method is not implemented yet.');
}

public function getAdapter(): ?AdapterInterface
{
return $this->adapter;
}
}
8 changes: 4 additions & 4 deletions tests/Unit/Middleware/Consume/ConsumeRequestTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,20 @@

namespace Yiisoft\Queue\Tests\Unit\Middleware\Consume;

use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Message\Message;
use Yiisoft\Queue\Middleware\Request;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Tests\TestCase;

final class ConsumeRequestTest extends TestCase
{
public function testImmutable(): void
{
$message = new Message('test');
$adapter = $this->createMock(AdapterInterface::class);
$consumeRequest = new Request($message, $adapter);
$queue = $this->createMock(QueueInterface::class);
$consumeRequest = new Request($message, $queue);

$this->assertNotSame($consumeRequest, $consumeRequest->withMessage($message));
$this->assertNotSame($consumeRequest, $consumeRequest->withAdapter($adapter));
$this->assertNotSame($consumeRequest, $consumeRequest->withQueue($queue));
}
}
7 changes: 3 additions & 4 deletions tests/Unit/Middleware/Consume/MiddlewareDispatcherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ public function testCallableMiddlewareCalled(): void
{
$request = $this->getRequest();
$queue = $this->createMock(QueueInterface::class);
$adapter = $this->createMock(AdapterInterface::class);

$dispatcher = $this->createDispatcher()->withMiddlewares(
[
static function (Request $request) use ($adapter): Request {
return $request->withMessage(new Message('New closure test data'))->withAdapter($adapter);
static function (Request $request) use ($queue): Request {
return $request->withMessage(new Message('New closure test data'))->withQueue($queue);
},
]
);
Expand Down Expand Up @@ -180,7 +179,7 @@ private function getRequest(): Request
{
return new Request(
new Message('data'),
$this->createMock(AdapterInterface::class)
$this->createMock(QueueInterface::class)
);
}
}
5 changes: 3 additions & 2 deletions tests/Unit/Middleware/Consume/MiddlewareFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Tests\Integration\Support\ConsumeMiddleware;
use Yiisoft\Queue\Tests\Unit\Middleware\Support\TestCallableMiddleware;
use Yiisoft\Test\Support\Container\SimpleContainer;
Expand Down Expand Up @@ -58,7 +59,7 @@ public function testCreateFromClosureResponse(): void
$middleware = $this->getMiddlewareFactory($container)->createMiddleware(
fn (): Request => new Request(
new Message('test data'),
$this->createMock(AdapterInterface::class),
$this->createMock(QueueInterface::class),
)
);
self::assertSame(
Expand Down Expand Up @@ -201,7 +202,7 @@ private function getRequest(): Request
{
return new Request(
new Message(['data']),
$this->createMock(AdapterInterface::class)
$this->createMock(QueueInterface::class)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private function getRequest(): Request
{
return new Request(
new Message('data'),
$this->createMock(AdapterInterface::class)
$this->createMock(QueueInterface::class)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Yiisoft\Queue\Middleware\MiddlewareFactoryInterface;
use Yiisoft\Queue\Middleware\MiddlewareInterface;
use Yiisoft\Queue\Middleware\Request;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Test\Support\Container\SimpleContainer;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Message\Message;
Expand Down Expand Up @@ -182,7 +183,7 @@ private function getConsumeRequest(): Request
{
return new Request(
new Message('data'),
$this->createMock(AdapterInterface::class)
$this->createMock(QueueInterface::class)
);
}
}
34 changes: 26 additions & 8 deletions tests/Unit/Middleware/MiddlewareDispatcherTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Queue\Tests\App\FakeQueue;
use Yiisoft\Test\Support\Container\SimpleContainer;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Message\Message;
Expand All @@ -26,9 +28,15 @@ public function testCallableMiddlewareCalled(): void

$dispatcher = $this->createDispatcher()->withMiddlewares(
[
static fn (Request $request, AdapterInterface $adapter): Request => $request
static fn (Request $request): Request => $request
->withMessage(new Message('New closure test data'))
->withAdapter($adapter->withChannel('closure-channel')),
->withQueue(
$request->getQueue()->getAdapter() === null
? $request->getQueue()
: $request->getQueue()->withAdapter(
$request->getQueue()->getAdapter()->withChannel('closure-channel')
)
),
]
);

Expand All @@ -38,7 +46,7 @@ public function testCallableMiddlewareCalled(): void
* @psalm-suppress NoInterfaceProperties
* @psalm-suppress PossiblyNullPropertyFetch
*/
$this->assertSame('closure-channel', $request->getAdapter()->channel);
$this->assertSame('closure-channel', $request->getQueue()->getAdapter()->channel);
}

public function testArrayMiddlewareCallableDefinition(): void
Expand Down Expand Up @@ -69,8 +77,6 @@ public function testFactoryArrayDefinition(): void

public function testMiddlewareFullStackCalled(): void
{
$request = $this->getRequest();

$middleware1 = static function (Request $request, MessageHandlerInterface $handler): Request {
$request = $request->withMessage($request->getMessage()->withData('new test data'));

Expand All @@ -82,20 +88,31 @@ public function testMiddlewareFullStackCalled(): void
*
* @psalm-suppress PossiblyNullReference
*/
$request = $request->withAdapter($request->getAdapter()->withChannel('new channel'));
$queue = $request->getQueue();
if ($queue !== null && $queue->getAdapter() !== null) {
$request = $request->withQueue(
$queue->withAdapter(
$queue->getAdapter()->withChannel('new channel')
)
);
}

return $handler->handle($request);
};

$dispatcher = $this->createDispatcher()->withMiddlewares([$middleware1, $middleware2]);

$request = $this->getRequest();
$request = $dispatcher->dispatch($request, $this->getRequestHandler());
$this->assertSame('new test data', $request->getMessage()->getData());
/**
* @psalm-suppress NoInterfaceProperties
* @psalm-suppress PossiblyNullPropertyFetch
*/
$this->assertSame('new channel', $request->getAdapter()->channel);
$this->assertNotNull($request->getQueue());
$this->assertNotNull($request->getQueue()->getAdapter());
$this->assertInstanceOf(FakeAdapter::class, $request->getQueue()->getAdapter());
$this->assertSame('new channel', $request->getQueue()->getAdapter()->channel);
}

public function testMiddlewareStackInterrupted(): void
Expand Down Expand Up @@ -185,6 +202,7 @@ private function createContainer(array $instances = []): ContainerInterface

private function getRequest(): Request
{
return new Request(new Message('data'), new FakeAdapter());
$queue = new FakeQueue('chan1');
return new Request(new Message('data'), $queue->withAdapter(new FakeAdapter()));
}
}
8 changes: 5 additions & 3 deletions tests/Unit/Middleware/MiddlewareFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use PHPUnit\Framework\TestCase;
use Psr\Container\ContainerInterface;
use Yiisoft\Queue\QueueInterface;
use Yiisoft\Test\Support\Container\SimpleContainer;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Message\Message;
Expand Down Expand Up @@ -46,9 +47,10 @@ public function testCreateCallableFromArray(): void
public function testCreateFromClosureResponse(): void
{
$container = $this->getContainer([TestCallableMiddleware::class => new TestCallableMiddleware()]);
$queue = $this->createMock(QueueInterface::class);
$middleware = $this->getMiddlewareFactory($container)->createMiddleware(
static function (): Request {
return new Request(new Message('test data'), new FakeAdapter());
static function () use($queue): Request {
return new Request(new Message('test data'), $queue);
}
);
self::assertSame(
Expand Down Expand Up @@ -167,6 +169,6 @@ public function handle(Request $request): Request

private function getRequest(): Request
{
return new Request(new Message('data'), new FakeAdapter());
return new Request(new Message('data'), $this->createMock(QueueInterface::class));
}
}
Loading

0 comments on commit cc5309b

Please sign in to comment.