Browse Source

Master node disconnect (#132023)

Extends the Coordinator so that we don't prematurely close the connection to a joining node if there is a master election in the interim. This prevents a `node-join: [{}] with reason [{}]; for troubleshooting guidance, see {}` WARN log from being emitted unnecessarily.

Closes #126192

Jira Ticket - ES-11449
Joshua Adams 1 month ago
parent
commit
656a7a99cf

+ 258 - 0
server/src/internalClusterTest/java/org/elasticsearch/cluster/coordination/NodeJoiningIT.java

@@ -0,0 +1,258 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.cluster.coordination;
+
+import org.apache.logging.log4j.Level;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterApplierService;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.cluster.service.MasterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.util.CollectionUtils;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.test.ClusterServiceUtils;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.MockLog;
+import org.elasticsearch.test.junit.annotations.TestLogging;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.Transport;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.Collection;
+import java.util.List;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
+public class NodeJoiningIT extends ESIntegTestCase {
+
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return CollectionUtils.appendToCopyNoNullElements(super.nodePlugins(), MockTransportService.TestPlugin.class);
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal, otherSettings))
+            // detect leader failover quickly
+            .put(LeaderChecker.LEADER_CHECK_RETRY_COUNT_SETTING.getKey(), 1)
+            .put(LeaderChecker.LEADER_CHECK_INTERVAL_SETTING.getKey(), "100ms")
+            .build();
+    }
+
+    /**
+     * Starts three nodes, adds one data-only node and checks, from the master's cluster state, that exactly this node was added:
+     * one more node overall, one more data node, the same number of master nodes, and all original data nodes still listed.
+     */
+    public void testNodeJoinsCluster() {
+        internalCluster().startNodes(3);
+        final String masterName = internalCluster().getMasterName();
+        final int initialNodeCount = internalCluster().clusterService(masterName).state().getNodes().size();
+        final int initialMasterNodeCount = internalCluster().clusterService(masterName).state().nodes().getMasterNodes().size();
+        final List<String> initialDataNodeNames = getListOfDataNodeNamesFromCluster(masterName);
+
+        // Start one more data-only node, then require a stable four-node cluster
+        final String joinedNodeName = internalCluster().startDataOnlyNode();
+        ensureStableCluster(4);
+
+        // The master's view must have grown by exactly one data node
+        final DiscoveryNodes nodesAfterJoin = internalCluster().clusterService(masterName).state().nodes();
+        assertEquals(initialNodeCount + 1, nodesAfterJoin.getSize());
+        assertEquals(initialDataNodeNames.size() + 1, nodesAfterJoin.getDataNodes().size());
+        assertEquals(initialMasterNodeCount, nodesAfterJoin.getMasterNodes().size());
+
+        // The new node and every original data node must be listed by name
+        final List<String> currentDataNodeNames = getListOfDataNodeNamesFromCluster(masterName);
+        assertTrue(currentDataNodeNames.contains(joinedNodeName));
+        initialDataNodeNames.forEach(name -> assertTrue(currentDataNodeNames.contains(name)));
+    }
+
+    /*
+        Scenario: node N asks to join while the current master is unable to publish, so a different, pre-selected node takes
+        over as master. N must end up in every cluster state and NodeJoinExecutor must not log at WARN level (#ES-11449)
+     */
+    @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination.NodeJoinExecutor:INFO")
+    public void testNodeTriesToJoinClusterAndThenDifferentMasterIsElected() {
+        final List<String> initialNodeNames = internalCluster().startNodes(3);
+        ensureStableCluster(3);
+        final String oldMasterName = internalCluster().getMasterName();
+        final int initialNodeCount = internalCluster().clusterService(oldMasterName).state().getNodes().size();
+        // Choose the intended successor before any transport action is blocked
+        final String newMasterNodeName = randomValueOtherThan(oldMasterName, () -> randomFrom(initialNodeNames));
+
+        // Capture NodeJoinExecutor logging for the duration of the scenario
+        try (var mockLog = MockLog.capture(NodeJoinExecutor.class)) {
+
+            // Blocked on every node except the intended successor:
+            // - publishing, which forces the current master to fail
+            // - pre-vote requests, which forces the intended successor to win the election
+            final List<String> blockedActions = List.of(
+                PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME,
+                StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME
+            );
+            for (final var transportService : internalCluster().getInstances(TransportService.class)) {
+                final var mockTransportService = asInstanceOf(MockTransportService.class, transportService);
+                if (mockTransportService.getLocalNode().getName().equals(newMasterNodeName)) {
+                    // The intended successor keeps its normal send behaviour
+                    continue;
+                }
+                blockActionNameOnMockTransportService(mockTransportService, blockedActions);
+            }
+
+            // No WARN about the joining node disconnecting is expected (#ES-11449)
+            addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);
+
+            // Sanity check: the master has not changed so far
+            assertEquals(oldMasterName, internalCluster().getMasterName());
+
+            // The join request goes to the original master, fails there and causes a master failover.
+            // startDataOnlyNode waits for the new node to be added, which can only happen after a re-election
+            final String joinedNodeName = internalCluster().startDataOnlyNode();
+            assertNotEquals(oldMasterName, internalCluster().getMasterName());
+            logger.info("New master is elected");
+
+            // Every node must have N in its cluster state
+            assertNewNodeIsInAllClusterStates(joinedNodeName);
+
+            mockLog.assertAllExpectationsMatched();
+
+            // The cluster grew by one node and N is listed as a data node
+            final DiscoveryNodes nodesAfterJoin = internalCluster().clusterService().state().nodes();
+            assertEquals(initialNodeCount + 1, nodesAfterJoin.getSize());
+            assertTrue(getListOfDataNodeNamesFromCluster(newMasterNodeName).contains(joinedNodeName));
+        }
+    }
+
+    /*
+        Scenario: node N asks to join while the master is unable to publish, which forces an election that the original master
+        wins again. N must end up in the cluster without being disconnected along the way (#ES-11449)
+     */
+    @TestLogging(reason = "test includes assertions about logging", value = "org.elasticsearch.cluster.coordination:INFO")
+    public void testNodeTriesToJoinClusterAndThenSameMasterIsElected() {
+        internalCluster().startNodes(3);
+        ensureStableCluster(3);
+        final String masterName = internalCluster().getMasterName();
+
+        final long termBeforeJoin = getTerm(masterName);
+        final int initialNodeCount = internalCluster().clusterService(masterName).state().getNodes().size();
+
+        try (var mockLog = MockLog.capture(NodeJoinExecutor.class, MasterService.class, ClusterApplierService.class)) {
+            for (String nodeName : internalCluster().getNodeNames()) {
+                final var mockTransportService = MockTransportService.getInstance(nodeName);
+
+                if (nodeName.equals(masterName) == false) {
+                    // Pre-voting is disabled everywhere but on the master, forcing the master to win the election
+                    blockActionNameOnMockTransportService(
+                        mockTransportService,
+                        List.of(StatefulPreVoteCollector.REQUEST_PRE_VOTE_ACTION_NAME)
+                    );
+                    continue;
+                }
+
+                // Publishing is blocked on the master so that it fails and a re-election is forced
+                final List<String> publishOnly = List.of(PublicationTransportHandler.PUBLISH_STATE_ACTION_NAME);
+                blockActionNameOnMockTransportService(mockTransportService, publishOnly);
+
+                // Once the master has stepped down, lift the publishing ban so that it can be elected again
+                ClusterServiceUtils.addTemporaryStateListener(internalCluster().clusterService(masterName), clusterState -> {
+                    final DiscoveryNode electedMaster = clusterState.nodes().getMasterNode();
+                    if (electedMaster != null && electedMaster.getName().equals(masterName)) {
+                        // Not stepped down yet: leave the ban in place
+                        return false;
+                    }
+                    logger.info("Master publishing ban removed");
+                    mockTransportService.addSendBehavior(Transport.Connection::sendRequest);
+                    return true;
+                });
+            }
+
+            // The join request is expected to fail with a FailedToCommitClusterStateException
+            final var commitFailureLogged = new MockLog.SeenEventExpectation(
+                "failed to commit cluster state",
+                MasterService.class.getCanonicalName(),
+                Level.WARN,
+                "failed to commit cluster state"
+            );
+            mockLog.addExpectation(commitFailureLogged);
+
+            /*
+                The connection to N is expected to be reused rather than closed,
+                so the WARN log about the joining node must not be emitted (#ES-11449)
+             */
+            addJoiningNodeDisconnectedWarnLogFalseExpectation(mockLog);
+
+            // Sanity check before adding the new node: the master has not changed so far
+            assertEquals(masterName, internalCluster().getMasterName());
+
+            // The join request goes to the original master, fails there and causes a master failover
+            logger.info("Sending node join request");
+            final String joinedNodeName = internalCluster().startDataOnlyNode();
+
+            // Same master as before, but in a later term: it was re-elected
+            assertEquals(masterName, internalCluster().getMasterName());
+            assertTrue(termBeforeJoin < getTerm(masterName));
+
+            // Every node must have N in its cluster state
+            assertNewNodeIsInAllClusterStates(joinedNodeName);
+
+            // Fails the test if the WARN log was emitted, i.e. if the connection to N was closed
+            mockLog.assertAllExpectationsMatched();
+
+            // The cluster grew by one node and N is listed as a data node
+            final DiscoveryNodes nodesAfterJoin = internalCluster().clusterService().state().nodes();
+            assertEquals(initialNodeCount + 1, nodesAfterJoin.getSize());
+            assertTrue(getListOfDataNodeNamesFromCluster(masterName).contains(joinedNodeName));
+        }
+    }
+
+    private long getTerm(String masterNodeName) {
+        return internalCluster().clusterService(masterNodeName).state().coordinationMetadata().term();
+    }
+
+    private void assertNewNodeIsInAllClusterStates(String newNodeName) {
+        for (ClusterService clusterService : internalCluster().getInstances(ClusterService.class)) {
+            assertTrue(clusterService.state().nodes().getAllNodes().stream().map(DiscoveryNode::getName).toList().contains(newNodeName));
+        }
+    }
+
+    private List<String> getListOfDataNodeNamesFromCluster(String nodeName) {
+        return internalCluster().clusterService(nodeName)
+            .state()
+            .getNodes()
+            .getDataNodes()
+            .values()
+            .stream()
+            .map(DiscoveryNode::getName)
+            .toList();
+    }
+
+    private void addJoiningNodeDisconnectedWarnLogFalseExpectation(MockLog mockLog) {
+        mockLog.addExpectation(
+            new MockLog.UnseenEventExpectation(
+                "warn message with troubleshooting link",
+                "org.elasticsearch.cluster.coordination.NodeJoinExecutor",
+                Level.WARN,
+                "*"
+            )
+        );
+    }
+
+    /**
+     * Installs a send behaviour on the given transport service that throws an {@link ElasticsearchException} for every request
+     * whose action name is in {@code actionNamesToBlock}, and forwards all other requests on the connection unchanged.
+     */
+    private void blockActionNameOnMockTransportService(MockTransportService mockTransportService, List<String> actionNamesToBlock) {
+        mockTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
+            if (actionNamesToBlock.contains(action) == false) {
+                // Not a blocked action: send as usual
+                connection.sendRequest(requestId, action, request, options);
+                return;
+            }
+            throw new ElasticsearchException("[{}] for [{}] denied", action, connection.getNode());
+        });
+    }
+}

