Bladeren bron

Support partial results in ES|QL (#121942)

This change introduces partial results in ES|QL. To minimize the scope of the 
changes, this PR is just the first step toward full support for partial
results. The following follow-up tasks are required:

- Support partial results across clusters

- Return shard-level failures (currently, we only return the `is_partial` flag)

- Add documentation

- Allow partial results during resolution
Nhat Nguyen 8 maanden geleden
bovenliggende
commit
b07ba89b1d
25 gewijzigde bestanden met toevoegingen van 305 en 116 verwijderingen
  1. 5 0
      docs/changelog/121942.yaml
  2. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  3. 2 1
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java
  4. 2 1
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java
  5. 21 6
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java
  6. 44 24
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java
  7. 58 12
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java
  8. 6 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java
  9. 11 15
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java
  10. 9 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java
  11. 4 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
  12. 2 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java
  13. 9 11
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java
  14. 21 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java
  15. 9 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java
  16. 3 2
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java
  17. 2 11
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  18. 26 3
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java
  19. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java
  20. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java
  21. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java
  22. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java
  23. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java
  24. 58 13
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java
  25. 2 1
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/ConfigurationSerializationTests.java

+ 5 - 0
docs/changelog/121942.yaml

@@ -0,0 +1,5 @@
+pr: 121942
+summary: Allow partial results in ES|QL
+area: ES|QL
+type: enhancement
+issues: []

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -192,6 +192,7 @@ public class TransportVersions {
     public static final TransportVersion ESQL_LOOKUP_JOIN_SOURCE_TEXT = def(9_008_0_00);
     public static final TransportVersion REMOVE_ALL_APPLICABLE_SELECTOR = def(9_009_0_00);
     public static final TransportVersion SLM_UNHEALTHY_IF_NO_SNAPSHOT_WITHIN = def(9_010_0_00);
+    public static final TransportVersion ESQL_SUPPORT_PARTIAL_RESULTS = def(9_011_0_00);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 2 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/ConfigurationTestUtils.java

@@ -71,7 +71,8 @@ public class ConfigurationTestUtils {
             query,
             profile,
             tables,
-            System.nanoTime()
+            System.nanoTime(),
+            false
         );
     }
 

+ 2 - 1
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/EsqlTestUtils.java

@@ -391,7 +391,8 @@ public final class EsqlTestUtils {
             query,
             false,
             TABLES,
-            System.nanoTime()
+            System.nanoTime(),
+            false
         );
     }
 

+ 21 - 6
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionBreakerIT.java

