Skip to content

Commit

Permalink
Implement batch logic in SQS sending
Browse files Browse the repository at this point in the history
We used to iterate through each message and therefore send
each message individually which added a performance issue.
Instead, we prefer to send pools of 10 messages in order to
minimize API calls.
  • Loading branch information
neverdane committed Jan 26, 2016
1 parent 248288a commit bcba0f1
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 0 deletions.
46 changes: 46 additions & 0 deletions src/Adapter/SQSAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class SQSAdapter extends AbstractAdapter implements AdapterInterface
private $priorityHandler;

const MAX_NB_MESSAGES = 10;
const SENT_MESSAGES_BATCH_SIZE = 10;
const PRIORITY_SEPARATOR = '-';

/**
Expand Down Expand Up @@ -51,6 +52,51 @@ public function __construct(SqsClient $sqsClient, PriorityHandlerInterface $prio
return $this;
}

/**
* @inheritdoc
*/
public function addMessages($queueName, $messages, $priority = null)
{
if (null === $priority) {
$priority = $this->priorityHandler->getDefault();
}

if (empty($queueName)) {
throw new InvalidArgumentException('Parameter queueName empty or not defined.');
}

$batchMessages = [];
$batchesCount = 0;
$blockCounter = 0;

foreach ($messages as $index => $message) {
if (empty($message)) {
throw new InvalidArgumentException('Parameter message empty or not defined.');
}
$messageData = [
'Id' => (string) $index,
'MessageBody' => serialize($message)
];
if ($blockCounter >= self::SENT_MESSAGES_BATCH_SIZE) {
$blockCounter = 0;
$batchesCount++;
} else {
$blockCounter++;
}
$batchMessages[$batchesCount][] = $messageData;
}

foreach ($batchMessages as $messages) {
$queueUrl = $this->sqsClient->getQueueUrl(['QueueName' => $this->getQueueNameWithPrioritySuffix($queueName, $priority)])->get('QueueUrl');
$this->sqsClient->sendMessageBatch([
'QueueUrl' => $queueUrl,
'Entries' => $messages,
]);
}

return $this;
}

/**
* @inheritdoc
*
Expand Down
17 changes: 17 additions & 0 deletions tests/units/Adapter/SQSAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,23 @@ public function testSQSAdapterAddMessage()
->class($SQSAdapter->addMessage('testQueue', 'test message'))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface');
}

public function testSQSAdapterAddMessages()
{
$this->mockGenerator->orphanize('__construct');
$this->mockGenerator->shuntParentClassCalls();
$mockSqsClient = new \mock\Aws\Sqs\SqsClient;
$mockQueueUrlModel = new \mock\Guzzle\Service\Resource\Model;
$SQSAdapter = new \ReputationVIP\QueueClient\Adapter\SQSAdapter($mockSqsClient);

$mockSqsClient->getMockController()->getQueueUrl = function () use($mockQueueUrlModel) {
return $mockQueueUrlModel;
};
$mockSqsClient->getMockController()->sendMessageBatch = function () {
};
$this->given($SQSAdapter)
->class($SQSAdapter->addMessages('testQueue', ['test message', 'test message 2']))->hasInterface('\ReputationVIP\QueueClient\Adapter\AdapterInterface');
}

public function testSQSAdapterGetMessagesWithEmptyQueueName()
{
$this->mockGenerator->orphanize('__construct');
Expand Down

0 comments on commit bcba0f1

Please sign in to comment.