瀏覽代碼

Add ES|QL async delete API (#103628)

This commit adds the ES|QL async delete API, to cancel running ES|QL queries and/or remove stored results.
Chris Hegarty 1 年之前
父節點
當前提交
2d352e9cb4

+ 5 - 0
docs/changelog/103628.yaml

@@ -0,0 +1,5 @@
+pr: 103628
+summary: Add ES|QL async delete API
+area: ES|QL
+type: enhancement
+issues: []

+ 10 - 0
docs/reference/esql/esql-async-query-api.asciidoc

@@ -80,3 +80,13 @@ finished, and the results are returned.
 }
 ----
 // TEST[skip: no access to search ID - may return response values]
+
+Use the <<delete-async-eqsl-query-api,delete async ES|QL query API>> to
+delete an async query before the `keep_alive` period ends. If the query
+is still running, {es} cancels it.
+
+[source,console]
+----
+DELETE /_query/async/delete/FmdMX2pIang3UWhLRU5QS0lqdlppYncaMUpYQ05oSkpTc3kwZ21EdC1tbFJXQToxOTI=
+----
+// TEST[skip: no access to search ID]

+ 12 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResultsService.java

@@ -15,6 +15,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.TriConsumer;
+import org.elasticsearch.core.RefCounted;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.TaskManager;
@@ -144,7 +145,17 @@ public class AsyncResultsService<Task extends AsyncTask, Response extends AsyncR
         long nowInMillis,
         ActionListener<Response> listener
     ) {
-        store.getResponse(searchId, true, listener.delegateFailure((l, response) -> sendFinalResponse(request, response, nowInMillis, l)));
+        store.getResponse(searchId, true, listener.delegateFailure((l, response) -> {
+            try {
+                sendFinalResponse(request, response, nowInMillis, l);
+            } finally {
+                if (response instanceof StoredAsyncResponse<?> storedAsyncResponse
+                    && storedAsyncResponse.getResponse() instanceof RefCounted refCounted) {
+                    refCounted.decRef();
+                }
+            }
+
+        }));
     }
 
     private void sendFinalResponse(GetAsyncResultRequest request, Response response, long nowInMillis, ActionListener<Response> listener) {

+ 165 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AbstractPausableIntegTestCase.java

@@ -0,0 +1,165 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.support.WriteRequest;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.index.engine.SegmentsStats;
+import org.elasticsearch.index.mapper.OnScriptError;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.ScriptPlugin;
+import org.elasticsearch.script.LongFieldScript;
+import org.elasticsearch.script.ScriptContext;
+import org.elasticsearch.script.ScriptEngine;
+import org.elasticsearch.search.lookup.SearchLookup;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.json.JsonXContent;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/** A pausable testcase. Subclasses extend this testcase to simulate slow running queries.
+ *
+ * Uses the evaluation of a runtime field in the mappings "pause_me" of type long, along
+ * with a custom script language "pause", and semaphore "scriptPermits", to block execution.
+ */
+public abstract class AbstractPausableIntegTestCase extends AbstractEsqlIntegTestCase {
+
+    private static final Logger LOGGER = LogManager.getLogger(AbstractPausableIntegTestCase.class);
+
+    protected static final Semaphore scriptPermits = new Semaphore(0);
+
+    protected int pageSize = -1;
+
+    protected int numberOfDocs = -1;
+
+    /** Adds {@link PausableFieldPlugin} to a copy of the plugins installed by the parent test case. */
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        Collection<Class<? extends Plugin>> inherited = super.nodePlugins();
+        return CollectionUtils.appendToCopy(inherited, PausableFieldPlugin.class);
+    }
+
+    /**
+     * Returns the page size used by the test queries. The value is picked with
+     * {@code between(10, 100)} on first use and cached in {@link #pageSize}.
+     */
+    protected int pageSize() {
+        if (pageSize != -1) {
+            return pageSize;
+        }
+        pageSize = between(10, 100);
+        return pageSize;
+    }
+
+    /**
+     * Returns the number of documents to index. The value is picked on first use,
+     * between four and five times {@link #pageSize()}, and cached in {@link #numberOfDocs}.
+     */
+    protected int numberOfDocs() {
+        if (numberOfDocs != -1) {
+            return numberOfDocs;
+        }
+        numberOfDocs = between(4 * pageSize(), 5 * pageSize());
+        return numberOfDocs;
+    }
+
+    @Before
+    public void setupIndex() throws IOException {
+        assumeTrue("requires query pragmas", canUseQueryPragmas());
+
+        XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
+        mapping.startObject("runtime");
+        {
+            mapping.startObject("pause_me");
+            {
+                mapping.field("type", "long");
+                mapping.startObject("script").field("source", "").field("lang", "pause").endObject();
+            }
+            mapping.endObject();
+        }
+        mapping.endObject();
+        client().admin()
+            .indices()
+            .prepareCreate("test")
+            .setSettings(Map.of("number_of_shards", 1, "number_of_replicas", 0))
+            .setMapping(mapping.endObject())
+            .get();
+
+        BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
+        for (int i = 0; i < numberOfDocs(); i++) {
+            bulk.add(prepareIndex("test").setId(Integer.toString(i)).setSource("foo", i));
+        }
+        bulk.get();
+        /*
+         * forceMerge so we can be sure that we don't bump into tiny
+         * segments that finish super quickly and cause us to report strange
+         * statuses when we expect "starting".
+         */
+        client().admin().indices().prepareForceMerge("test").setMaxNumSegments(1).get();
+        /*
+         * Double super extra paranoid check that force merge worked. It's
+         * failed to reduce the index to a single segment and caused this test
+         * to fail in very difficult to debug ways. If it fails again, it'll
+         * trip here. Or maybe it won't! And we'll learn something. Maybe
+         * it's ghosts.
+         */
+        SegmentsStats stats = client().admin().indices().prepareStats("test").get().getPrimaries().getSegments();
+        if (stats.getCount() != 1L) {
+            fail(Strings.toString(stats));
+        }
+    }
+
+    /**
+     * Script plugin providing the "pause" script language. Its long field script emits
+     * the value 1 for every evaluation, but only after acquiring one permit from
+     * {@link #scriptPermits}, which lets tests hold a query back and release it at will.
+     */
+    public static class PausableFieldPlugin extends Plugin implements ScriptPlugin {
+
+        @Override
+        public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
+            return new ScriptEngine() {
+                @Override
+                public String getType() {
+                    return "pause";
+                }
+
+                @Override
+                @SuppressWarnings("unchecked")
+                public <FactoryType> FactoryType compile(
+                    String name,
+                    String code,
+                    ScriptContext<FactoryType> context,
+                    Map<String, String> params
+                ) {
+                    // The script source is ignored; every compilation yields the same pausing factory.
+                    return (FactoryType) new LongFieldScript.Factory() {
+                        @Override
+                        public LongFieldScript.LeafFactory newFactory(
+                            String fieldName,
+                            Map<String, Object> params,
+                            SearchLookup searchLookup,
+                            OnScriptError onScriptError
+                        ) {
+                            return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
+                                @Override
+                                public void execute() {
+                                    try {
+                                        // Fail rather than hang forever if the test never releases a permit.
+                                        assertTrue(scriptPermits.tryAcquire(1, TimeUnit.MINUTES));
+                                    } catch (InterruptedException e) {
+                                        // Keep the interrupt status visible to the calling thread before failing.
+                                        Thread.currentThread().interrupt();
+                                        throw new AssertionError(e);
+                                    }
+                                    LOGGER.debug("--> emitting value");
+                                    emit(1);
+                                }
+                            };
+                        }
+                    };
+                }
+
+                @Override
+                public Set<ScriptContext<?>> getSupportedContexts() {
+                    return Set.of(LongFieldScript.CONTEXT);
+                }
+            };
+        }
+    }
+}

