Bläddra i källkod

Optimize indexing for the autogenerated ID append-only case (#20211)

If elasticsearch controls the ID values as well as the documents
version we can optimize the code that adds / appends the documents
to the index. Essentially we an skip the version lookup for all
documents unless the same document is delivered more than once.

On the lucene level we can simply call IndexWriter#addDocument instead
of #updateDocument but on the Engine level we need to ensure that we deoptimize
the case once we see the same document more than once.

This is done as follows:

1. Mark every request with a timestamp. This is done once on the first node that
receives a request and is fixed for this request. This can be even the
machine local time (see why later). The important part is that retry
requests will have the same value as the original one.

2. In the engine we make sure we keep the highest seen time stamp of "retry" requests.
This is updated while the retry request has its doc id lock. Call this `maxUnsafeAutoIdTimestamp`

3. When the engine runs an "optimized" request comes, it compares it's timestamp with the
current `maxUnsafeAutoIdTimestamp` (but doesn't update it). If the the request
timestamp is higher it is safe to execute it as optimized (no retry request with the same
timestamp has been run before). If not we fall back to "non-optimzed" mode and run the request as a retry one
and update the `maxUnsafeAutoIdTimestamp` unless it's been updated already to a higher value

Relates to #19813
Simon Willnauer 9 år sedan
förälder
incheckning
a0becd26b1
34 ändrade filer med 783 tillägg och 160 borttagningar
  1. 0 2
      buildSrc/src/main/resources/checkstyle_suppressions.xml
  2. 12 0
      core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java
  3. 50 5
      core/src/main/java/org/elasticsearch/action/index/IndexRequest.java
  4. 0 9
      core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java
  5. 2 2
      core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
  6. 8 0
      core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java
  7. 1 0
      core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java
  8. 1 0
      core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
  9. 26 7
      core/src/main/java/org/elasticsearch/index/engine/Engine.java
  10. 19 1
      core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
  11. 146 28
      core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  12. 18 0
      core/src/main/java/org/elasticsearch/index/engine/SegmentsStats.java
  13. 2 1
      core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java
  14. 11 7
      core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  15. 4 1
      core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java
  16. 20 2
      core/src/main/java/org/elasticsearch/index/translog/Translog.java
  17. 9 0
      core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java
  18. 22 1
      core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java
  19. 1 1
      core/src/test/java/org/elasticsearch/aliases/IndexAliasesIT.java
  20. 272 45
      core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  21. 1 4
      core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
  22. 9 0
      core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
  23. 68 0
      core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
  24. 4 0
      core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  25. 41 14
      core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java
  26. 4 5
      core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
  27. 10 1
      core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java
  28. 1 1
      core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java
  29. 1 1
      core/src/test/java/org/elasticsearch/recovery/RelocationIT.java
  30. 1 1
      core/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardSizeTestCase.java
  31. 1 1
      core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java
  32. 5 0
      docs/reference/migration/migrate_5_0/index-apis.asciidoc
  33. 1 18
      rest-api-spec/src/main/resources/rest-api-spec/test/create/15_without_id.yaml
  34. 12 2
      test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java

+ 0 - 2
buildSrc/src/main/resources/checkstyle_suppressions.xml

@@ -386,7 +386,6 @@
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]MapperService.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]Mapping.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]MetadataFieldMapper.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]ParsedDocument.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]CompletionFieldMapper.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]LegacyDateFieldMapper.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]mapper[/\\]LegacyDoubleFieldMapper.java" checks="LineLength" />
@@ -865,7 +864,6 @@
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]CorruptedFileIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]CorruptedTranslogIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]DirectoryUtilsTests.java" checks="LineLength" />
-  <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]ExceptionRetryIT.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]IndexStoreTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]store[/\\]StoreTests.java" checks="LineLength" />
   <suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]suggest[/\\]stats[/\\]SuggestStatsIT.java" checks="LineLength" />

+ 12 - 0
core/src/main/java/org/elasticsearch/action/bulk/BulkShardRequest.java

@@ -20,6 +20,7 @@
 package org.elasticsearch.action.bulk;
 
 import org.elasticsearch.action.support.replication.ReplicatedWriteRequest;
+import org.elasticsearch.action.support.replication.ReplicationRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.shard.ShardId;
@@ -101,4 +102,15 @@ public class BulkShardRequest extends ReplicatedWriteRequest<BulkShardRequest> {
         }
         return b.toString();
     }
+
+    @Override
+    public void onRetry() {
+        for (BulkItemRequest item : items) {
+            if (item.request() instanceof ReplicationRequest) {
+                // all replication requests need to be notified here as well to ie. make sure that internal optimizations are
+                // disabled see IndexRequest#canHaveDuplicates()
+                ((ReplicationRequest) item.request()).onRetry();
+            }
+        }
+    }
 }

+ 50 - 5
core/src/main/java/org/elasticsearch/action/index/IndexRequest.java

@@ -72,7 +72,7 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
     /**
      * Operation type controls if the type of the index operation.
      */
-    public static enum OpType {
+    public enum OpType {
         /**
          * Index the source. If there an existing document with the id, it will
          * be replaced.
@@ -152,6 +152,17 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
 
     private String pipeline;
 
+    /**
+     * Value for {@link #getAutoGeneratedTimestamp()} if the document has an external
+     * provided ID.
+     */
+    public static final int UNSET_AUTO_GENERATED_TIMESTAMP = -1;
+
+    private long autoGeneratedTimestamp = UNSET_AUTO_GENERATED_TIMESTAMP;
+
+    private boolean isRetry = false;
+
+
     public IndexRequest() {
     }
 
@@ -202,6 +213,10 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
             }
         }
 
+        if (opType() != OpType.INDEX && id == null) {
+            addValidationError("an id is required for a " + opType() + " operation", validationException);
+        }
+
         if (!versionType.validateVersionForWrites(version)) {
             validationException = addValidationError("illegal version value [" + version + "] for version type [" + versionType.name() + "]", validationException);
         }
@@ -216,6 +231,11 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
             validationException = addValidationError("id is too long, must be no longer than 512 bytes but was: " +
                             id.getBytes(StandardCharsets.UTF_8).length, validationException);
         }
+
+        if (id == null && (versionType == VersionType.INTERNAL && version == Versions.MATCH_ANY) == false) {
+            validationException = addValidationError("an id must be provided if version type or value are set", validationException);
+        }
+
         return validationException;
     }
 
@@ -589,10 +609,10 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         }
 
         // generate id if not already provided and id generation is allowed
-        if (allowIdGeneration) {
-            if (id == null) {
-                id(UUIDs.base64UUID());
-            }
+        if (allowIdGeneration && id == null) {
+            assert autoGeneratedTimestamp == -1;
+            autoGeneratedTimestamp = Math.max(0, System.currentTimeMillis()); // extra paranoia
+            id(UUIDs.base64UUID());
         }
 
         // generate timestamp if not provided, we always have one post this stage...
@@ -639,6 +659,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         version = in.readLong();
         versionType = VersionType.fromValue(in.readByte());
         pipeline = in.readOptionalString();
+        isRetry = in.readBoolean();
+        autoGeneratedTimestamp = in.readLong();
     }
 
     @Override
@@ -655,6 +677,8 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         out.writeLong(version);
         out.writeByte(versionType.getValue());
         out.writeOptionalString(pipeline);
+        out.writeBoolean(isRetry);
+        out.writeLong(autoGeneratedTimestamp);
     }
 
     @Override
@@ -667,4 +691,25 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
         }
         return "index {[" + index + "][" + type + "][" + id + "], source[" + sSource + "]}";
     }
+
+
+    /**
+     * Returns <code>true</code> if this request has been sent to a shard copy more than once.
+     */
+    public boolean isRetry() {
+        return isRetry;
+    }
+
+    @Override
+    public void onRetry() {
+        isRetry = true;
+    }
+
+    /**
+     * Returns the timestamp the auto generated ID was created or {@value #UNSET_AUTO_GENERATED_TIMESTAMP} if the
+     * document has no auto generated timestamp. This method will return a positive value iff the id was auto generated.
+     */
+    public long getAutoGeneratedTimestamp() {
+        return autoGeneratedTimestamp;
+    }
 }

+ 0 - 9
core/src/main/java/org/elasticsearch/action/index/IndexRequestBuilder.java

@@ -205,15 +205,6 @@ public class IndexRequestBuilder extends ReplicationRequestBuilder<IndexRequest,
         return this;
     }
 
-    /**
-     * Sets a string representation of the {@link #setOpType(org.elasticsearch.action.index.IndexRequest.OpType)}. Can
-     * be either "index" or "create".
-     */
-    public IndexRequestBuilder setOpType(String opType) {
-        request.opType(IndexRequest.OpType.fromString(opType));
-        return this;
-    }
-
     /**
      * Set to <tt>true</tt> to force this index to use {@link org.elasticsearch.action.index.IndexRequest.OpType#CREATE}.
      */

+ 2 - 2
core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java

@@ -158,7 +158,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
         SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.REPLICA, shardId.getIndexName(), request.type(), request.id(), request.source())
                 .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
 
-        final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType());
+        final Engine.Index operation = indexShard.prepareIndexOnReplica(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
         Mapping update = operation.parsedDoc().dynamicMappingsUpdate();
         if (update != null) {
             throw new RetryOnReplicaException(shardId, "Mappings are not available on the replica yet, triggered update: " + update);
@@ -171,7 +171,7 @@ public class TransportIndexAction extends TransportWriteAction<IndexRequest, Ind
     public static Engine.Index prepareIndexOperationOnPrimary(IndexRequest request, IndexShard indexShard) {
         SourceToParse sourceToParse = SourceToParse.source(SourceToParse.Origin.PRIMARY, request.index(), request.type(), request.id(), request.source())
             .routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
-        return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType());
+        return indexShard.prepareIndexOnPrimary(sourceToParse, request.version(), request.versionType(), request.getAutoGeneratedTimestamp(), request.isRetry());
     }
 
     public static WriteResult<IndexResponse> executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard,

+ 8 - 0
core/src/main/java/org/elasticsearch/action/support/replication/ReplicationRequest.java

@@ -248,4 +248,12 @@ public abstract class ReplicationRequest<Request extends ReplicationRequest<Requ
     public String getDescription() {
         return toString();
     }
+
+    /**
+     * This method is called before this replication request is retried
+     * the first time.
+     */
+    public void onRetry() {
+        // nothing by default
+    }
 }

+ 1 - 0
core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

@@ -722,6 +722,7 @@ public abstract class TransportReplicationAction<
                 return;
             }
             setPhase(task, "waiting_for_retry");
+            request.onRetry();
             final ThreadContext.StoredContext context = threadPool.getThreadContext().newStoredContext();
             observer.waitForNextChange(new ClusterStateObserver.Listener() {
                 @Override

+ 1 - 0
core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java

@@ -140,6 +140,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
         PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
         FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
         EngineConfig.INDEX_CODEC_SETTING,
+        EngineConfig.INDEX_OPTIMIZE_AUTO_GENERATED_IDS,
         IndexMetaData.SETTING_WAIT_FOR_ACTIVE_SHARDS,
         // validate that built-in similarities don't get redefined
         Setting.groupSetting("index.similarity.", (s) -> {

+ 26 - 7
core/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -86,9 +86,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 
-/**
- *
- */
 public abstract class Engine implements Closeable {
 
     public static final String SYNC_COMMIT_ID = "sync_id";
@@ -856,19 +853,24 @@ public abstract class Engine implements Closeable {
     public static class Index extends Operation {
 
         private final ParsedDocument doc;
+        private final long autoGeneratedIdTimestamp;
+        private final boolean isRetry;
         private boolean created;
 
-        public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime) {
+        public Index(Term uid, ParsedDocument doc, long version, VersionType versionType, Origin origin, long startTime,
+                     long autoGeneratedIdTimestamp, boolean isRetry) {
             super(uid, version, versionType, origin, startTime);
             this.doc = doc;
+            this.isRetry = isRetry;
+            this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
         }
 
         public Index(Term uid, ParsedDocument doc) {
             this(uid, doc, Versions.MATCH_ANY);
-        }
+        } // TEST ONLY
 
-        public Index(Term uid, ParsedDocument doc, long version) {
-            this(uid, doc, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
+        Index(Term uid, ParsedDocument doc, long version) {
+            this(uid, doc, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), -1, false);
         }
 
         public ParsedDocument parsedDoc() {
@@ -928,6 +930,23 @@ public abstract class Engine implements Closeable {
             return (id().length() + type().length()) * 2 + source().length() + 12;
         }
 
+        /**
+         * Returns a positive timestamp if the ID of this document is auto-generated by elasticsearch.
+         * if this property is non-negative indexing code might optimize the addition of this document
+         * due to it's append only nature.
+         */
+        public long getAutoGeneratedIdTimestamp() {
+            return autoGeneratedIdTimestamp;
+        }
+
+        /**
+         * Returns <code>true</code> if this index requests has been retried on the coordinating node and can therefor be delivered
+         * multiple times. Note: this might also be set to true if an equivalent event occurred like the replay of the transaction log
+         */
+        public boolean isRetry() {
+            return isRetry;
+        }
+
     }
 
     public static class Delete extends Operation {

+ 19 - 1
core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -89,7 +89,17 @@ public final class EngineConfig {
         }
     }, Property.IndexScope, Property.NodeScope);
 
-    private TranslogConfig translogConfig;
+    /**
+     * Configures an index to optimize documents with auto generated ids for append only. If this setting is updated from <code>false</code>
+     * to <code>true</code> might not take effect immediately. In other words, disabling the optimiation will be immediately applied while
+     * re-enabling it might not be applied until the engine is in a safe state to do so. Depending on the engine implementation a change to
+     * this setting won't be reflected re-enabled optimization until the engine is restarted or the index is closed and reopened.
+     * The default is <code>true</code>
+     */
+    public static final Setting<Boolean> INDEX_OPTIMIZE_AUTO_GENERATED_IDS = Setting.boolSetting("index.optimize_auto_generated_id", true,
+        Property.IndexScope, Property.Dynamic);
+
+    private final TranslogConfig translogConfig;
     private final OpenMode openMode;
 
     /**
@@ -311,4 +321,12 @@ public final class EngineConfig {
     public RefreshListeners getRefreshListeners() {
         return refreshListeners;
     }
+
+    /**
+     * Returns <code>true</code> iff auto generated IDs should be optimized inside the engine for append only.
+     * The default is <code>true</code>.
+     */
+    public boolean getOptimizeAutoGeneratedIds() {
+        return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS);
+    }
 }

+ 146 - 28
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -42,17 +42,21 @@ import org.apache.lucene.util.BytesRef;
 import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.InfoStream;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.lucene.LoggerInfoStream;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.uid.Versions;
+import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.KeyedLock;
 import org.elasticsearch.common.util.concurrent.ReleasableLock;
 import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.mapper.Uid;
 import org.elasticsearch.index.merge.MergeStats;
 import org.elasticsearch.index.merge.OnGoingMerge;
@@ -73,6 +77,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Function;
@@ -115,10 +120,18 @@ public class InternalEngine extends Engine {
     private final AtomicInteger throttleRequestCount = new AtomicInteger();
     private final EngineConfig.OpenMode openMode;
     private final AtomicBoolean allowCommits = new AtomicBoolean(true);
+    private final AtomicLong maxUnsafeAutoIdTimestamp = new AtomicLong(-1);
+    private final CounterMetric numVersionLookups = new CounterMetric();
+    private final CounterMetric numIndexVersionsLookups = new CounterMetric();
 
     public InternalEngine(EngineConfig engineConfig) throws EngineException {
         super(engineConfig);
         openMode = engineConfig.getOpenMode();
+        if (engineConfig.getOptimizeAutoGeneratedIds() == false
+            || engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_5_0_0_alpha6)) {
+            // no optimization for pre 5.0.0.alpha6 since translog might not have all information needed
+            maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
+        }
         this.versionMap = new LiveVersionMap();
         store.incRef();
         IndexWriter writer = null;
@@ -407,30 +420,106 @@ public class InternalEngine extends Engine {
         }
     }
 
+    private boolean canOptimizeAddDocument(Index index) {
+        if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
+            assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
+                + index.getAutoGeneratedIdTimestamp();
+            switch (index.origin()) {
+                case PRIMARY:
+                    assert (index.version() == Versions.MATCH_ANY && index.versionType() == VersionType.INTERNAL)
+                        : "version: " + index.version() + " type: " + index.versionType();
+                    return true;
+                case PEER_RECOVERY:
+                case REPLICA:
+                    assert index.version() == 1 && index.versionType() == VersionType.EXTERNAL
+                    : "version: " + index.version() + " type: " + index.versionType();
+                    return true;
+                case LOCAL_TRANSLOG_RECOVERY:
+                    assert index.isRetry();
+                    return false; // even if retry is set we never optimize local recovery
+                default:
+                    throw new IllegalArgumentException("unknown origin " + index.origin());
+            }
+        }
+        return false;
+    }
+
     private void innerIndex(Index index) throws IOException {
         try (Releasable ignored = acquireLock(index.uid())) {
             lastWriteNanos = index.startTime();
-            final long currentVersion;
+            /* if we have an autoGeneratedID that comes into the engine we can potentially optimize
+             * and just use addDocument instead of updateDocument and skip the entire version and index lookup across the board.
+             * Yet, we have to deal with multiple document delivery, for this we use a property of the document that is added
+             * to detect if it has potentially been added before. We use the documents timestamp for this since it's something
+             * that:
+             *  - doesn't change per document
+             *  - is preserved in the transaction log
+             *  - and is assigned before we start to index / replicate
+             * NOTE: it's not important for this timestamp to be consistent across nodes etc. it's just a number that is in the common
+             * case increasing and can be used in the failure case when we retry and resent documents to establish a happens before relationship.
+             * for instance:
+             *  - doc A has autoGeneratedIdTimestamp = 10, isRetry = false
+             *  - doc B has autoGeneratedIdTimestamp = 9, isRetry = false
+             *
+             *  while both docs are in in flight, we disconnect on one node, reconnect and send doc A again
+             *  - now doc A' has autoGeneratedIdTimestamp = 10, isRetry = true
+             *
+             *  if A' arrives on the shard first we update maxUnsafeAutoIdTimestamp to 10 and use update document. All subsequent
+             *  documents that arrive (A and B) will also use updateDocument since their timestamps are less than maxUnsafeAutoIdTimestamp.
+             *  While this is not strictly needed for doc B it is just much simpler to implement since it will just de-optimize some doc in the worst case.
+             *
+             *  if A arrives on the shard first we use addDocument since maxUnsafeAutoIdTimestamp is < 10. A` will then just be skipped or calls
+             *  updateDocument.
+             */
+            long currentVersion;
             final boolean deleted;
-            final VersionValue versionValue = versionMap.getUnderLock(index.uid());
-            if (versionValue == null) {
-                currentVersion = loadCurrentVersionFromIndex(index.uid());
-                deleted = currentVersion == Versions.NOT_FOUND;
+            // if anything is fishy here ie. there is a retry we go and force updateDocument below so we are updating the document in the
+            // lucene index without checking the version map but we still do the version check
+            final boolean forceUpdateDocument;
+            if (canOptimizeAddDocument(index)) {
+                long deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
+                if (index.isRetry()) {
+                    forceUpdateDocument = true;
+                    do {
+                        deOptimizeTimestamp = maxUnsafeAutoIdTimestamp.get();
+                        if (deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp()) {
+                            break;
+                        }
+                    } while(maxUnsafeAutoIdTimestamp.compareAndSet(deOptimizeTimestamp,
+                        index.getAutoGeneratedIdTimestamp()) == false);
+                    assert maxUnsafeAutoIdTimestamp.get() >= index.getAutoGeneratedIdTimestamp();
+                } else {
+                    // in this case we force
+                    forceUpdateDocument = deOptimizeTimestamp >= index.getAutoGeneratedIdTimestamp();
+                }
+                currentVersion = Versions.NOT_FOUND;
+                deleted = true;
             } else {
-                currentVersion = checkDeletedAndGCed(versionValue);
-                deleted = versionValue.delete();
+                // update the document
+                forceUpdateDocument = false; // we don't force it - it depends on the version
+                final VersionValue versionValue = versionMap.getUnderLock(index.uid());
+                assert incrementVersionLookup();
+                if (versionValue == null) {
+                    currentVersion = loadCurrentVersionFromIndex(index.uid());
+                    deleted = currentVersion == Versions.NOT_FOUND;
+                } else {
+                    currentVersion = checkDeletedAndGCed(versionValue);
+                    deleted = versionValue.delete();
+                }
             }
-
             final long expectedVersion = index.version();
             if (checkVersionConflict(index, currentVersion, expectedVersion, deleted)) {
                 index.setCreated(false);
                 return;
             }
-
             final long updatedVersion = updateVersion(index, currentVersion, expectedVersion);
-
-            indexOrUpdate(index, currentVersion, versionValue);
-
+            index.setCreated(deleted);
+            if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
+                // document does not exists, we can optimize for create
+                index(index, indexWriter);
+            } else {
+                update(index, indexWriter);
+            }
             maybeAddToTranslog(index, updatedVersion, Translog.Index::new, NEW_VERSION_VALUE);
         }
     }
@@ -441,16 +530,6 @@ public class InternalEngine extends Engine {
         return updatedVersion;
     }
 
-    private void indexOrUpdate(final Index index, final long currentVersion, final VersionValue versionValue) throws IOException {
-        if (currentVersion == Versions.NOT_FOUND) {
-            // document does not exists, we can optimize for create
-            index.setCreated(true);
-            index(index, indexWriter);
-        } else {
-            update(index, versionValue, indexWriter);
-        }
-    }
-
     private static void index(final Index index, final IndexWriter indexWriter) throws IOException {
         if (index.docs().size() > 1) {
             indexWriter.addDocuments(index.docs());
@@ -459,12 +538,7 @@ public class InternalEngine extends Engine {
         }
     }
 
-    private static void update(final Index index, final VersionValue versionValue, final IndexWriter indexWriter) throws IOException {
-        if (versionValue != null) {
-            index.setCreated(versionValue.delete()); // we have a delete which is not GC'ed...
-        } else {
-            index.setCreated(false);
-        }
+    private static void update(final Index index, final IndexWriter indexWriter) throws IOException {
         if (index.docs().size() > 1) {
             indexWriter.updateDocuments(index.uid(), index.docs());
         } else {
@@ -504,6 +578,7 @@ public class InternalEngine extends Engine {
             final long currentVersion;
             final boolean deleted;
             final VersionValue versionValue = versionMap.getUnderLock(delete.uid());
+            assert incrementVersionLookup();
             if (versionValue == null) {
                 currentVersion = loadCurrentVersionFromIndex(delete.uid());
                 deleted = currentVersion == Versions.NOT_FOUND;
@@ -914,6 +989,7 @@ public class InternalEngine extends Engine {
     protected final void writerSegmentStats(SegmentsStats stats) {
         stats.addVersionMapMemoryInBytes(versionMap.ramBytesUsed());
         stats.addIndexWriterMemoryInBytes(indexWriter.ramBytesUsed());
+        stats.updateMaxUnsafeAutoIdTimestamp(maxUnsafeAutoIdTimestamp.get());
     }
 
     @Override
@@ -996,6 +1072,7 @@ public class InternalEngine extends Engine {
     }
 
     private long loadCurrentVersionFromIndex(Term uid) throws IOException {
+        assert incrementIndexVersionLookup();
         try (final Searcher searcher = acquireSearcher("load_version")) {
             return Versions.loadVersion(searcher.reader(), uid);
         }
@@ -1222,6 +1299,12 @@ public class InternalEngine extends Engine {
         mergeScheduler.refreshConfig();
         // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
         maybePruneDeletedTombstones();
+        if (engineConfig.getOptimizeAutoGeneratedIds() == false) {
+            // this is an anti-viral settings you can only opt out for the entire index
+            // only if a shard starts up again due to relocation or if the index is closed
+            // the setting will be re-interpreted if it's set to true
+            this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
+        }
     }
 
     public MergeStats getMergeStats() {
@@ -1234,4 +1317,39 @@ public class InternalEngine extends Engine {
         final int maxDoc = indexWriter.maxDoc();
         return new DocsStats(numDocs, maxDoc-numDocs);
     }
+
+
+    /**
+     * Returns the number of times a version was looked up either from the index.
+     * Note this is only available if assertions are enabled
+     */
+    long getNumIndexVersionsLookups() { // for testing
+        return numIndexVersionsLookups.count();
+    }
+
+    /**
+     * Returns the number of times a version was looked up either from memory or from the index.
+     * Note this is only available if assertions are enabled
+     */
+    long getNumVersionLookups() { // for testing
+        return numVersionLookups.count();
+    }
+
+    private boolean incrementVersionLookup() { // only used by asserts
+        numVersionLookups.inc();
+        return true;
+    }
+
+    private boolean incrementIndexVersionLookup() {
+        numIndexVersionsLookups.inc();
+        return true;
+    }
+
+    /**
+     * Returns <code>true</code> iff the index writer has any deletions either buffered in memory or
+     * in the index.
+     */
+    boolean indexWriterHasDeletions() {
+        return indexWriter.hasDeletions();
+    }
 }

+ 18 - 0
core/src/main/java/org/elasticsearch/index/engine/SegmentsStats.java

@@ -44,6 +44,7 @@ public class SegmentsStats implements Streamable, ToXContent {
     private long docValuesMemoryInBytes;
     private long indexWriterMemoryInBytes;
     private long versionMapMemoryInBytes;
+    private long maxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
     private long bitsetMemoryInBytes;
     private ImmutableOpenMap<String, Long> fileSizes = ImmutableOpenMap.of();
 
@@ -114,6 +115,10 @@ public class SegmentsStats implements Streamable, ToXContent {
         this.versionMapMemoryInBytes += versionMapMemoryInBytes;
     }
 
+    void updateMaxUnsafeAutoIdTimestamp(long maxUnsafeAutoIdTimestamp) {
+        this.maxUnsafeAutoIdTimestamp = Math.max(maxUnsafeAutoIdTimestamp, this.maxUnsafeAutoIdTimestamp);
+    }
+
     public void addBitsetMemoryInBytes(long bitsetMemoryInBytes) {
         this.bitsetMemoryInBytes += bitsetMemoryInBytes;
     }
@@ -138,6 +143,7 @@ public class SegmentsStats implements Streamable, ToXContent {
         if (mergeStats == null) {
             return;
         }
+        updateMaxUnsafeAutoIdTimestamp(mergeStats.maxUnsafeAutoIdTimestamp);
         add(mergeStats.count, mergeStats.memoryInBytes);
         addTermsMemoryInBytes(mergeStats.termsMemoryInBytes);
         addStoredFieldsMemoryInBytes(mergeStats.storedFieldsMemoryInBytes);
@@ -272,6 +278,14 @@ public class SegmentsStats implements Streamable, ToXContent {
         return fileSizes;
     }
 
+    /**
+     * Returns the max timestamp that is used to de-optimize documetns with auto-generated IDs in the engine.
+     * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
+     */
+    public long getMaxUnsafeAutoIdTimestamp() {
+        return maxUnsafeAutoIdTimestamp;
+    }
+
     public static SegmentsStats readSegmentsStats(StreamInput in) throws IOException {
         SegmentsStats stats = new SegmentsStats();
         stats.readFrom(in);
@@ -292,6 +306,7 @@ public class SegmentsStats implements Streamable, ToXContent {
         builder.byteSizeField(Fields.INDEX_WRITER_MEMORY_IN_BYTES, Fields.INDEX_WRITER_MEMORY, indexWriterMemoryInBytes);
         builder.byteSizeField(Fields.VERSION_MAP_MEMORY_IN_BYTES, Fields.VERSION_MAP_MEMORY, versionMapMemoryInBytes);
         builder.byteSizeField(Fields.FIXED_BIT_SET_MEMORY_IN_BYTES, Fields.FIXED_BIT_SET, bitsetMemoryInBytes);
+        builder.field(Fields.MAX_UNSAFE_AUTO_ID_TIMESTAMP, maxUnsafeAutoIdTimestamp);
         builder.startObject(Fields.FILE_SIZES);
         for (Iterator<ObjectObjectCursor<String, Long>> it = fileSizes.iterator(); it.hasNext();) {
             ObjectObjectCursor<String, Long> entry = it.next();
@@ -326,6 +341,7 @@ public class SegmentsStats implements Streamable, ToXContent {
         static final String INDEX_WRITER_MEMORY_IN_BYTES = "index_writer_memory_in_bytes";
         static final String VERSION_MAP_MEMORY = "version_map_memory";
         static final String VERSION_MAP_MEMORY_IN_BYTES = "version_map_memory_in_bytes";
+        static final String MAX_UNSAFE_AUTO_ID_TIMESTAMP = "max_unsafe_auto_id_timestamp";
         static final String FIXED_BIT_SET = "fixed_bit_set";
         static final String FIXED_BIT_SET_MEMORY_IN_BYTES = "fixed_bit_set_memory_in_bytes";
         static final String FILE_SIZES = "file_sizes";
@@ -347,6 +363,7 @@ public class SegmentsStats implements Streamable, ToXContent {
         indexWriterMemoryInBytes = in.readLong();
         versionMapMemoryInBytes = in.readLong();
         bitsetMemoryInBytes = in.readLong();
+        maxUnsafeAutoIdTimestamp = in.readLong();
 
         int size = in.readVInt();
         ImmutableOpenMap.Builder<String, Long> map = ImmutableOpenMap.builder(size);
@@ -371,6 +388,7 @@ public class SegmentsStats implements Streamable, ToXContent {
         out.writeLong(indexWriterMemoryInBytes);
         out.writeLong(versionMapMemoryInBytes);
         out.writeLong(bitsetMemoryInBytes);
+        out.writeLong(maxUnsafeAutoIdTimestamp);
 
         out.writeVInt(fileSizes.size());
         for (Iterator<ObjectObjectCursor<String, Long>> it = fileSizes.iterator(); it.hasNext();) {

+ 2 - 1
core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java

@@ -50,7 +50,8 @@ public class ParsedDocument {
 
     private String parent;
 
-    public ParsedDocument(Field version, String id, String type, String routing, long timestamp, long ttl, List<Document> documents, BytesReference source, Mapping dynamicMappingsUpdate) {
+    public ParsedDocument(Field version, String id, String type, String routing, long timestamp, long ttl, List<Document> documents,
+                          BytesReference source, Mapping dynamicMappingsUpdate) {
         this.version = version;
         this.id = id;
         this.type = type;

+ 11 - 7
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -139,7 +139,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -492,27 +491,32 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return previousState;
     }
 
-    public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType) {
+    public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
+                                              boolean isRetry) {
         try {
             verifyPrimary();
-            return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY);
+            return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.PRIMARY,
+                autoGeneratedIdTimestamp, isRetry);
         } catch (Exception e) {
             verifyNotClosed(e);
             throw e;
         }
     }
 
-    public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType) {
+    public Engine.Index prepareIndexOnReplica(SourceToParse source, long version, VersionType versionType, long autoGeneratedIdTimestamp,
+                                              boolean isRetry) {
         try {
             verifyReplicationTarget();
-            return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA);
+            return prepareIndex(docMapper(source.type()), source, version, versionType, Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp,
+                isRetry);
         } catch (Exception e) {
             verifyNotClosed(e);
             throw e;
         }
     }
 
-    static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType, Engine.Operation.Origin origin) {
+    static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long version, VersionType versionType,
+                                     Engine.Operation.Origin origin, long autoGeneratedIdTimestamp, boolean isRetry) {
         long startTime = System.nanoTime();
         ParsedDocument doc = docMapper.getDocumentMapper().parse(source);
         if (docMapper.getMapping() != null) {
@@ -521,7 +525,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         MappedFieldType uidFieldType = docMapper.getDocumentMapper().uidMapper().fieldType();
         Query uidQuery = uidFieldType.termQuery(doc.uid(), null);
         Term uid = MappedFieldType.extractTerm(uidQuery);
-        return new Engine.Index(uid, doc, version, versionType, origin, startTime);
+        return new Engine.Index(uid, doc, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry);
     }
 
     public void index(Engine.Index index) {

+ 4 - 1
core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java

@@ -147,13 +147,16 @@ public class TranslogRecoveryPerformer {
      *                            is encountered.
      */
     private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) {
+
         try {
             switch (operation.opType()) {
                 case INDEX:
                     Translog.Index index = (Translog.Index) operation;
+                    // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
+                    // autoGeneratedID docs that are coming from the primary are updated correctly.
                     Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), source(shardId.getIndexName(), index.type(), index.id(), index.source())
                             .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
-                        index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin);
+                        index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true);
                     maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
                     if (logger.isTraceEnabled()) {
                         logger.trace("[translog] recover [index] op of [{}][{}]", index.type(), index.id());

+ 20 - 2
core/src/main/java/org/elasticsearch/index/translog/Translog.java

@@ -26,6 +26,7 @@ import org.apache.lucene.index.TwoPhaseCommit;
 import org.apache.lucene.store.AlreadyClosedException;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.UUIDs;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -830,8 +831,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
     }
 
     public static class Index implements Operation {
-        public static final int SERIALIZATION_FORMAT = 6; // since 2.0-beta1 and 1.1
+        public static final int FORMAT_2x = 6; // since 2.0-beta1 and 1.1
+        public static final int FORMAT_AUTO_GENERATED_IDS = 7; // since 5.0.0-beta1
+        public static final int SERIALIZATION_FORMAT = FORMAT_AUTO_GENERATED_IDS;
         private final String id;
+        private final long autoGeneratedIdTimestamp;
         private final String type;
         private final long version;
         private final VersionType versionType;
@@ -843,7 +847,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
 
         public Index(StreamInput in) throws IOException {
             final int format = in.readVInt(); // SERIALIZATION_FORMAT
-            assert format == SERIALIZATION_FORMAT : "format was: " + format;
+            assert format >= FORMAT_2x : "format was: " + format;
             id = in.readString();
             type = in.readString();
             source = in.readBytesReference();
@@ -854,6 +858,11 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             this.ttl = in.readLong();
             this.versionType = VersionType.fromValue(in.readByte());
             assert versionType.validateVersionForWrites(this.version);
+            if (format >= FORMAT_AUTO_GENERATED_IDS) {
+                this.autoGeneratedIdTimestamp = in.readLong();
+            } else {
+                this.autoGeneratedIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
+            }
         }
 
         public Index(Engine.Index index) {
@@ -866,6 +875,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             this.timestamp = index.timestamp();
             this.ttl = index.ttl();
             this.versionType = index.versionType();
+            this.autoGeneratedIdTimestamp = index.getAutoGeneratedIdTimestamp();
         }
 
         public Index(String type, String id, byte[] source) {
@@ -878,6 +888,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             parent = null;
             timestamp = 0;
             ttl = 0;
+            autoGeneratedIdTimestamp = -1;
         }
 
         @Override
@@ -943,6 +954,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             out.writeLong(timestamp);
             out.writeLong(ttl);
             out.writeByte(versionType.getValue());
+            out.writeLong(autoGeneratedIdTimestamp);
         }
 
         @Override
@@ -962,6 +974,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                     id.equals(index.id) == false ||
                     type.equals(index.type) == false ||
                     versionType != index.versionType ||
+                    autoGeneratedIdTimestamp != index.autoGeneratedIdTimestamp ||
                     source.equals(index.source) == false) {
                     return false;
             }
@@ -982,6 +995,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
             result = 31 * result + (routing != null ? routing.hashCode() : 0);
             result = 31 * result + (parent != null ? parent.hashCode() : 0);
             result = 31 * result + Long.hashCode(timestamp);
+            result = 31 * result + Long.hashCode(autoGeneratedIdTimestamp);
             result = 31 * result + Long.hashCode(ttl);
             return result;
         }
@@ -993,6 +1007,10 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
                     ", type='" + type + '\'' +
                     '}';
         }
+
+        public long getAutoGeneratedIdTimestamp() {
+            return autoGeneratedIdTimestamp;
+        }
     }
 
     public static class Delete implements Operation {

+ 9 - 0
core/src/test/java/org/elasticsearch/action/index/IndexRequestTests.java

@@ -141,4 +141,13 @@ public class IndexRequestTests extends ESTestCase {
         // test negative shard count value not allowed
         expectThrows(IllegalArgumentException.class, () -> request.waitForActiveShards(ActiveShardCount.from(randomIntBetween(-10, -1))));
     }
+
+    public void testAutoGenIdTimestampIsSet() {
+        IndexRequest request = new IndexRequest("index", "type");
+        request.process(null, true, "index");
+        assertTrue("expected > 0 but got: " + request.getAutoGeneratedTimestamp(), request.getAutoGeneratedTimestamp() > 0);
+        request = new IndexRequest("index", "type", "1");
+        request.process(null, true, "index");
+        assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, request.getAutoGeneratedTimestamp());
+    }
 }

+ 22 - 1
core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java

@@ -169,12 +169,14 @@ public class TransportReplicationActionTests extends ESTestCase {
         reroutePhase.run();
         assertListenerThrows("failed to timeout on retryable block", listener, ClusterBlockException.class);
         assertPhase(task, "failed");
+        assertFalse(request.isRetrySet.get());
 
         listener = new PlainActionFuture<>();
-        reroutePhase = action.new ReroutePhase(task, new Request(), listener);
+        reroutePhase = action.new ReroutePhase(task, request = new Request(), listener);
         reroutePhase.run();
         assertFalse("primary phase should wait on retryable block", listener.isDone());
         assertPhase(task, "waiting_for_retry");
+        assertTrue(request.isRetrySet.get());
 
         block = ClusterBlocks.builder()
             .addGlobalBlock(new ClusterBlock(1, "non retryable", false, true, RestStatus.SERVICE_UNAVAILABLE, ClusterBlockLevel.ALL));
@@ -204,6 +206,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         reroutePhase.run();
         assertListenerThrows("unassigned primary didn't cause a timeout", listener, UnavailableShardsException.class);
         assertPhase(task, "failed");
+        assertTrue(request.isRetrySet.get());
 
         request = new Request(shardId);
         listener = new PlainActionFuture<>();
@@ -211,6 +214,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         reroutePhase.run();
         assertFalse("unassigned primary didn't cause a retry", listener.isDone());
         assertPhase(task, "waiting_for_retry");
+        assertTrue(request.isRetrySet.get());
 
         setState(clusterService, state(index, true, ShardRoutingState.STARTED));
         logger.debug("--> primary assigned state:\n{}", clusterService.state().prettyPrint());
@@ -249,12 +253,14 @@ public class TransportReplicationActionTests extends ESTestCase {
         Action.ReroutePhase reroutePhase = action.new ReroutePhase(null, request, listener);
         reroutePhase.run();
         assertListenerThrows("cluster state too old didn't cause a timeout", listener, UnavailableShardsException.class);
+        assertTrue(request.isRetrySet.compareAndSet(true, false));
 
         request = new Request(shardId).routedBasedOnClusterVersion(clusterService.state().version() + 1);
         listener = new PlainActionFuture<>();
         reroutePhase = action.new ReroutePhase(null, request, listener);
         reroutePhase.run();
         assertFalse("cluster state too old didn't cause a retry", listener.isDone());
+        assertTrue(request.isRetrySet.get());
 
         // finish relocation
         ShardRouting relocationTarget = clusterService.state().getRoutingTable().shardRoutingTable(shardId)
@@ -290,11 +296,14 @@ public class TransportReplicationActionTests extends ESTestCase {
         reroutePhase.run();
         assertListenerThrows("must throw index not found exception", listener, IndexNotFoundException.class);
         assertPhase(task, "failed");
+        assertTrue(request.isRetrySet.get());
         request = new Request(new ShardId(index, "_na_", 10)).timeout("1ms");
         listener = new PlainActionFuture<>();
         reroutePhase = action.new ReroutePhase(null, request, listener);
         reroutePhase.run();
         assertListenerThrows("must throw shard not found exception", listener, ShardNotFoundException.class);
+        assertFalse(request.isRetrySet.get()); //TODO I'd have expected this to be true but we fail too early?
+
     }
 
     public void testStalePrimaryShardOnReroute() throws InterruptedException {
@@ -319,6 +328,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         assertThat(capturedRequests, arrayWithSize(1));
         assertThat(capturedRequests[0].action, equalTo("testAction[p]"));
         assertPhase(task, "waiting_on_primary");
+        assertFalse(request.isRetrySet.get());
         transport.handleRemoteError(capturedRequests[0].requestId, randomRetryPrimaryException(shardId));
 
 
@@ -380,6 +390,7 @@ public class TransportReplicationActionTests extends ESTestCase {
             assertThat(capturedRequests.get(0).action, equalTo("testAction"));
             assertPhase(task, "rerouted");
         }
+        assertFalse(request.isRetrySet.get());
         assertIndexShardUninitialized();
     }
 
@@ -419,6 +430,7 @@ public class TransportReplicationActionTests extends ESTestCase {
             assertTrue(listener.isDone());
             listener.get();
             assertPhase(task, "finished");
+            assertFalse(request.isRetrySet.get());
         } else {
             assertFalse(executed.get());
             assertIndexShardCounter(0);  // it should have been freed.
@@ -432,6 +444,7 @@ public class TransportReplicationActionTests extends ESTestCase {
             assertTrue(listener.isDone());
             listener.get();
             assertPhase(task, "finished");
+            assertFalse(request.isRetrySet.get());
         }
     }
 
@@ -463,6 +476,7 @@ public class TransportReplicationActionTests extends ESTestCase {
         }.run();
         assertThat(executed.get(), equalTo(true));
         assertPhase(task, "finished");
+        assertFalse(request.isRetrySet.get());
     }
 
     public void testPrimaryReference() throws Exception {
@@ -745,6 +759,7 @@ public class TransportReplicationActionTests extends ESTestCase {
     public static class Request extends ReplicationRequest<Request> {
         public AtomicBoolean processedOnPrimary = new AtomicBoolean();
         public AtomicInteger processedOnReplicas = new AtomicInteger();
+        public AtomicBoolean isRetrySet = new AtomicBoolean(false);
 
         public Request() {
         }
@@ -766,6 +781,12 @@ public class TransportReplicationActionTests extends ESTestCase {
         public void readFrom(StreamInput in) throws IOException {
             super.readFrom(in);
         }
+
+        @Override
+        public void onRetry() {
+            super.onRetry();
+            isRetrySet.set(true);
+        }
     }
 
     static class Response extends ReplicationResponse {

+ 1 - 1
core/src/test/java/org/elasticsearch/aliases/IndexAliasesIT.java

@@ -807,7 +807,7 @@ public class IndexAliasesIT extends ESIntegTestCase {
 
         final int numDocs = scaledRandomIntBetween(5, 52);
         for (int i = 1; i <= numDocs; i++) {
-            client().prepareIndex("my-index", "my-type").setCreate(true).setSource("timestamp", "2016-12-12").get();
+            client().prepareIndex("my-index", "my-type").setSource("timestamp", "2016-12-12").get();
             if (i % 2 == 0) {
                 refresh();
                 SearchResponse response = client().prepareSearch("filter1").get();

+ 272 - 45
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -98,11 +98,9 @@ import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.indices.IndicesModule;
 import org.elasticsearch.indices.mapper.MapperRegistry;
-import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
-import org.elasticsearch.test.InternalSettingsPlugin;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.MatcherAssert;
@@ -118,7 +116,6 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Base64;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
@@ -129,6 +126,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.IntFunction;
 
 import static java.util.Collections.emptyMap;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
@@ -615,7 +613,7 @@ public class InternalEngineTests extends ESTestCase {
             for (int i = 0; i < ops; i++) {
                 final ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
                 if (randomBoolean()) {
-                    final Engine.Index operation = new Engine.Index(newUid("test#1"), doc, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime());
+                    final Engine.Index operation = new Engine.Index(newUid("test#1"), doc, i, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false);
                     operations.add(operation);
                     initialEngine.index(operation);
                 } else {
@@ -946,7 +944,7 @@ public class InternalEngineTests extends ESTestCase {
                 engine.flush();
                 final boolean forceMergeFlushes = randomBoolean();
                 if (forceMergeFlushes) {
-                    engine.index(new Engine.Index(newUid("3"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime() - engine.engineConfig.getFlushMergesAfter().nanos()));
+                    engine.index(new Engine.Index(newUid("3"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime() - engine.engineConfig.getFlushMergesAfter().nanos(), -1, false));
                 } else {
                     engine.index(new Engine.Index(newUid("3"), doc));
                 }
@@ -1033,7 +1031,7 @@ public class InternalEngineTests extends ESTestCase {
         engine.index(create);
         assertThat(create.version(), equalTo(1L));
 
-        create = new Engine.Index(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
+        create = new Engine.Index(newUid("1"), doc, create.version(), create.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
         replicaEngine.index(create);
         assertThat(create.version(), equalTo(1L));
     }
@@ -1044,18 +1042,18 @@ public class InternalEngineTests extends ESTestCase {
         engine.index(index);
         assertThat(index.version(), equalTo(1L));
 
-        index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
+        index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
         replicaEngine.index(index);
         assertThat(index.version(), equalTo(1L));
     }
 
     public void testExternalVersioningNewIndex() {
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
-        Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0);
+        Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
         engine.index(index);
         assertThat(index.version(), equalTo(12L));
 
-        index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0);
+        index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
         replicaEngine.index(index);
         assertThat(index.version(), equalTo(12L));
     }
@@ -1070,7 +1068,7 @@ public class InternalEngineTests extends ESTestCase {
         engine.index(index);
         assertThat(index.version(), equalTo(2L));
 
-        index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, 0, -1, false);
         try {
             engine.index(index);
             fail();
@@ -1079,7 +1077,7 @@ public class InternalEngineTests extends ESTestCase {
         }
 
         // future versions should not work as well
-        index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(index);
             fail();
@@ -1090,15 +1088,15 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testExternalVersioningIndexConflict() {
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
-        Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0);
+        Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
         engine.index(index);
         assertThat(index.version(), equalTo(12L));
 
-        index = new Engine.Index(newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
         engine.index(index);
         assertThat(index.version(), equalTo(14L));
 
-        index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(index);
             fail();
@@ -1119,7 +1117,7 @@ public class InternalEngineTests extends ESTestCase {
 
         engine.flush();
 
-        index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(index);
             fail();
@@ -1128,7 +1126,7 @@ public class InternalEngineTests extends ESTestCase {
         }
 
         // future versions should not work as well
-        index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 3L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(index);
             fail();
@@ -1139,17 +1137,17 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testExternalVersioningIndexConflictWithFlush() {
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
-        Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0);
+        Engine.Index index = new Engine.Index(newUid("1"), doc, 12, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
         engine.index(index);
         assertThat(index.version(), equalTo(12L));
 
-        index = new Engine.Index(newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 14, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
         engine.index(index);
         assertThat(index.version(), equalTo(14L));
 
         engine.flush();
 
-        index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 13, VersionType.EXTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(index);
             fail();
@@ -1286,7 +1284,7 @@ public class InternalEngineTests extends ESTestCase {
         assertThat(delete.version(), equalTo(3L));
 
         // now check if we can index to a delete doc with version
-        index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(index);
             fail();
@@ -1295,7 +1293,7 @@ public class InternalEngineTests extends ESTestCase {
         }
 
         // we shouldn't be able to create as well
-        Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
+        Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(create);
         } catch (VersionConflictEngineException e) {
@@ -1342,7 +1340,7 @@ public class InternalEngineTests extends ESTestCase {
         engine.flush();
 
         // now check if we can index to a delete doc with version
-        index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0);
+        index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(index);
             fail();
@@ -1351,7 +1349,7 @@ public class InternalEngineTests extends ESTestCase {
         }
 
         // we shouldn't be able to create as well
-        Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
+        Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(create);
         } catch (VersionConflictEngineException e) {
@@ -1361,11 +1359,11 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testVersioningCreateExistsException() {
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
-        Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
+        Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         engine.index(create);
         assertThat(create.version(), equalTo(1L));
 
-        create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
+        create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(create);
             fail();
@@ -1376,13 +1374,13 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testVersioningCreateExistsExceptionWithFlush() {
         ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocument(), B_1, null);
-        Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
+        Engine.Index create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         engine.index(create);
         assertThat(create.version(), equalTo(1L));
 
         engine.flush();
 
-        create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0);
+        create = new Engine.Index(newUid("1"), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, 0, -1, false);
         try {
             engine.index(create);
             fail();
@@ -1402,12 +1400,12 @@ public class InternalEngineTests extends ESTestCase {
         assertThat(index.version(), equalTo(2L));
 
         // apply the second index to the replica, should work fine
-        index = new Engine.Index(newUid("1"), doc, index.version(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
+        index = new Engine.Index(newUid("1"), doc, index.version(), VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA,  0, -1, false);
         replicaEngine.index(index);
         assertThat(index.version(), equalTo(2L));
 
         // now, the old one should not work
-        index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
+        index = new Engine.Index(newUid("1"), doc, 1L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
         try {
             replicaEngine.index(index);
             fail();
@@ -1418,7 +1416,7 @@ public class InternalEngineTests extends ESTestCase {
         // second version on replica should fail as well
         try {
             index = new Engine.Index(newUid("1"), doc, 2L
-                    , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
+                    , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
             replicaEngine.index(index);
             assertThat(index.version(), equalTo(2L));
         } catch (VersionConflictEngineException e) {
@@ -1434,7 +1432,7 @@ public class InternalEngineTests extends ESTestCase {
 
         // apply the first index to the replica, should work fine
         index = new Engine.Index(newUid("1"), doc, 1L
-                , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
+                , VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
         replicaEngine.index(index);
         assertThat(index.version(), equalTo(1L));
 
@@ -1466,7 +1464,7 @@ public class InternalEngineTests extends ESTestCase {
 
         // now do the second index on the replica, it should fail
         try {
-            index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0);
+            index = new Engine.Index(newUid("1"), doc, 2L, VersionType.INTERNAL.versionTypeForReplicationAndRecovery(), REPLICA, 0, -1, false);
             replicaEngine.index(index);
             fail("excepted VersionConflictEngineException to be thrown");
         } catch (VersionConflictEngineException e) {
@@ -1612,7 +1610,7 @@ public class InternalEngineTests extends ESTestCase {
             document.add(new TextField("value", "test1", Field.Store.YES));
 
             ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, document, B_2, null);
-            engine.index(new Engine.Index(newUid("1"), doc, 1, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
+            engine.index(new Engine.Index(newUid("1"), doc, 1, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false));
 
             // Delete document we just added:
             engine.delete(new Engine.Delete("test", "1", newUid("1"), 10, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), false));
@@ -1637,7 +1635,7 @@ public class InternalEngineTests extends ESTestCase {
 
             // Try to index uid=1 with a too-old version, should fail:
             try {
-                engine.index(new Engine.Index(newUid("1"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
+                engine.index(new Engine.Index(newUid("1"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false));
                 fail("did not hit expected exception");
             } catch (VersionConflictEngineException vcee) {
                 // expected
@@ -1649,7 +1647,7 @@ public class InternalEngineTests extends ESTestCase {
 
             // Try to index uid=2 with a too-old version, should fail:
             try {
-                engine.index(new Engine.Index(newUid("2"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime()));
+                engine.index(new Engine.Index(newUid("2"), doc, 2, VersionType.EXTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), -1, false));
                 fail("did not hit expected exception");
             } catch (VersionConflictEngineException vcee) {
                 // expected
@@ -1748,7 +1746,7 @@ public class InternalEngineTests extends ESTestCase {
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
             engine.index(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1L));
         }
@@ -1798,7 +1796,7 @@ public class InternalEngineTests extends ESTestCase {
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
             engine.index(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1L));
         }
@@ -1888,7 +1886,7 @@ public class InternalEngineTests extends ESTestCase {
                 final int numExtraDocs = randomIntBetween(1, 10);
                 for (int i = 0; i < numExtraDocs; i++) {
                     ParsedDocument doc = testParsedDocument("extra" + Integer.toString(i), "extra" + Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-                    Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+                    Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
                     engine.index(firstIndexRequest);
                     assertThat(firstIndexRequest.version(), equalTo(1L));
                 }
@@ -1917,7 +1915,7 @@ public class InternalEngineTests extends ESTestCase {
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
             engine.index(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1L));
         }
@@ -1960,7 +1958,7 @@ public class InternalEngineTests extends ESTestCase {
         int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
         String uuidValue = "test#" + Integer.toString(randomId);
         ParsedDocument doc = testParsedDocument(uuidValue, Integer.toString(randomId), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-        Engine.Index firstIndexRequest = new Engine.Index(newUid(uuidValue), doc, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime());
+        Engine.Index firstIndexRequest = new Engine.Index(newUid(uuidValue), doc, 1, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false);
         engine.index(firstIndexRequest);
         assertThat(firstIndexRequest.version(), equalTo(1L));
         if (flush) {
@@ -1968,7 +1966,7 @@ public class InternalEngineTests extends ESTestCase {
         }
 
         doc = testParsedDocument(uuidValue, Integer.toString(randomId), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-        Engine.Index idxRequest = new Engine.Index(newUid(uuidValue), doc, 2, VersionType.EXTERNAL, PRIMARY, System.nanoTime());
+        Engine.Index idxRequest = new Engine.Index(newUid(uuidValue), doc, 2, VersionType.EXTERNAL, PRIMARY, System.nanoTime(), -1, false);
         engine.index(idxRequest);
         engine.refresh("test");
         assertThat(idxRequest.version(), equalTo(2L));
@@ -2034,7 +2032,7 @@ public class InternalEngineTests extends ESTestCase {
         final int numDocs = randomIntBetween(1, 10);
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
             engine.index(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1L));
         }
@@ -2120,7 +2118,7 @@ public class InternalEngineTests extends ESTestCase {
             // create
             {
                 ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-                Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+                Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_DELETED, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
 
                 try (InternalEngine engine = new InternalEngine(copy(config, EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG))){
                     engine.index(firstIndexRequest);
@@ -2180,7 +2178,7 @@ public class InternalEngineTests extends ESTestCase {
         final int numDocs = randomIntBetween(2, 10); // at least 2 documents otherwise we don't see any deletes below
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
             engine.index(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1L));
         }
@@ -2190,7 +2188,7 @@ public class InternalEngineTests extends ESTestCase {
         engine.forceMerge(randomBoolean(), 1, false, false, false);
 
         ParsedDocument doc = testParsedDocument(Integer.toString(0), Integer.toString(0), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-        Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+        Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(0)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
         engine.index(firstIndexRequest);
         assertThat(firstIndexRequest.version(), equalTo(2L));
         engine.flush(); // flush - buffered deletes are not counted
@@ -2202,4 +2200,233 @@ public class InternalEngineTests extends ESTestCase {
         assertEquals(0, docStats.getDeleted());
         assertEquals(numDocs, docStats.getCount());
     }
+
+    public void testDoubleDelivery() throws IOException {
+        final ParsedDocument doc = testParsedDocument("1", "1", "test", null, 100, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+        Engine.Index operation = randomAppendOnly(1, doc, false);
+        Engine.Index retry = randomAppendOnly(1, doc, true);
+        if (randomBoolean()) {
+            engine.index(operation);
+            assertFalse(engine.indexWriterHasDeletions());
+            assertEquals(0, engine.getNumVersionLookups());
+            assertNotNull(operation.getTranslogLocation());
+            engine.index(retry);
+            assertTrue(engine.indexWriterHasDeletions());
+            assertEquals(0, engine.getNumVersionLookups());
+            assertNotNull(retry.getTranslogLocation());
+            assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) > 0);
+        } else {
+            engine.index(retry);
+            assertTrue(engine.indexWriterHasDeletions());
+            assertEquals(0, engine.getNumVersionLookups());
+            assertNotNull(retry.getTranslogLocation());
+            engine.index(operation);
+            assertTrue(engine.indexWriterHasDeletions());
+            assertEquals(0, engine.getNumVersionLookups());
+            assertNotNull(retry.getTranslogLocation());
+            assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) < 0);
+        }
+
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+        operation = randomAppendOnly(1, doc, false);
+        retry = randomAppendOnly(1, doc, true);
+        if (randomBoolean()) {
+            engine.index(operation);
+            assertNotNull(operation.getTranslogLocation());
+            engine.index(retry);
+            assertNotNull(retry.getTranslogLocation());
+            assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) > 0);
+        } else {
+            engine.index(retry);
+            assertNotNull(retry.getTranslogLocation());
+            engine.index(operation);
+            assertNotNull(retry.getTranslogLocation());
+            assertTrue(retry.getTranslogLocation().compareTo(operation.getTranslogLocation()) < 0);
+        }
+
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+    }
+
+
+    public void testRetryWithAutogeneratedIdWorksAndNoDuplicateDocs() throws IOException {
+
+        final ParsedDocument doc = testParsedDocument("1", "1", "test", null, 100, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+        boolean isRetry = false;
+        long autoGeneratedIdTimestamp = 0;
+
+        Engine.Index index = new Engine.Index(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        engine.index(index);
+        assertThat(index.version(), equalTo(1L));
+
+        index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        replicaEngine.index(index);
+        assertThat(index.version(), equalTo(1L));
+
+        isRetry = true;
+        index = new Engine.Index(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        engine.index(index);
+        assertThat(index.version(), equalTo(1L));
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+
+        index = new Engine.Index(newUid("1"), doc, index.version(), index.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        replicaEngine.index(index);
+        replicaEngine.refresh("test");
+        try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+    }
+
+    public void testRetryWithAutogeneratedIdsAndWrongOrderWorksAndNoDuplicateDocs() throws IOException {
+
+        final ParsedDocument doc = testParsedDocument("1", "1", "test", null, 100, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+        boolean isRetry = true;
+        long autoGeneratedIdTimestamp = 0;
+
+
+        Engine.Index firstIndexRequest = new Engine.Index(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        engine.index(firstIndexRequest);
+        assertThat(firstIndexRequest.version(), equalTo(1L));
+
+        Engine.Index firstIndexRequestReplica = new Engine.Index(newUid("1"), doc, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        replicaEngine.index(firstIndexRequestReplica);
+        assertThat(firstIndexRequestReplica.version(), equalTo(1L));
+
+        isRetry = false;
+        Engine.Index secondIndexRequest = new Engine.Index(newUid("1"), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        engine.index(secondIndexRequest);
+        assertTrue(secondIndexRequest.isCreated());
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+
+        Engine.Index secondIndexRequestReplica = new Engine.Index(newUid("1"), doc, firstIndexRequest.version(), firstIndexRequest.versionType().versionTypeForReplicationAndRecovery(), REPLICA, System.nanoTime(), autoGeneratedIdTimestamp, isRetry);
+        replicaEngine.index(secondIndexRequestReplica);
+        replicaEngine.refresh("test");
+        try (Engine.Searcher searcher = replicaEngine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(1, topDocs.totalHits);
+        }
+    }
+
+    public Engine.Index randomAppendOnly(int docId, ParsedDocument doc, boolean retry) {
+        if (randomBoolean()) {
+            return new Engine.Index(newUid(Integer.toString(docId)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, Engine.Operation.Origin.PRIMARY, System.nanoTime(), docId, retry);
+        }
+        return new Engine.Index(newUid(Integer.toString(docId)), doc, 1, VersionType.EXTERNAL, Engine.Operation.Origin.REPLICA, System.nanoTime(), docId, retry);
+    }
+
+    public void testRetryConcurrently() throws InterruptedException, IOException {
+        Thread[] thread = new Thread[randomIntBetween(3, 5)];
+        int numDocs = randomIntBetween(1000, 10000);
+        List<Engine.Index> docs = new ArrayList<>();
+        for (int i = 0; i < numDocs; i++) {
+            final ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, i, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+            Engine.Index originalIndex = randomAppendOnly(i, doc, false);
+            Engine.Index retryIndex = randomAppendOnly(i, doc, true);
+            docs.add(originalIndex);
+            docs.add(retryIndex);
+        }
+        Collections.shuffle(docs, random());
+        CountDownLatch startGun = new CountDownLatch(thread.length);
+        AtomicInteger offset = new AtomicInteger(-1);
+        for (int i = 0; i < thread.length; i++) {
+            thread[i] = new Thread() {
+                @Override
+                public void run() {
+                    startGun.countDown();
+                    try {
+                        startGun.await();
+                    } catch (InterruptedException e) {
+                        throw new AssertionError(e);
+                    }
+                    int docOffset;
+                    while ((docOffset = offset.incrementAndGet()) < docs.size()) {
+                        engine.index(docs.get(docOffset));
+                    }
+                }
+            };
+            thread[i].start();
+        }
+        for (int i = 0; i < thread.length; i++) {
+            thread[i].join();
+        }
+        assertEquals(0, engine.getNumVersionLookups());
+        assertEquals(0, engine.getNumIndexVersionsLookups());
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(numDocs, topDocs.totalHits);
+        }
+        assertTrue(engine.indexWriterHasDeletions());
+    }
+
+    public void testAppendConcurrently() throws InterruptedException, IOException {
+        Thread[] thread = new Thread[randomIntBetween(3, 5)];
+        int numDocs = randomIntBetween(1000, 10000);
+        assertEquals(0, engine.getNumVersionLookups());
+        assertEquals(0, engine.getNumIndexVersionsLookups());
+        List<Engine.Index> docs = new ArrayList<>();
+        for (int i = 0; i < numDocs; i++) {
+            final ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, i, -1, testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
+            Engine.Index index = randomAppendOnly(i, doc, false);
+            docs.add(index);
+        }
+        Collections.shuffle(docs, random());
+        CountDownLatch startGun = new CountDownLatch(thread.length);
+        AtomicInteger offset = new AtomicInteger(-1);
+        for (int i = 0; i < thread.length; i++) {
+            thread[i] = new Thread() {
+                @Override
+                public void run() {
+                    startGun.countDown();
+                    try {
+                        startGun.await();
+                    } catch (InterruptedException e) {
+                        throw new AssertionError(e);
+                    }
+                    int docOffset;
+                    while ((docOffset = offset.incrementAndGet()) < docs.size()) {
+                        engine.index(docs.get(docOffset));
+                    }
+                }
+            };
+            thread[i].start();
+        }
+        for (int i = 0; i < thread.length; i++) {
+            thread[i].join();
+        }
+
+        engine.refresh("test");
+        try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
+            TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), 10);
+            assertEquals(docs.size(), topDocs.totalHits);
+        }
+        assertEquals(0, engine.getNumVersionLookups());
+        assertEquals(0, engine.getNumIndexVersionsLookups());
+        assertFalse(engine.indexWriterHasDeletions());
+
+    }
+
+    public static long getNumVersionLookups(InternalEngine engine) { // for other tests to access this
+        return engine.getNumVersionLookups();
+    }
+
+    public static long getNumIndexVersionsLookups(InternalEngine engine) { // for other tests to access this
+        return engine.getNumIndexVersionsLookups();
+    }
 }

+ 1 - 4
core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java

@@ -88,9 +88,6 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 
-/**
- * TODO: document me!
- */
 public class ShadowEngineTests extends ESTestCase {
 
     protected final ShardId shardId = new ShardId("index", "_na_", 1);
@@ -989,7 +986,7 @@ public class ShadowEngineTests extends ESTestCase {
         final int numDocs = randomIntBetween(2, 10); // at least 2 documents otherwise we don't see any deletes below
         for (int i = 0; i < numDocs; i++) {
             ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), new BytesArray("{}"), null);
-            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime());
+            Engine.Index firstIndexRequest = new Engine.Index(newUid(Integer.toString(i)), doc, Versions.MATCH_ANY, VersionType.INTERNAL, PRIMARY, System.nanoTime(), -1, false);
             primaryEngine.index(firstIndexRequest);
             assertThat(firstIndexRequest.version(), equalTo(1L));
         }

+ 9 - 0
core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -262,6 +262,15 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
             return numOfDoc;
         }
 
+        public int appendDocs(final int numOfDoc) throws Exception {
+            for (int doc = 0; doc < numOfDoc; doc++) {
+                final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}");
+                final IndexResponse response = index(indexRequest);
+                assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
+            }
+            return numOfDoc;
+        }
+
         public IndexResponse index(IndexRequest indexRequest) throws Exception {
             PlainActionFuture<IndexingResult> listener = new PlainActionFuture<>();
             IndexingOp op = new IndexingOp(indexRequest, listener, this);

+ 68 - 0
core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

@@ -18,6 +18,18 @@
  */
 package org.elasticsearch.index.replication;
 
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.index.engine.InternalEngine;
+import org.elasticsearch.index.engine.InternalEngineTests;
+import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.IndexShardTests;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.indices.recovery.RecoveryTarget;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+
 public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase {
 
     public void testSimpleReplication() throws Exception {
@@ -28,4 +40,60 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
             shards.assertAllEqual(docCount);
         }
     }
+
+    public void testSimpleAppendOnlyReplication() throws Exception {
+        try (ReplicationGroup shards = createGroup(randomInt(2))) {
+            shards.startAll();
+            final int docCount = randomInt(50);
+            shards.appendDocs(docCount);
+            shards.assertAllEqual(docCount);
+        }
+    }
+
+    public void testAppendWhileRecovering() throws Exception {
+        try (ReplicationGroup shards = createGroup(0)) {
+            shards.startAll();
+            IndexShard replica = shards.addReplica();
+            CountDownLatch latch = new CountDownLatch(2);
+            int numDocs = randomIntBetween(100, 200);
+            shards.appendDocs(1);// just append one to the translog so we can assert below
+            Thread thread = new Thread() {
+                @Override
+                public void run() {
+                    try {
+                        latch.countDown();
+                        latch.await();
+                        shards.appendDocs(numDocs-1);
+                    } catch (Exception e) {
+                        throw new AssertionError(e);
+                    }
+                }
+            };
+            thread.start();
+            Future<Void> future = shards.asyncRecoverReplica(replica, (indexShard, node)
+                -> new RecoveryTarget(indexShard, node, recoveryListener, version -> {}) {
+                @Override
+                public void cleanFiles(int totalTranslogOps, Store.MetadataSnapshot sourceMetaData) throws IOException {
+                    super.cleanFiles(totalTranslogOps, sourceMetaData);
+                    latch.countDown();
+                    try {
+                        latch.await();
+                    } catch (InterruptedException e) {
+                        throw new AssertionError(e);
+                    }
+                }
+            });
+            future.get();
+            thread.join();
+            shards.assertAllEqual(numDocs);
+            Engine engine = IndexShardTests.getEngineFromShard(replica);
+            assertEquals("expected at no version lookups ", InternalEngineTests.getNumVersionLookups((InternalEngine) engine), 0);
+            for (IndexShard shard : shards) {
+                engine = IndexShardTests.getEngineFromShard(shard);
+                assertEquals(0, InternalEngineTests.getNumIndexVersionsLookups((InternalEngine) engine));
+                assertEquals(0, InternalEngineTests.getNumVersionLookups((InternalEngine) engine));
+            }
+        }
+    }
+
 }

+ 4 - 0
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -1820,4 +1820,8 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         @Override
         public void verify(String verificationToken, DiscoveryNode localNode) {}
     }
+
+    public static Engine getEngineFromShard(IndexShard shard) {
+        return shard.getEngineOrNull();
+    }
 }

+ 41 - 14
core/src/test/java/org/elasticsearch/index/store/ExceptionRetryIT.java

@@ -20,13 +20,20 @@ package org.elasticsearch.index.store;
 
 import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
 import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.IndexShardStats;
+import org.elasticsearch.action.admin.indices.stats.IndexStats;
+import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
+import org.elasticsearch.action.admin.indices.stats.ShardStats;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.bulk.TransportShardBulkAction;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -44,6 +51,7 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -53,7 +61,8 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSear
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
 
-@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE)
+@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.SUITE, numDataNodes = 2,
+    supportsDedicatedMasters = false, numClientNodes = 1, transportClientRatio = 0.0)
 public class ExceptionRetryIT extends ESIntegTestCase {
 
     @Override
@@ -68,40 +77,46 @@ public class ExceptionRetryIT extends ESIntegTestCase {
     }
 
     /**
-     * Tests retry mechanism when indexing. If an exception occurs when indexing then the indexing request is tried again before finally failing.
-     * If auto generated ids are used this must not lead to duplicate ids
+     * Tests retry mechanism when indexing. If an exception occurs when indexing then the indexing request is tried again before finally
+     * failing. If auto generated ids are used this must not lead to duplicate ids
      * see https://github.com/elastic/elasticsearch/issues/8788
      */
     public void testRetryDueToExceptionOnNetworkLayer() throws ExecutionException, InterruptedException, IOException {
         final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
         int numDocs = scaledRandomIntBetween(100, 1000);
+        Client client = internalCluster().coordOnlyNodeClient();
         NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().get();
-        NodeStats unluckyNode = randomFrom(nodeStats.getNodes());
-        assertAcked(client().admin().indices().prepareCreate("index"));
+        NodeStats unluckyNode = randomFrom(nodeStats.getNodes().stream().filter((s) -> s.getNode().isDataNode())
+            .collect(Collectors.toList()));
+        assertAcked(client().admin().indices().prepareCreate("index").setSettings(Settings.builder()
+            .put("index.number_of_replicas", 1)
+            .put("index.number_of_shards", 5)));
         ensureGreen("index");
-
+        logger.info("unlucky node: {}", unluckyNode.getNode());
         //create a transport service that throws a ConnectTransportException for one bulk request and therefore triggers a retry.
         for (NodeStats dataNode : nodeStats.getNodes()) {
-            MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class, dataNode.getNode().getName()));
-            mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()), new MockTransportService.DelegateTransport(mockTransportService.original()) {
+            MockTransportService mockTransportService = ((MockTransportService) internalCluster().getInstance(TransportService.class,
+                dataNode.getNode().getName()));
+            mockTransportService.addDelegate(internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
+                new MockTransportService.DelegateTransport(mockTransportService.original()) {
 
                 @Override
-                public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, TransportRequestOptions options) throws IOException, TransportException {
+                public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request,
+                                        TransportRequestOptions options) throws IOException, TransportException {
                     super.sendRequest(node, requestId, action, request, options);
-                    if (action.equals(TransportShardBulkAction.ACTION_NAME) && !exceptionThrown.get()) {
+                    if (action.equals(TransportShardBulkAction.ACTION_NAME) && exceptionThrown.compareAndSet(false, true)) {
                         logger.debug("Throw ConnectTransportException");
-                        exceptionThrown.set(true);
                         throw new ConnectTransportException(node, action);
                     }
                 }
             });
         }
 
-        BulkRequestBuilder bulkBuilder = client().prepareBulk();
+        BulkRequestBuilder bulkBuilder = client.prepareBulk();
         for (int i = 0; i < numDocs; i++) {
             XContentBuilder doc = null;
             doc = jsonBuilder().startObject().field("foo", "bar").endObject();
-            bulkBuilder.add(client().prepareIndex("index", "type").setSource(doc));
+            bulkBuilder.add(client.prepareIndex("index", "type").setSource(doc));
         }
 
         BulkResponse response = bulkBuilder.get();
@@ -122,7 +137,8 @@ public class ExceptionRetryIT extends ESIntegTestCase {
         for (int i = 0; i < searchResponse.getHits().getHits().length; i++) {
             if (!uniqueIds.add(searchResponse.getHits().getHits()[i].getId())) {
                 if (!found_duplicate_already) {
-                    SearchResponse dupIdResponse = client().prepareSearch("index").setQuery(termQuery("_id", searchResponse.getHits().getHits()[i].getId())).setExplain(true).get();
+                    SearchResponse dupIdResponse = client().prepareSearch("index").setQuery(termQuery("_id",
+                        searchResponse.getHits().getHits()[i].getId())).setExplain(true).get();
                     assertThat(dupIdResponse.getHits().totalHits(), greaterThan(1L));
                     logger.info("found a duplicate id:");
                     for (SearchHit hit : dupIdResponse.getHits()) {
@@ -137,5 +153,16 @@ public class ExceptionRetryIT extends ESIntegTestCase {
         assertSearchResponse(searchResponse);
         assertThat(dupCounter, equalTo(0L));
         assertHitCount(searchResponse, numDocs);
+        IndicesStatsResponse index = client().admin().indices().prepareStats("index").clear().setSegments(true).get();
+        IndexStats indexStats = index.getIndex("index");
+        long maxUnsafeAutoIdTimestamp = Long.MIN_VALUE;
+        for (IndexShardStats indexShardStats : indexStats) {
+            for (ShardStats shardStats : indexShardStats) {
+                SegmentsStats segments = shardStats.getStats().getSegments();
+                maxUnsafeAutoIdTimestamp = Math.max(maxUnsafeAutoIdTimestamp, segments.getMaxUnsafeAutoIdTimestamp());
+            }
+        }
+        assertTrue("exception must have been thrown otherwise setup is broken", exceptionThrown.get());
+        assertTrue("maxUnsafeAutoIdTimestamp must be > than 0 we have at least one retry", maxUnsafeAutoIdTimestamp > -1);
     }
 }

+ 4 - 5
core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -344,7 +344,7 @@ public class TranslogTests extends ESTestCase {
         assertThat(stats.estimatedNumberOfOperations(), equalTo(0L));
         assertThat(stats.getTranslogSizeInBytes(), equalTo(firstOperationPosition));
         assertEquals(6, total.estimatedNumberOfOperations());
-        assertEquals(431, total.getTranslogSizeInBytes());
+        assertEquals(455, total.getTranslogSizeInBytes());
 
         BytesStreamOutput out = new BytesStreamOutput();
         total.writeTo(out);
@@ -352,14 +352,13 @@ public class TranslogTests extends ESTestCase {
         copy.readFrom(out.bytes().streamInput());
 
         assertEquals(6, copy.estimatedNumberOfOperations());
-        assertEquals(431, copy.getTranslogSizeInBytes());
+        assertEquals(455, copy.getTranslogSizeInBytes());
 
         try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
             builder.startObject();
             copy.toXContent(builder, ToXContent.EMPTY_PARAMS);
             builder.endObject();
-
-            assertEquals("{\"translog\":{\"operations\":6,\"size_in_bytes\":431}}", builder.string());
+            assertEquals("{\"translog\":{\"operations\":6,\"size_in_bytes\":455}}", builder.string());
         }
 
         try {
@@ -1166,7 +1165,7 @@ public class TranslogTests extends ESTestCase {
         try (Translog translog = new Translog(config, translogGeneration)) {
             fail("corrupted");
         } catch (IllegalStateException ex) {
-            assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=2683, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
+            assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3123, numOps=55, translogFileGeneration= 2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration= 0}");
         }
         Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
         try (Translog translog = new Translog(config, translogGeneration)) {

+ 10 - 1
core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java

@@ -28,10 +28,13 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.test.ESIntegTestCase;
+import org.elasticsearch.test.hamcrest.ElasticsearchAssertions;
 import org.elasticsearch.test.junit.annotations.TestLogging;
 
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.hamcrest.Matchers.equalTo;
 
@@ -49,7 +52,7 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase {
             .addMapping("type", "field", "type=text")
             .get();
         ensureGreen("test");
-
+        AtomicInteger numAutoGenDocs = new AtomicInteger();
         final AtomicBoolean finished = new AtomicBoolean(false);
         Thread indexingThread = new Thread() {
             @Override
@@ -59,6 +62,8 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase {
                     assertEquals(DocWriteResponse.Result.CREATED, indexResponse.getResult());
                     DeleteResponse deleteResponse = client().prepareDelete("test", "type", "id").get();
                     assertEquals(DocWriteResponse.Result.DELETED, deleteResponse.getResult());
+                    client().prepareIndex("test", "type").setSource("auto", true).get();
+                    numAutoGenDocs.incrementAndGet();
                 }
             }
         };
@@ -87,5 +92,9 @@ public class IndexPrimaryRelocationIT extends ESIntegTestCase {
         }
         finished.set(true);
         indexingThread.join();
+        refresh("test");
+        ElasticsearchAssertions.assertHitCount(client().prepareSearch("test").get(), numAutoGenDocs.get());
+        ElasticsearchAssertions.assertHitCount(client().prepareSearch("test")// extra paranoia ;)
+            .setQuery(QueryBuilders.termQuery("auto", true)).get(), numAutoGenDocs.get());
     }
 }

+ 1 - 1
core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

@@ -580,7 +580,7 @@ public class IndexRecoveryIT extends ESIntegTestCase {
         List<IndexRequestBuilder> requests = new ArrayList<>();
         int numDocs = scaledRandomIntBetween(25, 250);
         for (int i = 0; i < numDocs; i++) {
-            requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
+            requests.add(client().prepareIndex(indexName, "type").setSource("{}"));
         }
         indexRandom(true, requests);
         ensureSearchable(indexName);

+ 1 - 1
core/src/test/java/org/elasticsearch/recovery/RelocationIT.java

@@ -363,7 +363,7 @@ public class RelocationIT extends ESIntegTestCase {
         List<IndexRequestBuilder> requests = new ArrayList<>();
         int numDocs = scaledRandomIntBetween(25, 250);
         for (int i = 0; i < numDocs; i++) {
-            requests.add(client().prepareIndex(indexName, "type").setCreate(true).setSource("{}"));
+            requests.add(client().prepareIndex(indexName, "type").setSource("{}"));
         }
         indexRandom(true, requests);
         assertFalse(client().admin().cluster().prepareHealth().setWaitForNodes("3").setWaitForGreenStatus().get().isTimedOut());

+ 1 - 1
core/src/test/java/org/elasticsearch/search/aggregations/bucket/ShardSizeTestCase.java

@@ -103,7 +103,7 @@ public abstract class ShardSizeTestCase extends ESIntegTestCase {
     protected List<IndexRequestBuilder> indexDoc(String shard, String key, int times) throws Exception {
         IndexRequestBuilder[] builders = new IndexRequestBuilder[times];
         for (int i = 0; i < times; i++) {
-            builders[i] = client().prepareIndex("idx", "type").setRouting(shard).setCreate(true).setSource(jsonBuilder()
+            builders[i] = client().prepareIndex("idx", "type").setRouting(shard).setSource(jsonBuilder()
                     .startObject()
                     .field("key", key)
                     .field("value", 1)

+ 1 - 1
core/src/test/java/org/elasticsearch/search/aggregations/metrics/TopHitsIT.java

@@ -219,7 +219,7 @@ public class TopHitsIT extends ESIntegTestCase {
             builder.endArray().endObject();
 
             builders.add(
-                    client().prepareIndex("articles", "article").setCreate(true).setSource(builder)
+                    client().prepareIndex("articles", "article").setSource(builder)
             );
         }
 

+ 5 - 0
docs/reference/migration/migrate_5_0/index-apis.asciidoc

@@ -64,3 +64,8 @@ The `/_aliases` API no longer supports `indexRouting` and `index-routing`, only
 `index_routing`. It also no longer support `searchRouting` and `search-routing`,
 only `search_routing`. These were removed because they were untested and we
 prefer there to be only one (obvious) way to do things like this.
+
+==== OpType Create without an ID
+
+As of 5.0 indexing a document with `op_type=create` without specifying an ID is not
+supported anymore.

+ 1 - 18
rest-api-spec/src/main/resources/rest-api-spec/test/create/15_without_id.yaml

@@ -1,25 +1,8 @@
 ---
 "Create without ID":
  - do:
+      catch:            /Validation|Invalid/
       create:
           index:  test_1
           type:   test
           body:   { foo: bar }
-
- - is_true:   _id
- - match:   { _index:   test_1 }
- - match:   { _type:    test   }
- - match:   { _version: 1      }
- - set:     { _id:      id    }
-
- - do:
-      get:
-          index:  test_1
-          type:   test
-          id:     '$id'
-
- - match:   { _index:   test_1 }
- - match:   { _type:    test   }
- - match:   { _id:      $id    }
- - match:   { _version: 1      }
- - match:   { _source: { foo: bar }}

+ 12 - 2
test/framework/src/main/java/org/elasticsearch/test/BackgroundIndexer.java

@@ -58,6 +58,7 @@ public class BackgroundIndexer implements AutoCloseable {
     final CountDownLatch startLatch = new CountDownLatch(1);
     final AtomicBoolean hasBudget = new AtomicBoolean(false); // when set to true, writers will acquire writes from a semaphore
     final Semaphore availableBudget = new Semaphore(0);
+    final boolean useAutoGeneratedIDs;
 
     volatile int minFieldSize = 10;
     volatile int maxFieldSize = 140;
@@ -118,6 +119,7 @@ public class BackgroundIndexer implements AutoCloseable {
         if (random == null) {
             random = RandomizedTest.getRandom();
         }
+        useAutoGeneratedIDs = random.nextBoolean();
         failures = new CopyOnWriteArrayList<>();
         writers = new Thread[writerCount];
         stopLatch = new CountDownLatch(writers.length);
@@ -147,7 +149,11 @@ public class BackgroundIndexer implements AutoCloseable {
                                 BulkRequestBuilder bulkRequest = client.prepareBulk();
                                 for (int i = 0; i < batchSize; i++) {
                                     id = idGenerator.incrementAndGet();
-                                    bulkRequest.add(client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)));
+                                    if (useAutoGeneratedIDs) {
+                                        bulkRequest.add(client.prepareIndex(index, type).setSource(generateSource(id, threadRandom)));
+                                    } else {
+                                        bulkRequest.add(client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)));
+                                    }
                                 }
                                 BulkResponse bulkResponse = bulkRequest.get();
                                 for (BulkItemResponse bulkItemResponse : bulkResponse) {
@@ -166,7 +172,11 @@ public class BackgroundIndexer implements AutoCloseable {
                                     continue;
                                 }
                                 id = idGenerator.incrementAndGet();
-                                client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)).get();
+                                if (useAutoGeneratedIDs) {
+                                    client.prepareIndex(index, type).setSource(generateSource(id, threadRandom)).get();
+                                } else {
+                                    client.prepareIndex(index, type, Long.toString(id)).setSource(generateSource(id, threadRandom)).get();
+                                }
                                 indexCounter.incrementAndGet();
                             }
                         }