+ 58 - 6
server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java

@@ -19,8 +19,10 @@ import org.elasticsearch.action.support.RefCountingListener;
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.ThreadedActionListener;
 import org.elasticsearch.client.internal.Client;
+import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
 import org.elasticsearch.cluster.ClusterStatePublicationEvent;
 import org.elasticsearch.cluster.ClusterStateUpdateTask;
 import org.elasticsearch.cluster.LocalMasterServiceTask;
@@ -39,6 +41,7 @@ import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
 import org.elasticsearch.cluster.service.ClusterApplierService;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
 import org.elasticsearch.cluster.version.CompatibilityVersions;
@@ -190,6 +193,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
     private final NodeHealthService nodeHealthService;
     private final List<PeerFinderListener> peerFinderListeners;
     private final LeaderHeartbeatService leaderHeartbeatService;
+    private final ClusterService clusterService;
 
     /**
      * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
@@ -218,7 +222,8 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         LeaderHeartbeatService leaderHeartbeatService,
         PreVoteCollector.Factory preVoteCollectorFactory,
         CompatibilityVersions compatibilityVersions,
-        FeatureService featureService
+        FeatureService featureService,
+        ClusterService clusterService
     ) {
         this.settings = settings;
         this.transportService = transportService;
@@ -328,6 +333,7 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         this.peerFinderListeners.add(clusterBootstrapService);
         this.leaderHeartbeatService = leaderHeartbeatService;
         this.compatibilityVersions = compatibilityVersions;
+        this.clusterService = clusterService;
     }
 
     /**
@@ -662,11 +668,57 @@ public class Coordinator extends AbstractLifecycleComponent implements ClusterSt
         transportService.connectToNode(joinRequest.getSourceNode(), new ActionListener<>() {
             @Override
             public void onResponse(Releasable response) {
-                validateJoinRequest(
-                    joinRequest,
-                    ActionListener.runBefore(joinListener, () -> Releasables.close(response))
-                        .delegateFailure((l, ignored) -> processJoinRequest(joinRequest, l))
-                );
+                SubscribableListener
+                    // Validates the join request: can the remote node deserialize our cluster state and does it respond to pings?
+                    .<Void>newForked(l -> validateJoinRequest(joinRequest, l))
+
+                    // Adds the joining node to the cluster state
+                    .<Void>andThen(l -> processJoinRequest(joinRequest, l.delegateResponse((ll, e) -> {
+                        // #ES-11449
+                        if (e instanceof FailedToCommitClusterStateException) {
+                            // The commit failed (i.e. master is failing over) but this does not imply that the join has actually failed:
+                            // the next master may have already accepted the state that we just published and will therefore include the
+                            // joining node in its future states too. Thus we need to wait for the next committed state before we know the
+                            // eventual outcome, and we need to wait for that before we can release (our ref to) the connection and complete
+                            // the listener.
+
+                            // NB we are on the master update thread here at the end of processing the failed cluster state update, so this
+                            // all happens before any cluster state update that re-elects a master
+                            assert ThreadPool.assertCurrentThreadPool(MasterService.MASTER_UPDATE_THREAD_NAME);
+
+                            final ClusterStateListener clusterStateListener = new ClusterStateListener() {
+                                @Override
+                                public void clusterChanged(ClusterChangedEvent event) {
+                                    final var discoveryNodes = event.state().nodes();
+                                    // Keep the connection open until the next committed state
+                                    if (discoveryNodes.getMasterNode() != null) {
+                                        // Remove this listener to avoid memory leaks
+                                        clusterService.removeListener(this);
+                                        if (discoveryNodes.nodeExists(joinRequest.getSourceNode().getId())) {
+                                            ll.onResponse(null);
+                                        } else {
+                                            ll.onFailure(e);
+                                        }
+                                    }
+                                }
+                            };
+                            clusterService.addListener(clusterStateListener);
+                            clusterStateListener.clusterChanged(
+                                new ClusterChangedEvent(
+                                    "Checking if another master has been elected since "
+                                        + joinRequest.getSourceNode().getName()
+                                        + " attempted to join cluster",
+                                    clusterService.state(),
+                                    clusterService.state()
+                                )
+                            );
+                        } else {
+                            ll.onFailure(e);
+                        }
+                    })))
+
+                    // Whatever the outcome, release (our ref to) the connection we just opened and notify the joining node.
+                    .addListener(ActionListener.runBefore(joinListener, () -> Releasables.close(response)));
             }
 
             @Override

+ 5 - 2
server/src/main/java/org/elasticsearch/discovery/DiscoveryModule.java

@@ -23,6 +23,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterApplier;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.cluster.version.CompatibilityVersions;
 import org.elasticsearch.common.Randomness;
@@ -107,7 +108,8 @@ public class DiscoveryModule extends AbstractModule {
         NodeHealthService nodeHealthService,
         CircuitBreakerService circuitBreakerService,
         CompatibilityVersions compatibilityVersions,
-        FeatureService featureService
+        FeatureService featureService,
+        ClusterService clusterService
     ) {
         final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
         final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
@@ -194,7 +196,8 @@ public class DiscoveryModule extends AbstractModule {
                 leaderHeartbeatService,
                 preVoteCollectorFactory,
                 compatibilityVersions,
-                featureService
+                featureService,
+                clusterService
             );
         } else {
             throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");

+ 2 - 1
server/src/main/java/org/elasticsearch/node/NodeConstruction.java

@@ -1722,7 +1722,8 @@ class NodeConstruction {
             fsHealthService,
             circuitBreakerService,
             compatibilityVersions,
-            featureService
+            featureService,
+            clusterService
         );
 
         modules.add(module, b -> {

+ 12 - 2
server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java

@@ -22,6 +22,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodeUtils;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.cluster.service.MasterServiceTests;
@@ -212,8 +213,16 @@ public class NodeJoinTests extends ESTestCase {
             clusterSettings,
             Collections.emptySet()
         );
+        String nodeName = "test_node";
+        Settings settings = Settings.builder().put(Node.NODE_NAME_SETTING.getKey(), nodeName).build();
+        ClusterService clusterService = new ClusterService(
+            settings,
+            clusterSettings,
+            threadPool,
+            new TaskManager(settings, threadPool, Set.of())
+        );
         coordinator = new Coordinator(
-            "test_node",
+            nodeName,
             Settings.EMPTY,
             clusterSettings,
             transportService,
@@ -234,7 +243,8 @@ public class NodeJoinTests extends ESTestCase {
             LeaderHeartbeatService.NO_OP,
             StatefulPreVoteCollector::new,
             CompatibilityVersionsUtils.staticCurrent(),
-            new FeatureService(List.of())
+            new FeatureService(List.of()),
+            clusterService
         );
         transportService.start();
         transportService.acceptIncomingRequests();

+ 5 - 1
server/src/test/java/org/elasticsearch/discovery/DiscoveryModuleTests.java

@@ -19,6 +19,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.VersionInformation;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.service.ClusterApplier;
+import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.cluster.service.MasterService;
 import org.elasticsearch.cluster.version.CompatibilityVersionsUtils;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -61,6 +62,7 @@ public class DiscoveryModuleTests extends ESTestCase {
     private ClusterApplier clusterApplier;
     private ClusterSettings clusterSettings;
     private GatewayMetaState gatewayMetaState;
+    private ClusterService clusterService;
 
     public interface DummyHostsProviderPlugin extends DiscoveryPlugin {
         Map<String, Supplier<SeedHostsProvider>> impl();
@@ -88,6 +90,7 @@ public class DiscoveryModuleTests extends ESTestCase {
         clusterApplier = mock(ClusterApplier.class);
         clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         gatewayMetaState = mock(GatewayMetaState.class);
+        clusterService = mock(ClusterService.class);
     }
 
     @After
@@ -118,7 +121,8 @@ public class DiscoveryModuleTests extends ESTestCase {
             null,
             new NoneCircuitBreakerService(),
             CompatibilityVersionsUtils.staticCurrent(),
-            new FeatureService(List.of())
+            new FeatureService(List.of()),
+            clusterService
         );
     }
 

+ 2 - 1
server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java

@@ -2981,7 +2981,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
                     LeaderHeartbeatService.NO_OP,
                     StatefulPreVoteCollector::new,
                     CompatibilityVersionsUtils.staticCurrent(),
-                    new FeatureService(List.of())
+                    new FeatureService(List.of()),
+                    this.clusterService
                 );
                 masterService.setClusterStatePublisher(coordinator);
                 coordinator.start();

+ 2 - 1
test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java

@@ -1171,7 +1171,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
                     coordinationServices.getLeaderHeartbeatService(),
                     coordinationServices.getPreVoteCollectorFactory(),
                     CompatibilityVersionsUtils.staticCurrent(),
-                    new FeatureService(List.of())
+                    new FeatureService(List.of()),
+                    clusterService
                 );
                 coordinationDiagnosticsService = new CoordinationDiagnosticsService(
                     clusterService,