Browse Source

Allocate primary shards based on allocation ids

Closes #15281
Yannick Welsch 9 years ago
parent
commit
3a442db9bd
26 changed files with 643 additions and 348 deletions
  1. 34 21
      core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java
  2. 8 8
      core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java
  3. 1 1
      core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
  4. 0 9
      core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java
  5. 8 1
      core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
  6. 3 2
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
  7. 4 3
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java
  8. 134 64
      core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java
  9. 9 2
      core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
  10. 16 7
      core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java
  11. 2 1
      core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  12. 7 0
      core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java
  13. 2 1
      core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java
  14. 12 9
      core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreResponseTests.java
  15. 103 0
      core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java
  16. 240 112
      core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java
  17. 5 69
      core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java
  18. 13 5
      core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
  19. 5 3
      core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  20. 4 7
      core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java
  21. 0 13
      docs/reference/index-modules.asciidoc
  22. 2 3
      docs/reference/indices/shadow-replicas.asciidoc
  23. 5 3
      docs/reference/indices/shard-stores.asciidoc
  24. 21 0
      docs/reference/migration/migrate_3_0.asciidoc
  25. 2 3
      docs/reference/modules/cluster/shards_allocation.asciidoc
  26. 3 1
      test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java

+ 34 - 21
core/src/main/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoresResponse.java

