Skip to content

Commit 2769592

Browse files
committed
Use PackedForward instead of Message
See fluent/fluentd#671
1 parent 46359ba commit 2769592

File tree

2 files changed

+25
-13
lines changed

2 files changed

+25
-13
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
require 'monitor'
2121
require 'logger'
2222
require 'json'
23+
require 'base64'
24+
require 'securerandom'
2325

2426
module Fluent
2527
module Logger
@@ -130,17 +132,19 @@ def post_with_time(tag, map, time)
130132
@logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
131133
tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
132134
if @nanosecond_precision && time.is_a?(Time)
133-
write [tag, EventTime.new(time), map]
135+
write(tag, EventTime.new(time), map)
134136
else
135-
write [tag, time.to_i, map]
137+
write(tag, time.to_i, map)
136138
end
137139
end
138140

139141
def close
140142
@mon.synchronize {
141143
if @pending
142144
begin
143-
send_data(@pending)
145+
@pending.each do |tag, record|
146+
send_data([tag, record].to_msgpack)
147+
end
144148
rescue => e
145149
set_last_error(e)
146150
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
@@ -200,20 +204,22 @@ def suppress_sec
200204
end
201205
end
202206

203-
def write(msg)
207+
def write(tag, time, map)
204208
begin
205-
data = to_msgpack(msg)
209+
record = to_msgpack([time, map])
206210
rescue => e
207211
set_last_error(e)
212+
msg = [tag, time, map]
208213
@logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}")
209214
return false
210215
end
211216

212217
@mon.synchronize {
213218
if @pending
214-
@pending << data
219+
@pending[tag] << record
215220
else
216-
@pending = data
221+
@pending = Hash.new{|h, k| h[k] = "" }
222+
@pending[tag] = record
217223
end
218224

219225
# suppress reconnection burst
@@ -224,7 +230,9 @@ def write(msg)
224230
end
225231

226232
begin
227-
send_data(@pending)
233+
@pending.each do |tag, record|
234+
send_data([tag, record].to_msgpack)
235+
end
228236
@pending = nil
229237
true
230238
rescue => e
@@ -259,6 +267,7 @@ def send_data(data)
259267
# end
260268
# data = data[n..-1]
261269
#end
270+
262271
true
263272
end
264273

spec/fluent_logger_spec.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@
196196
logger_io.rewind
197197
log = logger_io.read
198198
expect(log).to match /Failed to connect/
199-
expect(log).to match /Can't send logs to/
199+
expect(log).to match /Can\'t send logs to/
200200
}
201201

202202
it ('post limit over') do
@@ -206,11 +206,11 @@
206206
expect(fluentd.queue.last).to be_nil
207207

208208
logger_io.rewind
209-
expect(logger_io.read).not_to match /Can't send logs to/
209+
expect(logger_io.read).not_to match /Can\'t send logs to/
210210

211211
logger.post('tag', {'a' => ('c' * 1000)})
212212
logger_io.rewind
213-
expect(logger_io.read).to match /Can't send logs to/
213+
expect(logger_io.read).to match /Can\'t send logs to/
214214
end
215215

216216
it ('log connect error once') do
@@ -233,8 +233,11 @@ class BufferOverflowHandler
233233

234234
def flush(messages)
235235
@buffer ||= []
236-
MessagePack::Unpacker.new.feed_each(messages) do |msg|
237-
@buffer << msg
236+
messages.each do |tag, message|
237+
unpacker = MessagePack::Unpacker.new(StringIO.new(message))
238+
unpacker.each do |time, record|
239+
@buffer << [tag, time, record]
240+
end
238241
end
239242
end
240243
end

0 commit comments

Comments
 (0)