-
Notifications
You must be signed in to change notification settings - Fork 14
/
DependentJobProcessor.php
113 lines (90 loc) · 2.94 KB
/
DependentJobProcessor.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
<?php
namespace Enqueue\JobQueue;
use Enqueue\Client\Message as ClientMessage;
use Enqueue\Client\ProducerInterface;
use Enqueue\Client\TopicSubscriberInterface;
use Enqueue\Consumption\Result;
use Enqueue\JobQueue\Doctrine\JobStorage;
use Enqueue\Util\JSON;
use Interop\Queue\Context;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Psr\Log\LoggerInterface;
/**
 * Publishes the dependent jobs recorded on a root job.
 *
 * The processor is subscribed to Topics::ROOT_JOB_STOPPED. The incoming message
 * body is JSON carrying a "jobId"; the referenced job's data may hold a
 * "dependentJobs" list whose entries each describe an event to publish
 * ("topic", "message" and an optional "priority").
 */
class DependentJobProcessor implements Processor, TopicSubscriberInterface
{
    /**
     * Storage used to look up the job referenced by the incoming message.
     *
     * @var JobStorage
     */
    private $jobStorage;

    /**
     * Producer through which one event per dependent job is sent.
     *
     * @var ProducerInterface
     */
    private $producer;

    /**
     * Receives a critical record for every message that gets rejected.
     *
     * @var LoggerInterface
     */
    private $logger;

    /**
     * @param JobStorage        $jobStorage
     * @param ProducerInterface $producer
     * @param LoggerInterface   $logger
     */
    public function __construct(JobStorage $jobStorage, ProducerInterface $producer, LoggerInterface $logger)
    {
        $this->logger = $logger;
        $this->producer = $producer;
        $this->jobStorage = $jobStorage;
    }

    /**
     * Handles a single message and publishes the dependent jobs of the root job it names.
     *
     * Returns Result::REJECT (after logging a critical record) when the body has no
     * "jobId", when no job is found for that id, when the job is not a root job, or
     * when any dependent job entry lacks "topic" or "message". Returns Result::ACK
     * when the job has no "dependentJobs" entry or after every dependent job was sent.
     *
     * @param Message $message message whose body is decoded as JSON
     * @param Context $context not used by this processor
     *
     * @return mixed Result::ACK or Result::REJECT
     */
    public function process(Message $message, Context $context)
    {
        $payload = JSON::decode($message->getBody());
        if (!isset($payload['jobId'])) {
            return $this->rejectWithCritical(sprintf(
                '[DependentJobProcessor] Got invalid message. body: "%s"',
                $message->getBody()
            ));
        }

        $jobId = $payload['jobId'];
        $rootJob = $this->jobStorage->findJobById($jobId);

        if (!$rootJob) {
            return $this->rejectWithCritical(
                sprintf('[DependentJobProcessor] Job was not found. id: "%s"', $jobId)
            );
        }

        if (!$rootJob->isRoot()) {
            return $this->rejectWithCritical(
                sprintf('[DependentJobProcessor] Expected root job but got child. id: "%s"', $jobId)
            );
        }

        $rootJobData = $rootJob->getData();
        if (!isset($rootJobData['dependentJobs'])) {
            // Nothing depends on this root job, so there is nothing to publish.
            return Result::ACK;
        }

        $specs = $rootJobData['dependentJobs'];

        // Validate the whole list first so that nothing is published
        // when at least one entry is malformed.
        foreach ($specs as $spec) {
            if (!isset($spec['topic'], $spec['message'])) {
                return $this->rejectWithCritical(sprintf(
                    '[DependentJobProcessor] Got invalid dependent job data. job: "%s", dependentJob: "%s"',
                    $rootJob->getId(),
                    JSON::encode($spec)
                ));
            }
        }

        foreach ($specs as $spec) {
            $this->publishDependentJob($spec);
        }

        return Result::ACK;
    }

    /**
     * Names the topic this processor listens to.
     *
     * @return mixed the Topics::ROOT_JOB_STOPPED constant
     */
    public static function getSubscribedTopics()
    {
        return Topics::ROOT_JOB_STOPPED;
    }

    /**
     * Builds a client message from one dependent job entry and sends it to the entry's topic.
     *
     * @param mixed $spec dependent job entry; "topic" and "message" are read, "priority" only when set
     */
    private function publishDependentJob($spec)
    {
        $outgoing = new ClientMessage();
        $outgoing->setBody($spec['message']);

        if (isset($spec['priority'])) {
            $outgoing->setPriority($spec['priority']);
        }

        $this->producer->sendEvent($spec['topic'], $outgoing);
    }

    /**
     * Writes the given line to the logger at critical level and yields the reject result.
     *
     * @param string $logLine already formatted log line
     *
     * @return mixed Result::REJECT
     */
    private function rejectWithCritical($logLine)
    {
        $this->logger->critical($logLine);

        return Result::REJECT;
    }
}