소스 검색

Backport explicit http content copy/retain #116115 (#117276)

* backport explicit http content copy/retain #116115

* spotless
Mikhail Berezovskiy 10 달 전
부모
커밋
7ab9097449
31개의 변경된 파일1505개의 추가작업 그리고 137개의 파일을 삭제
  1. 2 0
      build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt
  2. 5 0
      docs/changelog/116115.yaml
  3. 0 5
      modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java
  4. 122 0
      modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4TrashingAllocatorIT.java
  5. 0 34
      modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java
  6. 1 1
      modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java
  7. 111 2
      modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java
  8. 1036 0
      modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/WrappedByteBuf.java
  9. 106 0
      modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyAllocatorTests.java
  10. 23 0
      server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java
  11. 4 5
      server/src/main/java/org/elasticsearch/http/HttpBody.java
  12. 0 6
      server/src/main/java/org/elasticsearch/http/HttpRequest.java
  13. 1 1
      server/src/main/java/org/elasticsearch/http/HttpTracer.java
  14. 5 4
      server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java
  15. 0 5
      server/src/main/java/org/elasticsearch/rest/FilterRestHandler.java
  16. 0 4
      server/src/main/java/org/elasticsearch/rest/RestController.java
  17. 0 12
      server/src/main/java/org/elasticsearch/rest/RestHandler.java
  18. 47 14
      server/src/main/java/org/elasticsearch/rest/RestRequest.java
  19. 4 3
      server/src/main/java/org/elasticsearch/rest/RestRequestFilter.java
  20. 8 8
      server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java
  21. 14 5
      server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java
  22. 0 4
      server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java
  23. 6 0
      server/src/test/java/org/elasticsearch/common/bytes/BytesArrayTests.java
  24. 0 5
      server/src/test/java/org/elasticsearch/http/TestHttpRequest.java
  25. 0 5
      server/src/test/java/org/elasticsearch/rest/RestControllerTests.java
  26. 0 5
      test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java
  27. 1 1
      x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearchBaseRestHandler.java
  28. 1 1
      x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestPutPipelineAction.java
  29. 3 2
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/audit/AuditUtil.java
  30. 1 1
      x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/SecurityBaseRestHandler.java
  31. 4 4
      x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailTests.java

+ 2 - 0
build-tools-internal/src/main/resources/forbidden/es-server-signatures.txt

@@ -171,3 +171,5 @@ org.elasticsearch.cluster.SnapshotDeletionsInProgress$Entry#<init>(java.lang.Str
 @defaultMessage Use a Thread constructor with a name, anonymous threads are more difficult to debug
 java.lang.Thread#<init>(java.lang.Runnable)
 java.lang.Thread#<init>(java.lang.ThreadGroup, java.lang.Runnable)
+
+org.elasticsearch.common.bytes.BytesReference#copyBytes(org.elasticsearch.common.bytes.BytesReference) @ This method is a subject for removal. Copying bytes is prone to performance regressions and unnecessary allocations.

+ 5 - 0
docs/changelog/116115.yaml

@@ -0,0 +1,5 @@
+pr: 116115
+summary: Allow http unsafe buffers by default
+area: Network
+type: enhancement
+issues: []

+ 0 - 5
modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4IncrementalRequestHandlingIT.java

@@ -699,11 +699,6 @@ public class Netty4IncrementalRequestHandlingIT extends ESNetty4IntegTestCase {
             Predicate<NodeFeature> clusterSupportsFeature
         ) {
             return List.of(new BaseRestHandler() {
-                @Override
-                public boolean allowsUnsafeBuffers() {
-                    return true;
-                }
-
                 @Override
                 public String getName() {
                     return ROUTE;

+ 122 - 0
modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4TrashingAllocatorIT.java

@@ -0,0 +1,122 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.http.netty4;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.ESNetty4IntegTestCase;
+import org.elasticsearch.client.internal.node.NodeClient;
+import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
+import org.elasticsearch.common.settings.ClusterSettings;
+import org.elasticsearch.common.settings.IndexScopedSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.settings.SettingsFilter;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.features.NodeFeature;
+import org.elasticsearch.http.HttpServerTransport;
+import org.elasticsearch.plugins.ActionPlugin;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.rest.BaseRestHandler;
+import org.elasticsearch.rest.RestController;
+import org.elasticsearch.rest.RestHandler;
+import org.elasticsearch.rest.RestRequest;
+import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+
+public class Netty4TrashingAllocatorIT extends ESNetty4IntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.concatLists(List.of(Handler.class), super.nodePlugins());
+    }
+
+    @Override
+    protected boolean addMockHttpTransport() {
+        return false;
+    }
+
+    public void testTrashContent() throws InterruptedException {
+        try (var client = new Netty4HttpClient()) {
+            var addr = randomFrom(internalCluster().getInstance(HttpServerTransport.class).boundAddress().boundAddresses()).address();
+            var content = randomAlphaOfLength(between(1024, 2048));
+            var responses = client.post(addr, List.of(new Tuple<>(Handler.ROUTE, content)));
+            assertEquals(HttpResponseStatus.OK, responses.stream().findFirst().get().status());
+        }
+    }
+
+    public static class Handler extends Plugin implements ActionPlugin {
+        static final String ROUTE = "/_test/trashing-alloc";
+
+        @Override
+        public Collection<RestHandler> getRestHandlers(
+            Settings settings,
+            NamedWriteableRegistry namedWriteableRegistry,
+            RestController restController,
+            ClusterSettings clusterSettings,
+            IndexScopedSettings indexScopedSettings,
+            SettingsFilter settingsFilter,
+            IndexNameExpressionResolver indexNameExpressionResolver,
+            Supplier<DiscoveryNodes> nodesInCluster,
+            Predicate<NodeFeature> clusterSupportsFeature
+        ) {
+            return List.of(new BaseRestHandler() {
+                @Override
+                public String getName() {
+                    return ROUTE;
+                }
+
+                @Override
+                public List<Route> routes() {
+                    return List.of(new Route(RestRequest.Method.POST, ROUTE));
+                }
+
+                @Override
+                protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
+                    var content = request.releasableContent();
+                    var iter = content.iterator();
+                    return (chan) -> {
+                        request.getHttpRequest().release();
+                        assertFalse(content.hasReferences());
+                        BytesRef br;
+                        while ((br = iter.next()) != null) {
+                            for (int i = br.offset; i < br.offset + br.length; i++) {
+                                if (br.bytes[i] != 0) {
+                                    fail(
+                                        new AssertionError(
+                                            "buffer is not trashed, off="
+                                                + br.offset
+                                                + " len="
+                                                + br.length
+                                                + " pos="
+                                                + i
+                                                + " ind="
+                                                + (i - br.offset)
+                                        )
+                                    );
+                                }
+                            }
+                        }
+                        chan.sendResponse(new RestResponse(RestStatus.OK, ""));
+                    };
+                }
+            });
+        }
+    }
+}

+ 0 - 34
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpRequest.java

@@ -9,7 +9,6 @@
 
 package org.elasticsearch.http.netty4;
 
-import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.http.DefaultFullHttpRequest;
 import io.netty.handler.codec.http.EmptyHttpHeaders;
@@ -128,39 +127,6 @@ public class Netty4HttpRequest implements HttpRequest {
         }
     }
 
