Skip to content

Commit 603befe

Browse files
authored
feat(job): support debouncing (#2760)
1 parent f6d29fc commit 603befe

13 files changed

+292
-29
lines changed

lib/commands/addJob-6.lua

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,17 @@
3333
ARGV[9] priority
3434
ARGV[10] LIFO
3535
ARGV[11] token
36+
ARGV[12] debounce key
37+
ARGV[13] debounceId
38+
ARGV[14] debounceTtl
3639
]]
3740
local jobId
3841
local jobIdKey
3942
local rcall = redis.call
4043

4144
-- Includes
4245
--- @include "includes/addJobWithPriority"
46+
--- @include "includes/debounceJob"
4347
--- @include "includes/getTargetQueueList"
4448

4549
local jobCounter = rcall("INCR", KEYS[4])
@@ -56,10 +60,28 @@ else
5660
end
5761
end
5862

63+
local debounceKey = ARGV[12]
64+
5965
local opts = cmsgpack.unpack(ARGV[5])
6066

61-
-- Store the job.
62-
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp", ARGV[6], "delay", ARGV[7], "priority", ARGV[9])
67+
local debouncedJobId = debounceJob(ARGV[1], ARGV[13], ARGV[14],
68+
jobId, debounceKey, ARGV[11])
69+
if debouncedJobId then
70+
return debouncedJobId
71+
end
72+
73+
local debounceId = ARGV[13]
74+
75+
local optionalValues = {}
76+
77+
if debounceId ~= "" then
78+
table.insert(optionalValues, "deid")
79+
table.insert(optionalValues, debounceId)
80+
end
81+
82+
-- Store the job.
83+
rcall("HMSET", jobIdKey, "name", ARGV[3], "data", ARGV[4], "opts", opts, "timestamp",
84+
ARGV[6], "delay", ARGV[7], "priority", ARGV[9], unpack(optionalValues))
6385

6486
-- Check if job is delayed
6587
local delayedTimestamp = tonumber(ARGV[8])

lib/commands/cleanJobsInSet-3.lua

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
KEYS[2] priority key
77
KEYS[3] rate limiter key
88
9-
ARGV[1] jobId
9+
ARGV[1] prefix key
1010
ARGV[2] maxTimestamp
1111
ARGV[3] limit the number of jobs to be removed. 0 is unlimited
1212
ARGV[4] set name, can be any of 'wait', 'active', 'paused', 'delayed', 'completed', or 'failed'
@@ -16,14 +16,17 @@ local setKey = KEYS[1]
1616
local priorityKey = KEYS[2]
1717
local rateLimiterKey = KEYS[3]
1818

19-
local jobKeyPrefix = ARGV[1]
19+
local prefixKey = ARGV[1]
2020
local maxTimestamp = ARGV[2]
2121
local limitStr = ARGV[3]
2222
local setName = ARGV[4]
2323

2424
local isList = false
2525
local rcall = redis.call
2626

27+
-- Includes
28+
--- @include "includes/removeDebounceKey"
29+
2730
if setName == "wait" or setName == "active" or setName == "paused" then
2831
isList = true
2932
end
@@ -75,7 +78,7 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
7578
break
7679
end
7780

78-
local jobKey = jobKeyPrefix .. jobId
81+
local jobKey = prefixKey .. jobId
7982
if (rcall("EXISTS", jobKey .. ":lock") == 0) then
8083
-- Find the right timestamp of the job to compare to maxTimestamp:
8184
-- * finishedOn says when the job was completed, but it isn't set unless the job has actually completed
@@ -98,6 +101,11 @@ while ((limit <= 0 or deletedCount < limit) and next(jobIds, nil) ~= nil) do
98101
rcall("ZREM", setKey, jobId)
99102
end
100103
rcall("ZREM", priorityKey, jobId)
104+
105+
if setName ~= "completed" and setName ~= "failed" then
106+
removeDebounceKey(prefixKey, jobKey)
107+
end
108+
101109
rcall("DEL", jobKey)
102110
rcall("DEL", jobKey .. ":logs")
103111

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
--[[
2+
Function to debounce a job.
3+
]]
4+
5+
local function debounceJob(prefixKey, debounceId, ttl, jobId, debounceKey, token)
6+
if debounceId ~= "" then
7+
local debounceKeyExists
8+
if ttl ~= "" then
9+
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'PX', ttl, 'NX')
10+
else
11+
debounceKeyExists = not rcall('SET', debounceKey, jobId, 'NX')
12+
end
13+
if debounceKeyExists then
14+
local currentDebounceJobId = rcall('GET', debounceKey)
15+
rcall("PUBLISH", prefixKey .. "debounced@" .. token, currentDebounceJobId)
16+
17+
return currentDebounceJobId
18+
end
19+
end
20+
end
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
2+
--[[
3+
Function to remove debounce key.
4+
]]
5+
6+
local function removeDebounceKey(prefixKey, jobKey)
7+
local debounceId = rcall("HGET", jobKey, "deid")
8+
if debounceId then
9+
local debounceKey = prefixKey .. "de:" .. debounceId
10+
rcall("DEL", debounceKey)
11+
end
12+
end
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
--[[
2+
Function to remove debounce key if needed.
3+
]]
4+
5+
local function removeDebounceKeyIfNeeded(prefixKey, debounceId)
6+
if debounceId then
7+
local debounceKey = prefixKey .. "de:" .. debounceId
8+
local pttl = rcall("PTTL", debounceKey)
9+
10+
if pttl == 0 or pttl == -1 then
11+
rcall("DEL", debounceKey)
12+
end
13+
end
14+
end

