Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/131517.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 131517
summary: Refresh potential lost connections at query start for field caps
area: Search
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.indices.cluster;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.ClusterPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import org.hamcrest.Matchers;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;

public class FieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase {
private static final String LINKED_CLUSTER_1 = "cluster-a";
private static final String LINKED_CLUSTER_2 = "cluster-b";

public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(ForceConnectTimeoutSetting);
}
}

private static final Setting<String> ForceConnectTimeoutSetting = Setting.simpleString(
"search.ccs.force_connect_timeout",
Setting.Property.NodeScope
);

@Override
protected List<String> remoteClusterAlias() {
return List.of(LINKED_CLUSTER_1, LINKED_CLUSTER_2);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class);
}

@Override
protected Settings nodeSettings() {
/*
* This is the setting that controls how long TransportFieldCapabilitiesAction will wait for establishing a connection
* with a remote. At present, we set it to low 1s to prevent stalling the test for too long -- this is consistent
* with what we've done in other tests.
*/
return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build();
}

@Override
protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
return Map.of(LINKED_CLUSTER_1, true, LINKED_CLUSTER_2, true);
}

public void testTimeoutSetting() {
var latch = new CountDownLatch(1);
for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) {
MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName);

mts.addConnectBehavior(
cluster(LINKED_CLUSTER_1).getInstance(TransportService.class, (String) null),
((transport, discoveryNode, profile, listener) -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}

transport.openConnection(discoveryNode, profile, listener);
})
);
}

// Add some dummy data to prove we are communicating fine with the remote.
assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index"));
client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();

/*
* Do a full restart so that our custom connect behaviour takes effect since it does not apply to
* pre-existing connections -- they're already established by the time this test runs.
*/
try {
cluster(LINKED_CLUSTER_1).fullRestart();
} catch (Exception e) {
throw new AssertionError(e);
} finally {
var fieldCapsRequest = new FieldCapabilitiesRequest();
/*
* We have an origin and 2 linked clusters but will target only the one that we stalled.
* This is because when the timeout kicks in, and we move on from the stalled cluster, we do not want
* the error to be a top-level error. Rather, it must be present in the response object under "failures".
* All other errors are free to be top-level errors though.
*/
fieldCapsRequest.indices(LINKED_CLUSTER_1 + ":*");
fieldCapsRequest.fields("foo", "bar", "baz");
var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest));

var failures = result.getFailures();
assertThat(failures.size(), Matchers.is(1));

var failure = failures.getFirst();
assertThat(failure.getIndices().length, Matchers.is(1));
assertThat(failure.getIndices()[0], Matchers.equalTo("cluster-a:*"));
// Outer wrapper that gets unwrapped in ExceptionsHelper.isRemoteUnavailableException().
assertThat(
failure.getException().toString(),
Matchers.containsString("java.lang.IllegalStateException: Unable to open any connections")
);

// The actual error that is thrown by the subscribable listener when a linked cluster could not be talked to.
assertThat(failure.getException().getCause(), Matchers.instanceOf(ElasticsearchTimeoutException.class));
assertThat(ExceptionsHelper.isRemoteUnavailableException(failure.getException()), Matchers.is(true));

latch.countDown();
result.decRef();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@

import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.automaton.TooComplexToDeterminizeException;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.OriginalIndices;
Expand All @@ -22,7 +24,7 @@
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.client.internal.RemoteClusterClient;
import org.elasticsearch.action.support.SubscribableListener;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand All @@ -37,6 +39,7 @@
import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
Expand All @@ -48,9 +51,10 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
import org.elasticsearch.transport.TransportRequestOptions;
import org.elasticsearch.transport.TransportService;

import java.util.ArrayList;
Expand Down Expand Up @@ -91,6 +95,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie

private final IndicesService indicesService;
private final boolean ccsCheckCompatibility;
private final ThreadPool threadPool;
private final TimeValue forceConnectTimeoutSecs;

@Inject
public TransportFieldCapabilitiesAction(
Expand All @@ -117,32 +123,40 @@ public TransportFieldCapabilitiesAction(
new NodeTransportHandler()
);
this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
this.threadPool = threadPool;
this.forceConnectTimeoutSecs = clusterService.getSettings().getAsTime("search.ccs.force_connect_timeout", null);
}

@Override
protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
executeRequest(
task,
request,
(remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener),
(transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest(
conn,
REMOTE_TYPE.name(),
fieldCapabilitiesRequest,
TransportRequestOptions.EMPTY,
responseHandler
),
listener
);
}

