|
@@ -33,15 +33,19 @@ import org.elasticsearch.cluster.routing.AllocationId;
|
|
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource;
|
|
|
+import org.elasticsearch.cluster.routing.RoutingChangesObserver;
|
|
|
+import org.elasticsearch.cluster.routing.RoutingNode;
|
|
|
import org.elasticsearch.cluster.routing.RoutingNodes;
|
|
|
import org.elasticsearch.cluster.routing.RoutingTable;
|
|
|
import org.elasticsearch.cluster.routing.ShardRouting;
|
|
|
+import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
|
|
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
|
|
import org.elasticsearch.cluster.routing.TestShardRouting;
|
|
|
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
|
|
|
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
|
|
|
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider;
|
|
|
import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
|
|
|
import org.elasticsearch.common.UUIDs;
|
|
@@ -68,6 +72,7 @@ import static org.elasticsearch.test.VersionUtils.randomVersion;
|
|
|
import static org.hamcrest.Matchers.equalTo;
|
|
|
import static org.hamcrest.Matchers.notNullValue;
|
|
|
import static org.hamcrest.Matchers.nullValue;
|
|
|
+import static org.hamcrest.core.Is.is;
|
|
|
|
|
|
public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
|
|
|
|
|
@@ -428,4 +433,82 @@ public class NodeVersionAllocationDeciderTests extends ESAllocationTestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public void testMessages() {
|
|
|
+
|
|
|
+ MetaData metaData = MetaData.builder()
|
|
|
+ .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ RoutingTable initialRoutingTable = RoutingTable.builder()
|
|
|
+ .addAsNew(metaData.index("test"))
|
|
|
+ .build();
|
|
|
+
|
|
|
+ RoutingNode newNode = new RoutingNode("newNode", newNode("newNode", Version.CURRENT));
|
|
|
+ RoutingNode oldNode = new RoutingNode("oldNode", newNode("oldNode", VersionUtils.getPreviousVersion()));
|
|
|
+
|
|
|
+ final ClusterName clusterName = ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY);
|
|
|
+ ClusterState clusterState = ClusterState.builder(clusterName).metaData(metaData).routingTable(initialRoutingTable)
|
|
|
+ .nodes(DiscoveryNodes.builder().add(newNode.node()).add(oldNode.node())).build();
|
|
|
+
|
|
|
+ final ShardId shardId = clusterState.routingTable().index("test").shard(0).getShardId();
|
|
|
+ final ShardRouting primaryShard = clusterState.routingTable().shardRoutingTable(shardId).primaryShard();
|
|
|
+ final ShardRouting replicaShard = clusterState.routingTable().shardRoutingTable(shardId).replicaShards().get(0);
|
|
|
+
|
|
|
+ RoutingAllocation routingAllocation = new RoutingAllocation(null, clusterState.getRoutingNodes(), clusterState, null, 0);
|
|
|
+ routingAllocation.debugDecision(true);
|
|
|
+
|
|
|
+ final NodeVersionAllocationDecider allocationDecider = new NodeVersionAllocationDecider(Settings.EMPTY);
|
|
|
+ Decision decision = allocationDecider.canAllocate(primaryShard, newNode, routingAllocation);
|
|
|
+ assertThat(decision.type(), is(Decision.Type.YES));
|
|
|
+ assertThat(decision.getExplanation(), is("the primary shard is new or already existed on the node"));
|
|
|
+
|
|
|
+ decision = allocationDecider.canAllocate(ShardRoutingHelper.initialize(primaryShard, "oldNode"), newNode, routingAllocation);
|
|
|
+ assertThat(decision.type(), is(Decision.Type.YES));
|
|
|
+ assertThat(decision.getExplanation(), is("can relocate primary shard from a node with version [" +
|
|
|
+ oldNode.node().getVersion() + "] to a node with equal-or-newer version [" + newNode.node().getVersion() + "]"));
|
|
|
+
|
|
|
+ decision = allocationDecider.canAllocate(ShardRoutingHelper.initialize(primaryShard, "newNode"), oldNode, routingAllocation);
|
|
|
+ assertThat(decision.type(), is(Decision.Type.NO));
|
|
|
+ assertThat(decision.getExplanation(), is("cannot relocate primary shard from a node with version [" +
|
|
|
+ newNode.node().getVersion() + "] to a node with older version [" + oldNode.node().getVersion() + "]"));
|
|
|
+
|
|
|
+ final SnapshotRecoverySource newVersionSnapshot = new SnapshotRecoverySource(
|
|
|
+ new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), newNode.node().getVersion(), "test");
|
|
|
+ final SnapshotRecoverySource oldVersionSnapshot = new SnapshotRecoverySource(
|
|
|
+ new Snapshot("rep1", new SnapshotId("snp1", UUIDs.randomBase64UUID())), oldNode.node().getVersion(), "test");
|
|
|
+
|
|
|
+ decision = allocationDecider.canAllocate(ShardRoutingHelper.newWithRestoreSource(primaryShard, newVersionSnapshot),
|
|
|
+ oldNode, routingAllocation);
|
|
|
+ assertThat(decision.type(), is(Decision.Type.NO));
|
|
|
+ assertThat(decision.getExplanation(), is("node version [" +
|
|
|
+ oldNode.node().getVersion() + "] is older than the snapshot version [" + newNode.node().getVersion() + "]"));
|
|
|
+
|
|
|
+ decision = allocationDecider.canAllocate(ShardRoutingHelper.newWithRestoreSource(primaryShard, oldVersionSnapshot),
|
|
|
+ newNode, routingAllocation);
|
|
|
+ assertThat(decision.type(), is(Decision.Type.YES));
|
|
|
+ assertThat(decision.getExplanation(), is("node version [" +
|
|
|
+ newNode.node().getVersion() + "] is the same or newer than snapshot version [" + oldNode.node().getVersion() + "]"));
|
|
|
+
|
|
|
+ final RoutingChangesObserver routingChangesObserver = new RoutingChangesObserver.AbstractRoutingChangesObserver();
|
|
|
+ final RoutingNodes routingNodes = new RoutingNodes(clusterState, false);
|
|
|
+ final ShardRouting startedPrimary = routingNodes.startShard(logger, routingNodes.initializeShard(primaryShard, "newNode", null, 0,
|
|
|
+ routingChangesObserver), routingChangesObserver);
|
|
|
+ routingAllocation = new RoutingAllocation(null, routingNodes, clusterState, null, 0);
|
|
|
+ routingAllocation.debugDecision(true);
|
|
|
+
|
|
|
+ decision = allocationDecider.canAllocate(replicaShard, oldNode, routingAllocation);
|
|
|
+ assertThat(decision.type(), is(Decision.Type.NO));
|
|
|
+ assertThat(decision.getExplanation(), is("cannot allocate replica shard to a node with version [" +
|
|
|
+ oldNode.node().getVersion() + "] since this is older than the primary version [" + newNode.node().getVersion() + "]"));
|
|
|
+
|
|
|
+ routingNodes.startShard(logger, routingNodes.relocateShard(startedPrimary, "oldNode", 0, routingChangesObserver).v2(),
|
|
|
+ routingChangesObserver);
|
|
|
+ routingAllocation = new RoutingAllocation(null, routingNodes, clusterState, null, 0);
|
|
|
+ routingAllocation.debugDecision(true);
|
|
|
+
|
|
|
+ decision = allocationDecider.canAllocate(replicaShard, newNode, routingAllocation);
|
|
|
+ assertThat(decision.type(), is(Decision.Type.YES));
|
|
|
+ assertThat(decision.getExplanation(), is("can allocate replica shard to a node with version [" +
|
|
|
+ newNode.node().getVersion() + "] since this is equal-or-newer than the primary version [" + oldNode.node().getVersion() + "]"));
|
|
|
+ }
|
|
|
}
|