@@ -95,15 +95,30 @@ public class EsqlActionBreakerIT extends EsqlActionIT {
 
     @Override
     protected EsqlQueryResponse run(EsqlQueryRequest request) {
+        if (randomBoolean()) {
+            request.allowPartialResults(randomBoolean());
+        }
+        Exception failure = null;
         try {
-            return runWithBreaking(request);
-        } catch (Exception e) {
-            try (EsqlQueryResponse resp = super.run(request)) {
-                assertThat(e, instanceOf(CircuitBreakingException.class));
-                assertThat(ExceptionsHelper.status(e), equalTo(RestStatus.TOO_MANY_REQUESTS));
-                resp.incRef();
+            final EsqlQueryResponse resp = runWithBreaking(request);
+            if (resp.isPartial() == false) {
                 return resp;
             }
+            try (resp) {
+                assertTrue(request.allowPartialResults());
+            }
+        } catch (Exception e) {
+            failure = e;
+        }
+        // Re-run if the previous query failed or returned partial results
+        // Only check the previous failure if the second query succeeded
+        try (EsqlQueryResponse resp = super.run(request)) {
+            if (failure != null) {
+                assertThat(failure, instanceOf(CircuitBreakingException.class));
+                assertThat(ExceptionsHelper.status(failure), equalTo(RestStatus.TOO_MANY_REQUESTS));
+            }
+            resp.incRef();
+            return resp;
         }
     }
 

+ 44 - 24
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlDisruptionIT.java

@@ -83,38 +83,58 @@ public class EsqlDisruptionIT extends EsqlActionIT {
         logger.info("--> start disruption scheme [{}]", disruptionScheme);
         disruptionScheme.startDisrupting();
         logger.info("--> executing esql query with disruption {} ", request.query());
+        if (randomBoolean()) {
+            request.allowPartialResults(randomBoolean());
+        }
         ActionFuture<EsqlQueryResponse> future = client().execute(EsqlQueryAction.INSTANCE, request);
+        EsqlQueryResponse resp = null;
         try {
-            return future.actionGet(2, TimeUnit.MINUTES);
+            resp = future.actionGet(2, TimeUnit.MINUTES);
+            if (resp.isPartial() == false) {
+                return resp;
+            }
         } catch (Exception ignored) {
 
         } finally {
             clearDisruption();
         }
-        try {
-            return future.actionGet(2, TimeUnit.MINUTES);
-        } catch (Exception e) {
-            logger.info(
-                "running tasks: {}",
-                client().admin()
-                    .cluster()
-                    .prepareListTasks()
-                    .get()
-                    .getTasks()
-                    .stream()
-                    .filter(
-                        // Skip the tasks we that'd get in the way while debugging
-                        t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
-                            && false == t.action().contains(HealthNode.TASK_NAME)
-                    )
-                    .toList()
-            );
-            assertTrue("request must be failed or completed after clearing disruption", future.isDone());
-            ensureBlocksReleased();
-            logger.info("--> failed to execute esql query with disruption; retrying...", e);
-            EsqlTestUtils.assertEsqlFailure(e);
-            return client().execute(EsqlQueryAction.INSTANCE, request).actionGet(2, TimeUnit.MINUTES);
+        // wait for the response after clear disruption
+        if (resp == null) {
+            try {
+                resp = future.actionGet(2, TimeUnit.MINUTES);
+            } catch (Exception e) {
+                logger.info(
+                    "running tasks: {}",
+                    client().admin()
+                        .cluster()
+                        .prepareListTasks()
+                        .get()
+                        .getTasks()
+                        .stream()
+                        .filter(
+                            // Skip the tasks we that'd get in the way while debugging
+                            t -> false == t.action().contains(TransportListTasksAction.TYPE.name())
+                                && false == t.action().contains(HealthNode.TASK_NAME)
+                        )
+                        .toList()
+                );
+                assertTrue("request must be failed or completed after clearing disruption", future.isDone());
+                ensureBlocksReleased();
+                logger.info("--> failed to execute esql query with disruption; retrying...", e);
+                EsqlTestUtils.assertEsqlFailure(e);
+            }
+        }
+        // use the response if it's not partial
+        if (resp != null) {
+            if (resp.isPartial() == false) {
+                return resp;
+            }
+            try (var ignored = resp) {
+                assertTrue(request.allowPartialResults());
+            }
         }
+        // re-run the query
+        return super.run(request);
     }
 
     private ServiceDisruptionScheme addRandomDisruptionScheme() {

+ 58 - 12
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlNodeFailureIT.java

@@ -16,13 +16,17 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.FailingFieldPlugin;
 import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.esql.EsqlTestUtils;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
 
 /**
  * Make sure the failures on the data node come back as failures over the wire.
@@ -48,10 +52,7 @@ public class EsqlNodeFailureIT extends AbstractEsqlIntegTestCase {
         return settings;
     }
 
-    /**
-     * Use a runtime field that fails when loading field values to fail the entire query.
-     */
-    public void testFailureLoadingFields() throws IOException {
+    public Set<String> populateIndices() throws Exception {
         XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
         mapping.startObject("runtime");
         {
@@ -63,17 +64,62 @@ public class EsqlNodeFailureIT extends AbstractEsqlIntegTestCase {
             mapping.endObject();
         }
         mapping.endObject();
-        client().admin().indices().prepareCreate("fail").setSettings(indexSettings(1, 0)).setMapping(mapping.endObject()).get();
-
-        int docCount = 50;
-        List<IndexRequestBuilder> docs = new ArrayList<>(docCount);
-        for (int d = 0; d < docCount; d++) {
-            docs.add(client().prepareIndex("ok").setSource("foo", d));
+        client().admin().indices().prepareCreate("fail").setMapping(mapping.endObject()).get();
+        int okCount = between(1, 50);
+        Set<String> okIds = new HashSet<>();
+        List<IndexRequestBuilder> docs = new ArrayList<>(okCount);
+        for (int d = 0; d < okCount; d++) {
+            String id = "ok-" + d;
+            okIds.add(id);
+            docs.add(client().prepareIndex("ok").setId(id).setSource("foo", d));
+        }
+        int failCount = between(1, 50);
+        for (int d = 0; d < failCount; d++) {
+            docs.add(client().prepareIndex("fail").setId("fail-" + d).setSource("foo", d));
         }
-        docs.add(client().prepareIndex("fail").setSource("foo", 0));
         indexRandom(true, docs);
+        return okIds;
+    }
 
+    /**
+     * Use a runtime field that fails when loading field values to fail the entire query.
+     */
+    public void testFailureLoadingFields() throws Exception {
+        populateIndices();
         IllegalStateException e = expectThrows(IllegalStateException.class, () -> run("FROM fail,ok | LIMIT 100").close());
         assertThat(e.getMessage(), equalTo("Accessing failing field"));
     }
+
+    public void testPartialResults() throws Exception {
+        Set<String> okIds = populateIndices();
+        {
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM fail,ok | LIMIT 100");
+            request.allowPartialResults(true);
+            request.pragmas(randomPragmas());
+            try (EsqlQueryResponse resp = run(request)) {
+                assertTrue(resp.isPartial());
+                List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+                assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
+            }
+        }
+        {
+            EsqlQueryRequest request = new EsqlQueryRequest();
+            request.query("FROM fail,ok METADATA _id | KEEP _id, fail_me | LIMIT 100");
+            request.allowPartialResults(true);
+            request.pragmas(randomPragmas());
+            try (EsqlQueryResponse resp = run(request)) {
+                assertTrue(resp.isPartial());
+                List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+                assertThat(rows.size(), lessThanOrEqualTo(okIds.size()));
+                Set<String> actualIds = new HashSet<>();
+                for (List<Object> row : rows) {
+                    assertThat(row.size(), equalTo(2));
+                    String id = (String) row.getFirst();
+                    assertThat(id, in(okIds));
+                    assertTrue(actualIds.add(id));
+                }
+            }
+        }
+    }
 }

+ 6 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlCapabilities.java

@@ -819,7 +819,12 @@ public class EsqlCapabilities {
          * Fixes a series of issues with inlinestats which had an incomplete implementation after lookup and inlinestats
          * were refactored.
          */
-        INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG);
+        INLINESTATS_V3(EsqlPlugin.INLINESTATS_FEATURE_FLAG),
+
+        /**
+         * Support partial_results
+         */
+        SUPPORT_PARTIAL_RESULTS;
 
         private final boolean enabled;
 

+ 11 - 15
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlExecutionInfo.java

@@ -246,7 +246,13 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
      * @return the new Cluster object
      */
     public Cluster swapCluster(String clusterAlias, BiFunction<String, Cluster, Cluster> remappingFunction) {
-        return clusterInfo.compute(clusterAlias, remappingFunction);
+        return clusterInfo.compute(clusterAlias, (unused, oldCluster) -> {
+            final Cluster newCluster = remappingFunction.apply(clusterAlias, oldCluster);
+            if (newCluster != null && isPartial == false) {
+                isPartial = newCluster.isPartial();
+            }
+            return newCluster;
+        });
     }
 
     @Override
@@ -305,13 +311,6 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
         return isPartial;
     }
 
-    /**
-     * Mark the query as having partial results.
-     */
-    public void markAsPartial() {
-        isPartial = true;
-    }
-
     public void markAsStopped() {
         isStopped = true;
     }
@@ -320,13 +319,6 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
         return isStopped;
     }
 
-    /**
-     * Mark this cluster as having partial results.
-     */
-    public void markClusterAsPartial(String clusterAlias) {
-        swapCluster(clusterAlias, (k, v) -> new Cluster.Builder(v).setStatus(Cluster.Status.PARTIAL).build());
-    }
-
     /**
      * Represents the search metadata about a particular cluster involved in a cross-cluster search.
      * The Cluster object can represent either the local cluster or a remote cluster.
@@ -618,6 +610,10 @@ public class EsqlExecutionInfo implements ChunkedToXContentObject, Writeable {
             return failures;
         }
 
+        boolean isPartial() {
+            return status == Status.PARTIAL || status == Status.SKIPPED || (failedShards != null && failedShards > 0);
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) return true;

+ 9 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java

@@ -52,6 +52,7 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
     private boolean keepOnCompletion;
     private boolean onSnapshotBuild = Build.current().isSnapshot();
     private boolean acceptedPragmaRisks = false;
+    private boolean allowPartialResults = false;
 
     /**
      * "Tables" provided in the request for use with things like {@code LOOKUP}.
@@ -231,6 +232,14 @@ public class EsqlQueryRequest extends org.elasticsearch.xpack.core.esql.action.E
         return tables;
     }
 
+    public boolean allowPartialResults() {
+        return allowPartialResults;
+    }
+
+    public void allowPartialResults(boolean allowPartialResults) {
+        this.allowPartialResults = allowPartialResults;
+    }
+
     @Override
     public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
         // Pass the query as the description

+ 4 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -181,6 +181,10 @@ public class EsqlQueryResponse extends org.elasticsearch.xpack.core.esql.action.
         return isRunning;
     }
 
+    public boolean isPartial() {
+        return executionInfo != null && executionInfo.isPartial();
+    }
+
     public EsqlExecutionInfo getExecutionInfo() {
         return executionInfo;
     }

+ 2 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RequestXContent.java

@@ -85,6 +85,7 @@ final class RequestXContent {
     static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
     static final ParseField KEEP_ALIVE = new ParseField("keep_alive");
     static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion");
+    static final ParseField ALLOW_PARTIAL_RESULTS = new ParseField("allow_partial_results");
 
     private static final ObjectParser<EsqlQueryRequest, Void> SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest);
     private static final ObjectParser<EsqlQueryRequest, Void> ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest);
@@ -114,6 +115,7 @@ final class RequestXContent {
         parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD);
         parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD);
         parser.declareField((p, r, c) -> new ParseTables(r, p).parseTables(), TABLES_FIELD, ObjectParser.ValueType.OBJECT);
+        parser.declareBoolean(EsqlQueryRequest::allowPartialResults, ALLOW_PARTIAL_RESULTS);
     }
 
     private static ObjectParser<EsqlQueryRequest, Void> objectParserSync(Supplier<EsqlQueryRequest> supplier) {

+ 9 - 11
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java

@@ -209,7 +209,7 @@ public class ComputeService {
                         transportService.getThreadPool(),
                         cancelQueryOnFailure,
                         computeListener.acquireCompute().delegateFailure((l, profiles) -> {
-                            if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
+                            if (execInfo.clusterInfo.containsKey(LOCAL_CLUSTER)) {
                                 var tookTime = TimeValue.timeValueNanos(System.nanoTime() - execInfo.getRelativeStartNanos());
                                 var status = localClusterWasInterrupted.get()
                                     ? EsqlExecutionInfo.Cluster.Status.PARTIAL
@@ -252,16 +252,14 @@ public class ComputeService {
                             cancelQueryOnFailure,
                             localListener.acquireCompute().map(r -> {
                                 localClusterWasInterrupted.set(execInfo.isStopped());
-                                if (execInfo.isCrossClusterSearch() && execInfo.clusterAliases().contains(LOCAL_CLUSTER)) {
-                                    execInfo.swapCluster(
-                                        LOCAL_CLUSTER,
-                                        (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards())
-                                            .setSuccessfulShards(r.getSuccessfulShards())
-                                            .setSkippedShards(r.getSkippedShards())
-                                            .setFailedShards(r.getFailedShards())
-                                            .build()
-                                    );
-                                }
+                                execInfo.swapCluster(
+                                    LOCAL_CLUSTER,
+                                    (k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setTotalShards(r.getTotalShards())
+                                        .setSuccessfulShards(r.getSuccessfulShards())
+                                        .setSkippedShards(r.getSkippedShards())
+                                        .setFailedShards(r.getFailedShards())
+                                        .build()
+                                );
                                 return r.getProfiles();
                             })
                         );

+ 21 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java

@@ -97,7 +97,8 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
         Runnable runOnTaskFailure,
         ActionListener<ComputeResponse> outListener
     ) {
-        DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask) {
+        final boolean allowPartialResults = configuration.allowPartialResults();
+        DataNodeRequestSender sender = new DataNodeRequestSender(transportService, esqlExecutor, parentTask, allowPartialResults) {
             @Override
             protected void sendRequest(
                 DiscoveryNode node,
@@ -125,14 +126,28 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
                     queryPragmas.exchangeBufferSize(),
                     esqlExecutor,
                     listener.delegateFailureAndWrap((l, unused) -> {
+                        final Runnable onGroupFailure;
+                        final CancellableTask groupTask;
+                        if (allowPartialResults) {
+                            groupTask = RemoteListenerGroup.createGroupTask(
+                                transportService,
+                                parentTask,
+                                () -> "compute group: data-node [" + node.getName() + "], " + shardIds + " [" + shardIds + "]"
+                            );
+                            onGroupFailure = computeService.cancelQueryOnFailure(groupTask);
+                            l = ActionListener.runAfter(l, () -> transportService.getTaskManager().unregister(groupTask));
+                        } else {
+                            groupTask = parentTask;
+                            onGroupFailure = runOnTaskFailure;
+                        }
                         final AtomicReference<DataNodeComputeResponse> nodeResponseRef = new AtomicReference<>();
                         try (
-                            var computeListener = new ComputeListener(threadPool, runOnTaskFailure, l.map(ignored -> nodeResponseRef.get()))
+                            var computeListener = new ComputeListener(threadPool, onGroupFailure, l.map(ignored -> nodeResponseRef.get()))
                         ) {
-                            final var remoteSink = exchangeService.newRemoteSink(parentTask, childSessionId, transportService, connection);
+                            final var remoteSink = exchangeService.newRemoteSink(groupTask, childSessionId, transportService, connection);
                             exchangeSource.addRemoteSink(
                                 remoteSink,
-                                true,
+                                allowPartialResults == false,
                                 pagesFetched::incrementAndGet,
                                 queryPragmas.concurrentExchangeClients(),
                                 computeListener.acquireAvoid()
@@ -153,7 +168,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
                                 connection,
                                 ComputeService.DATA_ACTION_NAME,
                                 dataNodeRequest,
-                                parentTask,
+                                groupTask,
                                 TransportRequestOptions.EMPTY,
                                 new ActionListenerResponseHandler<>(computeListener.acquireCompute().map(r -> {
                                     nodeResponseRef.set(r);
@@ -238,6 +253,7 @@ final class DataNodeComputeHandler implements TransportRequestHandler<DataNodeRe
                         }
                         onResponse(List.of());
                     } else {
+                        // TODO: add these to fatal failures so we can continue processing other shards.
                         try {
                             exchangeService.finishSinkHandler(request.sessionId(), e);
                         } finally {

+ 9 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSender.java

@@ -55,6 +55,7 @@ abstract class DataNodeRequestSender {
     private final TransportService transportService;
     private final Executor esqlExecutor;
     private final CancellableTask rootTask;
+    private final boolean allowPartialResults;
     private final ReentrantLock sendingLock = new ReentrantLock();
     private final Queue<ShardId> pendingShardIds = ConcurrentCollections.newQueue();
     private final Map<DiscoveryNode, Semaphore> nodePermits = new HashMap<>();
@@ -62,10 +63,11 @@ abstract class DataNodeRequestSender {
     private final AtomicBoolean changed = new AtomicBoolean();
     private boolean reportedFailure = false; // guarded by sendingLock
 
-    DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask) {
+    DataNodeRequestSender(TransportService transportService, Executor esqlExecutor, CancellableTask rootTask, boolean allowPartialResults) {
         this.transportService = transportService;
         this.esqlExecutor = esqlExecutor;
         this.rootTask = rootTask;
+        this.allowPartialResults = allowPartialResults;
     }
 
     final void startComputeOnDataNodes(
@@ -80,13 +82,14 @@ abstract class DataNodeRequestSender {
         searchShards(rootTask, clusterAlias, requestFilter, concreteIndices, originalIndices, ActionListener.wrap(targetShards -> {
             try (var computeListener = new ComputeListener(transportService.getThreadPool(), runOnTaskFailure, listener.map(profiles -> {
                 TimeValue took = TimeValue.timeValueNanos(System.nanoTime() - startTimeInNanos);
+                final int failedShards = shardFailures.size();
                 return new ComputeResponse(
                     profiles,
                     took,
                     targetShards.totalShards(),
-                    targetShards.totalShards(),
+                    targetShards.totalShards() - failedShards,
                     targetShards.skippedShards(),
-                    0
+                    failedShards
                 );
             }))) {
                 for (TargetShard shard : targetShards.shards.values()) {
@@ -120,7 +123,8 @@ abstract class DataNodeRequestSender {
                             );
                         }
                     }
-                    if (reportedFailure || shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal)) {
+                    if (reportedFailure
+                        || (allowPartialResults == false && shardFailures.values().stream().anyMatch(shardFailure -> shardFailure.fatal))) {
                         reportedFailure = true;
                         reportFailures(computeListener);
                     } else {
@@ -345,7 +349,7 @@ abstract class DataNodeRequestSender {
             filter,
             null,
             null,
-            false,
+            true, // unavailable_shards will be handled by the sender
             clusterAlias
         );
         transportService.sendChildRequest(

+ 3 - 2
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/RemoteListenerGroup.java

@@ -50,7 +50,7 @@ class RemoteListenerGroup {
         this.taskManager = transportService.getTaskManager();
         this.clusterAlias = clusterAlias;
         this.executionInfo = executionInfo;
-        groupTask = createGroupTask(rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]");
+        groupTask = createGroupTask(transportService, rootTask, () -> rootTask.getDescription() + "[" + clusterAlias + "]");
         CountDown countDown = new CountDown(2);
         // The group is done when both the sink and the cluster request are done
         Runnable finishGroup = () -> {
@@ -92,7 +92,8 @@ class RemoteListenerGroup {
         return clusterRequestListener;
     }
 
-    private CancellableTask createGroupTask(Task parentTask, Supplier<String> description) {
+    public static CancellableTask createGroupTask(TransportService transportService, Task parentTask, Supplier<String> description) {
+        final TaskManager taskManager = transportService.getTaskManager();
         return (CancellableTask) taskManager.register(
             "transport",
             "esql_compute_group",

+ 2 - 11
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -206,7 +206,8 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             request.query(),
             request.profile(),
             request.tables(),
-            System.nanoTime()
+            System.nanoTime(),
+            request.allowPartialResults()
         );
         String sessionId = sessionID(task);
         // async-query uses EsqlQueryTask, so pull the EsqlExecutionInfo out of the task
@@ -233,16 +234,6 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             planRunner,
             services,
             ActionListener.wrap(result -> {
-                // If we had any skipped or partial clusters, the result is partial
-                if (executionInfo.getClusters()
-                    .values()
-                    .stream()
-                    .anyMatch(
-                        c -> c.getStatus() == EsqlExecutionInfo.Cluster.Status.SKIPPED
-                            || c.getStatus() == EsqlExecutionInfo.Cluster.Status.PARTIAL
-                    )) {
-                    executionInfo.markAsPartial();
-                }
                 recordCCSTelemetry(task, executionInfo, request, null);
                 listener.onResponse(toResponse(task, request, configuration, result));
             }, ex -> {

+ 26 - 3
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/session/Configuration.java

@@ -50,6 +50,7 @@ public class Configuration implements Writeable {
     private final String query;
 
     private final boolean profile;
+    private final boolean allowPartialResults;
 
     private final Map<String, Map<String, Column>> tables;
     private final long queryStartTimeNanos;
@@ -65,7 +66,8 @@ public class Configuration implements Writeable {
         String query,
         boolean profile,
         Map<String, Map<String, Column>> tables,
-        long queryStartTimeNanos
+        long queryStartTimeNanos,
+        boolean allowPartialResults
     ) {
         this.zoneId = zi.normalized();
         this.now = ZonedDateTime.now(Clock.tick(Clock.system(zoneId), Duration.ofNanos(1)));
@@ -80,6 +82,7 @@ public class Configuration implements Writeable {
         this.tables = tables;
         assert tables != null;
         this.queryStartTimeNanos = queryStartTimeNanos;
+        this.allowPartialResults = allowPartialResults;
     }
 
     public Configuration(BlockStreamInput in) throws IOException {
@@ -107,6 +110,11 @@ public class Configuration implements Writeable {
         } else {
             this.queryStartTimeNanos = -1;
         }
+        if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS)) {
+            this.allowPartialResults = in.readBoolean();
+        } else {
+            this.allowPartialResults = false;
+        }
     }
 
     @Override
@@ -131,6 +139,9 @@ public class Configuration implements Writeable {
         if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
             out.writeLong(queryStartTimeNanos);
         }
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_SUPPORT_PARTIAL_RESULTS)) {
+            out.writeBoolean(allowPartialResults);
+        }
     }
 
     public ZoneId zoneId() {
@@ -206,6 +217,13 @@ public class Configuration implements Writeable {
         return profile;
     }
 
+    /**
+     * Whether this request can return partial results instead of failing fast on failures
+     */
+    public boolean allowPartialResults() {
+        return allowPartialResults;
+    }
+
     private static void writeQuery(StreamOutput out, String query) throws IOException {
         if (query.length() > QUERY_COMPRESS_THRESHOLD_CHARS) { // compare on chars to avoid UTF-8 encoding unless actually required
             out.writeBoolean(true);
@@ -244,7 +262,8 @@ public class Configuration implements Writeable {
             && Objects.equals(locale, that.locale)
             && Objects.equals(that.query, query)
             && profile == that.profile
-            && tables.equals(that.tables);
+            && tables.equals(that.tables)
+            && allowPartialResults == that.allowPartialResults;
     }
 
     @Override
@@ -260,7 +279,8 @@ public class Configuration implements Writeable {
             locale,
             query,
             profile,
-            tables
+            tables,
+            allowPartialResults
         );
     }
 
@@ -282,6 +302,9 @@ public class Configuration implements Writeable {
             + profile
             + ", tables="
             + tables
+            + "allow_partial_result="
+            + allowPartialResults
             + '}';
     }
+
 }

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/AbstractConfigurationFunctionTestCase.java

@@ -43,7 +43,8 @@ public abstract class AbstractConfigurationFunctionTestCase extends AbstractScal
             StringUtils.EMPTY,
             randomBoolean(),
             Map.of(),
-            System.nanoTime()
+            System.nanoTime(),
+            randomBoolean()
         );
     }
 

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToLowerTests.java

@@ -70,7 +70,8 @@ public class ToLowerTests extends AbstractConfigurationFunctionTestCase {
             "",
             false,
             Map.of(),
-            System.nanoTime()
+            System.nanoTime(),
+            randomBoolean()
         );
     }
 

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/scalar/string/ToUpperTests.java

@@ -70,7 +70,8 @@ public class ToUpperTests extends AbstractConfigurationFunctionTestCase {
             "",
             false,
             Map.of(),
-            System.nanoTime()
+            System.nanoTime(),
+            randomBoolean()
         );
     }
 

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/EvalMapperTests.java

