diff --git a/docs/changelog/131517.yaml b/docs/changelog/131517.yaml new file mode 100644 index 0000000000000..0ecd47224e6d0 --- /dev/null +++ b/docs/changelog/131517.yaml @@ -0,0 +1,5 @@ +pr: 131517 +summary: Refresh potential lost connections at query start for field caps +area: Search +type: enhancement +issues: [] diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/FieldCapsForceConnectTimeoutIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/FieldCapsForceConnectTimeoutIT.java new file mode 100644 index 0000000000000..96d2b4190f75d --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/FieldCapsForceConnectTimeoutIT.java @@ -0,0 +1,138 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". 
+ */ + +package org.elasticsearch.indices.cluster; + +import org.elasticsearch.ElasticsearchTimeoutException; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; +import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.plugins.ClusterPlugin; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.AbstractMultiClustersTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; +import org.hamcrest.Matchers; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +public class FieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase { + private static final String LINKED_CLUSTER_1 = "cluster-a"; + private static final String LINKED_CLUSTER_2 = "cluster-b"; + + public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin { + @Override + public List> getSettings() { + return List.of(ForceConnectTimeoutSetting); + } + } + + private static final Setting ForceConnectTimeoutSetting = Setting.simpleString( + "search.ccs.force_connect_timeout", + Setting.Property.NodeScope + ); + + @Override + protected List remoteClusterAlias() { + return List.of(LINKED_CLUSTER_1, LINKED_CLUSTER_2); + } + + @Override + protected Collection> nodePlugins(String clusterAlias) { + return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class); + } + + @Override + protected Settings nodeSettings() { + /* + * This is the setting that controls how long TransportFieldCapabilitiesAction will wait for establishing a connection + * 
with a remote. At present, we set it to a low 1s to prevent stalling the test for too long -- this is consistent + * with what we've done in other tests. + */ + return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build(); + } + + @Override + protected Map skipUnavailableForRemoteClusters() { + return Map.of(LINKED_CLUSTER_1, true, LINKED_CLUSTER_2, true); + } + + public void testTimeoutSetting() { + var latch = new CountDownLatch(1); + for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) { + MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName); + + mts.addConnectBehavior( + cluster(LINKED_CLUSTER_1).getInstance(TransportService.class, (String) null), + ((transport, discoveryNode, profile, listener) -> { + try { + latch.await(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } + + transport.openConnection(discoveryNode, profile, listener); + }) + ); + } + + // Add some dummy data to prove we are communicating fine with the remote. + assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index")); + client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get(); + client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get(); + + /* + * Do a full restart so that our custom connect behaviour takes effect since it does not apply to + * pre-existing connections -- they're already established by the time this test runs. + */ + try { + cluster(LINKED_CLUSTER_1).fullRestart(); + } catch (Exception e) { + throw new AssertionError(e); + } finally { + var fieldCapsRequest = new FieldCapabilitiesRequest(); + /* + * We have an origin and 2 linked clusters but will target only the one that we stalled. + * This is because when the timeout kicks in, and we move on from the stalled cluster, we do not want + * the error to be a top-level error. 
Rather, it must be present in the response object under "failures". + * All other errors are free to be top-level errors though. + */ + fieldCapsRequest.indices(LINKED_CLUSTER_1 + ":*"); + fieldCapsRequest.fields("foo", "bar", "baz"); + var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest)); + + var failures = result.getFailures(); + assertThat(failures.size(), Matchers.is(1)); + + var failure = failures.getFirst(); + assertThat(failure.getIndices().length, Matchers.is(1)); + assertThat(failure.getIndices()[0], Matchers.equalTo("cluster-a:*")); + // Outer wrapper that gets unwrapped in ExceptionsHelper.isRemoteUnavailableException(). + assertThat( + failure.getException().toString(), + Matchers.containsString("java.lang.IllegalStateException: Unable to open any connections") + ); + + // The actual error that is thrown by the subscribable listener when a linked cluster could not be talked to. + assertThat(failure.getException().getCause(), Matchers.instanceOf(ElasticsearchTimeoutException.class)); + assertThat(ExceptionsHelper.isRemoteUnavailableException(failure.getException()), Matchers.is(true)); + + latch.countDown(); + result.decRef(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index eb45276ce2da0..e9960c832953c 100644 --- a/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -11,8 +11,10 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.automaton.TooComplexToDeterminizeException; +import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import 
org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.OriginalIndices; @@ -22,7 +24,7 @@ import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.RefCountingRunnable; -import org.elasticsearch.client.internal.RemoteClusterClient; +import org.elasticsearch.action.support.SubscribableListener; import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -37,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.core.Tuple; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; @@ -48,9 +51,10 @@ import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.RemoteClusterAware; -import org.elasticsearch.transport.RemoteClusterService; +import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportChannel; import org.elasticsearch.transport.TransportRequestHandler; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; import java.util.ArrayList; @@ -91,6 +95,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener), + (transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest( + conn, + REMOTE_TYPE.name(), + fieldCapabilitiesRequest, + TransportRequestOptions.EMPTY, + responseHandler + ), listener ); } @@ -132,17 +146,17 @@ protected void 
doExecute(Task task, FieldCapabilitiesRequest request, final Acti public void executeRequest( Task task, FieldCapabilitiesRequest request, - RemoteRequestExecutor remoteRequestExecutor, + LinkedRequestExecutor linkedRequestExecutor, ActionListener listener ) { // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can - searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l))); + searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, linkedRequestExecutor, l))); } private void doExecuteForked( Task task, FieldCapabilitiesRequest request, - RemoteRequestExecutor remoteRequestExecutor, + LinkedRequestExecutor linkedRequestExecutor, ActionListener listener ) { if (ccsCheckCompatibility) { @@ -268,12 +282,6 @@ private void doExecuteForked( for (Map.Entry remoteIndices : remoteClusterIndices.entrySet()) { String clusterAlias = remoteIndices.getKey(); OriginalIndices originalIndices = remoteIndices.getValue(); - var remoteClusterClient = transportService.getRemoteClusterService() - .getRemoteClusterClient( - clusterAlias, - singleThreadedExecutor, - RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE - ); FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis); ActionListener remoteListener = ActionListener.wrap(response -> { for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) { @@ -299,9 +307,13 @@ private void doExecuteForked( handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex); } }); - remoteRequestExecutor.executeRemoteRequest( - remoteClusterClient, - remoteRequest, + + SubscribableListener connectionListener = new SubscribableListener<>(); + if (forceConnectTimeoutSecs != null) { + connectionListener.addTimeout(forceConnectTimeoutSecs, threadPool, 
singleThreadedExecutor); + } + + connectionListener.addListener( // The underlying transport service may call onFailure with a thread pool other than search_coordinator. // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator. // TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439 @@ -309,8 +321,20 @@ private void doExecuteForked( singleThreadedExecutor, true, ActionListener.releaseAfter(remoteListener, refs.acquire()) + ).delegateFailure( + (responseListener, conn) -> linkedRequestExecutor.executeRemoteRequest( + transportService, + conn, + remoteRequest, + new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor) + ) ) ); + + boolean ensureConnected = forceConnectTimeoutSecs != null + || transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false; + transportService.getRemoteClusterService() + .maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener); } } } @@ -338,11 +362,12 @@ public void onFailure(Exception e) { }); } - public interface RemoteRequestExecutor { + public interface LinkedRequestExecutor { void executeRemoteRequest( - RemoteClusterClient remoteClient, + TransportService transportService, + Transport.Connection conn, FieldCapabilitiesRequest remoteRequest, - ActionListener remoteListener + ActionListenerResponseHandler responseHandler ); } @@ -376,8 +401,20 @@ private static void mergeIndexResponses( } else { // we have no responses at all, maybe because of errors if (indexFailures.isEmpty() == false) { - // throw back the first exception - listener.onFailure(failures.get(0).getException()); + /* + * Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors. + * Instead, they should always be passed through the response object, as part of "failures". 
+ */ + if (failures.stream() + .anyMatch( + failure -> failure.getException() instanceof IllegalStateException ise + && ise.getCause() instanceof ElasticsearchTimeoutException + )) { + listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures)); + } else { + // throw back the first exception + listener.onFailure(failures.get(0).getException()); + } } else { listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList())); } @@ -585,15 +622,24 @@ List build(Set successfulIndices) { for (Map.Entry failure : failuresByIndex.entrySet()) { String index = failure.getKey(); Exception e = failure.getValue(); + /* + * The listener we use to briefly try to connect to a linked cluster can throw an ElasticsearchTimeoutException + * error if it cannot be reached. To make sure we correctly recognise this scenario via + * ExceptionsHelper.isRemoteUnavailableException(), we wrap this error appropriately. + */ + if (e instanceof ElasticsearchTimeoutException ete) { + e = new IllegalStateException("Unable to open any connections", ete); + } if (successfulIndices.contains(index) == false) { // we deduplicate exceptions on the underlying causes message and classname // we unwrap the cause to e.g. group RemoteTransportExceptions coming from different nodes if the cause is the same Throwable cause = ExceptionsHelper.unwrapCause(e); Tuple groupingKey = new Tuple<>(cause.getMessage(), cause.getClass().getName()); + Exception ex = e; indexFailures.compute( groupingKey, - (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, e) : v.addIndex(index) + (k, v) -> v == null ? 
new FieldCapabilitiesFailure(new String[] { index }, ex) : v.addIndex(index) ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java index f7fd991a9ef16..a801a0bbaefa3 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java @@ -8,6 +8,7 @@ import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.RemoteClusterActionType; import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest; @@ -15,10 +16,11 @@ import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; -import org.elasticsearch.client.internal.RemoteClusterClient; import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; +import org.elasticsearch.transport.Transport; +import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; /** @@ -49,19 +51,18 @@ public EsqlResolveFieldsAction( @Override protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { - fieldCapsAction.executeRequest(task, request, this::executeRemoteRequest, listener); + fieldCapsAction.executeRequest(task, request, this::executeLinkedRequest, listener); } - void executeRemoteRequest( - RemoteClusterClient remoteClient, - FieldCapabilitiesRequest remoteRequest, - ActionListener remoteListener + void executeLinkedRequest( + 
TransportService transportService, + Transport.Connection conn, + FieldCapabilitiesRequest request, + ActionListenerResponseHandler responseHandler ) { - remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> { - var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) - ? RESOLVE_REMOTE_TYPE - : TransportFieldCapabilitiesAction.REMOTE_TYPE; - remoteClient.execute(conn, remoteAction, remoteRequest, l); - })); + var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) + ? RESOLVE_REMOTE_TYPE + : TransportFieldCapabilitiesAction.REMOTE_TYPE; + transportService.sendRequest(conn, remoteAction.name(), request, TransportRequestOptions.EMPTY, responseHandler); } }