+ 257 - 0
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/AsyncEsqlQueryActionIT.java

@@ -0,0 +1,257 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.compute.operator.exchange.ExchangeService;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.tasks.TaskInfo;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
+import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
+import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
+import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
+import org.hamcrest.core.IsEqual;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.elasticsearch.core.TimeValue.timeValueMillis;
+import static org.elasticsearch.core.TimeValue.timeValueMinutes;
+import static org.elasticsearch.core.TimeValue.timeValueSeconds;
+import static org.elasticsearch.test.hamcrest.OptionalMatchers.isEmpty;
+import static org.elasticsearch.test.hamcrest.OptionalMatchers.isPresent;
+import static org.elasticsearch.xpack.esql.EsqlTestUtils.getValuesList;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+/**
+ * Individual tests for specific aspects of the async query API.
+ */
+public class AsyncEsqlQueryActionIT extends AbstractPausableIntegTestCase {
+
+    /** Extends the parent's plugins with the async ESQL local-state plugin and the internal exchange plugin. */
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(EsqlAsyncActionIT.LocalStateEsqlAsync.class);
+        plugins.add(InternalExchangePlugin.class);
+        return Collections.unmodifiableList(plugins);
+    }
+
+    /**
+     * Returns node settings containing only the exchange service's inactive-sinks interval,
+     * set to a random value between 500 and 2000 milliseconds.
+     */
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        // NOTE(review): super.nodeSettings(...) is not merged in here - confirm that is intentional.
+        TimeValue inactiveSinksInterval = timeValueMillis(between(500, 2000));
+        Settings.Builder settings = Settings.builder();
+        settings.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, inactiveSinksInterval);
+        return settings.build();
+    }
+
+    /**
+     * Starts an async query that is still running when the initial response returns,
+     * optionally polls it once with a short timeout, then releases the script permits,
+     * waits for the final result, fetches the stored result a second time and finally
+     * deletes it, checking that a second delete of the same id is rejected.
+     */
+    public void testBasicAsyncExecution() throws Exception {
+        try (var initialResponse = sendAsyncQuery()) {
+            assertThat(initialResponse.asyncExecutionId(), isPresent());
+            assertThat(initialResponse.isRunning(), is(true));
+            String id = initialResponse.asyncExecutionId().get();
+
+            if (randomBoolean()) {
+                // let's timeout first
+                var getResultsRequest = new GetAsyncResultRequest(id);
+                getResultsRequest.setWaitForCompletionTimeout(timeValueMillis(10));
+                getResultsRequest.setKeepAlive(randomKeepAlive());
+                var future = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest);
+                try (var responseWithTimeout = future.get()) {
+                    // check the polled response itself before unwrapping its id below
+                    assertThat(responseWithTimeout.asyncExecutionId(), isPresent());
+                    assertThat(responseWithTimeout.asyncExecutionId().get(), equalTo(id));
+                    assertThat(responseWithTimeout.isRunning(), is(true));
+                }
+            }
+
+            // Now we wait
+            var getResultsRequest = new GetAsyncResultRequest(id);
+            getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(60));
+            getResultsRequest.setKeepAlive(randomKeepAlive());
+            var future = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest);
+
+            // release the permits to allow the query to proceed
+            scriptPermits.release(numberOfDocs());
+
+            try (var finalResponse = future.get()) {
+                assertThat(finalResponse, notNullValue());
+                assertThat(finalResponse.isRunning(), is(false));
+                assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfo("sum(pause_me)", "long"))));
+                assertThat(getValuesList(finalResponse).size(), equalTo(1));
+            }
+
+            // Get the stored result (again)
+            var again = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest);
+            try (var finalResponse = again.get()) {
+                assertThat(finalResponse, notNullValue());
+                assertThat(finalResponse.isRunning(), is(false));
+                assertThat(finalResponse.columns(), equalTo(List.of(new ColumnInfo("sum(pause_me)", "long"))));
+                assertThat(getValuesList(finalResponse).size(), equalTo(1));
+            }
+
+            AcknowledgedResponse deleteResponse = deleteAsyncId(id);
+            assertThat(deleteResponse.isAcknowledged(), equalTo(true));
+            // the stored response is gone, so deleting the same id again must fail
+            var e = expectThrows(ResourceNotFoundException.class, () -> deleteAsyncId(id));
+            assertThat(e.getMessage(), IsEqual.equalTo(id));
+        } finally {
+            // leave no permits behind for the next test
+            scriptPermits.drainPermits();
+        }
+    }
+
+    /**
+     * Deletes an async query while it is still running and checks that the delete is
+     * acknowledged, that no ESQL query task remains, and that the result can no longer
+     * be fetched by id.
+     */
+    public void testAsyncCancellation() throws Exception {
+        try (var initialResponse = sendAsyncQuery()) {
+            assertThat(initialResponse.asyncExecutionId(), isPresent());
+            assertThat(initialResponse.isRunning(), is(true));
+            String asyncId = initialResponse.asyncExecutionId().get();
+
+            DeleteAsyncResultRequest deleteRequest = new DeleteAsyncResultRequest(asyncId);
+            var deleteFuture = client().execute(DeleteAsyncResultAction.INSTANCE, deleteRequest);
+
+            // there should be just one task
+            List<TaskInfo> tasksDuringDelete = getEsqlQueryTasks();
+            assertThat(tasksDuringDelete.size(), is(1));
+
+            // release the permits to allow the query to proceed
+            scriptPermits.release(numberOfDocs());
+
+            var deleteResponse = deleteFuture.actionGet(timeValueSeconds(60));
+            assertThat(deleteResponse.isAcknowledged(), equalTo(true));
+
+            // there should be no tasks after delete
+            List<TaskInfo> tasksAfterDelete = getEsqlQueryTasks();
+            assertThat(tasksAfterDelete.size(), is(0));
+
+            // the stored response should no longer be retrievable
+            var getResultsRequest = new GetAsyncResultRequest(asyncId);
+            getResultsRequest.setKeepAlive(timeValueMinutes(10));
+            getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(60));
+            var notFound = expectThrows(
+                ResourceNotFoundException.class,
+                () -> client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).actionGet()
+            );
+            assertThat(notFound.getMessage(), equalTo(asyncId));
+        } finally {
+            // leave no permits behind for the next test
+            scriptPermits.drainPermits();
+        }
+    }
+
+    /** Query finishes within the wait timeout and the result is kept on completion. */
+    public void testFinishingBeforeTimeoutKeep() {
+        testFinishingBeforeTimeout(true);
+    }
+
+    /** Query finishes within the wait timeout and the result is not kept on completion. */
+    public void testFinishingBeforeTimeoutDoNotKeep() {
+        testFinishingBeforeTimeout(false);
+    }
+
+    /**
+     * Runs an async query that is never blocked, so it completes within the wait timeout.
+     * The response must carry the final result. An async id is expected, and the result
+     * must be retrievable through it, only when {@code keepOnCompletion} is set.
+     *
+     * @param keepOnCompletion whether the request asks for the result to be kept on completion
+     */
+    private void testFinishingBeforeTimeout(boolean keepOnCompletion) {
+        // don't block the query execution at all
+        scriptPermits.drainPermits();
+        assert scriptPermits.availablePermits() == 0;
+
+        scriptPermits.release(numberOfDocs());
+
+        var queryBuilder = new EsqlQueryRequestBuilder(client()).query("from test | stats sum(pause_me)")
+            .pragmas(queryPragmas())
+            .async(true)
+            .waitForCompletionTimeout(timeValueSeconds(60))
+            .keepOnCompletion(keepOnCompletion)
+            .keepAlive(randomKeepAlive());
+
+        try (var response = queryBuilder.execute().actionGet(60, TimeUnit.SECONDS)) {
+            assertThat(response.isRunning(), is(false));
+            assertThat(response.columns(), equalTo(List.of(new ColumnInfo("sum(pause_me)", "long"))));
+            assertThat(getValuesList(response).size(), equalTo(1));
+
+            if (keepOnCompletion == false) {
+                assertThat(response.asyncExecutionId(), isEmpty());
+            } else {
+                assertThat(response.asyncExecutionId(), isPresent());
+                // we should be able to retrieve the response by id, since it has been kept
+                String asyncId = response.asyncExecutionId().get();
+                var getResultsRequest = new GetAsyncResultRequest(asyncId);
+                getResultsRequest.setWaitForCompletionTimeout(timeValueSeconds(60));
+                var storedFuture = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest);
+                try (var stored = storedFuture.actionGet(60, TimeUnit.SECONDS)) {
+                    assertThat(stored.asyncExecutionId().get(), equalTo(asyncId));
+                    assertThat(stored.isRunning(), is(false));
+                    assertThat(stored.columns(), equalTo(List.of(new ColumnInfo("sum(pause_me)", "long"))));
+                    assertThat(getValuesList(stored).size(), equalTo(1));
+                }
+            }
+        } finally {
+            // leave no permits behind for the next test
+            scriptPermits.drainPermits();
+        }
+    }
+
+    /**
+     * Lists the tasks currently registered for the async ESQL query action.
+     *
+     * The listing is taken exactly once. The previous {@code assertBusy} wrapper held no
+     * assertion, so it never retried, and a retry would have appended duplicates to the
+     * result list.
+     *
+     * @return a mutable list of the matching tasks, empty if there are none
+     */
+    private List<TaskInfo> getEsqlQueryTasks() throws Exception {
+        List<TaskInfo> tasks = client().admin()
+            .cluster()
+            .prepareListTasks()
+            .setActions(EsqlQueryAction.NAME + "[a]")
+            .setDetailed(true)
+            .get()
+            .getTasks();
+        return new ArrayList<>(tasks);
+    }
+
+    /**
+     * Resets the script permits to a handful (1 to 5) and submits the async test query
+     * with a one-nanosecond completion timeout.
+     *
+     * @return the initial response of the async request
+     */
+    private EsqlQueryResponse sendAsyncQuery() {
+        scriptPermits.drainPermits();
+        assert scriptPermits.availablePermits() == 0;
+
+        scriptPermits.release(between(1, 5));
+        var pragmas = queryPragmas();
+        var queryBuilder = new EsqlQueryRequestBuilder(client()).query("from test | stats sum(pause_me)")
+            .pragmas(pragmas)
+            .async(true)
+            // deliberately small timeout, to frequently trigger incomplete response
+            .waitForCompletionTimeout(TimeValue.timeValueNanos(1))
+            .keepOnCompletion(randomBoolean())
+            .keepAlive(randomKeepAlive());
+        return queryBuilder.execute().actionGet(60, TimeUnit.SECONDS);
+    }
+
+    /** Builds the pragmas for the test queries: shard partitioning and the test's page size. */
+    private QueryPragmas queryPragmas() {
+        Settings pragmaSettings = Settings.builder()
+            // Force shard partitioning because that's all the tests know how to match. It is easier to reason about too.
+            .put("data_partitioning", "shard")
+            // Limit the page size to something small so we do more than one page worth of work, so we get more status updates.
+            .put("page_size", pageSize())
+            .build();
+        return new QueryPragmas(pragmaSettings);
+    }
+
+    /**
+     * Executes the delete-async-result action for the given id, waiting up to 60 seconds.
+     *
+     * @param id the async execution id to delete
+     * @return the response of the delete action
+     */
+    private AcknowledgedResponse deleteAsyncId(String id) {
+        var deleteRequest = new DeleteAsyncResultRequest(id);
+        var pending = client().execute(DeleteAsyncResultAction.INSTANCE, deleteRequest);
+        return pending.actionGet(timeValueSeconds(60));
+    }
+
+    /** Returns a keep-alive parsed from {@code randomTimeValue(1, 5, "d")}. */
+    TimeValue randomKeepAlive() {
+        var keepAlive = randomTimeValue(1, 5, "d");
+        return TimeValue.parseTimeValue(keepAlive, "test");
+    }
+
+    /** Local-state X-Pack plugin that forwards its settings and config path to the composite base class. */
+    public static class LocalStateEsqlAsync extends LocalStateCompositeXPackPlugin {
+        public LocalStateEsqlAsync(Settings settings, Path configPath) {
+            super(settings, configPath);
+        }
+    }
+}

