ソースを参照

Use chunked encoding for indices stats response (#91760)

These responses can become huge, lets chunk them by index.
Armin Braun 2 年 前
コミット
7dbc1ea36e

+ 62 - 64
server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java

@@ -13,15 +13,13 @@ import org.apache.lucene.search.SortField;
 import org.apache.lucene.search.SortedNumericSortField;
 import org.apache.lucene.search.SortedSetSortField;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
+import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse;
 import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.index.engine.Segment;
-import org.elasticsearch.rest.action.RestActions;
 import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 
@@ -33,7 +31,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 
-public class IndicesSegmentResponse extends BaseBroadcastResponse implements ChunkedToXContent {
+public class IndicesSegmentResponse extends ChunkedBroadcastResponse {
 
     private final ShardSegments[] shards;
 
@@ -79,72 +77,72 @@ public class IndicesSegmentResponse extends BaseBroadcastResponse implements Chu
     }
 
     @Override
-    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
-        return Iterators.concat(Iterators.single(((builder, params) -> {
-            builder.startObject();
-            RestActions.buildBroadcastShardsHeader(builder, params, this);
-            return builder.startObject(Fields.INDICES);
-        })), getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, params) -> {
-            builder.startObject(indexSegments.getIndex());
-
-            builder.startObject(Fields.SHARDS);
-            for (IndexShardSegments indexSegment : indexSegments) {
-                builder.startArray(Integer.toString(indexSegment.shardId().id()));
-                for (ShardSegments shardSegments : indexSegment) {
-                    builder.startObject();
-
-                    builder.startObject(Fields.ROUTING);
-                    builder.field(Fields.STATE, shardSegments.getShardRouting().state());
-                    builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary());
-                    builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId());
-                    if (shardSegments.getShardRouting().relocatingNodeId() != null) {
-                        builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId());
-                    }
-                    builder.endObject();
-
-                    builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted());
-                    builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch());
-
-                    builder.startObject(Fields.SEGMENTS);
-                    for (Segment segment : shardSegments) {
-                        builder.startObject(segment.getName());
-                        builder.field(Fields.GENERATION, segment.getGeneration());
-                        builder.field(Fields.NUM_DOCS, segment.getNumDocs());
-                        builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs());
-                        builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize());
-                        if (builder.getRestApiVersion() == RestApiVersion.V_7) {
-                            builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO);
-                        }
-                        builder.field(Fields.COMMITTED, segment.isCommitted());
-                        builder.field(Fields.SEARCH, segment.isSearch());
-                        if (segment.getVersion() != null) {
-                            builder.field(Fields.VERSION, segment.getVersion());
-                        }
-                        if (segment.isCompound() != null) {
-                            builder.field(Fields.COMPOUND, segment.isCompound());
+    protected Iterator<ToXContent> customXContentChunks(ToXContent.Params params) {
+        return Iterators.concat(
+            Iterators.single((builder, p) -> builder.startObject(Fields.INDICES)),
+            getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, p) -> {
+                builder.startObject(indexSegments.getIndex());
+
+                builder.startObject(Fields.SHARDS);
+                for (IndexShardSegments indexSegment : indexSegments) {
+                    builder.startArray(Integer.toString(indexSegment.shardId().id()));
+                    for (ShardSegments shardSegments : indexSegment) {
+                        builder.startObject();
+
+                        builder.startObject(Fields.ROUTING);
+                        builder.field(Fields.STATE, shardSegments.getShardRouting().state());
+                        builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary());
+                        builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId());
+                        if (shardSegments.getShardRouting().relocatingNodeId() != null) {
+                            builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId());
                         }
-                        if (segment.getMergeId() != null) {
-                            builder.field(Fields.MERGE_ID, segment.getMergeId());
-                        }
-                        if (segment.getSegmentSort() != null) {
-                            toXContent(builder, segment.getSegmentSort());
-                        }
-                        if (segment.attributes != null && segment.attributes.isEmpty() == false) {
-                            builder.field("attributes", segment.attributes);
+                        builder.endObject();
+
+                        builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted());
+                        builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch());
+
+                        builder.startObject(Fields.SEGMENTS);
+                        for (Segment segment : shardSegments) {
+                            builder.startObject(segment.getName());
+                            builder.field(Fields.GENERATION, segment.getGeneration());
+                            builder.field(Fields.NUM_DOCS, segment.getNumDocs());
+                            builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs());
+                            builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize());
+                            if (builder.getRestApiVersion() == RestApiVersion.V_7) {
+                                builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO);
+                            }
+                            builder.field(Fields.COMMITTED, segment.isCommitted());
+                            builder.field(Fields.SEARCH, segment.isSearch());
+                            if (segment.getVersion() != null) {
+                                builder.field(Fields.VERSION, segment.getVersion());
+                            }
+                            if (segment.isCompound() != null) {
+                                builder.field(Fields.COMPOUND, segment.isCompound());
+                            }
+                            if (segment.getMergeId() != null) {
+                                builder.field(Fields.MERGE_ID, segment.getMergeId());
+                            }
+                            if (segment.getSegmentSort() != null) {
+                                toXContent(builder, segment.getSegmentSort());
+                            }
+                            if (segment.attributes != null && segment.attributes.isEmpty() == false) {
+                                builder.field("attributes", segment.attributes);
+                            }
+                            builder.endObject();
                         }
                         builder.endObject();
