Skip to content

Commit 9eb1ec2

Browse files
committed
Use PackedForward instead of Message
See fluent/fluentd#671
1 parent 77a423c commit 9eb1ec2

File tree

2 files changed

+26
-14
lines changed

2 files changed

+26
-14
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
require 'monitor'
2121
require 'logger'
2222
require 'json'
23+
require 'base64'
24+
require 'securerandom'
2325

2426
module Fluent
2527
module Logger
@@ -98,14 +100,16 @@ def last_error
98100
def post_with_time(tag, map, time)
99101
@logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
100102
tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
101-
write [tag, time.to_i, map]
103+
write(tag, time.to_i, map)
102104
end
103105

104106
def close
105107
@mon.synchronize {
106108
if @pending
107109
begin
108-
send_data(@pending)
110+
@pending.each do |tag, record|
111+
send_data([tag, record].to_msgpack)
112+
end
109113
rescue => e
110114
set_last_error(e)
111115
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
@@ -153,36 +157,40 @@ def suppress_sec
153157
end
154158
end
155159

156-
def write(msg)
160+
def write(tag, time, map)
157161
begin
158-
data = to_msgpack(msg)
162+
record = to_msgpack([time, map])
159163
rescue => e
160164
set_last_error(e)
165+
msg = [tag, time, map]
161166
@logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}")
162167
return false
163168
end
164169

165170
@mon.synchronize {
166171
if @pending
167-
@pending << data
172+
@pending[tag] << record
168173
else
169-
@pending = data
174+
@pending = Hash.new{|h, k| h[k] = "" }
175+
@pending[tag] = record
170176
end
171177

172178
# suppress reconnection burst
173-
if !@connect_error_history.empty? && @pending.bytesize <= @limit
179+
if !@connect_error_history.empty? && @pending.to_s.bytesize <= @limit
174180
if Time.now.to_i - @connect_error_history.last < suppress_sec
175181
return false
176182
end
177183
end
178184

179185
begin
180-
send_data(@pending)
186+
@pending.each do |tag, record|
187+
send_data([tag, record].to_msgpack)
188+
end
181189
@pending = nil
182190
true
183191
rescue => e
184192
set_last_error(e)
185-
if @pending.bytesize > @limit
193+
if @pending.to_s.bytesize > @limit
186194
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
187195
call_buffer_overflow_handler(@pending)
188196
@pending = nil
@@ -212,6 +220,7 @@ def send_data(data)
212220
# end
213221
# data = data[n..-1]
214222
#end
223+
215224
true
216225
end
217226

spec/fluent_logger_spec.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@
157157
logger_io.rewind
158158
log = logger_io.read
159159
expect(log).to match /Failed to connect/
160-
expect(log).to match /Can't send logs to/
160+
expect(log).to match /Can\'t send logs to/
161161
}
162162

163163
it ('post limit over') do
@@ -167,11 +167,11 @@
167167
expect(fluentd.queue.last).to be_nil
168168

169169
logger_io.rewind
170-
expect(logger_io.read).not_to match /Can't send logs to/
170+
expect(logger_io.read).not_to match /Can\'t send logs to/
171171

172172
logger.post('tag', {'a' => ('c' * 1000)})
173173
logger_io.rewind
174-
expect(logger_io.read).to match /Can't send logs to/
174+
expect(logger_io.read).to match /Can\'t send logs to/
175175
end
176176

177177
it ('log connect error once') do
@@ -194,8 +194,11 @@ class BufferOverflowHandler
194194

195195
def flush(messages)
196196
@buffer ||= []
197-
MessagePack::Unpacker.new.feed_each(messages) do |msg|
198-
@buffer << msg
197+
messages.each do |tag, message|
198+
unpacker = MessagePack::Unpacker.new(StringIO.new(message))
199+
unpacker.each do |time, record|
200+
@buffer << [tag, time, record]
201+
end
199202
end
200203
end
201204
end

0 commit comments

Comments
 (0)