Skip to content

Commit cfc2b9b

Browse files
jsvddonoghuc
authored and committed
Properly handle 413 Payload Too Large errors
Previously, when Elasticsearch responded with a 413 (Payload Too Large) status, the manticore adapter raised an error before the response could be processed by the bulk_send error handling. This commit refactors the way `BadResponseCodeError` codes are handled. Previously we had logic in the manticore adapter which special-cased raising errors on some codes. This commit refactors such that the adapter raises on any error status and the caller is now responsible for special-case handling of the code.
1 parent 05d5870 commit cfc2b9b

File tree

5 files changed

+60
-45
lines changed

5 files changed

+60
-45
lines changed

docs/index.asciidoc

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,22 @@ This plugin uses the Elasticsearch bulk API to optimize its imports into Elastic
196196
either partial or total failures. The bulk API sends batches of requests to an HTTP endpoint. Error codes for the HTTP
197197
request are handled differently than error codes for individual documents.
198198

199-
HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely.
199+
200+
HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely,
201+
including 413 (Payload Too Large) responses.
202+
203+
If you want to handle large payloads differently, you can configure 413 responses to go to the Dead Letter Queue instead:
204+
205+
[source,ruby]
206+
-----
207+
output {
208+
elasticsearch {
209+
hosts => ["localhost:9200"]
210+
dlq_custom_codes => [413] # Send 413 errors to DLQ instead of retrying
211+
}
212+
-----
213+
214+
This will capture oversized payloads in the DLQ for analysis rather than retrying them.
200215

201216
The following document errors are handled as follows:
202217

lib/logstash/outputs/elasticsearch/http_client.rb

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses)
182182
def bulk_send(body_stream, batch_actions)
183183
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}
184184

185-
response = @pool.post(@bulk_path, params, body_stream.string)
186-
187-
@bulk_response_metrics.increment(response.code.to_s)
188-
189-
case response.code
190-
when 200 # OK
191-
LogStash::Json.load(response.body)
192-
when 413 # Payload Too Large
185+
begin
186+
response = @pool.post(@bulk_path, params, body_stream.string)
187+
@bulk_response_metrics.increment(response.code.to_s)
188+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
189+
@bulk_response_metrics.increment(e.response_code.to_s)
190+
raise e unless e.response_code == 413
191+
# special handling for 413, treat it as a document level issue
193192
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
194-
emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
195-
else
196-
url = ::LogStash::Util::SafeURI.new(response.final_url)
197-
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
198-
response.code, url, body_stream.to_s, response.body
199-
)
193+
return emulate_batch_error_response(batch_actions, 413, 'payload_too_large')
194+
rescue => e # it may be a network issue instead, re-raise
195+
raise e
200196
end
197+
198+
LogStash::Json.load(response.body)
201199
end
202200

203201
def emulate_batch_error_response(actions, http_code, reason)
@@ -411,6 +409,9 @@ def host_to_url(h)
411409
def exists?(path, use_get=false)
412410
response = use_get ? @pool.get(path) : @pool.head(path)
413411
response.code >= 200 && response.code <= 299
412+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
413+
return true if e.code == 404
414+
raise e
414415
end
415416

416417
def template_exists?(template_endpoint, name)
@@ -420,7 +421,10 @@ def template_exists?(template_endpoint, name)
420421
def template_put(template_endpoint, name, template)
421422
path = "#{template_endpoint}/#{name}"
422423
logger.info("Installing Elasticsearch template", name: name)
423-
@pool.put(path, nil, LogStash::Json.dump(template))
424+
response = @pool.put(path, nil, LogStash::Json.dump(template))
425+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
426+
return response if e.code == 404
427+
raise e
424428
end
425429

426430
# ILM methods
@@ -432,17 +436,15 @@ def rollover_alias_exists?(name)
432436

433437
# Create a new rollover alias
434438
def rollover_alias_put(alias_name, alias_definition)
435-
begin
436-
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
437-
logger.info("Created rollover alias", name: alias_name)
438-
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
439-
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
440-
if e.response_code == 400
441-
logger.info("Rollover alias already exists, skipping", name: alias_name)
442-
return
443-
end
444-
raise e
439+
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
440+
logger.info("Created rollover alias", name: alias_name)
441+
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
442+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
443+
if e.response_code == 400
444+
logger.info("Rollover alias already exists, skipping", name: alias_name)
445+
return
445446
end
447+
raise e
446448
end
447449

448450
def get_xpack_info

lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil)
7676
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
7777
end
7878

79-
# 404s are excluded because they are valid codes in the case of
80-
# template installation. We might need a better story around this later
81-
# but for our current purposes this is correct
8279
code = resp.code
83-
if code < 200 || code > 299 && code != 404
80+
if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret
8481
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
8582
end
8683

lib/logstash/outputs/elasticsearch/http_client/pool.rb

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -253,13 +253,11 @@ def get_license(url)
253253
def health_check_request(url)
254254
logger.debug("Running health check to see if an Elasticsearch connection is working",
255255
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
256-
begin
257-
response = perform_request_to_url(url, :head, @healthcheck_path)
258-
return response, nil
259-
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
260-
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
261-
return nil, e
262-
end
256+
response = perform_request_to_url(url, :head, @healthcheck_path)
257+
return response, nil
258+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
259+
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
260+
return nil, e
263261
end
264262

265263
def healthcheck!(register_phase = true)
@@ -312,13 +310,11 @@ def healthcheck!(register_phase = true)
312310
end
313311

314312
def get_root_path(url, params={})
315-
begin
316-
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
317-
return resp, nil
318-
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
319-
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
320-
return nil, e
321-
end
313+
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
314+
return resp, nil
315+
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
316+
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
317+
return nil, e
322318
end
323319

324320
def test_serverless_connection(url, root_response)

spec/unit/outputs/elasticsearch_spec.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -915,7 +915,12 @@
915915
allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
916916
if body.length > max_bytes
917917
max_bytes *= 2 # ensure a successful retry
918-
double("Response", :code => 413, :body => "")
918+
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
919+
413,
920+
"test-url",
921+
body,
922+
""
923+
)
919924
else
920925
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
921926
end

0 commit comments

Comments
 (0)