lib/commands/moveStalledJobsToWait-7.lua

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ local rcall = redis.call
2525
-- Includes
2626
--- @include "includes/batches"
2727
--- @include "includes/getTargetQueueList"
28+
--- @include "includes/removeDebounceKeyIfNeeded"
2829

2930
local function removeJob(jobId, baseKey)
3031
local jobKey = baseKey .. jobId
@@ -78,12 +79,13 @@ if(#stalling > 0) then
7879
-- If this job has been stalled too many times, such as if it crashes the worker, then fail it.
7980
local stalledCount = rcall("HINCRBY", jobKey, "stalledCounter", 1)
8081
if(stalledCount > MAX_STALLED_JOB_COUNT) then
81-
local rawOpts = rcall("HGET", jobKey, "opts")
82-
local opts = cjson.decode(rawOpts)
82+
local jobAttributes = rcall("HMGET", jobKey, "opts", "deid")
83+
local opts = cjson.decode(jobAttributes[1])
8384
local removeOnFailType = type(opts["removeOnFail"])
8485
rcall("ZADD", KEYS[4], ARGV[3], jobId)
8586
rcall("HMSET", jobKey, "failedReason", "job stalled more than allowable limit",
8687
"finishedOn", ARGV[3])
88+
removeDebounceKeyIfNeeded(ARGV[2], jobAttributes[2])
8789
rcall("PUBLISH", KEYS[4], '{"jobId":"' .. jobId .. '", "val": "job stalled more than maxStalledCount"}')
8890

8991
if removeOnFailType == "number" then

lib/commands/moveToFinished-9.lua

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ end
8484

8585
-- Includes
8686
--- @include "includes/removeLock"
87+
--- @include "includes/removeDebounceKeyIfNeeded"
8788

8889
if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
8990
local errorCode = removeLock(KEYS[3], KEYS[8], ARGV[5], ARGV[1])
@@ -96,6 +97,9 @@ if rcall("EXISTS", KEYS[3]) == 1 then -- // Make sure job exists
9697

9798
if numRemovedElements < 1 then return -3 end
9899

100+
local debounceId = rcall("HGET", KEYS[3], "deid")
101+
removeDebounceKeyIfNeeded(ARGV[9], debounceId)
102+
99103
-- Remove job?
100104
local keepJobs = cmsgpack.unpack(ARGV[6])
101105
local maxCount = keepJobs['count']

lib/commands/obliterate-2.lua

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@ local maxCount = tonumber(ARGV[1])
1818
local baseKey = KEYS[2]
1919

2020
local rcall = redis.call
21+
22+
-- Includes
23+
--- @include "includes/removeDebounceKey"
24+
2125
local function getListItems(keyName, max)
2226
return rcall('LRANGE', keyName, 0, max - 1)
2327
end
@@ -26,23 +30,24 @@ local function getZSetItems(keyName, max)
2630
return rcall('ZRANGE', keyName, 0, max - 1)
2731
end
2832

29-
local function removeJobs(parentKey, keys)
33+
local function removeJobs(baseKey, keys)
3034
for i, key in ipairs(keys) do
31-
rcall("DEL", baseKey .. key)
32-
rcall("DEL", baseKey .. key .. ':logs')
35+
local jobKey = baseKey .. key
36+
rcall("DEL", jobKey, jobKey .. ':logs')
37+
removeDebounceKey(baseKey, jobKey)
3338
end
3439
maxCount = maxCount - #keys
3540
end
3641

3742
local function removeListJobs(keyName, max)
3843
local jobs = getListItems(keyName, max)
39-
removeJobs(keyName, jobs)
44+
removeJobs(baseKey, jobs)
4045
rcall("LTRIM", keyName, #jobs, -1)
4146
end
4247

4348
local function removeZSetJobs(keyName, max)
4449
local jobs = getZSetItems(keyName, max)
45-
removeJobs(keyName, jobs)
50+
removeJobs(baseKey, jobs)
4651
if (#jobs > 0) then rcall("ZREM", keyName, unpack(jobs)) end
4752
end
4853

@@ -65,7 +70,7 @@ if (#activeJobs > 0) then
6570
end
6671

6772
removeLockKeys(activeJobs)
68-
removeJobs(activeKey, activeJobs)
73+
removeJobs(baseKey, activeJobs)
6974
rcall("LTRIM", activeKey, #activeJobs, -1)
7075
if (maxCount <= 0) then return 1 end
7176

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,17 @@
33
In order to be able to remove a job, it must be unlocked.
44
55
Input:
6-
KEYS[1] 'active',
7-
KEYS[2] 'wait',
8-
KEYS[3] 'delayed',
9-
KEYS[4] 'paused',
10-
KEYS[5] 'completed',
11-
KEYS[6] 'failed',
12-
KEYS[7] 'priority',
13-
KEYS[8] jobId
14-
KEYS[9] job logs
6+
KEYS[1] 'active',
7+
KEYS[2] 'wait',
8+
KEYS[3] 'delayed',
9+
KEYS[4] 'paused',
10+
KEYS[5] 'completed',
11+
KEYS[6] 'failed',
12+
KEYS[7] 'priority',
13+
KEYS[8] jobId key
14+
KEYS[9] job logs
1515
KEYS[10] rate limiter index table
16+
KEYS[11] prefix key
1617
1718
ARGV[1] jobId
1819
ARGV[2] lock token
@@ -24,8 +25,12 @@
2425
-- TODO PUBLISH global event 'removed'
2526

2627
local rcall = redis.call
28+
29+
-- Includes
30+
--- @include "includes/removeDebounceKey"
31+
2732
local lockKey = KEYS[8] .. ':lock'
28-
local lock = redis.call("GET", lockKey)
33+
local lock = rcall("GET", lockKey)
2934
if not lock then -- or (lock == ARGV[2])) then
3035
local jobId = ARGV[1]
3136
rcall("LREM", KEYS[1], 0, jobId)
@@ -35,6 +40,9 @@ if not lock then -- or (lock == ARGV[2])) then
3540
rcall("ZREM", KEYS[5], jobId)
3641
rcall("ZREM", KEYS[6], jobId)
3742
rcall("ZREM", KEYS[7], jobId)
43+
44+
removeDebounceKey(KEYS[11], KEYS[8])
45+
3846
rcall("DEL", KEYS[8])
3947
rcall("DEL", KEYS[9])
4048

lib/job.js

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ const Job = function(queue, name, data, opts) {
5757
this.attemptsMade = 0;
5858

5959
this.toKey = _.bind(queue.toKey, queue);
60+
this.debounceId = this.opts.debounce ? this.opts.debounce.id : undefined;
6061
};
6162

6263
function setDefaultOpts(opts) {
@@ -82,7 +83,8 @@ function addJob(queue, client, job) {
8283
return scripts.addJob(client, queue, jobData, {
8384
lifo: opts.lifo,
8485
customJobId: opts.jobId,
85-
priority: opts.priority
86+
priority: opts.priority,
87+
debounce: opts.debounce
8688
});
8789
}
8890

@@ -182,6 +184,7 @@ Job.prototype.toJSON = function() {
182184
failedReason: this.failedReason,
183185
stacktrace: this.stacktrace || null,
184186
returnvalue: this.returnvalue || null,
187+
debounceId: this.debounceId || null,
185188
finishedOn: this.finishedOn || null,
186189
processedOn: this.processedOn || null
187190
};
@@ -641,6 +644,10 @@ Job.fromJSON = function(queue, json, jobId) {
641644
job.returnvalue = getReturnValue(json.returnvalue);
642645
}
643646

647+
if (json.deid) {
648+
job.debounceId = json.deid;
649+
}
650+
644651
return job;
645652
};
646653

0 commit comments

Comments
 (0)