Bläddra i källkod

Prevalidate node removal API (pt. 2) (#91256)

This PR extends the basic Prevalidation API so that in case there are 
red non-searchable-snapshot indices in the cluster, we reach out to 
the nodes (whose removal is being prevalidated) to find out if they 
have a local copy of any red indices.

Closes #87776
Pooya Salehi 2 år sedan
förälder
incheckning
3a223d933a
22 ändrade filer med 1030 tillägg och 79 borttagningar
  1. 6 0
      docs/changelog/91256.yaml
  2. 48 5
      docs/reference/cluster/prevalidate-node-removal.asciidoc
  3. 4 0
      rest-api-spec/src/main/resources/rest-api-spec/api/_internal.prevalidate_node_removal.json
  4. 3 3
      rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.prevalidate_node_removal/10_basic.yml
  5. 150 14
      server/src/internalClusterTest/java/org/elasticsearch/cluster/PrevalidateNodeRemovalIT.java
  6. 74 0
      server/src/internalClusterTest/java/org/elasticsearch/cluster/PrevalidateShardPathIT.java
  7. 2 0
      server/src/main/java/org/elasticsearch/action/ActionModule.java
  8. 59 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathRequest.java
  9. 57 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathResponse.java
  10. 43 3
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesRemovalPrevalidation.java
  11. 16 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateNodeRemovalRequest.java
  12. 59 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequest.java
  13. 43 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathResponse.java
  14. 156 51
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportPrevalidateNodeRemovalAction.java
  15. 126 0
      server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportPrevalidateShardPathAction.java
  16. 1 0
      server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPrevalidateNodeRemovalAction.java
  17. 36 0
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathRequestSerializationTests.java
  18. 53 0
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathResponseSerializationTests.java
  19. 5 2
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesRemovalPrevalidationSerializationTests.java
  20. 84 0
      server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequestSerializationTests.java
  21. 3 0
      x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/PrevalidateNodeRemovalWithSearchableSnapshotIntegTests.java
  22. 2 1
      x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

+ 6 - 0
docs/changelog/91256.yaml

@@ -0,0 +1,6 @@
+pr: 91256
+summary: Prevalidate node removal API (pt. 2)
+area: Allocation
+type: enhancement
+issues:
+ - 87776

+ 48 - 5
docs/reference/cluster/prevalidate-node-removal.asciidoc

@@ -21,7 +21,9 @@ Prevalidate node removal.
 [[prevalidate-node-removal-api-desc]]
 ==== {api-description-title}
 
-This API checks whether attempting to remove the specified node(s) from the cluster is likely to succeed or not. For a cluster with no unassigned shards, removal of any node is considered safe which means the removal of the nodes is likely to succeed. In case the cluster has a <<cluster-health,`red` cluster health status>>, it verifies that the removal of the node(s) would not risk removing the last remaining copy of an unassigned shard.
+This API checks whether attempting to remove the specified node(s) from the cluster is likely to succeed or not. For a cluster with no unassigned shards, removal of any node is considered safe which means the removal of the nodes is likely to succeed.
+
+In case the cluster has a <<cluster-health,`red` cluster health status>>, it verifies that the removal of the node(s) would not risk removing the last remaining copy of an unassigned shard. If there are red indices in the cluster, the API checks whether the red indices are <<searchable-snapshots, Searchable Snapshot>> indices, and if not, it sends a request to each of nodes specified in the API call to verify whether the nodes might contain local shard copies of the red indices that are not Searchable Snapshot indices. This request is processed on each receiving node, by checking whether the node has a shard directory for any of the red index shards.
 
 The response includes the overall safety of the removal of the specified nodes, and a detailed response for each node. The node-specific part of the response also includes more details on why removal of that node might not succeed.
 
@@ -32,7 +34,7 @@ Note that if the prevalidation result for a set of nodes returns `true` (i.e. it
 [[prevalidate-node-removal-api-query-params]]
 ==== {api-query-parms-title}
 
-include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
+include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]
 
 `names`::
 (Optional, string) Comma-separated list of node names.
@@ -54,11 +56,50 @@ include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=master-timeout]
 
 `nodes`::
     (object) Prevalidation result for the removal of each of the provided nodes.
++
+.Properties of `nodes`
+[%collapsible%open]
+====
+`<node>`::
+    (object) Contains information about the removal prevalidation of a specific node.
++
+.Properties of `<node>`
+[%collapsible%open]
+=======
+`id`::
+    (string) node ID
+`name`::
+    (string) node name
+`external_id`::
+    (string) node external ID
+`result`::
+    (object) Contains removal prevalidation result of the node.
++
+.Properties of `result`
+[%collapsible%open]
+========
+`is_safe`::
+    (boolean) Whether the removal of the node is considered safe or not.
+`reason`::
+    (string) A string that specifies the reason why the prevalidation result is considered safe or not. It can be one of the following values:
++
+--
+    * `no_problems`: The prevalidation did not find any issues that could prevent the node from being safely removed.
+    * `no_red_shards_except_searchable_snapshots`: The node can be safely removed as all red indices are searchable snapshot indices and therefore removing a node does not risk removing the last copy of that index from the cluster.
+    * `no_red_shards_on_node`: The node does not contain any copies of the red non-searchable-snapshot index shards.
+    * `red_shards_on_node`: The node might contain shard copies of some non-searchable-snapshot red indices. The list of the shards that might be on the node are specified in the `message` field.
+    * `unable_to_verify_red_shards`: Contacting the node failed or timed out. More details is provided in the `message` field.
+--
+`message`::
+    (Optional, string) Detailed information about the removal prevalidation result.
+========
+=======
+====
 
 [[prevalidate-node-removal-api-example]]
 ==== {api-examples-title}
 
-This example validates whether it is safe to remove the nodes `node1` and `node2`. The response indicates that it is safe to remove `node1`, but it might not be safe to remove `node2`. Therefore, the overall prevalidation of the removal of the two nodes returns `false`.
+This example validates whether it is safe to remove the nodes `node1` and `node2`. The response indicates that it is safe to remove `node1`, but it might not be safe to remove `node2` as it might contain copies of the specified red shards. Therefore, the overall prevalidation of the removal of the two nodes returns `false`.
 
 [source,console]
 --------------------------------------------------
@@ -72,7 +113,7 @@ The API returns the following response:
 --------------------------------------------------
 {
   "is_safe": false,
-  "message": "cluster health is RED",
+  "message": "removal of the following nodes might not be safe: [node2-id]",
   "nodes": [
     {
       "id": "node1-id",
@@ -80,6 +121,7 @@ The API returns the following response:
       "external_id" : "node1-externalId",
       "result" : {
         "is_safe": true,
+        "reason": "no_red_shards_on_node",
         "message": ""
       }
     },
@@ -89,7 +131,8 @@ The API returns the following response:
       "external_id" : "node2-externalId",
       "result" : {
         "is_safe": false,
-        "message": "node may contain a copy of a red index shard"
+        "reason": "red_shards_on_node",
+        "message": "node contains copies of the following red shards: [[indexName][0]]"
       }
     }
   ]

+ 4 - 0
rest-api-spec/src/main/resources/rest-api-spec/api/_internal.prevalidate_node_removal.json

@@ -35,6 +35,10 @@
       "master_timeout":{
         "type":"time",
         "description":"Explicit operation timeout for connection to master node"
+      },
+      "timeout":{
+        "type":"time",
+        "description":"Explicit operation timeout"
       }
     }
   }

