Skip to content

Commit 419ead3

Browse files
committed
add headers reporting uncompressed size and doc count
1 parent fc281c3 commit 419ead3

File tree

2 files changed

+96
-5
lines changed

2 files changed

+96
-5
lines changed

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ module LogStash; module Outputs; class ElasticSearch;
2121
# We wound up agreeing that a number greater than 10 MiB and less than 100 MiB
2222
# made sense. We picked one on the lowish side to not use too much heap.
2323
TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
24-
24+
EVENT_COUNT_HEADER = "X-Elastic-Event-Count".freeze
25+
UNCOMPRESSED_LENGTH_HEADER = "X-Elastic-Uncompressed-Request-Length".freeze
2526

2627
class HttpClient
2728
attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
@@ -143,7 +144,11 @@ def bulk(actions)
143144
:payload_size => stream_writer.pos,
144145
:content_length => body_stream.size,
145146
:batch_offset => (index + 1 - batch_actions.size))
146-
bulk_responses << bulk_send(body_stream, batch_actions)
147+
headers = {
148+
EVENT_COUNT_HEADER => batch_actions.size,
149+
UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos
150+
}
151+
bulk_responses << bulk_send(body_stream, batch_actions, headers)
147152
body_stream.truncate(0) && body_stream.seek(0)
148153
stream_writer = gzip_writer(body_stream) if compression_level?
149154
batch_actions.clear
@@ -159,7 +164,14 @@ def bulk(actions)
159164
:payload_size => stream_writer.pos,
160165
:content_length => body_stream.size,
161166
:batch_offset => (actions.size - batch_actions.size))
162-
bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
167+
168+
if body_stream.size > 0
169+
headers = {
170+
EVENT_COUNT_HEADER => batch_actions.size,
171+
UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos
172+
}
173+
bulk_responses << bulk_send(body_stream, batch_actions, headers)
174+
end
163175

164176
body_stream.close unless compression_level?
165177
join_bulk_responses(bulk_responses)
@@ -179,8 +191,8 @@ def join_bulk_responses(bulk_responses)
179191
}
180192
end
181193

182-
def bulk_send(body_stream, batch_actions)
183-
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
194+
def bulk_send(body_stream, batch_actions, headers = {})
195+
params = compression_level? ? {:headers => headers.merge("Content-Encoding" => "gzip") } : { :headers => headers }
184196

185197
begin
186198
response = @pool.post(@bulk_path, params, body_stream.string)

spec/unit/outputs/elasticsearch/http_client_spec.rb

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,85 @@
270270

271271
end
272272
end
273+
context "the 'user-agent' header" do
274+
let(:pool) { double("pool") }
275+
let(:compression_level) { 6 }
276+
let(:base_options) { super().merge( :client_settings => {:compression_level => compression_level}) }
277+
let(:actions) { [
278+
["index", {:_id=>nil, :_index=>"logstash"}, {"message_1"=> message_1}],
279+
["index", {:_id=>nil, :_index=>"logstash"}, {"message_2"=> message_2}],
280+
["index", {:_id=>nil, :_index=>"logstash"}, {"message_3"=> message_3}],
281+
]}
282+
let(:message_1) { "hello" }
283+
let(:message_2_size) { 1_000 }
284+
let(:message_2) { SecureRandom.alphanumeric(message_2_size / 2 ) * 2 }
285+
let(:message_3_size) { 1_000 }
286+
let(:message_3) { "m" * message_3_size }
287+
let(:messages_size) { message_1.size + message_2.size + message_3.size }
288+
let(:action_overhead) { 42 + 16 + 2 } # per-action overhead: 42 bytes of action header + 16 bytes of document key/JSON wrapping + 2 newlines
289+
290+
let(:response) do
291+
response = double("response")
292+
allow(response).to receive(:code).and_return(response)
293+
allow(response).to receive(:body).and_return({"errors" => false}.to_json)
294+
response
295+
end
296+
297+
before(:each) do
298+
subject.instance_variable_set("@pool", pool)
299+
end
300+
301+
it "carries bulk request's uncompressed size" do
302+
expect(pool).to receive(:post) do |path, params, body|
303+
headers = params.fetch(:headers, {})
304+
expect(headers["X-Elastic-Event-Count"]).to be(3)
305+
expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq messages_size + (action_overhead * 3)
306+
expect(headers["X-Elastic-Uncompressed-Request-Length"]).to be > body.size
307+
end.and_return(response)
308+
309+
subject.send(:bulk, actions)
310+
end
311+
context "without compression" do
312+
let(:compression_level) { 0 }
313+
it "carries bulk request's uncompressed size" do
314+
expect(pool).to receive(:post) do |path, params, body|
315+
headers = params.fetch(:headers, {})
316+
expect(headers["X-Elastic-Event-Count"]).to be(3)
317+
expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq messages_size + (action_overhead * 3)
318+
expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq body.size
319+
end.and_return(response)
320+
subject.send(:bulk, actions)
321+
end
322+
end
323+
324+
context "with compressed messages over 20MB" do
325+
let(:message_2_size) { 21_000_000 }
326+
it "carries bulk request's uncompressed size" do
327+
# the tiny first message is sent on its own in the first request
328+
expect(pool).to receive(:post) do |path, params, body|
329+
headers = params.fetch(:headers, {})
330+
expect(headers["X-Elastic-Uncompressed-Request-Length"]).to be == message_1.size + action_overhead
331+
expect(headers["X-Elastic-Event-Count"]).to be(1)
332+
end.and_return(response)
333+
334+
# huge message_2 is sent afterwards alone
335+
expect(pool).to receive(:post) do |path, params, body|
336+
headers = params.fetch(:headers, {})
337+
expect(headers["X-Elastic-Uncompressed-Request-Length"]).to be >= message_2.size + action_overhead
338+
expect(headers["X-Elastic-Event-Count"]).to be(1)
339+
end.and_return(response)
340+
341+
# finally medium message_3 is sent alone as well
342+
expect(pool).to receive(:post) do |path, params, body|
343+
headers = params.fetch(:headers, {})
344+
expect(headers["X-Elastic-Uncompressed-Request-Length"]).to be >= message_3.size + action_overhead
345+
expect(headers["X-Elastic-Event-Count"]).to be(1)
346+
end.and_return(response)
347+
348+
subject.send(:bulk, actions)
349+
end
350+
end
351+
end
273352
end
274353

275354
describe "sniffing" do

0 commit comments

Comments
 (0)