1
0
Эх сурвалжийг харах

Set max allowed size for stored async response (#74455)

Add a dynamic transient cluster setting search.max_async_search_response_size
that controls the maximum allowed size for a stored async search
response. The default max size is 10Mb. An attempt to store
an async search response larger than this size will result in error.

Relates to #67594
Mayya Sharipova 4 жил өмнө
parent
commit
aa76ebbfbe

+ 5 - 0
docs/reference/search/async-search.asciidoc

@@ -132,6 +132,11 @@ nor search requests that only include the <<search-suggesters,suggest section>>.
 {ccs-cap} is supported only with 
 <<ccs-min-roundtrips,`ccs_minimize_roundtrips`>> set to `false`.
 
+WARNING: By default, {es} doesn't allow to store an async search response
+larger than 10Mb, and an attempt to do this results in an error. The maximum
+allowed size for a stored async search response can be set by changing the
+`search.max_async_search_response_size` cluster level setting.
+
 [[get-async-search]]
 ==== Get async search
 

+ 1 - 1
server/src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java

@@ -170,7 +170,7 @@ public class BytesStreamOutput extends BytesStream {
         return bytes.ramBytesUsed();
     }
 
-    void ensureCapacity(long offset) {
+    protected void ensureCapacity(long offset) {
         if (offset > Integer.MAX_VALUE) {
             throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
         }

+ 1 - 0
server/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java

@@ -400,6 +400,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
             SearchService.LOW_LEVEL_CANCELLATION_SETTING,
             SearchService.MAX_OPEN_SCROLL_CONTEXT,
             SearchService.ENABLE_REWRITE_AGGS_TO_FILTER_BY_FILTER,
+            SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING,
             Node.WRITE_PORTS_FILE_SETTING,
             Node.NODE_NAME_SETTING,
             Node.NODE_ATTRIBUTES,

+ 9 - 0
server/src/main/java/org/elasticsearch/search/SearchService.java

@@ -32,6 +32,8 @@ import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.core.Releasable;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.common.lucene.Lucene;
@@ -172,6 +174,13 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         Property.NodeScope
     );
 
+    public static final Setting<ByteSizeValue> MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING = Setting.byteSizeSetting(
+        "search.max_async_search_response_size",
+        new ByteSizeValue(10, ByteSizeUnit.MB),
+        Property.Dynamic,
+        Property.NodeScope
+    );
+
     public static final int DEFAULT_SIZE = 10;
     public static final int DEFAULT_FROM = 0;
 

+ 47 - 1
x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

@@ -9,9 +9,11 @@ package org.elasticsearch.xpack.search;
 
 import org.apache.lucene.store.AlreadyClosedException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.query.MatchAllQueryBuilder;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.bucket.terms.InternalTerms;
@@ -21,8 +23,10 @@ import org.elasticsearch.search.aggregations.metrics.InternalMin;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
 import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
 import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
 import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
+import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
 import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
 
 import java.util.ArrayList;
@@ -34,6 +38,8 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.elasticsearch.search.SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
@@ -60,7 +66,7 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
         createIndex(indexName, Settings.builder()
             .put("index.number_of_shards", numShards)
             .build());
-        numKeywords = randomIntBetween(1, 100);
+        numKeywords = randomIntBetween(50, 100);
         keywordFreqs = new HashMap<>();
         Set<String> keywordSet = new HashSet<>();
         for (int i = 0; i < numKeywords; i++) {
@@ -457,4 +463,44 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
         assertNotNull(response.getFailure());
         ensureTaskNotRunning(response.getId());
     }
+
+    public void testFinalResponseLargerMaxSize() throws Exception {
+        SearchSourceBuilder source = new SearchSourceBuilder()
+            .query(new MatchAllQueryBuilder())
+            .aggregation(AggregationBuilders.terms("terms").field("terms.keyword").size(numKeywords));
+
+        int limit = 1000; // should be enough to store initial response, but not enough for final response
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", limit + "b"));
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+
+        final SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(source, indexName);
+        request.setWaitForCompletionTimeout(TimeValue.timeValueMillis(0));
+
+        // initial response – ok
+        final AsyncSearchResponse initialResponse = submitAsyncSearch(request);
+        assertTrue(initialResponse.isRunning());
+        assertNull(initialResponse.getFailure());
+
+        // final response – with failure; test that stored async search response is updated with this failure
+        assertBusy(() -> {
+            final AsyncSearchResponse finalResponse = client().execute(GetAsyncSearchAction.INSTANCE,
+                new GetAsyncResultRequest(initialResponse.getId())
+                    .setWaitForCompletionTimeout(TimeValue.timeValueMillis(300))).get();
+            assertNotNull(finalResponse.getFailure());
+            assertFalse(finalResponse.isRunning());
+            if (finalResponse.getFailure() != null) {
+                assertEquals("Can't store an async search response larger than [" + limit + "] bytes. " +
+                        "This limit can be set by changing the [" + MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting.",
+                    finalResponse.getFailure().getMessage());
+            }
+        });
+
+        updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", (String) null));
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+
+        deleteAsyncSearch(initialResponse.getId());
+        ensureTaskRemoval(initialResponse.getId());
+    }
 }

+ 8 - 24
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

@@ -6,15 +6,10 @@
  */
 package org.elasticsearch.xpack.search;
 
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchAction;
 import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchTask;
 import org.elasticsearch.action.search.TransportSearchAction;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.HandledTransportAction;
@@ -27,8 +22,6 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.index.engine.DocumentMissingException;
-import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.tasks.Task;
@@ -49,8 +42,6 @@ import java.util.function.Supplier;
 import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
 
 public class TransportSubmitAsyncSearchAction extends HandledTransportAction<SubmitAsyncSearchRequest, AsyncSearchResponse> {
-    private static final Logger logger = LogManager.getLogger(TransportSubmitAsyncSearchAction.class);
-
     private final NodeClient nodeClient;
     private final Function<SearchRequest, InternalAggregation.ReduceContext> requestToAggReduceContextBuilder;
     private final TransportSearchAction searchAction;
@@ -177,21 +168,14 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
     private void onFinalResponse(AsyncSearchTask searchTask,
                                  AsyncSearchResponse response,
                                  Runnable nextAction) {
-        store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
-            ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
-                exc -> {
-                    Throwable cause = ExceptionsHelper.unwrapCause(exc);
-                    if (cause instanceof DocumentMissingException == false &&
-                            cause instanceof VersionConflictEngineException == false) {
-                        logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
-                            searchTask.getExecutionId().getEncoded()), exc);
-                    }
-                    unregisterTaskAndMoveOn(searchTask, nextAction);
-                }));
+        store.updateResponse(searchTask.getExecutionId().getDocId(),
+            threadContext.getResponseHeaders(),
+            response,
+            ActionListener.wrap(() ->  {
+                taskManager.unregister(searchTask);
+                nextAction.run();
+            })
+        );
     }
 
