Browse Source

Extend reroute with an option to force assign stale primary shard copies

Closes #15708
Yannick Welsch 9 years ago
parent
commit
d5b691b68e
16 changed files with 967 additions and 309 deletions
  1. 1 1
      core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java
  2. 10 0
      core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
  3. 1 1
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java
  4. 240 0
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java
  5. 0 240
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java
  6. 125 0
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java
  7. 131 0
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateReplicaAllocationCommand.java
  8. 124 0
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java
  9. 10 8
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommands.java
  10. 88 0
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/BasePrimaryAllocationCommand.java
  11. 1 1
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java
  12. 5 5
      core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java
  13. 75 1
      core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
  14. 110 41
      core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java
  15. 39 11
      docs/reference/cluster/reroute.asciidoc
  16. 7 0
      docs/reference/migration/migrate_3_0.asciidoc

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java

@@ -319,7 +319,7 @@ public class DiscoveryNodes extends AbstractDiffable<DiscoveryNodes> implements
             throw new IllegalArgumentException("resolved [" + node + "] into [" + resolvedNodeIds.length + "] nodes, where expected to be resolved to a single node");
         }
         if (resolvedNodeIds.length == 0) {
-            throw new IllegalArgumentException("failed to resolve [" + node + " ], no matching nodes");
+            throw new IllegalArgumentException("failed to resolve [" + node + "], no matching nodes");
         }
         return nodes.get(resolvedNodeIds[0]);
     }

+ 10 - 0
core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java

@@ -114,6 +114,16 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
         return shard;
     }
 
