Bläddra i källkod

ES|QL: Wrap remote errors with cluster name to provide more context (#123156)

Wrap remote errors with cluster name to provide more context

Previously, if a remote encountered an error, user would see a top-level error that would provide no context about which remote ran into the error. Now, such errors are wrapped in a separate remote exception whose error message clearly specifies the name of the remote cluster and the error that occurred is the cause of this remote exception.
Pawan Kartik 7 månader sedan
förälder
incheckning
e4fb22c4f3

+ 5 - 0
docs/changelog/123156.yaml

@@ -0,0 +1,5 @@
+pr: 123156
+summary: Wrap remote errors with cluster name to provide more context
+area: Search
+type: enhancement
+issues: []

+ 3 - 1
server/src/main/java/org/elasticsearch/ElasticsearchException.java

@@ -16,6 +16,7 @@ import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.store.LockObtainFailedException;
 import org.elasticsearch.action.bulk.IndexDocFailureStoreStatus;
 import org.elasticsearch.action.support.replication.ReplicationOperation;
+import org.elasticsearch.cluster.RemoteException;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -1981,7 +1982,8 @@ public class ElasticsearchException extends RuntimeException implements ToXConte
             IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus::new,
             183,
             TransportVersions.V_8_16_0
-        );
+        ),
+        REMOTE_EXCEPTION(RemoteException.class, RemoteException::new, 184, TransportVersions.REMOTE_EXCEPTION);
 
         final Class<? extends ElasticsearchException> exceptionClass;
         final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -209,6 +209,7 @@ public class TransportVersions {
     public static final TransportVersion INDEX_STATS_AND_METADATA_INCLUDE_PEAK_WRITE_LOAD = def(9_041_0_00);
     public static final TransportVersion REPOSITORIES_METADATA_AS_PROJECT_CUSTOM = def(9_042_0_00);
     public static final TransportVersion BATCHED_QUERY_PHASE_VERSION = def(9_043_0_00);
+    public static final TransportVersion REMOTE_EXCEPTION = def(9_044_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 44 - 0
server/src/main/java/org/elasticsearch/cluster/RemoteException.java

@@ -0,0 +1,44 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Represents an error that occurred on a remote node.
+ * It allows capturing some context such as the cluster alias that encountered the error.
+ */
+public class RemoteException extends ElasticsearchException {
+
+    /**
+     * @param clusterAlias Name of the cluster.
+     * @param cause Error that was encountered.
+     */
+    public RemoteException(String clusterAlias, Throwable cause) {
+        super("Remote [" + clusterAlias + "] encountered an error", cause);
+        Objects.requireNonNull(cause);
+    }
+
+    public RemoteException(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    @Override
+    public RestStatus status() {
+        // This is similar to what we do in SearchPhaseExecutionException.
+        return ExceptionsHelper.status(getCause());
+    }
+}

+ 2 - 0
server/src/test/java/org/elasticsearch/ExceptionSerializationTests.java

@@ -21,6 +21,7 @@ import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.action.support.replication.ReplicationOperation;
 import org.elasticsearch.client.internal.AbstractClientHeadersTestCase;
+import org.elasticsearch.cluster.RemoteException;
 import org.elasticsearch.cluster.action.shard.ShardStateAction;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.coordination.CoordinationStateRejectedException;
@@ -840,6 +841,7 @@ public class ExceptionSerializationTests extends ESTestCase {
         ids.put(181, ResourceAlreadyUploadedException.class);
         ids.put(182, IngestPipelineException.class);
         ids.put(183, IndexDocFailureStoreStatus.ExceptionWithFailureStoreStatus.class);
+        ids.put(184, RemoteException.class);
 
         Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
         for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {

+ 15 - 0
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

@@ -11,6 +11,7 @@ import org.apache.lucene.document.InetAddressPoint;
 import org.apache.lucene.sandbox.document.HalfFloatPoint;
 import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.cluster.RemoteException;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Strings;
@@ -858,4 +859,18 @@ public final class EsqlTestUtils {
         assertThat(collection, hasSize(1));
         return collection.iterator().next();
     }
+
+    /**
+     * Errors from remotes are wrapped in RemoteException while the ones from the local cluster
+     * aren't. This utility method is useful for unwrapping in such cases.
+     * @param e Exception to unwrap.
+     * @return Cause of RemoteException, else the error itself.
+     */
+    public static Exception unwrapIfWrappedInRemoteException(Exception e) {
+        if (e instanceof RemoteException rce) {
+            return (Exception) rce.getCause();
+        } else {
+            return e;
+        }
+    }
 }

+ 3 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterCancellationIT.java

@@ -26,6 +26,7 @@ import org.elasticsearch.test.AbstractMultiClustersTestCase;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.plugin.ComputeService;
 import org.junit.After;
 import org.junit.Before;
@@ -163,6 +164,7 @@ public class CrossClusterCancellationIT extends AbstractMultiClustersTestCase {
             SimplePauseFieldPlugin.allowEmitting.countDown();
         }
         Exception error = expectThrows(Exception.class, requestFuture::actionGet);
+        error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
         assertThat(error.getMessage(), containsString("proxy timeout"));
     }
 
@@ -284,6 +286,7 @@ public class CrossClusterCancellationIT extends AbstractMultiClustersTestCase {
         }
 
         Exception error = expectThrows(Exception.class, requestFuture::actionGet);
+        error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
         assertThat(error, instanceOf(TaskCancelledException.class));
     }
 }

+ 7 - 3
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryIT.java

@@ -29,6 +29,7 @@ import org.elasticsearch.test.XContentTestUtils;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.VerificationException;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
@@ -813,10 +814,13 @@ public class CrossClusterQueryIT extends AbstractCrossClusterTestCase {
         Map<String, Object> testClusterInfo = setupFailClusters();
         String localIndex = (String) testClusterInfo.get("local.index");
         String remote1Index = (String) testClusterInfo.get("remote.index");
-        int localNumShards = (Integer) testClusterInfo.get("local.num_shards");
         String q = Strings.format("FROM %s,cluster-a:%s*", localIndex, remote1Index);
-        IllegalStateException e = expectThrows(IllegalStateException.class, () -> runQuery(q, false));
-        assertThat(e.getMessage(), containsString("Accessing failing field"));
+
+        Exception error = expectThrows(Exception.class, () -> runQuery(q, false));
+        error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
+
+        assertThat(error, instanceOf(IllegalStateException.class));
+        assertThat(error.getMessage(), containsString("Accessing failing field"));
     }
 
     private static void assertClusterMetadataInResponse(EsqlQueryResponse resp, boolean responseExpectMeta) {

+ 8 - 2
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterQueryWithPartialResultsIT.java

@@ -42,6 +42,7 @@ import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.hamcrest.Matchers.not;
@@ -104,7 +105,10 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
         request.includeCCSMetadata(randomBoolean());
         {
             request.allowPartialResults(false);
-            IllegalStateException error = expectThrows(IllegalStateException.class, () -> runQuery(request).close());
+            Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
+            error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
+
+            assertThat(error, instanceOf(IllegalStateException.class));
             assertThat(error.getMessage(), containsString("Accessing failing field"));
         }
         request.allowPartialResults(true);
@@ -190,6 +194,7 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
             {
                 request.allowPartialResults(false);
                 Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
+                error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
                 var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
                 assertNotNull(unwrapped);
                 assertThat(unwrapped.getMessage(), equalTo(simulatedFailure.getMessage()));
@@ -236,7 +241,8 @@ public class CrossClusterQueryWithPartialResultsIT extends AbstractCrossClusterT
             request.includeCCSMetadata(randomBoolean());
             {
                 request.allowPartialResults(false);
-                var error = expectThrows(Exception.class, () -> runQuery(request).close());
+                Exception error = expectThrows(Exception.class, () -> runQuery(request).close());
+                error = EsqlTestUtils.unwrapIfWrappedInRemoteException(error);
                 EsqlTestUtils.assertEsqlFailure(error);
                 var unwrapped = ExceptionsHelper.unwrap(error, simulatedFailure.getClass());
                 assertNotNull(unwrapped);

+ 43 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlRemoteErrorWrapIT.java

@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.cluster.RemoteException;
+import org.elasticsearch.compute.operator.exchange.ExchangeService;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportService;
+
+import static org.hamcrest.Matchers.is;
+
+public class EsqlRemoteErrorWrapIT extends AbstractCrossClusterTestCase {
+
+    public void testThatRemoteErrorsAreWrapped() throws Exception {
+        setupClusters(2);
+        setSkipUnavailable(REMOTE_CLUSTER_1, false);
+        setSkipUnavailable(REMOTE_CLUSTER_2, false);
+
+        /*
+         * Let's say something went wrong with the Exchange and its specifics when talking to a remote.
+         * And let's pretend only cluster-a is affected.
+         */
+        for (var nodes : cluster(REMOTE_CLUSTER_1).getNodeNames()) {
+            ((MockTransportService) cluster(REMOTE_CLUSTER_1).getInstance(TransportService.class, nodes)).addRequestHandlingBehavior(
+                ExchangeService.OPEN_EXCHANGE_ACTION_NAME,
+                (requestHandler, transportRequest, transportChannel, transportTask) -> {
+                    throw new IllegalArgumentException("some error to wreck havoc");
+                }
+            );
+        }
+
+        RemoteException wrappedError = expectThrows(
+            RemoteException.class,
+            () -> runQuery("FROM " + REMOTE_CLUSTER_1 + ":*," + REMOTE_CLUSTER_2 + ":* | LIMIT 100", false)
+        );
+        assertThat(wrappedError.getMessage(), is("Remote [cluster-a] encountered an error"));
+    }
+}

+ 19 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -11,6 +11,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.OriginalIndices;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.cluster.RemoteException;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.RunOnce;
@@ -19,6 +20,7 @@ import org.elasticsearch.compute.data.Page;
 import org.elasticsearch.compute.operator.Driver;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
+import org.elasticsearch.compute.operator.FailureCollector;
 import org.elasticsearch.compute.operator.exchange.ExchangeService;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
 import org.elasticsearch.core.Releasable;
@@ -38,6 +40,7 @@ import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
+import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.esql.action.EsqlExecutionInfo;
@@ -338,7 +341,22 @@ public class ComputeService {
                         cluster,
                         cancelQueryOnFailure,
                         execInfo,
-                        computeListener.acquireCompute()
+                        computeListener.acquireCompute().delegateResponse((l, ex) -> {
+                            /*
+                             * At various points, when collecting failures before sending a response, we manually check
+                             * if an ex is a transport error and if it is, we unwrap it. Because we're wrapping an ex
+                             * in RemoteException, the checks fail and unwrapping does not happen. We offload the
+                             * unwrapping to here.
+                             *
+                             * Note: The other error we explicitly check for is TaskCancelledException which is never
+                             * wrapped.
+                             */
+                            if (ex instanceof TransportException te) {
+                                l.onFailure(new RemoteException(cluster.clusterAlias(), FailureCollector.unwrapTransportException(te)));
+                            } else {
+                                l.onFailure(new RemoteException(cluster.clusterAlias(), ex));
+                            }
+                        })
                     );
                 }
             }