-    @Override
-    public HttpRequest releaseAndCopy() {
-        assert released.get() == false;
-        if (pooled == false) {
-            return this;
-        }
-        try {
-            final ByteBuf copiedContent = Unpooled.copiedBuffer(request.content());
-            HttpBody newContent;
-            if (content.isStream()) {
-                newContent = content;
-            } else {
-                newContent = Netty4Utils.fullHttpBodyFrom(copiedContent);
-            }
-            return new Netty4HttpRequest(
-                sequence,
-                new DefaultFullHttpRequest(
-                    request.protocolVersion(),
-                    request.method(),
-                    request.uri(),
-                    copiedContent,
-                    request.headers(),
-                    request.trailingHeaders()
-                ),
-                new AtomicBoolean(false),
-                false,
-                newContent
-            );
-        } finally {
-            release();
-        }
-    }
-
     @Override
     public final Map<String, List<String>> getHeaders() {
         return headers;

+ 1 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4Utils.java

@@ -179,7 +179,7 @@ public class Netty4Utils {
     }
 
     public static HttpBody.Full fullHttpBodyFrom(final ByteBuf buf) {
-        return new HttpBody.ByteRefHttpBody(toBytesReference(buf));
+        return new HttpBody.ByteRefHttpBody(toReleasableBytesReference(buf));
     }
 
     public static Recycler<BytesRef> createRecycler(Settings settings) {

+ 111 - 2
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java

@@ -24,9 +24,11 @@ import org.apache.lucene.util.BytesRef;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.PageCacheRecycler;
+import org.elasticsearch.core.Assertions;
 import org.elasticsearch.core.Booleans;
 import org.elasticsearch.monitor.jvm.JvmInfo;
 
+import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 public class NettyAllocator {
@@ -44,8 +46,9 @@ public class NettyAllocator {
     private static final String USE_NETTY_DEFAULT_CHUNK = "es.unsafe.use_netty_default_chunk_and_page_size";
 
     static {
+        ByteBufAllocator allocator;
         if (Booleans.parseBoolean(System.getProperty(USE_NETTY_DEFAULT), false)) {
-            ALLOCATOR = ByteBufAllocator.DEFAULT;
+            allocator = ByteBufAllocator.DEFAULT;
             SUGGESTED_MAX_ALLOCATION_SIZE = 1024 * 1024;
             DESCRIPTION = "[name=netty_default, suggested_max_allocation_size="
                 + ByteSizeValue.ofBytes(SUGGESTED_MAX_ALLOCATION_SIZE)
@@ -127,7 +130,12 @@ public class NettyAllocator {
                     + g1gcRegionSize
                     + "}]";
             }
-            ALLOCATOR = new NoDirectBuffers(delegate);
+            allocator = new NoDirectBuffers(delegate);
+        }
+        if (Assertions.ENABLED) {
+            ALLOCATOR = new TrashingByteBufAllocator(allocator);
+        } else {
+            ALLOCATOR = allocator;
         }
 
         RECYCLER = new Recycler<>() {
@@ -353,4 +361,105 @@ public class NettyAllocator {
             return delegate;
         }
     }
+
+    static class TrashingByteBuf extends WrappedByteBuf {
+
+        private boolean trashed = false;
+
+        protected TrashingByteBuf(ByteBuf buf) {
+            super(buf);
+        }
+
+        @Override
+        public boolean release() {
+            if (refCnt() == 1) {
+                // see [NOTE on racy trashContent() calls]
+                trashContent();
+            }
+            return super.release();
+        }
+
+        @Override
+        public boolean release(int decrement) {
+            if (refCnt() == decrement && refCnt() > 0) {
+                // see [NOTE on racy trashContent() calls]
+                trashContent();
+            }
+            return super.release(decrement);
+        }
+
+        // [NOTE on racy trashContent() calls]: We trash the buffer content _before_ reducing the ref
+        // count to zero, which looks racy because in principle a concurrent caller could come along
+        // and successfully retain() this buffer to keep it alive after it's been trashed. Such a
+        // caller would sometimes get an IllegalReferenceCountException ofc but that's something it
+        // could handle - see for instance org.elasticsearch.transport.netty4.Netty4Utils.ByteBufRefCounted.tryIncRef.
+        // Yet in practice this should never happen, we only ever retain() these buffers while we
+        // know them to be alive (i.e. via RefCounted#mustIncRef or its moral equivalents) so it'd
+        // be a bug for a caller to retain() a buffer whose ref count is heading to zero and whose
+        // contents we've already decided to trash.
+        private void trashContent() {
+            if (trashed == false) {
+                trashed = true;
+                TrashingByteBufAllocator.trashBuffer(buf);
+            }
+        }
+    }
+
+    static class TrashingCompositeByteBuf extends CompositeByteBuf {
+
+        TrashingCompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
+            super(alloc, direct, maxNumComponents);
+        }
+
+        @Override
+        protected void deallocate() {
+            TrashingByteBufAllocator.trashBuffer(this);
+            super.deallocate();
+        }
+    }
+
+    static class TrashingByteBufAllocator extends NoDirectBuffers {
+
+        static int DEFAULT_MAX_COMPONENTS = 16;
+
+        static void trashBuffer(ByteBuf buf) {
+            for (var nioBuf : buf.nioBuffers()) {
+                if (nioBuf.hasArray()) {
+                    var from = nioBuf.arrayOffset() + nioBuf.position();
+                    var to = from + nioBuf.remaining();
+                    Arrays.fill(nioBuf.array(), from, to, (byte) 0);
+                }
+            }
+        }
+
+        TrashingByteBufAllocator(ByteBufAllocator delegate) {
+            super(delegate);
+        }
+
+        @Override
+        public ByteBuf heapBuffer() {
+            return new TrashingByteBuf(super.heapBuffer());
+        }
+
+        @Override
+        public ByteBuf heapBuffer(int initialCapacity) {
+            return new TrashingByteBuf(super.heapBuffer(initialCapacity));
+        }
+
+        @Override
+        public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
+            return new TrashingByteBuf(super.heapBuffer(initialCapacity, maxCapacity));
+        }
+
+        @Override
+        public CompositeByteBuf compositeHeapBuffer() {
+            return new TrashingCompositeByteBuf(this, false, DEFAULT_MAX_COMPONENTS);
+        }
+
+        @Override
+        public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) {
+            return new TrashingCompositeByteBuf(this, false, maxNumComponents);
+        }
+
+    }
 }

+ 1036 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/WrappedByteBuf.java

@@ -0,0 +1,1036 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.transport.netty4;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.util.ByteProcessor;
+import io.netty.util.internal.ObjectUtil;
+import io.netty.util.internal.StringUtil;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.channels.GatheringByteChannel;
+import java.nio.channels.ScatteringByteChannel;
+import java.nio.charset.Charset;
+
+/**
+ * A copy of Netty's WrappedByteBuf.
+ */
+class WrappedByteBuf extends ByteBuf {
+
+    protected final ByteBuf buf;
+
+    protected WrappedByteBuf(ByteBuf buf) {
+        this.buf = ObjectUtil.checkNotNull(buf, "buf");
+    }
+
+    @Override
+    public final boolean hasMemoryAddress() {
+        return buf.hasMemoryAddress();
+    }
+
+    @Override
+    public boolean isContiguous() {
+        return buf.isContiguous();
+    }
+
+    @Override
+    public final long memoryAddress() {
+        return buf.memoryAddress();
+    }
+
+    @Override
+    public final int capacity() {
+        return buf.capacity();
+    }
+
+    @Override
+    public ByteBuf capacity(int newCapacity) {
+        buf.capacity(newCapacity);
+        return this;
+    }
+
+    @Override
+    public final int maxCapacity() {
+        return buf.maxCapacity();
+    }
+
+    @Override
+    public final ByteBufAllocator alloc() {
+        return buf.alloc();
+    }
+
+    @Override
+    public final ByteOrder order() {
+        return buf.order();
+    }
+
+    @Override
+    public ByteBuf order(ByteOrder endianness) {
+        return buf.order(endianness);
+    }
+
+    @Override
+    public final ByteBuf unwrap() {
+        return buf;
+    }
+
+    @Override
+    public ByteBuf asReadOnly() {
+        return buf.asReadOnly();
+    }
+
+    @Override
+    public boolean isReadOnly() {
+        return buf.isReadOnly();
+    }
+
+    @Override
+    public final boolean isDirect() {
+        return buf.isDirect();
+    }
+
+    @Override
+    public final int readerIndex() {
+        return buf.readerIndex();
+    }
+
+    @Override
+    public final ByteBuf readerIndex(int readerIndex) {
+        buf.readerIndex(readerIndex);
+        return this;
+    }
+
+    @Override
+    public final int writerIndex() {
+        return buf.writerIndex();
+    }
+
+    @Override
+    public final ByteBuf writerIndex(int writerIndex) {
+        buf.writerIndex(writerIndex);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setIndex(int readerIndex, int writerIndex) {
+        buf.setIndex(readerIndex, writerIndex);
+        return this;
+    }
+
+    @Override
+    public final int readableBytes() {
+        return buf.readableBytes();
+    }
+
+    @Override
+    public final int writableBytes() {
+        return buf.writableBytes();
+    }
+
+    @Override
+    public final int maxWritableBytes() {
+        return buf.maxWritableBytes();
+    }
+
+    @Override
+    public int maxFastWritableBytes() {
+        return buf.maxFastWritableBytes();
+    }
+
+    @Override
+    public final boolean isReadable() {
+        return buf.isReadable();
+    }
+
+    @Override
+    public final boolean isWritable() {
+        return buf.isWritable();
+    }
+
+    @Override
+    public final ByteBuf clear() {
+        buf.clear();
+        return this;
+    }
+
+    @Override
+    public final ByteBuf markReaderIndex() {
+        buf.markReaderIndex();
+        return this;
+    }
+
+    @Override
+    public final ByteBuf resetReaderIndex() {
+        buf.resetReaderIndex();
+        return this;
+    }
+
+    @Override
+    public final ByteBuf markWriterIndex() {
+        buf.markWriterIndex();
+        return this;
+    }
+
+    @Override
+    public final ByteBuf resetWriterIndex() {
+        buf.resetWriterIndex();
+        return this;
+    }
+
+    @Override
+    public ByteBuf discardReadBytes() {
+        buf.discardReadBytes();
+        return this;
+    }
+
+    @Override
+    public ByteBuf discardSomeReadBytes() {
+        buf.discardSomeReadBytes();
+        return this;
+    }
+
+    @Override
+    public ByteBuf ensureWritable(int minWritableBytes) {
+        buf.ensureWritable(minWritableBytes);
+        return this;
+    }
+
+    @Override
+    public int ensureWritable(int minWritableBytes, boolean force) {
+        return buf.ensureWritable(minWritableBytes, force);
+    }
+
+    @Override
+    public boolean getBoolean(int index) {
+        return buf.getBoolean(index);
+    }
+
+    @Override
+    public byte getByte(int index) {
+        return buf.getByte(index);
+    }
+
+    @Override
+    public short getUnsignedByte(int index) {
+        return buf.getUnsignedByte(index);
+    }
+
+    @Override
+    public short getShort(int index) {
+        return buf.getShort(index);
+    }
+
+    @Override
+    public short getShortLE(int index) {
+        return buf.getShortLE(index);
+    }
+
+    @Override
+    public int getUnsignedShort(int index) {
+        return buf.getUnsignedShort(index);
+    }
+
+    @Override
+    public int getUnsignedShortLE(int index) {
+        return buf.getUnsignedShortLE(index);
+    }
+
+    @Override
+    public int getMedium(int index) {
+        return buf.getMedium(index);
+    }
+
+    @Override
+    public int getMediumLE(int index) {
+        return buf.getMediumLE(index);
+    }
+
+    @Override
+    public int getUnsignedMedium(int index) {
+        return buf.getUnsignedMedium(index);
+    }
+
+    @Override
+    public int getUnsignedMediumLE(int index) {
+        return buf.getUnsignedMediumLE(index);
+    }
+
+    @Override
+    public int getInt(int index) {
+        return buf.getInt(index);
+    }
+
+    @Override
+    public int getIntLE(int index) {
+        return buf.getIntLE(index);
+    }
+
+    @Override
+    public long getUnsignedInt(int index) {
+        return buf.getUnsignedInt(index);
+    }
+
+    @Override
+    public long getUnsignedIntLE(int index) {
+        return buf.getUnsignedIntLE(index);
+    }
+
+    @Override
+    public long getLong(int index) {
+        return buf.getLong(index);
+    }
+
+    @Override
+    public long getLongLE(int index) {
+        return buf.getLongLE(index);
+    }
+
+    @Override
+    public char getChar(int index) {
+        return buf.getChar(index);
+    }
+
+    @Override
+    public float getFloat(int index) {
+        return buf.getFloat(index);
+    }
+
+    @Override
+    public double getDouble(int index) {
+        return buf.getDouble(index);
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuf dst) {
+        buf.getBytes(index, dst);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuf dst, int length) {
+        buf.getBytes(index, dst, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
+        buf.getBytes(index, dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, byte[] dst) {
+        buf.getBytes(index, dst);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
+        buf.getBytes(index, dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, ByteBuffer dst) {
+        buf.getBytes(index, dst);
+        return this;
+    }
+
+    @Override
+    public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
+        buf.getBytes(index, out, length);
+        return this;
+    }
+
+    @Override
+    public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
+        return buf.getBytes(index, out, length);
+    }
+
+    @Override
+    public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
+        return buf.getBytes(index, out, position, length);
+    }
+
+    @Override
+    public CharSequence getCharSequence(int index, int length, Charset charset) {
+        return buf.getCharSequence(index, length, charset);
+    }
+
+    @Override
+    public ByteBuf setBoolean(int index, boolean value) {
+        buf.setBoolean(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setByte(int index, int value) {
+        buf.setByte(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setShort(int index, int value) {
+        buf.setShort(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setShortLE(int index, int value) {
+        buf.setShortLE(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setMedium(int index, int value) {
+        buf.setMedium(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setMediumLE(int index, int value) {
+        buf.setMediumLE(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setInt(int index, int value) {
+        buf.setInt(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setIntLE(int index, int value) {
+        buf.setIntLE(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setLong(int index, long value) {
+        buf.setLong(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setLongLE(int index, long value) {
+        buf.setLongLE(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setChar(int index, int value) {
+        buf.setChar(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setFloat(int index, float value) {
+        buf.setFloat(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setDouble(int index, double value) {
+        buf.setDouble(index, value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuf src) {
+        buf.setBytes(index, src);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuf src, int length) {
+        buf.setBytes(index, src, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
+        buf.setBytes(index, src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, byte[] src) {
+        buf.setBytes(index, src);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
+        buf.setBytes(index, src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf setBytes(int index, ByteBuffer src) {
+        buf.setBytes(index, src);
+        return this;
+    }
+
+    @Override
+    public int setBytes(int index, InputStream in, int length) throws IOException {
+        return buf.setBytes(index, in, length);
+    }
+
+    @Override
+    public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
+        return buf.setBytes(index, in, length);
+    }
+
+    @Override
+    public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
+        return buf.setBytes(index, in, position, length);
+    }
+
+    @Override
+    public ByteBuf setZero(int index, int length) {
+        buf.setZero(index, length);
+        return this;
+    }
+
+    @Override
+    public int setCharSequence(int index, CharSequence sequence, Charset charset) {
+        return buf.setCharSequence(index, sequence, charset);
+    }
+
+    @Override
+    public boolean readBoolean() {
+        return buf.readBoolean();
+    }
+
+    @Override
+    public byte readByte() {
+        return buf.readByte();
+    }
+
+    @Override
+    public short readUnsignedByte() {
+        return buf.readUnsignedByte();
+    }
+
+    @Override
+    public short readShort() {
+        return buf.readShort();
+    }
+
+    @Override
+    public short readShortLE() {
+        return buf.readShortLE();
+    }
+
+    @Override
+    public int readUnsignedShort() {
+        return buf.readUnsignedShort();
+    }
+
+    @Override
+    public int readUnsignedShortLE() {
+        return buf.readUnsignedShortLE();
+    }
+
+    @Override
+    public int readMedium() {
+        return buf.readMedium();
+    }
+
+    @Override
+    public int readMediumLE() {
+        return buf.readMediumLE();
+    }
+
+    @Override
+    public int readUnsignedMedium() {
+        return buf.readUnsignedMedium();
+    }
+
+    @Override
+    public int readUnsignedMediumLE() {
+        return buf.readUnsignedMediumLE();
+    }
+
+    @Override
+    public int readInt() {
+        return buf.readInt();
+    }
+
+    @Override
+    public int readIntLE() {
+        return buf.readIntLE();
+    }
+
+    @Override
+    public long readUnsignedInt() {
+        return buf.readUnsignedInt();
+    }
+
+    @Override
+    public long readUnsignedIntLE() {
+        return buf.readUnsignedIntLE();
+    }
+
+    @Override
+    public long readLong() {
+        return buf.readLong();
+    }
+
+    @Override
+    public long readLongLE() {
+        return buf.readLongLE();
+    }
+
+    @Override
+    public char readChar() {
+        return buf.readChar();
+    }
+
+    @Override
+    public float readFloat() {
+        return buf.readFloat();
+    }
+
+    @Override
+    public double readDouble() {
+        return buf.readDouble();
+    }
+
+    @Override
+    public ByteBuf readBytes(int length) {
+        return buf.readBytes(length);
+    }
+
+    @Override
+    public ByteBuf readSlice(int length) {
+        return buf.readSlice(length);
+    }
+
+    @Override
+    public ByteBuf readRetainedSlice(int length) {
+        return buf.readRetainedSlice(length);
+    }
+
+    @Override
+    public ByteBuf readBytes(ByteBuf dst) {
+        buf.readBytes(dst);
+        return this;
+    }
+
+    @Override
+    public ByteBuf readBytes(ByteBuf dst, int length) {
+        buf.readBytes(dst, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
+        buf.readBytes(dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf readBytes(byte[] dst) {
+        buf.readBytes(dst);
+        return this;
+    }
+
+    @Override
+    public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
+        buf.readBytes(dst, dstIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf readBytes(ByteBuffer dst) {
+        buf.readBytes(dst);
+        return this;
+    }
+
+    @Override
+    public ByteBuf readBytes(OutputStream out, int length) throws IOException {
+        buf.readBytes(out, length);
+        return this;
+    }
+
+    @Override
+    public int readBytes(GatheringByteChannel out, int length) throws IOException {
+        return buf.readBytes(out, length);
+    }
+
+    @Override
+    public int readBytes(FileChannel out, long position, int length) throws IOException {
+        return buf.readBytes(out, position, length);
+    }
+
+    @Override
+    public CharSequence readCharSequence(int length, Charset charset) {
+        return buf.readCharSequence(length, charset);
+    }
+
+    @Override
+    public ByteBuf skipBytes(int length) {
+        buf.skipBytes(length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeBoolean(boolean value) {
+        buf.writeBoolean(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeByte(int value) {
+        buf.writeByte(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeShort(int value) {
+        buf.writeShort(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeShortLE(int value) {
+        buf.writeShortLE(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeMedium(int value) {
+        buf.writeMedium(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeMediumLE(int value) {
+        buf.writeMediumLE(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeInt(int value) {
+        buf.writeInt(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeIntLE(int value) {
+        buf.writeIntLE(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeLong(long value) {
+        buf.writeLong(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeLongLE(long value) {
+        buf.writeLongLE(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeChar(int value) {
+        buf.writeChar(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeFloat(float value) {
+        buf.writeFloat(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeDouble(double value) {
+        buf.writeDouble(value);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeBytes(ByteBuf src) {
+        buf.writeBytes(src);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeBytes(ByteBuf src, int length) {
+        buf.writeBytes(src, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeBytes(ByteBuf src, int srcIndex, int length) {
+        buf.writeBytes(src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeBytes(byte[] src) {
+        buf.writeBytes(src);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
+        buf.writeBytes(src, srcIndex, length);
+        return this;
+    }
+
+    @Override
+    public ByteBuf writeBytes(ByteBuffer src) {
+        buf.writeBytes(src);
+        return this;
+    }
+
+    @Override
+    public int writeBytes(InputStream in, int length) throws IOException {
+        return buf.writeBytes(in, length);
+    }
+
+    @Override
+    public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
+        return buf.writeBytes(in, length);
+    }
+
+    @Override
+    public int writeBytes(FileChannel in, long position, int length) throws IOException {
+        return buf.writeBytes(in, position, length);
+    }
+
+    @Override
+    public ByteBuf writeZero(int length) {
+        buf.writeZero(length);
+        return this;
+    }
+
+    @Override
+    public int writeCharSequence(CharSequence sequence, Charset charset) {
+        return buf.writeCharSequence(sequence, charset);
+    }
+
+    @Override
+    public int indexOf(int fromIndex, int toIndex, byte value) {
+        return buf.indexOf(fromIndex, toIndex, value);
+    }
+
+    @Override
+    public int bytesBefore(byte value) {
+        return buf.bytesBefore(value);
+    }
+
+    @Override
+    public int bytesBefore(int length, byte value) {
+        return buf.bytesBefore(length, value);
+    }
+
+    @Override
+    public int bytesBefore(int index, int length, byte value) {
+        return buf.bytesBefore(index, length, value);
+    }
+
+    @Override
+    public int forEachByte(ByteProcessor processor) {
+        return buf.forEachByte(processor);
+    }
+
+    @Override
+    public int forEachByte(int index, int length, ByteProcessor processor) {
+        return buf.forEachByte(index, length, processor);
+    }
+
+    @Override
+    public int forEachByteDesc(ByteProcessor processor) {
+        return buf.forEachByteDesc(processor);
+    }
+
+    @Override
+    public int forEachByteDesc(int index, int length, ByteProcessor processor) {
+        return buf.forEachByteDesc(index, length, processor);
+    }
+
+    @Override
+    public ByteBuf copy() {
+        return buf.copy();
+    }
+
+    @Override
+    public ByteBuf copy(int index, int length) {
+        return buf.copy(index, length);
+    }
+
+    @Override
+    public ByteBuf slice() {
+        return buf.slice();
+    }
+
+    @Override
+    public ByteBuf retainedSlice() {
+        return buf.retainedSlice();
+    }
+
+    @Override
+    public ByteBuf slice(int index, int length) {
+        return buf.slice(index, length);
+    }
+
+    @Override
+    public ByteBuf retainedSlice(int index, int length) {
+        return buf.retainedSlice(index, length);
+    }
+
+    @Override
+    public ByteBuf duplicate() {
+        return buf.duplicate();
+    }
+
+    @Override
+    public ByteBuf retainedDuplicate() {
+        return buf.retainedDuplicate();
+    }
+
+    @Override
+    public int nioBufferCount() {
+        return buf.nioBufferCount();
+    }
+
+    @Override
+    public ByteBuffer nioBuffer() {
+        return buf.nioBuffer();
+    }
+
+    @Override
+    public ByteBuffer nioBuffer(int index, int length) {
+        return buf.nioBuffer(index, length);
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers() {
+        return buf.nioBuffers();
+    }
+
+    @Override
+    public ByteBuffer[] nioBuffers(int index, int length) {
+        return buf.nioBuffers(index, length);
+    }
+
+    @Override
+    public ByteBuffer internalNioBuffer(int index, int length) {
+        return buf.internalNioBuffer(index, length);
+    }
+
+    @Override
+    public boolean hasArray() {
+        return buf.hasArray();
+    }
+
+    @Override
+    public byte[] array() {
+        return buf.array();
+    }
+
+    @Override
+    public int arrayOffset() {
+        return buf.arrayOffset();
+    }
+
+    @Override
+    public String toString(Charset charset) {
+        return buf.toString(charset);
+    }
+
+    @Override
+    public String toString(int index, int length, Charset charset) {
+        return buf.toString(index, length, charset);
+    }
+
+    @Override
+    public int hashCode() {
+        return buf.hashCode();
+    }
+
+    @Override
+    @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
+    public boolean equals(Object obj) {
+        return buf.equals(obj);
+    }
+
+    @Override
+    public int compareTo(ByteBuf buffer) {
+        return buf.compareTo(buffer);
+    }
+
+    @Override
+    public String toString() {
+        return StringUtil.simpleClassName(this) + '(' + buf.toString() + ')';
+    }
+
+    @Override
+    public ByteBuf retain(int increment) {
+        buf.retain(increment);
+        return this;
+    }
+
+    @Override
+    public ByteBuf retain() {
+        buf.retain();
+        return this;
+    }
+
+    @Override
+    public ByteBuf touch() {
+        buf.touch();
+        return this;
+    }
+
+    @Override
+    public ByteBuf touch(Object hint) {
+        buf.touch(hint);
+        return this;
+    }
+
+    @Override
+    public final boolean isReadable(int size) {
+        return buf.isReadable(size);
+    }
+
+    @Override
+    public final boolean isWritable(int size) {
+        return buf.isWritable(size);
+    }
+
+    @Override
+    public final int refCnt() {
+        return buf.refCnt();
+    }
+
+    @Override
+    public boolean release() {
+        return buf.release();
+    }
+
+    @Override
+    public boolean release(int decrement) {
+        return buf.release(decrement);
+    }
+
+}

+ 106 - 0
modules/transport-netty4/src/test/java/org/elasticsearch/transport/netty4/NettyAllocatorTests.java

@@ -0,0 +1,106 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.transport.netty4;
+
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.Unpooled;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import static org.elasticsearch.transport.netty4.NettyAllocator.TrashingByteBuf;
+import static org.elasticsearch.transport.netty4.NettyAllocator.TrashingByteBufAllocator;
+
+public class NettyAllocatorTests extends ESTestCase {
+
+    static void assertBufferTrashed(BytesReference bytesRef) throws IOException {
+        var iter = bytesRef.iterator();
+        BytesRef br;
+        while ((br = iter.next()) != null) {
+            for (var i = br.offset; i < br.offset + br.length; i++) {
+                assertEquals("off=" + br.offset + " len=" + br.length + " i=" + i, 0, br.bytes[i]);
+            }
+        }
+    }
+
+    public void testTrashArrayByteBuf() {
+        var arr = randomByteArrayOfLength(between(1024, 2048));
+        var buf = Unpooled.wrappedBuffer(arr);
+        var tBuf = new TrashingByteBuf(buf);
+        tBuf.release();
+        var emptyArr = new byte[arr.length];
+        assertArrayEquals(emptyArr, arr);
+    }
+
+    public void testNioBufsTrashingByteBuf() {
+        var arrCnt = between(1, 16);
+        var byteArrs = new byte[arrCnt][];
+        var byteBufs = new ByteBuffer[arrCnt];
+        for (var i = 0; i < arrCnt; i++) {
+            byteArrs[i] = randomByteArrayOfLength(between(1024, 2048));
+            byteBufs[i] = ByteBuffer.wrap(byteArrs[i]);
+        }
+        var buf = Unpooled.wrappedBuffer(byteBufs);
+        var tBuf = new TrashingByteBuf(buf);
+        tBuf.release();
+        for (int i = 0; i < arrCnt; i++) {
+            for (int j = 0; j < byteArrs[i].length; j++) {
+                assertEquals(0, byteArrs[i][j]);
+            }
+        }
+    }
+
+    public void testNioBufOffsetTrashingByteBuf() {
+        var arr = randomByteArrayOfLength(1024);
+        var off = 1;
+        var len = arr.length - 2;
+        arr[0] = 1;
+        arr[arr.length - 1] = 1;
+        var buf = Unpooled.wrappedBuffer(arr, off, len);
+        var tBuf = new TrashingByteBuf(buf);
+        tBuf.release();
+        assertEquals(1, arr[0]);
+        assertEquals(1, arr[arr.length - 1]);
+        for (int i = 1; i < arr.length - 1; i++) {
+            assertEquals("at index " + i, 0, arr[i]);
+        }
+    }
+
+    public void testTrashingByteBufAllocator() throws IOException {
+        var alloc = new TrashingByteBufAllocator(ByteBufAllocator.DEFAULT);
+        var size = between(1024 * 1024, 10 * 1024 * 1024);
+
+        // use 3 different heap allocation methods
+        for (var buf : List.of(alloc.heapBuffer(), alloc.heapBuffer(1024), alloc.heapBuffer(1024, size))) {
+            buf.writeBytes(randomByteArrayOfLength(size));
+            var bytesRef = Netty4Utils.toBytesReference(buf);
+            buf.release();
+            assertBufferTrashed(bytesRef);
+        }
+    }
+
+    public void testTrashingCompositeByteBuf() throws IOException {
+        var alloc = new TrashingByteBufAllocator(ByteBufAllocator.DEFAULT);
+        var compBuf = alloc.compositeHeapBuffer();
+        for (var i = 0; i < between(1, 10); i++) {
+            var buf = alloc.heapBuffer().writeBytes(randomByteArrayOfLength(between(1024, 8192)));
+            compBuf.addComponent(true, buf);
+        }
+        var bytesRef = Netty4Utils.toBytesReference(compBuf);
+        compBuf.release();
+        assertBufferTrashed(bytesRef);
+    }
+
+}

+ 23 - 0
server/src/main/java/org/elasticsearch/common/bytes/BytesReference.java

@@ -74,6 +74,29 @@ public interface BytesReference extends Comparable<BytesReference>, ToXContentFr
         }
     }
 
+    /**
+     * Allocates new buffer and copy bytes from given BytesReference.
+     *
+     * @deprecated copying bytes is a right place for performance regression and unnecessary allocations.
+     * This method exists to serve very few places that struggle to handle reference counted buffers.
+     */
+    @Deprecated(forRemoval = true)
+    static BytesReference copyBytes(BytesReference bytesReference) {
+        byte[] arr = new byte[bytesReference.length()];
+        int offset = 0;
+        final BytesRefIterator iterator = bytesReference.iterator();
+        try {
+            BytesRef slice;
+            while ((slice = iterator.next()) != null) {
+                System.arraycopy(slice.bytes, slice.offset, arr, offset, slice.length);
+                offset += slice.length;
+            }
+            return new BytesArray(arr);
+        } catch (IOException e) {
+            throw new AssertionError(e);
+        }
+    }
+
     /**
      * Returns BytesReference composed of the provided ByteBuffers.
      */

+ 4 - 5
server/src/main/java/org/elasticsearch/http/HttpBody.java

@@ -9,7 +9,6 @@
 
 package org.elasticsearch.http;
 
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.Nullable;
@@ -21,11 +20,11 @@ import org.elasticsearch.core.Releasable;
 public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpBody.Stream {
 
     static Full fromBytesReference(BytesReference bytesRef) {
-        return new ByteRefHttpBody(bytesRef);
+        return new ByteRefHttpBody(ReleasableBytesReference.wrap(bytesRef));
     }
 
     static Full empty() {
-        return new ByteRefHttpBody(BytesArray.EMPTY);
+        return new ByteRefHttpBody(ReleasableBytesReference.empty());
     }
 
     default boolean isFull() {
@@ -56,7 +55,7 @@ public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpB
      * Full content represents a complete http body content that can be accessed immediately.
      */
     non-sealed interface Full extends HttpBody {
-        BytesReference bytes();
+        ReleasableBytesReference bytes();
 
         @Override
         default void close() {}
@@ -114,5 +113,5 @@ public sealed interface HttpBody extends Releasable permits HttpBody.Full, HttpB
         default void close() {}
     }
 
-    record ByteRefHttpBody(BytesReference bytes) implements Full {}
+    record ByteRefHttpBody(ReleasableBytesReference bytes) implements Full {}
 }

+ 0 - 6
server/src/main/java/org/elasticsearch/http/HttpRequest.java

@@ -52,10 +52,4 @@ public interface HttpRequest extends HttpPreRequest {
      */
     void release();
 
-    /**
-     * If this instances uses any pooled resources, creates a copy of this instance that does not use any pooled resources and releases
-     * any resources associated with this instance. If the instance does not use any shared resources, returns itself.
-     * @return a safe unpooled http request
-     */
-    HttpRequest releaseAndCopy();
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/http/HttpTracer.java

@@ -94,7 +94,7 @@ class HttpTracer {
 
     private void logFullContent(RestRequest restRequest) {
         try (var stream = HttpBodyTracer.getBodyOutputStream(restRequest.getRequestId(), HttpBodyTracer.Type.REQUEST)) {
-            restRequest.content().writeTo(stream);
+            restRequest.releasableContent().writeTo(stream);
         } catch (Exception e2) {
             assert false : e2; // no real IO here
         }

+ 5 - 4
server/src/main/java/org/elasticsearch/rest/BaseRestHandler.java

@@ -129,6 +129,7 @@ public abstract class BaseRestHandler implements RestHandler {
                 );
             }
 
+            usageCount.increment();
             if (request.isStreamedContent()) {
                 assert action instanceof RequestBodyChunkConsumer;
                 var chunkConsumer = (RequestBodyChunkConsumer) action;
@@ -144,11 +145,11 @@ public abstract class BaseRestHandler implements RestHandler {
                         chunkConsumer.streamClose();
                     }
                 });
+                action.accept(channel);
+            } else {
+                action.accept(channel);
+                request.getHttpRequest().release();
             }
-
-            usageCount.increment();
-            // execute the action
-            action.accept(channel);
         }
     }
 

+ 0 - 5
server/src/main/java/org/elasticsearch/rest/FilterRestHandler.java

@@ -43,11 +43,6 @@ public abstract class FilterRestHandler implements RestHandler {
         return delegate.canTripCircuitBreaker();
     }
 
-    @Override
-    public boolean allowsUnsafeBuffers() {
-        return delegate.allowsUnsafeBuffers();
-    }
-
     @Override
     public boolean supportsBulkContent() {
         return delegate.supportsBulkContent();

+ 0 - 4
server/src/main/java/org/elasticsearch/rest/RestController.java

@@ -467,10 +467,6 @@ public class RestController implements HttpServerTransport.Dispatcher {
             }
             // iff we could reserve bytes for the request we need to send the response also over this channel
             responseChannel = new ResourceHandlingHttpChannel(channel, circuitBreakerService, contentLength, methodHandlers);
-            // TODO: Count requests double in the circuit breaker if they need copying?
-            if (handler.allowsUnsafeBuffers() == false) {
-                request.ensureSafeBuffers();
-            }
 
             if (handler.allowSystemIndexAccessByDefault() == false) {
                 // The ELASTIC_PRODUCT_ORIGIN_HTTP_HEADER indicates that the request is coming from an Elastic product and

+ 0 - 12
server/src/main/java/org/elasticsearch/rest/RestHandler.java

@@ -68,18 +68,6 @@ public interface RestHandler {
         return serverlessScope == null ? null : serverlessScope.value();
     }
 
-    /**
-     * Indicates if the RestHandler supports working with pooled buffers. If the request handler will not escape the return
-     * {@link RestRequest#content()} or any buffers extracted from it then there is no need to make a copies of any pooled buffers in the
-     * {@link RestRequest} instance before passing a request to this handler. If this instance does not support pooled/unsafe buffers
-     * {@link RestRequest#ensureSafeBuffers()} should be called on any request before passing it to {@link #handleRequest}.
-     *
-     * @return true iff the handler supports requests that make use of pooled buffers
-     */
-    default boolean allowsUnsafeBuffers() {
-        return false;
-    }
-
     /**
      * The list of {@link Route}s that this RestHandler is responsible for handling.
      */

+ 47 - 14
server/src/main/java/org/elasticsearch/rest/RestRequest.java

@@ -16,17 +16,21 @@ import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.core.Booleans;
 import org.elasticsearch.core.CheckedConsumer;
 import org.elasticsearch.core.Nullable;
 import org.elasticsearch.core.RestApiVersion;
+import org.elasticsearch.core.SuppressForbidden;
 import org.elasticsearch.core.TimeValue;
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.http.HttpBody;
 import org.elasticsearch.http.HttpChannel;
 import org.elasticsearch.http.HttpRequest;
+import org.elasticsearch.logging.LogManager;
+import org.elasticsearch.logging.Logger;
 import org.elasticsearch.telemetry.tracing.Traceable;
 import org.elasticsearch.xcontent.ParsedMediaType;
 import org.elasticsearch.xcontent.ToXContent;
@@ -51,6 +55,8 @@ import static org.elasticsearch.core.TimeValue.parseTimeValue;
 
 public class RestRequest implements ToXContent.Params, Traceable {
 
+    private static final Logger logger = LogManager.getLogger(RestRequest.class);
+
     /**
      * Internal marker request parameter to indicate that a request was made in serverless mode. Use this parameter, together with
      * {@link #OPERATOR_REQUEST} if you need to toggle behavior for serverless, for example to enforce partial API restrictions
@@ -187,15 +193,6 @@ public class RestRequest implements ToXContent.Params, Traceable {
         }
     }
 
-    /**
-     * Invoke {@link HttpRequest#releaseAndCopy()} on the http request in this instance and replace a pooled http request
-     * with an unpooled copy. This is supposed to be used before passing requests to {@link RestHandler} instances that can not safely
-     * handle http requests that use pooled buffers as determined by {@link RestHandler#allowsUnsafeBuffers()}.
-     */
-    void ensureSafeBuffers() {
-        httpRequest = httpRequest.releaseAndCopy();
-    }
-
     /**
      * Creates a new REST request.
      *
@@ -306,9 +303,31 @@ public class RestRequest implements ToXContent.Params, Traceable {
         return httpRequest.body().isFull();
     }
 
+    /**
+     * Returns a copy of HTTP content. The copy is GC-managed and does not require reference counting.
+     * Please use {@link #releasableContent()} to avoid content copy.
+     */
+    @SuppressForbidden(reason = "temporarily support content copy while migrating RestHandlers to ref counted pooled buffers")
     public BytesReference content() {
+        return BytesReference.copyBytes(releasableContent());
+    }
+
+    /**
+     * Returns a direct reference to the network buffer containing the request body. The HTTP layers will release their references to this
+     * buffer as soon as they have finished the synchronous steps of processing the request on the network thread, which will by default
+     * release the buffer back to the pool where it may be re-used for another request. If you need to keep the buffer alive past the end of
+     * these synchronous steps, acquire your own reference to this buffer and release it once it's no longer needed.
+     */
+    public ReleasableBytesReference releasableContent() {
         this.contentConsumed = true;
-        return httpRequest.body().asFull().bytes();
+        var bytes = httpRequest.body().asFull().bytes();
+        if (bytes.hasReferences() == false) {
+            var e = new IllegalStateException("http releasable content accessed after release");
+            logger.error(e.getMessage(), e);
+            assert false : e;
+            throw e;
+        }
+        return bytes;
     }
 
     public boolean isStreamedContent() {
@@ -319,18 +338,32 @@ public class RestRequest implements ToXContent.Params, Traceable {
         return httpRequest.body().asStream();
     }
 
-    /**
-     * @return content of the request body or throw an exception if the body or content type is missing
-     */
-    public final BytesReference requiredContent() {
+    private void ensureContent() {
         if (hasContent() == false) {
             throw new ElasticsearchParseException("request body is required");
         } else if (xContentType.get() == null) {
             throwValidationException("unknown content type");
         }
+    }
+
+    /**
+     * @return copy of the request body or throw an exception if the body or content type is missing.
+     * See {@link #content()}. Please use {@link #requiredReleasableContent()} to avoid content copy.
+     */
+    public final BytesReference requiredContent() {
+        ensureContent();
         return content();
     }
 
+    /**
+     * Returns reference to the network buffer of HTTP content or throw an exception if the body or content type is missing.
+     * See {@link #releasableContent()}. It's a recommended method to handle HTTP content without copying it.
+     */
+    public ReleasableBytesReference requiredReleasableContent() {
+        ensureContent();
+        return releasableContent();
+    }
+
     private static void throwValidationException(String msg) {
         ValidationException unknownContentType = new ValidationException();
         unknownContentType.addValidationError(msg);

+ 4 - 3
server/src/main/java/org/elasticsearch/rest/RestRequestFilter.java

@@ -12,6 +12,7 @@ package org.elasticsearch.rest;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.xcontent.XContentHelper;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
 import org.elasticsearch.core.Tuple;
@@ -44,10 +45,10 @@ public interface RestRequestFilter {
                 }
 
                 @Override
-                public BytesReference content() {
+                public ReleasableBytesReference releasableContent() {
                     if (filteredBytes == null) {
                         Tuple<XContentType, Map<String, Object>> result = XContentHelper.convertToMap(
-                            restRequest.requiredContent(),
+                            restRequest.requiredReleasableContent(),
                             true,
                             restRequest.getXContentType()
                         );
@@ -63,7 +64,7 @@ public interface RestRequestFilter {
                             throw new ElasticsearchException("failed to parse request", e);
                         }
                     }
-                    return filteredBytes;
+                    return ReleasableBytesReference.wrap(filteredBytes);
                 }
             };
         } else {

+ 8 - 8
server/src/main/java/org/elasticsearch/rest/action/document/RestBulkAction.java

@@ -10,6 +10,7 @@
 package org.elasticsearch.rest.action.document;
 
 import org.elasticsearch.ElasticsearchParseException;
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkRequestParser;
@@ -108,9 +109,11 @@ public class RestBulkAction extends BaseRestHandler {
             boolean defaultRequireDataStream = request.paramAsBoolean(DocWriteRequest.REQUIRE_DATA_STREAM, false);
             bulkRequest.timeout(request.paramAsTime("timeout", BulkShardRequest.DEFAULT_TIMEOUT));
             bulkRequest.setRefreshPolicy(request.param("refresh"));
+            ReleasableBytesReference content = request.requiredReleasableContent();
+
             try {
                 bulkRequest.add(
-                    request.requiredContent(),
+                    content,
                     defaultIndex,
                     defaultRouting,
                     defaultFetchSourceContext,
@@ -125,8 +128,10 @@ public class RestBulkAction extends BaseRestHandler {
             } catch (Exception e) {
                 return channel -> new RestToXContentListener<>(channel).onFailure(parseFailureException(e));
             }
-
-            return channel -> client.bulk(bulkRequest, new RestRefCountedChunkedToXContentListener<>(channel));
+            return channel -> {
+                content.mustIncRef();
+                client.bulk(bulkRequest, ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), content));
+            };
         } else {
             if (request.getRestApiVersion() == RestApiVersion.V_7 && request.hasParam("type")) {
                 request.param("type");
@@ -280,11 +285,6 @@ public class RestBulkAction extends BaseRestHandler {
         return true;
     }
 
-    @Override
-    public boolean allowsUnsafeBuffers() {
-        return true;
-    }
-
     @Override
     public Set<String> supportedCapabilities() {
         return capabilities;

+ 14 - 5
server/src/main/java/org/elasticsearch/rest/action/document/RestIndexAction.java

@@ -9,12 +9,14 @@
 
 package org.elasticsearch.rest.action.document;
 
+import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.DocWriteRequest;
 import org.elasticsearch.action.DocWriteResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.metadata.DataStream;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.core.RestApiVersion;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.rest.BaseRestHandler;
@@ -122,11 +124,12 @@ public class RestIndexAction extends BaseRestHandler {
             request.param("type"); // consume and ignore the type
         }
 
+        ReleasableBytesReference source = request.requiredReleasableContent();
         IndexRequest indexRequest = new IndexRequest(request.param("index"));
         indexRequest.id(request.param("id"));
         indexRequest.routing(request.param("routing"));
         indexRequest.setPipeline(request.param("pipeline"));
-        indexRequest.source(request.requiredContent(), request.getXContentType());
+        indexRequest.source(source, request.getXContentType());
         indexRequest.timeout(request.paramAsTime("timeout", IndexRequest.DEFAULT_TIMEOUT));
         indexRequest.setRefreshPolicy(request.param("refresh"));
         indexRequest.version(RestActions.parseVersion(request));
@@ -144,10 +147,16 @@ public class RestIndexAction extends BaseRestHandler {
             indexRequest.opType(sOpType);
         }
 
-        return channel -> client.index(
-            indexRequest,
-            new RestToXContentListener<>(channel, DocWriteResponse::status, r -> r.getLocation(indexRequest.routing()))
-        );
+        return channel -> {
+            source.mustIncRef();
+            client.index(
+                indexRequest,
+                ActionListener.releaseAfter(
+                    new RestToXContentListener<>(channel, DocWriteResponse::status, r -> r.getLocation(indexRequest.routing())),
+                    source
+                )
+            );
+        };
     }
 
     @Override

+ 0 - 4
server/src/main/java/org/elasticsearch/rest/action/search/RestSearchAction.java

@@ -445,8 +445,4 @@ public class RestSearchAction extends BaseRestHandler {
         return RESPONSE_PARAMS;
     }
 
-    @Override
-    public boolean allowsUnsafeBuffers() {
-        return true;
-    }
 }

+ 6 - 0
server/src/test/java/org/elasticsearch/common/bytes/BytesArrayTests.java

@@ -107,4 +107,10 @@ public class BytesArrayTests extends AbstractBytesReferenceTestCase {
         Exception e = expectThrows(ArrayIndexOutOfBoundsException.class, () -> ref.getDoubleLE(9));
         assertThat(e.getMessage(), equalTo("Index 9 out of bounds for length 9"));
     }
+
+    public void testCopyBytes() {
+        var data = randomByteArrayOfLength(between(1024, 1024 * 1024 * 50));
+        var copy = BytesReference.copyBytes(new BytesArray(data));
+        assertArrayEquals(data, BytesReference.toBytes(copy));
+    }
 }

+ 0 - 5
server/src/test/java/org/elasticsearch/http/TestHttpRequest.java

@@ -85,11 +85,6 @@ class TestHttpRequest implements HttpRequest {
     @Override
     public void release() {}
 
-    @Override
-    public HttpRequest releaseAndCopy() {
-        return this;
-    }
-
     @Override
     public Exception getInboundException() {
         return null;

+ 0 - 5
server/src/test/java/org/elasticsearch/rest/RestControllerTests.java

@@ -876,11 +876,6 @@ public class RestControllerTests extends ESTestCase {
             @Override
             public void release() {}
 
-            @Override
-            public HttpRequest releaseAndCopy() {
-                return this;
-            }
-
             @Override
             public Exception getInboundException() {
                 return null;

+ 0 - 5
test/framework/src/main/java/org/elasticsearch/test/rest/FakeRestRequest.java

@@ -138,11 +138,6 @@ public class FakeRestRequest extends RestRequest {
         @Override
         public void release() {}
 
-        @Override
-        public HttpRequest releaseAndCopy() {
-            return this;
-        }
-
         @Override
         public Exception getInboundException() {
             return inboundException;

+ 1 - 1
x-pack/plugin/ent-search/src/main/java/org/elasticsearch/xpack/application/EnterpriseSearchBaseRestHandler.java

@@ -32,7 +32,7 @@ public abstract class EnterpriseSearchBaseRestHandler extends BaseRestHandler {
             // We need to consume parameters and content from the REST request in order to bypass unrecognized param errors
             // and return a license error.
             request.params().keySet().forEach(key -> request.param(key, ""));
-            request.content();
+            request.releasableContent();
             return channel -> channel.sendResponse(
                 new RestResponse(channel, LicenseUtils.newComplianceException(this.licenseState, this.product))
             );

+ 1 - 1
x-pack/plugin/logstash/src/main/java/org/elasticsearch/xpack/logstash/rest/RestPutPipelineAction.java

@@ -49,7 +49,7 @@ public class RestPutPipelineAction extends BaseRestHandler {
         }
 
         return restChannel -> {
-            final String content = request.content().utf8ToString();
+            final String content = request.releasableContent().utf8ToString();
             client.execute(
                 PutPipelineAction.INSTANCE,
                 new PutPipelineRequest(id, content, request.getXContentType()),

+ 3 - 2
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/audit/AuditUtil.java

@@ -27,10 +27,11 @@ public class AuditUtil {
 
     public static String restRequestContent(RestRequest request) {
         if (request.hasContent()) {
+            var content = request.releasableContent();
             try {
-                return XContentHelper.convertToJson(request.content(), false, false, request.getXContentType());
+                return XContentHelper.convertToJson(content, false, false, request.getXContentType());
             } catch (IOException ioe) {
-                return "Invalid Format: " + request.content().utf8ToString();
+                return "Invalid Format: " + content.utf8ToString();
             }
         }
         return "";

+ 1 - 1
x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/rest/action/SecurityBaseRestHandler.java

@@ -75,7 +75,7 @@ public abstract class SecurityBaseRestHandler extends BaseRestHandler {
             return innerPrepareRequest(request, client);
         } else {
             request.params().keySet().forEach(key -> request.param(key, ""));
-            request.content();
+            request.releasableContent(); // mark content consumed
             return channel -> channel.sendResponse(new RestResponse(channel, failedFeature));
         }
     }

+ 4 - 4
x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/audit/logfile/LoggingAuditTrailTests.java

@@ -2614,7 +2614,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
         checkedFields.put(LoggingAuditTrail.REQUEST_METHOD_FIELD_NAME, request.method().toString());
         checkedFields.put(LoggingAuditTrail.REQUEST_ID_FIELD_NAME, requestId);
         checkedFields.put(LoggingAuditTrail.URL_PATH_FIELD_NAME, "_uri");
-        if (includeRequestBody && Strings.hasLength(request.content())) {
+        if (includeRequestBody && request.hasContent()) {
             checkedFields.put(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, request.content().utf8ToString());
         }
         if (params.isEmpty() == false) {
@@ -2643,8 +2643,8 @@ public class LoggingAuditTrailTests extends ESTestCase {
         checkedFields.put(LoggingAuditTrail.REQUEST_METHOD_FIELD_NAME, request.method().toString());
         checkedFields.put(LoggingAuditTrail.REQUEST_ID_FIELD_NAME, requestId);
         checkedFields.put(LoggingAuditTrail.URL_PATH_FIELD_NAME, "_uri");
-        if (includeRequestBody && Strings.hasLength(request.content())) {
-            checkedFields.put(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, request.getHttpRequest().body().asFull().bytes().utf8ToString());
+        if (includeRequestBody && request.hasContent()) {
+            checkedFields.put(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, request.content().utf8ToString());
         }
         if (params.isEmpty() == false) {
             checkedFields.put(LoggingAuditTrail.URL_QUERY_FIELD_NAME, "foo=bar&evac=true");
@@ -2672,7 +2672,7 @@ public class LoggingAuditTrailTests extends ESTestCase {
         checkedFields.put(LoggingAuditTrail.REQUEST_METHOD_FIELD_NAME, request.method().toString());
         checkedFields.put(LoggingAuditTrail.REQUEST_ID_FIELD_NAME, requestId);
         checkedFields.put(LoggingAuditTrail.URL_PATH_FIELD_NAME, "_uri");
-        if (includeRequestBody && Strings.hasLength(request.content().utf8ToString())) {
+        if (includeRequestBody && request.hasContent()) {
             checkedFields.put(LoggingAuditTrail.REQUEST_BODY_FIELD_NAME, request.content().utf8ToString());
         }
         if (params.isEmpty() == false) {