From 1f2cbae852aa67ce9ddb266bbab9e5a9a2de8b0b Mon Sep 17 00:00:00 2001 From: Oleg Sherbakov Date: Fri, 8 Nov 2024 22:56:07 +0400 Subject: [PATCH 1/2] fixed according to issue. --- .../Consume/ConsumeMiddlewareDispatcher.php | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php b/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php index be8de7df..7cef6572 100644 --- a/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php +++ b/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php @@ -9,11 +9,11 @@ final class ConsumeMiddlewareDispatcher { /** - * Contains a middleware pipeline handler. + * Contains a middleware pipeline handlers. * - * @var MiddlewareConsumeStack|null The middleware stack. + * @var MiddlewareConsumeStack[] The middleware stack divided by message types. */ - private ?MiddlewareConsumeStack $stack = null; + private array $stack = []; /** * @var array[]|callable[]|MiddlewareConsumeInterface[]|string[] @@ -37,11 +37,12 @@ public function dispatch( ConsumeRequest $request, MessageHandlerConsumeInterface $finishHandler ): ConsumeRequest { - if ($this->stack === null) { - $this->stack = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler); + $handlerName = $request->getMessage()->getHandlerName(); + if (!array_key_exists($handlerName, $this->stack)) { + $this->stack[$handlerName] = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler); } - return $this->stack->handleConsume($request); + return $this->stack[$handlerName]->handleConsume($request); } /** @@ -68,7 +69,7 @@ public function withMiddlewares(array $middlewareDefinitions): self // Fixes a memory leak. unset($instance->stack); - $instance->stack = null; + $instance->stack = []; return $instance; } From 8f21a91cd8860ce4e4fc46301a2cda80e0a1c752 Mon Sep 17 00:00:00 2001 From: viktorprogger Date: Sun, 8 Dec 2024 01:47:05 +0500 Subject: [PATCH 2/2] Improve tests --- tests/Integration/MessageConsumingTest.php | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/tests/Integration/MessageConsumingTest.php b/tests/Integration/MessageConsumingTest.php index 6c30ba92..0622fce5 100644 --- a/tests/Integration/MessageConsumingTest.php +++ b/tests/Integration/MessageConsumingTest.php @@ -19,14 +19,19 @@ final class MessageConsumingTest extends TestCase { private array $messagesProcessed; + private array $messagesProcessedSecond; public function testMessagesConsumed(): void { $this->messagesProcessed = []; + $this->messagesProcessedSecond = []; $container = $this->createMock(ContainerInterface::class); $worker = new Worker( - ['test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData()], + [ + 'test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData(), + 'test2' => fn (MessageInterface $message): mixed => $this->messagesProcessedSecond[] = $message->getData(), + ], new NullLogger(), new Injector($container), $container, @@ -37,8 +42,10 @@ public function testMessagesConsumed(): void $messages = [1, 'foo', 'bar-baz']; foreach ($messages as $message) { $worker->process(new Message('test', $message), $this->getQueue()); + $worker->process(new Message('test2', $message), $this->getQueue()); } $this->assertEquals($messages, $this->messagesProcessed); + $this->assertEquals($messages, $this->messagesProcessedSecond); } }