Browse Source

Chunked encoding for pending tasks API (#91929)

This response can reach a few MiB in size in an overwhelmed cluster,
let's use chunking so as not to make things worse than they already are.

Relates #89838
David Turner 2 years ago
parent
commit
a88bea9d5d

+ 3 - 3
server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java

@@ -38,7 +38,7 @@ public class PendingTasksBlocksIT extends ESIntegTestCase {
             try {
             try {
                 enableIndexBlock("test", blockSetting);
                 enableIndexBlock("test", blockSetting);
                 PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get();
                 PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get();
-                assertNotNull(response.getPendingTasks());
+                assertNotNull(response.pendingTasks());
             } finally {
             } finally {
                 disableIndexBlock("test", blockSetting);
                 disableIndexBlock("test", blockSetting);
             }
             }
@@ -54,7 +54,7 @@ public class PendingTasksBlocksIT extends ESIntegTestCase {
         try {
         try {
             setClusterReadOnly(true);
             setClusterReadOnly(true);
             PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get();
             PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get();
-            assertNotNull(response.getPendingTasks());
+            assertNotNull(response.pendingTasks());
         } finally {
         } finally {
             setClusterReadOnly(false);
             setClusterReadOnly(false);
         }
         }
@@ -80,7 +80,7 @@ public class PendingTasksBlocksIT extends ESIntegTestCase {
             }
             }
         });
         });
 
 
-        assertNotNull(client().admin().cluster().preparePendingClusterTasks().get().getPendingTasks());
+        assertNotNull(client().admin().cluster().preparePendingClusterTasks().get().pendingTasks());
 
 
         // starting one more node allows the cluster to recover
         // starting one more node allows the cluster to recover
         internalCluster().startNode();
         internalCluster().startNode();

+ 2 - 2
server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java

@@ -366,7 +366,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
         assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
         assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
         assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
         assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
         assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
         assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
-        for (PendingClusterTask task : response) {
+        for (PendingClusterTask task : response.pendingTasks()) {
             controlSources.remove(task.getSource().string());
             controlSources.remove(task.getSource().string());
         }
         }
         assertTrue(controlSources.isEmpty());
         assertTrue(controlSources.isEmpty());
@@ -431,7 +431,7 @@ public class ClusterServiceIT extends ESIntegTestCase {
             response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
             response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
             assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
             assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
             controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
             controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
-            for (PendingClusterTask task : response) {
+            for (PendingClusterTask task : response.pendingTasks()) {
                 if (controlSources.remove(task.getSource().string())) {
                 if (controlSources.remove(task.getSource().string())) {
                     assertThat(task.getTimeInQueueInMillis(), greaterThan(0L));
                     assertThat(task.getTimeInQueueInMillis(), greaterThan(0L));
                 }
                 }

+ 17 - 24
server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java

@@ -10,16 +10,17 @@ package org.elasticsearch.action.admin.cluster.tasks;
 
 
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.action.ActionResponse;
 import org.elasticsearch.cluster.service.PendingClusterTask;
 import org.elasticsearch.cluster.service.PendingClusterTask;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.xcontent.ToXContentObject;
-import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
+import org.elasticsearch.xcontent.ToXContent;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.List;
 
 
-public class PendingClusterTasksResponse extends ActionResponse implements Iterable<PendingClusterTask>, ToXContentObject {
+public class PendingClusterTasksResponse extends ActionResponse implements ChunkedToXContent {
 
 
     private final List<PendingClusterTask> pendingTasks;
     private final List<PendingClusterTask> pendingTasks;
 
 
@@ -36,23 +37,11 @@ public class PendingClusterTasksResponse extends ActionResponse implements Itera
         return pendingTasks;
         return pendingTasks;
     }
     }
 
 
-    /**
-     * The pending cluster tasks
-     */
-    public List<PendingClusterTask> getPendingTasks() {
-        return pendingTasks();
-    }
-
-    @Override
-    public Iterator<PendingClusterTask> iterator() {
-        return pendingTasks.iterator();
-    }
-
     @Override
     @Override
     public String toString() {
     public String toString() {
         StringBuilder sb = new StringBuilder();
         StringBuilder sb = new StringBuilder();
         sb.append("tasks: (").append(pendingTasks.size()).append("):\n");
         sb.append("tasks: (").append(pendingTasks.size()).append("):\n");
-        for (PendingClusterTask pendingClusterTask : this) {
+        for (PendingClusterTask pendingClusterTask : pendingTasks) {
             sb.append(pendingClusterTask.getInsertOrder())
             sb.append(pendingClusterTask.getInsertOrder())
                 .append("/")
                 .append("/")
                 .append(pendingClusterTask.getPriority())
                 .append(pendingClusterTask.getPriority())
@@ -66,10 +55,12 @@ public class PendingClusterTasksResponse extends ActionResponse implements Itera
     }
     }
 
 
     @Override
     @Override
-    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-        builder.startObject();
-        builder.startArray(Fields.TASKS);
-        for (PendingClusterTask pendingClusterTask : this) {
+    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
+        return Iterators.concat(Iterators.single((builder, p) -> {
+            builder.startObject();
+            builder.startArray(Fields.TASKS);
+            return builder;
+        }), pendingTasks.stream().<ToXContent>map(pendingClusterTask -> (builder, p) -> {
             builder.startObject();
             builder.startObject();
             builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder());
             builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder());
             builder.field(Fields.PRIORITY, pendingClusterTask.getPriority());
             builder.field(Fields.PRIORITY, pendingClusterTask.getPriority());
@@ -78,10 +69,12 @@ public class PendingClusterTasksResponse extends ActionResponse implements Itera
             builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
             builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
             builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
             builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
             builder.endObject();
             builder.endObject();
-        }
-        builder.endArray();
-        builder.endObject();
-        return builder;
+            return builder;
+        }).iterator(), Iterators.single((builder, p) -> {
+            builder.endArray();
+            builder.endObject();
+            return builder;
+        }));
     }
     }
 
 
     static final class Fields {
     static final class Fields {

+ 4 - 2
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java

@@ -12,7 +12,7 @@ import org.elasticsearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestRequest;
-import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.rest.action.RestChunkedToXContentListener;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.util.List;
 import java.util.List;
@@ -36,6 +36,8 @@ public class RestPendingClusterTasksAction extends BaseRestHandler {
         PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
         PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
         pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
         pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
         pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
         pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
-        return channel -> client.admin().cluster().pendingClusterTasks(pendingClusterTasksRequest, new RestToXContentListener<>(channel));
+        return channel -> client.admin()
+            .cluster()
+            .pendingClusterTasks(pendingClusterTasksRequest, new RestChunkedToXContentListener<>(channel));
     }
     }
 }
 }

