diff --git a/composer.json b/composer.json index 6a8757a..29b8aaa 100644 --- a/composer.json +++ b/composer.json @@ -34,6 +34,7 @@ "require": { "php": "^8.1", "yiisoft/db": "^1.0", + "yiisoft/mutex": "^1.0", "yiisoft/queue": "dev-master" }, "require-dev": { @@ -60,9 +61,6 @@ }, "config-plugin-options": { "source-directory": "config" - }, - "config-plugin": { - "di": "di.php" } }, "config": { diff --git a/src/Adapter.php b/src/Adapter.php index accc32e..e1a50e6 100644 --- a/src/Adapter.php +++ b/src/Adapter.php @@ -6,43 +6,50 @@ use InvalidArgumentException; use Yiisoft\Queue\Adapter\AdapterInterface; +use Yiisoft\Queue\Cli\LoopInterface; use Yiisoft\Queue\Enum\JobStatus; use Yiisoft\Queue\Message\MessageInterface; +use Yiisoft\Queue\Message\MessageSerializerInterface; use Yiisoft\Queue\QueueFactory; use Yiisoft\Queue\Message\IdEnvelope; use Yiisoft\Db\Connection\ConnectionInterface; use Yiisoft\Db\Query\Query; +use Yiisoft\Mutex\MutexFactoryInterface; +use Yiisoft\Mutex\MutexInterface; final class Adapter implements AdapterInterface { /** - * @var int timeout + * @var MutexInterface Mutex interface. + */ + public MutexInterface $mutex; + /** + * @var int Mutex timeout. */ public $mutexTimeout = 3; /** - * @var string table name + * @var string Table name. */ public $tableName = '{{%queue}}'; /** - * @var bool ability to delete released messages from table + * @var bool Ability to delete released messages from table. */ public $deleteReleased = true; public function __construct( private ConnectionInterface $db, + private MessageSerializerInterface $serializer, + private LoopInterface $loop, + private MutexFactoryInterface $mutexFactory, private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME, ) { + $this->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel); } public function runExisting(callable $handlerCallback): void { - $result = true; - while (($payload = $this->reserve()) && ($result === true)) { - if ($result = $handlerCallback(\unserialize($payload['job']))) { - $this->release($payload); - } - } + $this->run($handlerCallback, false); } public function status(string|int $id): JobStatus @@ -78,7 +85,7 @@ public function push(MessageInterface $message): MessageInterface $metadata = $message->getMetadata(); $this->db->createCommand()->insert($this->tableName, [ 'channel' => $this->channel, - 'job' => \serialize($message), + 'job' => $this->serializer->serialize($message), 'pushed_at' => time(), 'ttr' => $metadata['ttr'] ?? 300, 'delay' => $metadata['delay'] ?? 0, @@ -92,7 +99,7 @@ public function push(MessageInterface $message): MessageInterface public function subscribe(callable $handlerCallback): void { - $this->runExisting($handlerCallback); + $this->run($handlerCallback, true, 5); // TWK TODO timeout should not be hard coded } public function withChannel(string $channel): self @@ -103,7 +110,8 @@ public function withChannel(string $channel): self $new = clone $this; $new->channel = $channel; - + $new->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel); + return $new; } @@ -115,10 +123,10 @@ public function withChannel(string $channel): self */ protected function reserve(): array|null { - // TWK TODO ??? return $this->db->useMaster(function () { - // TWK TODO ??? if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) { - // TWK TODO ??? throw new \Exception('Has not waited the lock.'); - // TWK TODO ??? } + // TWK TODO what is useMaster in Yii3 return $this->db->useMaster(function () { + if (!$this->mutex->acquire($this->mutexTimeout)) { + throw new \Exception('Has not waited the lock.'); + } try { $this->moveExpired(); @@ -147,7 +155,7 @@ protected function reserve(): array|null } } } finally { - // TWK TODO ??? $this->mutex->release(__CLASS__ . $this->channel); + $this->mutex->release(); } return $payload; @@ -194,4 +202,28 @@ private function moveExpired(): void */ private $reserveTime = 0; + /** + * Listens queue and runs each job. + * + * @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages + * @param bool $repeat whether to continue listening when queue is empty. + * @param non-negative-int $timeout number of seconds to sleep before next iteration. + */ + public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void + { + while ($this->loop->canContinue()) { + if ($payload = $this->reserve()) { + if ($handlerCallback($this->serializer->unserialize($payload['job']))) { + $this->release($payload); + } + continue; + } + if (!$repeat) { + break; + } + if ($timeout > 0) { + sleep($timeout); + } + } + } }