Browse Source

shards allocation health indicator services (#83513)

Add a health indicator implementations that checks shards status 
and report their health status based on availability
Ievgen Degtiarenko 3 years ago
parent
commit
8d637f588f

+ 5 - 0
docs/changelog/83513.yaml

@@ -0,0 +1,5 @@
+pr: 83513
+summary: Shards allocation health indicator services
+area: Health
+type: enhancement
+issues: []

+ 3 - 0
docs/reference/index-modules.asciidoc

@@ -163,6 +163,9 @@ specific index module:
 
     The number of replicas each primary shard has. Defaults to 1.
 
+    WARNING: Configuring it to 0 may lead to temporary availability loss
+    during node restarts or permanent data loss in case of data corruption.
+
 [[dynamic-index-auto-expand-replicas]]
 `index.auto_expand_replicas`::
 Auto-expand the number of replicas based on the number of data nodes in the

+ 5 - 4
server/src/main/java/org/elasticsearch/cluster/metadata/NodesShutdownMetadata.java

@@ -41,6 +41,7 @@ import java.util.stream.Collectors;
 public class NodesShutdownMetadata implements Metadata.Custom {
     public static final String TYPE = "node_shutdown";
     public static final Version NODE_SHUTDOWN_VERSION = Version.V_7_13_0;
+    public static final NodesShutdownMetadata EMPTY = new NodesShutdownMetadata(Map.of());
 
     private static final ParseField NODES_FIELD = new ParseField("nodes");
 
@@ -70,17 +71,17 @@ public class NodesShutdownMetadata implements Metadata.Custom {
 
     public static Optional<NodesShutdownMetadata> getShutdowns(final ClusterState state) {
         assert state != null : "cluster state should never be null";
-        return Optional.ofNullable(state).map(ClusterState::metadata).map(m -> m.custom(TYPE));
+        return Optional.of(state).map(ClusterState::metadata).map(m -> m.custom(TYPE));
     }
 
     private final Map<String, SingleNodeShutdownMetadata> nodes;
 
     public NodesShutdownMetadata(Map<String, SingleNodeShutdownMetadata> nodes) {
-        this.nodes = nodes;
+        this.nodes = Collections.unmodifiableMap(nodes);
     }
 
     public NodesShutdownMetadata(StreamInput in) throws IOException {
-        this.nodes = in.readMap(StreamInput::readString, SingleNodeShutdownMetadata::new);
+        this(in.readMap(StreamInput::readString, SingleNodeShutdownMetadata::new));
     }
 
     @Override
@@ -92,7 +93,7 @@ public class NodesShutdownMetadata implements Metadata.Custom {
      * @return A map of NodeID to shutdown metadata.
      */
     public Map<String, SingleNodeShutdownMetadata> getAllNodeMetadataMap() {
-        return Collections.unmodifiableMap(nodes);
+        return nodes;
     }
 
     /**

+ 210 - 0
server/src/main/java/org/elasticsearch/cluster/routing/allocation/ShardsAvailabilityHealthIndicatorService.java

@@ -0,0 +1,210 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.health.HealthIndicatorResult;
+import org.elasticsearch.health.HealthIndicatorService;
+import org.elasticsearch.health.HealthStatus;
+import org.elasticsearch.health.SimpleHealthIndicatorDetails;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static java.util.stream.Collectors.joining;
+import static org.elasticsearch.cluster.health.ClusterShardHealth.getInactivePrimaryHealth;
+import static org.elasticsearch.health.HealthStatus.GREEN;
+import static org.elasticsearch.health.HealthStatus.RED;
+import static org.elasticsearch.health.HealthStatus.YELLOW;
+import static org.elasticsearch.health.ServerHealthComponents.DATA;
+
+/**
+ * This indicator reports health for shards.
+ * <p>
+ * Indicator will report:
+ * * RED when one or more primary shards are not available
+ * * YELLOW when one or more replica shards are not available
+ * * GREEN otherwise
+ * <p>
+ * Each shard needs to be available and replicated in order to guarantee high availability and prevent data loses.
+ * Shards allocated on nodes scheduled for restart (using nodes shutdown API) will not degrade this indicator health.
+ */
+public class ShardsAvailabilityHealthIndicatorService implements HealthIndicatorService {
+
+    public static final String NAME = "shards_availability";
+
+    private final ClusterService clusterService;
+
+    public ShardsAvailabilityHealthIndicatorService(ClusterService clusterService) {
+        this.clusterService = clusterService;
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    @Override
+    public String component() {
+        return DATA;
+    }
+
+    @Override
+    public HealthIndicatorResult calculate() {
+        var state = clusterService.state();
+        var shutdown = state.getMetadata().custom(NodesShutdownMetadata.TYPE, NodesShutdownMetadata.EMPTY);
+        var status = new ShardAllocationStatus();
+
+        for (IndexRoutingTable indexShardRouting : state.routingTable()) {
+            for (IndexShardRoutingTable shardRouting : indexShardRouting) {
+                status.addPrimary(shardRouting.primaryShard(), shutdown);
+                for (ShardRouting replicaShard : shardRouting.replicaShards()) {
+                    status.addReplica(replicaShard, shutdown);
+                }
+            }
+        }
+
+        return createIndicator(status.getStatus(), status.getSummary(), status.getDetails());
+    }
+
+    private static class ShardAllocationCounts {
+        private boolean available = true;
+        private int unassigned = 0;
+        private int unassigned_new = 0;
+        private int unassigned_restarting = 0;
+        private int initializing = 0;
+        private int started = 0;
+        private int relocating = 0;
+
+        public void increment(ShardRouting routing, NodesShutdownMetadata shutdowns) {
+            boolean isNew = isUnassignedDueToNewInitialization(routing);
+            boolean isRestarting = isUnassignedDueToTimelyRestart(routing, shutdowns);
+            available &= routing.active() || isRestarting || isNew;
+
+            switch (routing.state()) {
+                case UNASSIGNED -> {
+                    if (isNew) {
+                        unassigned_new++;
+                    } else if (isRestarting) {
+                        unassigned_restarting++;
+                    } else {
+                        unassigned++;
+                    }
+                }
+                case INITIALIZING -> initializing++;
+                case STARTED -> started++;
+                case RELOCATING -> relocating++;
+            }
+        }
+    }
+
+    private static boolean isUnassignedDueToTimelyRestart(ShardRouting routing, NodesShutdownMetadata shutdowns) {
+        var info = routing.unassignedInfo();
+        if (info == null || info.getReason() != UnassignedInfo.Reason.NODE_RESTARTING) {
+            return false;
+        }
+        var shutdown = shutdowns.getAllNodeMetadataMap().get(info.getLastAllocatedNodeId());
+        if (shutdown == null || shutdown.getType() != SingleNodeShutdownMetadata.Type.RESTART) {
+            return false;
+        }
+        var now = System.nanoTime();
+        var restartingAllocationDelayExpiration = info.getUnassignedTimeInNanos() + shutdown.getAllocationDelay().nanos();
+        return now <= restartingAllocationDelayExpiration;
+    }
+
+    private static boolean isUnassignedDueToNewInitialization(ShardRouting routing) {
+        return routing.primary() && routing.active() == false && getInactivePrimaryHealth(routing) == ClusterHealthStatus.YELLOW;
+    }
+
+    private static class ShardAllocationStatus {
+        private final ShardAllocationCounts primaries = new ShardAllocationCounts();
+        private final ShardAllocationCounts replicas = new ShardAllocationCounts();
+
+        public void addPrimary(ShardRouting routing, NodesShutdownMetadata shutdowns) {
+            primaries.increment(routing, shutdowns);
+        }
+
+        public void addReplica(ShardRouting routing, NodesShutdownMetadata shutdowns) {
+            replicas.increment(routing, shutdowns);
+        }
+
+        public HealthStatus getStatus() {
+            if (primaries.available == false) {
+                return RED;
+            } else if (replicas.available == false) {
+                return YELLOW;
+            } else {
+                return GREEN;
+            }
+        }
+
+        public String getSummary() {
+            var builder = new StringBuilder("This cluster has ");
+            if (primaries.unassigned > 0
+                || primaries.unassigned_new > 0
+                || primaries.unassigned_restarting > 0
+                || replicas.unassigned > 0
+                || replicas.unassigned_restarting > 0) {
+                builder.append(
+                    Stream.of(
+                        createMessage(primaries.unassigned, "unavailable primary", " unavailable primaries"),
+                        createMessage(primaries.unassigned_new, "creating primary", " creating primaries"),
+                        createMessage(primaries.unassigned_restarting, "restarting primary", " restarting primaries"),
+                        createMessage(replicas.unassigned, "unavailable replica", "unavailable replicas"),
+                        createMessage(replicas.unassigned_restarting, "restarting replica", "restarting replicas")
+                    ).flatMap(Function.identity()).collect(joining(" , "))
+                ).append(".");
+            } else {
+                builder.append("all shards available.");
+            }
+            return builder.toString();
+        }
+
+        private static Stream<String> createMessage(int count, String singular, String plural) {
+            return switch (count) {
+                case 0 -> Stream.empty();
+                case 1 -> Stream.of("1 " + singular);
+                default -> Stream.of(count + " " + plural);
+            };
+        }
+
+        public SimpleHealthIndicatorDetails getDetails() {
+            return new SimpleHealthIndicatorDetails(
+                Map.of(
+                    "unassigned_primaries",
+                    primaries.unassigned,
+                    "initializing_primaries",
+                    primaries.initializing,
+                    "creating_primaries",
+                    primaries.unassigned_new,
+                    "restarting_primaries",
+                    primaries.unassigned_restarting,
+                    "started_primaries",
+                    primaries.started + primaries.relocating,
+                    "unassigned_replicas",
+                    replicas.unassigned,
+                    "initializing_replicas",
+                    replicas.initializing,
+                    "restarting_replicas",
+                    replicas.unassigned_restarting,
+                    "started_replicas",
+                    replicas.started + replicas.relocating
+                )
+            );
+        }
+    }
+}

+ 3 - 1
server/src/main/java/org/elasticsearch/node/Node.java

@@ -57,6 +57,7 @@ import org.elasticsearch.cluster.node.DiscoveryNodeRole;
 import org.elasticsearch.cluster.routing.BatchedRerouteService;
 import org.elasticsearch.cluster.routing.RerouteService;
 import org.elasticsearch.cluster.routing.allocation.DiskThresholdMonitor;
+import org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService;
 import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.StopWatch;
 import org.elasticsearch.common.breaker.CircuitBreaker;
@@ -1043,7 +1044,8 @@ public class Node implements Closeable {
     private HealthService createHealthService(ClusterService clusterService) {
         var serverHealthIndicatorServices = List.of(
             new InstanceHasMasterHealthIndicatorService(clusterService),
-            new RepositoryIntegrityHealthIndicatorService(clusterService)
+            new RepositoryIntegrityHealthIndicatorService(clusterService),
+            new ShardsAvailabilityHealthIndicatorService(clusterService)
         );
         var pluginHealthIndicatorServices = pluginsService.filterPlugins(HealthPlugin.class)
             .stream()

+ 352 - 0
server/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardsAvailabilityHealthIndicatorServiceTests.java

@@ -0,0 +1,352 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.Metadata;
+import org.elasticsearch.cluster.metadata.NodesShutdownMetadata;
+import org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata;
+import org.elasticsearch.cluster.routing.IndexRoutingTable;
+import org.elasticsearch.cluster.routing.RecoverySource;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.health.HealthIndicatorResult;
+import org.elasticsearch.health.HealthStatus;
+import org.elasticsearch.health.SimpleHealthIndicatorDetails;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.test.ESTestCase;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import static java.util.stream.Collectors.toMap;
+import static org.elasticsearch.cluster.metadata.SingleNodeShutdownMetadata.Type.RESTART;
+import static org.elasticsearch.cluster.routing.ShardRouting.newUnassigned;
+import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorService.NAME;
+import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.AVAILABLE;
+import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.INITIALIZING;
+import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.RESTARTING;
+import static org.elasticsearch.cluster.routing.allocation.ShardsAvailabilityHealthIndicatorServiceTests.ShardState.UNAVAILABLE;
+import static org.elasticsearch.common.util.CollectionUtils.concatLists;
+import static org.elasticsearch.core.TimeValue.timeValueSeconds;
+import static org.elasticsearch.health.HealthStatus.GREEN;
+import static org.elasticsearch.health.HealthStatus.RED;
+import static org.elasticsearch.health.HealthStatus.YELLOW;
+import static org.elasticsearch.health.ServerHealthComponents.DATA;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ShardsAvailabilityHealthIndicatorServiceTests extends ESTestCase {
+
+    public void testShouldBeGreenWhenAllPrimariesAndReplicasAreStarted() {
+        var clusterState = createClusterStateWith(
+            List.of(
+                index("replicated-index", new ShardAllocation(randomNodeId(), AVAILABLE), new ShardAllocation(randomNodeId(), AVAILABLE)),
+                index("unreplicated-index", new ShardAllocation(randomNodeId(), AVAILABLE))
+            ),
+            List.of()
+        );
+        var service = createAllocationHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(
+                createExpectedResult(GREEN, "This cluster has all shards available.", Map.of("started_primaries", 2, "started_replicas", 1))
+            )
+        );
+    }
+
+    public void testShouldBeYellowWhenThereAreUnassignedReplicas() {
+        var availableReplicas = randomList(0, 5, () -> new ShardAllocation(randomNodeId(), AVAILABLE));
+        var unavailableReplicas = randomList(1, 5, () -> new ShardAllocation(randomNodeId(), UNAVAILABLE));
+
+        var clusterState = createClusterStateWith(
+            List.of(
+                index(
+                    "yellow-index",
+                    new ShardAllocation(randomNodeId(), AVAILABLE),
+                    concatLists(availableReplicas, unavailableReplicas).toArray(ShardAllocation[]::new)
+                )
+            ),
+            List.of()
+        );
+        var service = createAllocationHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(
+                createExpectedResult(
+                    YELLOW,
+                    unavailableReplicas.size() > 1
+                        ? "This cluster has " + unavailableReplicas.size() + " unavailable replicas."
+                        : "This cluster has 1 unavailable replica.",
+                    Map.of(
+                        "started_primaries",
+                        1,
+                        "unassigned_replicas",
+                        unavailableReplicas.size(),
+                        "started_replicas",
+                        availableReplicas.size()
+                    )
+                )
+            )
+        );
+    }
+
+    public void testShouldBeRedWhenThereAreUnassignedPrimaries() {
+        var clusterState = createClusterStateWith(List.of(index("red-index", new ShardAllocation(randomNodeId(), UNAVAILABLE))), List.of());
+        var service = createAllocationHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(createExpectedResult(RED, "This cluster has 1 unavailable primary.", Map.of("unassigned_primaries", 1)))
+        );
+    }
+
+    public void testShouldBeGreenWhenThereAreRestartingReplicas() {
+        var clusterState = createClusterStateWith(
+            List.of(
+                index(
+                    "restarting-index",
+                    new ShardAllocation(randomNodeId(), AVAILABLE),
+                    new ShardAllocation("node-0", RESTARTING, System.nanoTime())
+                )
+            ),
+            List.of(new NodeShutdown("node-0", RESTART, 60))
+        );
+        var service = createAllocationHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(
+                createExpectedResult(
+                    GREEN,
+                    "This cluster has 1 restarting replica.",
+                    Map.of("started_primaries", 1, "restarting_replicas", 1)
+                )
+            )
+        );
+    }
+
+    public void testShouldBeYellowWhenRestartingReplicasReachedAllocationDelay() {
+        var clusterState = createClusterStateWith(
+            List.of(
+                index(
+                    "restarting-index",
+                    new ShardAllocation(randomNodeId(), AVAILABLE),
+                    new ShardAllocation("node-0", RESTARTING, System.nanoTime() - timeValueSeconds(between(60, 180)).nanos())
+                )
+            ),
+            List.of(new NodeShutdown("node-0", RESTART, 60))
+        );
+        var service = createAllocationHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(
+                createExpectedResult(
+                    YELLOW,
+                    "This cluster has 1 unavailable replica.",
+                    Map.of("started_primaries", 1, "unassigned_replicas", 1)
+                )
+            )
+        );
+    }
+
+    public void testShouldBeGreenWhenThereAreInitializingPrimaries() {
+        var clusterState = createClusterStateWith(
+            List.of(index("restarting-index", new ShardAllocation("node-0", INITIALIZING))),
+            List.of()
+        );
+        var service = createAllocationHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(createExpectedResult(GREEN, "This cluster has 1 creating primary.", Map.of("creating_primaries", 1)))
+        );
+    }
+
+    public void testShouldBeGreenWhenThereAreRestartingPrimaries() {
+        var clusterState = createClusterStateWith(
+            List.of(index("restarting-index", new ShardAllocation("node-0", RESTARTING, System.nanoTime()))),
+            List.of(new NodeShutdown("node-0", RESTART, 60))
+        );
+        var service = createAllocationHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(createExpectedResult(GREEN, "This cluster has 1 restarting primary.", Map.of("restarting_primaries", 1)))
+        );
+    }
+
+    public void testShouldBeRedWhenRestartingPrimariesReachedAllocationDelay() {
+        var clusterState = createClusterStateWith(
+            List.of(
+                index(
+                    "restarting-index",
+                    new ShardAllocation("node-0", RESTARTING, System.nanoTime() - timeValueSeconds(between(60, 120)).nanos())
+                )
+            ),
+            List.of(new NodeShutdown("node-0", RESTART, 60))
+        );
+        var service = createAllocationHealthIndicatorService(clusterState);
+
+        assertThat(
+            service.calculate(),
+            equalTo(createExpectedResult(RED, "This cluster has 1 unavailable primary.", Map.of("unassigned_primaries", 1)))
+        );
+    }
+
+    private HealthIndicatorResult createExpectedResult(HealthStatus status, String summary, Map<String, Object> details) {
+        return new HealthIndicatorResult(NAME, DATA, status, summary, new SimpleHealthIndicatorDetails(addDefaults(details)));
+    }
+
+    private static ClusterState createClusterStateWith(List<IndexRoutingTable> indexes, List<NodeShutdown> nodeShutdowns) {
+        var builder = RoutingTable.builder();
+        for (IndexRoutingTable index : indexes) {
+            builder.add(index);
+        }
+
+        var nodesShutdownMetadata = new NodesShutdownMetadata(
+            nodeShutdowns.stream()
+                .collect(
+                    toMap(
+                        it -> it.nodeId,
+                        it -> SingleNodeShutdownMetadata.builder()
+                            .setNodeId(it.nodeId)
+                            .setType(it.type)
+                            .setReason("test")
+                            .setNodeSeen(true)
+                            .setStartedAtMillis(System.currentTimeMillis())
+                            .setAllocationDelay(it.allocationDelaySeconds != null ? timeValueSeconds(it.allocationDelaySeconds) : null)
+                            .build()
+                    )
+                )
+        );
+
+        return ClusterState.builder(new ClusterName("test-cluster"))
+            .routingTable(builder.build())
+            .metadata(Metadata.builder().putCustom(NodesShutdownMetadata.TYPE, nodesShutdownMetadata).build())
+            .build();
+    }
+
+    private static Map<String, Object> addDefaults(Map<String, Object> override) {
+        return Map.of(
+            "unassigned_primaries",
+            override.getOrDefault("unassigned_primaries", 0),
+            "initializing_primaries",
+            override.getOrDefault("initializing_primaries", 0),
+            "creating_primaries",
+            override.getOrDefault("creating_primaries", 0),
+            "restarting_primaries",
+            override.getOrDefault("restarting_primaries", 0),
+            "started_primaries",
+            override.getOrDefault("started_primaries", 0),
+            "unassigned_replicas",
+            override.getOrDefault("unassigned_replicas", 0),
+            "initializing_replicas",
+            override.getOrDefault("initializing_replicas", 0),
+            "restarting_replicas",
+            override.getOrDefault("restarting_replicas", 0),
+            "started_replicas",
+            override.getOrDefault("started_replicas", 0)
+        );
+    }
+
+    private static IndexRoutingTable index(String name, ShardAllocation primaryState, ShardAllocation... replicaStates) {
+        var index = new Index(name, UUID.randomUUID().toString());
+        var shardId = new ShardId(index, 0);
+
+        var builder = IndexRoutingTable.builder(index);
+        builder.addShard(createShardRouting(shardId, true, primaryState));
+        for (var replicaState : replicaStates) {
+            builder.addShard(createShardRouting(shardId, false, replicaState));
+        }
+        return builder.build();
+    }
+
+    private static ShardRouting createShardRouting(ShardId shardId, boolean primary, ShardAllocation allocation) {
+        var routing = newUnassigned(
+            shardId,
+            primary,
+            getSource(primary, allocation.state),
+            new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null)
+        );
+        if (allocation.state == UNAVAILABLE || allocation.state == INITIALIZING) {
+            return routing;
+        }
+        routing = routing.initialize(allocation.nodeId, null, 0);
+        routing = routing.moveToStarted();
+        if (allocation.state == AVAILABLE) {
+            return routing;
+        }
+        routing = routing.moveToUnassigned(
+            new UnassignedInfo(
+                UnassignedInfo.Reason.NODE_RESTARTING,
+                null,
+                null,
+                -1,
+                allocation.unassignedTimeNanos != null ? allocation.unassignedTimeNanos : 0,
+                0,
+                false,
+                UnassignedInfo.AllocationStatus.DELAYED_ALLOCATION,
+                Set.of(),
+                allocation.nodeId
+            )
+        );
+        if (allocation.state == RESTARTING) {
+            return routing;
+        }
+
+        throw new AssertionError("Unexpected state [" + allocation.state + "]");
+    }
+
+    private static RecoverySource getSource(boolean primary, ShardState state) {
+        if (primary) {
+            return state == INITIALIZING
+                ? RecoverySource.EmptyStoreRecoverySource.INSTANCE
+                : RecoverySource.ExistingStoreRecoverySource.INSTANCE;
+        } else {
+            return RecoverySource.PeerRecoverySource.INSTANCE;
+        }
+    }
+
+    public enum ShardState {
+        UNAVAILABLE,
+        INITIALIZING,
+        AVAILABLE,
+        RESTARTING
+    }
+
+    private record ShardAllocation(String nodeId, ShardState state, Long unassignedTimeNanos) {
+
+        ShardAllocation(String nodeId, ShardState state) {
+            this(nodeId, state, null);
+        }
+    }
+
+    private record NodeShutdown(String nodeId, SingleNodeShutdownMetadata.Type type, Integer allocationDelaySeconds) {}
+
+    private static String randomNodeId() {
+        return UUID.randomUUID().toString();
+    }
+
+    private static ShardsAvailabilityHealthIndicatorService createAllocationHealthIndicatorService(ClusterState clusterState) {
+        var clusterService = mock(ClusterService.class);
+        when(clusterService.state()).thenReturn(clusterState);
+        return new ShardsAvailabilityHealthIndicatorService(clusterService);
+    }
+}

