From 234089c059edf62db6344ce63a86b692a5657153 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 21 Oct 2025 10:23:37 -0400 Subject: [PATCH 1/7] Rework reduction failure to be successful transport response --- muted-tests.yml | 3 - .../SearchQueryThenFetchAsyncAction.java | 173 ++++++++++-------- 2 files changed, 94 insertions(+), 82 deletions(-) diff --git a/muted-tests.yml b/muted-tests.yml index 041655779e4c1..3edb78f3c1ae2 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -315,9 +315,6 @@ tests: - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT method: test {p0=mtermvectors/10_basic/Tests catching other exceptions per item} issue: https://github.com/elastic/elasticsearch/issues/122414 -- class: org.elasticsearch.search.SearchWithRejectionsIT - method: testOpenContextsAfterRejections - issue: https://github.com/elastic/elasticsearch/issues/130821 - class: org.elasticsearch.packaging.test.DockerTests method: test090SecurityCliPackaging issue: https://github.com/elastic/elasticsearch/issues/131107 diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 0dd3ef56362da..528e900ee23b4 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -48,13 +48,11 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; -import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.BytesTransportResponse; import org.elasticsearch.transport.LeakTracker; -import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportChannel; @@ -225,13 +223,22 @@ public static final class NodeQueryResponse extends TransportResponse { private final RefCounted refCounted = LeakTracker.wrap(new SimpleRefCounted()); private final Object[] results; + private final Exception reductionFailure; private final SearchPhaseController.TopDocsStats topDocsStats; private final QueryPhaseResultConsumer.MergeResult mergeResult; public NodeQueryResponse(StreamInput in) throws IOException { this.results = in.readArray(i -> i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); - this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); - this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + boolean hasReductionFailure = in.readBoolean(); + if (hasReductionFailure) { + this.reductionFailure = in.readException(); + this.mergeResult = null; + this.topDocsStats = null; + } else { + this.reductionFailure = null; + this.mergeResult = QueryPhaseResultConsumer.MergeResult.readFrom(in); + this.topDocsStats = SearchPhaseController.TopDocsStats.readFrom(in); + } } // public for tests @@ -239,6 +246,10 @@ public Object[] getResults() { return results; } + Exception getReductionFailure() { + return reductionFailure; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(results.length); @@ -249,7 +260,13 @@ public void writeTo(StreamOutput out) throws IOException { writePerShardResult(out, (QuerySearchResult) result); } } - writeMergeResult(out, mergeResult, topDocsStats); + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } } @Override @@ -502,7 +519,12 @@ public Executor executor() { @Override public void handleResponse(NodeQueryResponse response) { if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + Exception reductionFailure = response.getReductionFailure(); + if (reductionFailure != null) { + queryPhaseResultConsumer.failure.compareAndSet(null, reductionFailure); + } else { + queryPhaseResultConsumer.addBatchedPartialResult(response.topDocsStats, response.mergeResult); + } } for (int i = 0; i < response.results.length; i++) { var s = request.shards.get(i); @@ -526,21 +548,7 @@ public void handleResponse(NodeQueryResponse response) { public void handleException(TransportException e) { Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); - if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { - // two possible special cases here where we do not want to fail the phase: - // failure to send out the request -> handle things the same way a shard would fail with unbatched execution - // as this could be a transient failure and partial results we may have are still valid - // cancellation of the whole batched request on the remote -> maybe we timed out or so, partial results may - // still be valid - onNodeQueryFailure(e, request, routing); - } else { - // Remote failure that wasn't due to networking or cancellation means that the data node was unable to reduce - // its local results. Failure to reduce always fails the phase without exception so we fail the phase here. - if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { - queryPhaseResultConsumer.failure.compareAndSet(null, cause); - } - onPhaseFailure(getName(), "", cause); - } + onNodeQueryFailure(e, request, routing); } }); }); @@ -801,61 +809,19 @@ void onShardDone() { if (countDown.countDown() == false) { return; } - RecyclerBytesStreamOutput out = null; - boolean success = false; var channelListener = new ChannelActionListener<>(channel); + RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); + out.setTransportVersion(channel.getVersion()); try (queryPhaseResultConsumer) { - var failure = queryPhaseResultConsumer.failure.get(); - if (failure != null) { - handleMergeFailure(failure, channelListener, namedWriteableRegistry); - return; - } - final QueryPhaseResultConsumer.MergeResult mergeResult; - try { - mergeResult = Objects.requireNonNullElse( - queryPhaseResultConsumer.consumePartialMergeResultDataNode(), - EMPTY_PARTIAL_MERGE_RESULT - ); - } catch (Exception e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); - return; - } - // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, - // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other - // indices without a roundtrip to the coordinating node - final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); - if (mergeResult.reducedTopDocs() != null) { - for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { - final int localIndex = scoreDoc.shardIndex; - scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; - relevantShardIndices.set(localIndex); - } - } - final int resultCount = queryPhaseResultConsumer.getNumShards(); - out = dependencies.transportService.newNetworkBytesStream(); - out.setTransportVersion(channel.getVersion()); - try { - out.writeVInt(resultCount); - for (int i = 0; i < resultCount; i++) { - var result = queryPhaseResultConsumer.results.get(i); - if (result == null) { - NodeQueryResponse.writePerShardException(out, failures.remove(i)); - } else { - // free context id and remove it from the result right away in case we don't need it anymore - maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); - NodeQueryResponse.writePerShardResult(out, result); - } - } - NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); - success = true; - } catch (IOException e) { - handleMergeFailure(e, channelListener, namedWriteableRegistry); - return; - } - } finally { - if (success == false && out != null) { - out.close(); + Exception reductionFailure = queryPhaseResultConsumer.failure.get(); + if (reductionFailure == null) { + writeSuccessfulResponse(out); + } else { + writeReductionFailureResponse(out, reductionFailure); } + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); } ActionListener.respondAndRelease( channelListener, @@ -863,6 +829,60 @@ void onShardDone() { ); } + private void writeSuccessfulResponse(RecyclerBytesStreamOutput out) throws IOException { + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); + } catch (Exception e) { + writeReductionFailureResponse(out, e); + return; + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(false); // does not have a reduction failure + NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); + } + + private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Exception reductionFailure) throws IOException { + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + NodeQueryResponse.writePerShardResult(out, result); + } + } + out.writeBoolean(true); // does have a reduction failure + out.writeException(reductionFailure); + releaseAllResultsContexts(); + } + private void maybeFreeContext( SearchPhaseResult result, BitSet relevantShardIndices, @@ -881,11 +901,7 @@ && isPartOfPIT(searchRequest.searchRequest, q.getContextId(), namedWriteableRegi } } - private void handleMergeFailure( - Exception e, - ChannelActionListener channelListener, - NamedWriteableRegistry namedWriteableRegistry - ) { + private void releaseAllResultsContexts() { queryPhaseResultConsumer.getSuccessfulResults() .forEach( searchPhaseResult -> releaseLocalContext( @@ -895,7 +911,6 @@ private void handleMergeFailure( namedWriteableRegistry ) ); - channelListener.onFailure(e); } void consumeResult(QuerySearchResult queryResult) { From bf76a654de4e47559217b7158ee9996bf292cafb Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 21 Oct 2025 12:11:38 -0400 Subject: [PATCH 2/7] Update docs/changelog/136889.yaml --- docs/changelog/136889.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 docs/changelog/136889.yaml diff --git a/docs/changelog/136889.yaml b/docs/changelog/136889.yaml new file mode 100644 index 0000000000000..8888eeb316b6c --- /dev/null +++ b/docs/changelog/136889.yaml @@ -0,0 +1,6 @@ +pr: 136889 +summary: Remove early phase failure in batched +area: Search +type: bug +issues: + - 134151 From 7540aa964b9a47c6afed6e7e4639e4ce88be1b37 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Wed, 22 Oct 2025 10:48:24 -0400 Subject: [PATCH 3/7] Set up new transport version --- .../SearchQueryThenFetchAsyncAction.java | 18 ++++++++++++------ ...esponse_might_include_reduction_failure.csv | 1 + .../resources/transport/upper_bounds/9.3.csv | 2 +- 3 files changed, 14 insertions(+), 7 deletions(-) create mode 100644 server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 528e900ee23b4..420c4a5dd7cf2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -82,6 +82,9 @@ public class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction i.readBoolean() ? new QuerySearchResult(i) : i.readException(), Object[]::new); - boolean hasReductionFailure = in.readBoolean(); - if (hasReductionFailure) { + if (in.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) && in.readBoolean()) { this.reductionFailure = in.readException(); this.mergeResult = null; this.topDocsStats = null; @@ -260,10 +262,14 @@ public void writeTo(StreamOutput out) throws IOException { writePerShardResult(out, (QuerySearchResult) result); } } - boolean hasReductionFailure = reductionFailure != null; - out.writeBoolean(hasReductionFailure); - if (hasReductionFailure) { - out.writeException(reductionFailure); + if (out.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE)) { + boolean hasReductionFailure = reductionFailure != null; + out.writeBoolean(hasReductionFailure); + if (hasReductionFailure) { + out.writeException(reductionFailure); + } else { + writeMergeResult(out, mergeResult, topDocsStats); + } } else { writeMergeResult(out, mergeResult, topDocsStats); } diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv new file mode 100644 index 0000000000000..773be2c2c150c --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -0,0 +1 @@ +9198000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 0b1b264125525..8e8baa42f7556 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -dimension_values,9197000 +batched_response_might_include_reduction_failure,9198000 From c48633cc460a9bedd8754c919c56f417a618b2be Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 22 Oct 2025 14:55:49 +0000 Subject: [PATCH 4/7] [CI] Update transport version definitions --- .../batched_response_might_include_reduction_failure.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.1.csv | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv index 773be2c2c150c..e161c25dd4b40 100644 --- a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -1 +1 @@ -9198000 +9198000,9112011 diff --git a/server/src/main/resources/transport/upper_bounds/9.1.csv b/server/src/main/resources/transport/upper_bounds/9.1.csv index 2b28f469f01a5..1dc617cb57ee0 100644 --- a/server/src/main/resources/transport/upper_bounds/9.1.csv +++ b/server/src/main/resources/transport/upper_bounds/9.1.csv @@ -1 +1 @@ -initial_9.1.6,9112010 +batched_response_might_include_reduction_failure,9112011 From d9d0b7fd478dea57d6d442ea6ee8cb092271b559 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 22 Oct 2025 16:43:46 +0000 Subject: [PATCH 5/7] [CI] Update transport version definitions --- .../batched_response_might_include_reduction_failure.csv | 2 +- server/src/main/resources/transport/upper_bounds/9.2.csv | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv index e161c25dd4b40..3d196b1ce5c78 100644 --- a/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv +++ b/server/src/main/resources/transport/definitions/referable/batched_response_might_include_reduction_failure.csv @@ -1 +1 @@ -9198000,9112011 +9198000,9185003,9112011 diff --git a/server/src/main/resources/transport/upper_bounds/9.2.csv b/server/src/main/resources/transport/upper_bounds/9.2.csv index cd7ce4b3b4fa8..d9fa3f0338a0c 100644 --- a/server/src/main/resources/transport/upper_bounds/9.2.csv +++ b/server/src/main/resources/transport/upper_bounds/9.2.csv @@ -1 +1 @@ -dimension_values,9185002 +batched_response_might_include_reduction_failure,9185003 From 657c31b348270c6e62c1098dd6ba9610246fe30b Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 28 Oct 2025 12:40:43 -0400 Subject: [PATCH 6/7] Ensure bwc --- .../SearchQueryThenFetchAsyncAction.java | 107 ++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 420c4a5dd7cf2..5f843a9350891 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -48,11 +48,13 @@ import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.tasks.CancellableTask; import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskCancelledException; import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.AbstractTransportRequest; import org.elasticsearch.transport.BytesTransportResponse; import org.elasticsearch.transport.LeakTracker; +import org.elasticsearch.transport.SendRequestTransportException; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportActionProxy; import org.elasticsearch.transport.TransportChannel; @@ -552,10 +554,40 @@ public void handleResponse(NodeQueryResponse response) { @Override public void handleException(TransportException e) { + if (connection.getTransportVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcHandleException(e); + return; + } Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); logger.debug("handling node search exception coming from [" + nodeId + "]", cause); onNodeQueryFailure(e, request, routing); } + + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + private void bwcHandleException(TransportException e) { + Exception cause = (Exception) ExceptionsHelper.unwrapCause(e); + logger.debug("handling node search exception coming from [" + nodeId + "]", cause); + if (e instanceof SendRequestTransportException || cause instanceof TaskCancelledException) { + // two possible special cases here where we do not want to fail the phase: + // failure to send out the request -> handle things the same way a shard would fail with unbatched execution + // as this could be a transient failure and partial results we may have are still valid + // cancellation of the whole batched request on the remote -> maybe we timed out or so, partial results may + // still be valid + onNodeQueryFailure(e, request, routing); + } else { + // Remote failure that wasn't due to networking or cancellation means that the data node was unable to reduce + // its local results. Failure to reduce always fails the phase without exception so we fail the phase here. + if (results instanceof QueryPhaseResultConsumer queryPhaseResultConsumer) { + queryPhaseResultConsumer.failure.compareAndSet(null, cause); + } + onPhaseFailure(getName(), "", cause); + } + onNodeQueryFailure(e, request, routing); + } }); }); } @@ -815,6 +847,10 @@ void onShardDone() { if (countDown.countDown() == false) { return; } + if (channel.getVersion().supports(BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE) == false) { + bwcRespond(); + return; + } var channelListener = new ChannelActionListener<>(channel); RecyclerBytesStreamOutput out = dependencies.transportService.newNetworkBytesStream(); out.setTransportVersion(channel.getVersion()); @@ -889,6 +925,77 @@ private void writeReductionFailureResponse(RecyclerBytesStreamOutput out, Except releaseAllResultsContexts(); } + /** + * This code is strictly for _snapshot_ backwards compatibility. The feature flag + * {@link SearchService#BATCHED_QUERY_PHASE_FEATURE_FLAG} was not turned on when the transport version + * {@link SearchQueryThenFetchAsyncAction#BATCHED_RESPONSE_MIGHT_INCLUDE_REDUCTION_FAILURE} was introduced. + */ + void bwcRespond() { + RecyclerBytesStreamOutput out = null; + boolean success = false; + var channelListener = new ChannelActionListener<>(channel); + try (queryPhaseResultConsumer) { + var failure = queryPhaseResultConsumer.failure.get(); + if (failure != null) { + releaseAllResultsContexts(); + channelListener.onFailure(failure); + return; + } + final QueryPhaseResultConsumer.MergeResult mergeResult; + try { + mergeResult = Objects.requireNonNullElse( + queryPhaseResultConsumer.consumePartialMergeResultDataNode(), + EMPTY_PARTIAL_MERGE_RESULT + ); + } catch (Exception e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } + // translate shard indices to those on the coordinator so that it can interpret the merge result without adjustments, + // also collect the set of indices that may be part of a subsequent fetch operation here so that we can release all other + // indices without a roundtrip to the coordinating node + final BitSet relevantShardIndices = new BitSet(searchRequest.shards.size()); + if (mergeResult.reducedTopDocs() != null) { + for (ScoreDoc scoreDoc : mergeResult.reducedTopDocs().scoreDocs) { + final int localIndex = scoreDoc.shardIndex; + scoreDoc.shardIndex = searchRequest.shards.get(localIndex).shardIndex; + relevantShardIndices.set(localIndex); + } + } + final int resultCount = queryPhaseResultConsumer.getNumShards(); + out = dependencies.transportService.newNetworkBytesStream(); + out.setTransportVersion(channel.getVersion()); + try { + out.writeVInt(resultCount); + for (int i = 0; i < resultCount; i++) { + var result = queryPhaseResultConsumer.results.get(i); + if (result == null) { + NodeQueryResponse.writePerShardException(out, failures.remove(i)); + } else { + // free context id and remove it from the result right away in case we don't need it anymore + maybeFreeContext(result, relevantShardIndices, namedWriteableRegistry); + NodeQueryResponse.writePerShardResult(out, result); + } + } + NodeQueryResponse.writeMergeResult(out, mergeResult, queryPhaseResultConsumer.topDocsStats); + success = true; + } catch (IOException e) { + releaseAllResultsContexts(); + channelListener.onFailure(e); + return; + } + } finally { + if (success == false && out != null) { + out.close(); + } + } + ActionListener.respondAndRelease( + channelListener, + new BytesTransportResponse(out.moveToBytesReference(), out.getTransportVersion()) + ); + } + private void maybeFreeContext( SearchPhaseResult result, BitSet relevantShardIndices, From f93d906ceff372537748821234373d7ff76f5b11 Mon Sep 17 00:00:00 2001 From: Ben Chaplin Date: Tue, 28 Oct 2025 17:48:35 -0400 Subject: [PATCH 7/7] Fix --- .../action/search/SearchQueryThenFetchAsyncAction.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 128a3ed7118d7..6be6b31726330 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -599,7 +599,6 @@ private void bwcHandleException(TransportException e) { } onPhaseFailure(getName(), "", cause); } - onNodeQueryFailure(e, request, routing); } }); });