Pārlūkot izejas kodu

Stop allocating/acquiring pages redundantly in RecyclerBytesStreamOutput (#105856)

Acquiring a page just to figure out the recycler's page size is quite
wasteful. Especially for the Netty allocator there is a non-trivial
amount of underlying logic here that may even involve contention with
other threads. We can easily pass the page size with the recycler.
Armin Braun 1 gadu atpakaļ
vecāks
revīzija
6bb6cab5bb

+ 5 - 0
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/NettyAllocator.java

@@ -152,6 +152,11 @@ public class NettyAllocator {
                     }
                 };
             }
+
+            @Override
+            public int pageSize() {
+                return PageCacheRecycler.BYTE_PAGE_SIZE;
+            }
         };
     }
 

+ 1 - 3
server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java

@@ -43,9 +43,7 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
 
     public RecyclerBytesStreamOutput(Recycler<BytesRef> recycler) {
         this.recycler = recycler;
-        try (Recycler.V<BytesRef> obtain = recycler.obtain()) {
-            pageSize = obtain.v().length;
-        }
+        this.pageSize = recycler.pageSize();
         this.currentPageOffset = pageSize;
     }
 

+ 4 - 0
server/src/main/java/org/elasticsearch/common/recycler/AbstractRecycler.java

@@ -16,4 +16,8 @@ abstract class AbstractRecycler<T> implements Recycler<T> {
         this.c = c;
     }
 
+    @Override
+    public int pageSize() {
+        return c.pageSize();
+    }
 }

+ 10 - 0
server/src/main/java/org/elasticsearch/common/recycler/Recycler.java

@@ -30,6 +30,11 @@ public interface Recycler<T> {
 
         /** Destroy the data. This operation allows the data structure to release any internal resources before GC. */
         void destroy(T value);
+
+        /**
+         * @return see {@link Recycler#pageSize()}
+         */
+        int pageSize();
     }
 
     interface V<T> extends Releasable {
@@ -44,4 +49,9 @@ public interface Recycler<T> {
 
     V<T> obtain();
 
+    /**
+     * @return the page size of the recycled object if it is array backed.
+     */
+    int pageSize();
+
 }

+ 8 - 0
server/src/main/java/org/elasticsearch/common/recycler/Recyclers.java

@@ -92,6 +92,10 @@ public enum Recyclers {
                 };
             }
 
+            @Override
+            public int pageSize() {
+                return getDelegate().pageSize();
+            }
         };
     }
 
@@ -134,6 +138,10 @@ public enum Recyclers {
                 return recyclers[slot()];
             }
 
+            @Override
+            public int pageSize() {
+                return recyclers[slot()].pageSize();
+            }
         };
     }
 

+ 12 - 2
server/src/main/java/org/elasticsearch/common/util/PageCacheRecycler.java

@@ -99,7 +99,7 @@ public class PageCacheRecycler {
         final int maxPageCount = (int) Math.min(Integer.MAX_VALUE, limit / PAGE_SIZE_IN_BYTES);
 
         final int maxBytePageCount = (int) (bytesWeight * maxPageCount / totalWeight);
-        bytePage = build(type, maxBytePageCount, allocatedProcessors, new AbstractRecyclerC<byte[]>() {
+        bytePage = build(type, maxBytePageCount, allocatedProcessors, new AbstractRecyclerC<>() {
             @Override
             public byte[] newInstance() {
                 return new byte[BYTE_PAGE_SIZE];
@@ -109,10 +109,15 @@ public class PageCacheRecycler {
             public void recycle(byte[] value) {
                 // nothing to do
             }
+
+            @Override
+            public int pageSize() {
+                return BYTE_PAGE_SIZE;
+            }
         });
 
         final int maxObjectPageCount = (int) (objectsWeight * maxPageCount / totalWeight);
-        objectPage = build(type, maxObjectPageCount, allocatedProcessors, new AbstractRecyclerC<Object[]>() {
+        objectPage = build(type, maxObjectPageCount, allocatedProcessors, new AbstractRecyclerC<>() {
             @Override
             public Object[] newInstance() {
                 return new Object[OBJECT_PAGE_SIZE];
@@ -122,6 +127,11 @@ public class PageCacheRecycler {
             public void recycle(Object[] value) {
                 Arrays.fill(value, null); // we need to remove the strong refs on the objects stored in the array
             }
+
+            @Override
+            public int pageSize() {
+                return OBJECT_PAGE_SIZE;
+            }
         });
 
         assert PAGE_SIZE_IN_BYTES * (maxBytePageCount + maxObjectPageCount) <= limit;

+ 5 - 0
server/src/main/java/org/elasticsearch/transport/BytesRefRecycler.java

@@ -43,4 +43,9 @@ public class BytesRefRecycler implements Recycler<BytesRef> {
             }
         };
     }
+
+    @Override
+    public int pageSize() {
+        return PageCacheRecycler.BYTE_PAGE_SIZE;
+    }
 }

+ 5 - 0
server/src/test/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutputTests.java

@@ -1027,6 +1027,11 @@ public class RecyclerBytesStreamOutputTests extends ESTestCase {
                 pagesAllocated.incrementAndGet();
                 return page;
             }
+
+            @Override
+            public int pageSize() {
+                return pageSize;
+            }
         })) {
             var bytesAllocated = 0;
             while (bytesAllocated < Integer.MAX_VALUE) {

+ 6 - 1
server/src/test/java/org/elasticsearch/common/recycler/AbstractRecyclerTestCase.java

@@ -26,7 +26,7 @@ public abstract class AbstractRecyclerTestCase extends ESTestCase {
 
         @Override
         public byte[] newInstance() {
-            byte[] value = new byte[10];
+            byte[] value = new byte[pageSize()];
             // "fresh" is intentionally not 0 to ensure we covered this code path
             Arrays.fill(value, FRESH);
             return value;
@@ -43,6 +43,11 @@ public abstract class AbstractRecyclerTestCase extends ESTestCase {
             Arrays.fill(value, DEAD);
         }
 
+        @Override
+        public int pageSize() {
+            return 10;
+        }
+
     };
 
     protected void assertFresh(byte[] data) {

+ 5 - 0
server/src/test/java/org/elasticsearch/rest/action/cat/RestTableTests.java

@@ -404,6 +404,11 @@ public class RestTableTests extends ESTestCase {
                     }
                 };
             }
+
+            @Override
+            public int pageSize() {
+                return pageSize;
+            }
         };
 
         final var bodyChunks = new ArrayList<String>();

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -2033,6 +2033,11 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
             return trackedRef;
         }
 
+        @Override
+        public int pageSize() {
+            return delegate.pageSize();
+        }
+
         /**
          * Release all tracked refs as if the node rebooted.
          */