@@ -78,7 +78,8 @@ public class EvalMapperTests extends ESTestCase {
         StringUtils.EMPTY,
         false,
         Map.of(),
-        System.nanoTime()
+        System.nanoTime(),
+        false
     );
 
     @ParametersFactory(argumentFormatting = "%1$s")

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

@@ -202,7 +202,8 @@ public class LocalExecutionPlannerTests extends MapperServiceTestCase {
             StringUtils.EMPTY,
             false,
             Map.of(),
-            System.nanoTime()
+            System.nanoTime(),
+            randomBoolean()
         );
     }
 

+ 58 - 13
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/DataNodeRequestSenderTests.java

@@ -52,6 +52,8 @@ import static org.elasticsearch.xpack.esql.plugin.DataNodeRequestSender.NodeRequ
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.not;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -88,7 +90,11 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
     }
 
     public void testEmpty() {
-        var future = sendRequests(List.of(), (node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent"));
+        var future = sendRequests(
+            List.of(),
+            randomBoolean(),
+            (node, shardIds, aliasFilters, listener) -> fail("expect no data-node request is sent")
+        );
         var resp = safeGet(future);
         assertThat(resp.totalShards, equalTo(0));
     }
@@ -101,7 +107,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             targetShard(shard4, node2, node3)
         );
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
-        var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             var resp = new DataNodeComputeResponse(List.of(), Map.of());
             runWithDelay(() -> listener.onResponse(resp));
@@ -112,12 +118,25 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
     }
 
     public void testMissingShards() {
-        var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
-        var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> {
-            fail("expect no data-node request is sent when target shards are missing");
-        });
-        var error = expectThrows(NoShardAvailableActionException.class, future::actionGet);
-        assertThat(error.getMessage(), containsString("no shard copies found"));
+        {
+            var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
+            var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
+                fail("expect no data-node request is sent when target shards are missing");
+            });
+            var error = expectThrows(NoShardAvailableActionException.class, future::actionGet);
+            assertThat(error.getMessage(), containsString("no shard copies found"));
+        }
+        {
+            var targetShards = List.of(targetShard(shard1, node1), targetShard(shard3), targetShard(shard4, node2, node3));
+            var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> {
+                assertThat(shard3, not(in(shardIds)));
+                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+            });
+            ComputeResponse resp = safeGet(future);
+            assertThat(resp.totalShards, equalTo(3));
+            assertThat(resp.failedShards, equalTo(1));
+            assertThat(resp.successfulShards, equalTo(2));
+        }
     }
 
     public void testRetryThenSuccess() {
@@ -129,7 +148,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             targetShard(shard5, node1, node3, node2)
         );
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
-        var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, randomBoolean(), (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             Map<ShardId, Exception> failures = new HashMap<>();
             if (node.equals(node1) && shardIds.contains(shard5)) {
@@ -161,7 +180,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             targetShard(shard5, node1, node3, node2)
         );
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
-        var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             Map<ShardId, Exception> failures = new HashMap<>();
             if (shardIds.contains(shard5)) {
@@ -187,7 +206,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
         var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1));
         Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
         AtomicBoolean failed = new AtomicBoolean();
