Browse Source

Die with dignity while merging

If an out of memory error is thrown while merging, today we quietly
rewrap it into a merge exception and the out of memory error is
lost. Instead, we need to rethrow out of memory errors, and in fact any
fatal error here, and let those go uncaught so that the node is torn
down. This commit causes this to be the case.

Relates #27265
Jason Tedor 8 years ago
parent
commit
d5451b2037

+ 1 - 6
core/src/main/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandler.java

@@ -21,7 +21,6 @@ package org.elasticsearch.bootstrap;
 
 
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.message.ParameterizedMessage;
 import org.apache.logging.log4j.message.ParameterizedMessage;
-import org.apache.lucene.index.MergePolicy;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.SuppressForbidden;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.logging.Loggers;
 
 
@@ -68,11 +67,7 @@ class ElasticsearchUncaughtExceptionHandler implements Thread.UncaughtExceptionH
 
 
     // visible for testing
     // visible for testing
     static boolean isFatalUncaught(Throwable e) {
     static boolean isFatalUncaught(Throwable e) {
-        return isFatalCause(e) || (e instanceof MergePolicy.MergeException && isFatalCause(e.getCause()));
-    }
-
-    private static boolean isFatalCause(Throwable cause) {
-        return cause instanceof Error;
+        return e instanceof Error;
     }
     }
 
 
     // visible for testing
     // visible for testing

+ 31 - 14
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

@@ -1579,23 +1579,15 @@ public class InternalEngine extends Engine {
         }
         }
     }
     }
 
 
