Browse Source

Async search status (#62947)

Introduce async search status API

GET /_async_search/status/<id>

The API is restricted to the monitoring_user role.

For a running async search, the response is:

```js
{
  "id" : <id>,
  "is_running" : true,
  "is_partial" : true,
  "start_time_in_millis" : 1583945890986,
  "expiration_time_in_millis" : 1584377890986,
  "_shards" : {
      "total" : 562,
      "successful" : 188,
      "skipped" : 0,
      "failed" : 0
  }
}
```

For a completed async search, an additional
`completion_status` fields is added.

```js
{
  "id" : <id>,
  "is_running" : false,
  "is_partial" : false,
  "start_time_in_millis" : 1583945890986,
  "expiration_time_in_millis" : 1584377890986,
  "_shards" : {
      "total" : 562,
      "successful" : 562,
      "skipped" : 0,
      "failed" : 0
  },
 "completion_status" : 200
}
```

Closes #57537
Mayya Sharipova 5 years ago
parent
commit
074f7d2e8a
15 changed files with 741 additions and 5 deletions
  1. 84 3
      docs/reference/search/async-search.asciidoc
  2. 17 0
      server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java
  3. 41 0
      x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java
  4. 7 0
      x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java
  5. 4 1
      x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java
  6. 10 0
      x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java
  7. 56 1
      x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java
  8. 38 0
      x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncStatusAction.java
  9. 111 0
      x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java
  10. 33 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java
  11. 65 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/GetAsyncStatusRequest.java
  12. 220 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java
  13. 17 0
      x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncStatusAction.java
  14. 25 0
      x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.status.json
  15. 13 0
      x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml

+ 84 - 3
docs/reference/search/async-search.asciidoc

@@ -138,7 +138,7 @@ set to `false`.
 ==== Get async search
 
 The get async search API retrieves the results of a previously submitted
-async search request given its id. If the {es} {security-features} are enabled.
+async search request given its id. If the {es} {security-features} are enabled,
 the access to the results of a specific async search is restricted to the user
 that submitted it in the first place.
 
@@ -161,8 +161,8 @@ GET /_async_search/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsd
     "timed_out" : false,
     "num_reduce_phases" : 46, <4>
     "_shards" : {
-      "total" : 562, <5>
-      "successful" : 188,
+      "total" : 562,
+      "successful" : 188, <5>
       "skipped" : 0,
       "failed" : 0
     },
@@ -222,6 +222,87 @@ override such value and extend the validity of the request. When this period
 expires, the search, if still running, is cancelled. If the search is
 completed, its saved results are deleted.
 
+
+[[get-async-search-status]]
+==== Get async search status
+The get async search status API, without retrieving search results, shows
+only the status of a previously submitted async search request given its `id`.
+If the {es} {security-features} are enabled, the access to the get async
+search status API is restricted to the
+<<built-in-roles, monitoring_user role>>.
+
+[source,console,id=get-async-search-status-example]
+--------------------------------------------------
+GET /_async_search/status/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=
+--------------------------------------------------
+// TEST[continued s/FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=/\${body.id}/]
+
+[source,console-result]
+--------------------------------------------------
+{
+  "id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
+  "is_running" : true,
+  "is_partial" : true,
+  "start_time_in_millis" : 1583945890986,
+  "expiration_time_in_millis" : 1584377890986,
+  "_shards" : {
+      "total" : 562,
+      "successful" : 188, <1>
+      "skipped" : 0,
+      "failed" : 0
+  }
+}
+--------------------------------------------------
+// TEST[skip: a sample output of a status of a running async search]
+
+<1> Indicates how many shards have executed the query so far.
+
+For an async search that has been completed, the status response has
+an additional `completion_status` field that shows the status
+code of the completed async search.
+[source,console-result]
+--------------------------------------------------
+{
+  "id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
+  "is_running" : false,
+  "is_partial" : false,
+  "start_time_in_millis" : 1583945890986,
+  "expiration_time_in_millis" : 1584377890986,
+  "_shards" : {
+      "total" : 562,
+      "successful" : 562,
+      "skipped" : 0,
+      "failed" : 0
+  },
+ "completion_status" : 200 <1>
+}
+--------------------------------------------------
+// TEST[skip: a sample output of a status of a completed async search]
+
+<1> Indicates that the async search was successfully completed
+
+
+[source,console-result]
+--------------------------------------------------
+{
+  "id" : "FmRldE8zREVEUzA2ZVpUeGs2ejJFUFEaMkZ5QTVrSTZSaVN3WlNFVmtlWHJsdzoxMDc=",
+  "is_running" : false,
+  "is_partial" : true,
+  "start_time_in_millis" : 1583945890986,
+  "expiration_time_in_millis" : 1584377890986,
+  "_shards" : {
+      "total" : 562,
+      "successful" : 450,
+      "skipped" : 0,
+      "failed" : 112
+  },
+ "completion_status" : 503 <1>
+}
+--------------------------------------------------
+// TEST[skip: a sample output of a status of a completed async search]
+
+<1> Indicates that the async search was completed with an error
+
 [[delete-async-search]]
 ==== Delete async search
 

+ 17 - 0
server/src/main/java/org/elasticsearch/common/util/concurrent/AtomicArray.java

@@ -45,6 +45,23 @@ public class AtomicArray<E> {
         return array.length();
     }
 
+    /**
+     * Returns the size of the expected results, excluding potential null values.
+     * @return the number of non-null elements
+     */
+    public int nonNullLength() {
+        if (nonNullList != null) {
+            return nonNullList.size();
+        }
+        int count = 0;
+        for (int i = 0; i < array.length(); i++) {
+            if (array.get(i) != null) {
+                count++;
+            }
+        }
+        return count;
+    }
+
     /**
      * Sets the element at position {@code i} to the given value.
      *

+ 41 - 0
x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java

@@ -21,6 +21,7 @@ import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.test.ESIntegTestCase.SuiteScopeTestCase;
 import org.elasticsearch.xpack.core.XPackPlugin;
 import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
+import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
 import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchRequest;
 
 import java.util.ArrayList;
@@ -188,10 +189,19 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
         }
         ensureTaskCompletion(initial.getId());
         restartTaskNode(initial.getId(), indexName);
+
         AsyncSearchResponse response = getAsyncSearch(initial.getId());
         assertNotNull(response.getSearchResponse());
         assertFalse(response.isRunning());
         assertFalse(response.isPartial());
+
+        AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
+        assertFalse(statusResponse.isRunning());
+        assertFalse(statusResponse.isPartial());
+        assertEquals(numShards, statusResponse.getTotalShards());
+        assertEquals(numShards, statusResponse.getSuccessfulShards());
+        assertEquals(RestStatus.OK, statusResponse.getCompletionStatus());
+
         deleteAsyncSearch(response.getId());
         ensureTaskRemoval(response.getId());
     }
@@ -232,6 +242,15 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
         assertTrue(response.isPartial());
         assertThat(response.getSearchResponse().getTotalShards(), equalTo(numShards));
         assertThat(response.getSearchResponse().getShardFailures().length, equalTo(numShards));
+
+        AsyncStatusResponse statusResponse = getAsyncStatus(initial.getId());
+        assertFalse(statusResponse.isRunning());
+        assertTrue(statusResponse.isPartial());
+        assertEquals(numShards, statusResponse.getTotalShards());
+        assertEquals(0, statusResponse.getSuccessfulShards());
+        assertEquals(numShards, statusResponse.getFailedShards());
+        assertThat(statusResponse.getCompletionStatus().getStatus(), greaterThanOrEqualTo(400));
+
         deleteAsyncSearch(initial.getId());
         ensureTaskRemoval(initial.getId());
     }
@@ -248,6 +267,10 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
             }
             assertFalse(response.isRunning());
         }
+
+        ExecutionException exc = expectThrows(ExecutionException.class, () -> getAsyncStatus("invalid"));
+        assertThat(exc.getCause(), instanceOf(IllegalArgumentException.class));
+        assertThat(exc.getMessage(), containsString("invalid id"));
     }
 
     public void testNoIndex() throws Exception {
@@ -289,6 +312,13 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
         assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
         assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
 
+        AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+        assertTrue(statusResponse.isRunning());
+        assertEquals(numShards, statusResponse.getTotalShards());
+        assertEquals(0, statusResponse.getSuccessfulShards());
+        assertEquals(0, statusResponse.getSkippedShards());
+        assertEquals(0, statusResponse.getFailedShards());
+
         deleteAsyncSearch(response.getId());
         ensureTaskRemoval(response.getId());
     }
@@ -323,6 +353,17 @@ public class AsyncSearchActionIT extends AsyncSearchIntegTestCase {
         assertThat(response.getSearchResponse().getSuccessfulShards(), equalTo(0));
         assertThat(response.getSearchResponse().getFailedShards(), equalTo(0));
 
+        AsyncStatusResponse statusResponse = getAsyncStatus(response.getId());
+        assertTrue(statusResponse.isRunning());
+        assertTrue(statusResponse.isPartial());
+        assertThat(statusResponse.getExpirationTime(), greaterThan(expirationTime));
+        assertThat(statusResponse.getStartTime(), lessThan(statusResponse.getExpirationTime()));
+        assertEquals(numShards, statusResponse.getTotalShards());
+        assertEquals(0, statusResponse.getSuccessfulShards());
+        assertEquals(0, statusResponse.getFailedShards());
+        assertEquals(0, statusResponse.getSkippedShards());
+        assertEquals(null, statusResponse.getCompletionStatus());
+
         response = getAsyncSearch(response.getId(), TimeValue.timeValueMillis(1));
         assertThat(response.getExpirationTime(), lessThan(expirationTime));
         ensureTaskNotRunning(response.getId());

+ 7 - 0
x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchIntegTestCase.java

@@ -36,10 +36,13 @@ import org.elasticsearch.xpack.core.async.AsyncTaskMaintenanceService;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultAction;
 import org.elasticsearch.xpack.core.async.DeleteAsyncResultRequest;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
+import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
 import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
+import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
 import org.elasticsearch.xpack.core.search.action.ClosePointInTimeAction;
 import org.elasticsearch.xpack.core.search.action.ClosePointInTimeRequest;
 import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
+import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
 import org.elasticsearch.xpack.core.search.action.OpenPointInTimeAction;
 import org.elasticsearch.xpack.core.search.action.OpenPointInTimeRequest;
 import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
@@ -154,6 +157,10 @@ public abstract class AsyncSearchIntegTestCase extends ESIntegTestCase {
         return client().execute(GetAsyncSearchAction.INSTANCE, new GetAsyncResultRequest(id).setKeepAlive(keepAlive)).get();
     }
 
+    protected AsyncStatusResponse getAsyncStatus(String id) throws ExecutionException, InterruptedException {
+        return client().execute(GetAsyncStatusAction.INSTANCE, new GetAsyncStatusRequest(id)).get();
+    }
+
     protected AcknowledgedResponse deleteAsyncSearch(String id) throws ExecutionException, InterruptedException {
         return client().execute(DeleteAsyncResultAction.INSTANCE, new DeleteAsyncResultRequest(id)).get();
     }

+ 4 - 1
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearch.java

@@ -19,6 +19,7 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.rest.RestController;
 import org.elasticsearch.rest.RestHandler;
 import org.elasticsearch.xpack.core.search.action.GetAsyncSearchAction;
+import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
 import org.elasticsearch.xpack.core.search.action.SubmitAsyncSearchAction;
 
 import java.util.Arrays;
@@ -34,7 +35,8 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
     public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
         return Arrays.asList(
             new ActionHandler<>(SubmitAsyncSearchAction.INSTANCE, TransportSubmitAsyncSearchAction.class),
-            new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class)
+            new ActionHandler<>(GetAsyncSearchAction.INSTANCE, TransportGetAsyncSearchAction.class),
+            new ActionHandler<>(GetAsyncStatusAction.INSTANCE, TransportGetAsyncStatusAction.class)
         );
     }
 
@@ -46,6 +48,7 @@ public final class AsyncSearch extends Plugin implements ActionPlugin {
         return Arrays.asList(
             new RestSubmitAsyncSearchAction(),
             new RestGetAsyncSearchAction(),
+            new RestGetAsyncStatusAction(),
             new RestDeleteAsyncSearchAction()
         );
     }

+ 10 - 0
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java

@@ -31,6 +31,7 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xpack.core.async.AsyncExecutionId;
 import org.elasticsearch.xpack.core.async.AsyncTask;
 import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
+import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -347,6 +348,15 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask {
         }
     }
 
+    /**
+     * Returns the status of {@link AsyncSearchTask}
+     */
+    public AsyncStatusResponse getStatusResponse() {
+        MutableSearchResponse mutableSearchResponse = searchResponse.get();
+        assert mutableSearchResponse != null;
+        return mutableSearchResponse.toStatusResponse(searchId.getEncoded(), getStartTime(), expirationTimeMillis);
+    }
+
     class Listener extends SearchProgressActionListener {
         @Override
         protected void onQueryResult(int shardIndex) {

+ 56 - 1
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java

@@ -7,6 +7,7 @@ package org.elasticsearch.xpack.search;
 
 import org.apache.lucene.search.TotalHits;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchResponse.Clusters;
 import org.elasticsearch.action.search.ShardSearchFailure;
@@ -17,6 +18,7 @@ import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.InternalAggregations;
 import org.elasticsearch.search.internal.InternalSearchResponse;
 import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
+import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -32,6 +34,7 @@ import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.restoreRe
  * run concurrently to 1 and ensures that we pause the search progress when an {@link AsyncSearchResponse} is built.
  */
 class MutableSearchResponse {
+    private static final TotalHits EMPTY_TOTAL_HITS = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
     private final int totalShards;
     private final int skippedShards;
     private final Clusters clusters;
@@ -77,7 +80,7 @@ class MutableSearchResponse {
         this.queryFailures = totalShards == -1 ? null : new AtomicArray<>(totalShards-skippedShards);
         this.isPartial = true;
         this.threadContext = threadContext;
-        this.totalHits = new TotalHits(0L, TotalHits.Relation.GREATER_THAN_OR_EQUAL_TO);
+        this.totalHits = EMPTY_TOTAL_HITS;
     }
 
     /**
@@ -184,6 +187,58 @@ class MutableSearchResponse {
             failure, isPartial, frozen == false, task.getStartTime(), expirationTime);
     }
 
+
+    /**
+     * Creates an {@link AsyncStatusResponse} -- status of an async response.
+     * Response is created based on the current state of the mutable response or based on {@code finalResponse} if it is available.
+     * @param asyncExecutionId – id of async search request
+     * @param startTime – start time of task
+     * @param expirationTime – expiration time of async search request
+     * @return response representing the status of async search
+     */
+    synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) {
+        if (finalResponse != null) {
+            return new AsyncStatusResponse(
+                asyncExecutionId,
+                false,
+                false,
+                startTime,
+                expirationTime,
+                finalResponse.getTotalShards(),
+                finalResponse.getSuccessfulShards(),
+                finalResponse.getSkippedShards(),
+                finalResponse.getShardFailures() != null ? finalResponse.getShardFailures().length : 0,
+                finalResponse.status()
+            );
+        }
+        if (failure != null) {
+            return new AsyncStatusResponse(
+                asyncExecutionId,
+                false,
+                true,
+                startTime,
+                expirationTime,
+                totalShards,
+                successfulShards,
+                skippedShards,
+                queryFailures == null ? 0 : queryFailures.nonNullLength(),
+                ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure))
+            );
+        }
+        return new AsyncStatusResponse(
+            asyncExecutionId,
+            true,
+            true,
+            startTime,
+            expirationTime,
+            totalShards,
+            successfulShards,
+            skippedShards,
+            queryFailures == null ? 0 : queryFailures.nonNullLength(),
+            null  // for a still running search, completion status is null
+        );
+    }
+
     synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task,
                                                            long expirationTime,
                                                            ElasticsearchException reduceException) {

+ 38 - 0
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/RestGetAsyncStatusAction.java

@@ -0,0 +1,38 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.search;
+
+import org.elasticsearch.client.node.NodeClient;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestStatusToXContentListener;
+import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
+import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
+
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.unmodifiableList;
+import static org.elasticsearch.rest.RestRequest.Method.GET;
+
+public class RestGetAsyncStatusAction extends BaseRestHandler  {
+    @Override
+    public List<Route> routes() {
+        return unmodifiableList(asList(new Route(GET, "/_async_search/status/{id}")));
+    }
+
+
+    @Override
+    public String getName() {
+        return "async_search_status_action";
+    }
+
+    @Override
+    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {
+        GetAsyncStatusRequest statusRequest = new GetAsyncStatusRequest(request.param("id"));
+        return channel -> client.execute(GetAsyncStatusAction.INSTANCE, statusRequest, new RestStatusToXContentListener<>(channel));
+    }
+}

+ 111 - 0
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java

@@ -0,0 +1,111 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.search;
+
+import org.elasticsearch.ResourceNotFoundException;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionListenerResponseHandler;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.HandledTransportAction;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportRequestOptions;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.async.AsyncExecutionId;
+import org.elasticsearch.xpack.core.async.AsyncTask;
+import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
+import org.elasticsearch.xpack.core.async.GetAsyncStatusRequest;
+import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
+import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
+import org.elasticsearch.xpack.core.search.action.GetAsyncStatusAction;
+
+import java.util.Objects;
+
+import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
+
+public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsyncStatusRequest, AsyncStatusResponse> {
+    private final TransportService transportService;
+    private final ClusterService clusterService;
+    private final AsyncTaskIndexService<AsyncSearchResponse> store;
+
+    @Inject
+    public TransportGetAsyncStatusAction(TransportService transportService,
+             ActionFilters actionFilters,
+             ClusterService clusterService,
+             NamedWriteableRegistry registry,
+             Client client,
+             ThreadPool threadPool) {
+        super(GetAsyncStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new);
+        this.transportService = transportService;
+        this.clusterService = clusterService;
+        this.store = new AsyncTaskIndexService<>(XPackPlugin.ASYNC_RESULTS_INDEX, clusterService,
+            threadPool.getThreadContext(), client, ASYNC_SEARCH_ORIGIN, AsyncSearchResponse::new, registry);
+    }
+
+    @Override
+    protected void doExecute(Task task, GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
+        AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
+        DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
+        if (node == null || Objects.equals(node, clusterService.localNode())) {
+            retrieveStatus(request, listener);
+        } else {
+            TransportRequestOptions.Builder builder = TransportRequestOptions.builder();
+            transportService.sendRequest(node, GetAsyncStatusAction.NAME, request, builder.build(),
+                new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, ThreadPool.Names.SAME));
+        }
+    }
+
+    private void retrieveStatus(GetAsyncStatusRequest request, ActionListener<AsyncStatusResponse> listener) {
+        long nowInMillis = System.currentTimeMillis();
+        AsyncExecutionId searchId = AsyncExecutionId.decode(request.getId());
+        try {
+            AsyncTask task = (AsyncTask) taskManager.getTask(searchId.getTaskId().getId());
+            if ((task instanceof AsyncSearchTask) && (task.getExecutionId().equals(searchId))) {
+                AsyncStatusResponse response = ((AsyncSearchTask) task).getStatusResponse();
+                sendFinalResponse(request, response, nowInMillis, listener);
+            } else {
+                getStatusResponseFromIndex(searchId, request, nowInMillis, listener);
+            }
+        } catch (Exception exc) {
+            listener.onFailure(exc);
+        }
+    }
+
+    /**
+     * Get a status response from index
+     */
+    private void getStatusResponseFromIndex(AsyncExecutionId searchId,
+            GetAsyncStatusRequest request, long nowInMillis, ActionListener<AsyncStatusResponse> listener) {
+        store.getStatusResponse(searchId, AsyncStatusResponse::getStatusFromAsyncSearchResponseWithExpirationTime,
+            new ActionListener<>() {
+                @Override
+                public void onResponse(AsyncStatusResponse asyncStatusResponse) {
+                    sendFinalResponse(request, asyncStatusResponse, nowInMillis, listener);
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
+                }
+            }
+        );
+    }
+
+    private static void sendFinalResponse(GetAsyncStatusRequest request,
+            AsyncStatusResponse response, long nowInMillis, ActionListener<AsyncStatusResponse> listener) {
+        if (response.getExpirationTime() < nowInMillis) { // check if the result has expired
+            listener.onFailure(new ResourceNotFoundException(request.getId()));
+        } else {
+            listener.onResponse(response);
+        }
+    }
+}

