@@ -49,7 +49,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
@@ -254,11 +253,20 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
         int totalRemainingShards = relocatingShards + startedShards + initializingShards;
 
         // If there's relocating shards, or no shards on this node, we'll just use the number of shards left to move
-        if (relocatingShards > 0 || totalRemainingShards == 0) {
-            SingleNodeShutdownMetadata.Status shardStatus = totalRemainingShards == 0
-                ? SingleNodeShutdownMetadata.Status.COMPLETE
-                : SingleNodeShutdownMetadata.Status.IN_PROGRESS;
-            return new ShutdownShardMigrationStatus(shardStatus, startedShards, relocatingShards, initializingShards);
+        if (totalRemainingShards == 0) {
+            return new ShutdownShardMigrationStatus(
+                SingleNodeShutdownMetadata.Status.COMPLETE,
+                startedShards,
+                relocatingShards,
+                initializingShards
+            );
+        } else if (relocatingShards > 0) {
+            return new ShutdownShardMigrationStatus(
+                SingleNodeShutdownMetadata.Status.IN_PROGRESS,
+                startedShards,
+                relocatingShards,
+                initializingShards
+            );
         } else if (initializingShards > 0 && relocatingShards == 0 && startedShards == 0) {
             // If there's only initializing shards left, return now with a note that only initializing shards are left
             return new ShutdownShardMigrationStatus(
@@ -270,11 +278,8 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
             );
         }
 
-        // If there's no relocating shards and shards still on this node, we need to figure out why
-        AtomicInteger shardsToIgnoreForFinalStatus = new AtomicInteger(0);
-
-        // Explain shard allocations until we find one that can't move, then stop (as `findFirst` short-circuits)
-        Optional<Tuple<ShardRouting, ShardAllocationDecision>> unmovableShard = currentState.getRoutingNodes()
+        // Get all shard explanations
+        var unmovableShards = currentState.getRoutingNodes()
             .node(nodeId)
             .shardsWithState(ShardRoutingState.STARTED)
             .peek(s -> cancellableTask.ensureNotCancelled())
@@ -285,10 +290,16 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
                     : "shard [" + pair + "] can remain on node [" + nodeId + "], but that node is shutting down";
                 return pair.v2().getMoveDecision().canRemain() == false;
             })
-            // It's okay if some are throttled, they'll move eventually
-            .filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.THROTTLED) == false)
             // These shards will move as soon as possible
             .filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.YES) == false)
+            .toList();
+
+        // If there's no relocating shards and shards still on this node, we need to figure out why
+        AtomicInteger shardsToIgnoreForFinalStatus = new AtomicInteger(0);
+
+        // Find first one that can not move permanently
+        var unmovableShard = unmovableShards.stream()
+            .filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.THROTTLED) == false)
             // If the shard that can't move is on every node in the cluster, we shouldn't be `STALLED` on it.
             .filter(pair -> {
                 final boolean hasShardCopyOnOtherNode = hasShardCopyOnAnotherNode(currentState, pair.v1(), shuttingDownNodes);
@@ -312,6 +323,10 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
             )
             .findFirst();
 
+        var temporarilyUnmovableShards = unmovableShards.stream()
+            .filter(pair -> pair.v2().getMoveDecision().getAllocationDecision().equals(AllocationDecision.THROTTLED))
+            .toList();
+
         if (totalRemainingShards == shardsToIgnoreForFinalStatus.get() && unmovableShard.isEmpty()) {
             return new ShutdownShardMigrationStatus(
                 SingleNodeShutdownMetadata.Status.COMPLETE,
@@ -338,14 +353,38 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
                 ),
                 decision
             );
-        } else {
-            return new ShutdownShardMigrationStatus(
-                SingleNodeShutdownMetadata.Status.IN_PROGRESS,
-                startedShards,
-                relocatingShards,
-                initializingShards
-            );
-        }
+        } else if (relocatingShards == 0
+            && initializingShards == 0
+            && startedShards > 0
+            && temporarilyUnmovableShards.size() == startedShards) {
+            // We found a shard that can't be moved temporarily,
+            // report it so that the cause of the throttling could be addressed if it is taking significant time
+            ShardRouting shardRouting = temporarilyUnmovableShards.get(0).v1();
+            ShardAllocationDecision decision = temporarilyUnmovableShards.get(0).v2();
+
+            return new ShutdownShardMigrationStatus(
+                SingleNodeShutdownMetadata.Status.IN_PROGRESS,
+                startedShards,
+                relocatingShards,
+                initializingShards,
+                format(
+                    "shard [%s] [%s] of index [%s] is waiting to be moved, see [%s] "
+                        + "for details or use the cluster allocation explain API",
+                    shardRouting.shardId().getId(),
+                    shardRouting.primary() ? "primary" : "replica",
+                    shardRouting.index().getName(),
+                    NODE_ALLOCATION_DECISION_KEY
+                ),
+                decision
+            );
+        } else {
+            return new ShutdownShardMigrationStatus(
+                SingleNodeShutdownMetadata.Status.IN_PROGRESS,
+                startedShards,
+                relocatingShards,
+                initializingShards
+            );
+        }
     }
 
     private static boolean isIlmRestrictingShardMovement(ClusterState currentState, ShardRouting pair) {
@@ -373,9 +412,8 @@ public class TransportGetShutdownStatusAction extends TransportMasterNodeAction<
 
     private static boolean hasShardCopyOnAnotherNode(ClusterState clusterState, ShardRouting shardRouting, Set<String> shuttingDownNodes) {
         return clusterState.routingTable()
-            .allShards(shardRouting.index().getName())
-            .stream()
-            .filter(sr -> sr.id() == shardRouting.id())
+            .shardRoutingTable(shardRouting.shardId())
+            .allShards()
             .filter(sr -> sr.role().equals(shardRouting.role()))
             // If any shards are both 1) `STARTED` and 2) are not on a node that's shutting down, we have at least one copy
             // of this shard safely on a node that's not shutting down, so we don't want to report `STALLED` because of this shard.
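
The diff above reorders the status checks so that a fully throttled node is reported as IN_PROGRESS with an explanatory note rather than falling through silently. The following is a minimal, standalone sketch (not the Elasticsearch implementation) of that precedence; the names ShutdownStatusSketch, NodeCounts, MigrationStatus and decideStatus are hypothetical and exist only for illustration, and only the COMPLETE/IN_PROGRESS/STALLED values mirror SingleNodeShutdownMetadata.Status.

public class ShutdownStatusSketch {

    enum Status { COMPLETE, IN_PROGRESS, STALLED }

    // shards still associated with the shutting-down node, grouped by routing state
    record NodeCounts(int started, int relocating, int initializing) {
        int total() { return started + relocating + initializing; }
    }

    // status plus an optional human-readable note, loosely mirroring ShutdownShardMigrationStatus
    record MigrationStatus(Status status, String note) { }

    static MigrationStatus decideStatus(NodeCounts counts, boolean permanentlyUnmovableShardFound, int throttledStartedShards) {
        if (counts.total() == 0) {
            // nothing left on the node: migration is done
            return new MigrationStatus(Status.COMPLETE, null);
        } else if (counts.relocating() > 0) {
            // shards are actively moving off the node
            return new MigrationStatus(Status.IN_PROGRESS, null);
        } else if (permanentlyUnmovableShardFound) {
            // a shard cannot leave the node for a non-transient reason and has no safe copy elsewhere
            return new MigrationStatus(Status.STALLED, "shard cannot move, see the allocation explanation");
        } else if (counts.initializing() == 0 && counts.started() > 0 && throttledStartedShards == counts.started()) {
            // every remaining started shard is merely throttled; report why so the operator
            // can address the throttling if this state persists
            return new MigrationStatus(Status.IN_PROGRESS, "shard is waiting to be moved (throttled)");
        } else {
            return new MigrationStatus(Status.IN_PROGRESS, null);
        }
    }

    public static void main(String[] args) {
        System.out.println(decideStatus(new NodeCounts(0, 0, 0), false, 0)); // COMPLETE
        System.out.println(decideStatus(new NodeCounts(2, 1, 0), false, 0)); // IN_PROGRESS
        System.out.println(decideStatus(new NodeCounts(3, 0, 0), true, 0));  // STALLED
        System.out.println(decideStatus(new NodeCounts(3, 0, 0), false, 3)); // IN_PROGRESS with throttling note
    }
}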