Skip to content

Commit f892b41

Browse files
author
Juuso Mäyränen
committed
Refactor redis connection handling for pattern_list
1 parent eea0dd1 commit f892b41

File tree

1 file changed

+14
-20
lines changed

1 file changed

+14
-20
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -267,8 +267,8 @@ def list_pattern_stop
267267
# private
268268
def worker_consume(output_queue, key)
269269
@logger.debug("Launched worker for #{key}")
270-
redis = new_redis_instance
271270
begin
271+
redis ||= connect
272272
(0...@pattern_list_max_items).each do
273273
if stop?
274274
@logger.debug("Breaking from thread #{key} as it was requested to stop")
@@ -278,6 +278,10 @@ def worker_consume(output_queue, key)
278278
break if value.nil?
279279
queue_event(value, output_queue)
280280
end
281+
rescue ::Redis::BaseError => e
282+
@logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e)
283+
sleep 1
284+
return
281285
ensure
282286
redis.quit rescue nil
283287
end
@@ -304,31 +308,21 @@ def launch_worker(output_queue, key)
304308
# private
305309
def ensure_workers(output_queue)
306310
return unless threadpool_capacity?
307-
@redis.keys(@key).shuffle.each do |key|
308-
next if @current_workers.include?(key)
309-
launch_worker(output_queue, key)
310-
break unless threadpool_capacity?
311+
redis_runner do
312+
@redis.keys(@key).shuffle.each do |key|
313+
next if @current_workers.include?(key)
314+
launch_worker(output_queue, key)
315+
break unless threadpool_capacity?
316+
end
311317
end
312318
end
313319

314-
# private
315-
def init_list_pattern_runner
316-
init_threadpool
317-
@redis ||= connect
318-
end
319-
320320
# private
321321
def list_pattern_runner(output_queue)
322322
while !stop?
323-
begin
324-
init_list_pattern_runner if @redis.nil?
325-
ensure_workers(output_queue)
326-
sleep(@pattern_list_threadpool_sleep)
327-
rescue ::Redis::BaseError => e
328-
@logger.warn("Redis connection problem", :exception => e)
329-
@redis = nil
330-
sleep 1
331-
end
323+
init_threadpool if @threadpool.nil?
324+
ensure_workers(output_queue)
325+
sleep(@pattern_list_threadpool_sleep)
332326
end
333327
end
334328

0 commit comments

Comments
 (0)