Bladeren bron

Merge pull request #15775 from ywelsch/fix/node-versions-compatiblity

Prevent peer recovery from node with older version
Yannick Welsch 9 jaren geleden
bovenliggende
commit
501497df1c

+ 12 - 10
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/NodeVersionAllocationDecider.java

@@ -44,22 +44,24 @@ public class NodeVersionAllocationDecider extends AllocationDecider {
 
     @Override
     public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
-        String sourceNodeId = shardRouting.currentNodeId();
-        /* if sourceNodeId is not null we do a relocation and just check the version of the node
-         * that we are currently allocate on. If not we are initializing and recover from primary.*/
-        if (sourceNodeId == null) { // we allocate - check primary
-            if (shardRouting.primary()) {
-                // we are the primary we can allocate wherever
+        if (shardRouting.primary()) {
+            if (shardRouting.currentNodeId() == null) {
+                // fresh primary, we can allocate wherever
                 return allocation.decision(Decision.YES, NAME, "primary shard can be allocated anywhere");
+            } else {
+                // relocating primary, only migrate to newer host
+                return isVersionCompatible(allocation.routingNodes(), shardRouting.currentNodeId(), node, allocation);
             }
+        } else {
             final ShardRouting primary = allocation.routingNodes().activePrimary(shardRouting);
-            if (primary == null) { // we have a primary - it's a start ;)
+            // check that active primary has a newer version so that peer recovery works
+            if (primary != null) {
+                return isVersionCompatible(allocation.routingNodes(), primary.currentNodeId(), node, allocation);
+            } else {
+                // ReplicaAfterPrimaryActiveAllocationDecider should prevent this case from occurring
                 return allocation.decision(Decision.YES, NAME, "no active primary shard yet");
             }
-            sourceNodeId = primary.currentNodeId();
         }
-        return isVersionCompatible(allocation.routingNodes(), sourceNodeId, node, allocation);
-
     }
 
     private Decision isVersionCompatible(final RoutingNodes routingNodes, final String sourceNodeId, final RoutingNode target, RoutingAllocation allocation) {

+ 69 - 7
core/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java

@@ -21,19 +21,32 @@ package org.elasticsearch.cluster.routing.allocation;
 
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.EmptyClusterInfoService;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.RoutingNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.TestShardRouting;
+import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
+import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
+import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
+import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
 import org.elasticsearch.common.logging.ESLogger;
 import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESAllocationTestCase;
 import org.elasticsearch.test.VersionUtils;
+import org.elasticsearch.test.gateway.NoopGatewayAllocator;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -285,6 +298,45 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
         }
     }
 
+    public void testRebalanceDoesNotAllocatePrimaryAndReplicasOnDifferentVersionNodes() {
+        ShardId shard1 = new ShardId("test1", 0);
+        ShardId shard2 = new ShardId("test2", 0);
+        final DiscoveryNode newNode = new DiscoveryNode("newNode", DummyTransportAddress.INSTANCE, Version.CURRENT);
+        final DiscoveryNode oldNode1 = new DiscoveryNode("oldNode1", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion());
+        final DiscoveryNode oldNode2 = new DiscoveryNode("oldNode2", DummyTransportAddress.INSTANCE, VersionUtils.getPreviousVersion());
+        MetaData metaData = MetaData.builder()
+            .put(IndexMetaData.builder(shard1.getIndex()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1))
+            .put(IndexMetaData.builder(shard2.getIndex()).settings(settings(Version.CURRENT).put(Settings.EMPTY)).numberOfShards(1).numberOfReplicas(1))
+            .build();
+        RoutingTable routingTable = RoutingTable.builder()
+            .add(IndexRoutingTable.builder(shard1.getIndex())
+                .addIndexShard(new IndexShardRoutingTable.Builder(shard1)
+                    .addShard(TestShardRouting.newShardRouting(shard1.getIndex(), shard1.getId(), newNode.id(), true, ShardRoutingState.STARTED, 10))
+                    .addShard(TestShardRouting.newShardRouting(shard1.getIndex(), shard1.getId(), oldNode1.id(), false, ShardRoutingState.STARTED, 10))
+                    .build())
+            )
+            .add(IndexRoutingTable.builder(shard2.getIndex())
+                .addIndexShard(new IndexShardRoutingTable.Builder(shard2)
+                    .addShard(TestShardRouting.newShardRouting(shard2.getIndex(), shard2.getId(), newNode.id(), true, ShardRoutingState.STARTED, 10))
+                    .addShard(TestShardRouting.newShardRouting(shard2.getIndex(), shard2.getId(), oldNode1.id(), false, ShardRoutingState.STARTED, 10))
+                    .build())
+            )
+            .build();
+        ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
+            .metaData(metaData)
+            .routingTable(routingTable)
+            .nodes(DiscoveryNodes.builder().put(newNode).put(oldNode1).put(oldNode2)).build();
+        AllocationDeciders allocationDeciders = new AllocationDeciders(Settings.EMPTY, new AllocationDecider[] {new NodeVersionAllocationDecider(Settings.EMPTY)});
+        AllocationService strategy = new MockAllocationService(Settings.EMPTY,
+            allocationDeciders,
+            new ShardsAllocators(Settings.EMPTY, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
+        RoutingAllocation.Result result = strategy.reroute(state, new AllocationCommands(), true);
+        // the two indices must stay as is, the replicas cannot move to oldNode2 because versions don't match
+        state = ClusterState.builder(state).routingResult(result).build();
+        assertThat(result.routingTable().index(shard2.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
+        assertThat(result.routingTable().index(shard1.getIndex()).shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));
+    }
+
     private ClusterState stabilize(ClusterState clusterState, AllocationService service) {
         logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint());
 
@@ -317,17 +369,27 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
 
         List<ShardRouting> mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.RELOCATING);
         for (ShardRouting r : mutableShardRoutings) {
-            String toId = r.relocatingNodeId();
-            String fromId = r.currentNodeId();
-            assertThat(fromId, notNullValue());
-            assertThat(toId, notNullValue());
-            logger.trace("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version());
-            assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
+            if (r.primary()) {
+                String toId = r.relocatingNodeId();
+                String fromId = r.currentNodeId();
+                assertThat(fromId, notNullValue());
+                assertThat(toId, notNullValue());
+                logger.trace("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version());
+                assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
+            } else {
+                ShardRouting primary = routingNodes.activePrimary(r);
+                assertThat(primary, notNullValue());
+                String fromId = primary.currentNodeId();
+                String toId = r.relocatingNodeId();
+                logger.error("From: " + fromId + " with Version: " + routingNodes.node(fromId).node().version() + " to: " + toId + " with Version: " + routingNodes.node(toId).node().version());
+                logger.error(routingNodes.prettyPrint());
+                assertTrue(routingNodes.node(toId).node().version().onOrAfter(routingNodes.node(fromId).node().version()));
+            }
         }
 
         mutableShardRoutings = routingNodes.shardsWithState(ShardRoutingState.INITIALIZING);
         for (ShardRouting r : mutableShardRoutings) {
-            if (r.initializing() && r.relocatingNodeId() == null && !r.primary()) {
+            if (!r.primary()) {
                 ShardRouting primary = routingNodes.activePrimary(r);
                 assertThat(primary, notNullValue());
                 String fromId = primary.currentNodeId();