Browse Source

backport http content copy removal #117303 (#117599)

Mikhail Berezovskiy 10 months ago
parent
commit
b582082786
32 changed files with 140 additions and 146 deletions
  1. 0 2
      build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt
  2. 5 0
      docs/changelog/117303.yaml
  3. 1 1
      modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4TrashingAllocatorIT.java
  4. 4 3
      qa/system-indices/src/main/java/org/elasticsearch/system/indices/SystemIndicesQA.java
  5. 8 0
      server/src/main/java/org/elasticsearch/action/ActionListener.java
  6. 0 23
      server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java
  7. 1 1
      server/src/main/java/org/elasticsearch/http/HttpTracer.java
  8. 10 33
      server/src/main/java/org/elasticsearch/rest/RestRequest.java
  9. 2 2
      server/src/main/java/org/elasticsearch/rest/RestRequestFilter.java
  10. 6 1
      server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPutStoredScriptAction.java
  11. 1 1
      server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
  12. 1 1
      server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
  13. 8 4
      server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java
  14. 2 1
      server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java
  15. 8 4
      server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java
  16. 3 3
      server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java
  17. 0 5
      server/src/test/java/org/elasticsearch/common/bytes/BytesArrayTests.java
  18. 2 2
      server/src/test/java/org/elasticsearch/rest/RestRequestTests.java
  19. 1 1
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearchBaseRestHandler.java
  20. 20 22
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/action/RestPostAnalyticsEventAction.java
  21. 1 1
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/rules/action/RestPutQueryRuleAction.java
  22. 1 1
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/rules/action/RestPutQueryRulesetAction.java
  23. 7 6
      x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestPutInferenceModelAction.java
  24. 8 2
      x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestUpdateInferenceModelAction.java
  25. 1 1
      x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestPutPipelineAction.java
  26. 8 2
      x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java
  27. 4 2
      x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkAction.java
  28. 1 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/audit/AuditUtil.java
  29. 1 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/SecurityBaseRestHandler.java
  30. 2 2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/user/RestHasPrivilegesAction.java
  31. 8 8
      x-pack/plugin/text-structure/src/main/java/org/elasticsearch/xpack/textstructure/rest/RestFindStructureAction.java
  32. 15 9
      x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java

+ 0 - 2
build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt

@@ -171,5 +171,3 @@ org.elasticsearch.cluster.SnapshotDeletionsInProgress$Entry#<init>(java.lang.Str
 @defaultMessage Use a Thread constructor with a name, anonymous threads are more difficult to debug
 java.lang.Thread#<init>(java.lang.Runnable)
 java.lang.Thread#<init>(java.lang.ThreadGroup, java.lang.Runnable)
-
-org.elasticsearch.common.bytes.BytesReference#copyBytes(org.elasticsearch.common.bytes.BytesReference) @ This method is a subject for removal. Copying bytes is prone to performance regressions and unnecessary allocations.

+ 5 - 0
docs/changelog/117303.yaml

@@ -0,0 +1,5 @@
+pr: 117303
+summary: Remove HTTP content copies
+area: Network
+type: enhancement
+issues: []

+ 1 - 1
modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4TrashingAllocatorIT.java

@@ -89,7 +89,7 @@ public class Netty4TrashingAllocatorIT extends ESNetty4IntegTestCase {
 
                 @Override
                 protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
-                    var content = request.releasableContent();
+                    var content = request.content();
                     var iter = content.iterator();
                     return (chan) -> {
                         request.getHttpRequest().release();

+ 4 - 3
qa/system-indices/src/main/java/org/elasticsearch/system/indices/SystemIndicesQA.java

@@ -10,6 +10,7 @@
 
 package org.elasticsearch.system.indices;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.client.internal.node.NodeClient;
@@ -179,12 +180,12 @@ public class SystemIndicesQA extends Plugin implements SystemIndexPlugin, Action
 
         @Override
         protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+            var content = request.requiredContent();
             IndexRequest indexRequest = new IndexRequest(".net-new-system-index-primary");
-            indexRequest.source(request.requiredContent(), request.getXContentType());
+            indexRequest.source(content, request.getXContentType());
             indexRequest.id(request.param("id"));
             indexRequest.setRefreshPolicy(request.param("refresh"));
-
-            return channel -> client.index(indexRequest, new RestToXContentListener<>(channel));
+            return channel -> client.index(indexRequest, ActionListener.withRef(new RestToXContentListener<>(channel), content));
         }
 
         @Override

+ 8 - 0
server/src/main/java/org/elasticsearch/action/ActionListener.java

@@ -475,4 +475,12 @@ public interface ActionListener<Response> {
         ActionListener.run(ActionListener.runBefore(listener, resource::close), l -> action.accept(l, resource));
     }
 
+    /**
+     * Increments ref count and returns a listener that will decrement ref count on listener completion.
+     */
+    static <Response> ActionListener<Response> withRef(ActionListener<Response> listener, RefCounted ref) {
+        ref.mustIncRef();
+        return releaseAfter(listener, ref::decRef);
+    }
+
 }