+ 33 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/AsyncTaskIndexService.java

@@ -38,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.xpack.core.search.action.AsyncSearchResponse;
+import org.elasticsearch.xpack.core.search.action.AsyncStatusResponse;
 import org.elasticsearch.xpack.core.security.SecurityContext;
 import org.elasticsearch.xpack.core.security.authc.Authentication;
 import org.elasticsearch.xpack.core.security.authc.support.AuthenticationContextSerializer;
@@ -49,6 +50,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.BiFunction;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
@@ -309,6 +311,37 @@ public final class AsyncTaskIndexService<R extends AsyncResponse<R>> {
         ));
     }
 
+
+    /**
+     * Gets the status response of the async search from the index
+     * @param asyncExecutionId – id of the async search
+     * @param statusProducer – a producer of the status from the stored async search response and expirationTime
+     * @param listener – listener to report result to
+     */
+    public void getStatusResponse(
+        AsyncExecutionId asyncExecutionId,
+            BiFunction<R, Long, AsyncStatusResponse> statusProducer, ActionListener<AsyncStatusResponse> listener) {
+        GetRequest internalGet = new GetRequest(index)
+            .preference(asyncExecutionId.getEncoded())
+            .id(asyncExecutionId.getDocId());
+        client.get(internalGet, ActionListener.wrap(
+            get -> {
+                if (get.isExists() == false) {
+                    listener.onFailure(new ResourceNotFoundException(asyncExecutionId.getEncoded()));
+                    return;
+                }
+                String encoded = (String) get.getSource().get(RESULT_FIELD);
+                if (encoded != null) {
+                    Long expirationTime = (Long) get.getSource().get(EXPIRATION_TIME_FIELD);
+                    listener.onResponse(statusProducer.apply(decodeResponse(encoded), expirationTime));
+                } else {
+                    listener.onResponse(null);
+                }
+            },
+            listener::onFailure
+        ));
+    }
+
     /**
      * Ensures that the current user can read the specified response without actually reading it
      */

