Browse Source

Better failure handling for lookup join (#132874)

Stanislav Malyshev 1 month ago
parent
commit
e0296d0217

+ 10 - 8
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterLookupJoinFailuresIT.java

@@ -94,6 +94,7 @@ public class CrossClusterLookupJoinFailuresIT extends AbstractCrossClusterTestCa
             } */
 
             try (
+                // This only calls REMOTE_CLUSTER_1 which is skip_unavailable=true
                 EsqlQueryResponse resp = runQuery(
                     "FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key",
                     randomBoolean()
@@ -112,9 +113,8 @@ public class CrossClusterLookupJoinFailuresIT extends AbstractCrossClusterTestCa
                 assertThat(remoteCluster.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SKIPPED));
                 assertThat(remoteCluster.getFailures(), not(empty()));
                 var failure = remoteCluster.getFailures().get(0);
-                // FIXME: this produces a wrong message currently
-                // assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
-                assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
+                assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
+                assertThat(failure.reason(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]"));
             }
 
             try (
@@ -138,24 +138,26 @@ public class CrossClusterLookupJoinFailuresIT extends AbstractCrossClusterTestCa
                 assertThat(remoteCluster2.getStatus(), equalTo(EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
                 assertThat(remoteCluster.getFailures(), not(empty()));
                 var failure = remoteCluster.getFailures().get(0);
-                // FIXME: this produces a wrong message currently
-                // assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
-                assertThat(failure.reason(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
+                assertThat(failure.reason(), containsString(simulatedFailure.getMessage()));
+                assertThat(failure.reason(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]"));
             }
 
             // now fail
             setSkipUnavailable(REMOTE_CLUSTER_1, false);
+            // c*: only calls REMOTE_CLUSTER_1 which is skip_unavailable=false now
             Exception ex = expectThrows(
                 VerificationException.class,
                 () -> runQuery("FROM logs-*,c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
             );
-            assertThat(ex.getMessage(), containsString("lookup index [values_lookup] is not available in remote cluster [cluster-a]"));
+            assertThat(ex.getMessage(), containsString("lookup failed in remote cluster [cluster-a] for index [values_lookup]"));
+            String message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage();
+            assertThat(message, containsString(simulatedFailure.getMessage()));
 
             ex = expectThrows(
                 Exception.class,
                 () -> runQuery("FROM c*:logs-* | EVAL lookup_key = v | LOOKUP JOIN values_lookup ON lookup_key", randomBoolean())
             );
-            String message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage();
+            message = ex.getCause() == null ? ex.getMessage() : ex.getCause().getMessage();
             assertThat(message, containsString(simulatedFailure.getMessage()));
         } finally {
             for (TransportService transportService : cluster(REMOTE_CLUSTER_1).getInstances(TransportService.class)) {

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/VerificationException.java

@@ -25,4 +25,8 @@ public class VerificationException extends EsqlClientException {
         super(failures.toString());
     }
 
+    public VerificationException(String message, Throwable cause) { // overload that carries an underlying cause alongside the message
+        super(message, cause); // both arguments are forwarded unchanged to the superclass constructor
+    }
+
 }

+ 18 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/EsqlSession.java

@@ -428,7 +428,10 @@ public class EsqlSession {
     }
 
     private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, String message) {
-        VerificationException error = new VerificationException(message);
+        skipClusterOrError(clusterAlias, executionInfo, new VerificationException(message));
+    }
+
+    private void skipClusterOrError(String clusterAlias, EsqlExecutionInfo executionInfo, ElasticsearchException error) {
         // If we can, skip the cluster and mark it as such
         if (executionInfo.shouldSkipOnFailure(clusterAlias)) {
             EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, error);
@@ -528,11 +531,7 @@ public class EsqlSession {
             String clusterAlias = cluster.getClusterAlias();
             if (clustersWithResolvedIndices.containsKey(clusterAlias) == false) {
                 // Missing cluster resolution
-                skipClusterOrError(
-                    clusterAlias,
-                    executionInfo,
-                    "lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias)
-                );
+                skipClusterOrError(clusterAlias, executionInfo, findFailure(lookupIndexResolution.failures(), index, clusterAlias));
             }
         });
 
@@ -542,6 +541,19 @@ public class EsqlSession {
         );
     }
 
+    private ElasticsearchException findFailure(Map<String, List<FieldCapabilitiesFailure>> failures, String index, String clusterAlias) { // builds the exception to report for lookup index `index` on cluster `clusterAlias`
+        if (failures.containsKey(clusterAlias)) { // `failures` is keyed by cluster alias
+            var exc = failures.get(clusterAlias).stream().findFirst().map(FieldCapabilitiesFailure::getException); // only the first failure listed for this cluster is inspected
+            if (exc.isPresent()) {
+                return new VerificationException(
+                    "lookup failed " + EsqlCCSUtils.inClusterName(clusterAlias) + " for index [" + index + "]",
+                    ExceptionsHelper.unwrapCause(exc.get()) // result of unwrapCause becomes the cause of the returned exception
+                );
+            }
+        }
+        return new VerificationException("lookup index [" + index + "] is not available " + EsqlCCSUtils.inClusterName(clusterAlias)); // fallback: no entry for this cluster, an empty failure list, or a first failure with a null exception
+    }
+
     /**
      * Check whether the lookup index resolves to a single concrete index on all clusters or not.
      * If it's a single index, we are compatible with old pre-9.2 LOOKUP JOIN code and just need to send the same resolution as we did.