@@ -947,25 +947,39 @@ impl<H: DurableExecution> Queue<H> {
947947 local job_data_hash = KEYS[3]
948948 local results_hash = KEYS[4] -- e.g., "myqueue:results"
949949 local dedupe_set_name = KEYS[5]
950+ local active_hash = KEYS[6]
951+ local pending_list = KEYS[7]
952+ local delayed_zset = KEYS[8]
950953
951954 local max_len = tonumber(ARGV[1])
952955
953956 local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
957+ local actually_deleted = 0
954958
955959 if #job_ids_to_delete > 0 then
956960 for _, j_id in ipairs(job_ids_to_delete) do
957- local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
958- local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
959-
960- redis.call('SREM', dedupe_set_name, j_id)
961- redis.call('HDEL', job_data_hash, j_id)
962- redis.call('DEL', job_meta_hash)
963- redis.call('HDEL', results_hash, j_id)
964- redis.call('DEL', errors_list_name)
961+ -- CRITICAL FIX: Check if this job_id is currently active/pending/delayed, so we never prune metadata for a job that's currently running
962+ -- NOTE: a nil Redis reply reaches Lua as false (not nil), so `~= nil` would always be true; test truthiness instead (0 is truthy in Lua, so LPOS index 0 still counts)
963+ local is_active = redis.call('HEXISTS', active_hash, j_id) == 1
964+ local is_pending = not not redis.call('LPOS', pending_list, j_id)
965+ local is_delayed = not not redis.call('ZSCORE', delayed_zset, j_id)
966+
967+ -- Only delete if the job is NOT currently in the system
968+ if not is_active and not is_pending and not is_delayed then
969+ local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
970+ local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
971+
972+ redis.call('SREM', dedupe_set_name, j_id)
973+ redis.call('HDEL', job_data_hash, j_id)
974+ redis.call('DEL', job_meta_hash)
975+ redis.call('HDEL', results_hash, j_id)
976+ redis.call('DEL', errors_list_name)
977+ actually_deleted = actually_deleted + 1
978+ end
965979 end
966980 redis.call('LTRIM', list_name, 0, max_len - 1)
967981 end
968- return #job_ids_to_delete
982+ return actually_deleted
969983 "# ,
970984 ) ;
971985
@@ -975,6 +989,9 @@ impl<H: DurableExecution> Queue<H> {
975989 . key ( self . job_data_hash_name ( ) )
976990 . key ( self . job_result_hash_name ( ) ) // results_hash
977991 . key ( self . dedupe_set_name ( ) )
992+ . key ( self . active_hash_name ( ) ) // Check if job is active
993+ . key ( self . pending_list_name ( ) ) // Check if job is pending
994+ . key ( self . delayed_zset_name ( ) ) // Check if job is delayed
978995 . arg ( self . options . max_success ) // max_len (LTRIM is 0 to max_success-1)
979996 . invoke_async ( & mut self . redis . clone ( ) )
980997 . await ?;
@@ -1099,24 +1116,37 @@ impl<H: DurableExecution> Queue<H> {
10991116 local list_name = KEYS[2]
11001117 local job_data_hash = KEYS[3]
11011118 local dedupe_set_name = KEYS[4]
1119+ local active_hash = KEYS[5]
1120+ local pending_list = KEYS[6]
1121+ local delayed_zset = KEYS[7]
11021122
11031123 local max_len = tonumber(ARGV[1])
11041124
11051125 local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
1126+ local actually_deleted = 0
11061127
11071128 if #job_ids_to_delete > 0 then
11081129 for _, j_id in ipairs(job_ids_to_delete) do
1109- local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
1110- local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
1111-
1112- redis.call('SREM', dedupe_set_name, j_id)
1113- redis.call('HDEL', job_data_hash, j_id)
1114- redis.call('DEL', job_meta_hash)
1115- redis.call('DEL', errors_list_name)
1130+ -- CRITICAL FIX: Check if this job_id is currently active/pending/delayed (a nil Redis reply reaches Lua as false, not nil, so test truthiness instead of `~= nil`)
1131+ local is_active = redis.call('HEXISTS', active_hash, j_id) == 1
1132+ local is_pending = not not redis.call('LPOS', pending_list, j_id)
1133+ local is_delayed = not not redis.call('ZSCORE', delayed_zset, j_id)
1134+
1135+ -- Only delete if the job is NOT currently in the system
1136+ if not is_active and not is_pending and not is_delayed then
1137+ local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
1138+ local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
1139+
1140+ redis.call('SREM', dedupe_set_name, j_id)
1141+ redis.call('HDEL', job_data_hash, j_id)
1142+ redis.call('DEL', job_meta_hash)
1143+ redis.call('DEL', errors_list_name)
1144+ actually_deleted = actually_deleted + 1
1145+ end
11161146 end
11171147 redis.call('LTRIM', list_name, 0, max_len - 1)
11181148 end
1119- return #job_ids_to_delete
1149+ return actually_deleted
11201150 "# ,
11211151 ) ;
11221152
@@ -1125,6 +1155,9 @@ impl<H: DurableExecution> Queue<H> {
11251155 . key ( self . failed_list_name ( ) )
11261156 . key ( self . job_data_hash_name ( ) )
11271157 . key ( self . dedupe_set_name ( ) )
1158+ . key ( self . active_hash_name ( ) ) // Check if job is active
1159+ . key ( self . pending_list_name ( ) ) // Check if job is pending
1160+ . key ( self . delayed_zset_name ( ) ) // Check if job is delayed
11281161 . arg ( self . options . max_failed )
11291162 . invoke_async ( & mut self . redis . clone ( ) )
11301163 . await ?;
0 commit comments