浏览代码

ES|QL Async Query API (#103398)

This commit adds two new endpoints to support ES|QL async querying.

The endpoints added are:
 1. GET /_query/async - for initiating a query
 2. GET /_query/async/{id} - for retrieving the result
Chris Hegarty 1 年之前
父节点
当前提交
6ca309983d
共有 29 个文件被更改,包括 1210 次插入271 次删除
  1. 5 0
      docs/changelog/103398.yaml
  2. 82 0
      docs/reference/esql/esql-async-query-api.asciidoc
  3. 1 0
      server/src/main/java/org/elasticsearch/TransportVersions.java
  4. 3 2
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncTask.java
  5. 1 1
      x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/ConsumeProcessor.java
  6. 2 2
      x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java
  7. 74 0
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncActionIT.java
  8. 18 16
      x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/WarningsIT.java
  9. 21 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlAsyncGetResultAction.java
  10. 89 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java
  11. 21 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestBuilder.java
  12. 137 213
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java
  13. 38 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java
  14. 13 6
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java
  15. 177 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java
  16. 91 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java
  17. 62 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlAsyncQueryAction.java
  18. 42 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlGetAsyncResultAction.java
  19. 1 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlQueryAction.java
  20. 5 1
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java
  21. 58 0
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java
  22. 97 5
      x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java
  23. 6 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java
  24. 79 8
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java
  25. 71 6
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java
  26. 4 4
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java
  27. 3 0
      x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java
  28. 8 1
      x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java
  29. 1 0
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

+ 5 - 0
docs/changelog/103398.yaml

@@ -0,0 +1,5 @@
+pr: 103398
+summary: ES|QL Async Query API
+area: ES|QL
+type: enhancement
+issues: []

+ 82 - 0
docs/reference/esql/esql-async-query-api.asciidoc

@@ -0,0 +1,82 @@
+[[esql-async-query-api]]
+== {esql} async query API
+++++
+<titleabbrev>{esql} async query API</titleabbrev>
+++++
+
+Runs an async {esql} search.
+
+The async query API lets you asynchronously execute a search request,
+monitor its progress, and retrieve results as they become available.
+
+Executing an <<esql,ES|QL ({es} query language)>> is commonly quite fast,
+however searches across large data sets or frozen data can take some time.
+To avoid long waits, run an async {esql} search.
+
+Searches initiated by this API may return search results or not. The
+`wait_for_completion_timeout` property determines how long to wait for
+the search results. The default value is 1 second. If the results are
+not available by this time, a search id is return which can be later
+used to retrieve the results.
+
+Initiates an async search for an <<esql,ES|QL ({es} query language)>>
+query. The API accepts the same parameters and request body as the
+<<esql-query-api,query API>>.
+
+[source,console]
+----
+POST /_query/async
+{
+  "query": """
+    FROM library
+    | EVAL year = DATE_TRUNC(1 YEARS, release_date)
+    | STATS MAX(page_count) BY year
+    | SORT year
+    | LIMIT 5
+  """,
+  "wait_for_completion_timeout": "2s"
+}
+----
+// TEST[setup:library]
+
+If the results are not available within the timeout period, 2 seconds in
+this case, the search returns no results but rather a response that
+includes:
+
+ * A search ID
+ * An `is_running` value of true, indicating the search is ongoing
+
+The query continues to run in the background without blocking other
+requests.
+
+[source,console-result]
+----
+{
+  "id": "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=",
+  "is_running": true
+}
+----
+// TEST[skip: no access to search ID - may return response values]
+
+To check the progress of an async search, use the <<get-async-esql-query-api,get
+async ES|QL query API>> with the search ID. Specify how long you'd like for
+complete results in the `wait_for_completion_timeout` parameter.
+
+[source,console]
+----
+GET /_query/async/get/FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=?wait_for_completion_timeout=30s
+----
+// TEST[skip: no access to search ID - may return response values]
+
+If the response's `is_running` value is `false`, the async search has
+finished, and the results are returned.
+
+[source,console-result]
+----
+{
+  "id": "FmNJRUZ1YWZCU3dHY1BIOUhaenVSRkEaaXFlZ3h4c1RTWFNocDdnY2FSaERnUTozNDE=",
+  "is_running": false,
+  "columns": ...
+}
+----
+// TEST[skip: no access to search ID - may return response values]

+ 1 - 0
server/src/main/java/org/elasticsearch/TransportVersions.java

@@ -172,6 +172,7 @@ public class TransportVersions {
     public static final TransportVersion ENRICH_ELASTICSEARCH_VERSION_REMOVED = def(8_560_00_0);
     public static final TransportVersion NODE_STATS_REQUEST_SIMPLIFIED = def(8_561_00_0);
     public static final TransportVersion TEXT_EXPANSION_TOKEN_PRUNING_CONFIG_ADDED = def(8_562_00_0);
+    public static final TransportVersion ESQL_ASYNC_QUERY = def(8_563_00_0);
 
     /*
      * STOP! READ THIS FIRST! No, really,

+ 3 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/StoredAsyncTask.java

@@ -23,7 +23,7 @@ public abstract class StoredAsyncTask<Response extends ActionResponse> extends C
     private final AsyncExecutionId asyncExecutionId;
     private final Map<String, String> originHeaders;
     private volatile long expirationTimeMillis;
-    private final List<ActionListener<Response>> completionListeners;
+    protected final List<ActionListener<Response>> completionListeners;
 
     @SuppressWarnings("this-escape")
     public StoredAsyncTask(
@@ -79,7 +79,8 @@ public abstract class StoredAsyncTask<Response extends ActionResponse> extends C
      */
     public synchronized void onResponse(Response response) {
         for (ActionListener<Response> listener : completionListeners) {
-            listener.onResponse(response);
+            response.incRef();
+            ActionListener.respondAndRelease(listener, response);
         }
     }
 

+ 1 - 1
x-pack/plugin/esql/compute/gen/src/main/java/org/elasticsearch/compute/gen/ConsumeProcessor.java

@@ -40,7 +40,7 @@ public class ConsumeProcessor implements Processor {
             "org.elasticsearch.xpack.esql.expression.function.FunctionInfo",
             "org.elasticsearch.xpack.esql.expression.function.Param",
             "org.elasticsearch.rest.ServerlessScope",
-
+            "org.elasticsearch.xcontent.ParserConstructor",
             Fixed.class.getName()
         );
     }

+ 2 - 2
x-pack/plugin/esql/qa/testFixtures/src/main/java/org/elasticsearch/xpack/esql/CsvTestUtils.java

@@ -24,7 +24,7 @@ import org.elasticsearch.core.Releasables;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.test.VersionUtils;
-import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
+import org.elasticsearch.xpack.esql.action.ResponseValueUtils;
 import org.elasticsearch.xpack.ql.util.StringUtils;
 import org.supercsv.io.CsvListReader;
 import org.supercsv.prefs.CsvPreference;
@@ -477,7 +477,7 @@ public final class CsvTestUtils {
         Map<String, List<String>> responseHeaders
     ) {
         Iterator<Iterator<Object>> values() {
-            return EsqlQueryResponse.pagesToValues(dataTypes(), pages);
+            return ResponseValueUtils.pagesToValues(dataTypes(), pages);
         }
     }
 

+ 74 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncActionIT.java

@@ -0,0 +1,74 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+
+public class EsqlAsyncActionIT extends EsqlActionIT {
+
+    @Override
+    protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) {
+        EsqlQueryRequest request = new EsqlQueryRequest();
+        request.query(esqlCommands);
+        request.pragmas(pragmas);
+        request.async(true);
+        // deliberately small timeout, to frequently trigger incomplete response
+        request.waitForCompletionTimeout(TimeValue.timeValueNanos(1));
+        request.keepOnCompletion(randomBoolean());
+        if (filter != null) {
+            request.filter(filter);
+        }
+
+        var response = run(request);
+        if (response.asyncExecutionId().isPresent()) {
+            assertThat(response.isRunning(), is(true));
+            assertThat(response.columns(), is(empty())); // no partial results
+            assertThat(response.pages(), is(empty()));
+            response.close();
+            return getAsyncResponse(response.asyncExecutionId().get());
+        } else {
+            return response;
+        }
+    }
+
+    EsqlQueryResponse getAsyncResponse(String id) {
+        try {
+            GetAsyncResultRequest getResultsRequest = new GetAsyncResultRequest(id).setWaitForCompletionTimeout(
+                TimeValue.timeValueSeconds(60)
+            );
+            var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).actionGet(30, TimeUnit.SECONDS);
+            // resp.decRef(); // the client has incremented our non-0 resp
+            return resp;
+        } catch (ElasticsearchTimeoutException e) {
+            throw new AssertionError("timeout", e);
+        }
+    }
+
+    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102455")
+    // junit.framework.AssertionFailedError: Unexpected exception type, expected VerificationException but got
+    // org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper: verification_exception: Found 1 problem
+    @Override
+    public void testOverlappingIndexPatterns() throws Exception {
+        super.testOverlappingIndexPatterns();
+    }
+
+    @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/102455")
+    @Override
+    public void testIndexPatterns() throws Exception {
+        super.testOverlappingIndexPatterns();
+    }
+}

+ 18 - 16
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/WarningsIT.java

