Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Config and Migration plus review changes #13

Merged
merged 19 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
"require": {
"php": "^8.1",
"yiisoft/db": "^1.0",
"yiisoft/mutex": "^1.0",
"yiisoft/queue": "dev-master"
},
"require-dev": {
Expand All @@ -60,9 +61,6 @@
},
"config-plugin-options": {
"source-directory": "config"
},
"config-plugin": {
"di": "di.php"
}
},
"config": {
Expand Down
66 changes: 49 additions & 17 deletions src/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,50 @@

use InvalidArgumentException;
use Yiisoft\Queue\Adapter\AdapterInterface;
use Yiisoft\Queue\Cli\LoopInterface;
use Yiisoft\Queue\Enum\JobStatus;
use Yiisoft\Queue\Message\MessageInterface;
use Yiisoft\Queue\Message\MessageSerializerInterface;
use Yiisoft\Queue\QueueFactory;
use Yiisoft\Queue\Message\IdEnvelope;
use Yiisoft\Db\Connection\ConnectionInterface;
use Yiisoft\Db\Query\Query;
use Yiisoft\Mutex\MutexFactoryInterface;
use Yiisoft\Mutex\MutexInterface;

final class Adapter implements AdapterInterface
{
/**
* @var int timeout
* @var MutexInterface Mutex interface.
*/
public MutexInterface $mutex;
/**
* @var int Mutex timeout.
*/
public $mutexTimeout = 3;
/**
* @var string table name
* @var string Table name.
*/
public $tableName = '{{%queue}}';
/**
* @var bool ability to delete released messages from table
* @var bool Ability to delete released messages from table.
*/
public $deleteReleased = true;


public function __construct(
private ConnectionInterface $db,
private MessageSerializerInterface $serializer,
private LoopInterface $loop,
private MutexFactoryInterface $mutexFactory,
private string $channel = QueueFactory::DEFAULT_CHANNEL_NAME,
) {
$this->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel);

Check warning on line 47 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L47

Added line #L47 was not covered by tests
}

public function runExisting(callable $handlerCallback): void
{
$result = true;
while (($payload = $this->reserve()) && ($result === true)) {
if ($result = $handlerCallback(\unserialize($payload['job']))) {
$this->release($payload);
}
}
$this->run($handlerCallback, false);

Check warning on line 52 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L52

Added line #L52 was not covered by tests
}

public function status(string|int $id): JobStatus
Expand Down Expand Up @@ -78,7 +85,7 @@
$metadata = $message->getMetadata();
$this->db->createCommand()->insert($this->tableName, [
'channel' => $this->channel,
'job' => \serialize($message),
'job' => $this->serializer->serialize($message),

Check warning on line 88 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L88

Added line #L88 was not covered by tests
'pushed_at' => time(),
'ttr' => $metadata['ttr'] ?? 300,
'delay' => $metadata['delay'] ?? 0,
Expand All @@ -92,7 +99,7 @@

public function subscribe(callable $handlerCallback): void
{
$this->runExisting($handlerCallback);
$this->run($handlerCallback, true, 5); // TWK TODO timeout should not be hard coded

Check warning on line 102 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L102

Added line #L102 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you plan to resolve it before merge?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I do it after the merge, I believe the solution should be somewhere in the middleware but need to investigate a bit further so maybe that can be a lession for another day after the merge...

}

public function withChannel(string $channel): self
Expand All @@ -103,7 +110,8 @@

$new = clone $this;
$new->channel = $channel;

$new->mutex = $this->mutexFactory->create(__CLASS__ . $this->channel);

Check warning on line 113 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L113

Added line #L113 was not covered by tests

return $new;
}

Expand All @@ -115,10 +123,10 @@
*/
protected function reserve(): array|null
{
// TWK TODO ??? return $this->db->useMaster(function () {
// TWK TODO ??? if (!$this->mutex->acquire(__CLASS__ . $this->channel, $this->mutexTimeout)) {
// TWK TODO ??? throw new \Exception('Has not waited the lock.');
// TWK TODO ??? }
// TWK TODO what is useMaster in Yii3 return $this->db->useMaster(function () {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (!$this->mutex->acquire($this->mutexTimeout)) {
throw new \Exception('Has not waited the lock.');

Check warning on line 128 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L127-L128

Added lines #L127 - L128 were not covered by tests
}

try {
$this->moveExpired();
Expand Down Expand Up @@ -147,7 +155,7 @@
}
}
} finally {
// TWK TODO ??? $this->mutex->release(__CLASS__ . $this->channel);
$this->mutex->release();

Check warning on line 158 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L158

Added line #L158 was not covered by tests
}

return $payload;
Expand Down Expand Up @@ -194,4 +202,28 @@
*/
private $reserveTime = 0;

/**
* Listens queue and runs each job.
*
* @param callable(MessageInterface): bool $handlerCallback The handler which will handle messages. Returns false if it cannot continue handling messages
* @param bool $repeat whether to continue listening when queue is empty.
* @param non-negative-int $timeout number of seconds to sleep before next iteration.
*/
public function run(callable $handlerCallback, bool $repeat, int $timeout = 0): void

Check warning on line 212 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L212

Added line #L212 was not covered by tests
{
while ($this->loop->canContinue()) {
if ($payload = $this->reserve()) {
if ($handlerCallback($this->serializer->unserialize($payload['job']))) {
$this->release($payload);

Check warning on line 217 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L214-L217

Added lines #L214 - L217 were not covered by tests
}
continue;

Check warning on line 219 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L219

Added line #L219 was not covered by tests
}
if (!$repeat) {
break;

Check warning on line 222 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L221-L222

Added lines #L221 - L222 were not covered by tests
}
if ($timeout > 0) {
sleep($timeout);

Check warning on line 225 in src/Adapter.php

View check run for this annotation

Codecov / codecov/patch

src/Adapter.php#L224-L225

Added lines #L224 - L225 were not covered by tests
}
}
}
}
Loading