From ec7caaa8430a3144a8be190765fd7ba8d39bb8b2 Mon Sep 17 00:00:00 2001 From: Neverdane Date: Mon, 25 Jan 2016 23:16:24 +0100 Subject: [PATCH 1/7] Simplify QueueClient addMessage method The addMessage method tested if the queues to process were contained in an array in order to iterate through them otherwise we assumed it was the queueName only. This condition was useless and duplicated some lines of code as we could directly cast the given queues as an array. --- src/QueueClient.php | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/QueueClient.php b/src/QueueClient.php index 591c5f9..9f321bf 100644 --- a/src/QueueClient.php +++ b/src/QueueClient.php @@ -53,13 +53,10 @@ public function __construct(AdapterInterface $adapter = null) */ public function addMessage($queueName, $message, $priority = null) { - $queues = $this->resolveAliasQueueName($queueName); - if (is_array($queues)) { - foreach ($queues as $queue) { - $this->adapter->addMessage($queue, $message, $priority); - } - } else { - $this->adapter->addMessage($queues, $message, $priority); + $queues = (array) $this->resolveAliasQueueName($queueName); + + foreach ($queues as $queue) { + $this->adapter->addMessage($queue, $message, $priority); } return $this; From bc3a1121500a8652f724fc787e63661b204af587 Mon Sep 17 00:00:00 2001 From: Neverdane Date: Mon, 25 Jan 2016 23:25:25 +0100 Subject: [PATCH 2/7] Resolve alias queue name only once The addMessages method called the addMessage for each message. This addMessage resolved each time the alias queue name whereas it was already done the first time. We only had to resolve it at the beginning of addMessages and call the adapter's method for each message. --- src/QueueClient.php | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/QueueClient.php b/src/QueueClient.php index 9f321bf..c308262 100644 --- a/src/QueueClient.php +++ b/src/QueueClient.php @@ -67,8 +67,12 @@ public function addMessage($queueName, $message, $priority = null) */ public function addMessages($queueName, $messages, $priority = null) { - foreach ($messages as $message) { - $this->addMessage($queueName, $message, $priority); + $queues = (array) $this->resolveAliasQueueName($queueName); + + foreach ($queues as $queue) { + foreach ($messages as $message) { + $this->adapter->addMessage($queue, $message, $priority); + } } return $this; From 373aa242b4bcfae916a82b856d5514d15927ac71 Mon Sep 17 00:00:00 2001 From: Neverdane Date: Mon, 25 Jan 2016 23:33:02 +0100 Subject: [PATCH 3/7] Inherit each Adapter from AbstractAdapter We have common behaviours in some adapters methods. In order to share these behaviours, we inherit all Adapters from an AbstractAdapter (which is empty for the moment). --- src/Adapter/AbstractAdapter.php | 7 +++++++ src/Adapter/FileAdapter.php | 2 +- src/Adapter/MemoryAdapter.php | 2 +- src/Adapter/NullAdapter.php | 2 +- src/Adapter/SQSAdapter.php | 2 +- 5 files changed, 11 insertions(+), 4 deletions(-) create mode 100644 src/Adapter/AbstractAdapter.php diff --git a/src/Adapter/AbstractAdapter.php b/src/Adapter/AbstractAdapter.php new file mode 100644 index 0000000..a85954e --- /dev/null +++ b/src/Adapter/AbstractAdapter.php @@ -0,0 +1,7 @@ + Date: Mon, 25 Jan 2016 23:39:49 +0100 Subject: [PATCH 4/7] Use adapter's addMessages method Instead of iterating through each message, we can call the method addMessages of the adapter which can now be overridden by each adapter as it inherits from AbstractAdapter. --- src/Adapter/AbstractAdapter.php | 31 +++++++++++++++++++++++++++++++ src/Adapter/NullAdapter.php | 8 -------- src/QueueClient.php | 4 +--- 3 files changed, 32 insertions(+), 11 deletions(-) diff --git a/src/Adapter/AbstractAdapter.php b/src/Adapter/AbstractAdapter.php index a85954e..fc67bb0 100644 --- a/src/Adapter/AbstractAdapter.php +++ b/src/Adapter/AbstractAdapter.php @@ -2,6 +2,37 @@ namespace ReputationVIP\QueueClient\Adapter; +use ReputationVIP\QueueClient\QueueClientInterface; + class AbstractAdapter { + + /** + * @param string $queueName + * @param array $messages + * @param string $priority + * + * @return QueueClientInterface + */ + public function addMessages($queueName, $messages, $priority = null) + { + foreach ($messages as $message) { + $this->addMessage($queueName, $message, $priority); + } + + return $this; + } + + /** + * @param string $queueName + * @param mixed $message + * @param string $priority + * + * @return AdapterInterface + */ + public function addMessage($queueName, $message, $priority = null) + { + return $this; + } + } diff --git a/src/Adapter/NullAdapter.php b/src/Adapter/NullAdapter.php index 04664e1..f1b904d 100644 --- a/src/Adapter/NullAdapter.php +++ b/src/Adapter/NullAdapter.php @@ -23,14 +23,6 @@ public function __construct(PriorityHandlerInterface $priorityHandler = null) $this->priorityHandler = $priorityHandler; } - /** - * @inheritdoc - */ - public function addMessage($queueName, $message, $priority = null) - { - return $this; - } - /** * @inheritdoc */ diff --git a/src/QueueClient.php b/src/QueueClient.php index c308262..9c38f6f 100644 --- a/src/QueueClient.php +++ b/src/QueueClient.php @@ -70,9 +70,7 @@ public function addMessages($queueName, $messages, $priority = null) $queues = (array) $this->resolveAliasQueueName($queueName); foreach ($queues as $queue) { - foreach ($messages as $message) { - $this->adapter->addMessage($queue, $message, $priority); - } + $this->adapter->addMessages($queue, $messages, $priority); } return $this; From bcba0f1d1bb852eeef14647bb872527bb7e8a709 Mon Sep 17 00:00:00 2001 From: Neverdane Date: Tue, 26 Jan 2016 09:34:49 +0100 Subject: [PATCH 5/7] Implement batch logic in SQS sending We used to iterate through each message and therefore send each message individually which added a performance issue. Instead, we prefer to send pools of 10 messages in order to minimize API calls. --- src/Adapter/SQSAdapter.php | 46 ++++++++++++++++++++++++++++++ tests/units/Adapter/SQSAdapter.php | 17 +++++++++++ 2 files changed, 63 insertions(+) diff --git a/src/Adapter/SQSAdapter.php b/src/Adapter/SQSAdapter.php index a328e3c..b2a6fbb 100644 --- a/src/Adapter/SQSAdapter.php +++ b/src/Adapter/SQSAdapter.php @@ -19,6 +19,7 @@ class SQSAdapter extends AbstractAdapter implements AdapterInterface private $priorityHandler; const MAX_NB_MESSAGES = 10; + const SENT_MESSAGES_BATCH_SIZE = 10; const PRIORITY_SEPARATOR = '-'; /** @@ -51,6 +52,51 @@ public function __construct(SqsClient $sqsClient, PriorityHandlerInterface $prio return $this; } + /** + * @inheritdoc + */ + public function addMessages($queueName, $messages, $priority = null) + { + if (null === $priority) { + $priority = $this->priorityHandler->getDefault(); + } + + if (empty($queueName)) { + throw new InvalidArgumentException('Parameter queueName empty or not defined.'); + } + + $batchMessages = []; + $batchesCount = 0; + $blockCounter = 0; + + foreach ($messages as $index => $message) { + if (empty($message)) { + throw new InvalidArgumentException('Parameter message empty or not defined.'); + } + $messageData = [ + 'Id' => (string) $index, + 'MessageBody' => serialize($message) + ]; + if ($blockCounter >= self::SENT_MESSAGES_BATCH_SIZE) { + $blockCounter = 0; + $batchesCount++; + } else { + $blockCounter++; + } + $batchMessages[$batchesCount][] = $messageData; + } + + foreach ($batchMessages as $messages) { + $queueUrl = $this->sqsClient->getQueueUrl(['QueueName' => $this->getQueueNameWithPrioritySuffix($queueName, $priority)])->get('QueueUrl'); + $this->sqsClient->sendMessageBatch([ + 'QueueUrl' => $queueUrl, + 'Entries' => $messages, + ]); + } + + return $this; + } + /** * @inheritdoc * diff --git a/tests/units/Adapter/SQSAdapter.php b/tests/units/Adapter/SQSAdapter.php index 3f59829..27b1c79 100644 --- a/tests/units/Adapter/SQSAdapter.php +++ b/tests/units/Adapter/SQSAdapter.php @@ -48,6 +48,23 @@ public function testSQSAdapterAddMessage() ->class($SQSAdapter->addMessage('testQueue', 'test message'))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface'); } + public function testSQSAdapterAddMessages() + { + $this->mockGenerator->orphanize('__construct'); + $this->mockGenerator->shuntParentClassCalls(); + $mockSqsClient = new \mock\Aws\Sqs\SqsClient; + $mockQueueUrlModel = new \mock\Guzzle\Service\Resource\Model; + $SQSAdapter = new \ReputationVIP\QueueClient\Adapter\SQSAdapter($mockSqsClient); + + $mockSqsClient->getMockController()->getQueueUrl = function () use($mockQueueUrlModel) { + return $mockQueueUrlModel; + }; + $mockSqsClient->getMockController()->sendMessageBatch = function () { + }; + $this->given($SQSAdapter) + ->class($SQSAdapter->addMessages('testQueue', ['test message', 'test message 2']))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface'); + } + public function testSQSAdapterGetMessagesWithEmptyQueueName() { $this->mockGenerator->orphanize('__construct'); From 09e53abde57f21fed4671ef5a8bc7cdf889b496a Mon Sep 17 00:00:00 2001 From: Neverdane Date: Tue, 26 Jan 2016 09:36:14 +0100 Subject: [PATCH 6/7] Update Changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9ffc15b..528bba2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Queue Client Changelog +## v1.0.1 + +- Send SQS messages in batch + ## v1.0.0 - Initial commit From a9914db75e9dde97fb21bc49b8373f711b8c6db8 Mon Sep 17 00:00:00 2001 From: Neverdane Date: Tue, 26 Jan 2016 23:04:42 +0100 Subject: [PATCH 7/7] Cover all SQS addMessages conditions with tests Some addMessages conditions weren't tested which reduced the unit tests code coverage. --- tests/units/Adapter/NullAdapter.php | 6 ++++++ tests/units/Adapter/SQSAdapter.php | 26 +++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/tests/units/Adapter/NullAdapter.php b/tests/units/Adapter/NullAdapter.php index 8255b23..20d7070 100644 --- a/tests/units/Adapter/NullAdapter.php +++ b/tests/units/Adapter/NullAdapter.php @@ -13,6 +13,12 @@ public function testNullAdapterAddMessage() $this->given($NullAdapter)->class($NullAdapter->addMessage('testQueue', 'test Message one'))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface'); } + public function testNullAdapterAddMessages() + { + $NullAdapter = new \ReputationVIP\QueueClient\Adapter\NullAdapter(); + $this->given($NullAdapter)->class($NullAdapter->addMessages('testQueue', ['test Message one']))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface'); + } + public function testNullAdapterGetMessages() { $NullAdapter = new \ReputationVIP\QueueClient\Adapter\NullAdapter(); diff --git a/tests/units/Adapter/SQSAdapter.php b/tests/units/Adapter/SQSAdapter.php index 27b1c79..569224d 100644 --- a/tests/units/Adapter/SQSAdapter.php +++ b/tests/units/Adapter/SQSAdapter.php @@ -62,7 +62,31 @@ public function testSQSAdapterAddMessages() $mockSqsClient->getMockController()->sendMessageBatch = function () { }; $this->given($SQSAdapter) - ->class($SQSAdapter->addMessages('testQueue', ['test message', 'test message 2']))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface'); + ->class($SQSAdapter->addMessages('testQueue', array_fill(0, 11, 'test message')))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface'); + } + + public function testSQSAdapterAddMessagesWithEmptyMessage() + { + $this->mockGenerator->orphanize('__construct'); + $this->mockGenerator->shuntParentClassCalls(); + $mockSqsClient = new \mock\Aws\Sqs\SqsClient; + $SQSAdapter = new \ReputationVIP\QueueClient\Adapter\SQSAdapter($mockSqsClient); + + $this->exception(function() use($SQSAdapter) { + $SQSAdapter->addMessages('testQueue', ['test message', '']); + }); + } + + public function testSQSAdapterAddMessagesWithEmptyQueueName() + { + $this->mockGenerator->orphanize('__construct'); + $this->mockGenerator->shuntParentClassCalls(); + $mockSqsClient = new \mock\Aws\Sqs\SqsClient; + $SQSAdapter = new \ReputationVIP\QueueClient\Adapter\SQSAdapter($mockSqsClient); + + $this->exception(function() use($SQSAdapter) { + $SQSAdapter->addMessages('', ['']); + }); } public function testSQSAdapterGetMessagesWithEmptyQueueName()