diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 9274f8f..13e41d1 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -171,11 +171,7 @@ def receive(event) @redis.publish(key, payload) end rescue => e - @logger.warn("Failed to send event to Redis", :event => event, - :identity => identity, :exception => e, - :backtrace => e.backtrace) - sleep @reconnect_interval - @redis = nil + on_send_error(e, true) retry end end # def receive @@ -200,15 +196,29 @@ def flush(events, key, close=false) @redis.rpush(key, events) end # called from Stud::Buffer#buffer_flush when an error occurs - def on_flush_error(e) + def on_send_error(e, _sleep=false) + if e.is_a?(Redis::CommandError) + @logger.warn("Redis write rejected: #{e.message}", :identity => identity) + sleep @reconnect_interval if _sleep + return + end + @logger.warn("Failed to send backlog of events to Redis", :identity => identity, :exception => e, :backtrace => e.backtrace ) + + begin + @redis.disconnect + rescue + end + sleep @reconnect_interval if _sleep @redis = connect end + alias_method :on_flush_error, :on_send_error + def close if @batch buffer_flush(:final => true)