Commit 8eac7ce

add headers reporting uncompressed size and doc count for bulk requests (#1217)

Adds two new headers to each bulk request:

* "X-Elastic-Event-Count": the number of actions/documents in that bulk request
* "X-Elastic-Uncompressed-Request-Length": the size in bytes of the request body before compression

X-Elastic-Uncompressed-Request-Length is equal to Content-Length when compression is disabled.

1 parent 5c8092c commit 8eac7ce
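
For illustration, a minimal sketch of the headers a compressed bulk request might carry after this change; the header names come from this commit, while the numeric values are invented:

    # Hypothetical headers for a gzip-compressed bulk request of 3 actions whose
    # NDJSON body measured 2185 bytes before compression (values are made up):
    headers = {
      "Content-Type"                          => "application/json",
      "Content-Encoding"                      => "gzip",
      "X-Elastic-Event-Count"                 => "3",    # actions/documents in this request
      "X-Elastic-Uncompressed-Request-Length" => "2185"  # body bytes before gzip
    }
    # With compression disabled, "Content-Encoding" is absent and
    # "X-Elastic-Uncompressed-Request-Length" equals the Content-Length.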

File tree

7 files changed (+114, -10 lines)

CHANGELOG.md

Lines changed: 3 additions & 0 deletions

@@ -1,3 +1,6 @@
+## 12.0.6
+  - Add headers reporting uncompressed size and doc count for bulk requests [#1217](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1217)
+
 ## 12.0.5
   - [DOC] Fix link to Logstash DLQ docs [#1214](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1214)

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 17 additions & 5 deletions

@@ -21,7 +21,8 @@ module LogStash; module Outputs; class ElasticSearch;
   # We wound up agreeing that a number greater than 10 MiB and less than 100MiB
   # made sense. We picked one on the lowish side to not use too much heap.
   TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB
-
+  EVENT_COUNT_HEADER = "X-Elastic-Event-Count".freeze
+  UNCOMPRESSED_LENGTH_HEADER = "X-Elastic-Uncompressed-Request-Length".freeze
 
   class HttpClient
     attr_reader :client, :options, :logger, :pool, :action_count, :recv_count
@@ -143,7 +144,11 @@ def bulk(actions)
                        :payload_size => stream_writer.pos,
                        :content_length => body_stream.size,
                        :batch_offset => (index + 1 - batch_actions.size))
-        bulk_responses << bulk_send(body_stream, batch_actions)
+        headers = {
+          EVENT_COUNT_HEADER => batch_actions.size.to_s,
+          UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s
+        }
+        bulk_responses << bulk_send(body_stream, batch_actions, headers)
         body_stream.truncate(0) && body_stream.seek(0)
         stream_writer = gzip_writer(body_stream) if compression_level?
         batch_actions.clear
@@ -159,7 +164,14 @@ def bulk(actions)
                      :payload_size => stream_writer.pos,
                      :content_length => body_stream.size,
                      :batch_offset => (actions.size - batch_actions.size))
-    bulk_responses << bulk_send(body_stream, batch_actions) if body_stream.size > 0
+
+    if body_stream.size > 0
+      headers = {
+        EVENT_COUNT_HEADER => batch_actions.size.to_s,
+        UNCOMPRESSED_LENGTH_HEADER => stream_writer.pos.to_s
+      }
+      bulk_responses << bulk_send(body_stream, batch_actions, headers)
+    end
 
     body_stream.close unless compression_level?
     join_bulk_responses(bulk_responses)
@@ -179,8 +191,8 @@ def join_bulk_responses(bulk_responses)
     }
   end
 
-  def bulk_send(body_stream, batch_actions)
-    params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
+  def bulk_send(body_stream, batch_actions, headers = {})
+    params = compression_level? ? {:headers => headers.merge("Content-Encoding" => "gzip") } : { :headers => headers }
 
     begin
       response = @pool.post(@bulk_path, params, body_stream.string)
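
To see the `bulk_send` change in isolation: per-request headers are now threaded through to the pool, with "Content-Encoding" merged on top only when compressing. A self-contained sketch of that params construction (`build_params` is an illustrative name, not part of the plugin):

    # Mirrors the params construction in bulk_send: the per-request headers are
    # always sent; gzip's Content-Encoding is layered on only when compressing.
    def build_params(headers, compressing)
      compressing ? { :headers => headers.merge("Content-Encoding" => "gzip") } : { :headers => headers }
    end

    build_params({ "X-Elastic-Event-Count" => "3" }, true)
    # => {:headers=>{"X-Elastic-Event-Count"=>"3", "Content-Encoding"=>"gzip"}}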

logstash-output-elasticsearch.gemspec

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '12.0.5'
+  s.version = '12.0.6'
   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/integration/outputs/compressed_indexing_spec.rb

Lines changed: 3 additions & 1 deletion

@@ -36,7 +36,9 @@
   {
     "Content-Encoding" => "gzip",
     "Content-Type" => "application/json",
-    'x-elastic-product-origin' => 'logstash-output-elasticsearch'
+    'x-elastic-product-origin' => 'logstash-output-elasticsearch',
+    'X-Elastic-Event-Count' => anything,
+    'X-Elastic-Uncompressed-Request-Length' => anything,
   }
 }

spec/integration/outputs/index_spec.rb

Lines changed: 12 additions & 2 deletions

@@ -215,12 +215,22 @@ def curl_and_get_json_response(url, method: :get, retrieve_err_payload: false);
 
   it "sets the correct content-type header" do
     expected_manticore_opts = {
-      :headers => {"Content-Type" => "application/json", 'x-elastic-product-origin' => 'logstash-output-elasticsearch'},
+      :headers => {
+        "Content-Type" => "application/json",
+        'x-elastic-product-origin' => 'logstash-output-elasticsearch',
+        'X-Elastic-Event-Count' => anything,
+        'X-Elastic-Uncompressed-Request-Length' => anything
+      },
       :body => anything
     }
     if secure
       expected_manticore_opts = {
-        :headers => {"Content-Type" => "application/json", 'x-elastic-product-origin' => 'logstash-output-elasticsearch'},
+        :headers => {
+          "Content-Type" => "application/json",
+          'x-elastic-product-origin' => 'logstash-output-elasticsearch',
+          'X-Elastic-Event-Count' => anything,
+          'X-Elastic-Uncompressed-Request-Length' => anything
+        },
         :body => anything,
         :auth => {
           :user => user,

spec/unit/outputs/elasticsearch/http_client_spec.rb

Lines changed: 77 additions & 0 deletions

@@ -270,6 +270,83 @@
 
     end
   end
+  context "the 'user-agent' header" do
+    let(:pool) { double("pool") }
+    let(:compression_level) { 6 }
+    let(:base_options) { super().merge( :client_settings => {:compression_level => compression_level}) }
+    let(:actions) { [
+      ["index", {:_id=>nil, :_index=>"logstash"}, {"message_1"=> message_1}],
+      ["index", {:_id=>nil, :_index=>"logstash"}, {"message_2"=> message_2}],
+      ["index", {:_id=>nil, :_index=>"logstash"}, {"message_3"=> message_3}],
+    ]}
+    let(:message_1) { "hello" }
+    let(:message_2_size) { 1_000 }
+    let(:message_2) { SecureRandom.alphanumeric(message_2_size / 2 ) * 2 }
+    let(:message_3_size) { 1_000 }
+    let(:message_3) { "m" * message_3_size }
+    let(:messages_size) { message_1.size + message_2.size + message_3.size }
+    let(:action_overhead) { 42 + 16 + 2 } # header plus doc key size plus new line overhead per action
+
+    let(:response) do
+      response = double("response")
+      allow(response).to receive(:code).and_return(response)
+      allow(response).to receive(:body).and_return({"errors" => false}.to_json)
+      response
+    end
+
+    before(:each) do
+      subject.instance_variable_set("@pool", pool)
+    end
+
+    it "carries bulk request's uncompressed size" do
+      expect(pool).to receive(:post) do |path, params, body|
+        headers = params.fetch(:headers, {})
+        expect(headers["X-Elastic-Event-Count"]).to eq("3")
+        expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (messages_size + (action_overhead * 3)).to_s
+      end.and_return(response)
+
+      subject.send(:bulk, actions)
+    end
+    context "without compression" do
+      let(:compression_level) { 0 }
+      it "carries bulk request's uncompressed size" do
+        expect(pool).to receive(:post) do |path, params, body|
+          headers = params.fetch(:headers, {})
+          expect(headers["X-Elastic-Event-Count"]).to eq("3")
+          expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (messages_size + (action_overhead * 3)).to_s
+        end.and_return(response)
+        subject.send(:bulk, actions)
+      end
+    end
+
+    context "with compressed messages over 20MB" do
+      let(:message_2_size) { 21_000_000 }
+      it "carries bulk request's uncompressed size" do
+        # only the first, tiny, message is sent first
+        expect(pool).to receive(:post) do |path, params, body|
+          headers = params.fetch(:headers, {})
+          expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_1.size + action_overhead).to_s
+          expect(headers["X-Elastic-Event-Count"]).to eq("1")
+        end.and_return(response)
+
+        # huge message_2 is sent afterwards alone
+        expect(pool).to receive(:post) do |path, params, body|
+          headers = params.fetch(:headers, {})
+          expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_2.size + action_overhead).to_s
+          expect(headers["X-Elastic-Event-Count"]).to eq("1")
+        end.and_return(response)
+
+        # finally medium message_3 is sent alone as well
+        expect(pool).to receive(:post) do |path, params, body|
+          headers = params.fetch(:headers, {})
+          expect(headers["X-Elastic-Uncompressed-Request-Length"]).to eq (message_3.size + action_overhead).to_s
+          expect(headers["X-Elastic-Event-Count"]).to eq("1")
+        end.and_return(response)
+
+        subject.send(:bulk, actions)
+      end
+    end
+  end
 end
 
 describe "sniffing" do

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 1 addition & 1 deletion

@@ -770,7 +770,7 @@
     end
 
     before(:each) do
-      allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array)) do |stream, actions|
+      allow(subject.client).to receive(:bulk_send).with(instance_of(StringIO), instance_of(Array), instance_of(Hash)) do |stream, actions, headers|
        expect( stream.string ).to include '"foo":"bar1"'
        expect( stream.string ).to include '"foo":"bar2"'
      end.and_return(bulk_response, {"errors"=>false}) # let's make it go away (second call) to not retry indefinitely
