Browse Source

Report failures on partial results (#124823)

* Report failures on partial results
Stanislav Malyshev 7 months ago
parent
commit
0e6d6f4324

+ 5 - 0
docs/changelog/124823.yaml

@@ -0,0 +1,5 @@
+pr: 124823
+summary: Report failures on partial results
+area: "ES|QL"
+type: enhancement
+issues: []

+ 22 - 1
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractCrossClusterTestCase.java

@@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
@@ -143,7 +144,27 @@ public abstract class AbstractCrossClusterTestCase extends AbstractMultiClusters
                 assertThat((int) inner.get("total"), equalTo(numClusters));
                 assertTrue(inner.containsKey("details"));
             } else {
-                assertNull(clusters);
+                final Object partial = esqlResponseAsMap.get("is_partial");
+                if (partial != null && (Boolean) partial) {
+                    // A partial response may carry cluster metadata; if it does, it must contain only "details".
+                    // Details must not be empty, and every cluster listed in it must report failures.
+                    if (clusters != null) {
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> inner = (Map<String, Object>) clusters;
+                        assertThat(inner, aMapWithSize(1));
+                        assertTrue(inner.containsKey("details"));
+                        @SuppressWarnings("unchecked")
+                        Map<String, Object> details = (Map<String, Object>) inner.get("details");
+                        assertThat(details.size(), greaterThanOrEqualTo(1));
+                        details.forEach((k, v) -> {
+                            @SuppressWarnings("unchecked")
+                            Map<String, Object> cluster = (Map<String, Object>) v;
+                            assertTrue(cluster.containsKey("failures"));
+                        });
+                    }
+                } else {
+                    assertNull(clusters);
+                }
             }
         } catch (IOException e) {
             fail("Could not convert ESQLQueryResponse to Map: " + e);

+ 27 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

@@ -218,6 +218,15 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
             || clusterInfo.size() == 1 && clusterInfo.containsKey(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY) == false;
     }
 
+    /**
+     * Is there any metadata to report in the response?
+     * This is true on cross-cluster search with includeCCSMetadata=true, or when the result is partial and some cluster has failures.
+     */
+    public boolean hasMetadataToReport() {
+        return isCrossClusterSearch() && includeCCSMetadata
+            || (isPartial && clusterInfo.values().stream().anyMatch(c -> c.getFailures().isEmpty() == false));
+    }
+
     public Cluster getCluster(String clusterAlias) {
         return clusterInfo.get(clusterAlias);
     }
@@ -257,9 +266,13 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
 
     @Override
     public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
-        if (isCrossClusterSearch() == false || clusterInfo.isEmpty()) {
+        if (clusterInfo.isEmpty()) {
             return Collections.emptyIterator();
         }
+        if (includeCCSMetadata == false) {
+            // If includeCCSMetadata is false, the only reason we're here is partial failures, so just report them.
+            return onlyFailuresToXContent();
+        }
         Map<Cluster.Status, Integer> clusterStatuses = new EnumMap<>(Cluster.Status.class);
         for (Cluster info : clusterInfo.values()) {
             clusterStatuses.merge(info.getStatus(), 1, Integer::sum);
@@ -280,6 +293,19 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
         );
     }
 
+    private Iterator<? extends ToXContent> onlyFailuresToXContent() {
+        Iterator<Cluster> failuresIterator = clusterInfo.values().stream().filter(c -> (c.getFailures().isEmpty() == false)).iterator();
+        if (failuresIterator.hasNext()) {
+            return Iterators.concat(
+                ChunkedToXContentHelper.startObject(),
+                ChunkedToXContentHelper.object("details", failuresIterator),
+                ChunkedToXContentHelper.endObject()
+            );
+        } else {
+            return Collections.emptyIterator();
+        }
+    }
+
     /**
      * @param status the status you want to access
      * @return a stream of clusters with that status

+ 3 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -228,11 +228,9 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
         Iterator<ToXContent> profileRender = profile != null
             ? ChunkedToXContentHelper.field("profile", profile, params)
             : Collections.emptyIterator();
-        Iterator<ToXContent> executionInfoRender = executionInfo != null
-            && executionInfo.isCrossClusterSearch()
-            && executionInfo.includeCCSMetadata()
-                ? ChunkedToXContentHelper.field("_clusters", executionInfo, params)
-                : Collections.emptyIterator();
+        Iterator<ToXContent> executionInfoRender = executionInfo != null && executionInfo.hasMetadataToReport()
+            ? ChunkedToXContentHelper.field("_clusters", executionInfo, params)
+            : Collections.emptyIterator();
         return Iterators.concat(
             ChunkedToXContentHelper.startObject(),
             asyncPropertiesOrEmpty(),

+ 66 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfoTests.java

@@ -0,0 +1,66 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.transport.RemoteClusterService;
+
+import java.util.List;
+
+public class EsqlExecutionInfoTests extends ESTestCase {
+
+    static final EsqlExecutionInfo.Cluster localCluster = new EsqlExecutionInfo.Cluster(
+        RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
+        "test"
+    );
+    static final EsqlExecutionInfo.Cluster remoteCluster = new EsqlExecutionInfo.Cluster("remote", "test");
+
+    public void testHasMetadataInclude() {
+        // With includeCCSMetadata=true, a local-only search reports nothing; adding a remote cluster produces true
+        EsqlExecutionInfo info = new EsqlExecutionInfo(true);
+        assertFalse(info.hasMetadataToReport());
+        info.swapCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> localCluster);
+        assertFalse(info.hasMetadataToReport());
+        info.swapCluster("remote", (k, v) -> remoteCluster);
+        assertTrue(info.hasMetadataToReport());
+        // Only remote is enough
+        info = new EsqlExecutionInfo(true);
+        info.swapCluster("remote", (k, v) -> remoteCluster);
+        assertTrue(info.hasMetadataToReport());
+    }
+
+    public void testHasMetadataIncludeFalse() {
+        // If includeCCSMetadata is false and no cluster has failures, then it should always return false
+        EsqlExecutionInfo info = new EsqlExecutionInfo(false);
+        assertFalse(info.hasMetadataToReport());
+        assertFalse(info.hasMetadataToReport());
+        info.swapCluster(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, (k, v) -> localCluster);
+        assertFalse(info.hasMetadataToReport());
+        info.swapCluster("remote", (k, v) -> remoteCluster);
+        assertFalse(info.hasMetadataToReport());
+    }
+
+    public void testHasMetadataPartial() {
+        EsqlExecutionInfo info = new EsqlExecutionInfo(false);
+        String key = randomFrom(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY, "remote");
+        info.swapCluster(key, (k, v) -> new EsqlExecutionInfo.Cluster(k, "test", false, EsqlExecutionInfo.Cluster.Status.SUCCESSFUL));
+        assertFalse(info.isPartial());
+        assertFalse(info.hasMetadataToReport());
+        info.swapCluster(key, (k, v) -> new EsqlExecutionInfo.Cluster(k, "test", false, EsqlExecutionInfo.Cluster.Status.PARTIAL));
+        assertTrue(info.isPartial());
+        assertFalse(info.hasMetadataToReport());
+        info.swapCluster(key, (k, v) -> {
+            EsqlExecutionInfo.Cluster.Builder builder = new EsqlExecutionInfo.Cluster.Builder(v);
+            builder.setFailures(List.of(new ShardSearchFailure(new IllegalStateException("shard failure"))));
+            return builder.build();
+        });
+        assertTrue(info.hasMetadataToReport());
+    }
+
+}