Browse Source

Introduce resizable inbound byte buffer (#27551)

This is related to #27563. In order to interface with java nio, we must
have buffers that are compatible with ByteBuffer. This commit introduces
a basic ByteBufferReference to easily allow transferring bytes off the
wire to usage in the application.

Additionally it introduces an InboundChannelBuffer. This is a buffer
that can internally expand as more space is needed. It is designed to
be integrated with a page recycler so that it can internally reuse pages.
The final piece is moving all of the index work for writing bytes to a
channel into the WriteOperation.
Tim Brooks 7 years ago
parent
commit
2aa62daed4
16 changed files with 601 additions and 433 deletions
  1. 89 0
      core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java
  2. 44 0
      core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java
  3. 204 0
      test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java
  4. 0 157
      test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java
  5. 48 12
      test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java
  6. 8 21
      test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java
  7. 3 3
      test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java
  8. 19 39
      test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java
  9. 0 155
      test/framework/src/test/java/org/elasticsearch/transport/nio/ByteBufferReferenceTests.java
  10. 152 0
      test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java
  11. 2 4
      test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java
  12. 3 3
      test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java
  13. 3 11
      test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java
  14. 8 10
      test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java
  15. 16 16
      test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java
  16. 2 2
      test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java

+ 89 - 0
core/src/main/java/org/elasticsearch/common/bytes/ByteBufferReference.java

@@ -0,0 +1,89 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bytes;
+
+import org.apache.lucene.util.BytesRef;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This is a {@link BytesReference} backed by a {@link ByteBuffer}. The byte buffer can either be a heap or
+ * direct byte buffer. The reference is composed of the space between the {@link ByteBuffer#position} and
+ * {@link ByteBuffer#limit} at construction time. If the position or limit of the underlying byte buffer is
+ * changed, those changes will not be reflected in this reference. However, modifying the limit or position
+ * of the underlying byte buffer is not recommended as those can be used during {@link ByteBuffer#get()}
+ * bounds checks. Use {@link ByteBuffer#duplicate()} at creation time if you plan on modifying the markers of
+ * the underlying byte buffer. Any changes to the underlying data in the byte buffer will be reflected.
+ */
+public class ByteBufferReference extends BytesReference {
+
+    private final ByteBuffer buffer;
+    private final int offset;
+    private final int length;
+
+    public ByteBufferReference(ByteBuffer buffer) {
+        this.buffer = buffer;
+        this.offset = buffer.position();
+        this.length = buffer.remaining();
+    }
+
+    @Override
+    public byte get(int index) {
+        return buffer.get(index + offset);
+    }
+
+    @Override
+    public int length() {
+        return length;
+    }
+
+    @Override
+    public BytesReference slice(int from, int length) {
+        if (from < 0 || (from + length) > this.length) {
+            throw new IndexOutOfBoundsException("can't slice a buffer with length [" + this.length + "], with slice parameters from ["
+                + from + "], length [" + length + "]");
+        }
+        ByteBuffer newByteBuffer = buffer.duplicate();
+        newByteBuffer.position(offset + from);
+        newByteBuffer.limit(offset + from + length);
+        return new ByteBufferReference(newByteBuffer);
+    }
+
+    /**
+     * This will return a bytes ref composed of the bytes. If this is a direct byte buffer, the bytes will
+     * have to be copied.
+     *
+     * @return the bytes ref
+     */
+    @Override
+    public BytesRef toBytesRef() {
+        if (buffer.hasArray()) {
+            return new BytesRef(buffer.array(), buffer.arrayOffset() + offset, length);
+        }
+        final byte[] copy = new byte[length];
+        buffer.get(copy, offset, length);
+        return new BytesRef(copy);
+    }
+
+    @Override
+    public long ramBytesUsed() {
+        return buffer.capacity();
+    }
+}

+ 44 - 0
core/src/test/java/org/elasticsearch/common/bytes/ByteBufferReferenceTests.java

@@ -0,0 +1,44 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.bytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+public class ByteBufferReferenceTests extends AbstractBytesReferenceTestCase {
+
+    private void initializeBytes(byte[] bytes) {
+        for (int i = 0 ; i < bytes.length; ++i) {
+            bytes[i] = (byte) i;
+        }
+    }
+
+    @Override
+    protected BytesReference newBytesReference(int length) throws IOException {
+        return newBytesReferenceWithOffsetOfZero(length);
+    }
+
+    @Override
+    protected BytesReference newBytesReferenceWithOffsetOfZero(int length) throws IOException {
+        byte[] bytes = new byte[length];
+        initializeBytes(bytes);
+        return new ByteBufferReference(ByteBuffer.wrap(bytes));
+    }
+}

+ 204 - 0
test/framework/src/main/java/org/elasticsearch/transport/nio/InboundChannelBuffer.java

@@ -0,0 +1,204 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport.nio;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/**
+ * This is a channel byte buffer composed internally of 16kb pages. When an entire message has been read
+ * and consumed, the {@link #release(long)} method releases the bytes from the head of the buffer and closes
+ * the pages internally. If more space is needed at the end of the buffer {@link #ensureCapacity(long)} can
+ * be called and the buffer will expand using the supplier provided.
+ */
+public final class InboundChannelBuffer {
+
+    private static final int PAGE_SIZE = 1 << 14;
+    private static final int PAGE_MASK = PAGE_SIZE - 1;
+    private static final int PAGE_SHIFT = Integer.numberOfTrailingZeros(PAGE_SIZE);
+    private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
+
+
+    private final ArrayDeque<ByteBuffer> pages;
+    private final Supplier<ByteBuffer> pageSupplier;
+
+    private long capacity = 0;
+    private long internalIndex = 0;
+    // The offset is an int as it is the offset of where the bytes begin in the first buffer
+    private int offset = 0;
+
+    public InboundChannelBuffer() {
+        this(() -> ByteBuffer.wrap(new byte[PAGE_SIZE]));
+    }
+
+    private InboundChannelBuffer(Supplier<ByteBuffer> pageSupplier) {
+        this.pageSupplier = pageSupplier;
+        this.pages = new ArrayDeque<>();
+        this.capacity = PAGE_SIZE * pages.size();
+        ensureCapacity(PAGE_SIZE);
+    }
+
+    public void ensureCapacity(long requiredCapacity) {
+        if (capacity < requiredCapacity) {
+            int numPages = numPages(requiredCapacity + offset);
+            int pagesToAdd = numPages - pages.size();
+            for (int i = 0; i < pagesToAdd; i++) {
+                pages.addLast(pageSupplier.get());
+            }
+            capacity += pagesToAdd * PAGE_SIZE;
+        }
+    }
+
+    /**
+     * This method will release bytes from the head of this buffer. If you release bytes past the current
+     * index the index is truncated to zero.
+     *
+     * @param bytesToRelease number of bytes to drop
+     */
+    public void release(long bytesToRelease) {
+        if (bytesToRelease > capacity) {
+            throw new IllegalArgumentException("Releasing more bytes [" + bytesToRelease + "] than buffer capacity [" + capacity + "].");
+        }
+
+        int pagesToRelease = pageIndex(offset + bytesToRelease);
+        for (int i = 0; i < pagesToRelease; i++) {
+            pages.removeFirst();
+        }
+        capacity -= bytesToRelease;
+        internalIndex = Math.max(internalIndex - bytesToRelease, 0);
+        offset = indexInPage(bytesToRelease + offset);
+    }
+
+    /**
+     * This method will return an array of {@link ByteBuffer} representing the bytes from the beginning of
+     * this buffer up through the index argument that was passed. The buffers will be duplicates of the
+     * internal buffers, so any modifications to the markers {@link ByteBuffer#position()},
+     * {@link ByteBuffer#limit()}, etc will not modify the this class.
+     *
+     * @param to the index to slice up to
+     * @return the byte buffers
+     */
+    public ByteBuffer[] sliceBuffersTo(long to) {
+        if (to > capacity) {
+            throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity +
+                "], with slice parameters to [" + to + "]");
+        } else if (to == 0) {
+            return EMPTY_BYTE_BUFFER_ARRAY;
+        }
+        long indexWithOffset = to + offset;
+        int pageCount = pageIndex(indexWithOffset);
+        int finalLimit = indexInPage(indexWithOffset);
+        if (finalLimit != 0) {
+            pageCount += 1;
+        }
+
+        ByteBuffer[] buffers = new ByteBuffer[pageCount];
+        Iterator<ByteBuffer> pageIterator = pages.iterator();
+        ByteBuffer firstBuffer = pageIterator.next().duplicate();
+        firstBuffer.position(firstBuffer.position() + offset);
+        buffers[0] = firstBuffer;
+        for (int i = 1; i < buffers.length; i++) {
+            buffers[i] = pageIterator.next().duplicate();
+        }
+        if (finalLimit != 0) {
+            buffers[buffers.length - 1].limit(finalLimit);
+        }
+
+        return buffers;
+    }
+
+    /**
+     * This method will return an array of {@link ByteBuffer} representing the bytes from the index passed
+     * through the end of this buffer. The buffers will be duplicates of the internal buffers, so any
+     * modifications to the markers {@link ByteBuffer#position()}, {@link ByteBuffer#limit()}, etc will not
+     * modify the this class.
+     *
+     * @param from the index to slice from
+     * @return the byte buffers
+     */
+    public ByteBuffer[] sliceBuffersFrom(long from) {
+        if (from > capacity) {
+            throw new IndexOutOfBoundsException("can't slice a channel buffer with capacity [" + capacity +
+                "], with slice parameters from [" + from + "]");
+        } else if (from == capacity) {
+            return EMPTY_BYTE_BUFFER_ARRAY;
+        }
+        long indexWithOffset = from + offset;
+
+        int pageIndex = pageIndex(indexWithOffset);
+        int indexInPage = indexInPage(indexWithOffset);
+
+        ByteBuffer[] buffers = new ByteBuffer[pages.size() - pageIndex];
+        Iterator<ByteBuffer> pageIterator = pages.descendingIterator();
+        for (int i = buffers.length - 1; i > 0; --i) {
+            buffers[i] = pageIterator.next().duplicate();
+        }
+        ByteBuffer firstPostIndexBuffer = pageIterator.next().duplicate();
+        firstPostIndexBuffer.position(firstPostIndexBuffer.position() + indexInPage);
+        buffers[0] = firstPostIndexBuffer;
+
+        return buffers;
+    }
+
+    public void incrementIndex(long delta) {
+        if (delta < 0) {
+            throw new IllegalArgumentException("Cannot increment an index with a negative delta [" + delta + "]");
+        }
+
+        long newIndex = delta + internalIndex;
+        if (newIndex > capacity) {
+            throw new IllegalArgumentException("Cannot increment an index [" + internalIndex + "] with a delta [" + delta +
+                "] that will result in a new index [" + newIndex + "] that is greater than the capacity [" + capacity + "].");
+        }
+        internalIndex = newIndex;
+    }
+
+    public long getIndex() {
+        return internalIndex;
+    }
+
+    public long getCapacity() {
+        return capacity;
+    }
+
+    public long getRemaining() {
+        long remaining = capacity - internalIndex;
+        assert remaining >= 0 : "The remaining [" + remaining + "] number of bytes should not be less than zero.";
+        return remaining;
+    }
+
+    private int numPages(long capacity) {
+        final long numPages = (capacity + PAGE_MASK) >>> PAGE_SHIFT;
+        if (numPages > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("pageSize=" + (PAGE_MASK + 1) + " is too small for such as capacity: " + capacity);
+        }
+        return (int) numPages;
+    }
+
+    private int pageIndex(long index) {
+        return (int) (index >>> PAGE_SHIFT);
+    }
+
+    private int indexInPage(long index) {
+        return (int) (index & PAGE_MASK);
+    }
+}

+ 0 - 157
test/framework/src/main/java/org/elasticsearch/transport/nio/NetworkBytesReference.java

@@ -1,157 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.transport.nio;
-
-import org.apache.lucene.util.BytesRef;
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.common.bytes.BytesReference;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-public class NetworkBytesReference extends BytesReference {
-
-    private final BytesArray bytesArray;
-    private final ByteBuffer writeBuffer;
-    private final ByteBuffer readBuffer;
-
-    private int writeIndex;
-    private int readIndex;
-
-    public NetworkBytesReference(BytesArray bytesArray, int writeIndex, int readIndex) {
-        this.bytesArray = bytesArray;
-        this.writeIndex = writeIndex;
-        this.readIndex = readIndex;
-        this.writeBuffer = ByteBuffer.wrap(bytesArray.array());
-        this.readBuffer = ByteBuffer.wrap(bytesArray.array());
-    }
-
-    public static NetworkBytesReference wrap(BytesArray bytesArray) {
-        return wrap(bytesArray, 0, 0);
-    }
-
-    public static NetworkBytesReference wrap(BytesArray bytesArray, int writeIndex, int readIndex) {
-        if (readIndex > writeIndex) {
-            throw new IndexOutOfBoundsException("Read index [" + readIndex + "] was greater than write index [" + writeIndex + "]");
-        }
-        return new NetworkBytesReference(bytesArray, writeIndex, readIndex);
-    }
-
-    @Override
-    public byte get(int index) {
-        return bytesArray.get(index);
-    }
-
-    @Override
-    public int length() {
-        return bytesArray.length();
-    }
-
-    @Override
-    public NetworkBytesReference slice(int from, int length) {
-        BytesReference ref = bytesArray.slice(from, length);
-        BytesArray newBytesArray;
-        if (ref instanceof BytesArray) {
-            newBytesArray = (BytesArray) ref;
-        } else {
-            newBytesArray = new BytesArray(ref.toBytesRef());
-        }
-
-        int newReadIndex = Math.min(Math.max(readIndex - from, 0), length);
-        int newWriteIndex = Math.min(Math.max(writeIndex - from, 0), length);
-
-        return wrap(newBytesArray, newWriteIndex, newReadIndex);
-    }
-
-    @Override
-    public BytesRef toBytesRef() {
-        return bytesArray.toBytesRef();
-    }
-
-    @Override
-    public long ramBytesUsed() {
-        return bytesArray.ramBytesUsed();
-    }
-
-    public int getWriteIndex() {
-        return writeIndex;
-    }
-
-    public void incrementWrite(int delta) {
-        int newWriteIndex = writeIndex + delta;
-        if (newWriteIndex > bytesArray.length()) {
-            throw new IndexOutOfBoundsException("New write index [" + newWriteIndex + "] would be greater than length" +
-                " [" + bytesArray.length() + "]");
-        }
-
-        writeIndex = newWriteIndex;
-    }
-
-    public int getWriteRemaining() {
-        return bytesArray.length() - writeIndex;
-    }
-
-    public boolean hasWriteRemaining() {
-        return getWriteRemaining() > 0;
-    }
-
-    public int getReadIndex() {
-        return readIndex;
-    }
-
-    public void incrementRead(int delta) {
-        int newReadIndex = readIndex + delta;
-        if (newReadIndex > writeIndex) {
-            throw new IndexOutOfBoundsException("New read index [" + newReadIndex + "] would be greater than write" +
-                " index [" + writeIndex + "]");
-        }
-        readIndex = newReadIndex;
-    }
-
-    public int getReadRemaining() {
-        return writeIndex - readIndex;
-    }
-
-    public boolean hasReadRemaining() {
-        return getReadRemaining() > 0;
-    }
-
-    public ByteBuffer getWriteByteBuffer() {
-        writeBuffer.position(bytesArray.offset() + writeIndex);
-        writeBuffer.limit(bytesArray.offset() + bytesArray.length());
-        return writeBuffer;
-    }
-
-    public ByteBuffer getReadByteBuffer() {
-        readBuffer.position(bytesArray.offset() + readIndex);
-        readBuffer.limit(bytesArray.offset() + writeIndex);
-        return readBuffer;
-    }
-
-    public static void vectorizedIncrementReadIndexes(Iterable<NetworkBytesReference> references, int delta) {
-        Iterator<NetworkBytesReference> refs = references.iterator();
-        while (delta != 0) {
-            NetworkBytesReference ref = refs.next();
-            int amountToInc = Math.min(ref.getReadRemaining(), delta);
-            ref.incrementRead(amountToInc);
-            delta -= amountToInc;
-        }
-    }
-}

+ 48 - 12
test/framework/src/main/java/org/elasticsearch/transport/nio/WriteOperation.java

@@ -27,22 +27,35 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.transport.nio.channel.NioSocketChannel;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 
 public class WriteOperation {
 
     private final NioSocketChannel channel;
     private final ActionListener<Void> listener;
-    private final NetworkBytesReference[] references;
+    private final ByteBuffer[] buffers;
+    private final int[] offsets;
+    private final int length;
+    private int internalIndex;
 
     public WriteOperation(NioSocketChannel channel, BytesReference bytesReference, ActionListener<Void> listener) {
         this.channel = channel;
         this.listener = listener;
-        this.references = toArray(bytesReference);
+        this.buffers = toByteBuffers(bytesReference);
+        this.offsets = new int[buffers.length];
+        int offset = 0;
+        for (int i = 0; i < buffers.length; i++) {
+            ByteBuffer buffer = buffers[i];
+            offsets[i] = offset;
+            offset += buffer.remaining();
+        }
+        length = offset;
     }
 
-    public NetworkBytesReference[] getByteReferences() {
-        return references;
+    public ByteBuffer[] getByteBuffers() {
+        return buffers;
     }
 
     public ActionListener<Void> getListener() {
@@ -54,23 +67,46 @@ public class WriteOperation {
     }
 
     public boolean isFullyFlushed() {
-        return references[references.length - 1].hasReadRemaining() == false;
+        return internalIndex == length;
     }
 
     public int flush() throws IOException {
-        return channel.write(references);
+        int written = channel.write(getBuffersToWrite());
+        internalIndex += written;
+        return written;
+    }
+
+    private ByteBuffer[] getBuffersToWrite() {
+        int offsetIndex = getOffsetIndex(internalIndex);
+
+        ByteBuffer[] postIndexBuffers = new ByteBuffer[buffers.length - offsetIndex];
+
+        ByteBuffer firstBuffer = buffers[0].duplicate();
+        firstBuffer.position(internalIndex - offsets[offsetIndex]);
+        postIndexBuffers[offsetIndex] = firstBuffer;
+        int j = 1;
+        for (int i = (offsetIndex + 1); i < buffers.length; ++i) {
+            postIndexBuffers[j++] = buffers[i].duplicate();
+        }
+
+        return postIndexBuffers;
+    }
+
+    private int getOffsetIndex(int offset) {
+        final int i = Arrays.binarySearch(offsets, offset);
+        return i < 0 ? (-(i + 1)) - 1 : i;
     }
 
-    private static NetworkBytesReference[] toArray(BytesReference reference) {
-        BytesRefIterator byteRefIterator = reference.iterator();
+    private static ByteBuffer[] toByteBuffers(BytesReference bytesReference) {
+        BytesRefIterator byteRefIterator = bytesReference.iterator();
         BytesRef r;
         try {
-            // Most network messages are composed of three buffers
-            ArrayList<NetworkBytesReference> references = new ArrayList<>(3);
+            // Most network messages are composed of three buffers.
+            ArrayList<ByteBuffer> buffers = new ArrayList<>(3);
             while ((r = byteRefIterator.next()) != null) {
-                references.add(NetworkBytesReference.wrap(new BytesArray(r), r.length, 0));
+                buffers.add(ByteBuffer.wrap(r.bytes, r.offset, r.length));
             }
-            return references.toArray(new NetworkBytesReference[references.size()]);
+            return buffers.toArray(new ByteBuffer[buffers.size()]);
 
         } catch (IOException e) {
             // this is really an error since we don't do IO in our bytesreferences

+ 8 - 21
test/framework/src/main/java/org/elasticsearch/transport/nio/channel/NioSocketChannel.java

@@ -20,7 +20,7 @@
 package org.elasticsearch.transport.nio.channel;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.transport.nio.NetworkBytesReference;
+import org.elasticsearch.transport.nio.InboundChannelBuffer;
 import org.elasticsearch.transport.nio.SocketSelector;
 
 import java.io.IOException;
@@ -28,7 +28,6 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SocketChannel;
-import java.util.Arrays;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
@@ -66,34 +65,22 @@ public class NioSocketChannel extends AbstractNioChannel<SocketChannel> {
         return socketSelector;
     }
 
-    public int write(NetworkBytesReference[] references) throws IOException {
-        int written;
-        if (references.length == 1) {
-            written = socketChannel.write(references[0].getReadByteBuffer());
+    public int write(ByteBuffer[] buffers) throws IOException {
+        if (buffers.length == 1) {
+            return socketChannel.write(buffers[0]);
         } else {
-            ByteBuffer[] buffers = new ByteBuffer[references.length];
-            for (int i = 0; i < references.length; ++i) {
-                buffers[i] = references[i].getReadByteBuffer();
-            }
-            written = (int) socketChannel.write(buffers);
-        }
-        if (written <= 0) {
-            return written;
+            return (int) socketChannel.write(buffers);
         }
-
-        NetworkBytesReference.vectorizedIncrementReadIndexes(Arrays.asList(references), written);
-
-        return written;
     }
 
-    public int read(NetworkBytesReference reference) throws IOException {
-        int bytesRead = socketChannel.read(reference.getWriteByteBuffer());
+    public int read(InboundChannelBuffer buffer) throws IOException {
+        int bytesRead = (int) socketChannel.read(buffer.sliceBuffersFrom(buffer.getIndex()));
 
         if (bytesRead == -1) {
             return bytesRead;
         }
 
-        reference.incrementWrite(bytesRead);
+        buffer.incrementIndex(bytesRead);
         return bytesRead;
     }
 

+ 3 - 3
test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoder.java

@@ -36,11 +36,11 @@ public class TcpFrameDecoder {
 
     private int expectedMessageLength = -1;
 
-    public BytesReference decode(BytesReference bytesReference, int currentBufferSize) throws IOException {
-        if (currentBufferSize >= 6) {
+    public BytesReference decode(BytesReference bytesReference) throws IOException {
+        if (bytesReference.length() >= 6) {
             int messageLength = readHeaderBuffer(bytesReference);
             int totalLength = messageLength + HEADER_SIZE;
-            if (totalLength > currentBufferSize) {
+            if (totalLength > bytesReference.length()) {
                 expectedMessageLength = totalLength;
                 return null;
             } else if (totalLength == bytesReference.length()) {

+ 19 - 39
test/framework/src/main/java/org/elasticsearch/transport/nio/channel/TcpReadContext.java

@@ -19,25 +19,21 @@
 
 package org.elasticsearch.transport.nio.channel;
 
-import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.ByteBufferReference;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
-import org.elasticsearch.transport.nio.NetworkBytesReference;
+import org.elasticsearch.transport.nio.InboundChannelBuffer;
 import org.elasticsearch.transport.nio.TcpReadHandler;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
+import java.nio.ByteBuffer;
 
 public class TcpReadContext implements ReadContext {
 
-    private static final int DEFAULT_READ_LENGTH = 1 << 14;
-
     private final TcpReadHandler handler;
     private final TcpNioSocketChannel channel;
     private final TcpFrameDecoder frameDecoder;
-    private final LinkedList<NetworkBytesReference> references = new LinkedList<>();
-    private int rawBytesCount = 0;
+    private final InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
 
     public TcpReadContext(NioSocketChannel channel, TcpReadHandler handler) {
         this((TcpNioSocketChannel) channel, handler, new TcpFrameDecoder());
@@ -47,33 +43,27 @@ public class TcpReadContext implements ReadContext {
         this.handler = handler;
         this.channel = channel;
         this.frameDecoder = frameDecoder;
-        this.references.add(NetworkBytesReference.wrap(new BytesArray(new byte[DEFAULT_READ_LENGTH])));
     }
 
     @Override
     public int read() throws IOException {
-        NetworkBytesReference last = references.peekLast();
-        if (last == null || last.hasWriteRemaining() == false) {
-            this.references.add(NetworkBytesReference.wrap(new BytesArray(new byte[DEFAULT_READ_LENGTH])));
+        if (channelBuffer.getRemaining() == 0) {
+            // Requiring one additional byte will ensure that a new page is allocated.
+            channelBuffer.ensureCapacity(channelBuffer.getCapacity() + 1);
         }
 
-        int bytesRead = channel.read(references.getLast());
+        int bytesRead = channel.read(channelBuffer);
 
         if (bytesRead == -1) {
             return bytesRead;
         }
 
-        rawBytesCount += bytesRead;
-
         BytesReference message;
 
         // Frame decoder will throw an exception if the message is improperly formatted, the header is incorrect,
         // or the message is corrupted
-        while ((message = frameDecoder.decode(createCompositeBuffer(), rawBytesCount)) != null) {
+        while ((message = frameDecoder.decode(toBytesReference(channelBuffer))) != null) {
             int messageLengthWithHeader = message.length();
-            NetworkBytesReference.vectorizedIncrementReadIndexes(references, messageLengthWithHeader);
-            trimDecodedMessages(messageLengthWithHeader);
-            rawBytesCount -= messageLengthWithHeader;
 
             try {
                 BytesReference messageWithoutHeader = message.slice(6, message.length() - 6);
@@ -84,32 +74,22 @@ public class TcpReadContext implements ReadContext {
                 }
             } catch (Exception e) {
                 handler.handleException(channel, e);
+            } finally {
+                channelBuffer.release(messageLengthWithHeader);
             }
         }
 
         return bytesRead;
     }
 
-    private CompositeBytesReference createCompositeBuffer() {
-        return new CompositeBytesReference(references.toArray(new BytesReference[references.size()]));
-    }
-
-    private void trimDecodedMessages(int bytesToTrim) {
-        while (bytesToTrim != 0) {
-            NetworkBytesReference ref = references.getFirst();
-            int readIndex = ref.getReadIndex();
-            bytesToTrim -= readIndex;
-            if (readIndex == ref.length()) {
-                references.removeFirst();
-            } else {
-                assert bytesToTrim == 0;
-                if (readIndex != 0) {
-                    references.removeFirst();
-                    NetworkBytesReference slicedRef = ref.slice(readIndex, ref.length() - readIndex);
-                    references.addFirst(slicedRef);
-                }
-            }
-
+    private static BytesReference toBytesReference(InboundChannelBuffer channelBuffer) {
+        ByteBuffer[] writtenToBuffers = channelBuffer.sliceBuffersTo(channelBuffer.getIndex());
+        ByteBufferReference[] references = new ByteBufferReference[writtenToBuffers.length];
+        for (int i = 0; i < references.length; ++i) {
+            references[i] = new ByteBufferReference(writtenToBuffers[i]);
         }
+
+        return new CompositeBytesReference(references);
     }
+
 }

+ 0 - 155
test/framework/src/test/java/org/elasticsearch/transport/nio/ByteBufferReferenceTests.java

@@ -1,155 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.transport.nio;
-
-import org.elasticsearch.common.bytes.BytesArray;
-import org.elasticsearch.test.ESTestCase;
-
-import java.nio.ByteBuffer;
-
-public class ByteBufferReferenceTests extends ESTestCase {
-
-    private NetworkBytesReference buffer;
-
-    public void testBasicGetByte() {
-        byte[] bytes = new byte[10];
-        initializeBytes(bytes);
-        buffer = NetworkBytesReference.wrap(new BytesArray(bytes));
-
-        assertEquals(10, buffer.length());
-        for (int i = 0 ; i < bytes.length; ++i) {
-            assertEquals(i, buffer.get(i));
-        }
-    }
-
-    public void testBasicGetByteWithOffset() {
-        byte[] bytes = new byte[10];
-        initializeBytes(bytes);
-        buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 8));
-
-        assertEquals(8, buffer.length());
-        for (int i = 2 ; i < bytes.length; ++i) {
-            assertEquals(i, buffer.get(i - 2));
-        }
-    }
-
-    public void testBasicGetByteWithOffsetAndLimit() {
-        byte[] bytes = new byte[10];
-        initializeBytes(bytes);
-        buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 6));
-
-        assertEquals(6, buffer.length());
-        for (int i = 2 ; i < bytes.length - 2; ++i) {
-            assertEquals(i, buffer.get(i - 2));
-        }
-    }
-
-    public void testGetWriteBufferRespectsWriteIndex() {
-        byte[] bytes = new byte[10];
-
-        buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 8));
-
-        ByteBuffer writeByteBuffer = buffer.getWriteByteBuffer();
-
-        assertEquals(2, writeByteBuffer.position());
-        assertEquals(10, writeByteBuffer.limit());
-
-        buffer.incrementWrite(2);
-
-        writeByteBuffer = buffer.getWriteByteBuffer();
-        assertEquals(4, writeByteBuffer.position());
-        assertEquals(10, writeByteBuffer.limit());
-    }
-
-    public void testGetReadBufferRespectsReadIndex() {
-        byte[] bytes = new byte[10];
-
-        buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 3, 6), 6, 0);
-
-        ByteBuffer readByteBuffer = buffer.getReadByteBuffer();
-
-        assertEquals(3, readByteBuffer.position());
-        assertEquals(9, readByteBuffer.limit());
-
-        buffer.incrementRead(2);
-
-        readByteBuffer = buffer.getReadByteBuffer();
-        assertEquals(5, readByteBuffer.position());
-        assertEquals(9, readByteBuffer.limit());
-    }
-
-    public void testWriteAndReadRemaining() {
-        byte[] bytes = new byte[10];
-
-        buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 8));
-
-        assertEquals(0, buffer.getReadRemaining());
-        assertEquals(8, buffer.getWriteRemaining());
-
-        buffer.incrementWrite(3);
-        buffer.incrementRead(2);
-
-        assertEquals(1, buffer.getReadRemaining());
-        assertEquals(5, buffer.getWriteRemaining());
-    }
-
-    public void testBasicSlice() {
-        byte[] bytes = new byte[20];
-        initializeBytes(bytes);
-
-        buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 18));
-
-        NetworkBytesReference slice = buffer.slice(4, 14);
-
-        assertEquals(14, slice.length());
-        assertEquals(0, slice.getReadIndex());
-        assertEquals(0, slice.getWriteIndex());
-
-        for (int i = 6; i < 20; ++i) {
-            assertEquals(i, slice.get(i - 6));
-        }
-    }
-
-    public void testSliceWithReadAndWriteIndexes() {
-        byte[] bytes = new byte[20];
-        initializeBytes(bytes);
-
-        buffer = NetworkBytesReference.wrap(new BytesArray(bytes, 2, 18));
-
-        buffer.incrementWrite(9);
-        buffer.incrementRead(5);
-
-        NetworkBytesReference slice = buffer.slice(6, 12);
-
-        assertEquals(12, slice.length());
-        assertEquals(0, slice.getReadIndex());
-        assertEquals(3, slice.getWriteIndex());
-
-        for (int i = 8; i < 20; ++i) {
-            assertEquals(i, slice.get(i - 8));
-        }
-    }
-
-    private void initializeBytes(byte[] bytes) {
-        for (int i = 0 ; i < bytes.length; ++i) {
-            bytes[i] = (byte) i;
-        }
-    }
-}

+ 152 - 0
test/framework/src/test/java/org/elasticsearch/transport/nio/InboundChannelBufferTests.java

@@ -0,0 +1,152 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.transport.nio;
+
+import org.elasticsearch.test.ESTestCase;
+
+import java.nio.ByteBuffer;
+
+public class InboundChannelBufferTests extends ESTestCase {
+
+    private static final int PAGE_SIZE = 1 << 14;
+
+    public void testNewBufferHasSinglePage() {
+        InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
+
+        assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
+        assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
+        assertEquals(0, channelBuffer.getIndex());
+    }
+
+    public void testExpandCapacity() {
+        InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
+
+        assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
+        assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
+
+        channelBuffer.ensureCapacity(PAGE_SIZE + 1);
+
+        assertEquals(PAGE_SIZE * 2, channelBuffer.getCapacity());
+        assertEquals(PAGE_SIZE * 2, channelBuffer.getRemaining());
+    }
+
+    public void testExpandCapacityMultiplePages() {
+        InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
+
+        assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
+
+        int multiple = randomInt(80);
+        channelBuffer.ensureCapacity(PAGE_SIZE + ((multiple * PAGE_SIZE) - randomInt(500)));
+
+        assertEquals(PAGE_SIZE * (multiple + 1), channelBuffer.getCapacity());
+        assertEquals(PAGE_SIZE * (multiple + 1), channelBuffer.getRemaining());
+    }
+
+    public void testExpandCapacityRespectsOffset() {
+        InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
+
+        assertEquals(PAGE_SIZE, channelBuffer.getCapacity());
+        assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
+
+        int offset = randomInt(300);
+
+        channelBuffer.release(offset);
+
+        assertEquals(PAGE_SIZE - offset, channelBuffer.getCapacity());
+        assertEquals(PAGE_SIZE - offset, channelBuffer.getRemaining());
+
+        channelBuffer.ensureCapacity(PAGE_SIZE + 1);
+
+        assertEquals(PAGE_SIZE * 2 - offset, channelBuffer.getCapacity());
+        assertEquals(PAGE_SIZE * 2 - offset, channelBuffer.getRemaining());
+    }
+
+    public void testIncrementIndex() {
+        InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
+
+        assertEquals(0, channelBuffer.getIndex());
+        assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
+
+        channelBuffer.incrementIndex(10);
+
+        assertEquals(10, channelBuffer.getIndex());
+        assertEquals(PAGE_SIZE - 10, channelBuffer.getRemaining());
+    }
+
+    public void testIncrementIndexWithOffset() {
+        InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
+
+        assertEquals(0, channelBuffer.getIndex());
+        assertEquals(PAGE_SIZE, channelBuffer.getRemaining());
+
+        channelBuffer.release(10);
+        assertEquals(PAGE_SIZE - 10, channelBuffer.getRemaining());
+
+        channelBuffer.incrementIndex(10);
+
+        assertEquals(10, channelBuffer.getIndex());
+        assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining());
+
+        channelBuffer.release(2);
+        assertEquals(8, channelBuffer.getIndex());
+        assertEquals(PAGE_SIZE - 20, channelBuffer.getRemaining());
+    }
+
+    public void testAccessByteBuffers() {
+        InboundChannelBuffer channelBuffer = new InboundChannelBuffer();
+
+        int pages = randomInt(50) + 5;
+        channelBuffer.ensureCapacity(pages * PAGE_SIZE);
+
+        long capacity = channelBuffer.getCapacity();
+
+        ByteBuffer[] postIndexBuffers = channelBuffer.sliceBuffersFrom(channelBuffer.getIndex());
+        int i = 0;
+        for (ByteBuffer buffer : postIndexBuffers) {
+            while (buffer.hasRemaining()) {
+                buffer.put((byte) (i++ % 127));
+            }
+        }
+
+        int indexIncremented = 0;
+        int bytesReleased = 0;
+        while (indexIncremented < capacity) {
+            assertEquals(indexIncremented - bytesReleased, channelBuffer.getIndex());
+
+            long amountToInc = Math.min(randomInt(2000), channelBuffer.getRemaining());
+            ByteBuffer[] postIndexBuffers2 = channelBuffer.sliceBuffersFrom(channelBuffer.getIndex());
+            assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased) % 127), postIndexBuffers2[0].get());
+            ByteBuffer[] preIndexBuffers = channelBuffer.sliceBuffersTo(channelBuffer.getIndex());
+            if (preIndexBuffers.length > 0) {
+                ByteBuffer preIndexBuffer = preIndexBuffers[preIndexBuffers.length - 1];
+                assertEquals((byte) ((channelBuffer.getIndex() + bytesReleased - 1) % 127), preIndexBuffer.get(preIndexBuffer.limit() - 1));
+            }
+            if (randomBoolean()) {
+                long bytesToRelease = Math.min(randomInt(50), channelBuffer.getIndex());
+                channelBuffer.release(bytesToRelease);
+                bytesReleased += bytesToRelease;
+            }
+            channelBuffer.incrementIndex(amountToInc);
+            indexIncremented += amountToInc;
+        }
+
+        assertEquals(0, channelBuffer.sliceBuffersFrom(channelBuffer.getIndex()).length);
+    }
+}

+ 2 - 4
test/framework/src/test/java/org/elasticsearch/transport/nio/SocketEventHandlerTests.java

@@ -122,8 +122,7 @@ public class SocketEventHandlerTests extends ESTestCase {
         assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps());
 
         BytesArray bytesArray = new BytesArray(new byte[1]);
-        NetworkBytesReference networkBuffer = NetworkBytesReference.wrap(bytesArray);
-        channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, networkBuffer, mock(ActionListener.class)));
+        channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, bytesArray, mock(ActionListener.class)));
 
         when(rawChannel.write(ByteBuffer.wrap(bytesArray.array()))).thenReturn(1);
         handler.handleWrite(channel);
@@ -138,8 +137,7 @@ public class SocketEventHandlerTests extends ESTestCase {
         assertEquals(SelectionKey.OP_READ | SelectionKey.OP_WRITE, selectionKey.interestOps());
 
         BytesArray bytesArray = new BytesArray(new byte[1]);
-        NetworkBytesReference networkBuffer = NetworkBytesReference.wrap(bytesArray, 1, 0);
-        channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, networkBuffer, mock(ActionListener.class)));
+        channel.getWriteContext().queueWriteOperations(new WriteOperation(channel, bytesArray, mock(ActionListener.class)));
 
         when(rawChannel.write(ByteBuffer.wrap(bytesArray.array()))).thenReturn(0);
         handler.handleWrite(channel);

+ 3 - 3
test/framework/src/test/java/org/elasticsearch/transport/nio/SocketSelectorTests.java

@@ -21,6 +21,7 @@ package org.elasticsearch.transport.nio;
 
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.transport.nio.channel.NioSocketChannel;
 import org.elasticsearch.transport.nio.channel.WriteContext;
@@ -53,7 +54,7 @@ public class SocketSelectorTests extends ESTestCase {
     private TestSelectionKey selectionKey;
     private WriteContext writeContext;
     private ActionListener<Void> listener;
-    private NetworkBytesReference bufferReference = NetworkBytesReference.wrap(new BytesArray(new byte[1]));
+    private BytesReference bufferReference = new BytesArray(new byte[1]);
     private Selector rawSelector;
 
     @Before
@@ -294,8 +295,7 @@ public class SocketSelectorTests extends ESTestCase {
 
         socketSelector.preSelect();
 
-        NetworkBytesReference networkBuffer = NetworkBytesReference.wrap(new BytesArray(new byte[1]));
-        socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), networkBuffer, listener));
+        socketSelector.queueWrite(new WriteOperation(mock(NioSocketChannel.class), new BytesArray(new byte[1]), listener));
         socketSelector.scheduleForRegistration(unRegisteredChannel);
 
         TestSelectionKey testSelectionKey = new TestSelectionKey(0);

+ 3 - 11
test/framework/src/test/java/org/elasticsearch/transport/nio/WriteOperationTests.java

@@ -26,6 +26,7 @@ import org.elasticsearch.transport.nio.channel.NioSocketChannel;
 import org.junit.Before;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
@@ -48,11 +49,7 @@ public class WriteOperationTests extends ESTestCase {
         WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener);
 
 
-        when(channel.write(any())).thenAnswer(invocationOnMock -> {
-            NetworkBytesReference[] refs = (NetworkBytesReference[]) invocationOnMock.getArguments()[0];
-            refs[0].incrementRead(10);
-            return 10;
-        });
+        when(channel.write(any(ByteBuffer[].class))).thenReturn(10);
 
         writeOp.flush();
 
@@ -62,15 +59,10 @@ public class WriteOperationTests extends ESTestCase {
     public void testPartialFlush() throws IOException {
         WriteOperation writeOp = new WriteOperation(channel, new BytesArray(new byte[10]), listener);
 
-        when(channel.write(any())).thenAnswer(invocationOnMock -> {
-            NetworkBytesReference[] refs = (NetworkBytesReference[]) invocationOnMock.getArguments()[0];
-            refs[0].incrementRead(5);
-            return 5;
-        });
+        when(channel.write(any(ByteBuffer[].class))).thenReturn(5);
 
         writeOp.flush();
 
         assertFalse(writeOp.isFullyFlushed());
-        assertEquals(5, writeOp.getByteReferences()[0].getReadRemaining());
     }
 }

+ 8 - 10
test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpFrameDecoderTests.java

@@ -43,10 +43,8 @@ public class TcpFrameDecoderTests extends ESTestCase {
         streamOutput.write('S');
         streamOutput.write(1);
         streamOutput.write(1);
-        streamOutput.write(0);
-        streamOutput.write(0);
 
-        assertNull(frameDecoder.decode(streamOutput.bytes(), 4));
+        assertNull(frameDecoder.decode(streamOutput.bytes()));
         assertEquals(-1, frameDecoder.expectedMessageLength());
     }
 
@@ -56,7 +54,7 @@ public class TcpFrameDecoderTests extends ESTestCase {
         streamOutput.write('S');
         streamOutput.writeInt(-1);
 
-        BytesReference message = frameDecoder.decode(streamOutput.bytes(), 6);
+        BytesReference message = frameDecoder.decode(streamOutput.bytes());
 
         assertEquals(-1, frameDecoder.expectedMessageLength());
         assertEquals(streamOutput.bytes(), message);
@@ -70,7 +68,7 @@ public class TcpFrameDecoderTests extends ESTestCase {
         streamOutput.write('E');
         streamOutput.write('S');
 
-        BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8);
+        BytesReference message = frameDecoder.decode(streamOutput.bytes());
 
         assertEquals(6, message.length());
         assertEquals(streamOutput.bytes().slice(0, 6), message);
@@ -84,7 +82,7 @@ public class TcpFrameDecoderTests extends ESTestCase {
         streamOutput.write('M');
         streamOutput.write('A');
 
-        BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8);
+        BytesReference message = frameDecoder.decode(streamOutput.bytes());
 
         assertEquals(-1, frameDecoder.expectedMessageLength());
         assertEquals(streamOutput.bytes(), message);
@@ -98,7 +96,7 @@ public class TcpFrameDecoderTests extends ESTestCase {
         streamOutput.write('M');
         streamOutput.write('A');
 
-        BytesReference message = frameDecoder.decode(streamOutput.bytes(), 8);
+        BytesReference message = frameDecoder.decode(streamOutput.bytes());
 
         assertEquals(9, frameDecoder.expectedMessageLength());
         assertNull(message);
@@ -113,7 +111,7 @@ public class TcpFrameDecoderTests extends ESTestCase {
         streamOutput.write('A');
 
         try {
-            frameDecoder.decode(streamOutput.bytes(), 8);
+            frameDecoder.decode(streamOutput.bytes());
             fail("Expected exception");
         } catch (Exception ex) {
             assertThat(ex, instanceOf(StreamCorruptedException.class));
@@ -134,7 +132,7 @@ public class TcpFrameDecoderTests extends ESTestCase {
         streamOutput.write(randomByte());
 
         try {
-            frameDecoder.decode(streamOutput.bytes(), 7);
+            frameDecoder.decode(streamOutput.bytes());
             fail("Expected exception");
         } catch (Exception ex) {
             assertThat(ex, instanceOf(StreamCorruptedException.class));
@@ -158,7 +156,7 @@ public class TcpFrameDecoderTests extends ESTestCase {
 
             try {
                 BytesReference bytes = streamOutput.bytes();
-                frameDecoder.decode(bytes, bytes.length());
+                frameDecoder.decode(bytes);
                 fail("Expected exception");
             } catch (Exception ex) {
                 assertThat(ex, instanceOf(TcpTransport.HttpOnTransportException.class));

+ 16 - 16
test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpReadContextTests.java

@@ -22,13 +22,13 @@ package org.elasticsearch.transport.nio.channel;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.CompositeBytesReference;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.transport.nio.NetworkBytesReference;
+import org.elasticsearch.transport.nio.InboundChannelBuffer;
 import org.elasticsearch.transport.nio.TcpReadHandler;
 import org.junit.Before;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.mockito.Matchers.any;
@@ -57,13 +57,13 @@ public class TcpReadContextTests extends ESTestCase {
         byte[] bytes = createMessage(messageLength);
         byte[] fullMessage = combineMessageAndHeader(bytes);
 
-        final AtomicInteger bufferCapacity = new AtomicInteger();
-        when(channel.read(any(NetworkBytesReference.class))).thenAnswer(invocationOnMock -> {
-            NetworkBytesReference reference = (NetworkBytesReference) invocationOnMock.getArguments()[0];
-            ByteBuffer buffer = reference.getWriteByteBuffer();
-            bufferCapacity.set(reference.getWriteRemaining());
-            buffer.put(fullMessage);
-            reference.incrementWrite(fullMessage.length);
+        final AtomicLong bufferCapacity = new AtomicLong();
+        when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> {
+            InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0];
+            ByteBuffer byteBuffer = buffer.sliceBuffersFrom(buffer.getIndex())[0];
+            bufferCapacity.set(buffer.getCapacity() - buffer.getIndex());
+            byteBuffer.put(fullMessage);
+            buffer.incrementIndex(fullMessage.length);
             return fullMessage.length;
         });
 
@@ -82,15 +82,15 @@ public class TcpReadContextTests extends ESTestCase {
         byte[] fullPart1 = combineMessageAndHeader(part1, messageLength + messageLength);
         byte[] part2 = createMessage(messageLength);
 
-        final AtomicInteger bufferCapacity = new AtomicInteger();
+        final AtomicLong bufferCapacity = new AtomicLong();
         final AtomicReference<byte[]> bytes = new AtomicReference<>();
 
-        when(channel.read(any(NetworkBytesReference.class))).thenAnswer(invocationOnMock -> {
-            NetworkBytesReference reference = (NetworkBytesReference) invocationOnMock.getArguments()[0];
-            ByteBuffer buffer = reference.getWriteByteBuffer();
-            bufferCapacity.set(reference.getWriteRemaining());
-            buffer.put(bytes.get());
-            reference.incrementWrite(bytes.get().length);
+        when(channel.read(any(InboundChannelBuffer.class))).thenAnswer(invocationOnMock -> {
+            InboundChannelBuffer buffer = (InboundChannelBuffer) invocationOnMock.getArguments()[0];
+            ByteBuffer byteBuffer = buffer.sliceBuffersFrom(buffer.getIndex())[0];
+            bufferCapacity.set(buffer.getCapacity() - buffer.getIndex());
+            byteBuffer.put(bytes.get());
+            buffer.incrementIndex(bytes.get().length);
             return bytes.get().length;
         });
 

+ 2 - 2
test/framework/src/test/java/org/elasticsearch/transport/nio/channel/TcpWriteContextTests.java

@@ -80,7 +80,7 @@ public class TcpWriteContextTests extends ESTestCase {
 
         assertSame(listener, writeOp.getListener());
         assertSame(channel, writeOp.getChannel());
-        assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0].getReadByteBuffer());
+        assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteBuffers()[0]);
     }
 
     public void testSendMessageFromSameThreadIsQueuedInChannel() throws Exception {
@@ -97,7 +97,7 @@ public class TcpWriteContextTests extends ESTestCase {
 
         assertSame(listener, writeOp.getListener());
         assertSame(channel, writeOp.getChannel());
-        assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteReferences()[0].getReadByteBuffer());
+        assertEquals(ByteBuffer.wrap(bytes), writeOp.getByteBuffers()[0]);
     }
 
     public void testWriteIsQueuedInChannel() throws Exception {