-        var future = sendRequests(targetShards, (node, shardIds, aliasFilters, listener) -> {
+        var future = sendRequests(targetShards, false, (node, shardIds, aliasFilters, listener) -> {
             sent.add(new NodeRequest(node, shardIds, aliasFilters));
             if (node1.equals(node) && failed.compareAndSet(false, true)) {
                 runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));
@@ -203,6 +222,28 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
         assertThat(firstRound, equalTo(Map.of(node1, List.of(shard1, shard3), node2, List.of(shard2))));
     }
 
+    public void testAllowPartialResults() {
+        var targetShards = List.of(targetShard(shard1, node1), targetShard(shard2, node2), targetShard(shard3, node1, node2));
+        Queue<NodeRequest> sent = ConcurrentCollections.newQueue();
+        AtomicBoolean failed = new AtomicBoolean();
+        var future = sendRequests(targetShards, true, (node, shardIds, aliasFilters, listener) -> {
+            sent.add(new NodeRequest(node, shardIds, aliasFilters));
+            if (node1.equals(node) && failed.compareAndSet(false, true)) {
+                runWithDelay(() -> listener.onFailure(new IOException("test request level failure"), true));
+            } else {
+                runWithDelay(() -> listener.onResponse(new DataNodeComputeResponse(List.of(), Map.of())));
+            }
+        });
+        ComputeResponse resp = safeGet(future);
+        // one round: {node-1, node-2}
+        assertThat(sent.size(), equalTo(2));
+        var firstRound = groupRequests(sent, 2);
+        assertThat(firstRound, equalTo(Map.of(node1, List.of(shard1, shard3), node2, List.of(shard2))));
+        assertThat(resp.totalShards, equalTo(3));
+        assertThat(resp.failedShards, equalTo(2));
+        assertThat(resp.successfulShards, equalTo(1));
+    }
+
     static DataNodeRequestSender.TargetShard targetShard(ShardId shardId, DiscoveryNode... nodes) {
         return new DataNodeRequestSender.TargetShard(shardId, new ArrayList<>(Arrays.asList(nodes)), null);
     }
@@ -224,7 +265,11 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
         }
     }
 