-    private void unregisterTaskAndMoveOn(SearchTask searchTask, Runnable nextAction) {
-        taskManager.unregister(searchTask);
-        nextAction.run();
-    }
 }

+ 7 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncResponse.java

@@ -20,4 +20,11 @@ public interface AsyncResponse<T extends AsyncResponse<?>> extends Writeable {
      */
     T withExpirationTime(long expirationTimeMillis);
 
+    /**
+     * Convert this AsyncResponse to a new AsyncResponse with a given failure
+     * @return a new AsyncResponse that stores a failure with a provided exception
+     */
+    default T convertToFailure(Exception exc) {
+        return null;
+    }
 }

+ 114 - 6
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

@@ -6,6 +6,10 @@
  */
 package org.elasticsearch.xpack.core.async;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.message.ParameterizedMessage;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.Version;
@@ -42,6 +46,8 @@ import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentParserUtils;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.engine.DocumentMissingException;
+import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.indices.SystemIndexDescriptor;
 import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.tasks.Task;
@@ -68,6 +74,7 @@ import java.util.function.Function;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
 import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
+import static org.elasticsearch.search.SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING;
 import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
 import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AUTHENTICATION_KEY;
 
@@ -75,6 +82,7 @@ import static org.elasticsearch.xpack.core.security.authc.AuthenticationField.AU
  * A service that exposes the CRUD operations for the async task-specific index.
  */
 public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
+    private static final Logger logger = LogManager.getLogger(AsyncTaskIndexService.class);
 
     public static final String HEADERS_FIELD = "headers";
     public static final String RESPONSE_HEADERS_FIELD = "response_headers";
@@ -149,6 +157,7 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
     private final NamedWriteableRegistry registry;
     private final Writeable.Reader<R> reader;
     private final BigArrays bigArrays;
+    private volatile long maxResponseSize;
     private final ClusterService clusterService;
     private final CircuitBreaker circuitBreaker;
 
@@ -167,6 +176,9 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
         this.registry = registry;
         this.reader = reader;
         this.bigArrays = bigArrays;
+        this.maxResponseSize = MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.get(clusterService.getSettings()).getBytes();
+        clusterService.getClusterSettings().addSettingsUpdateConsumer(
+            MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING, (v) -> maxResponseSize = v.getBytes());
         this.clusterService = clusterService;
         this.circuitBreaker = bigArrays.breakerService().getBreaker(CircuitBreaker.REQUEST);
     }