@@ -56,13 +56,14 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
     public static class StoreStatus implements Streamable, ToXContent, Comparable<StoreStatus> {
         private DiscoveryNode node;
         private long version;
+        private String allocationId;
         private Throwable storeException;
-        private Allocation allocation;
+        private AllocationStatus allocationStatus;
 
         /**
          * The status of the shard store with respect to the cluster
          */
-        public enum Allocation {
+        public enum AllocationStatus {
 
             /**
              * Allocated as primary
@@ -81,16 +82,16 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
 
             private final byte id;
 
-            Allocation(byte id) {
+            AllocationStatus(byte id) {
                 this.id = id;
             }
 
-            private static Allocation fromId(byte id) {
+            private static AllocationStatus fromId(byte id) {
                 switch (id) {
                     case 0: return PRIMARY;
                     case 1: return REPLICA;
                     case 2: return UNUSED;
-                    default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]");
+                    default: throw new IllegalArgumentException("unknown id for allocation status [" + id + "]");
                 }
             }
 
@@ -99,11 +100,11 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
                     case 0: return "primary";
                     case 1: return "replica";
                     case 2: return "unused";
-                    default: throw new IllegalArgumentException("unknown id for allocation [" + id + "]");
+                    default: throw new IllegalArgumentException("unknown id for allocation status [" + id + "]");
                 }
             }
 
-            private static Allocation readFrom(StreamInput in) throws IOException {
+            private static AllocationStatus readFrom(StreamInput in) throws IOException {
                 return fromId(in.readByte());
             }
 
@@ -115,10 +116,11 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
         private StoreStatus() {
         }
 
-        public StoreStatus(DiscoveryNode node, long version, Allocation allocation, Throwable storeException) {
+        public StoreStatus(DiscoveryNode node, long version, String allocationId, AllocationStatus allocationStatus, Throwable storeException) {
             this.node = node;
             this.version = version;
-            this.allocation = allocation;
+            this.allocationId = allocationId;
+            this.allocationStatus = allocationStatus;
             this.storeException = storeException;
         }
 
@@ -130,13 +132,20 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
         }
 
         /**
-         * Version of the store, used to select the store that will be
-         * used as a primary.
+         * Version of the store
          */
         public long getVersion() {
             return version;
         }
 
+        /**
+         * AllocationStatus id of the store, used to select the store that will be
+         * used as a primary.
+         */
+        public String getAllocationId() {
+            return allocationId;
+        }
+
         /**
          * Exception while trying to open the
          * shard index or from when the shard failed
@@ -146,13 +155,13 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
         }
 
         /**
-         * The allocation status of the store.
-         * {@link Allocation#PRIMARY} indicates a primary shard copy
-         * {@link Allocation#REPLICA} indicates a replica shard copy
-         * {@link Allocation#UNUSED} indicates an unused shard copy
+         * The allocationStatus status of the store.
+         * {@link AllocationStatus#PRIMARY} indicates a primary shard copy
+         * {@link AllocationStatus#REPLICA} indicates a replica shard copy
+         * {@link AllocationStatus#UNUSED} indicates an unused shard copy
          */
-        public Allocation getAllocation() {
-            return allocation;
+        public AllocationStatus getAllocationStatus() {
+            return allocationStatus;
         }
 
         static StoreStatus readStoreStatus(StreamInput in) throws IOException {
@@ -165,7 +174,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
         public void readFrom(StreamInput in) throws IOException {
             node = DiscoveryNode.readNode(in);
             version = in.readLong();
-            allocation = Allocation.readFrom(in);
+            allocationId = in.readOptionalString();
+            allocationStatus = AllocationStatus.readFrom(in);
             if (in.readBoolean()) {
                 storeException = in.readThrowable();
             }
@@ -175,7 +185,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
         public void writeTo(StreamOutput out) throws IOException {
             node.writeTo(out);
             out.writeLong(version);
-            allocation.writeTo(out);
+            out.writeOptionalString(allocationId);
+            allocationStatus.writeTo(out);
             if (storeException != null) {
                 out.writeBoolean(true);
                 out.writeThrowable(storeException);
@@ -188,7 +199,8 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
         public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
             node.toXContent(builder, params);
             builder.field(Fields.VERSION, version);
-            builder.field(Fields.ALLOCATED, allocation.value());
+            builder.field(Fields.ALLOCATION_ID, allocationId);
+            builder.field(Fields.ALLOCATED, allocationStatus.value());
             if (storeException != null) {
                 builder.startObject(Fields.STORE_EXCEPTION);
                 ElasticsearchException.toXContent(builder, params, storeException);
@@ -206,7 +218,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
             } else {
                 int compare = Long.compare(other.version, version);
                 if (compare == 0) {
-                    return Integer.compare(allocation.id, other.allocation.id);
+                    return Integer.compare(allocationStatus.id, other.allocationStatus.id);
                 }
                 return compare;
             }
@@ -379,6 +391,7 @@ public class IndicesShardStoresResponse extends ActionResponse implements ToXCon
         static final XContentBuilderString STORES = new XContentBuilderString("stores");
         // StoreStatus fields
         static final XContentBuilderString VERSION = new XContentBuilderString("version");
+        static final XContentBuilderString ALLOCATION_ID = new XContentBuilderString("allocation_id");
         static final XContentBuilderString STORE_EXCEPTION = new XContentBuilderString("store_exception");
         static final XContentBuilderString ALLOCATED = new XContentBuilderString("allocation");
     }

+ 8 - 8
core/src/main/java/org/elasticsearch/action/admin/indices/shards/TransportIndicesShardStoresAction.java

@@ -179,8 +179,8 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
                     }
                     for (NodeGatewayStartedShards response : fetchResponse.responses) {
                         if (shardExistsInNode(response)) {
-                            IndicesShardStoresResponse.StoreStatus.Allocation allocation = getAllocation(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode());
-                            storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), allocation, response.storeException()));
+                            IndicesShardStoresResponse.StoreStatus.AllocationStatus allocationStatus = getAllocationStatus(fetchResponse.shardId.getIndex(), fetchResponse.shardId.id(), response.getNode());
+                            storeStatuses.add(new IndicesShardStoresResponse.StoreStatus(response.getNode(), response.version(), response.allocationId(), allocationStatus, response.storeException()));
                         }
                     }
                     CollectionUtil.timSort(storeStatuses);
@@ -193,27 +193,27 @@ public class TransportIndicesShardStoresAction extends TransportMasterNodeReadAc
                 listener.onResponse(new IndicesShardStoresResponse(indicesStoreStatusesBuilder.build(), Collections.unmodifiableList(failureBuilder)));
             }
 
-            private IndicesShardStoresResponse.StoreStatus.Allocation getAllocation(String index, int shardID, DiscoveryNode node) {
+            private IndicesShardStoresResponse.StoreStatus.AllocationStatus getAllocationStatus(String index, int shardID, DiscoveryNode node) {
                 for (ShardRouting shardRouting : routingNodes.node(node.id())) {
                     ShardId shardId = shardRouting.shardId();
                     if (shardId.id() == shardID && shardId.getIndex().equals(index)) {
                         if (shardRouting.primary()) {
-                            return IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY;
+                            return IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY;
                         } else if (shardRouting.assignedToNode()) {
-                            return IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA;
+                            return IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA;
                         } else {
-                            return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
+                            return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
                         }
                     }
                 }
-                return IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED;
+                return IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED;
             }
 
             /**
              * A shard exists/existed in a node only if shard state file exists in the node
              */
             private boolean shardExistsInNode(final NodeGatewayStartedShards response) {
-                return response.storeException() != null || response.version() != -1;
+                return response.storeException() != null || response.version() != -1 || response.allocationId() != null;
             }
 
             @Override

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

@@ -621,7 +621,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         public int numberOfReplicas() {
             return settings.getAsInt(SETTING_NUMBER_OF_REPLICAS, -1);
         }
-        
+
         public Builder creationDate(long creationDate) {
             settings = settingsBuilder().put(settings).put(SETTING_CREATION_DATE, creationDate).build();
             return this;

+ 0 - 9
core/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java

@@ -47,7 +47,6 @@ import org.elasticsearch.rest.RestStatus;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Locale;
 
 /**
  * Service responsible for submitting open/close index requests
@@ -92,14 +91,6 @@ public class MetaDataIndexStateService extends AbstractComponent {
                     }
 
                     if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
-                        IndexRoutingTable indexRoutingTable = currentState.routingTable().index(index);
-                        for (IndexShardRoutingTable shard : indexRoutingTable) {
-                            for (ShardRouting shardRouting : shard) {
-                                if (shardRouting.primary() == true && shardRouting.allocatedPostIndexCreate() == false) {
-                                    throw new IndexPrimaryShardNotAllocatedException(new Index(index));
-                                }
-                            }
-                        }
                         indicesToClose.add(index);
                     }
                 }

+ 8 - 1
core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java

@@ -19,6 +19,8 @@
 
 package org.elasticsearch.cluster.routing;
 
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
@@ -267,7 +269,7 @@ public final class ShardRouting implements Streamable, ToXContent {
         return shardIdentifier;
     }
 
-    public boolean allocatedPostIndexCreate() {
+    public boolean allocatedPostIndexCreate(IndexMetaData indexMetaData) {
         if (active()) {
             return true;
         }
@@ -279,6 +281,11 @@ public final class ShardRouting implements Streamable, ToXContent {
             return false;
         }
 
+        if (indexMetaData.activeAllocationIds(id()).isEmpty() && indexMetaData.getCreationVersion().onOrAfter(Version.V_3_0_0)) {
+            // when no shards with this id have ever been active for this index
+            return false;
+        }
+
         return true;
     }
 

+ 3 - 2
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java

@@ -22,13 +22,13 @@ package org.elasticsearch.cluster.routing.allocation.decider;
 import com.carrotsearch.hppc.ObjectLookupContainer;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
-
 import org.elasticsearch.ElasticsearchParseException;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterInfo;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.DiskUsage;
 import org.elasticsearch.cluster.EmptyClusterInfoService;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
@@ -360,7 +360,8 @@ public class DiskThresholdDecider extends AllocationDecider {
         }
 
         // a flag for whether the primary shard has been previously allocated
-        boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate();
+        IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex());
+        boolean primaryHasBeenAllocated = shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData);
 
         // checks for exact byte comparisons
         if (freeBytes < freeBytesThresholdLow.bytes()) {

+ 4 - 3
core/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/EnableAllocationDecider.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.routing.allocation.decider;
 
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -82,8 +83,8 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
             return allocation.decision(Decision.YES, NAME, "allocation disabling is ignored");
         }
 
-        Settings indexSettings = allocation.routingNodes().metaData().index(shardRouting.index()).getSettings();
-        String enableIndexValue = indexSettings.get(INDEX_ROUTING_ALLOCATION_ENABLE);
+        IndexMetaData indexMetaData = allocation.metaData().index(shardRouting.getIndex());
+        String enableIndexValue = indexMetaData.getSettings().get(INDEX_ROUTING_ALLOCATION_ENABLE);
         final Allocation enable;
         if (enableIndexValue != null) {
             enable = Allocation.parse(enableIndexValue);
@@ -96,7 +97,7 @@ public class EnableAllocationDecider extends AllocationDecider implements NodeSe
             case NONE:
                 return allocation.decision(Decision.NO, NAME, "no allocations are allowed");
             case NEW_PRIMARIES:
-                if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate() == false) {
+                if (shardRouting.primary() && shardRouting.allocatedPostIndexCreate(indexMetaData) == false) {
                     return allocation.decision(Decision.YES, NAME, "new primary allocations are allowed");
                 } else {
                     return allocation.decision(Decision.NO, NAME, "non-new primary allocations are forbidden");

+ 134 - 64
core/src/main/java/org/elasticsearch/gateway/PrimaryShardAllocator.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.gateway;
 
 import org.apache.lucene.util.CollectionUtil;
+import org.elasticsearch.Version;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
@@ -30,8 +31,10 @@ import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
 import org.elasticsearch.cluster.routing.allocation.decider.Decision;
 import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexSettings;
 
 import java.util.*;
+import java.util.stream.Collectors;
 
 /**
  * The primary shard allocator allocates primary shard that were not created as
@@ -39,6 +42,7 @@ import java.util.*;
  */
 public abstract class PrimaryShardAllocator extends AbstractComponent {
 
+    @Deprecated
     public static final String INDEX_RECOVERY_INITIAL_SHARDS = "index.recovery.initial_shards";
 
     private final String initialShards;
@@ -56,13 +60,21 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
 
         final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
         while (unassignedIterator.hasNext()) {
-            ShardRouting shard = unassignedIterator.next();
+            final ShardRouting shard = unassignedIterator.next();
 
-            if (needToFindPrimaryCopy(shard) == false) {
+            if (shard.primary() == false) {
                 continue;
             }
 
-            AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
+            final IndexMetaData indexMetaData = metaData.index(shard.getIndex());
+            final IndexSettings indexSettings = new IndexSettings(indexMetaData, settings, Collections.emptyList());
+
+            if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
+                // when we create a fresh index
+                continue;
+            }
+
+            final AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState = fetchData(shard, allocation);
             if (shardState.hasData() == false) {
                 logger.trace("{}: ignoring allocation, still fetching shard started state", shard);
                 allocation.setHasPendingAsyncFetch();
@@ -70,25 +82,50 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
                 continue;
             }
 
-            IndexMetaData indexMetaData = metaData.index(shard.getIndex());
-            Settings indexSettings = Settings.builder().put(settings).put(indexMetaData.getSettings()).build();
-
-            NodesAndVersions nodesAndVersions = buildNodesAndVersions(shard, recoverOnAnyNode(indexSettings), allocation.getIgnoreNodes(shard.shardId()), shardState);
-            logger.debug("[{}][{}] found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
+            final Set<String> lastActiveAllocationIds = indexMetaData.activeAllocationIds(shard.id());
+            final boolean snapshotRestore = shard.restoreSource() != null;
+            final boolean recoverOnAnyNode = recoverOnAnyNode(indexSettings);
+
+            final NodesAndVersions nodesAndVersions;
+            final boolean enoughAllocationsFound;
+
+            if (lastActiveAllocationIds.isEmpty()) {
+                assert indexSettings.getIndexVersionCreated().before(Version.V_3_0_0) : "trying to allocated a primary with an empty allocation id set, but index is new";
+                // when we load an old index (after upgrading cluster) or restore a snapshot of an old index
+                // fall back to old version-based allocation mode
+                // Note that once the shard has been active, lastActiveAllocationIds will be non-empty
+                nodesAndVersions = buildNodesAndVersions(shard, snapshotRestore || recoverOnAnyNode, allocation.getIgnoreNodes(shard.shardId()), shardState);
+                if (snapshotRestore || recoverOnAnyNode) {
+                    enoughAllocationsFound = nodesAndVersions.allocationsFound > 0;
+                } else {
+                    enoughAllocationsFound = isEnoughVersionBasedAllocationsFound(shard, indexMetaData, nodesAndVersions);
+                }
+                logger.debug("[{}][{}]: version-based allocation for pre-{} index found {} allocations of {}, highest version: [{}]", shard.index(), shard.id(), Version.V_3_0_0, nodesAndVersions.allocationsFound, shard, nodesAndVersions.highestVersion);
+            } else {
+                assert lastActiveAllocationIds.isEmpty() == false;
+                // use allocation ids to select nodes
+                nodesAndVersions = buildAllocationIdBasedNodes(shard, snapshotRestore || recoverOnAnyNode,
+                        allocation.getIgnoreNodes(shard.shardId()), lastActiveAllocationIds, shardState);
+                enoughAllocationsFound = nodesAndVersions.allocationsFound > 0;
+                logger.debug("[{}][{}]: found {} allocations of {} based on allocation ids: [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound, shard, lastActiveAllocationIds);
+            }
 
-            if (isEnoughAllocationsFound(shard, indexMetaData, nodesAndVersions) == false) {
-                // if we are restoring this shard we still can allocate
-                if (shard.restoreSource() == null) {
+            if (enoughAllocationsFound == false){
+                if (snapshotRestore) {
+                    // let BalancedShardsAllocator take care of allocating this shard
+                    logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
+                } else if (recoverOnAnyNode) {
+                    // let BalancedShardsAllocator take care of allocating this shard
+                    logger.debug("[{}][{}]: missing local data, recover from any node", shard.index(), shard.id());
+                } else {
                     // we can't really allocate, so ignore it and continue
                     unassignedIterator.removeAndIgnore();
                     logger.debug("[{}][{}]: not allocating, number_of_allocated_shards_found [{}]", shard.index(), shard.id(), nodesAndVersions.allocationsFound);
-                } else {
-                    logger.debug("[{}][{}]: missing local data, will restore from [{}]", shard.index(), shard.id(), shard.restoreSource());
                 }
                 continue;
             }
 
-            NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesAndVersions);
+            final NodesToAllocate nodesToAllocate = buildNodesToAllocate(shard, allocation, nodesAndVersions.nodes);
             if (nodesToAllocate.yesNodes.isEmpty() == false) {
                 DiscoveryNode node = nodesToAllocate.yesNodes.get(0);
                 logger.debug("[{}][{}]: allocating [{}] to [{}] on primary allocation", shard.index(), shard.id(), shard, node);
@@ -109,63 +146,99 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
     }
 
     /**
-     * Does the shard need to find a primary copy?
+     * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have an allocation id matching
+     * lastActiveAllocationIds are added to the list. Otherwise, any node that has a shard is added to the list, but
+     * entries with matching allocation id are always at the front of the list.
      */
-    boolean needToFindPrimaryCopy(ShardRouting shard) {
-        if (shard.primary() == false) {
-            return false;
-        }
+    protected NodesAndVersions buildAllocationIdBasedNodes(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
+                                                           Set<String> lastActiveAllocationIds, AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
+        List<DiscoveryNode> matchingNodes = new ArrayList<>();
+        List<DiscoveryNode> nonMatchingNodes = new ArrayList<>();
+        long highestVersion = -1;
+        for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : shardState.getData().values()) {
+            DiscoveryNode node = nodeShardState.getNode();
+            String allocationId = nodeShardState.allocationId();
+
+            if (ignoreNodes.contains(node.id())) {
+                continue;
+            }
+
+            if (nodeShardState.storeException() == null) {
+                if (allocationId == null && nodeShardState.version() != -1) {
+                    // old shard with no allocation id, assign dummy value so that it gets added below in case of matchAnyShard
+                    allocationId = "_n/a_";
+                }
+
+                logger.trace("[{}] on node [{}] has allocation id [{}] of shard", shard, nodeShardState.getNode(), allocationId);
+            } else {
+                logger.trace("[{}] on node [{}] has allocation id [{}] but the store can not be opened, treating as no allocation id", nodeShardState.storeException(), shard, nodeShardState.getNode(), allocationId);
+                allocationId = null;
+            }
 
-        // this is an API allocation, ignore since we know there is no data...
-        if (shard.allocatedPostIndexCreate() == false) {
-            return false;
+            if (allocationId != null) {
+                if (lastActiveAllocationIds.contains(allocationId)) {
+                    matchingNodes.add(node);
+                    highestVersion = Math.max(highestVersion, nodeShardState.version());
+                } else if (matchAnyShard) {
+                    nonMatchingNodes.add(node);
+                    highestVersion = Math.max(highestVersion, nodeShardState.version());
+                }
+            }
         }
 
-        return true;
+        List<DiscoveryNode> nodes = new ArrayList<>();
+        nodes.addAll(matchingNodes);
+        nodes.addAll(nonMatchingNodes);
+
+        if (logger.isTraceEnabled()) {
+            logger.trace("{} candidates for allocation: {}", shard, nodes.stream().map(DiscoveryNode::name).collect(Collectors.joining(", ")));
+        }
+        return new NodesAndVersions(nodes, nodes.size(), highestVersion);
     }
 
-    private boolean isEnoughAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) {
+    /**
+     * used by old version-based allocation
+     */
+    private boolean isEnoughVersionBasedAllocationsFound(ShardRouting shard, IndexMetaData indexMetaData, NodesAndVersions nodesAndVersions) {
         // check if the counts meets the minimum set
         int requiredAllocation = 1;
         // if we restore from a repository one copy is more then enough
-        if (shard.restoreSource() == null) {
-            try {
-                String initialShards = indexMetaData.getSettings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
-                if ("quorum".equals(initialShards)) {
-                    if (indexMetaData.getNumberOfReplicas() > 1) {
-                        requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2) + 1;
-                    }
-                } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
-                    if (indexMetaData.getNumberOfReplicas() > 2) {
-                        requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2);
-                    }
-                } else if ("one".equals(initialShards)) {
-                    requiredAllocation = 1;
-                } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
-                    requiredAllocation = indexMetaData.getNumberOfReplicas() + 1;
-                } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
-                    if (indexMetaData.getNumberOfReplicas() > 1) {
-                        requiredAllocation = indexMetaData.getNumberOfReplicas();
-                    }
-                } else {
-                    requiredAllocation = Integer.parseInt(initialShards);
+        try {
+            String initialShards = indexMetaData.getSettings().get(INDEX_RECOVERY_INITIAL_SHARDS, settings.get(INDEX_RECOVERY_INITIAL_SHARDS, this.initialShards));
+            if ("quorum".equals(initialShards)) {
+                if (indexMetaData.getNumberOfReplicas() > 1) {
+                    requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2) + 1;
                 }
-            } catch (Exception e) {
-                logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
+            } else if ("quorum-1".equals(initialShards) || "half".equals(initialShards)) {
+                if (indexMetaData.getNumberOfReplicas() > 2) {
+                    requiredAllocation = ((1 + indexMetaData.getNumberOfReplicas()) / 2);
+                }
+            } else if ("one".equals(initialShards)) {
+                requiredAllocation = 1;
+            } else if ("full".equals(initialShards) || "all".equals(initialShards)) {
+                requiredAllocation = indexMetaData.getNumberOfReplicas() + 1;
+            } else if ("full-1".equals(initialShards) || "all-1".equals(initialShards)) {
+                if (indexMetaData.getNumberOfReplicas() > 1) {
+                    requiredAllocation = indexMetaData.getNumberOfReplicas();
+                }
+            } else {
+                requiredAllocation = Integer.parseInt(initialShards);
             }
+        } catch (Exception e) {
+            logger.warn("[{}][{}] failed to derived initial_shards from value {}, ignore allocation for {}", shard.index(), shard.id(), initialShards, shard);
         }
 
         return nodesAndVersions.allocationsFound >= requiredAllocation;
     }
 
     /**
-     * Based on the nodes and versions, build the list of yes/no/throttle nodes that the shard applies to.
+     * Split the list of nodes to lists of yes/no/throttle nodes based on allocation deciders
      */
-    private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, NodesAndVersions nodesAndVersions) {
+    private NodesToAllocate buildNodesToAllocate(ShardRouting shard, RoutingAllocation allocation, List<DiscoveryNode> nodes) {
         List<DiscoveryNode> yesNodes = new ArrayList<>();
         List<DiscoveryNode> throttledNodes = new ArrayList<>();
         List<DiscoveryNode> noNodes = new ArrayList<>();
-        for (DiscoveryNode discoNode : nodesAndVersions.nodes) {
+        for (DiscoveryNode discoNode : nodes) {
             RoutingNode node = allocation.routingNodes().node(discoNode.id());
             if (node == null) {
                 continue;
@@ -184,9 +257,11 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
     }
 
     /**
-     * Builds a list of nodes and version
+     * Builds a list of nodes. If matchAnyShard is set to false, only nodes that have the highest shard version
+     * are added to the list. Otherwise, any node that has a shard is added to the list, but entries with highest
+     * version are always at the front of the list.
      */
-    NodesAndVersions buildNodesAndVersions(ShardRouting shard, boolean recoveryOnAnyNode, Set<String> ignoreNodes,
+    NodesAndVersions buildNodesAndVersions(ShardRouting shard, boolean matchAnyShard, Set<String> ignoreNodes,
                                            AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> shardState) {
         final Map<DiscoveryNode, Long> nodesWithVersion = new HashMap<>();
         int numberOfAllocationsFound = 0;
@@ -208,20 +283,15 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
                 version = -1;
             }
 
-            if (recoveryOnAnyNode) {
-                numberOfAllocationsFound++;
-                if (version > highestVersion) {
-                    highestVersion = version;
-                }
-                // We always put the node without clearing the map
-                nodesWithVersion.put(node, version);
-            } else if (version != -1) {
+            if (version != -1) {
                 numberOfAllocationsFound++;
                 // If we've found a new "best" candidate, clear the
                 // current candidates and add it
                 if (version > highestVersion) {
                     highestVersion = version;
-                    nodesWithVersion.clear();
+                    if (matchAnyShard == false) {
+                        nodesWithVersion.clear();
+                    }
                     nodesWithVersion.put(node, version);
                 } else if (version == highestVersion) {
                     // If the candidate is the same, add it to the
@@ -258,9 +328,9 @@ public abstract class PrimaryShardAllocator extends AbstractComponent {
      * Return {@code true} if the index is configured to allow shards to be
      * recovered on any node
      */
-    private boolean recoverOnAnyNode(Settings idxSettings) {
-        return IndexMetaData.isOnSharedFilesystem(idxSettings) &&
-                idxSettings.getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false);
+    private boolean recoverOnAnyNode(IndexSettings indexSettings) {
+        return indexSettings.isOnSharedFilesystem()
+            && indexSettings.getSettings().getAsBoolean(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, false);
     }
 
     protected abstract AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation);

+ 9 - 2
core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java

@@ -24,6 +24,8 @@ import com.carrotsearch.hppc.ObjectLongMap;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectLongCursor;
 import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
@@ -56,6 +58,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
      */
     public boolean processExistingRecoveries(RoutingAllocation allocation) {
         boolean changed = false;
+        MetaData metaData = allocation.metaData();
         for (RoutingNodes.RoutingNodesIterator nodes = allocation.routingNodes().nodes(); nodes.hasNext(); ) {
             nodes.next();
             for (RoutingNodes.RoutingNodeIterator it = nodes.nodeShards(); it.hasNext(); ) {
@@ -69,8 +72,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
                 if (shard.relocatingNodeId() != null) {
                     continue;
                 }
+
                 // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
-                if (shard.allocatedPostIndexCreate() == false) {
+                IndexMetaData indexMetaData = metaData.index(shard.getIndex());
+                if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
                     continue;
                 }
 
@@ -114,6 +119,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
         boolean changed = false;
         final RoutingNodes routingNodes = allocation.routingNodes();
         final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
+        MetaData metaData = allocation.metaData();
         while (unassignedIterator.hasNext()) {
             ShardRouting shard = unassignedIterator.next();
             if (shard.primary()) {
@@ -121,7 +127,8 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
             }
 
             // if we are allocating a replica because of index creation, no need to go and find a copy, there isn't one...
-            if (shard.allocatedPostIndexCreate() == false) {
+            IndexMetaData indexMetaData = metaData.index(shard.getIndex());
+            if (shard.allocatedPostIndexCreate(indexMetaData) == false) {
                 continue;
             }
 

+ 16 - 7
core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java

@@ -139,7 +139,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
                         Store.tryOpenIndex(shardPath.resolveIndex());
                     } catch (Exception exception) {
                         logger.trace("{} can't open index for shard [{}] in path [{}]", exception, shardId, shardStateMetaData, (shardPath != null) ? shardPath.resolveIndex() : "");
-                        return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, exception);
+                        String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
+                        return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId, exception);
                     }
                 }
                 // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
@@ -149,11 +150,12 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
                     logger.warn("{} shard state info found but indexUUID didn't match expected [{}] actual [{}]", shardId, indexUUID, shardStateMetaData.indexUUID);
                 } else {
                     logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
-                    return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version);
+                    String allocationId = shardStateMetaData.allocationId != null ? shardStateMetaData.allocationId.getId() : null;
+                    return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version, allocationId);
                 }
             }
             logger.trace("{} no local shard info found", shardId);
-            return new NodeGatewayStartedShards(clusterService.localNode(), -1);
+            return new NodeGatewayStartedShards(clusterService.localNode(), -1, null);
         } catch (Exception e) {
             throw new ElasticsearchException("failed to load started shards", e);
         }
@@ -277,17 +279,19 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
     public static class NodeGatewayStartedShards extends BaseNodeResponse {
 
         private long version = -1;
+        private String allocationId = null;
         private Throwable storeException = null;
 
         public NodeGatewayStartedShards() {
         }
-        public NodeGatewayStartedShards(DiscoveryNode node, long version) {
-            this(node, version, null);
+        public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId) {
+            this(node, version, allocationId, null);
         }
 
-        public NodeGatewayStartedShards(DiscoveryNode node, long version, Throwable storeException) {
+        public NodeGatewayStartedShards(DiscoveryNode node, long version, String allocationId, Throwable storeException) {
             super(node);
             this.version = version;
+            this.allocationId = allocationId;
             this.storeException = storeException;
         }
 
@@ -295,6 +299,10 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
             return this.version;
         }
 
+        public String allocationId() {
+            return this.allocationId;
+        }
+
         public Throwable storeException() {
             return this.storeException;
         }
@@ -303,16 +311,17 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
             version = in.readLong();
+            allocationId = in.readOptionalString();
             if (in.readBoolean()) {
                 storeException = in.readThrowable();
             }
-
         }
 
         @Override
         public void writeTo(StreamOutput out) throws IOException {
             super.writeTo(out);
             out.writeLong(version);
+            out.writeOptionalString(allocationId);
             if (storeException != null) {
                 out.writeBoolean(true);
                 out.writeThrowable(storeException);

+ 2 - 1
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1099,7 +1099,8 @@ public class IndexShard extends AbstractIndexShardComponent {
         // we are the first primary, recover from the gateway
         // if its post api allocation, the index should exists
         assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
-        final boolean shouldExist = shardRouting.allocatedPostIndexCreate();
+        boolean shouldExist = shardRouting.allocatedPostIndexCreate(idxSettings.getIndexMetaData());
+
         StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
         return storeRecovery.recoverFromStore(this, shouldExist, localNode);
     }

+ 7 - 0
core/src/test/java/org/elasticsearch/action/admin/indices/create/CreateIndexIT.java

@@ -285,4 +285,11 @@ public class CreateIndexIT extends ESIntegTestCase {
             assertThat(messages.toString(), containsString("mapper [text] is used by multiple types"));
         }
     }
+
+    public void testRestartIndexCreationAfterFullClusterRestart() throws Exception {
+        client().admin().cluster().prepareUpdateSettings().setTransientSettings(Settings.builder().put("cluster.routing.allocation.enable", "none")).get();
+        client().admin().indices().prepareCreate("test").setSettings(indexSettings()).get();
+        internalCluster().fullRestart();
+        ensureGreen("test");
+    }
 }

+ 2 - 1
core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreRequestIT.java

@@ -87,6 +87,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
         for (ObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> shardStoreStatuses : shardStores.values()) {
             for (IndicesShardStoresResponse.StoreStatus storeStatus : shardStoreStatuses.value) {
                 assertThat(storeStatus.getVersion(), greaterThan(-1l));
+                assertThat(storeStatus.getAllocationId(), notNullValue());
                 assertThat(storeStatus.getNode(), notNullValue());
                 assertThat(storeStatus.getStoreException(), nullValue());
             }
@@ -108,7 +109,7 @@ public class IndicesShardStoreRequestIT extends ESIntegTestCase {
         assertThat(shardStoresStatuses.size(), equalTo(unassignedShards.size()));
         for (IntObjectCursor<List<IndicesShardStoresResponse.StoreStatus>> storesStatus : shardStoresStatuses) {
             assertThat("must report for one store", storesStatus.value.size(), equalTo(1));
-            assertThat("reported store should be primary", storesStatus.value.get(0).getAllocation(), equalTo(IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY));
+            assertThat("reported store should be primary", storesStatus.value.get(0).getAllocationStatus(), equalTo(IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY));
         }
         logger.info("--> enable allocation");
         enableAllocation(index);

+ 12 - 9
core/src/test/java/org/elasticsearch/action/admin/indices/shards/IndicesShardStoreResponseTests.java

@@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.shards;
 import org.apache.lucene.util.CollectionUtil;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.ImmutableOpenIntMap;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
@@ -44,9 +45,9 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
         DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT);
         DiscoveryNode node2 = new DiscoveryNode("node2", DummyTransportAddress.INSTANCE, Version.CURRENT);
         List<IndicesShardStoresResponse.StoreStatus> storeStatusList = new ArrayList<>();
-        storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
-        storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, 2, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, null));
-        storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED, new IOException("corrupted")));
+        storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, null, IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
+        storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node2, 2, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
+        storeStatusList.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, new IOException("corrupted")));
         storeStatuses.put(0, storeStatusList);
         storeStatuses.put(1, storeStatusList);
         ImmutableOpenIntMap<List<IndicesShardStoresResponse.StoreStatus>> storesMap = storeStatuses.build();
@@ -89,8 +90,10 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
                         IndicesShardStoresResponse.StoreStatus storeStatus = storeStatusList.get(i);
                         assertThat(storeInfo.containsKey("version"), equalTo(true));
                         assertThat(((int) storeInfo.get("version")), equalTo(((int) storeStatus.getVersion())));
+                        assertThat(storeInfo.containsKey("allocation_id"), equalTo(true));
+                        assertThat(((String) storeInfo.get("allocation_id")), equalTo((storeStatus.getAllocationId())));
                         assertThat(storeInfo.containsKey("allocation"), equalTo(true));
-                        assertThat(((String) storeInfo.get("allocation")), equalTo(storeStatus.getAllocation().value()));
+                        assertThat(((String) storeInfo.get("allocation")), equalTo(storeStatus.getAllocationStatus().value()));
                         assertThat(storeInfo.containsKey(storeStatus.getNode().id()), equalTo(true));
                         if (storeStatus.getStoreException() != null) {
                             assertThat(storeInfo.containsKey("store_exception"), equalTo(true));
@@ -104,11 +107,11 @@ public class IndicesShardStoreResponseTests extends ESTestCase {
     public void testStoreStatusOrdering() throws Exception {
         DiscoveryNode node1 = new DiscoveryNode("node1", DummyTransportAddress.INSTANCE, Version.CURRENT);
         List<IndicesShardStoresResponse.StoreStatus> orderedStoreStatuses = new ArrayList<>();
-        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 2, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
-        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.PRIMARY, null));
-        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, null));
-        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, IndicesShardStoresResponse.StoreStatus.Allocation.UNUSED, null));
-        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, IndicesShardStoresResponse.StoreStatus.Allocation.REPLICA, new IOException("corrupted")));
+        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 2, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
+        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.PRIMARY, null));
+        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, null));
+        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 1, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.UNUSED, null));
+        orderedStoreStatuses.add(new IndicesShardStoresResponse.StoreStatus(node1, 3, Strings.randomBase64UUID(), IndicesShardStoresResponse.StoreStatus.AllocationStatus.REPLICA, new IOException("corrupted")));
 
         List<IndicesShardStoresResponse.StoreStatus> storeStatuses = new ArrayList<>(orderedStoreStatuses);
         Collections.shuffle(storeStatuses, random());

