Browse Source

Async search: Add ID and "is running" http headers (#112431)

Add the async execution ID and "is running" flag in the response as HTTP headers.
This allows users to know the request status without having to parse the response body.
It was also implemented in the `/_async_search/status/<id>` endpoint for consistency.

Continuation of https://github.com/elastic/elasticsearch/pull/111840, which implemented this same thing for ESQL.
Fixes https://github.com/elastic/elasticsearch/issues/109576
Iván Cea Fontenla 1 year ago
parent
commit
d59df8af3e

+ 6 - 0
docs/changelog/112431.yaml

@@ -0,0 +1,6 @@
+pr: 112431
+summary: "Async search: Add ID and \"is running\" http headers"
+area: Search
+type: feature
+issues:
+ - 109576

+ 5 - 0
x-pack/plugin/async-search/qa/rest/build.gradle

@@ -1,6 +1,7 @@
 import org.elasticsearch.gradle.internal.info.BuildParams
 
 apply plugin: 'elasticsearch.base-internal-es-plugin'
+apply plugin: 'elasticsearch.internal-java-rest-test'
 apply plugin: 'elasticsearch.legacy-yaml-rest-test'
 apply plugin: 'elasticsearch.legacy-yaml-rest-compat-test'
 
@@ -10,6 +11,10 @@ esplugin {
   classname 'org.elasticsearch.query.DeprecatedQueryPlugin'
 }
 
+dependencies {
+  clusterPlugins project(xpackModule('async-search'))
+}
+
 restResources {
   restApi {
     include '_common', 'indices', 'index', 'async_search'

+ 59 - 0
x-pack/plugin/async-search/qa/rest/src/javaRestTest/java/org/elasticsearch/qa/AsyncSearchHeadersIT.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.qa;
+
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.test.cluster.ElasticsearchCluster;
+import org.elasticsearch.test.rest.ESRestTestCase;
+import org.junit.Before;
+import org.junit.ClassRule;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class AsyncSearchHeadersIT extends ESRestTestCase {
+    @ClassRule
+    public static ElasticsearchCluster cluster = ElasticsearchCluster.local().plugin("x-pack-async-search").build();
+
+    @Override
+    protected String getTestRestCluster() {
+        return cluster.getHttpAddresses();
+    }
+
+    @Before
+    public void createIndex() throws IOException {
+        client().performRequest(new Request("PUT", "/test_index"));
+    }
+
+    public void testAsyncHeaders() throws IOException {
+        Response submitResponse = client().performRequest(new Request("POST", "/test_index/_async_search?keep_on_completion=true"));
+        var asyncExecutionId = assertAsyncHeaders(submitResponse);
+
+        Response statusResponse = client().performRequest(new Request("GET", "/_async_search/status/" + asyncExecutionId));
+        assertAsyncHeaders(statusResponse);
+
+        Response resultResponse = client().performRequest(new Request("GET", "/_async_search/" + asyncExecutionId));
+        assertAsyncHeaders(resultResponse);
+    }
+
+    private String assertAsyncHeaders(Response response) throws IOException {
+        var json = entityAsMap(response);
+
+        var asyncExecutionId = (String) json.get("id");
+        var isRunning = (boolean) json.get("is_running");
+
+        if (asyncExecutionId != null) {
+            assertThat(response.getHeader("X-ElasticSearch-Async-Id"), equalTo(asyncExecutionId));
+        }
+        assertThat(response.getHeader("X-ElasticSearch-Async-Is-Running"), equalTo(isRunning ? "?1" : "?0"));
+
+        return asyncExecutionId;
+    }
+}

+ 14 - 2
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncSearchAction.java

@@ -16,11 +16,13 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 import org.elasticsearch.xpack.core.XPackPlugin;
+import org.elasticsearch.xpack.core.async.AsyncExecutionId;
 import org.elasticsearch.xpack.core.async.AsyncResultsService;
 import org.elasticsearch.xpack.core.async.AsyncTaskIndexService;
 import org.elasticsearch.xpack.core.async.GetAsyncResultRequest;
@@ -32,6 +34,7 @@ import static org.elasticsearch.xpack.core.ClientHelper.ASYNC_SEARCH_ORIGIN;
 public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsyncResultRequest, AsyncSearchResponse> {
     private final AsyncResultsService<AsyncSearchTask, AsyncSearchResponse> resultsService;
     private final TransportService transportService;
+    private final ThreadContext threadContext;
 
     @Inject
     public TransportGetAsyncSearchAction(
@@ -45,6 +48,7 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsy
     ) {
         super(GetAsyncSearchAction.NAME, transportService, actionFilters, GetAsyncResultRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
         this.transportService = transportService;
+        this.threadContext = threadPool.getThreadContext();
         this.resultsService = createResultsService(transportService, clusterService, registry, client, threadPool, bigArrays);
     }
 
@@ -78,15 +82,23 @@ public class TransportGetAsyncSearchAction extends HandledTransportAction<GetAsy
 
     @Override
     protected void doExecute(Task task, GetAsyncResultRequest request, ActionListener<AsyncSearchResponse> listener) {
+        ActionListener<AsyncSearchResponse> listenerWithHeaders = listener.map(response -> {
+            threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, response.isRunning() ? "?1" : "?0");
+            if (response.getId() != null) {
+                threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, response.getId());
+            }
+            return response;
+        });
+
         DiscoveryNode node = resultsService.getNode(request.getId());
         if (node == null || resultsService.isLocalNode(node)) {
-            resultsService.retrieveResult(request, listener);
+            resultsService.retrieveResult(request, listenerWithHeaders);
         } else {
             transportService.sendRequest(
                 node,
                 GetAsyncSearchAction.NAME,
                 request,
-                new ActionListenerResponseHandler<>(listener, AsyncSearchResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
+                new ActionListenerResponseHandler<>(listenerWithHeaders, AsyncSearchResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
             );
         }
     }

+ 14 - 5
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportGetAsyncStatusAction.java

@@ -18,6 +18,7 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.injection.guice.Inject;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.tasks.Task;
@@ -40,6 +41,7 @@ import static org.elasticsearch.xpack.core.async.AsyncTaskIndexService.getTask;
 public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsyncStatusRequest, AsyncStatusResponse> {
     private final TransportService transportService;
     private final ClusterService clusterService;
+    private final ThreadContext threadContext;
     private final AsyncTaskIndexService<AsyncSearchResponse> store;
 
     @Inject
@@ -55,6 +57,7 @@ public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsy
         super(GetAsyncStatusAction.NAME, transportService, actionFilters, GetAsyncStatusRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
         this.transportService = transportService;
         this.clusterService = clusterService;
+        this.threadContext = threadPool.getThreadContext();
         this.store = new AsyncTaskIndexService<>(
             XPackPlugin.ASYNC_RESULTS_INDEX,
             clusterService,
@@ -73,6 +76,12 @@ public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsy
         DiscoveryNode node = clusterService.state().nodes().get(searchId.getTaskId().getNodeId());
         DiscoveryNode localNode = clusterService.state().getNodes().getLocalNode();
 
+        ActionListener<AsyncStatusResponse> listenerWithHeaders = listener.map(response -> {
+            threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, response.isRunning() ? "?1" : "?0");
+            threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, response.getId());
+            return response;
+        });
+
         if (node == null || Objects.equals(node, localNode)) {
             if (request.getKeepAlive() != null && request.getKeepAlive().getMillis() > 0) {
                 long expirationTime = System.currentTimeMillis() + request.getKeepAlive().getMillis();
@@ -87,17 +96,17 @@ public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsy
                         AsyncSearchTask.class,
                         AsyncSearchTask::getStatusResponse,
                         AsyncStatusResponse::getStatusFromStoredSearch,
-                        listener
+                        listenerWithHeaders
                     );
                 }, exc -> {
                     RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
                     if (status != RestStatus.NOT_FOUND) {
                         logger.error(() -> format("failed to update expiration time for async-search [%s]", searchId.getEncoded()), exc);
-                        listener.onFailure(exc);
+                        listenerWithHeaders.onFailure(exc);
                     } else {
                         // the async search document or its index is not found.
                         // That can happen if an invalid/deleted search id is provided.
-                        listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
+                        listenerWithHeaders.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
                     }
                 }));
             } else {
@@ -107,7 +116,7 @@ public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsy
                     AsyncSearchTask.class,
                     AsyncSearchTask::getStatusResponse,
                     AsyncStatusResponse::getStatusFromStoredSearch,
-                    listener
+                    listenerWithHeaders
                 );
             }
         } else {
@@ -115,7 +124,7 @@ public class TransportGetAsyncStatusAction extends HandledTransportAction<GetAsy
                 node,
                 GetAsyncStatusAction.NAME,
                 request,
-                new ActionListenerResponseHandler<>(listener, AsyncStatusResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
+                new ActionListenerResponseHandler<>(listenerWithHeaders, AsyncStatusResponse::new, EsExecutors.DIRECT_EXECUTOR_SERVICE)
             );
         }
     }