@@ -195,21 +207,24 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
     /**
      * Stores the initial response with the original headers of the authenticated user
      * and the expected expiration time.
+     * Currently for EQL we don't set limit for a stored async response
+     * TODO: add limit for stored async response in EQL, and instead of this method use createResponse
      */
-    public void createResponse(String docId,
+    public void createResponseForEQL(String docId,
                                Map<String, String> headers,
                                R response,
                                ActionListener<IndexResponse> listener) throws IOException {
         try {
             final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking());
             final XContentBuilder source = XContentFactory.jsonBuilder(buffer);
-            listener = ActionListener.runBefore(listener, source::close);
+            listener = ActionListener.runBefore(listener, buffer::close);
             source
                 .startObject()
                 .field(HEADERS_FIELD, headers)
                 .field(EXPIRATION_TIME_FIELD, response.getExpirationTime())
                 .directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os))
                 .endObject();
+
             // do not close the buffer or the XContentBuilder until the IndexRequest is completed (i.e., listener is notified);
             // otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage.
             source.flush();
@@ -224,16 +239,59 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
     }
 
     /**
-     * Stores the final response if the place-holder document is still present (update).
+     * Stores the initial response with the original headers of the authenticated user
+     * and the expected expiration time.
      */
+    public void createResponse(String docId,
+                               Map<String, String> headers,
+                               R response,
+                               ActionListener<IndexResponse> listener) throws IOException {
+        try {
+            final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutputWithLimit(
+                0, bigArrays.withCircuitBreaking(), maxResponseSize);
+            final XContentBuilder source = XContentFactory.jsonBuilder(buffer);
+            listener = ActionListener.runBefore(listener, buffer::close);
+            source
+                .startObject()
+                .field(HEADERS_FIELD, headers)
+                .field(EXPIRATION_TIME_FIELD, response.getExpirationTime())
+                .directFieldAsBase64(RESULT_FIELD, os -> writeResponse(response, os))
+                .endObject();
+
+            // do not close the buffer or the XContentBuilder until the IndexRequest is completed (i.e., listener is notified);
+            // otherwise, we underestimate the memory usage in case the circuit breaker does not use the real memory usage.
+            source.flush();
+            final IndexRequest indexRequest = new IndexRequest(index)
+                .create(true)
+                .id(docId)
+                .source(buffer.bytes(), source.contentType());
+            clientWithOrigin.index(indexRequest, listener);
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
+    }
+
     public void updateResponse(String docId,
                                Map<String, List<String>> responseHeaders,
                                R response,
                                ActionListener<UpdateResponse> listener) {
+        updateResponse(docId, responseHeaders, response, listener, false);
+    }
+
+    /**
+     * Stores the final response if the place-holder document is still present (update).
+     */
+    private void updateResponse(String docId,
+                                Map<String, List<String>> responseHeaders,
+                                R response,
+                                ActionListener<UpdateResponse> listener,
+                                boolean isFailure) {
         try {
-            final ReleasableBytesStreamOutput buffer = new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking());
+            final ReleasableBytesStreamOutput buffer = isFailure ?
+                new ReleasableBytesStreamOutput(0, bigArrays.withCircuitBreaking()) :
+                new ReleasableBytesStreamOutputWithLimit(0, bigArrays.withCircuitBreaking(), maxResponseSize);
             final XContentBuilder source = XContentFactory.jsonBuilder(buffer);
-            listener = ActionListener.runBefore(listener, source::close);
+            listener = ActionListener.runBefore(listener, buffer::close);
             source
                 .startObject()
                 .field(RESPONSE_HEADERS_FIELD, responseHeaders)
@@ -249,10 +307,42 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
                 .retryOnConflict(5);
             clientWithOrigin.update(request, listener);
         } catch (Exception e) {
-            listener.onFailure(e);
+            // even if we expect updating with a failure always succeed
+            // this is just an extra precaution not to create infinite loops
+            if (isFailure) {
+                listener.onFailure(e);
+            } else {
+                Throwable cause = ExceptionsHelper.unwrapCause(e);
+                if (cause instanceof DocumentMissingException == false && cause instanceof VersionConflictEngineException == false) {
+                    logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", docId), e);
+                    ActionListener<UpdateResponse> newListener = listener;
+                    updateStoredResponseWithFailure(
+                        docId,
+                        responseHeaders,
+                        response,
+                        e,
+                        // at end, we should report a failure to the listener
+                        ActionListener.wrap(() -> newListener.onFailure(e))
+                    );
+                } else {
+                    listener.onFailure(e);
+                }
+            }
         }
     }
 