+ 103 - 0
core/src/test/java/org/elasticsearch/cluster/routing/PrimaryAllocationIT.java

@@ -0,0 +1,103 @@
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.InternalTestCluster;
+import org.elasticsearch.test.disruption.NetworkDisconnectPartition;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+
+import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
+import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
+import static org.hamcrest.Matchers.equalTo;
+
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ESIntegTestCase.SuppressLocalMode
+public class PrimaryAllocationIT extends ESIntegTestCase {
+
+    public void testDoNotAllowStaleReplicasToBePromotedToPrimary() throws Exception {
+        logger.info("--> starting 3 nodes, 1 master, 2 data");
+        String master = internalCluster().startMasterOnlyNode(Settings.EMPTY);
+        internalCluster().startDataOnlyNodesAsync(2).get();
+
+        assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
+            .put("index.number_of_shards", 1).put("index.number_of_replicas", 1)).get());
+        ensureGreen();
+        logger.info("--> indexing...");
+        client().prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
+        refresh();
+
+        ClusterState state = client().admin().cluster().prepareState().all().get().getState();
+        List<ShardRouting> shards = state.routingTable().allShards("test");
+        assertThat(shards.size(), equalTo(2));
+
+        final String primaryNode;
+        final String replicaNode;
+        if (shards.get(0).primary()) {
+            primaryNode = state.getRoutingNodes().node(shards.get(0).currentNodeId()).node().name();
+            replicaNode = state.getRoutingNodes().node(shards.get(1).currentNodeId()).node().name();
+        } else {
+            primaryNode = state.getRoutingNodes().node(shards.get(1).currentNodeId()).node().name();
+            replicaNode = state.getRoutingNodes().node(shards.get(0).currentNodeId()).node().name();
+        }
+
+        NetworkDisconnectPartition partition = new NetworkDisconnectPartition(
+            new HashSet<>(Arrays.asList(master, replicaNode)), Collections.singleton(primaryNode), random());
+        internalCluster().setDisruptionScheme(partition);
+        logger.info("--> partitioning node with primary shard from rest of cluster");
+        partition.startDisrupting();
+
+        ensureStableCluster(2, master);
+
+        logger.info("--> index a document into previous replica shard (that is now primary)");
+        client(replicaNode).prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
+
+        logger.info("--> shut down node that has new acknowledged document");
+        internalCluster().stopRandomNode(InternalTestCluster.nameFilter(replicaNode));
+
+        ensureStableCluster(1, master);
+
+        partition.stopDisrupting();
+
+        logger.info("--> waiting for node with old primary shard to rejoin the cluster");
+        ensureStableCluster(2, master);
+
+        logger.info("--> check that old primary shard does not get promoted to primary again");
+        // kick reroute and wait for all shard states to be fetched
+        client(master).admin().cluster().prepareReroute().get();
+        assertBusy(() -> assertThat(internalCluster().getInstance(GatewayAllocator.class, master).getNumberOfInFlightFetch(), equalTo(0)));
+        // kick reroute a second time and check that all shards are unassigned
+        assertThat(client(master).admin().cluster().prepareReroute().get().getState().getRoutingNodes().unassigned().size(), equalTo(2));
+
+        logger.info("--> starting node that reuses data folder with the up-to-date primary shard");
+        internalCluster().startDataOnlyNode(Settings.EMPTY);
+
+        logger.info("--> check that the up-to-date primary shard gets promoted and that documents are available");
+        ensureYellow("test");
+        assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l);
+    }
+
+    public void testNotWaitForQuorumCopies() throws Exception {
+        logger.info("--> starting 3 nodes");
+        internalCluster().startNodesAsync(3).get();
+        logger.info("--> creating index with 1 primary and 2 replicas");
+        assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder()
+            .put("index.number_of_shards", randomIntBetween(1, 3)).put("index.number_of_replicas", 2)).get());
+        ensureGreen("test");
+        client().prepareIndex("test", "type1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
+        logger.info("--> removing 2 nodes from cluster");
+        internalCluster().stopRandomDataNode();
+        internalCluster().stopRandomDataNode();
+        internalCluster().fullRestart();
+        logger.info("--> checking that index still gets allocated with only 1 shard copy being available");
+        ensureYellow("test");
+        assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 1l);
+    }
+}