+ 0 - 23
server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java

@@ -74,29 +74,6 @@ public interface BytesReference extends Comparable<BytesReference>, ToXContentFr
         }
     }
 
-    /**
-     * Allocates new buffer and copy bytes from given BytesReference.
-     *
-     * @deprecated copying bytes is a right place for performance regression and unnecessary allocations.
-     * This method exists to serve very few places that struggle to handle reference counted buffers.
-     */
-    @Deprecated(forRemoval = true)
-    static BytesReference copyBytes(BytesReference bytesReference) {
-        byte[] arr = new byte[bytesReference.length()];
-        int offset = 0;
-        final BytesRefIterator iterator = bytesReference.iterator();
-        try {
-            BytesRef slice;
-            while ((slice = iterator.next()) != null) {
-                System.arraycopy(slice.bytes, slice.offset, arr, offset, slice.length);
-                offset += slice.length;
-            }
-            return new BytesArray(arr);
-        } catch (IOException e) {
-            throw new AssertionError(e);
-        }
-    }
-
     /**
      * Returns BytesReference composed of the provided ByteBuffers.
      */

+ 1 - 1
server/src/main/java/org/elasticsearch/http/HttpTracer.java

@@ -94,7 +94,7 @@ class HttpTracer {
 
     private void logFullContent(RestRequest restRequest) {
         try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
-            restRequest.releasableContent().writeTo(stream);
+            restRequest.content().writeTo(stream);
         } catch (Exception e2) {
             assert false : e2; // no real IO here
         }

+ 10 - 33
server/src/main/java/org/elasticsearch/rest/RestRequest.java

@@ -23,7 +23,6 @@ import org.elasticsearch.core.Booleans;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.RestApiVersion;
-import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.http.HttpBody;
@@ -303,22 +302,13 @@ public class RestRequest implements ToXContent.Params, Traceable {
         return httpRequest.body().isFull();
     }
 
-    /**
-     * Returns a copy of HTTP content. The copy is GC-managed and does not require reference counting.
-     * Please use {@link #releasableContent()} to avoid content copy.
-     */
-    @SuppressForbidden(reason = "temporarily support content copy while migrating RestHandlers to ref counted pooled buffers")
-    public BytesReference content() {
-        return BytesReference.copyBytes(releasableContent());
-    }
-
     /**
      * Returns a direct reference to the network buffer containing the request body. The HTTP layers will release their references to this
      * buffer as soon as they have finished the synchronous steps of processing the request on the network thread, which will by default
      * release the buffer back to the pool where it may be re-used for another request. If you need to keep the buffer alive past the end of
      * these synchronous steps, acquire your own reference to this buffer and release it once it's no longer needed.
      */
-    public ReleasableBytesReference releasableContent() {
+    public ReleasableBytesReference content() {
         this.contentConsumed = true;
         var bytes = httpRequest.body().asFull().bytes();
         if (bytes.hasReferences() == false) {
@@ -338,32 +328,19 @@ public class RestRequest implements ToXContent.Params, Traceable {
         return httpRequest.body().asStream();
     }
 
-    private void ensureContent() {
+    /**
+     * Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
+     * See {@link #content()}.
+     */
+    public ReleasableBytesReference requiredContent() {
         if (hasContent() == false) {
             throw new ElasticsearchParseException("request body is required");
         } else if (xContentType.get() == null) {
             throwValidationException("unknown content type");
         }
-    }
-
-    /**
-     * @return copy of the request body or throw an exception if the body or content type is missing.
-     * See {@link #content()}. Please use {@link #requiredReleasableContent()} to avoid content copy.
-     */
-    public final BytesReference requiredContent() {
-        ensureContent();
         return content();
     }
 
-    /**
-     * Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
-     * See {@link #releasableContent()}. It's a recommended method to handle HTTP content without copying it.
-     */
-    public ReleasableBytesReference requiredReleasableContent() {
-        ensureContent();
-        return releasableContent();
-    }
-
     private static void throwValidationException(String msg) {
         ValidationException unknownContentType = new ValidationException();
         unknownContentType.addValidationError(msg);
@@ -596,7 +573,7 @@ public class RestRequest implements ToXContent.Params, Traceable {
      * if you need to handle the absence request content gracefully.
      */
     public final XContentParser contentOrSourceParamParser() throws IOException {
-        Tuple<XContentType, BytesReference> tuple = contentOrSourceParam();
+        Tuple<XContentType, ReleasableBytesReference> tuple = contentOrSourceParam();
         return XContentHelper.createParserNotCompressed(parserConfig, tuple.v2(), tuple.v1().xContent().type());
     }
 
@@ -607,7 +584,7 @@ public class RestRequest implements ToXContent.Params, Traceable {
      */
     public final void withContentOrSourceParamParserOrNull(CheckedConsumer<XContentParser, IOException> withParser) throws IOException {
         if (hasContentOrSourceParam()) {
-            Tuple<XContentType, BytesReference> tuple = contentOrSourceParam();
+            Tuple<XContentType, ReleasableBytesReference> tuple = contentOrSourceParam();
             try (XContentParser parser = XContentHelper.createParserNotCompressed(parserConfig, tuple.v2(), tuple.v1())) {
                 withParser.accept(parser);
             }
@@ -620,7 +597,7 @@ public class RestRequest implements ToXContent.Params, Traceable {
      * Get the content of the request or the contents of the {@code source} param or throw an exception if both are missing.
      * Prefer {@link #contentOrSourceParamParser()} or {@link #withContentOrSourceParamParserOrNull(CheckedConsumer)} if you need a parser.
      */
-    public final Tuple<XContentType, BytesReference> contentOrSourceParam() {
+    public final Tuple<XContentType, ReleasableBytesReference> contentOrSourceParam() {
         if (hasContentOrSourceParam() == false) {
             throw new ElasticsearchParseException("request body or source parameter is required");
         } else if (hasContent()) {
@@ -636,7 +613,7 @@ public class RestRequest implements ToXContent.Params, Traceable {
         if (xContentType == null) {
             throwValidationException("Unknown value for source_content_type [" + typeParam + "]");
         }
-        return new Tuple<>(xContentType, bytes);
+        return new Tuple<>(xContentType, ReleasableBytesReference.wrap(bytes));
     }
 
     public ParsedMediaType getParsedAccept() {

+ 2 - 2
server/src/main/java/org/elasticsearch/rest/RestRequestFilter.java

@@ -45,10 +45,10 @@ public interface RestRequestFilter {
                 }
 
                 @Override
-                public ReleasableBytesReference releasableContent() {
+                public ReleasableBytesReference content() {
                     if (filteredBytes == null) {
                         Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(
-                            restRequest.requiredReleasableContent(),
+                            restRequest.requiredContent(),
                             true,
                             restRequest.getXContentType()
                         );

+ 6 - 1
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPutStoredScriptAction.java

@@ -8,6 +8,7 @@
  */
 package org.elasticsearch.rest.action.admin.cluster;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.cluster.storedscripts.PutStoredScriptRequest;
 import org.elasticsearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction;
 import org.elasticsearch.client.internal.node.NodeClient;
@@ -57,6 +58,10 @@ public class RestPutStoredScriptAction extends BaseRestHandler {
             request.getXContentType(),
             StoredScriptSource.parse(content, xContentType)
         );
-        return channel -> client.execute(TransportPutStoredScriptAction.TYPE, putRequest, new RestToXContentListener<>(channel));
+        return channel -> client.execute(
+            TransportPutStoredScriptAction.TYPE,
+            putRequest,
+            ActionListener.withRef(new RestToXContentListener<>(channel), content)
+        );
     }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

@@ -109,7 +109,7 @@ public class RestBulkAction extends BaseRestHandler {
             boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
             bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
             bulkRequest.setRefreshPolicy(request.param("refresh"));
-            ReleasableBytesReference content = request.requiredReleasableContent();
+            ReleasableBytesReference content = request.requiredContent();
 
             try {
                 bulkRequest.add(

+ 1 - 1
server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java

@@ -124,7 +124,7 @@ public class RestIndexAction extends BaseRestHandler {
             request.param("type"); // consume and ignore the type
         }
 
-        ReleasableBytesReference source = request.requiredReleasableContent();
+        ReleasableBytesReference source = request.requiredContent();
         IndexRequest indexRequest = new IndexRequest(request.param("index"));
         indexRequest.id(request.param("id"));
         indexRequest.routing(request.param("routing"));

+ 8 - 4
server/src/main/java/org/elasticsearch/rest/action/ingest/RestPutPipelineAction.java

@@ -9,9 +9,10 @@
 
 package org.elasticsearch.rest.action.ingest;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ingest.PutPipelineRequest;
 import org.elasticsearch.client.internal.node.NodeClient;
-import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
@@ -55,10 +56,13 @@ public class RestPutPipelineAction extends BaseRestHandler {
             }
         }
 
-        Tuple<XContentType, BytesReference> sourceTuple = restRequest.contentOrSourceParam();
-        PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), sourceTuple.v2(), sourceTuple.v1(), ifVersion);
+        Tuple<XContentType, ReleasableBytesReference> sourceTuple = restRequest.contentOrSourceParam();
+        var content = sourceTuple.v2();
+        PutPipelineRequest request = new PutPipelineRequest(restRequest.param("id"), content, sourceTuple.v1(), ifVersion);
         request.masterNodeTimeout(getMasterNodeTimeout(restRequest));
         request.ackTimeout(getAckTimeout(restRequest));
-        return channel -> client.admin().cluster().putPipeline(request, new RestToXContentListener<>(channel));
+        return channel -> client.admin()
+            .cluster()
+            .putPipeline(request, ActionListener.withRef(new RestToXContentListener<>(channel), content));
     }
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulateIngestAction.java

@@ -16,6 +16,7 @@ import org.elasticsearch.action.bulk.SimulateBulkAction;
 import org.elasticsearch.action.bulk.SimulateBulkRequest;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.ingest.ConfigurationUtils;
@@ -72,7 +73,7 @@ public class RestSimulateIngestAction extends BaseRestHandler {
         String defaultIndex = request.param("index");
         FetchSourceContext defaultFetchSourceContext = FetchSourceContext.parseFromRestRequest(request);
         String defaultPipeline = request.param("pipeline");
-        Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
+        Tuple<XContentType, ReleasableBytesReference> sourceTuple = request.contentOrSourceParam();
         Map<String, Object> sourceMap = XContentHelper.convertToMap(sourceTuple.v2(), false, sourceTuple.v1()).v2();
         Map<String, Map<String, Object>> pipelineSubstitutions = (Map<String, Map<String, Object>>) sourceMap.remove(
             "pipeline_substitutions"

+ 8 - 4
server/src/main/java/org/elasticsearch/rest/action/ingest/RestSimulatePipelineAction.java

@@ -9,9 +9,10 @@
 
 package org.elasticsearch.rest.action.ingest;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ingest.SimulatePipelineRequest;
 import org.elasticsearch.client.internal.node.NodeClient;
-import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.rest.BaseRestHandler;
 import org.elasticsearch.rest.RestRequest;
@@ -46,10 +47,13 @@ public class RestSimulatePipelineAction extends BaseRestHandler {
 
     @Override
     public RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
-        Tuple<XContentType, BytesReference> sourceTuple = restRequest.contentOrSourceParam();
-        SimulatePipelineRequest request = new SimulatePipelineRequest(sourceTuple.v2(), sourceTuple.v1());
+        Tuple<XContentType, ReleasableBytesReference> sourceTuple = restRequest.contentOrSourceParam();
+        var content = sourceTuple.v2();
+        SimulatePipelineRequest request = new SimulatePipelineRequest(sourceTuple.v2(), sourceTuple.v1(), restRequest.getRestApiVersion());
         request.setId(restRequest.param("id"));
         request.setVerbose(restRequest.paramAsBoolean("verbose", false));
-        return channel -> client.admin().cluster().simulatePipeline(request, new RestToXContentListener<>(channel));
+        return channel -> client.admin()
+            .cluster()
+            .simulatePipeline(request, ActionListener.withRef(new RestToXContentListener<>(channel), content));
     }
 }

+ 3 - 3
server/src/main/java/org/elasticsearch/rest/action/search/RestMultiSearchAction.java

@@ -17,7 +17,7 @@ import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.CheckedBiConsumer;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.TriFunction;
-import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.core.Tuple;
@@ -191,9 +191,9 @@ public class RestMultiSearchAction extends BaseRestHandler {
         boolean ccsMinimizeRoundtrips = request.paramAsBoolean("ccs_minimize_roundtrips", true);
         String routing = request.param("routing");
 
-        final Tuple<XContentType, BytesReference> sourceTuple = request.contentOrSourceParam();
+        final Tuple<XContentType, ReleasableBytesReference> sourceTuple = request.contentOrSourceParam();
         final XContent xContent = sourceTuple.v1().xContent();
-        final BytesReference data = sourceTuple.v2();
+        final ReleasableBytesReference data = sourceTuple.v2();
         MultiSearchRequest.readMultiLineFormat(
             xContent,
             request.contentParserConfig(),

+ 0 - 5
server/src/test/java/org/elasticsearch/common/bytes/BytesArrayTests.java

@@ -108,9 +108,4 @@ public class BytesArrayTests extends AbstractBytesReferenceTestCase {
         assertThat(e.getMessage(), equalTo("Index 9 out of bounds for length 9"));
     }
 
-    public void testCopyBytes() {
-        var data = randomByteArrayOfLength(between(1024, 1024 * 1024 * 50));
-        var copy = BytesReference.copyBytes(new BytesArray(data));
-        assertArrayEquals(data, BytesReference.toBytes(copy));
-    }
 }

+ 2 - 2
server/src/test/java/org/elasticsearch/rest/RestRequestTests.java

@@ -12,7 +12,7 @@ package org.elasticsearch.rest;
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.http.HttpBody;
 import org.elasticsearch.http.HttpChannel;
@@ -321,7 +321,7 @@ public class RestRequestTests extends ESTestCase {
         }
 
         @Override
-        public BytesReference content() {
+        public ReleasableBytesReference content() {
             return restRequest.content();
         }
     }

+ 1 - 1
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearchBaseRestHandler.java

@@ -32,7 +32,7 @@ public abstract class EnterpriseSearchBaseRestHandler extends BaseRestHandler {
             // We need to consume parameters and content from the REST request in order to bypass unrecognized param errors
             // and return a license error.
             request.params().keySet().forEach(key -> request.param(key, ""));
-            request.releasableContent();
+            request.content();
             return channel -> channel.sendResponse(
                 new RestResponse(channel, LicenseUtils.newComplianceException(this.licenseState, this.product))
             );

+ 20 - 22
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/analytics/action/RestPostAnalyticsEventAction.java

@@ -7,8 +7,9 @@
 
 package org.elasticsearch.xpack.application.analytics.action;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.internal.node.NodeClient;
-import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.network.InetAddresses;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.license.XPackLicenseState;
@@ -48,11 +49,26 @@ public class RestPostAnalyticsEventAction extends EnterpriseSearchBaseRestHandle
 
     @Override
     protected RestChannelConsumer innerPrepareRequest(RestRequest restRequest, NodeClient client) {
-        PostAnalyticsEventAction.Request request = buidRequest(restRequest);
+        Tuple<XContentType, ReleasableBytesReference> sourceTuple = restRequest.contentOrSourceParam();
+
+        var content = sourceTuple.v2();
+        PostAnalyticsEventAction.RequestBuilder builder = PostAnalyticsEventAction.Request.builder(
+            restRequest.param("collection_name"),
+            restRequest.param("event_type"),
+            sourceTuple.v1(),
+            content
+        );
+
+        builder.debug(restRequest.paramAsBoolean("debug", false));
+
+        final Map<String, List<String>> headers = restRequest.getHeaders();
+        builder.headers(headers);
+        builder.clientAddress(getClientAddress(restRequest, headers));
+
         return channel -> client.execute(
             PostAnalyticsEventAction.INSTANCE,
-            request,
-            new RestToXContentListener<>(channel, r -> RestStatus.ACCEPTED)
+            builder.request(),
+            ActionListener.withRef(new RestToXContentListener<>(channel, r -> RestStatus.ACCEPTED), content)
         );
     }
 
@@ -71,22 +87,4 @@ public class RestPostAnalyticsEventAction extends EnterpriseSearchBaseRestHandle
         return remoteAddress;
     }
 
-    private static PostAnalyticsEventAction.Request buidRequest(RestRequest restRequest) {
-        Tuple<XContentType, BytesReference> sourceTuple = restRequest.contentOrSourceParam();
-
-        PostAnalyticsEventAction.RequestBuilder builder = PostAnalyticsEventAction.Request.builder(
-            restRequest.param("collection_name"),
-            restRequest.param("event_type"),
-            sourceTuple.v1(),
-            sourceTuple.v2()
-        );
-
-        builder.debug(restRequest.paramAsBoolean("debug", false));
-
-        final Map<String, List<String>> headers = restRequest.getHeaders();
-        builder.headers(headers);
-        builder.clientAddress(getClientAddress(restRequest, headers));
-
-        return builder.request();
-    }
 }

+ 1 - 1
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/rules/action/RestPutQueryRuleAction.java

@@ -43,7 +43,7 @@ public class RestPutQueryRuleAction extends EnterpriseSearchBaseRestHandler {
         PutQueryRuleAction.Request request = new PutQueryRuleAction.Request(
             restRequest.param("ruleset_id"),
             restRequest.param("rule_id"),
-            restRequest.content(),
+            restRequest.requiredContent(),
             restRequest.getXContentType()
         );
         return channel -> client.execute(

+ 1 - 1
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/rules/action/RestPutQueryRulesetAction.java

@@ -42,7 +42,7 @@ public class RestPutQueryRulesetAction extends EnterpriseSearchBaseRestHandler {
     protected RestChannelConsumer innerPrepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
         PutQueryRulesetAction.Request request = new PutQueryRulesetAction.Request(
             restRequest.param("ruleset_id"),
-            restRequest.content(),
+            restRequest.requiredContent(),
             restRequest.getXContentType()
         );
         return channel -> client.execute(

+ 7 - 6
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestPutInferenceModelAction.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.inference.rest;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.inference.TaskType;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -48,12 +49,12 @@ public class RestPutInferenceModelAction extends BaseRestHandler {
             taskType = TaskType.ANY; // task type must be defined in the body
         }
 
-        var request = new PutInferenceModelAction.Request(
-            taskType,
-            inferenceEntityId,
-            restRequest.requiredContent(),
-            restRequest.getXContentType()
+        var content = restRequest.requiredContent();
+        var request = new PutInferenceModelAction.Request(taskType, inferenceEntityId, content, restRequest.getXContentType());
+        return channel -> client.execute(
+            PutInferenceModelAction.INSTANCE,
+            request,
+            ActionListener.withRef(new RestToXContentListener<>(channel), content)
         );
-        return channel -> client.execute(PutInferenceModelAction.INSTANCE, request, new RestToXContentListener<>(channel));
     }
 }

+ 8 - 2
x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestUpdateInferenceModelAction.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.inference.rest;
 
 import org.elasticsearch.ElasticsearchStatusException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.inference.TaskType;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -50,13 +51,18 @@ public class RestUpdateInferenceModelAction extends BaseRestHandler {
             throw new ElasticsearchStatusException("Inference ID must be provided in the path", RestStatus.BAD_REQUEST);
         }
 
+        var content = restRequest.requiredContent();
         var request = new UpdateInferenceModelAction.Request(
             inferenceEntityId,
-            restRequest.requiredContent(),
+            content,
             restRequest.getXContentType(),
             taskType,
             RestUtils.getMasterNodeTimeout(restRequest)
         );
-        return channel -> client.execute(UpdateInferenceModelAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        return channel -> client.execute(
+            UpdateInferenceModelAction.INSTANCE,
+            request,
+            ActionListener.withRef(new RestToXContentListener<>(channel), content)
+        );
     }
 }

+ 1 - 1
x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestPutPipelineAction.java

@@ -49,7 +49,7 @@ public class RestPutPipelineAction extends BaseRestHandler {
         }
 
         return restChannel -> {
-            final String content = request.releasableContent().utf8ToString();
+            final String content = request.content().utf8ToString();
             client.execute(
                 PutPipelineAction.INSTANCE,
                 new PutPipelineRequest(id, content, request.getXContentType()),

+ 8 - 2
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/rest/job/RestPostDataAction.java

@@ -6,6 +6,7 @@
  */
 package org.elasticsearch.xpack.ml.rest.job;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -47,9 +48,14 @@ public class RestPostDataAction extends BaseRestHandler {
         PostDataAction.Request request = new PostDataAction.Request(restRequest.param(Job.ID.getPreferredName()));
         request.setResetStart(restRequest.param(PostDataAction.Request.RESET_START.getPreferredName(), DEFAULT_RESET_START));
         request.setResetEnd(restRequest.param(PostDataAction.Request.RESET_END.getPreferredName(), DEFAULT_RESET_END));
-        request.setContent(restRequest.content(), restRequest.getXContentType());
+        var content = restRequest.content();
+        request.setContent(content, restRequest.getXContentType());
 
-        return channel -> client.execute(PostDataAction.INSTANCE, request, new RestToXContentListener<>(channel, r -> RestStatus.ACCEPTED));
+        return channel -> client.execute(
+            PostDataAction.INSTANCE,
+            request,
+            ActionListener.withRef(new RestToXContentListener<>(channel, r -> RestStatus.ACCEPTED), content)
+        );
     }
 
     @Override

+ 4 - 2
x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/rest/action/RestMonitoringBulkAction.java

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.monitoring.rest.action;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.core.RestApiVersion;
@@ -97,8 +98,9 @@ public class RestMonitoringBulkAction extends BaseRestHandler {
         final long intervalMillis = parseTimeValue(intervalAsString, INTERVAL).getMillis();
 
         final MonitoringBulkRequestBuilder requestBuilder = new MonitoringBulkRequestBuilder(client);
-        requestBuilder.add(system, request.content(), request.getXContentType(), timestamp, intervalMillis);
-        return channel -> requestBuilder.execute(getRestBuilderListener(channel));
+        var content = request.content();
+        requestBuilder.add(system, content, request.getXContentType(), timestamp, intervalMillis);
+        return channel -> requestBuilder.execute(ActionListener.withRef(getRestBuilderListener(channel), content));
     }
 
     @Override

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/audit/AuditUtil.java

@@ -27,7 +27,7 @@ public class AuditUtil {
 
     public static String restRequestContent(RestRequest request) {
         if (request.hasContent()) {
-            var content = request.releasableContent();
+            var content = request.content();
             try {
                 return XContentHelper.convertToJson(content, false, false, request.getXContentType());
             } catch (IOException ioe) {

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/SecurityBaseRestHandler.java

@@ -75,7 +75,7 @@ public abstract class SecurityBaseRestHandler extends BaseRestHandler {
             return innerPrepareRequest(request, client);
         } else {
             request.params().keySet().forEach(key -> request.param(key, ""));
-            request.releasableContent(); // mark content consumed
+            request.content(); // mark content consumed
             return channel -> channel.sendResponse(new RestResponse(channel, failedFeature));
         }
     }

+ 2 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/user/RestHasPrivilegesAction.java

@@ -8,7 +8,7 @@ package org.elasticsearch.xpack.security.rest.action.user;
 
 import org.elasticsearch.ElasticsearchSecurityException;
 import org.elasticsearch.client.internal.node.NodeClient;
-import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.core.Tuple;
@@ -86,7 +86,7 @@ public class RestHasPrivilegesAction extends SecurityBaseRestHandler {
          * Consume the body immediately. This ensures that if there is a body and we later reject the request (e.g., because security is not
          * enabled) that the REST infrastructure will not reject the request for not having consumed the body.
          */
-        final Tuple<XContentType, BytesReference> content = request.contentOrSourceParam();
+        final Tuple<XContentType, ReleasableBytesReference> content = request.contentOrSourceParam();
         final String username = getUsername(request);
         if (username == null) {
             return restChannel -> { throw new ElasticsearchSecurityException("there is no authenticated user"); };

+ 8 - 8
x-pack/plugin/text-structure/src/main/java/org/elasticsearch/xpack/textstructure/rest/RestFindStructureAction.java

@@ -6,7 +6,7 @@
  */
 package org.elasticsearch.xpack.textstructure.rest;
 
-import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -43,14 +43,14 @@ public class RestFindStructureAction extends BaseRestHandler {
     protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) {
         FindStructureAction.Request request = new FindStructureAction.Request();
         RestFindStructureArgumentsParser.parse(restRequest, request);
+        var content = restRequest.requiredContent();
+        request.setSample(content);
 
-        if (restRequest.hasContent()) {
-            request.setSample(restRequest.content());
-        } else {
-            throw new ElasticsearchParseException("request body is required");
-        }
-
-        return channel -> client.execute(FindStructureAction.INSTANCE, request, new RestToXContentListener<>(channel));
+        return channel -> client.execute(
+            FindStructureAction.INSTANCE,
+            request,
+            ActionListener.withRef(new RestToXContentListener<>(channel), content)
+        );
     }
 
     @Override

+ 15 - 9
x-pack/plugin/watcher/src/main/java/org/elasticsearch/xpack/watcher/rest/action/RestPutWatchAction.java

@@ -7,6 +7,7 @@
 
 package org.elasticsearch.xpack.watcher.rest.action;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.core.RestApiVersion;
@@ -46,19 +47,24 @@ public class RestPutWatchAction extends BaseRestHandler implements RestRequestFi
 
     @Override
     protected RestChannelConsumer prepareRequest(final RestRequest request, NodeClient client) {
-        PutWatchRequest putWatchRequest = new PutWatchRequest(request.param("id"), request.content(), request.getXContentType());
+        var content = request.content();
+        PutWatchRequest putWatchRequest = new PutWatchRequest(request.param("id"), content, request.getXContentType());
         putWatchRequest.setVersion(request.paramAsLong("version", Versions.MATCH_ANY));
         putWatchRequest.setIfSeqNo(request.paramAsLong("if_seq_no", putWatchRequest.getIfSeqNo()));
         putWatchRequest.setIfPrimaryTerm(request.paramAsLong("if_primary_term", putWatchRequest.getIfPrimaryTerm()));
         putWatchRequest.setActive(request.paramAsBoolean("active", putWatchRequest.isActive()));
-        return channel -> client.execute(PutWatchAction.INSTANCE, putWatchRequest, new RestBuilderListener<>(channel) {
-            @Override
-            public RestResponse buildResponse(PutWatchResponse response, XContentBuilder builder) throws Exception {
-                response.toXContent(builder, request);
-                RestStatus status = response.isCreated() ? CREATED : OK;
-                return new RestResponse(status, builder);
-            }
-        });
+        return channel -> client.execute(
+            PutWatchAction.INSTANCE,
+            putWatchRequest,
+            ActionListener.withRef(new RestBuilderListener<>(channel) {
+                @Override
+                public RestResponse buildResponse(PutWatchResponse response, XContentBuilder builder) throws Exception {
+                    response.toXContent(builder, request);
+                    RestStatus status = response.isCreated() ? CREATED : OK;
+                    return new RestResponse(status, builder);
+                }
+            }, content)
+        );
     }
 
     private static final Set<String> FILTERED_FIELDS = Set.of(