فهرست منبع

Stop Ignoring Exceptions on Close in Network Code (#69665)

We should not be ignoring and suppressing exceptions on releasing
network resources quietly in these spots.

Co-authored-by: David Turner <david.turner@elastic.co>
Armin Braun 4 سال پیش
والد
کامیت
bb77ab46e0

+ 1 - 3
server/src/main/java/org/elasticsearch/common/lease/Releasable.java → libs/core/src/main/java/org/elasticsearch/common/lease/Releasable.java

@@ -8,12 +8,10 @@
 
 package org.elasticsearch.common.lease;
 
-import org.elasticsearch.ElasticsearchException;
-
 import java.io.Closeable;
 
 /**
- * Specialization of {@link AutoCloseable} that may only throw an {@link ElasticsearchException}.
+ * Specialization of {@link Closeable} that may only throw a {@link RuntimeException}.
  */
 public interface Releasable extends Closeable {
 

+ 20 - 9
server/src/main/java/org/elasticsearch/common/lease/Releasables.java → libs/core/src/main/java/org/elasticsearch/common/lease/Releasables.java

@@ -50,23 +50,34 @@ public enum Releasables {
         close(Arrays.asList(releasables));
     }
 
-    /** Release the provided {@link Releasable}s, ignoring exceptions. */
-    public static void closeWhileHandlingException(Iterable<? extends Releasable> releasables) {
-        close(releasables, true);
+    /** Release the provided {@link Releasable}s expecting no exception to by thrown by any of them. */
+    public static void closeExpectNoException(Releasable... releasables) {
+        try {
+            close(releasables);
+        } catch (RuntimeException e) {
+            assert false : e;
+            throw e;
+        }
+    }
+
+    /** Release the provided {@link Releasable} expecting no exception to by thrown. */
+    public static void closeExpectNoException(Releasable releasable) {
+        try {
+            close(releasable);
+        } catch (RuntimeException e) {
+            assert false : e;
+            throw e;
+        }
     }
 
     /** Release the provided {@link Releasable}s, ignoring exceptions. */
     public static void closeWhileHandlingException(Releasable... releasables) {
-        closeWhileHandlingException(Arrays.asList(releasables));
+        close(Arrays.asList(releasables), true);
     }
 
     /** Release the provided {@link Releasable}s, ignoring exceptions if <code>success</code> is {@code false}. */
     public static void close(boolean success, Iterable<Releasable> releasables) {
-        if (success) {
-            close(releasables);
-        } else {
-            closeWhileHandlingException(releasables);
-        }
+        close(releasables, success == false);
     }
 
     /** Release the provided {@link Releasable}s, ignoring exceptions if <code>success</code> is {@code false}. */

+ 7 - 6
libs/nio/src/main/java/org/elasticsearch/nio/Page.java

@@ -8,12 +8,13 @@
 
 package org.elasticsearch.nio;
 
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 
-import java.io.Closeable;
 import java.nio.ByteBuffer;
 
-public class Page implements Closeable {
+public class Page implements Releasable {
 
     private final ByteBuffer byteBuffer;
     // This is reference counted as some implementations want to retain the byte pages by calling
@@ -22,7 +23,7 @@ public class Page implements Closeable {
     // released.
     private final RefCountedCloseable refCountedCloseable;
 
-    public Page(ByteBuffer byteBuffer, Runnable closeable) {
+    public Page(ByteBuffer byteBuffer, Releasable closeable) {
         this(byteBuffer, new RefCountedCloseable(closeable));
     }
 
@@ -61,16 +62,16 @@ public class Page implements Closeable {
 
     private static class RefCountedCloseable extends AbstractRefCounted {
 
-        private final Runnable closeable;
+        private final Releasable closeable;
 
-        private RefCountedCloseable(Runnable closeable) {
+        private RefCountedCloseable(Releasable closeable) {
             super("byte array page");
             this.closeable = closeable;
         }
 
         @Override
         protected void closeInternal() {
-            closeable.run();
+            Releasables.closeExpectNoException(closeable);
         }
     }
 }

+ 3 - 2
libs/nio/src/test/java/org/elasticsearch/nio/SocketChannelContextTests.java

@@ -9,6 +9,7 @@
 package org.elasticsearch.nio;
 
 import org.elasticsearch.common.SuppressForbidden;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Before;
 import org.mockito.ArgumentCaptor;
@@ -328,13 +329,13 @@ public class SocketChannelContextTests extends ESTestCase {
         try (SocketChannel realChannel = SocketChannel.open()) {
             when(channel.getRawChannel()).thenReturn(realChannel);
             when(channel.isOpen()).thenReturn(true);
-            Runnable closer = mock(Runnable.class);
+            Releasable closer = mock(Releasable.class);
             IntFunction<Page> pageAllocator = (n) -> new Page(ByteBuffer.allocate(n), closer);
             InboundChannelBuffer buffer = new InboundChannelBuffer(pageAllocator);
             buffer.ensureCapacity(1);
             TestSocketChannelContext context = new TestSocketChannelContext(channel, selector, exceptionHandler, handler, buffer);
             context.closeFromSelector();
-            verify(closer).run();
+            verify(closer).close();
         }
     }
 

+ 1 - 1
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4MessageChannelHandler.java

@@ -109,7 +109,7 @@ final class Netty4MessageChannelHandler extends ChannelDuplexHandler {
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         assert Transports.assertDefaultThreadContext(transport.getThreadPool().getThreadContext());
         doFlush(ctx);
-        Releasables.closeWhileHandlingException(pipeline);
+        Releasables.closeExpectNoException(pipeline);
         super.channelInactive(ctx);
     }
 

+ 1 - 1
plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/PageAllocator.java

@@ -29,7 +29,7 @@ public class PageAllocator implements IntFunction<Page> {
     public Page apply(int length) {
         if (length >= RECYCLE_LOWER_THRESHOLD && length <= PageCacheRecycler.BYTE_PAGE_SIZE){
             Recycler.V<byte[]> bytePage = recycler.bytePage(false);
-            return new Page(ByteBuffer.wrap(bytePage.v(), 0, length), bytePage::close);
+            return new Page(ByteBuffer.wrap(bytePage.v(), 0, length), bytePage);
         } else {
             return new Page(ByteBuffer.allocate(length), () -> {});
         }

+ 2 - 4
plugins/transport-nio/src/main/java/org/elasticsearch/transport/nio/TcpReadWriteHandler.java

@@ -15,7 +15,6 @@ import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.PageCacheRecycler;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.nio.BytesWriteHandler;
 import org.elasticsearch.nio.InboundChannelBuffer;
 import org.elasticsearch.nio.Page;
@@ -48,7 +47,7 @@ public class TcpReadWriteHandler extends BytesWriteHandler {
         for (int i = 0; i < pages.length; ++i) {
             references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer());
         }
-        Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages);
+        Releasable releasable = pages.length == 1 ? pages[0] : () -> Releasables.closeExpectNoException(pages);
         try (ReleasableBytesReference reference = new ReleasableBytesReference(CompositeBytesReference.of(references), releasable)) {
             pipeline.handleBytes(channel, reference);
             return reference.length();
@@ -57,7 +56,6 @@ public class TcpReadWriteHandler extends BytesWriteHandler {
 
     @Override
     public void close() {
-        Releasables.closeWhileHandlingException(pipeline);
-        super.close();
+        Releasables.closeExpectNoException(pipeline, super::close);
     }
 }

+ 2 - 1
server/src/main/java/org/elasticsearch/common/bytes/ReleasableBytesReference.java

@@ -12,6 +12,7 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.BytesRefIterator;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
 import org.elasticsearch.common.util.concurrent.RefCounted;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -191,7 +192,7 @@ public final class ReleasableBytesReference implements RefCounted, Releasable, B
 
         @Override
         protected void closeInternal() {
-            releasable.close();
+            Releasables.closeExpectNoException(releasable);
         }
     }
 }

+ 1 - 1
server/src/main/java/org/elasticsearch/common/util/BigArrays.java

@@ -451,7 +451,7 @@ public class BigArrays {
             success = true;
         } finally {
             if (success == false) {
-                Releasables.closeWhileHandlingException(array);
+                Releasables.closeExpectNoException(array);
             }
         }
         return array;

+ 1 - 1
server/src/main/java/org/elasticsearch/http/DefaultRestChannel.java

@@ -75,7 +75,7 @@ public class DefaultRestChannel extends AbstractRestChannel implements RestChann
     @Override
     public void sendResponse(RestResponse restResponse) {
         // We're sending a response so we know we won't be needing the request content again and release it
-        Releasables.closeWhileHandlingException(httpRequest::release);
+        httpRequest.release();
 
         final ArrayList<Releasable> toClose = new ArrayList<>(3);
         if (HttpUtils.shouldCloseConnection(httpRequest)) {

+ 8 - 5
server/src/main/java/org/elasticsearch/transport/InboundDecoder.java

@@ -13,8 +13,8 @@ import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.util.PageCacheRecycler;
-import org.elasticsearch.core.internal.io.IOUtils;
 
 import java.io.IOException;
 import java.util.function.Consumer;
@@ -116,10 +116,13 @@ public class InboundDecoder implements Releasable {
     }
 
     private void cleanDecodeState() {
-        IOUtils.closeWhileHandlingException(decompressor);
-        decompressor = null;
-        totalNetworkSize = -1;
-        bytesConsumed = 0;
+        try {
+            Releasables.closeExpectNoException(decompressor);
+        } finally {
+            decompressor = null;
+            totalNetworkSize = -1;
+            bytesConsumed = 0;
+        }
     }
 
     private void decompress(ReleasableBytesReference content) throws IOException {

+ 7 - 3
server/src/main/java/org/elasticsearch/transport/InboundMessage.java

@@ -8,10 +8,10 @@
 
 package org.elasticsearch.transport;
 
+import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.core.internal.io.IOUtils;
 
 import java.io.IOException;
@@ -91,8 +91,12 @@ public class InboundMessage implements Releasable {
 
     @Override
     public void close() {
-        IOUtils.closeWhileHandlingException(streamInput);
-        Releasables.closeWhileHandlingException(content, breakerRelease);
+        try {
+            IOUtils.close(streamInput, content, breakerRelease);
+        } catch (Exception e) {
+            assert false : e;
+            throw new ElasticsearchException(e);
+        }
     }
 
     @Override

+ 2 - 4
server/src/main/java/org/elasticsearch/transport/InboundPipeline.java

@@ -58,9 +58,7 @@ public class InboundPipeline implements Releasable {
     @Override
     public void close() {
         isClosed = true;
-        Releasables.closeWhileHandlingException(decoder, aggregator);
-        Releasables.closeWhileHandlingException(pending);
-        pending.clear();
+        Releasables.closeExpectNoException(decoder, aggregator, () -> Releasables.close(pending), pending::clear);
     }
 
     public void handleBytes(TcpChannel channel, ReleasableBytesReference reference) throws IOException {
@@ -152,7 +150,7 @@ public class InboundPipeline implements Releasable {
                 bytesReferences[index] = pendingReference.retain();
                 ++index;
             }
-            final Releasable releasable = () -> Releasables.closeWhileHandlingException(bytesReferences);
+            final Releasable releasable = () -> Releasables.closeExpectNoException(bytesReferences);
             return new ReleasableBytesReference(CompositeBytesReference.of(bytesReferences), releasable);
         }
     }

+ 2 - 2
server/src/main/java/org/elasticsearch/transport/TransportDecompressor.java

@@ -14,16 +14,16 @@ import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.ReleasableBytesReference;
 import org.elasticsearch.common.compress.CompressorFactory;
+import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.recycler.Recycler;
 import org.elasticsearch.common.util.PageCacheRecycler;
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.zip.DataFormatException;
 import java.util.zip.Inflater;
 
-public class TransportDecompressor implements Closeable {
+public class TransportDecompressor implements Releasable {
 
     private final Inflater inflater;
     private final PageCacheRecycler recycler;

+ 4 - 6
test/framework/src/main/java/org/elasticsearch/transport/nio/MockNioTransport.java

@@ -29,7 +29,6 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.AbstractRefCounted;
-import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.nio.BytesChannelContext;
 import org.elasticsearch.nio.BytesWriteHandler;
@@ -214,7 +213,7 @@ public class MockNioTransport extends TcpTransport {
                     return new Page(ByteBuffer.allocate(length), () -> {});
                 } else {
                     Recycler.V<byte[]> bytes = pageCacheRecycler.bytePage(false);
-                    return new Page(ByteBuffer.wrap(bytes.v(), 0, length), bytes::close);
+                    return new Page(ByteBuffer.wrap(bytes.v(), 0, length), bytes);
                 }
             };
             MockTcpReadWriteHandler readWriteHandler = new MockTcpReadWriteHandler(nioChannel, pageCacheRecycler, MockNioTransport.this);
@@ -274,7 +273,7 @@ public class MockNioTransport extends TcpTransport {
         protected void closeInternal() {
             boolean leakReleased = leak.close(releasable);
             assert leakReleased : "leak should not have been released already";
-            releasable.close();
+            Releasables.closeExpectNoException(releasable);
         }
 
         @Override
@@ -306,7 +305,7 @@ public class MockNioTransport extends TcpTransport {
             for (int i = 0; i < pages.length; ++i) {
                 references[i] = BytesReference.fromByteBuffer(pages[i].byteBuffer());
             }
-            Releasable releasable = () -> IOUtils.closeWhileHandlingException(pages);
+            Releasable releasable = pages.length == 1 ? pages[0] : () -> Releasables.closeExpectNoException(pages);
             try (ReleasableBytesReference reference =
                          new ReleasableBytesReference(CompositeBytesReference.of(references), new LeakAwareRefCounted(releasable))) {
                 pipeline.handleBytes(channel, reference);
@@ -316,8 +315,7 @@ public class MockNioTransport extends TcpTransport {
 
         @Override
         public void close() {
-            Releasables.closeWhileHandlingException(pipeline);
-            super.close();
+            Releasables.closeExpectNoException(pipeline, super::close);
         }
     }