Skip to content

Commit

Permalink
Added amqpMessage property to events for amqp (#6518)
Browse files Browse the repository at this point in the history
  • Loading branch information
huangdijia authored Feb 2, 2024
1 parent dae5588 commit cb48673
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
4 changes: 2 additions & 2 deletions src/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ protected function getCallback(ConsumerMessageInterface $consumerMessage, AMQPMe
try {
$data = $consumerMessage->unserialize($message->getBody());

$this->eventDispatcher?->dispatch(new BeforeConsume($consumerMessage));
$this->eventDispatcher?->dispatch(new BeforeConsume($consumerMessage, $message));
$result = $consumerMessage->consumeMessage($data, $message);
$this->eventDispatcher?->dispatch(new AfterConsume($consumerMessage, $result));
$this->eventDispatcher?->dispatch(new AfterConsume($consumerMessage, $result, $message));
} catch (Throwable $exception) {
$this->eventDispatcher?->dispatch(new FailToConsume($consumerMessage, $exception, $message));
if ($this->container->has(FormatterInterface::class)) {
Expand Down
8 changes: 7 additions & 1 deletion src/Event/AfterConsume.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@

use Hyperf\Amqp\Message\ConsumerMessageInterface;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;

class AfterConsume extends ConsumeEvent
{
public function __construct(ConsumerMessageInterface $message, protected Result $result)
public function __construct(ConsumerMessageInterface $message, protected Result $result, protected AMQPMessage $amqpMessage)
{
parent::__construct($message);
}
Expand All @@ -25,4 +26,9 @@ public function getResult(): Result
{
return $this->result;
}

public function getAMQPMessage(): AMQPMessage
{
return $this->amqpMessage;
}
}
12 changes: 12 additions & 0 deletions src/Event/BeforeConsume.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,18 @@
*/
namespace Hyperf\Amqp\Event;

use Hyperf\Amqp\Message\ConsumerMessageInterface;
use PhpAmqpLib\Message\AMQPMessage;

class BeforeConsume extends ConsumeEvent
{
public function __construct(ConsumerMessageInterface $message, protected AMQPMessage $amqpMessage)
{
parent::__construct($message);
}

public function getAMQPMessage(): AMQPMessage
{
return $this->amqpMessage;
}
}

0 comments on commit cb48673

Please sign in to comment.