@@ -8,23 +8,21 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.transport.TransportService;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
-@TestLogging(value = "org.elasticsearch.xpack.esql:TRACE", reason = "debug")
 public class WarningsIT extends AbstractEsqlIntegTestCase {
 
-    public void testCollectWarnings() {
+    public void testCollectWarnings() throws Exception {
         final String node1, node2;
         if (randomBoolean()) {
             internalCluster().ensureAtLeastNumDataNodes(2);
@@ -64,19 +62,23 @@ public class WarningsIT extends AbstractEsqlIntegTestCase {
         EsqlQueryRequest request = new EsqlQueryRequest();
         request.query("FROM index-* | EVAL ip = to_ip(host) | STATS s = COUNT(*) by ip | KEEP ip | LIMIT 100");
         request.pragmas(randomPragmas());
-        PlainActionFuture<EsqlQueryResponse> future = new PlainActionFuture<>();
-        client(coordinator.getName()).execute(EsqlQueryAction.INSTANCE, request, ActionListener.runBefore(future, () -> {
-            var threadpool = internalCluster().getInstance(TransportService.class, coordinator.getName()).getThreadPool();
-            Map<String, List<String>> responseHeaders = threadpool.getThreadContext().getResponseHeaders();
-            List<String> warnings = responseHeaders.getOrDefault("Warning", List.of())
-                .stream()
-                .filter(w -> w.contains("is not an IP string literal"))
-                .toList();
-            int expectedWarnings = Math.min(20, numDocs1 + numDocs2);
-            // we cap the number of warnings per node
-            assertThat(warnings.size(), greaterThanOrEqualTo(expectedWarnings));
+        CountDownLatch latch = new CountDownLatch(1);
+        client(coordinator.getName()).execute(EsqlQueryAction.INSTANCE, request, ActionListener.running(() -> {
+            try {
+                var threadpool = internalCluster().getInstance(TransportService.class, coordinator.getName()).getThreadPool();
+                Map<String, List<String>> responseHeaders = threadpool.getThreadContext().getResponseHeaders();
+                List<String> warnings = responseHeaders.getOrDefault("Warning", List.of())
+                    .stream()
+                    .filter(w -> w.contains("is not an IP string literal"))
+                    .toList();
+                int expectedWarnings = Math.min(20, numDocs1 + numDocs2);
+                // we cap the number of warnings per node
+                assertThat(warnings.size(), greaterThanOrEqualTo(expectedWarnings));
+            } finally {
+                latch.countDown();
+            }
         }));
-        future.actionGet(30, TimeUnit.SECONDS).close();
+        latch.await(30, TimeUnit.SECONDS);
     }
 
     private DiscoveryNode randomDataNode() {

+ 21 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlAsyncGetResultAction.java

@@ -0,0 +1,21 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.ActionType;
+
+public class EsqlAsyncGetResultAction extends ActionType<EsqlQueryResponse> {
+
+    public static final EsqlAsyncGetResultAction INSTANCE = new EsqlAsyncGetResultAction();
+
+    public static final String NAME = "indices:data/read/esql/async/get";
+
+    private EsqlAsyncGetResultAction() {
+        super(NAME, in -> { throw new IllegalArgumentException("can't transport EsqlAsyncGetResultAction"); });
+    }
+}

+ 89 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequest.java

@@ -14,6 +14,7 @@ import org.elasticsearch.action.CompositeIndicesRequest;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.AbstractQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.tasks.CancellableTask;
@@ -43,6 +44,9 @@ import static org.elasticsearch.xcontent.ObjectParser.ValueType.VALUE_ARRAY;
 
 public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesRequest {
 
+    public static TimeValue DEFAULT_KEEP_ALIVE = TimeValue.timeValueDays(5);
+    public static TimeValue DEFAULT_WAIT_FOR_COMPLETION = TimeValue.timeValueSeconds(1);
+
     private static final ConstructingObjectParser<TypedParamValue, Void> PARAM_PARSER = new ConstructingObjectParser<>(
         "params",
         true,
@@ -64,7 +68,14 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR
     private static final ParseField LOCALE_FIELD = new ParseField("locale");
     private static final ParseField PROFILE_FIELD = new ParseField("profile");
 
-    private static final ObjectParser<EsqlQueryRequest, Void> PARSER = objectParser(EsqlQueryRequest::new);
+    static final ParseField WAIT_FOR_COMPLETION_TIMEOUT = new ParseField("wait_for_completion_timeout");
+    static final ParseField KEEP_ALIVE = new ParseField("keep_alive");
+    static final ParseField KEEP_ON_COMPLETION = new ParseField("keep_on_completion");
+
+    private static final ObjectParser<EsqlQueryRequest, Void> SYNC_PARSER = objectParserSync(EsqlQueryRequest::syncEsqlQueryRequest);
+    private static final ObjectParser<EsqlQueryRequest, Void> ASYNC_PARSER = objectParserAsync(EsqlQueryRequest::asyncEsqlQueryRequest);
+
+    private boolean async;
 
     private String query;
     private boolean columnar;
@@ -73,6 +84,21 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR
     private QueryBuilder filter;
     private QueryPragmas pragmas = new QueryPragmas(Settings.EMPTY);
     private List<TypedParamValue> params = List.of();
+    private TimeValue waitForCompletionTimeout = DEFAULT_WAIT_FOR_COMPLETION;
+    private TimeValue keepAlive = DEFAULT_KEEP_ALIVE;
+    private boolean keepOnCompletion;
+
+    private static EsqlQueryRequest syncEsqlQueryRequest() {
+        return new EsqlQueryRequest(false);
+    }
+
+    private static EsqlQueryRequest asyncEsqlQueryRequest() {
+        return new EsqlQueryRequest(true);
+    }
+
+    private EsqlQueryRequest(boolean async) {
+        this.async = async;
+    }
 
     public EsqlQueryRequest(StreamInput in) throws IOException {
         super(in);
@@ -100,6 +126,14 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR
         return query;
     }
 
+    public void async(boolean async) {
+        this.async = async;
+    }
+
+    public boolean async() {
+        return async;
+    }
+
     public void columnar(boolean columnar) {
         this.columnar = columnar;
     }
@@ -155,12 +189,39 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR
         this.params = params;
     }
 
-    public static EsqlQueryRequest fromXContent(XContentParser parser) {
-        return PARSER.apply(parser, null);
+    public TimeValue waitForCompletionTimeout() {
+        return waitForCompletionTimeout;
     }
 
-    private static ObjectParser<EsqlQueryRequest, Void> objectParser(Supplier<EsqlQueryRequest> supplier) {
-        ObjectParser<EsqlQueryRequest, Void> parser = new ObjectParser<>("esql/query", false, supplier);
+    public void waitForCompletionTimeout(TimeValue waitForCompletionTimeout) {
+        this.waitForCompletionTimeout = waitForCompletionTimeout;
+    }
+
+    public TimeValue keepAlive() {
+        return keepAlive;
+    }
+
+    public void keepAlive(TimeValue keepAlive) {
+        this.keepAlive = keepAlive;
+    }
+
+    public boolean keepOnCompletion() {
+        return keepOnCompletion;
+    }
+
+    public void keepOnCompletion(boolean keepOnCompletion) {
+        this.keepOnCompletion = keepOnCompletion;
+    }
+
+    public static EsqlQueryRequest fromXContentSync(XContentParser parser) {
+        return SYNC_PARSER.apply(parser, null);
+    }
+
+    public static EsqlQueryRequest fromXContentAsync(XContentParser parser) {
+        return ASYNC_PARSER.apply(parser, null);
+    }
+
+    private static void objectParserCommon(ObjectParser<EsqlQueryRequest, ?> parser) {
         parser.declareString(EsqlQueryRequest::query, QUERY_FIELD);
         parser.declareBoolean(EsqlQueryRequest::columnar, COLUMNAR_FIELD);
         parser.declareObject(EsqlQueryRequest::filter, (p, c) -> AbstractQueryBuilder.parseTopLevelQuery(p), FILTER_FIELD);
@@ -172,7 +233,30 @@ public class EsqlQueryRequest extends ActionRequest implements CompositeIndicesR
         parser.declareField(EsqlQueryRequest::params, EsqlQueryRequest::parseParams, PARAMS_FIELD, VALUE_ARRAY);
         parser.declareString((request, localeTag) -> request.locale(Locale.forLanguageTag(localeTag)), LOCALE_FIELD);
         parser.declareBoolean(EsqlQueryRequest::profile, PROFILE_FIELD);
+    }
+
+    private static ObjectParser<EsqlQueryRequest, Void> objectParserSync(Supplier<EsqlQueryRequest> supplier) {
+        ObjectParser<EsqlQueryRequest, Void> parser = new ObjectParser<>("esql/query", false, supplier);
+        objectParserCommon(parser);
+        return parser;
+    }
 
+    private static ObjectParser<EsqlQueryRequest, Void> objectParserAsync(Supplier<EsqlQueryRequest> supplier) {
+        ObjectParser<EsqlQueryRequest, Void> parser = new ObjectParser<>("esql/async_query", false, supplier);
+        objectParserCommon(parser);
+        parser.declareBoolean(EsqlQueryRequest::keepOnCompletion, KEEP_ON_COMPLETION);
+        parser.declareField(
+            EsqlQueryRequest::waitForCompletionTimeout,
+            (p, c) -> TimeValue.parseTimeValue(p.text(), WAIT_FOR_COMPLETION_TIMEOUT.getPreferredName()),
+            WAIT_FOR_COMPLETION_TIMEOUT,
+            ObjectParser.ValueType.VALUE
+        );
+        parser.declareField(
+            EsqlQueryRequest::keepAlive,
+            (p, c) -> TimeValue.parseTimeValue(p.text(), KEEP_ALIVE.getPreferredName()),
+            KEEP_ALIVE,
+            ObjectParser.ValueType.VALUE
+        );
         return parser;
     }
 

+ 21 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestBuilder.java

@@ -9,6 +9,7 @@ package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.action.ActionRequestBuilder;
 import org.elasticsearch.client.internal.ElasticsearchClient;
+import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
@@ -37,4 +38,24 @@ public class EsqlQueryRequestBuilder extends ActionRequestBuilder<EsqlQueryReque
         request.pragmas(pragmas);
         return this;
     }
+
+    public EsqlQueryRequestBuilder waitForCompletionTimeout(TimeValue waitForCompletionTimeout) {
+        request.waitForCompletionTimeout(waitForCompletionTimeout);
+        return this;
+    }
+
+    public EsqlQueryRequestBuilder keepAlive(TimeValue keepAlive) {
+        request.keepAlive(keepAlive);
+        return this;
+    }
+
+    public EsqlQueryRequestBuilder keepOnCompletion(boolean keepOnCompletion) {
+        request.keepOnCompletion(keepOnCompletion);
+        return this;
+    }
+
+    public EsqlQueryRequestBuilder async(boolean async) {
+        request.async(async);
+        return this;
+    }
 }

+ 137 - 213
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponse.java

@@ -7,12 +7,9 @@
 
 package org.elasticsearch.xpack.esql.action;
 
-import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.TransportVersions;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -20,54 +17,38 @@ import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
 import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.compute.data.Block;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BlockStreamInput;
-import org.elasticsearch.compute.data.BooleanBlock;
-import org.elasticsearch.compute.data.BytesRefBlock;
-import org.elasticsearch.compute.data.DoubleBlock;
-import org.elasticsearch.compute.data.IntBlock;
-import org.elasticsearch.compute.data.LongBlock;
 import org.elasticsearch.compute.data.Page;
-import org.elasticsearch.compute.lucene.UnsupportedValueSource;
 import org.elasticsearch.compute.operator.DriverProfile;
+import org.elasticsearch.core.AbstractRefCounted;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
-import org.elasticsearch.search.DocValueFormat;
 import org.elasticsearch.xcontent.InstantiatingObjectParser;
 import org.elasticsearch.xcontent.ObjectParser;
 import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ParserConstructor;
 import org.elasticsearch.xcontent.ToXContent;
-import org.elasticsearch.xcontent.XContentBuilder;
 import org.elasticsearch.xcontent.XContentParser;
-import org.elasticsearch.xcontent.XContentParserConfiguration;
-import org.elasticsearch.xcontent.json.JsonXContent;
-import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
-import org.elasticsearch.xpack.esql.planner.PlannerUtils;
-import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
-import org.elasticsearch.xpack.versionfield.Version;
 
 import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.function.Function;
+import java.util.Optional;
 
 import static org.elasticsearch.xcontent.ConstructingObjectParser.constructorArg;
-import static org.elasticsearch.xpack.ql.util.DateUtils.UTC_DATE_TIME_FORMATTER;
-import static org.elasticsearch.xpack.ql.util.NumericUtils.asLongUnsigned;
-import static org.elasticsearch.xpack.ql.util.NumericUtils.unsignedLongAsNumber;
-import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.CARTESIAN;
-import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.GEO;
-import static org.elasticsearch.xpack.ql.util.StringUtils.parseIP;
+import static org.elasticsearch.xcontent.ConstructingObjectParser.optionalConstructorArg;
+import static org.elasticsearch.xpack.esql.action.ResponseValueUtils.valuesToPage;
 
 public class EsqlQueryResponse extends ActionResponse implements ChunkedToXContentObject, Releasable {
+
+    private final AbstractRefCounted counted = AbstractRefCounted.of(this::closeInternal);
+
+    private static final ParseField ID = new ParseField("id");
+    private static final ParseField IS_RUNNING = new ParseField("is_running");
     private static final InstantiatingObjectParser<EsqlQueryResponse, Void> PARSER;
     static {
         InstantiatingObjectParser.Builder<EsqlQueryResponse, Void> parser = InstantiatingObjectParser.builder(
@@ -75,6 +56,13 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
             true,
             EsqlQueryResponse.class
         );
+        parser.declareString(optionalConstructorArg(), ID);
+        parser.declareField(
+            optionalConstructorArg(),
+            p -> p.currentToken() == XContentParser.Token.VALUE_NULL ? false : p.booleanValue(),
+            IS_RUNNING,
+            ObjectParser.ValueType.BOOLEAN_OR_NULL
+        );
         parser.declareObjectArray(constructorArg(), (p, c) -> ColumnInfo.fromXContent(p), new ParseField("columns"));
         parser.declareField(constructorArg(), (p, c) -> p.list(), new ParseField("values"), ObjectParser.ValueType.OBJECT_ARRAY);
         PARSER = parser.build();
@@ -84,42 +72,87 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
     private final List<Page> pages;
     private final Profile profile;
     private final boolean columnar;
-
-    public EsqlQueryResponse(List<ColumnInfo> columns, List<Page> pages, @Nullable Profile profile, boolean columnar) {
+    private final String asyncExecutionId;
+    private final boolean isRunning;
+    // True if this response is as a result of an async query request
+    private final boolean isAsync;
+
+    public EsqlQueryResponse(
+        List<ColumnInfo> columns,
+        List<Page> pages,
+        @Nullable Profile profile,
+        boolean columnar,
+        @Nullable String asyncExecutionId,
+        boolean isRunning,
+        boolean isAsync
+    ) {
         this.columns = columns;
         this.pages = pages;
         this.profile = profile;
         this.columnar = columnar;
+        this.asyncExecutionId = asyncExecutionId;
+        this.isRunning = isRunning;
+        this.isAsync = isAsync;
     }
 
-    public EsqlQueryResponse(List<ColumnInfo> columns, List<List<Object>> values) {
-        this.columns = columns;
-        this.pages = List.of(valuesToPage(columns.stream().map(ColumnInfo::type).toList(), values));
-        this.profile = null;
-        this.columnar = false;
+    public EsqlQueryResponse(List<ColumnInfo> columns, List<Page> pages, @Nullable Profile profile, boolean columnar, boolean isAsync) {
+        this(columns, pages, profile, columnar, null, false, isAsync);
+    }
+
+    // Used for XContent reconstruction
+    @ParserConstructor
+    public EsqlQueryResponse(@Nullable String asyncExecutionId, Boolean isRunning, List<ColumnInfo> columns, List<List<Object>> values) {
+        this(
+            columns,
+            List.of(valuesToPage(columns, values)),
+            null,
+            false,
+            asyncExecutionId,
+            isRunning != null,
+            isAsync(asyncExecutionId, isRunning)
+        );
+    }
+
+    static boolean isAsync(@Nullable String asyncExecutionId, Boolean isRunning) {
+        if (asyncExecutionId != null || isRunning != null) {
+            return true;
+        }
+        return false;
     }
 
     /**
      * Build a reader for the response.
      */
     public static Writeable.Reader<EsqlQueryResponse> reader(BlockFactory blockFactory) {
-        return in -> new EsqlQueryResponse(new BlockStreamInput(in, blockFactory));
+        return in -> deserialize(new BlockStreamInput(in, blockFactory));
     }
 
-    private EsqlQueryResponse(BlockStreamInput in) throws IOException {
-        super(in);
-        this.columns = in.readCollectionAsList(ColumnInfo::new);
-        this.pages = in.readCollectionAsList(Page::new);
+    static EsqlQueryResponse deserialize(BlockStreamInput in) throws IOException {
+        String asyncExecutionId = null;
+        boolean isRunning = false;
+        boolean isAsync = false;
+        Profile profile = null;
+        if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_ASYNC_QUERY)) {
+            asyncExecutionId = in.readOptionalString();
+            isRunning = in.readBoolean();
+            isAsync = in.readBoolean();
+        }
+        List<ColumnInfo> columns = in.readCollectionAsList(ColumnInfo::new);
+        List<Page> pages = in.readCollectionAsList(Page::new);
         if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE)) {
-            this.profile = in.readOptionalWriteable(Profile::new);
-        } else {
-            this.profile = null;
+            profile = in.readOptionalWriteable(Profile::new);
         }
-        this.columnar = in.readBoolean();
+        boolean columnar = in.readBoolean();
+        return new EsqlQueryResponse(columns, pages, profile, columnar, asyncExecutionId, isRunning, isAsync);
     }
 
     @Override
     public void writeTo(StreamOutput out) throws IOException {
+        if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_ASYNC_QUERY)) {
+            out.writeOptionalString(asyncExecutionId);
+            out.writeBoolean(isRunning);
+            out.writeBoolean(isAsync);
+        }
         out.writeCollection(columns);
         out.writeCollection(pages);
         if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE)) {
@@ -137,7 +170,8 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
     }
 
     public Iterator<Iterator<Object>> values() {
-        return pagesToValues(columns.stream().map(ColumnInfo::type).toList(), pages);
+        List<String> dataTypes = columns.stream().map(ColumnInfo::type).toList();
+        return ResponseValueUtils.pagesToValues(dataTypes, pages);
     }
 
     public Profile profile() {
@@ -148,63 +182,42 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
         return columnar;
     }
 
-    @Override
-    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
-        final BytesRef scratch = new BytesRef();
-        final Iterator<? extends ToXContent> valuesIt;
-        if (pages.isEmpty()) {
-            valuesIt = Collections.emptyIterator();
-        } else if (columnar) {
-            valuesIt = Iterators.flatMap(
-                Iterators.forRange(
-                    0,
-                    columns().size(),
-                    column -> Iterators.concat(
-                        Iterators.single(((builder, p) -> builder.startArray())),
-                        Iterators.flatMap(pages.iterator(), page -> {
-                            ColumnInfo.PositionToXContent toXContent = columns.get(column)
-                                .positionToXContent(page.getBlock(column), scratch);
-                            return Iterators.forRange(
-                                0,
-                                page.getPositionCount(),
-                                position -> (builder, p) -> toXContent.positionToXContent(builder, p, position)
-                            );
-                        }),
-                        ChunkedToXContentHelper.endArray()
-                    )
-                ),
-                Function.identity()
-            );
-        } else {
-            valuesIt = Iterators.flatMap(pages.iterator(), page -> {
-                final int columnCount = columns.size();
-                assert page.getBlockCount() == columnCount : page.getBlockCount() + " != " + columnCount;
-                final ColumnInfo.PositionToXContent[] toXContents = new ColumnInfo.PositionToXContent[columnCount];
-                for (int column = 0; column < columnCount; column++) {
-                    toXContents[column] = columns.get(column).positionToXContent(page.getBlock(column), scratch);
+    public Optional<String> asyncExecutionId() {
+        return Optional.ofNullable(asyncExecutionId);
+    }
+
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    public boolean isAsync() {
+        return isRunning;
+    }
+
+    private Iterator<? extends ToXContent> asyncPropertiesOrEmpty() {
+        if (isAsync) {
+            return ChunkedToXContentHelper.singleChunk((builder, params) -> {
+                if (asyncExecutionId != null) {
+                    builder.field("id", asyncExecutionId);
                 }
-                return Iterators.forRange(0, page.getPositionCount(), position -> (builder, p) -> {
-                    builder.startArray();
-                    for (int c = 0; c < columnCount; c++) {
-                        toXContents[c].positionToXContent(builder, p, position);
-                    }
-                    return builder.endArray();
-                });
+                builder.field("is_running", isRunning);
+                return builder;
             });
+        } else {
+            return Collections.emptyIterator();
         }
-        Iterator<ToXContent> columnsRender = ChunkedToXContentHelper.singleChunk((builder, p) -> {
-            builder.startArray("columns");
-            for (ColumnInfo col : columns) {
-                col.toXContent(builder, p);
-            }
-            return builder.endArray();
-        });
+    }
+
+    @Override
+    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
+        final Iterator<? extends ToXContent> valuesIt = ResponseXContentUtils.columnValues(this.columns, this.pages, columnar);
         Iterator<ToXContent> profileRender = profile == null
             ? List.<ToXContent>of().iterator()
             : ChunkedToXContentHelper.field("profile", profile, params);
         return Iterators.concat(
             ChunkedToXContentHelper.startObject(),
-            columnsRender,
+            asyncPropertiesOrEmpty(),
+            ResponseXContentUtils.columnHeadings(columns),
             ChunkedToXContentHelper.array("values", valuesIt),
             profileRender,
             ChunkedToXContentHelper.endObject()
@@ -226,6 +239,8 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
         if (o == null || getClass() != o.getClass()) return false;
         EsqlQueryResponse that = (EsqlQueryResponse) o;
         return Objects.equals(columns, that.columns)
+            && Objects.equals(asyncExecutionId, that.asyncExecutionId)
+            && Objects.equals(isRunning, that.isRunning)
             && columnar == that.columnar
             && Iterators.equals(values(), that.values(), (row1, row2) -> Iterators.equals(row1, row2, Objects::equals))
             && Objects.equals(profile, that.profile);
@@ -233,7 +248,13 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
 
     @Override
     public int hashCode() {
-        return Objects.hash(columns, Iterators.hashCode(values(), row -> Iterators.hashCode(row, Objects::hashCode)), columnar);
+        return Objects.hash(
+            asyncExecutionId,
+            isRunning,
+            columns,
+            Iterators.hashCode(values(), row -> Iterators.hashCode(row, Objects::hashCode)),
+            columnar
+        );
     }
 
     @Override
@@ -242,129 +263,32 @@ public class EsqlQueryResponse extends ActionResponse implements ChunkedToXConte
     }
 
     @Override
-    public void close() {
-        Releasables.close(() -> Iterators.map(pages.iterator(), p -> p::releaseBlocks));
+    public void incRef() {
+        tryIncRef();
     }
 
-    public static Iterator<Iterator<Object>> pagesToValues(List<String> dataTypes, List<Page> pages) {
-        BytesRef scratch = new BytesRef();
-        return Iterators.flatMap(
-            pages.iterator(),
-            page -> Iterators.forRange(0, page.getPositionCount(), p -> Iterators.forRange(0, page.getBlockCount(), b -> {
-                Block block = page.getBlock(b);
-                if (block.isNull(p)) {
-                    return null;
-                }
-                /*
-                 * Use the ESQL data type to map to the output to make sure compute engine
-                 * respects its types. See the INTEGER clause where is doesn't always
-                 * respect it.
-                 */
-                int count = block.getValueCount(p);
-                int start = block.getFirstValueIndex(p);
-                String dataType = dataTypes.get(b);
-                if (count == 1) {
-                    return valueAt(dataType, block, start, scratch);
-                }
-                List<Object> thisResult = new ArrayList<>(count);
-                int end = count + start;
-                for (int i = start; i < end; i++) {
-                    thisResult.add(valueAt(dataType, block, i, scratch));
-                }
-                return thisResult;
-            }))
-        );
+    @Override
+    public boolean tryIncRef() {
+        return counted.tryIncRef();
     }
 
-    private static Object valueAt(String dataType, Block block, int offset, BytesRef scratch) {
-        return switch (dataType) {
-            case "unsigned_long" -> unsignedLongAsNumber(((LongBlock) block).getLong(offset));
-            case "long" -> ((LongBlock) block).getLong(offset);
-            case "integer" -> ((IntBlock) block).getInt(offset);
-            case "double" -> ((DoubleBlock) block).getDouble(offset);
-            case "keyword", "text" -> ((BytesRefBlock) block).getBytesRef(offset, scratch).utf8ToString();
-            case "ip" -> {
-                BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
-                yield DocValueFormat.IP.format(val);
-            }
-            case "date" -> {
-                long longVal = ((LongBlock) block).getLong(offset);
-                yield UTC_DATE_TIME_FORMATTER.formatMillis(longVal);
-            }
-            case "boolean" -> ((BooleanBlock) block).getBoolean(offset);
-            case "version" -> new Version(((BytesRefBlock) block).getBytesRef(offset, scratch)).toString();
-            case "geo_point" -> GEO.longAsPoint(((LongBlock) block).getLong(offset));
-            case "cartesian_point" -> CARTESIAN.longAsPoint(((LongBlock) block).getLong(offset));
-            case "unsupported" -> UnsupportedValueSource.UNSUPPORTED_OUTPUT;
-            case "_source" -> {
-                BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
-                try {
-                    try (XContentParser parser = XContentHelper.createParser(XContentParserConfiguration.EMPTY, new BytesArray(val))) {
-                        parser.nextToken();
-                        yield parser.mapOrdered();
-                    }
-                } catch (IOException e) {
-                    throw new UncheckedIOException(e);
-                }
-            }
-            default -> throw EsqlIllegalArgumentException.illegalDataType(dataType);
-        };
+    @Override
+    public boolean decRef() {
+        return counted.decRef();
     }
 
-    /**
-     * Convert a list of values to Pages so we can parse from xcontent. It's not
-     * super efficient but it doesn't really have to be.
-     */
-    private static Page valuesToPage(List<String> dataTypes, List<List<Object>> values) {
-        List<Block.Builder> results = dataTypes.stream()
-            .map(c -> PlannerUtils.toElementType(EsqlDataTypes.fromName(c)).newBlockBuilder(values.size()))
-            .toList();
-
-        for (List<Object> row : values) {
-            for (int c = 0; c < row.size(); c++) {
-                var builder = results.get(c);
-                var value = row.get(c);
-                switch (dataTypes.get(c)) {
-                    case "unsigned_long" -> ((LongBlock.Builder) builder).appendLong(asLongUnsigned(((Number) value).longValue()));
-                    case "long" -> ((LongBlock.Builder) builder).appendLong(((Number) value).longValue());
-                    case "integer" -> ((IntBlock.Builder) builder).appendInt(((Number) value).intValue());
-                    case "double" -> ((DoubleBlock.Builder) builder).appendDouble(((Number) value).doubleValue());
-                    case "keyword", "text", "unsupported" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
-                        new BytesRef(value.toString())
-                    );
-                    case "ip" -> ((BytesRefBlock.Builder) builder).appendBytesRef(parseIP(value.toString()));
-                    case "date" -> {
-                        long longVal = UTC_DATE_TIME_FORMATTER.parseMillis(value.toString());
-                        ((LongBlock.Builder) builder).appendLong(longVal);
-                    }
-                    case "boolean" -> ((BooleanBlock.Builder) builder).appendBoolean(((Boolean) value));
-                    case "null" -> builder.appendNull();
-                    case "version" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new Version(value.toString()).toBytesRef());
-                    case "_source" -> {
-                        @SuppressWarnings("unchecked")
-                        Map<String, ?> o = (Map<String, ?>) value;
-                        try {
-                            try (XContentBuilder sourceBuilder = JsonXContent.contentBuilder()) {
-                                sourceBuilder.map(o);
-                                ((BytesRefBlock.Builder) builder).appendBytesRef(BytesReference.bytes(sourceBuilder).toBytesRef());
-                            }
-                        } catch (IOException e) {
-                            throw new UncheckedIOException(e);
-                        }
-                    }
-                    case "geo_point" -> {
-                        long longVal = GEO.pointAsLong(GEO.stringAsPoint(value.toString()));
-                        ((LongBlock.Builder) builder).appendLong(longVal);
-                    }
-                    case "cartesian_point" -> {
-                        long longVal = CARTESIAN.pointAsLong(CARTESIAN.stringAsPoint(value.toString()));
-                        ((LongBlock.Builder) builder).appendLong(longVal);
-                    }
-                    default -> throw EsqlIllegalArgumentException.illegalDataType(dataTypes.get(c));
-                }
-            }
-        }
-        return new Page(results.stream().map(Block.Builder::build).toArray(Block[]::new));
+    @Override
+    public boolean hasReferences() {
+        return counted.hasReferences();
+    }
+
+    @Override
+    public void close() {
+        decRef();
+    }
+
+    void closeInternal() {
+        Releasables.close(() -> Iterators.map(pages.iterator(), p -> p::releaseBlocks));
     }
 
     public static class Profile implements Writeable, ChunkedToXContentObject {

+ 38 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlQueryTask.java

@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.tasks.TaskId;
+import org.elasticsearch.xpack.core.async.AsyncExecutionId;
+import org.elasticsearch.xpack.core.async.StoredAsyncTask;
+
+import java.util.List;
+import java.util.Map;
+
+public class EsqlQueryTask extends StoredAsyncTask<EsqlQueryResponse> {
+
+    public EsqlQueryTask(
+        long id,
+        String type,
+        String action,
+        String description,
+        TaskId parentTaskId,
+        Map<String, String> headers,
+        Map<String, String> originHeaders,
+        AsyncExecutionId asyncExecutionId,
+        TimeValue keepAlive
+    ) {
+        super(id, type, action, description, parentTaskId, headers, originHeaders, asyncExecutionId, keepAlive);
+    }
+
+    @Override
+    public EsqlQueryResponse getCurrentResult() {
+        return new EsqlQueryResponse(List.of(), List.of(), null, false, getExecutionId().getEncoded(), true, true);
+    }
+}

+ 13 - 6
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/EsqlResponseListener.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
@@ -16,11 +17,12 @@ import org.elasticsearch.rest.RestChannel;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
-import org.elasticsearch.rest.action.RestResponseListener;
+import org.elasticsearch.rest.action.RestRefCountedChunkedToXContentListener;
 import org.elasticsearch.xcontent.MediaType;
 import org.elasticsearch.xpack.esql.formatter.TextFormat;
 import org.elasticsearch.xpack.esql.plugin.EsqlMediaTypeParser;
 
+import java.io.IOException;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
 
@@ -31,7 +33,7 @@ import static org.elasticsearch.xpack.ql.util.LoggingUtils.logOnFailure;
 /**
  * Listens for a single {@link EsqlQueryResponse}, builds a corresponding {@link RestResponse} and sends it.
  */
-public class EsqlResponseListener extends RestResponseListener<EsqlQueryResponse> {
+public final class EsqlResponseListener extends RestRefCountedChunkedToXContentListener<EsqlQueryResponse> {
     /**
      * A simple, thread-safe stop watch for timing a single action.
      * Allows to stop the time for building a response and to log it at a later point.
@@ -118,8 +120,13 @@ public class EsqlResponseListener extends RestResponseListener<EsqlQueryResponse
     }
 
     @Override
-    public RestResponse buildResponse(EsqlQueryResponse esqlResponse) throws Exception {
+    protected void processResponse(EsqlQueryResponse esqlQueryResponse) throws IOException {
+        channel.sendResponse(buildResponse(esqlQueryResponse));
+    }
+
+    private RestResponse buildResponse(EsqlQueryResponse esqlResponse) throws IOException {
         boolean success = false;
+        final Releasable releasable = releasableFromResponse(esqlResponse);
         try {
             RestResponse restResponse;
             if (mediaType instanceof TextFormat format) {
@@ -128,13 +135,13 @@ public class EsqlResponseListener extends RestResponseListener<EsqlQueryResponse
                     ChunkedRestResponseBody.fromTextChunks(
                         format.contentType(restRequest),
                         format.format(restRequest, esqlResponse),
-                        esqlResponse
+                        releasable
                     )
                 );
             } else {
                 restResponse = RestResponse.chunked(
                     RestStatus.OK,
-                    ChunkedRestResponseBody.fromXContent(esqlResponse, channel.request(), channel, esqlResponse)
+                    ChunkedRestResponseBody.fromXContent(esqlResponse, channel.request(), channel, releasable)
                 );
             }
             long tookNanos = stopWatch.stop().getNanos();
@@ -143,7 +150,7 @@ public class EsqlResponseListener extends RestResponseListener<EsqlQueryResponse
             return restResponse;
         } finally {
             if (success == false) {
-                esqlResponse.close();
+                releasable.close();
             }
         }
     }

+ 177 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseValueUtils.java

@@ -0,0 +1,177 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.compute.data.Block;
+import org.elasticsearch.compute.data.BooleanBlock;
+import org.elasticsearch.compute.data.BytesRefBlock;
+import org.elasticsearch.compute.data.DoubleBlock;
+import org.elasticsearch.compute.data.IntBlock;
+import org.elasticsearch.compute.data.LongBlock;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.compute.lucene.UnsupportedValueSource;
+import org.elasticsearch.search.DocValueFormat;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentParser;
+import org.elasticsearch.xcontent.XContentParserConfiguration;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.elasticsearch.xpack.esql.EsqlIllegalArgumentException;
+import org.elasticsearch.xpack.esql.planner.PlannerUtils;
+import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
+import org.elasticsearch.xpack.versionfield.Version;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.elasticsearch.xpack.ql.util.DateUtils.UTC_DATE_TIME_FORMATTER;
+import static org.elasticsearch.xpack.ql.util.NumericUtils.asLongUnsigned;
+import static org.elasticsearch.xpack.ql.util.NumericUtils.unsignedLongAsNumber;
+import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.CARTESIAN;
+import static org.elasticsearch.xpack.ql.util.SpatialCoordinateTypes.GEO;
+import static org.elasticsearch.xpack.ql.util.StringUtils.parseIP;
+
+/**
+ * Collection of static utility methods for helping transform response data between pages and values.
+ */
+public final class ResponseValueUtils {
+
+    /**
+     * Returns an iterator of iterators over the values in the given pages. There is one iterator
+     * for each block.
+     */
+    public static Iterator<Iterator<Object>> pagesToValues(List<String> dataTypes, List<Page> pages) {
+        BytesRef scratch = new BytesRef();
+        return Iterators.flatMap(
+            pages.iterator(),
+            page -> Iterators.forRange(0, page.getPositionCount(), p -> Iterators.forRange(0, page.getBlockCount(), b -> {
+                Block block = page.getBlock(b);
+                if (block.isNull(p)) {
+                    return null;
+                }
+                /*
+                 * Use the ESQL data type to map to the output to make sure compute engine
+                 * respects its types. See the INTEGER clause where is doesn't always
+                 * respect it.
+                 */
+                int count = block.getValueCount(p);
+                int start = block.getFirstValueIndex(p);
+                String dataType = dataTypes.get(b);
+                if (count == 1) {
+                    return valueAt(dataType, block, start, scratch);
+                }
+                List<Object> thisResult = new ArrayList<>(count);
+                int end = count + start;
+                for (int i = start; i < end; i++) {
+                    thisResult.add(valueAt(dataType, block, i, scratch));
+                }
+                return thisResult;
+            }))
+        );
+    }
+
+    private static Object valueAt(String dataType, Block block, int offset, BytesRef scratch) {
+        return switch (dataType) {
+            case "unsigned_long" -> unsignedLongAsNumber(((LongBlock) block).getLong(offset));
+            case "long" -> ((LongBlock) block).getLong(offset);
+            case "integer" -> ((IntBlock) block).getInt(offset);
+            case "double" -> ((DoubleBlock) block).getDouble(offset);
+            case "keyword", "text" -> ((BytesRefBlock) block).getBytesRef(offset, scratch).utf8ToString();
+            case "ip" -> {
+                BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
+                yield DocValueFormat.IP.format(val);
+            }
+            case "date" -> {
+                long longVal = ((LongBlock) block).getLong(offset);
+                yield UTC_DATE_TIME_FORMATTER.formatMillis(longVal);
+            }
+            case "boolean" -> ((BooleanBlock) block).getBoolean(offset);
+            case "version" -> new Version(((BytesRefBlock) block).getBytesRef(offset, scratch)).toString();
+            case "geo_point" -> GEO.longAsPoint(((LongBlock) block).getLong(offset));
+            case "cartesian_point" -> CARTESIAN.longAsPoint(((LongBlock) block).getLong(offset));
+            case "unsupported" -> UnsupportedValueSource.UNSUPPORTED_OUTPUT;
+            case "_source" -> {
+                BytesRef val = ((BytesRefBlock) block).getBytesRef(offset, scratch);
+                try {
+                    try (XContentParser parser = XContentHelper.createParser(XContentParserConfiguration.EMPTY, new BytesArray(val))) {
+                        parser.nextToken();
+                        yield parser.mapOrdered();
+                    }
+                } catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
+            default -> throw EsqlIllegalArgumentException.illegalDataType(dataType);
+        };
+    }
+
+    /**
+     * Converts a list of values to Pages so that we can parse from xcontent. It's not
+     * super efficient, but it doesn't really have to be.
+     */
+    static Page valuesToPage(List<ColumnInfo> columns, List<List<Object>> values) {
+        List<String> dataTypes = columns.stream().map(ColumnInfo::type).toList();
+        List<Block.Builder> results = dataTypes.stream()
+            .map(c -> PlannerUtils.toElementType(EsqlDataTypes.fromName(c)).newBlockBuilder(values.size()))
+            .toList();
+
+        for (List<Object> row : values) {
+            for (int c = 0; c < row.size(); c++) {
+                var builder = results.get(c);
+                var value = row.get(c);
+                switch (dataTypes.get(c)) {
+                    case "unsigned_long" -> ((LongBlock.Builder) builder).appendLong(asLongUnsigned(((Number) value).longValue()));
+                    case "long" -> ((LongBlock.Builder) builder).appendLong(((Number) value).longValue());
+                    case "integer" -> ((IntBlock.Builder) builder).appendInt(((Number) value).intValue());
+                    case "double" -> ((DoubleBlock.Builder) builder).appendDouble(((Number) value).doubleValue());
+                    case "keyword", "text", "unsupported" -> ((BytesRefBlock.Builder) builder).appendBytesRef(
+                        new BytesRef(value.toString())
+                    );
+                    case "ip" -> ((BytesRefBlock.Builder) builder).appendBytesRef(parseIP(value.toString()));
+                    case "date" -> {
+                        long longVal = UTC_DATE_TIME_FORMATTER.parseMillis(value.toString());
+                        ((LongBlock.Builder) builder).appendLong(longVal);
+                    }
+                    case "boolean" -> ((BooleanBlock.Builder) builder).appendBoolean(((Boolean) value));
+                    case "null" -> builder.appendNull();
+                    case "version" -> ((BytesRefBlock.Builder) builder).appendBytesRef(new Version(value.toString()).toBytesRef());
+                    case "_source" -> {
+                        @SuppressWarnings("unchecked")
+                        Map<String, ?> o = (Map<String, ?>) value;
+                        try {
+                            try (XContentBuilder sourceBuilder = JsonXContent.contentBuilder()) {
+                                sourceBuilder.map(o);
+                                ((BytesRefBlock.Builder) builder).appendBytesRef(BytesReference.bytes(sourceBuilder).toBytesRef());
+                            }
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    }
+                    case "geo_point" -> {
+                        long longVal = GEO.pointAsLong(GEO.stringAsPoint(value.toString()));
+                        ((LongBlock.Builder) builder).appendLong(longVal);
+                    }
+                    case "cartesian_point" -> {
+                        long longVal = CARTESIAN.pointAsLong(CARTESIAN.stringAsPoint(value.toString()));
+                        ((LongBlock.Builder) builder).appendLong(longVal);
+                    }
+                    default -> throw EsqlIllegalArgumentException.illegalDataType(dataTypes.get(c));
+                }
+            }
+        }
+        return new Page(results.stream().map(Block.Builder::build).toArray(Block[]::new));
+    }
+}

+ 91 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/ResponseXContentUtils.java

@@ -0,0 +1,91 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.xcontent.ToXContent;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.Function;
+
+/**
+ * Collection of static utility methods for helping transform response data to XContent.
+ */
+final class ResponseXContentUtils {
+
+    /** Returns the column headings for the given columns. */
+    static Iterator<? extends ToXContent> columnHeadings(List<ColumnInfo> columns) {
+        return ChunkedToXContentHelper.singleChunk((builder, params) -> {
+            builder.startArray("columns");
+            for (ColumnInfo col : columns) {
+                col.toXContent(builder, params);
+            }
+            return builder.endArray();
+        });
+    }
+
+    /** Returns the column values for the given pages (described by the column infos). */
+    static Iterator<? extends ToXContent> columnValues(List<ColumnInfo> columns, List<Page> pages, boolean columnar) {
+        if (pages.isEmpty()) {
+            return Collections.emptyIterator();
+        } else if (columnar) {
+            return columnarValues(columns, pages);
+        } else {
+            return rowValues(columns, pages);
+        }
+    }
+
+    /** Returns a columnar based representation of the values in the given pages (described by the column infos). */
+    static Iterator<? extends ToXContent> columnarValues(List<ColumnInfo> columns, List<Page> pages) {
+        final BytesRef scratch = new BytesRef();
+        return Iterators.flatMap(
+            Iterators.forRange(
+                0,
+                columns.size(),
+                column -> Iterators.concat(
+                    Iterators.single(((builder, params) -> builder.startArray())),
+                    Iterators.flatMap(pages.iterator(), page -> {
+                        ColumnInfo.PositionToXContent toXContent = columns.get(column).positionToXContent(page.getBlock(column), scratch);
+                        return Iterators.forRange(
+                            0,
+                            page.getPositionCount(),
+                            position -> (builder, params) -> toXContent.positionToXContent(builder, params, position)
+                        );
+                    }),
+                    ChunkedToXContentHelper.endArray()
+                )
+            ),
+            Function.identity()
+        );
+    }
+
+    /** Returns a row based representation of the values in the given pages (described by the column infos). */
+    static Iterator<? extends ToXContent> rowValues(List<ColumnInfo> columns, List<Page> pages) {
+        final BytesRef scratch = new BytesRef();
+        return Iterators.flatMap(pages.iterator(), page -> {
+            final int columnCount = columns.size();
+            assert page.getBlockCount() == columnCount : page.getBlockCount() + " != " + columnCount;
+            final ColumnInfo.PositionToXContent[] toXContents = new ColumnInfo.PositionToXContent[columnCount];
+            for (int column = 0; column < columnCount; column++) {
+                toXContents[column] = columns.get(column).positionToXContent(page.getBlock(column), scratch);
+            }
+            return Iterators.forRange(0, page.getPositionCount(), position -> (builder, params) -> {
+                builder.startArray();
+                for (int c = 0; c < columnCount; c++) {
+                    toXContents[c].positionToXContent(builder, params, position);
+                }
+                return builder.endArray();
+            });
+        });
+    }
+}

+ 62 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlAsyncQueryAction.java

@@ -0,0 +1,62 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
+import org.elasticsearch.xcontent.XContentParser;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import static org.elasticsearch.rest.RestRequest.Method.POST;
+import static org.elasticsearch.xpack.esql.formatter.TextFormat.URL_PARAM_DELIMITER;
+
+public class RestEsqlAsyncQueryAction extends BaseRestHandler {
+    private static final Logger LOGGER = LogManager.getLogger(RestEsqlAsyncQueryAction.class);
+
+    @Override
+    public String getName() {
+        return "esql_async_query";
+    }
+
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(POST, "/_query/async"));
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+        EsqlQueryRequest esqlRequest;
+        try (XContentParser parser = request.contentOrSourceParamParser()) {
+            esqlRequest = EsqlQueryRequest.fromXContentAsync(parser);
+        }
+
+        LOGGER.info("Beginning execution of ESQL async query.\nQuery string: [{}]", esqlRequest.query());
+
+        return channel -> {
+            RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
+            cancellableClient.execute(
+                EsqlQueryAction.INSTANCE,
+                esqlRequest,
+                new EsqlResponseListener(channel, request, esqlRequest).wrapWithLogging()
+            );
+        };
+    }
+
+    @Override
+    protected Set<String> responseParams() {
+        return Collections.singleton(URL_PARAM_DELIMITER);
+    }
+}

+ 42 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlGetAsyncResultAction.java

@@ -0,0 +1,42 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestChunkedToXContentListener;
+import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+public class RestEsqlGetAsyncResultAction extends BaseRestHandler {
+    @Override
+    public List<Route> routes() {
+        return List.of(new Route(GET, "/_query/async/{id}"));
+    }
+
+    @Override
+    public String getName() {
+        return "esql_get_async_result";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        GetAsyncResultRequest get = new GetAsyncResultRequest(request.param("id"));
+        if (request.hasParam("wait_for_completion_timeout")) {
+            get.setWaitForCompletionTimeout(request.paramAsTime("wait_for_completion_timeout", get.getWaitForCompletionTimeout()));
+        }
+        if (request.hasParam("keep_alive")) {
+            get.setKeepAlive(request.paramAsTime("keep_alive", get.getKeepAlive()));
+        }
+        return channel -> client.execute(EsqlAsyncGetResultAction.INSTANCE, get, new RestChunkedToXContentListener<>(channel));
+    }
+}

+ 1 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlQueryAction.java

@@ -48,7 +48,7 @@ public class RestEsqlQueryAction extends BaseRestHandler {
     protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
         EsqlQueryRequest esqlRequest;
         try (XContentParser parser = request.contentOrSourceParamParser()) {
-            esqlRequest = EsqlQueryRequest.fromXContent(parser);
+            esqlRequest = EsqlQueryRequest.fromXContentSync(parser);
         }
 
         LOGGER.info("Beginning execution of ESQL query.\nQuery string: [{}]", esqlRequest.query());

+ 5 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -43,7 +43,10 @@ import org.elasticsearch.xpack.core.action.XPackInfoFeatureAction;
 import org.elasticsearch.xpack.core.action.XPackUsageFeatureAction;
 import org.elasticsearch.xpack.esql.EsqlInfoTransportAction;
 import org.elasticsearch.xpack.esql.EsqlUsageTransportAction;
+import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
+import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction;
+import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
@@ -124,6 +127,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
     public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
         return List.of(
             new ActionHandler<>(EsqlQueryAction.INSTANCE, TransportEsqlQueryAction.class),
+            new ActionHandler<>(EsqlAsyncGetResultAction.INSTANCE, TransportEsqlAsyncGetResultsAction.class),
             new ActionHandler<>(EsqlStatsAction.INSTANCE, TransportEsqlStatsAction.class),
             new ActionHandler<>(XPackUsageFeatureAction.ESQL, EsqlUsageTransportAction.class),
             new ActionHandler<>(XPackInfoFeatureAction.ESQL, EsqlInfoTransportAction.class)
@@ -140,7 +144,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
         IndexNameExpressionResolver indexNameExpressionResolver,
         Supplier<DiscoveryNodes> nodesInCluster
     ) {
-        return List.of(new RestEsqlQueryAction());
+        return List.of(new RestEsqlQueryAction(), new RestEsqlAsyncQueryAction(), new RestEsqlGetAsyncResultAction());
     }
 
     @Override

+ 58 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlAsyncGetResultsAction.java

@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.plugin;
+
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
+import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
+import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
+import org.elasticsearch.xpack.ql.plugin.AbstractTransportQlAsyncGetResultsAction;
+
+public class TransportEsqlAsyncGetResultsAction extends AbstractTransportQlAsyncGetResultsAction<EsqlQueryResponse, EsqlQueryTask> {
+
+    private final BlockFactory blockFactory;
+
+    @Inject
+    public TransportEsqlAsyncGetResultsAction(
+        TransportService transportService,
+        ActionFilters actionFilters,
+        ClusterService clusterService,
+        NamedWriteableRegistry registry,
+        Client client,
+        ThreadPool threadPool,
+        BigArrays bigArrays,
+        BlockFactory blockFactory
+    ) {
+        super(
+            EsqlAsyncGetResultAction.NAME,
+            transportService,
+            actionFilters,
+            clusterService,
+            registry,
+            client,
+            threadPool,
+            bigArrays,
+            EsqlQueryTask.class
+        );
+        this.blockFactory = blockFactory;
+    }
+
+    @Override
+    public Writeable.Reader<EsqlQueryResponse> responseReader() {
+        return EsqlQueryResponse.reader(blockFactory);
+    }
+}

+ 97 - 5
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -11,8 +11,11 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionRunnable;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.compute.data.BlockFactory;
@@ -23,22 +26,32 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.async.AsyncExecutionId;
 import org.elasticsearch.xpack.esql.action.ColumnInfo;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
 import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
+import org.elasticsearch.xpack.esql.action.EsqlQueryTask;
 import org.elasticsearch.xpack.esql.enrich.EnrichLookupService;
 import org.elasticsearch.xpack.esql.enrich.EnrichPolicyResolver;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.esql.type.EsqlDataTypes;
+import org.elasticsearch.xpack.ql.async.AsyncTaskManagementService;
 
+import java.io.IOException;
 import java.time.ZoneOffset;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.concurrent.Executor;
 
-public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRequest, EsqlQueryResponse> {
+import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
+
+public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRequest, EsqlQueryResponse>
+    implements
+        AsyncTaskManagementService.AsyncOperation<EsqlQueryRequest, EsqlQueryResponse, EsqlQueryTask> {
 
     private final PlanExecutor planExecutor;
     private final ComputeService computeService;
@@ -47,6 +60,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
     private final Executor requestExecutor;
     private final EnrichPolicyResolver enrichPolicyResolver;
     private final EnrichLookupService enrichLookupService;
+    private final AsyncTaskManagementService<EsqlQueryRequest, EsqlQueryResponse, EsqlQueryTask> asyncTaskManagementService;
 
     @Inject
     public TransportEsqlQueryAction(
@@ -58,7 +72,10 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
         ClusterService clusterService,
         ThreadPool threadPool,
         BigArrays bigArrays,
-        BlockFactory blockFactory
+        BlockFactory blockFactory,
+        Client client,
+        NamedWriteableRegistry registry
+
     ) {
         // TODO replace SAME when removing workaround for https://github.com/elastic/elasticsearch/issues/97916
         super(EsqlQueryAction.NAME, transportService, actionFilters, EsqlQueryRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
@@ -79,12 +96,41 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
             bigArrays,
             blockFactory
         );
+        this.asyncTaskManagementService = new AsyncTaskManagementService<>(
+            XPackPlugin.ASYNC_RESULTS_INDEX,
+            client,
+            ASYNC_SEARCH_ORIGIN,
+            registry,
+            taskManager,
+            EsqlQueryAction.INSTANCE.name(),
+            this,
+            EsqlQueryTask.class,
+            clusterService,
+            threadPool,
+            bigArrays
+        );
     }
 
     @Override
     protected void doExecute(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
-        // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
-        requestExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l)));
+        listener = listener.delegateFailureAndWrap(ActionListener::respondAndRelease);
+        if (requestIsAsync(request)) {
+            asyncTaskManagementService.asyncExecute(
+                request,
+                request.waitForCompletionTimeout(),
+                request.keepAlive(),
+                request.keepOnCompletion(),
+                listener
+            );
+        } else {
+            // workaround for https://github.com/elastic/elasticsearch/issues/97916 - TODO remove this when we can
+            requestExecutor.execute(ActionRunnable.wrap(listener, l -> doExecuteForked(task, request, l)));
+        }
+    }
+
+    @Override
+    public void execute(EsqlQueryRequest request, EsqlQueryTask task, ActionListener<EsqlQueryResponse> listener) {
+        doExecuteForked(task, request, listener);
     }
 
     private void doExecuteForked(Task task, EsqlQueryRequest request, ActionListener<EsqlQueryResponse> listener) {
@@ -120,7 +166,7 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
                         EsqlQueryResponse.Profile profile = configuration.profile()
                             ? new EsqlQueryResponse.Profile(result.profiles())
                             : null;
-                        return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar());
+                        return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
                     })
                 )
             )
@@ -143,4 +189,50 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
     public EnrichLookupService enrichLookupService() {
         return enrichLookupService;
     }
+
+    @Override
+    public EsqlQueryTask createTask(
+        EsqlQueryRequest request,
+        long id,
+        String type,
+        String action,
+        TaskId parentTaskId,
+        Map<String, String> headers,
+        Map<String, String> originHeaders,
+        AsyncExecutionId asyncExecutionId
+    ) {
+        return new EsqlQueryTask(
+            id,
+            type,
+            action,
+            request.getDescription(),
+            parentTaskId,
+            headers,
+            originHeaders,
+            asyncExecutionId,
+            request.keepAlive()
+        );
+    }
+
+    @Override
+    public EsqlQueryResponse initialResponse(EsqlQueryTask task) {
+        return new EsqlQueryResponse(
+            List.of(),
+            List.of(),
+            null,
+            false,
+            task.getExecutionId().getEncoded(),
+            true, // is_running
+            true // isAsync
+        );
+    }
+
+    @Override
+    public EsqlQueryResponse readResponse(StreamInput inputStream) throws IOException {
+        throw new AssertionError("should not reach here");
+    }
+
+    private static boolean requestIsAsync(EsqlQueryRequest request) {
+        return request.async();
+    }
 }

+ 6 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/SerializationTestUtils.java

@@ -29,6 +29,7 @@ import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
 import org.elasticsearch.xpack.esql.querydsl.query.SingleValueQuery;
 import org.elasticsearch.xpack.esql.session.EsqlConfiguration;
 import org.elasticsearch.xpack.ql.expression.Expression;
+import org.elasticsearch.xpack.ql.plan.logical.LogicalPlan;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -43,6 +44,11 @@ public class SerializationTestUtils {
         EqualsHashCodeTestUtils.checkEqualsAndHashCode(plan, unused -> deserPlan);
     }
 
+    public static void assertSerialization(LogicalPlan plan) {
+        var deserPlan = serializeDeserialize(plan, PlanStreamOutput::writeLogicalPlanNode, PlanStreamInput::readLogicalPlanNode);
+        EqualsHashCodeTestUtils.checkEqualsAndHashCode(plan, unused -> deserPlan);
+    }
+
     public static void assertSerialization(Expression expression) {
         Expression deserExpression = serializeDeserialize(expression, PlanStreamOutput::writeExpression, PlanStreamInput::readExpression);
         EqualsHashCodeTestUtils.checkEqualsAndHashCode(expression, unused -> deserExpression);

+ 79 - 8
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryRequestTests.java

@@ -31,6 +31,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 import static org.hamcrest.Matchers.containsString;
@@ -55,7 +56,7 @@ public class EsqlQueryRequestTests extends ESTestCase {
                 "filter": %s
                 %s""", query, columnar, locale.toLanguageTag(), filter, paramsString);
 
-        EsqlQueryRequest request = parseEsqlQueryRequest(json);
+        EsqlQueryRequest request = parseEsqlQueryRequestSync(json);
 
         assertEquals(query, request.query());
         assertEquals(columnar, request.columnar());
@@ -69,6 +70,57 @@ public class EsqlQueryRequestTests extends ESTestCase {
         }
     }
 
+    public void testParseFieldsForAsync() throws IOException {
+        String query = randomAlphaOfLengthBetween(1, 100);
+        boolean columnar = randomBoolean();
+        Locale locale = randomLocale(random());
+        QueryBuilder filter = randomQueryBuilder();
+
+        List<TypedParamValue> params = randomParameters();
+        boolean hasParams = params.isEmpty() == false;
+        StringBuilder paramsString = paramsString(params, hasParams);
+        boolean keepOnCompletion = randomBoolean();
+        TimeValue waitForCompletion = TimeValue.parseTimeValue(randomTimeValue(), "test");
+        TimeValue keepAlive = TimeValue.parseTimeValue(randomTimeValue(), "test");
+        String json = String.format(
+            Locale.ROOT,
+            """
+                {
+                    "query": "%s",
+                    "columnar": %s,
+                    "locale": "%s",
+                    "filter": %s,
+                    "keep_on_completion": %s,
+                    "wait_for_completion_timeout": "%s",
+                    "keep_alive": "%s"
+                    %s""",
+            query,
+            columnar,
+            locale.toLanguageTag(),
+            filter,
+            keepOnCompletion,
+            waitForCompletion.getStringRep(),
+            keepAlive.getStringRep(),
+            paramsString
+        );
+
+        EsqlQueryRequest request = parseEsqlQueryRequestAsync(json);
+
+        assertEquals(query, request.query());
+        assertEquals(columnar, request.columnar());
+        assertEquals(locale.toLanguageTag(), request.locale().toLanguageTag());
+        assertEquals(locale, request.locale());
+        assertEquals(filter, request.filter());
+        assertEquals(keepOnCompletion, request.keepOnCompletion());
+        assertEquals(waitForCompletion, request.waitForCompletionTimeout());
+        assertEquals(keepAlive, request.keepAlive());
+
+        assertEquals(params.size(), request.params().size());
+        for (int i = 0; i < params.size(); i++) {
+            assertEquals(params.get(i), request.params().get(i));
+        }
+    }
+
     public void testRejectUnknownFields() {
         assertParserErrorMessage("""
             {
@@ -84,10 +136,15 @@ public class EsqlQueryRequestTests extends ESTestCase {
     }
 
     public void testMissingQueryIsNotValidation() throws IOException {
-        EsqlQueryRequest request = parseEsqlQueryRequest("""
+        String json = """
             {
                 "columnar": true
-            }""");
+            }""";
+        EsqlQueryRequest request = parseEsqlQueryRequestSync(json);
+        assertNotNull(request.validate());
+        assertThat(request.validate().getMessage(), containsString("[query] is required"));
+
+        request = parseEsqlQueryRequestAsync(json);
         assertNotNull(request.validate());
         assertThat(request.validate().getMessage(), containsString("[query] is required"));
     }
@@ -96,10 +153,12 @@ public class EsqlQueryRequestTests extends ESTestCase {
         String query = randomAlphaOfLength(10);
         int id = randomInt();
 
-        EsqlQueryRequest request = parseEsqlQueryRequest("""
+        String requestJson = """
             {
                 "query": "QUERY"
-            }""".replace("QUERY", query));
+            }""".replace("QUERY", query);
+
+        EsqlQueryRequest request = parseEsqlQueryRequestSync(requestJson);
         Task task = request.createTask(id, "transport", EsqlQueryAction.NAME, TaskId.EMPTY_TASK_ID, Map.of());
         assertThat(task.getDescription(), equalTo(query));
 
@@ -180,17 +239,29 @@ public class EsqlQueryRequestTests extends ESTestCase {
     }
 
     private static void assertParserErrorMessage(String json, String message) {
-        Exception e = expectThrows(IllegalArgumentException.class, () -> parseEsqlQueryRequest(json));
+        Exception e = expectThrows(IllegalArgumentException.class, () -> parseEsqlQueryRequestSync(json));
+        assertThat(e.getMessage(), containsString(message));
+
+        e = expectThrows(IllegalArgumentException.class, () -> parseEsqlQueryRequestAsync(json));
         assertThat(e.getMessage(), containsString(message));
     }
 
-    private static EsqlQueryRequest parseEsqlQueryRequest(String json) throws IOException {
+    static EsqlQueryRequest parseEsqlQueryRequestSync(String json) throws IOException {
+        return parseEsqlQueryRequest(json, EsqlQueryRequest::fromXContentSync);
+    }
+
+    static EsqlQueryRequest parseEsqlQueryRequestAsync(String json) throws IOException {
+        return parseEsqlQueryRequest(json, EsqlQueryRequest::fromXContentAsync);
+    }
+
+    static EsqlQueryRequest parseEsqlQueryRequest(String json, Function<XContentParser, EsqlQueryRequest> fromXContentFunc)
+        throws IOException {
         SearchModule searchModule = new SearchModule(Settings.EMPTY, Collections.emptyList());
         XContentParserConfiguration config = XContentParserConfiguration.EMPTY.withRegistry(
             new NamedXContentRegistry(searchModule.getNamedXContents())
         );
         try (XContentParser parser = XContentType.JSON.xContent().createParser(config, json)) {
-            return EsqlQueryRequest.fromXContent(parser);
+            return fromXContentFunc.apply(parser);
         }
     }
 

+ 71 - 6
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java

@@ -89,11 +89,21 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
     }
 
     EsqlQueryResponse randomResponse(boolean columnar, EsqlQueryResponse.Profile profile) {
+        return randomResponseAsync(columnar, profile, false);
+    }
+
+    EsqlQueryResponse randomResponseAsync(boolean columnar, EsqlQueryResponse.Profile profile, boolean async) {
         int noCols = randomIntBetween(1, 10);
         List<ColumnInfo> columns = randomList(noCols, noCols, this::randomColumnInfo);
         int noPages = randomIntBetween(1, 20);
         List<Page> values = randomList(noPages, noPages, () -> randomPage(columns));
-        return new EsqlQueryResponse(columns, values, profile, columnar);
+        String id = null;
+        boolean isRunning = false;
+        if (async) {
+            id = randomAlphaOfLengthBetween(1, 16);
+            isRunning = randomBoolean();
+        }
+        return new EsqlQueryResponse(columns, values, profile, columnar, id, isRunning, async);
     }
 
     private ColumnInfo randomColumnInfo() {
@@ -167,19 +177,21 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 List<ColumnInfo> cols = new ArrayList<>(instance.columns());
                 // keep the type the same so the values are still valid but change the name
                 cols.set(mutCol, new ColumnInfo(cols.get(mutCol).name() + "mut", cols.get(mutCol).type()));
-                yield new EsqlQueryResponse(cols, deepCopyOfPages(instance), instance.profile(), instance.columnar());
+                yield new EsqlQueryResponse(cols, deepCopyOfPages(instance), instance.profile(), instance.columnar(), instance.isAsync());
             }
             case 1 -> new EsqlQueryResponse(
                 instance.columns(),
                 deepCopyOfPages(instance),
                 instance.profile(),
-                false == instance.columnar()
+                false == instance.columnar(),
+                instance.isAsync()
             );
             case 2 -> new EsqlQueryResponse(
                 instance.columns(),
                 deepCopyOfPages(instance),
                 randomValueOtherThan(instance.profile(), this::randomProfile),
-                instance.columnar()
+                instance.columnar(),
+                instance.isAsync()
             );
             case 3 -> {
                 int noPages = instance.pages().size();
@@ -188,7 +200,13 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                     differentPages.forEach(p -> Releasables.closeExpectNoException(p::releaseBlocks));
                     differentPages = randomList(noPages, noPages, () -> randomPage(instance.columns()));
                 } while (differentPages.equals(instance.pages()));
-                yield new EsqlQueryResponse(instance.columns(), differentPages, instance.profile(), instance.columnar());
+                yield new EsqlQueryResponse(
+                    instance.columns(),
+                    differentPages,
+                    instance.profile(),
+                    instance.columnar(),
+                    instance.isAsync()
+                );
             }
             default -> throw new IllegalArgumentException();
         };
@@ -223,6 +241,12 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
             assertChunkCount(resp, r -> 5 + bodySize);
         }
+
+        try (EsqlQueryResponse resp = randomResponseAsync(true, null, true)) {
+            int columnCount = resp.pages().get(0).getBlockCount();
+            int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount() * p.getBlockCount()).sum() + columnCount * 2;
+            assertChunkCount(resp, r -> 6 + bodySize); // is_running
+        }
     }
 
     public void testChunkResponseSizeRows() {
@@ -230,6 +254,10 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
             int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount()).sum();
             assertChunkCount(resp, r -> 5 + bodySize);
         }
+        try (EsqlQueryResponse resp = randomResponseAsync(false, null, true)) {
+            int bodySize = resp.pages().stream().mapToInt(p -> p.getPositionCount()).sum();
+            assertChunkCount(resp, r -> 6 + bodySize);
+        }
     }
 
     public void testSimpleXContentColumnar() {
@@ -239,6 +267,13 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         }
     }
 
+    public void testSimpleXContentColumnarAsync() {
+        try (EsqlQueryResponse response = simple(true, true)) {
+            assertThat(Strings.toString(response), equalTo("""
+                {"is_running":false,"columns":[{"name":"foo","type":"integer"}],"values":[[40,80]]}"""));
+        }
+    }
+
     public void testSimpleXContentRows() {
         try (EsqlQueryResponse response = simple(false)) {
             assertThat(Strings.toString(response), equalTo("""
@@ -246,12 +281,41 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
         }
     }
 
+    public void testSimpleXContentRowsAsync() {
+        try (EsqlQueryResponse response = simple(false, true)) {
+            assertThat(Strings.toString(response), equalTo("""
+                {"is_running":false,"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}"""));
+        }
+    }
+
+    public void testBasicXContentIdAndRunning() {
+        try (
+            EsqlQueryResponse response = new EsqlQueryResponse(
+                List.of(new ColumnInfo("foo", "integer")),
+                List.of(new Page(new IntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
+                null,
+                false,
+                "id-123",
+                true,
+                true
+            )
+        ) {
+            assertThat(Strings.toString(response), equalTo("""
+                {"id":"id-123","is_running":true,"columns":[{"name":"foo","type":"integer"}],"values":[[40],[80]]}"""));
+        }
+    }
+
     private EsqlQueryResponse simple(boolean columnar) {
+        return simple(columnar, false);
+    }
+
+    private EsqlQueryResponse simple(boolean columnar, boolean async) {
         return new EsqlQueryResponse(
             List.of(new ColumnInfo("foo", "integer")),
             List.of(new Page(new IntArrayVector(new int[] { 40, 80 }, 2).asBlock())),
             null,
-            columnar
+            columnar,
+            async
         );
     }
 
@@ -263,6 +327,7 @@ public class EsqlQueryResponseTests extends AbstractChunkedSerializingTestCase<E
                 new EsqlQueryResponse.Profile(
                     List.of(new DriverProfile(List.of(new DriverStatus.OperatorStatus("asdf", new AbstractPageMappingOperator.Status(10)))))
                 ),
+                false,
                 false
             );
         ) {

+ 4 - 4
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatTests.java

@@ -231,12 +231,12 @@ public class TextFormatTests extends ESTestCase {
     public void testPlainTextEmptyCursorWithoutColumns() {
         assertEquals(
             StringUtils.EMPTY,
-            getTextBodyContent(PLAIN_TEXT.format(req(), new EsqlQueryResponse(emptyList(), emptyList(), null, false)))
+            getTextBodyContent(PLAIN_TEXT.format(req(), new EsqlQueryResponse(emptyList(), emptyList(), null, false, false)))
         );
     }
 
     private static EsqlQueryResponse emptyData() {
-        return new EsqlQueryResponse(singletonList(new ColumnInfo("name", "keyword")), emptyList(), null, false);
+        return new EsqlQueryResponse(singletonList(new ColumnInfo("name", "keyword")), emptyList(), null, false, false);
     }
 
     private static EsqlQueryResponse regularData() {
@@ -259,7 +259,7 @@ public class TextFormatTests extends ESTestCase {
             )
         );
 
-        return new EsqlQueryResponse(headers, values, null, false);
+        return new EsqlQueryResponse(headers, values, null, false, false);
     }
 
     private static EsqlQueryResponse escapedData() {
@@ -277,7 +277,7 @@ public class TextFormatTests extends ESTestCase {
             )
         );
 
-        return new EsqlQueryResponse(headers, values, null, false);
+        return new EsqlQueryResponse(headers, values, null, false, false);
     }
 
     private static RestRequest req() {

+ 3 - 0
x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/formatter/TextFormatterTests.java

@@ -62,6 +62,7 @@ public class TextFormatterTests extends ESTestCase {
             )
         ),
         null,
+        randomBoolean(),
         randomBoolean()
     );
 
@@ -125,6 +126,7 @@ public class TextFormatterTests extends ESTestCase {
                 )
             ),
             null,
+            randomBoolean(),
             randomBoolean()
         );
 
@@ -164,6 +166,7 @@ public class TextFormatterTests extends ESTestCase {
                             )
                         ),
                         null,
+                        randomBoolean(),
                         randomBoolean()
                     )
                 ).format(false)

+ 8 - 1
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java

@@ -215,6 +215,7 @@ public class AsyncTaskManagementService<
                 acquiredListener.onResponse(operation.initialResponse(searchTask));
             }
         }, waitForCompletionTimeout, threadPool.executor(ThreadPool.Names.SEARCH));
+
         // This will be performed at the end of normal execution
         return ActionListener.wrap(response -> {
             ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
@@ -234,7 +235,11 @@ public class AsyncTaskManagementService<
                 }
             } else {
                 // We finished after timeout - saving results
-                storeResults(searchTask, new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()));
+                storeResults(
+                    searchTask,
+                    new StoredAsyncResponse<>(response, threadPool.absoluteTimeInMillis() + keepAlive.getMillis()),
+                    ActionListener.running(response::decRef)
+                );
             }
         }, e -> {
             ActionListener<Response> acquiredListener = exclusiveListener.getAndSet(null);
@@ -272,6 +277,7 @@ public class AsyncTaskManagementService<
                 ActionListener.wrap(
                     // We should only unregister after the result is saved
                     resp -> {
+                        // TODO: generalize the logging, not just eql
                         logger.trace(() -> "stored eql search results for [" + searchTask.getExecutionId().getEncoded() + "]");
                         taskManager.unregister(searchTask);
                         if (storedResponse.getException() != null) {
@@ -290,6 +296,7 @@ public class AsyncTaskManagementService<
                         if (cause instanceof DocumentMissingException == false
                             && cause instanceof VersionConflictEngineException == false) {
                             logger.error(
+                                // TODO: generalize the logging, not just eql
                                 () -> format("failed to store eql search results for [%s]", searchTask.getExecutionId().getEncoded()),
                                 exc
                             );

+ 1 - 0
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -517,6 +517,7 @@ public class Constants {
         "indices:data/read/eql",
         "indices:data/read/eql/async/get",
         "indices:data/read/esql",
+        "indices:data/read/esql/async/get",
         "indices:data/read/explain",
         "indices:data/read/field_caps",
         "indices:data/read/get",