+ 2 - 2
server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java

@@ -66,10 +66,10 @@ public class RestPendingClusterTasksAction extends AbstractCatAction {
         return t;
         return t;
     }
     }
 
 
-    private Table buildTable(RestRequest request, PendingClusterTasksResponse tasks) {
+    private Table buildTable(RestRequest request, PendingClusterTasksResponse response) {
         Table t = getTableWithHeader(request);
         Table t = getTableWithHeader(request);
 
 
-        for (PendingClusterTask task : tasks) {
+        for (PendingClusterTask task : response.pendingTasks()) {
             t.startRow();
             t.startRow();
             t.addCell(task.getInsertOrder());
             t.addCell(task.getInsertOrder());
             t.addCell(task.getTimeInQueue());
             t.addCell(task.getTimeInQueue());

+ 49 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponseTests.java

@@ -0,0 +1,49 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.tasks;
+
+import org.elasticsearch.cluster.service.PendingClusterTask;
+import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.text.Text;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
+
+public class PendingClusterTasksResponseTests extends ESTestCase {
+    public void testPendingClusterTasksResponseChunking() throws IOException {
+        final var tasks = new ArrayList<PendingClusterTask>();
+        for (int i = between(0, 10); i > 0; i--) {
+            tasks.add(
+                new PendingClusterTask(
+                    randomNonNegativeLong(),
+                    randomFrom(Priority.values()),
+                    new Text(randomAlphaOfLengthBetween(1, 10)),
+                    randomNonNegativeLong(),
+                    randomBoolean()
+                )
+            );
+        }
+
+        int chunkCount = 0;
+        try (XContentBuilder builder = jsonBuilder()) {
+            final var iterator = new PendingClusterTasksResponse(tasks).toXContentChunked(ToXContent.EMPTY_PARAMS);
+            while (iterator.hasNext()) {
+                iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
+                chunkCount += 1;
+            }
+        } // closing the builder verifies that the XContent is well-formed
+
+        assertEquals(tasks.size() + 2, chunkCount);
+    }
+}

+ 5 - 1
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -842,7 +842,11 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
                 ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
                 assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
                 assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
                 PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
                 PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
-                assertThat("client " + client + " still has pending tasks " + pendingTasks, pendingTasks, Matchers.emptyIterable());
+                assertThat(
+                    "client " + client + " still has pending tasks " + pendingTasks,
+                    pendingTasks.pendingTasks(),
+                    Matchers.emptyIterable()
+                );
                 clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
                 clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
                 assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
                 assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
             }
             }