Browse Source

Merge pull request #11988 from jpountz/fix/zlib_skip_underlying

Transport: Do not make the buffer skip while a stream is open.
Adrien Grand 10 years ago
parent
commit
6765635067

+ 39 - 47
core/src/main/java/org/elasticsearch/transport/netty/MessageChannelHandler.java

@@ -85,13 +85,12 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
         // netty always copies a buffer, either in NioWorker in its read handler, where it copies to a fresh
         // buffer, or in the cumlation buffer, which is cleaned each time
         StreamInput streamIn = ChannelBufferStreamInputFactory.create(buffer, size);
-
-        long requestId = buffer.readLong();
-        byte status = buffer.readByte();
-        Version version = Version.fromId(buffer.readInt());
-
-        StreamInput wrappedStream = null;
+        boolean success = false;
         try {
+            long requestId = streamIn.readLong();
+            byte status = streamIn.readByte();
+            Version version = Version.fromId(streamIn.readInt());
+
             if (TransportStatus.isCompress(status) && hasMessageBytesToRead && buffer.readable()) {
                 Compressor compressor;
                 try {
@@ -106,52 +105,40 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
                     sb.append("]");
                     throw new IllegalStateException(sb.toString());
                 }
-                wrappedStream = compressor.streamInput(streamIn);
-            } else {
-                wrappedStream = streamIn;
+                streamIn = compressor.streamInput(streamIn);
             }
-            wrappedStream.setVersion(version);
+            streamIn.setVersion(version);
 
             if (TransportStatus.isRequest(status)) {
-                String action = handleRequest(ctx.getChannel(), wrappedStream, requestId, version);
-                boolean success = false;
-                try {
-                    final int nextByte = wrappedStream.read();
-                    // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
-                    if (nextByte != -1) {
-                        throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
-                                + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
-                    }
-                    if (buffer.readerIndex() < expectedIndexReader) {
-                        throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
-                    }
-                    if (buffer.readerIndex() > expectedIndexReader) {
-                        throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
-                                + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
-                    }
-                    success = true;
-                } finally {
-                    if (!success) {
-                        buffer.readerIndex(expectedIndexReader);
-                    }
+                String action = handleRequest(ctx.getChannel(), streamIn, requestId, version);
+
+                // Chek the entire message has been read
+                final int nextByte = streamIn.read();
+                // calling read() is useful to make sure the message is fully read, even if there some kind of EOS marker
+                if (nextByte != -1) {
+                    throw new IllegalStateException("Message not fully read (request) for requestId [" + requestId + "], action ["
+                            + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
+                }
+                if (buffer.readerIndex() < expectedIndexReader) {
+                    throw new IllegalStateException("Message is fully read (request), yet there are " + (expectedIndexReader - buffer.readerIndex()) + " remaining bytes; resetting");
                 }
+                if (buffer.readerIndex() > expectedIndexReader) {
+                    throw new IllegalStateException("Message read past expected size (request) for requestId [" + requestId + "], action ["
+                            + action + "], readerIndex [" + buffer.readerIndex() + "] vs expected [" + expectedIndexReader + "]; resetting");
+                }
+
             } else {
-                TransportResponseHandler handler = transportServiceAdapter.onResponseReceived(requestId);
+                TransportResponseHandler<?> handler = transportServiceAdapter.onResponseReceived(requestId);
                 // ignore if its null, the adapter logs it
                 if (handler != null) {
                     if (TransportStatus.isError(status)) {
-                        handlerResponseError(wrappedStream, handler);
+                        handlerResponseError(streamIn, handler);
                     } else {
-                        handleResponse(ctx.getChannel(), wrappedStream, handler);
+                        handleResponse(ctx.getChannel(), streamIn, handler);
                     }
-                } else {
-                    // if its null, skip those bytes
-                    buffer.readerIndex(markedReaderIndex + size);
-                }
 
-                boolean success = false;
-                try {
-                    final int nextByte = wrappedStream.read();
+                    // Chek the entire message has been read
+                    final int nextByte = streamIn.read();
                     // calling read() is useful to make sure the message is fully read, even if there is an EOS marker
                     if (nextByte != -1) {
                         throw new IllegalStateException("Message not fully read (response) for requestId [" + requestId + "], handler ["
@@ -164,15 +151,20 @@ public class MessageChannelHandler extends SimpleChannelUpstreamHandler {
                         throw new IllegalStateException("Message read past expected size (response) for requestId [" + requestId + "], handler ["
                                 + handler + "], error [" + TransportStatus.isError(status) + "]; resetting");
                     }
-                    success = true;
-                } finally {
-                    if (!success) {
-                        buffer.readerIndex(expectedIndexReader);
-                    }
+
                 }
             }
         } finally {
-            IOUtils.close(wrappedStream);
+            try {
+                if (success) {
+                    IOUtils.close(streamIn);
+                } else {
+                    IOUtils.closeWhileHandlingException(streamIn);
+                }
+            } finally {
+                // Set the expected position of the buffer, no matter what happened
+                buffer.readerIndex(expectedIndexReader);
+            }
         }
     }
 

+ 0 - 1
core/src/test/java/org/elasticsearch/transport/AbstractSimpleTransportTests.java

@@ -575,7 +575,6 @@ public abstract class AbstractSimpleTransportTests extends ElasticsearchTestCase
 
     @Test
     @TestLogging(value = "test. transport.tracer:TRACE")
-    @AwaitsFix(bugUrl = "@spinscale is looking into failures: http://build-us-00.elastic.co/job/es_core_master_strong/3986/")
     public void testTracerLog() throws InterruptedException {
         TransportRequestHandler handler = new TransportRequestHandler<StringMessageRequest>() {
             @Override