瀏覽代碼

EQL: Switch to RestCancellableNodeClient in EQL search (#56692)

Switches to RestCancellableNodeClient wrapper for eql search operation in
order to detect clients closing the connection and cancelling the operation.

Relates to #49638
Igor Motov 5 年之前
父節點
當前提交
c9694447ae

+ 2 - 0
x-pack/plugin/eql/build.gradle

@@ -41,6 +41,8 @@ dependencies {
   testCompile project(path: ':modules:reindex', configuration: 'runtime')
   testCompile project(path: ':modules:parent-join', configuration: 'runtime')
   testCompile project(path: ':modules:analysis-common', configuration: 'runtime')
+  testCompile project(path: ':modules:transport-netty4', configuration: 'runtime') // for http in RestEqlCancellationIT
+  testCompile project(path: ':plugins:transport-nio', configuration: 'runtime') // for http in RestEqlCancellationIT
 }
 
 

+ 20 - 1
x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/AbstractEqlBlockingIntegTestCase.java

@@ -64,8 +64,18 @@ public abstract class AbstractEqlBlockingIntegTestCase extends AbstractEqlIntegT
     }
 
     protected void disableBlocks(List<SearchBlockPlugin> plugins) {
+        disableFieldCapBlocks(plugins);
+        disableSearchBlocks(plugins);
+    }
+
+    protected void disableSearchBlocks(List<SearchBlockPlugin> plugins) {
         for (SearchBlockPlugin plugin : plugins) {
             plugin.disableSearchBlock();
+        }
+    }
+
+    protected void disableFieldCapBlocks(List<SearchBlockPlugin> plugins) {
+        for (SearchBlockPlugin plugin : plugins) {
             plugin.disableFieldCapBlock();
         }
     }
@@ -198,10 +208,19 @@ public abstract class AbstractEqlBlockingIntegTestCase extends AbstractEqlIntegT
     }
 
     protected TaskId findTaskWithXOpaqueId(String id, String action) {
+        TaskInfo taskInfo = getTaskInfoWithXOpaqueId(id, action);
+        if (taskInfo != null) {
+            return taskInfo.getTaskId();
+        } else {
+             return null;
+        }
+    }
+
+    protected TaskInfo getTaskInfoWithXOpaqueId(String id, String action) {
         ListTasksResponse tasks = client().admin().cluster().prepareListTasks().setActions(action).get();
         for (TaskInfo task : tasks.getTasks()) {
             if (id.equals(task.getHeaders().get(Task.X_OPAQUE_ID))) {
-                return task.getTaskId();
+                return task;
             }
         }
         return null;

+ 147 - 0
x-pack/plugin/eql/src/internalClusterTest/java/org/elasticsearch/xpack/eql/action/RestEqlCancellationIT.java

@@ -0,0 +1,147 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+
+package org.elasticsearch.xpack.eql.action;
+
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.client.Cancellable;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RequestOptions;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.ResponseListener;
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.transport.Netty4Plugin;
+import org.elasticsearch.transport.nio.NioTransportPlugin;
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+public class RestEqlCancellationIT extends AbstractEqlBlockingIntegTestCase {
+
+    private static String nodeHttpTypeKey;
+
+    @SuppressWarnings("unchecked")
+    @BeforeClass
+    public static void setUpTransport() {
+        nodeHttpTypeKey = getHttpTypeKey(randomFrom(Netty4Plugin.class, NioTransportPlugin.class));
+    }
+
+    @Override
+    protected boolean addMockHttpTransport() {
+        return false; // enable http
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(NetworkModule.HTTP_TYPE_KEY, nodeHttpTypeKey).build();
+    }
+
+    private static String getHttpTypeKey(Class<? extends Plugin> clazz) {
+        if (clazz.equals(NioTransportPlugin.class)) {
+            return NioTransportPlugin.NIO_HTTP_TRANSPORT_NAME;
+        } else {
+            assert clazz.equals(Netty4Plugin.class);
+            return Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME;
+        }
+    }
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins());
+        plugins.add(getTestTransportPlugin());
+        plugins.add(Netty4Plugin.class);
+        plugins.add(NioTransportPlugin.class);
+        return plugins;
+    }
+
+    public void testRestCancellation() throws Exception {
+        assertAcked(client().admin().indices().prepareCreate("test")
+            .setMapping("val", "type=integer", "event_type", "type=keyword", "@timestamp", "type=date")
+            .get());
+        createIndex("idx_unmapped");
+
+        int numDocs = randomIntBetween(6, 20);
+
+        List<IndexRequestBuilder> builders = new ArrayList<>();
+
+        for (int i = 0; i < numDocs; i++) {
+            int fieldValue = randomIntBetween(0, 10);
+            builders.add(client().prepareIndex("test").setSource(
+                jsonBuilder().startObject()
+                    .field("val", fieldValue).field("event_type", "my_event").field("@timestamp", "2020-04-09T12:35:48Z")
+                    .endObject()));
+        }
+
+        indexRandom(true, builders);
+
+        // We are cancelling during both mapping and searching but we cancel during mapping so we should never reach the second block
+        List<SearchBlockPlugin> plugins = initBlockFactory(true, true);
+        org.elasticsearch.client.eql.EqlSearchRequest eqlSearchRequest =
+            new org.elasticsearch.client.eql.EqlSearchRequest("test", "my_event where val=1").eventCategoryField("event_type");
+        String id = randomAlphaOfLength(10);
+
+        Request request = new Request("GET", "/test/_eql/search");
+        request.setJsonEntity(Strings.toString(eqlSearchRequest));
+        request.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader(Task.X_OPAQUE_ID, id));
+        logger.trace("Preparing search");
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicReference<Exception> error = new AtomicReference<>();
+        Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
+            @Override
+            public void onSuccess(Response response) {
+                latch.countDown();
+            }
+
+            @Override
+            public void onFailure(Exception exception) {
+                error.set(exception);
+                latch.countDown();
+            }
+        });
+
+        logger.trace("Waiting for block to be established");
+        awaitForBlockedFieldCaps(plugins);
+        logger.trace("Block is established");
+        assertThat(getTaskInfoWithXOpaqueId(id, EqlSearchAction.NAME), notNullValue());
+        cancellable.cancel();
+        logger.trace("Request is cancelled");
+        disableFieldCapBlocks(plugins);
+        // The task should be cancelled before ever reaching search blocks
+        assertBusy(() -> {
+            assertThat(getTaskInfoWithXOpaqueId(id, EqlSearchAction.NAME), nullValue());
+        });
+        // Make sure it didn't reach search blocks
+        assertThat(getNumberOfContexts(plugins), equalTo(0));
+        disableSearchBlocks(plugins);
+
+        latch.await();
+        assertThat(error.get(), instanceOf(CancellationException.class));
+    }
+
+    @Override
+    protected boolean ignoreExternalCluster() {
+        return true;
+    }
+}

