Skip to content

Commit c05c2ee

Browse files
committed
Add require_ack_response option
1 parent 3012f02 commit c05c2ee

File tree

1 file changed

+32
-4
lines changed

1 file changed

+32
-4
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,9 @@ def initialize(tag_prefix = nil, *args)
8989
end
9090
@packer = @factory.packer
9191

92+
@require_ack_response = options[:require_ack_response]
93+
@ack_response_timeout = options[:ack_response_timeout] || 190
94+
9295
@mon = Monitor.new
9396
@pending = nil
9497
@connect_error_history = []
@@ -143,7 +146,7 @@ def close
143146
if @pending
144147
begin
145148
@pending.each do |tag, record|
146-
send_data([tag, record].to_msgpack)
149+
send_data(tag, record)
147150
end
148151
rescue => e
149152
set_last_error(e)
@@ -231,7 +234,7 @@ def write(tag, time, map)
231234

232235
begin
233236
@pending.each do |tag, record|
234-
send_data([tag, record].to_msgpack)
237+
send_data(tag, record)
235238
end
236239
@pending = nil
237240
true
@@ -249,11 +252,17 @@ def write(tag, time, map)
249252
}
250253
end
251254

252-
def send_data(data)
255+
def send_data(tag, record)
253256
unless connect?
254257
connect!
255258
end
256-
@con.write data
259+
if @require_ack_response
260+
option = {}
261+
option['chunk'] = generate_chunk
262+
@con.write [tag, record, option].to_msgpack
263+
else
264+
@con.write [tag, record].to_msgpack
265+
end
257266
#while true
258267
# puts "sending #{data.length} bytes"
259268
# if data.length > 32*1024
@@ -268,6 +277,21 @@ def send_data(data)
268277
# data = data[n..-1]
269278
#end
270279

280+
if @require_ack_response && @ack_response_timeout > 0
281+
if IO.select([@con], nil, nil, @ack_response_timeout)
282+
raw_data = @con.recv(1024)
283+
284+
if raw_data.empty?
285+
raise "Closed connection"
286+
else
287+
response = MessagePack.unpack(raw_data)
288+
if response['ack'] != option['chunk']
289+
raise "ack in response and chunk id in sent data are different"
290+
end
291+
end
292+
end
293+
end
294+
271295
true
272296
end
273297

@@ -306,6 +330,10 @@ def set_last_error(e)
306330
# TODO: Check non GVL env
307331
@last_error[Thread.current.object_id] = e
308332
end
333+
334+
def generate_chunk
335+
Base64.encode64(([SecureRandom.random_number(1 << 32)] * 4).pack('NNNN')).chomp
336+
end
309337
end
310338
end
311339
end

0 commit comments

Comments
 (0)