+ 65 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/async/GetAsyncStatusRequest.java

@@ -0,0 +1,65 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.async;
+
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.ActionRequestValidationException;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * A request class to get a status update of the async search request
+ */
+public class GetAsyncStatusRequest extends ActionRequest {
+    private final String id;
+
+    /**
+     * Creates a new request
+     * @param id The id of the search progress request.
+     */
+    public GetAsyncStatusRequest(String id) {
+        this.id = id;
+    }
+
+    public GetAsyncStatusRequest(StreamInput in) throws IOException {
+        super(in);
+        this.id = in.readString();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeString(id);
+    }
+
+    @Override
+    public ActionRequestValidationException validate() {
+        return null;
+    }
+
+    /**
+     * Returns the id of the async search.
+     */
+    public String getId() {
+        return id;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        GetAsyncStatusRequest request = (GetAsyncStatusRequest) o;
+        return Objects.equals(id, request.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+}

+ 220 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/AsyncStatusResponse.java

@@ -0,0 +1,220 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.search.action;
+
+import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.ActionResponse;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.StatusToXContentObject;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.RestActions;
+
+import java.io.IOException;
+
+import static org.elasticsearch.rest.RestStatus.OK;
+
+/**
+ * A response of an async search request.
+ */
+public class AsyncStatusResponse extends ActionResponse implements StatusToXContentObject {
+    private final String id;
+    private final boolean isRunning;
+    private final boolean isPartial;
+    private final long startTimeMillis;
+    private final long expirationTimeMillis;
+    private final int totalShards;
+    private final int successfulShards;
+    private final int skippedShards;
+    private final int failedShards;
+    private final RestStatus completionStatus;
+
+    public AsyncStatusResponse(String id,
+            boolean isRunning,
+            boolean isPartial,
+            long startTimeMillis,
+            long expirationTimeMillis,
+            int totalShards,
+            int successfulShards,
+            int skippedShards,
+            int failedShards,
+            RestStatus completionStatus) {
+        this.id = id;
+        this.isRunning = isRunning;
+        this.isPartial = isPartial;
+        this.startTimeMillis = startTimeMillis;
+        this.expirationTimeMillis = expirationTimeMillis;
+        this.totalShards = totalShards;
+        this.successfulShards = successfulShards;
+        this.skippedShards = skippedShards;
+        this.failedShards = failedShards;
+        this.completionStatus = completionStatus;
+    }
+
+    public static AsyncStatusResponse getStatusFromAsyncSearchResponseWithExpirationTime(AsyncSearchResponse asyncSearchResponse,
+            long expirationTimeMillis) {
+        int totalShards = 0;
+        int successfulShards = 0;
+        int skippedShards = 0;
+        int failedShards = 0;
+        RestStatus completionStatus = null;
+        SearchResponse searchResponse = asyncSearchResponse.getSearchResponse();
+        if (searchResponse != null) {
+            totalShards = searchResponse.getTotalShards();
+            successfulShards = searchResponse.getSuccessfulShards();
+            skippedShards = searchResponse.getSkippedShards();
+            failedShards = searchResponse.getFailedShards();
+        }
+        if (asyncSearchResponse.isRunning() == false) {
+            if (searchResponse != null) {
+                completionStatus = searchResponse.status();
+            } else {
+                Exception failure = asyncSearchResponse.getFailure();
+                if (failure != null) {
+                    completionStatus = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure));
+                }
+            }
+        }
+        return new AsyncStatusResponse(
+            asyncSearchResponse.getId(),
+            asyncSearchResponse.isRunning(),
+            asyncSearchResponse.isPartial(),
+            asyncSearchResponse.getStartTime(),
+            expirationTimeMillis,
+            totalShards,
+            successfulShards,
+            skippedShards,
+            failedShards,
+            completionStatus
+        );
+    }
+
+    public AsyncStatusResponse(StreamInput in) throws IOException {
+        this.id = in.readString();
+        this.isRunning = in.readBoolean();
+        this.isPartial = in.readBoolean();
+        this.startTimeMillis = in.readLong();
+        this.expirationTimeMillis = in.readLong();
+        this.totalShards = in.readVInt();
+        this.successfulShards = in.readVInt();
+        this.skippedShards = in.readVInt();
+        this.failedShards = in.readVInt();
+        this.completionStatus = (this.isRunning == false) ? RestStatus.readFrom(in) : null;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(id);
+        out.writeBoolean(isRunning);
+        out.writeBoolean(isPartial);
+        out.writeLong(startTimeMillis);
+        out.writeLong(expirationTimeMillis);
+        out.writeVInt(totalShards);
+        out.writeVInt(successfulShards);
+        out.writeVInt(skippedShards);
+        out.writeVInt(failedShards);
+        if (isRunning == false) {
+            RestStatus.writeTo(out, completionStatus);
+        }
+    }
+
+    @Override
+    public RestStatus status() {
+        return OK;
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
+        builder.startObject();
+        builder.field("id", id);
+        builder.field("is_running", isRunning);
+        builder.field("is_partial", isPartial);
+        builder.timeField("start_time_in_millis", "start_time", startTimeMillis);
+        builder.timeField("expiration_time_in_millis", "expiration_time", expirationTimeMillis);
+        RestActions.buildBroadcastShardsHeader(builder, params, totalShards, successfulShards, skippedShards, failedShards, null);
+        if (isRunning == false) { // completion status information is only available for a completed search
+            builder.field("completion_status", completionStatus.getStatus());
+        }
+        builder.endObject();
+        return builder;
+    }
+
+    /**
+     * Returns the id of the async search status request.
+     */
+    public String getId() {
+        return id;
+    }
+
+    /**
+     * Returns {@code true} if the search is still running in the cluster,
+     * or {@code false} if the search has been completed.
+     */
+    public boolean isRunning() {
+        return isRunning;
+    }
+
+    /**
+     * Returns {@code true} if the search results are partial.
+     * This could be either because async search hasn't finished yet,
+     * or if it finished and some shards have failed.
+     */
+    public boolean isPartial() {
+        return isPartial;
+    }
+
+    /**
+     * Returns a timestamp when the search tasks started, in milliseconds since epoch.
+     */
+    public long getStartTime() {
+        return startTimeMillis;
+    }
+
+    /**
+     * Returns a timestamp when the search will be expired, in milliseconds since epoch.
+     */
+    public long getExpirationTime() {
+        return expirationTimeMillis;
+    }
+
+    /**
+     * Returns the total number of shards the search is executed on.
+     */
+    public int getTotalShards() {
+        return totalShards;
+    }
+
+    /**
+     * Returns the number of successful shards the search was executed on.
+     */
+    public int getSuccessfulShards() {
+        return successfulShards;
+    }
+
+    /**
+     * Returns the number of skipped shards due to pre-filtering.
+     */
+    public int getSkippedShards() {
+        return skippedShards;
+    }
+
+    /**
+     * Returns the number of failed shards the search was executed on.
+     */
+    public int getFailedShards() {
+        return failedShards;
+    }
+
+    /**
+     * For a completed async search returns the completion status.
+     * For a still running async search returns {@code null}.
+     */
+    public RestStatus getCompletionStatus() {
+        return completionStatus;
+    }
+}

