Browse Source

Big arrays sliced from netty buffers (byte) (#92706)

Based on #90745 but for bytes. This should allow aggregations down the
road to read byte values directly from the netty buffer, rather than
copying them out of the netty buffer.

Relates to #89437
Martijn van Groningen 2 years ago
parent
commit
c9be8d98f1

+ 7 - 0
server/src/main/java/org/elasticsearch/common/util/BigArrays.java

@@ -152,6 +152,13 @@ public class BigArrays {
         public byte[] array() {
             return array;
         }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            int size = Math.toIntExact(size()) * Byte.BYTES;
+            out.writeVInt(size);
+            out.write(array, 0, size);
+        }
     }
 
     private static class ByteArrayAsIntArrayWrapper extends AbstractArrayWrapper implements IntArray {

+ 8 - 0
server/src/main/java/org/elasticsearch/common/util/BigByteArray.java

@@ -11,9 +11,12 @@ package org.elasticsearch.common.util;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.io.stream.StreamOutput;
 
+import java.io.IOException;
 import java.util.Arrays;
 
+import static org.elasticsearch.common.util.BigLongArray.writePages;
 import static org.elasticsearch.common.util.PageCacheRecycler.BYTE_PAGE_SIZE;
 
 /**
@@ -36,6 +39,11 @@ final class BigByteArray extends AbstractBigArray implements ByteArray {
         }
     }
 
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        writePages(out, Math.toIntExact(size), pages, Byte.BYTES, BYTE_PAGE_SIZE);
+    }
+
     @Override
     public byte get(long index) {
         final int pageIndex = pageIndex(index);

+ 8 - 1
server/src/main/java/org/elasticsearch/common/util/ByteArray.java

@@ -9,13 +9,20 @@
 package org.elasticsearch.common.util;
 
 import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.Writeable;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
  * Abstraction of an array of byte values.
  */
-public interface ByteArray extends BigArray {
+public interface ByteArray extends BigArray, Writeable {
+
+    static ByteArray readFrom(StreamInput in) throws IOException {
+        return new ReleasableByteArray(in);
+    }
 
     /**
      * Get an element given its index.

+ 105 - 0
server/src/main/java/org/elasticsearch/common/util/ReleasableByteArray.java

@@ -0,0 +1,105 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.common.util;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.RamUsageEstimator;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+import static org.elasticsearch.common.util.BigArrays.indexIsInt;
+
+public class ReleasableByteArray implements ByteArray {
+
+    private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ReleasableByteArray.class);
+
+    private final ReleasableBytesReference ref;
+
+    ReleasableByteArray(StreamInput in) throws IOException {
+        this.ref = in.readReleasableBytesReference();
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeBytesReference(ref);
+    }
+
+    @Override
+    public long size() {
+        return ref.length() / Byte.BYTES;
+    }
+
+    @Override
+    public byte get(long index) {
+        assert indexIsInt(index);
+        return ref.get((int) index);
+    }
+
+    @Override
+    public boolean get(long index, int len, BytesRef ref) {
+        assert indexIsInt(index);
+        BytesReference sliced = this.ref.slice((int) index, len);
+        if (sliced.length() != 0) {
+            ref.offset = sliced.arrayOffset();
+            ref.length = sliced.length();
+            ref.bytes = sliced.array();
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public byte set(long index, byte value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void set(long index, byte[] buf, int offset, int len) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void fill(long fromIndex, long toIndex, byte value) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean hasArray() {
+        return ref.hasArray();
+    }
+
+    @Override
+    public byte[] array() {
+        // The assumption of this method is that the returned array has valid entries starting from slot 0 and
+        // this isn't the case when just returning the array from ReleasableBytesReference#array().
+        // The interface that this class implements should have something like an arrayOffset() method,
+        // so that callers know from what array offset the first actual byte starts.
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public long ramBytesUsed() {
+        /*
+         * If we return the size of the buffer that we've sliced
+         * we're likely to double count things.
+         */
+        return SHALLOW_SIZE;
+    }
+
+    @Override
+    public void close() {
+        ref.decRef();
+    }
+
+}

+ 20 - 0
server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceStreamInputTests.java

@@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.AbstractStreamTests;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.DoubleArray;
 import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.common.util.LongArray;
@@ -120,4 +121,23 @@ public class ReleasableBytesReferenceStreamInputTests extends AbstractStreamTest
         assertThat(ref.hasReferences(), equalTo(false));
     }
 