-    @SuppressWarnings("finally")
     private boolean failOnTragicEvent(AlreadyClosedException ex) {
     private boolean failOnTragicEvent(AlreadyClosedException ex) {
         final boolean engineFailed;
         final boolean engineFailed;
         // if we are already closed due to some tragic exception
         // if we are already closed due to some tragic exception
         // we need to fail the engine. it might have already been failed before
         // we need to fail the engine. it might have already been failed before
         // but we are double-checking it's failed and closed
         // but we are double-checking it's failed and closed
         if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
         if (indexWriter.isOpen() == false && indexWriter.getTragicException() != null) {
-            if (indexWriter.getTragicException() instanceof Error) {
-                try {
-                    logger.error("tragic event in index writer", ex);
-                } finally {
-                    throw (Error) indexWriter.getTragicException();
-                }
-            } else {
-                failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
-                engineFailed = true;
-            }
+            maybeDie("tragic event in index writer", indexWriter.getTragicException());
+            failEngine("already closed by tragic event on the index writer", (Exception) indexWriter.getTragicException());
+            engineFailed = true;
         } else if (translog.isOpen() == false && translog.getTragicException() != null) {
         } else if (translog.isOpen() == false && translog.getTragicException() != null) {
             failEngine("already closed by tragic event on the translog", translog.getTragicException());
             failEngine("already closed by tragic event on the translog", translog.getTragicException());
             engineFailed = true;
             engineFailed = true;
@@ -1916,7 +1908,6 @@ public class InternalEngine extends Engine {
 
 
         @Override
         @Override
         protected void handleMergeException(final Directory dir, final Throwable exc) {
         protected void handleMergeException(final Directory dir, final Throwable exc) {
-            logger.error("failed to merge", exc);
             engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
             engineConfig.getThreadPool().generic().execute(new AbstractRunnable() {
                 @Override
                 @Override
                 public void onFailure(Exception e) {
                 public void onFailure(Exception e) {
@@ -1925,13 +1916,39 @@ public class InternalEngine extends Engine {
 
 
                 @Override
                 @Override
                 protected void doRun() throws Exception {
                 protected void doRun() throws Exception {
-                    MergePolicy.MergeException e = new MergePolicy.MergeException(exc, dir);
-                    failEngine("merge failed", e);
+                    /*
+                     * We do this on another thread rather than the merge thread that we are initially called on so that we have complete
+                     * confidence that the call stack does not contain catch statements that would cause the error that might be thrown
+                     * here from being caught and never reaching the uncaught exception handler.
+                     */
+                    maybeDie("fatal error while merging", exc);
+                    logger.error("failed to merge", exc);
+                    failEngine("merge failed", new MergePolicy.MergeException(exc, dir));
                 }
                 }
             });
             });
         }
         }
     }
     }
 
 
+    /**
+     * If the specified throwable is a fatal error, this throwable will be thrown. Callers should ensure that there are no catch statements
+     * that would catch an error in the stack as the fatal error here should go uncaught and be handled by the uncaught exception handler
+     * that we install during bootstrap. If the specified throwable is indeed a fatal error, the specified message will attempt to be logged
+     * before throwing the fatal error. If the specified throwable is not a fatal error, this method is a no-op.
+     *
+     * @param maybeMessage the message to maybe log
+     * @param maybeFatal the throwable that is maybe fatal
+     */
+    @SuppressWarnings("finally")
+    private void maybeDie(final String maybeMessage, final Throwable maybeFatal) {
+        if (maybeFatal instanceof Error) {
+            try {
+                logger.error(maybeMessage, maybeFatal);
+            } finally {
+                throw (Error) maybeFatal;
+            }
+        }
+    }
+
     /**
     /**
      * Commits the specified index writer.
      * Commits the specified index writer.
      *
      *

+ 0 - 2
core/src/test/java/org/elasticsearch/bootstrap/ElasticsearchUncaughtExceptionHandlerTests.java

@@ -19,7 +19,6 @@
 
 
 package org.elasticsearch.bootstrap;
 package org.elasticsearch.bootstrap;
 
 
-import org.apache.lucene.index.MergePolicy;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.ESTestCase;
 import org.junit.Before;
 import org.junit.Before;
 
 
@@ -131,7 +130,6 @@ public class ElasticsearchUncaughtExceptionHandlerTests extends ESTestCase {
     }
     }
 
 
     public void testIsFatalCause() {
     public void testIsFatalCause() {
-        assertFatal(new MergePolicy.MergeException(new OutOfMemoryError(), null));
         assertFatal(new OutOfMemoryError());
         assertFatal(new OutOfMemoryError());
         assertFatal(new StackOverflowError());
         assertFatal(new StackOverflowError());
         assertFatal(new InternalError());
         assertFatal(new InternalError());

+ 1 - 1
core/src/test/java/org/elasticsearch/index/engine/CombinedDeletionPolicyTests.java

@@ -30,7 +30,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
 import java.util.List;
 import java.util.List;
 
 
-import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verify;

+ 2 - 346
core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

@@ -26,8 +26,6 @@ import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.LogEvent;
 import org.apache.logging.log4j.core.LogEvent;
 import org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.logging.log4j.core.appender.AbstractAppender;
 import org.apache.logging.log4j.core.filter.RegexFilter;
 import org.apache.logging.log4j.core.filter.RegexFilter;
-import org.apache.lucene.analysis.Analyzer;
-import org.apache.lucene.codecs.Codec;
 import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
 import org.apache.lucene.codecs.lucene50.Lucene50StoredFieldsFormat;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.Field;
 import org.apache.lucene.document.LongPoint;
 import org.apache.lucene.document.LongPoint;
@@ -45,7 +43,6 @@ import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.LiveIndexWriterConfig;
 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.LogByteSizeMergePolicy;
 import org.apache.lucene.index.LogDocMergePolicy;
 import org.apache.lucene.index.LogDocMergePolicy;
-import org.apache.lucene.index.MergePolicy;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.NoMergePolicy;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.NumericDocValues;
 import org.apache.lucene.index.PointValues;
 import org.apache.lucene.index.PointValues;
@@ -72,19 +69,16 @@ import org.elasticsearch.Version;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.routing.AllocationId;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRouting;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.ShardRoutingState;
 import org.elasticsearch.cluster.routing.TestShardRouting;
 import org.elasticsearch.cluster.routing.TestShardRouting;
-import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Randomness;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.Strings;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesArray;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.logging.Loggers;
-import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.Versions;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
 import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver;
@@ -95,7 +89,6 @@ import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.XContentType;
 import org.elasticsearch.common.xcontent.XContentType;
-import org.elasticsearch.index.Index;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.IndexSettings;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.VersionType;
 import org.elasticsearch.index.codec.CodecService;
 import org.elasticsearch.index.codec.CodecService;
@@ -119,20 +112,13 @@ import org.elasticsearch.index.seqno.SequenceNumbersService;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
 import org.elasticsearch.index.shard.IndexSearcherWrapper;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.shard.ShardUtils;
 import org.elasticsearch.index.shard.ShardUtils;
-import org.elasticsearch.index.store.DirectoryService;
 import org.elasticsearch.index.store.DirectoryUtils;
 import org.elasticsearch.index.store.DirectoryUtils;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.Translog;
 import org.elasticsearch.index.translog.TranslogConfig;
 import org.elasticsearch.index.translog.TranslogConfig;
-import org.elasticsearch.test.DummyShardLock;
-import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.test.IndexSettingsModule;
 import org.elasticsearch.test.IndexSettingsModule;
-import org.elasticsearch.threadpool.TestThreadPool;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.hamcrest.MatcherAssert;
 import org.hamcrest.MatcherAssert;
 import org.hamcrest.Matchers;
 import org.hamcrest.Matchers;
-import org.junit.After;
-import org.junit.Before;
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.io.UncheckedIOException;
@@ -166,14 +152,13 @@ import java.util.function.ToLongBiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 import java.util.stream.LongStream;
 
 
-import static java.util.Collections.emptyList;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.emptyMap;
 import static java.util.Collections.shuffle;
 import static java.util.Collections.shuffle;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.LOCAL_TRANSLOG_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PEER_RECOVERY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.PRIMARY;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
 import static org.elasticsearch.index.engine.Engine.Operation.Origin.REPLICA;
-import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.everyItem;
 import static org.hamcrest.Matchers.everyItem;
@@ -185,313 +170,7 @@ import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.nullValue;
 
 
-public class InternalEngineTests extends ESTestCase {
-
-    protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0);
-    protected final AllocationId allocationId = AllocationId.newInitializing();
-    private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
-
-    protected ThreadPool threadPool;
-
-    private Store store;
-    private Store storeReplica;
-
-    protected InternalEngine engine;
-    protected InternalEngine replicaEngine;
-
-    private IndexSettings defaultSettings;
-    private String codecName;
-    private Path primaryTranslogDir;
-    private Path replicaTranslogDir;
-
-    @Override
-    @Before
-    public void setUp() throws Exception {
-        super.setUp();
-
-        CodecService codecService = new CodecService(null, logger);
-        String name = Codec.getDefault().getName();
-        if (Arrays.asList(codecService.availableCodecs()).contains(name)) {
-            // some codecs are read only so we only take the ones that we have in the service and randomly
-            // selected by lucene test case.
-            codecName = name;
-        } else {
-            codecName = "default";
-        }
-        defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
-            .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
-            .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName)
-            .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
-            .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(),
-                between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)))
-            .build()); // TODO randomize more settings
-        threadPool = new TestThreadPool(getClass().getName());
-        store = createStore();
-        storeReplica = createStore();
-        Lucene.cleanLuceneIndex(store.directory());
-        Lucene.cleanLuceneIndex(storeReplica.directory());
-        primaryTranslogDir = createTempDir("translog-primary");
-        engine = createEngine(store, primaryTranslogDir);
-        LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
-
-        assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
-        assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
-        if (randomBoolean()) {
-            engine.config().setEnableGcDeletes(false);
-        }
-        replicaTranslogDir = createTempDir("translog-replica");
-        replicaEngine = createEngine(storeReplica, replicaTranslogDir);
-        currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig();
-
-        assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName());
-        assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
-        if (randomBoolean()) {
-            engine.config().setEnableGcDeletes(false);
-        }
-    }
-
-    public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) {
-        return copy(config, openMode, config.getAnalyzer());
-    }
-
-    public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
-        return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
-                config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
-                new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
-                config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
-                config.getIndexSort(), config.getTranslogRecoveryRunner());
-    }
-
-    @Override
-    @After
-    public void tearDown() throws Exception {
-        super.tearDown();
-        if (engine != null && engine.isClosed.get() == false) {
-            engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
-        }
-        if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
-            replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
-        }
-        IOUtils.close(
-            replicaEngine, storeReplica,
-            engine, store);
-        terminate(threadPool);
-    }
-
-
-    private static Document testDocumentWithTextField() {
-        return testDocumentWithTextField("test");
-    }
-
-    private static Document testDocumentWithTextField(String value) {
-        Document document = testDocument();
-        document.add(new TextField("value", value, Field.Store.YES));
-        return document;
-    }
-
-
-    private static Document testDocument() {
-        return new Document();
-    }
-
-    public static ParsedDocument createParsedDoc(String id, String routing) {
-        return testParsedDocument(id, routing, testDocumentWithTextField(), new BytesArray("{ \"value\" : \"test\" }"), null);
-    }
-
-    private static ParsedDocument testParsedDocument(String id, String routing, Document document, BytesReference source, Mapping mappingUpdate) {
-        Field uidField = new Field("_id", Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE);
-        Field versionField = new NumericDocValuesField("_version", 0);
-        SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
-        document.add(uidField);
-        document.add(versionField);
-        document.add(seqID.seqNo);
-        document.add(seqID.seqNoDocValue);
-        document.add(seqID.primaryTerm);
-        BytesRef ref = source.toBytesRef();
-        document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length));
-        return new ParsedDocument(versionField, seqID, id, "test", routing, Arrays.asList(document), source, XContentType.JSON,
-            mappingUpdate);
-    }
-
-    protected Store createStore() throws IOException {
-        return createStore(newDirectory());
-    }
-
-    protected Store createStore(final Directory directory) throws IOException {
-        return createStore(INDEX_SETTINGS, directory);
-    }
-
-    protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException {
-        final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
-            @Override
-            public Directory newDirectory() throws IOException {
-                return directory;
-            }
-        };
-        return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
-    }
-
-    protected Translog createTranslog() throws IOException {
-        return createTranslog(primaryTranslogDir);
-    }
-
-    protected Translog createTranslog(Path translogPath) throws IOException {
-        TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
-        return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
-    }
-
-    protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
-        return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
-    }
-
-    protected InternalEngine createEngine(Store store,
-                                          Path translogPath,
-                                          BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
-        return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier);
-    }
-
-    protected InternalEngine createEngine(Store store,
-                                          Path translogPath,
-                                          BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
-                                          ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation) throws IOException {
-        return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null);
-    }
-
-    protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
-        return createEngine(indexSettings, store, translogPath, mergePolicy, null);
-
-    }
-
-    protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
-                                          @Nullable IndexWriterFactory indexWriterFactory) throws IOException {
-        return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null);
-    }
-
-    protected InternalEngine createEngine(
-        IndexSettings indexSettings,
-        Store store,
-        Path translogPath,
-        MergePolicy mergePolicy,
-        @Nullable IndexWriterFactory indexWriterFactory,
-        @Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
-        return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null);
-    }
-
-    protected InternalEngine createEngine(
-            IndexSettings indexSettings,
-            Store store,
-            Path translogPath,
-            MergePolicy mergePolicy,
-            @Nullable IndexWriterFactory indexWriterFactory,
-            @Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
-            @Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation) throws IOException {
-        return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, null);
-    }
-
-    protected InternalEngine createEngine(
-        IndexSettings indexSettings,
-        Store store,
-        Path translogPath,
-        MergePolicy mergePolicy,
-        @Nullable IndexWriterFactory indexWriterFactory,
-        @Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
-        @Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
-        @Nullable Sort indexSort) throws IOException {
-        EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort);
-        InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config);
-        if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
-            internalEngine.recoverFromTranslog();
-        }
-        return internalEngine;
-    }
-
-    @FunctionalInterface
-    public interface IndexWriterFactory {
-
-        IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException;
-    }
-
-    public static InternalEngine createInternalEngine(@Nullable final IndexWriterFactory indexWriterFactory,
-                                                      @Nullable final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
-                                                      @Nullable final ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
-                                                      final EngineConfig config) {
-        if (sequenceNumbersServiceSupplier == null) {
-            return new InternalEngine(config) {
-                @Override
-                IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
-                    return (indexWriterFactory != null) ?
-                            indexWriterFactory.createWriter(directory, iwc) :
-                            super.createWriter(directory, iwc);
-                }
-
-                @Override
-                protected long doGenerateSeqNoForOperation(final Operation operation) {
-                    return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation);
-                }
-            };
-        } else {
-            return new InternalEngine(config, sequenceNumbersServiceSupplier) {
-                @Override
-                IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
-                    return (indexWriterFactory != null) ?
-                            indexWriterFactory.createWriter(directory, iwc) :
-                            super.createWriter(directory, iwc);
-                }
-
-                @Override
-                protected long doGenerateSeqNoForOperation(final Operation operation) {
-                    return seqNoForOperation != null ? seqNoForOperation.applyAsLong(this, operation) : super.doGenerateSeqNoForOperation(operation);
-                }
-            };
-        }
-
-    }
-
-    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
-                               ReferenceManager.RefreshListener refreshListener) {
-        return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null);
-    }
-
-    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
-                               ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
-        IndexWriterConfig iwc = newIndexWriterConfig();
-        TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
-        final EngineConfig.OpenMode openMode;
-        try {
-            if (Lucene.indexExists(store.directory()) == false) {
-                openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
-            } else {
-                openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
-            }
-        } catch (IOException e) {
-            throw new ElasticsearchException("can't find index?", e);
-        }
-        Engine.EventListener listener = new Engine.EventListener() {
-            @Override
-            public void onFailedEngine(String reason, @Nullable Exception e) {
-                // we don't need to notify anybody in this test
-            }
-        };
-        final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(),
-            indexSettings.getSettings()));
-        final List<ReferenceManager.RefreshListener> refreshListenerList =
-            refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
-        EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
-            mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
-            IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
-            TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
-
-        return config;
-    }
-
-    private static final BytesReference B_1 = new BytesArray(new byte[]{1});
-    private static final BytesReference B_2 = new BytesArray(new byte[]{2});
-    private static final BytesReference B_3 = new BytesArray(new byte[]{3});
-    private static final BytesArray SOURCE = bytesArray("{}");
-
-    private static BytesArray bytesArray(String string) {
-        return new BytesArray(string.getBytes(Charset.defaultCharset()));
-    }
+public class InternalEngineTests extends EngineTestCase {
 
 
     public void testSegments() throws Exception {
     public void testSegments() throws Exception {
         try (Store store = createStore();
         try (Store store = createStore();
@@ -2487,29 +2166,6 @@ public class InternalEngineTests extends ESTestCase {
         }
         }
     }
     }
 
 
-    protected Term newUid(String id) {
-        return new Term("_id", Uid.encodeId(id));
-    }
-
-    protected Term newUid(ParsedDocument doc) {
-        return newUid(doc.id());
-    }
-
-    protected Engine.Get newGet(boolean realtime, ParsedDocument doc) {
-        return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc));
-    }
-
-    private Engine.Index indexForDoc(ParsedDocument doc) {
-        return new Engine.Index(newUid(doc), doc);
-    }
-
-    private Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo,
-                                            boolean isRetry) {
-        return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL,
-            Engine.Operation.Origin.REPLICA, System.nanoTime(),
-            IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
-    }
-
     public void testExtractShardId() {
     public void testExtractShardId() {
         try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
         try (Engine.Searcher test = this.engine.acquireSearcher("test")) {
             ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader());
             ShardId shardId = ShardUtils.extractShardId(test.getDirectoryReader());

+ 0 - 12
core/src/test/java/org/elasticsearch/index/translog/TranslogDeletionPolicyTests.java

@@ -43,18 +43,6 @@ import static org.hamcrest.Matchers.equalTo;
 
 
 public class TranslogDeletionPolicyTests extends ESTestCase {
 public class TranslogDeletionPolicyTests extends ESTestCase {
 
 
-    public static TranslogDeletionPolicy createTranslogDeletionPolicy() {
-        return new TranslogDeletionPolicy(
-            IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
-            IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis()
-        );
-    }
-
-    public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) {
-        return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
-            indexSettings.getTranslogRetentionAge().getMillis());
-    }
-
     public void testNoRetention() throws IOException {
     public void testNoRetention() throws IOException {
         long now = System.currentTimeMillis();
         long now = System.currentTimeMillis();
         Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);
         Tuple<List<TranslogReader>, TranslogWriter> readersAndWriter = createReadersAndWriter(now);

+ 1 - 1
core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java

@@ -107,7 +107,7 @@ import java.util.stream.LongStream;
 
 
 import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
 import static com.carrotsearch.randomizedtesting.RandomizedTest.randomLongBetween;
 import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
 import static org.elasticsearch.common.util.BigArrays.NON_RECYCLING_INSTANCE;
-import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;

+ 1 - 1
core/src/test/java/org/elasticsearch/indices/recovery/RecoveryTests.java

@@ -44,7 +44,7 @@ import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
 
 
-import static org.elasticsearch.index.translog.TranslogDeletionPolicyTests.createTranslogDeletionPolicy;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.not;

+ 107 - 0
qa/evil-tests/src/test/java/org/elasticsearch/index/engine/EvilInternalEngineTests.java

@@ -0,0 +1,107 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.SegmentCommitInfo;
+import org.elasticsearch.index.mapper.ParsedDocument;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.hasToString;
+import static org.hamcrest.Matchers.instanceOf;
+
+public class EvilInternalEngineTests extends EngineTestCase {
+
+    public void testOutOfMemoryErrorWhileMergingIsRethrownAndIsUncaught() throws IOException, InterruptedException {
+        engine.close();
+        final AtomicReference<Throwable> maybeFatal = new AtomicReference<>();
+        final CountDownLatch latch = new CountDownLatch(1);
+        final Thread.UncaughtExceptionHandler uncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler();
+        try {
+            /*
+             * We want to test that the out of memory error thrown from the merge goes uncaught; this gives us confidence that an out of
+             * memory error thrown while merging will lead to the node being torn down.
+             */
+            Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
+                maybeFatal.set(e);
+                latch.countDown();
+            });
+            final AtomicReference<List<SegmentCommitInfo>> segmentsReference = new AtomicReference<>();
+
+            try (Engine e = createEngine(
+                    defaultSettings,
+                    store,
+                    primaryTranslogDir,
+                    newMergePolicy(),
+                    (directory, iwc) -> new IndexWriter(directory, iwc) {
+                        @Override
+                        public void merge(final MergePolicy.OneMerge merge) throws IOException {
+                            throw new OutOfMemoryError("640K ought to be enough for anybody");
+                        }
+
+                        @Override
+                        public synchronized MergePolicy.OneMerge getNextMerge() {
+                            /*
+                             * This will be called when we flush when we will not be ready to return the segments. After the segments are on
+                             * disk, we can only return them from here once or the merge scheduler will be stuck in a loop repeatedly
+                             * peeling off the same segments to schedule for merging.
+                             */
+                            if (segmentsReference.get() == null) {
+                                return super.getNextMerge();
+                            } else {
+                                final List<SegmentCommitInfo> segments = segmentsReference.getAndSet(null);
+                                return new MergePolicy.OneMerge(segments);
+                            }
+                        }
+                    },
+                    null)) {
+                // force segments to exist on disk
+                final ParsedDocument doc1 = testParsedDocument("1", null, testDocumentWithTextField(), B_1, null);
+                e.index(indexForDoc(doc1));
+                e.flush();
+                final List<SegmentCommitInfo> segments =
+                        StreamSupport.stream(e.getLastCommittedSegmentInfos().spliterator(), false).collect(Collectors.toList());
+                segmentsReference.set(segments);
+                // trigger a background merge that will be managed by the concurrent merge scheduler
+                e.forceMerge(randomBoolean(), 0, false, false, false);
+                /*
+                 * Merging happens in the background on a merge thread, and the maybeDie handler is invoked on yet another thread; we have
+                 * to wait for these events to finish.
+                 */
+                latch.await();
+                assertNotNull(maybeFatal.get());
+                assertThat(maybeFatal.get(), instanceOf(OutOfMemoryError.class));
+                assertThat(maybeFatal.get(), hasToString(containsString("640K ought to be enough for anybody")));
+            }
+        } finally {
+            Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler);
+        }
+    }
+
+
+}

