Browse Source

Fix bug in copying bytes for socket write (#45463)

Currently we take the array of nio buffers from the netty channel
outbound buffer and copy their bytes to a direct buffer. In the process
we mutate the nio buffer positions. It seems like netty will continue to
reuse these buffers. This means than any data that is not flushed in a
call is lost. This commit fixes this by incrementing the positions after
the flush has completed. This is similar to the behavior that
SocketChannel would have provided and netty relied upon.

Fixes #45444.
Tim Brooks 6 years ago
parent
commit
4a0aa720ed

+ 12 - 7
modules/transport-netty4/src/main/java/org/elasticsearch/transport/CopyBytesSocketChannel.java

@@ -42,8 +42,6 @@ import org.elasticsearch.common.SuppressForbidden;
 
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.util.Arrays;
-import java.util.Objects;
 
 import static io.netty.channel.internal.ChannelUtils.MAX_BYTES_PER_GATHERING_WRITE_ATTEMPTED_LOW_THRESHOLD;
 
@@ -89,7 +87,6 @@ public class CopyBytesSocketChannel extends NioSocketChannel {
             // Ensure the pending writes are made of ByteBufs only.
             int maxBytesPerGatheringWrite = writeConfig.getMaxBytesPerGatheringWrite();
             ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
-            assert Arrays.stream(nioBuffers).filter(Objects::nonNull).noneMatch(ByteBuffer::isDirect) : "Expected all to be heap buffers";
             int nioBufferCnt = in.nioBufferCount();
 
             if (nioBufferCnt == 0) {// We have something else beside ByteBuffers to write so fallback to normal writes.
@@ -108,6 +105,7 @@ public class CopyBytesSocketChannel extends NioSocketChannel {
                     return;
                 }
                 adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
+                setWrittenBytes(nioBuffers, localWrittenBytes);
                 in.removeBytes(localWrittenBytes);
                 --writeSpinCount;
             }
@@ -151,11 +149,18 @@ public class CopyBytesSocketChannel extends NioSocketChannel {
     private static void copyBytes(ByteBuffer[] source, int nioBufferCnt, ByteBuffer destination) {
         for (int i = 0; i < nioBufferCnt && destination.hasRemaining(); i++) {
             ByteBuffer buffer = source[i];
+            assert buffer.hasArray() : "Buffer must have heap array";
             int nBytesToCopy = Math.min(destination.remaining(), buffer.remaining());
-            int initialLimit = buffer.limit();
-            buffer.limit(buffer.position() + nBytesToCopy);
-            destination.put(buffer);
-            buffer.limit(initialLimit);
+            destination.put(buffer.array(), buffer.arrayOffset() + buffer.position(), nBytesToCopy);
+        }
+    }
+
+    private static void setWrittenBytes(ByteBuffer[] source, int bytesWritten) {
+        for (int i = 0; bytesWritten > 0; i++) {
+            ByteBuffer buffer = source[i];
+            int nBytes = Math.min(buffer.remaining(), bytesWritten);
+            buffer.position(buffer.position() + nBytes);
+            bytesWritten = bytesWritten - nBytes;
         }
     }