1
0
Эх сурвалжийг харах

Use ByteBuffer as slice holder in BytesReferenceStreamInput (#85585)

This commit converts the handle for each slice in
BytesReferenceStreamInput to be a ByteBuffer. The ByteBuffer class
better interoperates with JDK based methods for decoding streams, and
itself uses intrinsics internally to provide faster decoding.
Ryan Ernst 2 жил өмнө
parent
commit
40698df6bc

+ 56 - 57
server/src/main/java/org/elasticsearch/common/bytes/BytesReferenceStreamInput.java

@@ -10,11 +10,11 @@ package org.elasticsearch.common.bytes;
 
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
-import org.elasticsearch.common.Numbers;
 import org.elasticsearch.common.io.stream.StreamInput;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * A StreamInput that reads off a {@link BytesRefIterator}. This is used to provide
@@ -23,33 +23,39 @@ import java.io.IOException;
  */
 class BytesReferenceStreamInput extends StreamInput {
 
+    private static final ByteBuffer EMPTY = ByteBuffer.wrap(new byte[0]);
+
     protected final BytesReference bytesReference;
     private BytesRefIterator iterator;
-    private int sliceIndex;
-    private BytesRef slice;
-    private int sliceStartOffset; // the offset on the stream at which the current slice starts
-
+    private ByteBuffer slice;
+    private int totalOffset; // the offset on the stream at which the current slice starts
     private int mark = 0;
 
     BytesReferenceStreamInput(BytesReference bytesReference) throws IOException {
         this.bytesReference = bytesReference;
         this.iterator = bytesReference.iterator();
-        this.slice = iterator.next();
-        this.sliceStartOffset = 0;
-        this.sliceIndex = 0;
+        this.slice = convertToByteBuffer(iterator.next());
+        this.totalOffset = 0;
+    }
+
+    static ByteBuffer convertToByteBuffer(BytesRef bytesRef) {
+        if (bytesRef == null) {
+            return EMPTY;
+        }
+        // slice here forces the buffer to have a sliced view, keeping track of the original offset
+        return ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length).slice();
     }
 
     @Override
     public byte readByte() throws IOException {
         maybeNextSlice();
-        return slice.bytes[slice.offset + (sliceIndex++)];
+        return slice.get();
     }
 
     @Override
     public short readShort() throws IOException {
-        if (slice.length - sliceIndex >= 2) {
-            sliceIndex += 2;
-            return Numbers.bytesToShort(slice.bytes, slice.offset + sliceIndex - 2);
+        if (slice.remaining() >= 2) {
+            return slice.getShort();
         } else {
             // slow path
             return super.readShort();
@@ -58,9 +64,8 @@ class BytesReferenceStreamInput extends StreamInput {
 
     @Override
     public int readInt() throws IOException {
-        if (slice.length - sliceIndex >= 4) {
-            sliceIndex += 4;
-            return Numbers.bytesToInt(slice.bytes, slice.offset + sliceIndex - 4);
+        if (slice.remaining() >= 4) {
+            return slice.getInt();
         } else {
             // slow path
             return super.readInt();
@@ -69,9 +74,8 @@ class BytesReferenceStreamInput extends StreamInput {
 
     @Override
     public long readLong() throws IOException {
-        if (slice.length - sliceIndex >= 8) {
-            sliceIndex += 8;
-            return Numbers.bytesToLong(slice.bytes, slice.offset + sliceIndex - 8);
+        if (slice.remaining() >= 8) {
+            return slice.getLong();
         } else {
             // slow path
             return super.readLong();
@@ -80,30 +84,28 @@ class BytesReferenceStreamInput extends StreamInput {
 
     @Override
     public int readVInt() throws IOException {
-        if (slice.length - sliceIndex >= 5) {
-            final byte[] buf = slice.bytes;
-            final int offset = slice.offset;
-            byte b = buf[offset + sliceIndex++];
+        if (slice.remaining() >= 5) {
+            byte b = slice.get();
             if (b >= 0) {
                 return b;
             }
             int i = b & 0x7F;
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7F) << 7;
             if (b >= 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7F) << 14;
             if (b >= 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7F) << 21;
             if (b >= 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x0F) << 28;
             if ((b & 0xF0) == 0) {
                 return i;
@@ -115,55 +117,53 @@ class BytesReferenceStreamInput extends StreamInput {
 
     @Override
     public long readVLong() throws IOException {
-        if (slice.length - sliceIndex >= 10) {
-            final byte[] buf = slice.bytes;
-            final int offset = slice.offset;
-            byte b = buf[offset + sliceIndex++];
+        if (slice.remaining() >= 10) {
+            byte b = slice.get();
             long i = b & 0x7FL;
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7FL) << 7;
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7FL) << 14;
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7FL) << 21;
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7FL) << 28;
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7FL) << 35;
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7FL) << 42;
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= (b & 0x7FL) << 49;
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             i |= ((b & 0x7FL) << 56);
             if ((b & 0x80) == 0) {
                 return i;
             }
-            b = buf[offset + sliceIndex++];
+            b = slice.get();
             if (b != 0 && b != 1) {
                 throwOnBrokenVLong(b, i);
             }
@@ -175,27 +175,28 @@ class BytesReferenceStreamInput extends StreamInput {
     }
 
     protected int offset() {
-        return sliceStartOffset + sliceIndex;
+        return totalOffset + slice.position();
     }
 
     private void maybeNextSlice() throws IOException {
-        if (sliceIndex == slice.length) {
+        if (slice.hasRemaining() == false) {
             // moveToNextSlice is intentionally extracted to another method since it's the assumed cold-path
             moveToNextSlice();
         }
     }
 
     private void moveToNextSlice() throws IOException {
-        slice = iterator.next();
-        while (slice != null && slice.length == 0) {
+        totalOffset += slice.limit();
+        BytesRef bytesRef = iterator.next();
+        while (bytesRef != null && bytesRef.length == 0) {
             // rare corner case of a bytes reference that has a 0-length component
-            slice = iterator.next();
+            bytesRef = iterator.next();
         }
-        if (slice == null) {
+        if (bytesRef == null) {
             throw new EOFException();
         }
-        sliceStartOffset += sliceIndex;
-        sliceIndex = 0;
+        slice = convertToByteBuffer(bytesRef);
+        assert slice.position() == 0;
     }
 
     @Override
@@ -233,12 +234,11 @@ class BytesReferenceStreamInput extends StreamInput {
         int destOffset = bOffset;
         while (remaining > 0) {
             maybeNextSlice();
-            final int currentLen = Math.min(remaining, slice.length - sliceIndex);
+            final int currentLen = Math.min(remaining, slice.remaining());
             assert currentLen > 0 : "length has to be > 0 to make progress but was: " + currentLen;
-            System.arraycopy(slice.bytes, slice.offset + sliceIndex, b, destOffset, currentLen);
+            slice.get(b, destOffset, currentLen);
             destOffset += currentLen;
             remaining -= currentLen;
-            sliceIndex += currentLen;
             assert remaining >= 0 : "remaining: " + remaining;
         }
         return numBytesToCopy;
@@ -273,9 +273,9 @@ class BytesReferenceStreamInput extends StreamInput {
         int remaining = numBytesSkipped;
         while (remaining > 0) {
             maybeNextSlice();
-            int currentLen = Math.min(remaining, slice.length - sliceIndex);
+            int currentLen = Math.min(remaining, slice.remaining());
             remaining -= currentLen;
-            sliceIndex += currentLen;
+            slice.position(slice.position() + currentLen);
             assert remaining >= 0 : "remaining: " + remaining;
         }
         return numBytesSkipped;
@@ -283,13 +283,12 @@ class BytesReferenceStreamInput extends StreamInput {
 
     @Override
     public void reset() throws IOException {
-        if (sliceStartOffset <= mark) {
-            sliceIndex = mark - sliceStartOffset;
+        if (totalOffset <= mark) {
+            slice.position(mark - totalOffset);
         } else {
             iterator = bytesReference.iterator();
-            slice = iterator.next();
-            sliceStartOffset = 0;
-            sliceIndex = 0;
+            slice = convertToByteBuffer(iterator.next());
+            totalOffset = 0;
             final long skipped = skip(mark);
             assert skipped == mark : skipped + " vs " + mark;
         }