+    /**
+     * All shards for the provided {@link ShardId}
+     * @return All the shard routing entries for the given index and shard id
+     * @throws IndexNotFoundException if provided index does not exist
+     * @throws ShardNotFoundException if provided shard id is unknown
+     */
+    public IndexShardRoutingTable shardRoutingTable(ShardId shardId) {
+        return shardRoutingTable(shardId.getIndex(), shardId.getId());
+    }
+
     public RoutingTable validateRaiseException(MetaData metaData) throws RoutingValidationException {
         RoutingTableValidation validation = validate(metaData);
         if (!validation.valid()) {

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java

@@ -32,7 +32,7 @@ import org.elasticsearch.gateway.GatewayAllocator;
 /**
  * The {@link ShardsAllocator} class offers methods for allocating shard within a cluster.
  * These methods include moving shards and re-balancing the cluster. It also allows management
- * of shards by their state. 
+ * of shards by their state.
  */
 public class ShardsAllocators extends AbstractComponent implements ShardsAllocator {
 

+ 240 - 0
core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AbstractAllocateAllocationCommand.java

@@ -0,0 +1,240 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.command;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.io.stream.StreamableReader;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+/**
+ * Abstract base class for allocating an unassigned shard to a node
+ */
+public abstract class AbstractAllocateAllocationCommand implements AllocationCommand, ToXContent {
+
+    private static final String INDEX_KEY = "index";
+    private static final String SHARD_KEY = "shard";
+    private static final String NODE_KEY = "node";
+
+    protected static <T extends Builder> ObjectParser<T, Void> createAllocateParser(String command) {
+        ObjectParser<T, Void> parser = new ObjectParser<>(command);
+        parser.declareString(Builder::setIndex, new ParseField(INDEX_KEY));
+        parser.declareInt(Builder::setShard, new ParseField(SHARD_KEY));
+        parser.declareString(Builder::setNode, new ParseField(NODE_KEY));
+        return parser;
+    }
+
+    protected static abstract class Builder<T extends AbstractAllocateAllocationCommand> implements StreamableReader<Builder<T>> {
+        protected String index;
+        protected int shard = -1;
+        protected String node;
+
+        public void setIndex(String index) {
+            this.index = index;
+        }
+
+        public void setShard(int shard) {
+            this.shard = shard;
+        }
+
+        public void setNode(String node) {
+            this.node = node;
+        }
+
+        @Override
+        public Builder<T> readFrom(StreamInput in) throws IOException {
+            index = in.readString();
+            shard = in.readVInt();
+            node = in.readString();
+            return this;
+        }
+
+        public abstract Builder<T> parse(XContentParser parser) throws IOException;
+
+        public abstract T build();
+
+        protected void validate() {
+            if (index == null) {
+                throw new IllegalArgumentException("Argument [index] must be defined");
+            }
+            if (shard < 0) {
+                throw new IllegalArgumentException("Argument [shard] must be defined and non-negative");
+            }
+            if (node == null) {
+                throw new IllegalArgumentException("Argument [node] must be defined");
+            }
+        }
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        builder.field(INDEX_KEY, shardId().index().name());
+        builder.field(SHARD_KEY, shardId().id());
+        builder.field(NODE_KEY, node());
+        return builder;
+    }
+
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeString(shardId.getIndex());
+        out.writeVInt(shardId.getId());
+        out.writeString(node);
+    }
+
+    public static abstract class Factory<T extends AbstractAllocateAllocationCommand> implements AllocationCommand.Factory<T> {
+
+        protected abstract Builder<T> newBuilder();
+
+        @Override
+        public T readFrom(StreamInput in) throws IOException {
+            return newBuilder().readFrom(in).build();
+        }
+
+        @Override
+        public void writeTo(T command, StreamOutput out) throws IOException {
+            command.writeTo(out);
+        }
+
+        @Override
+        public T fromXContent(XContentParser parser) throws IOException {
+            return newBuilder().parse(parser).build();
+        }
+
+        @Override
+        public void toXContent(T command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
+            if (objectName == null) {
+                builder.startObject();
+            } else {
+                builder.startObject(objectName);
+            }
+            builder.endObject();
+        }
+    }
+
+    protected final ShardId shardId;
+    protected final String node;
+
+    protected AbstractAllocateAllocationCommand(ShardId shardId, String node) {
+        this.shardId = shardId;
+        this.node = node;
+    }
+
+    /**
+     * Get the shard id
+     *
+     * @return id of the shard
+     */
+    public ShardId shardId() {
+        return this.shardId;
+    }
+
+    /**
+     * Get the id of the node
+     *
+     * @return id of the node
+     */
+    public String node() {
+        return this.node;
+    }
+
+    /**
+     * Handle case where a disco node cannot be found in the routing table. Usually means that it's not a data node.
+     */
+    protected RerouteExplanation explainOrThrowMissingRoutingNode(RoutingAllocation allocation, boolean explain, DiscoveryNode discoNode) {
+        if (!discoNode.dataNode()) {
+            return explainOrThrowRejectedCommand(explain, allocation, "allocation can only be done on data nodes, not [" + node + "]");
+        } else {
+            return explainOrThrowRejectedCommand(explain, allocation, "could not find [" + node + "] among the routing nodes");
+        }
+    }
+
+    /**
+     * Utility method for rejecting the current allocation command based on provided reason
+     */
+    protected RerouteExplanation explainOrThrowRejectedCommand(boolean explain, RoutingAllocation allocation, String reason) {
+        if (explain) {
+            return new RerouteExplanation(this, allocation.decision(Decision.NO, name() + " (allocation command)", reason));
+        }
+        throw new IllegalArgumentException("[" + name() + "] " + reason);
+    }
+
+    /**
+     * Utility method for rejecting the current allocation command based on provided exception
+     */
+    protected RerouteExplanation explainOrThrowRejectedCommand(boolean explain, RoutingAllocation allocation, RuntimeException rte) {
+        if (explain) {
+            return new RerouteExplanation(this, allocation.decision(Decision.NO, name() + " (allocation command)", rte.getMessage()));
+        }
+        throw rte;
+    }
+
+    /**
+     * Initializes an unassigned shard on a node and removes it from the unassigned
+     *
+     * @param allocation the allocation
+     * @param routingNodes the routing nodes
+     * @param routingNode the node to initialize it to
+     * @param shardRouting the shard routing that is to be matched in unassigned shards
+     */
+    protected void initializeUnassignedShard(RoutingAllocation allocation, RoutingNodes routingNodes, RoutingNode routingNode, ShardRouting shardRouting) {
+        initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, null);
+    }
+
+    /**
+     * Initializes an unassigned shard on a node and removes it from the unassigned
+     *
+     * @param allocation the allocation
+     * @param routingNodes the routing nodes
+     * @param routingNode the node to initialize it to
+     * @param shardRouting the shard routing that is to be matched in unassigned shards
+     * @param shardRoutingChanges changes to apply for shard routing in unassigned shards before initialization
+     */
+    protected void initializeUnassignedShard(RoutingAllocation allocation, RoutingNodes routingNodes, RoutingNode routingNode,
+                                             ShardRouting shardRouting, @Nullable Consumer<ShardRouting> shardRoutingChanges) {
+        for (RoutingNodes.UnassignedShards.UnassignedIterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
+            ShardRouting unassigned = it.next();
+            if (!unassigned.equalsIgnoringMetaData(shardRouting)) {
+                continue;
+            }
+            if (shardRoutingChanges != null) {
+                shardRoutingChanges.accept(unassigned);
+            }
+            it.initialize(routingNode.nodeId(), unassigned.version(),
+                allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
+            return;
+        }
+        assert false : "shard to initialize not found in list of unassigned shards";
+    }
+}

+ 0 - 240
core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java

@@ -1,240 +0,0 @@
-/*
- * Licensed to Elasticsearch under one or more contributor
- * license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright
- * ownership. Elasticsearch licenses this file to you under
- * the Apache License, Version 2.0 (the "License"); you may
- * not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.elasticsearch.cluster.routing.allocation.command;
-
-import org.elasticsearch.ElasticsearchParseException;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.routing.RoutingNode;
-import org.elasticsearch.cluster.routing.RoutingNodes;
-import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.UnassignedInfo;
-import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
-import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
-import org.elasticsearch.cluster.routing.allocation.decider.Decision;
-import org.elasticsearch.common.io.stream.StreamInput;
-import org.elasticsearch.common.io.stream.StreamOutput;
-import org.elasticsearch.common.xcontent.ToXContent;
-import org.elasticsearch.common.xcontent.XContentBuilder;
-import org.elasticsearch.common.xcontent.XContentParser;
-import org.elasticsearch.index.shard.ShardId;
-
-import java.io.IOException;
-
-/**
- * Allocates an unassigned shard to a specific node. Note, primary allocation will "force"
- * allocation which might mean one will loose data if using local gateway..., use with care
- * with the <tt>allowPrimary</tt> flag.
- */
-public class AllocateAllocationCommand implements AllocationCommand {
-
-    public static final String NAME = "allocate";
-
-    public static class Factory implements AllocationCommand.Factory<AllocateAllocationCommand> {
-
-        @Override
-        public AllocateAllocationCommand readFrom(StreamInput in) throws IOException {
-            return new AllocateAllocationCommand(ShardId.readShardId(in), in.readString(), in.readBoolean());
-        }
-
-        @Override
-        public void writeTo(AllocateAllocationCommand command, StreamOutput out) throws IOException {
-            command.shardId().writeTo(out);
-            out.writeString(command.node());
-            out.writeBoolean(command.allowPrimary());
-        }
-
-        @Override
-        public AllocateAllocationCommand fromXContent(XContentParser parser) throws IOException {
-            String index = null;
-            int shardId = -1;
-            String nodeId = null;
-            boolean allowPrimary = false;
-
-            String currentFieldName = null;
-            XContentParser.Token token;
-            while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
-                if (token == XContentParser.Token.FIELD_NAME) {
-                    currentFieldName = parser.currentName();
-                } else if (token.isValue()) {
-                    if ("index".equals(currentFieldName)) {
-                        index = parser.text();
-                    } else if ("shard".equals(currentFieldName)) {
-                        shardId = parser.intValue();
-                    } else if ("node".equals(currentFieldName)) {
-                        nodeId = parser.text();
-                    } else if ("allow_primary".equals(currentFieldName) || "allowPrimary".equals(currentFieldName)) {
-                        allowPrimary = parser.booleanValue();
-                    } else {
-                        throw new ElasticsearchParseException("[{}] command does not support field [{}]", NAME, currentFieldName);
-                    }
-                } else {
-                    throw new ElasticsearchParseException("[{}] command does not support complex json tokens [{}]", NAME, token);
-                }
-            }
-            if (index == null) {
-                throw new ElasticsearchParseException("[{}] command missing the index parameter", NAME);
-            }
-            if (shardId == -1) {
-                throw new ElasticsearchParseException("[{}] command missing the shard parameter", NAME);
-            }
-            if (nodeId == null) {
-                throw new ElasticsearchParseException("[{}] command missing the node parameter", NAME);
-            }
-            return new AllocateAllocationCommand(new ShardId(index, shardId), nodeId, allowPrimary);
-        }
-
-        @Override
-        public void toXContent(AllocateAllocationCommand command, XContentBuilder builder, ToXContent.Params params, String objectName) throws IOException {
-            if (objectName == null) {
-                builder.startObject();
-            } else {
-                builder.startObject(objectName);
-            }
-            builder.field("index", command.shardId().index().name());
-            builder.field("shard", command.shardId().id());
-            builder.field("node", command.node());
-            builder.field("allow_primary", command.allowPrimary());
-            builder.endObject();
-        }
-    }
-
-    private final ShardId shardId;
-    private final String node;
-    private final boolean allowPrimary;
-
-    /**
-     * Create a new {@link AllocateAllocationCommand}
-     *
-     * @param shardId      {@link ShardId} of the shrad to assign
-     * @param node         Node to assign the shard to
-     * @param allowPrimary should the node be allow to allocate the shard as primary
-     */
-    public AllocateAllocationCommand(ShardId shardId, String node, boolean allowPrimary) {
-        this.shardId = shardId;
-        this.node = node;
-        this.allowPrimary = allowPrimary;
-    }
-
-    @Override
-    public String name() {
-        return NAME;
-    }
-
-    /**
-     * Get the shards id
-     *
-     * @return id of the shard
-     */
-    public ShardId shardId() {
-        return this.shardId;
-    }
-
-    /**
-     * Get the id of the Node
-     *
-     * @return id of the Node
-     */
-    public String node() {
-        return this.node;
-    }
-
-    /**
-     * Determine if primary allocation is allowed
-     *
-     * @return <code>true</code> if primary allocation is allowed. Otherwise <code>false</code>
-     */
-    public boolean allowPrimary() {
-        return this.allowPrimary;
-    }
-
-    @Override
-    public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
-        final DiscoveryNode discoNode = allocation.nodes().resolveNode(node);
-        final RoutingNodes routingNodes = allocation.routingNodes();
-
-        ShardRouting shardRouting = null;
-        for (ShardRouting routing : routingNodes.unassigned()) {
-            if (routing.shardId().equals(shardId)) {
-                // prefer primaries first to allocate
-                if (shardRouting == null || routing.primary()) {
-                    shardRouting = routing;
-                }
-            }
-        }
-
-        if (shardRouting == null) {
-            if (explain) {
-                return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
-                        "failed to find " + shardId + " on the list of unassigned shards"));
-            }
-            throw new IllegalArgumentException("[allocate] failed to find " + shardId + " on the list of unassigned shards");
-        }
-
-        if (shardRouting.primary() && !allowPrimary) {
-            if (explain) {
-                return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
-                        "trying to allocate a primary shard " + shardId + ", which is disabled"));
-            }
-            throw new IllegalArgumentException("[allocate] trying to allocate a primary shard " + shardId + ", which is disabled");
-        }
-
-        RoutingNode routingNode = routingNodes.node(discoNode.id());
-        if (routingNode == null) {
-            if (!discoNode.dataNode()) {
-                if (explain) {
-                    return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
-                            "Allocation can only be done on data nodes, not [" + node + "]"));
-                }
-                throw new IllegalArgumentException("Allocation can only be done on data nodes, not [" + node + "]");
-            } else {
-                if (explain) {
-                    return new RerouteExplanation(this, allocation.decision(Decision.NO, "allocate_allocation_command",
-                            "Could not find [" + node + "] among the routing nodes"));
-                }
-                throw new IllegalStateException("Could not find [" + node + "] among the routing nodes");
-            }
-        }
-
-        Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
-        if (decision.type() == Decision.Type.NO) {
-            if (explain) {
-                return new RerouteExplanation(this, decision);
-            }
-            throw new IllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
-        }
-        // go over and remove it from the unassigned
-        for (RoutingNodes.UnassignedShards.UnassignedIterator it = routingNodes.unassigned().iterator(); it.hasNext(); ) {
-            ShardRouting unassigned = it.next();
-            if (unassigned != shardRouting) {
-                continue;
-            }
-            // if we force allocation of a primary, we need to move the unassigned info back to treat it as if
-            // it was index creation
-            if (unassigned.primary() && unassigned.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
-                unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
-                        "force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(),
-                        unassigned.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis()));
-            }
-            it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
-            break;
-        }
-        return new RerouteExplanation(this, decision);
-    }
-}

