Browse Source

Pass on maxUnsafeAutoIdTimestamp on recovery / relocation (#20300)

To ensure we don't add documents more than once even if it's mostly paranoia
except of one case where we relocated a shards away and back to the same node
while an initial request is in flight but has not yet finished AND is retried.

Yet, this is a possible case and for that reason we ensure we pass on the
maxUnsafeAutoIdTimestamp on when we prepare for translog recovery.

Relates to #20211
Simon Willnauer 9 years ago
parent
commit
c992a007c8
17 changed files with 116 additions and 47 deletions
  1. 12 6
      core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java
  2. 4 3
      core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
  3. 1 1
      core/src/main/java/org/elasticsearch/index/engine/SegmentsStats.java
  4. 9 7
      core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
  5. 3 2
      core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java
  6. 1 1
      core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java
  7. 10 1
      core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java
  8. 2 1
      core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java
  9. 2 2
      core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java
  10. 3 1
      core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java
  11. 2 2
      core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java
  12. 29 11
      core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java
  13. 2 1
      core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java
  14. 7 3
      core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java
  15. 25 0
      core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java
  16. 2 2
      core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
  17. 2 3
      core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

+ 12 - 6
core/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -25,6 +25,7 @@ import org.apache.lucene.index.SnapshotDeletionPolicy;
 import org.apache.lucene.search.QueryCache;
 import org.apache.lucene.search.QueryCachingPolicy;
 import org.apache.lucene.search.similarities.Similarity;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -65,6 +66,7 @@ public final class EngineConfig {
     private final Engine.EventListener eventListener;
     private final QueryCache queryCache;
     private final QueryCachingPolicy queryCachingPolicy;
+    private final long maxUnsafeAutoIdTimestamp;
     @Nullable
     private final RefreshListeners refreshListeners;
 
@@ -107,10 +109,11 @@ public final class EngineConfig {
      */
     public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
                         IndexSettings indexSettings, Engine.Warmer warmer, Store store, SnapshotDeletionPolicy deletionPolicy,
-                        MergePolicy mergePolicy,Analyzer analyzer,
+                        MergePolicy mergePolicy, Analyzer analyzer,
                         Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
                         TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
-                        TranslogConfig translogConfig, TimeValue flushMergesAfter, RefreshListeners refreshListeners) {
+                        TranslogConfig translogConfig, TimeValue flushMergesAfter, RefreshListeners refreshListeners,
+                        long maxUnsafeAutoIdTimestamp) {
         if (openMode == null) {
             throw new IllegalArgumentException("openMode must not be null");
         }
@@ -137,6 +140,9 @@ public final class EngineConfig {
         this.flushMergesAfter = flushMergesAfter;
         this.openMode = openMode;
         this.refreshListeners = refreshListeners;
+        assert maxUnsafeAutoIdTimestamp >= IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP :
+            "maxUnsafeAutoIdTimestamp must be >= -1 but was " + maxUnsafeAutoIdTimestamp;
+        this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
     }
 
     /**
@@ -323,10 +329,10 @@ public final class EngineConfig {
     }
 
     /**
-     * Returns <code>true</code> iff auto generated IDs should be optimized inside the engine for append only.
-     * The default is <code>true</code>.
+     * Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
+     * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
      */
-    public boolean getOptimizeAutoGeneratedIds() {
-        return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS);
+    public long getMaxUnsafeAutoIdTimestamp() {
+        return indexSettings.getValue(INDEX_OPTIMIZE_AUTO_GENERATED_IDS) ? maxUnsafeAutoIdTimestamp : Long.MAX_VALUE;
     }
 }

+ 4 - 3
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -127,10 +127,11 @@ public class InternalEngine extends Engine {
     public InternalEngine(EngineConfig engineConfig) throws EngineException {
         super(engineConfig);
         openMode = engineConfig.getOpenMode();
-        if (engineConfig.getOptimizeAutoGeneratedIds() == false
-            || engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_5_0_0_alpha6)) {
+        if (engineConfig.getIndexSettings().getIndexVersionCreated().before(Version.V_5_0_0_alpha6)) {
             // no optimization for pre 5.0.0.alpha6 since translog might not have all information needed
             maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
+        } else {
+            maxUnsafeAutoIdTimestamp.set(engineConfig.getMaxUnsafeAutoIdTimestamp());
         }
         this.versionMap = new LiveVersionMap();
         store.incRef();
@@ -1299,7 +1300,7 @@ public class InternalEngine extends Engine {
         mergeScheduler.refreshConfig();
         // config().isEnableGcDeletes() or config.getGcDeletesInMillis() may have changed:
         maybePruneDeletedTombstones();
-        if (engineConfig.getOptimizeAutoGeneratedIds() == false) {
+        if (engineConfig.getMaxUnsafeAutoIdTimestamp() == Long.MAX_VALUE) {
             // this is an anti-viral settings you can only opt out for the entire index
             // only if a shard starts up again due to relocation or if the index is closed
             // the setting will be re-interpreted if it's set to true

+ 1 - 1
core/src/main/java/org/elasticsearch/index/engine/SegmentsStats.java

@@ -279,7 +279,7 @@ public class SegmentsStats implements Streamable, ToXContent {
     }
 
     /**
-     * Returns the max timestamp that is used to de-optimize documetns with auto-generated IDs in the engine.
+     * Returns the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
      * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
      */
     public long getMaxUnsafeAutoIdTimestamp() {

+ 9 - 7
core/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -42,6 +42,7 @@ import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.flush.FlushRequest;
 import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
 import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -959,11 +960,11 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             translogStats.totalOperations(0);
             translogStats.totalOperationsOnStart(0);
         }
-        internalPerformTranslogRecovery(false, indexExists);
+        internalPerformTranslogRecovery(false, indexExists, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
         assert recoveryState.getStage() == RecoveryState.Stage.TRANSLOG : "TRANSLOG stage expected but was: " + recoveryState.getStage();
     }
 
-    private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists) throws IOException {
+    private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boolean indexExists, long maxUnsafeAutoIdTimestamp) throws IOException {
         if (state != IndexShardState.RECOVERING) {
             throw new IndexShardNotRecoveringException(shardId, state);
         }
@@ -992,7 +993,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         } else {
             openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
         }
-        final EngineConfig config = newEngineConfig(openMode);
+        final EngineConfig config = newEngineConfig(openMode, maxUnsafeAutoIdTimestamp);
         // we disable deletes since we allow for operations to be executed against the shard while recovering
         // but we need to make sure we don't loose deletes until we are done recovering
         config.setEnableGcDeletes(false);
@@ -1012,9 +1013,9 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      * the replay of the transaction log which is required in cases where we restore a previous index or recover from
      * a remote peer.
      */
-    public void skipTranslogRecovery() throws IOException {
+    public void skipTranslogRecovery(long maxUnsafeAutoIdTimestamp) throws IOException {
         assert getEngineOrNull() == null : "engine was already created";
-        internalPerformTranslogRecovery(true, true);
+        internalPerformTranslogRecovery(true, true, maxUnsafeAutoIdTimestamp);
         assert recoveryState.getTranslog().recoveredOperations() == 0;
     }
 
@@ -1603,12 +1604,13 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
         return mapperService.documentMapperWithAutoCreate(type);
     }
 
-    private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode) {
+    private EngineConfig newEngineConfig(EngineConfig.OpenMode openMode, long maxUnsafeAutoIdTimestamp) {
         final IndexShardRecoveryPerformer translogRecoveryPerformer = new IndexShardRecoveryPerformer(shardId, mapperService, logger);
         return new EngineConfig(openMode, shardId,
             threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
             mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
-            IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners);
+            IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()), refreshListeners,
+            maxUnsafeAutoIdTimestamp);
     }
 
     /**

+ 3 - 2
core/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java

@@ -30,6 +30,7 @@ import org.apache.lucene.store.FilterDirectory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.MappingMetaData;
 import org.elasticsearch.cluster.routing.RecoverySource;
@@ -346,7 +347,7 @@ final class StoreRecovery {
             recoveryState.getIndex().updateVersion(version);
             if (recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS) {
                 assert indexShouldExists;
-                indexShard.skipTranslogRecovery();
+                indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
             } else {
                 // since we recover from local, just fill the files and size
                 try {
@@ -398,7 +399,7 @@ final class StoreRecovery {
             }
             final IndexId indexId = repository.getRepositoryData().resolveIndexId(indexName);
             repository.restoreShard(indexShard, restoreSource.snapshot().getSnapshotId(), restoreSource.version(), indexId, snapshotShardId, indexShard.recoveryState());
-            indexShard.skipTranslogRecovery();
+            indexShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
             indexShard.finalizeRecovery();
             indexShard.postRecovery("restore done");
         } catch (Exception e) {

+ 1 - 1
core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java

@@ -304,7 +304,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
         public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
             try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
             )) {
-                recoveryRef.status().prepareForTranslogOperations(request.totalTranslogOps());
+                recoveryRef.status().prepareForTranslogOperations(request.totalTranslogOps(), request.getMaxUnsafeAutoIdTimestamp());
             }
             channel.sendResponse(TransportResponse.Empty.INSTANCE);
         }

+ 10 - 1
core/src/main/java/org/elasticsearch/indices/recovery/RecoveryPrepareForTranslogOperationsRequest.java

@@ -19,6 +19,7 @@
 
 package org.elasticsearch.indices.recovery;
 
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.index.shard.ShardId;
@@ -31,6 +32,7 @@ import java.io.IOException;
  */
 public class RecoveryPrepareForTranslogOperationsRequest extends TransportRequest {
 
+    private long maxUnsafeAutoIdTimestamp = IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP;
     private long recoveryId;
     private ShardId shardId;
     private int totalTranslogOps = RecoveryState.Translog.UNKNOWN;
@@ -38,10 +40,11 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques
     public RecoveryPrepareForTranslogOperationsRequest() {
     }
 
-    RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps) {
+    RecoveryPrepareForTranslogOperationsRequest(long recoveryId, ShardId shardId, int totalTranslogOps, long maxUnsafeAutoIdTimestamp) {
         this.recoveryId = recoveryId;
         this.shardId = shardId;
         this.totalTranslogOps = totalTranslogOps;
+        this.maxUnsafeAutoIdTimestamp = maxUnsafeAutoIdTimestamp;
     }
 
     public long recoveryId() {
@@ -56,12 +59,17 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques
         return totalTranslogOps;
     }
 
+    public long getMaxUnsafeAutoIdTimestamp() {
+        return maxUnsafeAutoIdTimestamp;
+    }
+
     @Override
     public void readFrom(StreamInput in) throws IOException {
         super.readFrom(in);
         recoveryId = in.readLong();
         shardId = ShardId.readShardId(in);
         totalTranslogOps = in.readVInt();
+        maxUnsafeAutoIdTimestamp = in.readLong();
     }
 
     @Override
@@ -70,5 +78,6 @@ public class RecoveryPrepareForTranslogOperationsRequest extends TransportReques
         out.writeLong(recoveryId);
         shardId.writeTo(out);
         out.writeVInt(totalTranslogOps);
+        out.writeLong(maxUnsafeAutoIdTimestamp);
     }
 }

+ 2 - 1
core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java

@@ -346,7 +346,8 @@ public class RecoverySourceHandler {
         // Send a request preparing the new shard's translog to receive
         // operations. This ensures the shard engine is started and disables
         // garbage collection (not the JVM's GC!) of tombstone deletes
-        cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps));
+        cancellableThreads.executeIO(() -> recoveryTarget.prepareForTranslogOperations(totalTranslogOps,
+            shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp()));
         stopWatch.stop();
 
         response.startTime = stopWatch.totalTime().millis() - startEngineStart;

+ 2 - 2
core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java

@@ -327,9 +327,9 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget
     /*** Implementation of {@link RecoveryTargetHandler } */
 
     @Override
-    public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
+    public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
         state().getTranslog().totalOperations(totalTranslogOps);
-        indexShard().skipTranslogRecovery();
+        indexShard().skipTranslogRecovery(maxUnsafeAutoIdTimestamp);
     }
 
     @Override

+ 3 - 1
core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTargetHandler.java

@@ -33,8 +33,10 @@ public interface RecoveryTargetHandler {
      * Prepares the tranget to receive translog operations, after all file have been copied
      *
      * @param totalTranslogOps total translog operations expected to be sent
+     * @param maxUnsafeAutoIdTimestamp the max timestamp that is used to de-optimize documents with auto-generated IDs in the engine.
+     * This is used to ensure we don't add duplicate documents when we assume an append only case based on auto-generated IDs
      */
-    void prepareForTranslogOperations(int totalTranslogOps) throws IOException;
+    void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException;
 
     /**
      * The finalize request clears unreferenced translog files, refreshes the engine now that

+ 2 - 2
core/src/main/java/org/elasticsearch/indices/recovery/RemoteRecoveryTargetHandler.java

@@ -74,9 +74,9 @@ public class RemoteRecoveryTargetHandler implements RecoveryTargetHandler {
     }
 
     @Override
-    public void prepareForTranslogOperations(int totalTranslogOps) throws IOException {
+    public void prepareForTranslogOperations(int totalTranslogOps, long maxUnsafeAutoIdTimestamp) throws IOException {
         transportService.submitRequest(targetNode, PeerRecoveryTargetService.Actions.PREPARE_TRANSLOG,
-                new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps),
+                new RecoveryPrepareForTranslogOperationsRequest(recoveryId, shardId, totalTranslogOps, maxUnsafeAutoIdTimestamp),
                 TransportRequestOptions.builder().withTimeout(recoverySettings.internalActionTimeout()).build(),
                 EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
     }

+ 29 - 11
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -53,6 +53,7 @@ import org.apache.lucene.util.IOUtils;
 import org.apache.lucene.util.TestUtil;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Nullable;
@@ -206,7 +207,8 @@ public class InternalEngineTests extends ESTestCase {
         return new EngineConfig(openMode, config.getShardId(), config.getThreadPool(), config.getIndexSettings(), config.getWarmer(),
             config.getStore(), config.getDeletionPolicy(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
             new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(), config.getQueryCache(),
-            config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners());
+            config.getQueryCachingPolicy(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
+            config.getMaxUnsafeAutoIdTimestamp());
     }
 
     @Override
@@ -276,7 +278,7 @@ public class InternalEngineTests extends ESTestCase {
     }
 
     protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
-        EngineConfig config = config(indexSettings, store, translogPath, mergePolicy);
+        EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
         InternalEngine internalEngine = new InternalEngine(config);
         if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
             internalEngine.recoverFromTranslog();
@@ -284,7 +286,7 @@ public class InternalEngineTests extends ESTestCase {
         return internalEngine;
     }
 
-    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) {
+    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, long maxUnsafeAutoIdTimestamp) {
         IndexWriterConfig iwc = newIndexWriterConfig();
         TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
         final EngineConfig.OpenMode openMode;
@@ -306,7 +308,7 @@ public class InternalEngineTests extends ESTestCase {
         EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(),
                 mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
                 new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(),
-                IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), null);
+                IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), null, maxUnsafeAutoIdTimestamp);
 
         return config;
     }
@@ -903,7 +905,7 @@ public class InternalEngineTests extends ESTestCase {
     public void testSyncedFlush() throws IOException {
         try (Store store = createStore();
             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
-                     new LogByteSizeMergePolicy()))) {
+                     new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) {
             final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
             ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
             engine.index(new Engine.Index(newUid("1"), doc));
@@ -930,7 +932,7 @@ public class InternalEngineTests extends ESTestCase {
         for (int i = 0; i < iters; i++) {
             try (Store store = createStore();
                  InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
-                         new LogDocMergePolicy()))) {
+                         new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) {
                 final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
                 ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
                 Engine.Index doc1 = new Engine.Index(newUid("1"), doc);
@@ -1158,7 +1160,7 @@ public class InternalEngineTests extends ESTestCase {
     public void testForceMerge() throws IOException {
         try (Store store = createStore();
             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
-                     new LogByteSizeMergePolicy()))) { // use log MP here we test some behavior in ESMP
+                     new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { // use log MP here we test some behavior in ESMP
             int numDocs = randomIntBetween(10, 100);
             for (int i = 0; i < numDocs; i++) {
                 ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null);
@@ -1601,7 +1603,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testEnableGcDeletes() throws Exception {
         try (Store store = createStore();
-            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy()))) {
+            Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) {
             engine.config().setEnableGcDeletes(false);
 
             // Add document
@@ -1737,7 +1739,7 @@ public class InternalEngineTests extends ESTestCase {
             // expected
         }
         // now it should be OK.
-        EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy()), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
+        EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
         engine = new InternalEngine(config);
     }
 
@@ -2057,7 +2059,7 @@ public class InternalEngineTests extends ESTestCase {
                 config.getIndexSettings(), null, store, createSnapshotDeletionPolicy(), newMergePolicy(), config.getAnalyzer(),
                 config.getSimilarity(), new CodecService(null, logger), config.getEventListener(), config.getTranslogRecoveryPerformer(),
                 IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
-                TimeValue.timeValueMinutes(5), config.getRefreshListeners());
+                TimeValue.timeValueMinutes(5), config.getRefreshListeners(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
 
         try {
             InternalEngine internalEngine = new InternalEngine(brokenConfig);
@@ -2112,7 +2114,7 @@ public class InternalEngineTests extends ESTestCase {
 
     public void testCurrentTranslogIDisCommitted() throws IOException {
         try (Store store = createStore()) {
-            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy());
+            EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
 
             // create
             {
@@ -2374,6 +2376,22 @@ public class InternalEngineTests extends ESTestCase {
         assertTrue(engine.indexWriterHasDeletions());
     }
 
+    public void testEngineMaxTimestampIsInitialized() throws IOException {
+        try (Store store = createStore();
+             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
+                 IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) {
+            assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
+
+        }
+
+        long maxTimestamp = Math.abs(randomLong());
+        try (Store store = createStore();
+             Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
+                 maxTimestamp))) {
+            assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
+        }
+    }
+
     public void testAppendConcurrently() throws InterruptedException, IOException {
         Thread[] thread = new Thread[randomIntBetween(3, 5)];
         int numDocs = randomIntBetween(1000, 10000);

+ 2 - 1
core/src/test/java/org/elasticsearch/index/engine/ShadowEngineTests.java

@@ -38,6 +38,7 @@ import org.apache.lucene.store.MockDirectoryWrapper;
 import org.apache.lucene.util.IOUtils;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
@@ -247,7 +248,7 @@ public class ShadowEngineTests extends ESTestCase {
         EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(),
                 mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), eventListener, null,
                 IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
-                TimeValue.timeValueMinutes(5), refreshListeners);
+                TimeValue.timeValueMinutes(5), refreshListeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
 
         return config;
     }

+ 7 - 3
core/src/test/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java

@@ -110,7 +110,7 @@ import static org.hamcrest.Matchers.equalTo;
 public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
 
     protected ThreadPool threadPool;
-    private final Index index = new Index("test", "uuid");
+    protected final Index index = new Index("test", "uuid");
     private final ShardId shardId = new ShardId(index, 0);
     private final Map<String, String> indexMapping = Collections.singletonMap("type", "{ \"type\": {} }");
     protected static final PeerRecoveryTargetService.RecoveryListener recoveryListener = new PeerRecoveryTargetService.RecoveryListener() {
@@ -284,8 +284,7 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
             primary.recoverFromStore();
             primary.updateRoutingEntry(ShardRoutingHelper.moveToStarted(primary.routingEntry()));
             for (IndexShard replicaShard : replicas) {
-                recoverReplica(replicaShard,
-                    (replica, sourceNode) -> new RecoveryTarget(replica, sourceNode, recoveryListener, version -> {}));
+                recoverReplica(replicaShard);
             }
         }
 
@@ -294,6 +293,11 @@ public abstract class ESIndexLevelReplicationTestCase extends ESTestCase {
             replicas.add(replica);
             return replica;
         }
+
+        public void recoverReplica(IndexShard replica) throws IOException {
+            recoverReplica(replica, (r, sourceNode) -> new RecoveryTarget(r, sourceNode, recoveryListener, version -> {}));
+        }
+
         public void recoverReplica(IndexShard replica, BiFunction<IndexShard, DiscoveryNode, RecoveryTarget> targetSupplier)
             throws IOException {
             recoverReplica(replica, targetSupplier, true);

+ 25 - 0
core/src/test/java/org/elasticsearch/index/replication/IndexLevelReplicationTests.java

@@ -18,9 +18,13 @@
  */
 package org.elasticsearch.index.replication;
 
+import org.elasticsearch.action.DocWriteResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.index.engine.Engine;
 import org.elasticsearch.index.engine.InternalEngine;
 import org.elasticsearch.index.engine.InternalEngineTests;
+import org.elasticsearch.index.engine.SegmentsStats;
 import org.elasticsearch.index.shard.IndexShard;
 import org.elasticsearch.index.shard.IndexShardTests;
 import org.elasticsearch.index.store.Store;
@@ -96,4 +100,25 @@ public class IndexLevelReplicationTests extends ESIndexLevelReplicationTestCase
         }
     }
 
+    public void testInheritMaxValidAutoIDTimestampOnRecovery() throws Exception {
+        try (ReplicationGroup shards = createGroup(0)) {
+            shards.startAll();
+            final IndexRequest indexRequest = new IndexRequest(index.getName(), "type").source("{}");
+            indexRequest.onRetry(); // force an update of the timestamp
+            final IndexResponse response = shards.index(indexRequest);
+            assertEquals(DocWriteResponse.Result.CREATED, response.getResult());
+            if (randomBoolean()) { // lets check if that also happens if no translog record is replicated
+                shards.flush();
+            }
+            IndexShard replica = shards.addReplica();
+            shards.recoverReplica(replica);
+
+            SegmentsStats segmentsStats = replica.segmentStats(false);
+            SegmentsStats primarySegmentStats = shards.getPrimary().segmentStats(false);
+            assertNotEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, primarySegmentStats.getMaxUnsafeAutoIdTimestamp());
+            assertEquals(primarySegmentStats.getMaxUnsafeAutoIdTimestamp(), segmentsStats.getMaxUnsafeAutoIdTimestamp());
+            assertNotEquals(Long.MAX_VALUE, segmentsStats.getMaxUnsafeAutoIdTimestamp());
+        }
+    }
+
 }

+ 2 - 2
core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java

