Ver código fonte

Add direct access to current recycler stream page (#135114)

This change makes a number of minor optimizations to the recycler bytes
stream. The most important change is that it allow cached direct access
to the current page. This helps in most scenarios where are write does
not cross page boundaries.

Additionally, it enables future subclasses to implement custom
serialization directly to the page with minimal bounds checks.

Finally, it always creates the first page in the ctor to remove guaranteed
expand calls in the first stream write.
Tim Brooks 2 semanas atrás
pai
commit
3e7e63ee0c

+ 228 - 0
benchmarks/src/main/java/org/elasticsearch/benchmark/bytes/RecyclerBytesStreamOutputBenchmark.java

@@ -0,0 +1,228 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.benchmark.bytes;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
+import org.elasticsearch.common.recycler.Recycler;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+@Warmup(iterations = 3)
+@Measurement(iterations = 3)
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.NANOSECONDS)
+@State(Scope.Thread)
+@Fork(value = 1)
+public class RecyclerBytesStreamOutputBenchmark {
+
+    private final AtomicReference<BytesRef> bytesRef = new AtomicReference<>(new BytesRef(16384));
+    private RecyclerBytesStreamOutput streamOutput;
+    private String shortString;
+    private String longString;
+    private String nonAsciiString;
+    private String veryLongString;
+    private byte[] bytes1;
+    private byte[] bytes2;
+    private byte[] bytes3;
+    private byte[] multiPageBytes;
+    private int[] vints;
+
+    @Setup
+    public void initResults() throws IOException {
+        streamOutput = new RecyclerBytesStreamOutput(new BenchmarkRecycler(bytesRef));
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+
+        bytes1 = new byte[327];
+        bytes2 = new byte[712];
+        bytes3 = new byte[1678];
+        multiPageBytes = new byte[16387 * 4];
+        random.nextBytes(bytes1);
+        random.nextBytes(bytes2);
+        random.nextBytes(bytes3);
+        random.nextBytes(multiPageBytes);
+
+        // We use weights to generate certain sized UTF-8 characters and vInts. However, there is still some non-determinism which could
+        // impact direct comparisons run-to-run
+
+        shortString = generateAsciiString(20);
+        longString = generateAsciiString(100);
+        nonAsciiString = generateUtf8String(200);
+        veryLongString = generateAsciiString(800);
+        // vint values for benchmarking
+        vints = new int[1000];
+        for (int i = 0; i < vints.length; i++) {
+            if (random.nextBoolean()) {
+                // 1-byte 50% of the time
+                vints[i] = random.nextInt(128);
+            } else if (random.nextBoolean()) {
+                // 2-byte 25% of the time
+                vints[i] = random.nextInt(128, 16384);
+            } else {
+                if (random.nextBoolean()) {
+                    // 3-byte vints
+                    vints[i] = random.nextInt(16384, 2097152);
+                } else {
+                    // All vint variants
+                    vints[i] = random.nextInt();
+                }
+            }
+        }
+    }
+
+    @Benchmark
+    public void writeByte() throws IOException {
+        streamOutput.seek(1);
+        for (byte item : bytes1) {
+            streamOutput.writeByte(item);
+        }
+        for (byte item : bytes2) {
+            streamOutput.writeByte(item);
+        }
+        for (byte item : bytes3) {
+            streamOutput.writeByte(item);
+        }
+    }
+
+    @Benchmark
+    public void writeBytes() throws IOException {
+        streamOutput.seek(1);
+        streamOutput.writeBytes(bytes1, 0, bytes1.length);
+        streamOutput.writeBytes(bytes2, 0, bytes2.length);
+        streamOutput.writeBytes(bytes3, 0, bytes3.length);
+    }
+
+    @Benchmark
+    public void writeBytesAcrossPageBoundary() throws IOException {
+        streamOutput.seek(16384 - 1000);
+        streamOutput.writeBytes(bytes1, 0, bytes1.length);
+        streamOutput.writeBytes(bytes2, 0, bytes2.length);
+        streamOutput.writeBytes(bytes3, 0, bytes3.length);
+    }
+
+    @Benchmark
+    public void writeBytesMultiPage() throws IOException {
+        streamOutput.seek(16384 - 1000);
+        streamOutput.writeBytes(multiPageBytes, 0, multiPageBytes.length);
+    }
+
+    @Benchmark
+    public void writeString() throws IOException {
+        streamOutput.seek(1);
+        streamOutput.writeString(shortString);
+        streamOutput.writeString(longString);
+        streamOutput.writeString(nonAsciiString);
+        streamOutput.writeString(veryLongString);
+    }
+
+    @Benchmark
+    public void writeVInt() throws IOException {
+        streamOutput.seek(1);
+        for (int vint : vints) {
+            streamOutput.writeVInt(vint);
+        }
+    }
+
+    public static String generateAsciiString(int n) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        StringBuilder sb = new StringBuilder(n);
+
+        for (int i = 0; i < n; i++) {
+            int ascii = random.nextInt(128);
+            sb.append((char) ascii);
+        }
+
+        return sb.toString();
+    }
+
+    public static String generateUtf8String(int n) {
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        StringBuilder sb = new StringBuilder(n);
+
+        for (int i = 0; i < n; i++) {
+            int codePoint;
+            int probability = random.nextInt(100);
+
+            if (probability < 85) {
+                // 1-byte UTF-8 (ASCII range)
+                // 0x0000 to 0x007F
+                codePoint = random.nextInt(0x0080);
+            } else if (probability < 95) {
+                // 2-byte UTF-8
+                // 0x0080 to 0x07FF
+                codePoint = random.nextInt(0x0080, 0x0800);
+            } else {
+                // 3-byte UTF-8
+                // 0x0800 to 0xFFFF
+                do {
+                    codePoint = random.nextInt(0x0800, 0x10000);
+                    // Skip surrogate pairs (0xD800-0xDFFF)
+                } while (codePoint >= 0xD800 && codePoint <= 0xDFFF);
+            }
+
+            sb.appendCodePoint(codePoint);
+        }
+
+        return sb.toString();
+    }
+
+    private record BenchmarkRecycler(AtomicReference<BytesRef> bytesRef) implements Recycler<BytesRef> {
+
+        @Override
+        public V<BytesRef> obtain() {
+            BytesRef recycledBytesRef = bytesRef.getAndSet(null);
+            final BytesRef localBytesRef;
+            final boolean recycled;
+            if (recycledBytesRef != null) {
+                recycled = true;
+                localBytesRef = recycledBytesRef;
+            } else {
+                recycled = false;
+                localBytesRef = new BytesRef(16384);
+            }
+            return new V<>() {
+                @Override
+                public BytesRef v() {
+                    return localBytesRef;
+                }
+
+                @Override
+                public boolean isRecycled() {
+                    return recycled;
+                }
+
+                @Override
+                public void close() {
+                    if (recycled) {
+                        bytesRef.set(localBytesRef);
+                    }
+                }
+            };
+        }
+
+        @Override
+        public int pageSize() {
+            return 16384;
+        }
+    }
+}

