From 6da2ecafe534b5818626b2a89b70bfd29138e6b8 Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Thu, 17 Jul 2025 21:39:27 +0100 Subject: [PATCH 1/7] Force reconnect to remote clusters with a short timeout for CPS --- .../RemoteFieldCapsForceConnectTimeoutIT.java | 112 ++++++++++++++++++ .../TransportFieldCapabilitiesAction.java | 52 +++++--- .../esql/action/EsqlResolveFieldsAction.java | 19 +-- 3 files changed, 160 insertions(+), 23 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java new file mode 100644 index 0000000000000..873df48fe1434 --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java @@ -0,0 +1,112 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; +import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +public class RemoteFieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase { + private static final String REMOTE_CLUSTER_1 = "cluster-a"; + + public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin { + @Override + public List> getSettings() { + return List.of(ForceConnectTimeoutSetting); + } + } + + private static final Setting ForceConnectTimeoutSetting = Setting.simpleString( + "search.ccs.force_connect_timeout", + Setting.Property.NodeScope + ); + + @Override + protected List remoteClusterAlias() { + return List.of(REMOTE_CLUSTER_1); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class); + } + + @Override + protected Settings nodeSettings() { + /* + * This is the setting that controls how long TransportFieldCapabilitiesAction will wait for establishing a connection + * with a remote. At present, we set it to low 1s to prevent stalling the test for too long -- this is consistent + * with what we've done in other tests. + */ + return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build(); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(REMOTE_CLUSTER_1, true); + } + + public void testTimeoutSetting() { + var latch = new CountDownLatch(1); + for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) { + MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName); + + mts.addConnectBehavior( + cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, randomFrom(cluster(REMOTE_CLUSTER_1).getNodeNames())), + ((transport, discoveryNode, profile, listener) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + + transport.openConnection(discoveryNode, profile, listener); + }) + ); + } + + // Add some dummy data to prove we are communicating fine with the remote. + assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index")); + client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); + client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); + + /* + * Do a full restart so that our custom connect behaviour takes effect since it does not apply to + * pre-existing connections -- they're already established by the time this test runs. + */ + try { + cluster(REMOTE_CLUSTER_1).fullRestart(); + } catch (Exception e) { + throw new AssertionError(e); + } finally { + var fieldCapsRequest = new FieldCapabilitiesRequest(); + fieldCapsRequest.indices("*", "*:*"); + fieldCapsRequest.fields("foo", "bar", "baz"); + var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest)); + + latch.countDown(); + result.decRef(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index eb45276ce2da0..66b2690865368 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -13,6 +13,7 @@ import org.apache.lucene.util.automaton.TooComplexToDeterminizeException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.OriginalIndices; @@ -22,7 +23,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.RefCountingRunnable; -import org.elasticsearch.client.internal.RemoteClusterClient; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -37,6 +38,7 @@ import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -48,9 +50,10 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -91,6 +94,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener), + (transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest( + conn, + REMOTE_TYPE.name(), + fieldCapabilitiesRequest, + TransportRequestOptions.EMPTY, + responseHandler + ), listener ); } @@ -268,12 +281,6 @@ private void doExecuteForked( for (Map.Entry remoteIndices : remoteClusterIndices.entrySet()) { String clusterAlias = remoteIndices.getKey(); OriginalIndices originalIndices = remoteIndices.getValue(); - var remoteClusterClient = transportService.getRemoteClusterService() - .getRemoteClusterClient( - clusterAlias, - singleThreadedExecutor, - RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE - ); FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis); ActionListener remoteListener = ActionListener.wrap(response -> { for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) { @@ -299,9 +306,13 @@ private void doExecuteForked( handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); } }); - remoteRequestExecutor.executeRemoteRequest( - remoteClusterClient, - remoteRequest, + + SubscribableListener connectionListener = new SubscribableListener<>(); + if (forceConnectTimeoutSecs != null) { + connectionListener.addTimeout(forceConnectTimeoutSecs, threadPool, singleThreadedExecutor); + } + + connectionListener.addListener( // The underlying transport service may call onFailure with a thread pool other than search_coordinator. // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator. // TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439 @@ -309,8 +320,20 @@ private void doExecuteForked( singleThreadedExecutor, true, ActionListener.releaseAfter(remoteListener, refs.acquire()) + ).delegateFailure( + (responseListener, conn) -> remoteRequestExecutor.executeRemoteRequest( + transportService, + conn, + remoteRequest, + new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor) + ) ) ); + + boolean ensureConnected = forceConnectTimeoutSecs != null + || transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false; + transportService.getRemoteClusterService() + .maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener); } } } @@ -340,9 +363,10 @@ public void onFailure(Exception e) { public interface RemoteRequestExecutor { void executeRemoteRequest( - RemoteClusterClient remoteClient, + TransportService transportService, + Transport.Connection conn, FieldCapabilitiesRequest remoteRequest, - ActionListener remoteListener + ActionListenerResponseHandler responseHandler ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java index f7fd991a9ef16..4fc1b32e37e03 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java @@ -8,6 +8,7 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; @@ -15,10 +16,11 @@ import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; /** @@ -53,15 +55,14 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti } void executeRemoteRequest( - RemoteClusterClient remoteClient, + TransportService transportService, + Transport.Connection conn, FieldCapabilitiesRequest remoteRequest, - ActionListener remoteListener + ActionListenerResponseHandler responseHandler ) { - remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> { - var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) - ? RESOLVE_REMOTE_TYPE - : TransportFieldCapabilitiesAction.REMOTE_TYPE; - remoteClient.execute(conn, remoteAction, remoteRequest, l); - })); + var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) + ? RESOLVE_REMOTE_TYPE + : TransportFieldCapabilitiesAction.REMOTE_TYPE; + transportService.sendRequest(conn, remoteAction.name(), remoteRequest, TransportRequestOptions.EMPTY, responseHandler); } } From d447ab0ba936e691abf9d2dd9d3b9d21d0868e2a Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Fri, 18 Jul 2025 12:04:42 +0100 Subject: [PATCH 2/7] Pass failures back correctly --- .../cluster/RemoteFieldCapsForceConnectTimeoutIT.java | 8 +++++++- .../fieldcaps/TransportFieldCapabilitiesAction.java | 8 +------- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java index 873df48fe1434..b4713ff491a7f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java @@ -19,6 +19,7 @@ import org.elasticsearch.test.AbstractMultiClustersTestCase; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.TransportService; +import org.hamcrest.Matchers; import java.util.Collection; import java.util.List; @@ -73,7 +74,7 @@ public void testTimeoutSetting() { MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName); mts.addConnectBehavior( - cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, randomFrom(cluster(REMOTE_CLUSTER_1).getNodeNames())), + cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, cluster(REMOTE_CLUSTER_1).getNodeNames()[0]), ((transport, discoveryNode, profile, listener) -> { try { latch.await(); @@ -105,6 +106,11 @@ public void testTimeoutSetting() { fieldCapsRequest.fields("foo", "bar", "baz"); var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest)); + var failures = result.getFailures(); + assertThat(failures.size(), Matchers.is(1)); + var message = result.getFailures().getFirst().getException().toString(); + assertThat(message, Matchers.containsString("org.elasticsearch.ElasticsearchTimeoutException: timed out after [1s/1000ms]")); + latch.countDown(); result.decRef(); } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 66b2690865368..7fbd2cb16d7bf 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -398,13 +398,7 @@ private static void mergeIndexResponses( listener.onResponse(new FieldCapabilitiesResponse(new ArrayList<>(indexResponses.values()), failures)); } } else { - // we have no responses at all, maybe because of errors - if (indexFailures.isEmpty() == false) { - // throw back the first exception - listener.onFailure(failures.get(0).getException()); - } else { - listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList())); - } + listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures)); } } From bd9ed9154d64977f94fa14c8698a16e5717612e9 Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Fri, 18 Jul 2025 12:32:35 +0100 Subject: [PATCH 3/7] Use 2 remotes for test and assert cluster name in failures --- .../RemoteFieldCapsForceConnectTimeoutIT.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java index b4713ff491a7f..ddacf464ec5d6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java @@ -30,6 +30,7 @@ public class RemoteFieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase { private static final String REMOTE_CLUSTER_1 = "cluster-a"; + private static final String REMOTE_CLUSTER_2 = "cluster-b"; public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin { @Override @@ -45,7 +46,7 @@ public List> getSettings() { @Override protected List remoteClusterAlias() { - return List.of(REMOTE_CLUSTER_1); + return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); } @Override @@ -65,7 +66,7 @@ protected Settings nodeSettings() { @Override protected Map skipUnavailableForRemoteClusters() { - return Map.of(REMOTE_CLUSTER_1, true); + return Map.of(REMOTE_CLUSTER_1, true, REMOTE_CLUSTER_2, true); } public void testTimeoutSetting() { @@ -108,8 +109,14 @@ public void testTimeoutSetting() { var failures = result.getFailures(); assertThat(failures.size(), Matchers.is(1)); - var message = result.getFailures().getFirst().getException().toString(); - assertThat(message, Matchers.containsString("org.elasticsearch.ElasticsearchTimeoutException: timed out after [1s/1000ms]")); + + var failure = failures.getFirst(); + assertThat(failure.getIndices().length, Matchers.is(1)); + assertThat(failure.getIndices()[0], Matchers.equalTo("cluster-a:*")); + assertThat( + failure.getException().toString(), + Matchers.containsString("org.elasticsearch.ElasticsearchTimeoutException: timed out after [1s/1000ms]") + ); latch.countDown(); result.decRef(); From 52afa62f36a7c316b86c288536891311bddee89d Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Tue, 22 Jul 2025 16:05:50 +0100 Subject: [PATCH 4/7] Pass only `ElasticsearchTimeoutException` through the response and others as top-level errors --- .../RemoteFieldCapsForceConnectTimeoutIT.java | 10 ++++++++-- .../TransportFieldCapabilitiesAction.java | 17 ++++++++++++++++- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java index ddacf464ec5d6..cb15564773e3b 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java @@ -75,7 +75,7 @@ public void testTimeoutSetting() { MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName); mts.addConnectBehavior( - cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, cluster(REMOTE_CLUSTER_1).getNodeNames()[0]), + cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, (String) null), ((transport, discoveryNode, profile, listener) -> { try { latch.await(); @@ -103,7 +103,13 @@ public void testTimeoutSetting() { throw new AssertionError(e); } finally { var fieldCapsRequest = new FieldCapabilitiesRequest(); - fieldCapsRequest.indices("*", "*:*"); + /* + * We have a local and 2 remote clusters but will target only the remote that we stalled. + * This is because when the timeout kicks in, and we move on from the stalled remote, we do not want + * the error to be a top-level error. Rather, it must be present in the response object under "failures". + * All other errors are free to be top-level errors though. + */ + fieldCapsRequest.indices(REMOTE_CLUSTER_1 + ":*"); fieldCapsRequest.fields("foo", "bar", "baz"); var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest)); diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 7fbd2cb16d7bf..ff347f02870a2 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -11,6 +11,7 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.automaton.TooComplexToDeterminizeException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; @@ -398,7 +399,21 @@ private static void mergeIndexResponses( listener.onResponse(new FieldCapabilitiesResponse(new ArrayList<>(indexResponses.values()), failures)); } } else { - listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures)); + // we have no responses at all, maybe because of errors + if (indexFailures.isEmpty() == false) { + /* + * Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors. + * Instead, they should always be passed through the response object, as part of "failures". + */ + if (failures.stream().anyMatch(failure -> failure.getException() instanceof ElasticsearchTimeoutException)) { + listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures)); + } else { + // throw back the first exception + listener.onFailure(failures.get(0).getException()); + } + } else { + listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList())); + } } } From 7b8c5cd9f71f27093680a7ca572a262128b7eab5 Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Tue, 22 Jul 2025 17:20:59 +0100 Subject: [PATCH 5/7] Update docs/changelog/131517.yaml --- docs/changelog/131517.yaml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 docs/changelog/131517.yaml diff --git a/docs/changelog/131517.yaml b/docs/changelog/131517.yaml new file mode 100644 index 0000000000000..0ecd47224e6d0 --- /dev/null +++ b/docs/changelog/131517.yaml @@ -0,0 +1,5 @@ +pr: 131517 +summary: Refresh potential lost connections at query start for field caps +area: Search +type: enhancement +issues: [] From af3091fcd67a8bc7c0392e0b9d03db054137dd65 Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Mon, 28 Jul 2025 16:40:16 +0100 Subject: [PATCH 6/7] Address review comment: wrap ETE to make it recognisable by `ExceptionsHelper.isRemoteUnavailableException()` --- .../RemoteFieldCapsForceConnectTimeoutIT.java | 7 ++++++- .../TransportFieldCapabilitiesAction.java | 17 +++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java index cb15564773e3b..891916f25046f 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java @@ -9,6 +9,7 @@ package org.elasticsearch.indices.cluster; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; import org.elasticsearch.common.settings.Setting; @@ -119,11 +120,15 @@ public void testTimeoutSetting() { var failure = failures.getFirst(); assertThat(failure.getIndices().length, Matchers.is(1)); assertThat(failure.getIndices()[0], Matchers.equalTo("cluster-a:*")); + // Outer wrapper that gets unwrapped in ExceptionsHelper.isRemoteUnavailableException(). assertThat( failure.getException().toString(), - Matchers.containsString("org.elasticsearch.ElasticsearchTimeoutException: timed out after [1s/1000ms]") + Matchers.containsString("java.lang.IllegalStateException: Unable to open any connections") ); + // The actual error that is thrown by the subscribable listener when a remote could not be talked to. + assertThat(failure.getException().getCause(), Matchers.instanceOf(ElasticsearchTimeoutException.class)); + latch.countDown(); result.decRef(); } diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index ff347f02870a2..6b13e6012cb73 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -405,7 +405,11 @@ private static void mergeIndexResponses( * Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors. * Instead, they should always be passed through the response object, as part of "failures". */ - if (failures.stream().anyMatch(failure -> failure.getException() instanceof ElasticsearchTimeoutException)) { + if (failures.stream() + .anyMatch( + failure -> failure.getException() instanceof IllegalStateException ise + && ise.getCause() instanceof ElasticsearchTimeoutException + )) { listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures)); } else { // throw back the first exception @@ -618,15 +622,24 @@ List build(Set successfulIndices) { for (Map.Entry failure : failuresByIndex.entrySet()) { String index = failure.getKey(); Exception e = failure.getValue(); + /* + * The listener we use to briefly try, and connect to a remote can throw an ElasticsearchTimeoutException + * error if a remote cannot be reached. To make sure we correctly recognise this scenario via + * ExceptionsHelper.isRemoteUnavailableException(), we wrap this error appropriately. + */ + if (e instanceof ElasticsearchTimeoutException ete) { + e = new IllegalStateException("Unable to open any connections", ete); + } if (successfulIndices.contains(index) == false) { // we deduplicate exceptions on the underlying causes message and classname // we unwrap the cause to e.g. group RemoteTransportExceptions coming from different nodes if the cause is the same Throwable cause = ExceptionsHelper.unwrapCause(e); Tuple groupingKey = new Tuple<>(cause.getMessage(), cause.getClass().getName()); + Exception ex = e; indexFailures.compute( groupingKey, - (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, e) : v.addIndex(index) + (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, ex) : v.addIndex(index) ); } } From bd2d73381b46358217357e71309afbd0a1b6f7a5 Mon Sep 17 00:00:00 2001 From: Pawan Kartik Date: Tue, 29 Jul 2025 16:24:04 +0100 Subject: [PATCH 7/7] Address review comment: use new naming conventions --- ...va => FieldCapsForceConnectTimeoutIT.java} | 30 ++++++++++--------- .../TransportFieldCapabilitiesAction.java | 14 ++++----- .../esql/action/EsqlResolveFieldsAction.java | 8 ++--- 3 files changed, 27 insertions(+), 25 deletions(-) rename server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/{RemoteFieldCapsForceConnectTimeoutIT.java => FieldCapsForceConnectTimeoutIT.java} (83%) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/FieldCapsForceConnectTimeoutIT.java similarity index 83% rename from server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java rename to server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/FieldCapsForceConnectTimeoutIT.java index 891916f25046f..96d2b4190f75d 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/RemoteFieldCapsForceConnectTimeoutIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/FieldCapsForceConnectTimeoutIT.java @@ -10,6 +10,7 @@ package org.elasticsearch.indices.cluster; import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; import org.elasticsearch.common.settings.Setting; @@ -29,9 +30,9 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -public class RemoteFieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase { - private static final String REMOTE_CLUSTER_1 = "cluster-a"; - private static final String REMOTE_CLUSTER_2 = "cluster-b"; +public class FieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase { + private static final String LINKED_CLUSTER_1 = "cluster-a"; + private static final String LINKED_CLUSTER_2 = "cluster-b"; public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin { @Override @@ -47,7 +48,7 @@ public List> getSettings() { @Override protected List remoteClusterAlias() { - return List.of(REMOTE_CLUSTER_1, REMOTE_CLUSTER_2); + return List.of(LINKED_CLUSTER_1, LINKED_CLUSTER_2); } @Override @@ -67,7 +68,7 @@ protected Settings nodeSettings() { @Override protected Map skipUnavailableForRemoteClusters() { - return Map.of(REMOTE_CLUSTER_1, true, REMOTE_CLUSTER_2, true); + return Map.of(LINKED_CLUSTER_1, true, LINKED_CLUSTER_2, true); } public void testTimeoutSetting() { @@ -76,7 +77,7 @@ public void testTimeoutSetting() { MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName); mts.addConnectBehavior( - cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, (String) null), + cluster(LINKED_CLUSTER_1).getInstance(TransportService.class, (String) null), ((transport, discoveryNode, profile, listener) -> { try { latch.await(); @@ -90,27 +91,27 @@ public void testTimeoutSetting() { } // Add some dummy data to prove we are communicating fine with the remote. - assertAcked(client(REMOTE_CLUSTER_1).admin().indices().prepareCreate("test-index")); - client(REMOTE_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); - client(REMOTE_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); + assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index")); + client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); + client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); /* * Do a full restart so that our custom connect behaviour takes effect since it does not apply to * pre-existing connections -- they're already established by the time this test runs. */ try { - cluster(REMOTE_CLUSTER_1).fullRestart(); + cluster(LINKED_CLUSTER_1).fullRestart(); } catch (Exception e) { throw new AssertionError(e); } finally { var fieldCapsRequest = new FieldCapabilitiesRequest(); /* - * We have a local and 2 remote clusters but will target only the remote that we stalled. - * This is because when the timeout kicks in, and we move on from the stalled remote, we do not want + * We have an origin and 2 linked clusters but will target only the one that we stalled. + * This is because when the timeout kicks in, and we move on from the stalled cluster, we do not want * the error to be a top-level error. Rather, it must be present in the response object under "failures". * All other errors are free to be top-level errors though. */ - fieldCapsRequest.indices(REMOTE_CLUSTER_1 + ":*"); + fieldCapsRequest.indices(LINKED_CLUSTER_1 + ":*"); fieldCapsRequest.fields("foo", "bar", "baz"); var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest)); @@ -126,8 +127,9 @@ public void testTimeoutSetting() { Matchers.containsString("java.lang.IllegalStateException: Unable to open any connections") ); - // The actual error that is thrown by the subscribable listener when a remote could not be talked to. + // The actual error that is thrown by the subscribable listener when a linked cluster could not be talked to. assertThat(failure.getException().getCause(), Matchers.instanceOf(ElasticsearchTimeoutException.class)); + assertThat(ExceptionsHelper.isRemoteUnavailableException(failure.getException()), Matchers.is(true)); latch.countDown(); result.decRef(); diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 6b13e6012cb73..e9960c832953c 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -146,17 +146,17 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti public void executeRequest( Task task, FieldCapabilitiesRequest request, - RemoteRequestExecutor remoteRequestExecutor, + LinkedRequestExecutor linkedRequestExecutor, ActionListener listener ) { // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can - searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l))); + searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, linkedRequestExecutor, l))); } private void doExecuteForked( Task task, FieldCapabilitiesRequest request, - RemoteRequestExecutor remoteRequestExecutor, + LinkedRequestExecutor linkedRequestExecutor, ActionListener listener ) { if (ccsCheckCompatibility) { @@ -322,7 +322,7 @@ private void doExecuteForked( true, ActionListener.releaseAfter(remoteListener, refs.acquire()) ).delegateFailure( - (responseListener, conn) -> remoteRequestExecutor.executeRemoteRequest( + (responseListener, conn) -> linkedRequestExecutor.executeRemoteRequest( transportService, conn, remoteRequest, @@ -362,7 +362,7 @@ public void onFailure(Exception e) { }); } - public interface RemoteRequestExecutor { + public interface LinkedRequestExecutor { void executeRemoteRequest( TransportService transportService, Transport.Connection conn, @@ -623,8 +623,8 @@ List build(Set successfulIndices) { String index = failure.getKey(); Exception e = failure.getValue(); /* - * The listener we use to briefly try, and connect to a remote can throw an ElasticsearchTimeoutException - * error if a remote cannot be reached. To make sure we correctly recognise this scenario via + * The listener we use to briefly try, and connect to a linked cluster can throw an ElasticsearchTimeoutException + * error if it cannot be reached. To make sure we correctly recognise this scenario via * ExceptionsHelper.isRemoteUnavailableException(), we wrap this error appropriately. */ if (e instanceof ElasticsearchTimeoutException ete) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java index 4fc1b32e37e03..a801a0bbaefa3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java @@ -51,18 +51,18 @@ public EsqlResolveFieldsAction( @Override protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { - fieldCapsAction.executeRequest(task, request, this::executeRemoteRequest, listener); + fieldCapsAction.executeRequest(task, request, this::executeLinkedRequest, listener); } - void executeRemoteRequest( + void executeLinkedRequest( TransportService transportService, Transport.Connection conn, - FieldCapabilitiesRequest remoteRequest, + FieldCapabilitiesRequest request, ActionListenerResponseHandler responseHandler ) { var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? RESOLVE_REMOTE_TYPE : TransportFieldCapabilitiesAction.REMOTE_TYPE; - transportService.sendRequest(conn, remoteAction.name(), remoteRequest, TransportRequestOptions.EMPTY, responseHandler); + transportService.sendRequest(conn, remoteAction.name(), request, TransportRequestOptions.EMPTY, responseHandler); } }