+    /**
+     * Update the initial stored response with a failure
+     */
+    private void updateStoredResponseWithFailure(String docId,
+                                                 Map<String, List<String>> responseHeaders,
+                                                 R response,
+                                                 Exception updateException,
+                                                 ActionListener<UpdateResponse> listener) {
+        R failureResponse = response.convertToFailure(updateException);
+        updateResponse(docId, responseHeaders, failureResponse, listener, true);
+    }
+
     /**
      * Updates the expiration time of the provided <code>docId</code> if the place-holder
      * document is still present (update).
@@ -531,4 +621,22 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
             }
         }
     }
+
+    private static class ReleasableBytesStreamOutputWithLimit extends ReleasableBytesStreamOutput {
+        private final long limit;
+
+        ReleasableBytesStreamOutputWithLimit(int expectedSize, BigArrays bigArrays, long limit) {
+            super(expectedSize, bigArrays);
+            this.limit = limit;
+        }
+
+        @Override
+        protected void ensureCapacity(long offset) {
+            if (offset > limit) {
+                throw new IllegalArgumentException("Can't store an async search response larger than [" + limit + "] bytes. " +
+                    "This limit can be set by changing the [" + MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting.");
+            }
+            super.ensureCapacity(offset);
+        }
+    }
 }

+ 14 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncSearchResponse.java

@@ -206,4 +206,18 @@ public class AsyncSearchResponse extends ActionResponse implements StatusToXCont
         builder.endObject();
         return builder;
     }
+
+    @Override
+    public AsyncSearchResponse convertToFailure(Exception exc) {
+        exc.setStackTrace(new StackTraceElement[0]); // we don't need to store stack traces
+        return new AsyncSearchResponse(
+            id,
+            null,
+            exc,
+            isPartial,
+            false,
+            startTimeMillis,
+            expirationTimeMillis
+        );
+    }
 }

+ 72 - 5
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/async/AsyncSearchIndexServiceTests.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.core.async;
 
 import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.action.update.UpdateResponse;
@@ -16,6 +17,7 @@ import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.indices.breaker.AllCircuitBreakerStats;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
@@ -29,6 +31,8 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.elasticsearch.search.SearchService.MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -39,15 +43,24 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
     public static class TestAsyncResponse implements AsyncResponse<TestAsyncResponse> {
         public final String test;
         public final long expirationTimeMillis;
+        public String failure;
 
         public TestAsyncResponse(String test, long expirationTimeMillis) {
             this.test = test;
             this.expirationTimeMillis = expirationTimeMillis;
         }
 
+        public TestAsyncResponse(String test, long expirationTimeMillis, String failure) {
+            this.test = test;
+            this.expirationTimeMillis = expirationTimeMillis;
+            this.failure = failure;
+        }
+
+
         public TestAsyncResponse(StreamInput input) throws IOException {
             test = input.readOptionalString();
             this.expirationTimeMillis = input.readLong();
+            failure = input.readOptionalString();
         }
 
         @Override
@@ -57,13 +70,14 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
 
         @Override
         public TestAsyncResponse withExpirationTime(long expirationTime) {
-            return new TestAsyncResponse(test, expirationTime);
+            return new TestAsyncResponse(test, expirationTime, failure);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeOptionalString(test);
             out.writeLong(expirationTimeMillis);
+            out.writeOptionalString(failure);
         }
 
         @Override
@@ -72,21 +86,27 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
             if (o == null || getClass() != o.getClass()) return false;
             TestAsyncResponse that = (TestAsyncResponse) o;
             return expirationTimeMillis == that.expirationTimeMillis &&
-                Objects.equals(test, that.test);
+                Objects.equals(test, that.test) && Objects.equals(failure, that.failure);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(test, expirationTimeMillis);
+            return Objects.hash(test, expirationTimeMillis, failure);
         }
 
         @Override
         public String toString() {
             return "TestAsyncResponse{" +
                 "test='" + test + '\'' +
+                "failure='" + failure + '\'' +
                 ", expirationTimeMillis=" + expirationTimeMillis +
                 '}';
         }
+
+        @Override
+        public TestAsyncResponse convertToFailure(Exception exc) {
+            return new TestAsyncResponse(test, expirationTimeMillis, exc.getMessage());
+        }
     }
 
     @Before
@@ -221,7 +241,8 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
             TestAsyncResponse initialResponse = new TestAsyncResponse(testMessage, expirationTime);
             PlainActionFuture<IndexResponse> createFuture = new PlainActionFuture<>();
             indexService.createResponse(executionId.getDocId(), Map.of(), initialResponse, createFuture);
-            expectThrows(CircuitBreakingException.class, createFuture::actionGet);
+            CircuitBreakingException e = expectThrows(CircuitBreakingException.class, createFuture::actionGet);
+            assertEquals(0, e.getSuppressed().length); // no other suppressed exceptions
             assertThat(circuitBreaker.getUsed(), equalTo(0L));
         }
         {
@@ -261,7 +282,8 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
                 PlainActionFuture<UpdateResponse> updateFuture = new PlainActionFuture<>();
                 TestAsyncResponse updateResponse = new TestAsyncResponse(randomAlphaOfLength(100), randomLong());
                 indexService.updateResponse(executionId.getDocId(), Map.of(), updateResponse, updateFuture);
-                expectThrows(CircuitBreakingException.class, updateFuture::actionGet);
+                CircuitBreakingException e = expectThrows(CircuitBreakingException.class, updateFuture::actionGet);
+                assertEquals(0, e.getSuppressed().length); // no other suppressed exceptions
                 assertThat(circuitBreaker.getUsed(), equalTo(0L));
             }
             if (randomBoolean()) {
@@ -281,4 +303,49 @@ public class AsyncSearchIndexServiceTests extends ESSingleNodeTestCase {
             }
         }
     }
+
+    public void testMaxAsyncSearchResponseSize() throws Exception {
+        // successfully create an initial response
+        AsyncExecutionId executionId1 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()),
+            new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()));
+        TestAsyncResponse initialResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
+        PlainActionFuture<IndexResponse> createFuture1 = new PlainActionFuture<>();
+        indexService.createResponse(executionId1.getDocId(), Map.of(), initialResponse, createFuture1);
+        createFuture1.actionGet();
+
+        // setting very small limit for the max size of async search response
+        int limit = randomIntBetween(1, 125);
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", limit + "b"));
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+        String expectedErrMsg = "Can't store an async search response larger than ["+ limit + "] bytes. " +
+            "This limit can be set by changing the [" + MAX_ASYNC_SEARCH_RESPONSE_SIZE_SETTING.getKey() + "] setting.";
+
+        // test that an update operation of the initial response fails
+        PlainActionFuture<UpdateResponse> updateFuture = new PlainActionFuture<>();
+        TestAsyncResponse updateResponse = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
+        indexService.updateResponse(executionId1.getDocId(), Map.of(), updateResponse, updateFuture);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, updateFuture::actionGet);
+        assertEquals(expectedErrMsg, e.getMessage());
+        assertEquals(0, e.getSuppressed().length); // no other suppressed exceptions
+        // test that the inital response is overwritten with a failure
+        PlainActionFuture<TestAsyncResponse> getFuture = new PlainActionFuture<>();
+        indexService.getResponse(executionId1, randomBoolean(), getFuture);
+        assertEquals(expectedErrMsg, getFuture.actionGet().failure);
+
+        // test that a create operation fails
+        AsyncExecutionId executionId2 = new AsyncExecutionId(Long.toString(randomNonNegativeLong()),
+            new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()));
+        PlainActionFuture<IndexResponse> createFuture = new PlainActionFuture<>();
+        TestAsyncResponse initialResponse2 = new TestAsyncResponse(randomAlphaOfLength(130), randomLong());
+        indexService.createResponse(executionId2.getDocId(), Map.of(), initialResponse2, createFuture);
+        IllegalArgumentException e2 = expectThrows(IllegalArgumentException.class, createFuture::actionGet);
+        assertEquals(expectedErrMsg, e2.getMessage());
+        assertEquals(0, e2.getSuppressed().length); // no other suppressed exceptions
+
+        // restoring limit
+        updateSettingsRequest = new ClusterUpdateSettingsRequest();
+        updateSettingsRequest.transientSettings(Settings.builder().put("search.max_async_search_response_size", (String) null));
+        assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());
+    }
 }

+ 1 - 1
x-pack/plugin/ql/src/main/java/org/elasticsearch/xpack/ql/async/AsyncTaskManagementService.java

@@ -199,7 +199,7 @@ public class AsyncTaskManagementService<Request extends TaskAwareRequest, Respon
 
     private void storeResults(T searchTask, StoredAsyncResponse<Response> storedResponse, ActionListener<Void> finalListener) {
         try {
-            asyncTaskIndexService.createResponse(searchTask.getExecutionId().getDocId(),
+            asyncTaskIndexService.createResponseForEQL(searchTask.getExecutionId().getDocId(),
                 searchTask.getOriginHeaders(), storedResponse, ActionListener.wrap(
                     // We should only unregister after the result is saved
                     resp -> {