@@ -77,22 +77,6 @@ module LogStash module Inputs class Redis < LogStash::Inputs::Threadable
77
77
config :pattern_list_threadpool_sleep , :validate => :number , :default => 0.2
78
78
79
79
public
80
- # public API
81
- # use to store a proc that can provide a Redis instance or mock
82
- def add_external_redis_builder ( builder ) #callable
83
- @redis_builder = builder
84
- self
85
- end
86
-
87
- # use to apply an instance directly and bypass the builder
88
- def use_redis ( instance )
89
- @redis = instance
90
- self
91
- end
92
-
93
- def new_redis_instance
94
- @redis_builder . call
95
- end
96
80
97
81
def init_threadpool
98
82
@threadpool ||= Concurrent ::ThreadPoolExecutor . new (
@@ -106,8 +90,6 @@ def init_threadpool
106
90
def register
107
91
@redis_url = @path . nil? ? "redis://#{ @password } @#{ @host } :#{ @port } /#{ @db } " : "#{ @password } @#{ @path } /#{ @db } "
108
92
109
- @redis_builder ||= method ( :internal_redis_builder )
110
-
111
93
# just switch on data_type once
112
94
if @data_type == 'list' || @data_type == 'dummy'
113
95
@run_method = method ( :list_runner )
@@ -151,30 +133,25 @@ def is_list_type?
151
133
152
134
# private
153
135
def redis_params
136
+ params = {
137
+ :timeout => @timeout ,
138
+ :db => @db ,
139
+ :password => @password . nil? ? nil : @password . value ,
140
+ :ssl => @ssl
141
+ }
142
+
154
143
if @path . nil?
155
- connectionParams = {
156
- :host => @host ,
157
- :port => @port
158
- }
144
+ params [ :host ] = @host
145
+ params [ :port ] = @port
159
146
else
160
147
@logger . warn ( "Parameter 'path' is set, ignoring parameters: 'host' and 'port'" )
161
- connectionParams = {
162
- :path => @path
163
- }
148
+ params [ :path ] = @path
164
149
end
165
150
166
- baseParams = {
167
- :timeout => @timeout ,
168
- :db => @db ,
169
- :password => @password . nil? ? nil : @password . value ,
170
- :ssl => @ssl
171
- }
172
-
173
- return connectionParams . merge ( baseParams )
151
+ params
174
152
end
175
153
176
- # private
177
- def internal_redis_builder
154
+ def new_redis_instance
178
155
::Redis . new ( redis_params )
179
156
end
180
157
@@ -183,14 +160,12 @@ def connect
183
160
redis = new_redis_instance
184
161
185
162
# register any renamed Redis commands
186
- if @command_map . any?
187
- client_command_map = redis . client . command_map
188
- @command_map . each do |name , renamed |
189
- client_command_map [ name . to_sym ] = renamed . to_sym
190
- end
163
+ @command_map . each do |name , renamed |
164
+ redis . _client . command_map [ name . to_sym ] = renamed . to_sym
191
165
end
192
166
193
167
load_batch_script ( redis ) if batched? && is_list_type?
168
+
194
169
redis
195
170
end # def connect
196
171
@@ -221,9 +196,12 @@ def queue_event(msg, output_queue, channel=nil)
221
196
222
197
# private
223
198
def reset_redis
224
- return if @redis . nil? || !@redis . connected?
199
+ redis = @redis # might change during method invocation
200
+ return if redis . nil? || !redis . connected?
225
201
226
- @redis . quit rescue nil
202
+ redis . quit rescue nil # does client.disconnect internally
203
+ # check if input retried while executing
204
+ list_stop unless redis . equal? @redis
227
205
@redis = nil
228
206
end
229
207
@@ -239,13 +217,9 @@ def list_runner(output_queue)
239
217
begin
240
218
@redis ||= connect
241
219
@list_method . call ( @redis , output_queue )
242
- rescue ::Redis ::BaseError => e
243
- @logger . warn ( "Redis connection problem" , :exception => e )
244
- # Reset the redis variable to trigger reconnect
245
- @redis = nil
246
- # this sleep does not need to be stoppable as its
247
- # in a while !stop? loop
248
- sleep 1
220
+ rescue => e
221
+ log_error ( e )
222
+ retry if reset_for_error_retry ( e )
249
223
end
250
224
end
251
225
end
@@ -400,18 +374,19 @@ def list_single_listener(redis, output_queue)
400
374
401
375
# private
402
376
def subscribe_stop
403
- return if @redis . nil? || ! @redis . connected?
404
- # if its a SubscribedClient then:
405
- # it does not have a disconnect method (yet)
406
- if @ redis. client . is_a? ( :: Redis :: SubscribedClient )
377
+ redis = @redis # might change during method invocation
378
+ return if redis . nil? || ! redis . connected?
379
+
380
+ if redis . subscribed?
407
381
if @data_type == 'pattern_channel'
408
- @ redis. client . punsubscribe
382
+ redis . punsubscribe
409
383
else
410
- @ redis. client . unsubscribe
384
+ redis . unsubscribe
411
385
end
412
- else
413
- @redis . client . disconnect
414
386
end
387
+ redis . close rescue nil # does client.disconnect
388
+ # check if input retried while executing
389
+ subscribe_stop unless redis . equal? @redis
415
390
@redis = nil
416
391
end
417
392
@@ -420,15 +395,43 @@ def redis_runner
420
395
begin
421
396
@redis ||= connect
422
397
yield
423
- rescue ::Redis ::BaseError => e
424
- @logger . warn ( "Redis connection problem" , :exception => e )
425
- # Reset the redis variable to trigger reconnect
426
- @redis = nil
427
- Stud . stoppable_sleep ( 1 ) { stop? }
428
- retry if !stop?
398
+ rescue => e
399
+ log_error ( e )
400
+ retry if reset_for_error_retry ( e )
401
+ end
402
+ end
403
+
404
+ def log_error ( e )
405
+ info = { message : e . message , exception : e . class }
406
+ info [ :backtrace ] = e . backtrace if @logger . debug?
407
+
408
+ case e
409
+ when ::Redis ::TimeoutError
410
+ # expected for channels in case no data is available
411
+ @logger . debug ( "Redis timeout, retrying" , info )
412
+ when ::Redis ::BaseConnectionError , ::Redis ::ProtocolError
413
+ @logger . warn ( "Redis connection error" , info )
414
+ when ::Redis ::BaseError
415
+ @logger . error ( "Redis error" , info )
416
+ when ::LogStash ::ShutdownSignal
417
+ @logger . debug ( "Received shutdown signal" )
418
+ else
419
+ info [ :backtrace ] ||= e . backtrace
420
+ @logger . error ( "Unexpected error" , info )
429
421
end
430
422
end
431
423
424
+ # @return [true] if operation is fine to retry
425
+ def reset_for_error_retry ( e )
426
+ return if e . is_a? ( ::LogStash ::ShutdownSignal )
427
+
428
+ # Reset the redis variable to trigger reconnect
429
+ @redis = nil
430
+
431
+ Stud . stoppable_sleep ( 1 ) { stop? }
432
+ !stop? # retry if not stop-ing
433
+ end
434
+
432
435
# private
433
436
def channel_runner ( output_queue )
434
437
redis_runner do
@@ -476,6 +479,4 @@ def pattern_channel_listener(output_queue)
476
479
end
477
480
end
478
481
479
- # end
480
-
481
482
end end end # Redis Inputs LogStash
0 commit comments