Ver código fonte

Fix stream leak in InboundDecoderTest (#80371)

We recently changed the outboud serialization stream. The stream now
uses the mock page cache recycler. The tests will throw errors if this
stream is not released. This commit resolves a leak where tests create
this stream without releasing.
Tim Brooks 4 anos atrás
pai
commit
ace74d6884

+ 177 - 161
server/src/test/java/org/elasticsearch/transport/InboundDecoderTests.java

@@ -11,6 +11,7 @@ package org.elasticsearch.transport;
 import org.elasticsearch.Version;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.MockPageCacheRecycler;
@@ -69,45 +70,48 @@ public class InboundDecoderTests extends ESTestCase {
             );
         }
 
-        final BytesReference totalBytes = message.serialize(new RecyclerBytesStreamOutput(recycler));
-        int totalHeaderSize = TcpHeader.headerSize(Version.CURRENT) + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
-        final BytesReference messageBytes = totalBytes.slice(totalHeaderSize, totalBytes.length() - totalHeaderSize);
-
-        InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
-        final ArrayList<Object> fragments = new ArrayList<>();
-        final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
-        int bytesConsumed = decoder.decode(releasable1, fragments::add);
-        assertEquals(totalHeaderSize, bytesConsumed);
-        assertEquals(1, releasable1.refCount());
-
-        final Header header = (Header) fragments.get(0);
-        assertEquals(requestId, header.getRequestId());
-        assertEquals(Version.CURRENT, header.getVersion());
-        assertFalse(header.isCompressed());
-        assertFalse(header.isHandshake());
-        if (isRequest) {
-            assertEquals(action, header.getActionName());
-            assertTrue(header.isRequest());
-            assertEquals(header.getHeaders().v1().get(headerKey), headerValue);
-        } else {
-            assertTrue(header.isResponse());
-            assertThat(header.getHeaders().v2().get(headerKey), hasItems(headerValue));
+        try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
+            final BytesReference totalBytes = message.serialize(os);
+            int totalHeaderSize = TcpHeader.headerSize(Version.CURRENT) + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
+            final BytesReference messageBytes = totalBytes.slice(totalHeaderSize, totalBytes.length() - totalHeaderSize);
+
+            InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
+            final ArrayList<Object> fragments = new ArrayList<>();
+            final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
+            int bytesConsumed = decoder.decode(releasable1, fragments::add);
+            assertEquals(totalHeaderSize, bytesConsumed);
+            assertEquals(1, releasable1.refCount());
+
+            final Header header = (Header) fragments.get(0);
+            assertEquals(requestId, header.getRequestId());
+            assertEquals(Version.CURRENT, header.getVersion());
+            assertFalse(header.isCompressed());
+            assertFalse(header.isHandshake());
+            if (isRequest) {
+                assertEquals(action, header.getActionName());
+                assertTrue(header.isRequest());
+                assertEquals(header.getHeaders().v1().get(headerKey), headerValue);
+            } else {
+                assertTrue(header.isResponse());
+                assertThat(header.getHeaders().v2().get(headerKey), hasItems(headerValue));
+            }
+            assertFalse(header.needsToReadVariableHeader());
+            fragments.clear();
+
+            final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
+            final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
+            int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
+            assertEquals(totalBytes.length() - totalHeaderSize, bytesConsumed2);
+
+            final Object content = fragments.get(0);
+            final Object endMarker = fragments.get(1);
+
+            assertEquals(messageBytes, content);
+            // Ref count is incremented since the bytes are forwarded as a fragment
+            assertEquals(2, releasable2.refCount());
+            assertEquals(InboundDecoder.END_CONTENT, endMarker);
         }
-        assertFalse(header.needsToReadVariableHeader());
-        fragments.clear();
 
