Skip to content

Commit 7b12be1

Browse files
authored
fix(move-to-finished): throw error when job is not in active state (#2739)
1 parent 08f2bfd commit 7b12be1

File tree

7 files changed

+60
-76
lines changed

7 files changed

+60
-76
lines changed

lib/commands/addJob-6.lua

Lines changed: 6 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ local jobId
3838
local jobIdKey
3939
local rcall = redis.call
4040

41+
-- Includes
42+
--- @include "includes/addJobWithPriority"
43+
--- @include "includes/getTargetQueueList"
44+
4145
local jobCounter = rcall("INCR", KEYS[4])
4246

4347
if ARGV[2] == "" then
@@ -67,33 +71,15 @@ else
6771

6872
-- Whe check for the meta-paused key to decide if we are paused or not
6973
-- (since an empty list and !EXISTS are not really the same)
70-
local paused
71-
if rcall("EXISTS", KEYS[3]) ~= 1 then
72-
target = KEYS[1]
73-
paused = false
74-
else
75-
target = KEYS[2]
76-
paused = true
77-
end
74+
local target, paused = getTargetQueueList(KEYS[3], KEYS[1], KEYS[2])
7875

7976
-- Standard or priority add
8077
local priority = tonumber(ARGV[9])
8178
if priority == 0 then
8279
-- LIFO or FIFO
8380
rcall(ARGV[10], target, jobId)
8481
else
85-
-- Priority add
86-
rcall("ZADD", KEYS[6], priority, jobId)
87-
local count = rcall("ZCOUNT", KEYS[6], 0, priority)
88-
89-
local len = rcall("LLEN", target)
90-
local id = rcall("LINDEX", target, len - (count-1))
91-
if id then
92-
rcall("LINSERT", target, "BEFORE", id, jobId)
93-
else
94-
rcall("RPUSH", target, jobId)
95-
end
96-
82+
addJobWithPriority(KEYS[6], priority, jobId, target)
9783
end
9884

9985
-- Emit waiting event (wait..ing@token)

lib/commands/includes/batches.lua

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
--[[
2+
Function to loop in batches.
3+
Just a bit of warning, some commands as ZREM
4+
could receive a maximum of 7000 parameters per call.
5+
]]
6+
7+
local function batches(n, batchSize)
8+
local i = 0
9+
10+
return function()
11+
local from = i * batchSize + 1
12+
i = i + 1
13+
if (from <= n) then
14+
local to = math.min(from + batchSize - 1, n)
15+
return from, to
16+
end
17+
end
18+
end

lib/commands/moveStalledJobsToWait-7.lua

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222

2323
local rcall = redis.call
2424

25+
-- Includes
26+
--- @include "includes/batches"
27+
--- @include "includes/getTargetQueueList"
28+
2529
local function removeJob(jobId, baseKey)
2630
local jobKey = baseKey .. jobId
2731
rcall("DEL", jobKey, jobKey .. ':logs')
@@ -45,19 +49,6 @@ local function removeJobsByMaxCount(maxCount, targetSet, prefix)
4549
rcall("ZREMRANGEBYRANK", targetSet, 0, -(maxCount + 1))
4650
end
4751

48-
local function batches(n, batchSize)
49-
local i = 0
50-
51-
return function()
52-
local from = i * batchSize + 1
53-
i = i + 1
54-
if (from <= n) then
55-
local to = math.min(from + batchSize - 1, n)
56-
return from, to
57-
end
58-
end
59-
end
60-
6152
-- Check if we need to check for stalled jobs now.
6253
if rcall("EXISTS", KEYS[5]) == 1 then
6354
return {{}, {}}
@@ -70,15 +61,6 @@ local stalling = rcall('SMEMBERS', KEYS[1])
7061
local stalled = {}
7162
local failed = {}
7263
if(#stalling > 0) then
73-
74-
local dst
75-
-- wait or paused destination
76-
if rcall("EXISTS", KEYS[6]) ~= 1 then
77-
dst = KEYS[2]
78-
else
79-
dst = KEYS[7]
80-
end
81-
8264
rcall('DEL', KEYS[1])
8365

8466
local MAX_STALLED_JOB_COUNT = tonumber(ARGV[1])
@@ -129,8 +111,10 @@ if(#stalling > 0) then
129111

130112
table.insert(failed, jobId)
131113
else
114+
local target = getTargetQueueList(KEYS[6], KEYS[2], KEYS[7])
115+
132116
-- Move the job back to the wait queue, to immediately be picked up by a waiting worker.
133-
rcall("RPUSH", dst, jobId)
117+
rcall("RPUSH", target, jobId)
134118
rcall('PUBLISH', KEYS[1] .. '@', jobId)
135119
table.insert(stalled, jobId)
136120
end

lib/commands/moveToFinished-9.lua

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,10 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
9292
end
9393
end
9494

95-
-- Remove from active list
96-
rcall("LREM", KEYS[1], -1, ARGV[1])
95+
-- Remove from active list (if not active we shall return error)
96+
local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[1])
97+
98+
if numRemovedElements < 1 then return -3 end
9799

98100
-- Remove job?
99101
local keepJobs = cmsgpack.unpack(ARGV[6])

lib/commands/retryJobs-5.lua

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,8 @@ local maxCount = tonumber(ARGV[1])
1919

2020
local rcall = redis.call;
2121

22-
local function batches(n, batchSize)
23-
local i = 0
24-
25-
return function()
26-
local from = i * batchSize + 1
27-
i = i + 1
28-
if (from <= n) then
29-
local to = math.min(from + batchSize - 1, n)
30-
return from, to
31-
end
32-
end
33-
end
22+
-- Includes
23+
--- @include "includes/batches"
3424

3525
local function getZSetItems(keyName, max)
3626
return rcall('ZRANGE', keyName, 0, max - 1)

lib/commands/updateDelaySet-6.lua

Lines changed: 6 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,18 @@
2020
]]
2121
local rcall = redis.call;
2222

