Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NEW CLI output in ProcessJobQueueTask #310

Merged
merged 7 commits into from
Nov 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions _config/queuedjobs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ SilverStripe\Core\Injector\Injector:
queueHandler: %$QueueHandler
# Change to %$DoormanRunner for async processing (requires *nix)
queueRunner: %$Symbiote\QueuedJobs\Tasks\Engines\QueueRunner
logger: %$Psr\Log\LoggerInterface

DefaultRule:
class: 'AsyncPHP\Doorman\Rule\InMemoryRule'
Expand Down
15 changes: 13 additions & 2 deletions src/Services/QueuedJobHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Symbiote\QueuedJobs\Services;

use Monolog\Formatter\LineFormatter;
use Monolog\Handler\AbstractProcessingHandler;
use SilverStripe\Core\Injector\Injectable;
use Symbiote\QueuedJobs\DataObjects\QueuedJobDescriptor;
Expand Down Expand Up @@ -56,11 +57,21 @@ protected function write(array $record)

public function handleBatch(array $records)
{
foreach ($records as $record) {
$this->job->addMessage($record['message'], $record['level_name'], $record['datetime']);
foreach ($records as $i => $record) {
$records[$i] = $this->processRecord($records[$i]);
$records[$i]['formatted'] = $this->getFormatter()->format($records[$i]);
$this->job->addMessage($records[$i]['formatted'], $records[$i]['level_name'], $records[$i]['datetime']);
};
$this->jobDescriptor->SavedJobMessages = serialize($this->job->getJobData()->messages);

$this->jobDescriptor->write();
}

/**
* Ensure that exception context is retained. Similar logic to SyslogHandler.
*/
protected function getDefaultFormatter()
{
return new LineFormatter('%message% %context% %extra%');
}
}
232 changes: 114 additions & 118 deletions src/Services/QueuedJobService.php
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,11 @@ class QueuedJobService
*/
private static $lock_file_path = '';

/**
* @var LoggerInterface
*/
private $logger;

/**
* @var DefaultQueueHandler
*/
Expand Down Expand Up @@ -476,16 +481,7 @@ public function checkJobHealth($queue = null)
}

$this->getLogger()->error(
print_r(
[
'errno' => 0,
'errstr' => 'Broken jobs were found in the job queue',
'errfile' => __FILE__,
'errline' => __LINE__,
'errcontext' => [],
],
true
),
'Broken jobs were found in the job queue',
[
'file' => __FILE__,
'line' => __LINE__,
Expand Down Expand Up @@ -724,7 +720,7 @@ protected function grabMutex(QueuedJobDescriptor $jobDescriptor)
});

