浏览代码

Add support for task cancellation to RestNodesStatsAction (#71897)

Francisco Fernández Castaño 4 年之前
父节点
当前提交
0642c73ed2

+ 19 - 0
qa/smoke-test-http/src/test/java/org/elasticsearch/http/NodeStatsRestCancellationIT.java

@@ -0,0 +1,19 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.http;
+
+import org.apache.http.client.methods.HttpGet;
+import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
+import org.elasticsearch.client.Request;
+
+public class NodeStatsRestCancellationIT extends BlockedSearcherRestCancellationTestCase {
+    public void testNodeStatsRestCancellation() throws Exception {
+        runTest(new Request(HttpGet.METHOD_NAME, "/_nodes/stats"), NodesStatsAction.NAME);
+    }
+}

+ 10 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsRequest.java

@@ -12,9 +12,14 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.support.nodes.BaseNodesRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.tasks.CancellableTask;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -154,6 +159,11 @@ public class NodesStatsRequest extends BaseNodesRequest<NodesStatsRequest> {
         }
     }
 
+    @Override
+    public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+        return new CancellableTask(id, type, action, "", parentTaskId, headers);
+    }
+
     @Override
     public void writeTo(StreamOutput out) throws IOException {
         super.writeTo(out);

+ 10 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java

@@ -16,13 +16,16 @@ import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.node.NodeService;
+import org.elasticsearch.tasks.CancellableTask;
 import org.elasticsearch.tasks.Task;
+import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportRequest;
 import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRequest,
@@ -57,6 +60,8 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
 
     @Override
     protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest, Task task) {
+        assert task instanceof CancellableTask;
+
         NodesStatsRequest request = nodeStatsRequest.request;
         Set<String> metrics = request.requestedMetrics();
         return nodeService.stats(
@@ -90,6 +95,11 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
             this.request = request;
         }
 
+        @Override
+        public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
+            return new CancellableTask(id, type, action, "", parentTaskId, headers);
+        }
+
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);

+ 3 - 1
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsAction.java

@@ -16,6 +16,7 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestActions.NodesResponseRestListener;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 
 import java.io.IOException;
 import java.util.Collections;
@@ -162,7 +163,8 @@ public class RestNodesStatsAction extends BaseRestHandler {
             nodesStatsRequest.indices().includeUnloadedSegments(request.paramAsBoolean("include_unloaded_segments", false));
         }
 
-        return channel -> client.admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
+        return channel -> new RestCancellableNodeClient(client, request.getHttpChannel())
+            .admin().cluster().nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
     }
 
     private final Set<String> RESPONSE_PARAMS = Collections.singleton("level");