Skip to content

Commit 6734f1c

Browse files
authored
Merge pull request #9 from tes/fluentd-compat
Fluentd / FluentBit out_forward compat
2 parents 6305e3c + 0fb3461 commit 6734f1c

File tree

3 files changed

+112
-14
lines changed

3 files changed

+112
-14
lines changed

lib/logstash/codecs/fluent.rb

Lines changed: 56 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,17 @@
2828
class LogStash::Codecs::Fluent < LogStash::Codecs::Base
2929
config_name "fluent"
3030

31-
public
3231
def register
3332
require "msgpack"
3433
@decoder = MessagePack::Unpacker.new
3534
end
3635

37-
public
38-
def decode(data)
39-
@decoder.feed(data)
40-
@decoder.each do |tag, epochtime, map|
41-
event = LogStash::Event.new(map.merge(
42-
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
43-
"tags" => tag
44-
))
45-
yield event
36+
def decode(data, &block)
37+
@decoder.feed_each(data) do |item|
38+
decode_event(item, &block)
4639
end
4740
end # def decode
4841

49-
public
5042
def encode(event)
5143
tag = event.get("tags") || "log"
5244
epochtime = event.timestamp.to_i
@@ -59,4 +51,57 @@ def encode(event)
5951
@on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
6052
end # def encode
6153

54+
private
55+
56+
def decode_event(data, &block)
57+
tag = data[0]
58+
entries = data[1]
59+
60+
case entries
61+
when String
62+
# PackedForward
63+
option = data[2]
64+
compressed = (option && option['compressed'] == 'gzip')
65+
if compressed
66+
raise(LogStash::Error, "PackedForward with compression is not supported")
67+
end
68+
69+
entries_decoder = MessagePack::Unpacker.new
70+
entries_decoder.feed_each(entries) do |entry|
71+
epochtime = entry[0]
72+
map = entry[1]
73+
event = LogStash::Event.new(map.merge(
74+
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
75+
"tags" => [ tag ]
76+
))
77+
yield event
78+
end
79+
when Array
80+
# Forward
81+
entries.each do |entry|
82+
epochtime = entry[0]
83+
map = entry[1]
84+
event = LogStash::Event.new(map.merge(
85+
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
86+
"tags" => [ tag ]
87+
))
88+
yield event
89+
end
90+
when Fixnum
91+
# Message
92+
epochtime = entries
93+
map = data[2]
94+
event = LogStash::Event.new(map.merge(
95+
LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
96+
"tags" => [ tag ]
97+
))
98+
yield event
99+
else
100+
raise(LogStash::Error, "Unknown event type")
101+
end
102+
rescue StandardError => e
103+
@logger.error("Fluent parse error, original data now in message field", :error => e, :data => data)
104+
yield LogStash::Event.new("message" => data, "tags" => [ "_fluentparsefailure" ])
105+
end
106+
62107
end # class LogStash::Codecs::Fluent

logstash-codec-fluent.gemspec

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Gem::Specification.new do |s|
22

33
s.name = 'logstash-codec-fluent'
4-
s.version = '3.0.2'
4+
s.version = '3.1.1'
55
s.licenses = ['Apache License (2.0)']
66
s.summary = "This codec handles fluentd's msgpack schema."
77
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -29,6 +29,6 @@ Gem::Specification.new do |s|
2929
s.add_runtime_dependency 'msgpack'
3030
end
3131

32-
s.add_development_dependency 'logstash-devutils'
32+
s.add_development_dependency 'logstash-devutils', ">= 1.0.0"
3333
end
3434

spec/codecs/fluent_spec.rb

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,60 @@
3737

3838
it "should decode without errors" do
3939
subject.decode(message) do |event|
40-
expect(event.get("name")).to eq("foo")
40+
expect(event.get("name")).to eq("foo")
41+
end
42+
end
43+
44+
end
45+
46+
describe "event decoding (buckets of events)" do
47+
48+
let(:tag) { "mytag" }
49+
let(:epochtime) { event.timestamp.to_i }
50+
let(:data) { LogStash::Util.normalize(event.to_hash) }
51+
let(:message) do
52+
MessagePack.pack([tag,
53+
[
54+
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
55+
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)],
56+
[epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]
57+
]
58+
])
59+
end
60+
61+
it "should decode without errors" do
62+
count = 0
63+
64+
subject.decode(message) do |event|
65+
expect(event.get("name")).to eq("foo")
66+
count += 1
67+
end
68+
69+
expect(count).to eq(3)
70+
end
71+
72+
end
73+
74+
describe "event decoding (broken package)" do
75+
76+
let(:tag) { "mytag" }
77+
let(:epochtime) { event.timestamp.to_s }
78+
let(:data) { LogStash::Util.normalize(event.to_hash) }
79+
let(:message) do
80+
MessagePack.pack([tag,
81+
epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)
82+
])
83+
end
84+
85+
it "should decode with errors" do
86+
subject.decode(message) do |event|
87+
expect(event.get("name")).not_to eq("foo")
88+
end
89+
end
90+
91+
it "should inject a failure event" do
92+
subject.decode(message) do |event|
93+
expect(event.get("tags")).to include("_fluentparsefailure")
4194
end
4295
end
4396

0 commit comments

Comments
 (0)