return true;
} catch (Exception $e) {
} catch (\Throwable $e) {
// note that error here may not be an issue as failing to acquire a job lock is a valid state
// which happens when other process claimed the job lock first
$this->getLogger()->debug(
Expand Down Expand Up @@ -759,6 +755,8 @@ protected function grabMutex(QueuedJobDescriptor $jobDescriptor)
*/
public function runJob($jobId)
{
$logger = $this->getLogger();

// first retrieve the descriptor
/** @var QueuedJobDescriptor $jobDescriptor */
$jobDescriptor = DataObject::get_by_id(
Expand Down Expand Up @@ -790,7 +788,7 @@ public function runJob($jobId)

$broken = false;

$this->withNestedState(function () use ($jobDescriptor, $jobId, &$broken) {
$this->withNestedState(function () use ($jobDescriptor, $jobId, &$broken, $logger) {
if (!$this->grabMutex($jobDescriptor)) {
return;
}
Expand Down Expand Up @@ -842,27 +840,22 @@ public function runJob($jobId)
);
if (!$jobDescriptor || !$jobDescriptor->exists()) {
$broken = true;
$this->getLogger()->error(
print_r(
[
'errno' => 0,
'errstr' => 'Job descriptor ' . $jobId . ' could not be found',
'errfile' => __FILE__,
'errline' => __LINE__,
'errcontext' => [],
],
true
),
$logger->error(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to test if the $logger is not null?

Copy link
Contributor Author

@chillu chillu Aug 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, we also don't check it in the other dozen places. private $logger is a hard dependency. That's not made super clear because we tend to not inject dependencies through the constructor, but I'd say that's a different discussion (pre-existing issue in the class).

Copy link
Contributor Author

@chillu chillu Aug 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scratch that, just read through the diff again - I'm replacing the Injector::inst()->get(LoggerInterface::class) call which used to be getLogger(). So I'm turning an internal dependency into an external dependency, but making it implicit. I'd say that the default calling method on non object in $this->logger PHP error is pretty much the same than a custom throw "dependency not met" though? Moving this to a constructor based injection would be a cleaner API, but also an API breakage.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I was cranky before - I've fixed it be reinstating the old functionality, but taking preference for $this->logger if defined. So it should never be null.

'Job descriptor ' . $jobId . ' could not be found',
[
'file' => __FILE__,
'line' => __LINE__,
]
);
break;
}

// Add job-specific logger handling. Modifies the job singleton by reference
$this->addJobHandlersToLogger($logger, $job, $jobDescriptor);

if ($jobDescriptor->JobStatus != QueuedJob::STATUS_RUN) {
// we've been paused by something, so we'll just exit
$job->addMessage(_t(
$logger->warning(_t(
__CLASS__ . '.JOB_PAUSED',
'Job paused at {time}',
['time' => DBDatetime::now()->Rfc2822()]
Expand All @@ -871,83 +864,24 @@ public function runJob($jobId)
}

if (!$broken) {
// Inject real-time log handler
$logger = Injector::inst()->get(LoggerInterface::class);
if ($logger instanceof Logger) {
// Check if there is already a handler
$exists = false;
foreach ($logger->getHandlers() as $handler) {
if ($handler instanceof QueuedJobHandler) {
$exists = true;
break;
}
}

if (!$exists) {
// Add the handler
/** @var QueuedJobHandler $queuedJobHandler */
$queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor);

// We only write for every 100 file
$bufferHandler = new BufferHandler(
$queuedJobHandler,
100,
Logger::DEBUG,
true,
true
);

$logger->pushHandler($bufferHandler);
}
} else {
if ($logger instanceof LoggerInterface) {
$logger->warning(
'Monolog not found, messages will not output while the job is running'
);
}
}

// Collect output as job messages as well as sending it to the screen after processing
$obLogger = function ($buffer, $phase) use ($job, $jobDescriptor) {
// Collect output where jobs aren't using the logger singleton
ob_start(function ($buffer, $phase) use ($job, $jobDescriptor) {
$job->addMessage($buffer);
if ($jobDescriptor) {
$this->copyJobToDescriptor($job, $jobDescriptor);
$jobDescriptor->write();
}
return $buffer;
};
ob_start($obLogger, 256);
}, 256);

try {
$job->process();
} catch (Exception $e) {
// okay, we'll just catch this exception for now
$job->addMessage(
_t(
__CLASS__ . '.JOB_EXCEPT',
'Job caused exception {message} in {file} at line {line}',
[
'message' => $e->getMessage(),
'file' => $e->getFile(),
'line' => $e->getLine(),
]
)
);
$this->getLogger()->error(
$e->getMessage(),
[
'exception' => $e,
]
);
} catch (\Throwable $e) {
$logger->error($e->getMessage(), ['exception' => $e]);
$jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;
$this->extend('updateJobDescriptorAndJobOnException', $jobDescriptor, $job, $e);
}

// Write any remaining batched messages at the end
if (isset($bufferHandler)) {
$bufferHandler->flush();
}

ob_end_flush();

// now check the job state
Expand All @@ -958,26 +892,22 @@ public function runJob($jobId)

if ($stallCount > static::config()->get('stall_threshold')) {
$broken = true;
$job->addMessage(
_t(
__CLASS__ . '.JOB_STALLED',
'Job stalled after {attempts} attempts - please check',
['attempts' => $stallCount]
)
);
$logger->error(_t(
__CLASS__ . '.JOB_STALLED',
'Job stalled after {attempts} attempts - please check',
['attempts' => $stallCount]
));
$jobDescriptor->JobStatus = QueuedJob::STATUS_BROKEN;
}

// now we'll be good and check our memory usage. If it is too high, we'll set the job to
// a 'Waiting' state, and let the next processing run pick up the job.
if ($this->isMemoryTooHigh()) {
$job->addMessage(
_t(
__CLASS__ . '.MEMORY_RELEASE',
'Job releasing memory and waiting ({used} used)',
['used' => $this->humanReadable($this->getMemoryUsage())]
)
);
$logger->warning(_t(
__CLASS__ . '.MEMORY_RELEASE',
'Job releasing memory and waiting ({used} used)',
['used' => $this->humanReadable($this->getMemoryUsage())]
));
if ($jobDescriptor->JobStatus != QueuedJob::STATUS_BROKEN) {
$jobDescriptor->JobStatus = QueuedJob::STATUS_WAIT;
}
Expand All @@ -986,7 +916,7 @@ public function runJob($jobId)

// Also check if we are running too long
if ($this->hasPassedTimeLimit()) {
$job->addMessage(_t(
$logger->warning(_t(
__CLASS__ . '.TIME_LIMIT',
'Queue has passed time limit and will restart before continuing'
));
Expand All @@ -1001,17 +931,8 @@ public function runJob($jobId)
$this->copyJobToDescriptor($job, $jobDescriptor);
$jobDescriptor->write();
} else {
$this->getLogger()->error(
print_r(
[
'errno' => 0,
'errstr' => 'Job descriptor has been set to null',
'errfile' => __FILE__,
'errline' => __LINE__,
'errcontext' => [],
],
true
),
$logger->error(
'Job descriptor has been set to null',
[
'file' => __FILE__,
'line' => __LINE__,
Expand All @@ -1033,15 +954,29 @@ public function runJob($jobId)

$this->extend('updateJobDescriptorAndJobOnCompletion', $jobDescriptor, $job);
}
} catch (Exception $e) {
// PHP 5.6 exception handling
$this->handleBrokenJobException($jobDescriptor, $job, $e);
$broken = true;
} catch (\Throwable $e) {
// PHP 7 Error handling)
$this->handleBrokenJobException($jobDescriptor, $job, $e);
$broken = true;
}

// Write any remaining batched messages at the end.
if ($logger instanceof Logger) {
foreach ($logger->getHandlers() as $handler) {
if ($handler instanceof BufferHandler) {
$handler->flush();
}
}
}

// If using a global singleton logger here,
// any messages added after this point will be auto-flushed on PHP shutdown through the handler.
// This causes a database write, and assumes the database and table will be available at this point.
if ($logger instanceof Logger) {
$logger->setHandlers(array_filter($logger->getHandlers(), function ($handler) {
return !($handler instanceof BufferHandler);
}));
}
});

$this->unsetRunAsUser($runAsUser, $originalUser);
Expand Down Expand Up @@ -1386,9 +1321,25 @@ public function onShutdown()
*/
public function getLogger()
{
// Enable dependency injection
if ($this->logger) {
return $this->logger;
}

// Fall back to implicitly created service
return Injector::inst()->get(LoggerInterface::class);
}

/**
* @param LoggerInterface $logger
*/
public function setLogger(LoggerInterface $logger)
{
$this->logger = $logger;

return $this;
}

public function enableMaintenanceLock()
{
if (!$this->config()->get('lock_file_enabled')) {
Expand Down Expand Up @@ -1452,6 +1403,51 @@ protected function getWorkerExpiry(): string
return $expiry->Rfc2822();
}

/**
* Add job-specific logger functionality which has the ability to flush logs into
* the job descriptor database record. Based on the default logger set for this class,
* which means it'll also log to other channels (e.g. stdout/stderr).
*
* @param QueuedJob $job
* @param QueuedJobDescriptor $jobDescriptor
*/
private function addJobHandlersToLogger(LoggerInterface $logger, QueuedJob $job, QueuedJobDescriptor $jobDescriptor)
{
if ($logger instanceof Logger) {
// Check if there is already a handler
$exists = false;
foreach ($logger->getHandlers() as $handler) {
if ($handler instanceof QueuedJobHandler) {
$exists = true;
break;
}
}

if (!$exists) {
// Add the handler
/** @var QueuedJobHandler $queuedJobHandler */
$queuedJobHandler = QueuedJobHandler::create($job, $jobDescriptor);

// Only write for every 100 messages to avoid excessive database activity
$bufferHandler = new BufferHandler(
$queuedJobHandler,
100,
Logger::DEBUG,
true,
true
);

$logger->pushHandler($bufferHandler);
}
} else {
if ($logger instanceof LoggerInterface) {
$logger->warning(
'Monolog not found, messages will not output while the job is running'
);
}
}
}

/**
* @return string
*/
Expand Down
Loading