+ 125 - 0
core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java

@@ -0,0 +1,125 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.command;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.UnassignedInfo;
+import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
+
+import java.io.IOException;
+
+/**
+ * Allocates an unassigned empty primary shard to a specific node. Use with extreme care as this will result in data loss.
+ * Allocation deciders are ignored.
+ */
+public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocationCommand {
+    public static final String NAME = "allocate_empty_primary";
+
+    private static final ObjectParser<Builder, Void> EMPTY_PRIMARY_PARSER = BasePrimaryAllocationCommand.createAllocatePrimaryParser(NAME);
+
+    /**
+     * Creates a new {@link AllocateEmptyPrimaryAllocationCommand}
+     *
+     * @param shardId        {@link ShardId} of the shard to assign
+     * @param node           node id of the node to assign the shard to
+     * @param acceptDataLoss whether the user agrees to data loss
+     */
+    public AllocateEmptyPrimaryAllocationCommand(ShardId shardId, String node, boolean acceptDataLoss) {
+        super(shardId, node, acceptDataLoss);
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    public static class Builder extends BasePrimaryAllocationCommand.Builder<AllocateEmptyPrimaryAllocationCommand> {
+
+        @Override
+        public Builder parse(XContentParser parser) throws IOException {
+            return EMPTY_PRIMARY_PARSER.parse(parser, this);
+        }
+
+        @Override
+        public AllocateEmptyPrimaryAllocationCommand build() {
+            validate();
+            return new AllocateEmptyPrimaryAllocationCommand(new ShardId(index, shard), node, acceptDataLoss);
+        }
+    }
+
+    public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateEmptyPrimaryAllocationCommand> {
+
+        @Override
+        protected Builder newBuilder() {
+            return new Builder();
+        }
+    }
+
+    @Override
+    public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
+        final DiscoveryNode discoNode;
+        try {
+            discoNode = allocation.nodes().resolveNode(node);
+        } catch (IllegalArgumentException e) {
+            return explainOrThrowRejectedCommand(explain, allocation, e);
+        }
+        final RoutingNodes routingNodes = allocation.routingNodes();
+        RoutingNode routingNode = routingNodes.node(discoNode.id());
+        if (routingNode == null) {
+            return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
+        }
+
+        final ShardRouting shardRouting;
+        try {
+            shardRouting = allocation.routingTable().shardRoutingTable(shardId).primaryShard();
+        } catch (IndexNotFoundException | ShardNotFoundException e) {
+            return explainOrThrowRejectedCommand(explain, allocation, e);
+        }
+        if (shardRouting.unassigned() == false) {
+            return explainOrThrowRejectedCommand(explain, allocation, "primary " + shardId + " is already assigned");
+        }
+
+        if (shardRouting.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED && acceptDataLoss == false) {
+            return explainOrThrowRejectedCommand(explain, allocation,
+                "allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true");
+        }
+
+        initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting,
+            shr -> {
+                if (shr.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
+                    // we need to move the unassigned info back to treat it as if it was index creation
+                    shr.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
+                        "force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(),
+                        shardRouting.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis()));
+                }
+            });
+        return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
+    }
+}

+ 131 - 0
core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateReplicaAllocationCommand.java

