Skip to content

Commit f7ef5fd

Browse files
author
Juuso Mäyränen
committed
Support list patterns
1 parent 73f0577 commit f7ef5fd

File tree

2 files changed

+247
-14
lines changed

2 files changed

+247
-14
lines changed

lib/logstash/inputs/redis.rb

Lines changed: 146 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
require "logstash/inputs/base"
44
require "logstash/inputs/threadable"
55
require 'redis'
6+
require 'concurrent'
7+
require 'concurrent/executors'
68

79
# This input will read events from a Redis instance; it supports both Redis channels and lists.
810
# The list command (BLPOP) used by Logstash is supported in Redis v1.3.1+, and
@@ -49,16 +51,30 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
4951
config :key, :validate => :string, :required => true
5052

5153
# Specify either list or channel. If `data_type` is `list`, then we will BLPOP the
52-
# key. If `data_type` is `channel`, then we will SUBSCRIBE to the key.
53-
# If `data_type` is `pattern_channel`, then we will PSUBSCRIBE to the key.
54-
config :data_type, :validate => [ "list", "channel", "pattern_channel" ], :required => true
54+
# key. If `data_type` is `pattern_list`, then we will spawn a number of worker
55+
# threads that will LPOP from keys matching that pattern. If `data_type` is
56+
# `channel`, then we will SUBSCRIBE to the key. If `data_type` is `pattern_channel`,
57+
# then we will PSUBSCRIBE to the key.
58+
config :data_type, :validate => [ "list", "pattern_list", "channel", "pattern_channel" ], :required => true
5559

5660
# The number of events to return from Redis using EVAL.
5761
config :batch_count, :validate => :number, :default => 125
5862

5963
# Redefined Redis commands to be passed to the Redis client.
6064
config :command_map, :validate => :hash, :default => {}
6165

66+
# Maximum number of worker threads to spawn when using `data_type` `pattern_list`.
67+
config :pattern_list_threads, :validate => :number, :default => 20
68+
69+
# Maximum number of items for a single worker thread to process when `data_type` is `pattern_list`.
70+
# After the list is empty or this number of items have been processed, the thread will exit and a
71+
# new one will be started if there are non-empty lists matching the pattern without a consumer.
72+
config :pattern_list_max_items, :validate => :number, :default => 1000
73+
74+
# Time to sleep in main loop after checking if more threads can/need to be spawned.
75+
# Applies to `data_type` is `pattern_list`
76+
config :pattern_list_threadpool_sleep, :validate => :number, :default => 0.2
77+
6278
public
6379
# public API
6480
# use to store a proc that can provide a Redis instance or mock
@@ -77,6 +93,15 @@ def new_redis_instance
7793
@redis_builder.call
7894
end
7995

96+
def init_threadpool
97+
@threadpool ||= Concurrent::ThreadPoolExecutor.new(
98+
min_threads: @pattern_list_threads,
99+
max_threads: @pattern_list_threads,
100+
max_queue: 2 * @pattern_list_threads
101+
)
102+
@current_workers ||= Concurrent::Set.new
103+
end
104+
80105
def register
81106
@redis_url = @path.nil? ? "redis://#{@password}@#{@host}:#{@port}/#{@db}" : "#{@password}@#{@path}/#{@db}"
82107

@@ -86,6 +111,9 @@ def register
86111
if @data_type == 'list' || @data_type == 'dummy'
87112
@run_method = method(:list_runner)
88113
@stop_method = method(:list_stop)
114+
elsif @data_type == 'pattern_list'
115+
@run_method = method(:pattern_list_runner)
116+
@stop_method = method(:pattern_list_stop)
89117
elsif @data_type == 'channel'
90118
@run_method = method(:channel_runner)
91119
@stop_method = method(:subscribe_stop)
@@ -94,8 +122,6 @@ def register
94122
@stop_method = method(:subscribe_stop)
95123
end
96124

97-
@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
98-
99125
@identity = "#{@redis_url} #{@data_type}:#{@key}"
100126
@logger.info("Registering Redis", :identity => @identity)
101127
end # def register
@@ -119,7 +145,7 @@ def batched?
119145

120146
# private
121147
def is_list_type?
122-
@data_type == 'list'
148+
@data_type == 'list' || @data_type == 'pattern_list'
123149
end
124150

125151
# private
@@ -193,15 +219,21 @@ def queue_event(msg, output_queue, channel=nil)
193219
end
194220

195221
# private
196-
def list_stop
222+
def reset_redis
197223
return if @redis.nil? || !@redis.connected?
198224

