Browse Source

Make sure messages are fully read even in case of EOS markers.

When using compression over the network, you might sometimes see warnings that
the stream was not fully read. This is because DeflaterOutputStream adds an
end-of-stream marker. When deserializing, we need to poll for one byte using
InputStream.read() to make sure to decode this EOS marker.

For the record, it does not strike all the time today because we perform
buffering when decompressing to avoid performing too many JNI calls, but it
is easy to make this warning happen all the time by decreasing the size of
the buffer we use.

Close #11748
Adrien Grand 10 years ago
parent
commit
1bfa722d43

+ 76 - 47
core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.transport.netty;
 
+import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.component.Lifecycle;
 import org.elasticsearch.common.compress.Compressor;
@@ -90,62 +91,90 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
         byte status = buffer.readByte();
         Version version = Version.fromId(buffer.readInt());
 
-        StreamInput wrappedStream;
-        if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
-            Compressor compressor;
-            try {
-                compressor = CompressorFactory.compressor(buffer);
-            } catch (NotCompressedException ex) {
-                int maxToRead = Math.min(buffer.readableBytes(), 10);
-                int offset = buffer.readerIndex();
-                StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(buffer.readableBytes()).append("] readable bytes with message size [").append(size).append("] ").append("] are [");
-                for (int i = 0; i < maxToRead; i++) {
-                    sb.append(buffer.getByte(offset + i)).append(",");
+        StreamInput wrappedStream = null;
+        try {
+            if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
+                Compressor compressor;
+                try {
+                    compressor = CompressorFactory.compressor(buffer);
+                } catch (NotCompressedException ex) {
+                    int maxToRead = Math.min(buffer.readableBytes(), 10);
+                    int offset = buffer.readerIndex();
+                    StringBuilder sb = new StringBuilder("stream marked as compressed, but no compressor found, first [").append(maxToRead).append("] content bytes out of [").append(buffer.readableBytes()).append("] readable bytes with message size [").append(size).append("] ").append("] are [");
+                    for (int i = 0; i < maxToRead; i++) {
+                        sb.append(buffer.getByte(offset + i)).append(",");
+                    }
+                    sb.append("]");
+                    throw new IllegalStateException(sb.toString());
                 }
-                sb.append("]");
-                throw new IllegalStateException(sb.toString());
+                wrappedStream = compressor.streamInput(streamIn);
+            } else {
+                wrappedStream = streamIn;
             }
-            wrappedStream = compressor.streamInput(streamIn);
-        } else {
-            wrappedStream = streamIn;
-        }
-        wrappedStream.setVersion(version);
+            wrappedStream.setVersion(version);
 
-        if (TransportStatus.isRequest(status)) {
-            String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
-            if (buffer.readerIndex() != expectedIndexReader) {
-                if (buffer.readerIndex() < expectedIndexReader) {
-                    logger.warn("Message not fully read (request) for requestId [{}], action [{}], readerIndex [{}] vs expected [{}]; resetting",
-                                requestId, action, buffer.readerIndex(), expectedIndexReader);
-                } else {
-                    logger.warn("Message read past expected size (request) for requestId=[{}], action [{}], readerIndex [{}] vs expected [{}]; resetting",
-                                requestId, action, buffer.readerIndex(), expectedIndexReader);
-                }
-                buffer.readerIndex(expectedIndexReader);
-            }
-        } else {
-            TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
-            // ignore if its null, the adapter logs it
-            if (handler != null) {
-                if (TransportStatus.isError(status)) {
-                    handlerResponseError(wrappedStream, handler);
-                } else {
-                    handleResponse(ctx.getChannel(), wrappedStream, handler);
+            if (TransportStatus.isRequest(status)) {
+                String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
+                boolean success = false;
+                try {
+                    final int nextByte = wrappedStream.read();
+                    // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
+                    if (nextByte != -1) {
+                        throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
+                                + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
+                    }
+                    if (buffer.readerIndex() < expectedIndexReader) {
+                        throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
+                    }
+                    if (buffer.readerIndex() > expectedIndexReader) {
+                        throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
+                                + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
+                    }
+                    success = true;
+                } finally {
+                    if (!success) {
+                        buffer.readerIndex(expectedIndexReader);
+                    }
                 }
             } else {
-                // if its null, skip those bytes
-                buffer.readerIndex(markedReaderIndex + size);
-            }
-            if (buffer.readerIndex() != expectedIndexReader) {
-                if (buffer.readerIndex() < expectedIndexReader) {
-                    logger.warn("Message not fully read (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
+                TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
+                // ignore if its null, the adapter logs it
+                if (handler != null) {
+                    if (TransportStatus.isError(status)) {
+                        handlerResponseError(wrappedStream, handler);
+                    } else {
+                        handleResponse(ctx.getChannel(), wrappedStream, handler);
+                    }
                 } else {
-                    logger.warn("Message read past expected size (response) for [{}] handler {}, error [{}], resetting", requestId, handler, TransportStatus.isError(status));
+                    // if its null, skip those bytes
+                    buffer.readerIndex(markedReaderIndex + size);
+                }
+
+                boolean success = false;
+                try {
+                    final int nextByte = wrappedStream.read();
+                    // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
+                    if (nextByte != -1) {
+                        throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
+                                + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
+                    }
+                    if (buffer.readerIndex() < expectedIndexReader) {
+                        throw new IllegalStateException("Message is fully read (response), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
+                    }
+                    if (buffer.readerIndex() > expectedIndexReader) {
+                        throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler ["
+                                + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
+                    }
+                    success = true;
+                } finally {
+                    if (!success) {
+                        buffer.readerIndex(expectedIndexReader);
+                    }
                 }
-                buffer.readerIndex(expectedIndexReader);
             }
+        } finally {
+            IOUtils.close(wrappedStream);
         }
-        wrappedStream.close();
     }
 
     protected void handleResponse(Channel channel, StreamInput buffer, final TransportResponseHandler handler) {