@@ -0,0 +1,131 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.command;
+
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
+import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Allocates an unassigned replica shard to a specific node. Checks if allocation deciders allow allocation.
+ */
+public class AllocateReplicaAllocationCommand extends AbstractAllocateAllocationCommand {
+    public static final String NAME = "allocate_replica";
+
+    private static final ObjectParser<AllocateReplicaAllocationCommand.Builder, Void> REPLICA_PARSER = createAllocateParser(NAME);
+
+    /**
+     * Creates a new {@link AllocateReplicaAllocationCommand}
+     *
+     * @param shardId        {@link ShardId} of the shard to assign
+     * @param node           node id of the node to assign the shard to
+     */
+    public AllocateReplicaAllocationCommand(ShardId shardId, String node) {
+        super(shardId, node);
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    protected static class Builder extends AbstractAllocateAllocationCommand.Builder<AllocateReplicaAllocationCommand> {
+
+        @Override
+        public Builder parse(XContentParser parser) throws IOException {
+            return REPLICA_PARSER.parse(parser, this);
+        }
+
+        @Override
+        public AllocateReplicaAllocationCommand build() {
+            validate();
+            return new AllocateReplicaAllocationCommand(new ShardId(index, shard), node);
+        }
+    }
+
+    public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateReplicaAllocationCommand> {
+        @Override
+        protected Builder newBuilder() {
+            return new Builder();
+        }
+    }
+
+    @Override
+    public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
+        final DiscoveryNode discoNode;
+        try {
+            discoNode = allocation.nodes().resolveNode(node);
+        } catch (IllegalArgumentException e) {
+            return explainOrThrowRejectedCommand(explain, allocation, e);
+        }
+        final RoutingNodes routingNodes = allocation.routingNodes();
+        RoutingNode routingNode = routingNodes.node(discoNode.id());
+        if (routingNode == null) {
+            return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
+        }
+
+        final ShardRouting primaryShardRouting;
+        try {
+            primaryShardRouting = allocation.routingTable().shardRoutingTable(shardId).primaryShard();
+        } catch (IndexNotFoundException | ShardNotFoundException e) {
+            return explainOrThrowRejectedCommand(explain, allocation, e);
+        }
+        if (primaryShardRouting.unassigned()) {
+            return explainOrThrowRejectedCommand(explain, allocation,
+                "trying to allocate a replica shard " + shardId + ", while corresponding primary shard is still unassigned");
+        }
+
+        List<ShardRouting> replicaShardRoutings = allocation.routingTable().shardRoutingTable(shardId).replicaShardsWithState(ShardRoutingState.UNASSIGNED);
+        ShardRouting shardRouting;
+        if (replicaShardRoutings.isEmpty()) {
+            return explainOrThrowRejectedCommand(explain, allocation,
+                "all copies of " + shardId +" are already assigned. Use the move allocation command instead");
+        } else {
+            shardRouting = replicaShardRoutings.get(0);
+        }
+
+        Decision decision = allocation.deciders().canAllocate(shardRouting, routingNode, allocation);
+        if (decision.type() == Decision.Type.NO) {
+            // don't use explainOrThrowRejectedCommand to keep the original "NO" decision
+            if (explain) {
+                return new RerouteExplanation(this, decision);
+            }
+            throw new IllegalArgumentException("[" + name() + "] allocation of " + shardId + " on node " + discoNode + " is not allowed, reason: " + decision);
+        }
+
+        initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting);
+        return new RerouteExplanation(this, decision);
+    }
+
+
+}

+ 124 - 0
core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateStalePrimaryAllocationCommand.java

@@ -0,0 +1,124 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.command;
+
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.routing.RoutingNode;
+import org.elasticsearch.cluster.routing.RoutingNodes;
+import org.elasticsearch.cluster.routing.ShardRouting;
+import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.routing.allocation.decider.Decision;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.XContentParser;
+import org.elasticsearch.index.IndexNotFoundException;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
+
+import java.io.IOException;
+
+/**
+ * Allocates an unassigned stale primary shard to a specific node. Use with extreme care as this will result in data loss.
+ * Allocation deciders are ignored.
+ */
+public class AllocateStalePrimaryAllocationCommand extends BasePrimaryAllocationCommand {
+    public static final String NAME = "allocate_stale_primary";
+
+    private static final ObjectParser<Builder, Void> STALE_PRIMARY_PARSER = BasePrimaryAllocationCommand.createAllocatePrimaryParser(NAME);
+
+    /**
+     * Creates a new {@link AllocateStalePrimaryAllocationCommand}
+     *
+     * @param shardId        {@link ShardId} of the shard to assign
+     * @param node           node id of the node to assign the shard to
+     * @param acceptDataLoss whether the user agrees to data loss
+     */
+    public AllocateStalePrimaryAllocationCommand(ShardId shardId, String node, boolean acceptDataLoss) {
+        super(shardId, node, acceptDataLoss);
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    public static class Builder extends BasePrimaryAllocationCommand.Builder<AllocateStalePrimaryAllocationCommand> {
+
+        @Override
+        public Builder parse(XContentParser parser) throws IOException {
+            return STALE_PRIMARY_PARSER.parse(parser, this);
+        }
+
+        @Override
+        public AllocateStalePrimaryAllocationCommand build() {
+            validate();
+            return new AllocateStalePrimaryAllocationCommand(new ShardId(index, shard), node, acceptDataLoss);
+        }
+    }
+
+    public static class Factory extends AbstractAllocateAllocationCommand.Factory<AllocateStalePrimaryAllocationCommand> {
+
+        @Override
+        protected Builder newBuilder() {
+            return new Builder();
+        }
+    }
+
+    @Override
+    public RerouteExplanation execute(RoutingAllocation allocation, boolean explain) {
+        final DiscoveryNode discoNode;
+        try {
+            discoNode = allocation.nodes().resolveNode(node);
+        } catch (IllegalArgumentException e) {
+            return explainOrThrowRejectedCommand(explain, allocation, e);
+        }
+        final RoutingNodes routingNodes = allocation.routingNodes();
+        RoutingNode routingNode = routingNodes.node(discoNode.id());
+        if (routingNode == null) {
+            return explainOrThrowMissingRoutingNode(allocation, explain, discoNode);
+        }
+
+        final ShardRouting shardRouting;
+        try {
+            shardRouting = allocation.routingTable().shardRoutingTable(shardId).primaryShard();
+        } catch (IndexNotFoundException | ShardNotFoundException e) {
+            return explainOrThrowRejectedCommand(explain, allocation, e);
+        }
+        if (shardRouting.unassigned() == false) {
+            return explainOrThrowRejectedCommand(explain, allocation, "primary " + shardId + " is already assigned");
+        }
+
+        if (acceptDataLoss == false) {
+            return explainOrThrowRejectedCommand(explain, allocation,
+                "allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true");
+        }
+
+        final IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex());
+        if (shardRouting.allocatedPostIndexCreate(indexMetaData) == false) {
+            return explainOrThrowRejectedCommand(explain, allocation,
+                "trying to allocate an existing primary shard " + shardId + ", while no such shard has ever been active");
+        }
+
+        initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting);
+        return new RerouteExplanation(this, allocation.decision(Decision.YES, name() + " (allocation command)", "ignore deciders"));
+    }
+
+}

