Skip to content

Commit c15da98

Browse files
committed
update after review
1 parent b75717d commit c15da98

File tree

2 files changed

+8
-14
lines changed

2 files changed

+8
-14
lines changed

server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.client.internal.node.NodeClient;
2323
import org.elasticsearch.cluster.node.DiscoveryNode;
2424
import org.elasticsearch.common.breaker.CircuitBreaker;
25-
import org.elasticsearch.common.bytes.ReleasableBytesReference;
2625
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2726
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
2827
import org.elasticsearch.common.io.stream.StreamInput;
@@ -799,16 +798,14 @@ public void onResponse(T response) {
799798
return;
800799
}
801800
var bytesRef = out.moveToBytesReference();
802-
Releasable breakerRelease;
803801
try {
804-
breakerRelease = detachRelease.apply(response);
802+
Releasables.close(detachRelease.apply(response));
805803
} catch (Exception e) {
806804
Releasables.close(bytesRef);
807805
channelListener.onFailure(e);
808806
return;
809807
}
810-
var trackingBytes = new ReleasableBytesReference(bytesRef, Releasables.wrap(bytesRef, breakerRelease));
811-
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(trackingBytes, out.getTransportVersion()));
808+
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(bytesRef, out.getTransportVersion()));
812809
}
813810

814811
@Override

server/src/test/java/org/elasticsearch/action/search/AsBytesResponseTests.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,10 @@ public void testNetworkPathCallsDetachReleaseOnSuccess() {
120120
listener.onResponse(new SimpleTestResponse("hello"));
121121

122122
assertTrue("detachRelease must be called after successful serialization", detachCalled.get());
123-
assertFalse("releasable must NOT be closed yet (deferred to bytes release)", releasableClosed.get());
123+
assertTrue("releasable must be closed eagerly after serialization", releasableClosed.get());
124124
assertThat(sentResponse.get(), notNullValue());
125125
assertThat(sentResponse.get(), instanceOf(BytesTransportResponse.class));
126-
127126
sentResponse.get().decRef();
128-
assertTrue("releasable must be closed after bytes are released", releasableClosed.get());
129127
}
130128

131129
public void testNetworkPathReleasesImmediatelyOnSerializationFailure() {
@@ -150,7 +148,7 @@ public void testNetworkPathReleasesImmediatelyOnSerializationFailure() {
150148
assertThat(sentException.get(), instanceOf(IOException.class));
151149
}
152150

153-
public void testNetworkPathDefersCircuitBreakerReleaseUntilBytesReleased() {
151+
public void testNetworkPathReleasesResponseBreakerEagerlyAndPageBreakerOnSend() {
154152
long responseBytes = 5000L;
155153
long pageBytes = PageCacheRecycler.BYTE_PAGE_SIZE;
156154
var breakerUsed = new AtomicLong(responseBytes);
@@ -181,14 +179,14 @@ public void testNetworkPathDefersCircuitBreakerReleaseUntilBytesReleased() {
181179

182180
assertThat("detach must zero out the field on the result", fetchResult.getSearchHitsSizeBytes(), equalTo(0L));
183181
assertThat(
184-
"breaker must account for both the detached response reservation and the serialized page bytes",
182+
"response reservation must be released eagerly, only page bytes remain on the breaker",
185183
breakerUsed.get(),
186-
equalTo(responseBytes + pageBytes)
184+
equalTo(pageBytes)
187185
);
188186
assertThat(sentResponse.get(), instanceOf(BytesTransportResponse.class));
189187

190188
sentResponse.get().decRef();
191-
assertThat("breaker must be released after bytes are released", breakerUsed.get(), equalTo(0L));
189+
assertThat("breaker must be fully released after bytes are sent", breakerUsed.get(), equalTo(0L));
192190
} finally {
193191
fetchResult.decRef();
194192
}
@@ -345,10 +343,9 @@ public void testTaskTransportChannelUnwrapsToNetworkPath() {
345343
listener.onResponse(new SimpleTestResponse("task-network-test"));
346344

347345
assertTrue("detachRelease must be called on task-wrapped network path", detachCalled.get());
348-
assertFalse("releasable must NOT be closed yet on task-wrapped network path", releasableClosed.get());
346+
assertTrue("releasable must be closed eagerly on task-wrapped network path", releasableClosed.get());
349347
assertThat(sentResponse.get(), instanceOf(BytesTransportResponse.class));
350348
sentResponse.get().decRef();
351-
assertTrue("releasable must be closed after bytes are released", releasableClosed.get());
352349
}
353350

354351
static class SimpleTestResponse extends TransportResponse {

0 commit comments

Comments
 (0)