Skip to content

Commit

Permalink
Add unique queue support.
Browse files Browse the repository at this point in the history
  • Loading branch information
steveworley committed Mar 17, 2023
1 parent a004b23 commit 2e8114a
Show file tree
Hide file tree
Showing 10 changed files with 130 additions and 9 deletions.
2 changes: 1 addition & 1 deletion modules/quant_api/src/EventSubscriber/QuantApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public function onOutput(QuantEvent $event) {

$media = array_merge($res['attachments']['js'], $res['attachments']['css'], $res['attachments']['media']['images'], $res['attachments']['media']['documents'], $res['attachments']['media']['video']);

$queue_factory = \Drupal::service('queue');
$queue_factory = \Drupal::service('quant.queue_factory');
$queue = $queue_factory->get('quant_seed_worker');

foreach ($media as $item) {
Expand Down
2 changes: 1 addition & 1 deletion quant.module
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ function quant_node_translation_delete($entity) {
* The batch context.
*/
function quant_process_queue(array &$context) {
$factory = \Drupal::service('queue');
$factory = \Drupal::service('quant.queue_factory');
$manager = \Drupal::service('plugin.manager.queue_worker');

$queue = $factory->get('quant_seed_worker');
Expand Down
4 changes: 4 additions & 0 deletions quant.services.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,7 @@ services:
- '@config.factory'
tags:
- { name: 'event_subscriber' }

quant.queue_factory:
class: Drupal\quant\QuantQueueFactory
parent: queue.database
16 changes: 14 additions & 2 deletions src/Commands/QuantDrushCommands.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ private function getDrushPath() {
* @usage quant:run-queue --threads=5
*/
public function message($options = ['threads' => 5]) {
$queue_factory = \Drupal\Core\Site\Settings::get('queue_service_quant_seed_worker');

if (empty($queue_factory)) {
$this->output()->writeln('<error>Drush support is not configured</error>' . PHP_EOL);
$this->output()->writeln('To enable Drush support, you need to define the queue factory for the quant_seed_worker queue');
$this->output()->writeln('by updating settings.php and adding the following' . PHP_EOL);
$this->output()->writeln(' $settings["queue_service_quant_seed_worker"] = "quant.queue_factory";' . PHP_EOL);
return;
}

var_dump($queue_factory);
exit;

$this->output()->writeln("<info>Forking seed worker.</info>");
$drushPath = $this->getDrushPath();
Expand Down Expand Up @@ -103,7 +115,7 @@ public function message($options = ['threads' => 5]) {
* @usage quant:clear-queue
*/
public function clear($options = []) {
$queue_factory = \Drupal::service('queue');
$queue_factory = \Drupal::service('quant.queue_factory');
$queue = $queue_factory->get('quant_seed_worker');
$queue->deleteQueue();
$this->output()->writeln("Removed all items from Quant queue.");
Expand All @@ -123,7 +135,7 @@ public function prepare($options = ['reset' => 'true']) {

$config = \Drupal::configFactory()->getEditable('quant.settings');

$queue_factory = \Drupal::service('queue');
$queue_factory = \Drupal::service('quant.queue_factory');
$queue = $queue_factory->get('quant_seed_worker');

$dispatcher = \Drupal::service('event_dispatcher');
Expand Down
2 changes: 1 addition & 1 deletion src/Event/ConfigFormEventBase.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public function getFormState() {
*/
public function getQueue() {
if (empty($this->queue)) {
$queue_factory = \Drupal::service('queue');
$queue_factory = \Drupal::service('quant.queue_factory');
$this->queue = $queue_factory->get('quant_seed_worker');
}
return $this->queue;
Expand Down
2 changes: 1 addition & 1 deletion src/EventSubscriber/CollectionSubscriber.php
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public function collectRoutes(CollectRoutesEvent $event) {

// Generate a redirection QueueItem from canonical path to URL.
// Use the default language alias in the event of multi-lang setup.
$queue_factory = \Drupal::service('queue');
$queue_factory = \Drupal::service('quant.queue_factory');
$queue = $queue_factory->get('quant_seed_worker');

if ("/taxonomy/term/{$tid}" != $url) {
Expand Down
2 changes: 1 addition & 1 deletion src/Form/SeedForm.php
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public function submitForm(array &$form, FormStateInterface $form_state) {
}
}

$queue_factory = \Drupal::service('queue');
$queue_factory = \Drupal::service('quant.queue_factory');
$queue = $queue_factory->get('quant_seed_worker');
$queue->deleteQueue();

Expand Down
5 changes: 3 additions & 2 deletions src/Page/QueueInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Drupal\Core\Controller\ControllerBase;
use Drupal\Core\Database\Query\PagerSelectExtender;
use Drupal\quant\QuantQueue;

/**
* Page controller for the queue info page.
Expand All @@ -18,7 +19,7 @@ class QueueInfo extends ControllerBase {
*/
public function build() {
$db = \Drupal::database();
$query = $db->select('queue', 'q')
$query = $db->select(QuantQueue::TABLE_NAME, 'q')
->condition('name', 'quant_seed_worker')
->fields('q', ['item_id', 'name', 'data', 'expire', 'created']);
$pager = $query->extend(PagerSelectExtender::class)->limit(10);
Expand All @@ -30,7 +31,7 @@ public function build() {
$this->t('Content/Data'),
];

$queue_factory = \Drupal::service('queue');
$queue_factory = \Drupal::service('quant.queue_factory');
$queue = $queue_factory->get('quant_seed_worker');

if ($queue->numberOfItems() > 0) {
Expand Down
91 changes: 91 additions & 0 deletions src/QuantQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
<?php

namespace Drupal\quant;

use Drupal\Core\Database\IntegrityConstraintViolationException;
use Drupal\Core\Queue\DatabaseQueue;

/**
* Additional handling for Quant queue items.
*
* @see https://git.drupalcode.org/project/queue_unique
*/
class QuantQueue extends DatabaseQueue {
/**
* The table name to store the queue items.
*/
public const TABLE_NAME = 'quant_queue';

/**
* Length of the hash coulmn.
*/
public const HASH_LENGTH = 48;

/**
* The algorithm used to hash the item.
*/
public const HASH_METHOD = 'sha2';

/**
* {@inheritdoc}
*/
public function doCreateItem($data) {
try {
$serialized_data = serialize($data);
$query = $this->connection->insert(static::TABLE_NAME)
->fields([
'name' => $this->name,
'data' => $serialized_data,
'created' => time(),
// Generate a near-unique value for this data on this queue.
'hash' => static::hash($this->name, $serialized_data),
]);
return $query->execute();
} catch (IntegrityConstraintViolationException $err) {
return FALSE;
}
}

/**
* Generate a hashed string from a queue name and serialized data.
*
* @param string $name
* The queue name.
* @param string $serialized_data
* The serialized data.
*
* @return string
* The hash string.
*/
public static function hash($name, $serialized_data) {
$substr_length = static::HASH_LENGTH - strlen(static::HASH_METHOD);
return static::HASH_METHOD . substr(base64_encode(hash('sha512', $name . $serialized_data, TRUE)), 0, $substr_length);
}

/**
* {@inheritdoc}
*/
public function schemaDefinition() {
return array_merge_recursive(
parent::schemaDefinition(),
// We cannot create a unique key on the data field because it is a blob.
// Instead, we merge an additional field which should contain a hash
// of the data and a unique key for this field into the original schema
// definition. These are used to ensure uniqueness.
[
'fields' => [
'hash' => [
'type' => 'char',
'length' => static::HASH_LENGTH,
'not null' => TRUE,
],
],
'unique keys' => [
'unique' => ['hash'],
],
]
);
}


}
13 changes: 13 additions & 0 deletions src/QuantQueueFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Drupal\quant;
use Drupal\Core\Queue\QueueDatabaseFactory;

class QuantQueueFactory extends QueueDatabaseFactory {
/**
* {@inheritdoc}
*/
public function get($name) {
return new QuantQueue($name, $this->connection);
}
}

0 comments on commit 2e8114a

Please sign in to comment.