+ 441 - 0
test/framework/src/main/java/org/elasticsearch/index/engine/EngineTestCase.java

@@ -0,0 +1,441 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.engine;
+
+import org.apache.lucene.analysis.Analyzer;
+import org.apache.lucene.codecs.Codec;
+import org.apache.lucene.document.Field;
+import org.apache.lucene.document.NumericDocValuesField;
+import org.apache.lucene.document.StoredField;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.LiveIndexWriterConfig;
+import org.apache.lucene.index.MergePolicy;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.ReferenceManager;
+import org.apache.lucene.search.Sort;
+import org.apache.lucene.store.Directory;
+import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.IOUtils;
+import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.Version;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.routing.AllocationId;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.index.IndexSettings;
+import org.elasticsearch.index.VersionType;
+import org.elasticsearch.index.codec.CodecService;
+import org.elasticsearch.index.mapper.IdFieldMapper;
+import org.elasticsearch.index.mapper.Mapping;
+import org.elasticsearch.index.mapper.ParseContext;
+import org.elasticsearch.index.mapper.ParsedDocument;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
+import org.elasticsearch.index.mapper.SourceFieldMapper;
+import org.elasticsearch.index.mapper.Uid;
+import org.elasticsearch.index.seqno.SeqNoStats;
+import org.elasticsearch.index.seqno.SequenceNumbers;
+import org.elasticsearch.index.seqno.SequenceNumbersService;
+import org.elasticsearch.index.shard.ShardId;
+import org.elasticsearch.index.store.DirectoryService;
+import org.elasticsearch.index.store.Store;
+import org.elasticsearch.index.translog.Translog;
+import org.elasticsearch.index.translog.TranslogConfig;
+import org.elasticsearch.test.DummyShardLock;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.IndexSettingsModule;
+import org.elasticsearch.threadpool.TestThreadPool;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.BiFunction;
+import java.util.function.ToLongBiFunction;
+
+import static java.util.Collections.emptyList;
+import static org.elasticsearch.index.translog.TranslogDeletionPolicies.createTranslogDeletionPolicy;
+
+public abstract class EngineTestCase extends ESTestCase {
+
+    protected final ShardId shardId = new ShardId(new Index("index", "_na_"), 0);
+    protected final AllocationId allocationId = AllocationId.newInitializing();
+    protected static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("index", Settings.EMPTY);
+
+    protected ThreadPool threadPool;
+
+    protected Store store;
+    protected Store storeReplica;
+
+    protected InternalEngine engine;
+    protected InternalEngine replicaEngine;
+
+    protected IndexSettings defaultSettings;
+    protected String codecName;
+    protected Path primaryTranslogDir;
+    protected Path replicaTranslogDir;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        CodecService codecService = new CodecService(null, logger);
+        String name = Codec.getDefault().getName();
+        if (Arrays.asList(codecService.availableCodecs()).contains(name)) {
+            // some codecs are read only so we only take the ones that we have in the service and randomly
+            // selected by lucene test case.
+            codecName = name;
+        } else {
+            codecName = "default";
+        }
+        defaultSettings = IndexSettingsModule.newIndexSettings("test", Settings.builder()
+                .put(IndexSettings.INDEX_GC_DELETES_SETTING.getKey(), "1h") // make sure this doesn't kick in on us
+                .put(EngineConfig.INDEX_CODEC_SETTING.getKey(), codecName)
+                .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
+                .put(IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.getKey(),
+                        between(10, 10 * IndexSettings.MAX_REFRESH_LISTENERS_PER_SHARD.get(Settings.EMPTY)))
+                .build()); // TODO randomize more settings
+        threadPool = new TestThreadPool(getClass().getName());
+        store = createStore();
+        storeReplica = createStore();
+        Lucene.cleanLuceneIndex(store.directory());
+        Lucene.cleanLuceneIndex(storeReplica.directory());
+        primaryTranslogDir = createTempDir("translog-primary");
+        engine = createEngine(store, primaryTranslogDir);
+        LiveIndexWriterConfig currentIndexWriterConfig = engine.getCurrentIndexWriterConfig();
+
+        assertEquals(engine.config().getCodec().getName(), codecService.codec(codecName).getName());
+        assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
+        if (randomBoolean()) {
+            engine.config().setEnableGcDeletes(false);
+        }
+        replicaTranslogDir = createTempDir("translog-replica");
+        replicaEngine = createEngine(storeReplica, replicaTranslogDir);
+        currentIndexWriterConfig = replicaEngine.getCurrentIndexWriterConfig();
+
+        assertEquals(replicaEngine.config().getCodec().getName(), codecService.codec(codecName).getName());
+        assertEquals(currentIndexWriterConfig.getCodec().getName(), codecService.codec(codecName).getName());
+        if (randomBoolean()) {
+            engine.config().setEnableGcDeletes(false);
+        }
+    }
+
+    public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode) {
+        return copy(config, openMode, config.getAnalyzer());
+    }
+
+    public EngineConfig copy(EngineConfig config, EngineConfig.OpenMode openMode, Analyzer analyzer) {
+        return new EngineConfig(openMode, config.getShardId(), config.getAllocationId(), config.getThreadPool(), config.getIndexSettings(),
+                config.getWarmer(), config.getStore(), config.getMergePolicy(), analyzer, config.getSimilarity(),
+                new CodecService(null, logger), config.getEventListener(), config.getQueryCache(), config.getQueryCachingPolicy(),
+                config.getForceNewHistoryUUID(), config.getTranslogConfig(), config.getFlushMergesAfter(), config.getRefreshListeners(),
+                config.getIndexSort(), config.getTranslogRecoveryRunner());
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        super.tearDown();
+        if (engine != null && engine.isClosed.get() == false) {
+            engine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
+        }
+        if (replicaEngine != null && replicaEngine.isClosed.get() == false) {
+            replicaEngine.getTranslog().getDeletionPolicy().assertNoOpenTranslogRefs();
+        }
+        IOUtils.close(
+                replicaEngine, storeReplica,
+                engine, store);
+        terminate(threadPool);
+    }
+
+
+    protected static ParseContext.Document testDocumentWithTextField() {
+        return testDocumentWithTextField("test");
+    }
+
+    protected static ParseContext.Document testDocumentWithTextField(String value) {
+        ParseContext.Document document = testDocument();
+        document.add(new TextField("value", value, Field.Store.YES));
+        return document;
+    }
+
+
+    protected static ParseContext.Document testDocument() {
+        return new ParseContext.Document();
+    }
+
+    public static ParsedDocument createParsedDoc(String id, String routing) {
+        return testParsedDocument(id, routing, testDocumentWithTextField(), new BytesArray("{ \"value\" : \"test\" }"), null);
+    }
+
+    protected static ParsedDocument testParsedDocument(
+            String id, String routing, ParseContext.Document document, BytesReference source, Mapping mappingUpdate) {
+        Field uidField = new Field("_id", Uid.encodeId(id), IdFieldMapper.Defaults.FIELD_TYPE);
+        Field versionField = new NumericDocValuesField("_version", 0);
+        SeqNoFieldMapper.SequenceIDFields seqID = SeqNoFieldMapper.SequenceIDFields.emptySeqID();
+        document.add(uidField);
+        document.add(versionField);
+        document.add(seqID.seqNo);
+        document.add(seqID.seqNoDocValue);
+        document.add(seqID.primaryTerm);
+        BytesRef ref = source.toBytesRef();
+        document.add(new StoredField(SourceFieldMapper.NAME, ref.bytes, ref.offset, ref.length));
+        return new ParsedDocument(versionField, seqID, id, "test", routing, Arrays.asList(document), source, XContentType.JSON,
+                mappingUpdate);
+    }
+
+    protected Store createStore() throws IOException {
+        return createStore(newDirectory());
+    }
+
+    protected Store createStore(final Directory directory) throws IOException {
+        return createStore(INDEX_SETTINGS, directory);
+    }
+
+    protected Store createStore(final IndexSettings indexSettings, final Directory directory) throws IOException {
+        final DirectoryService directoryService = new DirectoryService(shardId, indexSettings) {
+            @Override
+            public Directory newDirectory() throws IOException {
+                return directory;
+            }
+        };
+        return new Store(shardId, indexSettings, directoryService, new DummyShardLock(shardId));
+    }
+
+    protected Translog createTranslog() throws IOException {
+        return createTranslog(primaryTranslogDir);
+    }
+
+    protected Translog createTranslog(Path translogPath) throws IOException {
+        TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, INDEX_SETTINGS, BigArrays.NON_RECYCLING_INSTANCE);
+        return new Translog(translogConfig, null, createTranslogDeletionPolicy(INDEX_SETTINGS), () -> SequenceNumbers.UNASSIGNED_SEQ_NO);
+    }
+
+    protected InternalEngine createEngine(Store store, Path translogPath) throws IOException {
+        return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null);
+    }
+
+    protected InternalEngine createEngine(
+            Store store,
+            Path translogPath,
+            BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
+        return createEngine(defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier);
+    }
+
+    protected InternalEngine createEngine(
+            Store store,
+            Path translogPath,
+            BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
+            ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation) throws IOException {
+        return createEngine(
+                defaultSettings, store, translogPath, newMergePolicy(), null, sequenceNumbersServiceSupplier, seqNoForOperation, null);
+    }
+
+    protected InternalEngine createEngine(
+            IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
+        return createEngine(indexSettings, store, translogPath, mergePolicy, null);
+
+    }
+
+    protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
+                                          @Nullable IndexWriterFactory indexWriterFactory) throws IOException {
+        return createEngine(indexSettings, store, translogPath, mergePolicy, indexWriterFactory, null);
+    }
+
+    protected InternalEngine createEngine(
+            IndexSettings indexSettings,
+            Store store,
+            Path translogPath,
+            MergePolicy mergePolicy,
+            @Nullable IndexWriterFactory indexWriterFactory,
+            @Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier) throws IOException {
+        return createEngine(
+                indexSettings, store, translogPath, mergePolicy, indexWriterFactory, sequenceNumbersServiceSupplier, null, null);
+    }
+
+    protected InternalEngine createEngine(
+            IndexSettings indexSettings,
+            Store store,
+            Path translogPath,
+            MergePolicy mergePolicy,
+            @Nullable IndexWriterFactory indexWriterFactory,
+            @Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
+            @Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation) throws IOException {
+        return createEngine(
+                indexSettings,
+                store,
+                translogPath,
+                mergePolicy,
+                indexWriterFactory,
+                sequenceNumbersServiceSupplier,
+                seqNoForOperation,
+                null);
+    }
+
+    protected InternalEngine createEngine(
+            IndexSettings indexSettings,
+            Store store,
+            Path translogPath,
+            MergePolicy mergePolicy,
+            @Nullable IndexWriterFactory indexWriterFactory,
+            @Nullable BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
+            @Nullable ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
+            @Nullable Sort indexSort) throws IOException {
+        EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, null, indexSort);
+        InternalEngine internalEngine = createInternalEngine(indexWriterFactory, sequenceNumbersServiceSupplier, seqNoForOperation, config);
+        if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
+            internalEngine.recoverFromTranslog();
+        }
+        return internalEngine;
+    }
+
+    @FunctionalInterface
+    public interface IndexWriterFactory {
+
+        IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException;
+    }
+
+    public static InternalEngine createInternalEngine(
+            @Nullable final IndexWriterFactory indexWriterFactory,
+            @Nullable final BiFunction<EngineConfig, SeqNoStats, SequenceNumbersService> sequenceNumbersServiceSupplier,
+            @Nullable final ToLongBiFunction<Engine, Engine.Operation> seqNoForOperation,
+            final EngineConfig config) {
+        if (sequenceNumbersServiceSupplier == null) {
+            return new InternalEngine(config) {
+                @Override
+                IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
+                    return (indexWriterFactory != null) ?
+                            indexWriterFactory.createWriter(directory, iwc) :
+                            super.createWriter(directory, iwc);
+                }
+
+                @Override
+                protected long doGenerateSeqNoForOperation(final Operation operation) {
+                    return seqNoForOperation != null
+                            ? seqNoForOperation.applyAsLong(this, operation)
+                            : super.doGenerateSeqNoForOperation(operation);
+                }
+            };
+        } else {
+            return new InternalEngine(config, sequenceNumbersServiceSupplier) {
+                @Override
+                IndexWriter createWriter(Directory directory, IndexWriterConfig iwc) throws IOException {
+                    return (indexWriterFactory != null) ?
+                            indexWriterFactory.createWriter(directory, iwc) :
+                            super.createWriter(directory, iwc);
+                }
+
+                @Override
+                protected long doGenerateSeqNoForOperation(final Operation operation) {
+                    return seqNoForOperation != null
+                            ? seqNoForOperation.applyAsLong(this, operation)
+                            : super.doGenerateSeqNoForOperation(operation);
+                }
+            };
+        }
+
+    }
+
+    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
+                               ReferenceManager.RefreshListener refreshListener) {
+        return config(indexSettings, store, translogPath, mergePolicy, refreshListener, null);
+    }
+
+    public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
+                               ReferenceManager.RefreshListener refreshListener, Sort indexSort) {
+        IndexWriterConfig iwc = newIndexWriterConfig();
+        TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
+        final EngineConfig.OpenMode openMode;
+        try {
+            if (Lucene.indexExists(store.directory()) == false) {
+                openMode = EngineConfig.OpenMode.CREATE_INDEX_AND_TRANSLOG;
+            } else {
+                openMode = EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG;
+            }
+        } catch (IOException e) {
+            throw new ElasticsearchException("can't find index?", e);
+        }
+        Engine.EventListener listener = new Engine.EventListener() {
+            @Override
+            public void onFailedEngine(String reason, @Nullable Exception e) {
+                // we don't need to notify anybody in this test
+            }
+        };
+        final TranslogHandler handler = new TranslogHandler(xContentRegistry(), IndexSettingsModule.newIndexSettings(shardId.getIndexName(),
+                indexSettings.getSettings()));
+        final List<ReferenceManager.RefreshListener> refreshListenerList =
+                refreshListener == null ? emptyList() : Collections.singletonList(refreshListener);
+        EngineConfig config = new EngineConfig(openMode, shardId, allocationId.getId(), threadPool, indexSettings, null, store,
+                mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
+                IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), false, translogConfig,
+                TimeValue.timeValueMinutes(5), refreshListenerList, indexSort, handler);
+
+        return config;
+    }
+
+    protected static final BytesReference B_1 = new BytesArray(new byte[]{1});
+    protected static final BytesReference B_2 = new BytesArray(new byte[]{2});
+    protected static final BytesReference B_3 = new BytesArray(new byte[]{3});
+    protected static final BytesArray SOURCE = bytesArray("{}");
+
+    protected static BytesArray bytesArray(String string) {
+        return new BytesArray(string.getBytes(Charset.defaultCharset()));
+    }
+
+    protected Term newUid(String id) {
+        return new Term("_id", Uid.encodeId(id));
+    }
+
+    protected Term newUid(ParsedDocument doc) {
+        return newUid(doc.id());
+    }
+
+    protected Engine.Get newGet(boolean realtime, ParsedDocument doc) {
+        return new Engine.Get(realtime, doc.type(), doc.id(), newUid(doc));
+    }
+
+    protected Engine.Index indexForDoc(ParsedDocument doc) {
+        return new Engine.Index(newUid(doc), doc);
+    }
+
+    protected Engine.Index replicaIndexForDoc(ParsedDocument doc, long version, long seqNo,
+                                            boolean isRetry) {
+        return new Engine.Index(newUid(doc), doc, seqNo, 1, version, VersionType.EXTERNAL,
+                Engine.Operation.Origin.REPLICA, System.nanoTime(),
+                IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, isRetry);
+    }
+
+}

+ 39 - 0
test/framework/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicies.java

@@ -0,0 +1,39 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.index.translog;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.index.IndexSettings;
+
+public class TranslogDeletionPolicies {
+
+    public static TranslogDeletionPolicy createTranslogDeletionPolicy() {
+        return new TranslogDeletionPolicy(
+                IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING.getDefault(Settings.EMPTY).getBytes(),
+                IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING.getDefault(Settings.EMPTY).getMillis()
+        );
+    }
+
+    public static TranslogDeletionPolicy createTranslogDeletionPolicy(IndexSettings indexSettings) {
+        return new TranslogDeletionPolicy(indexSettings.getTranslogRetentionSize().getBytes(),
+                indexSettings.getTranslogRetentionAge().getMillis());
+    }
+
+}