From ee8f197f683d94163409379b5f836bc1fdaabf7b Mon Sep 17 00:00:00 2001 From: Robby Dyer Date: Wed, 5 Mar 2014 13:26:41 -0500 Subject: [PATCH 1/2] Add redis output congestion_attempts --- lib/logstash/outputs/redis.rb | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 2f53289..e28f625 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -95,6 +95,12 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base # Zero means to check on every event. config :congestion_interval, :validate => :number, :default => 1 + # How many attempts to make to send to redis while congestion is at + # its threshold. Will try connecting to the next server in the host + # list after attempts is reached. This will take + # time_in_secs = :congestion_interval * :congestion_attempts + config :congestion_attempts, :validate => :number, :default => 10 + def register require 'redis' @@ -182,10 +188,16 @@ def receive(event) def congestion_check(key) return if @congestion_threshold == 0 + tries = 0 if (Time.now.to_i - @congestion_check_times[key]) >= @congestion_interval # Check congestion only if enough time has passed since last check. while @redis.llen(key) > @congestion_threshold # Don't push event to Redis key which has reached @congestion_threshold. @logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds") sleep @congestion_interval + tries += 1 + if tries > @congestion_attempts + @redis = connect + tries = 0 + end end @congestion_check_times[key] = Time.now.to_i end From 24756c59b127cca03bd5bc476be4c4e8fa4f2046 Mon Sep 17 00:00:00 2001 From: Robby Dyer Date: Tue, 18 Mar 2014 12:53:54 -0400 Subject: [PATCH 2/2] Default redis congestion_attemps to 0 --- lib/logstash/outputs/redis.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index e28f625..3a7129f 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -99,7 +99,7 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base # its threshold. Will try connecting to the next server in the host # list after attempts is reached. This will take # time_in_secs = :congestion_interval * :congestion_attempts - config :congestion_attempts, :validate => :number, :default => 10 + config :congestion_attempts, :validate => :number, :default => 0 def register require 'redis' @@ -194,7 +194,7 @@ def congestion_check(key) @logger.warn? and @logger.warn("Redis key size has hit a congestion threshold #{@congestion_threshold} suspending output for #{@congestion_interval} seconds") sleep @congestion_interval tries += 1 - if tries > @congestion_attempts + if tries > @congestion_attempts and @congestion_attempts > 0 @redis = connect tries = 0 end