+ 10 - 8
core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocationCommands.java

@@ -67,7 +67,9 @@ public class AllocationCommands {
     }
 
     static {
-        registerFactory(AllocateAllocationCommand.NAME, new AllocateAllocationCommand.Factory());
+        registerFactory(AllocateEmptyPrimaryAllocationCommand.NAME, new AllocateEmptyPrimaryAllocationCommand.Factory());
+        registerFactory(AllocateStalePrimaryAllocationCommand.NAME, new AllocateStalePrimaryAllocationCommand.Factory());
+        registerFactory(AllocateReplicaAllocationCommand.NAME, new AllocateReplicaAllocationCommand.Factory());
         registerFactory(CancelAllocationCommand.NAME, new CancelAllocationCommand.Factory());
         registerFactory(MoveAllocationCommand.NAME, new MoveAllocationCommand.Factory());
     }
@@ -76,7 +78,7 @@ public class AllocationCommands {
 
     /**
      * Creates a new set of {@link AllocationCommands}
-     *   
+     *
      * @param commands {@link AllocationCommand}s that are wrapped by this instance
      */
     public AllocationCommands(AllocationCommand... commands) {
@@ -122,7 +124,7 @@ public class AllocationCommands {
      * Reads a {@link AllocationCommands} from a {@link StreamInput}
      * @param in {@link StreamInput} to read from
      * @return {@link AllocationCommands} read
-     * 
+     *
      * @throws IOException if something happens during read
      */
     public static AllocationCommands readFrom(StreamInput in) throws IOException {
@@ -137,7 +139,7 @@ public class AllocationCommands {
 
     /**
      * Writes {@link AllocationCommands} to a {@link StreamOutput}
-     * 
+     *
      * @param commands Commands to write
      * @param out {@link StreamOutput} to write the commands to
      * @throws IOException if something happens during write
@@ -149,7 +151,7 @@ public class AllocationCommands {
             lookupFactorySafe(command.name()).writeTo(command, out);
         }
     }
-    
+
     /**
      * Reads {@link AllocationCommands} from a {@link XContentParser}
      * <pre>
@@ -161,7 +163,7 @@ public class AllocationCommands {
      * </pre>
      * @param parser {@link XContentParser} to read the commands from
      * @return {@link AllocationCommands} read
-     * @throws IOException if something bad happens while reading the stream 
+     * @throws IOException if something bad happens while reading the stream
      */
     public static AllocationCommands fromXContent(XContentParser parser) throws IOException {
         AllocationCommands commands = new AllocationCommands();
@@ -203,10 +205,10 @@ public class AllocationCommands {
         }
         return commands;
     }
-    
+
     /**
      * Writes {@link AllocationCommands} to a {@link XContentBuilder}
-     * 
+     *
      * @param commands {@link AllocationCommands} to write
      * @param builder {@link XContentBuilder} to use
      * @param params Parameters to use for building

+ 88 - 0
core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/BasePrimaryAllocationCommand.java

@@ -0,0 +1,88 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing.allocation.command;
+
+import org.elasticsearch.common.ParseField;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.xcontent.ObjectParser;
+import org.elasticsearch.common.xcontent.ToXContent;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.shard.ShardId;
+
+import java.io.IOException;
+
+/**
+ * Abstract base class for allocating an unassigned primary shard to a node
+ */
+public abstract class BasePrimaryAllocationCommand extends AbstractAllocateAllocationCommand {
+
+    private static final String ACCEPT_DATA_LOSS_KEY = "accept_data_loss";
+
+    protected static <T extends Builder> ObjectParser<T, Void> createAllocatePrimaryParser(String command) {
+        ObjectParser<T, Void> parser = AbstractAllocateAllocationCommand.createAllocateParser(command);
+        parser.declareBoolean(Builder::setAcceptDataLoss, new ParseField(ACCEPT_DATA_LOSS_KEY));
+        return parser;
+    }
+
+    protected final boolean acceptDataLoss;
+
+    protected BasePrimaryAllocationCommand(ShardId shardId, String node, boolean acceptDataLoss) {
+        super(shardId, node);
+        this.acceptDataLoss = acceptDataLoss;
+    }
+
+    /**
+     * The operation only executes if the user explicitly agrees to possible data loss
+     *
+     * @return whether data loss is acceptable
+     */
+    public boolean acceptDataLoss() {
+        return acceptDataLoss;
+    }
+
+    protected static abstract class Builder<T extends BasePrimaryAllocationCommand> extends AbstractAllocateAllocationCommand.Builder<T> {
+        protected boolean acceptDataLoss;
+
+        public void setAcceptDataLoss(boolean acceptDataLoss) {
+            this.acceptDataLoss = acceptDataLoss;
+        }
+
+        @Override
+        public Builder readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            acceptDataLoss = in.readBoolean();
+            return this;
+        }
+    }
+
+    @Override
+    public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
+        super.toXContent(builder, params);
+        builder.field(ACCEPT_DATA_LOSS_KEY, acceptDataLoss);
+        return builder;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeBoolean(acceptDataLoss);
+    }
+}

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java

@@ -125,7 +125,7 @@ public class CancelAllocationCommand implements AllocationCommand {
 
     /**
      * Creates a new {@link CancelAllocationCommand}
-     * 
+     *
      * @param shardId id of the shard which allocation should be canceled
      * @param node id of the node that manages the shard which allocation should be canceled
      */

+ 5 - 5
core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java

@@ -29,7 +29,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
 import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
-import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@@ -100,7 +100,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
         logger.info("--> explicitly allocate shard 1, *under dry_run*");
         state = client().admin().cluster().prepareReroute()
                 .setExplain(randomBoolean())
-                .add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
+                .add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node_1, true))
                 .setDryRun(true)
                 .execute().actionGet().getState();
         assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
@@ -113,7 +113,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
         logger.info("--> explicitly allocate shard 1, actually allocating, no dry run");
         state = client().admin().cluster().prepareReroute()
                 .setExplain(randomBoolean())
-                .add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
+                .add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node_1, true))
                 .execute().actionGet().getState();
         assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
         assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).id()).get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
@@ -212,7 +212,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
         logger.info("--> explicitly allocate shard 1, actually allocating, no dry run");
         state = client().admin().cluster().prepareReroute()
                 .setExplain(randomBoolean())
