Skip to content

Commit 06da5d2

Browse files
author
Admin
committed
1 parent ceec254 commit 06da5d2

File tree

4 files changed

+68
-27
lines changed

4 files changed

+68
-27
lines changed

illuminate/Bus/UniqueLock.php

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
namespace Illuminate\Bus;
44

55
use Illuminate\Contracts\Cache\Repository as Cache;
6-
use Illuminate\Contracts\Queue\ShouldBeUnique;
7-
use Illuminate\Queue\SerializesModels;
86

97
class UniqueLock
108
{
@@ -34,13 +32,6 @@ public function __construct(Cache $cache)
3432
*/
3533
public function acquire($job): bool
3634
{
37-
if (
38-
$job instanceof ShouldBeUnique
39-
&& \in_array(SerializesModels::class, \class_uses_recursive($job::class), true)
40-
) {
41-
throw new \RuntimeException('ShouldBeUnique not supported in combination with SerializesModels');
42-
}
43-
4435
$uniqueFor = method_exists($job, 'uniqueFor')
4536
? $job->uniqueFor()
4637
: ($job->uniqueFor ?? 0);
@@ -72,12 +63,22 @@ public function release($job): void
7263
*
7364
* @param mixed $job
7465
*/
75-
protected function getKey($job): string
66+
public static function getKey($job): string
7667
{
7768
$uniqueId = method_exists($job, 'uniqueId')
7869
? $job->uniqueId()
7970
: ($job->uniqueId ?? '');
8071

8172
return 'laravel_unique_job:' . get_class($job) . $uniqueId;
8273
}
74+
75+
/**
76+
* Determine the cache store used by the unique job to acquire locks.
77+
*/
78+
public static function getUniqueJobCacheStore(mixed $job): ?string
79+
{
80+
return \method_exists($job, 'uniqueVia')
81+
? $job->uniqueVia()->getName()
82+
: \config('cache.default');
83+
}
8384
}

illuminate/Queue/CallQueuedHandler.php

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Illuminate\Bus\Batchable;
77
use Illuminate\Bus\UniqueLock;
88
use Illuminate\Contracts\Bus\Dispatcher;
9+
use Illuminate\Contracts\Cache\Factory as CacheFactory;
910
use Illuminate\Contracts\Cache\Repository as Cache;
1011
use Illuminate\Contracts\Container\Container;
1112
use Illuminate\Contracts\Encryption\Encrypter;
@@ -250,6 +251,8 @@ protected function handleModelNotFound(Job $job, $e)
250251
$shouldDelete = false;
251252
}
252253

254+
$this->ensureUniqueJobLockIsReleasedViaJobPayload($job);
255+
253256
if ($shouldDelete) {
254257
$job->delete();
255258

@@ -259,6 +262,26 @@ protected function handleModelNotFound(Job $job, $e)
259262
$job->fail($e);
260263
}
261264

265+
/**
266+
* Ensure the lock for a unique job is released via job payload.
267+
*
268+
* This is required when we can't unserialize the job due to missing models.
269+
*/
270+
protected function ensureUniqueJobLockIsReleasedViaJobPayload(Job $job): void
271+
{
272+
$store = $job->payload()['uniqueJobCacheStore'] ?? '';
273+
$key = $job->payload()['uniqueJobKey'] ?? '';
274+
275+
if ('' === $store || '' === $key) {
276+
return;
277+
}
278+
279+
$this->container->make(CacheFactory::class)
280+
->store($store)
281+
->lock($key)
282+
->forceRelease();
283+
}
284+
262285
/**
263286
* Call the failed method on the job instance.
264287
*

illuminate/Queue/Queue.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44

55
use Closure;
66
use DateTimeInterface;
7+
use Illuminate\Bus\UniqueLock;
78
use Illuminate\Container\Container;
89
use Illuminate\Contracts\Encryption\Encrypter;
910
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
11+
use Illuminate\Contracts\Queue\ShouldBeUnique;
1012
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
1113
use Illuminate\Queue\Events\JobQueued;
1214
use Illuminate\Queue\Events\JobQueueing;
@@ -104,6 +106,13 @@ protected function createPayload($job, $queue, $data = '')
104106
$job = CallQueuedClosure::create($job);
105107
}
106108

109+
if ($job instanceof ShouldBeUnique) {
110+
self::createPayloadUsing(fn (): array => [
111+
'uniqueJobKey' => UniqueLock::getKey($job),
112+
'uniqueJobCacheStore' => UniqueLock::getUniqueJobCacheStore($job)
113+
]);
114+
}
115+
107116
$payload = json_encode($value = $this->createPayloadArray($job, $queue, $data), \JSON_UNESCAPED_UNICODE);
108117

109118
if (json_last_error() !== JSON_ERROR_NONE) {

src/Bus/PendingDispatch.php

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
namespace Laravel\Lumen\Bus;
44

5+
use Illuminate\Bus\UniqueLock;
56
use Illuminate\Container\Container;
67
use Illuminate\Contracts\Bus\Dispatcher;
78
use Illuminate\Contracts\Cache\Repository as Cache;
89
use Illuminate\Contracts\Queue\ShouldBeUnique;
9-
use Illuminate\Queue\SerializesModels;
1010

1111
class PendingDispatch
1212
{
@@ -17,6 +17,14 @@ class PendingDispatch
1717
*/
1818
protected $job;
1919

20+
/**
21+
* Indicates if the job should be dispatched immediately after sending the response.
22+
*
23+
* @var bool
24+
*/
25+
protected $afterResponse = false;
26+
27+
2028
/**
2129
* Create a new pending job dispatch.
2230
*
@@ -54,34 +62,32 @@ public function onQueue($queue)
5462
return $this;
5563
}
5664

65+
/**
66+
* Indicate that the job should be dispatched after the response is sent to the browser.
67+
*/
68+
public function afterResponse(): static
69+
{
70+
$this->afterResponse = true;
71+
72+
return $this;
73+
}
74+
5775
/**
5876
* Determine if the job should be dispatched.
5977
*
6078
* @return bool
6179
* @throws \RuntimeException
80+
* @throws \ReflectionException
81+
* @throws \Illuminate\Contracts\Container\BindingResolutionException
6282
*/
6383
protected function shouldDispatch(): bool
6484
{
6585
if (!$this->job instanceof ShouldBeUnique) {
6686
return true;
6787
}
6888

69-
if (\in_array(SerializesModels::class, \class_uses_recursive($this->job::class), true)) {
70-
throw new \RuntimeException('ShouldBeUnique not supported in combination with SerializesModels');
71-
}
72-
73-
$uniqueId = method_exists($this->job, 'uniqueId')
74-
? $this->job->uniqueId()
75-
: ($this->job->uniqueId ?? '');
76-
77-
$cache = method_exists($this->job, 'uniqueVia')
78-
? $this->job->uniqueVia()
79-
: Container::getInstance()->make(Cache::class);
80-
81-
return (bool)$cache->lock(
82-
$key = 'laravel_unique_job:' . get_class($this->job) . $uniqueId,
83-
$this->job->uniqueFor ?? 0
84-
)->get();
89+
return (new UniqueLock(Container::getInstance()->make(Cache::class)))
90+
->acquire($this->job);
8591
}
8692

8793
/**
@@ -93,6 +99,8 @@ public function __destruct()
9399
{
94100
if (!$this->shouldDispatch()) {
95101
return;
102+
} elseif ($this->afterResponse) {
103+
app(Dispatcher::class)->dispatchAfterResponse($this->job);
96104
} else {
97105
app(Dispatcher::class)->dispatch($this->job);
98106
}

0 commit comments

Comments
 (0)