+ 3 - 3
rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/cluster.prevalidate_node_removal/10_basic.yml

@@ -2,8 +2,8 @@
 "Prevalidation basic test":
   - skip:
       features: contains
-      version: "- 8.5.99"
-      reason: "API added in 8.6.0"
+      version: "- 8.6.99"
+      reason: "The reason field was introduced in 8.7.0"
 
   # Fetch a node ID and stash it in node_id
   - do:
@@ -16,7 +16,7 @@
         ids: $node_id
 
   - match: { is_safe: true}
-  - contains: {nodes: {id: "$node_id", result: {is_safe: true, message: ""}}}
+  - contains: {nodes: {id: "$node_id", result: {is_safe: true, reason: no_problems, message: ""}}}
 ---
 "Prevalidation with no node specified":
   - skip:

+ 150 - 14
server/src/internalClusterTest/java/org/elasticsearch/cluster/PrevalidateNodeRemovalIT.java

@@ -13,17 +13,40 @@ import org.elasticsearch.action.admin.cluster.node.shutdown.NodesRemovalPrevalid
 import org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateNodeRemovalAction;
 import org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateNodeRemovalRequest;
 import org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateNodeRemovalResponse;
+import org.elasticsearch.action.admin.cluster.node.shutdown.TransportPrevalidateShardPathAction;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.indices.store.IndicesStore;
+import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.transport.MockTransportService;
+import org.elasticsearch.transport.ConnectTransportException;
+import org.elasticsearch.transport.TransportService;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.oneOf;
+import static org.hamcrest.Matchers.startsWith;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
 public class PrevalidateNodeRemovalIT extends ESIntegTestCase {
 
+    @Override
+    protected Collection<Class<? extends Plugin>> nodePlugins() {
+        return Arrays.asList(MockTransportService.TestPlugin.class);
+    }
+
     public void testNodeRemovalFromNonRedCluster() throws Exception {
         internalCluster().startMasterOnlyNode();
         String node1 = internalCluster().startDataOnlyNode();
@@ -45,10 +68,13 @@ public class PrevalidateNodeRemovalIT extends ESIntegTestCase {
         }
         PrevalidateNodeRemovalResponse resp = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req.build()).get();
         assertTrue(resp.getPrevalidation().isSafe());
+        assertThat(resp.getPrevalidation().message(), equalTo("cluster status is not RED"));
         assertThat(resp.getPrevalidation().nodes().size(), equalTo(1));
         NodesRemovalPrevalidation.NodeResult nodeResult = resp.getPrevalidation().nodes().get(0);
         assertNotNull(nodeResult);
         assertThat(nodeResult.name(), equalTo(nodeName));
+        assertThat(nodeResult.result().reason(), equalTo(NodesRemovalPrevalidation.Reason.NO_PROBLEMS));
+        assertThat(nodeResult.result().message(), equalTo(""));
         assertTrue(nodeResult.result().isSafe());
         // Enforce a replica to get unassigned
         updateIndexSettings(indexName, Settings.builder().put("index.routing.allocation.require._name", node1));
@@ -56,25 +82,145 @@ public class PrevalidateNodeRemovalIT extends ESIntegTestCase {
         PrevalidateNodeRemovalRequest req2 = PrevalidateNodeRemovalRequest.builder().setNames(node2).build();
         PrevalidateNodeRemovalResponse resp2 = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req2).get();
         assertTrue(resp2.getPrevalidation().isSafe());
+        assertThat(resp2.getPrevalidation().message(), equalTo("cluster status is not RED"));
         assertThat(resp2.getPrevalidation().nodes().size(), equalTo(1));
         NodesRemovalPrevalidation.NodeResult nodeResult2 = resp2.getPrevalidation().nodes().get(0);
         assertNotNull(nodeResult2);
         assertThat(nodeResult2.name(), equalTo(node2));
         assertTrue(nodeResult2.result().isSafe());
+        assertThat(nodeResult2.result().reason(), equalTo(NodesRemovalPrevalidation.Reason.NO_PROBLEMS));
+        assertThat(nodeResult2.result().message(), equalTo(""));
     }
 
-    public void testNodeRemovalFromRedCluster() throws Exception {
+    // Test that in case the nodes that are being prevalidated do not contain copies of any of the
+    // red shards, their removal is considered to be safe.
+    public void testNodeRemovalFromRedClusterWithNoLocalShardCopy() throws Exception {
         internalCluster().startMasterOnlyNode();
-        String node1 = internalCluster().startDataOnlyNode();
-        String node2 = internalCluster().startDataOnlyNode();
+        String nodeWithIndex = internalCluster().startDataOnlyNode();
+        List<String> otherNodes = internalCluster().startDataOnlyNodes(randomIntBetween(1, 3));
         // Create an index pinned to one node, and then stop that node so the index is RED.
         String indexName = "test-idx";
         createIndex(
             indexName,
-            Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put("index.routing.allocation.require._name", node1).build()
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put("index.routing.allocation.require._name", nodeWithIndex)
+                .build()
         );
         ensureYellow(indexName);
+        internalCluster().stopNode(nodeWithIndex);
+        ensureRed(indexName);
+        String[] otherNodeNames = otherNodes.toArray(new String[otherNodes.size()]);
+        PrevalidateNodeRemovalRequest req = PrevalidateNodeRemovalRequest.builder().setNames(otherNodeNames).build();
+        PrevalidateNodeRemovalResponse resp = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req).get();
+        assertTrue(resp.getPrevalidation().isSafe());
+        assertThat(resp.getPrevalidation().message(), equalTo(""));
+        assertThat(resp.getPrevalidation().nodes().size(), equalTo(otherNodes.size()));
+        for (NodesRemovalPrevalidation.NodeResult nodeResult : resp.getPrevalidation().nodes()) {
+            assertThat(nodeResult.name(), oneOf(otherNodeNames));
+            assertThat(nodeResult.result().reason(), equalTo(NodesRemovalPrevalidation.Reason.NO_RED_SHARDS_ON_NODE));
+            assertTrue(nodeResult.result().isSafe());
+        }
+    }
+
+    public void testNodeRemovalFromRedClusterWithLocalShardCopy() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        String node1 = internalCluster().startDataOnlyNode();
+        String node2 = internalCluster().startDataOnlyNode();
+        String indexName = "test-idx";
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put("index.routing.allocation.require._name", node1)
+                .build()
+        );
+        ensureGreen(indexName);
+        // Prevent node1 from removing its local index shard copies upon removal, by blocking
+        // its ACTION_SHARD_EXISTS requests since after a relocation, the source first waits
+        // until the shard exists somewhere else, then it removes it locally.
+        final CountDownLatch shardActiveRequestSent = new CountDownLatch(1);
+        MockTransportService node1transport = (MockTransportService) internalCluster().getInstance(TransportService.class, node1);
+        TransportService node2transport = internalCluster().getInstance(TransportService.class, node2);
+        node1transport.addSendBehavior(node2transport, (connection, requestId, action, request, options) -> {
+            if (action.equals(IndicesStore.ACTION_SHARD_EXISTS)) {
+                shardActiveRequestSent.countDown();
+                logger.info("prevent shard active request from being sent");
+                throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulated");
+            }
+            connection.sendRequest(requestId, action, request, options);
+        });
+        logger.info("--> move shard from {} to {}, and wait for relocation to finish", node1, node2);
+        updateIndexSettings(indexName, Settings.builder().put("index.routing.allocation.require._name", node2));
+        shardActiveRequestSent.await();
+        ensureGreen(indexName);
+        // To ensure that the index doesn't get relocated back to node1 after stopping node2, we
+        // index a doc to make the index copy on node1 (in case not deleted after the relocation) stale.
+        indexDoc(indexName, "some_id", "foo", "bar");
+        internalCluster().stopNode(node2);
+        ensureRed(indexName);
+        // Ensure that node1 still has data for the unassigned index
+        NodeEnvironment nodeEnv = internalCluster().getInstance(NodeEnvironment.class, node1);
+        Index index = internalCluster().clusterService().state().metadata().index(indexName).getIndex();
+        ShardPath shardPath = ShardPath.loadShardPath(logger, nodeEnv, new ShardId(index, 0), "");
+        assertNotNull("local index shards not found", shardPath);
+        // Prevalidate removal of node1
+        PrevalidateNodeRemovalRequest req = PrevalidateNodeRemovalRequest.builder().setNames(node1).build();
+        PrevalidateNodeRemovalResponse resp = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req).get();
+        String node1Id = internalCluster().clusterService(node1).localNode().getId();
+        assertFalse(resp.getPrevalidation().isSafe());
+        assertThat(resp.getPrevalidation().message(), equalTo("removal of the following nodes might not be safe: [" + node1Id + "]"));
+        assertThat(resp.getPrevalidation().nodes().size(), equalTo(1));
+        NodesRemovalPrevalidation.NodeResult nodeResult = resp.getPrevalidation().nodes().get(0);
+        assertThat(nodeResult.name(), equalTo(node1));
+        assertFalse(nodeResult.result().isSafe());
+        assertThat(nodeResult.result().reason(), equalTo(NodesRemovalPrevalidation.Reason.RED_SHARDS_ON_NODE));
+        assertThat(nodeResult.result().message(), equalTo("node contains copies of the following red shards: [[" + indexName + "][0]]"));
+    }
+
+    public void testNodeRemovalFromRedClusterWithTimeout() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        String node1 = internalCluster().startDataOnlyNode();
+        String node2 = internalCluster().startDataOnlyNode();
+        String indexName = "test-index";
+        createIndex(
+            indexName,
+            Settings.builder()
+                .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                .put("index.routing.allocation.require._name", node1)
+                .build()
+        );
+        ensureGreen(indexName);
+        // make it red!
         internalCluster().stopNode(node1);