-                .add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
+                .add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node_1, true))
                 .execute().actionGet().getState();
         assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
         assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).id()).get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
@@ -246,7 +246,7 @@ public class ClusterRerouteIT extends ESIntegTestCase {
         logger.info("--> explicitly allocate primary");
         state = client().admin().cluster().prepareReroute()
                 .setExplain(randomBoolean())
-                .add(new AllocateAllocationCommand(new ShardId("test", 0), node_1, true))
+                .add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node_1, true))
                 .execute().actionGet().getState();
         assertThat(state.getRoutingNodes().unassigned().size(), equalTo(1));
         assertThat(state.getRoutingNodes().node(state.nodes().resolveNode(node_1).id()).get(0).state(), equalTo(ShardRoutingState.INITIALIZING));

+ 75 - 1
core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

@@ -19,10 +19,17 @@ package org.elasticsearch.cluster.routing;
  * under the License.
  */
 
+import com.carrotsearch.hppc.cursors.IntObjectCursor;
+import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequestBuilder;
+import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
+import org.elasticsearch.common.collect.ImmutableOpenIntMap;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.GatewayAllocator;
 import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.InternalTestCluster;
 import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
@@ -33,11 +40,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 
 @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@@ -50,7 +59,7 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
         return pluginList(MockTransportService.TestPlugin.class);
     }
 
-    public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception {
+    private void createStaleReplicaScenario() throws Exception {
         logger.info("--> starting 3 nodes, 1 master, 2 data");
         String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
         internalCluster().startDataOnlyNodesAsync(2).get();
@@ -103,6 +112,10 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
         assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)));
         // kick reroute a second time and check that all shards are unassigned
         assertThat(client(master).admin().cluster().prepareReroute().get().getState().getRoutingNodes().unassigned().size(), equalTo(2));
+    }
+
+    public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception {
+        createStaleReplicaScenario();
 
         logger.info("--> starting node that reuses data folder with the up-to-date primary shard");
         internalCluster().startDataOnlyNode(Settings.EMPTY);
@@ -112,6 +125,67 @@ public class PrimaryAllocationIT extends ESIntegTestCase {
         assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l);
     }
 
