Skip to content

Commit 703157a

Browse files
committed
Add require_ack_response option
1 parent 3bae5cd commit 703157a

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ def initialize(tag_prefix = nil, *args)
5858
@port = options[:port]
5959
@socket_path = options[:socket_path]
6060

61+
@require_ack_response = options[:require_ack_response]
62+
@ack_response_timeout = options[:ack_response_timeout] || 190
63+
6164
@mon = Monitor.new
6265
@pending = nil
6366
@connect_error_history = []
@@ -108,7 +111,7 @@ def close
108111
if @pending
109112
begin
110113
@pending.each do |tag, record|
111-
send_data([tag, record].to_msgpack)
114+
send_data(tag, record)
112115
end
113116
rescue => e
114117
set_last_error(e)
@@ -184,7 +187,7 @@ def write(tag, time, map)
184187

185188
begin
186189
@pending.each do |tag, record|
187-
send_data([tag, record].to_msgpack)
190+
send_data(tag, record)
188191
end
189192
@pending = nil
190193
true
@@ -202,11 +205,17 @@ def write(tag, time, map)
202205
}
203206
end
204207

205-
def send_data(data)
208+
def send_data(tag, record)
206209
unless connect?
207210
connect!
208211
end
209-
@con.write data
212+
if @require_ack_response
213+
option = {}
214+
option['chunk'] = generate_chunk
215+
@con.write [tag, record, option].to_msgpack
216+
else
217+
@con.write [tag, record].to_msgpack
218+
end
210219
#while true
211220
# puts "sending #{data.length} bytes"
212221
# if data.length > 32*1024
@@ -221,6 +230,21 @@ def send_data(data)
221230
# data = data[n..-1]
222231
#end
223232

233+
if @require_ack_response && @ack_response_timeout > 0
234+
if IO.select([@con], nil, nil, @ack_response_timeout)
235+
raw_data = @con.recv(1024)
236+
237+
if raw_data.empty?
238+
raise "Closed connection"
239+
else
240+
response = MessagePack.unpack(raw_data)
241+
if response['ack'] != option['chunk']
242+
raise "ack in response and chunk id in sent data are different"
243+
end
244+
end
245+
end
246+
end
247+
224248
true
225249
end
226250

@@ -259,6 +283,10 @@ def set_last_error(e)
259283
# TODO: Check non GVL env
260284
@last_error[Thread.current.object_id] = e
261285
end
286+
287+
def generate_chunk
288+
Base64.encode64(([SecureRandom.random_number(1 << 32)] * 4).pack('NNNN')).chomp
289+
end
262290
end
263291
end
264292
end

0 commit comments

Comments
 (0)