Browse Source

Refresh potential lost connections at query start for field caps (#131517)

For CPS S2D9, we'd need field caps to refresh potentially lost connections before
executing a query by explicitly establishing a connection with a short timeout to
avoid waiting for large duration. This is similar to what we recently did with
`_search` and will help for ES|QL as well.
Pawan Kartik 2 months ago
parent
commit
8005a7d9e7

+ 5 - 0
docs/changelog/131517.yaml

@@ -0,0 +1,5 @@
+pr: 131517
+summary: Refresh potential lost connections at query start for field caps
+area: Search
+type: enhancement
+issues: []

+ 138 - 0
server/src/internalClusterTest/java/org/elasticsearch/indices/cluster/FieldCapsForceConnectTimeoutIT.java

@@ -0,0 +1,138 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.indices.cluster;
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
+import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
+import org.elasticsearch.common.settings.Setting;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.plugins.ClusterPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.AbstractMultiClustersTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportService;
+import org.hamcrest.Matchers;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+
+public class FieldCapsForceConnectTimeoutIT extends AbstractMultiClustersTestCase {
+    private static final String LINKED_CLUSTER_1 = "cluster-a";
+    private static final String LINKED_CLUSTER_2 = "cluster-b";
+
+    public static class ForceConnectTimeoutPlugin extends Plugin implements ClusterPlugin {
+        @Override
+        public List<Setting<?>> getSettings() {
+            return List.of(ForceConnectTimeoutSetting);
+        }
+    }
+
+    private static final Setting<String> ForceConnectTimeoutSetting = Setting.simpleString(
+        "search.ccs.force_connect_timeout",
+        Setting.Property.NodeScope
+    );
+
+    @Override
+    protected List<String> remoteClusterAlias() {
+        return List.of(LINKED_CLUSTER_1, LINKED_CLUSTER_2);
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
+        return CollectionUtils.appendToCopy(super.nodePlugins(clusterAlias), ForceConnectTimeoutPlugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings() {
+        /*
+         * This is the setting that controls how long TransportFieldCapabilitiesAction will wait for establishing a connection
+         * with a remote. At present, we set it to low 1s to prevent stalling the test for too long -- this is consistent
+         * with what we've done in other tests.
+         */
+        return Settings.builder().put(super.nodeSettings()).put("search.ccs.force_connect_timeout", "1s").build();
+    }
+
+    @Override
+    protected Map<String, Boolean> skipUnavailableForRemoteClusters() {
+        return Map.of(LINKED_CLUSTER_1, true, LINKED_CLUSTER_2, true);
+    }
+
+    public void testTimeoutSetting() {
+        var latch = new CountDownLatch(1);
+        for (String nodeName : cluster(LOCAL_CLUSTER).getNodeNames()) {
+            MockTransportService mts = (MockTransportService) cluster(LOCAL_CLUSTER).getInstance(TransportService.class, nodeName);
+
+            mts.addConnectBehavior(
+                cluster(LINKED_CLUSTER_1).getInstance(TransportService.class, (String) null),
+                ((transport, discoveryNode, profile, listener) -> {
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new AssertionError(e);
+                    }
+
+                    transport.openConnection(discoveryNode, profile, listener);
+                })
+            );
+        }
+
+        // Add some dummy data to prove we are communicating fine with the remote.
+        assertAcked(client(LINKED_CLUSTER_1).admin().indices().prepareCreate("test-index"));
+        client(LINKED_CLUSTER_1).prepareIndex("test-index").setSource("sample-field", "sample-value").get();
+        client(LINKED_CLUSTER_1).admin().indices().prepareRefresh("test-index").get();
+
+        /*
+         * Do a full restart so that our custom connect behaviour takes effect since it does not apply to
+         * pre-existing connections -- they're already established by the time this test runs.
+         */
+        try {
+            cluster(LINKED_CLUSTER_1).fullRestart();
+        } catch (Exception e) {
+            throw new AssertionError(e);
+        } finally {
+            var fieldCapsRequest = new FieldCapabilitiesRequest();
+            /*
+             * We have an origin and 2 linked clusters but will target only the one that we stalled.
+             * This is because when the timeout kicks in, and we move on from the stalled cluster, we do not want
+             * the error to be a top-level error. Rather, it must be present in the response object under "failures".
+             * All other errors are free to be top-level errors though.
+             */
+            fieldCapsRequest.indices(LINKED_CLUSTER_1 + ":*");
+            fieldCapsRequest.fields("foo", "bar", "baz");
+            var result = safeGet(client().execute(TransportFieldCapabilitiesAction.TYPE, fieldCapsRequest));
+
+            var failures = result.getFailures();
+            assertThat(failures.size(), Matchers.is(1));
+
+            var failure = failures.getFirst();
+            assertThat(failure.getIndices().length, Matchers.is(1));
+            assertThat(failure.getIndices()[0], Matchers.equalTo("cluster-a:*"));
+            // Outer wrapper that gets unwrapped in ExceptionsHelper.isRemoteUnavailableException().
+            assertThat(
+                failure.getException().toString(),
+                Matchers.containsString("java.lang.IllegalStateException: Unable to open any connections")
+            );
+
+            // The actual error that is thrown by the subscribable listener when a linked cluster could not be talked to.
+            assertThat(failure.getException().getCause(), Matchers.instanceOf(ElasticsearchTimeoutException.class));
+            assertThat(ExceptionsHelper.isRemoteUnavailableException(failure.getException()), Matchers.is(true));
+
+            latch.countDown();
+            result.decRef();
+        }
+    }
+}

+ 67 - 21
server/src/main/java/org/elasticsearch/action/fieldcaps/TransportFieldCapabilitiesAction.java

@@ -11,8 +11,10 @@ package org.elasticsearch.action.fieldcaps;
 
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.automaton.TooComplexToDeterminizeException;
+import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.OriginalIndices;
@@ -22,7 +24,7 @@ import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.ChannelActionListener;
 import org.elasticsearch.action.support.HandledTransportAction;
 import org.elasticsearch.action.support.RefCountingRunnable;
-import org.elasticsearch.client.internal.RemoteClusterClient;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.ProjectState;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@@ -37,6 +39,7 @@ import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.ThrottledTaskRunner;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.indices.IndicesService;
@@ -48,9 +51,10 @@ import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
-import org.elasticsearch.transport.RemoteClusterService;
+import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportRequestHandler;
+import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.ArrayList;
@@ -91,6 +95,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
 
     private final IndicesService indicesService;
     private final boolean ccsCheckCompatibility;
+    private final ThreadPool threadPool;
+    private final TimeValue forceConnectTimeoutSecs;
 
     @Inject
     public TransportFieldCapabilitiesAction(
@@ -117,6 +123,8 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             new NodeTransportHandler()
         );
         this.ccsCheckCompatibility = SearchService.CCS_VERSION_CHECK_SETTING.get(clusterService.getSettings());
+        this.threadPool = threadPool;
+        this.forceConnectTimeoutSecs = clusterService.getSettings().getAsTime("search.ccs.force_connect_timeout", null);
     }
 
     @Override
@@ -124,7 +132,13 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
         executeRequest(
             task,
             request,
-            (remoteClient, remoteRequest, remoteListener) -> remoteClient.execute(REMOTE_TYPE, remoteRequest, remoteListener),
+            (transportService, conn, fieldCapabilitiesRequest, responseHandler) -> transportService.sendRequest(
+                conn,
+                REMOTE_TYPE.name(),
+                fieldCapabilitiesRequest,
+                TransportRequestOptions.EMPTY,
+                responseHandler
+            ),
             listener
         );
     }
@@ -132,17 +146,17 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
     public void executeRequest(
         Task task,
         FieldCapabilitiesRequest request,
-        RemoteRequestExecutor remoteRequestExecutor,
+        LinkedRequestExecutor linkedRequestExecutor,
         ActionListener<FieldCapabilitiesResponse> listener
     ) {
         // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
-        searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, remoteRequestExecutor, l)));
+        searchCoordinationExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, linkedRequestExecutor, l)));
     }
 
     private void doExecuteForked(
         Task task,
         FieldCapabilitiesRequest request,
-        RemoteRequestExecutor remoteRequestExecutor,
+        LinkedRequestExecutor linkedRequestExecutor,
         ActionListener<FieldCapabilitiesResponse> listener
     ) {
         if (ccsCheckCompatibility) {
@@ -268,12 +282,6 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             for (Map.Entry<String, OriginalIndices> remoteIndices : remoteClusterIndices.entrySet()) {
                 String clusterAlias = remoteIndices.getKey();
                 OriginalIndices originalIndices = remoteIndices.getValue();
-                var remoteClusterClient = transportService.getRemoteClusterService()
-                    .getRemoteClusterClient(
-                        clusterAlias,
-                        singleThreadedExecutor,
-                        RemoteClusterService.DisconnectedStrategy.RECONNECT_UNLESS_SKIP_UNAVAILABLE
-                    );
                 FieldCapabilitiesRequest remoteRequest = prepareRemoteRequest(clusterAlias, request, originalIndices, nowInMillis);
                 ActionListener<FieldCapabilitiesResponse> remoteListener = ActionListener.wrap(response -> {
                     for (FieldCapabilitiesIndexResponse resp : response.getIndexResponses()) {
@@ -299,9 +307,13 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
                         handleIndexFailure.accept(RemoteClusterAware.buildRemoteIndexName(clusterAlias, index), ex);
                     }
                 });
-                remoteRequestExecutor.executeRemoteRequest(
-                    remoteClusterClient,
-                    remoteRequest,
+
+                SubscribableListener<Transport.Connection> connectionListener = new SubscribableListener<>();
+                if (forceConnectTimeoutSecs != null) {
+                    connectionListener.addTimeout(forceConnectTimeoutSecs, threadPool, singleThreadedExecutor);
+                }
+
+                connectionListener.addListener(
                     // The underlying transport service may call onFailure with a thread pool other than search_coordinator.
                     // This fork is a workaround to ensure that the merging of field-caps always occurs on the search_coordinator.
                     // TODO: remove this workaround after we fixed https://github.com/elastic/elasticsearch/issues/107439
@@ -309,8 +321,20 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
                         singleThreadedExecutor,
                         true,
                         ActionListener.releaseAfter(remoteListener, refs.acquire())
+                    ).delegateFailure(
+                        (responseListener, conn) -> linkedRequestExecutor.executeRemoteRequest(
+                            transportService,
+                            conn,
+                            remoteRequest,
+                            new ActionListenerResponseHandler<>(responseListener, FieldCapabilitiesResponse::new, singleThreadedExecutor)
+                        )
                     )
                 );
+
+                boolean ensureConnected = forceConnectTimeoutSecs != null
+                    || transportService.getRemoteClusterService().isSkipUnavailable(clusterAlias) == false;
+                transportService.getRemoteClusterService()
+                    .maybeEnsureConnectedAndGetConnection(clusterAlias, ensureConnected, connectionListener);
             }
         }
     }