+ 15 - 136
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

@@ -12,45 +12,25 @@ import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionFuture;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.support.WriteRequest;
-import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.util.CollectionUtils;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.lucene.ValuesSourceReaderOperator;
 import org.elasticsearch.compute.operator.DriverStatus;
 import org.elasticsearch.compute.operator.DriverTaskRunner;
 import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator;
 import org.elasticsearch.compute.operator.exchange.ExchangeSourceOperator;
-import org.elasticsearch.index.engine.SegmentsStats;
-import org.elasticsearch.index.mapper.OnScriptError;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
-import org.elasticsearch.plugins.Plugin;
-import org.elasticsearch.plugins.ScriptPlugin;
-import org.elasticsearch.script.LongFieldScript;
-import org.elasticsearch.script.ScriptContext;
-import org.elasticsearch.script.ScriptEngine;
-import org.elasticsearch.search.lookup.SearchLookup;
 import org.elasticsearch.tasks.TaskCancelledException;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.test.junit.annotations.TestLogging;
-import org.elasticsearch.xcontent.XContentBuilder;
-import org.elasticsearch.xcontent.json.JsonXContent;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.junit.Before;
 
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.test.MapMatcher.assertMap;
 import static org.elasticsearch.test.MapMatcher.matchesMap;
@@ -71,83 +51,34 @@ import static org.hamcrest.Matchers.not;
     value = "org.elasticsearch.xpack.esql:TRACE,org.elasticsearch.compute:TRACE",
     reason = "These tests were failing frequently, let's learn as much as we can"
 )
