diff --git a/.travis.yml b/.travis.yml index 4874175322..47a30d0912 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,9 @@ php: matrix: include: - php: 5.5 - env: EXCLUDE_AMQP_INTEROP=true + env: + - EXCLUDE_AMQP_INTEROP=true + - EXCLUDE_STOMP=true fast_finish: true @@ -19,6 +21,7 @@ services: - postgresql - redis-server - rabbitmq + - docker # cache vendor dirs cache: @@ -30,12 +33,15 @@ before_install: - sudo apt-get install -qq beanstalkd - sudo beanstalkd -v - sudo service beanstalkd start + - docker pull webcenter/activemq + - docker run -d -p 61613:61613 webcenter/activemq - if [[ ${TRAVIS_PHP_VERSION:0:1} == "5" ]]; then pecl install igbinary-2.0.8; else pecl install igbinary; fi install: - travis_retry composer self-update && composer --version - export PATH="$HOME/.composer/vendor/bin:$PATH" - if [ "$EXCLUDE_AMQP_INTEROP" = true ]; then travis_retry composer remove "enqueue/amqp-lib" "enqueue/amqp-tools" --dev --no-interaction --no-update; fi + - if [ "$EXCLUDE_STOMP" = true ]; then travis_retry composer remove "enqueue/stomp" --dev --no-interaction --no-update; fi - travis_retry composer install --prefer-dist --no-interaction before_script: diff --git a/CHANGELOG.md b/CHANGELOG.md index 485ba78c72..5e60e2d8b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,10 @@ Yii2 Queue Extension Change Log =============================== -2.2.2 under development +2.3.0 under development ----------------------- -- no changes in this release. +- Enh #260: Added STOMP driver (versh23) 2.2.1 May 21, 2019 diff --git a/README.md b/README.md index 74a974e262..83f5d26319 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ An extension for running tasks asynchronously via queues. -It supports queues based on **DB**, **Redis**, **RabbitMQ**, **AMQP**, **Beanstalk** and **Gearman**. +It supports queues based on **DB**, **Redis**, **RabbitMQ**, **AMQP**, **Beanstalk**, **ActiveMQ** and **Gearman**. Documentation is at [docs/guide/README.md](docs/guide/README.md). diff --git a/composer.json b/composer.json index 032db57c8b..f38e0f6b6d 100644 --- a/composer.json +++ b/composer.json @@ -29,7 +29,8 @@ "yiisoft/yii2-debug": "*", "yiisoft/yii2-gii": "*", "phpunit/phpunit": "~4.4", - "aws/aws-sdk-php": ">=2.4" + "aws/aws-sdk-php": ">=2.4", + "enqueue/stomp": "^0.8.39" }, "suggest": { "ext-pcntl": "Need for process signals.", @@ -38,7 +39,8 @@ "php-amqplib/php-amqplib": "Need for AMQP queue.", "enqueue/amqp-lib": "Need for AMQP interop queue.", "ext-gearman": "Need for Gearman queue.", - "aws/aws-sdk-php": "Need for aws SQS." + "aws/aws-sdk-php": "Need for aws SQS.", + "enqueue/stomp": "Need for Stomp queue." }, "autoload": { "psr-4": { @@ -51,7 +53,8 @@ "yii\\queue\\gearman\\": "src/drivers/gearman", "yii\\queue\\redis\\": "src/drivers/redis", "yii\\queue\\sync\\": "src/drivers/sync", - "yii\\queue\\sqs\\": "src/drivers/sqs" + "yii\\queue\\sqs\\": "src/drivers/sqs", + "yii\\queue\\stomp\\": "src/drivers/stomp" } }, "autoload-dev": { diff --git a/docs/guide-ru/README.md b/docs/guide-ru/README.md index 30bd03b218..f14100c17f 100644 --- a/docs/guide-ru/README.md +++ b/docs/guide-ru/README.md @@ -22,6 +22,7 @@ * [Beanstalk драйвер](driver-beanstalk.md) * [Gearman драйвер](driver-gearman.md) * [AWS SQS драйвер](driver-sqs.md) +* [Stomp драйвер](driver-stomp.md) Инструменты разработчика ------------------------ diff --git a/docs/guide-ru/driver-stomp.md b/docs/guide-ru/driver-stomp.md new file mode 100644 index 0000000000..a813b2a305 --- /dev/null +++ b/docs/guide-ru/driver-stomp.md @@ -0,0 +1,43 @@ +Stomp драйвер +================ + +Драйвер работает с очередью на базе ActiveMQ. + +В приложении должно быть установлено расширение `enqueue/stomp`. + +Пример настройки: + +```php +return [ + 'bootstrap' => [ + 'queue', // Компонент регистрирует свои консольные команды + ], + 'components' => [ + 'queue' => [ + 'class' => \yii\queue\stomp\Queue::class, + 'host' => 'localhost', + 'port' => 61613, + 'queueName' => 'queue', + ], + ], +]; +``` + +Консоль +------- + +Для обработки очереди используются консольные команды. + +```sh +yii queue/listen +``` + +Команда `listen` запускает обработку очереди в режиме демона. Очередь опрашивается непрерывно. +Если добавляются новые задания, то они сразу же извлекаются и выполняются. Способ наиболее эфективен +если запускать команду через [supervisor](worker.md#supervisor) или [systemd](worker.md#systemd). + +Для команды `listen` доступны следующие опции: + +- `--verbose`, `-v`: состояние обработки заданий выводится в консоль. +- `--isolate`: каждое задание выполняется в отдельном дочернем процессе. +- `--color`: подсветка вывода в режиме `--verbose`. diff --git a/docs/guide/README.md b/docs/guide/README.md index c48c99898a..0811c2dd66 100644 --- a/docs/guide/README.md +++ b/docs/guide/README.md @@ -22,6 +22,7 @@ Queue Drivers * [Beanstalk](driver-beanstalk.md) * [Gearman](driver-gearman.md) * [AWS SQS](driver-sqs.md) +* [Stomp](driver-stomp.md) Developer tools --------------- diff --git a/docs/guide/driver-stomp.md b/docs/guide/driver-stomp.md new file mode 100644 index 0000000000..6d33983c71 --- /dev/null +++ b/docs/guide/driver-stomp.md @@ -0,0 +1,39 @@ +Stomp Driver +=============== + + +This driver works with ActiveMQ queues. + +It requires the `enqueue/stomp` package. + +Configuration example: + +```php +return [ + 'bootstrap' => [ + 'queue', // The component registers its own console commands + ], + 'components' => [ + 'queue' => [ + 'class' => \yii\queue\stomp\Queue::class, + 'host' => 'localhost', + 'port' => 61613, + 'queueName' => 'queue', + ], + ], +]; +``` + +Console +------- + +A console command is used to execute queued jobs. + +```sh +yii queue/listen [timeout] +``` + +The `listen` command launches a daemon which infinitely queries the queue. If there are new tasks +they're immediately obtained and executed. The `timeout` parameter specifies the number of seconds to sleep between +querying the queue. This method is most efficient when the command is properly daemonized via +[supervisor](worker.md#supervisor) or [systemd](worker.md#systemd). diff --git a/src/drivers/stomp/Command.php b/src/drivers/stomp/Command.php new file mode 100644 index 0000000000..63844279b7 --- /dev/null +++ b/src/drivers/stomp/Command.php @@ -0,0 +1,66 @@ + + * @since 2.3.0 + */ +class Command extends CliCommand +{ + /** + * @var Queue + */ + public $queue; + + + /** + * @inheritdoc + */ + protected function isWorkerAction($actionID) + { + return in_array($actionID, ['run', 'listen']); + } + + + /** + * Runs all jobs from stomp-queue. + * It can be used as cron job. + * + * @return null|int exit code. + */ + public function actionRun() + { + return $this->queue->run(false); + } + + /** + * Listens stomp-queue and runs new jobs. + * It can be used as daemon process. + * + * @param int $timeout number of seconds to wait a job. + * @throws Exception when params are invalid. + * @return null|int exit code. + */ + public function actionListen($timeout = 3) + { + if (!is_numeric($timeout)) { + throw new Exception('Timeout must be numeric.'); + } + if ($timeout < 1) { + throw new Exception('Timeout must be greater that zero.'); + } + + return $this->queue->run(true, $timeout); + } +} diff --git a/src/drivers/stomp/Queue.php b/src/drivers/stomp/Queue.php new file mode 100644 index 0000000000..67a6167314 --- /dev/null +++ b/src/drivers/stomp/Queue.php @@ -0,0 +1,292 @@ + + * @since 2.3.0 + */ +class Queue extends CliQueue +{ + const ATTEMPT = 'yii-attempt'; + const TTR = 'yii-ttr'; + + /** + * The message queue broker's host. + * + * @var string|null + */ + public $host; + /** + * The message queue broker's port. + * + * @var string|null + */ + public $port; + /** + * This is user which is used to login on the broker. + * + * @var string|null + */ + public $user; + /** + * This is password which is used to login on the broker. + * + * @var string|null + */ + public $password; + /** + * Sets an fixed vhostname, which will be passed on connect as header['host']. + * + * @var string|null + */ + public $vhost; + /** + * @var int + */ + public $bufferSize; + /** + * @var int + */ + public $connectionTimeout; + /** + * Perform request synchronously. + * @var bool + */ + public $sync; + /** + * The connection will be established as later as possible if set true. + * + * @var bool|null + */ + public $lazy; + /** + * Defines whether secure connection should be used or not. + * + * @var bool|null + */ + public $sslOn; + /** + * The queue used to consume messages from. + * + * @var string + */ + public $queueName = 'stomp_queue'; + /** + * The property contains a command class which used in cli. + * + * @var string command class name + */ + public $commandClass = Command::class; + + /** + * Set the read timeout. + * @var int + */ + public $readTimeOut = 0; + + /** + * @var StompContext + */ + protected $context; + + /** + * @inheritdoc + */ + public function init() + { + parent::init(); + Event::on(BaseApp::class, BaseApp::EVENT_AFTER_REQUEST, function () { + $this->close(); + }); + } + + /** + * Opens connection. + */ + protected function open() + { + if ($this->context) { + return; + } + + $config = [ + 'host' => $this->host, + 'port' => $this->port, + 'login' => $this->user, + 'password' => $this->password, + 'vhost' => $this->vhost, + 'buffer_size' => $this->bufferSize, + 'connection_timeout' => $this->connectionTimeout, + 'sync' => $this->sync, + 'lazy' => $this->lazy, + 'ssl_on' => $this->sslOn, + ]; + + $config = array_filter($config, function ($value) { + return null !== $value; + }); + + $factory = new StompConnectionFactory($config); + + $this->context = $factory->createContext(); + } + + /** + * Listens queue and runs each job. + * + * @param $repeat + * @param int $timeout + * @return int|null + */ + public function run($repeat, $timeout = 0) + { + return $this->runWorker(function (callable $canContinue) use ($repeat, $timeout) { + $this->open(); + $queue = $this->createQueue($this->queueName); + $consumer = $this->context->createConsumer($queue); + + while ($canContinue()) { + if ($message = ($this->readTimeOut > 0 ? $consumer->receive($this->readTimeOut) : $consumer->receiveNoWait())) { + $messageId = $message->getMessageId(); + if (!$messageId) { + $message = $this->setMessageId($message); + } + + if ($message->isRedelivered()) { + $consumer->acknowledge($message); + + $this->redeliver($message); + + continue; + } + + $ttr = $message->getProperty(self::TTR, $this->ttr); + $attempt = $message->getProperty(self::ATTEMPT, 1); + + if ($this->handleMessage($message->getMessageId(), $message->getBody(), $ttr, $attempt)) { + $consumer->acknowledge($message); + } else { + $consumer->acknowledge($message); + + $this->redeliver($message); + } + } elseif (!$repeat) { + break; + } elseif ($timeout) { + sleep($timeout); + $this->context->getStomp()->getConnection()->sendAlive(); + } + } + }); + } + + /** + * @param StompMessage $message + * @return StompMessage + * @throws \Interop\Queue\Exception + */ + protected function setMessageId(StompMessage $message) + { + $message->setMessageId(uniqid('', true)); + return $message; + } + + /** + * @inheritdoc + * @throws \Interop\Queue\Exception + * @throws NotSupportedException + */ + protected function pushMessage($message, $ttr, $delay, $priority) + { + $this->open(); + + $queue = $this->createQueue($this->queueName); + $message = $this->context->createMessage($message); + $message = $this->setMessageId($message); + $message->setPersistent(true); + $message->setProperty(self::ATTEMPT, 1); + $message->setProperty(self::TTR, $ttr); + + $producer = $this->context->createProducer(); + + if ($delay) { + throw new NotSupportedException('Delayed work is not supported in the driver.'); + } + + if ($priority) { + throw new NotSupportedException('Job priority is not supported in the driver.'); + } + + $producer->send($queue, $message); + + return $message->getMessageId(); + } + + /** + * Closes connection. + */ + protected function close() + { + if (!$this->context) { + return; + } + + $this->context->close(); + $this->context = null; + } + + /** + * @inheritdoc + * @throws NotSupportedException + */ + public function status($id) + { + throw new NotSupportedException('Status is not supported in the driver.'); + } + + /** + * @param StompMessage $message + * @throws \Interop\Queue\Exception + */ + protected function redeliver(StompMessage $message) + { + $attempt = $message->getProperty(self::ATTEMPT, 1); + + $newMessage = $this->context->createMessage($message->getBody(), $message->getProperties(), $message->getHeaders()); + $newMessage->setProperty(self::ATTEMPT, ++$attempt); + + $this->context->createProducer()->send( + $this->createQueue($this->queueName), + $newMessage + ); + } + + /** + * @param $name + * @return \Enqueue\Stomp\StompDestination + */ + private function createQueue($name) + { + $queue = $this->context->createQueue($name); + $queue->setDurable(true); + $queue->setAutoDelete(false); + $queue->setExclusive(false); + + return $queue; + } +} diff --git a/tests/app/benchmark/waiting/Action.php b/tests/app/benchmark/waiting/Action.php index 53f841737c..101c55783a 100644 --- a/tests/app/benchmark/waiting/Action.php +++ b/tests/app/benchmark/waiting/Action.php @@ -33,6 +33,7 @@ class Action extends \yii\base\Action 'amqpInteropQueue' => 'amqp-interop-queue/listen --isolate=0', 'mysqlQueue' => 'mysql-queue/listen 1 --isolate=0', 'fileQueue' => 'file-queue/listen 1 --isolate=0', + 'stompQueue' => 'stomp-queue/listen --isolate=0', ], // Worker will be run in isolate mode 'isolate' => [ @@ -43,6 +44,7 @@ class Action extends \yii\base\Action 'amqpInteropQueue' => 'amqp-interop-queue/listen --isolate=1', 'mysqlQueue' => 'mysql-queue/listen 1 --isolate=1', 'fileQueue' => 'file-queue/listen 1 --isolate=1', + 'stompQueue' => 'stomp-queue/listen 1 --isolate=1', ], ]; /** diff --git a/tests/app/config/main.php b/tests/app/config/main.php index f77aa000b8..942f080ce1 100644 --- a/tests/app/config/main.php +++ b/tests/app/config/main.php @@ -13,6 +13,7 @@ 'amqpQueue', 'amqpInteropQueue', 'beanstalkQueue', + 'stompQueue', ], 'components' => [ 'syncQueue' => [ @@ -100,6 +101,10 @@ 'class' => \yii\queue\beanstalk\Queue::class, 'host' => getenv('BEANSTALK_HOST') ?: 'localhost', ], + 'stompQueue' => [ + 'class' => \yii\queue\stomp\Queue::class, + 'host' => getenv('ACTIVEMQ_HOST') ?: 'localhost', + ], ], ]; diff --git a/tests/bootstrap.php b/tests/bootstrap.php index 7b44c2e912..64cfda50f5 100644 --- a/tests/bootstrap.php +++ b/tests/bootstrap.php @@ -17,6 +17,7 @@ Yii::setAlias('@yii/queue/redis', dirname(__DIR__) . '/src/drivers/redis'); Yii::setAlias('@yii/queue/sync', dirname(__DIR__) . '/src/drivers/sync'); Yii::setAlias('@yii/queue/sqs', dirname(__DIR__) . '/src/drivers/sqs'); +Yii::setAlias('@yii/queue/stomp', dirname(__DIR__) . '/src/drivers/stomp'); Yii::setAlias('@tests', __DIR__); $config = require(__DIR__ . '/app/config/main.php'); diff --git a/tests/docker-compose.yml b/tests/docker-compose.yml index 2306786e70..68c210a9e7 100644 --- a/tests/docker-compose.yml +++ b/tests/docker-compose.yml @@ -34,6 +34,7 @@ services: AWS_SECRET: ${AWS_SECRET} AWS_REGION: ${AWS_REGION} AWS_SQS_URL: ${AWS_SQS_URL} + ACTIVEMQ_HOST: activemq AWS_SQS_FIFO_ENABLED: ${AWS_SQS_FIFO_ENABLED} AWS_SQS_FIFO_URL: ${AWS_SQS_FIFO_URL} AWS_SQS_FIFO_MESSAGE_GROUP_ID: ${AWS_SQS_FIFO_MESSAGE_GROUP_ID} @@ -44,6 +45,7 @@ services: - rabbitmq - beanstalk - gearmand + - activemq networks: net: {} php72: @@ -148,6 +150,14 @@ services: networks: net: {} + # https://hub.docker.com/r/webcenter/activemq/ + activemq: + image: webcenter/activemq + ports: + - 61613:61613 + networks: + net: {} + networks: net: name: yii2_queue_net diff --git a/tests/docker/php/entrypoint.sh b/tests/docker/php/entrypoint.sh index b82c35f3b2..641ee5b4d9 100755 --- a/tests/docker/php/entrypoint.sh +++ b/tests/docker/php/entrypoint.sh @@ -20,6 +20,8 @@ tests/docker/wait-for-it.sh beanstalk:11300 -t 180 tests/docker/wait-for-it.sh gearmand:4730 -t 180 +tests/docker/wait-for-it.sh activemq:61613 -t 180 + php --version set -x exec "$@" diff --git a/tests/drivers/amqp_interop/QueueTest.php b/tests/drivers/amqp_interop/QueueTest.php index f5953b1261..0e57102d39 100644 --- a/tests/drivers/amqp_interop/QueueTest.php +++ b/tests/drivers/amqp_interop/QueueTest.php @@ -75,7 +75,7 @@ protected function setUp() if ('true' == getenv('EXCLUDE_AMQP_INTEROP')) { $this->markTestSkipped('Amqp tests are disabled for php 5.5'); } - + parent::setUp(); } } diff --git a/tests/drivers/stomp/QueueTest.php b/tests/drivers/stomp/QueueTest.php new file mode 100644 index 0000000000..af41e193ca --- /dev/null +++ b/tests/drivers/stomp/QueueTest.php @@ -0,0 +1,57 @@ +startProcess('php yii queue/listen'); + $job = $this->createSimpleJob(); + $this->getQueue()->push($job); + + $this->assertSimpleJobDone($job); + } + + public function testRetry() + { + $this->startProcess('php yii queue/listen'); + $job = new RetryJob(['uid' => uniqid()]); + $this->getQueue()->push($job); + sleep(6); + + $this->assertFileExists($job->getFileName()); + $this->assertEquals('aa', file_get_contents($job->getFileName())); + } + + /** + * @return Queue + */ + protected function getQueue() + { + return Yii::$app->stompQueue; + } + + + protected function setUp() + { + if ('true' == getenv('EXCLUDE_STOMP')) { + $this->markTestSkipped('Stomp tests are disabled for php 5.5'); + } + + parent::setUp(); + } + +} diff --git a/tests/yii b/tests/yii index 72945f7668..8a69da60a6 100755 --- a/tests/yii +++ b/tests/yii @@ -18,6 +18,7 @@ Yii::setAlias('@yii/queue/gearman', dirname(__DIR__) . '/src/drivers/gearman'); Yii::setAlias('@yii/queue/redis', dirname(__DIR__) . '/src/drivers/redis'); Yii::setAlias('@yii/queue/sync', dirname(__DIR__) . '/src/drivers/sync'); Yii::setAlias('@yii/queue/sqs', dirname(__DIR__) . '/src/drivers/sqs'); +Yii::setAlias('@yii/queue/stomp', dirname(__DIR__) . '/src/drivers/stomp'); Yii::setAlias('@tests', __DIR__); $config = \yii\helpers\ArrayHelper::merge(