-                    }
-                    builder.endObject();
 
-                    builder.endObject();
+                        builder.endObject();
+                    }
+                    builder.endArray();
                 }
-                builder.endArray();
-            }
-            builder.endObject();
+                builder.endObject();
 
-            builder.endObject();
-            return builder;
-        }).iterator(), Iterators.single((builder, params) -> builder.endObject().endObject()));
+                builder.endObject();
+                return builder;
+            }).iterator(),
+            Iterators.single((builder, p) -> builder.endObject())
+        );
     }
 
     private static void toXContent(XContentBuilder builder, Sort sort) throws IOException {

+ 33 - 24
server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java

@@ -11,20 +11,23 @@ package org.elasticsearch.action.admin.indices.stats;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder;
 import org.elasticsearch.action.support.DefaultShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BroadcastResponse;
+import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.health.ClusterIndexHealth;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.Index;
+import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -33,7 +36,7 @@ import java.util.stream.Collectors;
 
 import static java.util.Collections.unmodifiableMap;
 
-public class IndicesStatsResponse extends BroadcastResponse {
+public class IndicesStatsResponse extends ChunkedBroadcastResponse {
 
     private final Map<String, ClusterHealthStatus> indexHealthMap;
 
@@ -171,7 +174,7 @@ public class IndicesStatsResponse extends BroadcastResponse {
     }
 
     @Override
-    protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
+    protected Iterator<ToXContent> customXContentChunks(ToXContent.Params params) {
         final String level = params.param("level", "indices");
         final boolean isLevelValid = "cluster".equalsIgnoreCase(level)
             || "indices".equalsIgnoreCase(level)
@@ -179,22 +182,11 @@ public class IndicesStatsResponse extends BroadcastResponse {
         if (isLevelValid == false) {
             throw new IllegalArgumentException("level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]");
         }
-
-        builder.startObject("_all");
-
-        builder.startObject("primaries");
-        getPrimaries().toXContent(builder, params);
-        builder.endObject();
-
-        builder.startObject("total");
-        getTotal().toXContent(builder, params);
-        builder.endObject();
-
-        builder.endObject();
-
         if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) {
-            builder.startObject(Fields.INDICES);
-            for (IndexStats indexStats : getIndices().values()) {
+            return Iterators.concat(Iterators.single(((builder, p) -> {
+                commonStats(builder, p);
+                return builder.startObject(Fields.INDICES);
+            })), getIndices().values().stream().<ToXContent>map(indexStats -> (builder, p) -> {
                 builder.startObject(indexStats.getIndex());
                 builder.field("uuid", indexStats.getUuid());
                 if (indexStats.getHealth() != null) {
@@ -204,11 +196,11 @@ public class IndicesStatsResponse extends BroadcastResponse {
                     builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT));
                 }
                 builder.startObject("primaries");
-                indexStats.getPrimaries().toXContent(builder, params);
+                indexStats.getPrimaries().toXContent(builder, p);
                 builder.endObject();
 
                 builder.startObject("total");
-                indexStats.getTotal().toXContent(builder, params);
+                indexStats.getTotal().toXContent(builder, p);
                 builder.endObject();
 
                 if ("shards".equalsIgnoreCase(level)) {
@@ -217,17 +209,34 @@ public class IndicesStatsResponse extends BroadcastResponse {
                         builder.startArray(Integer.toString(indexShardStats.getShardId().id()));
                         for (ShardStats shardStats : indexShardStats) {
                             builder.startObject();
-                            shardStats.toXContent(builder, params);
+                            shardStats.toXContent(builder, p);
                             builder.endObject();
                         }
                         builder.endArray();
                     }
                     builder.endObject();
                 }
-                builder.endObject();
-            }
-            builder.endObject();
+                return builder.endObject();
+            }).iterator(), Iterators.single((b, p) -> b.endObject()));
         }
+        return Iterators.single((b, p) -> {
+            commonStats(b, p);
+            return b;
+        });
+    }
+
+    private void commonStats(XContentBuilder builder, ToXContent.Params p) throws IOException {
+        builder.startObject("_all");
+
+        builder.startObject("primaries");
+        getPrimaries().toXContent(builder, p);
+        builder.endObject();
+
+        builder.startObject("total");
+        getTotal().toXContent(builder, p);
+        builder.endObject();
+
+        builder.endObject();
     }
 
     static final class Fields {

+ 45 - 0
server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadcastResponse.java

@@ -0,0 +1,45 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.action.support.broadcast;
+
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
+import org.elasticsearch.rest.action.RestActions;
+import org.elasticsearch.xcontent.ToXContent;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class ChunkedBroadcastResponse extends BaseBroadcastResponse implements ChunkedToXContent {
+    public ChunkedBroadcastResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    public ChunkedBroadcastResponse(
+        int totalShards,
+        int successfulShards,
+        int failedShards,
+        List<DefaultShardOperationFailedException> shardFailures
+    ) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
+    }
+
+    @Override
+    public final Iterator<ToXContent> toXContentChunked(ToXContent.Params params) {
+        return Iterators.concat(Iterators.single((b, p) -> {
+            b.startObject();
+            RestActions.buildBroadcastShardsHeader(b, p, this);
+            return b;
+        }), customXContentChunks(params), Iterators.single((builder, p) -> builder.endObject()));
+    }
+
+    protected abstract Iterator<ToXContent> customXContentChunks(ToXContent.Params params);
+}

+ 2 - 2
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java

@@ -19,7 +19,7 @@ import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.action.RestCancellableNodeClient;
-import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.rest.action.RestChunkedToXContentListener;
 import org.elasticsearch.rest.action.document.RestMultiTermVectorsAction;
 
 import java.io.IOException;
@@ -140,7 +140,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
 
         return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
             .indices()
-            .stats(indicesStatsRequest, new RestToXContentListener<>(channel));
+            .stats(indicesStatsRequest, new RestChunkedToXContentListener<>(channel));
     }
 
     @Override

+ 1 - 1
server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java

@@ -74,6 +74,6 @@ public class IndicesSegmentResponseTests extends ESTestCase {
             iterator.next();
             chunks++;
         }
-        assertEquals(indices + 2, chunks);
+        assertEquals(indices + 4, chunks);
     }
 }