+    public void testBigByteArrayLivesAfterReleasableIsDecremented() throws IOException {
+        ByteArray testData = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(1, false);
+        testData.set(0L, (byte) 1);
+
+        BytesStreamOutput out = new BytesStreamOutput();
+        testData.writeTo(out);
+
+        ReleasableBytesReference ref = ReleasableBytesReference.wrap(out.bytes());
+
+        try (ByteArray in = ByteArray.readFrom(ref.streamInput())) {
+            ref.decRef();
+            assertThat(ref.hasReferences(), equalTo(true));
+
+            assertThat(in.size(), equalTo(testData.size()));
+            assertThat(in.get(0), equalTo((byte) 1));
+        }
+        assertThat(ref.hasReferences(), equalTo(false));
+    }
+
 }

+ 28 - 2
server/src/test/java/org/elasticsearch/common/io/stream/AbstractStreamTests.java

@@ -14,6 +14,7 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.SecureString;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
 import org.elasticsearch.common.util.DoubleArray;
 import org.elasticsearch.common.util.IntArray;
 import org.elasticsearch.common.util.LongArray;
@@ -274,11 +275,11 @@ public abstract class AbstractStreamTests extends ESTestCase {
     }
 
     public void testSmallBigLongArray() throws IOException {
-        assertBigLongArray(between(0, PageCacheRecycler.DOUBLE_PAGE_SIZE));
+        assertBigLongArray(between(0, PageCacheRecycler.LONG_PAGE_SIZE));
     }
 
     public void testLargeBigLongArray() throws IOException {
-        assertBigLongArray(between(PageCacheRecycler.DOUBLE_PAGE_SIZE, 10000));
+        assertBigLongArray(between(PageCacheRecycler.LONG_PAGE_SIZE, 10000));
     }
 
     private void assertBigLongArray(int size) throws IOException {
@@ -298,6 +299,31 @@ public abstract class AbstractStreamTests extends ESTestCase {
         }
     }
 
+    public void testSmallBigByteArray() throws IOException {
+        assertBigByteArray(between(0, PageCacheRecycler.BYTE_PAGE_SIZE / 10));
+    }
+
+    public void testLargeBigByteArray() throws IOException {
+        assertBigByteArray(between(PageCacheRecycler.BYTE_PAGE_SIZE / 10, PageCacheRecycler.BYTE_PAGE_SIZE * 10));
+    }
+
+    private void assertBigByteArray(int size) throws IOException {
+        ByteArray testData = BigArrays.NON_RECYCLING_INSTANCE.newByteArray(size, false);
+        for (int i = 0; i < size; i++) {
+            testData.set(i, randomByte());
+        }
+
+        BytesStreamOutput out = new BytesStreamOutput();
+        testData.writeTo(out);
+
+        try (ByteArray in = ByteArray.readFrom(getStreamInput(out.bytes()))) {
+            assertThat(in.size(), equalTo(testData.size()));
+            for (int i = 0; i < size; i++) {
+                assertThat(in.get(i), equalTo(testData.get(i)));
+            }
+        }
+    }
+
     public void testCollection() throws IOException {
         class FooBar implements Writeable {
 

+ 5 - 0
test/framework/src/main/java/org/elasticsearch/common/util/MockBigArrays.java

@@ -412,6 +412,11 @@ public class MockBigArrays extends BigArrays {
         public Collection<Accountable> getChildResources() {
             return Collections.singleton(Accountables.namedAccountable("delegate", in));
         }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            in.writeTo(out);
+        }
     }
 
     private class IntArrayWrapper extends AbstractArrayWrapper implements IntArray {