Support for REDIS ZSET #46

Open
wants to merge 19 commits into
base: main
84 changes: 55 additions & 29 deletions docs/index.asciidoc
@@ -20,9 +20,12 @@ include::{include_path}/plugin_header.asciidoc[]

==== Description

This output will send events to a Redis queue using RPUSH.
The RPUSH command is supported in Redis v0.0.7+. Using
PUBLISH to a channel requires at least v1.3.8+.
This output will send events to a Redis queue using RPUSH/ZADD/PUBLISH.

The RPUSH command is supported in Redis v0.0.7+.
Using ZADD is supported in Redis v1.2.0+.
Using PUBLISH to a channel requires at least v1.3.8+.

While you may be able to make these Redis versions work,
the best performance and stability will be found in more
recent stable versions. Versions 2.6.0+ are recommended.
@@ -43,10 +46,12 @@ This plugin supports the following configuration options plus the <<plugins-{typ
| <<plugins-{type}s-{plugin}-batch_timeout>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-congestion_interval>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-congestion_threshold>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-data_type>> |<<string,string>>, one of `["list", "channel"]`|No
| <<plugins-{type}s-{plugin}-data_type>> |<<string,string>>, one of `["list", "sortedset", "channel"]`|No
| <<plugins-{type}s-{plugin}-db>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-host>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-key>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-priority_field>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-priority_default>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-password>> |<<password,password>>|No
| <<plugins-{type}s-{plugin}-port>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-reconnect_interval>> |<<number,number>>|No
@@ -60,38 +65,38 @@ output plugins.
&nbsp;

[id="plugins-{type}s-{plugin}-batch"]
===== `batch`
===== `batch`

* Value type is <<boolean,boolean>>
* Default value is `false`

Set to true if you want Redis to batch up values and send 1 RPUSH command
instead of one command per value to push on the list. Note that this only
works with `data_type="list"` mode right now.
Set to true if you want Redis to batch up values and send 1 RPUSH or 1 ZADD command
instead of one command per value to push on the list or set. Note that this only
works with `data_type="list"` and `data_type="sortedset"` mode right now.

If true, we send an RPUSH every "batch_events" events or
If true, we send an RPUSH or ZADD every "batch_events" events or
"batch_timeout" seconds (whichever comes first).
Only supported for `data_type` is "list".
Only supported for `data_type` "list" or "sortedset".
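
For `sortedset`, a batched flush sends one ZADD that carries a score for every buffered payload; in this pull request the `flush` method passes such a list of pairs to `@redis.zadd`. The following is a minimal sketch of that argument shape as plain Ruby data. `zadd_pairs` is a hypothetical helper, reading the score from an `epoch` key in a JSON payload is an assumption made for illustration, and no Redis connection is involved.

```ruby
require "json"

# Hypothetical helper, for illustration only: builds the [score, member] pairs
# that a single batched ZADD would carry, one pair per buffered payload.
# The block decides how a score is derived from a payload.
def zadd_pairs(payloads)
  payloads.map { |payload| [yield(payload).to_s, payload] }
end

batch = ['{"epoch":3,"message":"c"}', '{"epoch":1,"message":"a"}']

# Assumption: each payload is JSON and carries its score under "epoch".
pairs = zadd_pairs(batch) { |payload| JSON.parse(payload)["epoch"] }
p pairs
```

The pairs keep the order in which events were buffered; a sorted set orders its members by score, so the order of pairs within one ZADD should not affect the result.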

[id="plugins-{type}s-{plugin}-batch_events"]
===== `batch_events`
===== `batch_events`

* Value type is <<number,number>>
* Default value is `50`

If batch is set to true, the number of events we queue up for an RPUSH.
If batch is set to true, the number of events we queue up for an RPUSH or ZADD.

[id="plugins-{type}s-{plugin}-batch_timeout"]
===== `batch_timeout`
===== `batch_timeout`

* Value type is <<number,number>>
* Default value is `5`

If batch is set to true, the maximum amount of time between RPUSH commands
If batch is set to true, the maximum amount of time between RPUSH or ZADD commands
when there are pending events to flush.

[id="plugins-{type}s-{plugin}-congestion_interval"]
===== `congestion_interval`
===== `congestion_interval`

* Value type is <<number,number>>
* Default value is `1`
@@ -100,38 +105,40 @@ How often to check for congestion. Default is one second.
Zero means to check on every event.

[id="plugins-{type}s-{plugin}-congestion_threshold"]
===== `congestion_threshold`
===== `congestion_threshold`

* Value type is <<number,number>>
* Default value is `0`

In case Redis `data_type` is `list` and has more than `@congestion_threshold` items,
In case Redis `data_type` is `list` or `sortedset` and has more than `@congestion_threshold` items,
block until someone consumes them and reduces congestion, otherwise if there are
no consumers Redis will run out of memory, unless it was configured with OOM protection.
But even with OOM protection, a single Redis list can block all other users of Redis,
until Redis memory consumption reaches the max allowed RAM size.
A default value of 0 means that this limit is disabled.
Only supported for `list` Redis `data_type`.
Only supported for `list` and `sortedset` Redis `data_type`.

[id="plugins-{type}s-{plugin}-data_type"]
===== `data_type`
===== `data_type`

* Value can be any of: `list`, `channel`
* Value can be any of: `list`, `sortedset`, `channel`
* There is no default value for this setting.

One of list, sortedset or channel. If `data_type` is list, then we will
RPUSH to `key`. If `data_type` is channel, then we will PUBLISH to `key`.
If `data_type` is sortedset, then we will ZADD to `key`, using the value of
the event field named by `priority_field` as the score.

[id="plugins-{type}s-{plugin}-db"]
===== `db`
===== `db`

* Value type is <<number,number>>
* Default value is `0`

The Redis database number.

[id="plugins-{type}s-{plugin}-host"]
===== `host`
===== `host`

* Value type is <<array,array>>
* Default value is `["127.0.0.1"]`
@@ -148,14 +155,33 @@ For example:
["127.0.0.1:6380", "127.0.0.1"]

[id="plugins-{type}s-{plugin}-key"]
===== `key`
===== `key`

* Value type is <<string,string>>
* There is no default value for this setting.

The name of a Redis list or channel. Dynamic names are
The name of a Redis list, sortedset or channel. Dynamic names are
valid here, for example `logstash-%{type}`.

[id="plugins-{type}s-{plugin}-priority_field"]
===== `priority_field`

* Value type is <<string,string>>
* Default value is `epoch`

Event field whose value is used as the score for `data_type` `sortedset`. If the field doesn't exist or doesn't contain a number, `priority_default` is used.

The score should be the string representation of a double-precision floating point number. Redis also accepts `+inf` and `-inf` as scores (see https://redis.io/commands/zadd).

[id="plugins-{type}s-{plugin}-priority_default"]
===== `priority_default`

* Value type is <<number,number>>
* Default value is `-1`

Default score for `data_type` `sortedset`, used when the priority field is not found in the event or doesn't contain a number.

The score should be the string representation of a double-precision floating point number. Redis also accepts `+inf` and `-inf` as scores (see https://redis.io/commands/zadd).
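
The fallback rule described for `priority_field` and `priority_default` can be sketched in plain Ruby. This is a minimal illustration rather than the plugin's code: `resolve_score` is a hypothetical helper, the event is modeled as a plain Hash instead of a Logstash event, and the numeric check reuses the regular expression from this pull request's `priorize` method.

```ruby
# Same pattern as in the pull request's `priorize` method: an optional sign,
# digits, and an optional fractional part.
NUMERIC = /\A[-+]?[0-9]+(\.[0-9]+)?\z/

# Hypothetical helper, for illustration only: picks the sorted-set score.
# `event` is a plain Hash standing in for a Logstash event.
def resolve_score(event, priority_field = "epoch", priority_default = -1)
  value = event[priority_field]
  # Fall back when the field is missing or is not a plain decimal number.
  value = priority_default if value.nil? || value.to_s !~ NUMERIC
  value.to_s
end

puts resolve_score({ "epoch" => 1500000000 })
puts resolve_score({ "message" => "hello" })
puts resolve_score({ "epoch" => "soon" })
```

With this pattern, a field value such as `+inf` or `1e3` does not match and therefore falls back to the default, even though Redis itself would accept it as a score.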


[id="plugins-{type}s-{plugin}-name"]
===== `name` (DEPRECATED)

@@ -166,15 +192,15 @@ valid here, for example `logstash-%{type}`.
Name is used for logging in case there are multiple instances.

[id="plugins-{type}s-{plugin}-password"]
===== `password`
===== `password`

* Value type is <<password,password>>
* There is no default value for this setting.

Password to authenticate with. There is no authentication by default.

[id="plugins-{type}s-{plugin}-port"]
===== `port`
===== `port`

* Value type is <<number,number>>
* Default value is `6379`
@@ -192,23 +218,23 @@ The name of the Redis queue (we'll use RPUSH on this). Dynamic names are
valid here, for example `logstash-%{type}`

[id="plugins-{type}s-{plugin}-reconnect_interval"]
===== `reconnect_interval`
===== `reconnect_interval`

* Value type is <<number,number>>
* Default value is `1`

Interval for reconnecting to failed Redis connections

[id="plugins-{type}s-{plugin}-shuffle_hosts"]
===== `shuffle_hosts`
===== `shuffle_hosts`

* Value type is <<boolean,boolean>>
* Default value is `true`

Shuffle the host list during Logstash startup.

[id="plugins-{type}s-{plugin}-timeout"]
===== `timeout`
===== `timeout`

* Value type is <<number,number>>
* Default value is `5`
81 changes: 63 additions & 18 deletions lib/logstash/outputs/redis.rb
@@ -3,13 +3,16 @@
require "logstash/namespace"
require "stud/buffer"

# This output will send events to a Redis queue using RPUSH.
# The RPUSH command is supported in Redis v0.0.7+. Using
# PUBLISH to a channel requires at least v1.3.8+.
# This output will send events to a Redis queue using RPUSH/ZADD/PUBLISH.

# The RPUSH command is supported in Redis v0.0.7+.
# Using ZADD is supported in Redis v1.2.0+.
# Using PUBLISH to a channel requires at least v1.3.8+.

# While you may be able to make these Redis versions work,
# the best performance and stability will be found in more
# recent stable versions. Versions 2.6.0+ are recommended.
#

# For more information, see http://redis.io/[the Redis homepage]
#
class LogStash::Outputs::Redis < LogStash::Outputs::Base
@@ -51,7 +54,7 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base
# Password to authenticate with. There is no authentication by default.
config :password, :validate => :password

# The name of the Redis queue (we'll use RPUSH on this). Dynamic names are
# The name of the Redis queue or sorted set (we'll use RPUSH or ZADD on this). Dynamic names are
# valid here, for example `logstash-%{type}`
config :queue, :validate => :string, :deprecated => true

@@ -61,40 +64,50 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base

# One of list, sortedset or channel. If `data_type` is list, then we will
# RPUSH to `key`. If `data_type` is channel, then we will PUBLISH to `key`.
config :data_type, :validate => [ "list", "channel" ], :required => false
# If `data_type` is sortedset, then we will ZADD to `key`, using the value of
# the event field named by `priority_field` as the score.
config :data_type, :validate => [ "list", "channel", "sortedset" ], :required => false

# Set to true if you want Redis to batch up values and send 1 RPUSH command
# instead of one command per value to push on the list. Note that this only
# works with `data_type="list"` mode right now.
# Set to true if you want Redis to batch up values and send 1 RPUSH or 1 ZADD command
# instead of one command per value to push on the list or set. Note that this only
# works with `data_type="list"` and `data_type="sortedset"` mode right now.
#
# If true, we send an RPUSH every "batch_events" events or
# If true, we send an RPUSH or ZADD every "batch_events" events or
# "batch_timeout" seconds (whichever comes first).
# Only supported for `data_type` is "list".
# Only supported for `data_type` "list" or "sortedset".
config :batch, :validate => :boolean, :default => false

# If batch is set to true, the number of events we queue up for an RPUSH.
# If batch is set to true, the number of events we queue up for an RPUSH or ZADD.
config :batch_events, :validate => :number, :default => 50

# If batch is set to true, the maximum amount of time between RPUSH commands
# If batch is set to true, the maximum amount of time between RPUSH or ZADD commands
# when there are pending events to flush.
config :batch_timeout, :validate => :number, :default => 5

# Interval for reconnecting to failed Redis connections
config :reconnect_interval, :validate => :number, :default => 1

# In case Redis `data_type` is `list` and has more than `@congestion_threshold` items,
# In case Redis `data_type` is `list` or `sortedset` and has more than `@congestion_threshold` items,
# block until someone consumes them and reduces congestion, otherwise if there are
# no consumers Redis will run out of memory, unless it was configured with OOM protection.
# But even with OOM protection, a single Redis list can block all other users of Redis,
# until Redis memory consumption reaches the max allowed RAM size.
# A default value of 0 means that this limit is disabled.
# Only supported for `list` Redis `data_type`.
# Only supported for `list` and `sortedset` Redis `data_type`.
config :congestion_threshold, :validate => :number, :default => 0

# How often to check for congestion. Default is one second.
# Zero means to check on every event.
config :congestion_interval, :validate => :number, :default => 1

# Event field whose value is used as the score for data_type `sortedset`. If the field doesn't exist
# or doesn't contain a number, `priority_default` is used.
#
# The score should be the string representation of a double-precision floating point number.
# Redis also accepts `+inf` and `-inf` as scores (see https://redis.io/commands/zadd).
config :priority_field, :validate => :string, :default => "epoch"

# Default score for data_type `sortedset`, used when the priority field is not found in the event
# or doesn't contain a number.
#
# The score should be the string representation of a double-precision floating point number.
# Redis also accepts `+inf` and `-inf` as scores (see https://redis.io/commands/zadd).
config :priority_default, :validate => :number, :default => -1

def register
require 'redis'

@@ -118,7 +131,7 @@ def register


if @batch
if @data_type != "list"
if @data_type != "list" and @data_type != "sortedset"
raise RuntimeError.new(
"batch is not supported with data_type #{@data_type}"
)
@@ -176,8 +189,13 @@ def flush(events, key, close=false)
# we should not block due to congestion on close
# to support this Stud::Buffer#buffer_flush should pass here the :final boolean value.
congestion_check(key) unless close
@redis.rpush(key, events)
if @data_type == 'sortedset' then
@redis.zadd(key, events.map{ |event| [priorize(event), event] })
else
@redis.rpush(key, events)
end
end

# called from Stud::Buffer#buffer_flush when an error occurs
def on_flush_error(e)
@logger.warn("Failed to send backlog of events to Redis",
@@ -222,6 +240,30 @@ def connect
Redis.new(params)
end # def connect

private
def priorize(event)
if event.is_a?(String) then
begin
@codec.decode(event) do |event_decoded|
event = event_decoded
end
rescue => e # parse or event creation error
@logger.warn("Default priority [#{@priority_default}] used, can't decode event [#{event}]")
return @priority_default.to_s
end
end


priority_value=event.get(@priority_field)

if priority_value.nil? || priority_value.to_s !~ /\A[-+]?[0-9]+(\.[0-9]+)?\z/ then
@logger.debug("Default priority [" << @priority_default.to_s << "] used, field [" << @priority_field << "] doesn't exist or doesn't contain a number")
priority_value=@priority_default
end

return priority_value.to_s
end

# A string used to identify a Redis instance in log messages
def identity
@name || "redis://#{@password}@#{@current_host}:#{@current_port}/#{@db} #{@data_type}:#{@key}"
@@ -231,7 +273,7 @@ def send_to_redis(event, payload)
# How can I do this sort of thing with codecs?
key = event.sprintf(@key)

if @batch && @data_type == 'list' # Don't use batched method for pubsub.
if @batch && (@data_type == 'list' or @data_type == 'sortedset') # Don't use batched method for pubsub.
# Stud::Buffer
buffer_receive(payload, key)
return
@@ -242,6 +284,9 @@ def send_to_redis(event, payload)
if @data_type == 'list'
congestion_check(key)
@redis.rpush(key, payload)
elsif @data_type == 'sortedset'
congestion_check(key)
@redis.zadd(key, priorize(event), payload)
else
@redis.publish(key, payload)
end