diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 2f53289..3a7129f 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -95,6 +95,12 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base # Zero means to check on every event. config :congestion_interval, :validate => :number, :default => 1 + # How many attempts to make to send to redis while congestion is at + # its threshold. Will try connecting to the next server in the host + # list after attempts is reached. This will take + # time_in_secs = :congestion_interval * :congestion_attempts + config :congestion_attempts, :validate => :number, :default => 0 + def register require 'redis' @@ -182,10 +188,16 @@ def receive(event) def congestion_check(key) return if @congestion_threshold == 0 + tries = 0 if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check. while @redis.llen(key) > @congestion_threshold # Don't push event to Redis key which has reached @congestion_threshold. @logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds") sleep @congestion_interval + tries += 1 + if tries > @congestion_attempts and @congestion_attempts > 0 + @redis = connect + tries = 0 + end end @congestion_check_times[key] = Time.now.to_i end