+        ensureRed(indexName);
+        MockTransportService node2TransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, node2);
+        node2TransportService.addRequestHandlingBehavior(
+            TransportPrevalidateShardPathAction.ACTION_NAME + "[n]",
+            (handler, request, channel, task) -> { logger.info("drop the check shards request"); }
+        );
+        PrevalidateNodeRemovalRequest req = PrevalidateNodeRemovalRequest.builder()
+            .setNames(node2)
+            .build()
+            .timeout(TimeValue.timeValueSeconds(1));
+        PrevalidateNodeRemovalResponse resp = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req).get();
+        assertFalse("prevalidation result should return false", resp.getPrevalidation().isSafe());
+        String node2Id = internalCluster().clusterService(node2).localNode().getId();
+        assertThat(
+            resp.getPrevalidation().message(),
+            equalTo("cannot prevalidate removal of nodes with the following IDs: [" + node2Id + "]")
+        );
+        assertThat(resp.getPrevalidation().nodes().size(), equalTo(1));
+        NodesRemovalPrevalidation.NodeResult nodeResult = resp.getPrevalidation().nodes().get(0);
+        assertThat(nodeResult.name(), equalTo(node2));
+        assertFalse(nodeResult.result().isSafe());
+        assertThat(nodeResult.result().message(), startsWith("failed contacting the node"));
+        assertThat(nodeResult.result().reason(), equalTo(NodesRemovalPrevalidation.Reason.UNABLE_TO_VERIFY));
+    }
+
+    private void ensureRed(String indexName) throws Exception {
         assertBusy(() -> {
             ClusterHealthResponse healthResponse = client().admin()
                 .cluster()
@@ -85,15 +231,5 @@ public class PrevalidateNodeRemovalIT extends ESIntegTestCase {
                 .actionGet();
             assertThat(healthResponse.getStatus(), equalTo(ClusterHealthStatus.RED));
         });
-        // With a RED non-searchable-snapshot index, node removal is potentially unsafe
-        // since that node might have the last copy of the unassigned index.
-        PrevalidateNodeRemovalRequest req = PrevalidateNodeRemovalRequest.builder().setNames(node2).build();
-        PrevalidateNodeRemovalResponse resp = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req).get();
-        assertFalse(resp.getPrevalidation().isSafe());
-        assertThat(resp.getPrevalidation().message(), equalTo("cluster health is RED"));
-        assertThat(resp.getPrevalidation().nodes().size(), equalTo(1));
-        NodesRemovalPrevalidation.NodeResult nodeResult = resp.getPrevalidation().nodes().get(0);
-        assertThat(nodeResult.name(), equalTo(node2));
-        assertFalse(nodeResult.result().isSafe());
     }
 }

+ 74 - 0
server/src/internalClusterTest/java/org/elasticsearch/cluster/PrevalidateShardPathIT.java

@@ -0,0 +1,74 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.action.admin.cluster.node.shutdown.NodePrevalidateShardPathResponse;
+import org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathRequest;
+import org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathResponse;
+import org.elasticsearch.action.admin.cluster.node.shutdown.TransportPrevalidateShardPathAction;
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.Matchers.equalTo;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+public class PrevalidateShardPathIT extends ESIntegTestCase {
+
+    public void testCheckShards() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        String node1 = internalCluster().startDataOnlyNode();
+        String node2 = internalCluster().startDataOnlyNode();
+        String indexName = "index1";
+        int index1shards = randomIntBetween(1, 5);
+        createIndex("index1", Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, index1shards).build());
+        ensureGreen(indexName);
+        var shardIds = clusterService().state()
+            .routingTable()
+            .allShards(indexName)
+            .stream()
+            .map(ShardRouting::shardId)
+            .collect(Collectors.toSet());
+        String node1Id = internalCluster().clusterService(node1).localNode().getId();
+        String node2Id = internalCluster().clusterService(node2).localNode().getId();
+        Set<ShardId> shardIdsToCheck = new HashSet<>(shardIds);
+        boolean includeUnknownShardId = randomBoolean();
+        if (includeUnknownShardId) {
+            shardIdsToCheck.add(new ShardId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(), randomIntBetween(0, 10)));
+        }
+        PrevalidateShardPathRequest req = new PrevalidateShardPathRequest(shardIdsToCheck, node1Id, node2Id);
+        PrevalidateShardPathResponse resp = client().execute(TransportPrevalidateShardPathAction.TYPE, req).get();
+        var nodeResponses = resp.getNodes();
+        assertThat(nodeResponses.size(), equalTo(2));
+        assertThat(nodeResponses.stream().map(r -> r.getNode().getId()).collect(Collectors.toSet()), equalTo(Set.of(node1Id, node2Id)));
+        assertTrue(resp.failures().isEmpty());
+        for (NodePrevalidateShardPathResponse nodeResponse : nodeResponses) {
+            assertThat(nodeResponse.getShardIds(), equalTo(shardIds));
+        }
+        // Check that after relocation the source node doesn't have the shard path
+        String node3 = internalCluster().startDataOnlyNode();
+        updateIndexSettings(indexName, Settings.builder().put("index.routing.allocation.exclude._name", node2));
+        ensureGreen(indexName);
+        assertBusy(() -> {
+            // The excluded node should eventually delete the shards
+            PrevalidateShardPathRequest req2 = new PrevalidateShardPathRequest(shardIdsToCheck, node2Id);
+            PrevalidateShardPathResponse resp2 = client().execute(TransportPrevalidateShardPathAction.TYPE, req2).get();
+            assertThat(resp2.getNodes().size(), equalTo(1));
+            assertTrue(resp.failures().isEmpty());
+            assertTrue(resp2.getNodes().get(0).getShardIds().isEmpty());
+        });
+    }
+}

