Skip to content

Commit

Permalink
Fix queue from queued listener
Browse files Browse the repository at this point in the history
  • Loading branch information
timacdonald committed Feb 13, 2024
1 parent 801ec40 commit d27d193
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 5 deletions.
34 changes: 30 additions & 4 deletions src/Recorders/Queues.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

use Carbon\CarbonImmutable;
use Illuminate\Config\Repository;
use Illuminate\Events\CallQueuedListener;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Queue\Events\JobReleasedAfterException;
use Laravel\Pulse\Pulse;
use ReflectionClass;

/**
* @internal
Expand Down Expand Up @@ -57,10 +59,7 @@ public function record(JobReleasedAfterException|JobFailed|JobProcessed|JobProce
JobQueued::class => $event->connectionName,
default => $event->job->getConnectionName(), // @phpstan-ignore method.nonObject
},
match ($class) {
JobQueued::class => $event->job->queue ?? null,
default => $event->job->getQueue(), // @phpstan-ignore method.nonObject
},
$this->resolveQueue($event),
match ($class) {
JobQueued::class => $event->payload()['uuid'], // @phpstan-ignore method.notFound
default => $event->job->uuid(), // @phpstan-ignore method.nonObject
Expand Down Expand Up @@ -131,4 +130,31 @@ protected function normalizeSqsQueue(string $connection, string $queue): string

return $queue;
}

/**
* Resolve the queue.
*/
protected function resolveQueue(JobReleasedAfterException|JobFailed|JobProcessed|JobProcessing|JobQueued $event): ?string
{
return match ($event::class) {
JobQueued::class => match(is_object($event->job) ? $event->job::class : $event->job) {
CallQueuedListener::class => $this->resolveQueuedListenerQueue($event),
default => $event->job->queue ?? null,
},
default => $event->job->getQueue(), // @phpstan-ignore method.nonObject
};
}

/**
* Resolve the queued listeners queue.
*/
protected function resolveQueuedListenerQueue(JobQueued $event): ?string
{
return with(
(new ReflectionClass($event->job->class))->newInstanceWithoutConstructor(), // @phpstan-ignore property.nonObject
fn ($listener) => method_exists($listener, 'viaQueue')
? $listener->viaQueue($event->job->data[0] ?? null)
: ($listener->queue ?? null)
);
}
}
49 changes: 49 additions & 0 deletions tests/Feature/Recorders/QueuesTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,28 @@ function queueAggregates()
);
});

it('captures correct queue name for class based queued listeners', function () {
Config::set('queue.default', 'database');

Event::listen('my-event', MyListenerWithCustomQueue::class);
Event::listen(MyEvent::class, MyListenerWithCustomQueue::class);
Event::listen(MyEvent::class, MyListenerWithViaQueue::class);
Event::dispatch('my-event');
Event::dispatch(new MyEvent);
Pulse::ingest();
Artisan::call('queue:work', ['--queue' => 'custom_queue', '--max-jobs' => 3, '--tries' => 1, '--stop-when-empty' => true, '--sleep' => 0]);

Pulse::ignore(fn () => expect(Queue::size())->toBe(0));
$aggregates = queueAggregates();
expect($aggregates)->toHaveCount(12);
expect($aggregates)->toContainAggregateForAllPeriods(
type: ['queued', 'processing', 'processed'],
aggregate: 'count',
key: 'database:custom_queue',
value: '3.00',
);
});

class MyJob implements ShouldQueue
{
public function handle()
Expand Down Expand Up @@ -747,3 +769,30 @@ public function handle()
$this->fail();
}
}

class MyListenerWithCustomQueue implements ShouldQueue
{
use InteractsWithQueue;

public $queue = 'custom_queue';

public function handle(): void
{
//
}
}

class MyListenerWithViaQueue implements ShouldQueue
{
use InteractsWithQueue;

public function handle(): void
{
//
}

public function viaQueue(object $event)
{
return 'custom_queue';
}
}
2 changes: 1 addition & 1 deletion tests/Pest.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
|
*/

expect()->extend('toContainAggregateForAllPeriods', function (string|array $type, string $aggregate, string $key, int $value, ?int $count = null, ?int $timestamp = null) {
expect()->extend('toContainAggregateForAllPeriods', function (string|array $type, string $aggregate, string $key, int|float|string $value, ?int $count = null, ?int $timestamp = null) {
$this->toBeInstanceOf(Collection::class);

$values = $this->value->each(function (stdClass $value) {
Expand Down

0 comments on commit d27d193

Please sign in to comment.