diff --git a/src/Commanding/CommandProcessor.php b/src/Commanding/CommandProcessor.php index 027e419..305a68b 100644 --- a/src/Commanding/CommandProcessor.php +++ b/src/Commanding/CommandProcessor.php @@ -240,13 +240,27 @@ public function __invoke(GenericJsonSchemaCommand $command) } [$eventName, $payload] = $event; + $metadata = []; + + if (array_key_exists(2, $event)) { + $metadata = $event[2]; + if (! is_array($metadata)) { + throw new \RuntimeException(sprintf( + 'Event returned by aggregate of type %s while handling command %s contains additional metadata but metadata type is not array. Detected type is: %s', + $this->aggregateType, + $this->commandName, + (is_object($metadata) ? get_class($metadata) : gettype($metadata)) + )); + } + } + /** @var GenericJsonSchemaEvent $event */ $event = $this->messageFactory->createMessageFromArray($eventName, [ 'payload' => $payload, - 'metadata' => [ + 'metadata' => array_merge([ '_causation_id' => $command->uuid()->toString(), '_causation_name' => $this->commandName, - ], + ], $metadata), ]); $aggregate->recordThat($event); diff --git a/tests/Commanding/CommandProcessorTest.php b/tests/Commanding/CommandProcessorTest.php index 15dc5c1..4c9bac6 100644 --- a/tests/Commanding/CommandProcessorTest.php +++ b/tests/Commanding/CommandProcessorTest.php @@ -360,4 +360,73 @@ public function it_provides_context_using_context_provider() 'context' => ['msg' => 'it works'], ], $event->payload()); } + + /** + * @test + */ + public function it_adds_additional_metadata_to_event() + { + $eventMachine = new EventMachine(); + + $eventMachine->load(MessageDescription::class); + $eventMachine->load(CacheableUserDescription::class); + + $container = $this->prophesize(ContainerInterface::class); + + $eventMachine->initialize($container->reveal()); + + $config = $eventMachine->compileCacheableConfig(); + + $commandRouting = $config['compiledCommandRouting']; + $aggregateDescriptions = $config['aggregateDescriptions']; + + $recordedEvents = []; + + $eventStore = $this->prophesize(EventStore::class); + + $eventStore->appendTo(new StreamName('event_stream'), Argument::any())->will(function ($args) use (&$recordedEvents) { + $recordedEvents = iterator_to_array($args[1]); + }); + + $processorDesc = $commandRouting[Command::REGISTER_USER]; + $processorDesc['eventApplyMap'] = $aggregateDescriptions[Aggregate::USER]['eventApplyMap']; + + $arFunc = $processorDesc['aggregateFunction']; + + //Wrap ar function to add additional metadata for this test + $processorDesc['aggregateFunction'] = function (Message $registerUser) use ($arFunc): \Generator { + [$event] = iterator_to_array($arFunc($registerUser)); + [$eventName, $payload] = $event; + yield [$eventName, $payload, ['additional' => 'metadata']]; + }; + + $commandProcessor = CommandProcessor::fromDescriptionArrayAndDependencies( + $processorDesc, + $this->getMockedEventMessageFactory(), + $eventStore->reveal() + ); + + $userId = Uuid::uuid4()->toString(); + + $registerUser = $this->getMockedCommandMessageFactory()->createMessageFromArray(Command::REGISTER_USER, [ + UserDescription::IDENTIFIER => $userId, + UserDescription::USERNAME => 'Alex', + UserDescription::EMAIL => 'contact@prooph.de', + ]); + + $commandProcessor($registerUser); + + self::assertCount(1, $recordedEvents); + /** @var GenericJsonSchemaEvent $event */ + $event = $recordedEvents[0]; + self::assertEquals(Event::USER_WAS_REGISTERED, $event->messageName()); + self::assertEquals([ + '_causation_id' => $registerUser->uuid()->toString(), + '_causation_name' => $registerUser->messageName(), + '_aggregate_version' => 1, + '_aggregate_id' => $userId, + '_aggregate_type' => 'User', + 'additional' => 'metadata', + ], $event->metadata()); + } }