Browse Source

Pass TranslogRecoveryRunner to engine from outside (#33449)

This commit allows us to use different TranslogRecoveryRunner when
recovering an engine from its local translog. This change is a
prerequisite for the commit-based rollback PR.

Relates #32867
Nhat Nguyen 7 years ago
parent
commit
8afe09a749

+ 8 - 2
server/src/main/java/org/elasticsearch/index/engine/Engine.java

@@ -1642,9 +1642,10 @@ public abstract class Engine implements Closeable {
      * Performs recovery from the transaction log up to {@code recoverUpToSeqNo} (inclusive).
      * This operation will close the engine if the recovery fails.
      *
-     * @param recoverUpToSeqNo the upper bound, inclusive, of sequence number to be recovered
+     * @param translogRecoveryRunner the translog recovery runner
+     * @param recoverUpToSeqNo       the upper bound, inclusive, of sequence number to be recovered
      */
-    public abstract Engine recoverFromTranslog(long recoverUpToSeqNo) throws IOException;
+    public abstract Engine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException;
 
     /**
      * Do not replay translog operations, but make the engine be ready.
@@ -1662,4 +1663,9 @@ public abstract class Engine implements Closeable {
      * Tries to prune buffered deletes from the version map.
      */
     public abstract void maybePruneDeletes();
+
+    @FunctionalInterface
+    public interface TranslogRecoveryRunner {
+        int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
+    }
 }

+ 2 - 19
server/src/main/java/org/elasticsearch/index/engine/EngineConfig.java

@@ -37,13 +37,11 @@ import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.mapper.ParsedDocument;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.store.Store;
-import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.indices.IndexingMemoryController;
 import org.elasticsearch.indices.breaker.CircuitBreakerService;
 import org.elasticsearch.threadpool.ThreadPool;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.function.LongSupplier;
 
@@ -76,7 +74,6 @@ public final class EngineConfig {
     private final List<ReferenceManager.RefreshListener> internalRefreshListener;
     @Nullable
     private final Sort indexSort;
-    private final TranslogRecoveryRunner translogRecoveryRunner;
     @Nullable
     private final CircuitBreakerService circuitBreakerService;
     private final LongSupplier globalCheckpointSupplier;
@@ -127,9 +124,8 @@ public final class EngineConfig {
                         TranslogConfig translogConfig, TimeValue flushMergesAfter,
                         List<ReferenceManager.RefreshListener> externalRefreshListener,
                         List<ReferenceManager.RefreshListener> internalRefreshListener, Sort indexSort,
-                        TranslogRecoveryRunner translogRecoveryRunner, CircuitBreakerService circuitBreakerService,
-                        LongSupplier globalCheckpointSupplier, LongSupplier primaryTermSupplier,
-                        TombstoneDocSupplier tombstoneDocSupplier) {
+                        CircuitBreakerService circuitBreakerService, LongSupplier globalCheckpointSupplier,
+                        LongSupplier primaryTermSupplier, TombstoneDocSupplier tombstoneDocSupplier) {
         this.shardId = shardId;
         this.allocationId = allocationId;
         this.indexSettings = indexSettings;
@@ -163,7 +159,6 @@ public final class EngineConfig {
         this.externalRefreshListener = externalRefreshListener;
         this.internalRefreshListener = internalRefreshListener;
         this.indexSort = indexSort;
-        this.translogRecoveryRunner = translogRecoveryRunner;
         this.circuitBreakerService = circuitBreakerService;
         this.globalCheckpointSupplier = globalCheckpointSupplier;
         this.primaryTermSupplier = primaryTermSupplier;
@@ -324,18 +319,6 @@ public final class EngineConfig {
      */
     public TimeValue getFlushMergesAfter() { return flushMergesAfter; }
 
-    @FunctionalInterface
-    public interface TranslogRecoveryRunner {
-        int run(Engine engine, Translog.Snapshot snapshot) throws IOException;
-    }
-
-    /**
-     * Returns a runner that implements the translog recovery from the given snapshot
-     */
-    public TranslogRecoveryRunner getTranslogRecoveryRunner() {
-        return translogRecoveryRunner;
-    }
-
     /**
      * The refresh listeners to add to Lucene for externally visible refreshes
      */

+ 4 - 4
server/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -393,7 +393,7 @@ public class InternalEngine extends Engine {
     }
 
     @Override
-    public InternalEngine recoverFromTranslog(long recoverUpToSeqNo) throws IOException {
+    public InternalEngine recoverFromTranslog(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
         flushLock.lock();
         try (ReleasableLock lock = readLock.acquire()) {
             ensureOpen();
@@ -401,7 +401,7 @@ public class InternalEngine extends Engine {
                 throw new IllegalStateException("Engine has already been recovered");
             }
             try {
-                recoverFromTranslogInternal(recoverUpToSeqNo);
+                recoverFromTranslogInternal(translogRecoveryRunner, recoverUpToSeqNo);
             } catch (Exception e) {
                 try {
                     pendingTranslogRecovery.set(true); // just play safe and never allow commits on this see #ensureCanFlush
@@ -423,13 +423,13 @@ public class InternalEngine extends Engine {
         pendingTranslogRecovery.set(false); // we are good - now we can commit
     }
 
-    private void recoverFromTranslogInternal(long recoverUpToSeqNo) throws IOException {
+    private void recoverFromTranslogInternal(TranslogRecoveryRunner translogRecoveryRunner, long recoverUpToSeqNo) throws IOException {
         Translog.TranslogGeneration translogGeneration = translog.getGeneration();
         final int opsRecovered;
         final long translogFileGen = Long.parseLong(lastCommittedSegmentInfos.getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
         try (Translog.Snapshot snapshot = translog.newSnapshotFromGen(
             new Translog.TranslogGeneration(translog.getTranslogUUID(), translogFileGen), recoverUpToSeqNo)) {
-            opsRecovered = config().getTranslogRecoveryRunner().run(this, snapshot);
+            opsRecovered = translogRecoveryRunner.run(this, snapshot);
         } catch (Exception e) {
             throw new EngineException(shardId, "failed to recover from translog", e);
         }

+ 2 - 2
server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

@@ -1314,7 +1314,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
      **/
     public void openEngineAndRecoverFromTranslog() throws IOException {
         innerOpenEngineAndTranslog();
-        getEngine().recoverFromTranslog(Long.MAX_VALUE);
+        getEngine().recoverFromTranslog(this::runTranslogRecovery, Long.MAX_VALUE);
     }
 
     /**
@@ -2233,7 +2233,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
             IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING.get(indexSettings.getSettings()),
             Collections.singletonList(refreshListeners),
             Collections.singletonList(new RefreshMetricUpdater(refreshMetric)),
-            indexSort, this::runTranslogRecovery, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
+            indexSort, circuitBreakerService, replicationTracker, () -> operationPrimaryTerm, tombstoneDocSupplier());
     }
 
     /**

+ 35 - 37
server/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -661,7 +661,7 @@ public class InternalEngineTests extends EngineTestCase {
         trimUnsafeCommits(engine.config());
         engine = new InternalEngine(engine.config());
         assertTrue(engine.isRecovering());
-        engine.recoverFromTranslog(Long.MAX_VALUE);
+        engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         Engine.Searcher searcher = wrapper.wrap(engine.acquireSearcher("test"));
         assertThat(counter.get(), equalTo(2));
         searcher.close();
@@ -678,7 +678,7 @@ public class InternalEngineTests extends EngineTestCase {
         engine = new InternalEngine(engine.config());
         expectThrows(IllegalStateException.class, () -> engine.flush(true, true));
         assertTrue(engine.isRecovering());
-        engine.recoverFromTranslog(Long.MAX_VALUE);
+        engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         assertFalse(engine.isRecovering());
         doc = testParsedDocument("2", null, testDocumentWithTextField(), SOURCE, null);
         engine.index(indexForDoc(doc));
@@ -708,7 +708,7 @@ public class InternalEngineTests extends EngineTestCase {
         }
         trimUnsafeCommits(engine.config());
         try (Engine recoveringEngine = new InternalEngine(engine.config())){
-            recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
+            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
                 final TotalHitCountCollector collector = new TotalHitCountCollector();
                 searcher.searcher().search(new MatchAllDocsQuery(), collector);
@@ -744,7 +744,7 @@ public class InternalEngineTests extends EngineTestCase {
                 }
             };
             assertThat(getTranslog(recoveringEngine).stats().getUncommittedOperations(), equalTo(docs));
-            recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
+            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             assertTrue(committed.get());
         } finally {
             IOUtils.close(recoveringEngine);
@@ -778,7 +778,7 @@ public class InternalEngineTests extends EngineTestCase {
             initialEngine.close();
             trimUnsafeCommits(initialEngine.config());
             recoveringEngine = new InternalEngine(initialEngine.config());
-            recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
+            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             try (Engine.Searcher searcher = recoveringEngine.acquireSearcher("test")) {
                 TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), docs);
                 assertEquals(docs, topDocs.totalHits.value);
@@ -811,14 +811,14 @@ public class InternalEngineTests extends EngineTestCase {
             }
             trimUnsafeCommits(config);
             try (InternalEngine engine = new InternalEngine(config)) {
-                engine.recoverFromTranslog(Long.MAX_VALUE);
+                engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                 assertThat(engine.getLocalCheckpoint(), equalTo(maxSeqNo));
                 assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(maxSeqNo));
             }
             trimUnsafeCommits(config);
             try (InternalEngine engine = new InternalEngine(config)) {
                 long upToSeqNo = randomLongBetween(globalCheckpoint.get(), maxSeqNo);
-                engine.recoverFromTranslog(upToSeqNo);
+                engine.recoverFromTranslog(translogHandler, upToSeqNo);
                 assertThat(engine.getLocalCheckpoint(), equalTo(upToSeqNo));
                 assertThat(engine.getLocalCheckpointTracker().getMaxSeqNo(), equalTo(upToSeqNo));
             }
@@ -1202,7 +1202,7 @@ public class InternalEngineTests extends EngineTestCase {
         }
         trimUnsafeCommits(config);
         engine = new InternalEngine(config);
-        engine.recoverFromTranslog(Long.MAX_VALUE);
+        engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         assertEquals(engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID), syncId);
     }
 
@@ -1221,7 +1221,7 @@ public class InternalEngineTests extends EngineTestCase {
         engine.close();
         trimUnsafeCommits(config);
         engine = new InternalEngine(config);
-        engine.recoverFromTranslog(Long.MAX_VALUE);
+        engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         assertNull("Sync ID must be gone since we have a document to replay", engine.getLastCommittedSegmentInfos().getUserData().get(Engine.SYNC_COMMIT_ID));
     }
 
@@ -2187,7 +2187,7 @@ public class InternalEngineTests extends EngineTestCase {
 
         trimUnsafeCommits(initialEngine.engineConfig);
         try (InternalEngine recoveringEngine = new InternalEngine(initialEngine.config())){
-            recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
+            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
 
             assertEquals(primarySeqNo, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
             assertThat(
@@ -2508,7 +2508,7 @@ public class InternalEngineTests extends EngineTestCase {
                 try (InternalEngine engine = createEngine(config)) {
                     engine.index(firstIndexRequest);
                     globalCheckpoint.set(engine.getLocalCheckpoint());
-                    expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(Long.MAX_VALUE));
+                    expectThrows(IllegalStateException.class, () -> engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE));
                     Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
                     assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                     assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
@@ -2530,7 +2530,7 @@ public class InternalEngineTests extends EngineTestCase {
                             assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                         }
                         assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
-                        engine.recoverFromTranslog(Long.MAX_VALUE);
+                        engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                         userData = engine.getLastCommittedSegmentInfos().getUserData();
                         assertEquals("3", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                         assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
@@ -2547,7 +2547,7 @@ public class InternalEngineTests extends EngineTestCase {
                     Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
                     assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                     assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
-                    engine.recoverFromTranslog(Long.MAX_VALUE);
+                    engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                     assertEquals(2, engine.getTranslog().currentFileGeneration());
                     assertEquals(0L, engine.getTranslog().stats().getUncommittedOperations());
                 }
@@ -2561,7 +2561,7 @@ public class InternalEngineTests extends EngineTestCase {
                         Map<String, String> userData = engine.getLastCommittedSegmentInfos().getUserData();
                         assertEquals("1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                         assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
-                        engine.recoverFromTranslog(Long.MAX_VALUE);
+                        engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                         userData = engine.getLastCommittedSegmentInfos().getUserData();
                         assertEquals("no changes - nothing to commit", "1", userData.get(Translog.TRANSLOG_GENERATION_KEY));
                         assertEquals(engine.getTranslog().getTranslogUUID(), userData.get(Translog.TRANSLOG_UUID_KEY));
@@ -2667,7 +2667,7 @@ public class InternalEngineTests extends EngineTestCase {
                     }
                 }
             }) {
-                engine.recoverFromTranslog(Long.MAX_VALUE);
+                engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                 final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), SOURCE, null);
                 engine.index(indexForDoc(doc1));
                 globalCheckpoint.set(engine.getLocalCheckpoint());
@@ -2678,7 +2678,7 @@ public class InternalEngineTests extends EngineTestCase {
             try (InternalEngine engine =
                      new InternalEngine(config(indexSettings, store, translogPath, newMergePolicy(), null, null,
                          globalCheckpointSupplier))) {
-                engine.recoverFromTranslog(Long.MAX_VALUE);
+                engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
                 assertVisibleCount(engine, 1);
                 final long committedGen = Long.valueOf(
                     engine.getLastCommittedSegmentInfos().getUserData().get(Translog.TRANSLOG_GENERATION_KEY));
@@ -2739,30 +2739,28 @@ public class InternalEngineTests extends EngineTestCase {
             assertThat(indexResult.getVersion(), equalTo(1L));
         }
         assertVisibleCount(engine, numDocs);
-
-        TranslogHandler parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
-        parser.mappingUpdate = dynamicUpdate();
+        translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
+        translogHandler.mappingUpdate = dynamicUpdate();
 
         engine.close();
         trimUnsafeCommits(copy(engine.config(), inSyncGlobalCheckpointSupplier));
         engine = new InternalEngine(copy(engine.config(), inSyncGlobalCheckpointSupplier)); // we need to reuse the engine config unless the parser.mappingModified won't work
-        engine.recoverFromTranslog(Long.MAX_VALUE);
+        engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
 
         assertVisibleCount(engine, numDocs, false);
-        parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
-        assertEquals(numDocs, parser.appliedOperations());
-        if (parser.mappingUpdate != null) {
-            assertEquals(1, parser.getRecoveredTypes().size());
-            assertTrue(parser.getRecoveredTypes().containsKey("test"));
+        assertEquals(numDocs, translogHandler.appliedOperations());
+        if (translogHandler.mappingUpdate != null) {
+            assertEquals(1, translogHandler.getRecoveredTypes().size());
+            assertTrue(translogHandler.getRecoveredTypes().containsKey("test"));
         } else {
-            assertEquals(0, parser.getRecoveredTypes().size());
+            assertEquals(0, translogHandler.getRecoveredTypes().size());
         }
 
         engine.close();
+        translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
         engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
         assertVisibleCount(engine, numDocs, false);
-        parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
-        assertEquals(0, parser.appliedOperations());
+        assertEquals(0, translogHandler.appliedOperations());
 
         final boolean flush = randomBoolean();
         int randomId = randomIntBetween(numDocs + 1, numDocs + 10);
@@ -2786,13 +2784,13 @@ public class InternalEngineTests extends EngineTestCase {
         }
 
         engine.close();
+        translogHandler = createTranslogHandler(engine.engineConfig.getIndexSettings());
         engine = createEngine(store, primaryTranslogDir, inSyncGlobalCheckpointSupplier);
         try (Engine.Searcher searcher = engine.acquireSearcher("test")) {
             TopDocs topDocs = searcher.searcher().search(new MatchAllDocsQuery(), numDocs + 1);
             assertThat(topDocs.totalHits.value, equalTo(numDocs + 1L));
         }
-        parser = (TranslogHandler) engine.config().getTranslogRecoveryRunner();
-        assertEquals(flush ? 1 : 2, parser.appliedOperations());
+        assertEquals(flush ? 1 : 2, translogHandler.appliedOperations());
         engine.delete(new Engine.Delete("test", Integer.toString(randomId), newUid(doc), primaryTerm.get()));
         if (randomBoolean()) {
             engine.refresh("test");
@@ -2836,7 +2834,7 @@ public class InternalEngineTests extends EngineTestCase {
                 threadPool, config.getIndexSettings(), null, store, newMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
                 new CodecService(null, logger), config.getEventListener(), IndexSearcher.getDefaultQueryCache(),
                 IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5),
-                config.getExternalRefreshListener(), config.getInternalRefreshListener(), null, config.getTranslogRecoveryRunner(),
+                config.getExternalRefreshListener(), config.getInternalRefreshListener(), null,
                 new NoneCircuitBreakerService(), () -> SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm::get, tombstoneDocSupplier());
         try {
             InternalEngine internalEngine = new InternalEngine(brokenConfig);
@@ -3455,7 +3453,7 @@ public class InternalEngineTests extends EngineTestCase {
         }
         try (Store store = createStore(newFSDirectory(storeDir)); Engine engine = new InternalEngine(configSupplier.apply(store))) {
             assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
-            engine.recoverFromTranslog(Long.MAX_VALUE);
+            engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             assertEquals(timestamp1, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
             final ParsedDocument doc = testParsedDocument("1", null, testDocumentWithTextField(),
                 new BytesArray("{}".getBytes(Charset.defaultCharset())), null);
@@ -3738,7 +3736,7 @@ public class InternalEngineTests extends EngineTestCase {
         }
         trimUnsafeCommits(initialEngine.config());
         try (Engine recoveringEngine = new InternalEngine(initialEngine.config())) {
-            recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
+            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             recoveringEngine.fillSeqNoGaps(2);
             assertThat(recoveringEngine.getLocalCheckpoint(), greaterThanOrEqualTo((long) (docs - 1)));
         }
@@ -3849,7 +3847,7 @@ public class InternalEngineTests extends EngineTestCase {
                     throw new UnsupportedOperationException();
                 }
             };
-            noOpEngine.recoverFromTranslog(Long.MAX_VALUE);
+            noOpEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             final int gapsFilled = noOpEngine.fillSeqNoGaps(primaryTerm.get());
             final String reason = "filling gaps";
             noOpEngine.noOp(new Engine.NoOp(maxSeqNo + 1, primaryTerm.get(), LOCAL_TRANSLOG_RECOVERY, System.nanoTime(), reason));
@@ -4127,7 +4125,7 @@ public class InternalEngineTests extends EngineTestCase {
             trimUnsafeCommits(copy(replicaEngine.config(), globalCheckpoint::get));
             recoveringEngine = new InternalEngine(copy(replicaEngine.config(), globalCheckpoint::get));
             assertEquals(numDocsOnReplica, getTranslog(recoveringEngine).stats().getUncommittedOperations());
-            recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
+            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
             assertEquals(checkpointOnReplica, recoveringEngine.getLocalCheckpoint());
             assertEquals((maxSeqIDOnReplica + 1) - numDocsOnReplica, recoveringEngine.fillSeqNoGaps(2));
@@ -4163,7 +4161,7 @@ public class InternalEngineTests extends EngineTestCase {
             if (flushed) {
                 assertThat(recoveringEngine.getTranslogStats().getUncommittedOperations(), equalTo(0));
             }
-            recoveringEngine.recoverFromTranslog(Long.MAX_VALUE);
+            recoveringEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             assertEquals(maxSeqIDOnReplica, recoveringEngine.getSeqNoStats(-1).getMaxSeqNo());
             assertEquals(maxSeqIDOnReplica, recoveringEngine.getLocalCheckpoint());
             assertEquals(0, recoveringEngine.fillSeqNoGaps(3));
@@ -4356,7 +4354,7 @@ public class InternalEngineTests extends EngineTestCase {
                     super.commitIndexWriter(writer, translog, syncId);
                 }
             }) {
-            engine.recoverFromTranslog(Long.MAX_VALUE);
+            engine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
             int numDocs = scaledRandomIntBetween(10, 100);
             for (int docId = 0; docId < numDocs; docId++) {
                 ParseContext.Document document = testDocumentWithTextField();

+ 2 - 2
server/src/test/java/org/elasticsearch/index/shard/RefreshListenersTests.java

@@ -131,10 +131,10 @@ public class RefreshListenersTests extends ESTestCase {
             indexSettings, null, store, newMergePolicy(), iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger),
             eventListener, IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
             TimeValue.timeValueMinutes(5), Collections.singletonList(listeners), Collections.emptyList(), null,
-            (e, s) -> 0, new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm,
+            new NoneCircuitBreakerService(), () -> SequenceNumbers.NO_OPS_PERFORMED, () -> primaryTerm,
             EngineTestCase.tombstoneDocSupplier());
         engine = new InternalEngine(config);
-        engine.recoverFromTranslog(Long.MAX_VALUE);
+        engine.recoverFromTranslog((e, s) -> 0, Long.MAX_VALUE);
         listeners.setCurrentRefreshLocationSupplier(engine::getTranslogLastWriteLocation);
     }
 

+ 11 - 7
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -125,6 +125,7 @@ public abstract class EngineTestCase extends ESTestCase {
     protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
 
     protected ThreadPool threadPool;
+    protected TranslogHandler translogHandler;
 
     protected Store store;
     protected Store storeReplica;
@@ -189,6 +190,7 @@ public abstract class EngineTestCase extends ESTestCase {
         Lucene.cleanLuceneIndex(store.directory());
         Lucene.cleanLuceneIndex(storeReplica.directory());
         primaryTranslogDir = createTempDir("translog-primary");
+        translogHandler = createTranslogHandler(defaultSettings);
         engine = createEngine(store, primaryTranslogDir);
         LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
 
@@ -213,7 +215,7 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getWarmer(), config.getStore(), config.getMergePolicy(), config.getAnalyzer(), config.getSimilarity(),
             new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
             config.getTranslogConfig(), config.getFlushMergesAfter(),
-            config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
+            config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
             config.getCircuitBreakerService(), globalCheckpointSupplier, config.getPrimaryTermSupplier(), tombstoneDocSupplier());
     }
 
@@ -222,7 +224,7 @@ public abstract class EngineTestCase extends ESTestCase {
                 config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
                 new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
                 config.getTranslogConfig(), config.getFlushMergesAfter(),
-                config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
+                config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
                 config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(),
                 config.getTombstoneDocSupplier());
     }
@@ -232,7 +234,7 @@ public abstract class EngineTestCase extends ESTestCase {
             config.getWarmer(), config.getStore(), mergePolicy, config.getAnalyzer(), config.getSimilarity(),
             new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
             config.getTranslogConfig(), config.getFlushMergesAfter(),
-            config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(), config.getTranslogRecoveryRunner(),
+            config.getExternalRefreshListener(), Collections.emptyList(), config.getIndexSort(),
             config.getCircuitBreakerService(), config.getGlobalCheckpointSupplier(), config.getPrimaryTermSupplier(),
             config.getTombstoneDocSupplier());
     }
@@ -377,6 +379,10 @@ public abstract class EngineTestCase extends ESTestCase {
             () -> SequenceNumbers.NO_OPS_PERFORMED, primaryTermSupplier);
     }
 
+    protected TranslogHandler createTranslogHandler(IndexSettings indexSettings) {
+        return new TranslogHandler(xContentRegistry(), indexSettings);
+    }
+
     protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
         return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
     }
@@ -478,7 +484,7 @@ public abstract class EngineTestCase extends ESTestCase {
 
         }
         InternalEngine internalEngine = createInternalEngine(indexWriterFactory, localCheckpointTrackerSupplier, seqNoForOperation, config);
-        internalEngine.recoverFromTranslog(Long.MAX_VALUE);
+        internalEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         return internalEngine;
     }
 
@@ -553,14 +559,12 @@ public abstract class EngineTestCase extends ESTestCase {
                 // we don't need to notify anybody in this test
             }
         };
-        final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(),
-                indexSettings.getSettings()));
         final List<ReferenceManager.RefreshListener> refreshListenerList =
                 refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
         EngineConfig config = new EngineConfig(shardId, allocationId.getId(), threadPool, indexSettings, null, store,
                 mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
                 IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
-                TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort, handler,
+                TimeValue.timeValueMinutes(5), refreshListenerList, Collections.emptyList(), indexSort,
                 new NoneCircuitBreakerService(),
                 globalCheckpointSupplier == null ?
                     new ReplicationTracker(shardId, allocationId.getId(), indexSettings, SequenceNumbers.NO_OPS_PERFORMED, update -> {}) :

+ 1 - 1
test/framework/src/main/java/org/elasticsearch/index/engine/TranslogHandler.java

@@ -46,7 +46,7 @@ import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static org.elasticsearch.index.mapper.SourceToParse.source;
 
-public class TranslogHandler implements EngineConfig.TranslogRecoveryRunner {
+public class TranslogHandler implements Engine.TranslogRecoveryRunner {
 
     private final MapperService mapperService;
     public Mapping mappingUpdate = null;

+ 2 - 4
x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowingEngineTests.java

@@ -44,7 +44,6 @@ import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.test.DummyShardLock;
 import org.elasticsearch.test.ESTestCase;
-import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.threadpool.TestThreadPool;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -254,8 +253,6 @@ public class FollowingEngineTests extends ESTestCase {
                 Collections.emptyList(),
                 Collections.emptyList(),
                 null,
-                new TranslogHandler(
-                        xContentRegistry, IndexSettingsModule.newIndexSettings(shardId.getIndexName(), indexSettings.getSettings())),
                 new NoneCircuitBreakerService(),
                 () -> SequenceNumbers.NO_OPS_PERFORMED,
                 () -> primaryTerm.get(),
@@ -280,7 +277,8 @@ public class FollowingEngineTests extends ESTestCase {
                 SequenceNumbers.NO_OPS_PERFORMED, shardId, 1L);
         store.associateIndexWithNewTranslog(translogUuid);
         FollowingEngine followingEngine = new FollowingEngine(config);
-        followingEngine.recoverFromTranslog(Long.MAX_VALUE);
+        TranslogHandler translogHandler = new TranslogHandler(xContentRegistry(), config.getIndexSettings());
+        followingEngine.recoverFromTranslog(translogHandler, Long.MAX_VALUE);
         return followingEngine;
     }