-    PlainActionFuture<ComputeResponse> sendRequests(List<DataNodeRequestSender.TargetShard> shards, Sender sender) {
+    PlainActionFuture<ComputeResponse> sendRequests(
+        List<DataNodeRequestSender.TargetShard> shards,
+        boolean allowPartialResults,
+        Sender sender
+    ) {
         PlainActionFuture<ComputeResponse> future = new PlainActionFuture<>();
         TransportService transportService = mock(TransportService.class);
         when(transportService.getThreadPool()).thenReturn(threadPool);
@@ -236,7 +281,7 @@ public class DataNodeRequestSenderTests extends ComputeTestCase {
             TaskId.EMPTY_TASK_ID,
             Collections.emptyMap()
         );
-        DataNodeRequestSender requestSender = new DataNodeRequestSender(transportService, executor, task) {
+        DataNodeRequestSender requestSender = new DataNodeRequestSender(transportService, executor, task, allowPartialResults) {
             @Override
             void searchShards(
                 Task parentTask,

+ 2 - 1
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/session/ConfigurationSerializationTests.java

@@ -103,7 +103,8 @@ public class ConfigurationSerializationTests extends AbstractWireSerializingTest
             query,
             profile,
             tables,
-            System.nanoTime()
+            System.nanoTime(),
+            randomBoolean()
         );
 
     }