public void executeRequest(
Task task,
FieldCapabilitiesRequest request,
RemoteRequestExecutor remoteRequestExecutor,
LinkedRequestExecutor linkedRequestExecutor,
ActionListener<FieldCapabilitiesResponse> listener
) {
// workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l)));
searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, linkedRequestExecutor, l)));
}

private void doExecuteForked(
Task task,
FieldCapabilitiesRequest request,
RemoteRequestExecutor remoteRequestExecutor,
LinkedRequestExecutor linkedRequestExecutor,
ActionListener<FieldCapabilitiesResponse> listener
) {
if (ccsCheckCompatibility) {
Expand Down Expand Up @@ -268,12 +282,6 @@ private void doExecuteForked(
for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
String clusterAlias = remoteIndices.getKey();
OriginalIndices originalIndices = remoteIndices.getValue();
var remoteClusterClient = transportService.getRemoteClusterService()
.getRemoteClusterClient(
clusterAlias,
singleThreadedExecutor,
RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
);
FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
Expand All @@ -299,18 +307,34 @@ private void doExecuteForked(
handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
}
});
remoteRequestExecutor.executeRemoteRequest(
remoteClusterClient,
remoteRequest,

SubscribableListener<Transport.Connection> connectionListener = new SubscribableListener<>();
if (forceConnectTimeoutSecs != null) {
connectionListener.addTimeout(forceConnectTimeoutSecs, threadPool, singleThreadedExecutor);
}

connectionListener.addListener(
// The underlying transport service may call onFailure with a thread pool other than search_coordinator.
// This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
// TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439
new ForkingOnFailureActionListener<>(
singleThreadedExecutor,
true,
ActionListener.releaseAfter(remoteListener, refs.acquire())
).delegateFailure(
(responseListener, conn) -> linkedRequestExecutor.executeRemoteRequest(
transportService,
conn,
remoteRequest,
new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor)
)
)
);

boolean ensureConnected = forceConnectTimeoutSecs != null
|| transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
transportService.getRemoteClusterService()
.maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);
}
}
}
Expand Down Expand Up @@ -338,11 +362,12 @@ public void onFailure(Exception e) {
});
}

public interface RemoteRequestExecutor {
public interface LinkedRequestExecutor {
void executeRemoteRequest(
RemoteClusterClient remoteClient,
TransportService transportService,
Transport.Connection conn,
FieldCapabilitiesRequest remoteRequest,
ActionListener<FieldCapabilitiesResponse> remoteListener
ActionListenerResponseHandler<FieldCapabilitiesResponse> responseHandler
);
}

Expand Down Expand Up @@ -376,8 +401,20 @@ private static void mergeIndexResponses(
} else {
// we have no responses at all, maybe because of errors
if (indexFailures.isEmpty() == false) {
// throw back the first exception
listener.onFailure(failures.get(0).getException());
/*
* Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors.
* Instead, they should always be passed through the response object, as part of "failures".
*/
if (failures.stream()
.anyMatch(
failure -> failure.getException() instanceof IllegalStateException ise
&& ise.getCause() instanceof ElasticsearchTimeoutException
)) {
listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures));
} else {
// throw back the first exception
listener.onFailure(failures.get(0).getException());
}
} else {
listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
}
Expand Down Expand Up @@ -585,15 +622,24 @@ List<FieldCapabilitiesFailure> build(Set<String> successfulIndices) {
for (Map.Entry<String, Exception> failure : failuresByIndex.entrySet()) {
String index = failure.getKey();
Exception e = failure.getValue();
/*
* The listener we use to briefly try, and connect to a linked cluster can throw an ElasticsearchTimeoutException
* error if it cannot be reached. To make sure we correctly recognise this scenario via
* ExceptionsHelper.isRemoteUnavailableException(), we wrap this error appropriately.
*/
if (e instanceof ElasticsearchTimeoutException ete) {
e = new IllegalStateException("Unable to open any connections", ete);
}

if (successfulIndices.contains(index) == false) {
// we deduplicate exceptions on the underlying causes message and classname
// we unwrap the cause to e.g. group RemoteTransportExceptions coming from different nodes if the cause is the same
Throwable cause = ExceptionsHelper.unwrapCause(e);
Tuple<String, String> groupingKey = new Tuple<>(cause.getMessage(), cause.getClass().getName());
Exception ex = e;
indexFailures.compute(
groupingKey,
(k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, e) : v.addIndex(index)
(k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, ex) : v.addIndex(index)
);
}
}
Expand Down
Loading