+ 12 - 8
x-pack/plugin/eql/src/main/java/org/elasticsearch/xpack/eql/plugin/RestEqlSearchAction.java

@@ -16,6 +16,7 @@ import org.elasticsearch.rest.BytesRestResponse;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestResponseListener;
 import org.elasticsearch.xpack.eql.action.EqlSearchAction;
 import org.elasticsearch.xpack.eql.action.EqlSearchRequest;
@@ -56,14 +57,17 @@ public class RestEqlSearchAction extends BaseRestHandler {
             eqlRequest.keepOnCompletion(request.paramAsBoolean("keep_on_completion", eqlRequest.keepOnCompletion()));
         }
 
-        return channel -> client.execute(EqlSearchAction.INSTANCE, eqlRequest, new RestResponseListener<>(channel) {
-            @Override
-            public RestResponse buildResponse(EqlSearchResponse response) throws Exception {
-                XContentBuilder builder = channel.newBuilder(request.getXContentType(), XContentType.JSON, true);
-                response.toXContent(builder, request);
-                return new BytesRestResponse(RestStatus.OK, builder);
-            }
-        });
+        return channel -> {
+            RestCancellableNodeClient cancellableClient = new RestCancellableNodeClient(client, request.getHttpChannel());
+            cancellableClient.execute(EqlSearchAction.INSTANCE, eqlRequest, new RestResponseListener<>(channel) {
+                @Override
+                public RestResponse buildResponse(EqlSearchResponse response) throws Exception {
+                    XContentBuilder builder = channel.newBuilder(request.getXContentType(), XContentType.JSON, true);
+                    response.toXContent(builder, request);
+                    return new BytesRestResponse(RestStatus.OK, builder);
+                }
+            });
+        };
     }
 
     @Override