diff --git a/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php b/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php index be8de7df..7cef6572 100644 --- a/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php +++ b/src/Middleware/Consume/ConsumeMiddlewareDispatcher.php @@ -9,11 +9,11 @@ final class ConsumeMiddlewareDispatcher { /** - * Contains a middleware pipeline handler. + * Contains a middleware pipeline handlers. * - * @var MiddlewareConsumeStack|null The middleware stack. + * @var MiddlewareConsumeStack[] The middleware stack divided by message types. */ - private ?MiddlewareConsumeStack $stack = null; + private array $stack = []; /** * @var array[]|callable[]|MiddlewareConsumeInterface[]|string[] @@ -37,11 +37,12 @@ public function dispatch( ConsumeRequest $request, MessageHandlerConsumeInterface $finishHandler ): ConsumeRequest { - if ($this->stack === null) { - $this->stack = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler); + $handlerName = $request->getMessage()->getHandlerName(); + if (!array_key_exists($handlerName, $this->stack)) { + $this->stack[$handlerName] = new MiddlewareConsumeStack($this->buildMiddlewares(), $finishHandler); } - return $this->stack->handleConsume($request); + return $this->stack[$handlerName]->handleConsume($request); } /** @@ -68,7 +69,7 @@ public function withMiddlewares(array $middlewareDefinitions): self // Fixes a memory leak. unset($instance->stack); - $instance->stack = null; + $instance->stack = []; return $instance; } diff --git a/tests/Integration/MessageConsumingTest.php b/tests/Integration/MessageConsumingTest.php index e5775ea0..8e1d40e7 100644 --- a/tests/Integration/MessageConsumingTest.php +++ b/tests/Integration/MessageConsumingTest.php @@ -20,14 +20,19 @@ final class MessageConsumingTest extends TestCase { private array $messagesProcessed; + private array $messagesProcessedSecond; public function testMessagesConsumed(): void { $this->messagesProcessed = []; + $this->messagesProcessedSecond = []; $container = $this->createMock(ContainerInterface::class); $worker = new Worker( - ['test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData()], + [ + 'test' => fn (MessageInterface $message): mixed => $this->messagesProcessed[] = $message->getData(), + 'test2' => fn (MessageInterface $message): mixed => $this->messagesProcessedSecond[] = $message->getData(), + ], new NullLogger(), new Injector($container), $container, @@ -38,9 +43,11 @@ public function testMessagesConsumed(): void $messages = [1, 'foo', 'bar-baz']; foreach ($messages as $message) { $worker->process(new Message('test', $message), $this->getQueue()); + $worker->process(new Message('test2', $message), $this->getQueue()); } $this->assertEquals($messages, $this->messagesProcessed); + $this->assertEquals($messages, $this->messagesProcessedSecond); } public function testMessagesConsumedByHandlerClass(): void