Skip to content

Commit 3774a4b

Browse files
authored
Refactor job deletion logic in TWQM to prevent removing active, pending, or delayed jobs. Update EOA executor error handling for balance threshold updates. (#61)
1 parent 7810d96 commit 3774a4b

File tree

4 files changed

+447
-40
lines changed

4 files changed

+447
-40
lines changed

executors/src/eoa/worker/send.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -262,10 +262,10 @@ impl<C: Chain> EoaExecutorWorker<C> {
262262
if should_update_balance_threshold(inner_error) {
263263
balance_threshold_update_needed = true;
264264
}
265-
} else if let EoaExecutorWorkerError::RpcError { inner_error, .. } = &e {
266-
if should_update_balance_threshold(inner_error) {
267-
balance_threshold_update_needed = true;
268-
}
265+
} else if let EoaExecutorWorkerError::RpcError { inner_error, .. } = &e
266+
&& should_update_balance_threshold(inner_error)
267+
{
268+
balance_threshold_update_needed = true;
269269
}
270270

271271
// For deterministic build failures, fail the transaction immediately
@@ -284,10 +284,8 @@ impl<C: Chain> EoaExecutorWorker<C> {
284284
}
285285
}
286286

287-
if balance_threshold_update_needed {
288-
if let Err(e) = self.update_balance_threshold().await {
289-
tracing::error!(error = ?e, "Failed to update balance threshold");
290-
}
287+
if balance_threshold_update_needed && let Err(e) = self.update_balance_threshold().await {
288+
tracing::error!(error = ?e, "Failed to update balance threshold");
291289
}
292290

293291
Ok(cleaned_results)

twmq/src/lib.rs

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -947,25 +947,39 @@ impl<H: DurableExecution> Queue<H> {
947947
local job_data_hash = KEYS[3]
948948
local results_hash = KEYS[4] -- e.g., "myqueue:results"
949949
local dedupe_set_name = KEYS[5]
950+
local active_hash = KEYS[6]
951+
local pending_list = KEYS[7]
952+
local delayed_zset = KEYS[8]
950953
951954
local max_len = tonumber(ARGV[1])
952955
953956
local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
957+
local actually_deleted = 0
954958
955959
if #job_ids_to_delete > 0 then
956960
for _, j_id in ipairs(job_ids_to_delete) do
957-
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
958-
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
959-
960-
redis.call('SREM', dedupe_set_name, j_id)
961-
redis.call('HDEL', job_data_hash, j_id)
962-
redis.call('DEL', job_meta_hash)
963-
redis.call('HDEL', results_hash, j_id)
964-
redis.call('DEL', errors_list_name)
961+
-- CRITICAL FIX: Check if this job_id is currently active/pending/delayed
962+
-- This prevents the race where we prune metadata for a job that's currently running
963+
local is_active = redis.call('HEXISTS', active_hash, j_id) == 1
964+
local is_pending = redis.call('LPOS', pending_list, j_id) ~= nil
965+
local is_delayed = redis.call('ZSCORE', delayed_zset, j_id) ~= nil
966+
967+
-- Only delete if the job is NOT currently in the system
968+
if not is_active and not is_pending and not is_delayed then
969+
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
970+
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
971+
972+
redis.call('SREM', dedupe_set_name, j_id)
973+
redis.call('HDEL', job_data_hash, j_id)
974+
redis.call('DEL', job_meta_hash)
975+
redis.call('HDEL', results_hash, j_id)
976+
redis.call('DEL', errors_list_name)
977+
actually_deleted = actually_deleted + 1
978+
end
965979
end
966980
redis.call('LTRIM', list_name, 0, max_len - 1)
967981
end
968-
return #job_ids_to_delete
982+
return actually_deleted
969983
"#,
970984
);
971985

@@ -975,6 +989,9 @@ impl<H: DurableExecution> Queue<H> {
975989
.key(self.job_data_hash_name())
976990
.key(self.job_result_hash_name()) // results_hash
977991
.key(self.dedupe_set_name())
992+
.key(self.active_hash_name()) // Check if job is active
993+
.key(self.pending_list_name()) // Check if job is pending
994+
.key(self.delayed_zset_name()) // Check if job is delayed
978995
.arg(self.options.max_success) // max_len (LTRIM is 0 to max_success-1)
979996
.invoke_async(&mut self.redis.clone())
980997
.await?;
@@ -1099,24 +1116,37 @@ impl<H: DurableExecution> Queue<H> {
10991116
local list_name = KEYS[2]
11001117
local job_data_hash = KEYS[3]
11011118
local dedupe_set_name = KEYS[4]
1119+
local active_hash = KEYS[5]
1120+
local pending_list = KEYS[6]
1121+
local delayed_zset = KEYS[7]
11021122
11031123
local max_len = tonumber(ARGV[1])
11041124
11051125
local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
1126+
local actually_deleted = 0
11061127
11071128
if #job_ids_to_delete > 0 then
11081129
for _, j_id in ipairs(job_ids_to_delete) do
1109-
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
1110-
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
1111-
1112-
redis.call('SREM', dedupe_set_name, j_id)
1113-
redis.call('HDEL', job_data_hash, j_id)
1114-
redis.call('DEL', job_meta_hash)
1115-
redis.call('DEL', errors_list_name)
1130+
-- CRITICAL FIX: Check if this job_id is currently active/pending/delayed
1131+
local is_active = redis.call('HEXISTS', active_hash, j_id) == 1
1132+
local is_pending = redis.call('LPOS', pending_list, j_id) ~= nil
1133+
local is_delayed = redis.call('ZSCORE', delayed_zset, j_id) ~= nil
1134+
1135+
-- Only delete if the job is NOT currently in the system
1136+
if not is_active and not is_pending and not is_delayed then
1137+
local errors_list_name = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':errors'
1138+
local job_meta_hash = 'twmq:' .. queue_id .. ':job:' .. j_id .. ':meta'
1139+
1140+
redis.call('SREM', dedupe_set_name, j_id)
1141+
redis.call('HDEL', job_data_hash, j_id)
1142+
redis.call('DEL', job_meta_hash)
1143+
redis.call('DEL', errors_list_name)
1144+
actually_deleted = actually_deleted + 1
1145+
end
11161146
end
11171147
redis.call('LTRIM', list_name, 0, max_len - 1)
11181148
end
1119-
return #job_ids_to_delete
1149+
return actually_deleted
11201150
"#,
11211151
);
11221152

@@ -1125,6 +1155,9 @@ impl<H: DurableExecution> Queue<H> {
11251155
.key(self.failed_list_name())
11261156
.key(self.job_data_hash_name())
11271157
.key(self.dedupe_set_name())
1158+
.key(self.active_hash_name()) // Check if job is active
1159+
.key(self.pending_list_name()) // Check if job is pending
1160+
.key(self.delayed_zset_name()) // Check if job is delayed
11281161
.arg(self.options.max_failed)
11291162
.invoke_async(&mut self.redis.clone())
11301163
.await?;

twmq/src/multilane.rs

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -961,25 +961,51 @@ impl<H: DurableExecution> MultilaneQueue<H> {
961961
local job_data_hash = KEYS[3]
962962
local results_hash = KEYS[4]
963963
local dedupe_set_name = KEYS[5]
964+
local lanes_zset = KEYS[6]
964965
965966
local max_len = tonumber(ARGV[1])
966967
967968
local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
969+
local actually_deleted = 0
968970
969971
if #job_ids_to_delete > 0 then
970972
for _, j_id in ipairs(job_ids_to_delete) do
973+
-- Get the lane_id for this job to check if it's active/pending/delayed
971974
local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta'
972-
local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors'
973-
974-
redis.call('SREM', dedupe_set_name, j_id)
975-
redis.call('HDEL', job_data_hash, j_id)
976-
redis.call('DEL', job_meta_hash)
977-
redis.call('HDEL', results_hash, j_id)
978-
redis.call('DEL', errors_list_name)
975+
local lane_id = redis.call('HGET', job_meta_hash, 'lane_id')
976+
977+
local should_delete = true
978+
979+
if lane_id then
980+
-- Check if job is in any active state for this lane
981+
local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active'
982+
local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending'
983+
local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed'
984+
985+
local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1
986+
local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= nil
987+
local is_delayed = redis.call('ZSCORE', lane_delayed_zset, j_id) ~= nil
988+
989+
-- Don't delete if job is currently in the system
990+
if is_active or is_pending or is_delayed then
991+
should_delete = false
992+
end
993+
end
994+
995+
if should_delete then
996+
local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors'
997+
998+
redis.call('SREM', dedupe_set_name, j_id)
999+
redis.call('HDEL', job_data_hash, j_id)
1000+
redis.call('DEL', job_meta_hash)
1001+
redis.call('HDEL', results_hash, j_id)
1002+
redis.call('DEL', errors_list_name)
1003+
actually_deleted = actually_deleted + 1
1004+
end
9791005
end
9801006
redis.call('LTRIM', list_name, 0, max_len - 1)
9811007
end
982-
return #job_ids_to_delete
1008+
return actually_deleted
9831009
"#,
9841010
);
9851011

@@ -989,6 +1015,7 @@ impl<H: DurableExecution> MultilaneQueue<H> {
9891015
.key(self.job_data_hash_name())
9901016
.key(self.job_result_hash_name())
9911017
.key(self.dedupe_set_name())
1018+
.key(self.lanes_zset_name()) // Need to check lanes
9921019
.arg(self.options.max_success)
9931020
.invoke_async(&mut self.redis.clone())
9941021
.await?;
@@ -1087,20 +1114,45 @@ impl<H: DurableExecution> MultilaneQueue<H> {
10871114
local max_len = tonumber(ARGV[1])
10881115
10891116
local job_ids_to_delete = redis.call('LRANGE', list_name, max_len, -1)
1117+
local actually_deleted = 0
10901118
10911119
if #job_ids_to_delete > 0 then
10921120
for _, j_id in ipairs(job_ids_to_delete) do
1093-
local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors'
1121+
-- Get the lane_id for this job to check if it's active/pending/delayed
10941122
local job_meta_hash = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':meta'
1095-
1096-
redis.call('SREM', dedupe_set_name, j_id)
1097-
redis.call('HDEL', job_data_hash, j_id)
1098-
redis.call('DEL', job_meta_hash)
1099-
redis.call('DEL', errors_list_name)
1123+
local lane_id = redis.call('HGET', job_meta_hash, 'lane_id')
1124+
1125+
local should_delete = true
1126+
1127+
if lane_id then
1128+
-- Check if job is in any active state for this lane
1129+
local lane_active_hash = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':active'
1130+
local lane_pending_list = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':pending'
1131+
local lane_delayed_zset = 'twmq_multilane:' .. queue_id .. ':lane:' .. lane_id .. ':delayed'
1132+
1133+
local is_active = redis.call('HEXISTS', lane_active_hash, j_id) == 1
1134+
local is_pending = redis.call('LPOS', lane_pending_list, j_id) ~= nil
1135+
local is_delayed = redis.call('ZSCORE', lane_delayed_zset, j_id) ~= nil
1136+
1137+
-- Don't delete if job is currently in the system
1138+
if is_active or is_pending or is_delayed then
1139+
should_delete = false
1140+
end
1141+
end
1142+
1143+
if should_delete then
1144+
local errors_list_name = 'twmq_multilane:' .. queue_id .. ':job:' .. j_id .. ':errors'
1145+
1146+
redis.call('SREM', dedupe_set_name, j_id)
1147+
redis.call('HDEL', job_data_hash, j_id)
1148+
redis.call('DEL', job_meta_hash)
1149+
redis.call('DEL', errors_list_name)
1150+
actually_deleted = actually_deleted + 1
1151+
end
11001152
end
11011153
redis.call('LTRIM', list_name, 0, max_len - 1)
11021154
end
1103-
return #job_ids_to_delete
1155+
return actually_deleted
11041156
"#,
11051157
);
11061158

0 commit comments

Comments
 (0)