Skip to content

Commit

Permalink
Merge pull request #14 from ReputationVIP/use-new-lock-component
Browse files Browse the repository at this point in the history
Use new lock component
  • Loading branch information
bobey authored Dec 6, 2017
2 parents 391d84c + 3624b5f commit 9d57856
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 245 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: php
php:
- 5.6
- 7.0
- 7.1
- 7.2

script:
- composer install
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Queue Client Changelog

## v2.0.0

- Use new Symfony/Lock component in Filesystem handler

## v1.0.3

- Use Priority objects instead of string
Expand Down
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@
}
],
"require": {
"symfony/filesystem": ">=2.7",
"symfony/finder": ">=2.7",
"symfony/filesystem": ">=3.4",
"symfony/finder": ">=3.4",
"symfony/lock": ">=3.4",
"aws/aws-sdk-php": ">=2.7"
},
"require-dev": {
"atoum/atoum": "~2",
"atoum/atoum": "~3.1",
"satooshi/php-coveralls": "dev-master"
},
"autoload": {
Expand Down
4 changes: 2 additions & 2 deletions docker/test/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM php:7
FROM php:7.1

RUN pecl install xdebug
RUN docker-php-ext-enable xdebug
Expand All @@ -9,4 +9,4 @@ RUN apt-get update && apt-get install -y \

COPY ./ssh/ssh_config /etc/ssh/ssh_config

WORKDIR /data
WORKDIR /data
65 changes: 33 additions & 32 deletions src/Adapter/FileAdapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
use ReputationVIP\QueueClient\PriorityHandler\Priority\Priority;
use ReputationVIP\QueueClient\PriorityHandler\PriorityHandlerInterface;
use ReputationVIP\QueueClient\PriorityHandler\StandardPriorityHandler;
use ReputationVIP\QueueClient\Utils\LockHandlerFactory;
use ReputationVIP\QueueClient\Utils\LockHandlerFactoryInterface;
use Symfony\Component\Filesystem\Exception\IOExceptionInterface;
use Symfony\Component\Filesystem\Filesystem;
use Symfony\Component\Finder\Finder;
use Symfony\Component\Finder\SplFileInfo;
use Symfony\Component\Lock\Factory;
use Symfony\Component\Lock\Store\FlockStore;

class FileAdapter extends AbstractAdapter implements AdapterInterface
{
Expand All @@ -31,7 +31,7 @@ class FileAdapter extends AbstractAdapter implements AdapterInterface
/** @var Filesystem $fs */
private $fs;

/** @var LockHandlerFactoryInterface $fs */
/** @var Factory $lockHandlerFactory */
private $lockHandlerFactory;

/** @var PriorityHandlerInterface $priorityHandler */
Expand All @@ -42,12 +42,12 @@ class FileAdapter extends AbstractAdapter implements AdapterInterface
* @param PriorityHandlerInterface $priorityHandler
* @param Filesystem $fs
* @param Finder $finder
* @param LockHandlerFactoryInterface $lockHandlerFactory
* @param Factory $lockHandlerFactory
*
* @throws \InvalidArgumentException
* @throws QueueAccessException
*/
public function __construct($repository, PriorityHandlerInterface $priorityHandler = null, Filesystem $fs = null, Finder $finder = null, LockHandlerFactoryInterface $lockHandlerFactory = null)
public function __construct($repository, PriorityHandlerInterface $priorityHandler = null, Filesystem $fs = null, Finder $finder = null, Factory $lockHandlerFactory = null)
{
if (empty($repository)) {
throw new \InvalidArgumentException('Argument repository empty or not defined.');
Expand All @@ -61,15 +61,12 @@ public function __construct($repository, PriorityHandlerInterface $priorityHandl
$finder = new Finder();
}

if (null === $lockHandlerFactory) {
$lockHandlerFactory = new LockHandlerFactory();
}

if (null === $priorityHandler) {
$priorityHandler = new StandardPriorityHandler();
}

$this->fs = $fs;

if (!$this->fs->exists($repository)) {
try {
$this->fs->mkdir($repository);
Expand All @@ -78,6 +75,10 @@ public function __construct($repository, PriorityHandlerInterface $priorityHandl
}
}

if (null === $lockHandlerFactory) {
$lockHandlerFactory = new Factory(new FlockStore($repository));
}

$this->priorityHandler = $priorityHandler;
$this->repository = $repository;
$this->finder = $finder;
Expand Down Expand Up @@ -126,8 +127,8 @@ private function getQueuePath($queueName, Priority $priority)
private function readQueueFromFile($queueName, Priority $priority, $nbTries = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand All @@ -148,10 +149,10 @@ private function readQueueFromFile($queueName, Priority $priority, $nbTries = 0)
}
$queue = json_decode($content, true);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();

return $queue;
}
Expand All @@ -170,8 +171,8 @@ private function readQueueFromFile($queueName, Priority $priority, $nbTries = 0)
private function writeQueueInFile($queueName, Priority $priority, $queue, $nbTries = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand All @@ -182,10 +183,10 @@ private function writeQueueInFile($queueName, Priority $priority, $queue, $nbTri
$queueJson = json_encode($queue);
$this->fs->dumpFile($queueFilePath, $queueJson);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();
return $this;
}

Expand All @@ -205,8 +206,8 @@ private function writeQueueInFile($queueName, Priority $priority, $queue, $nbTri
private function addMessageLock($queueName, $message, Priority $priority, $nbTries = 0, $delaySeconds = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand Down Expand Up @@ -239,10 +240,10 @@ private function addMessageLock($queueName, $message, Priority $priority, $nbTri
$queueJson = json_encode($queue);
$this->fs->dumpFile($queueFilePath, $queueJson);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();
return $this;
}

Expand Down Expand Up @@ -291,8 +292,8 @@ public function addMessage($queueName, $message, Priority $priority = null, $del
private function getMessagesLock($queueName, $nbMsg, Priority $priority, $nbTries = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand Down Expand Up @@ -335,10 +336,10 @@ private function getMessagesLock($queueName, $nbMsg, Priority $priority, $nbTrie
$queueJson = json_encode($queue);
$this->fs->dumpFile($queueFilePath, $queueJson);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();

return $messages;
}
Expand Down Expand Up @@ -399,8 +400,8 @@ public function getMessages($queueName, $nbMsg = 1, Priority $priority = null)
private function deleteMessageLock($queueName, $message, Priority $priority, $nbTries = 0)
{
$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
Expand Down Expand Up @@ -431,10 +432,10 @@ private function deleteMessageLock($queueName, $message, Priority $priority, $nb
$queueJson = json_encode($queue);
$this->fs->dumpFile($queueFilePath, $queueJson);
} catch (\Exception $e) {
$lockHandler->release();
$lock->release();
throw $e;
}
$lockHandler->release();
$lock->release();

return $this;
}
Expand Down Expand Up @@ -570,16 +571,16 @@ private function deleteQueueLock($queueName, Priority $priority, $nbTries = 0)
}

$queueFilePath = $this->getQueuePath($queueName, $priority);
$lockHandler = $this->lockHandlerFactory->getLockHandler($queueFilePath);
if (!$lockHandler->lock()) {
$lock = $this->lockHandlerFactory->createLock($queueFilePath);
if (!$lock->acquire()) {
if ($nbTries >= static::MAX_LOCK_TRIES) {
throw new QueueAccessException('Reach max retry for locking queue file ' . $queueFilePath);
}
usleep(10);
return $this->deleteQueueLock($queueName, $priority, ($nbTries + 1));
}
$this->fs->remove($queueFilePath);
$lockHandler->release();
$lock->release();
return $this;
}

Expand Down
16 changes: 0 additions & 16 deletions src/Utils/LockHandlerFactory.php

This file was deleted.

17 changes: 0 additions & 17 deletions src/Utils/LockHandlerFactoryInterface.php

This file was deleted.

Loading

0 comments on commit 9d57856

Please sign in to comment.