@@ -338,11 +362,12 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
         });
     }
 
-    public interface RemoteRequestExecutor {
+    public interface LinkedRequestExecutor {
         void executeRemoteRequest(
-            RemoteClusterClient remoteClient,
+            TransportService transportService,
+            Transport.Connection conn,
             FieldCapabilitiesRequest remoteRequest,
-            ActionListener<FieldCapabilitiesResponse> remoteListener
+            ActionListenerResponseHandler<FieldCapabilitiesResponse> responseHandler
         );
     }
 
@@ -376,8 +401,20 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
         } else {
             // we have no responses at all, maybe because of errors
             if (indexFailures.isEmpty() == false) {
-                // throw back the first exception
-                listener.onFailure(failures.get(0).getException());
+                /*
+                 * Under no circumstances are we to pass timeout errors originating from SubscribableListener as top-level errors.
+                 * Instead, they should always be passed through the response object, as part of "failures".
+                 */
+                if (failures.stream()
+                    .anyMatch(
+                        failure -> failure.getException() instanceof IllegalStateException ise
+                            && ise.getCause() instanceof ElasticsearchTimeoutException
+                    )) {
+                    listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), failures));
+                } else {
+                    // throw back the first exception
+                    listener.onFailure(failures.get(0).getException());
+                }
             } else {
                 listener.onResponse(new FieldCapabilitiesResponse(Collections.emptyList(), Collections.emptyList()));
             }
