Browse Source

Make Connection Future Err. Handling more Resilient (#42781)

* There were a number of possible (runtime-) exceptions that could be raised in the adjusted code and prevent resolving the listener
* Relates #42350
Armin Braun 6 years ago
parent
commit
c6e1eb7b95
1 changed files with 24 additions and 30 deletions
  1. 24 30
      server/src/main/java/org/elasticsearch/transport/TcpTransport.java

+ 24 - 30
server/src/main/java/org/elasticsearch/transport/TcpTransport.java

@@ -933,34 +933,20 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
             if (countDown.countDown()) {
                 final TcpChannel handshakeChannel = channels.get(0);
                 try {
-                    executeHandshake(node, handshakeChannel, connectionProfile, new ActionListener<Version>() {
-                        @Override
-                        public void onResponse(Version version) {
-                            NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
-                            long relativeMillisTime = threadPool.relativeTimeInMillis();
-                            nodeChannels.channels.forEach(ch -> {
-                                // Mark the channel init time
-                                ch.getChannelStats().markAccessed(relativeMillisTime);
-                                ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
-                            });
-                            keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
-                            listener.onResponse(nodeChannels);
-                        }
-
-                        @Override
-                        public void onFailure(Exception e) {
-                            CloseableChannel.closeChannels(channels, false);
-
-                            if (e instanceof ConnectTransportException) {
-                                listener.onFailure(e);
-                            } else {
-                                listener.onFailure(new ConnectTransportException(node, "general node connection failure", e));
-                            }
-                        }
-                    });
+                    executeHandshake(node, handshakeChannel, connectionProfile, ActionListener.wrap(version -> {
+                        NodeChannels nodeChannels = new NodeChannels(node, channels, connectionProfile, version);
+                        long relativeMillisTime = threadPool.relativeTimeInMillis();
+                        nodeChannels.channels.forEach(ch -> {
+                            // Mark the channel init time
+                            ch.getChannelStats().markAccessed(relativeMillisTime);
+                            ch.addCloseListener(ActionListener.wrap(nodeChannels::close));
+                        });
+                        keepAlive.registerNodeConnection(nodeChannels.channels, connectionProfile);
+                        listener.onResponse(nodeChannels);
+                    }, e -> closeAndFail(e instanceof ConnectTransportException ?
+                        e : new ConnectTransportException(node, "general node connection failure", e))));
                 } catch (Exception ex) {
-                    CloseableChannel.closeChannels(channels, false);
-                    listener.onFailure(ex);
+                    closeAndFail(ex);
                 }
             }
         }
@@ -968,15 +954,23 @@ public abstract class TcpTransport extends AbstractLifecycleComponent implements
         @Override
         public void onFailure(Exception ex) {
             if (countDown.fastForward()) {
-                CloseableChannel.closeChannels(channels, false);
-                listener.onFailure(new ConnectTransportException(node, "connect_exception", ex));
+                closeAndFail(new ConnectTransportException(node, "connect_exception", ex));
             }
         }
 
         public void onTimeout() {
             if (countDown.fastForward()) {
+                closeAndFail(new ConnectTransportException(node, "connect_timeout[" + connectionProfile.getConnectTimeout() + "]"));
+            }
+        }
+
+        private void closeAndFail(Exception e) {
+            try {
                 CloseableChannel.closeChannels(channels, false);
-                listener.onFailure(new ConnectTransportException(node, "connect_timeout[" + connectionProfile.getConnectTimeout() + "]"));
+            } catch (Exception ex) {
+                e.addSuppressed(ex);
+            } finally {
+                listener.onFailure(e);
             }
         }
     }