+ 2 - 0
server/src/main/java/org/elasticsearch/action/ActionModule.java

@@ -41,6 +41,7 @@ import org.elasticsearch.action.admin.cluster.node.reload.NodesReloadSecureSetti
 import org.elasticsearch.action.admin.cluster.node.reload.TransportNodesReloadSecureSettingsAction;
 import org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateNodeRemovalAction;
 import org.elasticsearch.action.admin.cluster.node.shutdown.TransportPrevalidateNodeRemovalAction;
+import org.elasticsearch.action.admin.cluster.node.shutdown.TransportPrevalidateShardPathAction;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction;
 import org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction;
 import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksAction;
@@ -707,6 +708,7 @@ public class ActionModule extends AbstractModule {
         actions.register(TransportNodesListShardStoreMetadata.TYPE, TransportNodesListShardStoreMetadata.class);
         actions.register(TransportShardFlushAction.TYPE, TransportShardFlushAction.class);
         actions.register(TransportShardRefreshAction.TYPE, TransportShardRefreshAction.class);
+        actions.register(TransportPrevalidateShardPathAction.TYPE, TransportPrevalidateShardPathAction.class);
 
         // desired nodes
         actions.register(GetDesiredNodesAction.INSTANCE, TransportGetDesiredNodesAction.class);

+ 59 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathRequest.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.shutdown;
+
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.transport.TransportRequest;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A node-specific request derived from the corresponding {@link PrevalidateShardPathRequest}.
+*/
+public class NodePrevalidateShardPathRequest extends TransportRequest {
+
+    private final Set<ShardId> shardIds;
+
+    public NodePrevalidateShardPathRequest(Collection<ShardId> shardIds) {
+        this.shardIds = Set.copyOf(Objects.requireNonNull(shardIds));
+    }
+
+    public NodePrevalidateShardPathRequest(StreamInput in) throws IOException {
+        super(in);
+        this.shardIds = Set.copyOf(Objects.requireNonNull(in.readSet(ShardId::new)));
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeCollection(shardIds, (o, value) -> value.writeTo(o));
+    }
+
+    public Set<ShardId> getShardIds() {
+        return shardIds;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o instanceof NodePrevalidateShardPathRequest == false) return false;
+        NodePrevalidateShardPathRequest other = (NodePrevalidateShardPathRequest) o;
+        return Objects.equals(shardIds, other.shardIds);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(shardIds);
+    }
+}

+ 57 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathResponse.java

@@ -0,0 +1,57 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.shutdown;
+
+import org.elasticsearch.action.support.nodes.BaseNodeResponse;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+
+public class NodePrevalidateShardPathResponse extends BaseNodeResponse {
+
+    private final Set<ShardId> shardIds;
+
+    protected NodePrevalidateShardPathResponse(DiscoveryNode node, Set<ShardId> shardIds) {
+        super(node);
+        this.shardIds = Set.copyOf(Objects.requireNonNull(shardIds));
+    }
+
+    protected NodePrevalidateShardPathResponse(StreamInput in) throws IOException {
+        super(in);
+        shardIds = Set.copyOf(Objects.requireNonNull(in.readSet(ShardId::new)));
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeCollection(shardIds);
+    }
+
+    public Set<ShardId> getShardIds() {
+        return shardIds;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o instanceof NodePrevalidateShardPathResponse == false) return false;
+        NodePrevalidateShardPathResponse other = (NodePrevalidateShardPathResponse) o;
+        return Objects.equals(shardIds, other.shardIds) && Objects.equals(getNode(), other.getNode());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(shardIds, getNode());
+    }
+}

+ 43 - 3
server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesRemovalPrevalidation.java

@@ -119,14 +119,15 @@ public record NodesRemovalPrevalidation(boolean isSafe, String message, List<Nod
     }
 
     // The prevalidation result of a node
-    public record Result(boolean isSafe, String message) implements ToXContentObject, Writeable {
+    public record Result(boolean isSafe, Reason reason, String message) implements ToXContentObject, Writeable {
 
         private static final ParseField IS_SAFE_FIELD = new ParseField("is_safe");
+        private static final ParseField REASON_FIELD = new ParseField("reason");
         private static final ParseField MESSAGE_FIELD = new ParseField("message");
 
         private static final ConstructingObjectParser<Result, Void> PARSER = new ConstructingObjectParser<>(
             "nodes_removal_prevalidation_result",
-            objects -> new Result((boolean) objects[0], (String) objects[1])
+            objects -> new Result((boolean) objects[0], Reason.fromString((String) objects[1]), (String) objects[2])
         );
 
         static {
@@ -135,23 +136,26 @@ public record NodesRemovalPrevalidation(boolean isSafe, String message, List<Nod
 
         static <T> void configureParser(ConstructingObjectParser<T, Void> parser) {
             parser.declareBoolean(ConstructingObjectParser.constructorArg(), IS_SAFE_FIELD);
+            parser.declareString(ConstructingObjectParser.constructorArg(), REASON_FIELD);
             parser.declareString(ConstructingObjectParser.constructorArg(), MESSAGE_FIELD);
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             out.writeBoolean(isSafe);
+            reason.writeTo(out);
             out.writeString(message);
         }
 
         public static Result readFrom(final StreamInput in) throws IOException {
-            return new Result(in.readBoolean(), in.readString());
+            return new Result(in.readBoolean(), Reason.readFrom(in), in.readString());
         }
 
         @Override
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             builder.startObject();
             builder.field(IS_SAFE_FIELD.getPreferredName(), isSafe);
+            builder.field(REASON_FIELD.getPreferredName(), reason.reason);
             builder.field(MESSAGE_FIELD.getPreferredName(), message);
             builder.endObject();
             return builder;
@@ -161,4 +165,40 @@ public record NodesRemovalPrevalidation(boolean isSafe, String message, List<Nod
             return PARSER.parse(parser, null);
         }
     }
+
+    public enum Reason implements Writeable {
+        NO_PROBLEMS("no_problems"),
+        NO_RED_SHARDS_ON_NODE("no_red_shards_on_node"),
+        NO_RED_SHARDS_EXCEPT_SEARCHABLE_SNAPSHOTS("no_red_shards_except_searchable_snapshots"),
+        RED_SHARDS_ON_NODE("red_shards_on_node"),
+        UNABLE_TO_VERIFY("unable_to_verify_red_shards");
+
+        private final String reason;
+
+        Reason(String reason) {
+            this.reason = reason;
+        }
+
+        public String reason() {
+            return reason;
+        }
+
+        public static Reason readFrom(final StreamInput in) throws IOException {
+            return fromString(in.readString());
+        }
+
+        public static Reason fromString(String s) {
+            for (Reason r : values()) {
+                if (s.equalsIgnoreCase(r.reason)) {
+                    return r;
+                }
+            }
+            throw new IllegalArgumentException("unexpected Reason value [" + s + "]");
+        }
+
+        @Override
+        public void writeTo(StreamOutput out) throws IOException {
+            out.writeString(reason);
+        }
+    }
 }

+ 16 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateNodeRemovalRequest.java

@@ -13,6 +13,7 @@ import org.elasticsearch.action.support.master.MasterNodeReadRequest;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.core.TimeValue;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -29,6 +30,7 @@ public class PrevalidateNodeRemovalRequest extends MasterNodeReadRequest<Prevali
     private final String[] names;
     private final String[] ids;
     private final String[] externalIds;
+    private TimeValue timeout = TimeValue.timeValueSeconds(30);
 
     private PrevalidateNodeRemovalRequest(Builder builder) {
         this.names = builder.names;
@@ -41,6 +43,7 @@ public class PrevalidateNodeRemovalRequest extends MasterNodeReadRequest<Prevali
         names = in.readStringArray();
         ids = in.readStringArray();
         externalIds = in.readStringArray();
+        timeout = in.readTimeValue();
     }
 
     @Override
@@ -49,6 +52,7 @@ public class PrevalidateNodeRemovalRequest extends MasterNodeReadRequest<Prevali
         out.writeStringArray(names);
         out.writeStringArray(ids);
         out.writeStringArray(externalIds);
+        out.writeTimeValue(timeout);
     }
 
     @Override
@@ -79,6 +83,18 @@ public class PrevalidateNodeRemovalRequest extends MasterNodeReadRequest<Prevali
         return externalIds;
     }
 
+    public TimeValue timeout() {
+        return timeout;
+    }
+
+    public PrevalidateNodeRemovalRequest timeout(TimeValue timeout) {
+        this.timeout = timeout;
+        if (masterNodeTimeout == DEFAULT_MASTER_NODE_TIMEOUT) {
+            masterNodeTimeout = timeout;
+        }
+        return this;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;

+ 59 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequest.java

@@ -0,0 +1,59 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.shutdown;
+
+import org.elasticsearch.action.support.nodes.BaseNodesRequest;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.Set;
+
+public class PrevalidateShardPathRequest extends BaseNodesRequest<PrevalidateShardPathRequest> {
+
+    private final Set<ShardId> shardIds;
+
+    public PrevalidateShardPathRequest(Set<ShardId> shardIds, String... nodeIds) {
+        super(nodeIds);
+        this.shardIds = Set.copyOf(Objects.requireNonNull(shardIds));
+    }
+
+    public PrevalidateShardPathRequest(StreamInput in) throws IOException {
+        super(in);
+        this.shardIds = Set.copyOf(Objects.requireNonNull(in.readSet(ShardId::new)));
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeCollection(shardIds);
+    }
+
+    public Set<ShardId> getShardIds() {
+        return shardIds;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o instanceof PrevalidateShardPathRequest == false) return false;
+        PrevalidateShardPathRequest other = (PrevalidateShardPathRequest) o;
+        return Objects.equals(shardIds, other.shardIds)
+            && Arrays.equals(nodesIds(), other.nodesIds())
+            && Objects.equals(timeout(), other.timeout());
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(shardIds, Arrays.hashCode(nodesIds()), timeout());
+    }
+}

+ 43 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathResponse.java

@@ -0,0 +1,43 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.shutdown;
+
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.nodes.BaseNodesResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+
+import java.io.IOException;
+import java.util.List;
+
+public class PrevalidateShardPathResponse extends BaseNodesResponse<NodePrevalidateShardPathResponse> {
+
+    public PrevalidateShardPathResponse(
+        ClusterName clusterName,
+        List<NodePrevalidateShardPathResponse> nodes,
+        List<FailedNodeException> failures
+    ) {
+        super(clusterName, nodes, failures);
+    }
+
+    public PrevalidateShardPathResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    @Override
+    protected List<NodePrevalidateShardPathResponse> readNodesFrom(StreamInput in) throws IOException {
+        return in.readList(NodePrevalidateShardPathResponse::new);
+    }
+
+    @Override
+    protected void writeNodesTo(StreamOutput out, List<NodePrevalidateShardPathResponse> nodes) throws IOException {
+        out.writeList(nodes);
+    }
+}

+ 156 - 51
server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportPrevalidateNodeRemovalAction.java

@@ -10,8 +10,10 @@ package org.elasticsearch.action.admin.cluster.node.shutdown;
 
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.FailedNodeException;
 import org.elasticsearch.action.support.ActionFilters;
 import org.elasticsearch.action.support.master.TransportMasterNodeReadAction;
+import org.elasticsearch.client.internal.node.NodeClient;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
@@ -22,12 +24,16 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.core.Strings;
+import org.elasticsearch.core.Tuple;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.logging.LogManager;
 import org.elasticsearch.logging.Logger;
 import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -46,13 +52,16 @@ public class TransportPrevalidateNodeRemovalAction extends TransportMasterNodeRe
 
     private static final Logger logger = LogManager.getLogger(TransportPrevalidateNodeRemovalAction.class);
 
+    private final NodeClient client;
+
     @Inject
     public TransportPrevalidateNodeRemovalAction(
         TransportService transportService,
         ClusterService clusterService,
         ThreadPool threadPool,
         ActionFilters actionFilters,
-        IndexNameExpressionResolver indexNameExpressionResolver
+        IndexNameExpressionResolver indexNameExpressionResolver,
+        NodeClient client
     ) {
         super(
             PrevalidateNodeRemovalAction.NAME,
@@ -66,6 +75,7 @@ public class TransportPrevalidateNodeRemovalAction extends TransportMasterNodeRe
             PrevalidateNodeRemovalResponse::new,
             ThreadPool.Names.SAME
         );
+        this.client = client;
     }
 
     @Override