+ 20 - 6
x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/TransportSubmitAsyncSearchAction.java

@@ -92,6 +92,14 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
                 searchRequest
             );
             searchAction.execute(searchTask, searchRequest, searchTask.getSearchProgressActionListener());
+
+            ActionListener<AsyncSearchResponse> submitListenerWithHeaders = submitListener.map(response -> {
+                threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_IS_RUNNING_HEADER, response.isRunning() ? "?1" : "?0");
+                if (response.getId() != null) {
+                    threadContext.addResponseHeader(AsyncExecutionId.ASYNC_EXECUTION_ID_HEADER, response.getId());
+                }
+                return response;
+            });
             searchTask.addCompletionListener(new ActionListener<>() {
                 @Override
                 public void onResponse(AsyncSearchResponse searchResponse) {
@@ -119,14 +127,14 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
                                                         finalResponse -> onFinalResponse(searchTask, finalResponse, () -> {})
                                                     );
                                                 } finally {
-                                                    submitListener.onResponse(searchResponse);
+                                                    submitListenerWithHeaders.onResponse(searchResponse);
                                                 }
                                             } else {
                                                 searchResponse.mustIncRef();
                                                 onFinalResponse(
                                                     searchTask,
                                                     searchResponse,
-                                                    () -> ActionListener.respondAndRelease(submitListener, searchResponse)
+                                                    () -> ActionListener.respondAndRelease(submitListenerWithHeaders, searchResponse)
                                                 );
                                             }
                                         }
