Consumer needs AMQPChannel channel as only dependency.
If you have channel instance in your project already than you can skip this, otherwise lets setup rabbitmq connection, we recommend to use container for this.
$config = new \Solcloud\Consumer\QueueConfig();
$config
->setHost('solcloud_rabbitmq')
->setVhost('/')
#->setHeartbeatSec(5)
->setUsername('dev')
->setPassword('dev')
;
$connectionFactory = new \Solcloud\Consumer\QueueConnectionFactory($config);
$connection = $connectionFactory->createSocketConnection();
#(new \PhpAmqpLib\Connection\Heartbeat\PCNTLHeartbeatSender($connection))->register(); // if heartbeat and pcntl_async_signals() is available
Create channel from connection or use your own channel
/** @var \PhpAmqpLib\Channel\AMQPChannel $channel */
$channel = $connection->channel();
Create worker (consumer) class for your business logic and inject $channel dependency. You can extend AbstractConsumer for lightweight abstraction or use "solcloud standard" BaseConsumer. We will use BaseConsumer in this example
$worker = new class($channel) extends \Solcloud\Consumer\BaseConsumer {
protected function run(): void
{
// Your hard work here
echo "Processing message: " . $this->data->id . PHP_EOL;
}
};
Start consuming message from queue using blocking method wait
$worker->consume($consumeQueueName);
while ($worker->hasCallback()) {
try {
// While we have callback lets enter event loop with some timeout
$worker->wait(rand(8, 11));
} catch (\PhpAmqpLib\Exception\AMQPTimeoutException $ex) {
echo $ex->getMessage() . PHP_EOL;
}
}
$worker->closeChannel();
For message publish you can use $worker directly or use rabbitmq management plugin or different scripts
$worker->publishMessage(
$worker->createMessageHelper([], ["id" => 1]),
'',
$consumeQueueName
); // OR open rabbitmq management and publish: {"meta":[],"data":{"id":1}}
Worker can log to Psr\Log\LoggerInterface
compatible logger.
$worker->setLogger(new YourPsrLogger());
$worker->getLogger()->info('Something');
For complete example for this readme see example.php