Skip to content

Commit c0d4c9f

Browse files
author
Juuso Mäyränen
committed
Support batches in list pattern
1 parent f892b41 commit c0d4c9f

File tree

1 file changed

+46
-19
lines changed

1 file changed

+46
-19
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,6 @@ def register
122122
@stop_method = method(:subscribe_stop)
123123
end
124124

125-
@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
126-
127125
@identity = "#{@redis_url} #{@data_type}:#{@key}"
128126
@logger.info("Registering Redis", :identity => @identity)
129127
end # def register
@@ -147,7 +145,7 @@ def batched?
147145

148146
# private
149147
def is_list_type?
150-
@data_type == 'list'
148+
@data_type == 'list' || @data_type == 'pattern_list'
151149
end
152150

153151
# private
@@ -235,6 +233,7 @@ def list_stop
235233

236234
# private
237235
def list_runner(output_queue)
236+
@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
238237
while !stop?
239238
begin
240239
@redis ||= connect
@@ -264,28 +263,53 @@ def list_pattern_stop
264263
reset_threadpool
265264
end
266265

266+
# private
267+
def list_pattern_process_item(redis, output_queue, key)
268+
if stop?
269+
@logger.debug("Breaking from thread #{key} as it was requested to stop")
270+
return false
271+
end
272+
value = redis.lpop(key)
273+
return false if value.nil?
274+
queue_event(value, output_queue)
275+
true
276+
end
277+
278+
# private
279+
def list_pattern_single_processor(redis, output_queue, key)
280+
(0...@pattern_list_max_items).each do
281+
break unless list_pattern_process_item(redis, output_queue, key)
282+
end
283+
end
284+
285+
# private
286+
def list_pattern_batch_processor(redis, output_queue, key)
287+
items_left = @pattern_list_max_items
288+
while items_left > 0
289+
limit = [items_left, @batch_count].min
290+
processed = process_batch(redis, output_queue, key, limit, 0)
291+
@logger.warn("Got batch size #{processed} for #{key}")
292+
if processed.zero? || processed < limit
293+
return
294+
end
295+
items_left -= processed
296+
end
297+
end
298+
267299
# private
268300
def worker_consume(output_queue, key)
269301
@logger.debug("Launched worker for #{key}")
270302
begin
271303
redis ||= connect
272-
(0...@pattern_list_max_items).each do
273-
if stop?
274-
@logger.debug("Breaking from thread #{key} as it was requested to stop")
275-
break
276-
end
277-
value = redis.lpop(key)
278-
break if value.nil?
279-
queue_event(value, output_queue)
280-
end
304+
@list_pattern_processor.call(redis, output_queue, key)
281305
rescue ::Redis::BaseError => e
282306
@logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e)
283307
sleep 1
284308
return
285309
ensure
286310
redis.quit rescue nil
287311
end
288-
@logger.debug("Exit worker for #{key}")
312+
@logger.warn("Exit worker for #{key}")
289313
end
290314

291315
# private
@@ -319,23 +343,22 @@ def ensure_workers(output_queue)
319343

320344
# private
321345
def list_pattern_runner(output_queue)
346+
@list_pattern_processor = batched? ? method(:list_pattern_batch_processor) : method(:list_pattern_single_processor)
322347
while !stop?
323348
init_threadpool if @threadpool.nil?
324349
ensure_workers(output_queue)
325350
sleep(@pattern_list_threadpool_sleep)
326351
end
327352
end
328353

329-
def list_batch_listener(redis, output_queue)
354+
def process_batch(redis, output_queue, key, batch_size, sleep_time)
330355
begin
331-
results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])
356+
results = redis.evalsha(@redis_script_sha, [key], [batch_size-1])
332357
results.each do |item|
333358
queue_event(item, output_queue)
334359
end
335-
336-
if results.size.zero?
337-
sleep BATCH_EMPTY_SLEEP
338-
end
360+
sleep sleep_time if results.size.zero? && sleep_time > 0
361+
results.size
339362

340363
# Below is a commented-out implementation of 'batch fetch'
341364
# using pipelined LPOP calls. This in practice has been observed to
@@ -364,6 +387,10 @@ def list_batch_listener(redis, output_queue)
364387
end
365388
end
366389

390+
def list_batch_listener(redis, output_queue)
391+
process_batch(redis, output_queue, @key, @batch_count, BATCH_EMPTY_SLEEP)
392+
end
393+
367394
def list_single_listener(redis, output_queue)
368395
item = redis.blpop(@key, 0, :timeout => 1)
369396
return unless item # from timeout or other conditions

0 commit comments

Comments
 (0)