+ 240 - 112
core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java

@@ -36,7 +36,7 @@ import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESAllocationTestCase;
 import org.junit.Before;
 
-import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -59,25 +59,29 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
         this.testAllocator = new TestAllocator();
     }
 
-    /**
-     * Verifies that the canProcess method of primary allocation behaves correctly
-     * and processes only the applicable shard.
-     */
-    public void testNoProcessReplica() {
-        ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
-        assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
-    }
-
-    public void testNoProcessPrimayNotAllcoatedBefore() {
-        ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, true, ShardRoutingState.UNASSIGNED, 0, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
-        assertThat(testAllocator.needToFindPrimaryCopy(shard), equalTo(false));
+    public void testNoProcessPrimaryNotAllocatedBefore() {
+        final RoutingAllocation allocation;
+        if (randomBoolean()) {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), randomBoolean(), Version.CURRENT);
+        } else {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), true, Version.V_2_1_0);
+        }
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(false));
+        assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
+        assertThat(allocation.routingNodes().unassigned().iterator().next().shardId(), equalTo(shardId));
     }
 
     /**
      * Tests that when async fetch returns that there is no data, the shard will not be allocated.
      */
     public void testNoAsyncFetchData() {
-        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
+        final RoutingAllocation allocation;
+        if (randomBoolean()) {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId");
+        } else {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
+        }
         boolean changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(false));
         assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@@ -85,11 +89,29 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
     }
 
     /**
-     * Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned.
+     * Tests when the node returns that no data was found for it (-1 for version and null for allocation id),
+     * it will be moved to ignore unassigned.
      */
     public void testNoAllocationFound() {
-        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
-        testAllocator.addData(node1, -1);
+        final RoutingAllocation allocation;
+        if (randomBoolean()) {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "allocId");
+        } else {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_0);
+        }
+        testAllocator.addData(node1, -1, null);
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(false));
+        assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
+        assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
+    }
+
+    /**
+     * Tests when the node returns data with a shard allocation id that does not match active allocation ids, it will be moved to ignore unassigned.
+     */
+    public void testNoMatchingAllocationIdFound() {
+        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.CURRENT, "id2");
+        testAllocator.addData(node1, 1, "id1");
         boolean changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(false));
         assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@@ -97,11 +119,31 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
     }
 
     /**
-     * Tests when the node returns that no data was found for it (-1), it will be moved to ignore unassigned.
+     * Tests that when there is a node to allocate the shard to, and there are no active allocation ids, it will be allocated to it.
+     * This is the case when we have old shards from pre-3.0 days.
+     */
+    public void testNoActiveAllocationIds() {
+        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
+        testAllocator.addData(node1, 1, null);
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(true));
+        assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
+        assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
+        assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node1.id()));
+    }
+
+    /**
+     * Tests when the node returns that no data was found for it, it will be moved to ignore unassigned.
      */
     public void testStoreException() {
-        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
-        testAllocator.addData(node1, 3, new CorruptIndexException("test", "test"));
+        final RoutingAllocation allocation;
+        if (randomBoolean()) {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
+            testAllocator.addData(node1, 1, "allocId1", new CorruptIndexException("test", "test"));
+        } else {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_1_1);
+            testAllocator.addData(node1, 3, null, new CorruptIndexException("test", "test"));
+        }
         boolean changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(false));
         assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@@ -112,8 +154,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
      * Tests that when there is a node to allocate the shard to, it will be allocated to it.
      */
     public void testFoundAllocationAndAllocating() {
-        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
-        testAllocator.addData(node1, 10);
+        final RoutingAllocation allocation;
+        if (randomBoolean()) {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
+            testAllocator.addData(node1, 1, "allocId1");
+        } else {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_2_0);
+            testAllocator.addData(node1, 3, null);
+        }
         boolean changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(true));
         assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@@ -126,8 +174,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
      * it will be moved to ignore unassigned until it can be allocated to.
      */
     public void testFoundAllocationButThrottlingDecider() {
-        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders());
-        testAllocator.addData(node1, 10);
+        final RoutingAllocation allocation;
+        if (randomBoolean()) {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
+            testAllocator.addData(node1, 1, "allocId1");
+        } else {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(throttleAllocationDeciders(), false, Version.V_2_2_0);
+            testAllocator.addData(node1, 3, null);
+        }
         boolean changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(false));
         assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@@ -139,8 +193,14 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
      * force the allocation to it.
      */
     public void testFoundAllocationButNoDecider() {
-        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders());
-        testAllocator.addData(node1, 10);
+        final RoutingAllocation allocation;
+        if (randomBoolean()) {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, randomFrom(Version.V_2_0_0, Version.CURRENT), "allocId1");
+            testAllocator.addData(node1, 1, "allocId1");
+        } else {
+            allocation = routingAllocationWithOnePrimaryNoReplicas(noAllocationDeciders(), false, Version.V_2_0_0);
+            testAllocator.addData(node1, 3, null);
+        }
         boolean changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(true));
         assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@@ -149,11 +209,11 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
     }
 
     /**
-     * Tests that the highest version node is chosed for allocation.
+     * Tests that the highest version node is chosen for allocation.
      */
