@@ -49,11 +49,19 @@ def self.base_campaign_url(campaign_id)
49
49
Settings . spoke . base_campaign_url ? sprintf ( Settings . spoke . base_campaign_url , campaign_id . to_s ) : nil
50
50
end
51
51
52
- def self . worker_currenly_running ?( method_name )
52
+ def self . worker_currently_running ?( method_name , sync_id )
53
53
workers = Sidekiq ::Workers . new
54
54
workers . each do |_process_id , _thread_id , work |
55
- matched_process = work [ "payload" ] [ "args" ] = [ SYSTEM_NAME , method_name ]
56
- if matched_process
55
+ args = work [ "payload" ] [ "args" ]
56
+ worker_sync_id = ( args . count > 0 ) ? args [ 0 ] : nil
57
+ worker_sync = worker_sync_id ? Sync . find_by ( id : worker_sync_id ) : nil
58
+ next unless worker_sync
59
+ worker_system = worker_sync . external_system
60
+ worker_method_name = JSON . parse ( worker_sync . external_system_params ) [ "pull_job" ]
61
+ already_running = ( worker_system == SYSTEM_NAME &&
62
+ worker_method_name == method_name &&
63
+ worker_sync_id != sync_id )
64
+ if already_running
57
65
puts ">>> #{ SYSTEM_NAME . titleize } #{ method_name } skipping as worker already running ..."
58
66
return true
59
67
end
@@ -79,7 +87,10 @@ def self.pull(sync_id, external_system_params)
79
87
80
88
def self . fetch_new_messages ( sync_id , force : false )
81
89
## Do not run method if another worker is currently processing this method
82
- yield 0 , { } , { } , true if self . worker_currenly_running? ( __method__ . to_s )
90
+ if self . worker_currently_running? ( __method__ . to_s , sync_id )
91
+ yield 0 , { } , { } , true
92
+ return
93
+ end
83
94
84
95
started_at = DateTime . now
85
96
last_created_at = Time . parse ( $redis. with { |r | r . get 'spoke:messages:last_created_at' } || '2019-01-01 00:00:00' )
@@ -188,7 +199,10 @@ def self.handle_new_message(sync_id, message_id)
188
199
189
200
def self . fetch_new_opt_outs ( sync_id , force : false )
190
201
## Do not run method if another worker is currently processing this method
191
- yield 0 , { } , { } , true if self . worker_currenly_running? ( __method__ . to_s )
202
+ if self . worker_currently_running? ( __method__ . to_s , sync_id )
203
+ yield 0 , { } , { } , true
204
+ return
205
+ end
192
206
193
207
if Settings . spoke . subscription_id
194
208
started_at = DateTime . now
@@ -245,7 +259,10 @@ def self.handle_new_opt_out(sync_id, opt_out_id)
245
259
246
260
def self . fetch_active_campaigns ( sync_id , force : false )
247
261
## Do not run method if another worker is currently processing this method
248
- yield 0 , { } , { } , true if self . worker_currenly_running? ( __method__ . to_s )
262
+ if self . worker_currently_running? ( __method__ . to_s , sync_id )
263
+ yield 0 , { } , { } , true
264
+ return
265
+ end
249
266
250
267
active_campaigns = IdentitySpoke ::Campaign . active
251
268
0 commit comments