瀏覽代碼

Keep NodeConnectionsService in sync with current nodes in the cluster state (#22509)

The NodeConnectionsService currently determines which nodes to connect to / disconnect from by inspecting cluster state changes and connecting to added nodes / disconnecting from removed nodes. When a master steps down (for example due to another master-eligible node shutting down which brings the number of master-eligible nodes below minimum_master_master), and the connection to other existing nodes was dropped while pinging, however, the connection to these nodes is not re-established while publishing the first cluster state that establishes the node as master.
This commit changes the NodeConnectionsService connect / disconnect logic to always rely on the state that is to be / was published, looking not only at the added / removed nodes, but validating that exactly all nodes that are currently registered in NodeConnectionsService are connected (corresponds to a NOOP if the node is already connected).
Yannick Welsch 8 年之前
父節點
當前提交
9fc1a735cc

+ 14 - 6
core/src/main/java/org/elasticsearch/cluster/NodeConnectionsService.java

@@ -36,7 +36,9 @@ import org.elasticsearch.discovery.zen.NodesFaultDetection;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ScheduledFuture;
 
@@ -76,20 +78,26 @@ public class NodeConnectionsService extends AbstractLifecycleComponent {
         this.reconnectInterval = NodeConnectionsService.CLUSTER_NODE_RECONNECT_INTERVAL_SETTING.get(settings);
     }
 
-    public void connectToNodes(List<DiscoveryNode> addedNodes) {
+    public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {
 
         // TODO: do this in parallel (and wait)
-        for (final DiscoveryNode node : addedNodes) {
+        for (final DiscoveryNode node : discoveryNodes) {
             try (Releasable ignored = nodeLocks.acquire(node)) {
-                Integer current = nodes.put(node, 0);
-                assert current == null : "node " + node + " was added in event but already in internal nodes";
+                nodes.putIfAbsent(node, 0);
                 validateNodeConnected(node);
             }
         }
     }
 
-    public void disconnectFromNodes(List<DiscoveryNode> removedNodes) {
-        for (final DiscoveryNode node : removedNodes) {
+    /**
+     * Disconnects from all nodes except the ones provided as parameter
+     */
+    public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
+        Set<DiscoveryNode> currentNodes = new HashSet<>(nodes.keySet());
+        for (DiscoveryNode node : nodesToKeep) {
+            currentNodes.remove(node);
+        }
+        for (final DiscoveryNode node : currentNodes) {
             try (Releasable ignored = nodeLocks.acquire(node)) {
                 Integer current = nodes.remove(node);
                 assert current != null : "node " + node + " was removed in event but not in internal nodes";

+ 4 - 3
core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

@@ -772,7 +772,7 @@ public class ClusterService extends AbstractLifecycleComponent {
             taskOutputs.createAckListener(threadPool, newClusterState) :
             null;
 
-        nodeConnectionsService.connectToNodes(clusterChangedEvent.nodesDelta().addedNodes());
+        nodeConnectionsService.connectToNodes(newClusterState.nodes());
 
         // if we are the master, publish the new state to all nodes
         // we publish here before we send a notification to all the listeners, since if it fails
@@ -788,7 +788,8 @@ public class ClusterService extends AbstractLifecycleComponent {
                         "failing [{}]: failed to commit cluster state version [{}]", taskInputs.summary, version),
                     t);
                 // ensure that list of connected nodes in NodeConnectionsService is in-sync with the nodes of the current cluster state
-                nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().addedNodes());
+                nodeConnectionsService.connectToNodes(previousClusterState.nodes());
+                nodeConnectionsService.disconnectFromNodesExcept(previousClusterState.nodes());
                 taskOutputs.publishingFailed(t);
                 return;
             }
@@ -808,7 +809,7 @@ public class ClusterService extends AbstractLifecycleComponent {
         logger.debug("set local cluster state to version {}", newClusterState.version());
         callClusterStateAppliers(newClusterState, clusterChangedEvent);
 
-        nodeConnectionsService.disconnectFromNodes(clusterChangedEvent.nodesDelta().removedNodes());
+        nodeConnectionsService.disconnectFromNodesExcept(newClusterState.nodes());
 
         updateState(css -> newClusterState);
 

+ 1 - 1
core/src/test/java/org/elasticsearch/cluster/MinimumMasterNodesIT.java

@@ -375,7 +375,7 @@ public class MinimumMasterNodesIT extends ESIntegTestCase {
         otherNodes.remove(master);
         NetworkDisruption partition = new NetworkDisruption(
             new TwoPartitions(Collections.singleton(master), otherNodes),
-            new NetworkDelay(TimeValue.timeValueMinutes(1)));
+            new NetworkDisruption.NetworkDisconnect());
         internalCluster().setDisruptionScheme(partition);
 
         final CountDownLatch latch = new CountDownLatch(1);

+ 6 - 6
core/src/test/java/org/elasticsearch/cluster/NodeConnectionsServiceTests.java

@@ -87,19 +87,19 @@ public class NodeConnectionsServiceTests extends ESTestCase {
         ClusterState current = clusterStateFromNodes(Collections.emptyList());
         ClusterChangedEvent event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
 
-        service.connectToNodes(event.nodesDelta().addedNodes());
-        assertConnected(event.nodesDelta().addedNodes());
+        service.connectToNodes(event.state().nodes());
+        assertConnected(event.state().nodes());
 
-        service.disconnectFromNodes(event.nodesDelta().removedNodes());
+        service.disconnectFromNodesExcept(event.state().nodes());
         assertConnectedExactlyToNodes(event.state());
 
         current = event.state();
         event = new ClusterChangedEvent("test", clusterStateFromNodes(randomSubsetOf(nodes)), current);
 
-        service.connectToNodes(event.nodesDelta().addedNodes());
-        assertConnected(event.nodesDelta().addedNodes());
+        service.connectToNodes(event.state().nodes());
+        assertConnected(event.state().nodes());
 
-        service.disconnectFromNodes(event.nodesDelta().removedNodes());
+        service.disconnectFromNodesExcept(event.state().nodes());
         assertConnectedExactlyToNodes(event.state());
     }
 

+ 9 - 9
core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java

@@ -18,6 +18,7 @@
  */
 package org.elasticsearch.cluster.service;
 
+import com.google.common.collect.Iterables;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
@@ -25,12 +26,12 @@ import org.apache.logging.log4j.util.Supplier;
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.LocalClusterUpdateTask;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ClusterStateTaskConfig;
 import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.ClusterStateTaskListener;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.LocalClusterUpdateTask;
 import org.elasticsearch.cluster.LocalNodeMasterListener;
 import org.elasticsearch.cluster.NodeConnectionsService;
 import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -127,12 +128,12 @@ public class ClusterServiceTests extends ESTestCase {
             emptySet(), Version.CURRENT));
         timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
             @Override
-            public void connectToNodes(List<DiscoveryNode> addedNodes) {
+            public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {
                 // skip
             }
 
             @Override
-            public void disconnectFromNodes(List<DiscoveryNode> removedNodes) {
+            public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
                 // skip
             }
         });
@@ -1058,17 +1059,16 @@ public class ClusterServiceTests extends ESTestCase {
             threadPool);
         timedClusterService.setLocalNode(new DiscoveryNode("node1", buildNewFakeTransportAddress(), emptyMap(),
             emptySet(), Version.CURRENT));
-        Set<DiscoveryNode> currentNodes = Collections.synchronizedSet(new HashSet<>());
-        currentNodes.add(timedClusterService.localNode());
+        Set<DiscoveryNode> currentNodes = new HashSet<>();
         timedClusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
             @Override
-            public void connectToNodes(List<DiscoveryNode> addedNodes) {
-                currentNodes.addAll(addedNodes);
+            public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {
+                discoveryNodes.forEach(currentNodes::add);
             }
 
             @Override
-            public void disconnectFromNodes(List<DiscoveryNode> removedNodes) {
-                currentNodes.removeAll(removedNodes);
+            public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
+                currentNodes.removeIf(node -> Iterables.contains(nodesToKeep, node) == false);
             }
         });
         AtomicBoolean failToCommit = new AtomicBoolean();

+ 2 - 2
test/framework/src/main/java/org/elasticsearch/test/ClusterServiceUtils.java

@@ -54,12 +54,12 @@ public class ClusterServiceUtils {
         clusterService.setLocalNode(localNode);
         clusterService.setNodeConnectionsService(new NodeConnectionsService(Settings.EMPTY, null, null) {
             @Override
-            public void connectToNodes(List<DiscoveryNode> addedNodes) {
+            public void connectToNodes(Iterable<DiscoveryNode> discoveryNodes) {
                 // skip
             }
 
             @Override
-            public void disconnectFromNodes(List<DiscoveryNode> removedNodes) {
+            public void disconnectFromNodesExcept(Iterable<DiscoveryNode> nodesToKeep) {
                 // skip
             }
         });

+ 26 - 8
test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

@@ -132,6 +132,7 @@ import static org.apache.lucene.util.LuceneTestCase.TEST_NIGHTLY;
 import static org.apache.lucene.util.LuceneTestCase.rarely;
 import static org.elasticsearch.discovery.zen.ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING;
 import static org.elasticsearch.test.ESTestCase.assertBusy;
+import static org.elasticsearch.test.ESTestCase.awaitBusy;
 import static org.elasticsearch.test.ESTestCase.randomFrom;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.hamcrest.Matchers.equalTo;
@@ -1052,21 +1053,38 @@ public final class InternalTestCluster extends TestCluster {
         logger.debug("Cluster is consistent again - nodes: [{}] nextNodeId: [{}] numSharedNodes: [{}]", nodes.keySet(), nextNodeId.get(), newSize);
     }
 
-    /** ensure a cluster is form with {@link #nodes}.size() nodes. */
+    /** ensure a cluster is formed with all published nodes. */
     private void validateClusterFormed() {
         String name = randomFrom(random, getNodeNames());
         validateClusterFormed(name);
     }
 
-    /** ensure a cluster is form with {@link #nodes}.size() nodes, but do so by using the client of the specified node */
+    /** ensure a cluster is formed with all published nodes, but do so by using the client of the specified node */
     private void validateClusterFormed(String viaNode) {
-        final int size = nodes.size();
-        logger.trace("validating cluster formed via [{}], expecting [{}]", viaNode, size);
+        Set<DiscoveryNode> expectedNodes = new HashSet<>();
+        for (NodeAndClient nodeAndClient : nodes.values()) {
+            expectedNodes.add(getInstanceFromNode(ClusterService.class, nodeAndClient.node()).localNode());
+        }
+        logger.trace("validating cluster formed via [{}], expecting {}", viaNode, expectedNodes);
         final Client client = client(viaNode);
-        ClusterHealthResponse response = client.admin().cluster().prepareHealth().setWaitForNodes(Integer.toString(size)).get();
-        if (response.isTimedOut()) {
-            logger.warn("failed to wait for a cluster of size [{}], got [{}]", size, response);
-            throw new IllegalStateException("cluster failed to reach the expected size of [" + size + "]");
+        try {
+            if (awaitBusy(() -> {
+                DiscoveryNodes discoveryNodes = client.admin().cluster().prepareState().get().getState().nodes();
+                if (discoveryNodes.getSize() != expectedNodes.size()) {
+                    return false;
+                }
+                for (DiscoveryNode expectedNode : expectedNodes) {
+                    if (discoveryNodes.nodeExists(expectedNode) == false) {
+                        return false;
+                    }
+                }
+                return true;
+            }, 30, TimeUnit.SECONDS) == false) {
+                throw new IllegalStateException("cluster failed to from with expected nodes " + expectedNodes + " and actual nodes " +
+                    client.admin().cluster().prepareState().get().getState().nodes());
+            }
+        } catch (InterruptedException e) {
+            throw new IllegalStateException(e);
         }
     }