-    public void testAllocateToTheHighestVersion() {
-        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders());
-        testAllocator.addData(node1, 10).addData(node2, 12);
+    public void testAllocateToTheHighestVersionOnLegacyIndex() {
+        RoutingAllocation allocation = routingAllocationWithOnePrimaryNoReplicas(yesAllocationDeciders(), false, Version.V_2_0_0);
+        testAllocator.addData(node1, 10, null).addData(node2, 12, null);
         boolean changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(true));
         assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
@@ -162,35 +222,150 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
     }
 
     /**
-     * Tests that when restoring from snapshot, even if we didn't find any node to allocate on, the shard
-     * will remain in the unassigned list to be allocated later.
+     * Tests that when restoring from a snapshot and we find a node with a shard copy and allocation
+     * deciders say yes, we allocate to that node.
+     */
+    public void testRestore() {
+        RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
+        testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(true));
+        assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
+        assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
+    }
+
+    /**
+     * Tests that when restoring from a snapshot and we find a node with a shard copy and allocation
+     * deciders say throttle, we add it to ignored shards.
+     */
+    public void testRestoreThrottle() {
+        RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
+        testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(false));
+        assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
+    }
+
+    /**
+     * Tests that when restoring from a snapshot and we find a node with a shard copy but allocation
+     * deciders say no, we still allocate to that node.
+     */
+    public void testRestoreForcesAllocateIfShardAvailable() {
+        RoutingAllocation allocation = getRestoreRoutingAllocation(noAllocationDeciders());
+        testAllocator.addData(node1, 1, randomFrom(null, "some allocId"));
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(true));
+        assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
+        assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
+    }
+
+    /**
+     * Tests that when restoring from a snapshot and we don't find a node with a shard copy, the shard will remain in
+     * the unassigned list to be allocated later.
      */
