Commit 8005a7d

Refresh potential lost connections at query start for field caps (#131517)
For CPS S2D9, we need field caps to refresh potentially lost connections before executing a query by explicitly establishing a connection with a short timeout, so we avoid waiting for a long duration. This is similar to what we recently did with `_search` and will help ES|QL as well.
1 parent f5b1263 commit 8005a7d
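
The core of the change, condensed from the diff of TransportFieldCapabilitiesAction.java below, is a sketch only: before fanning out a per-cluster field caps request, the coordinator resolves a connection through a SubscribableListener, optionally bounded by the search.ccs.force_connect_timeout setting, and only sends the request once the connection resolves. Names such as clusterAlias, remoteRequest, listener and singleThreadedExecutor stand in for variables of the surrounding method in the actual change.

    // Condensed sketch of the pattern introduced in this commit (see the full diff below).
    SubscribableListener<Transport.Connection> connectionListener = new SubscribableListener<>();
    if (forceConnectTimeoutSecs != null) {
        // Bound the wait for (re)establishing the connection so a dead remote cannot stall the query.
        connectionListener.addTimeout(forceConnectTimeoutSecs, threadPool, singleThreadedExecutor);
    }
    connectionListener.addListener(
        listener.delegateFailure(
            (l, conn) -> transportService.sendRequest(
                conn,
                REMOTE_TYPE.name(),
                remoteRequest,
                TransportRequestOptions.EMPTY,
                new ActionListenerResponseHandler<>(l, FieldCapabilitiesResponse::new, singleThreadedExecutor)
            )
        )
    );
    // Force a connection attempt when the timeout is configured, even for skip_unavailable remotes.
    boolean ensureConnected = forceConnectTimeoutSecs != null
        || transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
    transportService.getRemoteClusterService().maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);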

File tree

4 files changed: +223, -33 lines

docs/changelog/131517.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
pr: 131517
summary: Refresh potential lost connections at query start for field caps
area: Search
type: enhancement
issues: []
FieldCapsForceConnectTimeoutIT.java

Lines changed: 138 additions & 0 deletions
@@ -0,0 +1,138 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.indices.cluster;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public class FieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase {
    private static final String LINKED_CLUSTER_1 = "cluster-a";
    private static final String LINKED_CLUSTER_2 = "cluster-b";

    public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin {
        @Override
        public List<Setting<?>> getSettings() {
            return List.of(ForceConnectTimeoutSetting);
        }
    }

    private static final Setting<String> ForceConnectTimeoutSetting = Setting.simpleString(
        "search.ccs.force_connect_timeout",
        Setting.Property.NodeScope
    );

    @Override
    protected List<String> remoteClusterAlias() {
        return List.of(LINKED_CLUSTER_1, LINKED_CLUSTER_2);
    }

    @Override
    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
        return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class);
    }

    @Override
    protected Settings nodeSettings() {
        /*
         * This is the setting that controls how long TransportFieldCapabilitiesAction will wait for establishing a connection
         * with a remote. At present, we set it to low 1s to prevent stalling the test for too long -- this is consistent
         * with what we've done in other tests.
         */
        return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build();
    }

    @Override
    protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
        return Map.of(LINKED_CLUSTER_1, true, LINKED_CLUSTER_2, true);
    }

    public void testTimeoutSetting() {
        var latch = new CountDownLatch(1);
        for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) {
            MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName);

            mts.addConnectBehavior(
                cluster(LINKED_CLUSTER_1).getInstance(TransportService.class, (String) null),
                ((transport, discoveryNode, profile, listener) -> {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new AssertionError(e);
                    }

                    transport.openConnection(discoveryNode, profile, listener);
                })
            );
        }

        // Add some dummy data to prove we are communicating fine with the remote.
        assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index"));
        client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
        client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();

        /*
         * Do a full restart so that our custom connect behaviour takes effect since it does not apply to
         * pre-existing connections -- they're already established by the time this test runs.
         */
        try {
            cluster(LINKED_CLUSTER_1).fullRestart();
        } catch (Exception e) {
            throw new AssertionError(e);
        } finally {
            var fieldCapsRequest = new FieldCapabilitiesRequest();
            /*
             * We have an origin and 2 linked clusters but will target only the one that we stalled.
             * This is because when the timeout kicks in, and we move on from the stalled cluster, we do not want
             * the error to be a top-level error. Rather, it must be present in the response object under "failures".
             * All other errors are free to be top-level errors though.
             */
            fieldCapsRequest.indices(LINKED_CLUSTER_1 + ":*");
            fieldCapsRequest.fields("foo", "bar", "baz");
            var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest));

            var failures = result.getFailures();
            assertThat(failures.size(), Matchers.is(1));

            var failure = failures.getFirst();
            assertThat(failure.getIndices().length, Matchers.is(1));
            assertThat(failure.getIndices()[0], Matchers.equalTo("cluster-a:*"));
            // Outer wrapper that gets unwrapped in ExceptionsHelper.isRemoteUnavailableException().
            assertThat(
                failure.getException().toString(),
                Matchers.containsString("java.lang.IllegalStateException: Unable to open any connections")
            );

            // The actual error that is thrown by the subscribable listener when a linked cluster could not be talked to.
            assertThat(failure.getException().getCause(), Matchers.instanceOf(ElasticsearchTimeoutException.class));
            assertThat(ExceptionsHelper.isRemoteUnavailableException(failure.getException()), Matchers.is(true));

            latch.countDown();
            result.decRef();
        }
    }
}

server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

Lines changed: 67 additions & 21 deletions
@@ -11,8 +11,10 @@
 
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.automaton.TooComplexToDeterminizeException;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.OriginalIndices;
@@ -22,7 +24,7 @@
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.RefCountingRunnable;
-import org.elasticsearch.client.internal.RemoteClusterClient;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -37,6 +39,7 @@
 import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -48,9 +51,10 @@
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
-import org.elasticsearch.transport.RemoteClusterService;
+import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportRequestHandler;
+import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
@@ -91,6 +95,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
 
     private final IndicesService indicesService;
     private final boolean ccsCheckCompatibility;
+    private final ThreadPool threadPool;
+    private final TimeValue forceConnectTimeoutSecs;
 
     @Inject
     public TransportFieldCapabilitiesAction(
@@ -117,32 +123,40 @@ public TransportFieldCapabilitiesAction(
             new NodeTransportHandler()
         );
         this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
+        this.threadPool = threadPool;
+        this.forceConnectTimeoutSecs = clusterService.getSettings().getAsTime("search.ccs.force_connect_timeout", null);
     }
 
     @Override
     protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
         executeRequest(
             task,
             request,
-            (remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener),
+            (transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest(
+                conn,
+                REMOTE_TYPE.name(),
+                fieldCapabilitiesRequest,
+                TransportRequestOptions.EMPTY,
+                responseHandler
+            ),
             listener
         );
     }
 
     public void executeRequest(
         Task task,
         FieldCapabilitiesRequest request,
-        RemoteRequestExecutor remoteRequestExecutor,
+        LinkedRequestExecutor linkedRequestExecutor,
         ActionListener<FieldCapabilitiesResponse> listener
     ) {
         // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
-        searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l)));
+        searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, linkedRequestExecutor, l)));
     }
 
     private void doExecuteForked(
         Task task,
         FieldCapabilitiesRequest request,
-        RemoteRequestExecutor remoteRequestExecutor,
+        LinkedRequestExecutor linkedRequestExecutor,
        ActionListener<FieldCapabilitiesResponse> listener
     ) {
         if (ccsCheckCompatibility) {
@@ -268,12 +282,6 @@ private void doExecuteForked(
         for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
             String clusterAlias = remoteIndices.getKey();
             OriginalIndices originalIndices = remoteIndices.getValue();
-            var remoteClusterClient = transportService.getRemoteClusterService()
-                .getRemoteClusterClient(
-                    clusterAlias,
-                    singleThreadedExecutor,
-                    RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
-                );
             FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
             ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
                 for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
@@ -299,18 +307,34 @@ private void doExecuteForked(
                     handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
                 }
             });
-            remoteRequestExecutor.executeRemoteRequest(
-                remoteClusterClient,
-                remoteRequest,
+
+            SubscribableListener<Transport.Connection> connectionListener = new SubscribableListener<>();
+            if (forceConnectTimeoutSecs != null) {
+                connectionListener.addTimeout(forceConnectTimeoutSecs, threadPool, singleThreadedExecutor);
+            }
+
+            connectionListener.addListener(
                 // The underlying transport service may call onFailure with a thread pool other than search_coordinator.
                 // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
                 // TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439
                 new ForkingOnFailureActionListener<>(
                     singleThreadedExecutor,
                     true,
                     ActionListener.releaseAfter(remoteListener, refs.acquire())
+                ).delegateFailure(
+                    (responseListener, conn) -> linkedRequestExecutor.executeRemoteRequest(
+                        transportService,
+                        conn,
+                        remoteRequest,
+                        new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor)
+                    )
                 )
             );
+
+            boolean ensureConnected = forceConnectTimeoutSecs != null
+                || transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
+            transportService.getRemoteClusterService()
+                .maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);
         }
     }
 }
@@ -338,11 +362,12 @@ public void onFailure(Exception e) {
         });
     }
 
-    public interface RemoteRequestExecutor {
+    public interface LinkedRequestExecutor {
         void executeRemoteRequest(
-            RemoteClusterClient remoteClient,
+            TransportService transportService,
+            Transport.Connection conn,
             FieldCapabilitiesRequest remoteRequest,
-            ActionListener<FieldCapabilitiesResponse> remoteListener
+            ActionListenerResponseHandler<FieldCapabilitiesResponse> responseHandler
         );
     }
 
@@ -376,8 +401,20 @@ private static void mergeIndexResponses(
         } else {
             // we have no responses at all, maybe because of errors
             if (indexFailures.isEmpty() == false) {
-                // throw back the first exception
-                listener.onFailure(failures.get(0).getException());
+                /*
+                 * Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors.
+                 * Instead, they should always be passed through the response object, as part of "failures".
+                 */
+                if (failures.stream()
+                    .anyMatch(
+                        failure -> failure.getException() instanceof IllegalStateException ise
+                            && ise.getCause() instanceof ElasticsearchTimeoutException
+                    )) {
+                    listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures));
+                } else {
+                    // throw back the first exception
+                    listener.onFailure(failures.get(0).getException());
+                }
             } else {
                 listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
             }
@@ -585,15 +622,24 @@ List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
             for (Map.Entry<String, Exception> failure : failuresByIndex.entrySet()) {
                 String index = failure.getKey();
                 Exception e = failure.getValue();
+                /*
+                 * The listener we use to briefly try, and connect to a linked cluster can throw an ElasticsearchTimeoutException
+                 * error if it cannot be reached. To make sure we correctly recognise this scenario via
+                 * ExceptionsHelper.isRemoteUnavailableException(), we wrap this error appropriately.
+                 */
+                if (e instanceof ElasticsearchTimeoutException ete) {
+                    e = new IllegalStateException("Unable to open any connections", ete);
+                }
 
                 if (successfulIndices.contains(index) == false) {
                     // we deduplicate exceptions on the underlying causes message and classname
                     // we unwrap the cause to e.g. group RemoteTransportExceptions coming from different nodes if the cause is the same
                     Throwable cause = ExceptionsHelper.unwrapCause(e);
                     Tuple<String, String> groupingKey = new Tuple<>(cause.getMessage(), cause.getClass().getName());
+                    Exception ex = e;
                     indexFailures.compute(
                         groupingKey,
-                        (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, e) : v.addIndex(index)
+                        (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, ex) : v.addIndex(index)
                     );
                 }
             }

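Taken together, a remote that cannot be reached within the force-connect timeout no longer fails the whole field caps request; the timeout is reported per cluster under "failures". The following is a sketch only of what a caller would observe, mirroring the assertions in FieldCapsForceConnectTimeoutIT above; the request and response variables are illustrative.

    // Sketch: how the forced-connect timeout surfaces to a field caps caller (assumed request/response objects).
    FieldCapabilitiesResponse response = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, request));
    for (FieldCapabilitiesFailure failure : response.getFailures()) {
        Exception e = failure.getException();
        // Wrapped as IllegalStateException("Unable to open any connections") so the helper below recognises it.
        assert e.getCause() instanceof ElasticsearchTimeoutException;
        assert ExceptionsHelper.isRemoteUnavailableException(e);
    }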