-public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
-    private static int PAGE_SIZE;
-    private static int NUM_DOCS;
+public class EsqlActionTaskIT extends AbstractPausableIntegTestCase {
 
-    private static String READ_DESCRIPTION;
-    private static String MERGE_DESCRIPTION;
     private static final Logger LOGGER = LogManager.getLogger(EsqlActionTaskIT.class);
 
-    @Override
-    protected Collection<Class<? extends Plugin>> nodePlugins() {
-        return CollectionUtils.appendToCopy(super.nodePlugins(), PausableFieldPlugin.class);
-    }
+    private String READ_DESCRIPTION;
+    private String MERGE_DESCRIPTION;
 
     @Before
-    public void setupIndex() throws IOException {
+    public void setup() {
         assumeTrue("requires query pragmas", canUseQueryPragmas());
-        PAGE_SIZE = between(10, 100);
-        NUM_DOCS = between(4 * PAGE_SIZE, 5 * PAGE_SIZE);
         READ_DESCRIPTION = """
-            \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = PAGE_SIZE, limit = 2147483647]
+            \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 2147483647]
             \\_ValuesSourceReaderOperator[fields = [pause_me]]
             \\_AggregationOperator[mode = INITIAL, aggs = sum of longs]
-            \\_ExchangeSinkOperator""".replace("PAGE_SIZE", Integer.toString(PAGE_SIZE));
+            \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize()));
         MERGE_DESCRIPTION = """
             \\_ExchangeSourceOperator[]
             \\_AggregationOperator[mode = FINAL, aggs = sum of longs]
             \\_ProjectOperator[projection = [0]]
             \\_LimitOperator[limit = 500]
             \\_OutputOperator[columns = [sum(pause_me)]]""";
-
-        XContentBuilder mapping = JsonXContent.contentBuilder().startObject();
-        mapping.startObject("runtime");
-        {
-            mapping.startObject("pause_me");
-            {
-                mapping.field("type", "long");
-                mapping.startObject("script").field("source", "").field("lang", "pause").endObject();
-            }
-            mapping.endObject();
-        }
-        mapping.endObject();
-        client().admin()
-            .indices()
-            .prepareCreate("test")
-            .setSettings(Map.of("number_of_shards", 1, "number_of_replicas", 0))
-            .setMapping(mapping.endObject())
-            .get();
-
-        BulkRequestBuilder bulk = client().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-        for (int i = 0; i < NUM_DOCS; i++) {
-            bulk.add(prepareIndex("test").setId(Integer.toString(i)).setSource("foo", i));
-        }
-        bulk.get();
-        /*
-         * forceMerge so we can be sure that we don't bump into tiny
-         * segments that finish super quickly and cause us to report strange
-         * statuses when we expect "starting".
-         */
-        client().admin().indices().prepareForceMerge("test").setMaxNumSegments(1).get();
-        /*
-         * Double super extra paranoid check that force merge worked. It's
-         * failed to reduce the index to a single segment and caused this test
-         * to fail in very difficult to debug ways. If it fails again, it'll
-         * trip here. Or maybe it won't! And we'll learn something. Maybe
-         * it's ghosts.
-         */
-        SegmentsStats stats = client().admin().indices().prepareStats("test").get().getPrimaries().getSegments();
-        if (stats.getCount() != 1L) {
-            fail(Strings.toString(stats));
-        }
     }
 
     public void testTaskContents() throws Exception {
         ActionFuture<EsqlQueryResponse> response = startEsql();
         try {
             getTasksStarting();
-            scriptPermits.release(PAGE_SIZE);
+            scriptPermits.release(pageSize());
             List<TaskInfo> foundTasks = getTasksRunning();
             int luceneSources = 0;
             int valuesSourceReaders = 0;
@@ -158,7 +89,7 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
                 assertThat(status.sessionId(), not(emptyOrNullString()));
                 for (DriverStatus.OperatorStatus o : status.activeOperators()) {
                     logger.info("status {}", o);
-                    if (o.operator().startsWith("LuceneSourceOperator[maxPageSize=" + PAGE_SIZE)) {
+                    if (o.operator().startsWith("LuceneSourceOperator[maxPageSize=" + pageSize())) {
                         LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status();
                         assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices()));
                         assertThat(oStatus.sliceIndex(), lessThanOrEqualTo(oStatus.totalSlices()));
@@ -204,9 +135,9 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
             assertThat(exchangeSinks, greaterThanOrEqualTo(1));
             assertThat(exchangeSources, equalTo(1));
         } finally {
-            scriptPermits.release(NUM_DOCS);
+            scriptPermits.release(numberOfDocs());
             try (EsqlQueryResponse esqlResponse = response.get()) {
-                assertThat(Iterators.flatMap(esqlResponse.values(), i -> i).next(), equalTo((long) NUM_DOCS));
+                assertThat(Iterators.flatMap(esqlResponse.values(), i -> i).next(), equalTo((long) numberOfDocs()));
             }
         }
     }
@@ -219,7 +150,7 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
             cancelTask(running.taskId());
             assertCancelled(response);
         } finally {
-            scriptPermits.release(NUM_DOCS);
+            scriptPermits.release(numberOfDocs());
         }
     }
 
