Skip to content

Commit fa9aedb

Browse files
[1.x] Correct queued listeners' "queue" name (#313)
* Fix queue from queued listener * Fix code styling * Update Queues.php --------- Co-authored-by: timacdonald <[email protected]> Co-authored-by: Taylor Otwell <[email protected]>
1 parent 801ec40 commit fa9aedb

File tree

3 files changed

+80
-5
lines changed

3 files changed

+80
-5
lines changed

src/Recorders/Queues.php

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@
44

55
use Carbon\CarbonImmutable;
66
use Illuminate\Config\Repository;
7+
use Illuminate\Events\CallQueuedListener;
78
use Illuminate\Queue\Events\JobFailed;
89
use Illuminate\Queue\Events\JobProcessed;
910
use Illuminate\Queue\Events\JobProcessing;
1011
use Illuminate\Queue\Events\JobQueued;
1112
use Illuminate\Queue\Events\JobReleasedAfterException;
1213
use Laravel\Pulse\Pulse;
14+
use ReflectionClass;
1315

1416
/**
1517
* @internal
@@ -57,10 +59,7 @@ public function record(JobReleasedAfterException|JobFailed|JobProcessed|JobProce
5759
JobQueued::class => $event->connectionName,
5860
default => $event->job->getConnectionName(), // @phpstan-ignore method.nonObject
5961
},
60-
match ($class) {
61-
JobQueued::class => $event->job->queue ?? null,
62-
default => $event->job->getQueue(), // @phpstan-ignore method.nonObject
63-
},
62+
$this->resolveQueue($event),
6463
match ($class) {
6564
JobQueued::class => $event->payload()['uuid'], // @phpstan-ignore method.notFound
6665
default => $event->job->uuid(), // @phpstan-ignore method.nonObject
@@ -131,4 +130,31 @@ protected function normalizeSqsQueue(string $connection, string $queue): string
131130

132131
return $queue;
133132
}
133+
134+
/**
135+
* Resolve the queue.
136+
*/
137+
protected function resolveQueue(JobReleasedAfterException|JobFailed|JobProcessed|JobProcessing|JobQueued $event): ?string
138+
{
139+
return match ($event::class) {
140+
JobQueued::class => match (is_object($event->job) ? $event->job::class : $event->job) {
141+
CallQueuedListener::class => $this->resolveQueuedListenerQueue($event),
142+
default => $event->job->queue ?? null,
143+
},
144+
default => $event->job->getQueue(), // @phpstan-ignore method.nonObject
145+
};
146+
}
147+
148+
/**
149+
* Resolve the queued listener's queue.
150+
*/
151+
protected function resolveQueuedListenerQueue(JobQueued $event): ?string
152+
{
153+
return with(
154+
(new ReflectionClass($event->job->class))->newInstanceWithoutConstructor(), // @phpstan-ignore property.nonObject
155+
fn ($listener) => method_exists($listener, 'viaQueue')
156+
? $listener->viaQueue($event->job->data[0] ?? null)
157+
: ($listener->queue ?? null)
158+
);
159+
}
134160
}

tests/Feature/Recorders/QueuesTest.php

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,28 @@ function queueAggregates()
652652
);
653653
});
654654

655+
it('captures correct queue name for class based queued listeners', function () {
656+
Config::set('queue.default', 'database');
657+
658+
Event::listen('my-event', MyListenerWithCustomQueue::class);
659+
Event::listen(MyEvent::class, MyListenerWithCustomQueue::class);
660+
Event::listen(MyEvent::class, MyListenerWithViaQueue::class);
661+
Event::dispatch('my-event');
662+
Event::dispatch(new MyEvent);
663+
Pulse::ingest();
664+
Artisan::call('queue:work', ['--queue' => 'custom_queue', '--max-jobs' => 3, '--tries' => 1, '--stop-when-empty' => true, '--sleep' => 0]);
665+
666+
Pulse::ignore(fn () => expect(Queue::size())->toBe(0));
667+
$aggregates = queueAggregates();
668+
expect($aggregates)->toHaveCount(12);
669+
expect($aggregates)->toContainAggregateForAllPeriods(
670+
type: ['queued', 'processing', 'processed'],
671+
aggregate: 'count',
672+
key: 'database:custom_queue',
673+
value: '3.00',
674+
);
675+
});
676+
655677
class MyJob implements ShouldQueue
656678
{
657679
public function handle()
@@ -747,3 +769,30 @@ public function handle()
747769
$this->fail();
748770
}
749771
}
772+
773+
class MyListenerWithCustomQueue implements ShouldQueue
774+
{
775+
use InteractsWithQueue;
776+
777+
public $queue = 'custom_queue';
778+
779+
public function handle(): void
780+
{
781+
//
782+
}
783+
}
784+
785+
class MyListenerWithViaQueue implements ShouldQueue
786+
{
787+
use InteractsWithQueue;
788+
789+
public function handle(): void
790+
{
791+
//
792+
}
793+
794+
public function viaQueue(object $event)
795+
{
796+
return 'custom_queue';
797+
}
798+
}

tests/Pest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
|
5656
*/
5757

58-
expect()->extend('toContainAggregateForAllPeriods', function (string|array $type, string $aggregate, string $key, int $value, ?int $count = null, ?int $timestamp = null) {
58+
expect()->extend('toContainAggregateForAllPeriods', function (string|array $type, string $aggregate, string $key, int|float|string $value, ?int $count = null, ?int $timestamp = null) {
5959
$this->toBeInstanceOf(Collection::class);
6060

6161
$values = $this->value->each(function (stdClass $value) {

0 commit comments

Comments
 (0)