+ 17 - 0
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/search/action/GetAsyncStatusAction.java

@@ -0,0 +1,17 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.core.search.action;
+
+import org.elasticsearch.action.ActionType;
+
+public class GetAsyncStatusAction extends ActionType<AsyncStatusResponse> {
+    public static final GetAsyncStatusAction INSTANCE = new GetAsyncStatusAction();
+    public static final String NAME = "cluster:monitor/async_search/status";
+
+    private GetAsyncStatusAction() {
+        super(NAME, AsyncStatusResponse::new);
+    }
+}

+ 25 - 0
x-pack/plugin/src/test/resources/rest-api-spec/api/async_search.status.json

@@ -0,0 +1,25 @@
+{
+  "async_search.status":{
+    "documentation":{
+      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/current/async-search.html",
+      "description": "Retrieves the status of a previously submitted async search request given its ID."
+    },
+    "stability":"stable",
+    "url":{
+      "paths":[
+        {
+          "path":"/_async_search/status/{id}",
+          "methods":[
+            "GET"
+          ],
+          "parts":{
+            "id":{
+              "type":"string",
+              "description":"The async search ID"
+            }
+          }
+        }
+      ]
+    }
+  }
+}

+ 13 - 0
x-pack/plugin/src/test/resources/rest-api-spec/test/async_search/10_basic.yml

@@ -109,6 +109,14 @@
   - match:  { response.hits.hits.0._source.max:   1 }
   - match:  { response.aggregations.max.value:    3.0 }
 
+  - do:
+      async_search.status:
+        id: "$id"
+  - match:  { id:                             $id   }
+  - match:  { is_running:                     false }
+  - match:  { is_partial:                     false }
+  - match:  { completion_status:                200 }
+
   # test with typed_keys:
   - do:
       async_search.get:
@@ -132,6 +140,11 @@
       async_search.get:
         id: "$id"
 
+  - do:
+      catch: missing
+      async_search.status:
+        id: "$id"
+
   - do:
       catch: missing
       async_search.delete: