Ver Fonte

Use desired nodes during data tier allocation decisions (#87735)

Today when the DataTierAllocationDecider checks if a shard can be allocated
in a node or not it uses the current cluster topology. This works fine in most cases
when the data flows from hotter to colder data nodes as it was designed. But
in certain situations users might want to remove a tier (i.e. the cold tier) that
allows having shards in hotter tiers, but with the current implementation once
a shard is allocated into the highest preference tier and there are nodes of that
tier, the shard won't move. This commit introduces a change that allows to use
the desired nodes (when available) to compute the preferred tier based on the
cluster topology provided by the desired nodes (only taking into account desired
nodes that are in ACTUALIZED status).
Francisco Fernández Castaño há 3 anos atrás
pai
commit
5a0b332303

+ 5 - 0
docs/changelog/87735.yaml

@@ -0,0 +1,5 @@
+pr: 87735
+summary: Use desired nodes during data tier allocation decisions
+area: Allocation
+type: enhancement
+issues: []

+ 13 - 4
server/src/main/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesAction.java

@@ -25,6 +25,7 @@ import org.elasticsearch.cluster.metadata.DesiredNodes;
 import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.routing.RerouteService;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.inject.Inject;
@@ -50,7 +51,8 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
         ThreadPool threadPool,
         ActionFilters actionFilters,
         IndexNameExpressionResolver indexNameExpressionResolver,
-        DesiredNodesSettingsValidator settingsValidator
+        DesiredNodesSettingsValidator settingsValidator,
+        AllocationService allocationService
     ) {
         super(
             UpdateDesiredNodesAction.NAME,
@@ -65,7 +67,7 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
             ThreadPool.Names.SAME
         );
         this.settingsValidator = settingsValidator;
-        this.taskExecutor = new UpdateDesiredNodesExecutor(clusterService.getRerouteService());
+        this.taskExecutor = new UpdateDesiredNodesExecutor(clusterService.getRerouteService(), allocationService);
     }
 
     @Override
@@ -167,9 +169,11 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
         );
 
         private final RerouteService rerouteService;
+        private final AllocationService allocationService;
 
-        UpdateDesiredNodesExecutor(RerouteService rerouteService) {
+        UpdateDesiredNodesExecutor(RerouteService rerouteService, AllocationService allocationService) {
             this.rerouteService = rerouteService;
+            this.allocationService = allocationService;
         }
 
         @Override
@@ -194,7 +198,12 @@ public class TransportUpdateDesiredNodesAction extends TransportMasterNodeAction
 
             desiredNodes = DesiredNodes.updateDesiredNodesStatusIfNeeded(currentState.nodes(), desiredNodes);
 
-            return desiredNodes == initialDesiredNodes ? currentState : replaceDesiredNodes(currentState, desiredNodes);
+            if (desiredNodes == initialDesiredNodes) {
+                return currentState;
+            } else {
+                final ClusterState withUpdatedDesiredNodes = replaceDesiredNodes(currentState, desiredNodes);
+                return allocationService.adaptAutoExpandReplicas(withUpdatedDesiredNodes);
+            }
         }
 
         @Override

+ 8 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

@@ -11,6 +11,7 @@ package org.elasticsearch.cluster.routing.allocation;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.RestoreInProgress;
+import org.elasticsearch.cluster.metadata.DesiredNodes;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -87,6 +88,8 @@ public class RoutingAllocation {
      * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
      * @param routingNodes Routing nodes in the current cluster or {@code null} if using those in the given cluster state
      * @param clusterState cluster state before rerouting
+     * @param clusterInfo information about node disk usage and shard disk usage
+     * @param shardSizeInfo information about snapshot shard sizes
      * @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
      */
     public RoutingAllocation(
@@ -168,6 +171,11 @@ public class RoutingAllocation {
         return shardSizeInfo;
     }
 
+    @Nullable
+    public DesiredNodes desiredNodes() {
+        return DesiredNodes.latestFromClusterState(clusterState);
+    }
+
     /**
      * Returns the map of node id to shutdown metadata currently in the cluster
      */

+ 7 - 3
server/src/test/java/org/elasticsearch/action/admin/cluster/desirednodes/TransportUpdateDesiredNodesActionTests.java

@@ -24,6 +24,7 @@ import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
 import org.elasticsearch.cluster.metadata.DesiredNodesTestCase;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
 import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -59,7 +60,8 @@ public class TransportUpdateDesiredNodesActionTests extends DesiredNodesTestCase
             mock(ThreadPool.class),
             mock(ActionFilters.class),
             mock(IndexNameExpressionResolver.class),
-            NO_OP_SETTINGS_VALIDATOR
+            NO_OP_SETTINGS_VALIDATOR,
+            mock(AllocationService.class)
         );
 
         final ClusterBlocks blocks = ClusterBlocks.builder()
@@ -83,7 +85,8 @@ public class TransportUpdateDesiredNodesActionTests extends DesiredNodesTestCase
             mock(ThreadPool.class),
             mock(ActionFilters.class),
             mock(IndexNameExpressionResolver.class),
-            NO_OP_SETTINGS_VALIDATOR
+            NO_OP_SETTINGS_VALIDATOR,
+            mock(AllocationService.class)
         );
 
         final ClusterBlocks blocks = ClusterBlocks.builder().build();
@@ -106,7 +109,8 @@ public class TransportUpdateDesiredNodesActionTests extends DesiredNodesTestCase
             mock(ThreadPool.class),
             mock(ActionFilters.class),
             mock(IndexNameExpressionResolver.class),
-            validator
+            validator,
+            mock(AllocationService.class)
         );
 
         final ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(10))).build();

+ 4 - 7
server/src/test/java/org/elasticsearch/cluster/metadata/DesiredNodeSerializationTests.java

@@ -16,8 +16,6 @@ import org.elasticsearch.xcontent.XContentParser;
 
 import java.io.IOException;
 
-import static org.elasticsearch.cluster.metadata.DesiredNodesTestCase.randomProcessorRange;
-
 public class DesiredNodeSerializationTests extends AbstractSerializingTestCase<DesiredNode> {
     @Override
     protected DesiredNode doParseInstance(XContentParser parser) throws IOException {
@@ -52,15 +50,14 @@ public class DesiredNodeSerializationTests extends AbstractSerializingTestCase<D
             );
             case 1 -> new DesiredNode(
                 instance.settings(),
-                randomFloat() + randomIntBetween(1, 128),
+                randomValueOtherThan(instance.processors(), () -> randomFloat() + randomIntBetween(1, 128)),
                 instance.memory(),
                 instance.storage(),
                 instance.version()
             );
-
             case 2 -> new DesiredNode(
                 instance.settings(),
-                randomProcessorRange(),
+                randomValueOtherThan(instance.processorsRange(), DesiredNodesTestCase::randomProcessorRange),
                 instance.memory(),
                 instance.storage(),
                 instance.version()
@@ -69,7 +66,7 @@ public class DesiredNodeSerializationTests extends AbstractSerializingTestCase<D
                 instance.settings(),
                 instance.processors(),
                 instance.processorsRange(),
-                ByteSizeValue.ofGb(randomIntBetween(1, 128)),
+                ByteSizeValue.ofGb(randomValueOtherThan(instance.memory().getGb(), () -> (long) randomIntBetween(1, 128))),
                 instance.storage(),
                 instance.version()
             );
@@ -78,7 +75,7 @@ public class DesiredNodeSerializationTests extends AbstractSerializingTestCase<D
                 instance.processors(),
                 instance.processorsRange(),
                 instance.memory(),
-                ByteSizeValue.ofGb(randomIntBetween(1, 128)),
+                ByteSizeValue.ofGb(randomValueOtherThan(instance.storage().getGb(), () -> (long) randomIntBetween(1, 128))),
                 instance.version()
             );
             case 5 -> new DesiredNode(

+ 2 - 1
x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/storage/ReactiveStorageDeciderService.java

@@ -13,6 +13,7 @@ import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.metadata.DataStream;
 import org.elasticsearch.cluster.metadata.DataStreamMetadata;
+import org.elasticsearch.cluster.metadata.DesiredNodes;
 import org.elasticsearch.cluster.metadata.IndexAbstraction;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
@@ -401,7 +402,7 @@ public class ReactiveStorageDeciderService implements AutoscalingDeciderService
             return allocation.metadata().getIndexSafe(shard.index());
         }
 
