瀏覽代碼

Ensure network connections are restored after disruptions (#23135)

With #22977, network disruption also disconnects nodes from the transport service. That has the side effect that when the disruption is healed, the disconnected nodes stay disconnected until the `NodeConnectionsService` restores the connections. This can take too long for the tests. This PR adds logic to the cluster healing to restore connections immediately.

See https://elasticsearch-ci.elastic.co/job/elastic+elasticsearch+master+multijob-unix-compatibility/os=debian/611/console for an example failure.
Boaz Leskes 8 年之前
父節點
當前提交
f83db675c8

+ 13 - 1
test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java

@@ -23,7 +23,6 @@ import com.carrotsearch.randomizedtesting.RandomizedContext;
 import com.carrotsearch.randomizedtesting.annotations.TestGroup;
 import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
-
 import org.apache.http.HttpHost;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.LuceneTestCase;
@@ -126,6 +125,7 @@ import org.elasticsearch.search.MockSearchService;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.test.client.RandomizingClient;
 import org.elasticsearch.test.discovery.TestZenDiscovery;
+import org.elasticsearch.test.disruption.NetworkDisruption;
 import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
 import org.elasticsearch.test.store.MockFSIndexStore;
 import org.elasticsearch.test.transport.MockTransportService;
@@ -1169,6 +1169,18 @@ public abstract class ESIntegTestCase extends ESTestCase {
                 + stateResponse.getState());
         }
         assertThat(clusterHealthResponse.isTimedOut(), is(false));
+        ensureFullyConnectedCluster();
+    }
+
+    /**
+     * Ensures that all nodes in the cluster are connected to each other.
+     *
+     * Some network disruptions may leave nodes that are not the master disconnected from each other.
+     * {@link org.elasticsearch.cluster.NodeConnectionsService} will eventually reconnect them, but it is
+     * handy to be able to make this happen faster. Delegates to {@link NetworkDisruption#ensureFullyConnectedCluster}.
+     */
+    protected void ensureFullyConnectedCluster() {
+        NetworkDisruption.ensureFullyConnectedCluster(internalCluster());
     }
 
     /**

+ 26 - 0
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruption.java

@@ -21,6 +21,9 @@ package org.elasticsearch.test.disruption;
 
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import org.apache.logging.log4j.Logger;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.NodeConnectionsService;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.set.Sets;
@@ -79,7 +82,30 @@ public class NetworkDisruption implements ServiceDisruptionScheme {
     @Override
     public void removeAndEnsureHealthy(InternalTestCluster cluster) {
         removeFromCluster(cluster);
+        ensureHealthy(cluster);
+    }
+
+    /**
+     * Ensures the cluster is healthy after the disruption. The disruption must no longer be active (asserted).
+     */
+    public void ensureHealthy(InternalTestCluster cluster) {
+        assert activeDisruption == false;
         ensureNodeCount(cluster);
+        ensureFullyConnectedCluster(cluster);
+    }
+
+    /**
+     * Ensures that all nodes in the cluster are connected to each other.
+     *
+     * Some network disruptions may leave nodes that are not the master disconnected from each other.
+     * {@link org.elasticsearch.cluster.NodeConnectionsService} will eventually reconnect them, but here each node
+     * is asked to connect to the nodes of its own current cluster state so that this happens faster.
+     */
+    public static void ensureFullyConnectedCluster(InternalTestCluster cluster) {
+        for (String node: cluster.getNodeNames()) {
+            ClusterState stateOnNode = cluster.getInstance(ClusterService.class, node).state();
+            cluster.getInstance(NodeConnectionsService.class, node).connectToNodes(stateOnNode.nodes());
+        }
     }
 
     protected void ensureNodeCount(InternalTestCluster cluster) {

+ 38 - 0
test/framework/src/main/java/org/elasticsearch/test/disruption/NetworkDisruptionIT.java

@@ -25,10 +25,15 @@ import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.disruption.NetworkDisruption.NetworkUnresponsive;
 import org.elasticsearch.test.disruption.NetworkDisruption.TwoPartitions;
 import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.TransportService;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 
 public class NetworkDisruptionIT extends ESIntegTestCase {
     @Override
@@ -46,4 +51,37 @@ public class NetworkDisruptionIT extends ESIntegTestCase {
         internalCluster().stopRandomNode(InternalTestCluster.nameFilter(nodeNames[0]));
         internalCluster().clearDisruptionScheme();
     }
+
+    public void testNetworkPartitionRemovalRestoresConnections() throws IOException {
+        Set<String> nodes = new HashSet<>();
+        nodes.addAll(Arrays.asList(internalCluster().getNodeNames()));
+        nodes.remove(internalCluster().getMasterName());
+        if (nodes.size() <= 2) {
+            internalCluster().ensureAtLeastNumDataNodes(3 - nodes.size());
+            nodes.addAll(Arrays.asList(internalCluster().getNodeNames()));
+            nodes.remove(internalCluster().getMasterName());
+        }
+        Set<String> side1 = new HashSet<>(randomSubsetOf(randomIntBetween(1, nodes.size() - 1), nodes));
+        Set<String> side2 = new HashSet<>(nodes);
+        side2.removeAll(side1);
+        assertThat(side2.size(), greaterThanOrEqualTo(1));
+        NetworkDisruption networkDisruption = new NetworkDisruption(new TwoPartitions(side1, side2),
+            new NetworkDisruption.NetworkDisconnect());
+        internalCluster().setDisruptionScheme(networkDisruption);
+        networkDisruption.startDisrupting();
+        // send a request through a node on each side of the partition while the disruption is in place
+        client(randomFrom(side1)).admin().cluster().prepareNodesInfo().get();
+        client(randomFrom(side2)).admin().cluster().prepareNodesInfo().get();
+        internalCluster().clearDisruptionScheme();
+        // check that all connections between the two sides are restored, in both directions
+        for (String nodeA : side1) {
+            for (String nodeB : side2) {
+                TransportService serviceA = internalCluster().getInstance(TransportService.class, nodeA);
+                TransportService serviceB = internalCluster().getInstance(TransportService.class, nodeB);
+                assertTrue(nodeA + " is not connected to " + nodeB, serviceA.nodeConnected(serviceB.getLocalNode()));
+                assertTrue(nodeB + " is not connected to " + nodeA, serviceB.nodeConnected(serviceA.getLocalNode()));
+            }
+        }
+    }
+
 }