From b7e4a2d4d380a2ef512411cc80f75f1c1fcae0b0 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Wed, 27 Mar 2024 23:19:06 +0000 Subject: [PATCH 01/19] Add Config and Migration, Update Adapter.php --- config/di.php | 6 +++ src/Adapter.php | 51 ++++++++++++++++++------- src/Migrations/M240327150317Queue.php | 55 +++++++++++++++++++++++++++ 3 files changed, 98 insertions(+), 14 deletions(-) create mode 100644 config/di.php create mode 100644 src/Migrations/M240327150317Queue.php diff --git a/config/di.php b/config/di.php new file mode 100644 index 0000000..1f6a841 --- /dev/null +++ b/config/di.php @@ -0,0 +1,6 @@ +mutex = new FileMutex(__CLASS__ . $this->channel, sys_get_temp_dir()); } public function runExisting(callable $handlerCallback): void { - $result = true; - while (($payload = $this->reserve()) && ($result === true)) { - if ($result = $handlerCallback(\unserialize($payload['job']))) { - $this->release($payload); - } - } + $this->run($handlerCallback, false); } public function status(string|int $id): JobStatus @@ -92,7 +93,7 @@ public function push(MessageInterface $message): MessageInterface public function subscribe(callable $handlerCallback): void { - $this->runExisting($handlerCallback); + $this->run($handlerCallback, true, 5); // TWK TODO timeout should not be hard coded } public function withChannel(string $channel): self @@ -103,7 +104,8 @@ public function withChannel(string $channel): self $new = clone $this; $new->channel = $channel; - + $new->mutex = new FileMutex(__CLASS__ . $this->channel, sys_get_temp_dir()); + return $new; } @@ -115,10 +117,10 @@ public function withChannel(string $channel): self */ protected function reserve(): array|null { - // TWK TODO ??? return $this->db->useMaster(function () { - // TWK TODO ??? if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) { - // TWK TODO ??? throw new \Exception('Has not waited the lock.'); - // TWK TODO ??? } + // TWK TODO what is useMaster in Yii3 return $this->db->useMaster(function () { + if (!$this->mutex->acquire($this->mutexTimeout)) { + throw new \Exception('Has not waited the lock.'); + } try { $this->moveExpired(); @@ -147,7 +149,7 @@ protected function reserve(): array|null } } } finally { - // TWK TODO ??? $this->mutex->release(__CLASS__ . $this->channel); + $this->mutex->release(); } return $payload; @@ -194,4 +196,25 @@ private function moveExpired(): void */ private $reserveTime = 0; + /** + * Listens queue and runs each job. + * + * @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages + * @param bool $repeat whether to continue listening when queue is empty. + * @param int $timeout number of seconds to sleep before next iteration. + */ + public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void + { + while (1) { // TWK TODO while condition should not be hard coded, use LoopInterface + if ($payload = $this->reserve()) { + if ($handlerCallback(\unserialize($payload['job']))) { + $this->release($payload); + } + } elseif (!$repeat) { + break; + } elseif ($timeout) { + sleep($timeout); + } + } + } } diff --git a/src/Migrations/M240327150317Queue.php b/src/Migrations/M240327150317Queue.php new file mode 100644 index 0000000..56e09f9 --- /dev/null +++ b/src/Migrations/M240327150317Queue.php @@ -0,0 +1,55 @@ +createTable($this->tableName, [ + 'id' => $b->primaryKey(), + 'channel' => $b->string()->notNull(), + 'job' => $b->binary()->notNull(), + 'pushed_at' => $b->integer()->notNull(), + 'ttr' => $b->integer()->notNull(), + 'delay' => $b->integer()->notNull()->defaultValue(0), + 'priority' => $b->integer()->unsigned()->notNull()->defaultValue(1024), + 'reserved_at' => $b->integer(), + 'attempt' => $b->integer(), + 'done_at' => $b->integer(), + ]); + $b->createIndex($this->tableName, 'channel', 'channel'); + $b->createIndex($this->tableName, 'priority', 'priority'); + $b->createIndex($this->tableName, 'reserved_at', 'reserved_at'); + } + + public function down(MigrationBuilder $b): void + { + $b->dropTable($this->tableName); + } +} From b5fe7aab8a3a0b3ca1c5227d54536f2f2cb0fd76 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Wed, 27 Mar 2024 23:41:14 +0000 Subject: [PATCH 02/19] Update composer.json --- composer.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/composer.json b/composer.json index 6a8757a..a0ef141 100644 --- a/composer.json +++ b/composer.json @@ -34,6 +34,8 @@ "require": { "php": "^8.1", "yiisoft/db": "^1.0", + "yiisoft/db-migration": "^1.0", + "yiisoft/mutex-file": "^1.0", "yiisoft/queue": "dev-master" }, "require-dev": { From dd253054e98c28272383e11652a3b2cbff158f11 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Wed, 27 Mar 2024 23:44:10 +0000 Subject: [PATCH 03/19] Update Adapter.php --- src/Adapter.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Adapter.php b/src/Adapter.php index d95e281..e8fcc17 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -203,7 +203,7 @@ private function moveExpired(): void * @param bool $repeat whether to continue listening when queue is empty. * @param int $timeout number of seconds to sleep before next iteration. */ - public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void + public function run(callable $handlerCallback, bool $repeat, int<0, max> $timeout = 0): void { while (1) { // TWK TODO while condition should not be hard coded, use LoopInterface if ($payload = $this->reserve()) { From c30e377a8232f1fe273090f689d159a64cc77d6a Mon Sep 17 00:00:00 2001 From: aphraoh Date: Thu, 28 Mar 2024 00:10:55 +0000 Subject: [PATCH 04/19] Update Adapter.php --- src/Adapter.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index e8fcc17..bf9b2e7 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -201,9 +201,9 @@ private function moveExpired(): void * * @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages * @param bool $repeat whether to continue listening when queue is empty. - * @param int $timeout number of seconds to sleep before next iteration. + * @param non-negative-int $timeout number of seconds to sleep before next iteration. */ - public function run(callable $handlerCallback, bool $repeat, int<0, max> $timeout = 0): void + public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void { while (1) { // TWK TODO while condition should not be hard coded, use LoopInterface if ($payload = $this->reserve()) { From 36559085720f594b7d62e8d68f899f777d601530 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Thu, 28 Mar 2024 00:17:50 +0000 Subject: [PATCH 05/19] Update M240327150317Queue.php --- src/Migrations/M240327150317Queue.php | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/Migrations/M240327150317Queue.php b/src/Migrations/M240327150317Queue.php index 56e09f9..c75f6cc 100644 --- a/src/Migrations/M240327150317Queue.php +++ b/src/Migrations/M240327150317Queue.php @@ -10,6 +10,9 @@ final class M240327150317Queue implements RevertibleMigrationInterface, TransactionalMigrationInterface { + /** + * @var string table name + */ public $tableName = '{{%queue}}'; /* From 63bdc76617319f918014ed3e5fe50eeb54389a5c Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 12:07:39 +0100 Subject: [PATCH 06/19] Update composer.json after review --- composer.json | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index a0ef141..60cf464 100644 --- a/composer.json +++ b/composer.json @@ -34,10 +34,12 @@ "require": { "php": "^8.1", "yiisoft/db": "^1.0", - "yiisoft/db-migration": "^1.0", - "yiisoft/mutex-file": "^1.0", + "yiisoft/mutex": "^1.0", "yiisoft/queue": "dev-master" }, + "suggest": { + "yiisoft/db-migration": "^1.0": "Enabling db-migration allows database migration of the used queue" + } "require-dev": { "maglnet/composer-require-checker": "^4.7", "phpunit/phpunit": "^10.5", From 7fd3788d48253b6b954c9c025711c3251967f955 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 12:22:19 +0100 Subject: [PATCH 07/19] Use LoopInterface --- src/Adapter.php | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Adapter.php b/src/Adapter.php index bf9b2e7..72c0670 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -6,6 +6,7 @@ use InvalidArgumentException; use Yiisoft\Queue\Adapter\AdapterInterface; +use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\MessageInterface; use Yiisoft\Queue\QueueFactory; @@ -36,6 +37,7 @@ final class Adapter implements AdapterInterface public function __construct( private ConnectionInterface $db, + private LoopInterface $loop, private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME, ) { $this->mutex = new FileMutex(__CLASS__ . $this->channel, sys_get_temp_dir()); @@ -205,7 +207,7 @@ private function moveExpired(): void */ public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void { - while (1) { // TWK TODO while condition should not be hard coded, use LoopInterface + while ($this->loop->canContinue()) { if ($payload = $this->reserve()) { if ($handlerCallback(\unserialize($payload['job']))) { $this->release($payload); From 219b3ead4432419c3575b1a8c4a9ec846611b706 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 13:09:54 +0100 Subject: [PATCH 08/19] Use MutexInterface and MutexFactoryInterface --- src/Adapter.php | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index 72c0670..26c9074 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -13,14 +13,15 @@ use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Db\Connection\ConnectionInterface; use Yiisoft\Db\Query\Query; -use Yiisoft\Mutex\File\FileMutex; +use Yiisoft\Mutex\MutexFactoryInterface; +use Yiisoft\Mutex\MutexInterface; final class Adapter implements AdapterInterface { /** - * @var FileMutex file mutex + * @var MutexInterface mutex interface */ - public FileMutex $mutex; + public MutexInterface $mutex; /** * @var int mutex timeout */ @@ -38,9 +39,10 @@ final class Adapter implements AdapterInterface public function __construct( private ConnectionInterface $db, private LoopInterface $loop, + private MutexFactoryInterface $mutexFactory, private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME, ) { - $this->mutex = new FileMutex(__CLASS__ . $this->channel, sys_get_temp_dir()); + $this->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel, sys_get_temp_dir()); } public function runExisting(callable $handlerCallback): void @@ -106,7 +108,7 @@ public function withChannel(string $channel): self $new = clone $this; $new->channel = $channel; - $new->mutex = new FileMutex(__CLASS__ . $this->channel, sys_get_temp_dir()); + $new->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel, sys_get_temp_dir()); return $new; } From 2757ef15c95ce4cf9e361fabbbdadb5c18dfc7da Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 13:11:06 +0100 Subject: [PATCH 09/19] Update src/Adapter.php Co-authored-by: Dmitriy Derepko --- src/Adapter.php | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index 26c9074..4434350 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -214,9 +214,12 @@ public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): if ($handlerCallback(\unserialize($payload['job']))) { $this->release($payload); } - } elseif (!$repeat) { + continue; + } + if (!$repeat) { break; - } elseif ($timeout) { + } + if ($timeout > 0) { sleep($timeout); } } From 98d161327fcceb4b0b3225e7e8b52a2da408d688 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 13:40:12 +0100 Subject: [PATCH 10/19] Update composer.json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 60cf464..6bc3a30 100644 --- a/composer.json +++ b/composer.json @@ -38,7 +38,7 @@ "yiisoft/queue": "dev-master" }, "suggest": { - "yiisoft/db-migration": "^1.0": "Enabling db-migration allows database migration of the used queue" + "yiisoft/db-migration": "^1.0" } "require-dev": { "maglnet/composer-require-checker": "^4.7", From 5c917f1631d76fe9335aea6be9db97cb1acf4918 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 13:52:15 +0100 Subject: [PATCH 11/19] Update composer.json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 6bc3a30..1b2e35f 100644 --- a/composer.json +++ b/composer.json @@ -39,7 +39,7 @@ }, "suggest": { "yiisoft/db-migration": "^1.0" - } + }, "require-dev": { "maglnet/composer-require-checker": "^4.7", "phpunit/phpunit": "^10.5", From dfffc04aeada06930d1aba270882f327719a1b0b Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 14:03:17 +0100 Subject: [PATCH 12/19] Update Adapter.php --- src/Adapter.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index 4434350..3f5a713 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -42,7 +42,7 @@ public function __construct( private MutexFactoryInterface $mutexFactory, private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME, ) { - $this->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel, sys_get_temp_dir()); + $this->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel); } public function runExisting(callable $handlerCallback): void @@ -108,7 +108,7 @@ public function withChannel(string $channel): self $new = clone $this; $new->channel = $channel; - $new->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel, sys_get_temp_dir()); + $new->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel); return $new; } From 7e05e5f533aa92d51aa28396a4a13217041d2e1f Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 14:15:26 +0100 Subject: [PATCH 13/19] Update composer.json Build check error if not added to required --- composer.json | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/composer.json b/composer.json index 1b2e35f..f4dbb87 100644 --- a/composer.json +++ b/composer.json @@ -34,12 +34,10 @@ "require": { "php": "^8.1", "yiisoft/db": "^1.0", + "yiisoft/db-migration": "^1.0" "yiisoft/mutex": "^1.0", "yiisoft/queue": "dev-master" }, - "suggest": { - "yiisoft/db-migration": "^1.0" - }, "require-dev": { "maglnet/composer-require-checker": "^4.7", "phpunit/phpunit": "^10.5", From 6c8f0ddcb3bdaa30c5b31adcaafc55ecf3cfd68a Mon Sep 17 00:00:00 2001 From: aphraoh Date: Tue, 2 Apr 2024 14:16:08 +0100 Subject: [PATCH 14/19] Update composer.json due to invalid json --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index f4dbb87..8ae8da2 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,7 @@ "require": { "php": "^8.1", "yiisoft/db": "^1.0", - "yiisoft/db-migration": "^1.0" + "yiisoft/db-migration": "^1.0", "yiisoft/mutex": "^1.0", "yiisoft/queue": "dev-master" }, From 2d04ff10586d0aa32d2de6eed3191ecbb80c27bd Mon Sep 17 00:00:00 2001 From: aphraoh Date: Wed, 3 Apr 2024 15:19:36 +0100 Subject: [PATCH 15/19] Delete config directory due to empty di --- config/di.php | 6 ------ 1 file changed, 6 deletions(-) delete mode 100644 config/di.php diff --git a/config/di.php b/config/di.php deleted file mode 100644 index 1f6a841..0000000 --- a/config/di.php +++ /dev/null @@ -1,6 +0,0 @@ - Date: Wed, 3 Apr 2024 15:22:33 +0100 Subject: [PATCH 16/19] Update src/Adapter.php Start description with capital letter and end with dot. Co-authored-by: Alexander Makarov --- src/Adapter.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Adapter.php b/src/Adapter.php index 3f5a713..2fdcd66 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -19,7 +19,7 @@ final class Adapter implements AdapterInterface { /** - * @var MutexInterface mutex interface + * @var MutexInterface Mutex interface. */ public MutexInterface $mutex; /** From 992d64cbf6818039c9f6d73f36a549067cfb77a6 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Wed, 3 Apr 2024 15:56:21 +0100 Subject: [PATCH 17/19] Delete src/Migrations directory --- src/Migrations/M240327150317Queue.php | 58 --------------------------- 1 file changed, 58 deletions(-) delete mode 100644 src/Migrations/M240327150317Queue.php diff --git a/src/Migrations/M240327150317Queue.php b/src/Migrations/M240327150317Queue.php deleted file mode 100644 index c75f6cc..0000000 --- a/src/Migrations/M240327150317Queue.php +++ /dev/null @@ -1,58 +0,0 @@ -createTable($this->tableName, [ - 'id' => $b->primaryKey(), - 'channel' => $b->string()->notNull(), - 'job' => $b->binary()->notNull(), - 'pushed_at' => $b->integer()->notNull(), - 'ttr' => $b->integer()->notNull(), - 'delay' => $b->integer()->notNull()->defaultValue(0), - 'priority' => $b->integer()->unsigned()->notNull()->defaultValue(1024), - 'reserved_at' => $b->integer(), - 'attempt' => $b->integer(), - 'done_at' => $b->integer(), - ]); - $b->createIndex($this->tableName, 'channel', 'channel'); - $b->createIndex($this->tableName, 'priority', 'priority'); - $b->createIndex($this->tableName, 'reserved_at', 'reserved_at'); - } - - public function down(MigrationBuilder $b): void - { - $b->dropTable($this->tableName); - } -} From 1a9e2282f06c46c13c325ce67fbfca5f10bc09de Mon Sep 17 00:00:00 2001 From: aphraoh Date: Wed, 3 Apr 2024 15:57:47 +0100 Subject: [PATCH 18/19] Update composer.json --- composer.json | 4 ---- 1 file changed, 4 deletions(-) diff --git a/composer.json b/composer.json index 8ae8da2..29b8aaa 100644 --- a/composer.json +++ b/composer.json @@ -34,7 +34,6 @@ "require": { "php": "^8.1", "yiisoft/db": "^1.0", - "yiisoft/db-migration": "^1.0", "yiisoft/mutex": "^1.0", "yiisoft/queue": "dev-master" }, @@ -62,9 +61,6 @@ }, "config-plugin-options": { "source-directory": "config" - }, - "config-plugin": { - "di": "di.php" } }, "config": { From c2b137ca3c1ebed0a4f8f78ec10045be75eab876 Mon Sep 17 00:00:00 2001 From: aphraoh Date: Wed, 3 Apr 2024 16:24:18 +0100 Subject: [PATCH 19/19] Update Adapter.php Update var comments to convention Use serialize interface instead of php serialise --- src/Adapter.php | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Adapter.php b/src/Adapter.php index 2fdcd66..e1a50e6 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -9,6 +9,7 @@ use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Message\MessageSerializerInterface; use Yiisoft\Queue\QueueFactory; use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Db\Connection\ConnectionInterface; @@ -23,21 +24,22 @@ final class Adapter implements AdapterInterface */ public MutexInterface $mutex; /** - * @var int mutex timeout + * @var int Mutex timeout. */ public $mutexTimeout = 3; /** - * @var string table name + * @var string Table name. */ public $tableName = '{{%queue}}'; /** - * @var bool ability to delete released messages from table + * @var bool Ability to delete released messages from table. */ public $deleteReleased = true; public function __construct( private ConnectionInterface $db, + private MessageSerializerInterface $serializer, private LoopInterface $loop, private MutexFactoryInterface $mutexFactory, private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME, @@ -83,7 +85,7 @@ public function push(MessageInterface $message): MessageInterface $metadata = $message->getMetadata(); $this->db->createCommand()->insert($this->tableName, [ 'channel' => $this->channel, - 'job' => \serialize($message), + 'job' => $this->serializer->serialize($message), 'pushed_at' => time(), 'ttr' => $metadata['ttr'] ?? 300, 'delay' => $metadata['delay'] ?? 0, @@ -211,7 +213,7 @@ public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): { while ($this->loop->canContinue()) { if ($payload = $this->reserve()) { - if ($handlerCallback(\unserialize($payload['job']))) { + if ($handlerCallback($this->serializer->unserialize($payload['job']))) { $this->release($payload); } continue;