From 1aca469d63298235dd8ee127e4e70e8248f89407 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Mon, 27 May 2024 10:29:41 +0200 Subject: [PATCH 01/14] PJAS-466 Add new parameter for enforcing a specific job id --- src/Queue.php | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/Queue.php b/src/Queue.php index 1e281c8..f2350f1 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -43,15 +43,20 @@ public function get(string $jobId) : ?Job /** * Create a new job and enqueue it. * - * @param string $action - * @param array $parameters - * @param int $maxRetries + * @param string $action The action to perform + * @param array $parameters The parameters for the job + * @param int $maxRetries The maximum number of retries + * @param string $jobId Speicify the job id instead of generating random one * * @return Job Returns the job */ - public function add(string $action, array $parameters = [], int $maxRetries = 3) : Job + public function add(string $action, array $parameters = [], int $maxRetries = 3, string $jobId = '') : Job { - $job = new Job(uniqid('', true), $action, $parameters); + $job = new Job( + $jobId !== '' ? $jobId : uniqid('', true), + $action, + $parameters + ); $this->driver->add($job, $maxRetries); return $job; } From 58f3c5c6354dfd7822e958fef30a87997cada640 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 09:21:25 +0200 Subject: [PATCH 02/14] PJAS-466 Enforce php 8.1, implement new method to check if job exists in queue, apply phpcs fixes --- composer.json | 21 +- src/Driver/DriverInterface.php | 9 + src/Driver/PHPArrayDriver.php | 12 + src/Driver/RedisDriver.php | 588 +++++++++++++------------ src/Exception.php | 4 +- src/Exception/InvalidDataException.php | 4 +- src/Exception/InvalidJobException.php | 4 +- src/Exception/LockedMutexException.php | 4 +- src/Job.php | 202 ++++----- src/Mutex.php | 4 +- src/ProcessManager.php | 204 ++++----- src/Queue.php | 301 +++++++------ 12 files changed, 716 insertions(+), 641 deletions(-) diff --git a/composer.json b/composer.json index b1b177b..d6d4613 100755 --- a/composer.json +++ b/composer.json @@ -8,12 +8,14 @@ ], "bin": [], "require": { - "php": ">=7.4", + "php": "^8.1", "symfony/process": "^5.4" }, "require-dev": { - "phpstan/phpstan": "^1.5", - "phpunit/phpunit": "^9.5" + "phpunit/phpunit": "^9.5", + "cineman/hydrogen-cs": "dev-master", + "phpstan/phpstan": "^1.7.0", + "squizlabs/php_codesniffer": "^3.5" }, "autoload": { "psr-4": { @@ -26,5 +28,16 @@ } }, "minimum-stability": "dev", - "prefer-stable": true + "prefer-stable": true, + "scripts": { + "ci-phpcs": [ + "vendor/bin/phpcs src/ --standard=vendor/cineman/hydrogen-cs" + ], + "ci-phpcs-fix": [ + "vendor/bin/phpcbf src/ --standard=vendor/cineman/hydrogen-cs" + ], + "ci-phpstan": [ + "vendor/bin/phpstan analyse src --error-format=github -l8" + ] + } } diff --git a/src/Driver/DriverInterface.php b/src/Driver/DriverInterface.php index aa53572..e78fade 100644 --- a/src/Driver/DriverInterface.php +++ b/src/Driver/DriverInterface.php @@ -3,6 +3,7 @@ namespace Beryllium\Driver; use Beryllium\Job; +use Generator; interface DriverInterface { @@ -29,6 +30,14 @@ public function add(Job $job, int $maxRetries = 3) : void; */ public function get(string $id) : ?Job; + /** + * Check if a job exists in the queue + * @param string $id + * + * @return bool + */ + public function exists(string $id) : bool; + /** * Get the ID of a waiting job * diff --git a/src/Driver/PHPArrayDriver.php b/src/Driver/PHPArrayDriver.php index b060ddc..a9da4a5 100644 --- a/src/Driver/PHPArrayDriver.php +++ b/src/Driver/PHPArrayDriver.php @@ -3,6 +3,7 @@ namespace Beryllium\Driver; use Beryllium\Job; +use Generator; class PHPArrayDriver implements DriverInterface { @@ -64,6 +65,17 @@ public function get(string $id) : ?Job return $this->jobs[$id] ?? null; } + /** + * Check if a job exists in the queue + * @param string $id + * + * @return bool + */ + public function exists(string $id) : bool + { + return array_key_exists($id, $this->jobs); + } + /** * Get the ID of a waiting job * diff --git a/src/Driver/RedisDriver.php b/src/Driver/RedisDriver.php index 99b47ce..4319d26 100644 --- a/src/Driver/RedisDriver.php +++ b/src/Driver/RedisDriver.php @@ -6,290 +6,312 @@ use Beryllium\Job; use Beryllium\Exception\InvalidDataException; +use Generator; class RedisDriver implements DriverInterface -{ - /** - * Get the redis connection - */ - protected Redis $redis; - - /** - * The redis key prefix - */ - protected string $queuePrefix = 'beryllium.queue.'; - - /** - * The redis lock key prefix - */ - protected string $lockPrefix = 'beryllium.lock.'; - - /** - * Redis Keys - */ - const REDIS_KEY_WAITLIST = 'waitlist'; - const REDIS_KEY_ATTEMPT = 'attempt.'; - const REDIS_KEY_MAX_RETRIES = 'max_retries.'; - const REDIS_KEY_DATA = 'data.'; - const REDIS_KEY_STATS = 'stats.'; - - /** - * Constructor - */ - public function __construct(Redis $redis) - { - $this->redis = $redis; - } - - /** - * Queue Methods - * - * ------------------------------------------------------------------------ - */ - - /** - * Sets the queues key prefix - * - * @param string $prefix - * @return void - */ - public function setQueueKeyPrefix(string $prefix) - { - $this->queuePrefix = $prefix; - } - - /** - * @deprecated - * @param string $prefix - * @return void - */ - public function setKeyPrefix(string $prefix) - { - trigger_error('Method ' . __METHOD__ . ' is deprecated, use setQueueKeyPrefix instead.', E_USER_DEPRECATED); - $this->setQueueKeyPrefix($prefix); - } - - /** - * Sets the lock key prefix - * - * @param string $prefix - * @return void - */ - public function setLockKeyPrefix(string $prefix) - { - $this->lockPrefix = $prefix; - } - - /** - * Add a job to the queue - * - * @param Job $job - * @param int $maxRetries - */ - public function add(Job $job, int $maxRetries = 3) : void - { - $id = $job->id(); // get the job id - - $this->redis->lPush($this->queuePrefix . static::REDIS_KEY_WAITLIST, $id); - - $this->redis->set($this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id, 0); - $this->redis->set($this->queuePrefix . static::REDIS_KEY_MAX_RETRIES . $id, $maxRetries); - $this->redis->set($this->queuePrefix . static::REDIS_KEY_DATA . $id, $job->serialize()); - - // timeout the queue elements after an hour - // if there is an error somewhere this way we at least clear the garbage - $this->redis->expire($this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id, 3600); - $this->redis->expire($this->queuePrefix . static::REDIS_KEY_MAX_RETRIES . $id, 3600); - $this->redis->expire($this->queuePrefix . static::REDIS_KEY_DATA . $id, 3600); - } - - /** - * Get a job by id - * - * @param string $id - * @return Job - */ - public function get(string $id) : ?Job - { - // get the data - if (!$data = $this->redis->get($this->queuePrefix . static::REDIS_KEY_DATA . $id)) { - return null; - } - - // unserialize it - return Job::unserialize($data); - } - - /** - * Get the ID of a waiting job - * - * @return string - */ - public function popWaitingId() : ?string - { - return $this->redis->rPop($this->queuePrefix . static::REDIS_KEY_WAITLIST) ?: null; - } - - /** - * Counts the number of jobs waiting for execution - * - * @return int - */ - public function waitingCount() : int - { - return (int) $this->redis->lLen($this->queuePrefix . static::REDIS_KEY_WAITLIST); - } - - /** - * Reinsert the job into the waitlist - * - * @param string $id - * @return void - */ - public function retry(string $id) : void - { - $this->redis->incr($this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id); - $this->redis->lPush($this->queuePrefix . static::REDIS_KEY_WAITLIST, $id); - } - - /** - * Get the maximum number of attempts we should try for the job - * - * @param string $id - * @return int - */ - public function getMaxRetries(string $id) : int - { - if (($c = $this->redis->get($this->queuePrefix . static::REDIS_KEY_MAX_RETRIES . $id)) !== false) return (int) $c; - return -1; - } - - /** - * Get the number of attempts for the job - * - * @param string $id - * @return int - */ - public function attemptCount(string $id) : int - { - if (($c = $this->redis->get($this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id)) !== false) return (int) $c; - return -1; - } - - /** - * Cleanup the jobs data - * - * @param string $id - */ - public function cleanup(string $id) : void - { - $this->redis->del([ - $this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id, - $this->queuePrefix . static::REDIS_KEY_MAX_RETRIES . $id, - $this->queuePrefix . static::REDIS_KEY_DATA . $id, - ]); - } - - /** - * Will clear freaking everything - * !!! Attention with this one.. - * - * @return void - */ - public function clearEverything() : void - { - $this->redis->del($this->redis->keys($this->queuePrefix . '*')); - $this->redis->del($this->redis->keys($this->lockPrefix . '*')); - } - - /** - * Stats Methods - * - * ------------------------------------------------------------------------ - */ - - /** - * Simply store a value - * - * @param string $key - * @param mixed $value - * @return void - */ - public function storeStatsValue(string $key, $value) : void - { - $this->redis->set($this->queuePrefix . static::REDIS_KEY_STATS . $key, serialize($value)); - } - - /** - * Simply get a value - * - * @param string $key - * @return mixed - */ - public function getStatsValue(string $key) - { - if (($raw = $this->redis->get($this->queuePrefix . static::REDIS_KEY_STATS . $key)) === false) { - throw new InvalidDataException("Could not read stats value from redis."); - } - - return unserialize($raw); - } - - /** - * Locking System Methods - * - * ------------------------------------------------------------------------ - */ - - /** - * Checks if the given key is locked on the driver. - * - * @param string $key - * @return bool - */ - public function isLocked(string $key) : bool - { - return (bool) $this->redis->exists($this->lockPrefix . $key); - } - - /** - * Returns the locks token - * - * @param string $key - * @return string - */ - public function getLockToken(string $key) : ?string - { - return $this->redis->get($this->lockPrefix . $key) ?: null; - } - - /** - * Creates a lock entry on the driver, this must be synchronised! - * - * @param string $key - * @param string $token - * @param int $ttl - * - * @return bool Returns true if the lock could be created - */ - public function lock(string $key, string $token, int $ttl) : bool - { - return $this->redis->set($this->lockPrefix . $key, $token, ['NX', 'EX' => $ttl]); - } - - - /** - * Removes a lock entry on the driver, this must be synchronised! - * Also the lock for the key should only be removed if the token matches! - * - * @param string $key - * @param string $token - * - * @return bool - */ - public function unlock(string $key, string $token) : bool - { - $script = ' +{ + /** + * Get the redis connection + */ + protected Redis $redis; + + /** + * The redis key prefix + */ + protected string $queuePrefix = 'beryllium.queue.'; + + /** + * The redis lock key prefix + */ + protected string $lockPrefix = 'beryllium.lock.'; + + /** + * Redis Keys + */ + const REDIS_KEY_WAITLIST = 'waitlist'; + const REDIS_KEY_ATTEMPT = 'attempt.'; + const REDIS_KEY_MAX_RETRIES = 'max_retries.'; + const REDIS_KEY_DATA = 'data.'; + const REDIS_KEY_STATS = 'stats.'; + + /** + * Constructor + */ + public function __construct(Redis $redis) + { + $this->redis = $redis; + } + + /** + * Queue Methods + * + * ------------------------------------------------------------------------ + */ + + /** + * Sets the queues key prefix + * + * @param string $prefix + * @return void + */ + public function setQueueKeyPrefix(string $prefix) + { + $this->queuePrefix = $prefix; + } + + /** + * @deprecated + * @param string $prefix + * @return void + */ + public function setKeyPrefix(string $prefix) + { + trigger_error('Method ' . __METHOD__ . ' is deprecated, use setQueueKeyPrefix instead.', E_USER_DEPRECATED); + $this->setQueueKeyPrefix($prefix); + } + + /** + * Sets the lock key prefix + * + * @param string $prefix + * @return void + */ + public function setLockKeyPrefix(string $prefix) + { + $this->lockPrefix = $prefix; + } + + /** + * Add a job to the queue + * + * @param Job $job + * @param int $maxRetries + */ + public function add(Job $job, int $maxRetries = 3) : void + { + $id = $job->id(); // get the job id + + $this->redis->lPush($this->queuePrefix . static::REDIS_KEY_WAITLIST, $id); + + $this->redis->set($this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id, 0); + $this->redis->set($this->queuePrefix . static::REDIS_KEY_MAX_RETRIES . $id, $maxRetries); + $this->redis->set($this->queuePrefix . static::REDIS_KEY_DATA . $id, $job->serialize()); + + // timeout the queue elements after an hour + // if there is an error somewhere this way we at least clear the garbage + $this->redis->expire($this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id, 3600); + $this->redis->expire($this->queuePrefix . static::REDIS_KEY_MAX_RETRIES . $id, 3600); + $this->redis->expire($this->queuePrefix . static::REDIS_KEY_DATA . $id, 3600); + } + + /** + * Get a job by id + * + * @param string $id + * @return Job + */ + public function get(string $id) : ?Job + { + // get the data + if (!$data = $this->redis->get($this->queuePrefix . static::REDIS_KEY_DATA . $id)) { + return null; + } + + // unserialize it + return Job::unserialize($data); + } + + /** + * Check if a job exists in the queue + * @param string $id + * + * @return bool + */ + public function exists(string $id) : bool + { + $iterator = null; + $keys = $this->redis->scan( + $iterator, + $this->queuePrefix . static::REDIS_KEY_DATA . $id . '*', + 1 + ); + if ($keys !== false && count($keys) > 0) { + return true; + } + + return false; + } + + /** + * Get the ID of a waiting job + * + * @return string + */ + public function popWaitingId() : ?string + { + return $this->redis->rPop($this->queuePrefix . static::REDIS_KEY_WAITLIST) ?: null; + } + + /** + * Counts the number of jobs waiting for execution + * + * @return int + */ + public function waitingCount() : int + { + return (int) $this->redis->lLen($this->queuePrefix . static::REDIS_KEY_WAITLIST); + } + + /** + * Reinsert the job into the waitlist + * + * @param string $id + * @return void + */ + public function retry(string $id) : void + { + $this->redis->incr($this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id); + $this->redis->lPush($this->queuePrefix . static::REDIS_KEY_WAITLIST, $id); + } + + /** + * Get the maximum number of attempts we should try for the job + * + * @param string $id + * @return int + */ + public function getMaxRetries(string $id) : int + { + if (($c = $this->redis->get($this->queuePrefix . static::REDIS_KEY_MAX_RETRIES . $id)) !== false) return (int) $c; + return -1; + } + + /** + * Get the number of attempts for the job + * + * @param string $id + * @return int + */ + public function attemptCount(string $id) : int + { + if (($c = $this->redis->get($this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id)) !== false) return (int) $c; + return -1; + } + + /** + * Cleanup the jobs data + * + * @param string $id + */ + public function cleanup(string $id) : void + { + $this->redis->del([ + $this->queuePrefix . static::REDIS_KEY_ATTEMPT . $id, + $this->queuePrefix . static::REDIS_KEY_MAX_RETRIES . $id, + $this->queuePrefix . static::REDIS_KEY_DATA . $id, + ]); + } + + /** + * Will clear freaking everything + * !!! Attention with this one.. + * + * @return void + */ + public function clearEverything() : void + { + $this->redis->del($this->redis->keys($this->queuePrefix . '*')); + $this->redis->del($this->redis->keys($this->lockPrefix . '*')); + } + + /** + * Stats Methods + * + * ------------------------------------------------------------------------ + */ + + /** + * Simply store a value + * + * @param string $key + * @param mixed $value + * @return void + */ + public function storeStatsValue(string $key, $value) : void + { + $this->redis->set($this->queuePrefix . static::REDIS_KEY_STATS . $key, serialize($value)); + } + + /** + * Simply get a value + * + * @param string $key + * @return mixed + */ + public function getStatsValue(string $key) + { + if (($raw = $this->redis->get($this->queuePrefix . static::REDIS_KEY_STATS . $key)) === false) { + throw new InvalidDataException("Could not read stats value from redis."); + } + + return unserialize($raw); + } + + /** + * Locking System Methods + * + * ------------------------------------------------------------------------ + */ + + /** + * Checks if the given key is locked on the driver. + * + * @param string $key + * @return bool + */ + public function isLocked(string $key) : bool + { + return (bool) $this->redis->exists($this->lockPrefix . $key); + } + + /** + * Returns the locks token + * + * @param string $key + * @return string + */ + public function getLockToken(string $key) : ?string + { + return $this->redis->get($this->lockPrefix . $key) ?: null; + } + + /** + * Creates a lock entry on the driver, this must be synchronised! + * + * @param string $key + * @param string $token + * @param int $ttl + * + * @return bool Returns true if the lock could be created + */ + public function lock(string $key, string $token, int $ttl) : bool + { + return $this->redis->set($this->lockPrefix . $key, $token, ['NX', 'EX' => $ttl]); + } + + + /** + * Removes a lock entry on the driver, this must be synchronised! + * Also the lock for the key should only be removed if the token matches! + * + * @param string $key + * @param string $token + * + * @return bool + */ + public function unlock(string $key, string $token) : bool + { + $script = ' if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else @@ -298,5 +320,5 @@ public function unlock(string $key, string $token) : bool '; return (bool) $this->redis->eval($script, [$this->lockPrefix . $key, $token], 1); - } + } } diff --git a/src/Exception.php b/src/Exception.php index 2cfb359..703c3c3 100644 --- a/src/Exception.php +++ b/src/Exception.php @@ -2,4 +2,6 @@ namespace Beryllium; -class Exception extends \Exception {} \ No newline at end of file +class Exception extends \Exception +{ +} diff --git a/src/Exception/InvalidDataException.php b/src/Exception/InvalidDataException.php index 6751627..408d0b0 100644 --- a/src/Exception/InvalidDataException.php +++ b/src/Exception/InvalidDataException.php @@ -2,4 +2,6 @@ namespace Beryllium\Exception; -class InvalidDataException extends \Beryllium\Exception {} +class InvalidDataException extends \Beryllium\Exception +{ +} diff --git a/src/Exception/InvalidJobException.php b/src/Exception/InvalidJobException.php index 5ef3491..e2d2c1f 100644 --- a/src/Exception/InvalidJobException.php +++ b/src/Exception/InvalidJobException.php @@ -2,4 +2,6 @@ namespace Beryllium\Exception; -class InvalidJobException extends \Beryllium\Exception {} +class InvalidJobException extends \Beryllium\Exception +{ +} diff --git a/src/Exception/LockedMutexException.php b/src/Exception/LockedMutexException.php index ebedbfc..897fbc9 100644 --- a/src/Exception/LockedMutexException.php +++ b/src/Exception/LockedMutexException.php @@ -2,4 +2,6 @@ namespace Beryllium\Exception; -class LockedMutexException extends \Beryllium\Exception {} +class LockedMutexException extends \Beryllium\Exception +{ +} diff --git a/src/Job.php b/src/Job.php index 27e5562..35521c4 100644 --- a/src/Job.php +++ b/src/Job.php @@ -6,117 +6,117 @@ class Job { - /** - * Unserialize the given data to a job - * - * @param string $data - * @return Job - */ - public static function unserialize(string $data) : ?Job - { - $data = json_decode($data, true); + /** + * Unserialize the given data to a job + * + * @param string $data + * @return Job + */ + public static function unserialize(string $data) : ?Job + { + $data = json_decode($data, true); - if ( - (!isset($data['id'])) || - (!isset($data['action'])) || - (!isset($data['data'])) - ) { - return null; - } + if ( + (!isset($data['id'])) || + (!isset($data['action'])) || + (!isset($data['data'])) + ) { + return null; + } - return new Job($data['id'], $data['action'], $data['data']); - } + return new Job($data['id'], $data['action'], $data['data']); + } - /** - * The jobs id - * - * @var string - */ - private string $id; + /** + * The jobs id + * + * @var string + */ + private string $id; - /** - * The jobs action - * - * @var string - */ - private string $action; + /** + * The jobs action + * + * @var string + */ + private string $action; - /** - * The jobs parameters - * - * @var array - */ - private array $parameters; + /** + * The jobs parameters + * + * @var array + */ + private array $parameters; - /** - * Construct - * - * @param string $id - * @param string $action - * @param array $parameters - */ - public function __construct(string $id, string $action, array $parameters = []) - { - $this->id = $id; - $this->action = $action; - $this->parameters = $parameters; - } + /** + * Construct + * + * @param string $id + * @param string $action + * @param array $parameters + */ + public function __construct(string $id, string $action, array $parameters = []) + { + $this->id = $id; + $this->action = $action; + $this->parameters = $parameters; + } - /** - * Get the jobs id - * - * @return string - */ - public function id() : string - { - return $this->id; - } + /** + * Get the jobs id + * + * @return string + */ + public function id() : string + { + return $this->id; + } - /** - * Get the jobs action - * - * @return string - */ - public function action() : string - { - return $this->action; - } + /** + * Get the jobs action + * + * @return string + */ + public function action() : string + { + return $this->action; + } - /** - * Get the jobs parameters - * - * @return array - */ - public function parameters() : array - { - return $this->parameters; - } + /** + * Get the jobs parameters + * + * @return array + */ + public function parameters() : array + { + return $this->parameters; + } - /** - * Get a specific parameter from the job - * - * @param string $key - * @param mixed $default - * @return mixed - */ - public function parameter(string $key, $default = null) - { - return $this->parameters[$key] ?? $default; - } + /** + * Get a specific parameter from the job + * + * @param string $key + * @param mixed $default + * @return mixed + */ + public function parameter(string $key, $default = null) + { + return $this->parameters[$key] ?? $default; + } - /** - * Serialize the Job - * - * @throws InvalidJobException - * - * @return string - */ - public function serialize() : string - { - if (($serialized = json_encode(['id' => $this->id, 'action' => $this->action, 'data' => $this->parameters])) === false) { - throw new InvalidJobException("Could not serialize Beryllium Job with ID '{$this->id}'. " . json_last_error_msg()); - } + /** + * Serialize the Job + * + * @throws InvalidJobException + * + * @return string + */ + public function serialize() : string + { + if (($serialized = json_encode(['id' => $this->id, 'action' => $this->action, 'data' => $this->parameters])) === false) { + throw new InvalidJobException("Could not serialize Beryllium Job with ID '{$this->id}'. " . json_last_error_msg()); + } - return $serialized; - } + return $serialized; + } } diff --git a/src/Mutex.php b/src/Mutex.php index 266da73..e04a3bc 100644 --- a/src/Mutex.php +++ b/src/Mutex.php @@ -134,10 +134,10 @@ public function safeExec(callable $callback, int $ttl = 10) try { $callback($this->token); - } catch(\Exception $e) { + } catch (\Exception $e) { $this->unlock(); throw $e; - } catch(\Error $e) { + } catch (\Error $e) { $this->unlock(); throw $e; } diff --git a/src/ProcessManager.php b/src/ProcessManager.php index ee4208d..d257deb 100644 --- a/src/ProcessManager.php +++ b/src/ProcessManager.php @@ -5,116 +5,116 @@ use Symfony\Component\Process\Process; class ProcessManager -{ - /** - * A queue instance - * - * @var Queue - */ - protected $queue; - - /** - * An array of workers - * - * @var array - */ - protected $workers = []; - - /** - * Maximum number of workers - * - * @var int - */ - protected $maxWorkers = 32; - - /** - * Should we exit the main loop - * - * @var bool - */ - protected $shouldExit = false; - - /** - * Idle wait in microseconds - * - * @var int - */ - protected $idleWait = 10000; - - /** - * The process pattern - * - * @var string - */ - protected $processPattern; - - /** - * Construct - * - * @param Queue $queue +{ + /** + * A queue instance + * + * @var Queue + */ + protected $queue; + + /** + * An array of workers + * + * @var array + */ + protected $workers = []; + + /** + * Maximum number of workers + * + * @var int + */ + protected $maxWorkers = 32; + + /** + * Should we exit the main loop + * + * @var bool + */ + protected $shouldExit = false; + + /** + * Idle wait in microseconds + * + * @var int + */ + protected $idleWait = 10000; + + /** + * The process pattern + * + * @var string + */ + protected $processPattern; + + /** + * Construct + * + * @param Queue $queue * @param string $processPattern - */ - public function __construct(Queue $queue, string $processPattern) - { - $this->queue = $queue; - $this->processPattern = $processPattern; - } - - /** - * Start the main loop - * ! This method is blocking ! - * - * @return void - */ - public function work(bool $verbose = false, bool $printWorkerOutput = false) - { - while(!$this->shouldExit) - { - usleep($this->idleWait); - - // check the worker status - foreach($this->workers as $jobId => $process) - { - if (!$process->isRunning()) - { - // if the process failed we might retry - if (!$process->isSuccessful()) { - if ($verbose) echo "[{$jobId}] failed\n"; - $this->queue->considerRetry($jobId); - } else { - if ($verbose) echo "[{$jobId}] success\n"; - $this->queue->done($jobId); - } - - if ($printWorkerOutput) echo "[{$jobId}] {$process->getOutput()}\n"; - - unset($this->workers[$jobId]); + */ + public function __construct(Queue $queue, string $processPattern) + { + $this->queue = $queue; + $this->processPattern = $processPattern; + } + + /** + * Start the main loop + * ! This method is blocking ! + * + * @return void + */ + public function work(bool $verbose = false, bool $printWorkerOutput = false) + { + while (!$this->shouldExit) + { + usleep($this->idleWait); + + // check the worker status + foreach($this->workers as $jobId => $process) + { + if (!$process->isRunning()) + { + // if the process failed we might retry + if (!$process->isSuccessful()) { + if ($verbose) echo "[{$jobId}] failed\n"; + $this->queue->considerRetry($jobId); + } else { + if ($verbose) echo "[{$jobId}] success\n"; + $this->queue->done($jobId); + } + + if ($printWorkerOutput) echo "[{$jobId}] {$process->getOutput()}\n"; + + unset($this->workers[$jobId]); // update the number of active jobs $this->queue->statsSetActiveWorkers(count($this->workers)); - } - } + } + } - if (count($this->workers) >= $this->maxWorkers) { - continue; - } + if (count($this->workers) >= $this->maxWorkers) { + continue; + } - // get the next job - if (!$jobId = $this->queue->getNextJobId()) { - continue; - } + // get the next job + if (!$jobId = $this->queue->getNextJobId()) { + continue; + } - $process = new Process(explode(' ', sprintf($this->processPattern, $jobId))); - $process->start(); + $process = new Process(explode(' ', sprintf($this->processPattern, $jobId))); + $process->start(); - $this->workers[$jobId] = $process; + $this->workers[$jobId] = $process; - if ($verbose) echo "[{$jobId}] starting\n"; + if ($verbose) echo "[{$jobId}] starting\n"; - // update the number of active jobs - $this->queue->statsSetActiveWorkers(count($this->workers)); - } - } + // update the number of active jobs + $this->queue->statsSetActiveWorkers(count($this->workers)); + } + } /** * Get the sleeptime @@ -129,7 +129,7 @@ public function getIdleWait() : int /** * Set the sleep time in microseconds * - * @param int $idleWait + * @param int $idleWait * @return self */ public function setIdleWait(int $idleWait) @@ -150,7 +150,7 @@ public function getMaxWorkers() : int /** * Set maximum allowed number of concurrent workers * - * @param int $maxWorkers + * @param int $maxWorkers * @return self */ public function setMaxWorkers(int $maxWorkers) diff --git a/src/Queue.php b/src/Queue.php index f2350f1..549cf87 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -5,149 +5,160 @@ use Beryllium\Driver\DriverInterface; class Queue -{ - /** - * The driver to read the queue - */ - protected DriverInterface $driver; - - /** - * Constructor - */ - public function __construct(DriverInterface $driver) - { - $this->driver = $driver; - } - - /** - * Retrieve the id of the next job or null if there is nothing todo - * - * @return string|null - */ - public function getNextJobId() : ?string - { - return $this->driver->popWaitingId(); - } - - /** - * Get a specific job from the queue by id - * - * @param string $jobId - * @return Job|null - */ - public function get(string $jobId) : ?Job - { - return $this->driver->get($jobId); - } - - /** - * Create a new job and enqueue it. - * - * @param string $action The action to perform - * @param array $parameters The parameters for the job - * @param int $maxRetries The maximum number of retries - * @param string $jobId Speicify the job id instead of generating random one - * - * @return Job Returns the job - */ - public function add(string $action, array $parameters = [], int $maxRetries = 3, string $jobId = '') : Job - { - $job = new Job( - $jobId !== '' ? $jobId : uniqid('', true), - $action, - $parameters - ); - $this->driver->add($job, $maxRetries); - return $job; - } - - /** - * Mark the given job as done - * - * @param string $jobId - * @return void - */ - public function done(string $jobId) : void - { - $this->driver->cleanup($jobId); - } - - /** - * Consider a retry of the job - * - * @param string $jobId - * @return bool - */ - public function considerRetry(string $jobId) : bool - { - $maxRetries = $this->driver->getMaxRetries($jobId); - $attempts = $this->driver->attemptCount($jobId); - - // if one of the values has never been set we are unable to retry - if ($maxRetries === -1 || $attempts === -1) { - return false; - } - - if ($maxRetries > $attempts) { - $this->driver->retry($jobId); return true; - } - - return false; - } - - /** - * Returns a queue stat value by key - * - * @param string $key - * @return mixed - */ - public function statsGetValue(string $key) - { - return $this->driver->getStatsValue($key); - } - - /** - * Stores a queue stat value by key - * - * @param string $key - * @param mixed $value - * @return void - */ - public function statsSetValue(string $key, $value) : void - { - $this->driver->storeStatsValue($key, $value); - } - - /** - * Store the current number of active workers - * This is used for debugging and maintanance - * - * @param int $num - * @return void - */ - public function statsSetActiveWorkers(int $num) : void - { - $this->driver->storeStatsValue('active_workers', $num); - } - - /** - * Store the current number of active workers - * This is used for debugging and maintanance - * - * @return int - */ - public function statsGetActiveWorkers() : int - { - return $this->driver->getStatsValue('active_workers'); - } - - /** - * Returns the number of jobs currently in queue - * - * @return int - */ - public function waitingCount() : int - { - return $this->driver->waitingCount(); - } +{ + /** + * The driver to read the queue + */ + protected DriverInterface $driver; + + /** + * Constructor + */ + public function __construct(DriverInterface $driver) + { + $this->driver = $driver; + } + + /** + * Retrieve the id of the next job or null if there is nothing todo + * + * @return string|null + */ + public function getNextJobId() : ?string + { + return $this->driver->popWaitingId(); + } + + /** + * Get a specific job from the queue by id + * + * @param string $jobId + * @return Job|null + */ + public function get(string $jobId) : ?Job + { + return $this->driver->get($jobId); + } + + /** + * Check if a job exists in the queue + * + * @param string $jobId + * @return bool + */ + public function exists(string $jobId) : bool + { + return $this->driver->exists($jobId); + } + + /** + * Create a new job and enqueue it. + * + * @param string $action The action to perform + * @param array $parameters The parameters for the job + * @param int $maxRetries The maximum number of retries + * @param string $jobId Speicify the job id instead of generating random one + * + * @return Job Returns the job + */ + public function add(string $action, array $parameters = [], int $maxRetries = 3, string $jobId = '') : Job + { + $job = new Job( + $jobId !== '' ? $jobId : uniqid('', true), + $action, + $parameters + ); + $this->driver->add($job, $maxRetries); + return $job; + } + + /** + * Mark the given job as done + * + * @param string $jobId + * @return void + */ + public function done(string $jobId) : void + { + $this->driver->cleanup($jobId); + } + + /** + * Consider a retry of the job + * + * @param string $jobId + * @return bool + */ + public function considerRetry(string $jobId) : bool + { + $maxRetries = $this->driver->getMaxRetries($jobId); + $attempts = $this->driver->attemptCount($jobId); + + // if one of the values has never been set we are unable to retry + if ($maxRetries === -1 || $attempts === -1) { + return false; + } + + if ($maxRetries > $attempts) { + $this->driver->retry($jobId); return true; + } + + return false; + } + + /** + * Returns a queue stat value by key + * + * @param string $key + * @return mixed + */ + public function statsGetValue(string $key) + { + return $this->driver->getStatsValue($key); + } + + /** + * Stores a queue stat value by key + * + * @param string $key + * @param mixed $value + * @return void + */ + public function statsSetValue(string $key, $value) : void + { + $this->driver->storeStatsValue($key, $value); + } + + /** + * Store the current number of active workers + * This is used for debugging and maintanance + * + * @param int $num + * @return void + */ + public function statsSetActiveWorkers(int $num) : void + { + $this->driver->storeStatsValue('active_workers', $num); + } + + /** + * Store the current number of active workers + * This is used for debugging and maintanance + * + * @return int + */ + public function statsGetActiveWorkers() : int + { + return $this->driver->getStatsValue('active_workers'); + } + + /** + * Returns the number of jobs currently in queue + * + * @return int + */ + public function waitingCount() : int + { + return $this->driver->waitingCount(); + } } From 7316679b8f64bfba176aa561645e26f29cff2224 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 09:32:28 +0200 Subject: [PATCH 03/14] PJAS-466 Fix phpstan --- src/Driver/RedisDriver.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Driver/RedisDriver.php b/src/Driver/RedisDriver.php index 4319d26..c0f3bff 100644 --- a/src/Driver/RedisDriver.php +++ b/src/Driver/RedisDriver.php @@ -149,7 +149,7 @@ public function exists(string $id) : bool */ public function popWaitingId() : ?string { - return $this->redis->rPop($this->queuePrefix . static::REDIS_KEY_WAITLIST) ?: null; + return (string)$this->redis->rPop($this->queuePrefix . static::REDIS_KEY_WAITLIST) ?: null; } /** From 19504f8be71a6828844e7408f6d6b1684b844fb3 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 09:39:52 +0200 Subject: [PATCH 04/14] PJAS-466 Add private repo auth --- composer.json | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/composer.json b/composer.json index d6d4613..0a88e15 100755 --- a/composer.json +++ b/composer.json @@ -17,9 +17,18 @@ "phpstan/phpstan": "^1.7.0", "squizlabs/php_codesniffer": "^3.5" }, + "repositories": [ + { + "type": "composer", + "url": "https://repos-php.cineman.ch", + "options": { "http": { + "header": [ "X-Access-Token: BBvDcoQjBCC4MCN8YBZyzPwbZqUZfKLnHvv8ZQVp4RwVDr6dyMuaTYLCBiyBRUmT" ] + }} + } + ], "autoload": { "psr-4": { - "Beryllium\\": "src/" + "Beryllium\\": "src/" } }, "autoload-dev": { From 6430679709000300f9e8a6aebe6a0b79143a1112 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 12:37:05 +0200 Subject: [PATCH 05/14] PJAS-466 Refactor annotations --- phpstan.neon | 7 --- src/Driver/DriverInterface.php | 52 ++++++++++++--------- src/Driver/PHPArrayDriver.php | 52 ++++++++++++--------- src/Driver/RedisDriver.php | 84 +++++++++++++++++++--------------- src/Job.php | 58 +++++++++++------------ src/Locker.php | 6 ++- src/Mutex.php | 51 +++++++++++---------- src/ProcessManager.php | 16 ++++--- src/Queue.php | 27 +++++++---- 9 files changed, 198 insertions(+), 155 deletions(-) delete mode 100644 phpstan.neon diff --git a/phpstan.neon b/phpstan.neon deleted file mode 100644 index 5faee1b..0000000 --- a/phpstan.neon +++ /dev/null @@ -1,7 +0,0 @@ -parameters: - ignoreErrors: - bootstrapFiles: - - test-bootstrap.php - level: 8 - paths: - - src diff --git a/src/Driver/DriverInterface.php b/src/Driver/DriverInterface.php index e78fade..39116de 100644 --- a/src/Driver/DriverInterface.php +++ b/src/Driver/DriverInterface.php @@ -3,7 +3,6 @@ namespace Beryllium\Driver; use Beryllium\Job; -use Generator; interface DriverInterface { @@ -16,8 +15,9 @@ interface DriverInterface /** * Adds the given Job to the queue * - * @param Job $job - * @param int $maxRetries + * @param Job $job + * @param int $maxRetries + * * @return void */ public function add(Job $job, int $maxRetries = 3) : void; @@ -25,14 +25,16 @@ public function add(Job $job, int $maxRetries = 3) : void; /** * Get a job instance by the given id. * - * @param string $id The Job identifier. + * @param string $id The Job identifier. + * * @return Job */ public function get(string $id) : ?Job; /** * Check if a job exists in the queue - * @param string $id + * + * @param string $id The Job identifier. * * @return bool */ @@ -55,7 +57,8 @@ public function waitingCount() : int; /** * Reinsert the job into the waitlist * - * @param string $id The Job identifier. + * @param string $id The Job identifier. + * * @return void */ public function retry(string $id) : void; @@ -63,23 +66,26 @@ public function retry(string $id) : void; /** * Get the maximum number of attempts we should try for the job * - * @param string $id The Job identifier. - * @return int Returns -1 if the job has never been executed + * @param string $id The Job identifier. + * + * @return int Returns -1 if the job has never been executed */ public function getMaxRetries(string $id) : int; /** * Get the number of attempts this job already had. * - * @param string $id The Job identifier. - * @return int Returns -1 if the job has never been executed + * @param string $id The Job identifier. + * + * @return int Returns -1 if the job has never been executed */ public function attemptCount(string $id) : int; /** * Cleanup the jobs data * - * @param string $id The Job identifier. + * @param string $id The Job identifier. + * * @return void */ public function cleanup(string $id) : void; @@ -101,8 +107,9 @@ public function clearEverything() : void; /** * Simply store a value * - * @param string $key - * @param mixed $value + * @param string $key + * @param mixed $value + * * @return void */ public function storeStatsValue(string $key, $value) : void; @@ -110,7 +117,8 @@ public function storeStatsValue(string $key, $value) : void; /** * Simply get a value * - * @param string $key + * @param string $key + * * @return mixed */ public function getStatsValue(string $key); @@ -124,7 +132,8 @@ public function getStatsValue(string $key); /** * Checks if the given key is locked on the driver. * - * @param string $key + * @param string $key + * * @return bool */ public function isLocked(string $key) : bool; @@ -132,7 +141,8 @@ public function isLocked(string $key) : bool; /** * Returns the locks token * - * @param string $key + * @param string $key + * * @return string */ public function getLockToken(string $key) : ?string; @@ -140,9 +150,9 @@ public function getLockToken(string $key) : ?string; /** * Creates a lock entry on the driver, this must be synchronised! * - * @param string $key - * @param string $token - * @param int $ttl + * @param string $key + * @param string $token + * @param int $ttl * * @return bool Returns true if the lock could be created */ @@ -152,8 +162,8 @@ public function lock(string $key, string $token, int $ttl) : bool; * Removes a lock entry on the driver, this must be synchronised! * Also the lock for the key should only be removed if the token matches! * - * @param string $key - * @param string $token + * @param string $key + * @param string $token * * @return bool Retruns true if the lock could be removed. */ diff --git a/src/Driver/PHPArrayDriver.php b/src/Driver/PHPArrayDriver.php index a9da4a5..91a7daa 100644 --- a/src/Driver/PHPArrayDriver.php +++ b/src/Driver/PHPArrayDriver.php @@ -3,7 +3,6 @@ namespace Beryllium\Driver; use Beryllium\Job; -use Generator; class PHPArrayDriver implements DriverInterface { @@ -42,8 +41,9 @@ class PHPArrayDriver implements DriverInterface /** * Adds the given Job to the queue * - * @param Job $job - * @param int $maxRetries + * @param Job $job + * @param int $maxRetries + * * @return void */ public function add(Job $job, int $maxRetries = 3) : void @@ -57,7 +57,8 @@ public function add(Job $job, int $maxRetries = 3) : void /** * Get a job instance by the given id. * - * @param string $id The Job identifier. + * @param string $id The Job identifier. + * * @return Job */ public function get(string $id) : ?Job @@ -67,7 +68,8 @@ public function get(string $id) : ?Job /** * Check if a job exists in the queue - * @param string $id + * + * @param string $id The Job identifier. * * @return bool */ @@ -99,7 +101,8 @@ public function waitingCount() : int /** * Reinsert the job into the waitlist * - * @param string $id The Job identifier. + * @param string $id The Job identifier. + * * @return void */ public function retry(string $id) : void @@ -111,8 +114,9 @@ public function retry(string $id) : void /** * Get the maximum number of attempts we should try for the job * - * @param string $id The Job identifier. - * @return int Returns -1 if the job has never been executed + * @param string $id The Job identifier. + * + * @return int Returns -1 if the job has never been executed */ public function getMaxRetries(string $id) : int { @@ -122,8 +126,9 @@ public function getMaxRetries(string $id) : int /** * Get the number of attempts this job already had. * - * @param string $id The Job identifier. - * @return int Returns -1 if the job has never been executed + * @param string $id The Job identifier. + * + * @return int Returns -1 if the job has never been executed */ public function attemptCount(string $id) : int { @@ -133,7 +138,8 @@ public function attemptCount(string $id) : int /** * Cleanup the jobs data * - * @param string $id The Job identifier. + * @param string $id The Job identifier. + * * @return void */ public function cleanup(string $id) : void @@ -162,8 +168,9 @@ public function clearEverything() : void /** * Simply store a value * - * @param string $key - * @param mixed $value + * @param string $key + * @param mixed $value + * * @return void */ public function storeStatsValue(string $key, $value) : void @@ -174,7 +181,8 @@ public function storeStatsValue(string $key, $value) : void /** * Simply get a value * - * @param string $key + * @param string $key + * * @return mixed */ public function getStatsValue(string $key) @@ -185,7 +193,8 @@ public function getStatsValue(string $key) /** * Checks if the given key is locked on the driver. * - * @param string $key + * @param string $key + * * @return bool */ public function isLocked(string $key) : bool @@ -196,7 +205,8 @@ public function isLocked(string $key) : bool /** * Returns the locks token * - * @param string $key + * @param string $key + * * @return string */ public function getLockToken(string $key) : ?string @@ -207,9 +217,9 @@ public function getLockToken(string $key) : ?string /** * Creates a lock entry on the driver, this must be synchronised! * - * @param string $key - * @param string $token - * @param int $ttl + * @param string $key + * @param string $token + * @param int $ttl * * @return bool Returns true if the lock could be created */ @@ -227,8 +237,8 @@ public function lock(string $key, string $token, int $ttl) : bool * Removes a lock entry on the driver, this must be synchronised! * Also the lock for the key should only be removed if the token matches! * - * @param string $key - * @param string $token + * @param string $key + * @param string $token * * @return bool Retruns true if the lock could be removed. */ diff --git a/src/Driver/RedisDriver.php b/src/Driver/RedisDriver.php index c0f3bff..7931dfc 100644 --- a/src/Driver/RedisDriver.php +++ b/src/Driver/RedisDriver.php @@ -6,10 +6,18 @@ use Beryllium\Job; use Beryllium\Exception\InvalidDataException; -use Generator; class RedisDriver implements DriverInterface { + /** + * Redis Keys + */ + public const REDIS_KEY_WAITLIST = 'waitlist'; + public const REDIS_KEY_ATTEMPT = 'attempt.'; + public const REDIS_KEY_MAX_RETRIES = 'max_retries.'; + public const REDIS_KEY_DATA = 'data.'; + public const REDIS_KEY_STATS = 'stats.'; + /** * Get the redis connection */ @@ -25,15 +33,6 @@ class RedisDriver implements DriverInterface */ protected string $lockPrefix = 'beryllium.lock.'; - /** - * Redis Keys - */ - const REDIS_KEY_WAITLIST = 'waitlist'; - const REDIS_KEY_ATTEMPT = 'attempt.'; - const REDIS_KEY_MAX_RETRIES = 'max_retries.'; - const REDIS_KEY_DATA = 'data.'; - const REDIS_KEY_STATS = 'stats.'; - /** * Constructor */ @@ -82,10 +81,12 @@ public function setLockKeyPrefix(string $prefix) } /** - * Add a job to the queue + * Adds the given Job to the queue * - * @param Job $job - * @param int $maxRetries + * @param Job $job + * @param int $maxRetries + * + * @return void */ public function add(Job $job, int $maxRetries = 3) : void { @@ -105,9 +106,10 @@ public function add(Job $job, int $maxRetries = 3) : void } /** - * Get a job by id + * Get a job instance by the given id. * - * @param string $id + * @param string $id The Job identifier. + * * @return Job */ public function get(string $id) : ?Job @@ -123,7 +125,8 @@ public function get(string $id) : ?Job /** * Check if a job exists in the queue - * @param string $id + * + * @param string $id The Job identifier. * * @return bool */ @@ -145,7 +148,7 @@ public function exists(string $id) : bool /** * Get the ID of a waiting job * - * @return string + * @return string|null Returns null if no job is queued. */ public function popWaitingId() : ?string { @@ -165,7 +168,8 @@ public function waitingCount() : int /** * Reinsert the job into the waitlist * - * @param string $id + * @param string $id The Job identifier. + * * @return void */ public function retry(string $id) : void @@ -177,8 +181,9 @@ public function retry(string $id) : void /** * Get the maximum number of attempts we should try for the job * - * @param string $id - * @return int + * @param string $id The Job identifier. + * + * @return int Returns -1 if the job has never been executed */ public function getMaxRetries(string $id) : int { @@ -187,10 +192,11 @@ public function getMaxRetries(string $id) : int } /** - * Get the number of attempts for the job + * Get the number of attempts this job already had. * - * @param string $id - * @return int + * @param string $id The Job identifier. + * + * @return int Returns -1 if the job has never been executed */ public function attemptCount(string $id) : int { @@ -201,7 +207,9 @@ public function attemptCount(string $id) : int /** * Cleanup the jobs data * - * @param string $id + * @param string $id The Job identifier. + * + * @return void */ public function cleanup(string $id) : void { @@ -213,7 +221,7 @@ public function cleanup(string $id) : void } /** - * Will clear freaking everything + * Will clear freaking everything releated to the driver * !!! Attention with this one.. * * @return void @@ -233,8 +241,9 @@ public function clearEverything() : void /** * Simply store a value * - * @param string $key - * @param mixed $value + * @param string $key + * @param mixed $value + * * @return void */ public function storeStatsValue(string $key, $value) : void @@ -245,7 +254,8 @@ public function storeStatsValue(string $key, $value) : void /** * Simply get a value * - * @param string $key + * @param string $key + * * @return mixed */ public function getStatsValue(string $key) @@ -266,7 +276,8 @@ public function getStatsValue(string $key) /** * Checks if the given key is locked on the driver. * - * @param string $key + * @param string $key + * * @return bool */ public function isLocked(string $key) : bool @@ -277,7 +288,8 @@ public function isLocked(string $key) : bool /** * Returns the locks token * - * @param string $key + * @param string $key + * * @return string */ public function getLockToken(string $key) : ?string @@ -288,9 +300,9 @@ public function getLockToken(string $key) : ?string /** * Creates a lock entry on the driver, this must be synchronised! * - * @param string $key - * @param string $token - * @param int $ttl + * @param string $key + * @param string $token + * @param int $ttl * * @return bool Returns true if the lock could be created */ @@ -304,10 +316,10 @@ public function lock(string $key, string $token, int $ttl) : bool * Removes a lock entry on the driver, this must be synchronised! * Also the lock for the key should only be removed if the token matches! * - * @param string $key - * @param string $token + * @param string $key + * @param string $token * - * @return bool + * @return bool Retruns true if the lock could be removed. */ public function unlock(string $key, string $token) : bool { diff --git a/src/Job.php b/src/Job.php index 35521c4..b1bdf74 100644 --- a/src/Job.php +++ b/src/Job.php @@ -6,27 +6,6 @@ class Job { - /** - * Unserialize the given data to a job - * - * @param string $data - * @return Job - */ - public static function unserialize(string $data) : ?Job - { - $data = json_decode($data, true); - - if ( - (!isset($data['id'])) || - (!isset($data['action'])) || - (!isset($data['data'])) - ) { - return null; - } - - return new Job($data['id'], $data['action'], $data['data']); - } - /** * The jobs id * @@ -51,9 +30,9 @@ public static function unserialize(string $data) : ?Job /** * Construct * - * @param string $id - * @param string $action - * @param array $parameters + * @param string $id + * @param string $action + * @param array $parameters */ public function __construct(string $id, string $action, array $parameters = []) { @@ -95,11 +74,12 @@ public function parameters() : array /** * Get a specific parameter from the job * - * @param string $key - * @param mixed $default + * @param string $key + * @param mixed $default + * * @return mixed */ - public function parameter(string $key, $default = null) + public function parameter(string $key, $default = null) : mixed { return $this->parameters[$key] ?? $default; } @@ -107,9 +87,9 @@ public function parameter(string $key, $default = null) /** * Serialize the Job * - * @throws InvalidJobException - * * @return string + * + * @throws InvalidJobException */ public function serialize() : string { @@ -119,4 +99,24 @@ public function serialize() : string return $serialized; } + + /** + * Unserialize the given data to a job + * + * @param string $data + * + * @return Job + */ + public static function unserialize(string $data) : ?Job + { + $data = json_decode($data, true); + + if ((!isset($data['id'])) || + (!isset($data['action'])) || + (!isset($data['data']))) { + return null; + } + + return new Job($data['id'], $data['action'], $data['data']); + } } diff --git a/src/Locker.php b/src/Locker.php index 7dbeacb..66a9b9d 100644 --- a/src/Locker.php +++ b/src/Locker.php @@ -16,7 +16,8 @@ class Locker /** * Constructor * - * @param DriverInterface $driver The beryllium driver. + * @param DriverInterface $driver The beryllium driver. + * * @return void */ public function __construct(DriverInterface $driver) @@ -27,7 +28,8 @@ public function __construct(DriverInterface $driver) /** * Creates a mutex with the given key and the locker assigned driver * - * @param string $lockkey + * @param string $lockkey + * * @return Mutex */ public function mutex(string $lockkey) : Mutex diff --git a/src/Mutex.php b/src/Mutex.php index e04a3bc..36b9d7c 100644 --- a/src/Mutex.php +++ b/src/Mutex.php @@ -3,11 +3,16 @@ namespace Beryllium; use Beryllium\Driver\DriverInterface; - use Beryllium\Exception\LockedMutexException; class Mutex { + /** + * Error codes + */ + public const ERROR_ALREADY_LOCKED = 5; + public const ERROR_UNLOCK_FAILURE = 10; + /** * Driver instance * @@ -30,17 +35,12 @@ class Mutex */ private string $token; - /** - * Error codes - */ - const ERROR_ALREADY_LOCKED = 5; - const ERROR_UNLOCK_FAILURE = 10; - /** * Constructor * - * @param DriverInterface $driver The beryllium driver. - * @param string $lockkey + * @param DriverInterface $driver The beryllium driver. + * @param string $lockkey + * * @return void */ public function __construct(DriverInterface $driver, string $lockkey) @@ -61,13 +61,14 @@ public function getMutexKey() : string /** * Lock the mutex - * - * @throws LockedMutexException * - * @param int $ttl Max time to live in seconds. + * @param int $ttl Max time to live in seconds. + * * @return void + * + * @throws LockedMutexException */ - public function lock(int $ttl = 30) + public function lock(int $ttl = 30) : void { // generate a token $this->token = uniqid(); @@ -104,16 +105,19 @@ public function ownsLock() : bool /** * Lock the mutex - * - * @throws LockedMutexException * - * @return void + * @return void + * + * @throws LockedMutexException */ - public function unlock() + public function unlock() : void { // try to lock on the driver if (!$this->driver->unlock($this->lockkey, $this->token)) { - throw new LockedMutexException("The mutex ($this->lockkey) could not be unlocked, either its been locked by another instance or it does not exist.", static::ERROR_UNLOCK_FAILURE); + throw new LockedMutexException( + "The mutex ($this->lockkey) could not be unlocked, either its been locked by another instance or it does not exist.", + static::ERROR_UNLOCK_FAILURE + ); } } @@ -121,14 +125,15 @@ public function unlock() * Runs the given callback in a mutex locked enclosure. * Catches any error that might occour and unlocks the mutex and rethrows the error / exception. * + * @param callable $callback + * @param int $ttl + * + * @return void + * * @throws \Error * @throws \Exception - * - * @param callable $callback - * @param int $ttl - * @return void */ - public function safeExec(callable $callback, int $ttl = 10) + public function safeExec(callable $callback, int $ttl = 10) : void { $this->lock($ttl); diff --git a/src/ProcessManager.php b/src/ProcessManager.php index d257deb..2f5f70f 100644 --- a/src/ProcessManager.php +++ b/src/ProcessManager.php @@ -51,8 +51,8 @@ class ProcessManager /** * Construct * - * @param Queue $queue - * @param string $processPattern + * @param Queue $queue + * @param string $processPattern */ public function __construct(Queue $queue, string $processPattern) { @@ -66,7 +66,7 @@ public function __construct(Queue $queue, string $processPattern) * * @return void */ - public function work(bool $verbose = false, bool $printWorkerOutput = false) + public function work(bool $verbose = false, bool $printWorkerOutput = false) : void { while (!$this->shouldExit) { @@ -129,10 +129,11 @@ public function getIdleWait() : int /** * Set the sleep time in microseconds * - * @param int $idleWait + * @param int $idleWait + * * @return self */ - public function setIdleWait(int $idleWait) + public function setIdleWait(int $idleWait) : self { $this->idleWait = $idleWait; return $this; } @@ -150,10 +151,11 @@ public function getMaxWorkers() : int /** * Set maximum allowed number of concurrent workers * - * @param int $maxWorkers + * @param int $maxWorkers + * * @return self */ - public function setMaxWorkers(int $maxWorkers) + public function setMaxWorkers(int $maxWorkers) : self { $this->maxWorkers = $maxWorkers; return $this; } diff --git a/src/Queue.php b/src/Queue.php index 549cf87..2bbfb9c 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -13,6 +13,8 @@ class Queue /** * Constructor + * + * @param DriverInterface $driver The beryllium driver. */ public function __construct(DriverInterface $driver) { @@ -32,7 +34,8 @@ public function getNextJobId() : ?string /** * Get a specific job from the queue by id * - * @param string $jobId + * @param string $jobId + * * @return Job|null */ public function get(string $jobId) : ?Job @@ -43,7 +46,8 @@ public function get(string $jobId) : ?Job /** * Check if a job exists in the queue * - * @param string $jobId + * @param string $jobId + * * @return bool */ public function exists(string $jobId) : bool @@ -75,7 +79,8 @@ public function add(string $action, array $parameters = [], int $maxRetries = 3, /** * Mark the given job as done * - * @param string $jobId + * @param string $jobId + * * @return void */ public function done(string $jobId) : void @@ -86,7 +91,8 @@ public function done(string $jobId) : void /** * Consider a retry of the job * - * @param string $jobId + * @param string $jobId + * * @return bool */ public function considerRetry(string $jobId) : bool @@ -109,10 +115,11 @@ public function considerRetry(string $jobId) : bool /** * Returns a queue stat value by key * - * @param string $key + * @param string $key + * * @return mixed */ - public function statsGetValue(string $key) + public function statsGetValue(string $key) : mixed { return $this->driver->getStatsValue($key); } @@ -120,8 +127,9 @@ public function statsGetValue(string $key) /** * Stores a queue stat value by key * - * @param string $key - * @param mixed $value + * @param string $key + * @param mixed $value + * * @return void */ public function statsSetValue(string $key, $value) : void @@ -133,7 +141,8 @@ public function statsSetValue(string $key, $value) : void * Store the current number of active workers * This is used for debugging and maintanance * - * @param int $num + * @param int $num + * * @return void */ public function statsSetActiveWorkers(int $num) : void From 083060cd5ecbb01dbeaca8f2d6b3bb81288aa691 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 13:11:01 +0200 Subject: [PATCH 06/14] PJAS-466 Move redis connection params into phpunit.xml --- .gitignore | 1 + phpunit.xml | 6 ++++++ test-bootstrap.php | 8 ++++---- 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/.gitignore b/.gitignore index d191143..a5e4450 100755 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ vendor/ coverage/ composer.lock +.phpunit.result.cache diff --git a/phpunit.xml b/phpunit.xml index b59e82f..5b1a0c8 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -1,5 +1,11 @@ + + + + + + ./src diff --git a/test-bootstrap.php b/test-bootstrap.php index ce3e014..6aadeb2 100644 --- a/test-bootstrap.php +++ b/test-bootstrap.php @@ -16,10 +16,10 @@ /** * Redis hostname and port by define */ -if (!defined('BERYLLIUM_REDIS_HOST')) define('BERYLLIUM_REDIS_HOST', '127.0.0.1'); -if (!defined('BERYLLIUM_REDIS_PORT')) define('BERYLLIUM_REDIS_PORT', 6379); -if (!defined('BERYLLIUM_IDLE_WAIT')) define('BERYLLIUM_IDLE_WAIT', 10000); // 10ms -if (!defined('BERYLLIUM_MAX_WORKERS')) define('BERYLLIUM_MAX_WORKERS', 8); +if (!defined('BERYLLIUM_REDIS_HOST')) define('BERYLLIUM_REDIS_HOST', getenv('BERYLLIUM_REDIS_HOST')); +if (!defined('BERYLLIUM_REDIS_PORT')) define('BERYLLIUM_REDIS_PORT', getenv('BERYLLIUM_REDIS_PORT')); +if (!defined('BERYLLIUM_IDLE_WAIT')) define('BERYLLIUM_IDLE_WAIT', getenv('BERYLLIUM_IDLE_WAIT')); +if (!defined('BERYLLIUM_MAX_WORKERS')) define('BERYLLIUM_MAX_WORKERS', getenv('BERYLLIUM_MAX_WORKERS')); /** * Create a queue instance with the default configuration From 918bc9b96b8a921fe182be2bbc54aa0c55c6455d Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 13:21:03 +0200 Subject: [PATCH 07/14] PJAS-466 Split CI into two workflows and test against multiple php versions --- .github/workflows/quality.yml | 44 +++++++++++++++++++ .github/workflows/{hydrogen.yml => tests.yml} | 25 +++++++---- 2 files changed, 61 insertions(+), 8 deletions(-) create mode 100644 .github/workflows/quality.yml rename .github/workflows/{hydrogen.yml => tests.yml} (57%) diff --git a/.github/workflows/quality.yml b/.github/workflows/quality.yml new file mode 100644 index 0000000..0864333 --- /dev/null +++ b/.github/workflows/quality.yml @@ -0,0 +1,44 @@ +name: Code Quality + +on: + push: + branches: [ '*' ] + pull_request: + branches: [ '*' ] + +jobs: + build: + runs-on: ${{ matrix.operating-system }} + strategy: + matrix: + operating-system: ['ubuntu-latest'] + php-versions: ['8.1', '8.2', '8.3'] + + services: + redis: + image: redis + options: >- + --health-cmd "redis-cli ping" + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 6379:6379 + + steps: + - uses: actions/checkout@v2 + + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php-versions }} + tools: cs2pr + + - name: Install dependencies + run: composer install --prefer-dist --no-progress --no-suggest + + - name: Run phpcs + run: composer run-script ci-phpcs -- --report=checkstyle | cs2pr --graceful-warnings + + - name: Run phpstan + run: composer run-script ci-phpstan diff --git a/.github/workflows/hydrogen.yml b/.github/workflows/tests.yml similarity index 57% rename from .github/workflows/hydrogen.yml rename to .github/workflows/tests.yml index 5456d5a..487dbab 100644 --- a/.github/workflows/hydrogen.yml +++ b/.github/workflows/tests.yml @@ -1,14 +1,20 @@ -name: Hydrogen PHPUnit +name: Tests on: push: - branches: [ master, stage ] + branches: [ '*' ] pull_request: - branches: [ master, stage ] + branches: [ '*' ] jobs: build: - runs-on: ubuntu-latest + + runs-on: ${{ matrix.operating-system }} + strategy: + matrix: + operating-system: ['ubuntu-latest'] + php-versions: ['8.1', '8.2', '8.3'] + services: redis: image: redis @@ -19,16 +25,19 @@ jobs: --health-retries 5 ports: - 6379:6379 + steps: - uses: actions/checkout@v2 + - name: Setup PHP + uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php-versions }} + - name: Install dependencies run: composer install --prefer-dist --no-progress --no-suggest - name: Run PHPUnit - run: php vendor/bin/phpunit + run: composer run-script ci-phpcs env: PHPUNIT_BERYILLIUM_PARALLEL_WAIT: 15 - - - name: Run PHPStan - run: php vendor/bin/phpstan analyse src --error-format=github -l8 From 228e62e580a7347656f23549523057d36668f2c7 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 13:29:15 +0200 Subject: [PATCH 08/14] PJAS-466 Add phpunit command in composer --- .github/workflows/tests.yml | 2 +- composer.json | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 487dbab..d7f7187 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -38,6 +38,6 @@ jobs: run: composer install --prefer-dist --no-progress --no-suggest - name: Run PHPUnit - run: composer run-script ci-phpcs + run: composer run-script ci-phpunit env: PHPUNIT_BERYILLIUM_PARALLEL_WAIT: 15 diff --git a/composer.json b/composer.json index 0a88e15..06bb359 100755 --- a/composer.json +++ b/composer.json @@ -47,6 +47,9 @@ ], "ci-phpstan": [ "vendor/bin/phpstan analyse src --error-format=github -l8" + ], + "ci-phpunit": [ + "vendor/bin/phpunit" ] } } From 643f671d1c31aa261f6246e81b01d9c5574dd836 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 13:40:29 +0200 Subject: [PATCH 09/14] PJAS-466 Rename phpunit.xml into dist and ignore --- .gitignore | 1 + phpunit.xml => phpunit.xml.dist | 0 2 files changed, 1 insertion(+) rename phpunit.xml => phpunit.xml.dist (100%) diff --git a/.gitignore b/.gitignore index a5e4450..3c8e4de 100755 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,4 @@ vendor/ coverage/ composer.lock .phpunit.result.cache +phpunit.xml \ No newline at end of file diff --git a/phpunit.xml b/phpunit.xml.dist similarity index 100% rename from phpunit.xml rename to phpunit.xml.dist From 928dae70dfa9bff571570c0c4678983e5a3e0473 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Wed, 29 May 2024 13:47:28 +0200 Subject: [PATCH 10/14] PJAS-466 Update README.md --- README.md | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/README.md b/README.md index 3f6714d..67ad183 100644 --- a/README.md +++ b/README.md @@ -102,3 +102,31 @@ StartLimitBurst=10 WantedBy=multi-user.target ``` +### Code Quality + +Make sure the latest quality standards are met by executing the `phpcs` and `phpstan` scripts. There are three commands available which are defined in the root `composer.json` as custom scripts. + +Execute `phpcs` and `phpcbf` for linting and automatic fixing respectively: + +``` +composer run-script ci-phpcs +composer run-script ci-phpcs-fix +``` + +Execute `phpstan` to analyse the code to detect code issues. + +``` +composer run-script ci-phpstan +``` + +### Tests + +There are tests as part of this package in order to verify that everything works as expected. + +Execute the following command to run the tests: + +``` +composer run-script ci-phpunit +``` + +**Note: You need to configure your database connection first in `phpunit.xml` before running the tests. If `phpunit.xml` does not exist, copy the `phpunit.xml.dist` file.** \ No newline at end of file From f049ab0ece7b0e6790e6baf02266f841f6d2532e Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Thu, 27 Jun 2024 10:15:05 +0200 Subject: [PATCH 11/14] PJAS-466 Minor refactorings --- src/Driver/DriverInterface.php | 6 +++--- src/Driver/PHPArrayDriver.php | 6 +++--- src/Driver/RedisDriver.php | 33 +++++++++++++++++---------------- src/Job.php | 6 +----- src/Locker.php | 10 +--------- src/Mutex.php | 20 ++------------------ src/ProcessManager.php | 34 +++++----------------------------- src/Queue.php | 8 +------- 8 files changed, 33 insertions(+), 90 deletions(-) diff --git a/src/Driver/DriverInterface.php b/src/Driver/DriverInterface.php index 39116de..3de984e 100644 --- a/src/Driver/DriverInterface.php +++ b/src/Driver/DriverInterface.php @@ -27,7 +27,7 @@ public function add(Job $job, int $maxRetries = 3) : void; * * @param string $id The Job identifier. * - * @return Job + * @return Job|null */ public function get(string $id) : ?Job; @@ -121,7 +121,7 @@ public function storeStatsValue(string $key, $value) : void; * * @return mixed */ - public function getStatsValue(string $key); + public function getStatsValue(string $key) : mixed; /** * Locking System Methods @@ -143,7 +143,7 @@ public function isLocked(string $key) : bool; * * @param string $key * - * @return string + * @return string|null */ public function getLockToken(string $key) : ?string; diff --git a/src/Driver/PHPArrayDriver.php b/src/Driver/PHPArrayDriver.php index 91a7daa..1c0e8f6 100644 --- a/src/Driver/PHPArrayDriver.php +++ b/src/Driver/PHPArrayDriver.php @@ -59,7 +59,7 @@ public function add(Job $job, int $maxRetries = 3) : void * * @param string $id The Job identifier. * - * @return Job + * @return Job|null */ public function get(string $id) : ?Job { @@ -185,7 +185,7 @@ public function storeStatsValue(string $key, $value) : void * * @return mixed */ - public function getStatsValue(string $key) + public function getStatsValue(string $key) : mixed { return $this->stats[$key] ?? null; } @@ -207,7 +207,7 @@ public function isLocked(string $key) : bool * * @param string $key * - * @return string + * @return string|null */ public function getLockToken(string $key) : ?string { diff --git a/src/Driver/RedisDriver.php b/src/Driver/RedisDriver.php index 7931dfc..51f93c0 100644 --- a/src/Driver/RedisDriver.php +++ b/src/Driver/RedisDriver.php @@ -18,11 +18,6 @@ class RedisDriver implements DriverInterface public const REDIS_KEY_DATA = 'data.'; public const REDIS_KEY_STATS = 'stats.'; - /** - * Get the redis connection - */ - protected Redis $redis; - /** * The redis key prefix */ @@ -35,10 +30,11 @@ class RedisDriver implements DriverInterface /** * Constructor + * + * @param Redis $redis The redis instance. */ - public function __construct(Redis $redis) + public function __construct(protected Redis $redis) { - $this->redis = $redis; } /** @@ -50,20 +46,24 @@ public function __construct(Redis $redis) /** * Sets the queues key prefix * - * @param string $prefix + * @param string $prefix + * * @return void */ - public function setQueueKeyPrefix(string $prefix) + public function setQueueKeyPrefix(string $prefix) : void { $this->queuePrefix = $prefix; } /** + * Sets the key prefix + * * @deprecated - * @param string $prefix + * @param string $prefix + * * @return void */ - public function setKeyPrefix(string $prefix) + public function setKeyPrefix(string $prefix) : void { trigger_error('Method ' . __METHOD__ . ' is deprecated, use setQueueKeyPrefix instead.', E_USER_DEPRECATED); $this->setQueueKeyPrefix($prefix); @@ -72,10 +72,11 @@ public function setKeyPrefix(string $prefix) /** * Sets the lock key prefix * - * @param string $prefix + * @param string $prefix + * * @return void */ - public function setLockKeyPrefix(string $prefix) + public function setLockKeyPrefix(string $prefix) : void { $this->lockPrefix = $prefix; } @@ -110,7 +111,7 @@ public function add(Job $job, int $maxRetries = 3) : void * * @param string $id The Job identifier. * - * @return Job + * @return Job|null */ public function get(string $id) : ?Job { @@ -258,7 +259,7 @@ public function storeStatsValue(string $key, $value) : void * * @return mixed */ - public function getStatsValue(string $key) + public function getStatsValue(string $key) : mixed { if (($raw = $this->redis->get($this->queuePrefix . static::REDIS_KEY_STATS . $key)) === false) { throw new InvalidDataException("Could not read stats value from redis."); @@ -290,7 +291,7 @@ public function isLocked(string $key) : bool * * @param string $key * - * @return string + * @return string|null */ public function getLockToken(string $key) : ?string { diff --git a/src/Job.php b/src/Job.php index b1bdf74..bdac001 100644 --- a/src/Job.php +++ b/src/Job.php @@ -8,15 +8,11 @@ class Job { /** * The jobs id - * - * @var string */ private string $id; /** * The jobs action - * - * @var string */ private string $action; @@ -105,7 +101,7 @@ public function serialize() : string * * @param string $data * - * @return Job + * @return Job|null */ public static function unserialize(string $data) : ?Job { diff --git a/src/Locker.php b/src/Locker.php index 66a9b9d..cabd3f7 100644 --- a/src/Locker.php +++ b/src/Locker.php @@ -6,13 +6,6 @@ class Locker { - /** - * Driver instance - * - * @var DriverInterface - */ - private DriverInterface $driver; - /** * Constructor * @@ -20,9 +13,8 @@ class Locker * * @return void */ - public function __construct(DriverInterface $driver) + public function __construct(private DriverInterface $driver) { - $this->driver = $driver; } /** diff --git a/src/Mutex.php b/src/Mutex.php index 36b9d7c..c4e3a3d 100644 --- a/src/Mutex.php +++ b/src/Mutex.php @@ -12,21 +12,7 @@ class Mutex */ public const ERROR_ALREADY_LOCKED = 5; public const ERROR_UNLOCK_FAILURE = 10; - - /** - * Driver instance - * - * @var DriverInterface - */ - private DriverInterface $driver; - - /** - * Mutex key - * - * @var string - */ - private string $lockkey; - + /** * The mutex current token that ensures that only * this instance can unlock a lock. @@ -43,10 +29,8 @@ class Mutex * * @return void */ - public function __construct(DriverInterface $driver, string $lockkey) + public function __construct(private DriverInterface $driver, private string $lockkey) { - $this->driver = $driver; - $this->lockkey = $lockkey; } /** diff --git a/src/ProcessManager.php b/src/ProcessManager.php index 2f5f70f..0f7e476 100644 --- a/src/ProcessManager.php +++ b/src/ProcessManager.php @@ -6,47 +6,25 @@ class ProcessManager { - /** - * A queue instance - * - * @var Queue - */ - protected $queue; - /** * An array of workers - * - * @var array */ - protected $workers = []; + protected array $workers = []; /** * Maximum number of workers - * - * @var int */ - protected $maxWorkers = 32; + protected int $maxWorkers = 32; /** * Should we exit the main loop - * - * @var bool */ - protected $shouldExit = false; + protected bool $shouldExit = false; /** * Idle wait in microseconds - * - * @var int */ - protected $idleWait = 10000; - - /** - * The process pattern - * - * @var string - */ - protected $processPattern; + protected int $idleWait = 10000; /** * Construct @@ -54,10 +32,8 @@ class ProcessManager * @param Queue $queue * @param string $processPattern */ - public function __construct(Queue $queue, string $processPattern) + public function __construct(protected Queue $queue, protected string $processPattern) { - $this->queue = $queue; - $this->processPattern = $processPattern; } /** diff --git a/src/Queue.php b/src/Queue.php index 2bbfb9c..6aac139 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -6,19 +6,13 @@ class Queue { - /** - * The driver to read the queue - */ - protected DriverInterface $driver; - /** * Constructor * * @param DriverInterface $driver The beryllium driver. */ - public function __construct(DriverInterface $driver) + public function __construct(protected DriverInterface $driver) { - $this->driver = $driver; } /** From a1d9dd2307440761eaf3b78f7f5f544a5dc07397 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Thu, 27 Jun 2024 10:28:01 +0200 Subject: [PATCH 12/14] PJAS-466 Add array typehint --- src/ProcessManager.php | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/ProcessManager.php b/src/ProcessManager.php index 0f7e476..e821279 100644 --- a/src/ProcessManager.php +++ b/src/ProcessManager.php @@ -8,6 +8,8 @@ class ProcessManager { /** * An array of workers + * + * @var array */ protected array $workers = []; From 2b8149ba22410469adc93bcd541101088217fd00 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Thu, 27 Jun 2024 11:06:17 +0200 Subject: [PATCH 13/14] PJAS-466 Reorder namespaces --- src/Driver/RedisDriver.php | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/Driver/RedisDriver.php b/src/Driver/RedisDriver.php index 51f93c0..efa4916 100644 --- a/src/Driver/RedisDriver.php +++ b/src/Driver/RedisDriver.php @@ -2,10 +2,9 @@ namespace Beryllium\Driver; -use Redis; -use Beryllium\Job; - use Beryllium\Exception\InvalidDataException; +use Beryllium\Job; +use Redis; class RedisDriver implements DriverInterface { From 264ba4e4b3605ba1115eca41b168b8d79b174af8 Mon Sep 17 00:00:00 2001 From: Dragan Gjorgjiev Date: Thu, 27 Jun 2024 11:16:32 +0200 Subject: [PATCH 14/14] PJAS-466 Update constructor --- src/Job.php | 22 +--------------------- 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/src/Job.php b/src/Job.php index bdac001..9be282f 100644 --- a/src/Job.php +++ b/src/Job.php @@ -6,23 +6,6 @@ class Job { - /** - * The jobs id - */ - private string $id; - - /** - * The jobs action - */ - private string $action; - - /** - * The jobs parameters - * - * @var array - */ - private array $parameters; - /** * Construct * @@ -30,11 +13,8 @@ class Job * @param string $action * @param array $parameters */ - public function __construct(string $id, string $action, array $parameters = []) + public function __construct(private string $id, private string $action, private array $parameters = []) { - $this->id = $id; - $this->action = $action; - $this->parameters = $parameters; } /**