Skip to content

Redis output to store last values #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 84 additions & 17 deletions lib/logstash/outputs/redis.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
require "logstash/outputs/base"
require "logstash/namespace"
require "stud/buffer"
require "json"

# This output will send events to a Redis queue using RPUSH.
# The RPUSH command is supported in Redis v0.0.7+. Using
Expand Down Expand Up @@ -59,9 +60,24 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
# valid here, for example `logstash-%{type}`.
config :key, :validate => :string, :required => false

# Either list or channel. If `redis_type` is list, then we will set
# RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`.
config :data_type, :validate => [ "list", "channel" ], :required => false
# The size of the stack for 'last_n_values' type: number of values to keep
config :stack_size, :validate => :number, :default => 1

# Key expiration time (in seconds) for the 'last_n_values' mode.
# By default the last n values are kept forever
config :expire, :validate => :number, :default => -1

# Either list, channel, last_n_values or last_value.
# If `data_type` is `list`, then we will set
# `RPUSH` to key. If `data_type` is `channel`, then we will PUBLISH to `key`.
# If `data_type` is `last_n_values`, then we will parse the evnt object and `LPUSH` each object
# key/value under top level key `key` and `LTRIM` to `stack_size` so as to obtain a tree of 'lists'
# with the last n values of each key in the event object.
# if `data_type` is `last_value`, then we will parse the object and `SET` each object key/value under
# top level key `key`.
# In building key/value trees, the separator is `:`.
# With `last_n_values` and `last_value`, an expiration of `expire` seconds is set on each key if `expire` is > 0
config :data_type, :validate => [ "list", "channel", "last_n_values", "last_value" ], :required => false

# Set to true if you want Redis to batch up values and send 1 RPUSH command
# instead of one command per value to push on the list. Note that this only
Expand Down Expand Up @@ -118,7 +134,7 @@ def register


if @batch
if @data_type != "list"
if not ["list", "last_n_values"].include? @data_type
raise RuntimeError.new(
"batch is not supported with data_type #{@data_type}"
)
Expand Down Expand Up @@ -168,16 +184,31 @@ def congestion_check(key)
end
@congestion_check_times[key] = Time.now.to_i
end
end
end # def congestion_check

# called from Stud::Buffer#buffer_flush when there are events to flush
def flush(events, key, close=false)
@redis ||= connect
# we should not block due to congestion on close
# to support this Stud::Buffer#buffer_flush should pass here the :final boolean value.
congestion_check(key) unless close
@redis.rpush(key, events)
end
case @data_type
when 'list'
congestion_check(key)
@redis.rpush(key, payload)
when 'last_n_values'
@redis.multi
recurse_lpush(key, JSON.parse(payload))
@redis.exec
when 'last_value'
@redis.multi
recurse_set(key, JSON.parse(payload))
@redis.exec
else
@redis.publish(key, payload)
end
end # def flush

# called from Stud::Buffer#buffer_flush when an error occurs
def on_flush_error(e)
@logger.warn("Failed to send backlog of events to Redis",
Expand All @@ -186,7 +217,7 @@ def on_flush_error(e)
:backtrace => e.backtrace
)
@redis = connect
end
end # def on_flush_error

def close
if @batch
Expand All @@ -196,7 +227,7 @@ def close
@redis.quit
@redis = nil
end
end
end # def close

private
def connect
Expand Down Expand Up @@ -227,24 +258,60 @@ def identity
@name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}"
end

def recurse_lpush(key, payload)
payload.each do |k, value|
if value.is_a?(Hash)
recurse_lpush(key+":"+k, value)
else
@redis.lpush(key+":"+k, value)
@redis.ltrim(key+":"+k, 0, @stack_size - 1)
if @expire > 0
@redis.expire(key+":"+k, @expire)
end
end
end
end # def recurse_lpush

def recurse_set(key, payload)
payload.each do |k, value|
if value.is_a?(Hash)
recurse_set(key+":"+k, value)
else
@redis.set(key+":"+k, value)
if @expire > 0
@redis.expire(key+":"+k, @expire)
end
end
end
end # def recurse_set

def send_to_redis(event, payload)
# How can I do this sort of thing with codecs?
key = event.sprintf(@key)

if @batch && @data_type == 'list' # Don't use batched method for pubsub.
if @batch and ['list', 'last_n_values'].include? @data_type # Don't use batched method for pubsub.
# Stud::Buffer
buffer_receive(payload, key)
return
end

begin
@redis ||= connect
if @data_type == 'list'
congestion_check(key)
@redis.rpush(key, payload)
else
@redis.publish(key, payload)
end
case @data_type
when 'list'
congestion_check(key)
@redis.rpush(key, payload)
when 'last_n_values'
@redis.multi
recurse_lpush(key, JSON.parse(payload))
@redis.exec
when 'last_value'
@redis.multi
recurse_set(key, JSON.parse(payload))
@redis.exec
else
@redis.publish(key, payload)
end
rescue => e
@logger.warn("Failed to send event to Redis", :event => event,
:identity => identity, :exception => e,
Expand All @@ -253,5 +320,5 @@ def send_to_redis(event, payload)
@redis = nil
retry
end
end
end # def send_to_redis
end
4 changes: 2 additions & 2 deletions logstash-output-redis.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-redis'
s.version = '3.0.1'
s.version = '2.1.0'
s.licenses = ['Apache License (2.0)']
s.summary = "This output will send events to a Redis queue using RPUSH"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand All @@ -20,7 +20,7 @@ Gem::Specification.new do |s|
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" }

# Gem dependencies
s.add_runtime_dependency "logstash-core-plugin-api", "~> 2.0"
s.add_runtime_dependency "logstash-core-plugin-api", "~> 1.0"

s.add_runtime_dependency 'redis'
s.add_runtime_dependency 'stud'
Expand Down