@@ -585,15 +622,24 @@ public class TransportFieldCapabilitiesAction extends HandledTransportAction<Fie
             for (Map.Entry<String, Exception> failure : failuresByIndex.entrySet()) {
                 String index = failure.getKey();
                 Exception e = failure.getValue();
+                /*
+                 * The listener we use to briefly try, and connect to a linked cluster can throw an ElasticsearchTimeoutException
+                 * error if it cannot be reached. To make sure we correctly recognise this scenario via
+                 * ExceptionsHelper.isRemoteUnavailableException(), we wrap this error appropriately.
+                 */
+                if (e instanceof ElasticsearchTimeoutException ete) {
+                    e = new IllegalStateException("Unable to open any connections", ete);
+                }
 
                 if (successfulIndices.contains(index) == false) {
                     // we deduplicate exceptions on the underlying causes message and classname
                     // we unwrap the cause to e.g. group RemoteTransportExceptions coming from different nodes if the cause is the same
                     Throwable cause = ExceptionsHelper.unwrapCause(e);
                     Tuple<String, String> groupingKey = new Tuple<>(cause.getMessage(), cause.getClass().getName());
+                    Exception ex = e;
                     indexFailures.compute(
                         groupingKey,
-                        (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, e) : v.addIndex(index)
+                        (k, v) -> v == null ? new FieldCapabilitiesFailure(new String[] { index }, ex) : v.addIndex(index)
                     );
                 }
             }

+ 13 - 12
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResolveFieldsAction.java

@@ -8,6 +8,7 @@ package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.RemoteClusterActionType;
 import org.elasticsearch.action.fieldcaps.FieldCapabilitiesRequest;
@@ -15,10 +16,11 @@ import org.elasticsearch.action.fieldcaps.FieldCapabilitiesResponse;
 import org.elasticsearch.action.fieldcaps.TransportFieldCapabilitiesAction;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
-import org.elasticsearch.client.internal.RemoteClusterClient;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportRequestOptions;
 import org.elasticsearch.transport.TransportService;
 
 /**
@@ -49,19 +51,18 @@ public class EsqlResolveFieldsAction extends HandledTransportAction<FieldCapabil
 
     @Override
     protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener<FieldCapabilitiesResponse> listener) {
-        fieldCapsAction.executeRequest(task, request, this::executeRemoteRequest, listener);
+        fieldCapsAction.executeRequest(task, request, this::executeLinkedRequest, listener);
     }
 
-    void executeRemoteRequest(
-        RemoteClusterClient remoteClient,
-        FieldCapabilitiesRequest remoteRequest,
-        ActionListener<FieldCapabilitiesResponse> remoteListener
+    void executeLinkedRequest(
+        TransportService transportService,
+        Transport.Connection conn,
+        FieldCapabilitiesRequest request,
+        ActionListenerResponseHandler<FieldCapabilitiesResponse> responseHandler
     ) {
-        remoteClient.getConnection(remoteRequest, remoteListener.delegateFailure((l, conn) -> {
-            var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)
-                ? RESOLVE_REMOTE_TYPE
-                : TransportFieldCapabilitiesAction.REMOTE_TYPE;
-            remoteClient.execute(conn, remoteAction, remoteRequest, l);
-        }));
+        var remoteAction = conn.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)
+            ? RESOLVE_REMOTE_TYPE
+            : TransportFieldCapabilitiesAction.REMOTE_TYPE;
+        transportService.sendRequest(conn, remoteAction.name(), request, TransportRequestOptions.EMPTY, responseHandler);
     }
 }