Преглед изворни кода

Fail connection attempts earlier in tests (#43320)

Today the `DisruptibleMockTransport` always allows a connection to a node to be
established, and then fails requests sent to that node such as the subsequent
handshake. Since #42342, we log handshake failures on an open connection as a
warning, and this makes the test logs rather noisy. This change fails the
connection attempt first, avoiding these unrealistic warnings.
David Turner пре 6 година
родитељ
комит
e250973e72

+ 3 - 2
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -862,13 +862,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
         }
 
         public void clearNetworkDisruptions() {
-            disruptedLinks.disconnected.forEach(nodeName -> {
+            final Set<String> disconnectedNodes = new HashSet<>(disruptedLinks.disconnected);
+            disruptedLinks.clear();
+            disconnectedNodes.forEach(nodeName -> {
                 if (testClusterNodes.nodes.containsKey(nodeName)) {
                     final DiscoveryNode node = testClusterNodes.nodes.get(nodeName).node;
                     testClusterNodes.nodes.values().forEach(n -> n.transportService.getConnectionManager().openConnection(node, null));
                 }
             });
-            disruptedLinks.clear();
         }
 
         private NetworkDisruption.DisruptedLinks getDisruption() {

+ 10 - 4
test/framework/src/main/java/org/elasticsearch/test/disruption/DisruptableMockTransport.java

@@ -87,8 +87,14 @@ public abstract class DisruptableMockTransport extends MockTransport {
 
     @Override
     public Releasable openConnection(DiscoveryNode node, ConnectionProfile profile, ActionListener<Connection> listener) {
-        final Optional<DisruptableMockTransport> matchingTransport = getDisruptableMockTransport(node.getAddress());
-        if (matchingTransport.isPresent()) {
+        final Optional<DisruptableMockTransport> optionalMatchingTransport = getDisruptableMockTransport(node.getAddress());
+        if (optionalMatchingTransport.isPresent()) {
+            final DisruptableMockTransport matchingTransport = optionalMatchingTransport.get();
+            final ConnectionStatus connectionStatus = getConnectionStatus(matchingTransport.getLocalNode());
+            if (connectionStatus != ConnectionStatus.CONNECTED) {
+                throw new ConnectTransportException(node, "node [" + node + "] is [" + connectionStatus + "] not [CONNECTED]");
+            }
+
             listener.onResponse(new CloseableConnection() {
                 @Override
                 public DiscoveryNode getNode() {
@@ -98,12 +104,12 @@ public abstract class DisruptableMockTransport extends MockTransport {
                 @Override
                 public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
                     throws TransportException {
-                    onSendRequest(requestId, action, request, matchingTransport.get());
+                    onSendRequest(requestId, action, request, matchingTransport);
                 }
             });
             return () -> {};
         } else {
-            throw new ConnectTransportException(node, "node " + node + " does not exist");
+            throw new ConnectTransportException(node, "node [" + node + "] does not exist");
         }
     }
 

+ 25 - 0
test/framework/src/test/java/org/elasticsearch/test/disruption/DisruptableMockTransportTests.java

@@ -30,6 +30,7 @@ import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.disruption.DisruptableMockTransport.ConnectionStatus;
 import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.ConnectTransportException;
 import org.elasticsearch.transport.TransportChannel;
 import org.elasticsearch.transport.TransportException;
 import org.elasticsearch.transport.TransportRequest;
@@ -53,6 +54,7 @@ import java.util.function.Consumer;
 
 import static org.elasticsearch.transport.TransportService.NOOP_TRANSPORT_INTERCEPTOR;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.endsWith;
 
 public class DisruptableMockTransportTests extends ESTestCase {
 
@@ -399,4 +401,27 @@ public class DisruptableMockTransportTests extends ESTestCase {
         deterministicTaskQueue.runAllRunnableTasks();
         assertTrue(responseHandlerCalled.get());
     }
+
+    public void testBrokenLinkFailsToConnect() {
+        service1.disconnectFromNode(node2);
+
+        disconnectedLinks.add(Tuple.tuple(node1, node2));
+        assertThat(expectThrows(ConnectTransportException.class, () -> service1.connectToNode(node2)).getMessage(),
+            endsWith("is [DISCONNECTED] not [CONNECTED]"));
+        disconnectedLinks.clear();
+
+        blackholedLinks.add(Tuple.tuple(node1, node2));
+        assertThat(expectThrows(ConnectTransportException.class, () -> service1.connectToNode(node2)).getMessage(),
+            endsWith("is [BLACK_HOLE] not [CONNECTED]"));
+        blackholedLinks.clear();
+
+        blackholedRequestLinks.add(Tuple.tuple(node1, node2));
+        assertThat(expectThrows(ConnectTransportException.class, () -> service1.connectToNode(node2)).getMessage(),
+            endsWith("is [BLACK_HOLE_REQUESTS_ONLY] not [CONNECTED]"));
+        blackholedRequestLinks.clear();
+
+        final DiscoveryNode node3 = new DiscoveryNode("node3", buildNewFakeTransportAddress(), Version.CURRENT);
+        assertThat(expectThrows(ConnectTransportException.class, () -> service1.connectToNode(node3)).getMessage(),
+            endsWith("does not exist"));
+    }
 }