|
|
@@ -21,7 +21,12 @@ package org.elasticsearch.action.admin.cluster.reroute;
|
|
|
|
|
|
import org.apache.logging.log4j.Logger;
|
|
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
|
|
+import org.elasticsearch.ExceptionsHelper;
|
|
|
import org.elasticsearch.action.ActionListener;
|
|
|
+import org.elasticsearch.action.ActionListenerResponseHandler;
|
|
|
+import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
|
|
|
+import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
|
|
|
+import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
|
|
|
import org.elasticsearch.action.support.ActionFilters;
|
|
|
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
|
|
|
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
|
|
|
@@ -29,14 +34,26 @@ import org.elasticsearch.cluster.ClusterState;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockException;
|
|
|
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
|
|
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
|
|
+import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
|
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
|
|
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
|
|
|
+import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
|
|
|
+import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
|
|
|
+import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
|
|
|
import org.elasticsearch.cluster.service.ClusterService;
|
|
|
import org.elasticsearch.common.Priority;
|
|
|
+import org.elasticsearch.common.Strings;
|
|
|
+import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
|
|
+import org.elasticsearch.common.collect.ImmutableOpenMap;
|
|
|
import org.elasticsearch.common.inject.Inject;
|
|
|
import org.elasticsearch.threadpool.ThreadPool;
|
|
|
import org.elasticsearch.transport.TransportService;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
public class TransportClusterRerouteAction extends TransportMasterNodeAction<ClusterRerouteRequest, ClusterRerouteResponse> {
|
|
|
|
|
|
private final AllocationService allocationService;
|
|
|
@@ -69,18 +86,71 @@ public class TransportClusterRerouteAction extends TransportMasterNodeAction<Clu
|
|
|
@Override
|
|
|
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state,
|
|
|
final ActionListener<ClusterRerouteResponse> listener) {
|
|
|
- ActionListener<ClusterRerouteResponse> logWrapper = ActionListener.wrap(
|
|
|
- response -> {
|
|
|
- if (request.dryRun() == false) {
|
|
|
- response.getExplanations().getYesDecisionMessages().forEach(logger::info);
|
|
|
- }
|
|
|
- listener.onResponse(response);
|
|
|
- },
|
|
|
- listener::onFailure
|
|
|
- );
|
|
|
-
|
|
|
- clusterService.submitStateUpdateTask("cluster_reroute (api)", new ClusterRerouteResponseAckedClusterStateUpdateTask(logger,
|
|
|
- allocationService, request, logWrapper));
|
|
|
+ Map<String, List<AbstractAllocateAllocationCommand>> stalePrimaryAllocations = new HashMap<>();
|
|
|
+ for (AllocationCommand command : request.getCommands().commands()) {
|
|
|
+ if (command instanceof AllocateStalePrimaryAllocationCommand) {
|
|
|
+ final AllocateStalePrimaryAllocationCommand cmd = (AllocateStalePrimaryAllocationCommand) command;
|
|
|
+ stalePrimaryAllocations.computeIfAbsent(cmd.index(), k -> new ArrayList<>()).add(cmd);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (stalePrimaryAllocations.isEmpty()) {
|
|
|
+ submitStateUpdate(request, listener);
|
|
|
+ } else {
|
|
|
+ verifyThenSubmitUpdate(request, listener, stalePrimaryAllocations);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void verifyThenSubmitUpdate(ClusterRerouteRequest request, ActionListener<ClusterRerouteResponse> listener,
|
|
|
+ Map<String, List<AbstractAllocateAllocationCommand>> stalePrimaryAllocations) {
|
|
|
+ transportService.sendRequest(transportService.getLocalNode(), IndicesShardStoresAction.NAME,
|
|
|
+ new IndicesShardStoresRequest().indices(stalePrimaryAllocations.keySet().toArray(Strings.EMPTY_ARRAY)),
|
|
|
+ new ActionListenerResponseHandler<>(
|
|
|
+ ActionListener.wrap(
|
|
|
+ response -> {
|
|
|
+ ImmutableOpenMap<String, ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>>> status =
|
|
|
+ response.getStoreStatuses();
|
|
|
+ Exception e = null;
|
|
|
+ for (Map.Entry<String, List<AbstractAllocateAllocationCommand>> entry : stalePrimaryAllocations.entrySet()) {
|
|
|
+ final String index = entry.getKey();
|
|
|
+ final ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> indexStatus = status.get(index);
|
|
|
+ assert indexStatus != null;
|
|
|
+ for (AbstractAllocateAllocationCommand command : entry.getValue()) {
|
|
|
+ final List<IndicesShardStoresResponse.StoreStatus> shardStatus =
|
|
|
+ indexStatus.get(command.shardId());
|
|
|
+ if (shardStatus == null || shardStatus.isEmpty()) {
|
|
|
+ e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException(
|
|
|
+ "No data for shard [" + command.shardId() + "] of index [" + index + "] found on any node")
|
|
|
+ );
|
|
|
+ } else if (shardStatus.stream().noneMatch(storeStatus -> {
|
|
|
+ final DiscoveryNode node = storeStatus.getNode();
|
|
|
+ final String nodeInCommand = command.node();
|
|
|
+ return nodeInCommand.equals(node.getName()) || nodeInCommand.equals(node.getId());
|
|
|
+ })) {
|
|
|
+ e = ExceptionsHelper.useOrSuppress(e, new IllegalArgumentException(
|
|
|
+ "No data for shard [" + command.shardId() + "] of index [" + index + "] found on node ["
|
|
|
+ + command.node() + ']'));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (e == null) {
|
|
|
+ submitStateUpdate(request, listener);
|
|
|
+ } else {
|
|
|
+ listener.onFailure(e);
|
|
|
+ }
|
|
|
+ }, listener::onFailure
|
|
|
+ ), IndicesShardStoresResponse::new));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void submitStateUpdate(final ClusterRerouteRequest request, final ActionListener<ClusterRerouteResponse> listener) {
|
|
|
+ clusterService.submitStateUpdateTask("cluster_reroute (api)",
|
|
|
+ new ClusterRerouteResponseAckedClusterStateUpdateTask(logger, allocationService, request,
|
|
|
+ ActionListener.wrap(
|
|
|
+ response -> {
|
|
|
+ if (request.dryRun() == false) {
|
|
|
+ response.getExplanations().getYesDecisionMessages().forEach(logger::info);
|
|
|
+ }
|
|
|
+ listener.onResponse(response);
|
|
|
+ }, listener::onFailure)));
|
|
|
}
|
|
|
|
|
|
static class ClusterRerouteResponseAckedClusterStateUpdateTask extends AckedClusterStateUpdateTask<ClusterRerouteResponse> {
|