@@ -231,7 +162,7 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
             cancelTask(running.taskId());
             assertCancelled(response);
         } finally {
-            scriptPermits.release(NUM_DOCS);
+            scriptPermits.release(numberOfDocs());
         }
     }
 
@@ -249,7 +180,7 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
             cancelTask(tasks.get(0).taskId());
             assertCancelled(response);
         } finally {
-            scriptPermits.release(NUM_DOCS);
+            scriptPermits.release(numberOfDocs());
         }
     }
 
@@ -261,7 +192,7 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
                 // Force shard partitioning because that's all the tests know how to match. It is easier to reason about too.
                 .put("data_partitioning", "shard")
                 // Limit the page size to something small so we do more than one page worth of work, so we get more status updates.
-                .put("page_size", PAGE_SIZE)
+                .put("page_size", pageSize())
                 // Report the status after every action
                 .put("status_interval", "0ms")
                 .build()
@@ -274,7 +205,7 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
         request.setWaitForCompletion(false);
         LOGGER.debug("--> cancelling task [{}] without waiting for completion", taskId);
         client().admin().cluster().execute(CancelTasksAction.INSTANCE, request).actionGet();
-        scriptPermits.release(NUM_DOCS);
+        scriptPermits.release(numberOfDocs());
         request = new CancelTasksRequest().setTargetTaskId(taskId).setReason("test cancel");
         request.setWaitForCompletion(true);
         LOGGER.debug("--> cancelling task [{}] with waiting for completion", taskId);