+ 45 - 3
server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java

@@ -13,13 +13,17 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardPath;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.xcontent.ToXContent;
+import org.elasticsearch.xcontent.XContentBuilder;
+import org.elasticsearch.xcontent.XContentType;
 import org.elasticsearch.xcontent.json.JsonXContent;
 
+import java.io.IOException;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,7 +45,7 @@ public class IndicesStatsResponseTests extends ESTestCase {
         final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level));
         final IllegalArgumentException e = expectThrows(
             IllegalArgumentException.class,
-            () -> response.toXContent(JsonXContent.contentBuilder(), params)
+            () -> response.toXContentChunked(params).next().toXContent(JsonXContent.contentBuilder(), params)
         );
         assertThat(
             e,
@@ -64,7 +68,7 @@ public class IndicesStatsResponseTests extends ESTestCase {
                 ShardId shId = new ShardId(index, shardId);
                 Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardId));
                 ShardPath shardPath = new ShardPath(false, path, path, shId);
-                ShardRouting routing = createShardRouting(index, shId, (shardId == 0));
+                ShardRouting routing = createShardRouting(shId, (shardId == 0));
                 shards.add(new ShardStats(routing, shardPath, null, null, null, null));
                 AtomicLong primaryShardsCounter = expectedIndexToPrimaryShardsCount.computeIfAbsent(
                     index.getName(),
@@ -105,7 +109,45 @@ public class IndicesStatsResponseTests extends ESTestCase {
         }
     }
 