199225
@redis.quit rescue nil
200226
@redis = nil
201227
end
202228

229+
# private
230+
def list_stop
231+
reset_redis
232+
end
233+
203234
# private
204235
def list_runner(output_queue)
236+
@list_method = batched? ? method(:list_batch_listener) : method(:list_single_listener)
205237
while !stop?
206238
begin
207239
@redis ||= connect
@@ -217,16 +249,113 @@ def list_runner(output_queue)
217249
end
218250
end
219251

220-
def list_batch_listener(redis, output_queue)
252+
#private
253+
def reset_threadpool
254+
return if @threadpool.nil?
255+
@threadpool.shutdown
256+
@threadpool.wait_for_termination
257+
@threadpool = nil
258+
end
259+
260+
# private
261+
def pattern_list_stop
262+
reset_redis
263+
reset_threadpool
264+
end
265+
266+
# private
267+
def pattern_list_process_item(redis, output_queue, key)
268+
if stop?
269+
@logger.debug("Breaking from thread #{key} as it was requested to stop")
270+
return false
271+
end
272+
value = redis.lpop(key)
273+
return false if value.nil?
274+
queue_event(value, output_queue)
275+
true
276+
end
277+
278+
# private
279+
def pattern_list_single_processor(redis, output_queue, key)
280+
(0...@pattern_list_max_items).each do
281+
break unless pattern_list_process_item(redis, output_queue, key)
282+
end
283+
end
284+
285+
# private
286+
def pattern_list_batch_processor(redis, output_queue, key)
287+
items_left = @pattern_list_max_items
288+
while items_left > 0
289+
limit = [items_left, @batch_count].min
290+
processed = process_batch(redis, output_queue, key, limit, 0)
291+
if processed.zero? || processed < limit
292+
return
293+
end
294+
items_left -= processed
295+
end
296+
end
297+
298+
# private
299+
def pattern_list_worker_consume(output_queue, key)
221300
begin
222-
results = redis.evalsha(@redis_script_sha, [@key], [@batch_count-1])
223-
results.each do |item|
224-
queue_event(item, output_queue)
301+
redis ||= connect
302+
@pattern_list_processor.call(redis, output_queue, key)
303+
rescue ::Redis::BaseError => e
304+
@logger.warn("Redis connection problem in thread for key #{key}. Sleeping a while before exiting thread.", :exception => e)
305+
sleep 1
306+
return
307+
ensure
308+
redis.quit rescue nil
309+
end
310+
end
311+
312+
# private
313+
def threadpool_capacity?
314+
@threadpool.remaining_capacity > 0
315+
end
316+
317+
# private
318+
def pattern_list_launch_worker(output_queue, key)
319+
@current_workers.add(key)
320+
@threadpool.post do
321+
begin
322+
pattern_list_worker_consume(output_queue, key)
323+
ensure
324+
@current_workers.delete(key)
225325
end
326+
end
327+
end
226328

227-
if results.size.zero?
228-
sleep BATCH_EMPTY_SLEEP
329+
# private
330+
def pattern_list_ensure_workers(output_queue)
331+
return unless threadpool_capacity?
332+
redis_runner do
333+
@redis.keys(@key).shuffle.each do |key|
334+
next if @current_workers.include?(key)
335+
pattern_list_launch_worker(output_queue, key)
336+
break unless threadpool_capacity?
229337
end
338+
end
339+
end
340+
341+
# private
342+
def pattern_list_runner(output_queue)
343+
@pattern_list_processor = batched? ? method(:pattern_list_batch_processor) : method(:pattern_list_single_processor)
344+
while !stop?
345+
init_threadpool if @threadpool.nil?
346+
pattern_list_ensure_workers(output_queue)
347+
sleep(@pattern_list_threadpool_sleep)
348+
end
349+
end
350+
351+
def process_batch(redis, output_queue, key, batch_size, sleep_time)
352+
begin
353+
results = redis.evalsha(@redis_script_sha, [key], [batch_size-1])
354+
results.each do |item|
355+
queue_event(item, output_queue)
356+
end
357+
sleep sleep_time if results.size.zero? && sleep_time > 0
358+
results.size
230359

231360
# Below is a commented-out implementation of 'batch fetch'
232361
# using pipelined LPOP calls. This in practice has been observed to
@@ -255,6 +384,10 @@ def list_batch_listener(redis, output_queue)
255384
end
256385
end
257386