@@ -367,56 +298,4 @@ public class EsqlActionTaskIT extends AbstractEsqlIntegTestCase {
             )
         );
     }
-
-    private static final Semaphore scriptPermits = new Semaphore(0);
-
-    public static class PausableFieldPlugin extends Plugin implements ScriptPlugin {
-        @Override
-        public ScriptEngine getScriptEngine(Settings settings, Collection<ScriptContext<?>> contexts) {
-            return new ScriptEngine() {
-                @Override
-                public String getType() {
-                    return "pause";
-                }
-
-                @Override
-                @SuppressWarnings("unchecked")
-                public <FactoryType> FactoryType compile(
-                    String name,
-                    String code,
-                    ScriptContext<FactoryType> context,
-                    Map<String, String> params
-                ) {
-                    return (FactoryType) new LongFieldScript.Factory() {
-                        @Override
-                        public LongFieldScript.LeafFactory newFactory(
-                            String fieldName,
-                            Map<String, Object> params,
-                            SearchLookup searchLookup,
-                            OnScriptError onScriptError
-                        ) {
-                            return ctx -> new LongFieldScript(fieldName, params, searchLookup, onScriptError, ctx) {
-                                @Override
-                                public void execute() {
-                                    try {
-                                        assertTrue(scriptPermits.tryAcquire(1, TimeUnit.MINUTES));
-                                    } catch (Exception e) {
-                                        throw new AssertionError(e);
-                                    }
-                                    LOGGER.debug("--> emitting value");
-                                    emit(1);
-                                }
-                            };
-                        }
-                    };
-                }
-
-                @Override
-                public Set<ScriptContext<?>> getSupportedContexts() {
-                    return Set.of(LongFieldScript.CONTEXT);
-                }
-            };
-        }
-    }
-
 }

