Browse Source

PagedBytesReference with a boatload of tests. Passes all new and
existing tests. Non-allocating hashCode/Equals, zero-copy writeTo() and
ChannelBuffer support.

Fix for #5427

Holger Hoffstätte 11 years ago
parent
commit
977ed1dc15

+ 443 - 0
src/main/java/org/elasticsearch/common/bytes/PagedBytesReference.java

@@ -0,0 +1,443 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bytes;
+
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.CharsRef;
+import org.apache.lucene.util.UnicodeUtil;
+import org.elasticsearch.ElasticsearchIllegalArgumentException;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.GatheringByteChannel;
+import java.util.Arrays;
+
+public final class PagedBytesReference implements BytesReference {
+
+    private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
+    private static final int NIO_GATHERING_LIMIT = 524288;
+
+    private final BigArrays bigarrays;
+    private final ByteArray bytearray;
+    private final int offset;
+    private final int length;
+    private int hash = 0;
+
+    public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int length) {
+        this(bigarrays, bytearray, 0, length);
+    }
+
+    public PagedBytesReference(BigArrays bigarrays, ByteArray bytearray, int from, int length) {
+        this.bigarrays = bigarrays;
+        this.bytearray = bytearray;
+        this.offset = from;
+        this.length = length;
+    }
+
+    @Override
+    public byte get(int index) {
+        return bytearray.get(offset + index);
+    }
+
+    @Override
+    public int length() {
+        return length;
+    }
+
+    @Override
+    public BytesReference slice(int from, int length) {
+        if (from < 0 || (from + length) > length()) {
+            throw new ElasticsearchIllegalArgumentException("can't slice a buffer with length [" + length() + "], with slice parameters from [" + from + "], length [" + length + "]");
+        }
+
+        return new PagedBytesReference(bigarrays, bytearray, offset + from, length);
+    }
+
+    @Override
+    public StreamInput streamInput() {
+        return new PagedBytesReferenceStreamInput(bytearray, offset, length);
+    }
+
+    @Override
+    public void writeTo(OutputStream os) throws IOException {
+            BytesRef ref = new BytesRef();
+            int written = 0;
+            
+            // are we a slice?
+            if (offset != 0) {
+                // remaining size of page fragment at offset
+                int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
+                bytearray.get(offset, fragmentSize, ref);
+                os.write(ref.bytes, ref.offset, fragmentSize);
+                written += fragmentSize;
+            }
+
+            // handle remainder of pages + trailing fragment
+            while (written < length) {
+                int remaining = length - written;
+                int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
+                bytearray.get(offset + written, bulkSize, ref);
+                os.write(ref.bytes, ref.offset, bulkSize);
+                written += bulkSize;
+            }
+    }
+
+    @Override
+    public void writeTo(GatheringByteChannel channel) throws IOException {
+        ByteBuffer[] buffers;
+        ByteBuffer currentBuffer = null;
+        BytesRef ref = new BytesRef();
+        int pos = 0; 
+
+        // are we a slice?
+        if (offset != 0) {
+            // remaining size of page fragment at offset
+            int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
+            bytearray.get(offset, fragmentSize, ref);
+            currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, fragmentSize);
+            pos += fragmentSize;
+        }
+
+        // we only have a single page
+        if (pos == length && currentBuffer != null) {
+            channel.write(currentBuffer);
+            return;
+        }
+
+        // a slice > pagesize will likely require extra buffers for initial/trailing fragments
+        int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);
+
+        buffers = new ByteBuffer[numBuffers];
+        int bufferSlot = 0;
+        
+        if (currentBuffer != null) {
+            buffers[bufferSlot] = currentBuffer;
+            bufferSlot++;
+        }
+
+        // handle remainder of pages + trailing fragment
+        while (pos < length) {
+            int remaining = length - pos;
+            int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
+            bytearray.get(offset + pos, bulkSize, ref);
+            currentBuffer = ByteBuffer.wrap(ref.bytes, ref.offset, bulkSize);
+            buffers[bufferSlot] = currentBuffer;
+            bufferSlot++;
+            pos += bulkSize;
+        }
+
+        // this would indicate that our numBuffer calculation is off by one.
+        assert (numBuffers == bufferSlot);
+
+        // finally write all buffers
+        channel.write(buffers);
+    }
+
+    @Override
+    public byte[] toBytes() {
+        if (length == 0) {
+            return BytesRef.EMPTY_BYTES;
+        }
+
+        BytesRef ref = new BytesRef();
+        bytearray.get(offset, length, ref);
+
+        // undo the single-page optimization by ByteArray.get(), otherwise
+        // a materialized stream will contain traling garbage/zeros
+        byte[] result = ref.bytes;
+        if (result.length != length || ref.offset != 0) {
+            result = Arrays.copyOfRange(result, ref.offset, ref.offset + length);
+        }
+
+        return result;
+    }
+
+    @Override
+    public BytesArray toBytesArray() {
+        BytesRef ref = new BytesRef();
+        bytearray.get(offset, length, ref);
+        return new BytesArray(ref);
+    }
+
+    @Override
+    public BytesArray copyBytesArray() {
+        BytesRef ref = new BytesRef();
+        boolean copied = bytearray.get(offset, length, ref);
+
+        if (copied) {
+            // BigArray has materialized for us, no need to do it again
+            return new BytesArray(ref.bytes, ref.offset, ref.length);
+        }
+        else {
+            // here we need to copy the bytes even when shared
+            byte[] copy = Arrays.copyOfRange(ref.bytes, ref.offset, ref.offset + ref.length);
+            return new BytesArray(copy);
+        }
+    }
+
+    @Override
+    public ChannelBuffer toChannelBuffer() {
+        ChannelBuffer[] buffers;
+        ChannelBuffer currentBuffer = null;
+        BytesRef ref = new BytesRef();
+        int pos = 0; 
+
+        // are we a slice?
+        if (offset != 0) {
+            // remaining size of page fragment at offset
+            int fragmentSize = Math.min(length, PAGE_SIZE - (offset % PAGE_SIZE));
+            bytearray.get(offset, fragmentSize, ref);
+            currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, fragmentSize);
+            pos += fragmentSize;
+        }
+
+        // no need to create a composite buffer for a single page
+        if (pos == length && currentBuffer != null) {
+            return currentBuffer;
+        }
+
+        // a slice > pagesize will likely require extra buffers for initial/trailing fragments
+        int numBuffers = countRequiredBuffers((currentBuffer != null ? 1 : 0), length - pos);
+
+        buffers = new ChannelBuffer[numBuffers];
+        int bufferSlot = 0;
+
+        if (currentBuffer != null) {
+            buffers[bufferSlot] = currentBuffer;
+            bufferSlot++;
+        }
+
+        // handle remainder of pages + trailing fragment
+        while (pos < length) {
+            int remaining = length - pos;
+            int bulkSize = (remaining > PAGE_SIZE) ? PAGE_SIZE : remaining;
+            bytearray.get(offset + pos, bulkSize, ref);
+            currentBuffer = ChannelBuffers.wrappedBuffer(ref.bytes, ref.offset, bulkSize);
+            buffers[bufferSlot] = currentBuffer;
+            bufferSlot++;
+            pos += bulkSize;
+        }
+
+        // this would indicate that our numBuffer calculation is off by one.
+        assert (numBuffers == bufferSlot);
+
+        // we can use gathering writes from the ChannelBuffers, but only if they are
+        // moderately small to prevent OOMs due to DirectBuffer allocations.
+        return ChannelBuffers.wrappedBuffer(length <= NIO_GATHERING_LIMIT, buffers);
+    }
+
+    @Override
+    public boolean hasArray() {
+        return (offset + length <= PAGE_SIZE);
+    }
+
+    @Override
+    public byte[] array() {
+        if (hasArray()) {
+            if (length == 0) {
+                return BytesRef.EMPTY_BYTES;
+            }
+
+            BytesRef ref = new BytesRef();
+            bytearray.get(offset, length, ref);
+            return ref.bytes;
+        }
+
+        throw new IllegalStateException("array not available");
+    }
+
+    @Override
+    public int arrayOffset() {
+        if (hasArray()) {
+            BytesRef ref = new BytesRef();
+            bytearray.get(offset, length, ref);
+            return ref.offset;
+        }
+
+        throw new IllegalStateException("array not available");
+    }
+
+    @Override
+    public String toUtf8() {
+        if (length() == 0) {
+            return "";
+        }
+
+        byte[] bytes = toBytes();
+        final CharsRef ref = new CharsRef(length);
+        UnicodeUtil.UTF8toUTF16(bytes, offset, length, ref);
+        return ref.toString();
+    }
+
+    @Override
+    public BytesRef toBytesRef() {
+        BytesRef bref = new BytesRef();
+        // if length <= pagesize this will dereference the page, or materialize the byte[]
+        bytearray.get(offset, length, bref);
+        return bref;
+    }
+
+    @Override
+    public BytesRef copyBytesRef() {
+        byte[] bytes = toBytes();
+        return new BytesRef(bytes, offset, length);
+    }
+
+    @Override
+    public int hashCode() {
+        if (hash == 0) {
+            // TODO: delegate to BigArrays via:
+            // hash = bigarrays.hashCode(bytearray);
+            // and for slices:
+            // hash = bigarrays.hashCode(bytearray, offset, length);
+            int tmphash = 1;
+            for (int i = 0; i < length; i++) {
+                tmphash = 31 * tmphash + bytearray.get(offset + i);
+            }
+            hash = tmphash;
+        }
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+
+        if (!(obj instanceof PagedBytesReference)) {
+            return BytesReference.Helper.bytesEqual(this, (BytesReference)obj);
+        }
+
+        PagedBytesReference other = (PagedBytesReference)obj;
+        if (length != other.length) {
+            return false;
+        }
+
+        // TODO: delegate to BigArrays via:
+        // return bigarrays.equals(bytearray, other.bytearray);
+        // and for slices:
+        // return bigarrays.equals(bytearray, start, other.bytearray, otherstart, len);
+        ByteArray otherArray = other.bytearray;
+        int otherOffset = other.offset;
+        for (int i = 0; i < length; i++) {
+            if (bytearray.get(offset + i) != otherArray.get(otherOffset + i)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private int countRequiredBuffers(int initialCount, int numBytes) {
+        int numBuffers = initialCount;
+        // an "estimate" of how many pages remain - rounded down
+        int pages = numBytes / PAGE_SIZE;
+        // a remaining fragment < pagesize needs at least one buffer
+        numBuffers += (pages == 0) ? 1 : pages;
+        // a remainder that is not a multiple of pagesize also needs an extra buffer
+        numBuffers += (pages > 0 && numBuffers % PAGE_SIZE > 0) ? 1 : 0;
+        return numBuffers;
+    }
+
+    private static class PagedBytesReferenceStreamInput extends StreamInput {
+
+        private final ByteArray bytearray;
+        private final int offset;
+        private final int length;
+        private int pos;
+
+        public PagedBytesReferenceStreamInput(ByteArray bytearray, int offset, int length) {
+            this.bytearray = bytearray;
+            this.offset = offset;
+            this.length = length;
+            this.pos = 0;
+
+            if (offset + length > bytearray.size()) {
+                throw new IndexOutOfBoundsException("offset+length >= bytearray.size()");
+            }
+        }
+
+        @Override
+        public byte readByte() throws IOException {
+            if (pos >= length) {
+                throw new EOFException();
+            }
+
+            return bytearray.get(offset + pos++);
+        }
+
+        @Override
+        public void readBytes(byte[] b, int bOffset, int len) throws IOException {
+            if (len > offset + length) {
+                throw new IndexOutOfBoundsException("Cannot read " + len + " bytes from stream with length " + length + " at pos " + pos);
+            }
+
+            read(b, bOffset, len);
+        }
+
+        @Override
+        public int read() throws IOException {
+            return (pos < length) ? bytearray.get(offset + pos++) : -1; 
+        }
+
+        @Override
+        public int read(byte[] b, int bOffset, int len) throws IOException {
+            if (len == 0) {
+                return 0;
+            }
+
+            if (pos >= offset + length) {
+                return -1;
+            }
+
+            // we need to stop at the end
+            len = Math.min(length, len);
+
+            // ByteArray.get(BytesRef) does not work since it flips the
+            // ref's byte[] pointer, so for now we copy byte-by-byte
+            int written = 0;
+            while (written < len) {
+                b[bOffset + written] = bytearray.get(offset + written);
+                written++;
+            }
+
+            pos += written;
+            return written;
+        }
+
+        @Override
+        public void reset() throws IOException {
+            pos = 0;
+        }
+
+        @Override
+        public void close() throws IOException {
+            // do nothing
+        }
+
+    }
+}

+ 2 - 5
src/main/java/org/elasticsearch/common/io/stream/BytesStreamOutput.java

@@ -19,9 +19,8 @@
 
 package org.elasticsearch.common.io.stream;
 
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.bytes.PagedBytesReference;
 import org.elasticsearch.common.io.BytesStream;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.ByteArray;
@@ -154,9 +153,7 @@ public class BytesStreamOutput extends StreamOutput implements BytesStream {
 
     @Override
     public BytesReference bytes() {
-        BytesRef bref = new BytesRef();
-        bytes.get(0, count, bref);
-        return new BytesArray(bref, false);
+        return new PagedBytesReference(bigarrays, bytes, count);
     }
 
     private void ensureCapacity(int offset) {

+ 467 - 0
src/test/java/org/elasticsearch/common/bytes/PagedBytesReferenceTest.java

@@ -0,0 +1,467 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bytes;
+
+import org.apache.lucene.util.BytesRef;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+import org.elasticsearch.test.ElasticsearchTestCase;
+import org.elasticsearch.test.cache.recycler.MockBigArrays;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+
+public class PagedBytesReferenceTest extends ElasticsearchTestCase {
+
+    private static final int PAGE_SIZE = BigArrays.BYTE_PAGE_SIZE;
+
+    private MockBigArrays bigarrays;
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        bigarrays = new MockBigArrays(ImmutableSettings.EMPTY, null);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        // necessary since we currently never release BigArrays
+        MockBigArrays.reset();
+        super.tearDown();
+    }
+
+    @Test
+    public void testGet() {
+        int length = randomInt(PAGE_SIZE * 3);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        int sliceOffset = randomIntBetween(0, length / 2);
+        int sliceLength = Math.max(1, length - sliceOffset - 1);
+        BytesReference slice = pbr.slice(sliceOffset, sliceLength);
+        assertEquals(pbr.get(sliceOffset), slice.get(0));
+        assertEquals(pbr.get(sliceOffset + sliceLength), slice.get(sliceLength));
+    }
+
+    public void testLength() {
+        int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomInt(PAGE_SIZE * 3)};
+
+        for (int i = 0; i < sizes.length; i++) {
+            BytesReference pbr = getRandomizedPagedBytesReference(sizes[i]);
+            assertEquals(sizes[i], pbr.length());
+        }
+    }
+
+    public void testSlice() {
+        int length = randomInt(PAGE_SIZE * 3);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        int sliceOffset = randomIntBetween(0, length / 2);
+        int sliceLength = Math.max(1, length - sliceOffset - 1);
+        BytesReference slice = pbr.slice(sliceOffset, sliceLength);
+        assertEquals(sliceLength, slice.length());
+
+        if (slice.hasArray()) {
+            assertEquals(sliceOffset, slice.arrayOffset());
+        }
+        else {
+            try {
+                slice.arrayOffset();
+                fail("expected IllegalStateException");
+            }
+            catch (IllegalStateException ise) {
+                // expected
+            }
+        }
+    }
+
+    public void testStreamInput() throws IOException {
+        int length = randomIntBetween(10, PAGE_SIZE * 3);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        StreamInput si = pbr.streamInput();
+        assertNotNull(si);
+
+        // read single bytes one by one
+        assertEquals(pbr.get(0), si.readByte());
+        assertEquals(pbr.get(1), si.readByte());
+        assertEquals(pbr.get(2), si.readByte());
+        si.reset();
+
+        // buffer for bulk reads
+        byte[] origBuf = new byte[length];
+        getRandom().nextBytes(origBuf);
+        byte[] targetBuf = Arrays.copyOf(origBuf, origBuf.length);
+
+        // bulk-read 0 bytes: must not modify buffer
+        si.readBytes(targetBuf, 0, 0);
+        assertEquals(origBuf[0], targetBuf[0]);
+        si.reset();
+
+        // read an int
+        int i = si.read();
+        assertFalse(i == 0);
+        si.reset();
+
+        // bulk-read all
+        si.readFully(targetBuf);
+        assertArrayEquals(pbr.toBytes(), targetBuf);
+
+        // continuing to read should now fail with EOFException
+        try {
+            si.readByte();
+            fail("expected EOF");
+        }
+        catch (EOFException eof) {
+            // yay
+        }
+
+        // try to read more than the stream contains
+        si.reset();
+        try {
+            si.readBytes(targetBuf, 0, length * 2);
+            fail("expected IndexOutOfBoundsException: le > stream.length");
+        }
+        catch (IndexOutOfBoundsException ioob) {
+            // expected
+        }
+    }
+
+    public void testSliceStreamInput() throws IOException {
+        int length = randomIntBetween(10, PAGE_SIZE * 3);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        StreamInput si = pbr.streamInput();
+
+        // test stream input over slice (upper half of original)
+        int sliceOffset = randomIntBetween(1, length / 2);
+        int sliceLength = length - sliceOffset;
+        BytesReference slice = pbr.slice(sliceOffset, sliceLength);
+        StreamInput sliceInput = slice.streamInput();
+
+        // single reads
+        assertEquals(slice.get(0), sliceInput.readByte());
+        assertEquals(slice.get(1), sliceInput.readByte());
+        assertEquals(slice.get(2), sliceInput.readByte());
+        si.reset();
+
+        // bulk read
+        byte[] sliceBytes = new byte[sliceLength];
+        sliceInput.readFully(sliceBytes);
+
+        // compare slice content with upper half of original
+        byte[] pbrSliceBytes = Arrays.copyOfRange(pbr.toBytes(), sliceOffset, length);
+        assertArrayEquals(pbrSliceBytes, sliceBytes);
+
+        // compare slice bytes with bytes read from slice via streamInput :D
+        byte[] sliceToBytes = slice.toBytes();
+        assertEquals(sliceBytes.length, sliceToBytes.length);
+        assertArrayEquals(sliceBytes, sliceToBytes);
+    }
+
+    public void testWriteTo() throws IOException {
+        int length = randomIntBetween(10, PAGE_SIZE * 4);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        BytesStreamOutput out = new BytesStreamOutput();
+        pbr.writeTo(out);
+        assertEquals(pbr.length(), out.size());
+        assertArrayEquals(pbr.toBytes(), out.bytes().toBytes());
+        out.close();
+    }
+
+    public void testSliceWriteTo() throws IOException {
+        int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2,5));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        int sliceOffset = randomIntBetween(1, length / 2);
+        int sliceLength = length - sliceOffset;
+        BytesReference slice = pbr.slice(sliceOffset, sliceLength);
+        BytesStreamOutput sliceOut = new BytesStreamOutput(sliceLength);
+        slice.writeTo(sliceOut);
+        assertEquals(slice.length(), sliceOut.size());
+        assertArrayEquals(slice.toBytes(), sliceOut.bytes().toBytes());
+        sliceOut.close();
+    }
+
+    public void testToBytes() {
+        int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomIntBetween(2, PAGE_SIZE * randomIntBetween(2,5))};
+
+        for (int i = 0; i < sizes.length; i++) {
+            BytesReference pbr = getRandomizedPagedBytesReference(sizes[i]);
+            byte[] bytes = pbr.toBytes();
+            assertEquals(sizes[i], bytes.length);
+            // verify that toBytes() is cheap for small payloads
+            if (sizes[i] <= PAGE_SIZE) {
+                assertSame(bytes, pbr.toBytes());
+            }
+            else {
+                assertNotSame(bytes, pbr.toBytes());
+            }
+        }
+    }
+
+    public void testToBytesArraySharedPage() {
+        int length = randomIntBetween(10, PAGE_SIZE);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        BytesArray ba = pbr.toBytesArray();
+        BytesArray ba2 = pbr.toBytesArray();
+        assertNotNull(ba);
+        assertNotNull(ba2);
+        assertEquals(pbr.length(), ba.length());
+        assertEquals(ba.length(), ba2.length());
+        // single-page optimization
+        assertSame(ba.array(), ba2.array());
+    }
+
+    public void testToBytesArrayMaterializedPages() {
+        int length = randomIntBetween(PAGE_SIZE, PAGE_SIZE * randomIntBetween(2,5));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        BytesArray ba = pbr.toBytesArray();
+        BytesArray ba2 = pbr.toBytesArray();
+        assertNotNull(ba);
+        assertNotNull(ba2);
+        assertEquals(pbr.length(), ba.length());
+        assertEquals(ba.length(), ba2.length());
+        // ensure no single-page optimization
+        assertNotSame(ba.array(), ba2.array());
+    }
+
+    public void testCopyBytesArray() {
+        // small PBR which would normally share the first page
+        int length = randomIntBetween(10, PAGE_SIZE);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        BytesArray ba = pbr.copyBytesArray();
+        BytesArray ba2 = pbr.copyBytesArray();
+        assertNotNull(ba);
+        assertNotSame(ba, ba2);
+        assertNotSame(ba.array(), ba2.array());
+    }
+
+    public void testSliceCopyBytesArray() {
+        int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2,8));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        int sliceOffset = randomIntBetween(0, pbr.length());
+        int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset);
+        BytesReference slice = pbr.slice(sliceOffset, sliceLength);
+
+        BytesArray ba1 = slice.copyBytesArray();
+        BytesArray ba2 = slice.copyBytesArray();
+        assertNotNull(ba1);
+        assertNotNull(ba2);
+        assertNotSame(ba1.array(), ba2.array());
+        assertArrayEquals(slice.toBytes(), ba1.array());
+        assertArrayEquals(slice.toBytes(), ba2.array());
+        assertArrayEquals(ba1.array(), ba2.array());
+    }
+
+    public void testToChannelBuffer() {
+        int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2,8));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        ChannelBuffer cb = pbr.toChannelBuffer();
+        assertNotNull(cb);
+        byte[] bufferBytes = new byte[length];
+        cb.getBytes(0, bufferBytes);
+        assertArrayEquals(pbr.toBytes(), bufferBytes);
+    }
+
+    public void testSliceToChannelBuffer() {
+        int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(2,8));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        int sliceOffset = randomIntBetween(0, pbr.length());
+        int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset);
+        BytesReference slice = pbr.slice(sliceOffset, sliceLength);
+        ChannelBuffer cbSlice = slice.toChannelBuffer();
+        assertNotNull(cbSlice);
+        byte[] sliceBufferBytes = new byte[sliceLength];
+        cbSlice.getBytes(0, sliceBufferBytes);
+        assertArrayEquals(slice.toBytes(), sliceBufferBytes);
+    }
+
+    public void testHasArray() {
+        int length = randomIntBetween(10, PAGE_SIZE * randomIntBetween(1,3));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        // must return true for <= pagesize
+        assertEquals(length <= PAGE_SIZE, pbr.hasArray());
+    }
+
+    public void testArray() {
+        int[] sizes = {0, randomInt(PAGE_SIZE), PAGE_SIZE, randomIntBetween(2, PAGE_SIZE * randomIntBetween(2,5))};
+
+        for (int i = 0; i < sizes.length; i++) {
+            BytesReference pbr = getRandomizedPagedBytesReference(sizes[i]);
+            // verify that array() is cheap for small payloads
+            if (sizes[i] <= PAGE_SIZE) {
+                byte[] array = pbr.array();
+                assertNotNull(array);
+                assertEquals(sizes[i], array.length);
+                assertSame(array, pbr.array());
+            }
+            else {
+                try {
+                    pbr.array();
+                    fail("expected IllegalStateException");
+                }
+                catch (IllegalStateException isx) {
+                    // expected
+                }
+            }
+        }
+    }
+
+    public void testArrayOffset() {
+        int length = randomInt(PAGE_SIZE * randomIntBetween(2,5));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        if (pbr.hasArray()) {
+            assertEquals(0, pbr.arrayOffset());
+        }
+        else {
+            try {
+                pbr.arrayOffset();
+                fail("expected IllegalStateException");
+            }
+            catch (IllegalStateException ise) {
+                // expected
+            }
+        }
+    }
+
+    public void testSliceArrayOffset() {
+        int length = randomInt(PAGE_SIZE * randomIntBetween(2,5));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        int sliceOffset = randomIntBetween(0, pbr.length());
+        int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset);
+        BytesReference slice = pbr.slice(sliceOffset, sliceLength);
+        if (slice.hasArray()) {
+            assertEquals(sliceOffset, slice.arrayOffset());
+        }
+        else {
+            try {
+                slice.arrayOffset();
+                fail("expected IllegalStateException");
+            }
+            catch (IllegalStateException ise) {
+                // expected
+            }
+        }
+    }
+
+    public void testToUtf8() throws IOException {
+        // test empty
+        BytesReference pbr = getRandomizedPagedBytesReference(0);
+        assertEquals("", pbr.toUtf8());
+        // TODO: good way to test?
+    }
+
+    public void testToBytesRef() {
+        int length = randomIntBetween(0, PAGE_SIZE);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        BytesRef ref = pbr.toBytesRef();
+        assertNotNull(ref);
+        assertEquals(pbr.arrayOffset(), ref.offset);
+        assertEquals(pbr.length(), ref.length);
+    }
+
+    public void testSliceToBytesRef() {
+        int length = randomIntBetween(0, PAGE_SIZE);
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        // get a BytesRef from a slice
+        int sliceOffset = randomIntBetween(0, pbr.length());
+        int sliceLength = randomIntBetween(pbr.length() - sliceOffset, pbr.length() - sliceOffset);
+        BytesRef sliceRef = pbr.slice(sliceOffset, sliceLength).toBytesRef();
+        // note that these are only true if we have <= than a page, otherwise offset/length are shifted
+        assertEquals(sliceOffset, sliceRef.offset);
+        assertEquals(sliceLength, sliceRef.length);
+    }
+
+    public void testCopyBytesRef() {
+        int length = randomIntBetween(0, PAGE_SIZE * randomIntBetween(2,5));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        BytesRef ref = pbr.copyBytesRef();
+        assertNotNull(ref);
+        assertEquals(pbr.length(), ref.length);
+    }
+
+    public void testHashCode() {
+        // empty content must have hash 1 (JDK compat)
+        BytesReference pbr = getRandomizedPagedBytesReference(0);
+        assertEquals(Arrays.hashCode(BytesRef.EMPTY_BYTES), pbr.hashCode());
+
+        // test with content
+        pbr = getRandomizedPagedBytesReference(randomIntBetween(0, PAGE_SIZE * randomIntBetween(2,5)));
+        int jdkHash = Arrays.hashCode(pbr.toBytes());
+        int pbrHash = pbr.hashCode();
+        assertEquals(jdkHash, pbrHash);
+
+        // test hashes of slices
+        int sliceFrom = randomIntBetween(0, pbr.length());
+        int sliceLength = randomIntBetween(pbr.length() - sliceFrom, pbr.length() - sliceFrom);
+        BytesReference slice = pbr.slice(sliceFrom, sliceLength);
+        int sliceJdkHash = Arrays.hashCode(slice.toBytes());
+        int sliceHash = slice.hashCode();
+        assertEquals(sliceJdkHash, sliceHash);
+    }
+
+    public void testEquals() {
+        int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2,5));
+        ByteArray ba1 = bigarrays.newByteArray(length, false);
+        ByteArray ba2 = bigarrays.newByteArray(length, false);
+
+        // copy contents
+        for (long i = 0; i < length; i++) {
+            ba2.set(i, ba1.get(i));
+        }
+
+        // get refs & compare
+        BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length);
+        BytesReference pbr2 = new PagedBytesReference(bigarrays, ba2, length);
+        assertEquals(pbr, pbr2);
+    }
+
+    public void testEqualsPeerClass() {
+        int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2,5));
+        BytesReference pbr = getRandomizedPagedBytesReference(length);
+        BytesReference ba = new BytesArray(pbr.toBytes());
+        assertEquals(pbr, ba);
+    }
+
+    public void testSliceEquals() {
+        int length = randomIntBetween(100, PAGE_SIZE * randomIntBetween(2,5));
+        ByteArray ba1 = bigarrays.newByteArray(length, false);
+        BytesReference pbr = new PagedBytesReference(bigarrays, ba1, length);
+
+        // test equality of slices
+        int sliceFrom = randomIntBetween(0, pbr.length());
+        int sliceLength = randomIntBetween(pbr.length() - sliceFrom, pbr.length() - sliceFrom);
+        BytesReference slice1 = pbr.slice(sliceFrom, sliceLength);
+        BytesReference slice2 = pbr.slice(sliceFrom, sliceLength);
+        assertArrayEquals(slice1.toBytes(), slice2.toBytes());
+
+        // test a slice with same offset but different length
+        BytesReference slice3 = pbr.slice(sliceFrom, sliceLength / 2);
+        assertFalse(Arrays.equals(slice1.toBytes(), slice3.toBytes()));
+    }
+
+    private BytesReference getRandomizedPagedBytesReference(int length) {
+        return new PagedBytesReference(bigarrays, bigarrays.newByteArray(length, false), length);
+    }
+
+}