+    public void testFailedAllocationOfStalePrimaryToDataNodeWithNoData() throws Exception {
+        String dataNodeWithShardCopy = internalCluster().startNode();
+
+        logger.info("--> create single shard index");
+        assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
+            .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
+        ensureGreen("test");
+
+        String dataNodeWithNoShardCopy = internalCluster().startNode();
+        ensureStableCluster(2);
+
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(dataNodeWithShardCopy));
+        ensureStableCluster(1);
+        assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
+
+        logger.info("--> force allocation of stale copy to node that does not have shard copy");
+        client().admin().cluster().prepareReroute().add(new AllocateStalePrimaryAllocationCommand(new ShardId("test", 0), dataNodeWithNoShardCopy, true)).get();
+
+        logger.info("--> wait until shard is failed and becomes unassigned again");
+        assertBusy(() -> assertTrue(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").allPrimaryShardsUnassigned()));
+        assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().index("test").getShards().get(0).primaryShard().unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
+    }
+
+    public void testForceStaleReplicaToBePromotedToPrimary() throws Exception {
+        boolean useStaleReplica = randomBoolean(); // if true, use stale replica, otherwise a completely empty copy
+        createStaleReplicaScenario();
+
+        logger.info("--> explicitly promote old primary shard");
+        ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storeStatuses = client().admin().indices().prepareShardStores("test").get().getStoreStatuses().get("test");
+        ClusterRerouteRequestBuilder rerouteBuilder = client().admin().cluster().prepareReroute();
+        for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : storeStatuses) {
+            int shardId = shardStoreStatuses.key;
+            IndicesShardStoresResponse.StoreStatus storeStatus = randomFrom(shardStoreStatuses.value);
+            logger.info("--> adding allocation command for shard " + shardId);
+            // force allocation based on node id
+            if (useStaleReplica) {
+                rerouteBuilder.add(new AllocateStalePrimaryAllocationCommand(new ShardId("test", shardId), storeStatus.getNode().getId(), true));
+            } else {
+                rerouteBuilder.add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", shardId), storeStatus.getNode().getId(), true));
+            }
+        }
+        rerouteBuilder.get();
+
+        logger.info("--> check that the stale primary shard gets allocated and that documents are available");
+        ensureYellow("test");
+
+        assertHitCount(client().prepareSearch("test").setSize(0).setQuery(matchAllQuery()).get(), useStaleReplica ? 1l : 0l);
+    }
+
+    public void testForcePrimaryShardIfAllocationDecidersSayNoAfterIndexCreation() throws ExecutionException, InterruptedException {
+        String node = internalCluster().startNode();
+        client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
+            .put("index.routing.allocation.exclude._name", node)
+            .put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get();
+
+        assertThat(client().admin().cluster().prepareState().get().getState().getRoutingTable().shardRoutingTable("test", 0).assignedShards(), empty());
+
+        client().admin().cluster().prepareReroute().add(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), node, true)).get();
+        ensureGreen("test");
+    }
+
     public void testNotWaitForQuorumCopies() throws Exception {
         logger.info("--> starting 3 nodes");
         internalCluster().startNodesAsync(3).get();

+ 110 - 41
core/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationCommandsTests.java

@@ -26,7 +26,10 @@ import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
-import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.command.AbstractAllocateAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand;
+import org.elasticsearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
 import org.elasticsearch.cluster.routing.allocation.command.CancelAllocationCommand;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
@@ -38,7 +41,9 @@ import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.IndexNotFoundException;
 import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.shard.ShardNotFoundException;
 import org.elasticsearch.test.ESAllocationTestCase;
 
 import static java.util.Collections.singletonMap;
@@ -46,6 +51,7 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
 import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 
 /**
@@ -96,6 +102,14 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().node(toNodeId).get(0).state(), equalTo(ShardRoutingState.STARTED));
     }
 
+    private AbstractAllocateAllocationCommand randomAllocateCommand(ShardId shardId, String node) {
+        return randomFrom(
+            new AllocateReplicaAllocationCommand(shardId, node),
+            new AllocateEmptyPrimaryAllocationCommand(shardId, node, true),
+            new AllocateStalePrimaryAllocationCommand(shardId, node, true)
+        );
+    }
+
     public void testAllocateCommand() {
         AllocationService allocation = createAllocationService(settingsBuilder()
                 .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
@@ -106,10 +120,13 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         MetaData metaData = MetaData.builder()
                 .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
                 .build();
+        // shard routing is added as "from recovery" instead of "new index creation" so that we can test below that allocating an empty
+        // primary with accept_data_loss flag set to false fails
         RoutingTable routingTable = RoutingTable.builder()
-                .addAsNew(metaData.index("test"))
+                .addAsRecovery(metaData.index("test"))
                 .build();
         ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
+        ShardId shardId = new ShardId("test", 0);
 
         logger.info("--> adding 3 nodes on same rack and do rerouting");
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
@@ -122,22 +139,56 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
         assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
 
-        logger.info("--> allocating with primary flag set to false, should fail");
+        logger.info("--> allocating to non-existent node, should fail");
         try {
-            allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", false)));
-            fail();
+            allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(shardId, "node42")));
+            fail("expected IllegalArgumentException when allocating to non-existing node");
         } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), containsString("failed to resolve [node42], no matching nodes"));
         }
 
         logger.info("--> allocating to non-data node, should fail");
         try {
-            rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node4", true)));
-            fail();
+            allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(shardId, "node4")));
+            fail("expected IllegalArgumentException when allocating to non-data node");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), containsString("allocation can only be done on data nodes"));
+        }
+
+        logger.info("--> allocating non-existing shard, should fail");
+        try {
+            allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(new ShardId("test", 1), "node2")));
+            fail("expected ShardNotFoundException when allocating non-existing shard");
+        } catch (ShardNotFoundException e) {
+            assertThat(e.getMessage(), containsString("no such shard"));
+        }
+
+        logger.info("--> allocating non-existing index, should fail");
+        try {
+            allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(new ShardId("test2", 0), "node2")));
+            fail("expected ShardNotFoundException when allocating non-existing index");
+        } catch (IndexNotFoundException e) {
+            assertThat(e.getMessage(), containsString("no such index"));
+        }
+
+        logger.info("--> allocating empty primary with acceptDataLoss flag set to false");
+        try {
+            allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), "node1", false)));
+            fail("expected IllegalArgumentException when allocating empty primary with acceptDataLoss flag set to false");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), containsString("allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true"));
+        }
+
+        logger.info("--> allocating stale primary with acceptDataLoss flag set to false");
+        try {
+            allocation.reroute(clusterState, new AllocationCommands(new AllocateStalePrimaryAllocationCommand(shardId, "node1", false)));
+            fail("expected IllegalArgumentException when allocating stale primary with acceptDataLoss flag set to false");
         } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), containsString("allocating an empty primary for " + shardId + " can result in data loss. Please confirm by setting the accept_data_loss parameter to true"));
         }
 
-        logger.info("--> allocating with primary flag set to true");
-        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true)));
+        logger.info("--> allocating empty primary with acceptDataLoss flag set to true");
+        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), "node1", true)));
         assertThat(rerouteResult.changed(), equalTo(true));
         clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
         assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@@ -153,13 +204,13 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
 
         logger.info("--> allocate the replica shard on the primary shard node, should fail");
         try {
-            allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", false)));
-            fail();
+            allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node1")));
+            fail("expected IllegalArgumentException when allocating replica shard on the primary shard node");
         } catch (IllegalArgumentException e) {
         }
 
         logger.info("--> allocate the replica shard on on the second node");
-        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
+        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node2")));
         assertThat(rerouteResult.changed(), equalTo(true));
         clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
         assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@@ -178,8 +229,8 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
 
         logger.info("--> verify that we fail when there are no unassigned shards");
         try {
-            allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node3", false)));
-            fail();
+            allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(new ShardId("test", 0), "node3")));
+            fail("expected IllegalArgumentException when allocating shard while no unassigned shard available");
         } catch (IllegalArgumentException e) {
         }
     }
@@ -209,8 +260,8 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
         assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));
 
-        logger.info("--> allocating with primary flag set to true");
-        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true)));
+        logger.info("--> allocating empty primary shard with accept_data_loss flag set to true");
+        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 0), "node1", true)));
         assertThat(rerouteResult.changed(), equalTo(true));
         clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
         assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@@ -239,7 +290,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         }
 
         logger.info("--> allocate the replica shard on on the second node");
-        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
+        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node2")));
         assertThat(rerouteResult.changed(), equalTo(true));
         clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
         assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@@ -257,7 +308,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
 
         logger.info("--> allocate the replica shard on on the second node");
-        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
+        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node2")));
         assertThat(rerouteResult.changed(), equalTo(true));
         clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
         assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@@ -290,7 +341,7 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         assertThat(clusterState.getRoutingNodes().node("node3").size(), equalTo(0));
 
         logger.info("--> allocate the replica shard on on the second node");
-        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
+        rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateReplicaAllocationCommand(new ShardId("test", 0), "node2")));
         clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
         assertThat(rerouteResult.changed(), equalTo(true));
         assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
@@ -335,7 +386,9 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
 
     public void testSerialization() throws Exception {
         AllocationCommands commands = new AllocationCommands(
-                new AllocateAllocationCommand(new ShardId("test", 1), "node1", true),
+                new AllocateEmptyPrimaryAllocationCommand(new ShardId("test", 1), "node1", true),
+                new AllocateStalePrimaryAllocationCommand(new ShardId("test", 2), "node1", true),
+                new AllocateReplicaAllocationCommand(new ShardId("test", 2), "node1"),
                 new MoveAllocationCommand(new ShardId("test", 3), "node2", "node3"),
                 new CancelAllocationCommand(new ShardId("test", 4), "node5", true)
         );
@@ -343,24 +396,33 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         AllocationCommands.writeTo(commands, bytes);
         AllocationCommands sCommands = AllocationCommands.readFrom(StreamInput.wrap(bytes.bytes()));
 
-        assertThat(sCommands.commands().size(), equalTo(3));
-        assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));
-        assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).node(), equalTo("node1"));
-        assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).allowPrimary(), equalTo(true));
+        assertThat(sCommands.commands().size(), equalTo(5));
+        assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));
+        assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).node(), equalTo("node1"));
+        assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).acceptDataLoss(), equalTo(true));
+
+        assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).shardId(), equalTo(new ShardId("test", 2)));
+        assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).node(), equalTo("node1"));
+        assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).acceptDataLoss(), equalTo(true));
 
-        assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).shardId(), equalTo(new ShardId("test", 3)));
-        assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).fromNode(), equalTo("node2"));
-        assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).toNode(), equalTo("node3"));
+        assertThat(((AllocateReplicaAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 2)));
+        assertThat(((AllocateReplicaAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node1"));
 
-        assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4)));
-        assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5"));
-        assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).allowPrimary(), equalTo(true));
+        assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).shardId(), equalTo(new ShardId("test", 3)));
+        assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).fromNode(), equalTo("node2"));
+        assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).toNode(), equalTo("node3"));
+
+        assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).shardId(), equalTo(new ShardId("test", 4)));
+        assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).node(), equalTo("node5"));
+        assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).allowPrimary(), equalTo(true));
     }
 
     public void testXContent() throws Exception {
         String commands = "{\n" +
                 "    \"commands\" : [\n" +
-                "        {\"allocate\" : {\"index\" : \"test\", \"shard\" : 1, \"node\" : \"node1\", \"allow_primary\" : true}}\n" +
+                "        {\"allocate_empty_primary\" : {\"index\" : \"test\", \"shard\" : 1, \"node\" : \"node1\", \"accept_data_loss\" : true}}\n" +
+                "       ,{\"allocate_stale_primary\" : {\"index\" : \"test\", \"shard\" : 2, \"node\" : \"node1\", \"accept_data_loss\" : true}}\n" +
+                "       ,{\"allocate_replica\" : {\"index\" : \"test\", \"shard\" : 2, \"node\" : \"node1\"}}\n" +
                 "       ,{\"move\" : {\"index\" : \"test\", \"shard\" : 3, \"from_node\" : \"node2\", \"to_node\" : \"node3\"}} \n" +
                 "       ,{\"cancel\" : {\"index\" : \"test\", \"shard\" : 4, \"node\" : \"node5\", \"allow_primary\" : true}} \n" +
                 "    ]\n" +
@@ -371,17 +433,24 @@ public class AllocationCommandsTests extends ESAllocationTestCase {
         parser.nextToken();
         AllocationCommands sCommands = AllocationCommands.fromXContent(parser);
 
-        assertThat(sCommands.commands().size(), equalTo(3));
-        assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));
-        assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).node(), equalTo("node1"));
-        assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).allowPrimary(), equalTo(true));
+        assertThat(sCommands.commands().size(), equalTo(5));
+        assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));
+        assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).node(), equalTo("node1"));
+        assertThat(((AllocateEmptyPrimaryAllocationCommand) (sCommands.commands().get(0))).acceptDataLoss(), equalTo(true));
+
+        assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).shardId(), equalTo(new ShardId("test", 2)));
+        assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).node(), equalTo("node1"));
+        assertThat(((AllocateStalePrimaryAllocationCommand) (sCommands.commands().get(1))).acceptDataLoss(), equalTo(true));
+
+        assertThat(((AllocateReplicaAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 2)));
+        assertThat(((AllocateReplicaAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node1"));
 
-        assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).shardId(), equalTo(new ShardId("test", 3)));
-        assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).fromNode(), equalTo("node2"));
-        assertThat(((MoveAllocationCommand) (sCommands.commands().get(1))).toNode(), equalTo("node3"));
+        assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).shardId(), equalTo(new ShardId("test", 3)));
+        assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).fromNode(), equalTo("node2"));
+        assertThat(((MoveAllocationCommand) (sCommands.commands().get(3))).toNode(), equalTo("node3"));
 
-        assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4)));
-        assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5"));
-        assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).allowPrimary(), equalTo(true));
+        assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).shardId(), equalTo(new ShardId("test", 4)));
+        assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).node(), equalTo("node5"));
+        assertThat(((CancelAllocationCommand) (sCommands.commands().get(4))).allowPrimary(), equalTo(true));
     }
 }

+ 39 - 11
docs/reference/cluster/reroute.asciidoc

@@ -20,7 +20,7 @@ curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
             }
         },
         {
-          "allocate" : {
+          "allocate_replica" : {
               "index" : "test", "shard" : 1, "node" : "node3"
           }
         }
@@ -64,14 +64,42 @@ The commands supported are:
     from the primary shard by cancelling them and allowing them to be
     reinitialized through the standard reallocation process.
 
-`allocate`::
-    Allocate an unassigned shard to a node. Accepts the
+`allocate_replica`::
+    Allocate an unassigned replica shard to a node. Accepts the
     `index` and `shard` for index name and shard number, and `node` to
-    allocate the shard to. It also accepts `allow_primary` flag to
-    explicitly specify that it is allowed to explicitly allocate a primary
-    shard (might result in data loss).
-
-WARNING: The `allow_primary` parameter will force a new _empty_ primary shard
-to be allocated *without any data*. If a node which has a copy of the original
-primary shard (including data) rejoins the cluster later on, that data will be
-deleted: the old shard copy will be replaced by the new live shard copy.
+    allocate the shard to. Takes <<modules-cluster,allocation deciders>> into account.
+
+Two more commands are available that allow the allocation of a primary shard
+to a node. These commands should however be used with extreme care, as primary
+shard allocation is usually fully automatically handled by Elasticsearch.
+Reasons why a primary shard cannot be automatically allocated include the following:
+
+- A new index was created but there is no node which satisfies the allocation deciders.
+- An up-to-date shard copy of the data cannot be found on the current data nodes in
+the cluster. To prevent data loss, the system does not automatically promote a stale
+shard copy to primary.
+
+As a manual override, two commands to forcefully allocate primary shards
+are available:
+
+`allocate_stale_primary`::
+    Allocate a primary shard to a node that holds a stale copy. Accepts the
+    `index` and `shard` for index name and shard number, and `node` to
+    allocate the shard to. Using this command may lead to data loss
+    for the provided shard id. If a node which has the good copy of the
+    data rejoins the cluster later on, that data will be overwritten with
+    the data of the stale copy that was forcefully allocated with this
+    command. To ensure that these implications are well-understood,
+    this command requires the special field `accept_data_loss` to be
+    explicitly set to `true` for it to work.
+
+`allocate_empty_primary`::
+    Allocate an empty primary shard to a node. Accepts the
+    `index` and `shard` for index name and shard number, and `node` to
+    allocate the shard to. Using this command leads to a complete loss
+    of all data that was indexed into this shard, if it was previously
+    started. If a node which has a copy of the
+    data rejoins the cluster later on, that data will be deleted!
+    To ensure that these implications are well-understood,
+    this command requires the special field `accept_data_loss` to be
+    explicitly set to `true` for it to work.

+ 7 - 0
docs/reference/migration/migrate_3_0.asciidoc

@@ -603,6 +603,13 @@ Allocation IDs assign unique identifiers to shard copies. This allows the cluste
 copies of the same data and track which shards have been active, so that after a cluster restart, shard copies
 containing only the most recent data can become primaries.
 
+=== Reroute commands
+
+The reroute command `allocate` has been split into two distinct commands `allocate_replica` and `allocate_empty_primary`.
+This was done as we introduced a new `allocate_stale_primary` command. The new `allocate_replica` command corresponds to the
+old `allocate` command  with `allow_primary` set to false. The new `allocate_empty_primary` command corresponds to the old
+`allocate` command with `allow_primary` set to true.
+
 ==== `index.shared_filesystem.recover_on_any_node` changes
 
 The behavior of `index.shared_filesystem.recover_on_any_node = true` has been changed. Previously, in the case where no