Skip to content

Commit

Permalink
FIX Limit duration of INIT state on jobs (#320)
Browse files Browse the repository at this point in the history
Resolves an issue where a job could get stuck in the INIT / WAITING state,
unable to release its lock.
  • Loading branch information
mfendeksilverstripe authored Dec 14, 2020
1 parent 079e4eb commit e259183
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 6 deletions.
62 changes: 56 additions & 6 deletions src/Services/QueuedJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,15 @@ class QueuedJobService
*/
private static $worker_ttl = 'PT5M';

/**
* Maximum time a job may remain in the initialising state, expressed as an ISO 8601 duration.
* If a job is stuck in this state for longer than this value it is considered stalled.
* If the value is empty, no grace period is subtracted when the stall threshold is computed.
*
* @var string
* @config
*/
private static $initialising_state_ttl = 'PT2M';

/**
* Timestamp (in seconds) when the queue was started
*
Expand Down Expand Up @@ -378,7 +387,7 @@ protected function copyDescriptorToJob($jobDescriptor, $job)
*
* @param string $type Job type
*
* @return QueuedJobDescriptor|false
* @return QueuedJobDescriptor|null
*/
public function getNextPendingJob($type = null)
{
Expand Down Expand Up @@ -425,6 +434,7 @@ public function getNextPendingJob($type = null)
*
* @param int $queue The queue to check against
* @return array stalled job and broken job IDs
* @throws Exception
*/
public function checkJobHealth($queue = null)
{
Expand All @@ -443,13 +453,19 @@ public function checkJobHealth($queue = null)
// If no steps have been processed since the last run, consider it a broken job
// Only check jobs that have been viewed before. LastProcessedCount defaults to -1 on new jobs.
// Only check jobs that are past expiry to ensure another process isn't currently executing the job
$now = DBDatetime::now()->Rfc2822();
$stalledJobs = $runningJobs
->filter([
'LastProcessedCount:GreaterThanOrEqual' => 0,
'Expiry:LessThanOrEqual' => $now,
])
->where('"StepsProcessed" = "LastProcessedCount"');
->where('"StepsProcessed" = "LastProcessedCount"')
->whereAny([
// either job lock is expired
'"Expiry" <= ?' => DBDatetime::now()->Rfc2822(),
// or job lock was never assigned (maybe there were not enough server resources to kick off the process)
// fall back to LastEdited time and only restart those jobs that were left untouched for a small while
// this covers the situation where a process is still going to pick up the job
'"Expiry" IS NULL AND "LastEdited" <= ?' => $this->getInitStateExpiry()
]);

/** @var QueuedJobDescriptor $stalledJob */
foreach ($stalledJobs as $stalledJob) {
Expand Down Expand Up @@ -596,8 +612,7 @@ public function checkDefaultJobs($queue = null)
*/
protected function restartStalledJob($stalledJob)
{
// release job lock on the descriptor so it can run again
$stalledJob->Worker = null;
$this->releaseJobLock($stalledJob);

if ($stalledJob->ResumeCounts < static::config()->get('stall_threshold')) {
$stalledJob->restart();
Expand Down Expand Up @@ -921,6 +936,7 @@ public function runJob($jobId)
));
if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
$jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
$this->releaseJobLock($jobDescriptor);
}
$broken = true;
}
Expand All @@ -933,6 +949,7 @@ public function runJob($jobId)
));
if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
$jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
$this->releaseJobLock($jobDescriptor);
}
$broken = true;
}
Expand Down Expand Up @@ -1414,6 +1431,29 @@ protected function getWorkerExpiry(): string
return $expiry->Rfc2822();
}

/**
 * Work out the cut-off time for jobs sitting in the INIT state.
 *
 * The cut-off is the current DBDatetime time minus the configured
 * `initialising_state_ttl` duration. When no TTL is configured the cut-off
 * is simply the current time.
 *
 * @return string Cut-off time as formatted by DBDatetime::Rfc2822()
 * @throws Exception
 */
protected function getInitStateExpiry(): string
{
    // Use DBDatetime as the clock so that a mocked "now" (as used in tests) is respected
    $cutoff = new DateTime(DBDatetime::now()->Rfc2822());
    $ttl = $this->config()->get('initialising_state_ttl');

    // Only move the cut-off into the past when a TTL is actually configured
    if ($ttl) {
        $cutoff->sub(new DateInterval($ttl));
    }

    /** @var DBDatetime $cutoffField */
    $cutoffField = DBField::create_field('Datetime', $cutoff->getTimestamp());

    return $cutoffField->Rfc2822();
}

/**
* Add job-specific logger functionality which has the ability to flush logs into
* the job descriptor database record. Based on the default logger set for this class,
Expand Down Expand Up @@ -1459,6 +1499,16 @@ private function addJobHandlersToLogger(LoggerInterface $logger, QueuedJob $job,
}
}

/**
* Release the job lock held on the descriptor so the job can run again.
*
* Only the Worker field of the given descriptor object is cleared; this method
* does not write the record, so the caller is responsible for persisting it.
*
* NOTE(review): Expiry is left untouched here - confirm whether a stale Expiry
* value should also be cleared when the lock is released.
*
* @param QueuedJobDescriptor $descriptor Descriptor whose lock is released
*/
protected function releaseJobLock(QueuedJobDescriptor $descriptor): void
{
$descriptor->Worker = null;
}

/**
* @return string
*/
Expand Down
32 changes: 32 additions & 0 deletions tests/QueuedJobsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,38 @@ public function testJobHealthCheck()
);
}

/**
 * A job left in the INIT state before a worker has been allocated must be left
 * alone while it is younger than the initialising-state TTL, and must be handled
 * by the health check once that TTL has elapsed.
 *
 * The mocked times below rely on the default `initialising_state_ttl` of PT2M.
 */
public function testJobHealthCheckForStuckInitJobs()
{
    $svc = $this->getService();
    $job = new TestQueuedJob(QueuedJob::IMMEDIATE);
    $id = $svc->queueJob($job);

    /** @var QueuedJobDescriptor $descriptor */
    $descriptor = QueuedJobDescriptor::get()->byID($id);

    // Kick off job processing - this is before job has a worker allocated
    DBDatetime::set_mock_now('2017-01-01 16:00:00');
    $descriptor->JobStatus = QueuedJob::STATUS_INIT;
    $descriptor->LastProcessedCount = 0;
    $descriptor->StepsProcessed = 0;
    $descriptor->write();

    // One second short of the TTL: the job may still be picked up, so it must be left untouched
    DBDatetime::set_mock_now('2017-01-01 16:01:59');
    $svc->checkJobHealth(QueuedJob::IMMEDIATE);

    $descriptor = QueuedJobDescriptor::get()->byID($id);
    $this->assertEquals(
        QueuedJob::STATUS_INIT,
        $descriptor->JobStatus,
        'Job within the initialising state TTL should be left untouched'
    );

    // Exactly at the TTL: the job is considered stuck and must be handled
    DBDatetime::set_mock_now('2017-01-01 16:02:00');
    $svc->checkJobHealth(QueuedJob::IMMEDIATE);

    $descriptor = QueuedJobDescriptor::get()->byID($id);
    $this->assertEquals(
        QueuedJob::STATUS_WAIT,
        $descriptor->JobStatus,
        'Job stuck in the initialising state past the TTL should be restarted'
    );
}

public function testExceptionWithMemoryExhaustion()
{
$svc = $this->getService();
Expand Down

0 comments on commit e259183

Please sign in to comment.