+ 144 - 48
server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java

@@ -38,17 +38,23 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
     static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
     static final VarHandle VH_LE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN);
 
-    private ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>();
+    private ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>(8);
     private final Recycler<BytesRef> recycler;
     private final int pageSize;
     private int pageIndex = -1;
     private int currentCapacity = 0;
+
+    private BytesRef currentBytesRef;
     private int currentPageOffset;
 
     public RecyclerBytesStreamOutput(Recycler<BytesRef> recycler) {
         this.recycler = recycler;
         this.pageSize = recycler.pageSize();
         this.currentPageOffset = pageSize;
+        // Always start with a page. This is because if we don't have a page, one of the hot write paths would be forced to go through
+        // a slow path. We prefer to only execute that path if we need to expand.
+        ensureCapacityFromPosition(1);
+        nextPage();
     }
 
     @Override
@@ -59,15 +65,27 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
     @Override
     public void writeByte(byte b) {
         int currentPageOffset = this.currentPageOffset;
-        if (1 > (pageSize - currentPageOffset)) {
+        if (1 > pageSize - currentPageOffset) {
             ensureCapacity(1);
+            nextPage();
             currentPageOffset = 0;
         }
-        BytesRef currentPage = pages.get(pageIndex).v();
-        currentPage.bytes[currentPage.offset + currentPageOffset] = b;
+        final BytesRef currentPage = currentBytesRef;
+        final int destOffset = currentPage.offset + currentPageOffset;
+        currentPage.bytes[destOffset] = b;
         this.currentPageOffset = currentPageOffset + 1;
     }
 
+    @Override
+    public void write(byte[] b) throws IOException {
+        writeBytes(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        writeBytes(b, off, len);
+    }
+
     @Override
     public void writeBytes(byte[] b, int offset, int length) {
         // nothing to copy
@@ -77,36 +95,79 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
 
         Objects.checkFromIndexSize(offset, length, b.length);
 
-        // get enough pages for new size
-        final int pageSize = this.pageSize;
         int currentPageOffset = this.currentPageOffset;
+        BytesRef currentPage = currentBytesRef;
         if (length > pageSize - currentPageOffset) {
             ensureCapacity(length);
-            currentPageOffset = this.currentPageOffset;
         }
 
-        // bulk copy
         int bytesToCopy = length;
         int srcOff = offset;
-        int j = 0;
         while (true) {
-            BytesRef currentPage = pages.get(pageIndex + j).v();
-            int toCopyThisLoop = Math.min(pageSize - currentPageOffset, bytesToCopy);
-            System.arraycopy(b, srcOff, currentPage.bytes, currentPage.offset + currentPageOffset, toCopyThisLoop);
+            final int toCopyThisLoop = Math.min(pageSize - currentPageOffset, bytesToCopy);
+            final int destOffset = currentPage.offset + currentPageOffset;
+            System.arraycopy(b, srcOff, currentPage.bytes, destOffset, toCopyThisLoop);
             srcOff += toCopyThisLoop;
             bytesToCopy -= toCopyThisLoop;
             if (bytesToCopy > 0) {
                 currentPageOffset = 0;
+                currentPage = pages.get(++pageIndex).v();
             } else {
                 currentPageOffset += toCopyThisLoop;
                 break;
             }
-            j++;
         }
         this.currentPageOffset = currentPageOffset;
+        this.currentBytesRef = currentPage;
+    }
+
+    @Override
+    public void writeVInt(int i) throws IOException {
+        final int currentPageOffset = this.currentPageOffset;
+        final int remainingBytesInPage = pageSize - currentPageOffset;
+
+        // Single byte values (most common)
+        if ((i & 0xFFFFFF80) == 0) {
+            if (1 > remainingBytesInPage) {
+                super.writeVInt(i);
+            } else {
+                BytesRef currentPage = currentBytesRef;
+                currentPage.bytes[currentPage.offset + currentPageOffset] = (byte) i;
+                this.currentPageOffset = currentPageOffset + 1;
+            }
+            return;
+        }
+
+        int bytesNeeded = vIntLength(i);
+        if (bytesNeeded > remainingBytesInPage) {
+            super.writeVInt(i);
+        } else {
+            BytesRef currentPage = currentBytesRef;
+            putVInt(i, bytesNeeded, currentPage.bytes, currentPage.offset + currentPageOffset);
+            this.currentPageOffset = currentPageOffset + bytesNeeded;
+        }
+    }
+
+    protected static int vIntLength(int value) {
+        int leadingZeros = Integer.numberOfLeadingZeros(value);
+        if (leadingZeros >= 25) {
+            return 1;
+        } else if (leadingZeros >= 18) {
+            return 2;
+        } else if (leadingZeros >= 11) {
+            return 3;
+        } else if (leadingZeros >= 4) {
+            return 4;
+        }
+        return 5;
+    }
 
-        // advance
-        pageIndex += j;
+    private void putVInt(int i, int bytesNeeded, byte[] page, int offset) {
+        if (bytesNeeded == 1) {
+            page[offset] = (byte) i;
+        } else {
+            putMultiByteVInt(page, i, offset);
+        }
     }
 
     @Override
@@ -115,7 +176,7 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
         if (4 > (pageSize - currentPageOffset)) {
             super.writeInt(i);
         } else {
-            BytesRef currentPage = pages.get(pageIndex).v();
+            BytesRef currentPage = currentBytesRef;
             VH_BE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
             this.currentPageOffset = currentPageOffset + 4;
         }
@@ -123,12 +184,13 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
 
     @Override
     public void writeIntLE(int i) throws IOException {
+        final int currentPageOffset = this.currentPageOffset;
         if (4 > (pageSize - currentPageOffset)) {
             super.writeIntLE(i);
         } else {
-            BytesRef currentPage = pages.get(pageIndex).v();
+            BytesRef currentPage = currentBytesRef;
             VH_LE_INT.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
-            currentPageOffset += 4;
+            this.currentPageOffset = currentPageOffset + 4;
         }
     }
 
@@ -138,7 +200,7 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
         if (8 > (pageSize - currentPageOffset)) {
             super.writeLong(i);
         } else {
-            BytesRef currentPage = pages.get(pageIndex).v();
+            BytesRef currentPage = currentBytesRef;
             VH_BE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
             this.currentPageOffset = currentPageOffset + 8;
         }
@@ -146,12 +208,13 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
 
     @Override
     public void writeLongLE(long i) throws IOException {
+        final int currentPageOffset = this.currentPageOffset;
         if (8 > (pageSize - currentPageOffset)) {
             super.writeLongLE(i);
         } else {
-            BytesRef currentPage = pages.get(pageIndex).v();
+            BytesRef currentPage = currentBytesRef;
             VH_LE_LONG.set(currentPage.bytes, currentPage.offset + currentPageOffset, i);
-            currentPageOffset += 8;
+            this.currentPageOffset = currentPageOffset + 8;
         }
     }
 
@@ -184,16 +247,20 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
     public void writeString(String str) throws IOException {
         final int currentPageOffset = this.currentPageOffset;
         final int charCount = str.length();
-        // maximum serialized length is 3 bytes per char + 5 bytes for the longest possible vint
-        if (charCount * 3 + 5 > (pageSize - currentPageOffset)) {
+        int bytesNeededForVInt = vIntLength(charCount);
+        // maximum serialized length is 3 bytes per char + n bytes for the vint
+        if (charCount * 3 + bytesNeededForVInt > (pageSize - currentPageOffset)) {
             super.writeString(str);
             return;
         }
-        BytesRef currentPage = pages.get(pageIndex).v();
-        int off = currentPage.offset + currentPageOffset;
+
+        BytesRef currentPage = currentBytesRef;
+        int offset = currentPage.offset + currentPageOffset;
         byte[] buffer = currentPage.bytes;
         // mostly duplicated from StreamOutput.writeString to to get more reliable compilation of this very hot loop
-        int offset = off + putVInt(buffer, charCount, off);
+        putVInt(charCount, bytesNeededForVInt, currentPage.bytes, offset);
+        offset += bytesNeededForVInt;
+
         for (int i = 0; i < charCount; i++) {
             final int c = str.charAt(i);
             if (c <= 0x007F) {
@@ -218,18 +285,28 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
     @Override
     public void seek(long position) {
         ensureCapacityFromPosition(position);
-        int offsetInPage = (int) (position % pageSize);
-        int pageIndex = (int) position / pageSize;
-        // Special handling for seeking to the first index in a new page, which is handled as a seeking to one-after the last index
-        // in the previous case. This is done so that seeking to the first index of a new page does not cause a page allocation while
-        // still allowing a fast check via (pageSize - currentPageOffset) on the remaining size in the current page in all other methods.
-        if (offsetInPage == 0) {
-            this.pageIndex = pageIndex - 1;
-            this.currentPageOffset = pageSize;
+        if (position > 0) {
+            int offsetInPage = (int) (position % pageSize);
+            int pageIndex = (int) position / pageSize;
+
+            // Special handling for seeking to the first index in a new page, which is handled as a seeking to one-after the last index
+            // in the previous case. This is done so that seeking to the first index of a new page does not cause a page allocation while
+            // still allowing a fast check via (pageSize - currentPageOffset) on the remaining size in the current page in all other
+            // methods.
+            if (offsetInPage == 0) {
+                this.pageIndex = pageIndex - 1;
+                this.currentPageOffset = pageSize;
+            } else {
+                this.pageIndex = pageIndex;
+                this.currentPageOffset = offsetInPage;
+            }
         } else {
-            this.pageIndex = pageIndex;
-            this.currentPageOffset = offsetInPage;
+            // We always have an initial page so special handling for seeking to 0.
+            assert position == 0;
+            this.pageIndex = 0;
+            this.currentPageOffset = 0;
         }
+        this.currentBytesRef = pages.get(pageIndex).v();
     }
 
     public void skip(int length) {
@@ -240,7 +317,7 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
     public void close() {
         var pages = this.pages;
         if (pages != null) {
-            this.pages = null;
+            closeFields();
             Releasables.close(pages);
         }
     }
@@ -254,10 +331,19 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
     public ReleasableBytesReference moveToBytesReference() {
         var bytes = bytes();
         var pages = this.pages;
-        this.pages = null;
+        closeFields();
+
         return new ReleasableBytesReference(bytes, () -> Releasables.close(pages));
     }
 
+    private void closeFields() {
+        this.pages = null;
+        this.currentBytesRef = null;
+        this.pageIndex = -1;
+        this.currentPageOffset = pageSize;
+        this.currentCapacity = 0;
+    }
+
     /**
      * Returns the current size of the buffer.
      *
@@ -311,17 +397,27 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
         // than Integer.MAX_VALUE
         if (newPosition > Integer.MAX_VALUE - (Integer.MAX_VALUE % pageSize)) {
             throw new IllegalArgumentException(getClass().getSimpleName() + " cannot hold more than 2GB of data");
+        } else if (pages == null) {
+            throw new IllegalStateException("Cannot use " + getClass().getSimpleName() + " after it has been closed");
         }
-        while (newPosition > currentCapacity) {
-            Recycler.V<BytesRef> newPage = recycler.obtain();
-            assert pageSize == newPage.v().length;
-            pages.add(newPage);
-            currentCapacity += pageSize;
-        }
-        // We are at the end of the current page, increment page index
-        if (currentPageOffset == pageSize) {
-            pageIndex++;
-            currentPageOffset = 0;
+
+        long additionalCapacityNeeded = newPosition - currentCapacity;
+        if (additionalCapacityNeeded > 0) {
+            // Calculate number of additional pages needed
+            int additionalPagesNeeded = (int) ((additionalCapacityNeeded + pageSize - 1) / pageSize);
+            pages.ensureCapacity(pages.size() + additionalPagesNeeded);
+            for (int i = 0; i < additionalPagesNeeded; i++) {
+                Recycler.V<BytesRef> newPage = recycler.obtain();
+                assert pageSize == newPage.v().length;
+                pages.add(newPage);
+            }
+            currentCapacity += additionalPagesNeeded * pageSize;
         }
     }
+
+    private void nextPage() {
+        pageIndex++;
+        currentPageOffset = 0;
+        currentBytesRef = pages.get(pageIndex).v();
+    }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

@@ -254,7 +254,7 @@ public abstract class StreamOutput extends OutputStream {
         return putMultiByteVInt(buffer, i, off);
     }
 
-    private static int putMultiByteVInt(byte[] buffer, int i, int off) {
+    protected static int putMultiByteVInt(byte[] buffer, int i, int off) {
         int index = off;
         do {
             buffer[index++] = ((byte) ((i & 0x7f) | 0x80));

+ 380 - 0
server/src/test/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutputTests.java

@@ -13,6 +13,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.Constants;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.geo.GeoPoint;
 import org.elasticsearch.common.lucene.BytesRefs;
@@ -1059,4 +1060,383 @@ public class RecyclerBytesStreamOutputTests extends ESTestCase {
         out.writeByte(b);
         assertEquals(b, out.bytes().get(PageCacheRecycler.BYTE_PAGE_SIZE));
     }
+
+    public void testWriteIntFallbackToSuperClass() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.seek(PageCacheRecycler.BYTE_PAGE_SIZE - 2);
+        int value = randomInt();
+        out.writeInt(value);
+
+        StreamInput in = out.bytes().streamInput();
+        in.skip(PageCacheRecycler.BYTE_PAGE_SIZE - 2);
+        assertEquals(value, in.readInt());
+        out.close();
+    }
+
+    public void testWriteIntLEFallbackToSuperClass() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.seek(PageCacheRecycler.BYTE_PAGE_SIZE - 2);
+        int value = randomInt();
+        out.writeIntLE(value);
+
+        StreamInput in = out.bytes().streamInput();
+        in.skip(PageCacheRecycler.BYTE_PAGE_SIZE - 2);
+        byte[] bytes = new byte[4];
+        in.readBytes(bytes, 0, 4);
+        assertEquals(value, new BytesArray(bytes).getIntLE(0));
+        out.close();
+    }
+
+    public void testWriteLongFallbackToSuperClass() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.seek(PageCacheRecycler.BYTE_PAGE_SIZE - 4);
+        long value = randomLong();
+        out.writeLong(value);
+
+        StreamInput in = out.bytes().streamInput();
+        in.skip(PageCacheRecycler.BYTE_PAGE_SIZE - 4);
+        assertEquals(value, in.readLong());
+        out.close();
+    }
+
+    public void testWriteLongLEFallbackToSuperClass() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.seek(PageCacheRecycler.BYTE_PAGE_SIZE - 4);
+        long value = randomLong();
+        out.writeLongLE(value);
+
+        StreamInput in = out.bytes().streamInput();
+        in.skip(PageCacheRecycler.BYTE_PAGE_SIZE - 4);
+        byte[] bytes = new byte[8];
+        in.readBytes(bytes, 0, 8);
+        assertEquals(value, new BytesArray(bytes).getLongLE(0));
+        out.close();
+    }
+
+    public void testWriteVIntFallbackToSuperClass() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.seek(PageCacheRecycler.BYTE_PAGE_SIZE - 2);
+        int value = randomIntBetween(128, Integer.MAX_VALUE);
+        out.writeVInt(value);
+
+        StreamInput in = out.bytes().streamInput();
+        in.skip(PageCacheRecycler.BYTE_PAGE_SIZE - 2);
+        assertEquals(value, in.readVInt());
+        out.close();
+    }
+
+    public void testWriteStringFallbackToSuperClass() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.seek(PageCacheRecycler.BYTE_PAGE_SIZE - 10);
+        String value = randomAlphaOfLength(100);
+        out.writeString(value);
+
+        StreamInput in = out.bytes().streamInput();
+        in.skip(PageCacheRecycler.BYTE_PAGE_SIZE - 10);
+        assertEquals(value, in.readString());
+        out.close();
+    }
+
+    public void testMoveToBytesReference() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        byte[] testData = randomizedByteArrayWithSize(100);
+        out.writeBytes(testData);
+
+        ReleasableBytesReference ref = out.moveToBytesReference();
+        assertArrayEquals(testData, BytesReference.toBytes(ref));
+
+        // Verify that pages are nulled after move
+        assertEquals(0, out.size());
+
+        // ISE after close
+        expectThrows(IllegalStateException.class, () -> out.write(randomByte()));
+        expectThrows(IllegalStateException.class, () -> out.write(randomByteArrayOfLength(1)));
+
+        // Verify that close becomes noop after move
+        out.close(); // Should not throw
+
+        ref.close();
+    }
+
+    public void testMultipleCloseOperations() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.writeBytes(randomizedByteArrayWithSize(10));
+
+        // First close should work normally
+        out.close();
+
+        // ISE after close
+        expectThrows(IllegalStateException.class, () -> out.write(randomByte()));
+        expectThrows(IllegalStateException.class, () -> out.write(randomByteArrayOfLength(1)));
+
+        // Subsequent closes should be no-op and not throw
+        out.close();
+        out.close();
+    }
+
+    public void testWriteMultiplePagesWithLargeArray() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Write data that spans multiple pages
+        int dataSize = PageCacheRecycler.BYTE_PAGE_SIZE * 3 + 100;
+        byte[] largeData = randomizedByteArrayWithSize(dataSize);
+        out.writeBytes(largeData);
+
+        assertEquals(dataSize, out.size());
+        assertArrayEquals(largeData, BytesReference.toBytes(out.bytes()));
+        out.close();
+    }
+
+    public void testEnsureCapacityWithMultiplePages() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Force allocation of multiple pages by seeking to a large position
+        long targetPosition = (long) PageCacheRecycler.BYTE_PAGE_SIZE * 5;
+        out.seek(targetPosition);
+        byte testByte = randomByte();
+        out.writeByte(testByte);
+
+        BytesReference bytes = out.bytes();
+        assertEquals(targetPosition + 1, bytes.length());
+        assertEquals(testByte, bytes.get((int) targetPosition));
+        out.close();
+    }
+
+    public void testVIntLengthCalculation() {
+        // Test edge cases for vint length calculation
+        assertEquals(1, RecyclerBytesStreamOutput.vIntLength(0));
+        assertEquals(1, RecyclerBytesStreamOutput.vIntLength(127));
+        assertEquals(2, RecyclerBytesStreamOutput.vIntLength(128));
+        assertEquals(2, RecyclerBytesStreamOutput.vIntLength(16383));
+        assertEquals(3, RecyclerBytesStreamOutput.vIntLength(16384));
+        assertEquals(3, RecyclerBytesStreamOutput.vIntLength(2097151));
+        assertEquals(4, RecyclerBytesStreamOutput.vIntLength(2097152));
+        assertEquals(4, RecyclerBytesStreamOutput.vIntLength(268435455));
+        assertEquals(5, RecyclerBytesStreamOutput.vIntLength(268435456));
+        assertEquals(5, RecyclerBytesStreamOutput.vIntLength(Integer.MAX_VALUE));
+    }
+
+    public void testLegacyWriteWithSizePrefix() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        TestWriteable writeable = new TestWriteable(true);
+        out.legacyWriteWithSizePrefix(writeable);
+
+        StreamInput in = out.bytes().streamInput();
+        int size = in.readVInt();
+        assertTrue(size > 0);
+        TestWriteable read = new TestWriteable(in);
+        assertEquals(writeable.value, read.value);
+        out.close();
+    }
+
+    public void testSeekToZero() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Write some data first
+        out.writeBytes(randomizedByteArrayWithSize(100));
+        assertTrue(out.position() > 0);
+
+        // Seek back to zero
+        out.seek(0);
+        assertEquals(0, out.position());
+
+        // Write new data from beginning
+        byte[] newData = randomizedByteArrayWithSize(50);
+        out.writeBytes(newData);
+
+        BytesReference bytes = out.bytes();
+        byte[] result = new byte[50];
+        try (StreamInput in = bytes.slice(0, 50).streamInput()) {
+            assertThat(in.read(result), equalTo(50));
+            assertArrayEquals(newData, result);
+        }
+        out.close();
+    }
+
+    public void testPageBoundarySeekingEdgeCases() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Test seeking to exact page boundaries
+        for (int page = 1; page <= 3; page++) {
+            long position = (long) page * PageCacheRecycler.BYTE_PAGE_SIZE;
+            out.seek(position);
+            assertEquals(position, out.position());
+
+            byte testByte = (byte) page;
+            out.writeByte(testByte);
+            assertEquals(testByte, out.bytes().get((int) position));
+        }
+        out.close();
+    }
+
+    public void testWriteEmptyByteArray() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        int initialSize = out.size();
+        out.writeBytes(new byte[0]);
+        assertEquals(initialSize, out.size());
+
+        out.writeBytes(new byte[0], 0, 0);
+        assertEquals(initialSize, out.size());
+        out.close();
+    }
+
+    public void testNearMaxCapacityHandling() {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Test seeking to near maximum capacity
+        long nearMaxPosition = Integer.MAX_VALUE - 1000L;
+
+        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> out.seek(nearMaxPosition));
+        assertTrue(ex.getMessage().contains("cannot hold more than 2GB"));
+        out.close();
+    }
+
+    public void testMultiplePageAllocation() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Force allocation of multiple pages through large skip
+        int skipAmount = PageCacheRecycler.BYTE_PAGE_SIZE * 2 + 100;
+        out.skip(skipAmount);
+        assertEquals(skipAmount, out.position());
+
+        // Write at the end position
+        byte testByte = randomByte();
+        out.writeByte(testByte);
+        assertEquals(testByte, out.bytes().get(skipAmount));
+        out.close();
+    }
+
+    public void testBytesReferenceWithExactPageBoundary() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Write exactly one page worth of data
+        byte[] pageData = randomizedByteArrayWithSize(PageCacheRecycler.BYTE_PAGE_SIZE);
+        out.writeBytes(pageData);
+
+        BytesReference bytes = out.bytes();
+        assertEquals(PageCacheRecycler.BYTE_PAGE_SIZE, bytes.length());
+        assertArrayEquals(pageData, BytesReference.toBytes(bytes));
+        out.close();
+    }
+
+    public void testWriteAcrossMultiplePageBoundaries() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Start near end of first page
+        int startPos = PageCacheRecycler.BYTE_PAGE_SIZE - 10;
+        out.seek(startPos);
+
+        // Write data that spans 3 pages
+        int dataSize = PageCacheRecycler.BYTE_PAGE_SIZE * 2 + 20;
+        byte[] spanningData = randomizedByteArrayWithSize(dataSize);
+        out.writeBytes(spanningData);
+
+        BytesReference bytes = out.bytes();
+        byte[] result = new byte[dataSize];
+        try (StreamInput in = bytes.slice(startPos, dataSize).streamInput()) {
+            assertThat(in.read(result), equalTo(dataSize));
+            assertArrayEquals(spanningData, result);
+        }
+        out.close();
+    }
+
+    public void testOperationsAfterMoveThrowIllegalStateException() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+        out.writeBytes(randomizedByteArrayWithSize(10));
+
+        ReleasableBytesReference ref = out.moveToBytesReference();
+
+        // Test that all write operations throw IllegalStateException after move
+        expectThrows(IllegalStateException.class, () -> out.seek(100));
+        expectThrows(IllegalStateException.class, () -> out.skip(10));
+        expectThrows(IllegalStateException.class, () -> out.writeByte(randomByte()));
+        expectThrows(IllegalStateException.class, () -> out.writeInt(randomInt()));
+        expectThrows(IllegalStateException.class, () -> out.writeLong(randomLong()));
+        expectThrows(IllegalStateException.class, () -> out.writeVInt(randomInt()));
+        expectThrows(IllegalStateException.class, () -> out.writeString("test"));
+
+        ref.close();
+    }
+
+    public void testVIntEdgeCases() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Test boundary values for each vint length category
+        int[] boundaryValues = {
+            0,
+            127,
+            128,           // 1-2 byte boundary
+            16383,
+            16384,          // 2-3 byte boundary
+            2097151,
+            2097152,      // 3-4 byte boundary
+            268435455,
+            268435456,  // 4-5 byte boundary
+            Integer.MAX_VALUE      // Maximum value
+        };
+
+        for (int value : boundaryValues) {
+            out.writeVInt(value);
+        }
+
+        StreamInput in = out.bytes().streamInput();
+        for (int expectedValue : boundaryValues) {
+            assertEquals(expectedValue, in.readVInt());
+        }
+
+        out.close();
+    }
+
+    public void testWriteStringWithMaxUnicodeCharacters() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Test string with 3-byte UTF-8 characters
+        String unicodeString = "\u20AC\u20AC\u20AC"; // Euro symbols (3 bytes each)
+        out.writeString(unicodeString);
+
+        StreamInput in = out.bytes().streamInput();
+        assertEquals(unicodeString, in.readString());
+
+        out.close();
+    }
+
+    public void testSeekBeyondIntegerMaxValue() {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Test seeking beyond the 2GB limit
+        long beyondLimit = (long) Integer.MAX_VALUE + 1;
+        IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> { out.seek(beyondLimit); });
+        assertTrue(ex.getMessage().contains("cannot hold more than 2GB"));
+
+        out.close();
+    }
+
+    public void testBytesReferenceForEmptyStream() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        BytesReference bytes = out.bytes();
+        assertEquals(0, bytes.length());
+        assertSame(org.elasticsearch.common.bytes.BytesArray.EMPTY, bytes);
+
+        out.close();
+    }
+
+    public void testLegacyWriteWithSizePrefixResourceManagement() throws IOException {
+        RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
+
+        // Create a writeable that exercises the tmp stream cleanup logic
+        TestWriteable writeable = new TestWriteable(randomBoolean());
+        out.legacyWriteWithSizePrefix(writeable);
+
+        // Verify we can read it back correctly
+        StreamInput in = out.bytes().streamInput();
+        int size = in.readVInt();
+        assertTrue(size > 0);
+        TestWriteable read = new TestWriteable(in);
+        assertEquals(writeable.value, read.value);
+
+        out.close();
+    }
 }