23+
-- Includes
24+
--- @include "includes/addJobWithPriority"
25+
--- @include "includes/getTargetQueueList"
26+
2327
-- Try to get as much as 1000 jobs at once
2428
local jobs = rcall("ZRANGEBYSCORE", KEYS[1], 0, tonumber(ARGV[2]) * 0x1000, "LIMIT", 0, 1000)
2529

2630
if(#jobs > 0) then
2731
rcall("ZREM", KEYS[1], unpack(jobs))
2832

2933
-- check if we need to use push in paused instead of waiting
30-
local target;
31-
if rcall("EXISTS", KEYS[6]) ~= 1 then
32-
target = KEYS[3]
33-
else
34-
target = KEYS[5]
35-
end
34+
local target = getTargetQueueList(KEYS[6], KEYS[3], KEYS[5])
3635

3736
for _, jobId in ipairs(jobs) do
3837
-- Is this really needed?
@@ -44,17 +43,7 @@ if(#jobs > 0) then
4443
-- LIFO or FIFO
4544
rcall("LPUSH", target, jobId)
4645
else
47-
-- Priority add
48-
rcall("ZADD", KEYS[4], priority, jobId)
49-
local count = rcall("ZCOUNT", KEYS[4], 0, priority)
50-
51-
local len = rcall("LLEN", target)
52-
local id = rcall("LINDEX", target, len - (count-1))
53-
if id then
54-
rcall("LINSERT", target, "BEFORE", id, jobId)
55-
else
56-
rcall("RPUSH", target, jobId)
57-
end
46+
addJobWithPriority(KEYS[4], priority, jobId, target)
5847
end
5948

6049
-- Emit waiting event (wait..ing@token)

test/test_job.js

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -608,12 +608,15 @@ describe('Job', () => {
608608
describe('.moveToCompleted', () => {
609609
it('marks the job as completed and returns new job', () => {
610610
return Job.create(queue, { foo: 'bar' }).then(job1 => {
611-
return Job.create(queue, { foo: 'bar' }).then(job2 => {
611+
return Job.create(queue, { foo: 'bar' }, { lifo: true }).then(job2 => {
612612
return job2
613613
.isCompleted()
614614
.then(isCompleted => {
615615
expect(isCompleted).to.be(false);
616616
})
617+
.then(() => {
618+
return scripts.moveToActive(queue);
619+
})
617620
.then(() => {
618621
return job2.moveToCompleted('succeeded', true);
619622
})
@@ -637,6 +640,9 @@ describe('Job', () => {
637640
.then(isFailed => {
638641
expect(isFailed).to.be(false);
639642
})
643+
.then(() => {
644+
return scripts.moveToActive(queue);
645+
})
640646
.then(() => {
641647
return job.moveToFailed(new Error('test error'), true);
642648
})
@@ -702,6 +708,9 @@ describe('Job', () => {
702708
.then(isFailed => {
703709
expect(isFailed).to.be(false);
704710
})
711+
.then(() => {
712+
return scripts.moveToActive(queue);
713+
})
705714
.then(() => {
706715
return job.moveToFailed(new Error('test error'), true);
707716
})
@@ -747,16 +756,22 @@ describe('Job', () => {
747756

748757
it('applies stacktrace limit on failure', () => {
749758
const stackTraceLimit = 1;
750-
return Job.create(queue, { foo: 'bar' }, { stackTraceLimit }).then(
759+
return Job.create(queue, { foo: 'bar' }, { stackTraceLimit, attempts: 2 }).then(
751760
job => {
752761
return job
753762
.isFailed()
754763
.then(isFailed => {
755764
expect(isFailed).to.be(false);
756765
})
766+
.then(() => {
767+
return scripts.moveToActive(queue);
768+
})
757769
.then(() => {
758770
return job.moveToFailed(new Error('test error'), true);
759771
})
772+
.then(() => {
773+
return scripts.moveToActive(queue);
774+
})
760775
.then(() => {
761776
return job
762777
.moveToFailed(new Error('test error'), true)

0 commit comments

Comments
 (0)