-        final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
-        final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
-        int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
-        assertEquals(totalBytes.length() - totalHeaderSize, bytesConsumed2);
-
-        final Object content = fragments.get(0);
-        final Object endMarker = fragments.get(1);
-
-        assertEquals(messageBytes, content);
-        // Ref count is incremented since the bytes are forwarded as a fragment
-        assertEquals(2, releasable2.refCount());
-        assertEquals(InboundDecoder.END_CONTENT, endMarker);
     }
 
     public void testDecodePreHeaderSizeVariableInt() throws IOException {
@@ -128,42 +132,44 @@ public class InboundDecoderTests extends ESTestCase {
             compressionScheme
         );
 
-        final BytesReference totalBytes = message.serialize(new RecyclerBytesStreamOutput(recycler));
-        int partialHeaderSize = TcpHeader.headerSize(preHeaderVariableInt);
-
-        InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
-        final ArrayList<Object> fragments = new ArrayList<>();
-        final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
-        int bytesConsumed = decoder.decode(releasable1, fragments::add);
-        assertEquals(partialHeaderSize, bytesConsumed);
-        assertEquals(1, releasable1.refCount());
-
-        final Header header = (Header) fragments.get(0);
-        assertEquals(requestId, header.getRequestId());
-        assertEquals(preHeaderVariableInt, header.getVersion());
-        if (compressionScheme == null) {
-            assertFalse(header.isCompressed());
-        } else {
-            assertTrue(header.isCompressed());
-        }
-        assertTrue(header.isHandshake());
-        assertTrue(header.isRequest());
-        assertTrue(header.needsToReadVariableHeader());
-        fragments.clear();
-
-        final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
-        final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
-        int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
-        if (compressionScheme == null) {
-            assertEquals(2, fragments.size());
-        } else {
-            assertEquals(3, fragments.size());
-            final Object body = fragments.get(1);
-            assertThat(body, instanceOf(ReleasableBytesReference.class));
-            ((ReleasableBytesReference) body).close();
+        try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
+            final BytesReference totalBytes = message.serialize(os);
+            int partialHeaderSize = TcpHeader.headerSize(preHeaderVariableInt);
+
+            InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
+            final ArrayList<Object> fragments = new ArrayList<>();
+            final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
+            int bytesConsumed = decoder.decode(releasable1, fragments::add);
+            assertEquals(partialHeaderSize, bytesConsumed);
+            assertEquals(1, releasable1.refCount());
+
+            final Header header = (Header) fragments.get(0);
+            assertEquals(requestId, header.getRequestId());
+            assertEquals(preHeaderVariableInt, header.getVersion());
+            if (compressionScheme == null) {
+                assertFalse(header.isCompressed());
+            } else {
+                assertTrue(header.isCompressed());
+            }
+            assertTrue(header.isHandshake());
+            assertTrue(header.isRequest());
+            assertTrue(header.needsToReadVariableHeader());
+            fragments.clear();
+
+            final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
+            final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
+            int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
+            if (compressionScheme == null) {
+                assertEquals(2, fragments.size());
+            } else {
+                assertEquals(3, fragments.size());
+                final Object body = fragments.get(1);
+                assertThat(body, instanceOf(ReleasableBytesReference.class));
+                ((ReleasableBytesReference) body).close();
+            }
+            assertEquals(InboundDecoder.END_CONTENT, fragments.get(fragments.size() - 1));
+            assertEquals(totalBytes.length() - bytesConsumed, bytesConsumed2);
         }
-        assertEquals(InboundDecoder.END_CONTENT, fragments.get(fragments.size() - 1));
-        assertEquals(totalBytes.length() - bytesConsumed, bytesConsumed2);
     }
 
     public void testDecodeHandshakeCompatibility() throws IOException {
@@ -183,25 +189,28 @@ public class InboundDecoderTests extends ESTestCase {
             null
         );
 
-        final BytesReference bytes = message.serialize(new RecyclerBytesStreamOutput(recycler));
-        int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);
-
-        InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
-        final ArrayList<Object> fragments = new ArrayList<>();
-        final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
-        int bytesConsumed = decoder.decode(releasable1, fragments::add);
-        assertEquals(totalHeaderSize, bytesConsumed);
-        assertEquals(1, releasable1.refCount());
-
-        final Header header = (Header) fragments.get(0);
-        assertEquals(requestId, header.getRequestId());
-        assertEquals(handshakeCompat, header.getVersion());
-        assertFalse(header.isCompressed());
-        assertTrue(header.isHandshake());
-        assertTrue(header.isRequest());
-        // TODO: On 9.0 this will be true because all compatible versions with contain the variable header int
-        assertTrue(header.needsToReadVariableHeader());
-        fragments.clear();
+        try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
+            final BytesReference bytes = message.serialize(os);
+            int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);
+
+            InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
+            final ArrayList<Object> fragments = new ArrayList<>();
+            final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
+            int bytesConsumed = decoder.decode(releasable1, fragments::add);
+            assertEquals(totalHeaderSize, bytesConsumed);
+            assertEquals(1, releasable1.refCount());
+
+            final Header header = (Header) fragments.get(0);
+            assertEquals(requestId, header.getRequestId());
+            assertEquals(handshakeCompat, header.getVersion());
+            assertFalse(header.isCompressed());
+            assertTrue(header.isHandshake());
+            assertTrue(header.isRequest());
+            // TODO: On 9.0 this will be true because all compatible versions with contain the variable header int
+            assertTrue(header.needsToReadVariableHeader());
+            fragments.clear();
+        }
+
     }
 
     public void testCompressedDecode() throws IOException {
@@ -226,51 +235,54 @@ public class InboundDecoderTests extends ESTestCase {
             message = new OutboundMessage.Response(threadContext, transportMessage, Version.CURRENT, requestId, false, scheme);
         }
 
-        final BytesReference totalBytes = message.serialize(new RecyclerBytesStreamOutput(recycler));
-        final RecyclerBytesStreamOutput out = new RecyclerBytesStreamOutput(recycler);
-        transportMessage.writeTo(out);
-        final BytesReference uncompressedBytes = out.bytes();
-        int totalHeaderSize = TcpHeader.headerSize(Version.CURRENT) + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
-
-        InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
-        final ArrayList<Object> fragments = new ArrayList<>();
-        final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
-        int bytesConsumed = decoder.decode(releasable1, fragments::add);
-        assertEquals(totalHeaderSize, bytesConsumed);
-        assertEquals(1, releasable1.refCount());
-
-        final Header header = (Header) fragments.get(0);
-        assertEquals(requestId, header.getRequestId());
-        assertEquals(Version.CURRENT, header.getVersion());
-        assertTrue(header.isCompressed());
-        assertFalse(header.isHandshake());
-        if (isRequest) {
-            assertEquals(action, header.getActionName());
-            assertTrue(header.isRequest());
-            assertEquals(header.getHeaders().v1().get(headerKey), headerValue);
-        } else {
-            assertTrue(header.isResponse());
-            assertThat(header.getHeaders().v2().get(headerKey), hasItems(headerValue));
+        try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
+            final BytesReference totalBytes = message.serialize(os);
+            final BytesStreamOutput out = new BytesStreamOutput();
+            transportMessage.writeTo(out);
+            final BytesReference uncompressedBytes = out.bytes();
+            int totalHeaderSize = TcpHeader.headerSize(Version.CURRENT) + totalBytes.getInt(TcpHeader.VARIABLE_HEADER_SIZE_POSITION);
+
+            InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
+            final ArrayList<Object> fragments = new ArrayList<>();
+            final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(totalBytes);
+            int bytesConsumed = decoder.decode(releasable1, fragments::add);
+            assertEquals(totalHeaderSize, bytesConsumed);
+            assertEquals(1, releasable1.refCount());
+
+            final Header header = (Header) fragments.get(0);
+            assertEquals(requestId, header.getRequestId());
+            assertEquals(Version.CURRENT, header.getVersion());
+            assertTrue(header.isCompressed());
+            assertFalse(header.isHandshake());
+            if (isRequest) {
+                assertEquals(action, header.getActionName());
+                assertTrue(header.isRequest());
+                assertEquals(header.getHeaders().v1().get(headerKey), headerValue);
+            } else {
+                assertTrue(header.isResponse());
+                assertThat(header.getHeaders().v2().get(headerKey), hasItems(headerValue));
+            }
+            assertFalse(header.needsToReadVariableHeader());
+            fragments.clear();
+
+            final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
+            final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
+            int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
+            assertEquals(totalBytes.length() - totalHeaderSize, bytesConsumed2);
+
+            final Object compressionScheme = fragments.get(0);
+            final Object content = fragments.get(1);
+            final Object endMarker = fragments.get(2);
+
+            assertEquals(scheme, compressionScheme);
+            assertEquals(uncompressedBytes, content);
+            assertThat(content, instanceOf(ReleasableBytesReference.class));
+            ((ReleasableBytesReference) content).close();
+            // Ref count is not incremented since the bytes are immediately consumed on decompression
+            assertEquals(1, releasable2.refCount());
+            assertEquals(InboundDecoder.END_CONTENT, endMarker);
         }
-        assertFalse(header.needsToReadVariableHeader());
-        fragments.clear();
-
-        final BytesReference bytes2 = totalBytes.slice(bytesConsumed, totalBytes.length() - bytesConsumed);
-        final ReleasableBytesReference releasable2 = ReleasableBytesReference.wrap(bytes2);
-        int bytesConsumed2 = decoder.decode(releasable2, fragments::add);
-        assertEquals(totalBytes.length() - totalHeaderSize, bytesConsumed2);
-
-        final Object compressionScheme = fragments.get(0);
-        final Object content = fragments.get(1);
-        final Object endMarker = fragments.get(2);
-
-        assertEquals(scheme, compressionScheme);
-        assertEquals(uncompressedBytes, content);
-        assertThat(content, instanceOf(ReleasableBytesReference.class));
-        ((ReleasableBytesReference) content).close();
-        // Ref count is not incremented since the bytes are immediately consumed on decompression
-        assertEquals(1, releasable2.refCount());
-        assertEquals(InboundDecoder.END_CONTENT, endMarker);
+
     }
 
     public void testCompressedDecodeHandshakeCompatibility() throws IOException {
@@ -290,25 +302,27 @@ public class InboundDecoderTests extends ESTestCase {
             Compression.Scheme.DEFLATE
         );
 
-        final BytesReference bytes = message.serialize(new RecyclerBytesStreamOutput(recycler));
-        int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);
-
-        InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
-        final ArrayList<Object> fragments = new ArrayList<>();
-        final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
-        int bytesConsumed = decoder.decode(releasable1, fragments::add);
-        assertEquals(totalHeaderSize, bytesConsumed);
-        assertEquals(1, releasable1.refCount());
-
-        final Header header = (Header) fragments.get(0);
-        assertEquals(requestId, header.getRequestId());
-        assertEquals(handshakeCompat, header.getVersion());
-        assertTrue(header.isCompressed());
-        assertTrue(header.isHandshake());
-        assertTrue(header.isRequest());
-        // TODO: On 9.0 this will be true because all compatible versions with contain the variable header int
-        assertTrue(header.needsToReadVariableHeader());
-        fragments.clear();
+        try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
+            final BytesReference bytes = message.serialize(os);
+            int totalHeaderSize = TcpHeader.headerSize(handshakeCompat);
+
+            InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
+            final ArrayList<Object> fragments = new ArrayList<>();
+            final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
+            int bytesConsumed = decoder.decode(releasable1, fragments::add);
+            assertEquals(totalHeaderSize, bytesConsumed);
+            assertEquals(1, releasable1.refCount());
+
+            final Header header = (Header) fragments.get(0);
+            assertEquals(requestId, header.getRequestId());
+            assertEquals(handshakeCompat, header.getVersion());
+            assertTrue(header.isCompressed());
+            assertTrue(header.isHandshake());
+            assertTrue(header.isRequest());
+            // TODO: On 9.0 this will be true because all compatible versions with contain the variable header int
+            assertTrue(header.needsToReadVariableHeader());
+            fragments.clear();
+        }
     }
 
     public void testVersionIncompatibilityDecodeException() throws IOException {
@@ -325,14 +339,16 @@ public class InboundDecoderTests extends ESTestCase {
             Compression.Scheme.DEFLATE
         );
 
-        final BytesReference bytes = message.serialize(new RecyclerBytesStreamOutput(recycler));
+        try (RecyclerBytesStreamOutput os = new RecyclerBytesStreamOutput(recycler)) {
+            final BytesReference bytes = message.serialize(os);
 
-        InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
-        final ArrayList<Object> fragments = new ArrayList<>();
-        final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
-        expectThrows(IllegalStateException.class, () -> decoder.decode(releasable1, fragments::add));
-        // No bytes are retained
-        assertEquals(1, releasable1.refCount());
+            InboundDecoder decoder = new InboundDecoder(Version.CURRENT, recycler);
+            final ArrayList<Object> fragments = new ArrayList<>();
+            final ReleasableBytesReference releasable1 = ReleasableBytesReference.wrap(bytes);
+            expectThrows(IllegalStateException.class, () -> decoder.decode(releasable1, fragments::add));
+            // No bytes are retained
+            assertEquals(1, releasable1.refCount());
+        }
     }
 
     public void testEnsureVersionCompatibility() throws IOException {