Skip to content

Commit 2fd8df8

Browse files
author
Hariprasad Selvaraj (hselvara)
committed
avoiding overwriting of tags with key named as 'tags'
1 parent 6734f1c commit 2fd8df8

File tree

1 file changed

+114
-0
lines changed

1 file changed

+114
-0
lines changed

fluent.rb

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
# encoding: utf-8
2+
require "logstash/codecs/base"
3+
require "logstash/util/charset"
4+
require "logstash/timestamp"
5+
require "logstash/util"
6+
7+
# This codec handles fluentd's msgpack schema.
8+
#
9+
# For example, you can receive logs from `fluent-logger-ruby` with:
10+
# [source,ruby]
11+
# input {
12+
# tcp {
13+
# codec => fluent
14+
# port => 4000
15+
# }
16+
# }
17+
#
18+
# And from your ruby code in your own application:
19+
# [source,ruby]
20+
# logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
21+
# logger.post("some_tag", { "your" => "data", "here" => "yay!" })
22+
#
23+
# Notes:
24+
#
25+
# * the fluent uses a second-precision time for events, so you will never see
26+
# subsecond precision on events processed by this codec.
27+
#
28+
class LogStash::Codecs::Fluent < LogStash::Codecs::Base
  config_name "fluent"

  def register
    require "msgpack"
    # Streaming unpacker: decode() may receive partial msgpack frames,
    # feed_each buffers across calls.
    @decoder = MessagePack::Unpacker.new
  end

  # Feed raw msgpack bytes into the streaming unpacker and emit one
  # LogStash::Event per decoded fluent record via the block.
  def decode(data, &block)
    @decoder.feed_each(data) do |item|
      decode_event(item, &block)
    end
  end # def decode

  # Serialize an event as fluent's [tag, time, record] msgpack triple.
  # The event's "tags" field (when present) is used as the fluent tag;
  # otherwise the literal tag "log" is used.
  def encode(event)
    tag = event.get("tags") || "log"

    # fluent's wire format carries only second precision.
    epochtime = event.timestamp.to_i

    # use normalize to make sure returned Hash is pure Ruby for
    # MessagePack#pack which relies on pure Ruby object recognition
    data = LogStash::Util.normalize(event.to_hash)

    # timestamp is serialized as a iso8601 string
    # merge to avoid modifying data which could have side effects if multiple outputs
    @on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
  end # def encode

  private

  # Decode one fluent protocol record. `data` is [tag, entries, ...]:
  #   * entries is a String  -> PackedForward: a msgpack stream of [time, record]
  #                             pairs; data[2] is an option hash.
  #   * entries is an Array  -> Forward: an array of [time, record] pairs.
  #   * entries is an Integer-> Message: entries is the epoch time and
  #                             data[2] is the record itself.
  # On any decoding error, emits a "_fluentparsefailure"-tagged event
  # carrying the raw data in the message field instead of raising.
  def decode_event(data, &block)
    tag = data[0]
    entries = data[1]

    case entries
    when String
      # PackedForward
      option = data[2]
      compressed = (option && option['compressed'] == 'gzip')
      if compressed
        raise(LogStash::Error, "PackedForward with compression is not supported")
      end

      # Use a throwaway unpacker: `entries` is a complete, self-contained
      # msgpack stream and must not share buffer state with @decoder.
      entries_decoder = MessagePack::Unpacker.new
      entries_decoder.feed_each(entries) do |entry|
        yield build_event(tag, entry[0], entry[1])
      end
    when Array
      # Forward
      entries.each do |entry|
        yield build_event(tag, entry[0], entry[1])
      end
    when Integer
      # Message. Integer (rather than the removed Fixnum) also matches
      # Fixnum instances on pre-2.4 rubies, so this is backward-compatible.
      yield build_event(tag, entries, data[2])
    else
      raise(LogStash::Error, "Unknown event type")
    end
  rescue StandardError => e
    @logger.error("Fluent parse error, original data now in message field", :error => e, :data => data)
    yield LogStash::Event.new("message" => data, "tags" => [ "_fluentparsefailure" ])
  end

  # Build a LogStash::Event from a fluent record without clobbering an
  # existing "tags" key in the record:
  #   * record has no "tags"            -> "tags" becomes the fluent tag (scalar)
  #   * record "tags" equals the tag    -> "tags" becomes [tag]
  #   * record "tags" differs           -> "tags" becomes [existing, tag]
  # The decoded fluent timestamp always wins over any "@timestamp" field in
  # the record. (The previous merge! block shared one accumulator array
  # across all conflicting keys, which could splice tag values into the
  # timestamp, and it mutated the caller's map in place.)
  def build_event(tag, epochtime, map)
    record = map.dup
    if record.key?("tags")
      existing = record["tags"]
      record["tags"] = (existing == tag) ? [tag] : [existing, tag]
    else
      record["tags"] = tag
    end
    record[LogStash::Event::TIMESTAMP] = LogStash::Timestamp.at(epochtime)
    LogStash::Event.new(record)
  end

end # class LogStash::Codecs::Fluent
113+
114+

0 commit comments

Comments
 (0)