@@ -1611,7 +1611,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         operations.add(new Translog.Index("testtype", "1", BytesReference.toBytes(jsonBuilder().startObject().field("foo", "bar").endObject().bytes())));
         newShard.prepareForIndexRecovery();
         newShard.recoveryState().getTranslog().totalOperations(operations.size());
-        newShard.skipTranslogRecovery();
+        newShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
         newShard.performBatchRecovery(operations);
         assertFalse(newShard.getTranslog().syncNeeded());
     }
@@ -1668,7 +1668,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
         List<Translog.Operation> operations = new ArrayList<>();
         operations.add(new Translog.Index("testtype", "1", BytesReference.toBytes(jsonBuilder().startObject().field("foo", "bar").endObject().bytes())));
         newShard.prepareForIndexRecovery();
-        newShard.skipTranslogRecovery();
+        newShard.skipTranslogRecovery(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
         // Shard is still inactive since we haven't started recovering yet
         assertFalse(newShard.isActive());
         newShard.performBatchRecovery(operations);

+ 2 - 3
core/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -29,6 +29,7 @@ import org.apache.lucene.index.Term;
 import org.apache.lucene.search.IndexSearcher;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -36,7 +37,6 @@ import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.codec.CodecService;
@@ -66,7 +66,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -125,7 +124,7 @@ public class RefreshListenersTests extends ESTestCase {
                 store, new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), newMergePolicy(), iwc.getAnalyzer(),
                 iwc.getSimilarity(), new CodecService(null, logger), eventListener, new TranslogHandler(shardId.getIndexName(), logger),
                 IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
-                TimeValue.timeValueMinutes(5), listeners);
+                TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
         engine = new InternalEngine(config);
     }