From ef4d9e222487b8667fb077639f748d45f830f727 Mon Sep 17 00:00:00 2001 From: Luke Demi Date: Thu, 4 Feb 2016 21:33:34 +0000 Subject: [PATCH 1/4] add the increment data type and optional auto expire --- CHANGELOG.md | 3 +++ lib/logstash/outputs/redis.rb | 18 +++++++++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 53045b4..5cdd539 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 2.1.0 + - Add in the increment "data type" to increment keys in redis as well as an expire option to reset it. + ## 2.0.0 - Plugins were updated to follow the new shutdown semantic, this mainly allows Logstash to instruct input plugins to terminate gracefully, instead of using Thread.raise on the plugins' threads. Ref: https://github.com/elastic/logstash/pull/3895 diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 9274f8f..d8b04c6 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -58,10 +58,15 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base # TODO set required true config :key, :validate => :string, :required => false - # Either list or channel. If `redis_type` is list, then we will set - # RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`. + # Either list, channel, or increment. If `redis_type` is list or increment, then we + # will set RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`. # TODO set required true - config :data_type, :validate => [ "list", "channel" ], :required => false + config :data_type, :validate => [ "list", "channel", "increment" ], :required => false + + # If using increment, set the key to expire at this interval in seconds if not + # set, key will just keep increasing. + # TODO set required true + config :expire, :validate => :number, :required => false, :default => 0 # Set to true if you want Redis to batch up values and send 1 RPUSH command # instead of one command per value to push on the list. Note that this only @@ -167,6 +172,13 @@ def receive(event) if @data_type == 'list' congestion_check(key) @redis.rpush(key, payload) + elsif @data_type == 'increment' + if @expire == 0 + @redis.incr(key) + else + lua_code = "local v = redis.call('INCR', KEYS[1]) if v == 1 then redis.call('EXPIRE', KEYS[1], KEYS[2]) end return v" + @redis.eval(lua_code, [key, @expire]) + end else @redis.publish(key, payload) end From 6ed684b2b25b10287956532aaa6eed0791ba85dd Mon Sep 17 00:00:00 2001 From: Luke Demi Date: Thu, 4 Feb 2016 21:48:43 +0000 Subject: [PATCH 2/4] couple quick comments --- lib/logstash/outputs/redis.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index d8b04c6..695d7a7 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -174,8 +174,10 @@ def receive(event) @redis.rpush(key, payload) elsif @data_type == 'increment' if @expire == 0 + # if we don't need an expire, increment forever, and ever @redis.incr(key) else + # in order to ensure that new keys are given an expire, we need to use some lua lua_code = "local v = redis.call('INCR', KEYS[1]) if v == 1 then redis.call('EXPIRE', KEYS[1], KEYS[2]) end return v" @redis.eval(lua_code, [key, @expire]) end From 72bfc08b22f608aefd51c28db51d32e76fbbf6f8 Mon Sep 17 00:00:00 2001 From: Luke Demi Date: Thu, 11 Feb 2016 18:46:56 +0000 Subject: [PATCH 3/4] dont need to limit to just the logstash v2 --- logstash-output-redis.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-redis.gemspec b/logstash-output-redis.gemspec index b78d126..6076e96 100644 --- a/logstash-output-redis.gemspec +++ b/logstash-output-redis.gemspec @@ -20,7 +20,7 @@ Gem::Specification.new do |s| s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } # Gem dependencies - s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" + #s.add_runtime_dependency "logstash-core", ">= 2.0.0.beta2", "< 3.0.0" s.add_runtime_dependency 'redis' s.add_runtime_dependency 'stud' From d2799e8d31801e0ab820cd70ec1136d46047f4c4 Mon Sep 17 00:00:00 2001 From: Luke Demi Date: Thu, 11 Feb 2016 18:47:32 +0000 Subject: [PATCH 4/4] bump version in gemspec --- logstash-output-redis.gemspec | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/logstash-output-redis.gemspec b/logstash-output-redis.gemspec index 6076e96..7b0e498 100644 --- a/logstash-output-redis.gemspec +++ b/logstash-output-redis.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-redis' - s.version = '2.0.2' + s.version = '2.1.0' s.licenses = ['Apache License (2.0)'] s.summary = "This output will send events to a Redis queue using RPUSH" s.description = "This gem is a logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/plugin install gemname. This gem is not a stand-alone program"