diff --git a/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php b/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php index b63978fd..f65a95c9 100644 --- a/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php +++ b/src/Kdyby/RabbitMq/Command/BaseConsumerCommand.php @@ -43,6 +43,7 @@ protected function configure() ->addOption('messages', 'm', InputOption::VALUE_OPTIONAL, 'Messages to consume', 0) ->addOption('route', 'r', InputOption::VALUE_OPTIONAL, 'Routing Key', '') ->addOption('memory-limit', 'l', InputOption::VALUE_OPTIONAL, 'Allowed memory for this process', null) + ->addOption('time-limit', 't', InputOption::VALUE_OPTIONAL, 'Allowed time in seconds for this process', null) ->addOption('debug', 'd', InputOption::VALUE_NONE, 'Enable Debugging') ->addOption('without-signals', 'w', InputOption::VALUE_NONE, 'Disable catching of system signals'); } @@ -88,6 +89,10 @@ protected function initialize(InputInterface $input, OutputInterface $output) $this->consumer->setMemoryLimit($input->getOption('memory-limit')); } + if (!is_null($input->getOption('time-limit')) && ctype_digit((string) $input->getOption('time-limit')) && $input->getOption('time-limit') > 0) { + $this->consumer->setTimeLimit($input->getOption('time-limit')); + } + if ($routingKey = $input->getOption('route')) { $this->consumer->setRoutingKey($routingKey); } diff --git a/src/Kdyby/RabbitMq/Consumer.php b/src/Kdyby/RabbitMq/Consumer.php index 8fe5aa94..ece18ee3 100644 --- a/src/Kdyby/RabbitMq/Consumer.php +++ b/src/Kdyby/RabbitMq/Consumer.php @@ -63,7 +63,15 @@ class Consumer extends BaseConsumer */ protected $memoryLimit; + /** + * @var int $timeLimit + */ + protected $timeLimit; + /** + * @var int Start timestamp + */ + private $startTimestamp; /** * Set the memory limit @@ -88,10 +96,42 @@ public function getMemoryLimit() } + /** + * Set the time limit + * + * @param int $timeLimit + */ + public function setTimeLimit($timeLimit) + { + $this->timeLimit = $timeLimit; + } + + + + /** + * Get the time limit + * + * @return int + */ + public function getTimeLimit() + { + return $this->timeLimit; + } + + + + /** + * Registers listener to onStart event. + * @param IConsumerStartListener $listener + */ + public function addConsumerStartListener(IConsumerStartListener $listener) { + $this -> onStart []= [$listener, 'onStartListener']; + } public function consume($msgAmount) { $this->target = $msgAmount; + $this->startTimestamp = time(); $this->setupConsumer(); $this->onStart($this); @@ -153,7 +193,7 @@ public function purge() public function processMessage(AMQPMessage $msg) { - $this->onConsume($this, $msg); + $this->onConsume($this, $msg, $this->queueOptions['name'], $this->callback); try { $processFlag = call_user_func($this->callback, $msg); $this->handleProcessMessage($msg, $processFlag); @@ -198,7 +238,7 @@ protected function handleProcessMessage(AMQPMessage $msg, $processFlag) $this->consumed++; $this->maybeStopConsumer(); - if ($this->isRamAlmostOverloaded()) { + if ($this->isRamAlmostOverloaded() || $this->isTimeLimitExceeded()) { $this->stopConsuming(); } } @@ -219,4 +259,17 @@ protected function isRamAlmostOverloaded() return memory_get_usage(true) >= ($this->getMemoryLimit() * 1024 * 1024); } + + /** + * Checks if consumer running time is greater or equal for time allowed for this process + * + * @return boolean + */ + protected function isTimeLimitExceeded() { + if ($this->getTimeLimit() === NULL) { + return FALSE; + } + + return (time() - $this->startTimestamp) >= ($this->getTimeLimit()); + } } diff --git a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php index 99798c3d..cc84a4ab 100644 --- a/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php +++ b/src/Kdyby/RabbitMq/DI/RabbitMqExtension.php @@ -222,6 +222,14 @@ public function loadConfiguration() public function beforeCompile() { + $listeners = array_keys($this->getContainerBuilder()->findByType('Kdyby\RabbitMq\IConsumerStartListener')); + if ($listeners) { // setup onstart listener for every consumer if listeners are defined + foreach ($this->getContainerBuilder()->findByType('Kdyby\RabbitMq\Consumer') as $serviceDefinition) { + foreach ($listeners as $listener) { + $serviceDefinition->addSetup('addConsumerStartListener', [$listener]); + } + } + } unset($this->getContainerBuilder()->parameters[$this->name]); } diff --git a/src/Kdyby/RabbitMq/IConsumerStartListener.php b/src/Kdyby/RabbitMq/IConsumerStartListener.php new file mode 100644 index 00000000..18ad52a2 --- /dev/null +++ b/src/Kdyby/RabbitMq/IConsumerStartListener.php @@ -0,0 +1,21 @@ + + */ +interface IConsumerStartListener +{ + + /** + * + * @param Consumer $consumer Consumer currently starting + * @return NULL nothing to return + */ + public function onStartListener(Consumer $consumer); + +} diff --git a/src/Kdyby/RabbitMq/MultipleConsumer.php b/src/Kdyby/RabbitMq/MultipleConsumer.php index ba0f6481..0c83aff0 100644 --- a/src/Kdyby/RabbitMq/MultipleConsumer.php +++ b/src/Kdyby/RabbitMq/MultipleConsumer.php @@ -89,13 +89,23 @@ protected function queueDeclare() + public function stopConsuming() + { + foreach ($this->queues as $name => $options) { + $this->getChannel()->basic_cancel($this->getQueueConsumerTag($name)); + } + $this->onStop($this); + } + + + public function processQueueMessage($queueName, AMQPMessage $msg) { if (!isset($this->queues[$queueName])) { throw new QueueNotFoundException(); } - $this->onConsume($this, $msg); + $this->onConsume($this, $msg, $queueName, $this->queues[$queueName]['callback']); try { $processFlag = call_user_func($this->queues[$queueName]['callback'], $msg); $this->handleProcessMessage($msg, $processFlag);