-    public void testRestoreIgnoresNoNodesToAllocate() {
+    public void testRestoreDoesNotAssignIfNoShardAvailable() {
+        RoutingAllocation allocation = getRestoreRoutingAllocation(yesAllocationDeciders());
+        testAllocator.addData(node1, -1, null);
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(false));
+        assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
+        assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
+    }
+
+    private RoutingAllocation getRestoreRoutingAllocation(AllocationDeciders allocationDeciders) {
+        Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
         MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
-                .build();
+            .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)).numberOfShards(1).numberOfReplicas(0)
+                .putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet()))
+            .build();
+
         RoutingTable routingTable = RoutingTable.builder()
-                .addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), Version.CURRENT, shardId.getIndex()))
-                .build();
+            .addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), version, shardId.getIndex()))
+            .build();
         ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
-                .metaData(metaData)
-                .routingTable(routingTable)
-                .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
-        RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null, System.nanoTime());
+            .metaData(metaData)
+            .routingTable(routingTable)
+            .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
+        return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
+    }
+
+    /**
+     * Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy and allocation
+     * deciders say yes, we allocate to that node.
+     */
+    public void testRecoverOnAnyNode() {
+        RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
+        testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(true));
+        assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
+        assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
+    }
+
+    /**
+     * Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy and allocation
+     * deciders say throttle, we add it to ignored shards.
+     */
+    public void testRecoverOnAnyNodeThrottle() {
+        RoutingAllocation allocation = getRestoreRoutingAllocation(throttleAllocationDeciders());
+        testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(false));
+        assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(false));
+    }
 
-        testAllocator.addData(node1, -1).addData(node2, -1);
+    /**
+     * Tests that when recovering using "recover_on_any_node" and we find a node with a shard copy but allocation
+     * deciders say no, we still allocate to that node.
+     */
+    public void testRecoverOnAnyNodeForcesAllocateIfShardAvailable() {
+        RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(noAllocationDeciders());
+        testAllocator.addData(node1, 1, randomFrom(null, "allocId"));
+        boolean changed = testAllocator.allocateUnassigned(allocation);
+        assertThat(changed, equalTo(true));
+        assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
+        assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
+    }
+
+    /**
+     * Tests that when recovering using "recover_on_any_node" and we don't find a node with a shard copy we let
+     * BalancedShardAllocator assign the shard
+     */
+    public void testRecoverOnAnyNodeDoesNotAssignIfNoShardAvailable() {
+        RoutingAllocation allocation = getRecoverOnAnyNodeRoutingAllocation(yesAllocationDeciders());
+        testAllocator.addData(node1, -1, null);
         boolean changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(false));
         assertThat(allocation.routingNodes().unassigned().ignored().isEmpty(), equalTo(true));
+        assertThat(allocation.routingNodes().unassigned().size(), equalTo(1));
+    }
+
+    private RoutingAllocation getRecoverOnAnyNodeRoutingAllocation(AllocationDeciders allocationDeciders) {
+        Version version = randomFrom(Version.CURRENT, Version.V_2_0_0);
+        MetaData metaData = MetaData.builder()
+            .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version)
+                .put(IndexMetaData.SETTING_SHARED_FILESYSTEM, true)
+                .put(IndexMetaData.SETTING_SHARED_FS_ALLOW_RECOVERY_ON_ANY_NODE, true))
+                .numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, version == Version.CURRENT ? new HashSet<>(Arrays.asList("allocId")) : Collections.emptySet()))
+            .build();
+
+        RoutingTable routingTable = RoutingTable.builder()
+            .addAsRestore(metaData.index(shardId.getIndex()), new RestoreSource(new SnapshotId("test", "test"), Version.CURRENT, shardId.getIndex()))
+            .build();
+        ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
+            .metaData(metaData)
+            .routingTable(routingTable)
+            .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
+        return new RoutingAllocation(allocationDeciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
     }
 
     /**
      * Tests that only when enough copies of the shard exists we are going to allocate it. This test
      * verifies that with same version (1), and quorum allocation.
      */
-    public void testEnoughCopiesFoundForAllocation() {
+    public void testEnoughCopiesFoundForAllocationOnLegacyIndex() {
         MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
+                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.V_2_0_0)).numberOfShards(1).numberOfReplicas(2))
                 .build();
         RoutingTable routingTable = RoutingTable.builder()
                 .addAsRecovery(metaData.index(shardId.getIndex()))
@@ -207,7 +382,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
         assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
         assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
 
-        testAllocator.addData(node1, 1);
+        testAllocator.addData(node1, 1, null);
         allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
         changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(false));
@@ -215,7 +390,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
         assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
         assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
 
-        testAllocator.addData(node2, 1);
+        testAllocator.addData(node2, 1, null);
         allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
         changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(true));
@@ -229,9 +404,9 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
      * Tests that only when enough copies of the shard exists we are going to allocate it. This test
      * verifies that even with different version, we treat different versions as a copy, and count them.
      */
-    public void testEnoughCopiesFoundForAllocationWithDifferentVersion() {
+    public void testEnoughCopiesFoundForAllocationOnLegacyIndexWithDifferentVersion() {
         MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
+                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.V_2_0_0)).numberOfShards(1).numberOfReplicas(2))
                 .build();
         RoutingTable routingTable = RoutingTable.builder()
                 .addAsRecovery(metaData.index(shardId.getIndex()))
@@ -248,7 +423,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
         assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
         assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
 
-        testAllocator.addData(node1, 1);
+        testAllocator.addData(node1, 1, null);
         allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
         changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(false));
@@ -256,7 +431,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
         assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
         assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
 
-        testAllocator.addData(node2, 2);
+        testAllocator.addData(node2, 2, null);
         allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
         changed = testAllocator.allocateUnassigned(allocation);
         assertThat(changed, equalTo(true));
@@ -266,67 +441,20 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
         assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).get(0).currentNodeId(), equalTo(node2.id()));
     }
 