@@ -138,7 +146,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
                                                 exc,
                                                 searchResponse.isRunning(),
                                                 "fatal failure: unable to store initial response",
-                                                submitListener
+                                                submitListenerWithHeaders
                                             );
                                         }
                                     }, searchResponse::decRef)
@@ -147,14 +155,20 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
                                 initialResp.decRef();
                             }
                         } catch (Exception exc) {
-                            onFatalFailure(searchTask, exc, searchResponse.isRunning(), "fatal failure: generic error", submitListener);
+                            onFatalFailure(
+                                searchTask,
+                                exc,
+                                searchResponse.isRunning(),
+                                "fatal failure: generic error",
+                                submitListenerWithHeaders
+                            );
                         }
                     } else {
                         try (searchTask) {
                             // the task completed within the timeout so the response is sent back to the user
                             // with a null id since nothing was stored on the cluster.
                             taskManager.unregister(searchTask);
-                            ActionListener.respondAndRelease(submitListener, searchResponse.clone(null));
+                            ActionListener.respondAndRelease(submitListenerWithHeaders, searchResponse.clone(null));
                         }
                     }
                 }
@@ -163,7 +177,7 @@ public class TransportSubmitAsyncSearchAction extends HandledTransportAction<Sub
                 public void onFailure(Exception exc) {
                     // this will only ever be called if there is an issue scheduling the thread that executes
                     // the completion listener once the wait for completion timeout expires.
-                    onFatalFailure(searchTask, exc, true, "fatal failure: addCompletionListener", submitListener);
+                    onFatalFailure(searchTask, exc, true, "fatal failure: addCompletionListener", submitListenerWithHeaders);
                 }
             }, request.getWaitForCompletionTimeout());
         }