+ 53 - 7
x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlAsyncActionIT.java

@@ -8,18 +8,42 @@
 package org.elasticsearch.xpack.esql.action;
 
 import org.elasticsearch.ElasticsearchTimeoutException;
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.support.master.AcknowledgedResponse;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.xpack.core.LocalStateCompositeXPackPlugin;
+import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
+import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
+import static org.elasticsearch.core.TimeValue.timeValueSeconds;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.core.IsEqual.equalTo;
 
+/**
+ * Runs test scenarios from EsqlActionIT, with an extra level of indirection
+ * through the async query, async get, and async delete APIs.
+ */
 public class EsqlAsyncActionIT extends EsqlActionIT {
 
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        ArrayList<Class<? extends Plugin>> actions = new ArrayList<>(super.nodePlugins());
+        actions.add(LocalStateEsqlAsync.class);
+        return Collections.unmodifiableList(actions);
+    }
+
     @Override
     protected EsqlQueryResponse run(String esqlCommands, QueryPragmas pragmas, QueryBuilder filter) {
         EsqlQueryRequest request = new EsqlQueryRequest();
@@ -35,24 +59,40 @@ public class EsqlAsyncActionIT extends EsqlActionIT {
 
         var response = run(request);
         if (response.asyncExecutionId().isPresent()) {
+            String id = response.asyncExecutionId().get();
             assertThat(response.isRunning(), is(true));
             assertThat(response.columns(), is(empty())); // no partial results
             assertThat(response.pages(), is(empty()));
             response.close();
-            return getAsyncResponse(response.asyncExecutionId().get());
+            var getResponse = getAsyncResponse(id);
+            assertDeletable(id);
+            return getResponse;
         } else {
             return response;
         }
     }
 
+    void assertDeletable(String id) {
+        var resp = deleteAsyncId(id);
+        assertTrue(resp.isAcknowledged());
+        // once deleted, getting the same id must fail with ResourceNotFoundException whose message is the id itself
+        var e = expectThrows(ResourceNotFoundException.class, () -> getAsyncResponse(id));
+        assertThat(e.getMessage(), equalTo(id));
+    }
+
     EsqlQueryResponse getAsyncResponse(String id) {
         try {
-            GetAsyncResultRequest getResultsRequest = new GetAsyncResultRequest(id).setWaitForCompletionTimeout(
-                TimeValue.timeValueSeconds(60)
-            );
-            var resp = client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).actionGet(30, TimeUnit.SECONDS);
-            // resp.decRef(); // the client has incremented our non-0 resp
-            return resp;
+            var getResultsRequest = new GetAsyncResultRequest(id).setWaitForCompletionTimeout(timeValueSeconds(60));
+            return client().execute(EsqlAsyncGetResultAction.INSTANCE, getResultsRequest).actionGet(30, TimeUnit.SECONDS);
+        } catch (ElasticsearchTimeoutException e) {
+            throw new AssertionError("timeout", e);
+        }
+    }
+
+    AcknowledgedResponse deleteAsyncId(String id) {
+        try {
+            DeleteAsyncResultRequest request = new DeleteAsyncResultRequest(id);
+            return client().execute(DeleteAsyncResultAction.INSTANCE, request).actionGet(30, TimeUnit.SECONDS);
         } catch (ElasticsearchTimeoutException e) {
             throw new AssertionError("timeout", e);
         }
@@ -71,4 +111,10 @@ public class EsqlAsyncActionIT extends EsqlActionIT {
     public void testIndexPatterns() throws Exception {
         super.testOverlappingIndexPatterns();
     }
+
+    public static class LocalStateEsqlAsync extends LocalStateCompositeXPackPlugin {
+        public LocalStateEsqlAsync(final Settings settings, final Path configPath) {
+            super(settings, configPath);
+        }
+    }
 }

+ 38 - 0
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/action/RestEsqlDeleteAsyncResultAction.java

@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.action;
+
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
+import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
+
+import java.util.List;
+
+import static org.elasticsearch.rest.RestRequest.Method.DELETE;
+
+public class RestEsqlDeleteAsyncResultAction extends BaseRestHandler {
+    @Override
+    public List<RestHandler.Route> routes() {
+        return List.of(new RestHandler.Route(DELETE, "/_query/async/{id}"));
+    }
+
+    @Override
+    public String getName() {
+        return "esql_delete_async_result";
+    }
+
+    @Override
+    protected BaseRestHandler.RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        DeleteAsyncResultRequest delete = new DeleteAsyncResultRequest(request.param("id"));
+        return channel -> client.execute(DeleteAsyncResultAction.INSTANCE, delete, new RestToXContentListener<>(channel));
+    }
+}

+ 7 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/EsqlPlugin.java

@@ -46,6 +46,7 @@ import org.elasticsearch.xpack.esql.EsqlUsageTransportAction;
 import org.elasticsearch.xpack.esql.action.EsqlAsyncGetResultAction;
 import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlAsyncQueryAction;
+import org.elasticsearch.xpack.esql.action.RestEsqlDeleteAsyncResultAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlGetAsyncResultAction;
 import org.elasticsearch.xpack.esql.action.RestEsqlQueryAction;
 import org.elasticsearch.xpack.esql.execution.PlanExecutor;
@@ -144,7 +145,12 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {
         IndexNameExpressionResolver indexNameExpressionResolver,
         Supplier<DiscoveryNodes> nodesInCluster
     ) {
-        return List.of(new RestEsqlQueryAction(), new RestEsqlAsyncQueryAction(), new RestEsqlGetAsyncResultAction());
+        return List.of(
+            new RestEsqlQueryAction(),
+            new RestEsqlAsyncQueryAction(),
+            new RestEsqlGetAsyncResultAction(),
+            new RestEsqlDeleteAsyncResultAction()
+        );
     }
 
     @Override

+ 6 - 1
x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/TransportEsqlQueryAction.java

@@ -166,7 +166,12 @@ public class TransportEsqlQueryAction extends HandledTransportAction<EsqlQueryRe
                         EsqlQueryResponse.Profile profile = configuration.profile()
                             ? new EsqlQueryResponse.Profile(result.profiles())
                             : null;
-                        return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
+                        if (task instanceof EsqlQueryTask asyncTask && request.keepOnCompletion()) {
+                            String id = asyncTask.getExecutionId().getEncoded();
+                            return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), id, false, request.async());
+                        } else {
+                            return new EsqlQueryResponse(columns, result.pages(), profile, request.columnar(), request.async());
+                        }
                     })
                 )
             )