diff --git a/CHANGELOG.md b/CHANGELOG.md index 53045b4..5cdd539 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 2.1.0 + - Add in the increment "data type" to increment keys in redis as well as an expire option to reset it. + ## 2.0.0 - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895 diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 9274f8f..695d7a7 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -58,10 +58,15 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base # TODO set required true config :key, :validate => :string, :required => false - # Either list or channel. If `redis_type` is list, then we will set - # RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`. + # Either list, channel, or increment. If `redis_type` is list or increment, then we + # will set RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`. # TODO set required true - config :data_type, :validate => [ "list", "channel" ], :required => false + config :data_type, :validate => [ "list", "channel", "increment" ], :required => false + + # If using increment, set the key to expire at this interval in seconds if not + # set, key will just keep increasing. + # TODO set required true + config :expire, :validate => :number, :required => false, :default => 0 # Set to true if you want Redis to batch up values and send 1 RPUSH command # instead of one command per value to push on the list. Note that this only @@ -167,6 +172,15 @@ def receive(event) if @data_type == 'list' congestion_check(key) @redis.rpush(key, payload) + elsif @data_type == 'increment' + if @expire == 0 + # if we don't need an expire, increment forever, and ever + @redis.incr(key) + else + # in order to ensure that new keys are given an expire, we need to use some lua + lua_code = "local v = redis.call('INCR', KEYS[1]) if v == 1 then redis.call('EXPIRE', KEYS[1], KEYS[2]) end return v" + @redis.eval(lua_code, [key, @expire]) + end else @redis.publish(key, payload) end diff --git a/logstash-output-redis.gemspec b/logstash-output-redis.gemspec index b78d126..7b0e498 100644 --- a/logstash-output-redis.gemspec +++ b/logstash-output-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-redis' - s.version = '2.0.2' + s.version = '2.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "This output will send events to a Redis queue using RPUSH" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program" @@ -20,7 +20,7 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } # Gem dependencies - s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" + #s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" s.add_runtime_dependency 'redis' s.add_runtime_dependency 'stud'