@@ -112,8 +112,8 @@ def register
112
112
@run_method = method ( :list_runner )
113
113
@stop_method = method ( :list_stop )
114
114
elsif @data_type == 'pattern_list'
115
- @run_method = method ( :list_pattern_runner )
116
- @stop_method = method ( :list_pattern_stop )
115
+ @run_method = method ( :pattern_list_runner )
116
+ @stop_method = method ( :pattern_list_stop )
117
117
elsif @data_type == 'channel'
118
118
@run_method = method ( :channel_runner )
119
119
@stop_method = method ( :subscribe_stop )
@@ -258,13 +258,13 @@ def reset_threadpool
258
258
end
259
259
260
260
# private
261
- def list_pattern_stop
261
+ def pattern_list_stop
262
262
reset_redis
263
263
reset_threadpool
264
264
end
265
265
266
266
# private
267
- def list_pattern_process_item ( redis , output_queue , key )
267
+ def pattern_list_process_item ( redis , output_queue , key )
268
268
if stop?
269
269
@logger . debug ( "Breaking from thread #{ key } as it was requested to stop" )
270
270
return false
@@ -276,14 +276,14 @@ def list_pattern_process_item(redis, output_queue, key)
276
276
end
277
277
278
278
# private
279
- def list_pattern_single_processor ( redis , output_queue , key )
279
+ def pattern_list_single_processor ( redis , output_queue , key )
280
280
( 0 ...@pattern_list_max_items ) . each do
281
- break unless list_pattern_process_item ( redis , output_queue , key )
281
+ break unless pattern_list_process_item ( redis , output_queue , key )
282
282
end
283
283
end
284
284
285
285
# private
286
- def list_pattern_batch_processor ( redis , output_queue , key )
286
+ def pattern_list_batch_processor ( redis , output_queue , key )
287
287
items_left = @pattern_list_max_items
288
288
while items_left > 0
289
289
limit = [ items_left , @batch_count ] . min
@@ -297,11 +297,11 @@ def list_pattern_batch_processor(redis, output_queue, key)
297
297
end
298
298
299
299
# private
300
- def worker_consume ( output_queue , key )
300
+ def pattern_list_worker_consume ( output_queue , key )
301
301
@logger . debug ( "Launched worker for #{ key } " )
302
302
begin
303
303
redis ||= connect
304
- @list_pattern_processor . call ( redis , output_queue , key )
304
+ @pattern_list_processor . call ( redis , output_queue , key )
305
305
rescue ::Redis ::BaseError => e
306
306
@logger . warn ( "Redis connection problem in thread for key #{ key } . Sleeping a while before exiting thread." , :exception => e )
307
307
sleep 1
@@ -318,35 +318,35 @@ def threadpool_capacity?
318
318
end
319
319
320
320
# private
321
- def launch_worker ( output_queue , key )
321
+ def pattern_list_launch_worker ( output_queue , key )
322
322
@current_workers . add ( key )
323
323
@threadpool . post do
324
324
begin
325
- worker_consume ( output_queue , key )
325
+ pattern_list_worker_consume ( output_queue , key )
326
326
ensure
327
327
@current_workers . delete ( key )
328
328
end
329
329
end
330
330
end
331
331
332
332
# private
333
- def ensure_workers ( output_queue )
333
+ def pattern_list_ensure_workers ( output_queue )
334
334
return unless threadpool_capacity?
335
335
redis_runner do
336
336
@redis . keys ( @key ) . shuffle . each do |key |
337
337
next if @current_workers . include? ( key )
338
- launch_worker ( output_queue , key )
338
+ pattern_list_launch_worker ( output_queue , key )
339
339
break unless threadpool_capacity?
340
340
end
341
341
end
342
342
end
343
343
344
344
# private
345
- def list_pattern_runner ( output_queue )
346
- @list_pattern_processor = batched? ? method ( :list_pattern_batch_processor ) : method ( :list_pattern_single_processor )
345
+ def pattern_list_runner ( output_queue )
346
+ @pattern_list_processor = batched? ? method ( :pattern_list_batch_processor ) : method ( :pattern_list_single_processor )
347
347
while !stop?
348
348
init_threadpool if @threadpool . nil?
349
- ensure_workers ( output_queue )
349
+ pattern_list_ensure_workers ( output_queue )
350
350
sleep ( @pattern_list_threadpool_sleep )
351
351
end
352
352
end
0 commit comments