Skip to content

Commit 9b12e34

Browse files
author
Juuso Mäyränen
committed
Refactor a bit and fix off-by-one
1 parent b577a25 commit 9b12e34

File tree

1 file changed

+15
-10
lines changed

1 file changed

+15
-10
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ def list_pattern_stop
260260
end
261261

262262
# private
263-
def launch_worker(output_queue, key)
263+
def worker_consume(output_queue, key)
264264
@logger.debug("Launched worker for #{key}")
265265
redis = new_redis_instance
266266
begin
267-
(0..@max_items_per_worker).each do
267+
(0..@max_items_per_worker-1).each do
268268
if stop?
269269
@logger.debug("Breaking from thread #{key} as it was requested to stop")
270270
break
@@ -285,21 +285,26 @@ def threadpool_capacity?
285285
@threadpool.remaining_capacity > 0
286286
end
287287

288+
# private
289+
def launch_worker(output_queue, key)
290+
@current_workers.add(key)
291+
@threadpool.post do
292+
begin
293+
worker_consume(output_queue, key)
294+
ensure
295+
@current_workers.delete(key)
296+
end
297+
end
298+
end
299+
288300
# private
289301
def ensure_workers(output_queue)
290302
return unless threadpool_capacity?
291303
keys = @redis.keys(@key)
292304
keys.shuffle
293305
keys.each do |key|
294306
next if @current_workers.include?(key)
295-
@current_workers.add(key)
296-
@threadpool.post do
297-
begin
298-
launch_worker(output_queue, key)
299-
ensure
300-
@current_workers.delete(key)
301-
end
302-
end
307+
launch_worker(output_queue, key)
303308
break unless threadpool_capacity?
304309
end
305310
end

0 commit comments

Comments
 (0)