Browse Source

Cancel multisearch when http connection closed (#61399)

Relates #61337
Nhat Nguyen 5 years ago
parent
commit
066e83c691

+ 53 - 14
qa/smoke-test-http/src/test/java/org/elasticsearch/http/SearchRestCancellationIT.java

@@ -18,19 +18,25 @@
  */
 package org.elasticsearch.http;
 
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NByteArrayEntity;
 import org.apache.logging.log4j.LogManager;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.search.MultiSearchAction;
+import org.elasticsearch.action.search.MultiSearchRequest;
 import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Cancellable;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.ResponseListener;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.plugins.PluginsService;
 import org.elasticsearch.script.MockScriptPlugin;
@@ -45,6 +51,7 @@ import org.elasticsearch.tasks.TaskInfo;
 import org.elasticsearch.tasks.TaskManager;
 import org.elasticsearch.transport.TransportService;
 
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -75,15 +82,29 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
     }
 
     public void testAutomaticCancellationDuringQueryPhase() throws Exception {
-        Map<String, String> nodeIdToName = readNodesInfo();
-
-        List<ScriptedBlockPlugin> plugins = initBlockFactory();
-        indexTestData();
-
         Request searchRequest = new Request("GET", "/test/_search");
         SearchSourceBuilder searchSource = new SearchSourceBuilder().query(scriptQuery(
             new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap())));
         searchRequest.setJsonEntity(Strings.toString(searchSource));
+        verifyCancellationDuringQueryPhase(SearchAction.NAME, searchRequest);
+    }
+
+    public void testAutomaticCancellationMultiSearchDuringQueryPhase() throws Exception {
+        XContentType contentType = XContentType.JSON;
+        MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest("test")
+            .source(new SearchSourceBuilder().scriptField("test_field",
+                new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))));
+        Request restRequest = new Request("POST", "/_msearch");
+        byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
+        restRequest.setEntity(new NByteArrayEntity(requestBody, createContentType(contentType)));
+        verifyCancellationDuringQueryPhase(MultiSearchAction.NAME, restRequest);
+    }
+
+    void verifyCancellationDuringQueryPhase(String searchAction, Request searchRequest) throws Exception {
+        Map<String, String> nodeIdToName = readNodesInfo();
+
+        List<ScriptedBlockPlugin> plugins = initBlockFactory();
+        indexTestData();
 
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<Exception> error = new AtomicReference<>();
@@ -102,7 +123,7 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
 
         awaitForBlock(plugins);
         cancellable.cancel();
-        ensureSearchTaskIsCancelled(nodeIdToName::get);
+        ensureSearchTaskIsCancelled(searchAction, nodeIdToName::get);
 
         disableBlocks(plugins);
         latch.await();
@@ -110,15 +131,29 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
     }
 
     public void testAutomaticCancellationDuringFetchPhase() throws Exception {
-        Map<String, String> nodeIdToName = readNodesInfo();
-
-        List<ScriptedBlockPlugin> plugins = initBlockFactory();
-        indexTestData();
-
         Request searchRequest = new Request("GET", "/test/_search");
         SearchSourceBuilder searchSource = new SearchSourceBuilder().scriptField("test_field",
             new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()));
         searchRequest.setJsonEntity(Strings.toString(searchSource));
+        verifyCancellationDuringFetchPhase(SearchAction.NAME, searchRequest);
+    }
+
+    public void testAutomaticCancellationMultiSearchDuringFetchPhase() throws Exception {
+        XContentType contentType = XContentType.JSON;
+        MultiSearchRequest multiSearchRequest = new MultiSearchRequest().add(new SearchRequest("test")
+            .source(new SearchSourceBuilder().scriptField("test_field",
+                new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))));
+        Request restRequest = new Request("POST", "/_msearch");
+        byte[] requestBody = MultiSearchRequest.writeMultiLineFormat(multiSearchRequest, contentType.xContent());
+        restRequest.setEntity(new NByteArrayEntity(requestBody, createContentType(contentType)));
+        verifyCancellationDuringFetchPhase(MultiSearchAction.NAME, restRequest);
+    }
+
+    void verifyCancellationDuringFetchPhase(String searchAction, Request searchRequest) throws Exception {
+        Map<String, String> nodeIdToName = readNodesInfo();
+
+        List<ScriptedBlockPlugin> plugins = initBlockFactory();
+        indexTestData();
 
         CountDownLatch latch = new CountDownLatch(1);
         AtomicReference<Exception> error = new AtomicReference<>();
@@ -137,7 +172,7 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
 
         awaitForBlock(plugins);
         cancellable.cancel();
-        ensureSearchTaskIsCancelled(nodeIdToName::get);
+        ensureSearchTaskIsCancelled(searchAction, nodeIdToName::get);
 
         disableBlocks(plugins);
         latch.await();
@@ -154,11 +189,11 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
         return nodeIdToName;
     }
 
-    private static void ensureSearchTaskIsCancelled(Function<String, String> nodeIdToName) throws Exception {
+    private static void ensureSearchTaskIsCancelled(String transportAction, Function<String, String> nodeIdToName) throws Exception {
         SetOnce<TaskInfo> searchTask = new SetOnce<>();
         ListTasksResponse listTasksResponse = client().admin().cluster().prepareListTasks().get();
         for (TaskInfo task : listTasksResponse.getTasks()) {
-            if (task.getAction().equals(SearchAction.NAME)) {
+            if (task.getAction().equals(transportAction)) {
                 searchTask.set(task);
             }
         }
@@ -248,4 +283,8 @@ public class SearchRestCancellationIT extends HttpSmokeTestCase {
             });
         }
     }
+
+    private static ContentType createContentType(final XContentType xContentType) {
+        return ContentType.create(xContentType.mediaTypeWithoutParameters(), (Charset) null);
+    }
 }

+ 7 - 2
server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.rest.action.search;
 
+import org.elasticsearch.action.search.MultiSearchAction;
 import org.elasticsearch.action.search.MultiSearchRequest;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.support.IndicesOptions;
@@ -33,6 +34,7 @@ import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestToXContentListener;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 
@@ -79,8 +81,11 @@ public class RestMultiSearchAction extends BaseRestHandler {
 
     @Override
     public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
-        MultiSearchRequest multiSearchRequest = parseRequest(request, allowExplicitIndex);
-        return channel -> client.multiSearch(multiSearchRequest, new RestToXContentListener<>(channel));
+        final MultiSearchRequest multiSearchRequest = parseRequest(request, allowExplicitIndex);
+        return channel -> {
+            final RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
+            cancellableClient.execute(MultiSearchAction.INSTANCE, multiSearchRequest, new RestToXContentListener<>(channel));
+        };
     }
 
     /**