Pārlūkot izejas kodu

Make GET /_cat/segments cancellable (#69020)

A small followup to #67413 and #68965: the underlying actions of the
`GET /_cat/segments` API are now cancellable, so we may as well cancel
them if needed.
David Turner 4 gadi atpakaļ
vecāks
revīzija
b6598c6816

+ 10 - 4
qa/smoke-test-http/src/test/java/org/elasticsearch/http/IndicesSegmentsRestCancellationIT.java

@@ -64,7 +64,15 @@ public class IndicesSegmentsRestCancellationIT extends HttpSmokeTestCase {
         return false;
     }
 
-    public void testClusterStateRestCancellation() throws Exception {
+    public void testIndicesSegmentsRestCancellation() throws Exception {
+        runTest(new Request(HttpGet.METHOD_NAME, "/_segments"));
+    }
+
+    public void testCatSegmentsRestCancellation() throws Exception {
+        runTest(new Request(HttpGet.METHOD_NAME, "/_cat/segments"));
+    }
+
+    private void runTest(Request request) throws Exception {
 
         createIndex("test", Settings.builder().put(BLOCK_SEARCHER_SETTING.getKey(), true).build());
         ensureGreen("test");
@@ -89,11 +97,9 @@ public class IndicesSegmentsRestCancellationIT extends HttpSmokeTestCase {
                 releasables.add(searcherBlock::release);
             }
 
-            final Request indicesSegments = new Request(HttpGet.METHOD_NAME, "/_segments");
-
             final PlainActionFuture<Void> future = new PlainActionFuture<>();
             logger.info("--> sending indices segments request");
-            final Cancellable cancellable = getRestClient().performRequestAsync(indicesSegments, new ResponseListener() {
+            final Cancellable cancellable = getRestClient().performRequestAsync(request, new ResponseListener() {
                 @Override
                 public void onSuccess(Response response) {
                     future.onResponse(null);

+ 9 - 2
server/src/main/java/org/elasticsearch/rest/action/cat/RestSegmentsAction.java

@@ -23,7 +23,9 @@ import org.elasticsearch.index.engine.Segment;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
 import org.elasticsearch.rest.action.RestActionListener;
+import org.elasticsearch.rest.action.RestCancellableNodeClient;
 import org.elasticsearch.rest.action.RestResponseListener;
+import org.elasticsearch.tasks.TaskCancelledException;
 
 import java.util.List;
 import java.util.Map;
@@ -58,14 +60,19 @@ public class RestSegmentsAction extends AbstractCatAction {
         clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
         clusterStateRequest.clear().nodes(true).routingTable(true).indices(indices);
 
-        return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
+        final RestCancellableNodeClient cancelClient = new RestCancellableNodeClient(client, request.getHttpChannel());
+
+        return channel -> cancelClient.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
             @Override
             public void processResponse(final ClusterStateResponse clusterStateResponse) {
                 final IndicesSegmentsRequest indicesSegmentsRequest = new IndicesSegmentsRequest();
                 indicesSegmentsRequest.indices(indices);
-                client.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) {
+                cancelClient.admin().indices().segments(indicesSegmentsRequest, new RestResponseListener<IndicesSegmentResponse>(channel) {
                     @Override
                     public RestResponse buildResponse(final IndicesSegmentResponse indicesSegmentResponse) throws Exception {
+                        if (request.getHttpChannel().isOpen() == false) {
+                            throw new TaskCancelledException("response channel [" + request.getHttpChannel() + "] closed");
+                        }
                         final Map<String, IndexSegments> indicesSegments = indicesSegmentResponse.getIndices();
                         Table tab = buildTable(request, clusterStateResponse, indicesSegments);
                         return RestTable.buildResponse(tab, channel);