Skip to content

Commit 4f98038

Browse files
committed
Use PackedForward instead of Message
See fluent/fluentd#671
1 parent b64c3f8 commit 4f98038

File tree

2 files changed

+25
-13
lines changed

2 files changed

+25
-13
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
require 'monitor'
2121
require 'logger'
2222
require 'json'
23+
require 'base64'
24+
require 'securerandom'
2325

2426
module Fluent
2527
module Logger
@@ -131,17 +133,19 @@ def post_with_time(tag, map, time)
131133
@logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
132134
tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
133135
if @nanosecond_precision && time.is_a?(Time)
134-
write [tag, EventTime.new(time.to_i, time.nsec), map]
136+
write(tag, EventTime.new(time.to_i, time.nsec), map)
135137
else
136-
write [tag, time.to_i, map]
138+
write(tag, time.to_i, map)
137139
end
138140
end
139141

140142
def close
141143
@mon.synchronize {
142144
if @pending
143145
begin
144-
send_data(@pending)
146+
@pending.each do |tag, record|
147+
send_data([tag, record].to_msgpack)
148+
end
145149
rescue => e
146150
set_last_error(e)
147151
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
@@ -201,20 +205,22 @@ def suppress_sec
201205
end
202206
end
203207

204-
def write(msg)
208+
def write(tag, time, map)
205209
begin
206-
data = to_msgpack(msg)
210+
record = to_msgpack([time, map])
207211
rescue => e
208212
set_last_error(e)
213+
msg = [tag, time, map]
209214
@logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}")
210215
return false
211216
end
212217

213218
@mon.synchronize {
214219
if @pending
215-
@pending << data
220+
@pending[tag] << record
216221
else
217-
@pending = data
222+
@pending = Hash.new{|h, k| h[k] = "" }
223+
@pending[tag] = record
218224
end
219225

220226
# suppress reconnection burst
@@ -225,7 +231,9 @@ def write(msg)
225231
end
226232

227233
begin
228-
send_data(@pending)
234+
@pending.each do |tag, record|
235+
send_data([tag, record].to_msgpack)
236+
end
229237
@pending = nil
230238
true
231239
rescue => e
@@ -260,6 +268,7 @@ def send_data(data)
260268
# end
261269
# data = data[n..-1]
262270
#end
271+
263272
true
264273
end
265274

spec/fluent_logger_spec.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@
196196
logger_io.rewind
197197
log = logger_io.read
198198
expect(log).to match /Failed to connect/
199-
expect(log).to match /Can't send logs to/
199+
expect(log).to match /Can\'t send logs to/
200200
}
201201

202202
it ('post limit over') do
@@ -206,11 +206,11 @@
206206
expect(fluentd.queue.last).to be_nil
207207

208208
logger_io.rewind
209-
expect(logger_io.read).not_to match /Can't send logs to/
209+
expect(logger_io.read).not_to match /Can\'t send logs to/
210210

211211
logger.post('tag', {'a' => ('c' * 1000)})
212212
logger_io.rewind
213-
expect(logger_io.read).to match /Can't send logs to/
213+
expect(logger_io.read).to match /Can\'t send logs to/
214214
end
215215

216216
it ('log connect error once') do
@@ -233,8 +233,11 @@ class BufferOverflowHandler
233233

234234
def flush(messages)
235235
@buffer ||= []
236-
MessagePack::Unpacker.new.feed_each(messages) do |msg|
237-
@buffer << msg
236+
messages.each do |tag, message|
237+
unpacker = MessagePack::Unpacker.new(StringIO.new(message))
238+
unpacker.each do |time, record|
239+
@buffer << [tag, time, record]
240+
end
238241
end
239242
end
240243
end

0 commit comments

Comments
 (0)