-    public void testAllocationOnAnyNodeWithSharedFs() {
-        ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false,
-                ShardRoutingState.UNASSIGNED, 0,
-                new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
-
-        Map<DiscoveryNode, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> data = new HashMap<>();
-        data.put(node1, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node1, 1));
-        data.put(node2, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node2, 5));
-        data.put(node3, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node3, -1));
-        AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetches =
-                new AsyncShardFetch.FetchResult(shardId, data, new HashSet<>(), new HashSet<>());
-
-        PrimaryShardAllocator.NodesAndVersions nAndV = testAllocator.buildNodesAndVersions(shard, false, new HashSet<String>(), fetches);
-        assertThat(nAndV.allocationsFound, equalTo(2));
-        assertThat(nAndV.highestVersion, equalTo(5L));
-        assertThat(nAndV.nodes, contains(node2));
-
-        nAndV = testAllocator.buildNodesAndVersions(shard, true, new HashSet<String>(), fetches);
-        assertThat(nAndV.allocationsFound, equalTo(3));
-        assertThat(nAndV.highestVersion, equalTo(5L));
-        // All three nodes are potential candidates because shards can be recovered on any node
-        assertThat(nAndV.nodes, contains(node2, node1, node3));
-    }
-
-    public void testAllocationOnAnyNodeShouldPutNodesWithExceptionsLast() {
-        ShardRouting shard = TestShardRouting.newShardRouting("test", 0, null, null, null, false,
-                ShardRoutingState.UNASSIGNED, 0,
-                new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null));
-
-        Map<DiscoveryNode, TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> data = new HashMap<>();
-        data.put(node1, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node1, 1));
-        data.put(node2, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node2, 1));
-        data.put(node3, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node3, 1, new IOException("I failed to open")));
-        HashSet<String> ignoredNodes = new HashSet<>();
-        ignoredNodes.add(node2.id());
-        AsyncShardFetch.FetchResult<TransportNodesListGatewayStartedShards.NodeGatewayStartedShards> fetches =
-                new AsyncShardFetch.FetchResult(shardId, data, new HashSet<>(), ignoredNodes);
-
-        PrimaryShardAllocator.NodesAndVersions nAndV = testAllocator.buildNodesAndVersions(shard, false, ignoredNodes, fetches);
-        assertThat(nAndV.allocationsFound, equalTo(1));
-        assertThat(nAndV.highestVersion, equalTo(1L));
-        assertThat(nAndV.nodes, contains(node1));
-
-        nAndV = testAllocator.buildNodesAndVersions(shard, true, ignoredNodes, fetches);
-        assertThat(nAndV.allocationsFound, equalTo(2));
-        assertThat(nAndV.highestVersion, equalTo(1L));
-        // node3 should be last here
-        assertThat(nAndV.nodes.size(), equalTo(2));
-        assertThat(nAndV.nodes, contains(node1, node3));
-    }
-
-    private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders) {
+    private RoutingAllocation routingAllocationWithOnePrimaryNoReplicas(AllocationDeciders deciders, boolean asNew, Version version, String... activeAllocationIds) {
         MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
-                .build();
-        RoutingTable routingTable = RoutingTable.builder()
-                .addAsRecovery(metaData.index(shardId.getIndex()))
-                .build();
+                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(version))
+                    .numberOfShards(1).numberOfReplicas(0).putActiveAllocationIds(0, new HashSet<>(Arrays.asList(activeAllocationIds))))
+            .build();
+        RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
+        if (asNew) {
+            routingTableBuilder.addAsNew(metaData.index(shardId.getIndex()));
+        } else {
+            routingTableBuilder.addAsRecovery(metaData.index(shardId.getIndex()));
+        }
         ClusterState state = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT)
                 .metaData(metaData)
-                .routingTable(routingTable)
+                .routingTable(routingTableBuilder.build())
                 .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
         return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
     }
@@ -344,15 +472,15 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
             return this;
         }
 
-        public TestAllocator addData(DiscoveryNode node, long version) {
-            return addData(node, version, null);
+        public TestAllocator addData(DiscoveryNode node, long version, String allocationId) {
+            return addData(node, version, allocationId, null);
         }
 
-        public TestAllocator addData(DiscoveryNode node, long version, @Nullable Throwable storeException) {
+        public TestAllocator addData(DiscoveryNode node, long version, String allocationId, @Nullable Throwable storeException) {
             if (data == null) {
                 data = new HashMap<>();
             }
-            data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, storeException));
+            data.put(node, new TransportNodesListGatewayStartedShards.NodeGatewayStartedShards(node, version, allocationId, storeException));
             return this;
         }
 

+ 5 - 69
core/src/test/java/org/elasticsearch/gateway/QuorumGatewayIT.java

@@ -20,10 +20,10 @@
 package org.elasticsearch.gateway;
 
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.cluster.health.ClusterHealthStatus;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
 import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.discovery.zen.elect.ElectMasterService;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
@@ -32,14 +32,10 @@ import org.elasticsearch.test.InternalTestCluster.RestartCallback;
 import java.util.concurrent.TimeUnit;
 
 import static org.elasticsearch.client.Requests.clusterHealthRequest;