@@ -76,8 +86,8 @@ public class TransportPrevalidateNodeRemovalAction extends TransportMasterNodeRe
         ActionListener<PrevalidateNodeRemovalResponse> listener
     ) {
         try {
-            Set<DiscoveryNode> discoveryNodes = resolveNodes(request, state.nodes());
-            doPrevalidation(discoveryNodes, state, listener);
+            Set<DiscoveryNode> requestNodes = resolveNodes(request, state.nodes());
+            doPrevalidation(request, requestNodes, state, listener);
         } catch (Exception e) {
             listener.onFailure(e);
         }
@@ -141,62 +151,157 @@ public class TransportPrevalidateNodeRemovalAction extends TransportMasterNodeRe
     }
 
     private void doPrevalidation(
-        Set<DiscoveryNode> nodes,
+        PrevalidateNodeRemovalRequest request,
+        Set<DiscoveryNode> requestNodes,
         ClusterState clusterState,
         ActionListener<PrevalidateNodeRemovalResponse> listener
     ) {
-        assert nodes != null && nodes.isEmpty() == false;
+        assert requestNodes != null && requestNodes.isEmpty() == false;
 
-        logger.debug(() -> "prevalidate node removal for nodes " + nodes);
+        logger.debug(() -> "prevalidate node removal for nodes " + requestNodes);
         ClusterStateHealth clusterStateHealth = new ClusterStateHealth(clusterState);
         Metadata metadata = clusterState.metadata();
-        switch (clusterStateHealth.getStatus()) {
-            case GREEN, YELLOW -> {
-                List<NodeResult> nodesResults = nodes.stream()
-                    .map(dn -> new NodeResult(dn.getName(), dn.getId(), dn.getExternalId(), new Result(true, "")))
-                    .toList();
-                listener.onResponse(
-                    new PrevalidateNodeRemovalResponse(new NodesRemovalPrevalidation(true, "cluster status is not RED", nodesResults))
-                );
-            }
-            case RED -> {
-                Set<String> redIndices = clusterStateHealth.getIndices()
-                    .entrySet()
-                    .stream()
-                    .filter(entry -> entry.getValue().getStatus() == ClusterHealthStatus.RED)
-                    .map(Map.Entry::getKey)
-                    .collect(Collectors.toSet());
-                // If all red indices are searchable snapshot indices, it is safe to remove any node.
-                Set<String> redNonSSIndices = redIndices.stream()
-                    .map(metadata::index)
-                    .filter(i -> i.isSearchableSnapshot() == false)
-                    .map(im -> im.getIndex().getName())
-                    .collect(Collectors.toSet());
-                if (redNonSSIndices.isEmpty()) {
-                    List<NodeResult> nodeResults = nodes.stream()
-                        .map(dn -> new NodeResult(dn.getName(), dn.getId(), dn.getExternalId(), new Result(true, "")))
-                        .toList();
-                    listener.onResponse(
-                        new PrevalidateNodeRemovalResponse(
-                            new NodesRemovalPrevalidation(true, "all red indices are searchable snapshot indices", nodeResults)
-                        )
-                    );
-                } else {
-                    List<NodeResult> nodeResults = nodes.stream()
-                        .map(
-                            dn -> new NodeResult(
-                                dn.getName(),
-                                dn.getId(),
-                                dn.getExternalId(),
-                                new Result(false, "node may contain a copy of a red index shard")
-                            )
-                        )
-                        .toList();
-                    listener.onResponse(
-                        new PrevalidateNodeRemovalResponse(new NodesRemovalPrevalidation(false, "cluster health is RED", nodeResults))
-                    );
+        DiscoveryNodes clusterNodes = clusterState.getNodes();
+        if (clusterStateHealth.getStatus() == ClusterHealthStatus.GREEN || clusterStateHealth.getStatus() == ClusterHealthStatus.YELLOW) {
+            List<NodeResult> nodesResults = requestNodes.stream()
+                .map(
+                    dn -> new NodeResult(
+                        dn.getName(),
+                        dn.getId(),
+                        dn.getExternalId(),
+                        new Result(true, NodesRemovalPrevalidation.Reason.NO_PROBLEMS, "")
+                    )
+                )
+                .toList();
+            listener.onResponse(
+                new PrevalidateNodeRemovalResponse(new NodesRemovalPrevalidation(true, "cluster status is not RED", nodesResults))
+            );
+            return;
+        }
+        // RED cluster state
+        Set<String> redIndices = clusterStateHealth.getIndices()
+            .entrySet()
+            .stream()
+            .filter(entry -> entry.getValue().getStatus() == ClusterHealthStatus.RED)
+            .map(Map.Entry::getKey)
+            .collect(Collectors.toSet());
+        // If all red indices are searchable snapshot indices, it is safe to remove any node.
+        Set<String> redNonSSIndices = redIndices.stream()
+            .map(metadata::index)
+            .filter(i -> i.isSearchableSnapshot() == false)
+            .map(im -> im.getIndex().getName())
+            .collect(Collectors.toSet());
+        if (redNonSSIndices.isEmpty()) {
+            List<NodeResult> nodeResults = requestNodes.stream()
+                .map(
+                    dn -> new NodeResult(
+                        dn.getName(),
+                        dn.getId(),
+                        dn.getExternalId(),
+                        new Result(true, NodesRemovalPrevalidation.Reason.NO_RED_SHARDS_EXCEPT_SEARCHABLE_SNAPSHOTS, "")
+                    )
+                )
+                .toList();
+            listener.onResponse(
+                new PrevalidateNodeRemovalResponse(
+                    new NodesRemovalPrevalidation(true, "all red indices are searchable snapshot indices", nodeResults)
+                )
+            );
+        } else {
+            // Reach out to the nodes to find out whether they contain copies of the red non-searchable-snapshot indices
+            Set<ShardId> redShards = clusterStateHealth.getIndices()
+                .entrySet()
+                .stream()
+                .filter(indexHealthEntry -> redNonSSIndices.contains(indexHealthEntry.getKey()))
+                .map(Map.Entry::getValue) // ClusterHealthIndex of red non-searchable-snapshot indices
+                .flatMap(
+                    redIndexHealth -> redIndexHealth.getShards()
+                        .values()
+                        .stream()
+                        .filter(shardHealth -> shardHealth.getStatus() == ClusterHealthStatus.RED)
+                        .map(redShardHealth -> Tuple.tuple(redIndexHealth.getIndex(), redShardHealth))
+                ) // (Index, ClusterShardHealth) of all red shards
+                .map(
+                    redIndexShardHealthTuple -> new ShardId(
+                        metadata.index(redIndexShardHealthTuple.v1()).getIndex(),
+                        redIndexShardHealthTuple.v2().getShardId()
+                    )
+                ) // Convert to ShardId
+                .collect(Collectors.toSet());
+            var nodeIds = requestNodes.stream().map(DiscoveryNode::getId).toList().toArray(new String[0]);
+            var checkShardsRequest = new PrevalidateShardPathRequest(redShards, nodeIds).timeout(request.timeout());
+            client.execute(TransportPrevalidateShardPathAction.TYPE, checkShardsRequest, new ActionListener<>() {
+                @Override
+                public void onResponse(PrevalidateShardPathResponse response) {
+                    listener.onResponse(new PrevalidateNodeRemovalResponse(createPrevalidationResult(clusterNodes, response)));
+                }
+
+                @Override
+                public void onFailure(Exception e) {
+                    listener.onFailure(e);
                 }
+            });
+        }
+    }
+
+    private NodesRemovalPrevalidation createPrevalidationResult(DiscoveryNodes nodes, PrevalidateShardPathResponse response) {
+        List<NodeResult> nodeResults = new ArrayList<>(response.getNodes().size() + response.failures().size());
+        for (NodePrevalidateShardPathResponse nodeResponse : response.getNodes()) {
+            Result result;
+            if (nodeResponse.getShardIds().isEmpty()) {
+                result = new Result(true, NodesRemovalPrevalidation.Reason.NO_RED_SHARDS_ON_NODE, "");
+            } else {
+                result = new Result(
+                    false,
+                    NodesRemovalPrevalidation.Reason.RED_SHARDS_ON_NODE,
+                    Strings.format("node contains copies of the following red shards: %s", nodeResponse.getShardIds())
+                );
             }
+            nodeResults.add(
+                new NodeResult(
+                    nodeResponse.getNode().getName(),
+                    nodeResponse.getNode().getId(),
+                    nodeResponse.getNode().getExternalId(),
+                    result
+                )
+            );
+        }
+        for (FailedNodeException failedResponse : response.failures()) {
+            DiscoveryNode node = nodes.get(failedResponse.nodeId());
+            nodeResults.add(
+                new NodeResult(
+                    node.getName(),
+                    node.getId(),
+                    node.getExternalId(),
+                    new Result(
+                        false,
+                        NodesRemovalPrevalidation.Reason.UNABLE_TO_VERIFY,
+                        Strings.format("failed contacting the node: %s", failedResponse.getDetailedMessage())
+                    )
+                )
+            );
+        }
+        // determine overall result from the node results.
+        Set<String> unsafeNodeRemovals = response.getNodes()
+            .stream()
+            .filter(r -> r.getShardIds().isEmpty() == false)
+            .map(r -> r.getNode().getId())
+            .collect(Collectors.toSet());
+        if (unsafeNodeRemovals.isEmpty() == false) {
+            return new NodesRemovalPrevalidation(
+                false,
+                Strings.format("removal of the following nodes might not be safe: %s", unsafeNodeRemovals),
+                nodeResults
+            );
+        }
+        if (response.failures().isEmpty() == false) {
+            Set<String> unknownNodeRemovals = response.failures().stream().map(FailedNodeException::nodeId).collect(Collectors.toSet());
+            return new NodesRemovalPrevalidation(
+                false,
+                Strings.format("cannot prevalidate removal of nodes with the following IDs: %s", unknownNodeRemovals),
+                nodeResults
+            );
         }
+        return new NodesRemovalPrevalidation(true, "", nodeResults);
     }
 }

+ 126 - 0
server/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportPrevalidateShardPathAction.java

@@ -0,0 +1,126 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.shutdown;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.ActionType;
+import org.elasticsearch.action.FailedNodeException;
+import org.elasticsearch.action.support.ActionFilters;
+import org.elasticsearch.action.support.nodes.TransportNodesAction;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.env.NodeEnvironment;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardPath;
+import org.elasticsearch.tasks.Task;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+
+/**
+ * Given a set of shard IDs, checks which of those shards have a matching directory in the local data path.
+ * This is used by {@link PrevalidateNodeRemovalAction} to find out whether a node may contain some copy
+ * of a specific shard. The response contains a subset of the request shard IDs which are in the cluster state
+ * of this node and have a matching shard path on the local data path.
+ */
+public class TransportPrevalidateShardPathAction extends TransportNodesAction<
+    PrevalidateShardPathRequest,
+    PrevalidateShardPathResponse,
+    NodePrevalidateShardPathRequest,
+    NodePrevalidateShardPathResponse> {
+
+    public static final String ACTION_NAME = "internal:admin/indices/prevalidate_shard_path";
+    public static final ActionType<PrevalidateShardPathResponse> TYPE = new ActionType<>(ACTION_NAME, PrevalidateShardPathResponse::new);
+    private static final Logger logger = LogManager.getLogger(TransportPrevalidateShardPathAction.class);
+
+    private final TransportService transportService;
+    private final NodeEnvironment nodeEnv;
+    private final Settings settings;
+
+    @Inject
+    public TransportPrevalidateShardPathAction(
+        ThreadPool threadPool,
+        ClusterService clusterService,
+        TransportService transportService,
+        ActionFilters actionFilters,
+        NodeEnvironment nodeEnv,
+        Settings settings
+    ) {
+        super(
+            ACTION_NAME,
+            threadPool,
+            clusterService,
+            transportService,
+            actionFilters,
+            PrevalidateShardPathRequest::new,
+            NodePrevalidateShardPathRequest::new,
+            ThreadPool.Names.MANAGEMENT,
+            NodePrevalidateShardPathResponse.class
+        );
+        this.transportService = transportService;
+        this.nodeEnv = nodeEnv;
+        this.settings = settings;
+    }
+
+    @Override
+    protected PrevalidateShardPathResponse newResponse(
+        PrevalidateShardPathRequest request,
+        List<NodePrevalidateShardPathResponse> nodeResponses,
+        List<FailedNodeException> failures
+    ) {
+        return new PrevalidateShardPathResponse(clusterService.getClusterName(), nodeResponses, failures);
+    }
+
+    @Override
+    protected NodePrevalidateShardPathRequest newNodeRequest(PrevalidateShardPathRequest request) {
+        return new NodePrevalidateShardPathRequest(request.getShardIds());
+    }
+
+    @Override
+    protected NodePrevalidateShardPathResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
+        return new NodePrevalidateShardPathResponse(in);
+    }
+
+    @Override
+    protected NodePrevalidateShardPathResponse nodeOperation(NodePrevalidateShardPathRequest request, Task task) {
+        Set<ShardId> localShards = new HashSet<>();
+        ShardPath shardPath = null;
+        // For each shard we only check whether the shard path exists, regardless of whether the content is a valid index or not.
+        for (ShardId shardId : request.getShardIds()) {
+            try {
+                var indexMetadata = clusterService.state().metadata().index(shardId.getIndex());
+                String customDataPath = null;
+                if (indexMetadata != null) {
+                    customDataPath = new IndexSettings(indexMetadata, settings).customDataPath();
+                } else {
+                    // The index is not known to this node. This shouldn't happen, but it can be safely ignored for this operation.
+                    logger.warn("node doesn't have metadata for the index [{}]", shardId.getIndex());
+                }
+                shardPath = ShardPath.loadShardPath(logger, nodeEnv, shardId, customDataPath);
+                if (shardPath != null) {
+                    localShards.add(shardId);
+                }
+            } catch (IOException e) {
+                final String path = shardPath != null ? shardPath.resolveIndex().toString() : "";
+                logger.error(() -> String.format(Locale.ROOT, "error loading shard path for shard [%s]", shardId), e);
+            }
+        }
+        return new NodePrevalidateShardPathResponse(transportService.getLocalNode(), localShards);
+    }
+}

+ 1 - 0
server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPrevalidateNodeRemovalAction.java

@@ -44,6 +44,7 @@ public class RestPrevalidateNodeRemovalAction extends BaseRestHandler {
             .setExternalIds(externalIds)
             .build();
         prevalidationRequest.masterNodeTimeout(request.paramAsTime("master_timeout", prevalidationRequest.masterNodeTimeout()));
+        prevalidationRequest.timeout(request.paramAsTime("timeout", prevalidationRequest.timeout()));
         return channel -> client.execute(
             PrevalidateNodeRemovalAction.INSTANCE,
             prevalidationRequest,

+ 36 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathRequestSerializationTests.java

@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.shutdown;
+
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathRequestSerializationTests.createSetMutation;
+
+public class NodePrevalidateShardPathRequestSerializationTests extends AbstractWireSerializingTestCase<NodePrevalidateShardPathRequest> {
+
+    @Override
+    protected Writeable.Reader<NodePrevalidateShardPathRequest> instanceReader() {
+        return NodePrevalidateShardPathRequest::new;
+    }
+
+    @Override
+    protected NodePrevalidateShardPathRequest createTestInstance() {
+        return new NodePrevalidateShardPathRequest(randomSet(0, 50, PrevalidateShardPathRequestSerializationTests::randomShardId));
+    }
+
+    @Override
+    protected NodePrevalidateShardPathRequest mutateInstance(NodePrevalidateShardPathRequest request) throws IOException {
+        return new NodePrevalidateShardPathRequest(
+            createSetMutation(request.getShardIds(), PrevalidateShardPathRequestSerializationTests::randomShardId)
+        );
+    }
+}

+ 53 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodePrevalidateShardPathResponseSerializationTests.java

@@ -0,0 +1,53 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.shutdown;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+import java.io.IOException;
+
+import static org.elasticsearch.action.admin.cluster.node.shutdown.PrevalidateShardPathRequestSerializationTests.createSetMutation;
+
+public class NodePrevalidateShardPathResponseSerializationTests extends AbstractWireSerializingTestCase<NodePrevalidateShardPathResponse> {
+
+    @Override
+    protected Writeable.Reader<NodePrevalidateShardPathResponse> instanceReader() {
+        return NodePrevalidateShardPathResponse::new;
+    }
+
+    @Override
+    protected NodePrevalidateShardPathResponse createTestInstance() {
+        return getRandomResponse();
+    }
+
+    public static NodePrevalidateShardPathResponse getRandomResponse() {
+        return new NodePrevalidateShardPathResponse(
+            getRandomNode(),
+            randomSet(0, 100, PrevalidateShardPathRequestSerializationTests::randomShardId)
+        );
+    }
+
+    public static DiscoveryNode getRandomNode() {
+        return new DiscoveryNode(randomAlphaOfLength(10), buildNewFakeTransportAddress(), Version.CURRENT);
+    }
+
+    @Override
+    protected NodePrevalidateShardPathResponse mutateInstance(NodePrevalidateShardPathResponse response) throws IOException {
+        if (randomBoolean()) {
+            return new NodePrevalidateShardPathResponse(getRandomNode(), response.getShardIds());
+        }
+        return new NodePrevalidateShardPathResponse(
+            response.getNode(),
+            createSetMutation(response.getShardIds(), PrevalidateShardPathRequestSerializationTests::randomShardId)
+        );
+    }
+}

+ 5 - 2
server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/NodesRemovalPrevalidationSerializationTests.java

@@ -87,7 +87,6 @@ public class NodesRemovalPrevalidationSerializationTests extends AbstractXConten
     public static NodesRemovalPrevalidation randomNodesRemovalPrevalidation() {
         int noOfNodes = randomIntBetween(1, 10);
         List<NodesRemovalPrevalidation.NodeResult> nodes = new ArrayList<>(noOfNodes);
-        NodesRemovalPrevalidation.Result result = randomResult();
         for (int i = 0; i < noOfNodes; i++) {
             nodes.add(
                 new NodesRemovalPrevalidation.NodeResult(
@@ -102,6 +101,10 @@ public class NodesRemovalPrevalidationSerializationTests extends AbstractXConten
     }
 
     private static NodesRemovalPrevalidation.Result randomResult() {
-        return new NodesRemovalPrevalidation.Result(randomBoolean(), randomAlphaOfLengthBetween(0, 1000));
+        return new NodesRemovalPrevalidation.Result(
+            randomBoolean(),
+            randomFrom(NodesRemovalPrevalidation.Reason.values()),
+            randomAlphaOfLengthBetween(0, 1000)
+        );
     }
 }

+ 84 - 0
server/src/test/java/org/elasticsearch/action/admin/cluster/node/shutdown/PrevalidateShardPathRequestSerializationTests.java

@@ -0,0 +1,84 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.shutdown;
+
+import org.elasticsearch.common.UUIDs;
+import org.elasticsearch.common.io.stream.Writeable;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.AbstractWireSerializingTestCase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+
+public class PrevalidateShardPathRequestSerializationTests extends AbstractWireSerializingTestCase<PrevalidateShardPathRequest> {
+
+    @Override
+    protected Writeable.Reader<PrevalidateShardPathRequest> instanceReader() {
+        return PrevalidateShardPathRequest::new;
+    }
+
+    @Override
+    protected PrevalidateShardPathRequest createTestInstance() {
+        Set<ShardId> shardIds = randomSet(0, 100, PrevalidateShardPathRequestSerializationTests::randomShardId);
+        String[] nodeIds = randomArray(1, 5, String[]::new, () -> randomAlphaOfLength(20));
+        PrevalidateShardPathRequest request = new PrevalidateShardPathRequest(shardIds, nodeIds);
+        return randomBoolean() ? request : request.timeout(randomTimeValue());
+    }
+
+    @Override
+    protected PrevalidateShardPathRequest mutateInstance(PrevalidateShardPathRequest request) throws IOException {
+        int i = randomInt(2);
+        return switch (i) {
+            case 0 -> new PrevalidateShardPathRequest(
+                createSetMutation(request.getShardIds(), PrevalidateShardPathRequestSerializationTests::randomShardId),
+                request.nodesIds()
+            ).timeout(request.timeout());
+            case 1 -> new PrevalidateShardPathRequest(
+                request.getShardIds(),
+                createArrayMutation(request.nodesIds(), () -> randomAlphaOfLength(20), String[]::new)
+            ).timeout(request.timeout());
+            case 2 -> new PrevalidateShardPathRequest(request.getShardIds(), request.nodesIds()).timeout(
+                randomValueOtherThan(request.timeout(), () -> new TimeValue(randomLongBetween(1000, 10000)))
+            );
+            default -> throw new IllegalStateException("unexpected value: " + i);
+        };
+    }
+
+    public static ShardId randomShardId() {
+        return new ShardId(randomAlphaOfLength(20), UUIDs.randomBase64UUID(), randomIntBetween(0, 25));
+    }
+
+    public static <T> void mutateList(List<T> list, Supplier<T> supplier) {
+        if (list.size() > 0 && randomBoolean()) {
+            // just remove one
+            list.remove(randomInt(list.size() - 1));
+        } else {
+            list.add(supplier.get());
+        }
+    }
+
+    public static <T> Set<T> createSetMutation(Set<T> set, Supplier<T> supplier) {
+        List<T> list = new ArrayList<>(set);
+        mutateList(list, supplier);
+        return new HashSet<>(list);
+    }
+
+    public static <T> T[] createArrayMutation(T[] array, Supplier<T> supplier, IntFunction<T[]> arrayConstructor) {
+        List<T> list = new ArrayList<>(Arrays.asList(array));
+        mutateList(list, supplier);
+        return list.toArray(arrayConstructor.apply(list.size()));
+    }
+}

+ 3 - 0
x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/PrevalidateNodeRemovalWithSearchableSnapshotIntegTests.java

@@ -71,6 +71,7 @@ public class PrevalidateNodeRemovalWithSearchableSnapshotIntegTests extends Base
         }
         PrevalidateNodeRemovalResponse resp = client().execute(PrevalidateNodeRemovalAction.INSTANCE, req.build()).get();
         assertTrue(resp.getPrevalidation().isSafe());
+        assertThat(resp.getPrevalidation().message(), equalTo("all red indices are searchable snapshot indices"));
         assertThat(resp.getPrevalidation().nodes().size(), equalTo(1));
         NodesRemovalPrevalidation.NodeResult nodeResult = resp.getPrevalidation().nodes().get(0);
         assertNotNull(nodeResult);
@@ -78,5 +79,7 @@ public class PrevalidateNodeRemovalWithSearchableSnapshotIntegTests extends Base
         assertThat(nodeResult.Id(), not(emptyString()));
         assertThat(nodeResult.externalId(), not(emptyString()));
         assertTrue(nodeResult.result().isSafe());
+        assertThat(nodeResult.result().reason(), equalTo(NodesRemovalPrevalidation.Reason.NO_RED_SHARDS_EXCEPT_SEARCHABLE_SNAPSHOTS));
+        assertThat(nodeResult.result().message(), equalTo(""));
     }
 }

+ 2 - 1
x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

@@ -488,6 +488,7 @@ public class Constants {
         "internal:cluster/master_history/get",
         "internal:cluster/coordination_diagnostics/info",
         "internal:cluster/formation/info",
-        "internal:gateway/local/started_shards"
+        "internal:gateway/local/started_shards",
+        "internal:admin/indices/prevalidate_shard_path"
     );
 }