Skip to content

Commit d9eab0f

Browse files
committed
Add require_ack_response option
1 parent 8d8de10 commit d9eab0f

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ def initialize(tag_prefix = nil, *args)
5757
@host = options[:host]
5858
@port = options[:port]
5959

60+
@require_ack_response = options[:require_ack_response]
61+
@ack_response_timeout = options[:ack_response_timeout] || 190
62+
6063
@mon = Monitor.new
6164
@pending = nil
6265
@connect_error_history = []
@@ -108,7 +111,7 @@ def close
108111
if @pending
109112
begin
110113
@pending.each do |tag, record|
111-
send_data([tag, record].to_msgpack)
114+
send_data(tag, record)
112115
end
113116
rescue => e
114117
set_last_error(e)
@@ -171,7 +174,7 @@ def write(tag, time, map)
171174

172175
begin
173176
@pending.each do |tag, record|
174-
send_data([tag, record].to_msgpack)
177+
send_data(tag, record)
175178
end
176179
@pending = nil
177180
true
@@ -189,11 +192,17 @@ def write(tag, time, map)
189192
}
190193
end
191194

192-
def send_data(data)
195+
def send_data(tag, record)
193196
unless connect?
194197
connect!
195198
end
196-
@con.write data
199+
if @require_ack_response
200+
option = {}
201+
option['chunk'] = generate_chunk
202+
@con.write [tag, record, option].to_msgpack
203+
else
204+
@con.write [tag, record].to_msgpack
205+
end
197206
#while true
198207
# puts "sending #{data.length} bytes"
199208
# if data.length > 32*1024
@@ -208,6 +217,21 @@ def send_data(data)
208217
# data = data[n..-1]
209218
#end
210219

220+
if @require_ack_response && @ack_response_timeout > 0
221+
if IO.select([@con], nil, nil, @ack_response_timeout)
222+
raw_data = @con.recv(1024)
223+
224+
if raw_data.empty?
225+
raise "Closed connection"
226+
else
227+
response = MessagePack.unpack(raw_data)
228+
if response['ack'] != option['chunk']
229+
raise "ack in response and chunk id in sent data are different"
230+
end
231+
end
232+
end
233+
end
234+
211235
true
212236
end
213237

@@ -246,6 +270,10 @@ def set_last_error(e)
246270
# TODO: Check non GVL env
247271
@last_error[Thread.current.object_id] = e
248272
end
273+
274+
def generate_chunk
275+
Base64.encode64(([SecureRandom.random_number(1 << 32)] * 4).pack('NNNN')).chomp
276+
end
249277
end
250278
end
251279
end

0 commit comments

Comments
 (0)