+ 1 - 6
x-pack/plugin/shutdown/src/main/java/org/elasticsearch/xpack/shutdown/TransportPutShutdownNodeAction.java

@@ -31,8 +31,6 @@ import org.elasticsearch.tasks.Task;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.TransportService;
 
-import java.util.HashMap;
-
 public class TransportPutShutdownNodeAction extends AcknowledgedTransportMasterNodeAction<PutShutdownNodeAction.Request> {
     private static final Logger logger = LogManager.getLogger(TransportPutShutdownNodeAction.class);
 
@@ -66,10 +64,7 @@ public class TransportPutShutdownNodeAction extends AcknowledgedTransportMasterN
         clusterService.submitStateUpdateTask("put-node-shutdown-" + request.getNodeId(), new ClusterStateUpdateTask() {
             @Override
             public ClusterState execute(ClusterState currentState) {
-                NodesShutdownMetadata currentShutdownMetadata = currentState.metadata().custom(NodesShutdownMetadata.TYPE);
-                if (currentShutdownMetadata == null) {
-                    currentShutdownMetadata = new NodesShutdownMetadata(new HashMap<>());
-                }
+                var currentShutdownMetadata = currentState.metadata().custom(NodesShutdownMetadata.TYPE, NodesShutdownMetadata.EMPTY);
 
                 final boolean nodeSeen = currentState.getNodes().nodeExists(request.getNodeId());