diff --git a/docs/index.asciidoc b/docs/index.asciidoc index 9406057..b61475c 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -20,9 +20,10 @@ include::{include_path}/plugin_header.asciidoc[] ==== Description -This output will send events to a Redis queue using RPUSH. -The RPUSH command is supported in Redis v0.0.7+. Using -PUBLISH to a channel requires at least v1.3.8+. +This output will send events to a Redis queue using RPUSH +(or, optionally, LPUSH). +The RPUSH and LPUSH commands are supported in Redis v1.0.0+. +Using PUBLISH to a channel requires at least v1.3.8+. While you may be able to make these Redis versions work, the best performance and stability will be found in more recent stable versions. Versions 2.6.0+ are recommended. diff --git a/lib/logstash/outputs/redis.rb b/lib/logstash/outputs/redis.rb index 81e3c9b..f8e66c5 100644 --- a/lib/logstash/outputs/redis.rb +++ b/lib/logstash/outputs/redis.rb @@ -3,9 +3,10 @@ require "logstash/namespace" require "stud/buffer" -# This output will send events to a Redis queue using RPUSH. -# The RPUSH command is supported in Redis v0.0.7+. Using -# PUBLISH to a channel requires at least v1.3.8+. +# This output will send events to a Redis queue using RPUSH +# or LPUSH. +# The RPUSH and LPUSH commands are supported in Redis v1.0.0+. +# Using PUBLISH to a channel requires at least v1.3.8+. # While you may be able to make these Redis versions work, # the best performance and stability will be found in more # recent stable versions. Versions 2.6.0+ are recommended. @@ -56,15 +57,19 @@ class LogStash::Outputs::Redis < LogStash::Outputs::Base # valid here, for example `logstash-%{type}`. config :key, :validate => :string, :required => true - # Either list or channel. If `redis_type` is list, then we will set - # RPUSH to key. If `redis_type` is channel, then we will PUBLISH to `key`. + # Either list or channel. If `redis_type` is list, then we will set RPUSH (or + # LPUSH) to key. If `redis_type` is channel, then we will PUBLISH to `key`. config :data_type, :validate => [ "list", "channel" ], :required => true - # Set to true if you want Redis to batch up values and send 1 RPUSH command - # instead of one command per value to push on the list. Note that this only - # works with `data_type="list"` mode right now. + # Either RPUSH or LPUSH. Only relevant if `data_type` is list. Defines if + # elements shall be inserted at the head of the list (LPUSH) or the tail (RPUSH). + config :list_operation, :validate => ["RPUSH", "LPUSH"], :default => "RPUSH" + + # Set to true if you want Redis to batch up values and send 1 RPUSH (or + # LPUSH) command instead of one command per value to push on the list. + # Note that this only works with `data_type="list"` mode right now. # - # If true, we send an RPUSH every "batch_events" events or + # If true, we send an RPUSH (or LPUSH) every "batch_events" events or # "batch_timeout" seconds (whichever comes first). # Only supported for `data_type` is "list". config :batch, :validate => :boolean, :default => false @@ -154,7 +159,11 @@ def flush(events, key, close=false) # we should not block due to congestion on close # to support this Stud::Buffer#buffer_flush should pass here the :final boolean value. congestion_check(key) unless close - @redis.rpush(key, events) + if @list_operation == 'LPUSH' + @redis.lpush(key, events) + else + @redis.rpush(key, events) + end end # called from Stud::Buffer#buffer_flush when an error occurs def on_flush_error(e) @@ -219,7 +228,11 @@ def send_to_redis(event, payload) @redis ||= connect if @data_type == 'list' congestion_check(key) - @redis.rpush(key, payload) + if @list_operation == 'LPUSH' + @redis.lpush(key, payload) + else + @redis.rpush(key, payload) + end else @redis.publish(key, payload) end diff --git a/logstash-output-redis.gemspec b/logstash-output-redis.gemspec index 32e9cb1..3216100 100644 --- a/logstash-output-redis.gemspec +++ b/logstash-output-redis.gemspec @@ -3,7 +3,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-redis' s.version = '4.0.3' s.licenses = ['Apache License (2.0)'] - s.summary = "Sends events to a Redis queue using the `RPUSH` command" + s.summary = "Sends events to a Redis queue using the `RPUSH` or `LPUSH` command" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" s.authors = ["Elastic"] s.email = 'info@elastic.co'