
Speed up serialization of BytesRefArray (#106053)

Currently, we read and write one byte at a time when serializing and deserializing a BytesRefArray. We can improve performance by reading/writing through the backing pages or the underlying array instead. I will open a follow-up PR to use this change when serializing BytesRefBlock in ESQL.
Nhat Nguyen · commit 6b430ae750
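
For context, here is a minimal sketch of the pattern this commit adopts, contrasting per-byte writes with page-wise bulk writes. The pages array, sink type, and sizes below are illustrative assumptions for the sketch, not the actual Elasticsearch StreamOutput API.

// Hypothetical illustration of the page-wise pattern; byte[][] stands in
// for BigByteArray's fixed-size backing pages.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class PageWiseWriteSketch {
    static final int PAGE_SIZE = 16 * 1024; // assumed page size for the sketch

    // Before: one call and one bounds check per byte.
    static void writeByteByByte(byte[][] pages, long size, OutputStream out) throws IOException {
        for (long i = 0; i < size; i++) {
            out.write(pages[(int) (i / PAGE_SIZE)][(int) (i % PAGE_SIZE)]);
        }
    }

    // After: one bulk write per backing page, trimming the last page to the
    // logical size, as the commit does via BytesRefIterator.
    static void writePageWise(byte[][] pages, long size, OutputStream out) throws IOException {
        long remaining = size;
        for (byte[] page : pages) {
            int len = (int) Math.min(remaining, page.length);
            out.write(page, 0, len);
            remaining -= len;
            if (remaining == 0) {
                break;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        byte[][] pages = { new byte[PAGE_SIZE], new byte[PAGE_SIZE] };
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        writePageWise(pages, PAGE_SIZE + 10, sink); // only 10 bytes of the last page are live
        System.out.println(sink.size()); // 16394
    }
}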

+ 5 - 0
docs/changelog/106053.yaml

@@ -0,0 +1,5 @@
+pr: 106053
+summary: Speed up serialization of `BytesRefArray`
+area: ES|QL
+type: enhancement
+issues: []

+ 23 - 0
server/src/main/java/org/elasticsearch/common/util/BigArrays.java

@@ -10,10 +10,12 @@ package org.elasticsearch.common.util;
 
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.core.Nullable;
@@ -143,6 +145,27 @@ public class BigArrays {
             Arrays.fill(array, (int) fromIndex, (int) toIndex, value);
         }
 
+        @Override
+        public BytesRefIterator iterator() {
+            return new BytesRefIterator() {
+                boolean visited = false;
+
+                @Override
+                public BytesRef next() {
+                    if (visited) {
+                        return null;
+                    }
+                    visited = true;
+                    return new BytesRef(array, 0, Math.toIntExact(size()));
+                }
+            };
+        }
+
+        @Override
+        public void fillWith(StreamInput in) throws IOException {
+            in.readBytes(array, 0, Math.toIntExact(size()));
+        }
+
         @Override
         public boolean hasArray() {
             return true;

+ 27 - 0
server/src/main/java/org/elasticsearch/common/util/BigByteArray.java

@@ -10,7 +10,9 @@ package org.elasticsearch.common.util;
 
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 
 import java.io.IOException;
@@ -18,6 +20,7 @@ import java.util.Arrays;
 
 import static org.elasticsearch.common.util.BigLongArray.writePages;
 import static org.elasticsearch.common.util.PageCacheRecycler.BYTE_PAGE_SIZE;
+import static org.elasticsearch.common.util.PageCacheRecycler.PAGE_SIZE_IN_BYTES;
 
 /**
  * Byte array abstraction able to support more than 2B values. This implementation slices data into fixed-sized blocks of
@@ -139,6 +142,30 @@ final class BigByteArray extends AbstractBigArray implements ByteArray {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public BytesRefIterator iterator() {
+        return new BytesRefIterator() {
+            int i = 0;
+
+            @Override
+            public BytesRef next() {
+                if (i >= pages.length) {
+                    return null;
+                }
+                int len = i == pages.length - 1 ? Math.toIntExact(size - (pages.length - 1L) * PAGE_SIZE_IN_BYTES) : PAGE_SIZE_IN_BYTES;
+                return new BytesRef(pages[i++], 0, len);
+            }
+        };
+    }
+
+    @Override
+    public void fillWith(StreamInput in) throws IOException {
+        for (int i = 0; i < pages.length - 1; i++) {
+            in.readBytes(pages[i], 0, PAGE_SIZE_IN_BYTES);
+        }
+        in.readBytes(pages[pages.length - 1], 0, Math.toIntExact(size - (pages.length - 1L) * PAGE_SIZE_IN_BYTES));
+    }
+
     @Override
     protected int numBytesPerElement() {
         return 1;

+ 12 - 0
server/src/main/java/org/elasticsearch/common/util/ByteArray.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.common.util;
 
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable;
 
@@ -51,6 +52,17 @@ public interface ByteArray extends BigArray, Writeable {
      */
     void fill(long fromIndex, long toIndex, byte value);
 
+    /**
+     * Fills this ByteArray with bytes from the given input stream
+     */
+    void fillWith(StreamInput in) throws IOException;
+
+    /**
+     * Returns a BytesRefIterator over this ByteArray. This method allows
+     * access to the internal pages of this array without copying them.
+     */
+    BytesRefIterator iterator();
+
     /**
      * Checks if this instance is backed by a single byte array analogous to {@link ByteBuffer#hasArray()}.
      */
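
A hedged usage sketch of the two new interface methods (assumes an Elasticsearch classpath; BigArrays.NON_RECYCLING_INSTANCE and ByteArrayStreamInput are existing utilities, and the sizes are arbitrary): fillWith bulk-loads the array from a stream, and iterator() exposes the backing pages without copying.

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;

public class ByteArrayBulkIoSketch {
    public static void main(String[] args) throws Exception {
        byte[] raw = new byte[100_000]; // spans several pages of a BigByteArray
        try (ByteArray array = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(raw.length, false)) {
            // Bulk-fill from the stream instead of array.set(i, in.readByte()) per byte.
            array.fillWith(new ByteArrayStreamInput(raw));

            // Drain without copying: each BytesRef points directly at a backing page.
            BytesRefIterator it = array.iterator();
            BytesRef ref;
            long total = 0;
            while ((ref = it.next()) != null) {
                total += ref.length;
            }
            System.out.println(total); // 100000
        }
    }
}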

+ 11 - 7
server/src/main/java/org/elasticsearch/common/util/BytesRefArray.java

@@ -10,6 +10,7 @@ package org.elasticsearch.common.util;
 
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -64,10 +65,7 @@ public final class BytesRefArray implements Accountable, Releasable, Writeable {
             // bytes
             long sizeOfBytes = in.readVLong();
             bytes = bigArrays.newByteArray(sizeOfBytes, false);
-
-            for (long i = 0; i < sizeOfBytes; ++i) {
-                bytes.set(i, in.readByte());
-            }
+            bytes.fillWith(in);
 
             success = true;
         } finally {
@@ -149,11 +147,17 @@ public final class BytesRefArray implements Accountable, Releasable, Writeable {
         }
 
         // bytes might be overallocated, the last bucket of startOffsets contains the real size
-        long sizeOfBytes = startOffsets.get(size);
+        final long sizeOfBytes = startOffsets.get(size);
         out.writeVLong(sizeOfBytes);
-        for (long i = 0; i < sizeOfBytes; ++i) {
-            out.writeByte(bytes.get(i));
+        final BytesRefIterator bytesIt = bytes.iterator();
+        BytesRef bytesRef;
+        long remained = sizeOfBytes;
+        while (remained > 0 && (bytesRef = bytesIt.next()) != null) {
+            int length = Math.toIntExact(Math.min(remained, bytesRef.length));
+            remained -= length;
+            out.writeBytes(bytesRef.bytes, bytesRef.offset, length);
         }
+        assert remained == 0 : remained;
     }
 
     @Override
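
Putting the two rewritten paths together, a hedged round-trip sketch (again assuming an Elasticsearch classpath; BytesStreamOutput is the standard in-memory StreamOutput, and the payload is arbitrary):

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.BytesRefArray;

public class BytesRefArrayRoundTrip {
    public static void main(String[] args) throws Exception {
        try (BytesRefArray array = new BytesRefArray(8, BigArrays.NON_RECYCLING_INSTANCE)) {
            array.append(new BytesRef("hello"));
            array.append(new BytesRef("world"));

            BytesStreamOutput out = new BytesStreamOutput();
            array.writeTo(out); // now page-wise instead of byte-by-byte

            StreamInput in = out.bytes().streamInput();
            try (BytesRefArray copy = new BytesRefArray(in, BigArrays.NON_RECYCLING_INSTANCE)) {
                System.out.println(copy.size()); // 2
            }
        }
    }
}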

+ 12 - 0
server/src/main/java/org/elasticsearch/common/util/ReleasableByteArray.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.common.util;
 
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.apache.lucene.util.RamUsageEstimator;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
@@ -88,6 +89,17 @@ public class ReleasableByteArray implements ByteArray {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public BytesRefIterator iterator() {
+        assert ref.hasReferences();
+        return ref.iterator();
+    }
+
+    @Override
+    public void fillWith(StreamInput in) {
+        throw new UnsupportedOperationException("read-only ByteArray");
+    }
+
     @Override
     public long ramBytesUsed() {
         /*

+ 23 - 0
server/src/test/java/org/elasticsearch/common/util/BigArraysTests.java

@@ -9,9 +9,11 @@
 package org.elasticsearch.common.util;
 
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.breaker.PreallocatedCircuitBreakerService;
+import org.elasticsearch.common.io.stream.ByteArrayStreamInput;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -275,6 +277,27 @@ public class BigArraysTests extends ESTestCase {
         array2.close();
     }
 
+    public void testByteIterator() throws Exception {
+        final byte[] bytes = new byte[randomIntBetween(1, 4000000)];
+        random().nextBytes(bytes);
+        ByteArray array = bigArrays.newByteArray(bytes.length, randomBoolean());
+        array.fillWith(new ByteArrayStreamInput(bytes));
+        for (int i = 0; i < bytes.length; i++) {
+            assertEquals(bytes[i], array.get(i));
+        }
+        BytesRefIterator it = array.iterator();
+        BytesRef ref;
+        int offset = 0;
+        while ((ref = it.next()) != null) {
+            for (int i = 0; i < ref.length; i++) {
+                assertEquals(bytes[offset], ref.bytes[ref.offset + i]);
+                offset++;
+            }
+        }
+        assertThat(offset, equalTo(bytes.length));
+        array.close();
+    }
+
     public void testByteArrayEquals() {
         final ByteArray empty1 = byteArrayWithBytes(BytesRef.EMPTY_BYTES);
         final ByteArray empty2 = byteArrayWithBytes(BytesRef.EMPTY_BYTES);

+ 12 - 0
test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java

@@ -17,9 +17,11 @@ import org.apache.lucene.tests.util.LuceneTestCase;
 import org.apache.lucene.util.Accountable;
 import org.apache.lucene.util.Accountables;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.common.breaker.CircuitBreaker;
 import org.elasticsearch.common.breaker.CircuitBreakingException;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
+import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
@@ -398,6 +400,16 @@ public class MockBigArrays extends BigArrays {
             in.fill(fromIndex, toIndex, value);
         }
 
+        @Override
+        public BytesRefIterator iterator() {
+            return in.iterator();
+        }
+
+        @Override
+        public void fillWith(StreamInput streamInput) throws IOException {
+            in.fillWith(streamInput);
+        }
+
         @Override
         public boolean hasArray() {
             return in.hasArray();