فهرست منبع

Remove exception-mangling in connect/close listeners (#127954)

The close-listeners are never completed exceptionally today so they do
not need the exception mangling of a `ListenableFuture`. The connect-
and remove-listeners sometimes see an exception if the connection
attempt fails, but they also do not need any exception-mangling.

This commit removes the exception-mangling by replacing these
`ListenableFuture` instances with `SubscribableListener` ones.
David Turner 5 ماه پیش
والد
کامیت
3504c27e7d

+ 2 - 2
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpChannel.java

@@ -12,7 +12,7 @@ package org.elasticsearch.http.netty4;
 import io.netty.channel.Channel;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.http.HttpChannel;
 import org.elasticsearch.http.HttpResponse;
 
@@ -25,7 +25,7 @@ import static org.elasticsearch.transport.netty4.Netty4Utils.safeWriteAndFlush;
 public class Netty4HttpChannel implements HttpChannel {
 
     private final Channel channel;
-    private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
+    private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
 
     Netty4HttpChannel(Channel channel) {
         this.channel = channel;

+ 2 - 2
modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpServerChannel.java

@@ -12,7 +12,7 @@ package org.elasticsearch.http.netty4;
 import io.netty.channel.Channel;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.http.HttpServerChannel;
 
 import java.net.InetSocketAddress;
@@ -22,7 +22,7 @@ import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;
 public class Netty4HttpServerChannel implements HttpServerChannel {
 
     private final Channel channel;
-    private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
+    private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
 
     Netty4HttpServerChannel(Channel channel) {
         this.channel = channel;

+ 3 - 4
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpChannel.java

@@ -14,8 +14,8 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelOption;
 
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.core.IOUtils;
 import org.elasticsearch.core.Releasables;
 import org.elasticsearch.transport.TcpChannel;
@@ -30,8 +30,8 @@ public class Netty4TcpChannel implements TcpChannel {
     private final Channel channel;
     private final boolean isServer;
     private final String profile;
-    private final ListenableFuture<Void> connectContext;
-    private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
+    private final SubscribableListener<Void> connectContext = new SubscribableListener<>();
+    private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
     private final ChannelStats stats = new ChannelStats();
     private final boolean rstOnClose;
     /**
@@ -43,7 +43,6 @@ public class Netty4TcpChannel implements TcpChannel {
         this.channel = channel;
         this.isServer = isServer;
         this.profile = profile;
-        this.connectContext = new ListenableFuture<>();
         this.rstOnClose = rstOnClose;
         addListener(connectFuture, connectContext);
         addListener(this.channel.closeFuture(), new ActionListener<>() {

+ 2 - 2
modules/transport-netty4/src/main/java/org/elasticsearch/transport/netty4/Netty4TcpServerChannel.java

@@ -12,7 +12,7 @@ package org.elasticsearch.transport.netty4;
 import io.netty.channel.Channel;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.transport.TcpServerChannel;
 
 import java.net.InetSocketAddress;
@@ -22,7 +22,7 @@ import static org.elasticsearch.transport.netty4.Netty4Utils.addListener;
 public class Netty4TcpServerChannel implements TcpServerChannel {
 
     private final Channel channel;
-    private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
+    private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
 
     Netty4TcpServerChannel(Channel channel) {
         this.channel = channel;

+ 3 - 3
server/src/main/java/org/elasticsearch/transport/CloseableConnection.java

@@ -10,7 +10,7 @@
 package org.elasticsearch.transport;
 
 import org.elasticsearch.action.ActionListener;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.core.AbstractRefCounted;
 
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -20,8 +20,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public abstract class CloseableConnection extends AbstractRefCounted implements Transport.Connection {
 
-    private final ListenableFuture<Void> closeContext = new ListenableFuture<>();
-    private final ListenableFuture<Void> removeContext = new ListenableFuture<>();
+    private final SubscribableListener<Void> closeContext = new SubscribableListener<>();
+    private final SubscribableListener<Void> removeContext = new SubscribableListener<>();
 
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final AtomicBoolean removed = new AtomicBoolean(false);

+ 8 - 8
server/src/main/java/org/elasticsearch/transport/ClusterConnectionManager.java

@@ -12,11 +12,11 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.support.ContextPreservingActionListener;
+import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.common.ReferenceDocs;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.common.util.concurrent.RunOnce;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.core.AbstractRefCounted;
@@ -43,7 +43,7 @@ public class ClusterConnectionManager implements ConnectionManager {
     private static final Logger logger = LogManager.getLogger(ClusterConnectionManager.class);
 
     private final ConcurrentMap<DiscoveryNode, Transport.Connection> connectedNodes = ConcurrentCollections.newConcurrentMap();
-    private final ConcurrentMap<DiscoveryNode, ListenableFuture<Transport.Connection>> pendingConnections = ConcurrentCollections
+    private final ConcurrentMap<DiscoveryNode, SubscribableListener<Transport.Connection>> pendingConnections = ConcurrentCollections
         .newConcurrentMap();
     private final AbstractRefCounted connectingRefCounter = AbstractRefCounted.of(this::pendingConnectionsComplete);
 
@@ -184,8 +184,8 @@ public class ClusterConnectionManager implements ConnectionManager {
             return;
         }
 
-        final ListenableFuture<Transport.Connection> currentListener = new ListenableFuture<>();
-        final ListenableFuture<Transport.Connection> existingListener = pendingConnections.putIfAbsent(node, currentListener);
+        final SubscribableListener<Transport.Connection> currentListener = new SubscribableListener<>();
+        final SubscribableListener<Transport.Connection> existingListener = pendingConnections.putIfAbsent(node, currentListener);
         if (existingListener != null) {
             try {
                 // wait on previous entry to complete connection attempt
@@ -203,7 +203,7 @@ public class ClusterConnectionManager implements ConnectionManager {
         // extra connection to the node. We could _just_ check here, but checking up front skips the work to mark the connection as pending.
         final Transport.Connection existingConnectionRecheck = connectedNodes.get(node);
         if (existingConnectionRecheck != null) {
-            ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
+            var future = pendingConnections.remove(node);
             assert future == currentListener : "Listener in pending map is different than the expected listener";
             connectingRefCounter.decRef();
             future.onResponse(existingConnectionRecheck);
@@ -257,7 +257,7 @@ public class ClusterConnectionManager implements ConnectionManager {
                             }
                         }
                     } finally {
-                        ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
+                        var future = pendingConnections.remove(node);
                         assert future == currentListener : "Listener in pending map is different than the expected listener";
                         managerRefs.decRef();
                         releaseOnce.run();
@@ -387,9 +387,9 @@ public class ClusterConnectionManager implements ConnectionManager {
         DiscoveryNode node,
         RunOnce releaseOnce,
         Exception e,
-        ListenableFuture<Transport.Connection> expectedListener
+        SubscribableListener<Transport.Connection> expectedListener
     ) {
-        ListenableFuture<Transport.Connection> future = pendingConnections.remove(node);
+        final var future = pendingConnections.remove(node);
         releaseOnce.run();
         if (future != null) {
             assert future == expectedListener : "Listener in pending map is different than the expected listener";