Explorar o código

Preserve final response headers in asynchronous search (#54349)

This change adds the response headers of the original search request
in the stored response in order to be able to restore them when retrieving a result
from the async-search index. It also ensures that response headers are preserved for
users that retrieve a final response on a running search task.
Partial response can eventually return response headers too but this change only ensures
that they are present when the response if final.

Relates #33936
Jim Ferenczi %!s(int64=5) %!d(string=hai) anos
pai
achega
f3f1b6993c

+ 0 - 6
x-pack/plugin/async-search/build.gradle

@@ -1,9 +1,3 @@
-/*
- * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
- * or more contributor license agreements. Licensed under the Elastic License;
- * you may not use this file except in compliance with the Elastic License.
- */
-
 evaluationDependsOn(xpackModule('core'))
 
 apply plugin: 'elasticsearch.esplugin'

+ 27 - 0
x-pack/plugin/async-search/qa/rest/build.gradle

@@ -0,0 +1,27 @@
+import org.elasticsearch.gradle.info.BuildParams
+
+apply plugin: 'elasticsearch.testclusters'
+apply plugin: 'elasticsearch.esplugin'
+
+esplugin {
+  description 'Deprecated query plugin'
+  classname 'org.elasticsearch.query.DeprecatedQueryPlugin'
+}
+
+restResources {
+  restApi {
+    includeCore '_common', 'indices', 'index'
+    includeXpack 'async_search'
+  }
+}
+
+testClusters.integTest {
+  testDistribution = 'DEFAULT'
+  // add the deprecated query plugin
+  plugin file(project(':x-pack:plugin:async-search:qa:rest').tasks.bundlePlugin.archiveFile)
+  setting 'xpack.security.enabled', 'false'
+}
+
+test.enabled = false
+
+check.dependsOn integTest

+ 75 - 0
x-pack/plugin/async-search/qa/rest/src/main/java/org/elasticsearch/query/DeprecatedQueryBuilder.java

@@ -0,0 +1,75 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.query;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.lucene.search.MatchAllDocsQuery;
+import org.apache.lucene.search.Query;
+import org.elasticsearch.common.ParsingException;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.logging.DeprecationLogger;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.query.AbstractQueryBuilder;
+import org.elasticsearch.index.query.QueryShardContext;
+
+import java.io.IOException;
+
+public class DeprecatedQueryBuilder extends AbstractQueryBuilder<DeprecatedQueryBuilder> {
+    private static final DeprecationLogger deprecationLogger = new DeprecationLogger(LogManager.getLogger("Deprecated"));
+
+    public static final String NAME = "deprecated";
+
+    public DeprecatedQueryBuilder() {}
+
+    DeprecatedQueryBuilder(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    @Override
+    protected void doWriteTo(StreamOutput out) {}
+
+    @Override
+    protected void doXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject(NAME);
+        builder.endObject();
+    }
+
+    private static final ObjectParser<DeprecatedQueryBuilder, Void> PARSER = new ObjectParser<>(NAME, DeprecatedQueryBuilder::new);
+
+    public static DeprecatedQueryBuilder fromXContent(XContentParser parser) {
+        try {
+            PARSER.apply(parser, null);
+            return new DeprecatedQueryBuilder();
+        } catch (IllegalArgumentException e) {
+            throw new ParsingException(parser.getTokenLocation(), e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected Query doToQuery(QueryShardContext context) {
+        deprecationLogger.deprecated("[deprecated] query");
+        return new MatchAllDocsQuery();
+    }
+
+    @Override
+    protected boolean doEquals(DeprecatedQueryBuilder other) {
+        return false;
+    }
+
+    @Override
+    protected int doHashCode() {
+        return 0;
+    }
+
+    @Override
+    public String getWriteableName() {
+        return NAME;
+    }
+}

+ 24 - 0
x-pack/plugin/async-search/qa/rest/src/main/java/org/elasticsearch/query/DeprecatedQueryPlugin.java

@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.query;
+
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.plugins.SearchPlugin;
+
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+
+public class DeprecatedQueryPlugin extends Plugin implements SearchPlugin {
+
+    public DeprecatedQueryPlugin() {}
+
+    @Override
+    public List<QuerySpec<?>> getQueries() {
+        return singletonList(new QuerySpec<>("deprecated", DeprecatedQueryBuilder::new, p -> DeprecatedQueryBuilder.fromXContent(p)));
+    }
+}

+ 23 - 0
x-pack/plugin/async-search/qa/rest/src/test/java/org/elasticsearch/qa/AsyncSearchRestIT.java

@@ -0,0 +1,23 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.qa;
+
+import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;
+import org.elasticsearch.test.rest.yaml.ClientYamlTestCandidate;
+import org.elasticsearch.test.rest.yaml.ESClientYamlSuiteTestCase;
+
+public class AsyncSearchRestIT extends ESClientYamlSuiteTestCase {
+
+    public AsyncSearchRestIT(final ClientYamlTestCandidate testCandidate) {
+        super(testCandidate);
+    }
+
+    @ParametersFactory
+    public static Iterable<Object[]> parameters() throws Exception {
+        return ESClientYamlSuiteTestCase.createParameters();
+    }
+}

+ 104 - 0
x-pack/plugin/async-search/qa/rest/src/test/resources/rest-api-spec/test/async-search/10_deprecation.yml

@@ -0,0 +1,104 @@
+setup:
+  - do:
+      indices.create:
+        index: test-1
+        body:
+          settings:
+            number_of_shards: "2"
+
+  - do:
+      indices.create:
+        index: test-2
+        body:
+          settings:
+            number_of_shards: "1"
+
+  - do:
+      indices.create:
+        index: test-3
+        body:
+          settings:
+            number_of_shards: "3"
+
+  - do:
+      index:
+        index:  test-2
+        body:   { max: 2 }
+
+  - do:
+      index:
+        index:  test-1
+        body:   { max: 1 }
+
+  - do:
+      index:
+        index:  test-3
+        body:   { max: 3 }
+
+  - do:
+      indices.refresh: {}
+
+---
+"Deprecation when retrieved from task":
+  - skip:
+      features: "warnings"
+
+  - do:
+      warnings:
+        - '[deprecated] query'
+      async_search.submit:
+        index: test-*
+        wait_for_completion_timeout: 10s
+        body:
+          query:
+            deprecated: {}
+
+  - is_false: id
+  - match:  { is_partial:                   false }
+  - length: { response.hits.hits:               3 }
+
+---
+"Deprecation when retrieved from store":
+  - skip:
+      features: "warnings"
+
+  - do:
+      warnings:
+        - '[deprecated] query'
+      async_search.submit:
+        index: test-*
+        wait_for_completion_timeout: 10s
+        keep_on_completion: true
+        body:
+          query:
+            deprecated: {}
+
+  - set:    { id:                              id }
+  - match:  { is_partial:                   false }
+  - length: { response.hits.hits:               3 }
+
+  - do:
+      warnings:
+        - '[deprecated] query'
+      async_search.get:
+        id: "$id"
+
+  - match:  { is_partial:                       false }
+  - length: { response.hits.hits:                   3 }
+
+  - do:
+      async_search.delete:
+        id: "$id"
+
+  - match: { acknowledged:   true }
+
+  - do:
+      catch: missing
+      async_search.get:
+        id: "$id"
+
+  - do:
+      catch: missing
+      async_search.delete:
+        id: "$id"
+

+ 28 - 0
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchIndexService.java

@@ -47,6 +47,7 @@ import java.nio.ByteBuffer;
 import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -63,6 +64,7 @@ class AsyncSearchIndexService {
     public static final String INDEX = ".async-search";
 
     public static final String HEADERS_FIELD = "headers";
+    public static final String RESPONSE_HEADERS_FIELD = "response_headers";
     public static final String EXPIRATION_TIME_FIELD = "expiration_time";
     public static final String RESULT_FIELD = "result";
 
@@ -86,6 +88,10 @@ class AsyncSearchIndexService {
                             .field("type", "object")
                             .field("enabled", "false")
                         .endObject()
+                        .startObject(RESPONSE_HEADERS_FIELD)
+                            .field("type", "object")
+                            .field("enabled", "false")
+                        .endObject()
                         .startObject(RESULT_FIELD)
                             .field("type", "object")
                             .field("enabled", "false")
@@ -172,9 +178,11 @@ class AsyncSearchIndexService {
      * Stores the final response if the place-holder document is still present (update).
      */
     void storeFinalResponse(String docId,
+                            Map<String, List<String>> responseHeaders,
                             AsyncSearchResponse response,
                             ActionListener<UpdateResponse> listener) throws IOException {
         Map<String, Object> source = new HashMap<>();
+        source.put(RESPONSE_HEADERS_FIELD, responseHeaders);
         source.put(RESULT_FIELD, encodeResponse(response));
         UpdateRequest request = new UpdateRequest()
             .index(INDEX)
@@ -249,8 +257,11 @@ class AsyncSearchIndexService {
     /**
      * Gets the response from the index if present, or delegate a {@link ResourceNotFoundException}
      * failure to the provided listener if not.
+     * When the provided <code>restoreResponseHeaders</code> is <code>true</code>, this method also restores the
+     * response headers of the original request in the current thread context.
      */
     void getResponse(AsyncSearchId searchId,
+                     boolean restoreResponseHeaders,
                      ActionListener<AsyncSearchResponse> listener) {
         final Authentication current = securityContext.getAuthentication();
         GetRequest internalGet = new GetRequest(INDEX)
@@ -271,6 +282,12 @@ class AsyncSearchIndexService {
                     return;
                 }
 
+                if (restoreResponseHeaders) {
+                    @SuppressWarnings("unchecked")
+                    Map<String, List<String>> responseHeaders = (Map<String, List<String>>) get.getSource().get(RESPONSE_HEADERS_FIELD);
+                    restoreResponseHeadersContext(securityContext.getThreadContext(), responseHeaders);
+                }
+
                 String encoded = (String) get.getSource().get(RESULT_FIELD);
                 listener.onResponse(encoded != null ? decodeResponse(encoded) : null);
             },
@@ -339,4 +356,15 @@ class AsyncSearchIndexService {
             }
         }
     }
+
+    /**
+     * Restores the provided <code>responseHeaders</code> to the current thread context.
+     */
+    static void restoreResponseHeadersContext(ThreadContext threadContext, Map<String, List<String>> responseHeaders) {
+        for (Map.Entry<String, List<String>> entry : responseHeaders.entrySet()) {
+            for (String value : entry.getValue()) {
+                threadContext.addResponseHeader(entry.getKey(), value);
+            }
+        }
+    }
 }

+ 25 - 8
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

@@ -61,7 +61,7 @@ final class AsyncSearchTask extends SearchTask {
     private volatile long expirationTimeMillis;
     private final AtomicBoolean isCancelling = new AtomicBoolean(false);
 
-    private AtomicReference<MutableSearchResponse> searchResponse;
+    private final AtomicReference<MutableSearchResponse> searchResponse = new AtomicReference<>();
 
     /**
      * Creates an instance of {@link AsyncSearchTask}.
@@ -98,7 +98,6 @@ final class AsyncSearchTask extends SearchTask {
         this.threadPool = threadPool;
         this.aggReduceContextSupplier = aggReduceContextSupplier;
         this.progressListener = new Listener();
-        this.searchResponse = new AtomicReference<>();
         setProgressListener(progressListener);
     }
 
@@ -183,7 +182,7 @@ final class AsyncSearchTask extends SearchTask {
             }
         }
         if (executeImmediately) {
-            listener.onResponse(getResponse());
+            listener.onResponse(getResponseWithHeaders());
         }
     }
 
@@ -201,7 +200,7 @@ final class AsyncSearchTask extends SearchTask {
             }
         }
         if (executeImmediately) {
-            listener.accept(getResponse());
+            listener.accept(getResponseWithHeaders());
         }
     }
 
@@ -221,7 +220,7 @@ final class AsyncSearchTask extends SearchTask {
                         if (hasRun.compareAndSet(false, true)) {
                             // timeout occurred before completion
                             removeCompletionListener(id);
-                            listener.onResponse(getResponse());
+                            listener.onResponse(getResponseWithHeaders());
                         }
                     }, waitForCompletion, "generic");
                 } catch (EsRejectedExecutionException exc) {
@@ -238,7 +237,7 @@ final class AsyncSearchTask extends SearchTask {
             }
         }
         if (executeImmediately) {
-            listener.onResponse(getResponse());
+            listener.onResponse(getResponseWithHeaders());
         }
     }
 
@@ -284,6 +283,8 @@ final class AsyncSearchTask extends SearchTask {
             }
             hasCompleted = true;
         }
+        // we don't need to restore the response headers, they should be included in the current
+        // context since we are called by the search action listener.
         AsyncSearchResponse finalResponse = getResponse();
         for (Consumer<AsyncSearchResponse> listener : completionListeners.values()) {
             listener.accept(finalResponse);
@@ -291,11 +292,25 @@ final class AsyncSearchTask extends SearchTask {
         completionListeners.clear();
     }
 
+    /**
+     * Returns the current {@link AsyncSearchResponse}.
+     */
     private AsyncSearchResponse getResponse() {
         assert searchResponse.get() != null;
         return searchResponse.get().toAsyncSearchResponse(this, expirationTimeMillis);
     }
 
+    /**
+     * Returns the current {@link AsyncSearchResponse} and restores the response headers
+     * in the local thread context.
+     */
+    private AsyncSearchResponse getResponseWithHeaders() {
+        assert searchResponse.get() != null;
+        return searchResponse.get().toAsyncSearchResponseWithHeaders(this, expirationTimeMillis);
+    }
+
+
+
     // checks if the search task should be cancelled
     private void checkCancellation() {
         long now = System.currentTimeMillis();
@@ -336,7 +351,8 @@ final class AsyncSearchTask extends SearchTask {
             // best effort to cancel expired tasks
             checkCancellation();
             searchResponse.compareAndSet(null,
-                new MutableSearchResponse(shards.size() + skipped.size(), skipped.size(), clusters, aggReduceContextSupplier));
+                new MutableSearchResponse(shards.size() + skipped.size(), skipped.size(), clusters,
+                    aggReduceContextSupplier, threadPool.getThreadContext()));
             executeInitListeners();
         }
 
@@ -369,7 +385,8 @@ final class AsyncSearchTask extends SearchTask {
             if (searchResponse.get() == null) {
                 // if the failure occurred before calling onListShards
                 searchResponse.compareAndSet(null,
-                    new MutableSearchResponse(-1, -1, null, aggReduceContextSupplier));
+                    new MutableSearchResponse(-1, -1, null,
+                        aggReduceContextSupplier, threadPool.getThreadContext()));
             }
             searchResponse.get().updateWithFailure(exc);
             executeInitListeners();

+ 30 - 2
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.search.SearchResponseSections;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.InternalAggregation;
 import org.elasticsearch.search.aggregations.InternalAggregations;
@@ -21,11 +22,13 @@ import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Supplier;
 
 import static java.util.Collections.singletonList;
 import static org.apache.lucene.search.TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO;
 import static org.elasticsearch.search.aggregations.InternalAggregations.topLevelReduce;
+import static org.elasticsearch.xpack.search.AsyncSearchIndexService.restoreResponseHeadersContext;
 
 /**
  * A mutable search response that allows to update and create partial response synchronously.
@@ -39,12 +42,14 @@ class MutableSearchResponse {
     private final Clusters clusters;
     private final AtomicArray<ShardSearchFailure> shardFailures;
     private final Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier;
+    private final ThreadContext threadContext;
 
     private boolean isPartial;
     private boolean isFinalReduce;
     private int successfulShards;
     private SearchResponseSections sections;
     private ElasticsearchException failure;
+    private Map<String, List<String>> responseHeaders;
 
     private boolean frozen;
 
@@ -55,15 +60,20 @@ class MutableSearchResponse {
      * @param skippedShards The number of skipped shards, or -1 to indicate a failure.
      * @param clusters The remote clusters statistics.
      * @param aggReduceContextSupplier A supplier to run final reduce on partial aggregations.
+     * @param threadContext The thread context to retrieve the final response headers.
      */
-    MutableSearchResponse(int totalShards, int skippedShards, Clusters clusters,
-            Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier) {
+    MutableSearchResponse(int totalShards,
+                          int skippedShards,
+                          Clusters clusters,
+                          Supplier<InternalAggregation.ReduceContext> aggReduceContextSupplier,
+                          ThreadContext threadContext) {
         this.totalShards = totalShards;
         this.skippedShards = skippedShards;
         this.clusters = clusters;
         this.aggReduceContextSupplier = aggReduceContextSupplier;
         this.shardFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
         this.isPartial = true;
+        this.threadContext = threadContext;
         this.sections = totalShards == -1 ? null : new InternalSearchResponse(
             new SearchHits(SearchHits.EMPTY, new TotalHits(0, GREATER_THAN_OR_EQUAL_TO), Float.NaN),
             null, null, null, false, null, 0);
@@ -93,6 +103,8 @@ class MutableSearchResponse {
      */
     synchronized void updateFinalResponse(int successfulShards, SearchResponseSections newSections) {
         failIfFrozen();
+        // copy the response headers from the current context
+        this.responseHeaders = threadContext.getResponseHeaders();
         this.successfulShards = successfulShards;
         this.sections = newSections;
         this.isPartial = false;
@@ -106,6 +118,8 @@ class MutableSearchResponse {
      */
     synchronized void updateWithFailure(Exception exc) {
         failIfFrozen();
+        // copy the response headers from the current context
+        this.responseHeaders = threadContext.getResponseHeaders();
         this.isPartial = true;
         this.failure = ElasticsearchException.guessRootCauses(exc)[0];
         this.frozen = true;
@@ -146,6 +160,20 @@ class MutableSearchResponse {
             frozen == false, task.getStartTime(), expirationTime);
     }
 
+    /**
+     * Creates an {@link AsyncSearchResponse} based on the current state of the mutable response.
+     * This method also restores the response headers in the current thread context if the final response is available.
+     */
+    synchronized AsyncSearchResponse toAsyncSearchResponseWithHeaders(AsyncSearchTask task, long expirationTime) {
+        AsyncSearchResponse resp = toAsyncSearchResponse(task, expirationTime);
+        if (responseHeaders != null) {
+            restoreResponseHeadersContext(threadContext, responseHeaders);
+        }
+        return resp;
+    }
+
+
+
     private void failIfFrozen() {
         if (frozen) {
             throw new IllegalStateException("invalid update received after the completion of the request");

+ 2 - 1
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportDeleteAsyncSearchAction.java

@@ -66,7 +66,8 @@ public class TransportDeleteAsyncSearchAction extends HandledTransportAction<Del
             // the task is not running anymore so we throw a not found exception if
             // the search id is also not present in the index (already deleted) or if the user
             // is not allowed to access it.
-            store.getResponse(searchId, ActionListener.wrap(res -> store.deleteResponse(searchId, true, listener), listener::onFailure));
+            store.getResponse(searchId, false,
+                ActionListener.wrap(res -> store.deleteResponse(searchId, true, listener), listener::onFailure));
         }
     }
 }

+ 2 - 1
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java

@@ -116,7 +116,8 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsy
                                            GetAsyncSearchAction.Request request,
                                            long nowInMillis,
                                            ActionListener<AsyncSearchResponse> listener) {
-        store.getResponse(searchId, new ActionListener<>() {
+        store.getResponse(searchId, true,
+            new ActionListener<>() {
                 @Override
                 public void onResponse(AsyncSearchResponse response) {
                     sendFinalResponse(request, response, nowInMillis, listener);

+ 20 - 16
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

@@ -23,6 +23,7 @@ import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.index.engine.DocumentMissingException;
 import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.aggregations.InternalAggregation;
@@ -45,6 +46,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
     private final NodeClient nodeClient;
     private final Function<SearchRequest, InternalAggregation.ReduceContext> requestToAggReduceContextBuilder;
     private final TransportSearchAction searchAction;
+    private final ThreadContext threadContext;
     private final AsyncSearchIndexService store;
 
     @Inject
@@ -60,7 +62,8 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
         this.nodeClient = nodeClient;
         this.requestToAggReduceContextBuilder = request -> searchService.aggReduceContextBuilder(request).forFinalReduction();
         this.searchAction = searchAction;
-        this.store = new AsyncSearchIndexService(clusterService, transportService.getThreadPool().getThreadContext(), client, registry);
+        this.threadContext = transportService.getThreadPool().getThreadContext();
+        this.store = new AsyncSearchIndexService(clusterService, threadContext, client, registry);
     }
 
     @Override
@@ -179,23 +182,24 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
         }
 
         try {
-            store.storeFinalResponse(searchTask.getSearchId().getDocId(), response, new ActionListener<>() {
-                @Override
-                public void onResponse(UpdateResponse updateResponse) {
-                    taskManager.unregister(searchTask);
-                    nextAction.run();
-                }
+            store.storeFinalResponse(searchTask.getSearchId().getDocId(), threadContext.getResponseHeaders(), response,
+                new ActionListener<>() {
+                    @Override
+                    public void onResponse(UpdateResponse updateResponse) {
+                        taskManager.unregister(searchTask);
+                        nextAction.run();
+                    }
 
-                @Override
-                public void onFailure(Exception exc) {
-                    if (exc.getCause() instanceof DocumentMissingException == false) {
-                        logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
-                            searchTask.getSearchId().getEncoded()), exc);
+                    @Override
+                    public void onFailure(Exception exc) {
+                        if (exc.getCause() instanceof DocumentMissingException == false) {
+                            logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
+                                searchTask.getSearchId().getEncoded()), exc);
+                        }
+                        taskManager.unregister(searchTask);
+                        nextAction.run();
                     }
-                    taskManager.unregister(searchTask);
-                    nextAction.run();
-                }
-            });
+                });
         } catch (Exception exc) {
             logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]", searchTask.getSearchId().getEncoded()), exc);
             taskManager.unregister(searchTask);