Browse Source

Merge pull request #11768 from jpountz/fix/stream_eos

Make sure messages are fully read even in case of EOS markers.
Adrien Grand 10 years ago
parent
commit
ed561cd434

+ 76 - 47
core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.transport.netty;
 
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.compress.Compressor;
@@ -90,62 +91,90 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
         byte status = buffer.readByte();
         Version version = Version.fromId(buffer.readInt());
 
-        StreamInput wrappedStream;
-        if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
-            Compressor compressor;
-            try {
-                compressor = CompressorFactory.compressor(buffer);
-            } catch (NotCompressedException ex) {
-                int maxToRead = Math.min(buffer.readableBytes(), 10);
-                int offset = buffer.readerIndex();
-                StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(buffer.readableBytes()).append("] readable bytes with message size [").append(size).append("] ").append("] are [");
-                for (int i = 0; i < maxToRead; i++) {
-                    sb.append(buffer.getByte(offset + i)).append(",");
+        StreamInput wrappedStream = null;
+        try {
+            if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
+                Compressor compressor;
+                try {
+                    compressor = CompressorFactory.compressor(buffer);
+                } catch (NotCompressedException ex) {
+                    int maxToRead = Math.min(buffer.readableBytes(), 10);
+                    int offset = buffer.readerIndex();
+                    StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(buffer.readableBytes()).append("] readable bytes with message size [").append(size).append("] ").append("] are [");
+                    for (int i = 0; i < maxToRead; i++) {
+                        sb.append(buffer.getByte(offset + i)).append(",");
+                    }
+                    sb.append("]");
+                    throw new IllegalStateException(sb.toString());
                 }
-                sb.append("]");
-                throw new IllegalStateException(sb.toString());
+                wrappedStream = compressor.streamInput(streamIn);
+            } else {
+                wrappedStream = streamIn;
             }
-            wrappedStream = compressor.streamInput(streamIn);
-        } else {
-            wrappedStream = streamIn;
-        }
-        wrappedStream.setVersion(version);
+            wrappedStream.setVersion(version);
 
-        if (TransportStatus.isRequest(status)) {
-            String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
-            if (buffer.readerIndex() != expectedIndexReader) {
-                if (buffer.readerIndex() < expectedIndexReader) {
-                    logger.warn("Message not fully read (request) for requestId [{}], action [{}], readerIndex [{}] vs expected [{}]; resetting",
-                                requestId, action, buffer.readerIndex(), expectedIndexReader);
-                } else {
-                    logger.warn("Message read past expected size (request) for requestId=[{}], action [{}], readerIndex [{}] vs expected [{}]; resetting",
-                                requestId, action, buffer.readerIndex(), expectedIndexReader);
-                }
-                buffer.readerIndex(expectedIndexReader);
-            }
-        } else {
-            TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
-            // ignore if its null, the adapter logs it
-            if (handler != null) {
-                if (TransportStatus.isError(status)) {
-                    handlerResponseError(wrappedStream, handler);
-                } else {
-                    handleResponse(ctx.getChannel(), wrappedStream, handler);
+            if (TransportStatus.isRequest(status)) {
+                String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
+                boolean success = false;
+                try {
+                    final int nextByte = wrappedStream.read();
+                    // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
+                    if (nextByte != -1) {
+                        throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
+                                + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
+                    }
+                    if (buffer.readerIndex() < expectedIndexReader) {
+                        throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
+                    }
+                    if (buffer.readerIndex() > expectedIndexReader) {
+                        throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
+                                + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
+                    }
+                    success = true;
+                } finally {
+                    if (!success) {
+                        buffer.readerIndex(expectedIndexReader);
+                    }
                 }
             } else {
-                // if its null, skip those bytes
-                buffer.readerIndex(markedReaderIndex + size);
-            }
-            if (buffer.readerIndex() != expectedIndexReader) {
-                if (buffer.readerIndex() < expectedIndexReader) {
-                    logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
+                TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
+                // ignore if its null, the adapter logs it
+                if (handler != null) {
+                    if (TransportStatus.isError(status)) {
+                        handlerResponseError(wrappedStream, handler);
+                    } else {
+                        handleResponse(ctx.getChannel(), wrappedStream, handler);
+                    }
                 } else {
-                    logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
+                    // if its null, skip those bytes
+                    buffer.readerIndex(markedReaderIndex + size);
+                }
+
+                boolean success = false;
+                try {
+                    final int nextByte = wrappedStream.read();
+                    // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
+                    if (nextByte != -1) {
+                        throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
+                                + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
+                    }
+                    if (buffer.readerIndex() < expectedIndexReader) {
+                        throw new IllegalStateException("Message is fully read (response), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
+                    }
+                    if (buffer.readerIndex() > expectedIndexReader) {
+                        throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler ["
+                                + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
+                    }
+                    success = true;
+                } finally {
+                    if (!success) {
+                        buffer.readerIndex(expectedIndexReader);
+                    }
                 }
-                buffer.readerIndex(expectedIndexReader);
             }
+        } finally {
+            IOUtils.close(wrappedStream);
         }
-        wrappedStream.close();
     }
 
     protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {