Переглянути джерело

Port Primary Terms to master #17044

Primary terms are a way to make sure that operations replicated from a stale primary are rejected by shards following a newly elected primary.

The original PRs adding this to the seq# feature branch are #14062 and #14651. Unlike those PRs, here we take a different approach (based on newer code in master) where the primary terms are stored in the meta data only (and not in `ShardRouting` objects).

Relates to #17038

Closes #17044
Boaz Leskes 9 роки тому
батько
коміт
fe43eef1b5
24 змінених файлів з 1060 додано та 303 видалено
  1. 14 0
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
  2. 29 35
      core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
  3. 34 17
      core/src/main/java/org/elasticsearch/cluster/ClusterState.java
  4. 132 29
      core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java
  5. 0 4
      core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java
  6. 62 28
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
  7. 1 1
      core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
  8. 2 3
      core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
  9. 17 12
      core/src/main/java/org/elasticsearch/index/IndexService.java
  10. 4 4
      core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java
  11. 84 23
      core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  12. 62 56
      core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java
  13. 10 0
      core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java
  14. 1 1
      core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java
  15. 12 6
      core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java
  16. 17 5
      core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  17. 26 20
      core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java
  18. 241 0
      core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java
  19. 0 1
      core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java
  20. 22 16
      core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java
  21. 80 0
      core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java
  22. 56 1
      core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java
  23. 153 40
      core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  24. 1 1
      test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java

+ 14 - 0
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

@@ -51,6 +51,8 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
      */
     protected ShardId shardId;
 
+    long primaryTerm;
+
     protected TimeValue timeout = DEFAULT_TIMEOUT;
     protected String index;
 
@@ -148,6 +150,16 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
         return routedBasedOnClusterVersion;
     }
 
+    /** returns the primary term active at the time the operation was performed on the primary shard */
+    public long primaryTerm() {
+        return primaryTerm;
+    }
+
+    /** marks the primary term in which the operation was performed */
+    public void primaryTerm(long term) {
+        primaryTerm = term;
+    }
+
     @Override
     public ActionRequestValidationException validate() {
         ActionRequestValidationException validationException = null;
@@ -169,6 +181,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
         timeout = TimeValue.readTimeValue(in);
         index = in.readString();
         routedBasedOnClusterVersion = in.readVLong();
+        primaryTerm = in.readVLong();
     }
 
     @Override
@@ -184,6 +197,7 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
         timeout.writeTo(out);
         out.writeString(index);
         out.writeVLong(routedBasedOnClusterVersion);
+        out.writeVLong(primaryTerm);
     }
 
     @Override

+ 29 - 35
core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -52,7 +52,6 @@ import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ThreadContext;
-import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexService;
 import org.elasticsearch.index.engine.VersionConflictEngineException;
 import org.elasticsearch.index.shard.IndexShard;
@@ -359,32 +358,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                     }
                 });
             } else {
-                try {
-                    failReplicaIfNeeded(t);
-                } catch (Throwable unexpected) {
-                    logger.error("{} unexpected error while failing replica", unexpected, request.shardId().id());
-                } finally {
                     responseWithFailure(t);
-                }
-            }
-        }
-
-        private void failReplicaIfNeeded(Throwable t) {
-            Index index = request.shardId().getIndex();
-            int shardId = request.shardId().id();
-            logger.trace("failure on replica [{}][{}], action [{}], request [{}]", t, index, shardId, actionName, request);
-            if (ignoreReplicaException(t) == false) {
-                IndexService indexService = indicesService.indexService(index);
-                if (indexService == null) {
-                    logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
-                    return;
-                }
-                IndexShard indexShard = indexService.getShardOrNull(shardId);
-                if (indexShard == null) {
-                    logger.debug("ignoring failed replica {}[{}] because index was already removed.", index, shardId);
-                    return;
-                }
-                indexShard.failShard(actionName + " failed on replica", t);
             }
         }
 
@@ -401,7 +375,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
         protected void doRun() throws Exception {
             setPhase(task, "replica");
             assert request.shardId() != null : "request shardId must be set";
-            try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId())) {
+            try (Releasable ignored = getIndexShardReferenceOnReplica(request.shardId(), request.primaryTerm())) {
                 shardOperationOnReplica(request);
                 if (logger.isTraceEnabled()) {
                     logger.trace("action [{}] completed on shard [{}] for request [{}]", transportReplicaAction, request.shardId(), request);
@@ -707,7 +681,6 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
             indexShardReference = getIndexShardReferenceOnPrimary(shardId);
             if (indexShardReference.isRelocated() == false) {
                 executeLocally();
-
             } else {
                 executeRemotely();
             }
@@ -716,6 +689,7 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
         private void executeLocally() throws Exception {
             // execute locally
             Tuple<Response, ReplicaRequest> primaryResponse = shardOperationOnPrimary(state.metaData(), request);
+            primaryResponse.v2().primaryTerm(indexShardReference.opPrimaryTerm());
             if (logger.isTraceEnabled()) {
                 logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version());
             }
@@ -825,17 +799,17 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
     protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         IndexShard indexShard = indexService.getShard(shardId.id());
-        return new IndexShardReferenceImpl(indexShard, true);
+        return IndexShardReferenceImpl.createOnPrimary(indexShard);
     }
 
     /**
      * returns a new reference to {@link IndexShard} on a node that the request is replicated to. The reference is closed as soon as
      * replication is completed on the node.
      */
-    protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
+    protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long primaryTerm) {
         IndexService indexService = indicesService.indexServiceSafe(shardId.getIndex());
         IndexShard indexShard = indexService.getShard(shardId.id());
-        return new IndexShardReferenceImpl(indexShard, false);
+        return IndexShardReferenceImpl.createOnReplica(indexShard, primaryTerm);
     }
 
     /**
@@ -1098,9 +1072,13 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
                                 totalShards,
                                 success.get(),
                                 failuresArray
-
                         )
                 );
+                if (logger.isTraceEnabled()) {
+                    logger.trace("finished replicating action [{}], request [{}], shardInfo [{}]", actionName, replicaRequest,
+                            finalResponse.getShardInfo());
+                }
+
                 try {
                     channel.sendResponse(finalResponse);
                 } catch (IOException responseException) {
@@ -1125,6 +1103,9 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
         boolean isRelocated();
         void failShard(String reason, @Nullable Throwable e);
         ShardRouting routingEntry();
+
+        /** returns the primary term of the current operation */
+        long opPrimaryTerm();
     }
 
     static final class IndexShardReferenceImpl implements IndexShardReference {
@@ -1132,15 +1113,23 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
         private final IndexShard indexShard;
         private final Releasable operationLock;
 
-        IndexShardReferenceImpl(IndexShard indexShard, boolean primaryAction) {
+        private IndexShardReferenceImpl(IndexShard indexShard, long primaryTerm) {
             this.indexShard = indexShard;
-            if (primaryAction) {
+            if (primaryTerm < 0) {
                 operationLock = indexShard.acquirePrimaryOperationLock();
             } else {
-                operationLock = indexShard.acquireReplicaOperationLock();
+                operationLock = indexShard.acquireReplicaOperationLock(primaryTerm);
             }
         }
 
+        static IndexShardReferenceImpl createOnPrimary(IndexShard indexShard) {
+            return new IndexShardReferenceImpl(indexShard, -1);
+        }
+
+        static IndexShardReferenceImpl createOnReplica(IndexShard indexShard, long primaryTerm) {
+            return new IndexShardReferenceImpl(indexShard, primaryTerm);
+        }
+
         @Override
         public void close() {
             operationLock.close();
@@ -1160,6 +1149,11 @@ public abstract class TransportReplicationAction<Request extends ReplicationRequ
         public ShardRouting routingEntry() {
             return indexShard.routingEntry();
         }
+
+        @Override
+        public long opPrimaryTerm() {
+            return indexShard.getPrimaryTerm();
+        }
     }
 
     protected final void processAfterWrite(boolean refresh, IndexShard indexShard, Translog.Location location) {

+ 34 - 17
core/src/main/java/org/elasticsearch/cluster/ClusterState.java

@@ -63,7 +63,7 @@ import java.util.Set;
 
 /**
  * Represents the current state of the cluster.
- *
+ * <p>
  * The cluster state object is immutable with an
  * exception of the {@link RoutingNodes} structure, which is built on demand from the {@link RoutingTable},
  * and cluster state {@link #status}, which is updated during cluster state publishing and applying
@@ -74,7 +74,7 @@ import java.util.Set;
  * the type of discovery. For example, for local discovery it is implemented by the {@link LocalDiscovery#publish}
  * method. In the Zen Discovery it is handled in the {@link PublishClusterStateAction#publish} method. The
  * publishing mechanism can be overridden by other discovery.
- *
+ * <p>
  * The cluster state implements the {@link Diffable} interface in order to support publishing of cluster state
  * differences instead of the entire state on each change. The publishing mechanism should only send differences
  * to a node if this node was present in the previous version of the cluster state. If a node is not present was
@@ -135,7 +135,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
 
     public static <T extends Custom> T lookupPrototypeSafe(String type) {
         @SuppressWarnings("unchecked")
-        T proto = (T)customPrototypes.get(type);
+        T proto = (T) customPrototypes.get(type);
         if (proto == null) {
             throw new IllegalArgumentException("No custom state prototype registered for type [" + type + "], node likely missing plugins");
         }
@@ -281,6 +281,16 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
         sb.append("state uuid: ").append(stateUUID).append("\n");
         sb.append("from_diff: ").append(wasReadFromDiff).append("\n");
         sb.append("meta data version: ").append(metaData.version()).append("\n");
+        for (IndexMetaData indexMetaData : metaData) {
+            final String TAB = "   ";
+            sb.append(TAB).append(indexMetaData.getIndex());
+            sb.append(": v[").append(indexMetaData.getVersion()).append("]\n");
+            for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
+                sb.append(TAB).append(TAB).append(shard).append(": ");
+                sb.append("p_term [").append(indexMetaData.primaryTerm(shard)).append("], ");
+                sb.append("a_ids ").append(indexMetaData.activeAllocationIds(shard)).append("\n");
+            }
+        }
         sb.append(blocks().prettyPrint());
         sb.append(nodes().prettyPrint());
         sb.append(routingTable().prettyPrint());
@@ -477,6 +487,12 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
                 }
                 builder.endArray();
 
+                builder.startObject(IndexMetaData.KEY_PRIMARY_TERMS);
+                for (int shard = 0; shard < indexMetaData.getNumberOfShards(); shard++) {
+                    builder.field(Integer.toString(shard), indexMetaData.primaryTerm(shard));
+                }
+                builder.endObject();
+
                 builder.startObject(IndexMetaData.KEY_ACTIVE_ALLOCATIONS);
                 for (IntObjectCursor<Set<String>> cursor : indexMetaData.getActiveAllocationIds()) {
                     builder.startArray(String.valueOf(cursor.key));
@@ -487,6 +503,7 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
                 }
                 builder.endObject();
 
+                // index metadata
                 builder.endObject();
             }
             builder.endObject();
@@ -683,16 +700,16 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
         }
 
         /**
-         * @param data               input bytes
-         * @param localNode          used to set the local node in the cluster state.
+         * @param data      input bytes
+         * @param localNode used to set the local node in the cluster state.
          */
         public static ClusterState fromBytes(byte[] data, DiscoveryNode localNode) throws IOException {
             return readFrom(StreamInput.wrap(data), localNode);
         }
 
         /**
-         * @param in                 input stream
-         * @param localNode          used to set the local node in the cluster state. can be null.
+         * @param in        input stream
+         * @param localNode used to set the local node in the cluster state. can be null.
          */
         public static ClusterState readFrom(StreamInput in, @Nullable DiscoveryNode localNode) throws IOException {
             return PROTO.readFrom(in, localNode);
@@ -791,17 +808,17 @@ public class ClusterState implements ToXContent, Diffable<ClusterState> {
             metaData = proto.metaData.readDiffFrom(in);
             blocks = proto.blocks.readDiffFrom(in);
             customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
-                    new DiffableUtils.DiffableValueSerializer<String, Custom>() {
-                @Override
-                public Custom read(StreamInput in, String key) throws IOException {
-                    return lookupPrototypeSafe(key).readFrom(in);
-                }
+                new DiffableUtils.DiffableValueSerializer<String, Custom>() {
+                    @Override
+                    public Custom read(StreamInput in, String key) throws IOException {
+                        return lookupPrototypeSafe(key).readFrom(in);
+                    }
 
-                @Override
-                public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
-                    return lookupPrototypeSafe(key).readDiffFrom(in);
-                }
-            });
+                    @Override
+                    public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
+                        return lookupPrototypeSafe(key).readDiffFrom(in);
+                    }
+                });
         }
 
         @Override

+ 132 - 29
core/src/main/java/org/elasticsearch/cluster/metadata/IndexMetaData.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.cluster.metadata;
 
+import com.carrotsearch.hppc.LongArrayList;
 import com.carrotsearch.hppc.cursors.IntObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
@@ -29,6 +30,8 @@ import org.elasticsearch.cluster.DiffableUtils;
 import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
+import org.elasticsearch.cluster.routing.RoutingTable;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.ParseFieldMatcher;
 import org.elasticsearch.common.collect.ImmutableOpenIntMap;
@@ -56,6 +59,7 @@ import org.joda.time.DateTimeZone;
 
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -217,6 +221,13 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             .numberOfShards(1).numberOfReplicas(0).build();
 
     public static final String KEY_ACTIVE_ALLOCATIONS = "active_allocations";
+    static final String KEY_VERSION = "version";
+    static final String KEY_SETTINGS = "settings";
+    static final String KEY_STATE = "state";
+    static final String KEY_MAPPINGS = "mappings";
+    static final String KEY_ALIASES = "aliases";
+    public static final String KEY_PRIMARY_TERMS = "primary_terms";
+
     public static final String INDEX_STATE_FILE_PREFIX = "state-";
 
     private final int numberOfShards;
@@ -224,6 +235,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
 
     private final Index index;
     private final long version;
+    private final long[] primaryTerms;
 
     private final State state;
 
@@ -247,7 +259,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     private final Version indexUpgradedVersion;
     private final org.apache.lucene.util.Version minimumCompatibleLuceneVersion;
 
-    private IndexMetaData(Index index, long version, State state, int numberOfShards, int numberOfReplicas, Settings settings,
+    private IndexMetaData(Index index, long version, long[] primaryTerms, State state, int numberOfShards, int numberOfReplicas, Settings settings,
                           ImmutableOpenMap<String, MappingMetaData> mappings, ImmutableOpenMap<String, AliasMetaData> aliases,
                           ImmutableOpenMap<String, Custom> customs, ImmutableOpenIntMap<Set<String>> activeAllocationIds,
                           DiscoveryNodeFilters requireFilters, DiscoveryNodeFilters includeFilters, DiscoveryNodeFilters excludeFilters,
@@ -255,6 +267,8 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
 
         this.index = index;
         this.version = version;
+        this.primaryTerms = primaryTerms;
+        assert primaryTerms.length == numberOfShards;
         this.state = state;
         this.numberOfShards = numberOfShards;
         this.numberOfReplicas = numberOfReplicas;
@@ -296,6 +310,16 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         return this.version;
     }
 
+
+    /**
+     * The term of the current selected primary. This is a non-negative number incremented when
+     * a primary shard is assigned after a full cluster restart or a replica shard is promoted to a primary
+     * See {@link AllocationService#updateMetaDataWithRoutingTable(MetaData, RoutingTable, RoutingTable)}.
+     **/
+    public long primaryTerm(int shardId) {
+        return this.primaryTerms[shardId];
+    }
+
     /**
      * Return the {@link Version} on which this index has been created. This
      * information is typically useful for backward compatibility.
@@ -416,6 +440,10 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
 
         IndexMetaData that = (IndexMetaData) o;
 
+        if (version != that.version) {
+            return false;
+        }
+
         if (!aliases.equals(that.aliases)) {
             return false;
         }
@@ -434,6 +462,10 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         if (!customs.equals(that.customs)) {
             return false;
         }
+
+        if (Arrays.equals(primaryTerms, that.primaryTerms) == false) {
+            return false;
+        }
         if (!activeAllocationIds.equals(that.activeAllocationIds)) {
             return false;
         }
@@ -443,14 +475,18 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
     @Override
     public int hashCode() {
         int result = index.hashCode();
+        result = 31 * result + Long.hashCode(version);
         result = 31 * result + state.hashCode();
         result = 31 * result + aliases.hashCode();
         result = 31 * result + settings.hashCode();
         result = 31 * result + mappings.hashCode();
+        result = 31 * result + customs.hashCode();
+        result = 31 * result + Arrays.hashCode(primaryTerms);
         result = 31 * result + activeAllocationIds.hashCode();
         return result;
     }
 
+
     @Override
     public Diff<IndexMetaData> diff(IndexMetaData previousState) {
         return new IndexMetaDataDiff(previousState, this);
@@ -476,6 +512,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
 
         private final String index;
         private final long version;
+        private final long[] primaryTerms;
         private final State state;
         private final Settings settings;
         private final Diff<ImmutableOpenMap<String, MappingMetaData>> mappings;
@@ -488,11 +525,12 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             version = after.version;
             state = after.state;
             settings = after.settings;
+            primaryTerms = after.primaryTerms;
             mappings = DiffableUtils.diff(before.mappings, after.mappings, DiffableUtils.getStringKeySerializer());
             aliases = DiffableUtils.diff(before.aliases, after.aliases, DiffableUtils.getStringKeySerializer());
             customs = DiffableUtils.diff(before.customs, after.customs, DiffableUtils.getStringKeySerializer());
             activeAllocationIds = DiffableUtils.diff(before.activeAllocationIds, after.activeAllocationIds,
-                    DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
+                DiffableUtils.getVIntKeySerializer(), DiffableUtils.StringSetValueSerializer.getInstance());
         }
 
         public IndexMetaDataDiff(StreamInput in) throws IOException {
@@ -500,22 +538,23 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             version = in.readLong();
             state = State.fromId(in.readByte());
             settings = Settings.readSettingsFromStream(in);
+            primaryTerms = in.readVLongArray();
             mappings = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), MappingMetaData.PROTO);
             aliases = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(), AliasMetaData.PROTO);
             customs = DiffableUtils.readImmutableOpenMapDiff(in, DiffableUtils.getStringKeySerializer(),
-                    new DiffableUtils.DiffableValueSerializer<String, Custom>() {
-                        @Override
-                        public Custom read(StreamInput in, String key) throws IOException {
-                            return lookupPrototypeSafe(key).readFrom(in);
-                        }
+                new DiffableUtils.DiffableValueSerializer<String, Custom>() {
+                    @Override
+                    public Custom read(StreamInput in, String key) throws IOException {
+                        return lookupPrototypeSafe(key).readFrom(in);
+                    }
 
-                        @Override
-                        public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
-                            return lookupPrototypeSafe(key).readDiffFrom(in);
-                        }
-                    });
+                    @Override
+                    public Diff<Custom> readDiff(StreamInput in, String key) throws IOException {
+                        return lookupPrototypeSafe(key).readDiffFrom(in);
+                    }
+                });
             activeAllocationIds = DiffableUtils.readImmutableOpenIntMapDiff(in, DiffableUtils.getVIntKeySerializer(),
-                    DiffableUtils.StringSetValueSerializer.getInstance());
+                DiffableUtils.StringSetValueSerializer.getInstance());
         }
 
         @Override
@@ -524,6 +563,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             out.writeLong(version);
             out.writeByte(state.id);
             Settings.writeSettingsToStream(settings, out);
+            out.writeVLongArray(primaryTerms);
             mappings.writeTo(out);
             aliases.writeTo(out);
             customs.writeTo(out);
@@ -536,6 +576,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             builder.version(version);
             builder.state(state);
             builder.settings(settings);
+            builder.primaryTerms(primaryTerms);
             builder.mappings.putAll(mappings.apply(part.mappings));
             builder.aliases.putAll(aliases.apply(part.aliases));
             builder.customs.putAll(customs.apply(part.customs));
@@ -550,6 +591,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         builder.version(in.readLong());
         builder.state(State.fromId(in.readByte()));
         builder.settings(readSettingsFromStream(in));
+        builder.primaryTerms(in.readVLongArray());
         int mappingsSize = in.readVInt();
         for (int i = 0; i < mappingsSize; i++) {
             MappingMetaData mappingMd = MappingMetaData.PROTO.readFrom(in);
@@ -581,6 +623,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         out.writeLong(version);
         out.writeByte(state.id());
         writeSettingsToStream(settings, out);
+        out.writeVLongArray(primaryTerms);
         out.writeVInt(mappings.size());
         for (ObjectCursor<MappingMetaData> cursor : mappings.values()) {
             cursor.value.writeTo(out);
@@ -614,6 +657,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         private String index;
         private State state = State.OPEN;
         private long version = 1;
+        private long[] primaryTerms = null;
         private Settings settings = Settings.Builder.EMPTY_SETTINGS;
         private final ImmutableOpenMap.Builder<String, MappingMetaData> mappings;
         private final ImmutableOpenMap.Builder<String, AliasMetaData> aliases;
@@ -633,6 +677,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             this.state = indexMetaData.state;
             this.version = indexMetaData.version;
             this.settings = indexMetaData.getSettings();
+            this.primaryTerms = indexMetaData.primaryTerms.clone();
             this.mappings = ImmutableOpenMap.builder(indexMetaData.mappings);
             this.aliases = ImmutableOpenMap.builder(indexMetaData.aliases);
             this.customs = ImmutableOpenMap.builder(indexMetaData.customs);
@@ -672,8 +717,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
         }
 
         public Builder settings(Settings.Builder settings) {
-            this.settings = settings.build();
-            return this;
+            return settings(settings.build());
         }
 
         public Builder settings(Settings settings) {
@@ -741,6 +785,42 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
             return this;
         }
 
+        /**
+         * returns the primary term for the given shard.
+         * See {@link IndexMetaData#primaryTerm(int)} for more information.
+         */
+        public long primaryTerm(int shardId) {
+            if (primaryTerms == null) {
+                initializePrimaryTerms();
+            }
+            return this.primaryTerms[shardId];
+        }
+
+        /**
+         * sets the primary term for the given shard.
+         * See {@link IndexMetaData#primaryTerm(int)} for more information.
+         */
+        public Builder primaryTerm(int shardId, long primaryTerm) {
+            if (primaryTerms == null) {
+                initializePrimaryTerms();
+            }
+            this.primaryTerms[shardId] = primaryTerm;
+            return this;
+        }
+
+        private void primaryTerms(long[] primaryTerms) {
+            this.primaryTerms = primaryTerms.clone();
+        }
+
+        private void initializePrimaryTerms() {
+            assert primaryTerms == null;
+            if (numberOfShards() < 0) {
+                throw new IllegalStateException("you must set the number of shards before setting/reading primary terms");
+            }
+            primaryTerms = new long[numberOfShards()];
+        }
+
+
         public IndexMetaData build() {
             ImmutableOpenMap.Builder<String, AliasMetaData> tmpAliases = aliases;
             Settings tmpSettings = settings;
@@ -815,27 +895,34 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                 minimumCompatibleLuceneVersion = null;
             }
 
+            if (primaryTerms == null) {
+                initializePrimaryTerms();
+            } else if (primaryTerms.length != numberOfShards) {
+                throw new IllegalStateException("primaryTerms length is [" + primaryTerms.length
+                    + "] but should be equal to number of shards [" + numberOfShards() + "]");
+            }
+
             final String uuid = settings.get(SETTING_INDEX_UUID, INDEX_UUID_NA_VALUE);
-            return new IndexMetaData(new Index(index, uuid), version, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
-                    tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters,
-                    indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion);
+            return new IndexMetaData(new Index(index, uuid), version, primaryTerms, state, numberOfShards, numberOfReplicas, tmpSettings, mappings.build(),
+                tmpAliases.build(), customs.build(), filledActiveAllocationIds.build(), requireFilters, includeFilters, excludeFilters,
+                indexCreatedVersion, indexUpgradedVersion, minimumCompatibleLuceneVersion);
         }
 
         public static void toXContent(IndexMetaData indexMetaData, XContentBuilder builder, ToXContent.Params params) throws IOException {
             builder.startObject(indexMetaData.getIndex().getName(), XContentBuilder.FieldCaseConversion.NONE);
 
-            builder.field("version", indexMetaData.getVersion());
-            builder.field("state", indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));
+            builder.field(KEY_VERSION, indexMetaData.getVersion());
+            builder.field(KEY_STATE, indexMetaData.getState().toString().toLowerCase(Locale.ENGLISH));
 
             boolean binary = params.paramAsBoolean("binary", false);
 
-            builder.startObject("settings");
+            builder.startObject(KEY_SETTINGS);
             for (Map.Entry<String, String> entry : indexMetaData.getSettings().getAsMap().entrySet()) {
                 builder.field(entry.getKey(), entry.getValue());
             }
             builder.endObject();
 
-            builder.startArray("mappings");
+            builder.startArray(KEY_MAPPINGS);
             for (ObjectObjectCursor<String, MappingMetaData> cursor : indexMetaData.getMappings()) {
                 if (binary) {
                     builder.value(cursor.value.source().compressed());
@@ -855,12 +942,18 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                 builder.endObject();
             }
 
-            builder.startObject("aliases");
+            builder.startObject(KEY_ALIASES);
             for (ObjectCursor<AliasMetaData> cursor : indexMetaData.getAliases().values()) {
                 AliasMetaData.Builder.toXContent(cursor.value, builder, params);
             }
             builder.endObject();
 
+            builder.startArray(KEY_PRIMARY_TERMS);
+            for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
+                builder.value(indexMetaData.primaryTerm(i));
+            }
+            builder.endArray();
+
             builder.startObject(KEY_ACTIVE_ALLOCATIONS);
             for (IntObjectCursor<Set<String>> cursor : indexMetaData.activeAllocationIds) {
                 builder.startArray(String.valueOf(cursor.key));
@@ -895,9 +988,9 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                 if (token == XContentParser.Token.FIELD_NAME) {
                     currentFieldName = parser.currentName();
                 } else if (token == XContentParser.Token.START_OBJECT) {
-                    if ("settings".equals(currentFieldName)) {
+                    if (KEY_SETTINGS.equals(currentFieldName)) {
                         builder.settings(Settings.settingsBuilder().put(SettingsLoader.Helper.loadNestedFromMap(parser.mapOrdered())));
-                    } else if ("mappings".equals(currentFieldName)) {
+                    } else if (KEY_MAPPINGS.equals(currentFieldName)) {
                         while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
                             if (token == XContentParser.Token.FIELD_NAME) {
                                 currentFieldName = parser.currentName();
@@ -909,7 +1002,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                                 throw new IllegalArgumentException("Unexpected token: " + token);
                             }
                         }
-                    } else if ("aliases".equals(currentFieldName)) {
+                    } else if (KEY_ALIASES.equals(currentFieldName)) {
                         while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
                             builder.putAlias(AliasMetaData.Builder.fromXContent(parser));
                         }
@@ -949,7 +1042,7 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                         }
                     }
                 } else if (token == XContentParser.Token.START_ARRAY) {
-                    if ("mappings".equals(currentFieldName)) {
+                    if (KEY_MAPPINGS.equals(currentFieldName)) {
                         while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
                             if (token == XContentParser.Token.VALUE_EMBEDDED_OBJECT) {
                                 builder.putMapping(new MappingMetaData(new CompressedXContent(parser.binaryValue())));
@@ -961,13 +1054,23 @@ public class IndexMetaData implements Diffable<IndexMetaData>, FromXContentBuild
                                 }
                             }
                         }
+                    } else if (KEY_PRIMARY_TERMS.equals(currentFieldName)) {
+                        LongArrayList list = new LongArrayList();
+                        while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
+                            if (token == XContentParser.Token.VALUE_NUMBER) {
+                                list.add(parser.longValue());
+                            } else {
+                                throw new IllegalStateException("found a non-numeric value under [" + KEY_PRIMARY_TERMS + "]");
+                            }
+                        }
+                        builder.primaryTerms(list.toArray());
                     } else {
                         throw new IllegalArgumentException("Unexpected field for an array " + currentFieldName);
                     }
                 } else if (token.isValue()) {
-                    if ("state".equals(currentFieldName)) {
+                    if (KEY_STATE.equals(currentFieldName)) {
                         builder.state(State.fromString(parser.text()));
-                    } else if ("version".equals(currentFieldName)) {
+                    } else if (KEY_VERSION.equals(currentFieldName)) {
                         builder.version(parser.longValue());
                     } else {
                         throw new IllegalArgumentException("Unexpected field [" + currentFieldName + "]");

+ 0 - 4
core/src/main/java/org/elasticsearch/cluster/routing/RoutingTable.java

@@ -586,10 +586,6 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi
             if (indicesRouting == null) {
                 throw new IllegalStateException("once build is called the builder cannot be reused");
             }
-            // normalize the versions right before we build it...
-            for (ObjectCursor<IndexRoutingTable> indexRoutingTable : indicesRouting.values()) {
-                indicesRouting.put(indexRoutingTable.value.getIndex().getName(), indexRoutingTable.value);
-            }
             RoutingTable table = new RoutingTable(version, indicesRouting.build());
             indicesRouting = null;
             return table;

+ 62 - 28
core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java

@@ -42,6 +42,7 @@ import org.elasticsearch.common.component.AbstractComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.index.shard.ShardId;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -98,7 +99,7 @@ public class AllocationService extends AbstractComponent {
         if (withReroute) {
             reroute(allocation);
         }
-        final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+        final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
 
         String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
         logClusterHealthStateChange(
@@ -107,37 +108,44 @@ public class AllocationService extends AbstractComponent {
                 "shards started [" + startedShardsAsString + "] ..."
         );
         return result;
-    }
 
+    }
 
-    protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes) {
-        return buildChangedResult(metaData, routingNodes, new RoutingExplanations());
+    protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) {
+        return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations());
 
     }
-    protected RoutingAllocation.Result buildChangedResult(MetaData metaData, RoutingNodes routingNodes, RoutingExplanations explanations) {
-        final RoutingTable routingTable = new RoutingTable.Builder().updateNodes(routingNodes).build();
-        MetaData newMetaData = updateMetaDataWithRoutingTable(metaData,routingTable);
-        return new RoutingAllocation.Result(true, routingTable.validateRaiseException(newMetaData), newMetaData, explanations);
+
+    protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes,
+                                                          RoutingExplanations explanations) {
+        final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build();
+        MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable);
+        return new RoutingAllocation.Result(true, newRoutingTable.validateRaiseException(newMetaData), newMetaData, explanations);
     }
 
     /**
-     * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}.
+     * Updates the current {@link MetaData} based on the newly created {@link RoutingTable}. Specifically
+     * we update {@link IndexMetaData#getActiveAllocationIds()} and {@link IndexMetaData#primaryTerm(int)} based on
+     * the changes made during this allocation.
      *
-     * @param currentMetaData {@link MetaData} object from before the routing table was changed.
+     * @param oldMetaData     {@link MetaData} object from before the routing table was changed.
+     * @param oldRoutingTable {@link RoutingTable} from before the change.
      * @param newRoutingTable new {@link RoutingTable} created by the allocation change
      * @return adapted {@link MetaData}, potentially the original one if no change was needed.
      */
-    static MetaData updateMetaDataWithRoutingTable(MetaData currentMetaData, RoutingTable newRoutingTable) {
-        // make sure index meta data and routing tables are in sync w.r.t active allocation ids
+    static MetaData updateMetaDataWithRoutingTable(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingTable newRoutingTable) {
         MetaData.Builder metaDataBuilder = null;
-        for (IndexRoutingTable indexRoutingTable : newRoutingTable) {
-            final IndexMetaData indexMetaData = currentMetaData.index(indexRoutingTable.getIndex());
-            if (indexMetaData == null) {
-                throw new IllegalStateException("no metadata found for index " + indexRoutingTable.getIndex().getName());
+        for (IndexRoutingTable newIndexTable : newRoutingTable) {
+            final IndexMetaData oldIndexMetaData = oldMetaData.index(newIndexTable.getIndex());
+            if (oldIndexMetaData == null) {
+                throw new IllegalStateException("no metadata found for index " + newIndexTable.getIndex().getName());
             }
             IndexMetaData.Builder indexMetaDataBuilder = null;
-            for (IndexShardRoutingTable shardRoutings : indexRoutingTable) {
-                Set<String> activeAllocationIds = shardRoutings.activeShards().stream()
+            for (IndexShardRoutingTable newShardTable : newIndexTable) {
+                final ShardId shardId = newShardTable.shardId();
+
+                // update activeAllocationIds
+                Set<String> activeAllocationIds = newShardTable.activeShards().stream()
                         .map(ShardRouting::allocationId)
                         .filter(Objects::nonNull)
                         .map(AllocationId::getId)
@@ -145,19 +153,44 @@ public class AllocationService extends AbstractComponent {
                 // only update active allocation ids if there is an active shard
                 if (activeAllocationIds.isEmpty() == false) {
                     // get currently stored allocation ids
-                    Set<String> storedAllocationIds = indexMetaData.activeAllocationIds(shardRoutings.shardId().id());
+                    Set<String> storedAllocationIds = oldIndexMetaData.activeAllocationIds(shardId.id());
                     if (activeAllocationIds.equals(storedAllocationIds) == false) {
                         if (indexMetaDataBuilder == null) {
-                            indexMetaDataBuilder = IndexMetaData.builder(indexMetaData);
+                            indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
                         }
+                        indexMetaDataBuilder.putActiveAllocationIds(shardId.id(), activeAllocationIds);
+                    }
+                }
 
-                        indexMetaDataBuilder.putActiveAllocationIds(shardRoutings.shardId().id(), activeAllocationIds);
+                // update primary terms
+                final ShardRouting newPrimary = newShardTable.primaryShard();
+                if (newPrimary == null) {
+                    throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
+                }
+                final ShardRouting oldPrimary = oldRoutingTable.shardRoutingTable(shardId).primaryShard();
+                if (oldPrimary == null) {
+                    throw new IllegalStateException("missing primary shard for " + newShardTable.shardId());
+                }
+                // we update the primary term on initial assignment or when a replica is promoted. Most notably we do *not*
+                // update it when a primary relocates
+                if (newPrimary.unassigned() ||
+                        newPrimary.isSameAllocation(oldPrimary) ||
+                        // we do not use newPrimary.isTargetRelocationOf(oldPrimary) because that one enforces newPrimary to
+                        // be initializing. However, when the target shard is activated, we still want the primary term to stay
+                        // the same
+                        (oldPrimary.relocating() && newPrimary.isSameAllocation(oldPrimary.buildTargetRelocatingShard()))) {
+                    // do nothing
+                } else {
+                    // incrementing the primary term
+                    if (indexMetaDataBuilder == null) {
+                        indexMetaDataBuilder = IndexMetaData.builder(oldIndexMetaData);
                     }
+                    indexMetaDataBuilder.primaryTerm(shardId.id(), oldIndexMetaData.primaryTerm(shardId.id()) + 1);
                 }
             }
             if (indexMetaDataBuilder != null) {
                 if (metaDataBuilder == null) {
-                    metaDataBuilder = MetaData.builder(currentMetaData);
+                    metaDataBuilder = MetaData.builder(oldMetaData);
                 }
                 metaDataBuilder.put(indexMetaDataBuilder);
             }
@@ -165,7 +198,7 @@ public class AllocationService extends AbstractComponent {
         if (metaDataBuilder != null) {
             return metaDataBuilder.build();
         } else {
-            return currentMetaData;
+            return oldMetaData;
         }
     }
 
@@ -196,7 +229,7 @@ public class AllocationService extends AbstractComponent {
         }
         gatewayAllocator.applyFailedShards(allocation);
         reroute(allocation);
-        final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+        final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
         String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
         logClusterHealthStateChange(
                 new ClusterStateHealth(clusterState),
@@ -243,7 +276,7 @@ public class AllocationService extends AbstractComponent {
         // the assumption is that commands will move / act on shards (or fail through exceptions)
         // so, there will always be shard "movements", so no need to check on reroute
         reroute(allocation);
-        RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes, explanations);
+        RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations);
         logClusterHealthStateChange(
                 new ClusterStateHealth(clusterState),
                 new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
@@ -252,6 +285,7 @@ public class AllocationService extends AbstractComponent {
         return result;
     }
 
+
     /**
      * Reroutes the routing table based on the live nodes.
      * <p>
@@ -275,7 +309,7 @@ public class AllocationService extends AbstractComponent {
         if (!reroute(allocation)) {
             return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
         }
-        RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), routingNodes);
+        RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
         logClusterHealthStateChange(
                 new ClusterStateHealth(clusterState),
                 new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
@@ -412,8 +446,8 @@ public class AllocationService extends AbstractComponent {
         boolean changed = false;
         for (ShardRouting routing : replicas) {
             changed |= applyFailedShard(allocation, routing, false,
-                new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
-                    null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
+                    new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
+                            null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
         }
         return changed;
     }

+ 1 - 1
core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java

@@ -44,7 +44,7 @@ import static java.util.Collections.unmodifiableSet;
 public class RoutingAllocation {
 
     /**
-     * this class is used to describe results of a {@link RoutingAllocation}  
+     * this class is used to describe results of a {@link RoutingAllocation}
      */
     public static class Result {
 

+ 2 - 3
core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java

@@ -685,9 +685,8 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
             warnAboutSlowTaskIfNeeded(executionTime, source);
         } catch (Throwable t) {
             TimeValue executionTime = TimeValue.timeValueMillis(Math.max(0, TimeValue.nsecToMSec(currentTimeInNanos() - startTimeNS)));
-            logger.warn("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}{}{}", t, executionTime,
-                    newClusterState.version(), newClusterState.stateUUID(), source, newClusterState.nodes().prettyPrint(),
-                    newClusterState.routingTable().prettyPrint(), newClusterState.getRoutingNodes().prettyPrint());
+            logger.warn("failed to apply updated cluster state in [{}]:\nversion [{}], uuid [{}], source [{}]\n{}", t, executionTime,
+                    newClusterState.version(), newClusterState.stateUUID(), source, newClusterState.prettyPrint());
             // TODO: do we want to call updateTask.onFailure here?
         }
 

+ 17 - 12
core/src/main/java/org/elasticsearch/index/IndexService.java

@@ -19,18 +19,6 @@
 
 package org.elasticsearch.index;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import org.apache.lucene.search.BooleanClause;
 import org.apache.lucene.search.BooleanQuery;
 import org.apache.lucene.search.Query;
@@ -82,6 +70,18 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
 import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.threadpool.ThreadPool;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.unmodifiableMap;
 import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
@@ -621,6 +621,11 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
                 rescheduleFsyncTask(durability);
             }
         }
+
+        // update primary terms
+        for (final IndexShard shard : this.shards.values()) {
+            shard.updatePrimaryTerm(metadata.primaryTerm(shard.shardId().id()));
+        }
     }
 
     private void rescheduleFsyncTask(Translog.Durability durability) {

+ 4 - 4
core/src/main/java/org/elasticsearch/index/shard/IllegalIndexShardStateException.java

@@ -33,12 +33,12 @@ public class IllegalIndexShardStateException extends ElasticsearchException {
 
     private final IndexShardState currentState;
 
-    public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg) {
-        this(shardId, currentState, msg, null);
+    public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Object... args) {
+        this(shardId, currentState, msg, null, args);
     }
 
-    public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Throwable ex) {
-        super("CurrentState[" + currentState + "] " + msg, ex);
+    public IllegalIndexShardStateException(ShardId shardId, IndexShardState currentState, String msg, Throwable ex, Object... args) {
+        super("CurrentState[" + currentState + "] " + msg, ex, args);
         setShard(shardId);
         this.currentState = currentState;
     }

+ 84 - 23
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -41,6 +41,7 @@ import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
+import org.elasticsearch.common.logging.LoggerMessageFormat;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.settings.Settings;
@@ -144,13 +145,16 @@ public class IndexShard extends AbstractIndexShardComponent {
     private final TranslogConfig translogConfig;
     private final IndexEventListener indexEventListener;
 
-    /** How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh.  IndexingMemoryController polls this
-     *  across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
-     *  being indexed/deleted. */
+    /**
+     * How many bytes we are currently moving to disk, via either IndexWriter.flush or refresh.  IndexingMemoryController polls this
+     * across all shards to decide if throttling is necessary because moving bytes to disk is falling behind vs incoming documents
+     * being indexed/deleted.
+     */
     private final AtomicLong writingBytes = new AtomicLong();
 
     protected volatile ShardRouting shardRouting;
     protected volatile IndexShardState state;
+    protected volatile long primaryTerm;
     protected final AtomicReference<Engine> currentEngineReference = new AtomicReference<>();
     protected final EngineFactory engineFactory;
 
@@ -236,13 +240,16 @@ public class IndexShard extends AbstractIndexShardComponent {
         this.engineConfig = newEngineConfig(translogConfig, cachingPolicy);
         this.suspendableRefContainer = new SuspendableRefContainer();
         this.searcherWrapper = indexSearcherWrapper;
+        this.primaryTerm = indexSettings.getIndexMetaData().primaryTerm(shardId.id());
     }
 
     public Store store() {
         return this.store;
     }
 
-    /** returns true if this shard supports indexing (i.e., write) operations. */
+    /**
+     * returns true if this shard supports indexing (i.e., write) operations.
+     */
     public boolean canIndex() {
         return true;
     }
@@ -279,6 +286,30 @@ public class IndexShard extends AbstractIndexShardComponent {
         return this.shardFieldData;
     }
 
+
+    /**
+     * Returns the primary term the index shard is on. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
+     */
+    public long getPrimaryTerm() {
+        return this.primaryTerm;
+    }
+
+    /**
+     * notifies the shard of an increase in the primary term
+     */
+    public void updatePrimaryTerm(final long newTerm) {
+        synchronized (mutex) {
+            if (newTerm != primaryTerm) {
+                assert shardRouting.primary() == false : "a primary shard should never update it's term. shard: " + shardRouting
+                    + " current term [" + primaryTerm + "] new term [" + newTerm + "]";
+                assert newTerm > primaryTerm : "primary terms can only go up. current [" + primaryTerm + "], new [" + newTerm + "]";
+                primaryTerm = newTerm;
+            }
+        }
+
+
+    }
+
     /**
      * Returns the latest cluster routing entry received with this shard. Might be null if the
      * shard was just created.
@@ -297,12 +328,12 @@ public class IndexShard extends AbstractIndexShardComponent {
      * unless explicitly disabled.
      *
      * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted
-     * @throws IOException if shard state could not be persisted
+     * @throws IOException                  if shard state could not be persisted
      */
     public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) throws IOException {
         final ShardRouting currentRouting = this.shardRouting;
         if (!newRouting.shardId().equals(shardId())) {
-            throw new IllegalArgumentException("Trying to set a routing entry with shardId [" + newRouting.shardId() + "] on a shard with shardId [" + shardId() + "]");
+            throw new IllegalArgumentException("Trying to set a routing entry with shardId " + newRouting.shardId() + " on a shard with shardId " + shardId() + "");
         }
         if ((currentRouting == null || newRouting.isSameAllocation(currentRouting)) == false) {
             throw new IllegalArgumentException("Trying to set a routing entry with a different allocation. Current " + currentRouting + ", new " + newRouting);
@@ -419,9 +450,7 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType) {
         try {
-            if (shardRouting.primary() == false) {
-                throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
-            }
+            verifyPrimary();
             return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY);
         } catch (Throwable t) {
             verifyNotClosed(t);
@@ -431,6 +460,7 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType) {
         try {
+            verifyReplicationTarget();
             return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA);
         } catch (Throwable t) {
             verifyNotClosed(t);
@@ -474,9 +504,7 @@ public class IndexShard extends AbstractIndexShardComponent {
     }
 
     public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version, VersionType versionType) {
-        if (shardRouting.primary() == false) {
-            throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
-        }
+        verifyPrimary();
         final DocumentMapper documentMapper = docMapper(type).getDocumentMapper();
         return prepareDelete(type, id, documentMapper.uidMapper().term(Uid.createUid(type, id)), version, versionType, Engine.Operation.Origin.PRIMARY);
     }
@@ -515,7 +543,9 @@ public class IndexShard extends AbstractIndexShardComponent {
         return getEngine().get(get, this::acquireSearcher);
     }
 
-    /** Writes all indexing changes to disk and opens a new searcher reflecting all changes.  This can throw {@link EngineClosedException}. */
+    /**
+     * Writes all indexing changes to disk and opens a new searcher reflecting all changes.  This can throw {@link EngineClosedException}.
+     */
     public void refresh(String source) {
         verifyNotClosed();
         if (canIndex()) {
@@ -538,7 +568,9 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
-    /** Returns how many bytes we are currently moving from heap to disk */
+    /**
+     * Returns how many bytes we are currently moving from heap to disk
+     */
     public long getWritingBytes() {
         return writingBytes.get();
     }
@@ -940,6 +972,22 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
+    private void verifyPrimary() {
+        if (shardRouting.primary() == false) {
+            // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+            throw new IllegalStateException("shard is not a primary " + shardRouting);
+        }
+    }
+
+    private void verifyReplicationTarget() {
+        final IndexShardState state = state();
+        if (shardRouting.primary() && shardRouting.active() && state != IndexShardState.RELOCATED) {
+            // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+            throw new IllegalStateException("active primary shard cannot be a replication target before " +
+                " relocation hand off " + shardRouting + ", state is [" + state + "]");
+        }
+    }
+
     protected final void verifyStartedOrRecovering() throws IllegalIndexShardStateException {
         IndexShardState state = this.state; // one time volatile read
         if (state != IndexShardState.STARTED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) {
@@ -969,7 +1017,9 @@ public class IndexShard extends AbstractIndexShardComponent {
         }
     }
 
-    /** Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed */
+    /**
+     * Returns number of heap bytes used by the indexing buffer for this shard, or 0 if the shard is closed
+     */
     public long getIndexBufferRAMBytesUsed() {
         Engine engine = getEngineOrNull();
         if (engine == null) {
@@ -986,8 +1036,10 @@ public class IndexShard extends AbstractIndexShardComponent {
         this.shardEventListener.delegates.add(onShardFailure);
     }
 
-    /** Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
-     *  indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen. */
+    /**
+     * Called by {@link IndexingMemoryController} to check whether more than {@code inactiveTimeNS} has passed since the last
+     * indexing operation, and notify listeners that we are now inactive so e.g. sync'd flush can happen.
+     */
     public void checkIdle(long inactiveTimeNS) {
         Engine engineOrNull = getEngineOrNull();
         if (engineOrNull != null && System.nanoTime() - engineOrNull.getLastWriteNanos() >= inactiveTimeNS) {
@@ -1132,11 +1184,12 @@ public class IndexShard extends AbstractIndexShardComponent {
             }
         } catch (Exception e) {
             handleRefreshException(e);
-        };
+        }
     }
 
     /**
      * Should be called for each no-op update operation to increment relevant statistics.
+     *
      * @param type the doc type of the update
      */
     public void noopUpdate(String type) {
@@ -1336,14 +1389,22 @@ public class IndexShard extends AbstractIndexShardComponent {
 
     public Releasable acquirePrimaryOperationLock() {
         verifyNotClosed();
-        if (shardRouting.primary() == false) {
-            throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary");
-        }
+        verifyPrimary();
         return suspendableRefContainer.acquireUninterruptibly();
     }
 
-    public Releasable acquireReplicaOperationLock() {
+    /**
+     * Acquires the replica operation lock. If the given primary term is lower than the shard's current {@code primaryTerm}
+     * an {@link IllegalArgumentException} is thrown.
+     */
+    public Releasable acquireReplicaOperationLock(long opPrimaryTerm) {
         verifyNotClosed();
+        verifyReplicationTarget();
+        if (primaryTerm > opPrimaryTerm) {
+            // must use exception that is not ignored by replication logic. See TransportActions.isShardNotAvailableException
+            throw new IllegalArgumentException(LoggerMessageFormat.format("{} operation term [{}] is too old (current [{}])",
+                shardId, opPrimaryTerm, primaryTerm));
+        }
         return suspendableRefContainer.acquireUninterruptibly();
     }
 
@@ -1447,7 +1508,7 @@ public class IndexShard extends AbstractIndexShardComponent {
      * Returns <code>true</code> iff one or more changes to the engine are not visible via the current searcher.
      * Otherwise <code>false</code>.
      *
-     * @throws EngineClosedException if the engine is already closed
+     * @throws EngineClosedException  if the engine is already closed
      * @throws AlreadyClosedException if the internal indexwriter in the engine is already closed
      */
     public boolean isRefreshNeeded() {

+ 62 - 56
core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java

@@ -19,7 +19,6 @@
 
 package org.elasticsearch.indices.cluster;
 
-import com.carrotsearch.hppc.IntHashSet;
 import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -71,9 +70,11 @@ import org.elasticsearch.snapshots.RestoreService;
 import org.elasticsearch.threadpool.ThreadPool;
 
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 /**
@@ -90,7 +91,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
     private final NodeMappingRefreshAction nodeMappingRefreshAction;
     private final NodeServicesProvider nodeServicesProvider;
 
-    private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {};
+    private static final ShardStateAction.Listener SHARD_STATE_ACTION_LISTENER = new ShardStateAction.Listener() {
+    };
 
     // a list of shards that failed during recovery
     // we keep track of these shards in order to prevent repeated recovery of these shards on each cluster state update
@@ -174,13 +176,18 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
 
             cleanFailedShards(event);
 
+            // cleaning up indices that are completely deleted so we won't need to worry about them
+            // when checking for shards
             applyDeletedIndices(event);
+            applyDeletedShards(event);
+            // call after deleted shards so indices with no shards will be cleaned
+            applyCleanedIndices(event);
+            // make sure that newly created shards use the latest meta data
+            applyIndexMetaData(event);
             applyNewIndices(event);
+            // apply mappings also updates new indices. TODO: make new indices good to begin with
             applyMappings(event);
             applyNewOrUpdatedShards(event);
-            applyDeletedShards(event);
-            applyCleanedIndices(event);
-            applySettings(event);
         }
     }
 
@@ -201,9 +208,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                 }
             }
         }
+
+        Set<Index> hasAllocations = new HashSet<>();
+        for (ShardRouting routing : event.state().getRoutingNodes().node(event.state().nodes().localNodeId())) {
+            hasAllocations.add(routing.index());
+        }
         for (IndexService indexService : indicesService) {
             Index index = indexService.index();
-            if (indexService.shardIds().isEmpty()) {
+            if (hasAllocations.contains(index) == false) {
+                assert indexService.shardIds().isEmpty() :
+                    "no locally assigned shards, but index wasn't emptied by applyDeletedShards."
+                        + " index " + index + ", shards: " + indexService.shardIds();
                 if (logger.isDebugEnabled()) {
                     logger.debug("{} cleaning index (no shards allocated)", index);
                 }
@@ -218,16 +233,6 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         final String localNodeId = event.state().nodes().localNodeId();
         assert localNodeId != null;
 
-        for (IndexService indexService : indicesService) {
-            IndexMetaData indexMetaData = event.state().metaData().index(indexService.index().getName());
-            if (indexMetaData != null) {
-                if (!indexMetaData.isSameUUID(indexService.indexUUID())) {
-                    logger.debug("[{}] mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated", indexMetaData.getIndex());
-                    deleteIndex(indexMetaData.getIndex(), "mismatch on index UUIDs between cluster state and local state, cleaning the index so it will be recreated");
-                }
-            }
-        }
-
         for (Index index : event.indicesDeleted()) {
             if (logger.isDebugEnabled()) {
                 logger.debug("[{}] cleaning index, no longer part of the metadata", index);
@@ -249,7 +254,17 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
             }
         }
 
-
+        for (IndexService indexService : indicesService) {
+            IndexMetaData indexMetaData = event.state().metaData().index(indexService.index());
+            if (indexMetaData == null) {
+                assert false : "index" + indexService.index() + " exists locally, doesn't have a metadata but is not part "
+                    + " of the delete index list. \nprevious state: " + event.previousState().prettyPrint()
+                    + "\n current state:\n" + event.state().prettyPrint();
+                logger.warn("[{}] isn't part of metadata but is part of in memory structures. removing",
+                    indexService.index());
+                deleteIndex(indexService.index(), "isn't part of metadata (explicit check)");
+            }
+        }
     }
 
     private void applyDeletedShards(final ClusterChangedEvent event) {
@@ -257,34 +272,33 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         if (routingNode == null) {
             return;
         }
-        IntHashSet newShardIds = new IntHashSet();
+        Set<String> newShardAllocationIds = new HashSet<>();
         for (IndexService indexService : indicesService) {
             Index index = indexService.index();
-            IndexMetaData indexMetaData = event.state().metaData().getIndexSafe(index);
-            if (indexMetaData == null) {
-                continue;
-            }
+            IndexMetaData indexMetaData = event.state().metaData().index(index);
+            assert indexMetaData != null : "local index doesn't have metadata, should have been cleaned up by applyDeletedIndices: " + index;
             // now, go over and delete shards that needs to get deleted
-            newShardIds.clear();
+            newShardAllocationIds.clear();
             for (ShardRouting shard : routingNode) {
                 if (shard.index().equals(index)) {
-                    newShardIds.add(shard.id());
+                    // use the allocation id and not object so we won't be influenced by relocation targets
+                    newShardAllocationIds.add(shard.allocationId().getId());
                 }
             }
-            for (Integer existingShardId : indexService.shardIds()) {
-                if (!newShardIds.contains(existingShardId)) {
+            for (IndexShard existingShard : indexService) {
+                if (newShardAllocationIds.contains(existingShard.routingEntry().allocationId().getId()) == false) {
                     if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
                         if (logger.isDebugEnabled()) {
-                            logger.debug("{}[{}] removing shard (index is closed)", index, existingShardId);
+                            logger.debug("{} removing shard (index is closed)", existingShard.shardId());
                         }
-                        indexService.removeShard(existingShardId, "removing shard (index is closed)");
+                        indexService.removeShard(existingShard.shardId().id(), "removing shard (index is closed)");
                     } else {
                         // we can just remove the shard, without cleaning it locally, since we will clean it
                         // when all shards are allocated in the IndicesStore
                         if (logger.isDebugEnabled()) {
-                            logger.debug("{}[{}] removing shard (not allocated)", index, existingShardId);
+                            logger.debug("{} removing shard (not allocated)", existingShard.shardId());
                         }
-                        indexService.removeShard(existingShardId, "removing shard (not allocated)");
+                        indexService.removeShard(existingShard.shardId().id(), "removing shard (not allocated)");
                     }
                 }
             }
@@ -312,7 +326,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         }
     }
 
-    private void applySettings(ClusterChangedEvent event) {
+    private void applyIndexMetaData(ClusterChangedEvent event) {
         if (!event.metaDataChanged()) {
             return;
         }
@@ -361,8 +375,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                 }
                 if (requireRefresh && sendRefreshMapping) {
                     nodeMappingRefreshAction.nodeMappingRefresh(event.state(),
-                            new NodeMappingRefreshAction.NodeMappingRefreshRequest(index.getName(), indexMetaData.getIndexUUID(),
-                                    event.state().nodes().localNodeId())
+                        new NodeMappingRefreshAction.NodeMappingRefreshRequest(index.getName(), indexMetaData.getIndexUUID(),
+                            event.state().nodes().localNodeId())
                     );
                 }
             } catch (Throwable t) {
@@ -426,14 +440,13 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         for (final ShardRouting shardRouting : routingNode) {
             final IndexService indexService = indicesService.indexService(shardRouting.index());
             if (indexService == null) {
-                // got deleted on us, ignore
+                // creation failed for some reason
+                assert failedShards.containsKey(shardRouting.shardId()) :
+                    "index has local allocation but is not created by applyNewIndices and is not failed " + shardRouting;
                 continue;
             }
             final IndexMetaData indexMetaData = event.state().metaData().index(shardRouting.index());
-            if (indexMetaData == null) {
-                // the index got deleted on the metadata, we will clean it later in the apply deleted method call
-                continue;
-            }
+            assert indexMetaData != null : "index has local allocation but no meta data. " + shardRouting.index();
 
             final int shardId = shardRouting.id();
 
@@ -458,12 +471,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                 // for example: a shard that recovers from one node and now needs to recover to another node,
                 //              or a replica allocated and then allocating a primary because the primary failed on another node
                 boolean shardHasBeenRemoved = false;
-                if (currentRoutingEntry.isSameAllocation(shardRouting) == false) {
-                    logger.debug("[{}][{}] removing shard (different instance of it allocated on this node, current [{}], global [{}])", shardRouting.index(), shardRouting.id(), currentRoutingEntry, shardRouting);
-                    // closing the shard will also cancel any ongoing recovery.
-                    indexService.removeShard(shardRouting.id(), "removing shard (different instance of it allocated on this node)");
-                    shardHasBeenRemoved = true;
-                } else if (isPeerRecovery(shardRouting)) {
+                assert currentRoutingEntry.isSameAllocation(shardRouting) :
+                    "local shard has a different allocation id but wasn't cleaning by applyDeletedShards. "
+                        + "cluster state: " + shardRouting + " local: " + currentRoutingEntry;
+                if (isPeerRecovery(shardRouting)) {
                     final DiscoveryNode sourceNode = findSourceNodeForPeerRecovery(routingTable, nodes, shardRouting);
                     // check if there is an existing recovery going, and if so, and the source node is not the same, cancel the recovery to restart it
                     if (recoveryTargetService.cancelRecoveriesForShard(indexShard.shardId(), "recovery source node changed", status -> !status.sourceNode().equals(sourceNode))) {
@@ -477,7 +488,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                 if (shardHasBeenRemoved == false) {
                     // shadow replicas do not support primary promotion. The master would reinitialize the shard, giving it a new allocation, meaning we should be there.
                     assert (shardRouting.primary() && currentRoutingEntry.primary() == false) == false || indexShard.allowsPrimaryPromotion() :
-                            "shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry;
+                        "shard for doesn't support primary promotion but master promoted it with changing allocation. New routing " + shardRouting + ", current routing " + currentRoutingEntry;
                     try {
                         indexShard.updateRoutingEntry(shardRouting, event.state().blocks().disableStatePersistence() == false);
                     } catch (Throwable e) {
@@ -487,7 +498,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
             }
 
             if (shardRouting.initializing()) {
-                applyInitializingShard(event.state(), indexMetaData, shardRouting);
+                applyInitializingShard(event.state(), indexMetaData, indexService, shardRouting);
             }
         }
     }
@@ -519,12 +530,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         }
     }
 
-    private void applyInitializingShard(final ClusterState state, final IndexMetaData indexMetaData, final ShardRouting shardRouting) {
-        final IndexService indexService = indicesService.indexService(shardRouting.index());
-        if (indexService == null) {
-            // got deleted on us, ignore
-            return;
-        }
+    private void applyInitializingShard(final ClusterState state, final IndexMetaData indexMetaData, IndexService indexService, final ShardRouting shardRouting) {
         final RoutingTable routingTable = state.routingTable();
         final DiscoveryNodes nodes = state.getNodes();
         final int shardId = shardRouting.id();
@@ -537,7 +543,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
                 // we managed to tell the master we started), mark us as started
                 if (logger.isTraceEnabled()) {
                     logger.trace("{} master marked shard as initializing, but shard has state [{}], resending shard started to {}",
-                            indexShard.shardId(), indexShard.state(), nodes.masterNode());
+                        indexShard.shardId(), indexShard.state(), nodes.masterNode());
                 }
                 if (nodes.masterNode() != null) {
                     shardStateAction.shardStarted(shardRouting,
@@ -618,8 +624,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
             assert indexShard.routingEntry().equals(shardRouting); // should have already be done before
             // recover from filesystem store
             final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
-                    RecoveryState.Type.STORE,
-                    nodes.localNode(), nodes.localNode());
+                RecoveryState.Type.STORE,
+                nodes.localNode(), nodes.localNode());
             indexShard.markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
             threadPool.generic().execute(() -> {
                 try {
@@ -634,7 +640,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
         } else {
             // recover from a restore
             final RecoveryState recoveryState = new RecoveryState(indexShard.shardId(), shardRouting.primary(),
-                    RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.localNode());
+                RecoveryState.Type.SNAPSHOT, shardRouting.restoreSource(), nodes.localNode());
             indexShard.markAsRecovering("from snapshot", recoveryState); // mark the shard as recovering on the cluster state thread
             threadPool.generic().execute(() -> {
                 final ShardId sId = indexShard.shardId();

+ 10 - 0
core/src/test/java/org/elasticsearch/action/support/broadcast/node/TransportBroadcastByNodeActionTests.java

@@ -34,7 +34,9 @@ import org.elasticsearch.cluster.block.ClusterBlock;
 import org.elasticsearch.cluster.block.ClusterBlockException;
 import org.elasticsearch.cluster.block.ClusterBlockLevel;
 import org.elasticsearch.cluster.block.ClusterBlocks;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.routing.IndexRoutingTable;
@@ -212,10 +214,12 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
         IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(new Index(index, "_na_"));
 
         int shardIndex = -1;
+        int totalIndexShards = 0;
         for (int i = 0; i < numberOfNodes; i++) {
             final DiscoveryNode node = newNode(i);
             discoBuilder = discoBuilder.put(node);
             int numberOfShards = randomIntBetween(1, 10);
+            totalIndexShards += numberOfShards;
             for (int j = 0; j < numberOfShards; j++) {
                 final ShardId shardId = new ShardId(index, "_na_", ++shardIndex);
                 ShardRouting shard = TestShardRouting.newShardRouting(index, shardId.getId(), node.id(), true, ShardRoutingState.STARTED);
@@ -228,6 +232,12 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
         discoBuilder.masterNodeId(newNode(numberOfNodes - 1).id());
         ClusterState.Builder stateBuilder = ClusterState.builder(new ClusterName(TEST_CLUSTER));
         stateBuilder.nodes(discoBuilder);
+        final IndexMetaData.Builder indexMetaData = IndexMetaData.builder(index)
+                .settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
+                .numberOfReplicas(0)
+                .numberOfShards(totalIndexShards);
+
+        stateBuilder.metaData(MetaData.builder().put(indexMetaData));
         stateBuilder.routingTable(RoutingTable.builder().add(indexRoutingTable.build()).build());
         ClusterState clusterState = stateBuilder.build();
         setState(clusterService, clusterState);

+ 1 - 1
core/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java

@@ -142,7 +142,7 @@ public class BroadcastReplicationTests extends ESTestCase {
 
     public void testResultCombine() throws InterruptedException, ExecutionException, IOException {
         final String index = "test";
-        int numShards = randomInt(3);
+        int numShards = 1 + randomInt(3);
         setState(clusterService, stateWithAssignedPrimariesAndOneReplica(index, numShards));
         logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint());
         Future<BroadcastResponse> response = (broadcastReplicationAction.execute(new DummyBroadcastRequest().indices(index)));

+ 12 - 6
core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java

@@ -45,6 +45,7 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
 import static org.elasticsearch.test.ESTestCase.randomFrom;
+import static org.elasticsearch.test.ESTestCase.randomInt;
 import static org.elasticsearch.test.ESTestCase.randomIntBetween;
 
 /**
@@ -84,10 +85,11 @@ public class ClusterStateCreationUtils {
         }
         discoBuilder.localNodeId(newNode(0).id());
         discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures
+        final int primaryTerm = randomInt(200);
         IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
                 .put(SETTING_VERSION_CREATED, Version.CURRENT)
                 .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, numberOfReplicas)
-                .put(SETTING_CREATION_DATE, System.currentTimeMillis())).build();
+                .put(SETTING_CREATION_DATE, System.currentTimeMillis())).primaryTerm(0, primaryTerm).build();
 
         RoutingTable.Builder routing = new RoutingTable.Builder();
         routing.addAsNew(indexMetaData);
@@ -111,7 +113,8 @@ public class ClusterStateCreationUtils {
         } else {
             unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
         }
-        indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, 0, primaryNode, relocatingNode, null, true, primaryState, unassignedInfo));
+        indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, 0, primaryNode, relocatingNode, null, true,
+                primaryState, unassignedInfo));
 
         for (ShardRoutingState replicaState : replicaStates) {
             String replicaNode = null;
@@ -152,7 +155,7 @@ public class ClusterStateCreationUtils {
         discoBuilder.masterNodeId(newNode(1).id()); // we need a non-local master to test shard failures
         IndexMetaData indexMetaData = IndexMetaData.builder(index).settings(Settings.builder()
                 .put(SETTING_VERSION_CREATED, Version.CURRENT)
-                .put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 1)
+                .put(SETTING_NUMBER_OF_SHARDS, numberOfShards).put(SETTING_NUMBER_OF_REPLICAS, 1)
                 .put(SETTING_CREATION_DATE, System.currentTimeMillis())).build();
         ClusterState.Builder state = ClusterState.builder(new ClusterName("test"));
         state.nodes(discoBuilder);
@@ -163,8 +166,10 @@ public class ClusterStateCreationUtils {
             routing.addAsNew(indexMetaData);
             final ShardId shardId = new ShardId(index, "_na_", i);
             IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
-            indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(0).id(), null, null, true, ShardRoutingState.STARTED, null));
-            indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(1).id(), null, null, false, ShardRoutingState.STARTED, null));
+            indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(0).id(), null, null, true,
+                    ShardRoutingState.STARTED, null));
+            indexShardRoutingBuilder.addShard(TestShardRouting.newShardRouting(index, i, newNode(1).id(), null, null, false,
+                    ShardRoutingState.STARTED, null));
             indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder.build());
         }
         state.routingTable(RoutingTable.builder().add(indexRoutingTableBuilder.build()).build());
@@ -229,12 +234,13 @@ public class ClusterStateCreationUtils {
 
     /**
      * Creates a cluster state where local node and master node can be specified
+     *
      * @param localNode  node in allNodes that is the local node
      * @param masterNode node in allNodes that is the master node. Can be null if no master exists
      * @param allNodes   all nodes in the cluster
      * @return cluster state
      */
-    public static  ClusterState state(DiscoveryNode localNode, DiscoveryNode masterNode, DiscoveryNode... allNodes) {
+    public static ClusterState state(DiscoveryNode localNode, DiscoveryNode masterNode, DiscoveryNode... allNodes) {
         DiscoveryNodes.Builder discoBuilder = DiscoveryNodes.builder();
         for (DiscoveryNode node : allNodes) {
             discoBuilder.put(node);

+ 17 - 5
core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -630,11 +630,13 @@ public class TransportReplicationActionTests extends ESTestCase {
         final ShardIterator shardIt = shardRoutingTable.shardsIt();
         final ShardId shardId = shardIt.shardId();
         final Request request = new Request(shardId);
+        final long primaryTerm = randomInt(200);
+        request.primaryTerm(primaryTerm);
         final PlainActionFuture<Response> listener = new PlainActionFuture<>();
         ReplicationTask task = maybeTask();
         logger.debug("expecting [{}] assigned replicas, [{}] total shards. using state: \n{}", assignedReplicas, totalShards, clusterService.state().prettyPrint());
 
-        TransportReplicationAction.IndexShardReference reference = getOrCreateIndexShardOperationsCounter();
+        TransportReplicationAction.IndexShardReference reference = getOrCreateIndexShardOperationsCounter(0);
 
         ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard();
         indexShardRouting.set(primaryShard);
@@ -767,6 +769,9 @@ public class TransportReplicationActionTests extends ESTestCase {
         }
         // all replicas have responded so the counter should be decreased again
         assertIndexShardCounter(1);
+
+        // assert that nothing in the replica logic changes the primary term of the operation
+        assertThat(request.primaryTerm(), equalTo(primaryTerm));
     }
 
     public void testCounterOnPrimary() throws Exception {
@@ -989,7 +994,7 @@ public class TransportReplicationActionTests extends ESTestCase {
     /**
      * Returns testIndexShardOperationsCounter or initializes it if it was already created in this test run.
      */
-    private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter() {
+    private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter(long primaryTerm) {
         count.incrementAndGet();
         return new TransportReplicationAction.IndexShardReference() {
             @Override
@@ -1009,6 +1014,11 @@ public class TransportReplicationActionTests extends ESTestCase {
                 return shardRouting;
             }
 
+            @Override
+            public long opPrimaryTerm() {
+                return primaryTerm;
+            }
+
             @Override
             public void close() {
                 count.decrementAndGet();
@@ -1104,13 +1114,15 @@ public class TransportReplicationActionTests extends ESTestCase {
             return false;
         }
 
+
         @Override
         protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) {
-            return getOrCreateIndexShardOperationsCounter();
+            final IndexMetaData indexMetaData = clusterService.state().metaData().index(shardId.getIndex());
+            return getOrCreateIndexShardOperationsCounter(indexMetaData.primaryTerm(shardId.id()));
         }
 
-        protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) {
-            return getOrCreateIndexShardOperationsCounter();
+        protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId, long opPrimaryTerm) {
+            return getOrCreateIndexShardOperationsCounter(opPrimaryTerm);
         }
     }
 

+ 26 - 20
core/src/test/java/org/elasticsearch/cluster/metadata/ToAndFromJsonMetaDataTests.java

@@ -41,11 +41,14 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
                 .put(IndexMetaData.builder("test1")
                         .settings(settings(Version.CURRENT))
                         .numberOfShards(1)
-                        .numberOfReplicas(2))
+                        .numberOfReplicas(2)
+                        .primaryTerm(0, 1))
                 .put(IndexMetaData.builder("test2")
                         .settings(settings(Version.CURRENT).put("setting1", "value1").put("setting2", "value2"))
                         .numberOfShards(2)
-                        .numberOfReplicas(3))
+                        .numberOfReplicas(3)
+                        .primaryTerm(0, 2)
+                        .primaryTerm(1, 2))
                 .put(IndexMetaData.builder("test3")
                         .settings(settings(Version.CURRENT))
                         .numberOfShards(1)
@@ -112,15 +115,15 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
                         .putAlias(newAliasMetaDataBuilder("alias1").filter(ALIAS_FILTER1))
                         .putAlias(newAliasMetaDataBuilder("alias2"))
                         .putAlias(newAliasMetaDataBuilder("alias4").filter(ALIAS_FILTER2)))
-                        .put(IndexTemplateMetaData.builder("foo")
-                                .template("bar")
-                                .order(1)
-                                .settings(settingsBuilder()
-                                        .put("setting1", "value1")
-                                        .put("setting2", "value2"))
-                                .putAlias(newAliasMetaDataBuilder("alias-bar1"))
-                                .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
-                                .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
+                .put(IndexTemplateMetaData.builder("foo")
+                        .template("bar")
+                        .order(1)
+                        .settings(settingsBuilder()
+                                .put("setting1", "value1")
+                                .put("setting2", "value2"))
+                        .putAlias(newAliasMetaDataBuilder("alias-bar1"))
+                        .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
+                        .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
                 .put(IndexMetaData.builder("test12")
                         .settings(settings(Version.CURRENT)
                                 .put("setting1", "value1")
@@ -133,15 +136,15 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
                         .putAlias(newAliasMetaDataBuilder("alias1").filter(ALIAS_FILTER1))
                         .putAlias(newAliasMetaDataBuilder("alias2"))
                         .putAlias(newAliasMetaDataBuilder("alias4").filter(ALIAS_FILTER2)))
-                        .put(IndexTemplateMetaData.builder("foo")
-                                .template("bar")
-                                .order(1)
-                                .settings(settingsBuilder()
-                                        .put("setting1", "value1")
-                                        .put("setting2", "value2"))
-                                .putAlias(newAliasMetaDataBuilder("alias-bar1"))
-                                .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
-                                .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
+                .put(IndexTemplateMetaData.builder("foo")
+                        .template("bar")
+                        .order(1)
+                        .settings(settingsBuilder()
+                                .put("setting1", "value1")
+                                .put("setting2", "value2"))
+                        .putAlias(newAliasMetaDataBuilder("alias-bar1"))
+                        .putAlias(newAliasMetaDataBuilder("alias-bar2").filter("{\"term\":{\"user\":\"kimchy\"}}"))
+                        .putAlias(newAliasMetaDataBuilder("alias-bar3").routing("routing-bar")))
                 .build();
 
         String metaDataSource = MetaData.Builder.toXContent(metaData);
@@ -150,6 +153,7 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
         MetaData parsedMetaData = MetaData.Builder.fromXContent(XContentFactory.xContent(XContentType.JSON).createParser(metaDataSource));
 
         IndexMetaData indexMetaData = parsedMetaData.index("test1");
+        assertThat(indexMetaData.primaryTerm(0), equalTo(1L));
         assertThat(indexMetaData.getNumberOfShards(), equalTo(1));
         assertThat(indexMetaData.getNumberOfReplicas(), equalTo(2));
         assertThat(indexMetaData.getCreationDate(), equalTo(-1L));
@@ -159,6 +163,8 @@ public class ToAndFromJsonMetaDataTests extends ESTestCase {
         indexMetaData = parsedMetaData.index("test2");
         assertThat(indexMetaData.getNumberOfShards(), equalTo(2));
         assertThat(indexMetaData.getNumberOfReplicas(), equalTo(3));
+        assertThat(indexMetaData.primaryTerm(0), equalTo(2L));
+        assertThat(indexMetaData.primaryTerm(1), equalTo(2L));
         assertThat(indexMetaData.getCreationDate(), equalTo(-1L));
         assertThat(indexMetaData.getSettings().getAsMap().size(), equalTo(5));
         assertThat(indexMetaData.getSettings().get("setting1"), equalTo("value1"));

+ 241 - 0
core/src/test/java/org/elasticsearch/cluster/routing/PrimaryTermsTests.java

@@ -0,0 +1,241 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.health.ClusterStateHealth;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.node.DiscoveryNodes.Builder;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.test.ESAllocationTestCase;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.elasticsearch.common.settings.Settings.settingsBuilder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class PrimaryTermsTests extends ESAllocationTestCase {
+
+    private static final String TEST_INDEX_1 = "test1";
+    private static final String TEST_INDEX_2 = "test2";
+    private RoutingTable testRoutingTable;
+    private int numberOfShards;
+    private int numberOfReplicas;
+    private final static Settings DEFAULT_SETTINGS = Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT).build();
+    private AllocationService allocationService;
+    private ClusterState clusterState;
+
+    private final Map<String, long[]> primaryTermsPerIndex = new HashMap<>();
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        this.allocationService = createAllocationService(settingsBuilder()
+                .put("cluster.routing.allocation.node_concurrent_recoveries", Integer.MAX_VALUE) // don't limit recoveries
+                .put("cluster.routing.allocation.node_initial_primaries_recoveries", Integer.MAX_VALUE)
+                .build());
+        this.numberOfShards = randomIntBetween(1, 5);
+        this.numberOfReplicas = randomIntBetween(1, 5);
+        logger.info("Setup test with " + this.numberOfShards + " shards and " + this.numberOfReplicas + " replicas.");
+        this.primaryTermsPerIndex.clear();
+        MetaData metaData = MetaData.builder()
+                .put(createIndexMetaData(TEST_INDEX_1))
+                .put(createIndexMetaData(TEST_INDEX_2))
+                .build();
+
+        this.testRoutingTable = new RoutingTable.Builder()
+                .add(new IndexRoutingTable.Builder(metaData.index(TEST_INDEX_1).getIndex()).initializeAsNew(metaData.index(TEST_INDEX_1))
+                        .build())
+                .add(new IndexRoutingTable.Builder(metaData.index(TEST_INDEX_2).getIndex()).initializeAsNew(metaData.index(TEST_INDEX_2))
+                        .build())
+                .build();
+
+        this.clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData)
+                .routingTable(testRoutingTable).build();
+    }
+
+    /**
+     * puts primary shard routings into initializing state
+     */
+    private void initPrimaries() {
+        logger.info("adding " + (this.numberOfReplicas + 1) + " nodes and performing rerouting");
+        Builder discoBuilder = DiscoveryNodes.builder();
+        for (int i = 0; i < this.numberOfReplicas + 1; i++) {
+            discoBuilder = discoBuilder.put(newNode("node" + i));
+        }
+        this.clusterState = ClusterState.builder(clusterState).nodes(discoBuilder).build();
+        RoutingAllocation.Result rerouteResult = allocationService.reroute(clusterState, "reroute");
+        this.testRoutingTable = rerouteResult.routingTable();
+        assertThat(rerouteResult.changed(), is(true));
+        applyRerouteResult(rerouteResult);
+        primaryTermsPerIndex.keySet().forEach(this::incrementPrimaryTerm);
+    }
+
+    private void incrementPrimaryTerm(String index) {
+        final long[] primaryTerms = primaryTermsPerIndex.get(index);
+        for (int i = 0; i < primaryTerms.length; i++) {
+            primaryTerms[i]++;
+        }
+    }
+
+    private void incrementPrimaryTerm(String index, int shard) {
+        primaryTermsPerIndex.get(index)[shard]++;
+    }
+
+    private boolean startInitializingShards(String index) {
+        this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build();
+        final List<ShardRouting> startedShards = this.clusterState.getRoutingNodes().shardsWithState(index, INITIALIZING);
+        logger.info("start primary shards for index [{}]: {} ", index, startedShards);
+        RoutingAllocation.Result rerouteResult = allocationService.applyStartedShards(this.clusterState, startedShards);
+        applyRerouteResult(rerouteResult);
+        return rerouteResult.changed();
+    }
+
+    private void applyRerouteResult(RoutingAllocation.Result rerouteResult) {
+        ClusterState previousClusterState = this.clusterState;
+        ClusterState newClusterState = ClusterState.builder(previousClusterState).routingResult(rerouteResult).build();
+        ClusterState.Builder builder = ClusterState.builder(newClusterState).incrementVersion();
+        if (previousClusterState.routingTable() != newClusterState.routingTable()) {
+            builder.routingTable(RoutingTable.builder(newClusterState.routingTable()).version(newClusterState.routingTable().version() + 1)
+                    .build());
+        }
+        if (previousClusterState.metaData() != newClusterState.metaData()) {
+            builder.metaData(MetaData.builder(newClusterState.metaData()).version(newClusterState.metaData().version() + 1));
+        }
+        this.clusterState = builder.build();
+        this.testRoutingTable = rerouteResult.routingTable();
+        final ClusterStateHealth clusterHealth = new ClusterStateHealth(clusterState);
+        logger.info("applied reroute. active shards: p [{}], t [{}], init shards: [{}], relocating: [{}]",
+                clusterHealth.getActivePrimaryShards(), clusterHealth.getActiveShards(),
+                clusterHealth.getInitializingShards(), clusterHealth.getRelocatingShards());
+    }
+
+    private void failSomePrimaries(String index) {
+        this.clusterState = ClusterState.builder(clusterState).routingTable(this.testRoutingTable).build();
+        final IndexRoutingTable indexShardRoutingTable = testRoutingTable.index(index);
+        Set<Integer> shardIdsToFail = new HashSet<>();
+        for (int i = 1 + randomInt(numberOfShards - 1); i > 0; i--) {
+            shardIdsToFail.add(randomInt(numberOfShards - 1));
+        }
+        logger.info("failing primary shards {} for index [{}]", shardIdsToFail, index);
+        List<FailedRerouteAllocation.FailedShard> failedShards = new ArrayList<>();
+        for (int shard : shardIdsToFail) {
+            failedShards.add(new FailedRerouteAllocation.FailedShard(indexShardRoutingTable.shard(shard).primaryShard(), "test", null));
+            incrementPrimaryTerm(index, shard); // the primary failure should increment the primary term;
+        }
+        RoutingAllocation.Result rerouteResult = allocationService.applyFailedShards(this.clusterState, failedShards);
+        applyRerouteResult(rerouteResult);
+    }
+
+    private void addNodes() {
+        DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(clusterState.nodes());
+        final int newNodes = randomInt(10);
+        logger.info("adding [{}] nodes", newNodes);
+        for (int i = 0; i < newNodes; i++) {
+            nodesBuilder.put(newNode("extra_" + i));
+        }
+        this.clusterState = ClusterState.builder(clusterState).nodes(nodesBuilder).build();
+        RoutingAllocation.Result rerouteResult = allocationService.reroute(this.clusterState, "nodes added");
+        applyRerouteResult(rerouteResult);
+
+    }
+
+    private IndexMetaData.Builder createIndexMetaData(String indexName) {
+        primaryTermsPerIndex.put(indexName, new long[numberOfShards]);
+        final IndexMetaData.Builder builder = new IndexMetaData.Builder(indexName)
+                .settings(DEFAULT_SETTINGS)
+                .numberOfReplicas(this.numberOfReplicas)
+                .numberOfShards(this.numberOfShards);
+        for (int i = 0; i < numberOfShards; i++) {
+            builder.primaryTerm(i, randomInt(200));
+            primaryTermsPerIndex.get(indexName)[i] = builder.primaryTerm(i);
+        }
+        return builder;
+    }
+
+    private void assertAllPrimaryTerm() {
+        primaryTermsPerIndex.keySet().forEach(this::assertPrimaryTerm);
+    }
+
+    private void assertPrimaryTerm(String index) {
+        final long[] terms = primaryTermsPerIndex.get(index);
+        final IndexMetaData indexMetaData = clusterState.metaData().index(index);
+        for (IndexShardRoutingTable shardRoutingTable : this.testRoutingTable.index(index)) {
+            final int shard = shardRoutingTable.shardId().id();
+            assertThat("primary term mismatch between indexMetaData of [" + index + "] and shard [" + shard + "]'s routing",
+                    indexMetaData.primaryTerm(shard), equalTo(terms[shard]));
+        }
+    }
+
+    public void testPrimaryTermMetaDataSync() {
+        assertAllPrimaryTerm();
+
+        initPrimaries();
+        assertAllPrimaryTerm();
+
+        startInitializingShards(TEST_INDEX_1);
+        assertAllPrimaryTerm();
+
+        startInitializingShards(TEST_INDEX_2);
+        assertAllPrimaryTerm();
+
+        // now start all replicas too
+        startInitializingShards(TEST_INDEX_1);
+        startInitializingShards(TEST_INDEX_2);
+        assertAllPrimaryTerm();
+
+        // relocations must not change any primary term
+        addNodes();
+        assertAllPrimaryTerm();
+        boolean changed = true;
+        while (changed) {
+            changed = startInitializingShards(TEST_INDEX_1);
+            assertAllPrimaryTerm();
+            changed |= startInitializingShards(TEST_INDEX_2);
+            assertAllPrimaryTerm();
+        }
+
+        // primary promotion
+        failSomePrimaries(TEST_INDEX_1);
+        assertAllPrimaryTerm();
+
+        // stabilize cluster
+        changed = true;
+        while (changed) {
+            changed = startInitializingShards(TEST_INDEX_1);
+            assertAllPrimaryTerm();
+            changed |= startInitializingShards(TEST_INDEX_2);
+            assertAllPrimaryTerm();
+        }
+    }
+}

+ 0 - 1
core/src/test/java/org/elasticsearch/cluster/routing/allocation/CatAllocationTestCase.java

@@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.RoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
-import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.test.ESAllocationTestCase;
 
 import java.io.BufferedReader;

+ 22 - 16
core/src/test/java/org/elasticsearch/cluster/routing/allocation/PrimaryElectionRoutingTests.java

@@ -58,29 +58,31 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
         ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
 
         logger.info("Adding two nodes and performing rerouting");
-        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
-        RoutingTable prevRoutingTable = routingTable;
-        routingTable = strategy.reroute(clusterState, "reroute").routingTable();
-        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
+        RoutingAllocation.Result result = strategy.reroute(clusterState, "reroute");
+        clusterState = ClusterState.builder(clusterState).routingResult(result).build();
+
+        clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build();
+        result = strategy.reroute(clusterState, "reroute");
+        clusterState = ClusterState.builder(clusterState).routingResult(result).build();
 
         logger.info("Start the primary shard (on node1)");
         RoutingNodes routingNodes = clusterState.getRoutingNodes();
-        prevRoutingTable = routingTable;
-        routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING)).routingTable();
-        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        result = strategy.applyStartedShards(clusterState, routingNodes.node("node1").shardsWithState(INITIALIZING));
+        clusterState = ClusterState.builder(clusterState).routingResult(result).build();
 
         logger.info("Start the backup shard (on node2)");
         routingNodes = clusterState.getRoutingNodes();
-        prevRoutingTable = routingTable;
-        routingTable = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING)).routingTable();
-        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        result = strategy.applyStartedShards(clusterState, routingNodes.node("node2").shardsWithState(INITIALIZING));
+        clusterState = ClusterState.builder(clusterState).routingResult(result).build();
 
         logger.info("Adding third node and reroute and kill first node");
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node3")).remove("node1")).build();
-        prevRoutingTable = routingTable;
-        routingTable = strategy.reroute(clusterState, "reroute").routingTable();
-        clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
+        RoutingTable prevRoutingTable = clusterState.routingTable();
+        result = strategy.reroute(clusterState, "reroute");
+        clusterState = ClusterState.builder(clusterState).routingResult(result).build();
         routingNodes = clusterState.getRoutingNodes();
+        routingTable = clusterState.routingTable();
 
         assertThat(prevRoutingTable != routingTable, equalTo(true));
         assertThat(routingTable.index("test").shards().size(), equalTo(1));
@@ -89,6 +91,7 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
         assertThat(routingNodes.node("node3").numberOfShardsWithState(INITIALIZING), equalTo(1));
         // verify where the primary is
         assertThat(routingTable.index("test").shard(0).primaryShard().currentNodeId(), equalTo("node2"));
+        assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L));
         assertThat(routingTable.index("test").shard(0).replicaShards().get(0).currentNodeId(), equalTo("node3"));
     }
 
@@ -110,16 +113,18 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
         logger.info("Adding two nodes and performing rerouting");
         clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
         RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState, "reroute");
-        clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+        clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
 
         logger.info("Start the primary shards");
         RoutingNodes routingNodes = clusterState.getRoutingNodes();
         rerouteResult = allocation.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING));
-        clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+        clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
         routingNodes = clusterState.getRoutingNodes();
 
         assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(2));
         assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(2));
+        assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(1L));
+        assertThat(clusterState.metaData().index("test").primaryTerm(1), equalTo(1L));
 
         // now, fail one node, while the replica is initializing, and it also holds a primary
         logger.info("--> fail node with primary");
@@ -129,12 +134,13 @@ public class PrimaryElectionRoutingTests extends ESAllocationTestCase {
                 .put(newNode(nodeIdRemaining))
         ).build();
         rerouteResult = allocation.reroute(clusterState, "reroute");
-        clusterState = ClusterState.builder(clusterState).routingTable(rerouteResult.routingTable()).build();
+        clusterState = ClusterState.builder(clusterState).routingResult(rerouteResult).build();
         routingNodes = clusterState.getRoutingNodes();
 
         assertThat(routingNodes.shardsWithState(STARTED).size(), equalTo(1));
         assertThat(routingNodes.shardsWithState(INITIALIZING).size(), equalTo(1));
         assertThat(routingNodes.node(nodeIdRemaining).shardsWithState(INITIALIZING).get(0).primary(), equalTo(true));
+        assertThat(clusterState.metaData().index("test").primaryTerm(0), equalTo(2L));
 
     }
 }

+ 80 - 0
core/src/test/java/org/elasticsearch/cluster/routing/allocation/ShardStateIT.java

@@ -0,0 +1,80 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.cluster.routing.allocation;
+
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.health.ClusterHealthStatus;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.index.IndexService;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.indices.IndicesService;
+import org.elasticsearch.test.ESIntegTestCase;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class ShardStateIT extends ESIntegTestCase {
+
+    public void testPrimaryFailureIncreasesTerm() throws Exception {
+        internalCluster().ensureAtLeastNumDataNodes(2);
+        prepareCreate("test").setSettings(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2, IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1).get();
+        ensureGreen();
+        assertPrimaryTerms(1, 1);
+
+        logger.info("--> disabling allocation to capture shard failure");
+        disableAllocation("test");
+
+        ClusterState state = client().admin().cluster().prepareState().get().getState();
+        final int shard = randomBoolean() ? 0 : 1;
+        final String nodeId = state.routingTable().index("test").shard(shard).primaryShard().currentNodeId();
+        final String node = state.nodes().get(nodeId).name();
+        logger.info("--> failing primary of [{}] on node [{}]", shard, node);
+        IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+        indicesService.indexService(resolveIndex("test")).getShard(shard).failShard("simulated test failure", null);
+
+        logger.info("--> waiting for a yellow index");
+        assertBusy(() -> assertThat(client().admin().cluster().prepareHealth().get().getStatus(), equalTo(ClusterHealthStatus.YELLOW)));
+
+        final long term0 = shard == 0 ? 2 : 1;
+        final long term1 = shard == 1 ? 2 : 1;
+        assertPrimaryTerms(term0, term1);
+
+        logger.info("--> enabling allocation");
+        enableAllocation("test");
+        ensureGreen();
+        assertPrimaryTerms(term0, term1);
+    }
+
+    protected void assertPrimaryTerms(long term0, long term1) {
+        for (String node : internalCluster().getNodeNames()) {
+            logger.debug("--> asserting primary terms terms on [{}]", node);
+            ClusterState state = client(node).admin().cluster().prepareState().setLocal(true).get().getState();
+            IndexMetaData metaData = state.metaData().index("test");
+            assertThat(metaData.primaryTerm(0), equalTo(term0));
+            assertThat(metaData.primaryTerm(1), equalTo(term1));
+            IndicesService indicesService = internalCluster().getInstance(IndicesService.class, node);
+            IndexService indexService = indicesService.indexService(metaData.getIndex());
+            if (indexService != null) {
+                for (IndexShard shard : indexService) {
+                    assertThat("term mismatch for shard " + shard.shardId(),
+                        shard.getPrimaryTerm(), equalTo(metaData.primaryTerm(shard.shardId().id())));
+                }
+            }
+        }
+    }
+}

+ 56 - 1
core/src/test/java/org/elasticsearch/gateway/RecoveryFromGatewayIT.java

@@ -19,11 +19,13 @@
 
 package org.elasticsearch.gateway;
 
+import com.carrotsearch.hppc.cursors.ObjectCursor;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
 import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
 import org.elasticsearch.common.settings.Settings;
@@ -37,11 +39,13 @@ import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
 import org.elasticsearch.test.ESIntegTestCase.Scope;
 import org.elasticsearch.test.InternalTestCluster.RestartCallback;
-import org.elasticsearch.test.junit.annotations.TestLogging;
 import org.elasticsearch.test.store.MockFSDirectoryService;
 import org.elasticsearch.test.store.MockFSIndexStore;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
 
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
@@ -88,10 +92,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
         assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
         ensureYellow("test"); // wait for primary allocations here otherwise if we have a lot of shards we might have a
         // shard that is still in post recovery when we restart and the ensureYellow() below will timeout
+
+        Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
         internalCluster().fullRestart();
 
         logger.info("Running Cluster Health (wait for the shards to startup)");
         ensureYellow();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         client().admin().indices().prepareRefresh().execute().actionGet();
         assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
@@ -100,11 +107,37 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
 
         logger.info("Running Cluster Health (wait for the shards to startup)");
         ensureYellow();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         client().admin().indices().prepareRefresh().execute().actionGet();
         assertHitCount(client().prepareSearch().setSize(0).setQuery(termQuery("appAccountIds", 179)).execute().actionGet(), 2);
     }
 
+    private Map<String, long[]> assertAndCapturePrimaryTerms(Map<String, long[]> previousTerms) {
+        if (previousTerms == null) {
+            previousTerms = new HashMap<>();
+        }
+        final Map<String, long[]> result = new HashMap<>();
+        final ClusterState state = client().admin().cluster().prepareState().get().getState();
+        for (ObjectCursor<IndexMetaData> cursor : state.metaData().indices().values()) {
+            final IndexMetaData indexMetaData = cursor.value;
+            final String index = indexMetaData.getIndex().getName();
+            final long[] previous = previousTerms.get(index);
+            final long[] current = IntStream.range(0, indexMetaData.getNumberOfShards()).mapToLong(indexMetaData::primaryTerm).toArray();
+            if (previous == null) {
+                result.put(index, current);
+            } else {
+                assertThat("number of terms changed for index [" + index + "]", current.length, equalTo(previous.length));
+                for (int shard = 0; shard < current.length; shard++) {
+                    assertThat("primary term didn't increase for [" + index + "][" + shard + "]", current[shard], greaterThan(previous[shard]));
+                }
+                result.put(index, current);
+            }
+        }
+
+        return result;
+    }
+
     public void testSingleNodeNoFlush() throws Exception {
         internalCluster().startNode();
 
@@ -163,10 +196,14 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
             logger.info("Ensure all primaries have been started");
             ensureYellow();
         }
+
+        Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
+
         internalCluster().fullRestart();
 
         logger.info("Running Cluster Health (wait for the shards to startup)");
         ensureYellow();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         for (int i = 0; i <= randomInt(10); i++) {
             assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), value1Docs + value2Docs);
@@ -180,6 +217,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
 
         logger.info("Running Cluster Health (wait for the shards to startup)");
         ensureYellow();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         for (int i = 0; i <= randomInt(10); i++) {
             assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).get(), value1Docs + value2Docs);
@@ -201,10 +239,13 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
         ensureYellow("test"); // wait for primary allocations here otherwise if we have a lot of shards we might have a
         // shard that is still in post recovery when we restart and the ensureYellow() below will timeout
 
+        Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
+
         internalCluster().fullRestart();
 
         logger.info("Running Cluster Health (wait for the shards to startup)");
         ensureYellow();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         for (int i = 0; i < 10; i++) {
             assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2);
@@ -214,6 +255,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
 
         logger.info("Running Cluster Health (wait for the shards to startup)");
         ensureYellow();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         for (int i = 0; i < 10; i++) {
             assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2);
@@ -236,6 +278,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
             assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2);
         }
 
+        Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
+
         internalCluster().fullRestart(new RestartCallback() {
             @Override
             public Settings onNodeStopped(String nodeName) throws Exception {
@@ -251,6 +295,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
 
         logger.info("Running Cluster Health (wait for the shards to startup)");
         ensureGreen();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         for (int i = 0; i < 10; i++) {
             assertHitCount(client().prepareSearch().setSize(0).setQuery(matchAllQuery()).execute().actionGet(), 2);
@@ -276,6 +321,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
         String metaDataUuid = client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID();
         assertThat(metaDataUuid, not(equalTo("_na_")));
 
+        Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
+
         logger.info("--> closing first node, and indexing more data to the second node");
         internalCluster().fullRestart(new RestartCallback() {
 
@@ -315,6 +362,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
 
         logger.info("--> running cluster_health (wait for the shards to startup)");
         ensureGreen();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         assertThat(client().admin().cluster().prepareState().execute().get().getState().getMetaData().clusterUUID(), equalTo(metaDataUuid));
 
@@ -386,11 +434,15 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
                 .setTransientSettings(settingsBuilder()
                         .put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), EnableAllocationDecider.Allocation.NONE))
                 .get();
+
+        Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
+
         logger.info("--> full cluster restart");
         internalCluster().fullRestart();
 
         logger.info("--> waiting for cluster to return to green after {}shutdown", useSyncIds ? "" : "second ");
         ensureGreen();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         if (useSyncIds) {
             assertSyncIdsNotNull();
@@ -445,6 +497,8 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
         internalCluster().startNode(settingsBuilder().put(Environment.PATH_DATA_SETTING.getKey(), createTempDir()).build());
 
         ensureGreen();
+        Map<String, long[]> primaryTerms = assertAndCapturePrimaryTerms(null);
+
 
         internalCluster().fullRestart(new RestartCallback() {
 
@@ -455,6 +509,7 @@ public class RecoveryFromGatewayIT extends ESIntegTestCase {
         });
 
         ensureYellow();
+        primaryTerms = assertAndCapturePrimaryTerms(primaryTerms);
 
         assertThat(client().admin().indices().prepareExists("test").execute().actionGet().isExists(), equalTo(true));
         assertHitCount(client().prepareSearch("test").setSize(0).setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(), 1);

+ 153 - 40
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -37,18 +37,22 @@ import org.elasticsearch.action.admin.indices.stats.CommonStats;
 import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
 import org.elasticsearch.action.admin.indices.stats.IndexStats;
 import org.elasticsearch.action.admin.indices.stats.ShardStats;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.TransportIndexAction;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.support.IndicesOptions;
 import org.elasticsearch.cluster.ClusterInfoService;
 import org.elasticsearch.cluster.ClusterState;
 import org.elasticsearch.cluster.InternalClusterInfoService;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
 import org.elasticsearch.cluster.metadata.SnapshotId;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.RestoreSource;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingHelper;
+import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.routing.UnassignedInfo;
 import org.elasticsearch.cluster.service.ClusterService;
@@ -59,7 +63,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.ESLogger;
-import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.DummyTransportAddress;
 import org.elasticsearch.common.unit.ByteSizeUnit;
@@ -119,6 +122,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
 import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
+import static org.elasticsearch.common.lucene.Lucene.cleanLuceneIndex;
 import static org.elasticsearch.common.settings.Settings.settingsBuilder;
 import static org.elasticsearch.common.xcontent.ToXContent.EMPTY_PARAMS;
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
@@ -127,6 +131,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
 import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 
@@ -168,6 +173,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         createIndex("test");
         ensureGreen();
         NodeEnvironment env = getInstanceFromNode(NodeEnvironment.class);
+
         ClusterService cs = getInstanceFromNode(ClusterService.class);
         final Index index = cs.state().metaData().index("test").getIndex();
         Path[] shardPaths = env.availableShardPaths(new ShardId(index, 0));
@@ -295,31 +301,133 @@ public class IndexShardTests extends ESSingleNodeTestCase {
             // expected
         }
         try {
-            indexShard.acquireReplicaOperationLock();
+            indexShard.acquireReplicaOperationLock(indexShard.getPrimaryTerm());
             fail("we should not be able to increment anymore");
         } catch (IndexShardClosedException e) {
             // expected
         }
     }
 
-    public void testIndexOperationsCounter() throws InterruptedException, ExecutionException, IOException {
+    public void testOperationLocksOnPrimaryShards() throws InterruptedException, ExecutionException, IOException {
         assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
         ensureGreen("test");
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test"));
         IndexShard indexShard = indexService.getShardOrNull(0);
+        long primaryTerm = indexShard.getPrimaryTerm();
+
+        ShardRouting temp = indexShard.routingEntry();
+        final ShardRouting newPrimaryShardRouting;
+        if (randomBoolean()) {
+            // relocation target
+            newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), "other node",
+                true, ShardRoutingState.INITIALIZING, AllocationId.newRelocation(temp.allocationId()));
+        } else if (randomBoolean()) {
+            // simulate promotion
+            ShardRouting newReplicaShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null,
+                false, ShardRoutingState.STARTED, temp.allocationId());
+            indexShard.updateRoutingEntry(newReplicaShardRouting, false);
+            primaryTerm = primaryTerm + 1;
+            indexShard.updatePrimaryTerm(primaryTerm);
+            newPrimaryShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null,
+                true, ShardRoutingState.STARTED, temp.allocationId());
+        } else {
+            newPrimaryShardRouting = temp;
+        }
+        indexShard.updateRoutingEntry(newPrimaryShardRouting, false);
+
         assertEquals(0, indexShard.getActiveOperationsCount());
+        if (newPrimaryShardRouting.isRelocationTarget() == false) {
+            try {
+                indexShard.acquireReplicaOperationLock(primaryTerm);
+                fail("shard shouldn't accept operations as replica");
+            } catch (IllegalStateException ignored) {
+
+            }
+        }
         Releasable operation1 = indexShard.acquirePrimaryOperationLock();
         assertEquals(1, indexShard.getActiveOperationsCount());
         Releasable operation2 = indexShard.acquirePrimaryOperationLock();
         assertEquals(2, indexShard.getActiveOperationsCount());
+
+        Releasables.close(operation1, operation2);
+        assertEquals(0, indexShard.getActiveOperationsCount());
+    }
+
+    public void testOperationLocksOnReplicaShards() throws InterruptedException, ExecutionException, IOException {
+        assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get());
+        ensureGreen("test");
+        IndicesService indicesService = getInstanceFromNode(IndicesService.class);
+        IndexService indexService = indicesService.indexServiceSafe(resolveIndex("test"));
+        IndexShard indexShard = indexService.getShardOrNull(0);
+        long primaryTerm = indexShard.getPrimaryTerm();
+
+        // ugly hack to allow the shard to operate as a replica
+        final ShardRouting temp = indexShard.routingEntry();
+        final ShardRouting newShardRouting;
+        switch (randomInt(2)) {
+            case 0:
+                // started replica
+                newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), null,
+                    false, ShardRoutingState.STARTED, AllocationId.newRelocation(temp.allocationId()));
+
+                indexShard.updateRoutingEntry(newShardRouting, false);
+                break;
+            case 1:
+                // initializing replica / primary
+                final boolean relocating = randomBoolean();
+                newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(),
+                    relocating ? "sourceNode" : null,
+                    relocating ? randomBoolean() : false,
+                    ShardRoutingState.INITIALIZING,
+                    relocating ? AllocationId.newRelocation(temp.allocationId()) : temp.allocationId());
+                indexShard.updateRoutingEntry(newShardRouting, false);
+                break;
+            case 2:
+                // relocation source
+                newShardRouting = TestShardRouting.newShardRouting(temp.index(), temp.id(), temp.currentNodeId(), "otherNode",
+                    false, ShardRoutingState.RELOCATING, AllocationId.newRelocation(temp.allocationId()));
+                indexShard.updateRoutingEntry(newShardRouting, false);
+                indexShard.relocated("test");
+                break;
+            default:
+                throw new UnsupportedOperationException("get your numbers straight");
+
+        }
+        logger.info("updated shard routing to {}", newShardRouting);
+
+        assertEquals(0, indexShard.getActiveOperationsCount());
+        if (newShardRouting.primary() == false) {
+            try {
+                indexShard.acquirePrimaryOperationLock();
+                fail("shard shouldn't accept primary ops");
+            } catch (IllegalStateException ignored) {
+
+            }
+        }
+
+        Releasable operation1 = indexShard.acquireReplicaOperationLock(primaryTerm);
+        assertEquals(1, indexShard.getActiveOperationsCount());
+        Releasable operation2 = indexShard.acquireReplicaOperationLock(primaryTerm);
+        assertEquals(2, indexShard.getActiveOperationsCount());
+
+        try {
+            indexShard.acquireReplicaOperationLock(primaryTerm - 1);
+            fail("you can not increment the operation counter with an older primary term");
+        } catch (IllegalArgumentException e) {
+            assertThat(e.getMessage(), containsString("operation term"));
+            assertThat(e.getMessage(), containsString("too old"));
+        }
+
+        // but you can increment with a newer one..
+        indexShard.acquireReplicaOperationLock(primaryTerm + 1 + randomInt(20)).close();
         Releasables.close(operation1, operation2);
         assertEquals(0, indexShard.getActiveOperationsCount());
     }
 
     public void testMarkAsInactiveTriggersSyncedFlush() throws Exception {
         assertAcked(client().admin().indices().prepareCreate("test")
-                .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
+            .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
         client().prepareIndex("test", "test").setSource("{}").get();
         ensureGreen("test");
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
@@ -364,14 +472,14 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         assertTrue(shard.getEngine().getTranslog().syncNeeded());
         setDurability(shard, Translog.Durability.REQUEST);
         assertNoFailures(client().prepareBulk()
-                .add(client().prepareIndex("test", "bar", "3").setSource("{}"))
-                .add(client().prepareDelete("test", "bar", "1")).get());
+            .add(client().prepareIndex("test", "bar", "3").setSource("{}"))
+            .add(client().prepareDelete("test", "bar", "1")).get());
         assertFalse(shard.getEngine().getTranslog().syncNeeded());
 
         setDurability(shard, Translog.Durability.ASYNC);
         assertNoFailures(client().prepareBulk()
-                .add(client().prepareIndex("test", "bar", "4").setSource("{}"))
-                .add(client().prepareDelete("test", "bar", "3")).get());
+            .add(client().prepareIndex("test", "bar", "4").setSource("{}"))
+            .add(client().prepareDelete("test", "bar", "3")).get());
         setDurability(shard, Translog.Durability.REQUEST);
         assertTrue(shard.getEngine().getTranslog().syncNeeded());
     }
@@ -384,7 +492,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
     public void testMinimumCompatVersion() {
         Version versionCreated = VersionUtils.randomVersion(random());
         assertAcked(client().admin().indices().prepareCreate("test")
-                .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id));
+            .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0, SETTING_VERSION_CREATED, versionCreated.id));
         client().prepareIndex("test", "test").setSource("{}").get();
         ensureGreen("test");
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
@@ -398,7 +506,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
 
     public void testUpdatePriority() {
         assertAcked(client().admin().indices().prepareCreate("test")
-                .setSettings(IndexMetaData.SETTING_PRIORITY, 200));
+            .setSettings(IndexMetaData.SETTING_PRIORITY, 200));
         IndexService indexService = getInstanceFromNode(IndicesService.class).indexService(resolveIndex("test"));
         assertEquals(200, indexService.getIndexSettings().getSettings().getAsInt(IndexMetaData.SETTING_PRIORITY, 0).intValue());
         client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(IndexMetaData.SETTING_PRIORITY, 400).build()).get();
@@ -434,8 +542,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         Path idxPath = env.sharedDataFile().resolve(randomAsciiOfLength(10));
         logger.info("--> idxPath: [{}]", idxPath);
         Settings idxSettings = Settings.builder()
-                .put(IndexMetaData.SETTING_DATA_PATH, idxPath)
-                .build();
+            .put(IndexMetaData.SETTING_DATA_PATH, idxPath)
+            .build();
         createIndex("test", idxSettings);
         ensureGreen("test");
         client().prepareIndex("test", "bar", "1").setSource("{}").setRefresh(true).get();
@@ -447,7 +555,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
 
     public void testExpectedShardSizeIsPresent() throws InterruptedException {
         assertAcked(client().admin().indices().prepareCreate("test")
-                .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
+            .setSettings(SETTING_NUMBER_OF_SHARDS, 1, SETTING_NUMBER_OF_REPLICAS, 0));
         for (int i = 0; i < 50; i++) {
             client().prepareIndex("test", "test").setSource("{}").get();
         }
@@ -475,11 +583,11 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         IOUtils.rm(endDir);
 
         Settings sb = Settings.builder()
-                .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString())
-                .build();
+            .put(IndexMetaData.SETTING_DATA_PATH, startDir.toAbsolutePath().toString())
+            .build();
         Settings sb2 = Settings.builder()
-                .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString())
-                .build();
+            .put(IndexMetaData.SETTING_DATA_PATH, endDir.toAbsolutePath().toString())
+            .build();
 
         logger.info("--> creating an index with data_path [{}]", startDir.toAbsolutePath().toString());
         createIndex(INDEX, sb);
@@ -510,9 +618,9 @@ public class IndexShardTests extends ESSingleNodeTestCase {
 
         logger.info("--> updating settings...");
         client().admin().indices().prepareUpdateSettings(INDEX)
-                .setSettings(sb2)
-                .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true))
-                .get();
+            .setSettings(sb2)
+            .setIndicesOptions(IndicesOptions.fromOptions(true, false, true, true))
+            .get();
 
         assert Files.exists(startDir) == false : "start dir shouldn't exist";
 
@@ -642,7 +750,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         try {
             shard.index(index);
             fail();
-        }catch (IllegalIndexShardStateException e){
+        } catch (IllegalIndexShardStateException e) {
 
         }
 
@@ -655,7 +763,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         try {
             shard.delete(delete);
             fail();
-        }catch (IllegalIndexShardStateException e){
+        } catch (IllegalIndexShardStateException e) {
 
         }
 
@@ -692,7 +800,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         long size = shard.getEngine().getTranslog().sizeInBytes();
         logger.info("--> current translog size: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
         client().admin().indices().prepareUpdateSettings("test").setSettings(settingsBuilder().put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(size, ByteSizeUnit.BYTES))
-                .build()).get();
+            .build()).get();
         client().prepareDelete("test", "test", "2").get();
         logger.info("--> translog size after delete: [{}] num_ops [{}] generation [{}]", shard.getEngine().getTranslog().sizeInBytes(), shard.getEngine().getTranslog().totalOperations(), shard.getEngine().getTranslog().getGeneration());
         assertBusy(() -> { // this is async
@@ -877,7 +985,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         newShard.updateRoutingEntry(routing, false);
         DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
         newShard.markAsRecovering("store", new RecoveryState(newShard.shardId(), routing.primary(), RecoveryState.Type.STORE, localNode,
-                localNode));
+            localNode));
         assertTrue(newShard.recoverFromStore(localNode));
         assertEquals(0, newShard.recoveryState().getTranslog().recoveredOperations());
         assertEquals(0, newShard.recoveryState().getTranslog().totalOperations());
@@ -890,7 +998,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         assertHitCount(response, 0);
     }
 
-    public void testFailIfIndexNotPresentInRecoverFromStore() throws IOException {
+    public void testFailIfIndexNotPresentInRecoverFromStore() throws Exception {
         createIndex("test");
         ensureGreen();
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
@@ -907,7 +1015,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         Store store = shard.store();
         store.incRef();
         test.removeShard(0, "b/c simon says so");
-        Lucene.cleanLuceneIndex(store.directory());
+        cleanLuceneIndex(store.directory());
         store.decRef();
         ShardRoutingHelper.reinit(routing);
         IndexShard newShard = test.createShard(routing);
@@ -940,7 +1048,12 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         newShard.updateRoutingEntry(routing, true);
         SearchResponse response = client().prepareSearch().get();
         assertHitCount(response, 0);
-        client().prepareIndex("test", "test", "0").setSource("{}").setRefresh(true).get();
+        // we can't issue this request through a client because of the inconsistencies we created with the cluster state,
+        // so we execute it directly on the shard instead
+        IndexRequest request = client().prepareIndex("test", "test", "0").setSource("{}").request();
+        request.process(MetaData.builder().put(test.getMetaData(), false).build(), null, false, "test");
+        TransportIndexAction.executeIndexRequestOnPrimary(request, newShard, null);
+        newShard.refresh("test");
         assertHitCount(client().prepareSearch().get(), 1);
     }
 
@@ -999,7 +1112,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
             @Override
             public void restore(SnapshotId snapshotId, Version version, ShardId shardId, ShardId snapshotShardId, RecoveryState recoveryState) {
                 try {
-                    Lucene.cleanLuceneIndex(targetStore.directory());
+                    cleanLuceneIndex(targetStore.directory());
                     for (String file : sourceStore.directory().listAll()) {
                         if (file.equals("write.lock") || file.startsWith("extra")) {
                             continue;
@@ -1205,12 +1318,12 @@ public class IndexShardTests extends ESSingleNodeTestCase {
     public void testIndexingBufferDuringInternalRecovery() throws IOException {
         createIndex("index");
         client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject()
-                .startObject("testtype")
-                .startObject("properties")
-                .startObject("foo")
-                .field("type", "text")
-                .endObject()
-                .endObject().endObject().endObject()).get();
+            .startObject("testtype")
+            .startObject("properties")
+            .startObject("foo")
+            .field("type", "text")
+            .endObject()
+            .endObject().endObject().endObject()).get();
         ensureGreen();
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService test = indicesService.indexService(resolveIndex("index"));
@@ -1234,12 +1347,12 @@ public class IndexShardTests extends ESSingleNodeTestCase {
     public void testIndexingBufferDuringPeerRecovery() throws IOException {
         createIndex("index");
         client().admin().indices().preparePutMapping("index").setType("testtype").setSource(jsonBuilder().startObject()
-                .startObject("testtype")
-                .startObject("properties")
-                .startObject("foo")
-                .field("type", "text")
-                .endObject()
-                .endObject().endObject().endObject()).get();
+            .startObject("testtype")
+            .startObject("properties")
+            .startObject("foo")
+            .field("type", "text")
+            .endObject()
+            .endObject().endObject().endObject()).get();
         ensureGreen();
         IndicesService indicesService = getInstanceFromNode(IndicesService.class);
         IndexService test = indicesService.indexService(resolveIndex("index"));

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/cluster/routing/TestShardRouting.java

@@ -30,7 +30,7 @@ import org.elasticsearch.test.ESTestCase;
 public class TestShardRouting {
 
     public static ShardRouting newShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state) {
-        return newShardRouting(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE), shardId, currentNodeId,primary, state);
+        return newShardRouting(new Index(index, IndexMetaData.INDEX_UUID_NA_VALUE), shardId, currentNodeId, primary, state);
     }
 
     public static ShardRouting newShardRouting(Index index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state) {