diff --git a/src/Services/QueuedJobService.php b/src/Services/QueuedJobService.php index 27d528b1..0dabf713 100644 --- a/src/Services/QueuedJobService.php +++ b/src/Services/QueuedJobService.php @@ -124,6 +124,15 @@ class QueuedJobService */ private static $worker_ttl = 'PT5M'; + /** + * Duration for TTL of initialising state based on ISO 8601 duration specification. + * if a job is stuck in this state longer than this value it's considered stalled + * + * @var string + * @config + */ + private static $initialising_state_ttl = 'PT2M'; + /** * Timestamp (in seconds) when the queue was started * @@ -378,7 +387,7 @@ protected function copyDescriptorToJob($jobDescriptor, $job) * * @param string $type Job type * - * @return QueuedJobDescriptor|false + * @return QueuedJobDescriptor|null */ public function getNextPendingJob($type = null) { @@ -425,6 +434,7 @@ public function getNextPendingJob($type = null) * * @param int $queue The queue to check against * @return array stalled job and broken job IDs + * @throws Exception */ public function checkJobHealth($queue = null) { @@ -443,13 +453,19 @@ public function checkJobHealth($queue = null) // If no steps have been processed since the last run, consider it a broken job // Only check jobs that have been viewed before. LastProcessedCount defaults to -1 on new jobs. // Only check jobs that are past expiry to ensure another process isn't currently executing the job - $now = DBDatetime::now()->Rfc2822(); $stalledJobs = $runningJobs ->filter([ 'LastProcessedCount:GreaterThanOrEqual' => 0, - 'Expiry:LessThanOrEqual' => $now, ]) - ->where('"StepsProcessed" = "LastProcessedCount"'); + ->where('"StepsProcessed" = "LastProcessedCount"') + ->whereAny([ + // either job lock is expired + '"Expiry" <= ?' => DBDatetime::now()->Rfc2822(), + // or job lock was never assigned (maybe there were not enough server resources to kick off the process) + // fall back to LastEdited time and only restart those jobs that were left untouched for a small while + // this covers the situation where a process is still going to pick up the job + '"Expiry" IS NULL AND "LastEdited" <= ?' => $this->getInitStateExpiry() + ]); /** @var QueuedJobDescriptor $stalledJob */ foreach ($stalledJobs as $stalledJob) { @@ -596,8 +612,7 @@ public function checkDefaultJobs($queue = null) */ protected function restartStalledJob($stalledJob) { - // release job lock on the descriptor so it can run again - $stalledJob->Worker = null; + $this->releaseJobLock($stalledJob); if ($stalledJob->ResumeCounts < static::config()->get('stall_threshold')) { $stalledJob->restart(); @@ -921,6 +936,7 @@ public function runJob($jobId) )); if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) { $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT; + $this->releaseJobLock($jobDescriptor); } $broken = true; } @@ -933,6 +949,7 @@ public function runJob($jobId) )); if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) { $jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT; + $this->releaseJobLock($jobDescriptor); } $broken = true; } @@ -1414,6 +1431,29 @@ protected function getWorkerExpiry(): string return $expiry->Rfc2822(); } + /** + * Get expiry time for a INIT state of a queued job + * this helps to identify jobs that have stalled more accurately + * + * @return string + * @throws Exception + */ + protected function getInitStateExpiry(): string + { + $now = DBDatetime::now()->Rfc2822(); + $time = new DateTime($now); + $timeToLive = $this->config()->get('initialising_state_ttl'); + + if ($timeToLive) { + $time->sub(new DateInterval($timeToLive)); + } + + /** @var DBDatetime $expiry */ + $expiry = DBField::create_field('Datetime', $time->getTimestamp()); + + return $expiry->Rfc2822(); + } + /** * Add job-specific logger functionality which has the ability to flush logs into * the job descriptor database record. Based on the default logger set for this class, @@ -1459,6 +1499,16 @@ private function addJobHandlersToLogger(LoggerInterface $logger, QueuedJob $job, } } + /** + * Release job lock on the descriptor so it can run again + * + * @param QueuedJobDescriptor $descriptor + */ + protected function releaseJobLock(QueuedJobDescriptor $descriptor): void + { + $descriptor->Worker = null; + } + /** * @return string */ diff --git a/tests/QueuedJobsTest.php b/tests/QueuedJobsTest.php index 9cf21066..6fb2f207 100644 --- a/tests/QueuedJobsTest.php +++ b/tests/QueuedJobsTest.php @@ -493,6 +493,38 @@ public function testJobHealthCheck() ); } + public function testJobHealthCheckForStuckInitJobs() + { + $svc = $this->getService(); + $logger = $svc->getLogger(); + $job = new TestQueuedJob(QueuedJob::IMMEDIATE); + $id = $svc->queueJob($job); + + /** @var QueuedJobDescriptor $descriptor */ + $descriptor = QueuedJobDescriptor::get()->byID($id); + + // Kick off job processing - this is before job has a worker allocated + DBDatetime::set_mock_now('2017-01-01 16:00:00'); + $descriptor->JobStatus = QueuedJob::STATUS_INIT; + $descriptor->LastProcessedCount = 0; + $descriptor->StepsProcessed = 0; + $descriptor->write(); + + // Check that valid jobs are left untouched + DBDatetime::set_mock_now('2017-01-01 16:01:59'); + $svc->checkJobHealth(QueuedJob::IMMEDIATE); + + $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertEquals(QueuedJob::STATUS_INIT, $descriptor->JobStatus); + + // Check that init jobs which are considered stuck are handled + DBDatetime::set_mock_now('2017-01-01 16:02:00'); + $svc->checkJobHealth(QueuedJob::IMMEDIATE); + + $descriptor = QueuedJobDescriptor::get()->byID($id); + $this->assertEquals(QueuedJob::STATUS_WAIT, $descriptor->JobStatus); + } + public function testExceptionWithMemoryExhaustion() { $svc = $this->getService();