387+
def list_batch_listener(redis, output_queue)
388+
process_batch(redis, output_queue, @key, @batch_count, BATCH_EMPTY_SLEEP)
389+
end
390+
258391
def list_single_listener(redis, output_queue)
259392
item = redis.blpop(@key, 0, :timeout => 1)
260393
return unless item # from timeout or other conditions

spec/inputs/redis_spec.rb

Lines changed: 101 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,48 @@ def process(conf, event_count)
6363
populate(key, event_count)
6464
process(conf, event_count)
6565
end
66+
67+
it "should read events from a list pattern" do
68+
key_base = SecureRandom.hex
69+
conf = <<-CONFIG
70+
input {
71+
redis {
72+
type => "blah"
73+
key => "#{key}.*"
74+
data_type => "pattern_list"
75+
batch_count => 1
76+
}
77+
}
78+
CONFIG
79+
total_event_count = 0
80+
(0..10).each do |idx|
81+
event_count = 100 + rand(50)
82+
total_event_count += event_count
83+
populate("#{key_base}.#{idx}", event_count)
84+
end
85+
process(conf, total_event_count)
86+
end
87+
88+
it "should read events from a list pattern using batch_count (default 125)" do
89+
key_base = SecureRandom.hex
90+
conf = <<-CONFIG
91+
input {
92+
redis {
93+
type => "blah"
94+
key => "#{key}.*"
95+
data_type => "pattern_list"
96+
batch_count => 125
97+
}
98+
}
99+
CONFIG
100+
total_event_count = 0
101+
(0..10).each do |idx|
102+
event_count = 100 + rand(50)
103+
total_event_count += event_count
104+
populate("#{key_base}.#{idx}", event_count)
105+
end
106+
process(conf, total_event_count)
107+
end
66108
end
67109

68110
# unit tests ---------------------
@@ -264,6 +306,64 @@ def process(conf, event_count)
264306
end
265307
end
266308

309+
context 'runtime for pattern_list data_type' do
310+
let(:data_type) { 'pattern_list' }
311+
let(:key) { 'foo.*' }
312+
before do
313+
subject.register
314+
subject.init_threadpool
315+
end
316+
317+
context 'close when redis is unset' do
318+
let(:quit_calls) { [:quit, :unsubscribe, :punsubscribe, :connection, :disconnect!] }
319+
320+
it 'does not attempt to quit' do
321+
allow(redis).to receive(:nil?).and_return(true)
322+
quit_calls.each do |call|
323+
expect(redis).not_to receive(call)
324+
end
325+
expect {subject.do_stop}.not_to raise_error
326+
end
327+
end
328+
329+
it 'calling the run method, adds events to the queue' do
330+
expect(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
331+
expect(redis).to receive(:lpop).at_least(:once).and_return('l1')
332+
333+
allow(redis).to receive(:connected?).and_return(connected.last)
334+
allow(redis).to receive(:quit)
335+
336+
tt = Thread.new do
337+
end_by = Time.now + 3
338+
while accumulator.size < 1 and Time.now <= end_by
339+
sleep 0.1
340+
end
341+
subject.do_stop
342+
end
343+
344+
subject.run(accumulator)
345+
346+
tt.join
347+
348+
expect(accumulator.size).to be > 0
349+
end
350+
351+
it 'multiple close calls, calls to redis once' do
352+
subject.use_redis(redis)
353+
allow(redis).to receive(:keys).at_least(:once).and_return(['foo.bar'])
354+
allow(redis).to receive(:lpop).and_return('l1')
355+
expect(redis).to receive(:connected?).and_return(connected.last)
356+
quit_calls.each do |call|
357+
expect(redis).to receive(call).at_most(:once)
358+
end
359+
360+
subject.do_stop
361+
connected.push(false) #can't use let block here so push to array
362+
expect {subject.do_stop}.not_to raise_error
363+
subject.do_stop
364+
end
365+
end
366+
267367
context 'for the subscribe data_types' do
268368
def run_it_thread(inst)
269369
Thread.new(inst) do |subj|
@@ -396,7 +496,7 @@ def close_thread(inst, rt)
396496

397497
describe LogStash::Inputs::Redis do
398498
context "when using data type" do
399-
["list", "channel", "pattern_channel"].each do |data_type|
499+
["list", "channel", "pattern_channel", "pattern_list"].each do |data_type|
400500
context data_type do
401501
it_behaves_like "an interruptible input plugin" do
402502
let(:config) { {'key' => 'foo', 'data_type' => data_type } }

0 commit comments

Comments
 (0)