Skip to content

Commit a52820a

Browse files
committed
Add require_ack_response option
1 parent 36f115f commit a52820a

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,9 @@ def initialize(tag_prefix = nil, *args)
9090
end
9191
@packer = @factory.packer
9292

93+
@require_ack_response = options[:require_ack_response]
94+
@ack_response_timeout = options[:ack_response_timeout] || 190
95+
9396
@mon = Monitor.new
9497
@pending = nil
9598
@connect_error_history = []
@@ -144,7 +147,7 @@ def close
144147
if @pending
145148
begin
146149
@pending.each do |tag, record|
147-
send_data([tag, record].to_msgpack)
150+
send_data(tag, record)
148151
end
149152
rescue => e
150153
set_last_error(e)
@@ -232,7 +235,7 @@ def write(tag, time, map)
232235

233236
begin
234237
@pending.each do |tag, record|
235-
send_data([tag, record].to_msgpack)
238+
send_data(tag, record)
236239
end
237240
@pending = nil
238241
true
@@ -250,11 +253,17 @@ def write(tag, time, map)
250253
}
251254
end
252255

253-
def send_data(data)
256+
def send_data(tag, record)
254257
unless connect?
255258
connect!
256259
end
257-
@con.write data
260+
if @require_ack_response
261+
option = {}
262+
option['chunk'] = generate_chunk
263+
@con.write [tag, record, option].to_msgpack
264+
else
265+
@con.write [tag, record].to_msgpack
266+
end
258267
#while true
259268
# puts "sending #{data.length} bytes"
260269
# if data.length > 32*1024
@@ -269,6 +278,21 @@ def send_data(data)
269278
# data = data[n..-1]
270279
#end
271280

281+
if @require_ack_response && @ack_response_timeout > 0
282+
if IO.select([@con], nil, nil, @ack_response_timeout)
283+
raw_data = @con.recv(1024)
284+
285+
if raw_data.empty?
286+
raise "Closed connection"
287+
else
288+
response = MessagePack.unpack(raw_data)
289+
if response['ack'] != option['chunk']
290+
raise "ack in response and chunk id in sent data are different"
291+
end
292+
end
293+
end
294+
end
295+
272296
true
273297
end
274298

@@ -307,6 +331,10 @@ def set_last_error(e)
307331
# TODO: Check non GVL env
308332
@last_error[Thread.current.object_id] = e
309333
end
334+
335+
def generate_chunk
336+
Base64.encode64(([SecureRandom.random_number(1 << 32)] * 4).pack('NNNN')).chomp
337+
end
310338
end
311339
end
312340
end

0 commit comments

Comments
 (0)