diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index daeb5bd..2394ce5 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -2,6 +2,7 @@ require "logstash/outputs/base" require "logstash/namespace" require "stud/buffer" +require "json" # This output will send events to a Redis queue using RPUSH. # The RPUSH command is supported in Redis v0.0.7+. Using @@ -59,9 +60,24 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base # valid here, for example `logstash-%{type}`. config :key, :validate => :string, :required => false - # Either list or channel. If `redis_type` is list, then we will set - # RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`. - config :data_type, :validate => [ "list", "channel" ], :required => false + # The size of the stack for 'last_n_values' type: number of values to keep + config :stack_size, :validate => :number, :default => 1 + + # Key expiration time (in seconds) for the 'last_n_values' mode. + # By default the last n values are kept forever + config :expire, :validate => :number, :default => -1 + + # Either list, channel, last_n_values or last_value. + # If `data_type` is `list`, then we will set + # `RPUSH` to key. If `data_type` is `channel`, then we will PUBLISH to `key`. + # If `data_type` is `last_n_values`, then we will parse the evnt object and `LPUSH` each object + # key/value under top level key `key` and `LTRIM` to `stack_size` so as to obtain a tree of 'lists' + # with the last n values of each key in the event object. + # if `data_type` is `last_value`, then we will parse the object and `SET` each object key/value under + # top level key `key`. + # In building key/value trees, the separator is `:`. + # With `last_n_values` and `last_value`, an expiration of `expire` seconds is set on each key if `expire` is > 0 + config :data_type, :validate => [ "list", "channel", "last_n_values", "last_value" ], :required => false # Set to true if you want Redis to batch up values and send 1 RPUSH command # instead of one command per value to push on the list. Note that this only @@ -118,7 +134,7 @@ def register if @batch - if @data_type != "list" + if not ["list", "last_n_values"].include? @data_type raise RuntimeError.new( "batch is not supported with data_type #{@data_type}" ) @@ -168,7 +184,7 @@ def congestion_check(key) end @congestion_check_times[key] = Time.now.to_i end - end + end # def congestion_check # called from Stud::Buffer#buffer_flush when there are events to flush def flush(events, key, close=false) @@ -176,8 +192,23 @@ def flush(events, key, close=false) # we should not block due to congestion on close # to support this Stud::Buffer#buffer_flush should pass here the :final boolean value. congestion_check(key) unless close - @redis.rpush(key, events) - end + case @data_type + when 'list' + congestion_check(key) + @redis.rpush(key, payload) + when 'last_n_values' + @redis.multi + recurse_lpush(key, JSON.parse(payload)) + @redis.exec + when 'last_value' + @redis.multi + recurse_set(key, JSON.parse(payload)) + @redis.exec + else + @redis.publish(key, payload) + end + end # def flush + # called from Stud::Buffer#buffer_flush when an error occurs def on_flush_error(e) @logger.warn("Failed to send backlog of events to Redis", @@ -186,7 +217,7 @@ def on_flush_error(e) :backtrace => e.backtrace ) @redis = connect - end + end # def on_flush_error def close if @batch @@ -196,7 +227,7 @@ def close @redis.quit @redis = nil end - end + end # def close private def connect @@ -227,11 +258,38 @@ def identity @name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}" end + def recurse_lpush(key, payload) + payload.each do |k, value| + if value.is_a?(Hash) + recurse_lpush(key+":"+k, value) + else + @redis.lpush(key+":"+k, value) + @redis.ltrim(key+":"+k, 0, @stack_size - 1) + if @expire > 0 + @redis.expire(key+":"+k, @expire) + end + end + end + end # def recurse_lpush + + def recurse_set(key, payload) + payload.each do |k, value| + if value.is_a?(Hash) + recurse_set(key+":"+k, value) + else + @redis.set(key+":"+k, value) + if @expire > 0 + @redis.expire(key+":"+k, @expire) + end + end + end + end # def recurse_set + def send_to_redis(event, payload) # How can I do this sort of thing with codecs? key = event.sprintf(@key) - if @batch && @data_type == 'list' # Don't use batched method for pubsub. + if @batch and ['list', 'last_n_values'].include? @data_type # Don't use batched method for pubsub. # Stud::Buffer buffer_receive(payload, key) return @@ -239,12 +297,21 @@ def send_to_redis(event, payload) begin @redis ||= connect - if @data_type == 'list' - congestion_check(key) - @redis.rpush(key, payload) - else - @redis.publish(key, payload) - end + case @data_type + when 'list' + congestion_check(key) + @redis.rpush(key, payload) + when 'last_n_values' + @redis.multi + recurse_lpush(key, JSON.parse(payload)) + @redis.exec + when 'last_value' + @redis.multi + recurse_set(key, JSON.parse(payload)) + @redis.exec + else + @redis.publish(key, payload) + end rescue => e @logger.warn("Failed to send event to Redis", :event => event, :identity => identity, :exception => e, @@ -253,5 +320,5 @@ def send_to_redis(event, payload) @redis = nil retry end - end + end # def send_to_redis end diff --git a/logstash-output-redis.gemspec b/logstash-output-redis.gemspec index 332dfa1..c4fb010 100644 --- a/logstash-output-redis.gemspec +++ b/logstash-output-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-redis' - s.version = '3.0.1' + s.version = '2.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "This output will send events to a Redis queue using RPUSH" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -20,7 +20,7 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } # Gem dependencies - s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0" + s.add_runtime_dependency "logstash-core-plugin-api", "~> 1.0" s.add_runtime_dependency 'redis' s.add_runtime_dependency 'stud'