Skip to content

Commit

Permalink
[BUGFIX symbiote#317] Make queueJob threadsafe
Browse files Browse the repository at this point in the history
- Separate functionality into protected methods for readability
- Wrap queueJob read/write functionality in withTransaction()
  • Loading branch information
danaenz committed Mar 26, 2021
1 parent cff6875 commit c941a73
Showing 1 changed file with 113 additions and 55 deletions.
168 changes: 113 additions & 55 deletions src/Services/QueuedJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -235,64 +235,30 @@ public function queueJob(QueuedJob $job, $startAfter = null, $userId = null, $qu
{
$signature = $job->getSignature();

// see if we already have this job in a queue
$filter = [
'Signature' => $signature,
'JobStatus' => [
QueuedJob::STATUS_NEW,
QueuedJob::STATUS_INIT,
],
];

$existing = QueuedJobDescriptor::get()
->filter($filter)
->first();

if ($existing && $existing->ID) {
return $existing->ID;
}

$jobDescriptor = new QueuedJobDescriptor();
$jobDescriptor->JobTitle = $job->getTitle();
$jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType();
$jobDescriptor->Signature = $signature;
$jobDescriptor->Implementation = get_class($job);
$jobDescriptor->StartAfter = $startAfter;

// no user provided - fallback to job user default
if ($userId === null && $job instanceof UserContextInterface) {
$userId = $job->getRunAsMemberID();
}
// Create the initial object
$jobDescriptor = $this->createJobDescriptor($job, $signature, $startAfter, $userId, $queueName);

// still no user - fallback to current user
if ($userId === null) {
if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) {
// current user available
$runAsID = Security::getCurrentUser()->ID;
} else {
// current user unavailable
$runAsID = 0;
}
} else {
$runAsID = $userId;
try {
return $this->findOrMakeJobDescriptorFromSignature($signature, $job, $jobDescriptor, $startAfter);
} catch (\Throwable $e) {
// note that error here may not be an issue as failing to acquire a job lock is a valid state
// which happens when other process claimed the job lock first
$this->getLogger()->debug(
sprintf(
'[%s] - Queued Jobs - Failed to acquire job lock %s %d %s',
DBDatetime::now()->Rfc2822(),
$e->getMessage(),
$signature,
PHP_EOL
),
[
'file' => __FILE__,
'line' => __LINE__,
]
);
}

$jobDescriptor->RunAsID = $runAsID;

// use this to populate custom data columns before job is queued
// note: you can pass arbitrary data to your job and then move it to job descriptor
// this is useful if you need some data that needs to be exposed as a separate
// DB column as opposed to serialised data
$this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job);

// copy data
$this->copyJobToDescriptor($job, $jobDescriptor);

$jobDescriptor->write();

$this->startJob($jobDescriptor, $startAfter);

return $jobDescriptor->ID;
return false;
}

/**
Expand Down Expand Up @@ -334,6 +300,98 @@ public function isAtMaxJobs()
return false;
}

/**
* Using a job signature, returns the JobDescriptor ID and whether the
* job descriptor is new or existing
*
* @param string $signature
* @param QueuedJob $job
* @param QueuedJobDescriptor $jobDescriptor
* @param null|string $startAfter
* @return int|null
*/
protected function findOrMakeJobDescriptorFromSignature($signature, $job, $jobDescriptor, $startAfter)
{
// Start a transaction which will hold until we have a lock on this signature.
return DB::get_conn()->withTransaction(function () use ($signature, $job, $jobDescriptor, $startAfter) {
$query = 'SELECT "ID" FROM "QueuedJobDescriptor" WHERE "Signature" = ? FOR UPDATE';

// Retrieve first record
$result = DB::prepared_query($query, [$signature]);

if ($result === null) {
throw new Exception('Failed to execute query to retrieve job signature');
}

$ID = $result->value();

// If the record does not exist
if (!$ID) {
// use this to populate custom data columns before job is queued
// note: you can pass arbitrary data to your job and then move it to job descriptor
// this is useful if you need some data that needs to be exposed as a separate
// DB column as opposed to serialised data
$this->extend('updateJobDescriptorBeforeQueued', $jobDescriptor, $job);

// copy data
$this->copyJobToDescriptor($job, $jobDescriptor);

// Write the record
$jobDescriptorID = $jobDescriptor->write();

$this->startJob($jobDescriptor, $startAfter);
} else {
$jobDescriptorID = $ID;
}
});
}

/**
* @param QueuedJob $job
* @param string $signature
* @param null $startAfter
* @param null $userId
* @param null $queueName
* @return QueuedJobDescriptor
*/
protected function createJobDescriptor(
QueuedJob $job,
$signature,
$startAfter = null,
$userId = null,
$queueName = null
)
{
$jobDescriptor = QueuedJobDescriptor::create();
$jobDescriptor->JobTitle = $job->getTitle();
$jobDescriptor->JobType = $queueName ? $queueName : $job->getJobType();
$jobDescriptor->Signature = $signature;
$jobDescriptor->Implementation = get_class($job);
$jobDescriptor->StartAfter = $startAfter;

// no user provided - fallback to job user default
if ($userId === null && $job instanceof UserContextInterface) {
$userId = $job->getRunAsMemberID();
}

// still no user - fallback to current user
if ($userId === null) {
if (Security::getCurrentUser() && Security::getCurrentUser()->exists()) {
// current user available
$runAsID = Security::getCurrentUser()->ID;
} else {
// current user unavailable
$runAsID = 0;
}
} else {
$runAsID = $userId;
}

$jobDescriptor->RunAsID = $runAsID;

return $jobDescriptor;
}

/**
* Copies data from a job into a descriptor for persisting
*
Expand Down

0 comments on commit c941a73

Please sign in to comment.