Browse Source

Convert RestGetMappingAction to chunked encoding (#89906)

Straight forward conversion of this endpoint and a basic test for a large enough
response that chunks into a few pieces.
Armin Braun 3 years ago
parent
commit
0f1b08c408

+ 1 - 1
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -793,7 +793,7 @@ public class ActionModule extends AbstractModule {
         registerHandler.accept(new RestSimulateTemplateAction());
 
         registerHandler.accept(new RestPutMappingAction());
-        registerHandler.accept(new RestGetMappingAction(threadPool));
+        registerHandler.accept(new RestGetMappingAction());
         registerHandler.accept(new RestGetFieldMappingAction());
 
         registerHandler.accept(new RestRefreshAction());

+ 8 - 6
server/src/main/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponse.java

@@ -14,19 +14,21 @@ import org.elasticsearch.cluster.metadata.MappingMetadata;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.xcontent.ParseField;
+import org.elasticsearch.xcontent.ToXContent;
 import org.elasticsearch.xcontent.ToXContentFragment;
-import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.Map;
 
 import static org.elasticsearch.rest.BaseRestHandler.DEFAULT_INCLUDE_TYPE_NAME_POLICY;
 import static org.elasticsearch.rest.BaseRestHandler.INCLUDE_TYPE_NAME_PARAMETER;
 
-public class GetMappingsResponse extends ActionResponse implements ToXContentFragment {
+public class GetMappingsResponse extends ActionResponse implements ToXContentFragment, ChunkedToXContent {
 
     private static final ParseField MAPPINGS = new ParseField("mappings");
 
@@ -65,8 +67,8 @@ public class GetMappingsResponse extends ActionResponse implements ToXContentFra
     }
 
     @Override
-    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-        for (final Map.Entry<String, MappingMetadata> indexEntry : getMappings().entrySet()) {
+    public Iterator<ToXContent> toXContentChunked() {
+        return getMappings().entrySet().stream().map(indexEntry -> (ToXContent) (builder, params) -> {
             builder.startObject(indexEntry.getKey());
             boolean includeTypeName = params.paramAsBoolean(INCLUDE_TYPE_NAME_PARAMETER, DEFAULT_INCLUDE_TYPE_NAME_POLICY);
             if (builder.getRestApiVersion() == RestApiVersion.V_7 && includeTypeName && indexEntry.getValue() != null) {
@@ -83,8 +85,8 @@ public class GetMappingsResponse extends ActionResponse implements ToXContentFra
                 builder.startObject(MAPPINGS.getPreferredName()).endObject();
             }
             builder.endObject();
-        }
-        return builder;
+            return builder;
+        }).iterator();
     }
 
     @Override

+ 26 - 43
server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestGetMappingAction.java

@@ -8,27 +8,26 @@
 
 package org.elasticsearch.rest.action.admin.indices;
 
-import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.http.HttpChannel;
 import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.ChunkedRestResponseBody;
 import org.elasticsearch.rest.RestRequest;
-import org.elasticsearch.rest.action.DispatchingRestToXContentListener;
+import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.rest.action.RestActionListener;
 import org.elasticsearch.rest.action.RestCancellableNodeClient;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.elasticsearch.xcontent.ToXContentObject;
-import org.elasticsearch.xcontent.XContentBuilder;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.function.LongSupplier;
 
 import static org.elasticsearch.rest.RestRequest.Method.GET;
 import static org.elasticsearch.rest.RestRequest.Method.HEAD;
@@ -40,11 +39,7 @@ public class RestGetMappingAction extends BaseRestHandler {
     public static final String TYPES_DEPRECATION_MESSAGE = "[types removal] Specifying types in get mapping request is deprecated. "
         + "Use typeless api instead";
 
-    private final ThreadPool threadPool;
-
-    public RestGetMappingAction(ThreadPool threadPool) {
-        this.threadPool = threadPool;
-    }
+    public RestGetMappingAction() {}
 
     @Override
     public List<Route> routes() {
@@ -97,37 +92,25 @@ public class RestGetMappingAction extends BaseRestHandler {
         final HttpChannel httpChannel = request.getHttpChannel();
         return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
             .indices()
-            .getMappings(
-                getMappingsRequest,
-                new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request).map(
-                    getMappingsResponse -> new RestGetMappingsResponse(getMappingsResponse, threadPool::relativeTimeInMillis, timeout)
-                )
-            );
-    }
-
-    private static final class RestGetMappingsResponse implements ToXContentObject {
-        private final GetMappingsResponse response;
-        private final LongSupplier relativeTimeSupplierMillis;
-        private final TimeValue timeout;
-        private final long startTimeMs;
-
-        private RestGetMappingsResponse(GetMappingsResponse response, LongSupplier relativeTimeSupplierMillis, TimeValue timeout) {
-            this.response = response;
-            this.relativeTimeSupplierMillis = relativeTimeSupplierMillis;
-            this.timeout = timeout;
-            this.startTimeMs = relativeTimeSupplierMillis.getAsLong();
-        }
-
-        @Override
-        public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
-            if (relativeTimeSupplierMillis.getAsLong() - startTimeMs > timeout.millis()) {
-                throw new ElasticsearchTimeoutException("Timed out getting mappings");
-            }
-
-            builder.startObject();
-            response.toXContent(builder, params);
-            builder.endObject();
-            return builder;
-        }
+            .getMappings(getMappingsRequest, new RestActionListener<>(channel) {
+                @Override
+                protected void processResponse(GetMappingsResponse getMappingsResponse) throws Exception {
+                    ensureOpen();
+                    channel.sendResponse(
+                        new RestResponse(
+                            RestStatus.OK,
+                            ChunkedRestResponseBody.fromXContent(
+                                () -> Iterators.concat(
+                                    Iterators.single((b, p) -> b.startObject()),
+                                    getMappingsResponse.toXContentChunked(),
+                                    Iterators.single((b, p) -> b.endObject())
+                                ),
+                                request,
+                                channel
+                            )
+                        )
+                    );
+                }
+            });
     }
 }

+ 19 - 0
server/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/GetMappingsResponseTests.java

@@ -18,6 +18,9 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 public class GetMappingsResponseTests extends AbstractWireSerializingTestCase<GetMappingsResponse> {
 
@@ -63,6 +66,22 @@ public class GetMappingsResponseTests extends AbstractWireSerializingTestCase<Ge
         return resp;
     }
 
+    public void testChunkedXContentUsesChunkPerIndex() {
+        final int indexCount = randomIntBetween(1, 10);
+        final var response = new GetMappingsResponse(
+            IntStream.range(0, indexCount)
+                .mapToObj(i -> "index-" + i)
+                .collect(Collectors.toUnmodifiableMap(Function.identity(), k -> createMappingsForIndex()))
+        );
+        final var chunks = response.toXContentChunked();
+        int chunkCount = 0;
+        while (chunks.hasNext()) {
+            chunks.next();
+            chunkCount++;
+        }
+        assertEquals(indexCount, chunkCount);
+    }
+
     // Not meant to be exhaustive
     private static Map<String, Object> randomFieldMapping() {
         Map<String, Object> mappings = new HashMap<>();