-import static org.elasticsearch.common.settings.Settings.settingsBuilder;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
 
 /**
  *
@@ -51,72 +47,12 @@ public class QuorumGatewayIT extends ESIntegTestCase {
         return 2;
     }
 
-    public void testChangeInitialShardsRecovery() throws Exception {
-        logger.info("--> starting 3 nodes");
-        final String[] nodes = internalCluster().startNodesAsync(3).get().toArray(new String[0]);
-
-        createIndex("test");
-        ensureGreen();
-        NumShards test = getNumShards("test");
-
-        logger.info("--> indexing...");
-        client().prepareIndex("test", "type1", "1").setSource(jsonBuilder().startObject().field("field", "value1").endObject()).get();
-        //We don't check for failures in the flush response: if we do we might get the following:
-        // FlushNotAllowedEngineException[[test][1] recovery is in progress, flush [COMMIT_TRANSLOG] is not allowed]
-        flush();
-        client().prepareIndex("test", "type1", "2").setSource(jsonBuilder().startObject().field("field", "value2").endObject()).get();
-        refresh();
-
-        for (int i = 0; i < 10; i++) {
-            assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l);
-        }
-
-        final String nodeToRemove = nodes[between(0,2)];
-        logger.info("--> restarting 1 nodes -- kill 2");
-        internalCluster().fullRestart(new RestartCallback() {
-            @Override
-            public Settings onNodeStopped(String nodeName) throws Exception {
-                return Settings.EMPTY;
-            }
-
-            @Override
-            public boolean doRestart(String nodeName) {
-                return nodeToRemove.equals(nodeName);
-            }
-        });
-        if (randomBoolean()) {
-            Thread.sleep(between(1, 400)); // wait a bit and give is a chance to try to allocate
-        }
-        ClusterHealthResponse clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForNodes("1")).actionGet();
-        assertThat(clusterHealth.isTimedOut(), equalTo(false));
-        assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.RED));  // nothing allocated yet
-        assertTrue(awaitBusy(() -> {
-            ClusterStateResponse clusterStateResponse = internalCluster().smartClient().admin().cluster().prepareState().setMasterNodeTimeout("500ms").get();
-            return clusterStateResponse.getState() != null && clusterStateResponse.getState().routingTable().index("test") != null;
-        })); // wait until we get a cluster state - could be null if we quick enough.
-        final ClusterStateResponse clusterStateResponse = internalCluster().smartClient().admin().cluster().prepareState().setMasterNodeTimeout("500ms").get();
-        assertThat(clusterStateResponse.getState(), notNullValue());
-        assertThat(clusterStateResponse.getState().routingTable().index("test"), notNullValue());
-        assertThat(clusterStateResponse.getState().routingTable().index("test").allPrimaryShardsActive(), is(false));
-        logger.info("--> change the recovery.initial_shards setting, and make sure its recovered");
-        client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put("recovery.initial_shards", 1)).get();
-
-        logger.info("--> running cluster_health (wait for the shards to startup), primaries only since we only have 1 node");
-        clusterHealth = client().admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(test.numPrimaries)).actionGet();
-        logger.info("--> done cluster_health, status " + clusterHealth.getStatus());
-        assertThat(clusterHealth.isTimedOut(), equalTo(false));
-        assertThat(clusterHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
-
-        for (int i = 0; i < 10; i++) {
-            assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), 2l);
-        }
-    }
-
     public void testQuorumRecovery() throws Exception {
         logger.info("--> starting 3 nodes");
-        internalCluster().startNodesAsync(3).get();
         // we are shutting down nodes - make sure we don't have 2 clusters if we test network
-        setMinimumMasterNodes(2);
+        internalCluster().startNodesAsync(3,
+                Settings.builder().put(ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES, 2).build()).get();
+
 
         createIndex("test");
         ensureGreen();

+ 13 - 5
core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java

@@ -43,9 +43,11 @@ import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
 import org.elasticsearch.test.ESAllocationTestCase;
 import org.junit.Before;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -275,13 +277,16 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
     }
 
     private RoutingAllocation onePrimaryOnNode1And1Replica(AllocationDeciders deciders, Settings settings, UnassignedInfo.Reason reason) {
+        ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10);
         MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings)).numberOfShards(1).numberOfReplicas(0))
-                .build();
+                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT).put(settings))
+                    .numberOfShards(1).numberOfReplicas(1)
+                    .putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId()))))
+            .build();
         RoutingTable routingTable = RoutingTable.builder()
                 .add(IndexRoutingTable.builder(shardId.getIndex())
                                 .addIndexShard(new IndexShardRoutingTable.Builder(shardId)
-                                        .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10))
+                                        .addShard(primaryShard)
                                         .addShard(ShardRouting.newUnassigned(shardId.getIndex(), shardId.getId(), null, false, new UnassignedInfo(reason, null)))
                                         .build())
                 )
@@ -294,13 +299,16 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
     }
 
     private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
+        ShardRouting primaryShard = TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10);
         MetaData metaData = MetaData.builder()
-                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(0))
+                .put(IndexMetaData.builder(shardId.getIndex()).settings(settings(Version.CURRENT))
+                    .numberOfShards(1).numberOfReplicas(1)
+                    .putActiveAllocationIds(0, new HashSet<>(Arrays.asList(primaryShard.allocationId().getId()))))
                 .build();
         RoutingTable routingTable = RoutingTable.builder()
                 .add(IndexRoutingTable.builder(shardId.getIndex())
                                 .addIndexShard(new IndexShardRoutingTable.Builder(shardId)
-                                        .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node1.id(), true, ShardRoutingState.STARTED, 10))
+                                        .addShard(primaryShard)
                                         .addShard(TestShardRouting.newShardRouting(shardId.getIndex(), shardId.getId(), node2.id(), null, null, false, ShardRoutingState.INITIALIZING, 10, new UnassignedInfo(UnassignedInfo.Reason.CLUSTER_RECOVERED, null)))
                                         .build())
                 )

+ 5 - 3
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -133,7 +133,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
             ShardId id = new ShardId("foo", 1);
             long version = between(1, Integer.MAX_VALUE / 2);
             boolean primary = randomBoolean();
-            AllocationId allocationId = randomAllocationId();
+            AllocationId allocationId = randomBoolean() ? null : randomAllocationId();
             ShardStateMetaData state1 = new ShardStateMetaData(version, primary, "foo", allocationId);
             write(state1, env.availableShardPaths(id));
             ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(id));
@@ -288,7 +288,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
     }
 
     public void testShardStateMetaHashCodeEquals() {
-        ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), randomAllocationId());
+        AllocationId allocationId = randomBoolean() ? null : randomAllocationId();
+        ShardStateMetaData meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId);
 
         assertEquals(meta, new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID, meta.allocationId));
         assertEquals(meta.hashCode(), new ShardStateMetaData(meta.version, meta.primary, meta.indexUUID, meta.allocationId).hashCode());
@@ -299,7 +300,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         assertFalse(meta.equals(new ShardStateMetaData(meta.version, !meta.primary, meta.indexUUID + "foo", randomAllocationId())));
         Set<Integer> hashCodes = new HashSet<>();
         for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode
-            meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), randomAllocationId());
+            allocationId = randomBoolean() ? null : randomAllocationId();
+            meta = new ShardStateMetaData(randomLong(), randomBoolean(), randomRealisticUnicodeOfCodepointLengthBetween(1, 10), allocationId);
             hashCodes.add(meta.hashCode());
         }
         assertTrue("more than one unique hashcode expected but got: " + hashCodes.size(), hashCodes.size() > 1);

+ 4 - 7
core/src/test/java/org/elasticsearch/indices/state/SimpleIndexStateIT.java

@@ -97,7 +97,7 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
         client().prepareIndex("test", "type1", "1").setSource("field1", "value1").get();
     }
 
-    public void testFastCloseAfterCreateDoesNotClose() {
+    public void testFastCloseAfterCreateContinuesCreateAfterOpen() {
         logger.info("--> creating test index that cannot be allocated");
         client().admin().indices().prepareCreate("test").setSettings(Settings.settingsBuilder()
                 .put("index.routing.allocation.include.tag", "no_such_node").build()).get();
@@ -106,17 +106,14 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
         assertThat(health.isTimedOut(), equalTo(false));
         assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED));
 
-        try {
-            client().admin().indices().prepareClose("test").get();
-            fail("Exception should have been thrown");
-        } catch(IndexPrimaryShardNotAllocatedException e) {
-            // expected
-        }
+        client().admin().indices().prepareClose("test").get();
 
         logger.info("--> updating test index settings to allow allocation");
         client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.settingsBuilder()
                 .put("index.routing.allocation.include.tag", "").build()).get();
 
+        client().admin().indices().prepareOpen("test").get();
+
         logger.info("--> waiting for green status");
         ensureGreen();
 

+ 0 - 13
docs/reference/index-modules.asciidoc

@@ -129,19 +129,6 @@ specific index module:
     experimental[] Disables the purge of <<mapping-ttl-field,expired docs>> on
     the current index.
 
-[[index.recovery.initial_shards]]`index.recovery.initial_shards`::
-+
---
-A primary shard is only recovered only if there are enough nodes available to
-allocate sufficient replicas to form a quorum. It can be set to:
-
-    * `quorum` (default)
-    * `quorum-1` (or `half`)
-    * `full`
-    * `full-1`.
-    * Number values are also supported, e.g. `1`.
---
-
 
 [float]
 === Settings in other index modules

+ 2 - 3
docs/reference/indices/shadow-replicas.asciidoc

@@ -104,9 +104,8 @@ settings API:
 
 `index.shared_filesystem.recover_on_any_node`::
     Boolean value indicating whether the primary shards for the index should be
-    allowed to recover on any node in the cluster, regardless of the number of
-    replicas or whether the node has previously had the shard allocated to it
-    before. Defaults to `false`.
+    allowed to recover on any node in the cluster. If a node holding a copy of
+    the shard is found, recovery prefers that node. Defaults to `false`.
 
 === Node level settings related to shadow replicas
 

+ 5 - 3
docs/reference/indices/shard-stores.asciidoc

@@ -52,8 +52,9 @@ The shard stores information is grouped by indices and shard ids.
                     }
                 },
                 "version": 4, <4>
+                "allocation_id": "2iNySv_OQVePRX-yaRH_lQ", <5>
                 "allocation" : "primary" | "replica" | "unused", <6>
-                "store_exception": ... <5>
+                "store_exception": ... <7>
             },
             ...
         ]
@@ -66,7 +67,8 @@ The shard stores information is grouped by indices and shard ids.
 <3> The node information that hosts a copy of the store, the key
     is the unique node id.
 <4> The version of the store copy
-<5> The status of the store copy, whether it is used as a
+<5> The allocation id of the store copy
+<6> The status of the store copy, whether it is used as a
     primary, replica or not used at all
-<6> Any exception encountered while opening the shard index or
+<7> Any exception encountered while opening the shard index or
     from earlier engine failure

+ 21 - 0
docs/reference/migration/migrate_3_0.asciidoc

@@ -14,6 +14,7 @@ your application to Elasticsearch 3.0.
 * <<breaking_30_cache_concurrency>>
 * <<breaking_30_non_loopback>>
 * <<breaking_30_thread_pool>>
+* <<breaking_30_allocation>>
 
 [[breaking_30_search_changes]]
 === Search changes
@@ -515,3 +516,23 @@ from `OsStats.Cpu#getPercent`.
 Only stored fields are retrievable with this option.
 The fields option won't be able to load non stored fields from _source anymore.
 
+[[breaking_30_allocation]]
+=== Primary shard allocation
+
+Previously, primary shards were only assigned if a quorum of shard copies were found (configurable using
+`index.recovery.initial_shards`, now deprecated). In case where a primary had only a single replica, quorum was defined
+to be a single shard. This meant that any shard copy of an index with replication factor 1 could become primary, even it
+was a stale copy of the data on disk. This is now fixed by using allocation IDs.
+
+Allocation IDs assign unique identifiers to shard copies. This allows the cluster to differentiate between multiple
+copies of the same data and track which shards have been active, so that after a cluster restart, shard copies
+containing only the most recent data can become primaries.
+
+==== `index.shared_filesystem.recover_on_any_node` changes
+
+The behavior of `index.shared_filesystem.recover_on_any_node = true` has been changed. Previously, in the case where no
+shard copies could be found, an arbitrary node was chosen by potentially ignoring allocation deciders. Now, we take
+balancing into account but don't assign the shard if the allocation deciders are not satisfied. The behavior has also changed
+in the case where shard copies can be found. Previously, a node not holding the shard copy was chosen if none of the nodes
+holding shard copies were satisfying the allocation deciders. Now, the shard will be assigned to a node having a shard copy,
+even if none of the nodes holding a shard copy satisfy the allocation deciders.

+ 2 - 3
docs/reference/modules/cluster/shards_allocation.asciidoc

@@ -22,9 +22,8 @@ Enable or disable allocation for specific kinds of shards:
 
 This setting does not affect the recovery of local primary shards when
 restarting a node.  A restarted node that has a copy of an unassigned primary
-shard will recover that primary immediately, assuming that the
-<<index.recovery.initial_shards,`index.recovery.initial_shards`>> setting is
-satisfied.
+shard will recover that primary immediately, assuming that its allocation id matches
+one of the active allocation ids in the cluster state.
 
 --
 

+ 3 - 1
test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java

@@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterModule;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.EmptyClusterInfoService;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.RoutingNode;
 import org.elasticsearch.cluster.routing.RoutingNodes;
@@ -230,7 +231,8 @@ public abstract class ESAllocationTestCase extends ESTestCase {
             boolean changed = false;
             while (unassignedIterator.hasNext()) {
                 ShardRouting shard = unassignedIterator.next();
-                if (shard.primary() || shard.allocatedPostIndexCreate() == false) {
+                IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
+                if (shard.primary() || shard.allocatedPostIndexCreate(indexMetaData) == false) {
                     continue;
                 }
                 changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);