-    private ShardRouting createShardRouting(Index index, ShardId shardId, boolean isPrimary) {
+    public void testChunkedEncodingPerIndex() throws IOException {
+        final int shards = randomIntBetween(1, 10);
+        final List<ShardStats> stats = new ArrayList<>(shards);
+        for (int i = 0; i < shards; i++) {
+            ShardId shId = new ShardId(createIndex("index-" + i), randomIntBetween(0, 1));
+            Path path = createTempDir().resolve("indices").resolve(shId.getIndex().getUUID()).resolve(String.valueOf(shId.id()));
+            ShardPath shardPath = new ShardPath(false, path, path, shId);
+            ShardRouting routing = createShardRouting(shId, (shId.id() == 0));
+            stats.add(new ShardStats(routing, shardPath, new CommonStats(), null, null, null));
+        }
+        final IndicesStatsResponse indicesStatsResponse = new IndicesStatsResponse(
+            stats.toArray(new ShardStats[0]),
+            shards,
+            shards,
+            0,
+            null,
+            ClusterState.EMPTY_STATE
+        );
+        final ToXContent.Params paramsClusterLevel = new ToXContent.MapParams(Map.of("level", "cluster"));
+        final var iteratorClusterLevel = indicesStatsResponse.toXContentChunked(paramsClusterLevel);
+        int chunksSeenClusterLevel = 0;
+        final XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), Streams.NULL_OUTPUT_STREAM);
+        while (iteratorClusterLevel.hasNext()) {
+            iteratorClusterLevel.next().toXContent(builder, paramsClusterLevel);
+            chunksSeenClusterLevel++;
+        }
+        assertEquals(3, chunksSeenClusterLevel);
+
+        final ToXContent.Params paramsIndexLevel = new ToXContent.MapParams(Map.of("level", "indices"));
+        final var iteratorIndexLevel = indicesStatsResponse.toXContentChunked(paramsIndexLevel);
+        int chunksSeenIndexLevel = 0;
+        while (iteratorIndexLevel.hasNext()) {
+            iteratorIndexLevel.next().toXContent(builder, paramsIndexLevel);
+            chunksSeenIndexLevel++;
+        }
+        assertEquals(4 + shards, chunksSeenIndexLevel);
+    }
+
+    private ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) {
         return TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(4), isPrimary, ShardRoutingState.STARTED);
     }