-        private Optional<String> highestPreferenceTier(List<String> preferredTiers, DiscoveryNodes unused) {
+        private Optional<String> highestPreferenceTier(List<String> preferredTiers, DiscoveryNodes unused, DesiredNodes desiredNodes) {
             assert preferredTiers.isEmpty() == false;
             return Optional.of(preferredTiers.get(0));
         }

+ 315 - 12
x-pack/plugin/core/src/internalClusterTest/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderIT.java

@@ -7,30 +7,51 @@
 
 package org.elasticsearch.xpack.cluster.routing.allocation;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.action.admin.cluster.desirednodes.UpdateDesiredNodesAction;
+import org.elasticsearch.action.admin.cluster.desirednodes.UpdateDesiredNodesRequest;
 import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
 import org.elasticsearch.action.admin.indices.shrink.ResizeType;
 import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
+import org.elasticsearch.cluster.metadata.DesiredNode;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Template;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.routing.allocation.DataTier;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.core.Nullable;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.xpack.core.DataTiersFeatureSetUsage;
 import org.elasticsearch.xpack.core.action.XPackUsageRequestBuilder;
 import org.elasticsearch.xpack.core.action.XPackUsageResponse;
+import org.junit.Before;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_AUTO_EXPAND_REPLICAS_SETTING;
+import static org.elasticsearch.cluster.metadata.IndexMetadata.INDEX_NUMBER_OF_REPLICAS_SETTING;
+import static org.elasticsearch.node.Node.NODE_EXTERNAL_ID_SETTING;
+import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
+import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0)
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0, numClientNodes = 0, autoManageMasterNodes = false)
 public class DataTierAllocationDeciderIT extends ESIntegTestCase {
     private static final String index = "myindex";
 
@@ -39,6 +60,13 @@ public class DataTierAllocationDeciderIT extends ESIntegTestCase {
         return Collections.singleton(DataTierTelemetryPlugin.class);
     }
 
+    @Before
+    public void setUpMasterNode() {
+        // Ensure that master nodes cannot hold any data
+        internalCluster().setBootstrapMasterNodeIndex(0);
+        internalCluster().startMasterOnlyNode();
+    }
+
     public void testDefaultIndexAllocateToContent() {
         startWarmOnlyNode();
         startColdOnlyNode();
@@ -67,6 +95,194 @@ public class DataTierAllocationDeciderIT extends ESIntegTestCase {
         ensureYellow(index);
     }
 
+    public void testDesiredNodesAreConsideredDuringAllocation() throws Exception {
+        final var warmDesiredNode = desiredNode(randomAlphaOfLength(10), DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+        final var coldDesiredNode = desiredNode(randomAlphaOfLength(15), DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+        final var masterDesiredNode = desiredNode(internalCluster().getMasterName(), DiscoveryNodeRole.MASTER_ROLE);
+        updateDesiredNodes(warmDesiredNode, coldDesiredNode, masterDesiredNode);
+
+        startWarmOnlyNode(warmDesiredNode.externalId());
+        final var coldNodeName = startColdOnlyNode(coldDesiredNode.externalId());
+
+        createIndexWithTierPreference(DataTier.DATA_COLD, DataTier.DATA_WARM);
+
+        ensureGreen(index);
+
+        assertPrimaryShardIsAllocatedInNodeWithRole(0, DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+
+        // Remove the cold tier
+        updateDesiredNodes(masterDesiredNode, warmDesiredNode);
+
+        assertBusy(() -> assertPrimaryShardIsAllocatedInNodeWithRole(0, DiscoveryNodeRole.DATA_WARM_NODE_ROLE));
+
+        ensureGreen(index);
+    }
+
+    public void testShardsAreKeptInPreferredTierUntilTheNextTierIsInItsFinalState() throws Exception {
+        final var hotDesiredNode = desiredNode("hot-node-0", DiscoveryNodeRole.DATA_HOT_NODE_ROLE);
+        final var warmDesiredNode = desiredNode("warn-node-0", DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+        final var coldDesiredNode = desiredNode("cold-node-0", DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+        final var masterDesiredNode = desiredNode(internalCluster().getMasterName(), DiscoveryNodeRole.MASTER_ROLE);
+        updateDesiredNodes(hotDesiredNode, warmDesiredNode, coldDesiredNode, masterDesiredNode);
+
+        startHotOnlyNode(hotDesiredNode.externalId());
+        startWarmOnlyNode(warmDesiredNode.externalId());
+        startColdOnlyNode(coldDesiredNode.externalId());
+
+        createIndexWithTierPreference(DataTier.DATA_COLD, DataTier.DATA_WARM, DataTier.DATA_HOT);
+
+        ensureGreen(index);
+
+        assertPrimaryShardIsAllocatedInNodeWithRole(0, DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+
+        final List<DesiredNode> newDesiredNodesInLeastPreferredTiers = new ArrayList<>();
+        final var numberOfNewNodes = randomIntBetween(1, 5);
+        for (int i = 1; i <= numberOfNewNodes; i++) {
+            if (randomBoolean()) {
+                newDesiredNodesInLeastPreferredTiers.add(desiredNode("hot-node-" + i, DiscoveryNodeRole.DATA_HOT_NODE_ROLE));
+            } else {
+                newDesiredNodesInLeastPreferredTiers.add(desiredNode("warm-node-" + i, DiscoveryNodeRole.DATA_WARM_NODE_ROLE));
+            }
+        }
+
+        // Remove the cold tier and grow the next preferred tiers
+        final List<DesiredNode> newDesiredNodes = new ArrayList<>(newDesiredNodesInLeastPreferredTiers);
+        newDesiredNodes.add(masterDesiredNode);
+        newDesiredNodes.add(hotDesiredNode);
+        newDesiredNodes.add(warmDesiredNode);
+        updateDesiredNodes(newDesiredNodes);
+
+        ensureGreen(index);
+
+        assertBusy(() -> assertPrimaryShardIsAllocatedInNodeWithRole(0, DiscoveryNodeRole.DATA_COLD_NODE_ROLE));
+
+        for (final var newDesiredNode : newDesiredNodesInLeastPreferredTiers) {
+            if (newDesiredNode.getRoles().contains(DiscoveryNodeRole.DATA_HOT_NODE_ROLE)) {
+                startHotOnlyNode(newDesiredNode.externalId());
+            } else {
+                startWarmOnlyNode(newDesiredNode.externalId());
+            }
+        }
+
+        ensureGreen(index);
+
+        assertBusy(() -> assertPrimaryShardIsAllocatedInNodeWithRole(0, DiscoveryNodeRole.DATA_WARM_NODE_ROLE));
+    }
+
+    public void testSimpleAllocationDecisionWithDesiredNodes() {
+        final var warmDesiredNode = desiredNode("warn-node-0", DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+        final var warmDesiredNode2 = desiredNode("warn-node-1", DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+        final var masterDesiredNode = desiredNode(internalCluster().getMasterName(), DiscoveryNodeRole.MASTER_ROLE);
+        updateDesiredNodes(warmDesiredNode, warmDesiredNode2, masterDesiredNode);
+
+        startWarmOnlyNode(warmDesiredNode.externalId());
+
+        createIndexWithTierPreference(DataTier.DATA_COLD, DataTier.DATA_WARM);
+
+        ensureGreen(index);
+
+        assertPrimaryShardIsAllocatedInNodeWithRole(0, DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+    }
+
+    public void testGrowAndShrinkSingleNodeInTier() throws Exception {
+        final var warmDesiredNode = desiredNode("warm-node", DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+        final var coldDesiredNode = desiredNode("cold-node-1", DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+        final var masterDesiredNode = desiredNode(internalCluster().getMasterName(), DiscoveryNodeRole.MASTER_ROLE);
+        updateDesiredNodes(warmDesiredNode, coldDesiredNode, masterDesiredNode);
+
+        startWarmOnlyNode(warmDesiredNode.externalId());
+        var coldNodeName = startColdOnlyNode(coldDesiredNode.externalId());
+
+        createIndexWithTierPreference(DataTier.DATA_COLD, DataTier.DATA_WARM);
+
+        ensureGreen(index);
+
+        assertPrimaryShardIsAllocatedInNodeWithRole(0, DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+
+        final var newColdDesiredNode = desiredNode("cold-node-2", DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+        updateDesiredNodes(warmDesiredNode, newColdDesiredNode, masterDesiredNode);
+
+        // Exclude the node that we want to decommission, so it can move to the new cold node
+        client().admin()
+            .indices()
+            .prepareUpdateSettings(index)
+            .setSettings(Settings.builder().put("index.routing.allocation.exclude._name", coldNodeName).build())
+            .get();
+
+        assertBusy(() -> assertPrimaryShardIsAllocatedInNodeWithRole(0, DiscoveryNodeRole.DATA_COLD_NODE_ROLE));
+
+        startColdOnlyNode(newColdDesiredNode.externalId());
+
+        ensureGreen(index);
+
+        assertBusy(() -> assertPrimaryShardIsAllocatedInNode(0, newColdDesiredNode));
+
+        internalCluster().stopNode(coldNodeName);
+
+        ensureGreen(index);
+    }
+
+    public void testDesiredNodesAreTakenIntoAccountInAutoExpandReplicas() throws Exception {
+        final var masterDesiredNode = desiredNode(internalCluster().getMasterName(), DiscoveryNodeRole.MASTER_ROLE);
+        final int numberOfColdNodes = randomIntBetween(2, 5);
+        final List<DesiredNode> coldDesiredNodes = new ArrayList<>();
+        for (int i = 0; i < numberOfColdNodes; i++) {
+            final var coldDesiredNode = desiredNode("cold-node-" + i, DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+            coldDesiredNodes.add(coldDesiredNode);
+            startColdOnlyNode(coldDesiredNode.externalId());
+        }
+        final int numberOfWarmNodes = randomIntBetween(numberOfColdNodes + 1, 10);
+        final List<DesiredNode> warmDesiredNodes = new ArrayList<>();
+        for (int i = 0; i < numberOfWarmNodes; i++) {
+            final var warmDesiredNode = desiredNode("warm-node-" + i, DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+            warmDesiredNodes.add(warmDesiredNode);
+            startWarmOnlyNode(warmDesiredNode.externalId());
+        }
+        final List<DesiredNode> desiredNodesWithWarmAndColdTier = new ArrayList<>();
+        desiredNodesWithWarmAndColdTier.addAll(warmDesiredNodes);
+        desiredNodesWithWarmAndColdTier.addAll(coldDesiredNodes);
+        desiredNodesWithWarmAndColdTier.add(masterDesiredNode);
+
+        updateDesiredNodes(desiredNodesWithWarmAndColdTier);
+
+        client().admin()
+            .indices()
+            .prepareCreate(index)
+            .setWaitForActiveShards(0)
+            .setSettings(
+                Settings.builder()
+                    .put(DataTier.TIER_PREFERENCE, String.join(",", DataTier.DATA_COLD, DataTier.DATA_WARM))
+                    .put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+                    .put(INDEX_AUTO_EXPAND_REPLICAS_SETTING.getKey(), "0-all")
+            )
+            .get();
+
+        var replicas = client().admin()
+            .indices()
+            .prepareGetIndex()
+            .setIndices(index)
+            .get()
+            .getSetting(index, INDEX_NUMBER_OF_REPLICAS_SETTING.getKey());
+
+        assertThat(Integer.parseInt(replicas), is(equalTo(numberOfColdNodes - 1)));
+
+        final List<DesiredNode> desiredNodesWithoutColdTier = new ArrayList<>(warmDesiredNodes);
+        desiredNodesWithoutColdTier.add(masterDesiredNode);
+
+        updateDesiredNodes(desiredNodesWithoutColdTier);
+
+        assertBusy(() -> {
+            var newReplicaCount = client().admin()
+                .indices()
+                .prepareGetIndex()
+                .setIndices(index)
+                .get()
+                .getSetting(index, INDEX_NUMBER_OF_REPLICAS_SETTING.getKey());
+
+            assertThat(Integer.parseInt(newReplicaCount), is(equalTo(numberOfWarmNodes - 1)));
+        });
+    }
+
     public void testOverrideDefaultAllocation() {
         startWarmOnlyNode();
         startColdOnlyNode();
@@ -293,27 +509,50 @@ public class DataTierAllocationDeciderIT extends ESIntegTestCase {
     }
 
     public void startHotOnlyNode() {
-        Settings nodeSettings = Settings.builder()
+        startHotOnlyNode(null);
+    }
+
+    public void startHotOnlyNode(@Nullable String externalId) {
+        Settings.Builder nodeSettings = Settings.builder()
             .putList("node.roles", Arrays.asList("master", "data_hot", "ingest"))
-            .put("node.attr.box", "hot")
-            .build();
+            .put("node.attr.box", "hot");
+
+        if (externalId != null) {
+            nodeSettings.put(NODE_EXTERNAL_ID_SETTING.getKey(), externalId);
+        }
+
         internalCluster().startNode(nodeSettings);
     }
 
     public void startWarmOnlyNode() {
-        Settings nodeSettings = Settings.builder()
+        startWarmOnlyNode(null);
+    }
+
+    public String startWarmOnlyNode(@Nullable String externalId) {
+        Settings.Builder nodeSettings = Settings.builder()
             .putList("node.roles", Arrays.asList("master", "data_warm", "ingest"))
-            .put("node.attr.box", "warm")
-            .build();
-        internalCluster().startNode(nodeSettings);
+            .put("node.attr.box", "warm");
+
+        if (externalId != null) {
+            nodeSettings.put(NODE_EXTERNAL_ID_SETTING.getKey(), externalId);
+        }
+        return internalCluster().startNode(nodeSettings);
     }
 
     public void startColdOnlyNode() {
-        Settings nodeSettings = Settings.builder()
+        startColdOnlyNode(null);
+    }
+
+    public String startColdOnlyNode(@Nullable String externalId) {
+        Settings.Builder nodeSettings = Settings.builder()
             .putList("node.roles", Arrays.asList("master", "data_cold", "ingest"))
-            .put("node.attr.box", "cold")
-            .build();
-        internalCluster().startNode(nodeSettings);
+            .put("node.attr.box", "cold");
+
+        if (externalId != null) {
+            nodeSettings.put(NODE_EXTERNAL_ID_SETTING.getKey(), externalId);
+        }
+
+        return internalCluster().startNode(nodeSettings);
     }
 
     public void startFrozenOnlyNode() {
@@ -323,4 +562,68 @@ public class DataTierAllocationDeciderIT extends ESIntegTestCase {
             .build();
         internalCluster().startNode(nodeSettings);
     }
+
+    private DesiredNode desiredNode(String externalId, DiscoveryNodeRole... roles) {
+        assertThat(roles.length, is(greaterThan(0)));
+
+        final var nodeRoles = Arrays.stream(roles).map(DiscoveryNodeRole::roleName).collect(Collectors.joining(","));
+        final var settings = Settings.builder()
+            .put(NODE_ROLES_SETTING.getKey(), nodeRoles)
+            .put(NODE_EXTERNAL_ID_SETTING.getKey(), externalId)
+            .put(NODE_NAME_SETTING.getKey(), externalId)
+            .build();
+        return new DesiredNode(settings, 1, ByteSizeValue.ONE, ByteSizeValue.ONE, Version.CURRENT);
+    }
+
+    private void updateDesiredNodes(DesiredNode... desiredNodes) {
+        assertThat(desiredNodes.length, is(greaterThan(0)));
+        updateDesiredNodes(Arrays.asList(desiredNodes));
+    }
+
+    private void updateDesiredNodes(List<DesiredNode> desiredNodes) {
+        assertThat(desiredNodes.size(), is(greaterThan(0)));
+
+        final var request = new UpdateDesiredNodesRequest(randomAlphaOfLength(10), 1, desiredNodes);
+        internalCluster().client().execute(UpdateDesiredNodesAction.INSTANCE, request).actionGet();
+    }
+
+    private void assertPrimaryShardIsAllocatedInNodeWithRole(int shard, DiscoveryNodeRole expectedRole) {
+        final var discoveryNode = getPrimaryShardAssignedNode(shard);
+        assertThat(explainAllocation(shard), discoveryNode.getRoles().contains(expectedRole), is(true));
+    }
+
+    private void assertPrimaryShardIsAllocatedInNode(int shard, DesiredNode expectedNode) {
+        final var discoveryNode = getPrimaryShardAssignedNode(shard);
+        assertThat(explainAllocation(shard), discoveryNode.getExternalId(), is(equalTo(expectedNode.externalId())));
+    }
+
+    private DiscoveryNode getPrimaryShardAssignedNode(int shard) {
+        final var state = client().admin().cluster().prepareState().get().getState();
+        final var routingTable = state.routingTable().index(index).shard(shard);
+        final var primaryShard = routingTable.primaryShard();
+        final var discoveryNode = state.nodes().get(primaryShard.currentNodeId());
+        assertThat(discoveryNode, is(notNullValue()));
+        return discoveryNode;
+    }
+
+    private String explainAllocation(int shard) {
+        return Strings.toString(
+            client().admin().cluster().prepareAllocationExplain().setIndex(index).setShard(shard).setPrimary(true).get().getExplanation(),
+            true,
+            true
+        );
+    }
+
+    private void createIndexWithTierPreference(String... tiers) {
+        assertThat(tiers.length, is(greaterThan(0)));
+
+        client().admin()
+            .indices()
+            .prepareCreate(index)
+            .setWaitForActiveShards(0)
+            .setSettings(
+                Settings.builder().put(DataTier.TIER_PREFERENCE, String.join(",", tiers)).put(INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0)
+            )
+            .get();
+    }
 }

+ 79 - 10
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDecider.java

@@ -7,6 +7,8 @@
 
 package org.elasticsearch.xpack.cluster.routing.allocation;
 
+import org.elasticsearch.cluster.metadata.DesiredNode;
+import org.elasticsearch.cluster.metadata.DesiredNodes;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
@@ -19,6 +21,7 @@ import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.common.Strings;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
@@ -65,7 +68,7 @@ public final class DataTierAllocationDecider extends AllocationDecider {
     }
 
     public interface PreferredTierFunction {
-        Optional<String> apply(List<String> tierPreference, DiscoveryNodes nodes);
+        Optional<String> apply(List<String> tierPreference, DiscoveryNodes nodes, DesiredNodes desiredNodes);
     }
 
     private static final Decision YES_PASSES = Decision.single(Decision.YES.type(), NAME, "node passes tier preference filters");
@@ -80,7 +83,7 @@ public final class DataTierAllocationDecider extends AllocationDecider {
         if (tierPreference.isEmpty() != false) {
             return YES_PASSES;
         }
-        Optional<String> tier = preferredTierFunction.apply(tierPreference, allocation.nodes());
+        Optional<String> tier = preferredTierFunction.apply(tierPreference, allocation.nodes(), allocation.desiredNodes());
         if (tier.isPresent()) {
             String tierName = tier.get();
             if (allocationAllowed(tierName, roles)) {
@@ -133,9 +136,74 @@ public final class DataTierAllocationDecider extends AllocationDecider {
      * Given a string of comma-separated prioritized tiers (highest priority
      * first) and an allocation, find the highest priority tier for which nodes
      * exist. If no nodes for any of the tiers are available, returns an empty
-     * {@code Optional<String>}.
+     * {@code Optional<String>}. This method takes into account the desired nodes
+     * in order to know if there are planned topology changes in the cluster
+     * that can remove a tier that's part of the cluster now.
      */
-    public static Optional<String> preferredAvailableTier(List<String> prioritizedTiers, DiscoveryNodes nodes) {
+    public static Optional<String> preferredAvailableTier(List<String> prioritizedTiers, DiscoveryNodes nodes, DesiredNodes desiredNodes) {
+        final var desiredNodesPreferredTier = getPreferredTierFromDesiredNodes(prioritizedTiers, nodes, desiredNodes);
+
+        if (desiredNodesPreferredTier.isPresent()) {
+            return desiredNodesPreferredTier;
+        }
+
+        return getPreferredAvailableTierFromClusterMembers(prioritizedTiers, nodes);
+    }
+
+    /**
+     * Given a list of tiers in descending order, return the tier that's present
+     * in the desired nodes with the highest priority, if none is present returns an
+     * {@code Optional.empty()}.
+     */
+    public static Optional<String> getPreferredTierFromDesiredNodes(
+        List<String> prioritizedTiers,
+        DiscoveryNodes discoveryNodes,
+        DesiredNodes desiredNodes
+    ) {
+        if (desiredNodes == null) {
+            return Optional.empty();
+        }
+
+        for (int tierIndex = 0; tierIndex < prioritizedTiers.size(); tierIndex++) {
+            final var tier = prioritizedTiers.get(tierIndex);
+            if (tierNodesPresent(tier, desiredNodes.actualized())
+                || isDesiredNodeWithinTierJoining(tier, discoveryNodes, desiredNodes)
+                || nextTierIsGrowingAndCurrentTierCanHoldTheIndex(prioritizedTiers, tierIndex, discoveryNodes, desiredNodes)) {
+                return Optional.of(tier);
+            }
+        }
+        return Optional.empty();
+    }
+
+    private static boolean nextTierIsGrowingAndCurrentTierCanHoldTheIndex(
+        List<String> prioritizedTiers,
+        int tierIndex,
+        DiscoveryNodes discoveryNodes,
+        DesiredNodes desiredNodes
+    ) {
+        final var tier = prioritizedTiers.get(tierIndex);
+        assert tierNodesPresent(tier, desiredNodes.actualized()) == false;
+        // If there's a plan to grow the next preferred tier, and it hasn't materialized yet,
+        // wait until all the nodes in the next tier have joined. This would avoid overwhelming
+        // the next tier if within the same plan one tier is removed and the next preferred tier
+        // grows.
+        boolean nextPreferredTierIsGrowing = false;
+        for (int i = tierIndex + 1; i < prioritizedTiers.size(); i++) {
+            final var nextTier = prioritizedTiers.get(i);
+            nextPreferredTierIsGrowing |= tierNodesPresent(nextTier, desiredNodes.pending());
+        }
+        return tierNodesPresent(tier, discoveryNodes) && nextPreferredTierIsGrowing;
+    }
+
+    private static boolean isDesiredNodeWithinTierJoining(String tier, DiscoveryNodes discoveryNodes, DesiredNodes desiredNodes) {
+        assert tierNodesPresent(tier, desiredNodes.actualized()) == false;
+        // Take into account the case when the desired nodes have been updated and the node in the tier would be replaced by
+        // a new one. In that case the desired node in the tier won't be actualized as it has to join, but we still need to ensure
+        // that at least one cluster member has the requested tier as we would prefer to minimize the shard movements in these cases.
+        return tierNodesPresent(tier, desiredNodes.pending()) && tierNodesPresent(tier, discoveryNodes);
+    }
+
+    private static Optional<String> getPreferredAvailableTierFromClusterMembers(List<String> prioritizedTiers, DiscoveryNodes nodes) {
         for (String tier : prioritizedTiers) {
             if (tierNodesPresent(tier, nodes)) {
                 return Optional.of(tier);
@@ -144,15 +212,16 @@ public final class DataTierAllocationDecider extends AllocationDecider {
         return Optional.empty();
     }
 
+    static boolean tierNodesPresent(String singleTier, Collection<DesiredNode> nodes) {
+        assert singleTier.equals(DiscoveryNodeRole.DATA_ROLE.roleName()) || DataTier.validTierName(singleTier)
+            : "tier " + singleTier + " is an invalid tier name";
+        return nodes.stream().anyMatch(node -> allocationAllowed(singleTier, node.getRoles()));
+    }
+
     static boolean tierNodesPresent(String singleTier, DiscoveryNodes nodes) {
         assert singleTier.equals(DiscoveryNodeRole.DATA_ROLE.roleName()) || DataTier.validTierName(singleTier)
             : "tier " + singleTier + " is an invalid tier name";
-        for (DiscoveryNode node : nodes) {
-            if (allocationAllowed(singleTier, node.getRoles())) {
-                return true;
-            }
-        }
-        return false;
+        return nodes.stream().anyMatch(node -> allocationAllowed(singleTier, node.getRoles()));
     }
 
     private static boolean allocationAllowed(String tierName, Set<DiscoveryNodeRole> roles) {

+ 3 - 1
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/DataTierMigrationRoutedStep.java

@@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.elasticsearch.action.support.ActiveShardCount;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DesiredNodes;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.index.Index;
@@ -54,7 +55,8 @@ public class DataTierMigrationRoutedStep extends ClusterStateWaitStep {
         List<String> preferredTierConfiguration = idxMeta.getTierPreference();
         Optional<String> availableDestinationTier = DataTierAllocationDecider.preferredAvailableTier(
             preferredTierConfiguration,
-            clusterState.getNodes()
+            clusterState.getNodes(),
+            DesiredNodes.latestFromClusterState(clusterState)
         );
 
         if (ActiveShardCount.ALL.enoughShardsActive(clusterState, index.getName()) == false) {

+ 6 - 2
x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ilm/WaitForDataTierStep.java

@@ -8,6 +8,7 @@
 package org.elasticsearch.xpack.core.ilm;
 
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.DesiredNodes;
 import org.elasticsearch.cluster.routing.allocation.DataTier;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.xpack.cluster.routing.allocation.DataTierAllocationDecider;
@@ -33,8 +34,11 @@ public class WaitForDataTierStep extends ClusterStateWaitStep {
 
     @Override
     public Result isConditionMet(Index index, ClusterState clusterState) {
-        boolean present = DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList(tierPreference), clusterState.nodes())
-            .isPresent();
+        boolean present = DataTierAllocationDecider.preferredAvailableTier(
+            DataTier.parseTierList(tierPreference),
+            clusterState.nodes(),
+            DesiredNodes.latestFromClusterState(clusterState)
+        ).isPresent();
         SingleMessageFieldInfo info = present ? null : new SingleMessageFieldInfo("no nodes for tiers [" + tierPreference + "] available");
         return new Result(present, info);
     }

+ 447 - 142
x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/cluster/routing/allocation/DataTierAllocationDeciderTests.java

@@ -10,23 +10,24 @@ package org.elasticsearch.xpack.cluster.routing.allocation;
 import joptsimple.internal.Strings;
 
 import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterName;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.ESAllocationTestCase;
-import org.elasticsearch.cluster.EmptyClusterInfoService;
+import org.elasticsearch.cluster.metadata.DesiredNode;
+import org.elasticsearch.cluster.metadata.DesiredNodeWithStatus;
+import org.elasticsearch.cluster.metadata.DesiredNodes;
+import org.elasticsearch.cluster.metadata.DesiredNodesMetadata;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RecoverySource;
-import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodesHelper;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
-import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.cluster.routing.allocation.DataTier;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
-import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
 import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider;
@@ -35,20 +36,24 @@ import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.IndexModule;
 import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.snapshots.EmptySnapshotsInfoService;
 import org.elasticsearch.snapshots.SearchableSnapshotsSettings;
-import org.elasticsearch.test.gateway.TestGatewayAllocator;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.cluster.routing.allocation.DataTier.DATA_COLD;
 import static org.elasticsearch.cluster.routing.allocation.DataTier.DATA_FROZEN;
+import static org.elasticsearch.node.Node.NODE_EXTERNAL_ID_SETTING;
+import static org.elasticsearch.node.NodeRoleSettings.NODE_ROLES_SETTING;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
@@ -63,6 +68,12 @@ public class DataTierAllocationDeciderTests extends ESAllocationTestCase {
     );
     private static final DiscoveryNode DATA_NODE = newNode("node-data", Collections.singleton(DiscoveryNodeRole.DATA_ROLE));
 
+    private static final DesiredNode HOT_DESIRED_NODE = newDesiredNode("node-hot", DiscoveryNodeRole.DATA_HOT_NODE_ROLE);
+    private static final DesiredNode WARM_DESIRED_NODE = newDesiredNode("node-warm", DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+    private static final DesiredNode COLD_DESIRED_NODE = newDesiredNode("node-cold", DiscoveryNodeRole.DATA_COLD_NODE_ROLE);
+    private static final DesiredNode CONTENT_DESIRED_NODE = newDesiredNode("node-content", DiscoveryNodeRole.DATA_CONTENT_NODE_ROLE);
+    private static final DesiredNode DATA_DESIRED_NODE = newDesiredNode("node-data", DiscoveryNodeRole.DATA_ROLE);
+
     private final ClusterSettings clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
     private final AllocationDeciders allocationDeciders = new AllocationDeciders(
         Arrays.asList(
@@ -71,13 +82,6 @@ public class DataTierAllocationDeciderTests extends ESAllocationTestCase {
             new ReplicaAfterPrimaryActiveAllocationDecider()
         )
     );
-    private final AllocationService service = new AllocationService(
-        allocationDeciders,
-        new TestGatewayAllocator(),
-        new BalancedShardsAllocator(Settings.EMPTY),
-        EmptyClusterInfoService.INSTANCE,
-        EmptySnapshotsInfoService.INSTANCE
-    );
 
     private final ShardRouting shard = ShardRouting.newUnassigned(
         new ShardId("myindex", "myindex", 0),
@@ -87,113 +91,111 @@ public class DataTierAllocationDeciderTests extends ESAllocationTestCase {
     );
 
     public void testIndexPrefer() {
-        ClusterState state = ClusterState.builder(service.reroute(ClusterState.EMPTY_STATE, "initial state"))
-            .nodes(DiscoveryNodes.builder().add(HOT_NODE).build())
-            .metadata(
-                Metadata.builder()
-                    .put(
-                        IndexMetadata.builder("myindex")
-                            .settings(
-                                Settings.builder()
-                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
-                                    .put(IndexMetadata.SETTING_INDEX_UUID, "myindex")
-                                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                                    .put(DataTier.TIER_PREFERENCE, "data_warm,data_cold")
-                                    .build()
-                            )
-                    )
-                    .build()
-            )
-            .build();
-        RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
-        allocation.debugDecision(true);
-        Decision d;
-        RoutingNode node;
+        {
+            final var desiredNodes = randomBoolean() ? null : createDesiredNodesWithActualizedNodes(HOT_DESIRED_NODE);
+            final var clusterState = clusterStateWithIndexAndNodes(
+                "data_warm,data_cold",
+                DiscoveryNodes.builder().add(HOT_NODE).build(),
+                desiredNodes
+            );
 
-        for (DiscoveryNode n : Arrays.asList(HOT_NODE, WARM_NODE, COLD_NODE)) {
-            node = RoutingNodesHelper.routingNode(n.getId(), n, shard);
-            d = DataTierAllocationDecider.INSTANCE.canAllocate(shard, node, allocation);
-            assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO));
-            assertThat(
-                node.toString(),
-                d.getExplanation(),
-                containsString(
+            for (DiscoveryNode n : Arrays.asList(HOT_NODE, WARM_NODE, COLD_NODE)) {
+                assertAllocationDecision(
+                    clusterState,
+                    n,
+                    Decision.Type.NO,
                     "index has a preference for tiers [data_warm,data_cold], "
                         + "but no nodes for any of those tiers are available in the cluster"
-                )
+                );
+            }
+        }
+
+        {
+            final var desiredNodes = randomBoolean() ? null : createDesiredNodesWithActualizedNodes(HOT_DESIRED_NODE, COLD_DESIRED_NODE);
+            final var clusterState = clusterStateWithIndexAndNodes(
+                "data_warm,data_cold",
+                DiscoveryNodes.builder().add(HOT_NODE).add(COLD_NODE).build(),
+                desiredNodes
             );
-            d = DataTierAllocationDecider.INSTANCE.canRemain(shard, node, allocation);
-            assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO));
-            assertThat(
-                node.toString(),
-                d.getExplanation(),
-                containsString(
-                    "index has a preference for tiers [data_warm,data_cold], "
-                        + "but no nodes for any of those tiers are available in the cluster"
-                )
+
+            for (DiscoveryNode n : Arrays.asList(HOT_NODE, WARM_NODE)) {
+                assertAllocationDecision(
+                    clusterState,
+                    n,
+                    Decision.Type.NO,
+                    "index has a preference for tiers [data_warm,data_cold] and node does not meet the required [data_cold] tier"
+                );
+            }
+
+            assertAllocationDecision(
+                clusterState,
+                COLD_NODE,
+                Decision.Type.YES,
+                "index has a preference for tiers [data_warm,data_cold] and node has tier [data_cold]"
             );
         }
 
-        state = ClusterState.builder(service.reroute(ClusterState.EMPTY_STATE, "initial state"))
-            .nodes(DiscoveryNodes.builder().add(HOT_NODE).add(COLD_NODE).build())
-            .metadata(
-                Metadata.builder()
-                    .put(
-                        IndexMetadata.builder("myindex")
-                            .settings(
-                                Settings.builder()
-                                    .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
-                                    .put(IndexMetadata.SETTING_INDEX_UUID, "myindex")
-                                    .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
-                                    .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
-                                    .put(DataTier.TIER_PREFERENCE, "data_warm,data_cold")
-                                    .build()
-                            )
-                    )
-                    .build()
-            )
-            .build();
-        allocation = new RoutingAllocation(allocationDeciders, state, null, null, 0);
-        allocation.debugDecision(true);
-
-        for (DiscoveryNode n : Arrays.asList(HOT_NODE, WARM_NODE)) {
-            node = RoutingNodesHelper.routingNode(n.getId(), n, shard);
-            d = DataTierAllocationDecider.INSTANCE.canAllocate(shard, node, allocation);
-            assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO));
-            assertThat(
-                node.toString(),
-                d.getExplanation(),
-                containsString(
-                    "index has a preference for tiers [data_warm,data_cold] " + "and node does not meet the required [data_cold] tier"
-                )
+        {
+            // Remove the cold tier from desired nodes
+            final var desiredNodes = createDesiredNodesWithActualizedNodes(WARM_DESIRED_NODE);
+            final var state = clusterStateWithIndexAndNodes(
+                "data_cold,data_warm",
+                DiscoveryNodes.builder().add(WARM_NODE).add(COLD_NODE).build(),
+                desiredNodes
             );
-            d = DataTierAllocationDecider.INSTANCE.canRemain(shard, node, allocation);
-            assertThat(node.toString(), d.type(), equalTo(Decision.Type.NO));
-            assertThat(
-                node.toString(),
-                d.getExplanation(),
-                containsString(
-                    "index has a preference for tiers [data_warm,data_cold] " + "and node does not meet the required [data_cold] tier"
-                )
+
+            for (DiscoveryNode node : List.of(HOT_NODE, COLD_NODE)) {
+                assertAllocationDecision(
+                    state,
+                    node,
+                    Decision.Type.NO,
+                    "index has a preference for tiers [data_cold,data_warm] and node does not meet the required [data_warm] tier"
+                );
+            }
+
+            assertAllocationDecision(
+                state,
+                WARM_NODE,
+                Decision.Type.YES,
+                "index has a preference for tiers [data_cold,data_warm] and node has tier [data_warm]"
             );
         }
 
-        node = RoutingNodesHelper.routingNode(COLD_NODE.getId(), COLD_NODE, shard);
-        d = DataTierAllocationDecider.INSTANCE.canAllocate(shard, node, allocation);
-        assertThat(node.toString(), d.type(), equalTo(Decision.Type.YES));
-        assertThat(
-            node.toString(),
-            d.getExplanation(),
-            containsString("index has a preference for tiers [data_warm,data_cold] and node has tier [data_cold]")
-        );
-        d = DataTierAllocationDecider.INSTANCE.canRemain(shard, node, allocation);
-        assertThat(node.toString(), d.type(), equalTo(Decision.Type.YES));
-        assertThat(
-            node.toString(),
-            d.getExplanation(),
-            containsString("index has a preference for tiers [data_warm,data_cold] and node has tier [data_cold]")
-        );
+        {
+            // There's a warm node in the desired nodes, but it hasn't joined the cluster yet,
+            // in that case we consider that there aren't any nodes with the preferred tier in the cluster
+            final ClusterState clusterState;
+            final String tierPreference;
+            if (randomBoolean()) {
+                tierPreference = "data_warm,data_cold";
+                clusterState = clusterStateWithIndexAndNodes(
+                    tierPreference,
+                    DiscoveryNodes.builder().add(HOT_NODE).build(),
+                    DesiredNodes.create("history", 1, List.of(pendingDesiredNode(WARM_DESIRED_NODE)))
+                );
+            } else {
+                tierPreference = "data_warm,data_hot";
+                clusterState = clusterStateWithIndexAndNodes(
+                    tierPreference,
+                    DiscoveryNodes.builder().add(COLD_NODE).build(),
+                    DesiredNodes.create("history", 1, List.of(pendingDesiredNode(WARM_DESIRED_NODE)))
+                );
+            }
+
+            for (DiscoveryNode node : List.of(HOT_NODE, WARM_NODE, COLD_NODE)) {
+                assertAllocationDecision(
+                    clusterState,
+                    node,
+                    Decision.Type.NO,
+                    String.format(
+                        Locale.ROOT,
+                        "index has a preference for tiers [%s], but no nodes for any of those tiers are available in the cluster",
+                        tierPreference
+                    )
+                );
+            }
+
+        }
     }
 
     public void testTierNodesPresent() {
@@ -222,43 +224,269 @@ public class DataTierAllocationDeciderTests extends ESAllocationTestCase {
         assertTrue(DataTierAllocationDecider.tierNodesPresent("data_content", nodes));
     }
 
+    public void testTierNodesPresentDesiredNodes() {
+        Set<DesiredNode> nodes = Collections.emptySet();
+
+        assertFalse(DataTierAllocationDecider.tierNodesPresent("data", nodes));
+        assertFalse(DataTierAllocationDecider.tierNodesPresent("data_hot", nodes));
+        assertFalse(DataTierAllocationDecider.tierNodesPresent("data_warm", nodes));
+        assertFalse(DataTierAllocationDecider.tierNodesPresent("data_cold", nodes));
+        assertFalse(DataTierAllocationDecider.tierNodesPresent("data_content", nodes));
+
+        nodes = Set.of(WARM_DESIRED_NODE, CONTENT_DESIRED_NODE);
+
+        assertFalse(DataTierAllocationDecider.tierNodesPresent("data", nodes));
+        assertFalse(DataTierAllocationDecider.tierNodesPresent("data_hot", nodes));
+        assertTrue(DataTierAllocationDecider.tierNodesPresent("data_warm", nodes));
+        assertFalse(DataTierAllocationDecider.tierNodesPresent("data_cold", nodes));
+        assertTrue(DataTierAllocationDecider.tierNodesPresent("data_content", nodes));
+
+        nodes = Set.of(DATA_DESIRED_NODE);
+
+        assertTrue(DataTierAllocationDecider.tierNodesPresent("data", nodes));
+        assertTrue(DataTierAllocationDecider.tierNodesPresent("data_hot", nodes));
+        assertTrue(DataTierAllocationDecider.tierNodesPresent("data_warm", nodes));
+        assertTrue(DataTierAllocationDecider.tierNodesPresent("data_cold", nodes));
+        assertTrue(DataTierAllocationDecider.tierNodesPresent("data_content", nodes));
+    }
+
     public void testPreferredTierAvailable() {
-        DiscoveryNodes nodes = DiscoveryNodes.builder().build();
+        {
+            final var nodes = DiscoveryNodes.builder().build();
+            final DesiredNodes desiredNodes = randomBoolean()
+                ? null
+                : createDesiredNodesWithPendingNodes(HOT_DESIRED_NODE, WARM_DESIRED_NODE, COLD_DESIRED_NODE);
 
-        assertThat(DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data"), nodes), equalTo(Optional.empty()));
-        assertThat(
-            DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_hot,data_warm"), nodes),
-            equalTo(Optional.empty())
-        );
-        assertThat(
-            DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_warm,data_content"), nodes),
-            equalTo(Optional.empty())
-        );
-        assertThat(DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_cold"), nodes), equalTo(Optional.empty()));
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data"), nodes, desiredNodes),
+                equalTo(Optional.empty())
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_hot,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.empty())
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_warm,data_content"), nodes, desiredNodes),
+                equalTo(Optional.empty())
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_cold"), nodes, desiredNodes),
+                equalTo(Optional.empty())
+            );
+        }
 
-        nodes = DiscoveryNodes.builder().add(WARM_NODE).add(CONTENT_NODE).build();
+        {
+            final var nodes = DiscoveryNodes.builder().add(WARM_NODE).add(CONTENT_NODE).build();
+            final var desiredNodes = randomBoolean()
+                ? null
+                : createDesiredNodesWithActualizedNodes(WARM_DESIRED_NODE, CONTENT_DESIRED_NODE);
 
-        assertThat(DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data"), nodes), equalTo(Optional.empty()));
-        assertThat(
-            DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_hot,data_warm"), nodes),
-            equalTo(Optional.of("data_warm"))
-        );
-        assertThat(
-            DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_warm,data_content"), nodes),
-            equalTo(Optional.of("data_warm"))
-        );
-        assertThat(
-            DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_content,data_warm"), nodes),
-            equalTo(Optional.of("data_content"))
-        );
-        assertThat(
-            DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_hot,data_content,data_warm"), nodes),
-            equalTo(Optional.of("data_content"))
-        );
-        assertThat(
-            DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_hot,data_cold,data_warm"), nodes),
-            equalTo(Optional.of("data_warm"))
-        );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data"), nodes, desiredNodes),
+                equalTo(Optional.empty())
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_hot,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.of("data_warm"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_warm,data_content"), nodes, desiredNodes),
+                equalTo(Optional.of("data_warm"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_content,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.of("data_content"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(
+                    DataTier.parseTierList("data_hot,data_content,data_warm"),
+                    nodes,
+                    desiredNodes
+                ),
+                equalTo(Optional.of("data_content"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(
+                    DataTier.parseTierList("data_hot,data_cold,data_warm"),
+                    nodes,
+                    desiredNodes
+                ),
+                equalTo(Optional.of("data_warm"))
+            );
+        }
+
+        {
+            final var nodes = DiscoveryNodes.builder().add(WARM_NODE).add(CONTENT_NODE).build();
+            final var desiredNodes = createDesiredNodesWithActualizedNodes(HOT_DESIRED_NODE, WARM_DESIRED_NODE, CONTENT_DESIRED_NODE);
+
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data"), nodes, desiredNodes),
+                equalTo(Optional.empty())
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_hot,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.of("data_hot"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_warm,data_content"), nodes, desiredNodes),
+                equalTo(Optional.of("data_warm"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_content,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.of("data_content"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(
+                    DataTier.parseTierList("data_hot,data_content,data_warm"),
+                    nodes,
+                    desiredNodes
+                ),
+                equalTo(Optional.of("data_hot"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(
+                    DataTier.parseTierList("data_hot,data_cold,data_warm"),
+                    nodes,
+                    desiredNodes
+                ),
+                equalTo(Optional.of("data_hot"))
+            );
+        }
+
+        {
+            // When there are desired nodes that haven't joined the cluster yet, those are not considered
+            final var nodes = DiscoveryNodes.builder().add(WARM_NODE).add(CONTENT_NODE).build();
+            // i.e. HOT_DESIRED_NODE might be part of the DesiredNodes, but it is not part of the cluster yet
+            final var desiredNodes = DesiredNodes.create(
+                randomAlphaOfLength(10),
+                1,
+                List.of(
+                    pendingDesiredNode(HOT_DESIRED_NODE),
+                    actualizedDesiredNode(WARM_DESIRED_NODE),
+                    actualizedDesiredNode(CONTENT_DESIRED_NODE)
+                )
+            );
+
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data"), nodes, desiredNodes),
+                equalTo(Optional.empty())
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_hot,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.of("data_warm"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_warm,data_content"), nodes, desiredNodes),
+                equalTo(Optional.of("data_warm"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_content,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.of("data_content"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(
+                    DataTier.parseTierList("data_hot,data_content,data_warm"),
+                    nodes,
+                    desiredNodes
+                ),
+                equalTo(Optional.of("data_content"))
+            );
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(
+                    DataTier.parseTierList("data_hot,data_cold,data_warm"),
+                    nodes,
+                    desiredNodes
+                ),
+                equalTo(Optional.of("data_warm"))
+            );
+        }
+
+        {
+            // Cold tier is planned to be removed
+            final var nodes = DiscoveryNodes.builder().add(HOT_NODE).add(WARM_NODE).add(COLD_NODE).build();
+            final var desiredNodes = createDesiredNodesWithActualizedNodes(HOT_DESIRED_NODE, WARM_DESIRED_NODE);
+
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_cold,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.of("data_warm"))
+            );
+        }
+
+        {
+            // During grow and shrink (i.e. a way to replace a node) we should avoid moving the shard from a preferred tier to a less
+            // preferred tier if there's a node that can hold that shard and we know that a new desired node would substitute the old one
+            final var nodes = DiscoveryNodes.builder().add(HOT_NODE).add(WARM_NODE).add(COLD_NODE).build();
+            final var desiredNodes = DesiredNodes.create(
+                "history",
+                1,
+                List.of(
+                    actualizedDesiredNode(HOT_DESIRED_NODE),
+                    actualizedDesiredNode(WARM_DESIRED_NODE),
+                    pendingDesiredNode(COLD_DESIRED_NODE)
+                )
+            );
+
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(DataTier.parseTierList("data_cold,data_warm"), nodes, desiredNodes),
+                equalTo(Optional.of("data_cold"))
+            );
+        }
+
+        {
+            // Ensure that when we are removing a tier and growing the next preferred tier we wait until all the new
+            // nodes have joined the cluster avoiding filling the new nodes with shards from the removed tier
+            final var nodes = DiscoveryNodes.builder().add(HOT_NODE).add(WARM_NODE).add(COLD_NODE).build();
+            final DesiredNodes desiredNodes;
+            // Grow any of the next preferred tiers
+            if (randomBoolean()) {
+                final var newWarmNode = newDesiredNode("node-warm-2", DiscoveryNodeRole.DATA_WARM_NODE_ROLE);
+                desiredNodes = DesiredNodes.create(
+                    "history",
+                    1,
+                    List.of(
+                        actualizedDesiredNode(HOT_DESIRED_NODE),
+                        actualizedDesiredNode(WARM_DESIRED_NODE),
+                        pendingDesiredNode(newWarmNode)
+                    )
+                );
+            } else {
+                final var newHotNode = newDesiredNode("node-hot-2", DiscoveryNodeRole.DATA_HOT_NODE_ROLE);
+                desiredNodes = DesiredNodes.create(
+                    "history",
+                    1,
+                    List.of(
+                        actualizedDesiredNode(HOT_DESIRED_NODE),
+                        pendingDesiredNode(newHotNode),
+                        actualizedDesiredNode(WARM_DESIRED_NODE)
+                    )
+                );
+            }
+
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(
+                    DataTier.parseTierList("data_cold,data_warm,data_hot"),
+                    nodes,
+                    desiredNodes
+                ),
+                equalTo(Optional.of("data_cold"))
+            );
+
+            // Once all the nodes have joined, we can move the shard to the next tier
+            final var updatedDesiredNodes = DesiredNodes.create(
+                "history",
+                2,
+                desiredNodes.nodes().stream().map(DesiredNodeWithStatus::desiredNode).map(this::actualizedDesiredNode).toList()
+            );
+
+            assertThat(
+                DataTierAllocationDecider.preferredAvailableTier(
+                    DataTier.parseTierList("data_cold,data_warm,data_hot"),
+                    nodes,
+                    updatedDesiredNodes
+                ),
+                equalTo(Optional.of("data_warm"))
+            );
+        }
     }
 
     public void testFrozenIllegalForRegularIndices() {
@@ -354,4 +582,81 @@ public class DataTierAllocationDeciderTests extends ESAllocationTestCase {
         Settings settings = builder.build();
         assertThat(DataTier.TIER_PREFERENCE_SETTING.get(settings), equalTo(DATA_FROZEN));
     }
+
+    private ClusterState clusterStateWithIndexAndNodes(String tierPreference, DiscoveryNodes discoveryNodes, DesiredNodes desiredNodes) {
+        final Metadata.Builder metadata = Metadata.builder()
+            .put(
+                IndexMetadata.builder(shard.getIndexName())
+                    .settings(
+                        Settings.builder()
+                            .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
+                            .put(IndexMetadata.SETTING_INDEX_UUID, shard.getIndexName())
+                            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                            .put(DataTier.TIER_PREFERENCE, tierPreference)
+                            .build()
+                    )
+            );
+        if (desiredNodes != null) {
+            metadata.putCustom(DesiredNodesMetadata.TYPE, new DesiredNodesMetadata(desiredNodes));
+        }
+        return ClusterState.builder(new ClusterName("test")).nodes(discoveryNodes).metadata(metadata).build();
+    }
+
+    private static DesiredNode newDesiredNode(String externalId, DiscoveryNodeRole... roles) {
+        assert roles.length > 0;
+
+        return new DesiredNode(
+            Settings.builder()
+                .put(NODE_EXTERNAL_ID_SETTING.getKey(), externalId)
+                .put(NODE_ROLES_SETTING.getKey(), Arrays.stream(roles).map(DiscoveryNodeRole::roleName).collect(Collectors.joining(",")))
+                .build(),
+            1,
+            ByteSizeValue.ONE,
+            ByteSizeValue.ONE,
+            Version.CURRENT
+        );
+    }
+
+    private DesiredNodes createDesiredNodesWithActualizedNodes(DesiredNode... nodes) {
+        return createDesiredNodesWithStatus(DesiredNodeWithStatus.Status.ACTUALIZED, nodes);
+    }
+
+    private DesiredNodes createDesiredNodesWithPendingNodes(DesiredNode... nodes) {
+        return createDesiredNodesWithStatus(DesiredNodeWithStatus.Status.PENDING, nodes);
+    }
+
+    private DesiredNodes createDesiredNodesWithStatus(DesiredNodeWithStatus.Status status, DesiredNode... nodes) {
+        return DesiredNodes.create(
+            randomAlphaOfLength(10),
+            1,
+            Arrays.stream(nodes).map(desiredNode -> new DesiredNodeWithStatus(desiredNode, status)).toList()
+        );
+    }
+
+    private void assertAllocationDecision(ClusterState state, DiscoveryNode node, Decision.Type decisionType, String explanationMessage) {
+        final var allocation = new RoutingAllocation(allocationDeciders, null, state, null, null, 0);
+        allocation.debugDecision(true);
+
+        final var routingNode = RoutingNodesHelper.routingNode(node.getId(), node, shard);
+        {
+            final var decision = DataTierAllocationDecider.INSTANCE.canAllocate(shard, routingNode, allocation);
+            assertThat(routingNode.toString(), decision.type(), equalTo(decisionType));
+            assertThat(routingNode.toString(), decision.getExplanation(), containsString(explanationMessage));
+        }
+
+        {
+            final var decision = DataTierAllocationDecider.INSTANCE.canRemain(shard, routingNode, allocation);
+            assertThat(routingNode.toString(), decision.type(), equalTo(decisionType));
+            assertThat(routingNode.toString(), decision.getExplanation(), containsString(explanationMessage));
+        }
+    }
+
+    private DesiredNodeWithStatus actualizedDesiredNode(DesiredNode desiredNode) {
+        return new DesiredNodeWithStatus(desiredNode, DesiredNodeWithStatus.Status.ACTUALIZED);
+    }
+
+    private DesiredNodeWithStatus pendingDesiredNode(DesiredNode desiredNode) {
+        return new DesiredNodeWithStatus(desiredNode, DesiredNodeWithStatus.Status.PENDING);
+    }
 }