Quellcode durchsuchen

Fix up roles after rolling upgrade (#64693)

Node roles vary by version, and new roles are suppressed for BWC. This
means we can receive a join from a node that's already in the cluster
but with a different set of roles: the node didn't change roles, but the
cluster state came via an older master. This commit ensures that we
properly process a join from such a node to ensure that the roles are
correct.

Closes #62840
David Turner vor 5 Jahren
Ursprung
Commit
9b97719833

+ 1 - 1
server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java

@@ -132,7 +132,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
         for (final Task joinTask : joiningNodes) {
             if (joinTask.isBecomeMasterTask() || joinTask.isFinishElectionTask()) {
                 // noop
-            } else if (currentNodes.nodeExists(joinTask.node())) {
+            } else if (currentNodes.nodeExistsWithSameRoles(joinTask.node())) {
                 logger.debug("received a join request for an existing node [{}]", joinTask.node());
             } else {
                 final DiscoveryNode node = joinTask.node();

+ 9 - 0
server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

@@ -202,6 +202,15 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
         return existing != null && existing.equals(node);
     }
 
+    /**
+     * Determine whether a node with the given node's ID exists, equals it, and has an equal set of roles. Supported roles vary by version,
+     * and our local cluster state might have come via an older master, so the roles may differ even if the node is otherwise identical.
+     */
+    public boolean nodeExistsWithSameRoles(DiscoveryNode discoveryNode) {
+        final DiscoveryNode existing = nodes.get(discoveryNode.getId());
+        return existing != null && existing.equals(discoveryNode) && existing.getRoles().equals(discoveryNode.getRoles());
+    }
+
     /**
      * Get the id of the master node
      *

+ 46 - 0
server/src/test/java/org/elasticsearch/cluster/coordination/JoinTaskExecutorTests.java

@@ -19,18 +19,31 @@
 package org.elasticsearch.cluster.coordination;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateTaskExecutor;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.RerouteService;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.VersionUtils;
 
+import java.util.HashSet;
+import java.util.List;
+
 import static org.elasticsearch.test.VersionUtils.maxCompatibleVersion;
 import static org.elasticsearch.test.VersionUtils.randomCompatibleVersion;
 import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasSize;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class JoinTaskExecutorTests extends ESTestCase {
 
@@ -118,4 +131,37 @@ public class JoinTaskExecutorTests extends ESTestCase {
             JoinTaskExecutor.ensureIndexCompatibility(Version.CURRENT,
                 metadata);
     }
+
+    public void testUpdatesNodeWithNewRoles() throws Exception {
+        // Node roles vary by version, and new roles are suppressed for BWC. This means we can receive a join from a node that's already
+        // in the cluster but with a different set of roles: the node didn't change roles, but the cluster state came via an older master.
+        // In this case we must properly process its join to ensure that the roles are correct.
+        // Stub the collaborators: adaptAutoExpandReplicas hands back its argument and the reroute listener completes at once.
+        final AllocationService allocationService = mock(AllocationService.class);
+        when(allocationService.adaptAutoExpandReplicas(any())).then(invocationOnMock -> invocationOnMock.getArguments()[0]);
+        final RerouteService rerouteService = (reason, priority, listener) -> listener.onResponse(null);
+
+        final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(allocationService, logger, rerouteService);
+
+        final DiscoveryNode masterNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
+        // bwcNode reuses actualNode's name, IDs, addresses, attributes and version; its roles come from randomSubsetOf(actualNode's roles).
+        final DiscoveryNode actualNode = new DiscoveryNode(UUIDs.base64UUID(), buildNewFakeTransportAddress(), Version.CURRENT);
+        final DiscoveryNode bwcNode = new DiscoveryNode(actualNode.getName(), actualNode.getId(), actualNode.getEphemeralId(),
+                actualNode.getHostName(), actualNode.getHostAddress(), actualNode.getAddress(), actualNode.getAttributes(),
+                new HashSet<>(randomSubsetOf(actualNode.getRoles())), actualNode.getVersion());
+        final ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).nodes(DiscoveryNodes.builder()
+                .add(masterNode)
+                .localNodeId(masterNode.getId())
+                .masterNodeId(masterNode.getId())
+                .add(bwcNode)
+        ).build();
+        // Join actualNode into a state that already holds bwcNode under the same node ID.
+        final ClusterStateTaskExecutor.ClusterTasksResult<JoinTaskExecutor.Task> result
+                = joinTaskExecutor.execute(clusterState, List.of(new JoinTaskExecutor.Task(actualNode, "test")));
+        assertThat(result.executionResults.entrySet(), hasSize(1));
+        final ClusterStateTaskExecutor.TaskResult taskResult = result.executionResults.values().iterator().next();
+        assertTrue(taskResult.isSuccess());
+        // The node stored under that ID in the resulting state must now carry actualNode's roles.
+        assertThat(result.resultingState.getNodes().get(actualNode.getId()).getRoles(), equalTo(actualNode.getRoles()));
+    }
 }